设为首页 加入收藏

TOP

Akka(5): ConsistentHashing Router - 可选定Routee的任务分配模式(一)
2017-10-09 14:10:02 】 浏览:8046
Tags:Akka ConsistentHashing Router 选定 Routee 任务 分配 模式

    上一篇讨论里我们介绍了几种任务分配(Routing)模式。Akka提供的几种现成智能化Routing模式大多数是通过对用户屏蔽具体的运算Routee选择方式来简化Router使用,提高智能程度,所以我们提到Router的运算是一种无序的运算,消息之间绝对不容许任何形式的依赖,因为向Router发送的消息可能在任何Routee上运算。但是,如果我们能够把运算任务按照任务的类型分配给专门负责处理此等类型任务的Routee,那么我们就可以充分利用Routing模式所带来的运算拓展能力来提高整体运算效率。Akka的ConsistentHashingRouter就是为了满足这样的需求而提供的。ConsistentHashingRouter是通过消息的特征来分辨消息类型,然后自动构建和管理处理各种类型消息的Routees。当然,这就要求系统的消息必须具备预先设定的特征,使ConsistentHashingRouter可以正确分辨并分配给指定的Routee去运算。如果我们确定只有一个Routee负责处理一种类型消息的话,甚至可以在这个Routee中维护某种状态。我们可以设计一个场景来示范ConsistentHashingRouter的应用:模拟一个多货币的存钱盒,分n次随意从盒里取出钱币然后统计各种货币的总额。这个场景中的特征很明显:就是货币种类了,我们把抽出的货币按币种、金额合成消息发给ConsistentHashingRouter。例子里的Routee应该是按照币种由Router自动构建的,维护各种货币当前总额作为内部状态。向ConsistentHashingRouter发送的消息被分配给相应币种的Routee去登记更新货币当前总额。这个统计金额的Routee可以如下定义:

import akka.actor._ val currencies = List("RMB","USD","EUR","JPY","GBP","DEM","HKD","FRF","CHF") object MoneyCounter { sealed trait Counting case class OneHand(cur: String, amt: Double) extends Counting case class ReportTotal(cur: String) extends Counting } class MoneyCounter extends Actor with ActorLogging { import MoneyCounter._ var currency: String = "RMB"
  var amount: Double = 0

  override def receive: Receive = { case OneHand(cur,amt) => currency = cur amount += amt log.info(s"${self.path.name} received one hand of $amt$cur") case ReportTotal(_) => log.info(s"${self.path.name} has a total of $amount$currency") } }

MoneyCounter支持两项功能:一是统计某种货币收到的总额,二是按指令汇报当前总额。我们在前一篇讨论里了解到如果MoneyCounter是Routee类型,那它们应该被视为具相同功能的Actor。而且用户无法分辨或者直接面对某个特定的Routee。任何MoneyCounter都可以收到一手任何货币,不同的货币金额相加结果是错误的。所以我们要用Akka提供的ConsistentHashingRouter来解决这个问题。ConsistentHashingRouter的主要特点是能够分辨消息类型,然后按照消息类型对应到选定的Routee。在我们上面的例子里每个Routee负责一种货币,这样就可以保证每个Routee里的金额总数都是正确的了。ConsistentHashingRouter有三种分辨消息的方法:

1、定义ConsistentHashingRouter的hashMapping函数:这是个PartialFunction[Any,Any],如下:

object HashingRouter extends App { import MoneyCounter._ val currencies = List("RMB","USD","EUR","JPY","GBP","DEM","HKD","FRF","CHF") val routerSystem = ActorSystem("routerSystem") def mcHashMapping: PartialFunction[Any,Any] = { case OneHand(cur,_) => cur case ReportTotal(cur) => cur } val router = routerSystem.actorOf(ConsistentHashingPool( nrOfInstances = 5,hashMapping = mcHashMapping,virtualNodesFactor = 2) .props(MoneyCounter.props),name = "moneyCounter" ) router ! OneHand("RMB",10.00) router ! OneHand("USD",10.00) router ! OneHand("HKD",10.00) router ! OneHand("RMB",10.00) router ! OneHand("CHF",10.00) router ! ReportTotal("RMB") router ! ReportTotal("USD") scala.io.StdIn.readLine() routerSystem.terminate() }

我们在定义router时直接把mcHashingMapping传到ConsistentHashingPool的构建器里就行了。特别要注意nrOfInstances,这个参数必须比消息类型的数量大才行,否则Router会错误引导消息。测试运算结果显示如下:

INFO] [06/05/2017 15:20:09.334] [routerSystem-akka.actor.default-dispatcher-6] [akka://routerSystem/user/moneyCounter/$e] $e received one hand of 10.0RMB
[INFO] [06/05/
首页 上一页 1 2 3 下一页 尾页 1/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Scala的Class、Object和Apply()方.. 下一篇Akka(4): Routers - 智能任务..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目