初探 protobuf

protocol buffer 本质上是一个用来做 serialization/deserialization 的东西,这点上跟 avro、thrift 并无区别(见更早的 blog),它们最终的应用方向是 RPC,wiki 上有一坨做这类事情的工具,但是应用的目的并不一样。现在 protobuf 版本是 2.5。使用 protobuf 的基本过程如下:

  • 设计交互的消息,这往往使用一个 .proto 文件来描述
  • 使用 protoc 将这个消息描述文件编译成对应语言使用的库
  • 在应用程序里面使用生成的库创建对应的对象,并可以使用其中的函数将对象转换到需要的形式(如写入 stream)

protobuf 只是对数据本身的转换,并不带有格式信息,这意味着通信的双方必须都拥有同一个 .proto 产生的库,这样才能从数据映射到对应语言的对象。某些动态语言可能更倾向于将 schema 与数据一起传输,这样可以动态的构造对象,感觉各有利弊。

下面拿个简单的例子来看 protobuf 怎么用到一个通讯的问题里面。如果我们有一个系统将一则消息发送给另外一个系统,消息包括一个时间戳,一组 URL 和一个可选的 cookie,我们可以如下定义

message Work {
  required int64 timestamp = 1 ;
  repeated string urls = 2 ;
  optional string cookie = 3 ;
}

这里我们可以看到一些基本的要素,

  • 类似 C 的 struct 的定义,每个 field 需要使用一个编号,
  • 类型修饰可以为 required、repeated 和 optional,一般为了兼容性,required 一定要慎用(第一版可以用,后面的只能加 optional 了)。
  • 除了类似 C 的这些类型以外,我们可以定义 enum 或者嵌套的别的 message 形成更为复杂的结构。
  • 这里并没有 set/map 这类类型。
  • 可以通过 import 倒入其他的 proto 文件,有两类 import,通过 import public 导入的在本文件被 import 的时候也会被 import,否则就不会。
  • 有时候需要用 nested message,这时生成的类名字往往是通过内外拼接起来的。
  • message 不支持继承,但支持 extension,这时语法为 extension Work 然后加入一些自己定义的域,也就是说另外的人可以有自己不同的 Work,但是都叫 Work…
  • 可以定义 package,对应各个语言不同的

下面看看每个语言下面对应生成的是什么样的东西。

C++

通过 protoc 编译获得了两个文件,work.pb.h 和 work.pb.cc,容易发现一些有意思的东西:

  • 一般将 .cc 编译后获得一个 .o/.so/.a 供其他程序链接
  • package 对应 namespace,message 对应类,extension 是通过“黑科技”实现的
  • message 都是 public 继承 google::protobuf::Message,有用的方法主要是 serialization/deserialization 部分,包括 IsInitialized、Clear、ParseFrom…/SerializeTo…,似乎 IO 部分都是 google 自己的 stream 都没看见 std 里面的… 不过好歹有 string、array 类型的
  • 一般的域都有 has_xxx 这个 bool 操作,xxx 这个 getter 和 set_xxx 这个 setter,repeated 类型的一般有 add_xxx 返回一个指针供修改内容(message 自己有 ownership),其实难道不应该返回引用?

个么我们来写个最简单的程序

#include "work.pb.h"
#include <iostream>
#include <fstream>

int
main (int argc, char* argv[]) {
  demo::Work work ;
  work.set_timestamp (12344L) ;
  work.add_urls ("www.yahoo.com") ;
  work.add_urls ("www.google.com") ;

  std::ofstream os (argv[1]) ;
  if (work.SerializeToOstream (&os))
    std::cout << "succeeded in serialization\n" ;
  else
    std::cout << "failed in serialization\n" ;

  return 0 ;
}

如此写入后可以如此读出

#include "work.pb.h"
#include <iostream>
#include <fstream>

