设为首页 加入收藏

TOP

FunDA(7)- Reactive Streams to fs2 Pull Streams(二)
2017-10-09 14:30:09 】 浏览:5835
Tags:FunDA Reactive Streams fs2 Pull
Int]](2)).flatMap { q => //run Enumerator-Iteratee and enqueue data in thread 1 //dequeue data and en-stream in thread 2(current thread) }

因为Stream.eva l运算结果是Stream[Task,Int],所以我们可以得出这个flatMap内的函数款式 Queue[Task,Option[Int]] => Stream[Task,Int]。下面我们先考虑如何实现数据enqueue部分:这部分是通过Iteratee的运算过程产生的。我们提到过这部分必须在另一个线程里运行,所以可以用Task来选定另一线程如下:

    Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture()

现在这个Task就在后面另一个线程里自己去运算了。但它的运行进展则会依赖于另一个线程中dequeue数据的进展。我们先看看fs2提供的两个函数款式:

/** Repeatedly calls `dequeue1` forever. */ def dequeue: Stream[F, A] = Stream.bracket(cancellableDequeue1)(d => Stream.eva l(d._1), d => d._2).repeat /** * Halts the input stream at the first `None`. * * @example {{{ * scala> Stream[Pure, Option[Int]](Some(1), Some(2), None, Some(3), None).unNoneTerminate.toList * res0: List[Int] = List(1, 2) * }}} */ def unNoneTerminate[F[_],I]: Pipe[F,Option[I],I] = _ repeatPull { _.receive { case (hd, tl) => val out = Chunk.indexedSeq(hd.toVector.takeWhile { _.isDefined }.collect { case Some(i) => i }) if (out.size == hd.size) Pull.output(out) as tl else if (out.isEmpty) Pull.done else Pull.output(out) >> Pull.done }}

刚好,dequeue产生Stream[F,A]。而unNoneTerminate可以根据Stream(None)来终止运算。现在我们可以把这个Reactive-Streams到fs2-pull-streams转换过程这样来定义:

implicit val strat = Strategy.fromFixedDaemonPool(4) //> strat : fs2.Strategy = Strategy
val fs2Stream: Stream[Task,Int] = Stream.eva l(async.boundedQueue[Task,Option[Int]](2)).flatMap { q => Task(Iteratee.flatten(enumNumbers |>> enqueueTofs2(q)).run).unsafeRunAsyncFuture pipe.unNoneTerminate(q.dequeue) } //> fs2Stream : fs2.Stream[fs2.Task,Int] = attempteva l(Task).flatMap(<function1>).flatMap(<function1>)

现在这个stream应该已经变成fs2.Stream[Task,Int]了。我们可以用前面的log函数来试运行一下:

def log[A](prompt: String): Pipe[Task,A,A] = _.eva lMap {row => Task.delay{ println(s"$prompt> $row"); row }} //> log: [A](prompt: String)fs2.Pipe[fs2.Task,A,A]
 fs2Stream.through(log("")).run.unsafeRun          //> > 1 //| > 2 //| > 3 //| > 4 //| > 5

我们成功的把Iteratee的Reactive-Stream转化成fs2的Pull-Model-Stream。

下面是这次讨论的源代码:

import play.api.libs.iteratee._
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.collection.mutable._
import fs2._
object iteratees {
def showElements: Iteratee[Int,Unit] = Cont {
  case Input.El(e) =>
     println(s"EL($e)")
     showElements
  case Input.Empty => showElements
  case Input.EOF =>
     println("EOF")
     Done((),Input.EOF)
}
val enumNumbers = Enumerator(1,2,3,4,5)

enumNumbers |>> showElements

Iteratee.flatten(enumNumbers |>> showElements).run


def enqueueTofs2(q: async.mutable.Queue[Task,Option[Int]]): Iteratee[Int,Unit] = Cont {
   case Input.EOF =>
       q.enqueue1(None).unsafeRun
       Done((),Input.EOF)
   case Input.Empty => enqueueTofs2(q)
   case Input.El(e) =>
       q.enqueue1(Some(e)).unsafeRun
       enqueueTofs2(q)
}
implicit val strat = Strategy.fromFixedDaemonPool(4)
val fs2Stream: Stream[Task,Int] = Stream.eva l(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>
  Task(Iteratee.flatten(enumNumbers |>> enqueueTofs2(q)).run).unsafeRunAsyn
首页 上一页 1 2 3 下一页 尾页 2/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇FunDA(6)- Reactive Streams:.. 下一篇FunDA(8)- Static Source:保..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目