几个并行编程的组织方式与 GParS(下)

CSP

CSP(communicating sequential processes)来自 Occam,也是 google go 的并行模型,它算是 process algebra 理论中的一个情形。GParS 的 CSP 建立在 JCSP 这个 Java 的 package 之上。似乎到现在支持也不是很完整。

import groovyx.gpars.csp.PAR
import org.jcsp.lang.CSProcess
import org.jcsp.lang.Channel
import org.jcsp.lang.ChannelOutput
import org.jcsp.lang.One2OneChannel
import groovyx.gpars.csp.plugAndPlay.GPrefix
import groovyx.gpars.csp.plugAndPlay.GPCopy
import groovyx.gpars.csp.plugAndPlay.GPairs
import groovyx.gpars.csp.plugAndPlay.GPrint

class FibonacciV2Process implements CSProcess {
  ChannelOutput outChannel

  void run() {
    One2OneChannel a = Channel.createOne2One()
    One2OneChannel b = Channel.createOne2One()
    One2OneChannel c = Channel.createOne2One()
    One2OneChannel d = Channel.createOne2One()

    def testList = [
      new GPrefix(prefixValue: 0, inChannel: d.in(), outChannel: a.out()),
      new GPrefix(prefixValue: 1, inChannel: c.in(), outChannel: d.out()),
      new GPCopy(inChannel: a.in(), outChannel0: b.out(), outChannel1: outChannel),
      new GPairs(inChannel: b.in(), outChannel: c.out()), ]
    new PAR(testList).run()
  }
}

One2OneChannel N2P = Channel.createOne2One()
def testList = [
  new FibonacciV2Process(outChannel: N2P.out()),
  new GPrint(inChannel: N2P.in(), heading: "Fibonacci Numbers")
]

final def par = new PAR(testList)
par.run()

这个逻辑大约就是 d 发消息给 a,最开始发 0,a 把消息发给 b 和这个 CSProcess 的输出,b 把消息发给 c 并相加,c 把结果发给 d,这个环形结构实现了 f[n + 1] = f[n] + f[n-1] 的循环,这个 CSProcess 与另一个打印的 channel 拼起来实现了打印。感觉这个东西似乎想法还比较复杂,没看明白,估计得继续了解一些 process algebra 的东西…

Agent

这个概念来自 Clojure,其实类似为资源做的 proxy,这个 proxy 使得对资源的访问通过上锁等策略避免 race condition,

import groovyx.gpars.agent.Agent

def jugMembers = new Agent<List>(['Me'])
jugMembers.send {it.add 'James'}

final Thread t1 = Thread.start {
  jugMembers {it.add 'Joe'}
}

final Thread t2 = Thread.start {
  jugMembers << {it.add 'Dave'}
  jugMembers << {it.add 'Alice'}
}

[t1, t2]*.join()
println jugMembers.val
jugMembers.valAsync {println "Current members: $it"}
jugMembers.await()

这里通过 Agent 类实现对资源类(List)的代理,此后我们可以选择通过 send 一个 closure 或者直接把它本身当成 functor 以 closure 作为参数传递或者使用 << 都能将操作传递给 List。

STM

STM(software transactional memory)从名字上可以看成是对数据库 transaction 概念在 concurrent programming 的扩展,我们知道数据库操作通过 begin transaction 和 end transaction 包围的操作有种 rollback 的特性,而并行操作对共享的内存需要某种“原子性”,下面就是一个简单的例子(不知道是否就是将 lock 弄成了 template?)

import groovyx.gpars.stm.GParsStm
import org.multiverse.api.references.IntRef
import static org.multiverse.api.StmUtils.newIntRef

public class Account {
  private final IntRef amount = newIntRef(0);
  public void transfer(final int a) {
    GParsStm.atomic {
      amount.increment(a);
    }
  }
  public int getCurrentAmount() {
    GParsStm.atomicWithInt {
      amount.get();
    }
  }
}

这个 STM 的实现在 boost.STM 里面也有类似的实现。

其他

递归函数的 memorize 支持,对 Closure 可以用 gmemoize,这样讲 memoize 所有的结果,而可以选择用 memoize 和 memoizeAtMost 决定存多少中间结果。

——————
For he had possession of flocks, and possession of herds, and great store of servants: and the Philistines envied him.

Advertisements
几个并行编程的组织方式与 GParS(下)

一个有关“几个并行编程的组织方式与 GParS(下)”的想法

发表评论

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / 更改 )

Twitter picture

You are commenting using your Twitter account. Log Out / 更改 )

Facebook photo

You are commenting using your Facebook account. Log Out / 更改 )

Google+ photo

You are commenting using your Google+ account. Log Out / 更改 )

Connecting to %s