int
main (int argc, char* argv[]) {
  demo::Work work ;
  std::ifstream is (argv[1]) ;
  if (work.ParseFromIstream (&is))
    std::cout << "successful in deserialization\n" ;
  else
    std::cout << "failed in serialization\n" ;

  if (work.IsInitialized ()) {
    std::cout << "timestamp: " << work.timestamp ()
              << "\n" ;
    for (const std::string& url : work.urls ()) {
      std::cout << "url: " << url << "\n" ;
    }
  }

  return 0 ;
}

如果我们希望通过网络传输的话,不妨考虑一下 boost.asio,这里从 boost.asio 的 tutorial 里面借两个例子改一下,我们的 server 随机产生一个 demo::Worker,发送到 client,client 打印对应的信息。

// server side
try {
  boost::asio::io_service io_service;
  tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 52027));

  while (true) {
    tcp::socket socket(io_service);
    acceptor.accept(socket);

    std::string s ;
    demo::Work work = gen () ;
    work.SerializeToString (&s) ;
    demo::show (work) ;

    boost::system::error_code ignored_error;
    boost::asio::write(socket, boost::asio::buffer (s), ignored_error);
  }
} catch (std::exception& e) {
  std::cerr << e.what() << std::endl;
}

// client side
try {
  boost::asio::io_service io_service ;
  ip::tcp::endpoint ep (ip::address::from_string (argv[1]), 52027) ;
  ip::tcp::socket socket (io_service) ;
  socket.connect (ep) ;

  while (true) {
    boost::system::error_code error ;
    size_t len = socket.read_some(boost::asio::buffer (arr), error) ;

    if (error == boost::asio::error::eof)
      break; // Connection closed cleanly by peer.
    else if (error)
      throw boost::system::system_error(error); // Some other error.

    demo::Work work ;
    if (work.ParseFromArray (&arr[0], len))
      demo::show (work) ;
    else
      std::cout << "parsing failed\n" ;
  }
} catch (std::exception& e) {
  std::cerr << e.what() << std::endl;
}

看来通过 boost.asio 实现一个 RPC 看起来不会那么困难了,剩下的就是做成一个 framework 方便使用不同的协议调用。

python

python 的实现接近于一种“声明式”的方式,对应于一个 work_pb2 模块,打开可以看见是通过 python 的 meta programming 生成的,其中有一段大概定义的是 serialization 之后的格式。对应的 Work 类直接用反射将描述映射成为这个类的行为。如果希望前面的 server 能跟 python 交互,我们只需要

from work_pb2 import Work
import socket

if __name__ == '__main__':
    s = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
    s.connect (('127.0.0.1', 52027))
    data = s.recv (128)
    w = Work.FromString (data)
    print str (w)

——————
And thou saidst, I will surely do thee good, and make thy seed as the sand of the sea, which cannot be numbered for multitude.

Advertisements
初探 protobuf

Spark、MLlib、Spark Streaming 与 GraphX

我们知道 hadoop 早期的构架分为

  • HDFS,master 是 name node 记录文件系统的 meta data,slave 是 data node 存储文件的实际内容
  • 运算部分局限在 map/reduce 两类任务上,通过 JobTracker 作为 master 和 TaskScheduler 作为 slave 来进行任务的提交分发和执行

这个设计下导致数据访问频繁时 name node 可能出问题,运算提交频繁时 job tracker 可能出问题,如此一来导致了后面通过 zookeeper 对这两部分的改进,这样能够提高整个系统的 availability。在 2.x 里面将资源管理单独分离出来,这就是 YARN,YARN 上面的程序需要提供一个 application master 用来索取计算资源,并将任务部署到这些资源上执行。如此一来 hadoop 试图成为一个 general purpose 的平台,各种不同的分布式任务可以部署在同一个 cluster 上。早期衍生了一些通过 map/reduce 实现的类似 ETL 的工具,如 PIG 和 Hive,也有人利用这个框架实现了一些低效的 machine learning 算法(因为 map/reduce 每一个 iteration 会导致 IO),还有很多很多 apache 相关的项目不断的扩展 map/reduce 范式下的应用范围。

