hadoop 的深入了解

这部分内容算是重读一下 Hadoop: the Definitive Guide 的笔记吧。宏观性的东西自己看书吧。

HDFS

hadoop 的 Java api 对文件系统的抽象是 org.apache.hadoop.fs.FileSystem,具体的实现包括 LocalFileSystem(带有 crc 校验)、RawLocalFileSystem(不带校验),新的文档里面居然没有 HDFS 自己的部分,很奇怪。不过一般我们也不需要直接与具体的文件系统直接打交道,多数情况下都是直接对 FileSystem 这个抽象上的行为。比如常见的做法就是首先通过字符串初始化 org.apache.hadoop.fs.Path 对象,然后使用其 getFileSystem 方法获得具体的文件系统。

Configuration conf = getConf () ;
Path p = new Path ("file:///tmp/my/file") ;
FileSystem fs = p.getFileSystem (conf) ;

在文件系统之上,我们常见的就是读写操作(其他的关于文件管理方面的看文档吧),Java 把这个文件读写过程分的非常的细致,一般的流程是先建立所谓的 InputStream/OutputStream,这个 stream 之上流通的是 byte array,常见的压缩解压缩往往也在这个 layer 之上,这方面的东西可以看看 java.io 里面的一些(比较有用的有把两 stream 粘一起的 SequenceInputStream 或者和管道有关系的几个),在此之上通过 Reader/Writer 将 stream 的抽象变成字符,往往他们需要通过 Charset 进行编码上的转换,实现这个转换的是 InputStreamReader 和 OutputStreamWriter,在此之上我们就能建立一些更有用的 Reader/Writer,比如按行读写的 BufferedReader/BufferedWriter。正因为这样一个流程,HDFS 其实只需要提供合适的 stream,后面的事情都可以用 Java 自己的类处理了。我们通过 FileSystem 的 open 方法和 create 方法可以帮助我们创建需要的 stream。

if (!fs.exists (p))
  return 1 ;
FSDataInputStream fsis = fs.open (p, 64*1024) ;

这个过程与 java.net.URL 的做法非常类似,事实上如果注册好了对应协议(如 HDFS)需要的 stream 和对应的打开方式,我们也可以直接用 URL 的 openStream 来进行操纵。从上面我们可以看到 FSDataInputStream 就是 HDFS 给我们操纵的一个 stream。对应有 FSDataOutputStream。

文件系统上比较有用的就是定位符合某些条件的文件,这方面 HDFS 提供了 globStatus 和 PathFiler。这也与 java 自己某些类类似(如 FileFilter)。HDFS 另外提供了与 tar 类似的 HAR 文件系统。

HDFS 的 IO

这部分我们讨论的多数类都在 org.apache.hadoop.io 下面。第一个说的是压缩解压缩,这些一般放在所谓的 CompressionCodec 的实现里面,包括无(default)、gzip、lzo、bzip2 等。使用这些 CompressionCodec 可以显式的初始化他们,然后使用 createInputStream/createOutputStream 方法套在前面的 FSDataIn/OutputStream 上就能干活了。

    BufferedReader reader
      = new BufferedReader
          (new InputStreamReader
            (codec.createInputStream (fsis))) ;

一般有几种方法,比如将 codec 写在 configuration 里面,我们就可以通过 reflection 把字符串映射成为需要的东西

Configuration conf = getConf();
String codecClassname = conf.get ("my.output.codec");
Class codecClass = Class.forName(codecClassname);
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
CompressionOutputStream out = codec.createOutputStream(System.out);

另一种策略是使用 CompressionCodecFactory 进行猜测,

    CompressionCodecFactory factory = new CompressionCodecFactory (conf) ;
    CompressionCodec codec = factory.getCodec (p) ;
    BufferedReader reader
      = new BufferedReader
          (new InputStreamReader
            (codec.createInputStream (fsis))) ;

如果需要反复使用 codec 可以考虑使用 CodecPool 避免反复创建这类对象。在 map/reduce 程序里面,我们往往通过设置 configuration 的方式(见这个文章)。

有了这些基本的东西之后,HDFS 上一个重要的、反复使用的概念就是 Writable,这是 HDFS 上或者说 map/reduce 程序操纵数据的接口(怎么从 byte array 变成 Java 的对象),这个过程也被称为 serialization。Writable 接口包含 write 和 readFields 两个方法,其接口是建立在 java.io.DataIn 和 java.io.DataOut 接口之上的,常见的实现包括 DataInputStream、DataOutputStream,这个接口用来将 java 内部的数据存储成为平台无关的形式,简而言之就是为了做 serialization 设计的。其实这个层非常简单,我们只需要将一些 java native 格式使用 DataIn/Out 提供的接口读入写下去就可以了,如果是层次结构,那么我们也只需要递归调用成员的相关方法即可。虽然很简单,但是似乎并没有提供一个简单的工具帮助我们生成这个 boilerplate。

