设为首页 加入收藏

TOP

Scalaz(55)- scalaz-stream: fs2-基础介绍,fs2 stream transformation(一)
2017-10-10 12:11:08 】 浏览:5056
Tags:Scalaz scalaz-stream: fs2- 基础 介绍 fs2 stream transformation

    fs2是scalaz-stream的最新版本,沿用了scalaz-stream被动式(pull model)数据流原理但采用了全新的实现方法。fs2比较scalaz-stream而言具备了:更精简的基础组件(combinator)、更安全的类型、资源使用(type safe, resource safety)、更高的运算效率。由于fs2基本沿用了scalaz-stream的原理,所以我们会在下面的讨论里着重介绍fs2的使用。根据fs2的官方文件,fs2具备了以下新的特点:

1、完全不含任何外部依赖(third-party dependency)

2、流元素增加了节组(chunk)类型和相关的操作方法

3、fs2不再只局限于Task一种副作用运算方式(effect)。用户可以提供自己的effect类型

4、更精简的流转换组件(stream transformation primitives)

5、增加了更多并行运算组件(concurrent primitives)

6、通过bracket函数增强了资源使用安全,特别是异线程资源占用的事后处理过程。用onFinalize取代了onComplete

7、stream状态转换采用了全新的实现方式,使用了新的数据结构:Pull

8、Stream取代了Process。fs2中再没有Process1、Tee、Wye、Channel这些类型别名,取而代之的是:   

  • type Pipe[F,A,B] = Stream[F,A] => Stream[F,B]
  • type Pipe2[F,A,B,C] = (Stream[F,A], Stream[F,B]) => Stream[F,C]
  • Pipe 替代了 Channel 和 Process1 
  • Pipe2 替代了 Tee 和 Wye

下面我们来看看fs2的一些基本操作:

 

1 Stream()                       //> res0: fs2.Stream[Nothing,Nothing] = Segment(Emit(Chunk()))
2 Stream(1,2,3)                  //> res1: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(1, 2, 3)))
3 Stream.emit(4)                 //> res2: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(4)))
4 Stream.emits(Seq(1,2,3))       //> res3: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(1, 2, 3)))

 

Stream的类型款式是:Stream[F[_],A]。从上面的例子我们看到所有的F[_]都是Nothing,我们称这样的流为纯数据流(pure stream)。再值得注意的是每个流构建都形成了一个Chunk,代表一节元素。fs2增加了Chunk类型来提高数据元素处理效率。这是fs2的一项新功能。

我们可以用toList或者toVector来运算纯数据流中的元素值:

 

1 Stream.emits(Seq(1,2,3)).toList        ?//> res3: List[Int] = List(1, 2, 3)
2 Stream.emits(Seq(1,2,3)).toVector      ?//> res4: Vector[Int] = Vector(1, 2, 3)

纯数据流具备了许多与List相似的操作函数:

 

 1 (Stream(1,2,3) ++ Stream(4,5)).toList             //> res5: List[Int] = List(1, 2, 3, 4, 5)
 2 Stream(1,2,3).map { _ + 1}.toList                 //> res6: List[Int] = List(2, 3, 4)
 3 Stream(1,2,3).filter { _ % 2 == 0}.toList         //> res7: List[Int] = List(2)
 4 Stream(1,2,3).fold(0)(_ + _).toList               //> res8: List[Int] = List(6)
 5 Stream(None,Some(1),Some(3),None).collect {  6   case None => 0
 7   case Some(i) => i  8 }.toList                                          //> res9: List[Int] = List(0, 1, 3, 0)
 9 Stream.range(1,5).intersperse(42).toList          //> res10: List[Int] = List(1, 42, 2, 42, 3, 42, 4)
10 Stream(1,2,3).flatMap {x => Stream(x,x)}.toList   //> res11: List[Int] = List(1, 1, 2, 2, 3, 3)
11 Stream(1,2,3).repeat.take(5).toList               //> res12: List[Int] = List(1, 2, 3, 1, 2)

 

以上都是一些基本的List操作函数示范。

我们知道,纯数据流就是scalaz-stream里的Process1,即transducer,是负责对流进行状态转换的。在fs2里transducer就是Pipe(也是channel),我们一般用through来连接transducer。上面示范中的take,filter等都是transducer,我们可以在object pipe里找到这些函数:

 1 object pipe {  2 ...  3 /** Drop `n` elements of the input, then echo the rest. */
 4   def drop[F[_],I](n: Long): Stream[F,I] => Stream[F,I] =
 5     _ pull (h => Pull.drop(n)(h) flatMap Pull.echo)  6 ...  7 /** Emits `true` as soon as a matching element is received, else `false` if no input matches */
 8   def exists[F[_], I](p: I => Boolean): Stream[F, I] => Stream[F, Boolean] =
 9     _ pull { h => Pull.forall[F,I](!p(_))(h) flatMap { i => Pull.output1(!i) }} 10 
11   /** Emit only inputs which match the supplied predicate. */
12   def filter[F[_], I](f: I => Boolean): Stream[F,I] => Stream[F,I] =
13  mapChunks(_ filter f) 14 
15   /** Emits the first input (if any) which matches the supplied predicate, to the outpu
首页 上一页 1 2 3 4 5 下一页 尾页 1/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Scala--继承 下一篇Scala--包和引入

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目