Archive for the ‘java(script)’ Category
hbase 试用
首先要明白一件别的事情,那就是 hadoop 自己的版本号,这是来自这里的说明:
我最近在玩的版本是 1.0.x 系的,所以很多东西都没有(前面玩过一段时间 0.21),不知道什么时候这些分出去的 branch 才会 merge 回来。这从某个角度说明,版本号不是越大越好 -,-b 不过 hadoop 那班开发人员也真够…
嗯 hbase 在 Yahoo! 内部似乎也用的不是很多(肯定没有 hadoop 和 pig 那么普及),但是常常听说这个。只知道提供了类似数据库的功能,据说能做 serving。这里简单玩玩。首先 hbase 的数据存放是按照 column 来组织的(一般是按 row 来的),column 分成若干个 column family,对应一个属性子集,by design 一般将需要同时访问的属性放在一个 column family 里面,这样有助于高效的使用 hbase。hbase 提供了一个基于 iruby 的 shell(咋感觉跟 ipython 那个类似?),这个 iruby 使用 ~/.irbrc 作为配置文件,我们可以在 shell 里面直接操纵 java 的对象(是不是觉得跟 Matlab 某个功能很像?)。
在 shell 里面我们可以执行一些类似 SQL 的语句,如创建 table
create 'test', 'data'
table 名是 test,column family 是 data,
put 'test', 'row1', 'data:1', 'value1'
这里使用了一个 cell,包括 row1 表示的 row 标识,data:1 表示的列和所属的 column family,最后值为 value1。通过 list 可以列出有些什么 table,而
scan 'test'
会把内容全部 dump 出来。
disable 'test'
临时取消该表,而
drop 'test'
删除该表。
我们可以通过 hadoop 的 map/reduce 程序访问存储在 HBase 里面的数据,这需要使用 TableInputFormat,这个类提供了 createdSubmittableJob 用来创建对应的 job。另外 hbase 还提供了 REST 和 thrift 的 API,分别见这里和这里。前者为 HBase 提供了 web service 的接口,这意味着可以通过它与别的 web service 简单的进行交互(数据库一般需要调用对应的 client),而 HBase 发布的程序里面本身包含了一个 REST 的 client,可以直接用 hbase-daemon.sh rest start 开启。更多文档见这里。
尽管 HBase 号称是向 DB 方向的努力,但实际上并没做到那个份上,感觉只是做到了 storage 层面上。freepeter 同学最近的一篇 blog 讨论了一些 DB 系与 map/reduce 系的一些比较,值得后面好好体会下
谬数据库的基础还是看不大懂哈
——————
The sun was risen on the earth when Lot entered into Zoar.
hadoop 的几个例程(二)
join
表格的 join 一般说两有两种,一种是在 mapper 侧做的,限制比较大,但是更有效一些。另一种是在 reducer 侧做的,更加一般一些,对任意数据都适用。
mapper 侧 join 要求数据本身使用相同的 key,产生同样多的 split,使用一样的 partitioner,这样两个目录下对应文件里面含有的是同样的 key set,因此,使用的 join 策略是:通过某种特殊的 InputFormat 将数据 load 进来,它保证能从多个 path 对应文件里面读取某个 key 对应的对应信息,这样就完成了 join。那么如何实现这样一个流程呢?下面是个 driver,从中可以看出大概的意思,
Class<? extends InputFormat> inputFormatClass =
SequenceFileInputFormat.class;
Class<? extends OutputFormat> outputFormatClass =
SequenceFileOutputFormat.class;
Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;
Class<? extends Writable> outputValueClass = TupleWritable.class;
String op = "inner";
List<String> otherArgs = new ArrayList<String>();
jobConf.setNumMapTasks(num_maps);
jobConf.setNumReduceTasks(num_reduces);
FileOutputFormat.setOutputPath(jobConf,
new Path(otherArgs.remove(otherArgs.size() - 1)));
List<Path> plist = new ArrayList<Path>(otherArgs.size());
for (String s : otherArgs) {
plist.add(new Path(s));
}
jobConf.setInputFormat(CompositeInputFormat.class);
jobConf.set("mapred.join.expr", CompositeInputFormat.compose(
op, inputFormatClass, plist.toArray(new Path[0])));
jobConf.setOutputFormat(outputFormatClass);
jobConf.setOutputKeyClass(outputKeyClass);
jobConf.setOutputValueClass(outputValueClass);
Date startTime = new Date();
System.out.println("Job started: " + startTime);
JobClient.runJob(jobConf);
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
这里使用 plist.toArray 将 List 转换成为一般的数组。而调用 CompositeInputFormat 的 compose 方法根据输入构造复杂的输入类型(join 以后使用 TupleWritable),这里就是用多个 SequenceFile 拼起来。
public interface ComposableInputFormat<K extends WritableComparable,
V extends Writable>
extends InputFormat<K,V> {
ComposableRecordReader<K,V> getRecordReader(InputSplit split,
JobConf job, Reporter reporter) throws IOException;
}
public class CompositeInputFormat<K extends WritableComparable>
implements ComposableInputFormat<K,TupleWritable> {
// ...
public static String compose(String op, Class<? extends InputFormat> inf, String... path) {
final String infname = inf.getName();
StringBuffer ret = new StringBuffer(op + '(');
for (String p : path) {
compose(infname, p, ret);
ret.append(',');
}
ret.setCharAt(ret.length() - 1, ')');
return ret.toString();
}
}
这里调用了另外个 compose,达到的效果就是构造了 op(tbl(inf, p1), …) 这样一个字符串,这作为 mapred.join.expr 供其他类进行解析,解析的过程此处略去(见 Parser.java)。
所谓的 input format 指实现了 getSplits 和 getRecordReader 接口方法的类(implements InputFormat),这两件事情一般会委托给另外两个类即 InputSplit 和 RecordReader,前者负责计算需要 split 数据的份数(必须实现 getLength 和 getLocation),后者负责具体如何读取数据(实现 getPos、getProgress、next、createKey、createValue 和 close),很明显,两者可以根据 c、Configuration 里面设定的结构确定自己的形式:
public class CompositeInputSplit implements InputSplit {
private int fill = 0;
private long totsize = 0L;
private InputSplit[] splits;
public CompositeInputSplit() { }
public CompositeInputSplit(int capacity) {
splits = new InputSplit[capacity];
}
public void add(InputSplit s) throws IOException {
if (null == splits) {
throw new IOException("Uninitialized InputSplit");
}
if (fill == splits.length) {
throw new IOException("Too many splits");
}
splits[fill++] = s;
totsize += s.getLength();
}
public long getLength() throws IOException {
return totsize;
}
public String[] getLocations() throws IOException {
HashSet<String> hosts = new HashSet<String>();
for (InputSplit s : splits) {
String[] hints = s.getLocations();
if (hints != null && hints.length > 0) {
for (String host : hints) {
hosts.add(host);
}
}
}
return hosts.toArray(new String[hosts.size()]);
}
// ...
}
public abstract class CompositeRecordReader<
K extends WritableComparable, // key type
V extends Writable, // accepts RecordReader<K,V> as children
X extends Writable> // emits Writables of this type
implements Configurable {
private int id;
private Configuration conf;
private final ResetableIterator<X> EMPTY = new ResetableIterator.EMPTY<X>();
private WritableComparator cmp;
private Class<? extends WritableComparable> keyclass;
private PriorityQueue<ComposableRecordReader<K,?>> q;
protected final JoinCollector jc;
protected final ComposableRecordReader<K,? extends V>[] kids;
// implement all interfaces, iterating over all kids
}
这样一来,我们就可以使用该 InputFormat 获得已经 join 的东西了,join 的逻辑委托到几个其他的类,实现不同的 join 类型(inner join 还是 outer)。
reducer 侧的 join 本质上是依靠 mapper 输出用来 join 的 key,这个时候需要解决的主要问题是避免一个 key 对应的 record 太多,往往需要将这个 key 对应的所有 value 载入内存,但这样一来就比较 heavy 了。实现这类 join 在 map 侧为了将多个路径下的数据读入,可以使用 MultipleInputs 这个 InputFormat,之后我们(不同)的 mapper 可以将 tag(表示来自哪个输入)和 key bind 在一起(参看 MultipleInputs 的使用方式),这样在 reducer 侧拿到的是按照 key 做主序 tag 做副序的结果,这样我们遍历的时候就能方便先处理一个输入数据下的所有 case,这样做某些 join(inner join)时比较有用。
side data
所谓的 side data 是指所有 job 或者一个 job 的 mapper/reducer 都能访问的数据,一般来说我们可以用下面的几条途径:
- Configuration,适合非常小的,如参数,使用的类啥的,用起来也比较简单,直接 -D 传递即可
- distributed cache,放在 HDFS 上,运行时会放在本地
terasort
这个其实是前面 total order sorting 的改进版实现,其中利用了一个 trie 来加速 sampling 中获得 partition 的部分。有兴趣仔细看看实现吧。不过似乎觉得有点矛盾,据说这个实现当时是最好的 tera sort benchmark,但是似乎记得 google claim 过自己是,仔细看了下日期,hmm… 看来 google 还是比较擅长这种 engineering。看来如果要较劲,这种地方才是 tech 公司需要经常比的吧。
最后贴篇 bs map/reduce 的文章,这是数据库方面的人写的。不过现在 hive 之类的大力发展,应该在减少这方面的 gap 了吧。
—————–
And it came to pass, when they had brought them forth abroad, that he said, Escape for your life; look not behind you, neither stay you in all the plain; escape to the mountain, lest you be consumed.
hadoop 的几个例程(一)
一个奇怪的设计
这是一个同事最近提出来的问题,就是 mapreduce 最新的 API 的 mapper 和 reducer 都能通过 generic 设定输入输出的 key/value 的类型了,为什么我们还要指定 output 的 key/value 类型(默认情况下 hadoop 的 mapper 输出类型和 setOutputKey/ValueClass 一致,如果不一致,还得另外调用 setMapOutputKey/ValueClass)?
我们似乎可以通过 reflection 拿到 generic,但是其实这个是不对的。Java 的 generics 和 C++ 有很大的不同,比如 java.util.ArrayList 这个东西本身是一个 class,虽然是一个 parameterized type,但这仅仅意味着它是 generic,不论是 ArrayList<Integer> 还是 ArrayList<Double> 都是 ArrayList,而 C++ 里面 std::vector<int> 与 std::vector<double> 却是两个完全不相关的类。
public class GenericExample {
private T data ;
public void set (T a) {
data = a ;
}
public T get () {
return data ;
}
public static void main (String[] args) {
GenericExample e = new GenericExample ();
Integer i = new Integer (0) ;
e.set (i) ;
return ;
}
}
从这个例子我们可以看出来,这里的 T 基本等价于 Object,只是一旦我们写成 Integer 这个“实例化”的对象,编译的时候我们就能检查对应的类型是否匹配,比如如果我们试图 set(new Double(.0)) 的时候就会报错。这个并不会在运行时进行检查(type erasure)。我们尽管可以用 reflection 来判断一个类是否含有 type parameter,但是却没有办法在运行时确定类型,
public static void show (Object o) {
Class c ;
if (o instanceof Class)
c = (Class) o ;
else
c = o.getClass () ;
System.out.print (c.getCanonicalName ()) ;
TypeVariable[] tv = c.getTypeParameters () ;
if (tv.length > 0) {
System.out.println (" is a parameterized type, with the following parameters:") ;
for (int i = 0 ; i < tv.length ; ++ i) { Object w = tv[i].getGenericDeclaration () ; System.out.print (w.getClass ().getCanonicalName () + " -> ") ;
System.out.println (tv[i].getName ()) ;
}
} else
System.out.println (" is not a parameterized type") ;
}
其核心就是通过接口 GenericDeclaration(Class 实现了该接口)的 getTypeParameter 获得需要的参数列表。但是正如我们看到这个函数输出的结果
java.lang.String is not a parameterized type java.util.ArrayList is a parameterized type, with the following parameters: java.lang.Class -> E
而 Java 通过 ? extends 这种搞法只是为了产生一个 type checking 的 bound。如果我们绕过 generic 的机制,比如将一个 List<String> 赋值给 List(本身就是一类),然后在此 List 里面加入 Integer,这完全是被允许的。但是事后如果你真的希望将此 List 当做 String 的列表使用的时候,执行到对象是 Integer 的地方自然就会 fail 了。Java 对这类问题(前面这种奇怪的赋值仅仅会给予 unchecked warning)。为什么 hadoop 不能聪明一点呢?因为 mapper/reducer 写的时候定下来的类型,并没有被记住(只知道是个 Writable),当 driver 程序设置一个 job 的时候,说穿了其实是运行时设定的输入输出,因此需要重新指定一次类型。这里有一些相关的讨论。
Sort
我们知道 MapFileOutputFormat 本质上是把结果存放在几个 map 里面,决定某个 key 在哪个文件的是 partitioner 决定的,那么如果我们希望做一个 serving 的过程(把计算结果通过某个服务返回给需要的应用),我们可以参看下面的代码:
Path path = new Path(args[0]);
IntWritable key = new IntWritable(Integer.parseInt(args[1]));
FileSystem fs = path.getFileSystem(getConf());
Reader[] readers = MapFileOutputFormat.getReaders(fs, path, getConf());
Partitioner partitioner =
new HashPartitioner();
Text val = new Text();
Writable entry = MapFileOutputFormat.getEntry(readers, partitioner, key, val);
if (entry == null) {
System.err.println("Key not found: " + key);
return -1;
}
这个地方的 path 是 HDFS 的目录,我们假定 key 是 int,我们需要读取这个目录里面的文件,因此创建了 FileSystem 对象,然后通过 MapFileOutputFormat 对象的 getReaders 方法获得该目录下所有文件的 reader,之后创建了默认的 Partitioner,通过 getEntry 方法获得 val,根据 entry 是否为 null 判断是否能获得结果,如果非空,我们就可以用 val 里面的值了。
需要注意的是这个类型在老 api 下(org.apache.hadoop.mapred)。
使用 hadoop 做排序,我们就可以直接将结果放在 sequence file 里面,并且传递到一个 reducer 中,这种方式是比较笨的,因为只使用了一台机器进行 sort。更聪明的策略是此时的 partitioner 使用 TotalOrderPartitioner。一个区别就是这样我们产生的输出是几个文件,而不是一个。那么如何能在 reducer 侧根据数据的分布进行拆分来做呢?这个想法就是在构造合适的 partitioner 之前通过 sampling 获得对数据分布的估计,hadoop 提供了一些现成的 sampler,其作用在于从指定的文件进行需要的 sampling,然后创建 _partitions 文件供 TotalOrderPartitioner 使用(来自 hadoop: the definitive guide):
conf.setInputFormat(SequenceFileInputFormat.class); conf.setOutputKeyClass(IntWritable.class); conf.setOutputFormat(SequenceFileOutputFormat.class); SequenceFileOutputFormat.setCompressOutput(conf, true); SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class); SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK); conf.setPartitionerClass(TotalOrderPartitioner.class); InputSampler.Sampler sampler = new InputSampler.RandomSampler(0.1, 10000, 10); Path input = FileInputFormat.getInputPaths(conf)[0]; input = input.makeQualified(input.getFileSystem(conf)); Path partitionFile = new Path(input, "_partitions"); TotalOrderPartitioner.setPartitionFile(conf, partitionFile); InputSampler.writePartitionFile(conf, sampler); // Add to DistributedCache URI partitionUri = new URI(partitionFile.toString() + "#_partitions"); DistributedCache.addCacheFile(partitionUri, conf); DistributedCache.createSymlink(conf);
注意这里的 sampler 运行在 client 端,在 map/reduce 之前创建好 _partition 文件。
如果希望做 secondary sort,我们就需要将需要的 key 做成 pair 提供新的 Comparator。这需要利用 hadoop 传递给 reducer 的 sorting 机制:首先有一个作为的 grouping comparator,通过它会把被一个 reducer.reduce 方法处理的数据 group 起来,在传递给 reducer 之前又会使用 sorting comparator 排一次序,一般来说两个 comparator 是一样的,但是可以设置成为不同的,比如对 secondary sort 的要求,grouping comparator 就是对主键进行排序,而 sorting comparator 则对副键进行排序。这两个 comparator 需要 Job 对象通过 setGroupingComparatorClass 和 setSortComparatorClass 设置。
——————
And while he lingered, the men laid hold on his hand, and on the hand of his wife, and on the hand of his two daughters; the LORD being merciful to him: and they brought him forth, and set him without the city.
