设为首页 加入收藏

TOP

FunDA(5)- Reactive Streams:Play with Iteratees(一)
2017-10-09 14:30:11 】 浏览:823
Tags:FunDA Reactive Streams Play with Iteratees

    FunDA的设计目标就是把后台数据库中的数据搬到内存里,然后进行包括并行运算的数据处理,最后可能再对后台数据库进行更新。如果需要把数据搬到内存的话,那我们就必须考虑内存是否能一次性容纳所有的数据,有必要配合数据处理分部逐步读入,这就是Reactive Stream规范主要目的之一。所以在设计FunDA的数据源(Source)之前必须要考虑实现reacive-data-stream。Slick 3.x版在功能上的突破之一就是实现了对Reactive-Stream API的支持。遗憾的是新版的Slick并没有提供针对data-stream的具体操作函数,官方文档提到可以通过akka-stream或者Play-Iteratee-Reactive-Stream来实现对data-stream的处理操作。Slick是通过db.stream构建一个DatabasePublisher类型来实现Reactive-Stream接口的。Play则提供了stream.IterateeStreams.publisherToEnumerator(SlickDatabasePubliser)转换函数,能够把DatabasePublisher转成Reactive-Stream的数据源(Source)。Play是通过Iteratee来实现对Reactive-Stream的处理操作。我们就在这节讨论一下有关Iteratee的一些原理。在示范前我们必须在build.sbt中增加依赖:"com.typesafe.play" % "play-iteratees-reactive-streams_2.11" % "2.6.0"。所谓Reactive从字面来解释就是互动。Reacive-Stream是指数据产生方(producer)和数据使用方(consumer)之间的互动。大体上是producer通知consumer数据准备完毕可以读取、consumer通知producer读取数据的具体状态,提示是否可以发送数据。下面我们就把Reactive-Stream的基础原理给大家介绍一下:一般我们需要从一个Stream里获取数据时,可以用下面这个界面的read:

trait InputStream {
  def read(): Byte
}

这是一种典型的同步操作:read会占用线程直到获取这个Byte。我们可以用callback函数形式来解决这个问题:把一个读取函数传给目标Stream,以一种被动形式来获取这个Byte: 

trait InputStreamHandler { def onByte(byte: Byte) }

我们想办法把onByte传给Stream作为一种callback函数。当Stream有了Byte后调用这个onByte函数,在这个onByte函数里是收到Byte后应该进行的运算。不过收到这个Byte代表我们程序状态的一个转变,所以我们可以把上面这个界面写成函数式的:

trait InputStreamHandler { def onByte(byte: Byte): InputStreamHandler }

由于状态可能转变,所以我们把当前这个有变化的对象传出来。下面是一个界面实现的例子:

class consume(data: Seq[Byte]) extends InputStreamHandler { def onByte(byte: Byte) = new consume(data :+ byte) }

这个例子里我们把读取的Byte汇集到一个Seq里。但是假如Stream准备好了数据后调用我们的callback函数onByte,而我们无法立即完成函数内的运算,导致调用方线程阻塞,影响整个Stream的运转。我们可以用Future来解决这个问题:

trait InputStreamHandle { def onByte(byte: Byte): Future[InputStreamHandle] }

这样调用方可以立即返回了。不过,调用方如何把数据发送状态通知数据读取方呢?比如已经完成所有数据发送。我们需要把调用方返回的数据再细化点:

trait Input[+E] case class EL[E](e: E) extends Input[E] case object EOF extends Input[Nothing] case object Empty extends Input[Nothing]

现在这个返回数据是个Input[E]了,是带状态的。返回数据具体类型EL,EOF,Empty从字面就可以理解它们代表的状态了。我们的界面变成了这样:

trait InputStreamHandler[E] {
  def onInput(input: Input[E]): Future[InputStreamHandler[E]]
}

界面实现例子变成下面这样:

class consume(data: Seq[Byte]) extends InputStreamHandler[Byte] { def onInput(input: Input[Byte]) = input match { case EL(byte) => Future.successful(new consume(data :+ byte)) case _ => Future.successful(this) } }

上面这个例子中返回Future很是别扭,我们可以这样改善界面InputStreamHandler定义:

trait InputStreamHandler[E] { def onByte[B](cont: (Input[E] => InputStreamHandler[E]) => Future[B]): Future[B] }

现在我们可以这样实现那个例子:

class consume(data: Seq[Byte]) extends InputStreamHandler[Byte] { def onByte[B](cont: (Input[Byte] => InputStreamHandler[Byte]) => Future[B]) = cont { case EL(byte) => new consume(data :+ byte) case _ => this } }

现在用起来顺手多了吧。从上面这些例子中我们可以得出一种“推式”流模式(push-model-stream): 由目标stream向读取方推送数据。但Reactive-Stream应该还具备反向通告机制,比如读取方如何通知目标stream已经完成读取操作或者暂时无法再接受数据、又或者可以接受数据了。

现在我们对Reactive-Streams有了个大概的印象:这个模式由两方组成,分别是:数据源(在push-model中就是数据发送方)以及数据消耗方,分别对应了Iteratee模式的Enumerator和Iteratee。也就是说:Enumerator负责发送,Iteratee负责接收。用Iteratee实现Reactive-Streams时必须实现Enumerator和Iteratee之间的双向通告机制。实际上Iteratee描述

首页 上一页 1 2 3 4 5 下一页 尾页 1/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇FunDA(3)- 流动数据行操作:FD.. 下一篇FunDA(6)- Reactive Streams:..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目