Skip to content
laforge49 edited this page Nov 28, 2011 · 6 revisions

AsyncFP operates on a single thread, because that is generally faster. But sometimes it is better to do things in parallel. That's when you need an asynchronous mailbox.

As an exercise, we will use Pause to represent a work load that you want to be able to run in parallel. Here's the code, including a convenience companion object:

case class Pause()

class Worker extends Actor {
  bind(classOf[Pause], pause)

  def pause(msg: AnyRef, rf: Any => Unit) {
    Thread.sleep(200)
    rf(null)
  }
}

object Pause {
  def apply(rf: Any => Unit)
           (implicit srcActor: ActiveActor) {
    val worker = new Worker
    worker.setExchangeMessenger(srcActor.bindActor.newAsyncMailbox)
    worker(Pause())(rf)
  }
}

Next we need a driver, an actor which will launch any number of worker actors. Once they have all completed, we print out the elapsed time and return a null result:

case class Doit(c: Int)

class Driver extends Actor {
  bind(classOf[Doit], doit)
  var rem = 0
  var c = 0
  var rf: Any => Unit = null
  var t0 = 0L

  def doit(msg: AnyRef, _rf: Any => Unit) {
    c = msg.asInstanceOf[Doit].c
    rem = c
    rf = _rf
    t0 = System.currentTimeMillis
    var i = 0
    while(i < c) {
      i += 1
      Pause(r)
    }
  }

  def r(rsp: Any) {
    rem -= 1
    if (rem == 0) {
      val t1 = System.currentTimeMillis
      println("total time for "+c+" messages = "+(t1 - t0)+" milliseconds")
      rf(null)
    }
  }
}

Clone this wiki locally