两个 protobuf 新功能

oneof

这个功能类似 C/C++ 的 union,它可以把几个不可能共存的 field 组织在一起,这样也可以作为一种 closed polymorphism 来使用。它除了原生的一些 API 以外,还提供了一些便于做 dynamic check 的 API,比如:提供了一个 enum 表示被设置的是哪个 field,_case() 返回这个 enum,clear_*() 用于清除设置的 oneof。

一个具体的例子是比如我们需要实例化一个 Foo 的子类,Bar、Baz 和 Bzar,他们的构造函数不同,比如接受三种不一样的 message type,就可以通过 oneof 将三个 field group 在一起,同时创建一个 factory method,根据这个 oneof 获得的类型调用对应的构造函数。

map

这是一个呼声很高的 feature,最终进入了 protobuf,在没有这个东西以前,一个比较常见的做法就是需要某种 map 的时候就定义一个类似 std::pair 的 message type,比如使用 nested message 控制这个类型的范围,里面都是用 key/value 表示两个 field,但这样一来,就需要通过某个 template method 将这个 repeated field 转换到一个实际的 map 才能使用,同时面临两个问题:如果是 map 需要 operator<,如果是 unordered_map 则需要 hash function 和 operator==,对于 protobuf 来说这些都是 missing pieces,这样一来 protobuf 要求 key 只能是整数或者字符串。同时 map 不能使用 required/optional/repeated,因为它本质上仍然是通过我们这种比较土的方式生成的,只是在外面套上了一个类似 STL map 的 interface 而已(参看这里)。

通过 map 在很多需要表示(简易)关联的场合会轻松许多,但是如果需要做复杂的 lookup,可能这不是一个很好的选择。

arena 内存分配

主要是一类似 memory pool 的东西,方便批量释放内存,提高 service 的 performance 可以考虑。

——————
Then there passed by Midianites merchantmen; and they drew and lifted up Joseph out of the pit, and sold Joseph to the Ishmeelites for twenty pieces of silver:and they brought Joseph into Egypt

两个 protobuf 新功能

初探 protobuf

protocol buffer 本质上是一个用来做 serialization/deserialization 的东西,这点上跟 avro、thrift 并无区别(见更早的 blog),它们最终的应用方向是 RPC,wiki 上有一坨做这类事情的工具,但是应用的目的并不一样。现在 protobuf 版本是 2.5。使用 protobuf 的基本过程如下:

  • 设计交互的消息,这往往使用一个 .proto 文件来描述
  • 使用 protoc 将这个消息描述文件编译成对应语言使用的库
  • 在应用程序里面使用生成的库创建对应的对象,并可以使用其中的函数将对象转换到需要的形式(如写入 stream)

protobuf 只是对数据本身的转换,并不带有格式信息,这意味着通信的双方必须都拥有同一个 .proto 产生的库,这样才能从数据映射到对应语言的对象。某些动态语言可能更倾向于将 schema 与数据一起传输,这样可以动态的构造对象,感觉各有利弊。

下面拿个简单的例子来看 protobuf 怎么用到一个通讯的问题里面。如果我们有一个系统将一则消息发送给另外一个系统,消息包括一个时间戳,一组 URL 和一个可选的 cookie,我们可以如下定义

message Work {
  required int64 timestamp = 1 ;
  repeated string urls = 2 ;
  optional string cookie = 3 ;
}

这里我们可以看到一些基本的要素,

  • 类似 C 的 struct 的定义,每个 field 需要使用一个编号,
  • 类型修饰可以为 required、repeated 和 optional,一般为了兼容性,required 一定要慎用(第一版可以用,后面的只能加 optional 了)。
  • 除了类似 C 的这些类型以外,我们可以定义 enum 或者嵌套的别的 message 形成更为复杂的结构。
  • 这里并没有 set/map 这类类型。
  • 可以通过 import 倒入其他的 proto 文件,有两类 import,通过 import public 导入的在本文件被 import 的时候也会被 import,否则就不会。
  • 有时候需要用 nested message,这时生成的类名字往往是通过内外拼接起来的。
  • message 不支持继承,但支持 extension,这时语法为 extension Work 然后加入一些自己定义的域,也就是说另外的人可以有自己不同的 Work,但是都叫 Work…
  • 可以定义 package,对应各个语言不同的

下面看看每个语言下面对应生成的是什么样的东西。

C++

