云题海 - 专业文章范例文档资料分享平台

当前位置:首页 > 普开数据大数据应用实例:基于Hadoop的大规模数据排序算法

普开数据大数据应用实例:基于Hadoop的大规模数据排序算法

  • 62 次阅读
  • 3 次下载
  • 2025/12/10 15:06:54

在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。本例子中使用的是TextInputFormat,他提供的RecordReder会将文本的一行的行号作为key,这一行的文本作为value。这就是自定义Map的输入是的原因。然后调用自定义Map的map方法,将一个个对输入给Map的map方法。注意输出应该符合自定义Map中定义的输出。最终是生成一个List。在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass设置key比较函数类,则使用key的实现的compareTo方法。在第一个例子中,使用了IntPair实现的compareTo方法,而在下一个例子中,专门定义了key比较函数类。

在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也是会调用

job.setSortComparatorClass设置的key比较函数类对所有数据对排序。然后开始构造一个key对应的value迭代器。这时就要用到分组,使用jobjob.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和它的value迭代器)。同样注意输入与输出的类型必须与自定义的Reducer中声明的一致。 二次排序就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序的结果 。 (2) 具体步骤 ●自定义key。

在mr中,所有的key是需要被比较和排序的,并且是二次,先根据partitione,再根据大小。而本例中也是要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类IntPair,他有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。

●由于key是自定义的,所以还需要自定义一下类: 分区函数类;key比较函数类;分组函数类。 (3) SecondarySort.java的部分代码 c) 源码位置

/local/zkl/hadoop/hadoop-0.20.1/hadoop-0.20.1/src/examples/org/apache/hadoop/examples/SecondarySort.java

d) 下面程序是一段关于secondarySort的源代码: 33. public class SecondarySort {

34. //自己定义的key类应该实现WritableComparable接口

35. public static class IntPair 36. implements WritableComparable { 37. private int first = 0; 38. private int second = 0; 39. 40. /**

41. * Set the left and right values. 42. */

43. public void set(int left, int right) { 44. first = left; 45. second = right; 46. }

47. public int getFirst() { 48. return first; 49. }

50. public int getSecond() { 51. return second; 52. } 53. / 54. @Override

55. //反序列化,从流中的二进制转换成IntPair 56. public void readFields(DataInput in) throws IOException { 57. first = in.readInt() + Integer.MIN_VALUE; 58. second = in.readInt() + Integer.MIN_VALUE;

59. } 60. @Override

61. //序列化,将IntPair转化成使用流传送的二进制 62. public void write(DataOutput out) throws IOException { 63. out.writeInt(first - Integer.MIN_VALUE); 64. out.writeInt(second - Integer.MIN_VALUE); 65. }

66. //新定义类应该重写的两个方法 67. @Override

68. //The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce) 185. //主函数

186. public static void main(String[] args) throws Exception { 187. // TODO Auto-generated method stub 188. // 读取hadoop配置

189. Configuration conf = new Configuration();

190. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 191. if (otherArgs.length != 2) {

192. System.err.println(\ 193. System.exit(2); 194. }

195. // 实例化一道作业

196. Job job = new Job(conf, \ 197. job.setJarByClass(SecondarySort.class); 198. // Mapper类型

199. job.setMapperClass(MapClass.class); 200. // Reducer类型

201. job.setReducerClass(Reduce.class); 202. // 分区函数

203. job.setPartitionerClass(FirstPartitioner.class); 204. // 分组函数

205. job.setGroupingComparatorClass(FirstGroupingComparator.class); 206. // map 输出Key的类型

207. job.setMapOutputKeyClass(IntPair.class); 208. // map输出Value的类型

209. job.setMapOutputValueClass(IntWritable.class); 210. // rduce输出Key的类型 211. job.setOutputKeyClass(Text.class); 212. // rduce输出Value的类型

213. job.setOutputValueClass(IntWritable.class); 214. // 输入hdfs路径

215. FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 216. // 输出hdfs路径

217. FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 218. // 提交job

219. System.exit(job.waitForCompletion(true) ? 0 : 1); 220. } 221. 222. }

  • 收藏
  • 违规举报
  • 版权认领
下载文档10.00 元 加入VIP免费下载
推荐下载
本文作者:...

共分享92篇相关文档

文档简介:

在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。本例子中使用的是TextInputFormat,他提供的RecordReder会将文本的一行的行号作为key,这一行的文本作为value。这就是自定义Map的输入是的原因。然后调用自定义Map的map方法,将一个个对输入给Map的map方法。注意输出应该符合自定义Map中定义的输出。最终是生成一个List。在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一

× 游客快捷下载通道(下载后可以自由复制和排版)
单篇付费下载
限时特价:10 元/份 原价:20元
VIP包月下载
特价:29 元/月 原价:99元
低至 0.3 元/份 每月下载150
全站内容免费自由复制
VIP包月下载
特价:29 元/月 原价:99元
低至 0.3 元/份 每月下载150
全站内容免费自由复制
注:下载文档有可能“只有目录或者内容不全”等情况,请下载之前注意辨别,如果您已付费且无法下载或内容有问题,请联系我们协助你处理。
微信:fanwen365 QQ:370150219
Copyright © 云题海 All Rights Reserved. 苏ICP备16052595号-3 网站地图 客服QQ:370150219 邮箱:370150219@qq.com