设为首页 加入收藏

TOP

Scalaz(55)- scalaz-stream: fs2-基础介绍,fs2 stream transformation(三)
2017-10-10 12:11:08 】 浏览:5063
Tags:Scalaz scalaz-stream: fs2- 基础 介绍 fs2 stream transformation
F2, Nothing, AsyncStep1[F2,A2]]
= self.await1Async(Sub1.substHandle(h)) def covary[F2[_]](implicit S: Sub1[F,F2]): Handle[F2,A] = Sub1.substHandle(h) } implicit class HandleInvariantEffectOps[F[_],+A](h: Handle[F,A]) { def invAwait1Async[A2>:A](implicit F: Async[F], A2: RealSupertype[A,A2]): Pull[F, Nothing, AsyncStep1[F,A2]] = self.await1Async(h) def invAwaitAsync[A2>:A](implicit F: Async[F], A2: RealSupertype[A,A2]): Pull[F, Nothing, AsyncStep[F,A2]] = self.awaitAsync(h) def receive1[O,B](f: Step[A,Handle[F,A]] => Pull[F,O,B]): Pull[F,O,B] = h.await1.flatMap(f) def receive[O,B](f: Step[Chunk[A],Handle[F,A]] => Pull[F,O,B]): Pull[F,O,B] = h.await.flatMap(f) }

果然在Handle提供的函数里有await,receive等这些读取函数。我们试着来实现一个简单的transducer:一个filter函数:

 1 import scala.language.higherKinds  2 def myFilter[F[_],A](f: A => Boolean): Pipe[F, A, A] = {  3   def go(h: Stream.Handle[F,A]): Pull[F,A,Unit] = {  4 // h.receive1 {case Step(a,h) => if(f(a)) Pull.output1(a) >> go(h) else go(h)}
 5        h.await1.flatMap { case Step(a,h) => if(f(a)) Pull.output1(a) >> go(h) else go(h)}  6  }  7 // sin => sin.open.flatMap {h => go(h)}.close
 8   sin => sin.pull(go _)  9 }                                   //> myFilter: [F[_], A](f: A => Boolean)fs2.Pipe[F,A,A]
10 
11 Stream.range(0,10).pure.through(myFilter(_ % 2 == 0)).toList 12                                      //> res17: List[Int] = List(0, 2, 4, 6, 8)

我们从Pull里用await1或者receive1把一个Step数据结构从Handle里扯(pull)出来然后再output到Pull结构里。把这个Pull close后得到我们需要的Stream。我们把例子使用的类型及函数款式陈列在下面:

type Pipe[F[_],-I,+O] = Stream[F,I] => Stream[F,O] def await1[F[_],I]: Handle[F,I] => Pull[F,Nothing,Step[I,Handle[F,I]]] = {...} def receive1[F[_],I,O,R](f: Step[I,Handle[F,I]] => Pull[F,O,R]): Handle[F,I] => Pull[F,O,R] = _.await1.flatMap(f) def pull[F[_],F2[_],A,B](s: Stream[F,A])(using: Handle[F,A] => Pull[F2,B,Any])(implicit S: Sub1[F,F2]) : Stream[F2,B] = Pull.close { Sub1.substPull(open(s)) flatMap (h => Sub1.substPull(using(h))) }

再示范另一个Pipe的实现:take

1 def myTake[F[_],A](n: Int): Pipe[F,A,A] = { 2    def go(n: Int): Stream.Handle[F,A] => Pull[F,A,Unit] = h => { 3       if (n <= 0) Pull.done 4       else h.receive1 { case a #: h => Pull.output1(a).flatMap{_ => go(n-1)(h)}} 5  } 6    sin => sin.pull(go(n)) 7 }                                                 //> myTake: [F[_], A](n: Int)fs2.Pipe[F,A,A]
8 Stream.range(0,10).pure.through(myTake(3)).toList //> res18: List[Int] = List(0, 1, 2)

我们曾经提过fs2功能提升的其中一项是增加了节组(Chunk)数据类型和相关的操作函数。Chunk是fs2内部使用的一种集合,这样fs2就可以一节一节(by chunks)来处理数据了。Chunk本身具备了完整的集合函数:

 

/** * Chunk represents a strict, in-memory sequence of `A` values. */ trait Chunk[+A] { self => def size: Int def uncons: Option[(A, Chunk[A])] =
    if (size == 0) None else Some(apply(0) -> drop(1)) def apply(i: Int): A def copyToArray[B >: A](xs: Array[B]): Unit def drop(n: Int): Chunk[A] def take(n: Int): Chunk[A] def filter(f: A => Boolean): Chunk[A] def foldLeft[B](z: B)(f: (B,A) => B): B def foldRight[B](z: B)(f: (A,B) => B): B def indexWhere(p: A => Boolean): Option[Int] = { val index = iterator.indexWhere(p) if (index < 0) None else Some(index) } def isEmpty = size == 0 def toArray[B >: A: ClassTag]: Array[B] = { val arr = new Array[B](size) copyToArray(arr) arr } def toList = foldRight(Nil: List[A])(_ :: _) def toVector = foldLeft(Vector.empty[A])(_ :+ _) def collect[B](pf: PartialFunction[A,B]): Chunk[B] = { val buf = new collection.mut
首页 上一页 1 2 3 4 5 下一页 尾页 3/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Scala--继承 下一篇Scala--包和引入

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目