Java/Web调用Hadoop进行MapReduce示例代码

2025-05-29 0 66

hadoop环境搭建详见此文章 https://www.zzvips.com/article/140756.html。

我们已经知道hadoop能够通过hadoop jar ***.jar input output的形式通过命令行来调用,那么如何将其封装成一个服务,让java/web来调用它?使得用户可以用方便的方式上传文件到hadoop并进行处理,获得结果。首先,***.jar是一个hadoop任务类的封装,我们可以在没有jar的情况下运行该类的main方法,将必要的参数传递给它。input 和output则将用户上传的文件使用hadoop的javaapi put到hadoop的文件系统中。然后再通过hadoop的javaapi 从文件系统中取得结果文件。

搭建javaweb工程。本文使用spring、springmvc、mybatis框架, 当然,这不是重点,就算没有使用任何框架也能实现。

项目框架如下:

Java/Web调用Hadoop进行MapReduce示例代码

项目中使用到的jar包如下:

Java/Web调用Hadoop进行MapReduce示例代码Java/Web调用Hadoop进行MapReduce示例代码

在spring的配置文件中,加入

?

1

2

3

4

5
<bean id="multipartresolver" class="org.springframework.web.multipart.commons.commonsmultipartresolver">

<property name="defaultencoding" value="utf-8" />

<property name="maxuploadsize" value="10485760000" />

<property name="maxinmemorysize" value="40960" />

</bean>

使得项目支持文件上传。

新建一个login.jsp 点击登录后进入user/login

Java/Web调用Hadoop进行MapReduce示例代码

user/login中处理登录,登录成功后,【在hadoop文件系统中创建用户文件夹】,然后跳转到console.jsp

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132
package com.chenjie.controller;

import java.io.ioexception;

import javax.annotation.resource;

import javax.servlet.http.httpservletrequest;

import javax.servlet.http.httpservletresponse;

import org.apache.hadoop.conf.configuration;

import org.apache.hadoop.fs.filesystem;

import org.apache.hadoop.fs.path;

import org.springframework.stereotype.controller;

import org.springframework.web.bind.annotation.requestmapping;

import com.chenjie.pojo.jsonresult;

import com.chenjie.pojo.user;

import com.chenjie.service.userservice;

import com.chenjie.util.appconfig;

import com.google.gson.gson;

/**

* 用户请求控制器

*

* @author chen

*

*/

@controller

// 声明当前类为控制器

@requestmapping("/user")

// 声明当前类的路径

public class usercontroller {

@resource(name = "userservice")

private userservice userservice;// 由spring容器注入一个userservice实例

/**

* 登录

*

* @param user

* 用户

* @param request

* @param response

* @throws ioexception

*/

@requestmapping("/login")

// 声明当前方法的路径

public string login(user user, httpservletrequest request,

httpservletresponse response) throws ioexception {

response.setcontenttype("application/json");// 设置响应内容格式为json

user result = userservice.login(user);// 调用userservice的登录方法

request.getsession().setattribute("user", result);

if (result != null) {

createhadoopfsfolder(result);

return "console";

}

return "login";

}

public void createhadoopfsfolder(user user) throws ioexception {

configuration conf = new configuration();

conf.addresource(new path("/opt/hadoop-1.2.1/conf/core-site.xml"));

conf.addresource(new path("/opt/hadoop-1.2.1/conf/hdfs-site.xml"));

filesystem filesystem = filesystem.get(conf);

system.out.println(filesystem.geturi());

path file = new path("/user/" + user.getu_username());

if (filesystem.exists(file)) {

system.out.println("haddop hdfs user foler exists.");

filesystem.delete(file, true);

system.out.println("haddop hdfs user foler delete success.");

}

filesystem.mkdirs(file);

system.out.println("haddop hdfs user foler creat success.");

}

}

console.jsp中进行文件上传和任务提交、

Java/Web调用Hadoop进行MapReduce示例代码

文件上传和任务提交:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215
package com.chenjie.controller;

import java.io.file;

import java.io.ioexception;

import java.net.inetsocketaddress;

import java.net.uri;

import java.util.arraylist;

import java.util.iterator;

import java.util.list;

import javax.servlet.http.httpservletrequest;

import javax.servlet.http.httpservletresponse;

import org.apache.hadoop.conf.configuration;

import org.apache.hadoop.fs.fsdatainputstream;

import org.apache.hadoop.fs.filesystem;

