当前位置:首页 > hadoop学习笔记
Hadoop MapReduce,这里进行简单的总结,主要来自于《Hadoop In Action》。 后续将按照Hadoop处理的顺序整理一些笔记,主要包括: (1)Hadoop预定义数据类型; (2)Hadoop InputFormat; (3)Hadoop Mapper;
(4)Hadoop Partitioner(洗牌); (5)Hadoop Reducer; (6)Hadoop OutputFormt; (7)Hadoop Driver (驱动程序); (8)Hadoop Combiner; (9)Hadoop Pipes; (10)Hadoop Streaming; (11)Aggregate;
其它更高级应用,如数据连接等请自行参阅相关书籍,《Hadoop In Action》、《Hadoop 权威指南》等。
一、Hadoop数据类型
Hadoop预定义了一些类用于实现WritableComparable,主要包括面向基本类型的封装类: BooleanWritable 标准布尔变量的封装 ByteWritable 单字节数的封装 DoubleWritable 双字节数的封装 FloatWritable 浮点数的封装 IntWritable 整数的封装 LongWritable Long的封装
Text 使用UTF8格式的文本封装 NullWritable 无键值时的占位符
键和值可以自定义数据类型,Hadoop提供了Writable和WritableComparable接口,Writable实现的是序列化功能,WritableComparable实现了序列化和比较的功能。Hadoop要求键必须实现WritableComparatable
下面实现一个类,用于表示一个网络的边界,比如代表两个城市之间的航线。Edge类实现了Writable接口的readFields和write方法,它们与java中的DataInput和DataOutput类实现内容的串行化,而Comparable接口实现的是compareTo方法。
public class Edge implements WritableComparable
private String departureNode ; private String arrivalNode ;
public String getDepartureNode(){ return departureNode; } public String getArrivalNode() { return arrivalNode ; }
@override
public void readFields(DataInput in) throws IOException {
departureNode = in.readUTF() ; arrivalNode = in.readUTF() ; }
@override
public void write(DataOutput out) throws IOException {
out.writeUTF(departureNode) ; out.writeUTF(arrivalNode) ; }
@override
public int compareTo(Edge 0) {
return (departureNode.compareTo(o.depatrueNode)!=0) ?
departureNode.compareTo(o.departureNode): arrivalNode.compareTo(o.arrivalNode) ; } }
通常使用Hadoop,预定义类型基本满足需要,通过Hadoop数据类型的学习,我们可以自定义数据类型,从而根据需求进行扩充。
二、InputFormat
Hadoop分割与读取输入文件的方式被定义为InputFormat接口的一个实现中,TextInputFormat是InputFormat的默认实现。
Hadoop预定义的一些InputFormat类:
TextInputFormat 在文本文件中的每行一个记录,key为一行的字节偏移,值为一行的内容。ey: LongWritable, Value:Text
KeyValueTextInputFormat 文本文件中每行是一个记录,以每行的第一个分隔符为界,分隔符前的为键,分割符后的为值,分隔符由key.value.separator.in.input.line中设定,默认为'\\t'
SequenceFileInputFormat
NLineInputFormat 与TextInputFormat相同,但每个分片一定有N行,N由
mapred.line.input.format.linespermap中设定,默认为1,key: LongWritable, Value:Text
MapReduce输入格式由 conf.setInputFormat(KeyValueTextInputFormat.class) ; 设定。
2. 生成一个定制的InputFormat --- InputSplit和RecordReader
如果Hadoop提供的InputFormat类不能满足需要,则必须编写自定义的InputFormat类,InputFormat主要完成2件事情:
1)确定所有用于输入数据的文件,并将之分割为输入分片,每个map任务分配一个分片; 2)提供一个RecordReader对象,循环提取给定分片中的记录,并解析每个记录为预定义类型的键和值;
public interface InputFormat
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader
FileInputFormat类实现了InputFormat中的getSplits方法,保留getRecordReader抽象让子类实现,所以在创建InputFormat子类时,最好从负责文件分割的FileInputFormat类中继承,其中有一个isSplitable(FileSystem fs, Path filename)方法,检查是否将给定文件分片,默认返回true,正如压缩文件,如果不对文件进行拆分,则返回false。
使用FileInputFormat时,只需要关注RecordReader,它负责把一个输入分片解析为一条一条的记录,转变成键值对。
public interface RecordReader
bool next(K key, V value) throws IOException ; K createKey() ; V createValue() ;
long getPos() throws IOException ; public void close() throws IOException ;
float getProgress() throws IOException ; }
预定义的RecordReader有:
LineRecordReader用于TextInputFormat中每次读取一行,以字节偏移作为键,行的内容作为值。 KeyValueRecordReader用于KeyValueTextInputFormat
自定义的RecordReader痛处基于现有实现,并把大多数操作放在next()函数中。
public class TimeUrlTextInputFormat extends FileInputFormat
public RecordReader
return new TimeUrlLineRecordReader(job, (FileSplit)input) ; } }
public class URLWritable implements Writable {
protected URL url ; public URLWritable(){}
public URLWritable(URL url){ this.url = url;}
public void write(DataOutput out) throws IOException {
out.writeUTF(url.toString()) ; }
public void readFields(DataInput in) throws IOException {
url = new URL(in.readUTF()) ; }
public void set(String s) throws MalformadURLException {
url = new URL(s) ; } }
class TimeUrlLineRecordReader implements RecordReader
private KeyValueLineRecordReader lineReader ; private Text lineKey, lineValue ;
public TimeUrlLineRecordReader(JobConf job, FileSplit split) throws IOException {
lineReader = new KeyValueLineRecordReader(job, split) ; lineKey = lineReader.createKey() ; lineValue = lineReader.createValue() ;
共分享92篇相关文档