Abstract: A common requirement in Spark applications is to serialize a Java object and store it on HDFS — especially models computed with MLlib — so the model can be reused later. The example below reads data from HBase in a Spark job, trains a Word2Vec model, and stores the model on HDFS.
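The core pattern is small: wrap an ObjectOutputStream around the stream returned by FileSystem.create, write the object, and read it back later through an ObjectInputStream. Here is a minimal sketch of just that pattern, assuming a placeholder namenode URI, a placeholder path, and an arbitrary Serializable object named anyModel (all of these are illustrative, not part of the original example):

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Works for any object that implements java.io.Serializable.
val conf = new Configuration()
conf.set("fs.defaultFS", "hdfs://namenode:9000/")      // placeholder namenode address
val fs = FileSystem.get(conf)
val path = new Path("/tmp/any-model-object")           // placeholder path

// Write: fs.create returns an FSDataOutputStream, which ObjectOutputStream can wrap directly.
val oos = new ObjectOutputStream(fs.create(path))
oos.writeObject(anyModel)                              // anyModel: placeholder Serializable object
oos.close()

// Read back, possibly from a different program.
val ois = new ObjectInputStream(fs.open(path))
val restored = ois.readObject()                        // cast to the concrete type as needed
ois.close()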
Without further ado, here is the full code. Environment: Spark 1.4 + HBase 0.98.
import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConverters._
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.net.URI
import java.util.Date
import org.ansj.library.UserDefineLibrary
import org.ansj.splitWord.analysis.NlpAnalysis
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.PageFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
// Project-internal imports from the author's environment; not actually used in this example.
import com.feheadline.fespark.db.Neo4jManager
import com.feheadline.fespark.util.Env
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import scala.math.log
import scala.io.Source
object Word2VecDemo {

  // Serialize a Scan into the Base64 string that TableInputFormat expects.
  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Word2Vec Demo")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("spark.kryoserializer.buffer", "256m")
    sparkConf.set("spark.kryoserializer.buffer.max", "2046m")
    sparkConf.set("spark.akka.frameSize", "500")
    sparkConf.set("spark.rpc.askTimeout", "30")

    val sc = new SparkContext(sparkConf)

    // HBase scan configuration: read the "article" column of the "data" family from the
    // "crawled" table, keeping only rows whose article has at least 1500 characters.
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled")

    val scan = new Scan()
    val filterList: FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
    val comp: RegexStringComparator = new RegexStringComparator(""".{1500,}""")
    val articleFilter: SingleColumnValueFilter = new SingleColumnValueFilter(
      "data".getBytes,
      "article".getBytes,
      CompareOp.EQUAL,
      comp
    )
    filterList.addFilter(articleFilter)
    filterList.addFilter(new PageFilter(100))

    scan.setFilter(filterList)
    scan.setCaching(50)
    scan.setCacheBlocks(false)
    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))

    val crawledRDD = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    )

    // Keep only rows that actually contain an article.
    val articlesRDD = crawledRDD.filter {
      case (_, result) =>
        val content = Bytes.toString(result.getValue("data".getBytes, "article".getBytes))
        content != null
    }

    // Segment each article into words with ansj, producing one word sequence per document.
    val wordsInDoc = articlesRDD.map {
      case (_, result) =>
        val content = Bytes.toString(result.getValue("data".getBytes, "article".getBytes))
        if (content != null) ToAnalysis.parse(content).asScala.map(_.getName).toSeq
        else Seq("")
    }

    val filteredWordsInDoc = wordsInDoc.filter(_.nonEmpty)

    // Train the Word2Vec model.
    val word2vec = new Word2Vec()
    val model = word2vec.fit(filteredWordsInDoc)

    // ------------------------------ The key part ------------------------------
    // Serialize the model trained above and store it on HDFS.
    val hadoopConf = sc.hadoopConfiguration
    hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
    val fileSystem = FileSystem.get(hadoopConf)
    val path = new Path("/user/hadoop/data/mllib/word2vec-object")
    // fileSystem.create already returns an FSDataOutputStream, so it can be wrapped directly.
    val oos = new ObjectOutputStream(fileSystem.create(path))
    oos.writeObject(model)
    oos.close()

    // Another program can read the serialized object back from HDFS and use the model directly.
    val ois = new ObjectInputStream(fileSystem.open(path))
    val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
    ois.close()

    /*
     * You can also copy the serialized file from HDFS to the local filesystem
     * and load the model from a plain Scala program:
     *
     * import java.io._
     * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
     * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
     * val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
     * ois.close()
     */
    // ---------------------------------------------------------------------------
  }
}
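Once sample_model has been deserialized it behaves like any other Word2VecModel. A small usage sketch (the query word and the neighbor count below are arbitrary placeholders, not part of the original example):

// Find the 10 words most similar to the query word.
val synonyms = sample_model.findSynonyms("经济", 10)
synonyms.foreach { case (word, cosineSimilarity) =>
  println(s"$word -> $cosineSimilarity")
}

// A single word can also be mapped to its vector representation
// (this throws if the word is not in the model's vocabulary).
val vector = sample_model.transform("经济")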
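As a side note: since Spark 1.4, MLlib's Word2VecModel also provides its own save/load methods, so plain Java serialization is not strictly required. Treat the following as an alternative sketch rather than part of the original example; the target directory name is a placeholder:

// Native MLlib persistence (writes a metadata + Parquet directory on HDFS).
model.save(sc, "hdfs://myhadoop:9000/user/hadoop/data/mllib/word2vec-native")

// Later, possibly in another application:
val reloaded = Word2VecModel.load(sc, "hdfs://myhadoop:9000/user/hadoop/data/mllib/word2vec-native")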
Thanks for reading. I hope this helps, and thank you for supporting this site!
Original article: https://my.oschina.net/waterbear/blog/525347