demonstrate 的 blog

daily blog

Archive for the ‘linux related’ Category

hbase 试用

leave a comment »

首先要明白一件别的事情,那就是 hadoop 自己的版本号,这是来自这里的说明:

hadoop 版本

我最近在玩的版本是 1.0.x 系的,所以很多东西都没有(前面玩过一段时间 0.21),不知道什么时候这些分出去的 branch 才会 merge 回来。这从某个角度说明,版本号不是越大越好 -,-b 不过 hadoop 那班开发人员也真够…

hbase 在 Yahoo! 内部似乎也用的不是很多(肯定没有 hadoop 和 pig 那么普及),但是常常听说这个。只知道提供了类似数据库的功能,据说能做 serving。这里简单玩玩。首先 hbase 的数据存放是按照 column 来组织的(一般是按 row 来的),column 分成若干个 column family,对应一个属性子集,by design 一般将需要同时访问的属性放在一个 column family 里面,这样有助于高效的使用 hbase。hbase 提供了一个基于 iruby 的 shell(咋感觉跟 ipython 那个类似?),这个 iruby 使用 ~/.irbrc 作为配置文件,我们可以在 shell 里面直接操纵 java 的对象(是不是觉得跟 Matlab 某个功能很像?)。

在 shell 里面我们可以执行一些类似 SQL 的语句,如创建 table

create 'test', 'data'

table 名是 test,column family 是 data,

put 'test', 'row1', 'data:1', 'value1'

这里使用了一个 cell,包括 row1 表示的 row 标识,data:1 表示的列和所属的 column family,最后值为 value1。通过 list 可以列出有些什么 table,而

scan 'test'

会把内容全部 dump 出来。

disable 'test'

临时取消该表,而

drop 'test'

删除该表。

我们可以通过 hadoop 的 map/reduce 程序访问存储在 HBase 里面的数据,这需要使用 TableInputFormat,这个类提供了 createdSubmittableJob 用来创建对应的 job。另外 hbase 还提供了 REST 和 thrift 的 API,分别见这里这里。前者为 HBase 提供了 web service 的接口,这意味着可以通过它与别的 web service 简单的进行交互(数据库一般需要调用对应的 client),而 HBase 发布的程序里面本身包含了一个 REST 的 client,可以直接用 hbase-daemon.sh rest start 开启。更多文档见这里

尽管 HBase 号称是向 DB 方向的努力,但实际上并没做到那个份上,感觉只是做到了 storage 层面上。freepeter 同学最近的一篇 blog 讨论了一些 DB 系与 map/reduce 系的一些比较,值得后面好好体会下 :-) 谬数据库的基础还是看不大懂哈

——————
The sun was risen on the earth when Lot entered into Zoar.

Written by zt

2012/05/20 at 11:49 PM

lex 与 yacc 学习笔记(二)

leave a comment »

yacc 语法

前面的例子我们能看出和 lex 类似,我们首先有个 prologue 段,用来引入需要的头文件,定义需要的东西,这会原封不动的放在生成的 .c 文件中,然后在 %} 结束后是一段定义 token 的区域,这部分还可以表达结合律(使用 %left、%right 表示结合的优先顺序),按照书写的前后表示优先(后面的优先),之后 %% 之间的区域是表示 CFG 的区域,它是一些 rule,每条 rule 包括一个 head(: 之前),以及 body(使用 | 分割的,称为 product)。最后用 %% 分割的部分和 lex 一样是放一些程序,会插入到最后。

yacc 的 prologue 比 lex 复杂很多,这是因为 yacc 有时为了存放计算结果会使用 %union 声明一个 union 供后面的程序使用。这样就引入了各个 prologue 表意上的区别,一般分为 %code top、%code requires,%code provides 与 %code 分别表示放在最前面的部分(include 系统头文件、定义宏)、依赖的东西(include 自定义的头文件,可能 union 中会使用,定义的结构,定义 YYSTYPE 或者 YYLTYPE 会使用到东西,一切应该进入 implementation 的东西)、提供的东西(函数前向声明,一切还应该进入 header 的东西)和其他一些东西。

