设为首页 加入收藏

TOP

Scalaz(55)- scalaz-stream: fs2-基础介绍,fs2 stream transformation(四)
2017-10-10 12:11:08 】 浏览:5064
Tags:Scalaz scalaz-stream: fs2- 基础 介绍 fs2 stream transformation
able.ArrayBuffer[B](size) iterator.collect(pf).copyToBuffer(buf) Chunk.indexedSeq(buf) } def map[B](f: A
=> B): Chunk[B] = { val buf = new collection.mutable.ArrayBuffer[B](size) iterator.map(f).copyToBuffer(buf) Chunk.indexedSeq(buf) } def mapAccumulate[S,B](s0: S)(f: (S,A) => (S,B)): (S,Chunk[B]) = { val buf = new collection.mutable.ArrayBuffer[B](size) var s = s0 for { c <- iterator } { val (newS, newC) = f(s, c) buf += newC s = newS } (s, Chunk.indexedSeq(buf)) } def scanLeft[B](z: B)(f: (B, A) => B): Chunk[B] = { val buf = new collection.mutable.ArrayBuffer[B](size + 1) iterator.scanLeft(z)(f).copyToBuffer(buf) Chunk.indexedSeq(buf) } def iterator: Iterator[A] = new Iterator[A] { var i = 0 def hasNext = i < self.size def next = { val result = apply(i); i += 1; result } } ...

 

fs2的大部分转换函数都考虑了对Chunk数据的处理机制。我们先看看fs2是如何表现Chunk数据的:

1 (Stream(1,2) ++ Stream(3,4,5) ++ Stream(6,7)).chunks.toList 2     //> res16: List[fs2.Chunk[Int]] = List(Chunk(1, 2), Chunk(3, 4, 5), Chunk(6, 7))

fs2是按照Stream的构建批次来分节的。我们来示范一下如何使用Pull的Chunk机制:

 1 def myTakeC[F[_],A](n: Int): Pipe[F,A,A] = {  2   def go(n: Int): Stream.Handle[F,A] => Pull[F,A,Unit] = h => {  3      if ( n <= 0 ) Pull.done  4      else Pull.awaitLimit(n)(h).flatMap {case Step(chunk,h) =>
 5        if (chunk.size <= n) Pull.output(chunk) >> go(n-chunk.size)(h)  6        else Pull.output(chunk.take(n)) }  7  }  8   sin => sin.pull(go(n))  9 }                       //> myTakeC: [F[_], A](n: Int)fs2.Pipe[F,A,A]
10 val s1 = (Stream(1,2) ++ Stream(3,4,5) ++ Stream(6,7)) 11                        //> s1 : fs2.Stream[Nothing,Int] = append(append(Segment(Emit(Chunk(1, 2))), S
12 egment(Emit(Chunk(()))).flatMap(<function1>)), Segment(Emit(Chunk(()))).fla 13 ?tMap(<function1>)) 14 s1.pure.through(myTake(4)).chunks.toList  ?//> res20: List[fs2.Chunk[Int]] = List(Chunk(1), Chunk(2), Chunk(3), Chunk(4))
15 s1.pure.through(myTakeC(4)).chunks.toList ?//> res21: List[fs2.Chunk[Int]] = List(Chunk(1, 2), Chunk(3, 4))

myTake和myTakeC产生了不同的结果。

fs2的特长应该是多线程编程了。在Stream的类型款式中:Stream[F[_],A],F[_]是一种可能产生副作用的运算方式,当F[_]等于Nothing时,Stream[Nothing,A]是一种纯数据流,而Stream[F[_],A]就是一种运算流了。我们可以在对运算流进行状态转换的过程中进行运算来实现F的副作用如:数据库读写、IO操作等。fs2不再绑定Task一种运算方式了。任何有Catchable实例的Monad都可以成为Stream的运算方式。但是,作为一种以多线程编程为主导的工具库,没有什么运算方式会比Task更合适了。
我们可以把一个纯数据流升格成运算流:

 

1 val s2 = Stream.emits(Seq(1,2,3)).covary[Task]    //> s2 : fs2.Stream[fs2.Task,Int] = Segment(Emit(Chunk(1, 2, 3)))

 

我们先运算这个运算流,结果为一个Task,然后再运算Task来获取运算值:

1 val s2 = Stream.emits(Seq(1,2,3)).covary[Task]    //> s2 : fs2.Stream[fs2.Task,Int] = Segment(Emit(Chunk(1, 2, 3)))
2 val t2 = s2.runLog                                //> t2 : fs2.Task[Vector[Int]] = Task
3 t2.unsafeRun                                      //> res22: Vector[Int] = Vector(1, 2, 3)

现在使用myTake和myFilter就不需要pure升格了:

1 s3.through(myFilter(_ % 2 == 0)).through(myTake(3)).runLog.unsafeRun 2                                                   //> res23: Vector[Int] = Vector(2, 2, 2)

下面的例子里展示了fs2的运算流从源头(Source)到传换(Transducer)一直到终点(Sink)的使用示范:

 1 def stdOut: Sink[Task,String]  =
 2   _.eva lMap { x => Task.delay{ println(s"milli: $x")}}  3                                                   //> stdOut: => fs2.Sink[fs2.Task,String]
 4 Stream.repeateva l(Task.delay{System.currentTimeMillis})  5  .map(_.toString)  6   .through(myTake(3))  7  .to(stdOut)  8   .run.unsafeRun                                  //> milli: 1472001934708  9                                                   //| milli: 1472001934714 10                                                   //| milli: 1472001934714

?在上面的例子里我们使用了through,to等连接函数。由于数据最终发

首页 上一页 1 2 3 4 5 下一页 尾页 4/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Scala--继承 下一篇Scala--包和引入

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目