import org.apache.hadoop.fs.path;

import org.apache.hadoop.mapred.jobclient;

import org.apache.hadoop.mapred.jobconf;

import org.apache.hadoop.mapred.jobid;

import org.apache.hadoop.mapred.jobstatus;

import org.apache.hadoop.mapred.runningjob;

import org.springframework.stereotype.controller;

import org.springframework.web.bind.annotation.requestmapping;

import org.springframework.web.multipart.multipartfile;

import org.springframework.web.multipart.multiparthttpservletrequest;

import org.springframework.web.multipart.commons.commonsmultipartresolver;

import com.chenjie.pojo.user;

import com.chenjie.util.utils;

@controller

// 声明当前类为控制器

@requestmapping("/hadoop")

// 声明当前类的路径

public class hadoopcontroller {

@requestmapping("/upload")

// 声明当前方法的路径

//文件上传

public string upload(httpservletrequest request,

httpservletresponse response) throws ioexception {

list<string> filelist = (list<string>) request.getsession()

.getattribute("filelist");//得到用户已上传文件列表

if (filelist == null)

filelist = new arraylist<string>();//如果文件列表为空,则新建

user user = (user) request.getsession().getattribute("user");

if (user == null)

return "login";//如果用户未登录,则跳转登录页面

commonsmultipartresolver multipartresolver = new commonsmultipartresolver(

request.getsession().getservletcontext());//得到在spring配置文件中注入的文件上传组件

if (multipartresolver.ismultipart(request)) {//如果请求是文件请求

multiparthttpservletrequest multirequest = (multiparthttpservletrequest) request;

iterator<string> iter = multirequest.getfilenames();//得到文件名迭代器

while (iter.hasnext()) {

multipartfile file = multirequest.getfile((string) iter.next());

if (file != null) {

string filename = file.getoriginalfilename();

file folder = new file("/home/chenjie/cjhadooponline/"

+ user.getu_username());

if (!folder.exists()) {

folder.mkdir();//如果文件不目录存在,则在服务器本地创建

}

string path = "/home/chenjie/cjhadooponline/"

+ user.getu_username() + "/" + filename;

file localfile = new file(path);

file.transferto(localfile);//将上传文件拷贝到服务器本地目录

// filelist.add(path);

}

handleuploadfiles(user, filelist);//处理上传文件

}

}

request.getsession().setattribute("filelist", filelist);//将上传文件列表保存在session中

return "console";//返回console.jsp继续上传文件

}

@requestmapping("/wordcount")

//调用hadoop进行mapreduce

public void wordcount(httpservletrequest request,

httpservletresponse response) {

system.out.println("进入controller wordcount ");

user user = (user) request.getsession().getattribute("user");

system.out.println(user);

// if(user == null)

// return "login";

wordcount c = new wordcount();//新建单词统计任务

string username = user.getu_username();

string input = "hdfs://chenjie-virtual-machine:9000/user/" + username

+ "/wordcountinput";//指定hadoop文件系统的输入文件夹

string output = "hdfs://chenjie-virtual-machine:9000/user/" + username

+ "/wordcountoutput";//指定hadoop文件系统的输出文件夹

string reslt = output + "/part-r-00000";//默认输出文件

try {

thread.sleep(3*1000);

c.main(new string[] { input, output });//调用单词统计任务

configuration conf = new configuration();//新建hadoop配置

conf.addresource(new path("/opt/hadoop-1.2.1/conf/core-site.xml"));//添加hadoop配置,找到hadoop部署信息

conf.addresource(new path("/opt/hadoop-1.2.1/conf/hdfs-site.xml"));//hadoop配置,找到文件系统

filesystem filesystem = filesystem.get(conf);//得打文件系统

path file = new path(reslt);//找到输出结果文件

fsdatainputstream instream = filesystem.open(file);//打开

uri uri = file.touri();//得到输出文件路径

system.out.println(uri);

string data = null;

while ((data = instream.readline()) != null) {

//system.out.println(data);

response.getoutputstream().println(data);//讲结果文件写回用户网页

}

// inputstream in = filesystem.open(file);

// outputstream out = new fileoutputstream("result.txt");

// ioutils.copybytes(in, out, 4096, true);

instream.close();

} catch (exception e) {

system.err.println(e.getmessage());

}

}

@requestmapping("/mapreducestates")

//得到mapreduce的状态

public void mapreduce(httpservletrequest request,

httpservletresponse response) {

float[] progress=new float[2];

try {

configuration conf1=new configuration();

conf1.set("mapred.job.tracker", utils.jobtracker);

jobstatus jobstatus = utils.getjobstatus(conf1);

// while(!jobstatus.isjobcomplete()){

// progress = utils.getmapreduceprogess(jobstatus);

// response.getoutputstream().println("map:" + progress[0] + "reduce:" + progress[1]);

// thread.sleep(1000);

// }

jobconf jc = new jobconf(conf1);

jobclient jobclient = new jobclient(jc);

jobstatus[] jobsstatus = jobclient.getalljobs();

//这样就得到了一个jobstatus数组,随便取出一个元素取名叫jobstatus

jobstatus = jobsstatus[0];

jobid jobid = jobstatus.getjobid(); //通过jobstatus获取jobid

runningjob runningjob = jobclient.getjob(jobid); //通过jobid得到runningjob对象

runningjob.getjobstate();//可以获取作业状态,状态有五种,为jobstatus.failed 、jobstatus.killed、jobstatus.prep、jobstatus.running、jobstatus.succeeded

jobstatus.getusername();//可以获取运行作业的用户名。

runningjob.getjobname();//可以获取作业名。

jobstatus.getstarttime();//可以获取作业的开始时间,为utc毫秒数。

float map = runningjob.mapprogress();//可以获取map阶段完成的比例,0~1,

system.out.println("map=" + map);

float reduce = runningjob.reduceprogress();//可以获取reduce阶段完成的比例。

system.out.println("reduce="+reduce);

runningjob.getfailureinfo();//可以获取失败信息。

runningjob.getcounters();//可以获取作业相关的计数器,计数器的内容和作业监控页面上看到的计数器的值一样。

} catch (ioexception e) {

progress[0] = 0;

progress[1] = 0;

}

request.getsession().setattribute("map", progress[0]);

request.getsession().setattribute("reduce", progress[1]);

}

//处理文件上传

public void handleuploadfiles(user user, list<string> filelist) {

file folder = new file("/home/chenjie/cjhadooponline/"

+ user.getu_username());

if (!folder.exists())

return;

if (folder.isdirectory()) {

file[] files = folder.listfiles();

for (file file : files) {

system.out.println(file.getname());

try {

putfiletohadoopfsfolder(user, file, filelist);//将单个文件上传到hadoop文件系统

} catch (ioexception e) {

system.err.println(e.getmessage());

}

}

}

}

//将单个文件上传到hadoop文件系统

private void putfiletohadoopfsfolder(user user, file file,

list<string> filelist) throws ioexception {

configuration conf = new configuration();

conf.addresource(new path("/opt/hadoop-1.2.1/conf/core-site.xml"));

conf.addresource(new path("/opt/hadoop-1.2.1/conf/hdfs-site.xml"));

filesystem filesystem = filesystem.get(conf);

system.out.println(filesystem.geturi());

path localfile = new path(file.getabsolutepath());

path foler = new path("/user/" + user.getu_username()

+ "/wordcountinput");

if (!filesystem.exists(foler)) {

filesystem.mkdirs(foler);

}

path hadoopfile = new path("/user/" + user.getu_username()

+ "/wordcountinput/" + file.getname());

// if (filesystem.exists(hadoopfile)) {

// system.out.println("file exists.");

// } else {

// filesystem.mkdirs(hadoopfile);

// }

filesystem.copyfromlocalfile(true, true, localfile, hadoopfile);

filelist.add(hadoopfile.touri().tostring());

}

}

启动hadoop:

Java/Web调用Hadoop进行MapReduce示例代码

运行结果:

可以在任意平台下,登录该项目地址,上传文件,得到结果。

Java/Web调用Hadoop进行MapReduce示例代码

Java/Web调用Hadoop进行MapReduce示例代码


Java/Web调用Hadoop进行MapReduce示例代码

Java/Web调用Hadoop进行MapReduce示例代码

Java/Web调用Hadoop进行MapReduce示例代码

Java/Web调用Hadoop进行MapReduce示例代码

运行成功。

源代码:https://github.com/tudoupaisimalingshu/cjhadooponline

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持快网idc。

原文链接:http://blog.csdn.net/csj941227/article/details/71786040

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 Java/Web调用Hadoop进行MapReduce示例代码 https://www.kuaiidc.com/113788.html

相关文章

发表评论
暂无评论