前言
在Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境中,我们已经部署好了一个Spark的开发环境。
在Spark集群 + Akka + Kafka + Scala 开发(2) : 开发一个Spark应用中,我们已经写好了一个Spark的应用。
本文的目标是写一个基于akka的scala工程,在一个spark standalone的集群环境中运行。
akka是什么?
akka的作用
akka的名字是action kernel的回文。根据官方定义:akka用于resilient elastic distributed real-time transaction processing。
个人理解是:
resilient:是指对需求和安全性等方面(来自于外部的)的一种适应力(弹性)。
elastic:是指对资源利用方面的弹性。
因此,akka是一个满足需求弹性、资源分配弹性的分布式实时事务处理系统。
akka只是一个类库,一个工具,并没有提供一个平台。
akka的运行模式和用例
- akka有两种运行模式:
- As a library: 一个使用于web应用,把akka作为一个普通的jar包放到classpath或者
WEB-INF/lib
。
- As an application: 也称为micro system。
- akka的用例
akka的用例很多,可以参照Examples of use-cases for Akka.
本文中的用例
在本文中,一个Spark + akka的环境里,akka被用于as an application
模式下。
我们会创建一个akka工程,含有两个应用:
- akka host application
建立一个actor system, 定义了所有的任务(actors)。等待客户端的请求。
部分actor使用了spark的云计算功能。
这是一个spark的应用。
- akka client application
调用host application上特定的actor。
我们看出,这里我们把akka作为一个任务处理器,并通过spark来完成任务。
项目结构和文件说明
说明
这个工程包含了两个应用。
一个Consumer应用:CusomerApp:实现了通过Spark的Stream+Kafka的技术来实现处理消息的功能。
一个Producer应用:ProducerApp:实现了向Kafka集群发消息的功能。
文件结构
AkkaSampleApp # 项目目录
|-- build.bat # build文件
|-- src
|-- main
|-- resources
|-- application.conf # Akka Server应用的配置文件
|-- client.conf # Akka Client应用的配置文件
|-- scala
|-- ClientActor.scala # Akka Client的Actor:提供了一种调用Server Actor的方式。
|-- ClientApp.scala # Akka Client应用
|-- ProductionReaper.scala # Akka Shutdown pattern的实现者
|-- Reaper.scala # Akka Shutdown pattern的Reaper抽象类
|-- ServerActor.scala # Akka Server的Actor,提供一个求1到n的MapReduce计算。使用了Spark。
|-- ServerApp.scala # Akka Server应用
构建工程目录
可以运行:
mkdir AkkaSampleApp
mkdir -p /AkkaSampleApp/src/main/resources
mkdir -p /AkkaSampleApp/src/main/scala
代码
build.sbt
name := "akka-sample-app"
version := "1.0"
scalaVersion := "2.11.8"
scalacOptions += "-feature"
scalacOptions += "-deprecation"
scalacOptions += "-language:postfixOps"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.4.10",
"com.typesafe.akka" %% "akka-remote" % "2.4.10",
"org.apache.spark" %% "spark-core" % "2.0.0"
)
resolvers += "Akka Snapshots" at "http://repo.akka.io/snapshots/"
application.conf
akka {
#loglevel = "DEBUG"
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2552
}
#log-sent-messages = on
#log-received-messages = on
}
}
cient.conf
akka {
#loglevel = "DEBUG"
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
#log-sent-messages = on
#log-received-messages = on
}
}
注:port = 0
表示这个端口号会自动生成一个。
ClientActor.scala
import akka.actor._
import akka.event.Logging
class ClientActor(serverPath: String) extends Actor {
val log = Logging(context.system, this)
val serverActor = context.actorSelection(serverPath)
def receive = {
case msg: String =>
log.info(s"ClientActor received message '$msg'")
serverActor ! 10000L