匹配规则包括 symbol(用引号表示的)、terminal 表示 token 或者 nonterminal,我们除了可以用 %token <> 表示一些 token 与 union 中哪些部分对应,%type <> 表示一些 nonterminal 的 type,YYSTYPE 表示的是 semantic action 对应的类型,比较常见的就是类似前面例子中的 double,同时也可以用自定义的一些 union(不止一种 type),这时我们需要 #define 对应的 YYSTYPE。为了在 semantic action 里面表示匹配上部分的值,除了利用 $n 这种靠位置来的标记,还可以在匹配上的部分后面用中括号跟一个名字,这样就能用 $name 表示。如果使用 $0 或者负数,则表示在 stack 里面前面的某些值(比如某个空的 expr 可以用 $0 表示前面某个 rule 匹配到的值)。可以在 $n 之间插入 <> 表示 union 中某个 type。

某些情况下我们希望在匹配的 pattern 当中插入一些 semantic action,可以直接在打断的 patten 之间插入 {} 代码段,为了给某些 mid-rule 的 action 提供 destructor,我们只能借助 yacc 提供的 nonterminal 的 destructor,这是通过 %type 声明后,再接上 %destructor {…} nonterminal-name 添加析构。这样一旦通过这部分 pattern 成功构造了某些结构,而在随后的 pattern 里面 parsing fail 了,我们就能自动的调用析构。另外一种方式是使用 nonterminal 自己的名字来 refer,比如 exp 就能用 $exp 来获得对应的值,但在有歧义的时候不能这么用。

为了在 parsing 的同时维护当前 parsing 的位置,我们需要 YYLTYPE 这个结构,我们可以自定义,但是需要提供 first_line、first_column、last_line 和 last_column 等成员,并且在 parsing 的过程中更新这些值(对象名为 yylloc)。对应的结构我们可以用 @ 开头和前面 $ 类似的一些变量来记录对应的值,如 @$ 就合 $$ 类似,后者表示当前 production 的值,那么前者就是解析这个 production 对应的 yylloc,为此,我们可以用 @$.first_line 等类似的值更新位置信息,这样一旦出错就能显示哪些地方出错了。作为一个例子,下面是对原计算器的改进版本

%{
#include "ltcalc-parser.h"
%}

external YYLTYPE yylloc ;
%%

[0-9]*(\.[0-9]*)?(e[+-]?[0-9]+)? {
                                    yylloc.first_column = yylloc.last_column ;
                                    yylloc.last_column += yyleng ;
                                    return NUM ;
                                 }
[ \t]                            {
                                    yylloc.first_column = yylloc.last_column ;
                                    ++ yylloc.last_column ;
                                 }
.                                {
                                    yylloc.first_column = yylloc.last_column ;
                                    ++ yylloc.last_column ;
                                    return *yytext ;
                                 }
\n                               {
                                    ++ yylloc.last_line ;
                                    ++ yylloc.first_line ;
                                    yylloc.first_column = 0 ;
                                    yylloc.last_column = 0 ;
                                    return '\n' ;
                                 }
<>                          {return 0;}

%%

对应的 parser 可以这样写

%{
#define YYSTYPE double
#include
#include
#include

extern char* yytext ;
int yylex () ;
void yyerror (const char*) ;
%}

%token NUM
%left '-' '+'
%left '*' '/'
%left NEG
%right '^'

%%

input:

  | input line
  ;

line:
    '\n'
  | exp '\n'    {printf ("\t=%.10g\n", $1) ;}
  ;

exp:
    NUM                {$$ = atof (yytext) ;}
  | exp '+' exp        {$$ = $1 + $3 ;}
  | exp '-' exp        {$$ = $1 - $3 ;}
  | exp '*' exp        {$$ = $1 * $3 ;}
  | exp '/' exp        {
                         if ($3)
                           $$ = $1 / $3 ;
                         else {
                           $$ = $1 ;
                           fprintf (stderr, "%d.%d-%d.%d: division by zero",
                                    @3.first_line, @3.first_column,
                                    @3.last_line, @3.last_column) ;
                         }
                       }
  | exp '^' exp        {$$ = pow ($1, $3) ;}
  | '-' exp %prec NEG  {$$ = -$1 ;}
  | '(' exp ')'        {$$ = $2 ;}
  ;

%%

void
yyerror (const char* s) {
  fprintf (stderr, "%s\n", s) ;
}

还有一种办法传递这些信息是在 lexer 里面通过 %option bison-locations 传递 yylloc_param(这是个指针),然后类似的更新。觉得似乎还不如这样快捷 -,-b

