PIG 的 UDF 之深入研究

既然接了这么个活,就好好的研究一下 hadoop 系的东西了。大致的打算是从最近需要的 PIG UDF 开始,后面会重新弄一下 hadoop,另外比较感兴趣的是能做 serving 的 HBase。之后有空再看看 zookeeper。

PIG 方面的文档最近似乎变多了一些,这里主要参考其 API 文档这个 tutorial

环境

首先要下载 PIG 的源文件和二进制文件(见这里),然后如果你需要的话可以开始编译,PIG 自己使用的是 ant。如果想直接使用,可以直接把对应的 jar 文件、lib 之类的 copy 出去。PIG 的 jar 分两个,一个是自带 hadoop 的,一个是单独的。如果我们想写点 UDF 之类的,应该指定其一就行了。传统上会指定一个 PIG_HOME,不过好像新的不要求了。

UDF 的分类

我们的 UDF 继承下面四个类之一(在org.apache.pig 下):

  • EvalFunc,用于求值或者数据聚合
  • FilterFunc 用于过滤,本质上就是 EvalFunc<Boolean>
  • LoadFunc 用于读取特殊的数据类型
  • StoreFunc 用于存储特殊的数据类型

特别的 EvalFunc 在实现了 Algebraic 接口时可以在 reducer/combiner 里面进行 reduce 方面的工作。对于不使用 combiner 的 UDF,可以使用 Accumulator 接口来实现。

EvalFunc 以及 Algebraic、Accumulator 接口

EvalFunc 是一个 generic 类,类型指定的是返回类型,PIG 里面使用的类型一般放在 org.apache.pig.data 下面。EvalFunc 需要实现的是 exec 函数,它返回的就是 generic 指定的类型,且输入是 org.apache.pig.data.Tuple。

一个 Algebraic 接口包含三个接口函数,getInitial、getIntermed 与 getFinal,它们的分别是 mapper、combiner 和 reducer 侧调用,返回的是一个 EvalFunc 的类名,这样我们实际上需要提供额外的三个 EvalFunc 的实现,它们对应的输入也是 Tuple,一般在 getInitial 返回的类对应的 exec 需要处理一个 tuple(可以是 bag 的一部分),而在 getIntermed 和 getFinal 里面需要将这些值组合起来。比如我们如果需要实现 COUNT,我们就需要在 getInitial 时返回一个能计算 bag 或者 map 大小的 EvalFunc(比如多个 mapper 每个 mapper 处理这个 bag 的一部分都会调用这个函数获得当前 bag 的大小),然后无论是 combiner 还是 reducer 都是将这些大小相加并返回和。

一个 Accumulator 的接口包含 accumulate、getValue 和 cleanup 三个函数,这三个函数将在每碰到同一个 key 对应的 value 是进行 accumulate,在该 key 对应 tuple 遍历完后调用 getValue 获得 accumulation 的结果,cleanup 用于初始化这个 accumulator 内部的结构。比如计算 MAX,我们需要保留一个当前最大值,cleanup 的时候将该值设为负无穷大(Java 可以用 null),然后每 accumulate 时更新该值或者 skip,最后 getValue 的时候返回。

EvalFunc 可以用 reporter 对象的 progress() 方法更新进度。重写 getCacheFiles 能够将某些 cache 文件传到 UDF 执行的本地,并且就可以在需要的时候读取。UDF 的构造函数可以在 define 里面调用。使用 -Dudf.import.list 可以将给定的几个 package 里面的类全部 import,这样就不需要写 canonical name 了。可以用

Schema

EvalFunc 可以使用 outputSchema 函数进行 org.apache.pig.impl.logicLayer.schema.Schema 的运算,从这点来说能够帮助 PIG 了解一个运行时多态返回的不同 Schema。outputSchema 函数输入是 input 的 Schema,我们可以用 getField 获得这个 tuple 每个部分的 Schema,并返回一个 Schema,用来表达返回类型。还是用最简单的例子来说明一下,比如我们写了一个求平方的东西,如何能为 Integer、Float 或者 Double 返回需要的类型呢?

