hadoop 的几个例程(二)

join

表格的 join 一般说两有两种,一种是在 mapper 侧做的,限制比较大,但是更有效一些。另一种是在 reducer 侧做的,更加一般一些,对任意数据都适用。

mapper 侧 join 要求数据本身使用相同的 key,产生同样多的 split,使用一样的 partitioner,这样两个目录下对应文件里面含有的是同样的 key set,因此,使用的 join 策略是:通过某种特殊的 InputFormat 将数据 load 进来,它保证能从多个 path 对应文件里面读取某个 key 对应的对应信息,这样就完成了 join。那么如何实现这样一个流程呢?下面是个 driver,从中可以看出大概的意思,

Class<? extends InputFormat> inputFormatClass =
  SequenceFileInputFormat.class;
Class<? extends OutputFormat> outputFormatClass =
  SequenceFileOutputFormat.class;
Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;
Class<? extends Writable> outputValueClass = TupleWritable.class;
String op = "inner";
List<String> otherArgs = new ArrayList<String>();
jobConf.setNumMapTasks(num_maps);
jobConf.setNumReduceTasks(num_reduces);

FileOutputFormat.setOutputPath(jobConf,
  new Path(otherArgs.remove(otherArgs.size() - 1)));
List<Path> plist = new ArrayList<Path>(otherArgs.size());
for (String s : otherArgs) {
  plist.add(new Path(s));
}

jobConf.setInputFormat(CompositeInputFormat.class);
jobConf.set("mapred.join.expr", CompositeInputFormat.compose(
      op, inputFormatClass, plist.toArray(new Path[0])));
jobConf.setOutputFormat(outputFormatClass);

jobConf.setOutputKeyClass(outputKeyClass);
jobConf.setOutputValueClass(outputValueClass);

Date startTime = new Date();
System.out.println("Job started: " + startTime);
JobClient.runJob(jobConf);
Date end_time = new Date();
System.out.println("Job ended: " + end_time);

这里使用 plist.toArray 将 List 转换成为一般的数组。而调用 CompositeInputFormat 的 compose 方法根据输入构造复杂的输入类型(join 以后使用 TupleWritable),这里就是用多个 SequenceFile 拼起来。

public interface ComposableInputFormat<K extends WritableComparable,
                                       V extends Writable>
    extends InputFormat<K,V> {
  ComposableRecordReader<K,V> getRecordReader(InputSplit split,
      JobConf job, Reporter reporter) throws IOException;
}

public class CompositeInputFormat<K extends WritableComparable>
      implements ComposableInputFormat<K,TupleWritable> {
  // ...
  public static String compose(String op, Class<? extends InputFormat> inf, String... path) {
    final String infname = inf.getName();
    StringBuffer ret = new StringBuffer(op + '(');
    for (String p : path) {
      compose(infname, p, ret);
      ret.append(',');
    }
    ret.setCharAt(ret.length() - 1, ')');
    return ret.toString();
  }
}

这里调用了另外个 compose,达到的效果就是构造了 op(tbl(inf, p1), …) 这样一个字符串,这作为 mapred.join.expr 供其他类进行解析,解析的过程此处略去(见 Parser.java)。

所谓的 input format 指实现了 getSplits 和 getRecordReader 接口方法的类(implements InputFormat),这两件事情一般会委托给另外两个类即 InputSplit 和 RecordReader,前者负责计算需要 split 数据的份数(必须实现 getLength 和 getLocation),后者负责具体如何读取数据(实现 getPos、getProgress、next、createKey、createValue 和 close),很明显,两者可以根据 c、Configuration 里面设定的结构确定自己的形式:

public class CompositeInputSplit implements InputSplit {

  private int fill = 0;
  private long totsize = 0L;
  private InputSplit[] splits;

  public CompositeInputSplit() { }

  public CompositeInputSplit(int capacity) {
    splits = new InputSplit[capacity];
  }

  public void add(InputSplit s) throws IOException {
    if (null == splits) {
      throw new IOException("Uninitialized InputSplit");
    }
    if (fill == splits.length) {
      throw new IOException("Too many splits");
    }
    splits[fill++] = s;
    totsize += s.getLength();
  }