spark 试图采用另一种策略,所谓的 Resilient distributed dataset (RDD)这层抽象提供了类似 map/reduce 这类操作,但是归结到另外的 naming,transformation 和 action 上,对 RDD 的操作序列本质上和 functional programming 里面的概念完全一样,因此操作本身可以被“记录”,从而一旦出现 failure,可以在其他的机器上重复同样的操作获得结果。从这点上,可以说继承了 map/reduce 的优点并发扬光大。但是更深层的含义是,数据并不一定需要在磁盘上 replicate 保证可重复性,完全可以通过内存实现类似的需求,后来 tachyon 似乎也试图通过 in-memory filesystem 来加速 mapreduce。

借助于这个 framework,很自然的问题是能不能形成一个类似于 hadoop 之上的应用环境,兴许能提供更多的功能?Spark streaming 算是在这个上面的一个突破。在 YARN 上可以部署 twitter storm 进行 stream processing,spark streaming 算是提供了另外一种选择,不过为了利用 spark RDD 的概念,spark streaming 提供的 DStream 其实是对 stream processing 的一个“另类解读”,将数据流切成一个时间段的部分,每个部分当做一个 RDD,只要时间段足够短,就可以获得接近“实时”的 stream processing 的假象,storm 本身是 pull mode,能够获得 100ms 级别的 latency,而 spark streaming 据说可以达到 0.5-2s 级别。很多应用看起来是没啥问题的。

类似 mahout,spark 之上有 MLLib 这层提供了基本的优化 primitives,现在显得很单薄,功能无外于 SVM/logistic regression、linear regression(带 L^1L^2 的 regularizer)、k-means(估计很容易改成 EM 类型的模型)、CF 里面常见的 ALS。比较有意思的是 MLlib 使用了 jblas,这使得 scala/java API 下矩阵操作变得容易了很多,另外 python 的 API 是 native 的,不像 PIG/Hive 之类的是使用 Jython 来做,numpy/scipy 都可以用到这里面,估计的确比较适合一些模型参数能在单机存储的模型。

GraphX 的前身是 bagel,学习的是 google 的 pregel,设计思想和前面的 BGL 系列的原理类似:顶点通过一个 property map 来表示,对应的边可以看成是 vertex pair 为 key 的 property map。但是这两个概念在分布式系统里面都是很容易扩展的,很明显,通过 spark 的 RDD 将这些信息封装之后,我们能够比较方便的实现图的不少相关算法,某些图的属性比如顶点的入度可以看成是一个 graph 的 property map 通过某些变换后产生的新的 map,GraphX 提供了这些基本的操作。更有意思的是提供的关于 pregel 类型的 API,这是所谓的 bulk synchronous message passing 的模型,每个 iteration 每个顶点接受信息后沿每条边发出消息,下一个 iteration 这些消息聚合后同时发送到目的地。有了这种模型似乎做 page rank 啊或者 LBP 会方便很多。

最后说一下 shark,与 Hive 的初衷比较接近,但是因为 spark 能够将数据 cache 到内存,执行的效率能比 MR 上的 Hive 更快。当然因为 shark 的 API 能在 scala 里面直接用,那么 MLlib、GraphX 的整合都是非常容易的,而像 Hive 抽取到了数据和训练模型必须分开。

这些了解还比较肤浅,后面继续追一些项目实验着玩玩。

——————
And Jacob said, O God of my father Abraham, and God of my father Isaac, the LORD which saidst unto me, Return unto thy country, and to thy kindred, and I will deal well with thee:

Spark、MLlib、Spark Streaming 与 GraphX

Zookeeper 初步

