设为首页 加入收藏

TOP

Spark集群 + Akka + Kafka + Scala 开发(3) : 开发一个Akka + Spark的应用(一)
2017-10-10 12:11:25 】 浏览:8120
Tags:Spark 集群 Akka Kafka Scala 开发 一个 应用

前言

Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境中,我们已经部署好了一个Spark的开发环境。
Spark集群 + Akka + Kafka + Scala 开发(2) : 开发一个Spark应用中,我们已经写好了一个Spark的应用。
本文的目标是写一个基于akka的scala工程,在一个spark standalone的集群环境中运行。

akka是什么?

akka的作用

akka的名字是action kernel的回文。根据官方定义:akka用于resilient elastic distributed real-time transaction processing。
个人理解是:
resilient:是指对需求和安全性等方面(来自于外部的)的一种适应力(弹性)。
elastic:是指对资源利用方面的弹性。
因此,akka是一个满足需求弹性、资源分配弹性的分布式实时事务处理系统。
akka只是一个类库,一个工具,并没有提供一个平台。

akka的运行模式和用例

  • akka有两种运行模式:
  • As a library: 一个使用于web应用,把akka作为一个普通的jar包放到classpath或者WEB-INF/lib
  • As an application: 也称为micro system。
  • akka的用例
    akka的用例很多,可以参照Examples of use-cases for Akka.

本文中的用例

在本文中,一个Spark + akka的环境里,akka被用于as an application模式下。
我们会创建一个akka工程,含有两个应用:

  • akka host application
    建立一个actor system, 定义了所有的任务(actors)。等待客户端的请求。
    部分actor使用了spark的云计算功能。
    这是一个spark的应用。
  • akka client application
    调用host application上特定的actor。

我们看出,这里我们把akka作为一个任务处理器,并通过spark来完成任务。

项目结构和文件说明

说明

这个工程包含了两个应用。
一个Consumer应用:CusomerApp:实现了通过Spark的Stream+Kafka的技术来实现处理消息的功能。
一个Producer应用:ProducerApp:实现了向Kafka集群发消息的功能。

文件结构

AkkaSampleApp    # 项目目录
|-- build.bat    # build文件    
|-- src
    |-- main
        |-- resources
            |-- application.conf   # Akka Server应用的配置文件
            |-- client.conf        # Akka Client应用的配置文件
        |-- scala
            |-- ClientActor.scala       # Akka Client的Actor:提供了一种调用Server Actor的方式。
            |-- ClientApp.scala         # Akka Client应用
            |-- ProductionReaper.scala  # Akka Shutdown pattern的实现者
            |-- Reaper.scala            # Akka Shutdown pattern的Reaper抽象类
            |-- ServerActor.scala       # Akka Server的Actor,提供一个求1到n的MapReduce计算。使用了Spark。
            |-- ServerApp.scala         # Akka Server应用

构建工程目录

可以运行:

mkdir AkkaSampleApp
mkdir -p /AkkaSampleApp/src/main/resources
mkdir -p /AkkaSampleApp/src/main/scala

代码

build.sbt

name := "akka-sample-app"
 
version := "1.0"
 
scalaVersion := "2.11.8"

scalacOptions += "-feature"
scalacOptions += "-deprecation"
scalacOptions += "-language:postfixOps"
 
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.4.10",
  "com.typesafe.akka" %% "akka-remote" % "2.4.10",
  "org.apache.spark" %% "spark-core" % "2.0.0"
)

resolvers += "Akka Snapshots" at "http://repo.akka.io/snapshots/"

application.conf

akka {
  #loglevel = "DEBUG"
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
    #log-sent-messages = on
    #log-received-messages = on
  }
}

cient.conf

akka {
  #loglevel = "DEBUG"
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
    #log-sent-messages = on
    #log-received-messages = on
  }
}

注:port = 0表示这个端口号会自动生成一个。

ClientActor.scala

import akka.actor._
import akka.event.Logging

class ClientActor(serverPath: String) extends Actor {
  val log = Logging(context.system, this)
  val serverActor = context.actorSelection(serverPath)

  def receive = {
    case msg: String =>
        log.info(s"ClientActor received message '$msg'")
        serverActor ! 10000L
首页 上一页 1 2 3 下一页 尾页 1/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇IDEA 中scala 程序运行时的错误:.. 下一篇Spark集群 + Akka + Kafka + Scal..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目