  public long getLength() throws IOException {
    return totsize;
  }

  public String[] getLocations() throws IOException {
    HashSet<String> hosts = new HashSet<String>();
    for (InputSplit s : splits) {
      String[] hints = s.getLocations();
      if (hints != null && hints.length > 0) {
        for (String host : hints) {
          hosts.add(host);
        }
      }
    }
    return hosts.toArray(new String[hosts.size()]);
  }

  // ...
}

public abstract class CompositeRecordReader<
    K extends WritableComparable, // key type
    V extends Writable,           // accepts RecordReader<K,V> as children
    X extends Writable>           // emits Writables of this type
    implements Configurable {
  private int id;
  private Configuration conf;
  private final ResetableIterator<X> EMPTY = new ResetableIterator.EMPTY<X>();

  private WritableComparator cmp;
  private Class<? extends WritableComparable> keyclass;
  private PriorityQueue<ComposableRecordReader<K,?>> q;

  protected final JoinCollector jc;
  protected final ComposableRecordReader<K,? extends V>[] kids;

  // implement all interfaces, iterating over all kids
}

这样一来,我们就可以使用该 InputFormat 获得已经 join 的东西了,join 的逻辑委托到几个其他的类,实现不同的 join 类型(inner join 还是 outer)。

reducer 侧的 join 本质上是依靠 mapper 输出用来 join 的 key,这个时候需要解决的主要问题是避免一个 key 对应的 record 太多,往往需要将这个 key 对应的所有 value 载入内存,但这样一来就比较 heavy 了。实现这类 join 在 map 侧为了将多个路径下的数据读入,可以使用 MultipleInputs 这个 InputFormat,之后我们(不同)的 mapper 可以将 tag(表示来自哪个输入)和 key bind 在一起(参看 MultipleInputs 的使用方式),这样在 reducer 侧拿到的是按照 key 做主序 tag 做副序的结果,这样我们遍历的时候就能方便先处理一个输入数据下的所有 case,这样做某些 join(inner join)时比较有用。

side data

所谓的 side data 是指所有 job 或者一个 job 的 mapper/reducer 都能访问的数据,一般来说我们可以用下面的几条途径:

  • Configuration,适合非常小的,如参数,使用的类啥的,用起来也比较简单,直接 -D 传递即可
  • distributed cache,放在 HDFS 上,运行时会放在本地

terasort

这个其实是前面 total order sorting 的改进版实现,其中利用了一个 trie 来加速 sampling 中获得 partition 的部分。有兴趣仔细看看实现吧。不过似乎觉得有点矛盾,据说这个实现当时是最好的 tera sort benchmark,但是似乎记得 google claim 过自己是,仔细看了下日期,hmm… 看来 google 还是比较擅长这种 engineering。看来如果要较劲,这种地方才是 tech 公司需要经常比的吧。

最后贴篇 bs map/reduce 的文章,这是数据库方面的人写的。不过现在 hive 之类的大力发展,应该在减少这方面的 gap 了吧。

—————–
And it came to pass, when they had brought them forth abroad, that he said, Escape for your life; look not behind you, neither stay you in all the plain; escape to the mountain, lest you be consumed.

Advertisements
hadoop 的几个例程(二)

一个有关“hadoop 的几个例程(二)”的想法

  1. 推荐这篇文章 MapReduce and parallel DBMSs: friends or foes? (也是dewitt & stonebraker他们写的). 哪个major step backwards我觉得写的稍微偏激了一点。 Map Reduce更多的是一个data processing系统而不是一个query系统。 应该说MapReduce本身不是一个major step backwards, 但是如果把MapReduce用来作为数据管理系统那就是major step backwards了。 事实上google自己也开发了真正的“数据库”系统: Dremel & MegaStore

发表评论

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / 更改 )

Twitter picture

You are commenting using your Twitter account. Log Out / 更改 )

Facebook photo

You are commenting using your Facebook account. Log Out / 更改 )

Google+ photo

You are commenting using your Google+ account. Log Out / 更改 )

Connecting to %s