几个并行编程的组织方式与 GParS(上)

传统的并行程序(这里指 multi-threading 意义下的并行)无外乎提供线程库,另外提供一些帮助我们同步的工具,如 mutex、semaphor、conditional variable 之类的东西,异步调用进行一些封装,如 promise、future 等等。

  • C++ 下面的这些可以在 boost.thread 里面找到,后来成为了 C++11 里面 thread、future 之类的;
  • Java 原生支持 Thread 对象和 Runnable 接口、synchronized 关键字,后面 JDK 通过 java.util.concurrent 里面的 executor 提供了 thread pool,这样写多线程变得简单了起来,因为只需要提供 runnable 扔到里面就会被 schedule 到对应的线程上去执行,你可以比较好的管理这些东西
  • 其实类似的概念在 boost.asio 里面是有所体现的,前面有所介绍 asio 可以看成是一个帮助你写服务器等应用的辅助组件(比如 CppCMS 就使用了它实现了 C++ 下面的 MVC framework),

executor 这类设计帮助我们解决了一些多线程上的问题,将线程管理和实际执行的任务可以剥离。这篇标题有个 GParS(对应 debian 里面 libgpars-groovy-java 这个 package,不过似乎现在 experimental 下这个 package 有点问题,还是直接下载 2.1.1 的 groovy 吧;这个缩写表示 Groovy Parallel System),这是 groovy 新进入官方 package 的一个很有用的多线程编程的 toolkit。如果说 scala 的 actor 展示了 scala 语言带来对多线程程序建模的简化的话,那么 GParS 展示了 groovy 语言对多线程程序建模的简化。

loop-based/container-based parallel computing

这个概念我想很简单,我最早是在使用 OpenMP 的时候发现其实这类多线程并不难写,OpenMP 使得 C/C++ 程序的循环可以被多个线程 decompose 开来,代价仅仅只是一句 #pragma。Gpars 为 groovy 的容器提供了 concurrent collection processing 的能力。下面是个简单的例子。

(1..10).each {
    println it
}
Closure g = {
  def gen = new Random ()
  println it
  Thread.sleep (100*gen.nextInt (10))
}

groovyx.gpars.GParsPool.withPool (4) {
  (1..10).eachParallel g
}

一个 list 提供的方法在 withPool 里面就会出现对应的 Parallel 版本(常见的方法 each 相当于对每个元素作用一下 closure、collect 相当于 transform、find/All/Any 找到符合条件的、every 返回 predicate 的结果、any 是否存在满足 predicate 的结果、grep 对元素进行 filter、groupBy 、fold、min 取小、max 取大、sum 求和、split、count 计数符合 predicate 的个数)。当然也存在别的方法,如通过 list 自己的 makeConcurrent 和 makeSequential 改变这些函数的默认行为,或者通过 groovyx.gpars.ParallelEnhancer 的 enhanceInstance 来扩展 collection 的方法。

这里的 GParsePool 可以换成 GParsExecutorsPool 这是使用 Java 的 executor 做的。

map/reduce

GPars 为 collection 还提供了类似 map/reduce 的基本操作,如

  • filter 接收 predicate 返回符合条件的
  • map 对元素映射
  • combine 类似 combiner
  • groupBy 进行分组
  • reduce 接受一个 reducer 作用在一个组上
  • 另外有些 min、max、size 等

以上两种形式都是通过 parallel array 实现的。这个实际上 scala 也提供了类似的 library。

可复合的异步函数

在 withPool 的 closure 里面我们还可以通过 callAsync(对应于 closure 的 call 方法的异步)获得一个 promise,之后 get(会 block)。Closure 在 withPool 中可以用 asyncFun 获得异步的 Closure,这时这个 closure 可以和正常的一样使用,返回的是 promise 而不是对应值。

Fork/Join

这是一个典型的处理树状结构时使用的并行化 pattern,即碰到子节点后 fork 一个子线程去处理,使用的代码和当前一样(类似递归,但是不是在同一线程里面做),当前节点等所有子线程结束(即 join)就能完成本节点的计算任务。

import static groovyx.gpars.GParsPool.runForkJoin
import static groovyx.gpars.GParsPool.withPool

withPool(1) {pool ->
  println """Number of files: ${
     runForkJoin(new File("${args[0]}")) {file ->
            long count = 0
            file.eachFile {
                if (it.isDirectory()) {
                    println "Forking a child task for $it"
                    forkOffChild(it)
                } else {
                    count++
                }
            }
            return count + (childrenResults.sum(0))
        }
    }"""
}