通过 protoc 编译获得了两个文件,work.pb.h 和 work.pb.cc,容易发现一些有意思的东西:

  • 一般将 .cc 编译后获得一个 .o/.so/.a 供其他程序链接
  • package 对应 namespace,message 对应类,extension 是通过“黑科技”实现的
  • message 都是 public 继承 google::protobuf::Message,有用的方法主要是 serialization/deserialization 部分,包括 IsInitialized、Clear、ParseFrom…/SerializeTo…,似乎 IO 部分都是 google 自己的 stream 都没看见 std 里面的… 不过好歹有 string、array 类型的
  • 一般的域都有 has_xxx 这个 bool 操作,xxx 这个 getter 和 set_xxx 这个 setter,repeated 类型的一般有 add_xxx 返回一个指针供修改内容(message 自己有 ownership),其实难道不应该返回引用?

个么我们来写个最简单的程序

#include "work.pb.h"
#include <iostream>
#include <fstream>

int
main (int argc, char* argv[]) {
  demo::Work work ;
  work.set_timestamp (12344L) ;
  work.add_urls ("www.yahoo.com") ;
  work.add_urls ("www.google.com") ;

  std::ofstream os (argv[1]) ;
  if (work.SerializeToOstream (&os))
    std::cout << "succeeded in serialization\n" ;
  else
    std::cout << "failed in serialization\n" ;

  return 0 ;
}

如此写入后可以如此读出

#include "work.pb.h"
#include <iostream>
#include <fstream>

int
main (int argc, char* argv[]) {
  demo::Work work ;
  std::ifstream is (argv[1]) ;
  if (work.ParseFromIstream (&is))
    std::cout << "successful in deserialization\n" ;
  else
    std::cout << "failed in serialization\n" ;

  if (work.IsInitialized ()) {
    std::cout << "timestamp: " << work.timestamp ()
              << "\n" ;
    for (const std::string& url : work.urls ()) {
      std::cout << "url: " << url << "\n" ;
    }
  }

  return 0 ;
}

如果我们希望通过网络传输的话,不妨考虑一下 boost.asio,这里从 boost.asio 的 tutorial 里面借两个例子改一下,我们的 server 随机产生一个 demo::Worker,发送到 client,client 打印对应的信息。

// server side
try {
  boost::asio::io_service io_service;
  tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 52027));

  while (true) {
    tcp::socket socket(io_service);
    acceptor.accept(socket);

    std::string s ;
    demo::Work work = gen () ;
    work.SerializeToString (&s) ;
    demo::show (work) ;

    boost::system::error_code ignored_error;
    boost::asio::write(socket, boost::asio::buffer (s), ignored_error);
  }
} catch (std::exception& e) {
  std::cerr << e.what() << std::endl;
}

// client side
try {
  boost::asio::io_service io_service ;
  ip::tcp::endpoint ep (ip::address::from_string (argv[1]), 52027) ;
  ip::tcp::socket socket (io_service) ;
  socket.connect (ep) ;

  while (true) {
    boost::system::error_code error ;
    size_t len = socket.read_some(boost::asio::buffer (arr), error) ;

    if (error == boost::asio::error::eof)
      break; // Connection closed cleanly by peer.
    else if (error)
      throw boost::system::system_error(error); // Some other error.

    demo::Work work ;
    if (work.ParseFromArray (&arr[0], len))
      demo::show (work) ;
    else
      std::cout << "parsing failed\n" ;
  }
} catch (std::exception& e) {
  std::cerr << e.what() << std::endl;
}

看来通过 boost.asio 实现一个 RPC 看起来不会那么困难了,剩下的就是做成一个 framework 方便使用不同的协议调用。

python

python 的实现接近于一种“声明式”的方式,对应于一个 work_pb2 模块,打开可以看见是通过 python 的 meta programming 生成的,其中有一段大概定义的是 serialization 之后的格式。对应的 Work 类直接用反射将描述映射成为这个类的行为。如果希望前面的 server 能跟 python 交互,我们只需要

from work_pb2 import Work
import socket

if __name__ == '__main__':
    s = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
    s.connect (('127.0.0.1', 52027))
    data = s.recv (128)
    w = Work.FromString (data)
    print str (w)

——————
And thou saidst, I will surely do thee good, and make thy seed as the sand of the sea, which cannot be numbered for multitude.

初探 protobuf

数据库管理与迁移

在 django 和 rails 里面都带有一个数据库迁移的工具,以 rails 为例,你可以使用 rails generate migration 为现有的 model 产生一个新的 delta,这个部分可以认为是一个对现有 model 的 patch,通常你可以使用 rake db:migrate 让 rails 将这部分改动反映到数据库上,这种操作对“开发”过程是非常友好的,因为你可以生成这个改变后在本地的数据库里面进行开发而不影响到其他的开发者,一旦你觉得改变是错误的还可以 rollback。一旦这个 migration 需要进入 production,一般 rails 会在 deploy 到主机的时候自动对数据库进行更新,即便有多台主机,更新的时候也不大会冲突。那么这种东西实现的机理是什么呢?仔细看看在你的数据库里面多了一张表,这个就是 rails 用来判断是否需要根据产生的 migration file 来做更新的。一旦发现数据库内最新的 schema 比起 rb 文件表述的要老,那么就可以利用数据库进行 distributed lock 然后从一台主机开始发送进行迁移需要的 SQL 指令了。

