fs2是scalaz-stream的最新版本,沿用了scalaz-stream被动式(pull model)数据流原理但采用了全新的实现方法。fs2比较scalaz-stream而言具备了:更精简的基础组件(combinator)、更安全的类型、资源使用(type safe, resource safety)、更高的运算效率。由于fs2基本沿用了scalaz-stream的原理,所以我们会在下面的讨论里着重介绍fs2的使用。根据fs2的官方文件,fs2具备了以下新的特点:
1、完全不含任何外部依赖(third-party dependency)
2、流元素增加了节组(chunk)类型和相关的操作方法
3、fs2不再只局限于Task一种副作用运算方式(effect)。用户可以提供自己的effect类型
4、更精简的流转换组件(stream transformation primitives)
5、增加了更多并行运算组件(concurrent primitives)
6、通过bracket函数增强了资源使用安全,特别是异线程资源占用的事后处理过程。用onFinalize取代了onComplete
7、stream状态转换采用了全新的实现方式,使用了新的数据结构:Pull
8、Stream取代了Process。fs2中再没有Process1、Tee、Wye、Channel这些类型别名,取而代之的是:
type Pipe[F,A,B] = Stream[F,A] => Stream[F,B]
type Pipe2[F,A,B,C] = (Stream[F,A], Stream[F,B]) => Stream[F,C]
Pipe
替代了 Channel
和 Process1
Pipe2
替代了 Tee
和 Wye
下面我们来看看fs2的一些基本操作:
1 Stream() //> res0: fs2.Stream[Nothing,Nothing] = Segment(Emit(Chunk()))
2 Stream(1,2,3) //> res1: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(1, 2, 3)))
3 Stream.emit(4) //> res2: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(4)))
4 Stream.emits(Seq(1,2,3)) //> res3: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(1, 2, 3)))
Stream的类型款式是:Stream[F[_],A]。从上面的例子我们看到所有的F[_]都是Nothing,我们称这样的流为纯数据流(pure stream)。再值得注意的是每个流构建都形成了一个Chunk,代表一节元素。fs2增加了Chunk类型来提高数据元素处理效率。这是fs2的一项新功能。
我们可以用toList或者toVector来运算纯数据流中的元素值:
1 Stream.emits(Seq(1,2,3)).toList ?//> res3: List[Int] = List(1, 2, 3)
2 Stream.emits(Seq(1,2,3)).toVector ?//> res4: Vector[Int] = Vector(1, 2, 3)
纯数据流具备了许多与List相似的操作函数:
1 (Stream(1,2,3) ++ Stream(4,5)).toList //> res5: List[Int] = List(1, 2, 3, 4, 5)
2 Stream(1,2,3).map { _ + 1}.toList //> res6: List[Int] = List(2, 3, 4)
3 Stream(1,2,3).filter { _ % 2 == 0}.toList //> res7: List[Int] = List(2)
4 Stream(1,2,3).fold(0)(_ + _).toList //> res8: List[Int] = List(6)
5 Stream(None,Some(1),Some(3),None).collect { 6 case None => 0
7 case Some(i) => i 8 }.toList //> res9: List[Int] = List(0, 1, 3, 0)
9 Stream.range(1,5).intersperse(42).toList //> res10: List[Int] = List(1, 42, 2, 42, 3, 42, 4)
10 Stream(1,2,3).flatMap {x => Stream(x,x)}.toList //> res11: List[Int] = List(1, 1, 2, 2, 3, 3)
11 Stream(1,2,3).repeat.take(5).toList //> res12: List[Int] = List(1, 2, 3, 1, 2)
以上都是一些基本的List操作函数示范。
我们知道,纯数据流就是scalaz-stream里的Process1,即transducer,是负责对流进行状态转换的。在fs2里transducer就是Pipe(也是channel),我们一般用through来连接transducer。上面示范中的take,filter等都是transducer,我们可以在object pipe里找到这些函数:
1 object pipe { 2 ... 3 /** Drop `n` elements of the input, then echo the rest. */
4 def drop[F[_],I](n: Long): Stream[F,I] => Stream[F,I] =
5 _ pull (h => Pull.drop(n)(h) flatMap Pull.echo) 6 ... 7 /** Emits `true` as soon as a matching element is received, else `false` if no input matches */
8 def exists[F[_], I](p: I => Boolean): Stream[F, I] => Stream[F, Boolean] =
9 _ pull { h => Pull.forall[F,I](!p(_))(h) flatMap { i => Pull.output1(!i) }} 10
11 /** Emit only inputs which match the supplied predicate. */
12 def filter[F[_], I](f: I => Boolean): Stream[F,I] => Stream[F,I] =
13 mapChunks(_ filter f) 14
15 /** Emits the first input (if any) which matches the supplied predicate, to the outpu