}
}
ClientApp.scala
import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.remote.RemoteScope
import akka.util._
import java.util.concurrent.TimeUnit
import scala.concurrent._
import scala.concurrent.duration._
object ClientApp {
def main(args: Array[String]): Unit = {
val system = ActorSystem("LocalSystem", ConfigFactory.load("client"))
// get the remote actor via the server actor system's address
val serverAddress = AddressFromURIString("akka.tcp://ServerActorSystem@127.0.0.1:2552")
val actor = system.actorOf(Props[ServerActor].withDeploy(Deploy(scope = RemoteScope(serverAddress))))
// invoke the remote actor via a client actor.
// val remotePath = "akka.tcp://ServerActorSystem@127.0.0.1:2552/user/serverActor"
// val actor = system.actorOf(Props(classOf[ClientActor], remotePath), "clientActor")
buildReaper(system, actor)
// tell
actor ! 10000L
waitShutdown(system, actor)
}
private def buildReaper(system: ActorSystem, actor: ActorRef): Unit = {
import Reaper._
val reaper = system.actorOf(Props(classOf[ProductionReaper]))
// Watch the action
reaper ! WatchMe(actor)
}
private def waitShutdown(system: ActorSystem, actor: ActorRef): Unit = {
// trigger the shutdown operation in ProductionReaper
system.stop(actor)
// wait to shutdown
Await.result(system.whenTerminated, 60.seconds)
}
}
ProductionReaper.scala
当所有的Actor停止后,终止Actor System。
class ProductionReaper extends Reaper {
// Shutdown
def allSoulsReaped(): Unit = {
context.system.terminate()
}
}
Reaper.scala
import akka.actor.{Actor, ActorRef, Terminated}
import scala.collection.mutable.ArrayBuffer
object Reaper {
// Used by others to register an Actor for watching
case class WatchMe(ref: ActorRef)
}
abstract class Reaper extends Actor {
import Reaper._
// Keep track of what we're watching
val watched = ArrayBuffer.empty[ActorRef]
// Derivations need to implement this method. It's the
// hook that's called when everything's dead
def allSoulsReaped(): Unit
// Watch and check for termination
final def receive = {
case WatchMe(ref) =>
context.watch(ref)
watched += ref
case Terminated(ref) =>
watched -= ref
if (watched.isEmpty) allSoulsReaped()
}
}
ServerActor.scala
提供一个求1到n平方和的MapReduce计算。
import akka.actor.Actor
import akka.actor.Props
import akka.event.Logging
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
class ServerActor extends Actor {
val log = Logging(context.system, this)
def receive = {
case n: Long =>
squareSum(n)
}
private def squareSum(n: Long): Long = {
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val squareSum = sc.parallelize(1L until n).map { i =>
i * i
}.reduce(_ + _)
log.info(s"============== The square sum of $n is $squareSum. ==============")
squareSum
}
}
ServerApp.scala
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.actor.Props
object ServerApp {
def main(args: Array[String]): Unit = {
val system = ActorSystem("ServerActorSystem")
val actor = system.actorOf(Props[ServerActor], name = "serverActor