Int]](2)).flatMap { q =>
//run Enumerator-Iteratee and enqueue data in thread 1
//dequeue data and en-stream in thread 2(current thread)
}
因为Stream.eva l运算结果是Stream[Task,Int],所以我们可以得出这个flatMap内的函数款式 Queue[Task,Option[Int]] => Stream[Task,Int]。下面我们先考虑如何实现数据enqueue部分:这部分是通过Iteratee的运算过程产生的。我们提到过这部分必须在另一个线程里运行,所以可以用Task来选定另一线程如下:
Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture()
现在这个Task就在后面另一个线程里自己去运算了。但它的运行进展则会依赖于另一个线程中dequeue数据的进展。我们先看看fs2提供的两个函数款式:
/** Repeatedly calls `dequeue1` forever. */ def dequeue: Stream[F, A] = Stream.bracket(cancellableDequeue1)(d => Stream.eva l(d._1), d => d._2).repeat /** * Halts the input stream at the first `None`. * * @example {{{ * scala> Stream[Pure, Option[Int]](Some(1), Some(2), None, Some(3), None).unNoneTerminate.toList * res0: List[Int] = List(1, 2) * }}} */ def unNoneTerminate[F[_],I]: Pipe[F,Option[I],I] = _ repeatPull { _.receive { case (hd, tl) => val out = Chunk.indexedSeq(hd.toVector.takeWhile { _.isDefined }.collect { case Some(i) => i }) if (out.size == hd.size) Pull.output(out) as tl else if (out.isEmpty) Pull.done else Pull.output(out) >> Pull.done }}
刚好,dequeue产生Stream[F,A]。而unNoneTerminate可以根据Stream(None)来终止运算。现在我们可以把这个Reactive-Streams到fs2-pull-streams转换过程这样来定义:
implicit val strat = Strategy.fromFixedDaemonPool(4) //> strat : fs2.Strategy = Strategy
val fs2Stream: Stream[Task,Int] = Stream.eva l(async.boundedQueue[Task,Option[Int]](2)).flatMap { q => Task(Iteratee.flatten(enumNumbers |>> enqueueTofs2(q)).run).unsafeRunAsyncFuture pipe.unNoneTerminate(q.dequeue) } //> fs2Stream : fs2.Stream[fs2.Task,Int] = attempteva l(Task).flatMap(<function1>).flatMap(<function1>)
现在这个stream应该已经变成fs2.Stream[Task,Int]了。我们可以用前面的log函数来试运行一下:
def log[A](prompt: String): Pipe[Task,A,A] = _.eva lMap {row => Task.delay{ println(s"$prompt> $row"); row }} //> log: [A](prompt: String)fs2.Pipe[fs2.Task,A,A]
fs2Stream.through(log("")).run.unsafeRun //> > 1 //| > 2 //| > 3 //| > 4 //| > 5
我们成功的把Iteratee的Reactive-Stream转化成fs2的Pull-Model-Stream。
下面是这次讨论的源代码:
import play.api.libs.iteratee._
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.collection.mutable._
import fs2._
object iteratees {
def showElements: Iteratee[Int,Unit] = Cont {
case Input.El(e) =>
println(s"EL($e)")
showElements
case Input.Empty => showElements
case Input.EOF =>
println("EOF")
Done((),Input.EOF)
}
val enumNumbers = Enumerator(1,2,3,4,5)
enumNumbers |>> showElements
Iteratee.flatten(enumNumbers |>> showElements).run
def enqueueTofs2(q: async.mutable.Queue[Task,Option[Int]]): Iteratee[Int,Unit] = Cont {
case Input.EOF =>
q.enqueue1(None).unsafeRun
Done((),Input.EOF)
case Input.Empty => enqueueTofs2(q)
case Input.El(e) =>
q.enqueue1(Some(e)).unsafeRun
enqueueTofs2(q)
}
implicit val strat = Strategy.fromFixedDaemonPool(4)
val fs2Stream: Stream[Task,Int] = Stream.eva l(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>
Task(Iteratee.flatten(enumNumbers |>> enqueueTofs2(q)).run).unsafeRunAsyn