t of the returned `Pull` */
16 def find[F[_],I](f: I => Boolean): Stream[F,I] => Stream[F,I] =
17 _ pull { h => Pull.find(f)(h).flatMap { case o #: h => Pull.output1(o) }} 18
19
20 /** 21 * Folds all inputs using an initial value `z` and supplied binary operator, 22 * and emits a single element stream. 23 */
24 def fold[F[_],I,O](z: O)(f: (O, I) => O): Stream[F,I] => Stream[F,O] =
25 _ pull { h => Pull.fold(z)(f)(h).flatMap(Pull.output1) } 26 ... 27 /** Emits all elements of the input except the first one. */
28 def tail[F[_],I]: Stream[F,I] => Stream[F,I] =
29 drop(1) 30
31 /** Emit the first `n` elements of the input `Handle` and return the new `Handle`. */
32 def take[F[_],I](n: Long): Stream[F,I] => Stream[F,I] =
33 _ pull Pull.take(n) 34 ...
我们可以用through来连接这些transducer:
1 Stream(1,2,3).repeat 2 .throughPure(pipe.take(10)) 3 .throughPure(pipe.filter(_ % 2 == 0)) 4 .toList //> res13: List[Int] = List(2, 2, 2)
以上的throughPure等于是through + pure。Pure是没有任何作用的F[_],是专门为帮助compiler进行类型推导的类型。其实我们可以用pure先把纯数据流升格后再用through:
1 Stream(1,2,3).repeat.pure
2 .through(pipe.take(10))
3 .through(pipe.filter(_ % 2 == 0))
4 .toList //> res14: List[Int] = List(2, 2, 2)
这时compiler不再出错误信息了。在fs2 pipe对象里的函数通过方法注入或者类型继承变成了Stream的自身函数,所以我们也可以直接在Stream类型上使用这些transducer:
1 Stream(1,2,3).repeat.take(10).filter(_ % 2 == 0).toList 2 //> res15: List[Int] = List(2, 2, 2)
我们在前面提到过fs2使用了全新的方法和数据类型来实现transducer。transducer的类型是Pipe,即:
type Pipe[F[_],-I,+O] = Stream[F,I] => Stream[F,O]
我们看到Pipe就是一个Function1的类型别名,一个lambda:提供一个Stream[F,I],返回Stream[F,O]。那么在fs2里是如何读取一个Stream[F,I]里的元素呢?我们前面提到是通过一个新的数据结构Pull来实现的,先来看看fs2是如何实现Stream >> Pull >> Stream转换的:
1 val pll = Stream(1,2,3).pure.open //> pll : fs2.Pull[fs2.Pure,Nothing,fs2.Stream.Handle[fs2.Pure,Int]] = fs2.Pull
2 de5031f 3 val strm = pll.close //> strm : fs2.Stream[fs2.Pure,Nothing] = eva lScope(Scope(Bind(eva l(Snapshot),<
4 function1>))).flatMap(<function1>)
对一个Stream施用open后得到一个Pull类型。pll是个Pull数据结构,它的类型定义如下:
class Pull[+F[_],+O,+R](private[fs2] val get: Free[P[F,O]#f,Option[Either[Throwable,R]]])
在Pull的类型参数中F是一个运算,O代表输出元素类型,R代表Pull里的数据资源。我们可以从R读取元素。在上面的例子里pll的R值是个Handle类型。这个类型里应该提供了读取元素的方法:
implicit class HandleOps[+F[_],+A](h: Handle[F,A]) { def push[A2>:A](c: Chunk[A2])(implicit A2: RealSupertype[A,A2]): Handle[F,A2] = self.push(h: Handle[F,A2])(c) def push1[A2>:A](a: A2)(implicit A2: RealSupertype[A,A2]): Handle[F,A2] = self.push1(h: Handle[F,A2])(a) def #:[H](hd: H): Step[H, Handle[F,A]] = Step(hd, h) def await: Pull[F, Nothing, Step[Chunk[A], Handle[F,A]]] = self.await(h) def await1: Pull[F, Nothing, Step[A, Handle[F,A]]] = self.await1(h) def awaitNonempty: Pull[F, Nothing, Step[Chunk[A], Handle[F,A]]] = Pull.awaitNonempty(h) def echo1: Pull[F,A,Handle[F,A]] = Pull.echo1(h) def echoChunk: Pull[F,A,Handle[F,A]] = Pull.echoChunk(h) def peek: Pull[F, Nothing, Step[Chunk[A], Handle[F,A]]] = self.peek(h) def peek1: Pull[F, Nothing, Step[A, Handle[F,A]]] = self.peek1(h) def awaitAsync[F2[_],A2>:A](implicit S: Sub1[F,F2], F2: Async[F2], A2: RealSupertype[A,A2]): Pull[F2, Nothing, AsyncStep[F2,A2]] = self.awaitAsync(Sub1.substHandle(h)) def await1Async[F2[_],A2>:A](implicit S: Sub1[F,F2], F2: Async[F2], A2: RealSupertype[A,A2]): Pull[ |