java实现对Hadoop的操作

2025-05-29 0 24

基本操作

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.*;

import org.junit.Test;

import org.junit.jupiter.api.BeforeEach;

import org.junit.jupiter.api.DisplayName;

import org.junit.runner.RunWith;

import org.junit.runners.JUnit4;

import java.io.IOException;

import java.net.URI;

import java.net.URISyntaxException;

import java.util.Arrays;

@RunWith(JUnit4.class)

@DisplayName("Test using junit4")

public class HadoopClientTest {

private FileSystem fileSystem = null;

@BeforeEach

public void init() throws URISyntaxException, IOException, InterruptedException {

Configuration configuration = new Configuration();

configuration.set("dfs.replication", "1");

configuration.set("dfs.blocksize", "64m");

fileSystem = FileSystem.get(new URI("hdfs://hd-even-01:9000"), configuration, "root");

}

/**

* 从本地复制文件到Hadoop

*

* @throws URISyntaxException

* @throws IOException

* @throws InterruptedException

*/

@Test

public void copyFileFromLocal() throws URISyntaxException, IOException, InterruptedException {

// 上传文件

fileSystem.copyFromLocalFile(new Path("C:\\\\Users\\\\Administrator\\\\Desktop\\\\win10激活.txt"), new Path("/even1"));

// 关闭流,报错winUtils,因为使用了linux的tar包,如果windows要使用,则需要编译好这个winUtils包才能使用

fileSystem.close();

}

/**

* 从Hadoop下载文件到本地,下载需要配置Hadoop环境,并添加winutils到bin目录

*

* @throws URISyntaxException

* @throws IOException

* @throws InterruptedException

*/

@Test

public void copyFileToLocal() throws URISyntaxException, IOException, InterruptedException {

// 下载文件

fileSystem.copyToLocalFile(new Path("/win10激活.txt"), new Path("E:/"));

// 关闭流,报错winUtils,因为使用了linux的tar包,如果windows要使用,则需要编译好这个winUtils包才能使用

fileSystem.close();

}

/**

* 创建文件夹

*

* @throws IOException

*/

@Test

public void hdfsMkdir() throws IOException {

// 调用创建文件夹方法

fileSystem.mkdirs(new Path("/even1"));

// 关闭方法

fileSystem.close();

}

/**

* 移动文件/修改文件名

*/

public void hdfsRename() throws IOException {

fileSystem.rename(new Path(""), new Path(""));

fileSystem.close();

}

/**

* 删除文件/文件夹

*

* @throws IOException

*/

@Test

public void hdfsRm() throws IOException {

// fileSystem.delete(new Path(""));

// 第二个参数表示递归删除

fileSystem.delete(new Path(""), true);

fileSystem.close();

}

/**

* 查看hdfs指定目录的信息

*

* @throws IOException

*/

@Test

public void hdfsLs() throws IOException {

// 调用方法返回远程迭代器,第二个参数是把目录文件夹内的文件也列出来

RemoteIterator<LocatedFileStatus> listFiles = fileSystem.listFiles(new Path("/"), true);

while (listFiles.hasNext()) {

LocatedFileStatus locatedFileStatus = listFiles.next();

System.out.println("文件路径:" + locatedFileStatus.getPath());

System.out.println("块大小:" + locatedFileStatus.getBlockSize());

System.out.println("文件长度:" + locatedFileStatus.getLen());

System.out.println("副本数量:" + locatedFileStatus.getReplication());

System.out.println("块信息:" + Arrays.toString(locatedFileStatus.getBlockLocations()));

}

fileSystem.close();

}

/**

* 判断是文件还是文件夹

*/

@Test

public void findHdfs() throws IOException {

// 1,展示状态信息

FileStatus[] listStatus = fileSystem.listStatus(new Path("/"));

// 2,遍历所有文件

for (FileStatus fileStatus : listStatus) {

if (fileStatus.isFile())

System.out.println("是文件:" + fileStatus.getPath().getName());

else if (fileStatus.isDirectory())

System.out.println("是文件夹:" + fileStatus.getPath().getName());

}

fileSystem.close();

}

}

文件读写

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.junit.Before;

import org.junit.Test;

import org.junit.jupiter.api.DisplayName;

import org.junit.runner.RunWith;

import org.junit.runners.JUnit4;

import java.io.*;

import java.net.URI;