像 map/reduce 中的 key 往往要求是 comparable 的,为此还有一个所谓 WritableComparable 的接口。HDFS 提供了常见类型的 Writable 实现,放在 org.apache.hadoop.io 下。

HDFS 提供了一个比较强大的 SequenceFile,不少二进制数据使用它就能很好的工作了。它提供了自己的 Reader 和 Writer 类,方便离线读写,在 map/reduce 程序中我们可以设置 InputFormat 就可以自动的解析好文件了。一般用法是使用 createWriter 或者 createReader 这类 factory 产生对应的 Writer 和 Reader,然后使用 writer.append 或者 reader.next 进行添加或者读取。

另一种是所谓的 MapFile,它本质上是 sorted SequenceFile,但是加入了 index(是一个 java 的 map)。

开发 map/reduce 程序

前面提到了 configuration,我们可以用 addResource 为这个对象添加新的 XML 格式的配置信息,事实上实现了 Configured 类的的对象可以用 getConf 获得对应的 org.apache.hadoop.conf.Configuration,很多参数可以通过这个方式进行传递。

另一个实现 map/reduce 的技巧是实现 Tool 接口,这个里面包含了一个 run 方法,比如前面的例子就是来自如下的一个框架

public class HDFSPrint extends Configured implements Tool {
  @Override
  public int run (String[] args) throws IOException {
    Configuration conf = getConf () ;
    // ...
    return 0 ;
  }

  public static void main (String[] args) throws Exception {
    int exitCode = ToolRunner.run (new HDFSPrint(), args) ;
    System.exit (exitCode) ;
  }
}

这个做法的好处在于 -conf、-D 这类公共的参数可以被这个 framework 中所谓 GenericOptionParser 所处理掉,比如 -D 覆盖的一些 Configuration 或者 -conf 传递的资源文件都会自动的搞定(这个实现是一个典型的 template method 的 design pattern)。这样在 run 里面拿到的就是比较干净的东西了。在 run 里面我们通常可以提交多个 map/reduce 任务,其过程一般是:

  • 产生一个 Job 对象(基于 getConf())
  • 为此 Job 提供需要的 mapper/reducer class,某些问题可提供 combiner
  • 为此 Job 提供 input/output format
  • 为此 job 设定输入输出路径
  • 然后使用 waitForCompletion 提交即可

通过 org.apache.hadoop.jobcontrol.JobControl 可以提供一些复杂的管理工具。值得注意的是 job 提交后和 client 部分的同步或者异步关系,如果是 waitForCompletion 就会变成同步调用,如果是 submit 就会异步调用立即返回。那么通过 JobControl 来做的话,其实是不能处理同步关系的,它的好处是并行提交没有相互依赖关系的 ControledJob。而ControledJob 里面是允许处理依赖关系的。

map/reduce 的类型和格式

这里稍微介绍一下 map/reduce 相关的格式,这些 format 一般是在 org.apache.hadoop.mapreduce.lib.input/output 里面,这些 format 会调用 InputSplits 将输入分成 record,并将每个 record 的 key/value deserialize。对格式有要求的时候,比如禁止 split 输入文件,我们就得提供自己的 format 类。

输入格式有

  • TextInputFormat,key 为位置,value 为当前行的内容
  • KeyValueTextInput,key 为 tab 前的内容,value 为后面的内容,分隔符可以用 key.value.separator.in.input.line设定
  • NLineInputFormat,可以设定多少行被一个 mapper 处理,通过 mapreduce.line.input.format.linespermap
  • SequenceFileInputFormat,使用 sequence file 必备
  • SequenceFileAsTextInputFormat,转换成 Text
  • SequenceFileAsBinaryInputFormat,转换成 byte array
  • DBInputFormat,从数据库里面读入

对应的有 output format。

一些 map/reduce 里面的 feature

第一个就是可以用 reporter 改变 Counter 的值,对于某些问题有 counter 计数可以用来 check 程序正确性。我们可以通过 job 获得这些 counter 的值(getCounters)。

之后我们来看看现在 hadoop 提供的一些复杂的例子,如 tera sort 等。

——————
And he said, Behold now, I have taken on me to speak to the LORD: Peradventure there shall be twenty found there. And he said, I will not destroy it for twenty’s sake.

Advertisements
hadoop 的深入了解

发表评论

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / 更改 )

Twitter picture

You are commenting using your Twitter account. Log Out / 更改 )

Facebook photo

You are commenting using your Facebook account. Log Out / 更改 )

Google+ photo

You are commenting using your Google+ account. Log Out / 更改 )

Connecting to %s