import java.io.IOException ;
import org.apache.pig.EvalFunc ;
import org.apache.pig.data.Tuple ;
import org.apache.pig.data.TupleFactory ;
import org.apache.pig.impl.logicalLayer.schema.Schema ;

class Square extends EvalFunc<Tuple> {
  public Tuple exec (Tuple t) throws IOException {
    if (t == null || t.size () != 1)
      return null ;
    try {
      Tuple output = TupleFactory.getInstance ().newTuple (1) ;
      Object o = t.get (0) ;
      if (o instanceof Integer) {
        int l = ((Integer) o).intValue () ;
        Integer r = new Integer (l*l) ;
        output.set (0, r) ;
      } else if (o instanceof Double) {
        double l = ((Double) o).intValue () ;
        Double r = new Double (l*l) ;
        output.set (0, r) ;
      } else if (o instanceof Float) {
        float l = ((Float) o).intValue () ;
        Float r = new Float (l*l) ;
        output.set (0, r) ;
      } else {
        return null ;
      }
      return output ;
    } catch (Exception e) {
      System.err.println (e.getMessage ()) ;
      return null ;
    }
  }

  public Schema outputSchema (Schema input) {
    return new Schema (input) ;
  }
}

Java 似乎不支持直接写个 * 在 exec 里面,我们必须根据每种类型重写其实现,而且 ms 由于包裹过的 Double、Integer 之类也没法使用方法进行计算,最后就很挫… 不知道有啥优美的解决方案不?另外可以使用 getArgToFuncMapping 将对应的 schema 交给不同的实现,这个返回的是一个 List<FuncSpec>。

LoadFunc 与 StoreFunc

需要搞清楚的是 LoadFunc 和 StoreFunc 是建立在 InputFormat/OutputFormat 之上的一层封装,通常由 RecordReader 负责将 record 取出来,LoadFunc 将其转换成为 PIG 的数据类型,如 Tuple、Map 等,而 StoreFunc 恰好相反,将 PIG 类型通过 RecordWriter 写入。

LoadFunc 需要重写 getNext 函数:首先调用 RecordReader 的 nextKeyValue() 检查是否有下一个,然后使用 currentKey/Value 获得需要的内容。最后返回 Tuple;StoreFunc 重写 putNext 函数,根据类型写到 RecordWriter 里面。

如何传递 configuration

如果我们需要通过 XML 或者 -D 参数传递一些东西给 UDF,可以使用 org.apache.pig.impl.util.UDFContext 这个类,这是一个 singleton,我们用 getJobConf 就能拿到当前 job 的 Configuration 对象,这样就能获得传递进来的参数了。

Jython 一个 BT 的功能

如果你只想写一个 python,而不把 python 实现的 UDF 与 PIG 分离,可以利用 python UDF 中的 __main__ ,下面是一个来自以上所说 tutorial 的例子,

#!/usr/bin/jython
from org.apache.pig.scripting import *

@outputSchema("word:chararray")
def helloworld():
    return 'Hello, World'

if __name__ == '__main__':
    P = Pig.compile("""a = load '1.txt' as (a0, a1);
                       b = foreach a generate helloworld();
                       store b into 'myoutput'; """)

    result = P.bind().runSingle();

还能有 JS

比较奇葩的是 PIG 现在也支持 JS 作为脚本写 UDF(不知道能不能用 jQuery/YUI 之类的东西啊…),不过鉴于 python 已经能搞定很多东西了,就不仔细研究这部分了。

——————
I will go down now, and see whether they have done altogether according to the cry of it, which is come to me; and if not, I will know.

Advertisements
PIG 的 UDF 之深入研究

发表评论

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / 更改 )

Twitter picture

You are commenting using your Twitter account. Log Out / 更改 )

Facebook photo

You are commenting using your Facebook account. Log Out / 更改 )

Google+ photo

You are commenting using your Google+ account. Log Out / 更改 )

Connecting to %s