这里看起来非常紧凑的程序,一个是在 ”’str”’ 里面通过 ${} 插入了 runForkJoin 用来计算总数,这个函数第一个参数是 fork 的起点,后面的 closure 开始从这个起点遍历,我们从第一个参数获得一个目录名,之后打开下面的文件进行遍历,如果是子目录就 fork 一个新的任务出来递归作用,否则就计数,这样每个 fork 出来的任务都获得目录下文件的个数,等待子目录结束后将 fork 出来的结果(放在 childrenResults 里面,这是个 list)相加得到整个目录树下面文件总数。事实上对于很多递归的程序都可以利用这个进行加速,每个子问题的计算会被 schedule 到不同的线程上(如果 pool 足够大的话)进行并行计算。通过 runChildDirectly 我们可以将某些需要 fork 的情形在本线程里面完成。

actor

最早使用 actor 这种形式来做多线程的语言大概是 Erlang,scala 借助了这个概念提出了自己的 Actor,而 Gpars 也带有 groovy 实现的版本。所谓的 actor 就是类似 signal/slot 的消息通信的对象,它们可以接受消息、发送消息,产生别的 actor 等等。一般 GUI 程序可能是单线程上互相调用,而 actor 一般是在一个 thread pool 上工作的。

import groovyx.gpars.actor.Actor
import groovyx.gpars.actor.DefaultActor

class GameMaster extends DefaultActor {
  int secretNum

  void afterStart() {
    secretNum = new Random().nextInt(10)
    println "my secret is ${secretNum}"
  }

  void act() {
    loop {
      react { int num ->
        if (num > secretNum) {
          reply 'too large'
        } else if (num < secretNum) {
          reply 'too small'
        } else {
          reply 'you win'
          terminate()
        }
      }
    }
  }
}

class Player extends DefaultActor {
  String name
  Actor server

  void act() {
    println ("start!")
    loop {
      int myNum = new Random().nextInt(10)
      server.send myNum
      react {
        switch (it) {
        case 'too large':
          println "$name: $myNum was too large"
          break
        case 'too small':
          println "$name: $myNum was too small"
          break
        case 'you win':
          println "$name: I won $myNum"; terminate()
        }
      }
    }
  }
}

def master = new GameMaster().start()
def player = new Player(name: 'Player', server: master).start()
[master, player]*.join ()

其实有两种方法构造 actor,这里使用的是继承,还可以用 groovyx.gpars.actor.Actors.act 这个 factory method 创建,actor 的核心就是实现 act 方法,这里面是一个 loop,通过 terminate 结束,这里通过 actor.send 方法发送消息,而之后使用 react 处理自己接收到的消息。实际 scala 也是类似的实现,但是 scala 可以做 class 的 matching,可能能写的更简单一点?前文也提到了,coroutine 可以用来实现 actor,通过这个例子我们也就明白是为什么了。利用 actor 这种 model 感觉上也能比较好解决 caller-callee 的问题(原先是说写一个 parser,它需要 call 一个 lexer,两个都是写成 caller 比较容易,相当于通过下层的输入拼成一个自己逻辑里面的概念,如字符 -> token,token -> expr,但是如果你要把他们写成 callee,也就是比如 lexer 提供个函数,我调用一次他就返回一个 token,类似于实现一个 iterator 了,这时困难就出现了,python 的 yield 有用了),使用 actor 时大家是均等的可以互相发消息(接前面的例子,大家都按方便来,这时 lexer 解析出来个 token 后就发给消息给 parser,而 parser 没事的时候就等 lexer 就行了)。参看这篇文章

在 thread pool 上不仅可以让这些 actor 交互起来,原先的单线程 GUI 通过线程池也能获得某种扩展。

Dataflow

这是一种类似于 make 的 declarative 的方式表述任务及其关系,GParS 会将任务 schedule 到线程上一一执行,我们不需要关心其中的 join 之类的关系。GParS 提供了 DataflowVariable、Dateflows(其成员相当于前者的作用),将这些放在 groovyx.gpars.dataflow.Dataflow.task 的闭包里面,这些 task 就会产生关联性。DataflowQueue 可以在某个 task 输入序列数据,而在另一个输出,起到管道的作用。

import static groovyx.gpars.dataflow.Dataflow.task
import groovyx.gpars.dataflow.DataflowVariable

final def x = new DataflowVariable()
final def y = new DataflowVariable()
final def z = new DataflowVariable()

task {
  z << x.val + y.val
}

task {
  x << 10
}

task {
  y << 5
}

println z.get ()

——————
Then Isaac sowed in that land, and received in the same year an hundredfold: and the LORD blessed him.

Advertisements
几个并行编程的组织方式与 GParS(上)

发表评论

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / 更改 )

Twitter picture

You are commenting using your Twitter account. Log Out / 更改 )

Facebook photo

You are commenting using your Facebook account. Log Out / 更改 )

Google+ photo

You are commenting using your Google+ account. Log Out / 更改 )

Connecting to %s