Hadoop对文本文件的快速全局排序实现方法及分析

2025-05-27 0 120

一、背景

Hadoop中实现了用于全局排序的InputSampler类和TotalOrderPartitioner类,调用示例是org.apache.hadoop.examples.Sort。

但是当我们以Text文件作为输入时,结果并非按Text中的string列排序,而且输出结果是SequenceFile。

原因:

1) hadoop在处理Text文件时,key是行号LongWritable类型,InputSampler抽样的是key,TotalOrderPartitioner也是用key去查找分区。这样,抽样得到的partition文件是对行号的抽样,结果自然是根据行号来排序

2)大数据量时,InputSampler抽样速度会非常慢。比如,RandomSampler需要遍历所有数据,IntervalSampler需要遍历文件数与splits数一样。SplitSampler效率比较高,但它只抽取每个文件前面的记录,不适合应用于文件内有序的情况。

二、功能

1. 实现了一种局部抽样方法PartialSampler,适用于输入数据各文件是独立同分布的情况

2. 使RandomSampler、IntervalSampler、SplitSampler支持对文本的抽样

3. 实现了针对Text文件string列的TotalOrderPartitioner

三、实现

1. PartialSampler

PartialSampler从第一份输入数据中随机抽取第一列文本数据。PartialSampler有两个属性:freq(采样频率),numSamples(采样总数)。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {

InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());

ArrayList<K> samples = new ArrayList<K>(numSamples);

Random r = new Random();

long seed = r.nextLong();

r.setSeed(seed);

LOG.debug("seed: " + seed);

// 对splits【0】抽样

for (int i = 0; i < 1; i++) {

System.out.println("PartialSampler will getSample splits["+i+"]");

RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,

Reporter.NULL);

K key = reader.createKey();

V value = reader.createValue();

while (reader.next(key, value)) {

if (r.nextDouble() <= freq) {

if (samples.size() < numSamples) {

// 选择value中的第一列抽样

Text value0 = new Text(value.toString().split("\\t")[0]);

samples.add((K) value0);

} else {

// When exceeding the maximum number of samples, replace a

// random element with this one, then adjust the frequency

// to reflect the possibility of existing elements being

// pushed out

int ind = r.nextInt(numSamples);

if (ind != numSamples) {

Text value0 = new Text(value.toString().split("\\t")[0]);

samples.set(ind, (K) value0);

}

freq *= (numSamples - 1) / (double) numSamples;

}

key = reader.createKey();

}

}

reader.close();

}

return (K[])samples.toArray();

}

首先通过InputFormat的getSplits方法得到所有的输入分区;

然后扫描第一个分区中的记录进行采样。

记录采样的具体过程如下:

从指定分区中取出一条记录,判断得到的随机浮点数是否小于等于采样频率freq

  如果大于则放弃这条记录;

  如果小于,则判断当前的采样数是否小于最大采样数,

    如果小于则这条记录被选中,被放进采样集合中;

    否则从【0,numSamples】中选择一个随机数,如果这个随机数不等于最大采样数numSamples,则用这条记录替换掉采样集合随机数对应位置的记录,同时采样频率freq减小变为freq*(numSamples-1)/numSamples。

然后依次遍历分区中的其它记录。

note:

1)PartialSampler只适用于输入数据各文件是独立同分布的情况。

2)自带的三种Sampler通过修改samples.add(key)为samples.add((K) value0); 也可以实现对第一列的抽样。

2. TotalOrderPartitioner

TotalOrderPartitioner主要改进了两点:

1)读partition时指定keyClass为Text.class

因为partition文件中的key类型为Text

在configure函数中,修改:

?

1

2
//Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();

Class<K> keyClass = (Class<K>)Text.class;

2)查找分区时,改用value查

?

1

2

3

4
public int getPartition(K key, V value, int numPartitions) {

Text value0 = new Text(value.toString().split("\\t")[0]);

return partitions.findPartition((K) value0);

}

3. Sort

1)设置InputFormat、OutputFormat、OutputKeyClass、OutputValueClass、MapOutputKeyClass

2)初始化InputSampler对象,抽样

3)partitionFile通过CacheFile传给TotalOrderPartitioner,执行MapReduce任务

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35
Class<? extends InputFormat> inputFormatClass = TextInputFormat.class;

Class<? extends OutputFormat> outputFormatClass = TextOutputFormat.class;

Class<? extends WritableComparable> outputKeyClass = Text.class;

Class<? extends Writable> outputValueClass = Text.class;

jobConf.setMapOutputKeyClass(LongWritable.class);

// Set user-supplied (possibly default) job configs

jobConf.setNumReduceTasks(num_reduces);

jobConf.setInputFormat(inputFormatClass);

jobConf.setOutputFormat(outputFormatClass);

jobConf.setOutputKeyClass(outputKeyClass);

jobConf.setOutputValueClass(outputValueClass);

if (sampler != null) {

System.out.println("Sampling input to effect total-order sort...");

jobConf.setPartitionerClass(TotalOrderPartitioner.class);

Path inputDir = FileInputFormat.getInputPaths(jobConf)[0];

inputDir = inputDir.makeQualified(inputDir.getFileSystem(jobConf));

//Path partitionFile = new Path(inputDir, "_sortPartitioning");

TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);

InputSampler.<K,V>writePartitionFile(jobConf, sampler);

URI partitionUri = new URI(partitionFile.toString() + "#" + "_sortPartitioning");

DistributedCache.addCacheFile(partitionUri, jobConf);

DistributedCache.createSymlink(jobConf);

}

FileSystem hdfs = FileSystem.get(jobConf);

hdfs.delete(outputpath);

hdfs.close();

System.out.println("Running on " +

cluster.getTaskTrackers() +

" nodes to sort from " +

FileInputFormat.getInputPaths(jobConf)[0] + " into " +

FileOutputFormat.getOutputPath(jobConf) +

" with " + num_reduces + " reduces.");

Date startTime = new Date();

System.out.println("Job started: " + startTime);

jobResult = JobClient.runJob(jobConf);

四、执行

usage:

hadoop jar yitengfei.jar com.yitengfei.Sort [-m <maps>] [-r <reduces>]
[-splitRandom <double pcnt> <numSamples> <maxsplits> | // Sample from random splits at random (general)
-splitSample <numSamples> <maxsplits> | // Sample from first records in splits (random data)
-splitInterval <double pcnt> <maxsplits>] // Sample from splits at intervals (sorted data)
-splitPartial <double pcnt> <numSamples> <maxsplits> | // Sample from partial splits at random (general) ]
<input> <output> <partitionfile>

Example:

hadoop jar yitengfei.jar com.yitengfei.Sort -r 10 -splitPartial 0.1 10000 10 /user/rp-rd/yitengfei/sample/input /user/rp-rd/yitengfei/sample/output /user/rp-rd/yitengfei/sample/partition

五、性能

200G输入数据,15亿条url,1000个分区,排序时间只用了6分钟

总结

以上就是本文关于Hadoop文本文件的快速全局排序实现方法及分析的全部内容,希望对大家有所帮助 ,如有不足之处,欢迎留言指出,感谢朋友们对本站的支持!

原文链接:http://www.cnblogs.com/ftyblog/p/3727621.html

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 行业资讯 Hadoop对文本文件的快速全局排序实现方法及分析 https://www.kuaiidc.com/67990.html

相关文章

发表评论
暂无评论