前面谈到一些分布式系统的设计的时候提到了 zookeeper 这个东西,说实在的从来没有认真看懂过,这次准备稍微了解一些基本原理。zookeeper 并不像 hazelcast 或者 chubby 一样提供了一个 lock 的实现,它所提供的是另外一些元件,利用这些东西你能实现一些基本的模型。从“功能”上来看,zookeeper 是所谓的 coordinator(协调员),可以用来存放分布式系统的一些 meta data。 zookeeper 这个看似复杂的东西实际上只是提供了一个类似文件系统的东西,上面操作的“文件”叫 znode,你可以创建 znode、删除 znode,对其写入一些东西,基于这些基本的概念就能实现一些更为复杂的东西。其中一个比较重要的类型是 ephemeral nodes,这种 node 只在 client 的 session 连接时存在,一旦连接中断就会消失。当然因为 zookeeper 本身也是一个 cluster,因此为了维护这些 znode 的关系,他们需要一定程度上保持同步,从这个角度上来说它 hide 了这方面的 complexity,因此能简化其他的分布式系统的设计(也许后面我们可以看看 hadoop 几个项目上对 zookeeper 的用法)。当然这部分属于需要深入了解的部分,我们后面详细看看,zookeeper 使用的协议 zab 和 Paxos 算法的关系。 写 zookeeper 的应用程序一般都需要实现 Watcher,这相当于是一个 publisher/subscriber model 里面的 subscriber 角色,通常这个类通过一个 Object 的锁(调用 wait 之后将线程交出)等待其他的事件发生,这个暂停的线程依靠 Watcher 提供的 process 方法决定什么时候被唤醒(通过该 Object 的 notify 方法)。这个 process 方法可以在任何 zookeeper 事件发生后获得消息。通常为了避免被“无意间”唤醒, wait 部分会做一个 while (condition) 的循环保证 condition 没有变化的情况下一直等待。 通过 zookeeper 这个简单的模型,我们可以看到 tutorial 和 recipe 里面实现几个基本的概念的方法:

  • barrier,实际上就是几个进程需要达到某个地方,然后继续一起做别的事情。很简单的办法就是通过计数来实现:我们可以向一个约定的 znode 里面创建自己的子 ephemeral znode,之后 count 这个 znode 里面子节点个数决定是否停止等待。当然这个方法你需要知道全局的进程个数。这个一般可以对应进入 barrier 和离开两个过程(一个是等待增加到 n 一个是等待减少到 0)。
  • producer/consumer,这类似一个 queue,producer 将东西放入 queue 而 consumer 将其处理掉,zookeeper 的 znode 存在一个 sequence 的概念,多个 process 可以一起写一个 znode,结果是对应不同的 sequence number,利用这个特性 producer 就只管写而 consumer 就负责读出最近的来删掉。
  • lock,多个 client 同时写一个 znode,使用 ephemeral 模式;通过 getChildren 获得所有的,并不使用 watch 模式,这时最小 sequence number 的 client 获得 lock,其他的设置 watch,并调用 exists(失败的话重新 getChildren)等待获得 client 的同志释放 lock;释放其实很容易,删掉自己的 node 就行了(这时会唤醒一个等候的 node)
  • leader election 和 lock 类似,为了在 leader 挂掉时能够恢复应该大家都 watch

一般说来对一个 zookeeper cluster 读的时候只会从一个 server 获得数据,而写的时候它使用了一种叫 quorum(法定人数)的策略,即写没写成功根据写入成功节点个数来确定,抽象地说假定总票数是 n,投 commit 的要超过 latex n_c,投 abort 的要超过 n_a 才能生效,为了避免两者同时生效,我们有约束 n_c + n_a > n。看样子写一般都是先发给 zookeeper 的 leader,然后由 leader 决定哪些节点来写入的。 为了避免 client 的离线, zookeeper 会通过 heartbeat 来判别,每个 client 都会关联到一个 session,一旦离线 session 就会被关闭。 在以上 publisher/subscriber 模型里面的一个问题是所谓的 herd effect,比如一个节点崩溃后就得将这个消息发送给所有的子节点做出响应,似乎 zookeeper 为此会将 client 排序,仅将这个通知发送给下一个节点。缺点是如果挂掉的是 leader 只有一个子节点知道这个变化。 为了避免写入数据尚未同步之前的读不一致,需要在读之前进行 sync。

——————
And the messengers returned to Jacob, saying, We came to thy brother Esau, and also he cometh to meet thee, and four hundred men with him.

Zookeeper 初步