Serializing Java objects to HDFS in Spark


Abstract: A common requirement in Spark applications is to serialize a Java object and store it on HDFS, especially models produced by MLlib, so that the model can be reused later. The example below reads data from HBase in a Spark job, trains a word2vec model, and stores it on HDFS.

Without further ado, here is the code. Environment: Spark 1.4 + HBase 0.98.

import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConverters._
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.net.URI
import java.util.Date
import org.ansj.library.UserDefineLibrary
import org.ansj.splitWord.analysis.NlpAnalysis
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.PageFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import com.feheadline.fespark.db.Neo4jManager
import com.feheadline.fespark.util.Env
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import scala.math.log
import scala.io.Source

object Word2VecDemo {

  // Serialize a Scan into the Base64 string expected by TableInputFormat.SCAN
  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Word2Vec Demo")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("spark.kryoserializer.buffer", "256m")
    sparkConf.set("spark.kryoserializer.buffer.max", "2046m")
    sparkConf.set("spark.akka.frameSize", "500")
    sparkConf.set("spark.rpc.askTimeout", "30")
    val sc = new SparkContext(sparkConf)

    // HBase connection and source table
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled")

    // Only keep rows whose data:article column contains at least 1500 characters,
    // and use a PageFilter to cap the number of rows returned by the scan
    val scan = new Scan()
    val filterList: FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
    val comp: RegexStringComparator = new RegexStringComparator(""".{1500,}""")
    val articleFilter: SingleColumnValueFilter = new SingleColumnValueFilter(
      "data".getBytes,
      "article".getBytes,
      CompareOp.EQUAL,
      comp
    )
    filterList.addFilter(articleFilter)
    filterList.addFilter(new PageFilter(100))
    scan.setFilter(filterList)
    scan.setCaching(50)
    scan.setCacheBlocks(false)
    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))

    // Read the filtered rows as an RDD of (rowkey, Result)
    val crawledRDD = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    )

    // Drop rows without an article body
    val articlesRDD = crawledRDD.filter {
      case (_, result) =>
        val content = Bytes.toString(result.getValue("data".getBytes, "article".getBytes))
        content != null
    }

    // Tokenize each article with ansj; every document becomes a Seq of words
    val wordsInDoc = articlesRDD.map {
      case (_, result) =>
        val content = Bytes.toString(result.getValue("data".getBytes, "article".getBytes))
        if (content != null) ToAnalysis.parse(content).asScala.map(_.getName).toSeq
        else Seq("")
    }

    val filteredWordsInDoc = wordsInDoc.filter(_.nonEmpty)

    // Train the word2vec model on the tokenized documents
    val word2vec = new Word2Vec()
    val model = word2vec.fit(filteredWordsInDoc)

    // ----------------------------------- The key part -----------------------------------
    // Serialize the model built above and store it on HDFS
    val hadoopConf = sc.hadoopConfiguration
    hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
    val fileSystem = FileSystem.get(hadoopConf)
    val path = new Path("/user/hadoop/data/mllib/word2vec-object")
    val oos = new ObjectOutputStream(fileSystem.create(path))
    oos.writeObject(model)
    oos.close()

    // Another program can deserialize the object straight from HDFS and use the model
    val ois = new ObjectInputStream(fileSystem.open(path))
    val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
    ois.close()

    /*
     * You can also copy the serialized file from HDFS to the local filesystem
     * and use the model from a plain Scala program:
     *
     * import java.io._
     * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
     * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
     * val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
     * ois.close
     */
    // -------------------------------------------------------------------------------------
  }
}
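The commented-out snippet above assumes the serialized file has already been pulled out of HDFS. A minimal sketch of that step with the standard Hadoop FileSystem API is shown below; the local destination /home/cherokee/tmp/word2vec-object simply reuses the example path from the comment, and the same thing can be done from the shell with hdfs dfs -get.

    // Sketch only: copy the serialized model from HDFS to the local filesystem
    // so a plain Scala program can read it with a FileInputStream.
    val hdfsPath  = new Path("/user/hadoop/data/mllib/word2vec-object")
    val localPath = new Path("/home/cherokee/tmp/word2vec-object")
    fileSystem.copyToLocalFile(hdfsPath, localPath)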
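As an aside, MLlib's Word2VecModel also comes with its own save/load support in newer Spark releases, which persists the model to HDFS without manual Java serialization. If your Spark version provides it, a sketch along these lines should work (the path is again just the example location used above):

    // Alternative sketch, assuming Word2VecModel.save/load are available in your Spark version
    model.save(sc, "hdfs://myhadoop:9000/user/hadoop/data/mllib/word2vec-model")
    val reloaded = Word2VecModel.load(sc, "hdfs://myhadoop:9000/user/hadoop/data/mllib/word2vec-model")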

Thanks for reading, and I hope this helps.

Original article: https://my.oschina.net/waterbear/blog/525347