从 django 或者 rails 进入到 Java 的世界里面,开发这类应用时常也会需要一个 migration 的工具,这里我们简单试试 liquibase,与 rails 的原理类似,它也会在数据库里面建立一个表,但是用来 track 数据库变化的东西不是源文件而是通过 XML 表示的 changset(现在尚不支持别的格式,希望 yaml 到 3.1 能支持吧),

  • 如果你已经构造了数据库,可以用 liquibase 根据它生成一个基本的 changset
  • 对数据库进行进一步的变化应使用这个 XML 表达
  • liquibase 提供的 Ant 或者 maven 的支持使得它可以作为一个开发环境下的任务被执行
  • liquibase 提供的 spring 的支持,使得它可以在 deploy 到线上的时候根据 XML 进行 migration

有了这样一些概念,我们来试验一下这些想法是否可用。首先创建一个 sqlite 的数据库,下面的 SQL 是我们建立的表(注意 hsqldb 使用 hsqldb-sqltool 需要在 ~/ 放置一个 sqltoo.rc,里面有数据库的配置,这里我们假定使用 personal 对应于 ~/db/personal.* 那几个文件)。

create table User (id integer PRIMARY KEY IDENTITY, login varchar(40) NOT NULL) ;
insert into User (login) VALUES ('foo') ;
insert into User (login) VALUES ('bar') ;
COMMIT ;

我们可以检查

select * from User ;
ID  LOGIN
--  -----
 0  foo
 1  bar

Fetched 2 rows.

一切 okay,那么我们开始玩 liquibase,首先从官方下载 3.x 的压缩包。解开后里面的 liquibase 弄成可执行的就可以开始生成我们的 changeset 了。

liquibase --driver=org.hsqldb.jdbcDriver --changeLogFile=liquitest.xml \
  --url="jdbc:hsqldb:file:$HOME/db/personal;shutdown=true" \
  --classpath="/usr/share/java/hsqldb.jar" generateChangeLog

这样会生成 XML,如果需要 yaml 的话把后缀改掉即可(现在生成的是空的),下面是生成的 xml

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.0.xsd">
  <changeSet author="remonstrate (generated)" id="1377586754800-1">
    <createTable tableName="USER">
      <column autoIncrement="true" name="ID" type="INT">
        <constraints primaryKey="true" primaryKeyName="SYS_PK_10066"/>
      </column>
      <column name="LOGIN" type="VARCHAR(40)">
        <constraints nullable="false"/>
      </column>
    </createTable>
  </changeSet>
</databaseChangeLog>

okay,突然你想起来,login 这列其实是有唯一约束的,这时你可以编辑这个文件来做,比如下面

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.0.xsd">
  <!-- initial
  <changeSet author="remonstrate (generated)" id="1377586754800-1">
    <createTable tableName="USER">
      <column autoIncrement="true" name="ID" type="INT">
        <constraints primaryKey="true" primaryKeyName="SYS_PK_10066"/>
      </column>
      <column name="LOGIN" type="VARCHAR(40)">
        <constraints nullable="false"/>
      </column>
    </createTable>
  </changeSet>
  -->
  <!-- add unique constraint for login -->
  <changeSet author="remonstrate" id="1377586754800-2">
    <addUniqueConstraint
            columnNames="login"
            constraintName="unique_login"
            deferrable="true"
            disabled="true"
            initiallyDeferred="true"
            schemaName="public"
            tableName="User" />
  </changeSet>
</databaseChangeLog>

这里注释掉前面的部分是因为和数据库已经一致了,我们可以如下运行

liquibase/liquibase --driver=org.hsqldb.jdbcDriver \
  --changeLogFile=liquitest.xml --url="jdbc:hsqldb:$HOME/db/personal;shutdown=true" \
  --classpath="/usr/share/java/hsqldb.jar" migrate
Liquibase Update Successful

这时我们连接到数据库检查发生了什么变化:

insert into User (login) VALUES ('foo') ;
SEVERE  SQL Error at '' line 2:
"insert into User (login) VALUES ('foo') "
integrity constraint violation: unique constraint or index violation; UNIQUE_LOGIN table: USER
select * from DatabaseChangeLog ;
ID               AUTHOR  FILENAME       DATEEXECUTED             ORDEREXECUTED  EXECTYPE  MD5SUM                              DESCRIPTION                                                       COMMENTS  TAG     LIQUIBASE
---------------  ------  -------------  -----------------------  -------------  --------  ----------------------------------  ----------------------------------------------------------------  --------  ------  ---------
1377586754800-2  heli    liquitest.xml  2013-08-27 00:25:55.131              1  EXECUTED  7:8747898353bcedb94c4ca95d64799f12  Adds a unique constrant to an existing column or set of columns.            [null]  3.0.0-SNP