import java.net.URISyntaxException;

import java.nio.charset.StandardCharsets;

import java.util.Arrays;

@RunWith(JUnit4.class)

@DisplayName("this is read write test!")

public class HadoopReadWriteTest {

FileSystem fileSystem = null;

Configuration configuration = null;

@Before

public void init() throws URISyntaxException, IOException, InterruptedException {

// 1,加载配置

configuration = new Configuration();

// 2,构建客户端

fileSystem = FileSystem.get(new URI("hdfs://hd-even-01:9000/"), configuration, "root");

}

@Test

public void testReadData() throws IOException {

// 1,获取hdfs文件流

FSDataInputStream open = fileSystem.open(new Path("/win10激活.txt"));

// 2,设置一次获取的大小

byte[] bytes = new byte[1024];

// 3,读取数据

while (open.read(bytes) != -1)

System.out.println(Arrays.toString(bytes));

open.close();

fileSystem.close();

}

/**

* 使用缓存流

*

* @throws IOException

*/

@Test

public void testReadData1() throws IOException {

FSDataInputStream open = fileSystem.open(new Path("/win10激活.txt"));

// 使用缓冲流会快点

BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(open, StandardCharsets.UTF_8));

String line = "";

while ((line = bufferedReader.readLine()) != null) {

System.out.println(line);

}

bufferedReader.close();

open.close();

fileSystem.close();

}

/**

* 指定偏移量来实现只读部分内容

*/

@Test

public void readSomeData() throws IOException {

FSDataInputStream open = fileSystem.open(new Path("/win10激活.txt"));

// 指定开始的index

open.seek(14);

// 指定读的多少

byte[] bytes = new byte[5];

while (open.read(bytes) != -1)

System.out.println(new String(bytes));

open.close();

fileSystem.close();

}

/**

* 流方式写数据

* @throws IOException

*/

@Test

public void writeData() throws IOException {

// 1,获取输出流

FSDataOutputStream out = fileSystem.create(new Path("/win11.txt"), false);

// 2,获取需要写的文件输入流

FileInputStream in = new FileInputStream(new File("C:\\\\Users\\\\Administrator\\\\Desktop\\\\xixi.txt"));

byte[] b = new byte[1024];

int read = 0;

while ((read = in.read(b)) != -1) {

out.write(b, 0, read);

}

in.close();

out.close();

fileSystem.close();

}

/**

* 直接写字符串

*/

@Test

public void writeData1() throws IOException {

// 1,创建输出流

FSDataOutputStream out = fileSystem.create(new Path("/aibaobao.txt"), false);

// 2,写数据

out.write("wochaoaibaobao".getBytes());

// 3,关闭流

IOUtils.closeStream(out);

fileSystem.close();

}

/**

* IOUtils方式上传

*

* @throws IOException

*/

@Test

public void putToHdfs() throws IOException {

// 1,获取输入流

FileInputStream in = new FileInputStream(new File("C:\\\\Users\\\\Administrator\\\\Desktop\\\\xixi.txt"));

// 2,获取输出流

FSDataOutputStream out = fileSystem.create(new Path("/haddopPut.txt"), false);

// 3,拷贝

IOUtils.copyBytes(in, out, configuration);

// 4,关闭流

IOUtils.closeStream(in);

IOUtils.closeStream(out);

fileSystem.close();

}

/**

* IOUtils方式下载

* @throws IOException

*/

@Test

public void getFromHdfs() throws IOException {

// 1,获取输入流

FSDataInputStream open = fileSystem.open(new Path("/haddopPut.txt"));

// 2,获取输出流

FileOutputStream out = new FileOutputStream(new File("C:\\\\Users\\\\Administrator\\\\Desktop\\\\haddopPut.txt"));

// 3,拷贝

IOUtils.copyBytes(open, out, configuration);

// 4,关闭流

IOUtils.closeStream(open);

IOUtils.closeStream(out);

fileSystem.close();

}

}

到此这篇关于java实现对Hadoop的操作的文章就介绍到这了,更多相关Java Hadoop内容请搜索快网idc以前的文章或继续浏览下面的相关文章希望大家以后多多支持快网idc!

原文链接:https://blog.csdn.net/weixin_37581297/article/details/84349916

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 java实现对Hadoop的操作 https://www.kuaiidc.com/105511.html

相关文章

发表评论
暂无评论