设为首页 加入收藏

TOP

Spark集群 + Akka + Kafka + Scala 开发(3) : 开发一个Akka + Spark的应用(二)
2017-10-10 12:11:25 】 浏览:8122
Tags:Spark 集群 Akka Kafka Scala 开发 一个 应用
} }

ClientApp.scala

import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.remote.RemoteScope
import akka.util._

import java.util.concurrent.TimeUnit

import scala.concurrent._
import scala.concurrent.duration._

object ClientApp {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("LocalSystem", ConfigFactory.load("client"))
    
    // get the remote actor via the server actor system's address
    val serverAddress = AddressFromURIString("akka.tcp://ServerActorSystem@127.0.0.1:2552")
    val actor = system.actorOf(Props[ServerActor].withDeploy(Deploy(scope = RemoteScope(serverAddress))))

    // invoke the remote actor via a client actor.
    // val remotePath = "akka.tcp://ServerActorSystem@127.0.0.1:2552/user/serverActor"
    // val actor = system.actorOf(Props(classOf[ClientActor], remotePath), "clientActor")

    buildReaper(system, actor)

    // tell
    actor ! 10000L
    
    waitShutdown(system, actor)
  }

  private def buildReaper(system: ActorSystem, actor: ActorRef): Unit = {
    import Reaper._
    val reaper = system.actorOf(Props(classOf[ProductionReaper]))
    
    // Watch the action
    reaper ! WatchMe(actor)
  }

  private def waitShutdown(system: ActorSystem, actor: ActorRef): Unit = {
    // trigger the shutdown operation in ProductionReaper
    system.stop(actor)
    
    // wait to shutdown
    Await.result(system.whenTerminated, 60.seconds)
  }
}

ProductionReaper.scala

当所有的Actor停止后,终止Actor System。

class ProductionReaper extends Reaper {
  // Shutdown
  def allSoulsReaped(): Unit = {
    context.system.terminate()
  }
}

Reaper.scala

import akka.actor.{Actor, ActorRef, Terminated}
import scala.collection.mutable.ArrayBuffer

object Reaper {
  // Used by others to register an Actor for watching
  case class WatchMe(ref: ActorRef)
}

abstract class Reaper extends Actor {
  import Reaper._

  // Keep track of what we're watching
  val watched = ArrayBuffer.empty[ActorRef]

  // Derivations need to implement this method. It's the
  // hook that's called when everything's dead
  def allSoulsReaped(): Unit

  // Watch and check for termination
  final def receive = {
    case WatchMe(ref) =>
      context.watch(ref)
      watched += ref
    case Terminated(ref) =>
      watched -= ref
      if (watched.isEmpty) allSoulsReaped()
  }
}

ServerActor.scala

提供一个求1到n平方和的MapReduce计算。

import akka.actor.Actor
import akka.actor.Props
import akka.event.Logging

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

class ServerActor extends Actor {
  val log = Logging(context.system, this)

  def receive = {
    case n: Long =>
        squareSum(n)
  }

  private def squareSum(n: Long): Long = {
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)

    val squareSum = sc.parallelize(1L until n).map { i => 
      i * i
    }.reduce(_ + _)

    log.info(s"============== The square sum of $n is $squareSum. ==============")

    squareSum
  }
}

ServerApp.scala

import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.actor.Props

object ServerApp {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ServerActorSystem")
    val actor = system.actorOf(Props[ServerActor], name = "serverActor
首页 上一页 1 2 3 下一页 尾页 2/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇IDEA 中scala 程序运行时的错误:.. 下一篇Spark集群 + Akka + Kafka + Scal..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目