设为首页 加入收藏

TOP

Scalaz(55)- scalaz-stream: fs2-基础介绍,fs2 stream transformation(二)
2017-10-10 12:11:08 】 浏览:5061
Tags:Scalaz scalaz-stream: fs2- 基础 介绍 fs2 stream transformation
t of the returned `Pull`
*/ 16 def find[F[_],I](f: I => Boolean): Stream[F,I] => Stream[F,I] = 17 _ pull { h => Pull.find(f)(h).flatMap { case o #: h => Pull.output1(o) }} 18 19 20 /** 21 * Folds all inputs using an initial value `z` and supplied binary operator, 22 * and emits a single element stream. 23 */ 24 def fold[F[_],I,O](z: O)(f: (O, I) => O): Stream[F,I] => Stream[F,O] = 25 _ pull { h => Pull.fold(z)(f)(h).flatMap(Pull.output1) } 26 ... 27 /** Emits all elements of the input except the first one. */ 28 def tail[F[_],I]: Stream[F,I] => Stream[F,I] = 29 drop(1) 30 31 /** Emit the first `n` elements of the input `Handle` and return the new `Handle`. */ 32 def take[F[_],I](n: Long): Stream[F,I] => Stream[F,I] = 33 _ pull Pull.take(n) 34 ...

我们可以用through来连接这些transducer:

1 Stream(1,2,3).repeat 2   .throughPure(pipe.take(10)) 3   .throughPure(pipe.filter(_ % 2 == 0)) 4   .toList                                    //> res13: List[Int] = List(2, 2, 2)

以上的throughPure等于是through + pure。Pure是没有任何作用的F[_],是专门为帮助compiler进行类型推导的类型。其实我们可以用pure先把纯数据流升格后再用through:

1 Stream(1,2,3).repeat.pure
2   .through(pipe.take(10))
3   .through(pipe.filter(_ % 2 == 0))
4   .toList                                         //> res14: List[Int] = List(2, 2, 2)

这时compiler不再出错误信息了。在fs2 pipe对象里的函数通过方法注入或者类型继承变成了Stream的自身函数,所以我们也可以直接在Stream类型上使用这些transducer:

1 Stream(1,2,3).repeat.take(10).filter(_ % 2 == 0).toList 2                                   //> res15: List[Int] = List(2, 2, 2)

我们在前面提到过fs2使用了全新的方法和数据类型来实现transducer。transducer的类型是Pipe,即:

type Pipe[F[_],-I,+O] = Stream[F,I] => Stream[F,O]

我们看到Pipe就是一个Function1的类型别名,一个lambda:提供一个Stream[F,I],返回Stream[F,O]。那么在fs2里是如何读取一个Stream[F,I]里的元素呢?我们前面提到是通过一个新的数据结构Pull来实现的,先来看看fs2是如何实现Stream >> Pull >> Stream转换的:

1 val pll = Stream(1,2,3).pure.open    //> pll : fs2.Pull[fs2.Pure,Nothing,fs2.Stream.Handle[fs2.Pure,Int]] = fs2.Pull
2 de5031f 3 val strm = pll.close                 //> strm : fs2.Stream[fs2.Pure,Nothing] = eva lScope(Scope(Bind(eva l(Snapshot),<
4 function1>))).flatMap(<function1>)

对一个Stream施用open后得到一个Pull类型。pll是个Pull数据结构,它的类型定义如下:

class Pull[+F[_],+O,+R](private[fs2] val get: Free[P[F,O]#f,Option[Either[Throwable,R]]])

在Pull的类型参数中F是一个运算,O代表输出元素类型,R代表Pull里的数据资源。我们可以从R读取元素。在上面的例子里pll的R值是个Handle类型。这个类型里应该提供了读取元素的方法:

implicit class HandleOps[+F[_],+A](h: Handle[F,A]) { def push[A2>:A](c: Chunk[A2])(implicit A2: RealSupertype[A,A2]): Handle[F,A2] = self.push(h: Handle[F,A2])(c) def push1[A2>:A](a: A2)(implicit A2: RealSupertype[A,A2]): Handle[F,A2] = self.push1(h: Handle[F,A2])(a) def #:[H](hd: H): Step[H, Handle[F,A]] = Step(hd, h) def await: Pull[F, Nothing, Step[Chunk[A], Handle[F,A]]] = self.await(h) def await1: Pull[F, Nothing, Step[A, Handle[F,A]]] = self.await1(h) def awaitNonempty: Pull[F, Nothing, Step[Chunk[A], Handle[F,A]]] = Pull.awaitNonempty(h) def echo1: Pull[F,A,Handle[F,A]] = Pull.echo1(h) def echoChunk: Pull[F,A,Handle[F,A]] = Pull.echoChunk(h) def peek: Pull[F, Nothing, Step[Chunk[A], Handle[F,A]]] = self.peek(h) def peek1: Pull[F, Nothing, Step[A, Handle[F,A]]] = self.peek1(h) def awaitAsync[F2[_],A2>:A](implicit S: Sub1[F,F2], F2: Async[F2], A2: RealSupertype[A,A2]): Pull[F2, Nothing, AsyncStep[F2,A2]] = self.awaitAsync(Sub1.substHandle(h)) def await1Async[F2[_],A2>:A](implicit S: Sub1[F,F2], F2: Async[F2], A2: RealSupertype[A,A2]): Pull[
首页 上一页 1 2 3 4 5 下一页 尾页 2/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Scala--继承 下一篇Scala--包和引入

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目