bison 可以用 % 加上一些别的东西,比如 %require 可以要求使用的 bison 版本满足某个条件,前面也看见了 %left、%right 表示的结合律结合的顺序,也可以用 %nonassoc 表示不满足结合律,%type 为 nonterminal 指明类型,%start 表示 start symbol,通过 %locations 可以打开对 yylloc 的支持(默认如果发现使用了相关的变量也会打开)。

yacc 的 C 语言接口

yyparse 是主要的函数,返回 0 表示 parsing 成功。通常我们使用 YYACCEPT 和 YYABORT 这两个宏。如果希望给 yyparse 增加参数可以使用 %parse-param 增加。通过 yyerror 产生出错信息

其他

bison 提供了 C++ 的接口,这时 parser 被封装成一个类 parser,YYSTYPE 变成了 yy::parser::semantic_type,location tracking 方面变得简单,直接调用一些成员函数即可,其中 parse() 成员函数用来执行 parse。后面我们会比较一下 yacc 和 spirit::qi 实现的差异和优缺点。

——————-
And Lot said to them, Oh, not so, my LORD:

Written by zt

2012/05/17 at 1:17 PM

Posted in c/c++, compilers

Tagged with , , ,

hadoop 的几个例程(二)

with 3 comments

join

表格的 join 一般说两有两种,一种是在 mapper 侧做的,限制比较大,但是更有效一些。另一种是在 reducer 侧做的,更加一般一些,对任意数据都适用。

mapper 侧 join 要求数据本身使用相同的 key,产生同样多的 split,使用一样的 partitioner,这样两个目录下对应文件里面含有的是同样的 key set,因此,使用的 join 策略是:通过某种特殊的 InputFormat 将数据 load 进来,它保证能从多个 path 对应文件里面读取某个 key 对应的对应信息,这样就完成了 join。那么如何实现这样一个流程呢?下面是个 driver,从中可以看出大概的意思,

Class<? extends InputFormat> inputFormatClass =
  SequenceFileInputFormat.class;
Class<? extends OutputFormat> outputFormatClass =
  SequenceFileOutputFormat.class;
Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;
Class<? extends Writable> outputValueClass = TupleWritable.class;
String op = "inner";
List<String> otherArgs = new ArrayList<String>();
jobConf.setNumMapTasks(num_maps);
jobConf.setNumReduceTasks(num_reduces);

FileOutputFormat.setOutputPath(jobConf,
  new Path(otherArgs.remove(otherArgs.size() - 1)));
List<Path> plist = new ArrayList<Path>(otherArgs.size());
for (String s : otherArgs) {
  plist.add(new Path(s));
}

jobConf.setInputFormat(CompositeInputFormat.class);
jobConf.set("mapred.join.expr", CompositeInputFormat.compose(
      op, inputFormatClass, plist.toArray(new Path[0])));
jobConf.setOutputFormat(outputFormatClass);

jobConf.setOutputKeyClass(outputKeyClass);
jobConf.setOutputValueClass(outputValueClass);

Date startTime = new Date();
System.out.println("Job started: " + startTime);
JobClient.runJob(jobConf);
Date end_time = new Date();
System.out.println("Job ended: " + end_time);

这里使用 plist.toArray 将 List 转换成为一般的数组。而调用 CompositeInputFormat 的 compose 方法根据输入构造复杂的输入类型(join 以后使用 TupleWritable),这里就是用多个 SequenceFile 拼起来。

public interface ComposableInputFormat<K extends WritableComparable,
                                       V extends Writable>
    extends InputFormat<K,V> {
  ComposableRecordReader<K,V> getRecordReader(InputSplit split,
      JobConf job, Reporter reporter) throws IOException;
}

public class CompositeInputFormat<K extends WritableComparable>
      implements ComposableInputFormat<K,TupleWritable> {
  // ...
  public static String compose(String op, Class<? extends InputFormat> inf, String... path) {
    final String infname = inf.getName();
    StringBuffer ret = new StringBuffer(op + '(');
    for (String p : path) {
      compose(infname, p, ret);
      ret.append(',');
    }
    ret.setCharAt(ret.length() - 1, ')');
    return ret.toString();
  }
}

这里调用了另外个 compose,达到的效果就是构造了 op(tbl(inf, p1), …) 这样一个字符串,这作为 mapred.join.expr 供其他类进行解析,解析的过程此处略去(见 Parser.java)。

