当前位置:首页 > 普开数据大数据应用实例:基于Hadoop的大规模数据排序算法
在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。本例子中使用的是TextInputFormat,他提供的RecordReder会将文本的一行的行号作为key,这一行的文本作为value。这就是自定义Map的输入是
在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也是会调用
job.setSortComparatorClass设置的key比较函数类对所有数据对排序。然后开始构造一个key对应的value迭代器。这时就要用到分组,使用jobjob.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和它的value迭代器)。同样注意输入与输出的类型必须与自定义的Reducer中声明的一致。 二次排序就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序的结果 。 (2) 具体步骤 ●自定义key。
在mr中,所有的key是需要被比较和排序的,并且是二次,先根据partitione,再根据大小。而本例中也是要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类IntPair,他有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。
●由于key是自定义的,所以还需要自定义一下类: 分区函数类;key比较函数类;分组函数类。 (3) SecondarySort.java的部分代码 c) 源码位置
/local/zkl/hadoop/hadoop-0.20.1/hadoop-0.20.1/src/examples/org/apache/hadoop/examples/SecondarySort.java
d) 下面程序是一段关于secondarySort的源代码: 33. public class SecondarySort {
34. //自己定义的key类应该实现WritableComparable接口
35. public static class IntPair 36. implements WritableComparable { 37. private int first = 0; 38. private int second = 0; 39. 40. /**
41. * Set the left and right values. 42. */
43. public void set(int left, int right) { 44. first = left; 45. second = right; 46. }
47. public int getFirst() { 48. return first; 49. }
50. public int getSecond() { 51. return second; 52. } 53. / 54. @Override
55. //反序列化,从流中的二进制转换成IntPair 56. public void readFields(DataInput in) throws IOException { 57. first = in.readInt() + Integer.MIN_VALUE; 58. second = in.readInt() + Integer.MIN_VALUE;
59. } 60. @Override
61. //序列化,将IntPair转化成使用流传送的二进制 62. public void write(DataOutput out) throws IOException { 63. out.writeInt(first - Integer.MIN_VALUE); 64. out.writeInt(second - Integer.MIN_VALUE); 65. }
66. //新定义类应该重写的两个方法 67. @Override
68. //The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce) 185. //主函数
186. public static void main(String[] args) throws Exception { 187. // TODO Auto-generated method stub 188. // 读取hadoop配置
189. Configuration conf = new Configuration();
190. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 191. if (otherArgs.length != 2) {
192. System.err.println(\ 193. System.exit(2); 194. }
195. // 实例化一道作业
196. Job job = new Job(conf, \ 197. job.setJarByClass(SecondarySort.class); 198. // Mapper类型
199. job.setMapperClass(MapClass.class); 200. // Reducer类型
201. job.setReducerClass(Reduce.class); 202. // 分区函数
203. job.setPartitionerClass(FirstPartitioner.class); 204. // 分组函数
205. job.setGroupingComparatorClass(FirstGroupingComparator.class); 206. // map 输出Key的类型
207. job.setMapOutputKeyClass(IntPair.class); 208. // map输出Value的类型
209. job.setMapOutputValueClass(IntWritable.class); 210. // rduce输出Key的类型 211. job.setOutputKeyClass(Text.class); 212. // rduce输出Value的类型
213. job.setOutputValueClass(IntWritable.class); 214. // 输入hdfs路径
215. FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 216. // 输出hdfs路径
217. FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 218. // 提交job
219. System.exit(job.waitForCompletion(true) ? 0 : 1); 220. } 221. 222. }
共分享92篇相关文档