好了,这样我们初步见识到 liquibase 的作用了。假使我们有一个应用程序需要部署,我们可以在 spring 里面加入 liquibase,下面是个简单的例子

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns="http://www.springframework.org/schema/beans"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
                           http://www.springframework.org/schema/util
                           http://www.springframework.org/schema/util/spring-util-3.1.xsd
                           http://www.springframework.org/schema/jdbc
                           http://www.springframework.org/schema/jdbc/spring-jdbc-3.1.xsd">

  <bean id="db" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <property name="driverClassName" value="org.hsqldb.jdbcDriver"/>
    <property name="url" value="jdbc:hsqldb:file:/home/remonstrate/db/personal;shutdown=true"/>
    <property name="username" value="sa"/>
    <property name="password" value=""/>
  </bean>

  <bean id="liquibase" class="liquibase.integration.spring.SpringLiquibase">
    <property name="dataSource" ref="db" />
    <property name="changeLog" value="file:liquitest.xml" />
  </bean>

  <bean id="template" class="org.springframework.jdbc.core.JdbcTemplate">
    <constructor-arg ref="db" />
  </bean>

  <bean id="app" class="demo.App" depends-on="liquibase">
    <property name="template" ref="template" />
  </bean>
</beans>

运行的代码为

package demo;

import org.springframework.context.ApplicationContext ;
import org.springframework.context.support.ClassPathXmlApplicationContext ;
import org.springframework.jdbc.core.JdbcTemplate ;
import org.springframework.jdbc.core.BeanPropertyRowMapper ;

import java.util.List ;

public class App {
    public static class User {
        private Integer id ;
        private String login ;

        public void setId (Integer i) {
            id = i ;
        }
        public Integer getId () {
            return id ;
        }
        public void setLogin (String i) {
            login = i ;
        }
        public String getLogin () {
            return login ;
        }
    }

    private JdbcTemplate template ;

    public void setTemplate (JdbcTemplate t) {
        template = t ;
    }

    public void run () {
        List<User> l = template.query ("select * from User", new BeanPropertyRowMapper (User.class)) ;
        for (User u : l)
            System.out.println ("No " + u.getId () + ": " + u.getLogin ()) ;
    }

    public static void main( String[] args ) {
        ApplicationContext context =
            new ClassPathXmlApplicationContext ("application-conf.xml") ;
        App app = context.getBean ("app", App.class) ;
        app.run () ;
    }
}

通过 maven 我们两次运行的结果如下:

$ mvn exec:exec -Dexec.executable="java" -Dexec.args="-cp %classpath demo.App"
INFO: Loaded JDBC driver: org.hsqldb.jdbcDriver
INFO 8/27/13 1:44 AM:liquibase: Successfully acquired change log lock
INFO 8/27/13 1:44 AM:liquibase: Creating database history table with name: PUBLIC.DATABASECHANGELOG
INFO 8/27/13 1:44 AM:liquibase: Reading from PUBLIC.DATABASECHANGELOG
INFO 8/27/13 1:44 AM:liquibase: Reading from PUBLIC.DATABASECHANGELOG
INFO 8/27/13 1:44 AM:liquibase: ChangeSet file:liquitest.xml::1377586754800-1::heli (generated) ran successfully in 5ms
INFO 8/27/13 1:44 AM:liquibase: ChangeSet file:liquitest.xml::1377586754800-2::heli ran successfully in 3ms
INFO 8/27/13 1:44 AM:liquibase: ChangeSet file:liquitest.xml::1377586754800-3::heli ran successfully in 1ms
INFO 8/27/13 1:44 AM:liquibase: Successfully released change log lock
No 0: foo
No 1: bar
$ mvn exec:exec -Dexec.executable="java" -Dexec.args="-cp %classpath demo.App"
INFO: Loaded JDBC driver: org.hsqldb.jdbcDriver
INFO 8/27/13 1:44 AM:liquibase: Successfully acquired change log lock
INFO 8/27/13 1:44 AM:liquibase: Reading from PUBLIC.DATABASECHANGELOG
INFO 8/27/13 1:44 AM:liquibase: Reading from PUBLIC.DATABASECHANGELOG
INFO 8/27/13 1:44 AM:liquibase: Successfully released change log lock
No 0: foo
No 1: bar

可见 migration 大致的样子。后面要稍微玩玩这个东西了。

——————
And she conceived again, and bare a son: and she said, Now will I praise the LORD: therefore she called his name Judah; and left bearing.

数据库管理与迁移