所谓的 input format 指实现了 getSplits 和 getRecordReader 接口方法的类(implements InputFormat),这两件事情一般会委托给另外两个类即 InputSplit 和 RecordReader,前者负责计算需要 split 数据的份数(必须实现 getLength 和 getLocation),后者负责具体如何读取数据(实现 getPos、getProgress、next、createKey、createValue 和 close),很明显,两者可以根据 c、Configuration 里面设定的结构确定自己的形式:

public class CompositeInputSplit implements InputSplit {

  private int fill = 0;
  private long totsize = 0L;
  private InputSplit[] splits;

  public CompositeInputSplit() { }

  public CompositeInputSplit(int capacity) {
    splits = new InputSplit[capacity];
  }

  public void add(InputSplit s) throws IOException {
    if (null == splits) {
      throw new IOException("Uninitialized InputSplit");
    }
    if (fill == splits.length) {
      throw new IOException("Too many splits");
    }
    splits[fill++] = s;
    totsize += s.getLength();
  }

  public long getLength() throws IOException {
    return totsize;
  }

  public String[] getLocations() throws IOException {
    HashSet<String> hosts = new HashSet<String>();
    for (InputSplit s : splits) {
      String[] hints = s.getLocations();
      if (hints != null && hints.length > 0) {
        for (String host : hints) {
          hosts.add(host);
        }
      }
    }
    return hosts.toArray(new String[hosts.size()]);
  }

  // ...
}

public abstract class CompositeRecordReader<
    K extends WritableComparable, // key type
    V extends Writable,           // accepts RecordReader<K,V> as children
    X extends Writable>           // emits Writables of this type
    implements Configurable {
  private int id;
  private Configuration conf;
  private final ResetableIterator<X> EMPTY = new ResetableIterator.EMPTY<X>();

  private WritableComparator cmp;
  private Class<? extends WritableComparable> keyclass;
  private PriorityQueue<ComposableRecordReader<K,?>> q;

  protected final JoinCollector jc;
  protected final ComposableRecordReader<K,? extends V>[] kids;

  // implement all interfaces, iterating over all kids
}

这样一来,我们就可以使用该 InputFormat 获得已经 join 的东西了,join 的逻辑委托到几个其他的类,实现不同的 join 类型(inner join 还是 outer)。

reducer 侧的 join 本质上是依靠 mapper 输出用来 join 的 key,这个时候需要解决的主要问题是避免一个 key 对应的 record 太多,往往需要将这个 key 对应的所有 value 载入内存,但这样一来就比较 heavy 了。实现这类 join 在 map 侧为了将多个路径下的数据读入,可以使用 MultipleInputs 这个 InputFormat,之后我们(不同)的 mapper 可以将 tag(表示来自哪个输入)和 key bind 在一起(参看 MultipleInputs 的使用方式),这样在 reducer 侧拿到的是按照 key 做主序 tag 做副序的结果,这样我们遍历的时候就能方便先处理一个输入数据下的所有 case,这样做某些 join(inner join)时比较有用。

side data

所谓的 side data 是指所有 job 或者一个 job 的 mapper/reducer 都能访问的数据,一般来说我们可以用下面的几条途径:

  • Configuration,适合非常小的,如参数,使用的类啥的,用起来也比较简单,直接 -D 传递即可
  • distributed cache,放在 HDFS 上,运行时会放在本地

terasort

这个其实是前面 total order sorting 的改进版实现,其中利用了一个 trie 来加速 sampling 中获得 partition 的部分。有兴趣仔细看看实现吧。不过似乎觉得有点矛盾,据说这个实现当时是最好的 tera sort benchmark,但是似乎记得 google claim 过自己是,仔细看了下日期,hmm… 看来 google 还是比较擅长这种 engineering。看来如果要较劲,这种地方才是 tech 公司需要经常比的吧。

最后贴篇 bs map/reduce 的文章,这是数据库方面的人写的。不过现在 hive 之类的大力发展,应该在减少这方面的 gap 了吧。

—————–
And it came to pass, when they had brought them forth abroad, that he said, Escape for your life; look not behind you, neither stay you in all the plain; escape to the mountain, lest you be consumed.

Written by zt

2012/05/16 at 6:02 PM

Follow

Get every new post delivered to your Inbox.