able.ArrayBuffer[B](size) iterator.collect(pf).copyToBuffer(buf) Chunk.indexedSeq(buf) } def map[B](f: A => B): Chunk[B] = { val buf = new collection.mutable.ArrayBuffer[B](size) iterator.map(f).copyToBuffer(buf) Chunk.indexedSeq(buf) } def mapAccumulate[S,B](s0: S)(f: (S,A) => (S,B)): (S,Chunk[B]) = { val buf = new collection.mutable.ArrayBuffer[B](size) var s = s0 for { c <- iterator } { val (newS, newC) = f(s, c) buf += newC s = newS } (s, Chunk.indexedSeq(buf)) } def scanLeft[B](z: B)(f: (B, A) => B): Chunk[B] = { val buf = new collection.mutable.ArrayBuffer[B](size + 1) iterator.scanLeft(z)(f).copyToBuffer(buf) Chunk.indexedSeq(buf) } def iterator: Iterator[A] = new Iterator[A] { var i = 0 def hasNext = i < self.size def next = { val result = apply(i); i += 1; result } } ...
fs2的大部分转换函数都考虑了对Chunk数据的处理。我们先看看fs2是如何表示Chunk数据的:
// Inspect how fs2 splits a pure stream into chunks: each ++-joined segment
// shows up as its own Chunk in the result below.
(Stream(1,2) ++ Stream(3,4,5) ++ Stream(6,7)).chunks.toList
//> res16: List[fs2.Chunk[Int]] = List(Chunk(1, 2), Chunk(3, 4, 5), Chunk(6, 7))
可以看出,fs2是按照Stream的构建批次来划分Chunk的:每个用++连接起来的子流各自构成一个Chunk。下面我们示范一下如何在Pull中使用Chunk机制:
/** Takes the first `n` elements of a stream, emitting them a whole chunk at a
  * time instead of one element at a time.
  *
  * Each step pulls at most `n` elements with `Pull.awaitLimit(n)`, outputs
  * that chunk unchanged, and continues with the remaining quota
  * `n - chunk.size` until the quota reaches zero.
  *
  * @param n maximum number of elements to pass through; `n <= 0` emits nothing
  */
def myTakeC[F[_],A](n: Int): Pipe[F,A,A] = {
  def go(n: Int): Stream.Handle[F,A] => Pull[F,A,Unit] = h => {
    if (n <= 0) Pull.done
    else Pull.awaitLimit(n)(h).flatMap { case Step(chunk, h) =>
      // awaitLimit(n) never returns more than n elements, so the chunk can be
      // emitted whole; no take(n) truncation branch is needed.
      Pull.output(chunk) >> go(n - chunk.size)(h)
    }
  }
  sin => sin.pull(go(n))
}
//> myTakeC: [F[_], A](n: Int)fs2.Pipe[F,A,A]
// A pure stream made of three chunks: Chunk(1,2), Chunk(3,4,5), Chunk(6,7).
val s1 = (Stream(1,2) ++ Stream(3,4,5) ++ Stream(6,7))
//> s1 : fs2.Stream[Nothing,Int] = append(append(Segment(Emit(Chunk(1, 2))), S
//| egment(Emit(Chunk(()))).flatMap(<function1>)), Segment(Emit(Chunk(()))).fla
//| tMap(<function1>))

// myTake re-emits one element per chunk.
s1.pure.through(myTake(4)).chunks.toList
//> res20: List[fs2.Chunk[Int]] = List(Chunk(1), Chunk(2), Chunk(3), Chunk(4))

// myTakeC keeps the original chunking; only the last chunk is cut to fit n.
s1.pure.through(myTakeC(4)).chunks.toList
//> res21: List[fs2.Chunk[Int]] = List(Chunk(1, 2), Chunk(3, 4))
myTake和myTakeC产生了不同的结果:myTake逐个元素输出,每个元素自成一个Chunk;myTakeC则按Chunk整块输出,保留了原有的分节(只有最后一个Chunk被截取为所需的长度)。
fs2的特长应该是多线程编程了。在Stream的类型Stream[F[_],A]中,F[_]代表一种可能产生副作用的运算方式:当F为Nothing时,Stream[Nothing,A]是一种纯数据流;而当F为某种具体的运算方式(如Task)时,Stream[F,A]就是一种运算流了。我们可以在对运算流进行状态转换的过程中执行运算,以实现F所代表的副作用,如数据库读写、IO操作等。fs2不再只绑定Task这一种运算方式:任何有Catchable实例的Monad都可以作为Stream的运算方式。但是,作为一个以多线程编程为主导的工具库,没有什么运算方式比Task更合适了。我们可以把一个纯数据流升格成运算流:
// covary[Task] lifts the pure stream into one whose effect type is Task.
val s2 = Stream.emits(Seq(1,2,3)).covary[Task]
//> s2 : fs2.Stream[fs2.Task,Int] = Segment(Emit(Chunk(1, 2, 3)))
我们先用runLog运行这个运算流,得到一个Task;然后再用unsafeRun运行这个Task来获取运算结果:
// An effectful (Task-typed) stream of 1, 2, 3.
val s2 = Stream.emits(Seq(1,2,3)).covary[Task]
//> s2 : fs2.Stream[fs2.Task,Int] = Segment(Emit(Chunk(1, 2, 3)))

// runLog does not run anything yet: it yields a Task producing a Vector of all outputs.
val t2 = s2.runLog
//> t2 : fs2.Task[Vector[Int]] = Task

// unsafeRun executes the Task and returns the collected values.
t2.unsafeRun
//> res22: Vector[Int] = Vector(1, 2, 3)
对于运算流,现在使用myTake和myFilter时就不需要先用pure升格了:
// NOTE(review): `s3` is not defined in this excerpt; presumably an effectful
// (Task-typed) stream introduced elsewhere in the article — confirm.
// Keep the even elements, take the first three, then run the resulting Task.
s3.through(myFilter(_ % 2 == 0)).through(myTake(3)).runLog.unsafeRun
//> res23: Vector[Int] = Vector(2, 2, 2)
下面的例子展示了fs2运算流从源头(Source)经过转换(Transducer)一直到终点(Sink)的完整用法:
/** A sink that prints every incoming string to stdout, prefixed with "milli: ".
  *
  * Each println is wrapped in `Task.delay` and sequenced via `evalMap`, so the
  * printing happens as part of running the stream's Task.
  */
def stdOut: Sink[Task,String] =
  _.evalMap { x => Task.delay { println(s"milli: $x") } }
//> stdOut: => fs2.Sink[fs2.Task,String]
// Source:     repeatedly sample the current time inside Task.
// Transducer: convert each sample to a String and keep only the first three.
// Sink:       print each one via stdOut, then run the whole pipeline.
Stream.repeatEval(Task.delay { System.currentTimeMillis })
  .map(_.toString)
  .through(myTake(3))
  .to(stdOut)
  .run
  .unsafeRun
//> milli: 1472001934708
//| milli: 1472001934714
//| milli: 1472001934714
在上面的例子里我们使用了through、to等连接函数。由于数据最终发 |