PIG UDF 的 python 实现技巧

这里简单介绍一些最近写 python UDF 的心得。相对于 java native code,python 写完了不需要编译还是很简洁的,加上有一些 python 和 java 的 lib 都能很自然的使用,这使得书写相关程序灵活性更高。

多个 py 文件

PIG 使用 register 将多个 py 文件注册的结果是这里面的名字会放在一个 namespace 里面(至少现在 0.10 在 local 上是这样的,但是为啥并行的时候不 work 呢…),所以注意命名不要重复。在 py 文件里面可以 import python 自己的一些 module,这都是 jython 带的,也可以 import java 的类。

schema manipulation

为了实现一些 generic UDF,我们需要熟练的掌握 schema 的变换,这要求对 PIG 的基本类型(见 org.apache.pig.data)有一个比较清楚的了解,同时还需要对 PIG 实现 schema 的结构比较了解(见 org.apache.pig.impl.logicLayer.schema),我们这里不在赘述其细节,大概的意思就是每个 Schema 由若干个 FieldSchema 组成,我们设置一个 schema,等价于设置下面每个 FieldSchema,而 FieldSchema 可以决定 type、alias 等,而每个 FieldSchema 也有自己的 schema(对于 tuple、bag 这类来说)。一种比较常见的操作就是对 bag 处理,将内部的 tuple 处理一下,比如拼成 pair 仍然输出一个 bag。

from java.util import ArrayList
from org.apache.pig.impl.logicalLayer.schema import Schema
from org.apache.pig.data import DataType

@schemaFunction ('pairwise_schema')
def pairwise_schema (input):
    prev_tuple_alias = prop.getProperty ('pairwise.schema.prev.tuple.alias', 'prev')
    next_tuple_alias = prop.getProperty ('pairwise.schema.next.tuple.alias', 'next')
    bag_alias = prop.getProperty ('pairwise.schema.bag.alias', 'pair')
    bt = input.getField (0)
    if bt.type != DataType.BAG:
        raise RuntimeError ('wrong schema')
    tt = bt.schema.getField(0)
    ts = Schema (tt.schema)
    afs = Schema.FieldSchema (prev_tuple_alias, ts)
    bfs = Schema.FieldSchema (next_tuple_alias, ts)
    l = ArrayList ()
    l.add (afs)
    l.add (bfs)
    ts = Schema(l)
    bt = Schema.FieldSchema (bag_alias, DataType.BAG)
    bt.schema = Schema (ts)
    bs = Schema (bt)
    return bs

注意这里使用了 Java 自己的很多东西,直接 import 了需要的类。

传递参数

前面提到过给 UDF 传递参数,当时主要觉得通过 Configuration 就足够 powerful 了,事实上并非如此,在 pig frontend 是无法使用它的,因此我们得换用 Properties,不过需要注意的是 java code 是可以用 this.getClass 然后通过类信息取相关的 Properties(见这个例子),但是用 python 来实现似乎就没那么好办了。比如我们下面的 code 提供了一些基本的对象,可以为需要的情况提供需要的参数

from java.util import Properties
from java.io import FileInputStream
from org.apache.pig.impl.util import UDFContext

context = UDFContext.getUDFContext ()
prop = Properties ()
prop.load (FileInputStream ('udf_utils.properties'))

这样像前面 schema function 里面我们需要使用 prop,而 UDF 的实现部分,我们可以使用 context.getJobConf 获得 configuration。其实很好奇,为啥就不能用一种方式呢,还要搞两套… 另外一种方式是通过写 generator 来生成 UDF,这样“相对静态”,比如 schema 的类型可能这样做更好一点。

具体问题

PIG 里面常见如做序列分析,需要 group by bcookie,这样得到一个用户的所有数据,然后在这个 bag 上比如 order by timestamp 获得按照时间排好的 behavior,之后

  • 我们需要将这个 bag 拆分称为前后两个 event 组成的 pair,这样方便进行 transition 方面的分析
  • 我们还需要将这个 bag 拆分为多个 bag,每一段对应用户的一个 session

等等。这都依赖于一个两个 generic schema function,具体 UDF 的实现却非常容易。

各种吐槽

  • pig 似乎不能用 -files 传递文件
  • jython 似乎连 import re/os 这种基本的事情都做不好
  • 不能 import 含有 udf 的 py
  • 命名空间没有隔绝开

小结

PIG 就是一个充满了各种地雷的东西,各种问题… 不用也罢… 如果一个东西自己都不能 scale 怎么指望别人写出 scalable 的程序…

—————–
For the LORD had fast closed up all the wombs of the house of Abimelech, because of Sarah Abraham’s wife.

Advertisements
PIG UDF 的 python 实现技巧

发表评论

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / 更改 )

Twitter picture

You are commenting using your Twitter account. Log Out / 更改 )

Facebook photo

You are commenting using your Facebook account. Log Out / 更改 )

Google+ photo

You are commenting using your Google+ account. Log Out / 更改 )

Connecting to %s