F2, Nothing, AsyncStep1[F2,A2]] = self.await1Async(Sub1.substHandle(h)) def covary[F2[_]](implicit S: Sub1[F,F2]): Handle[F2,A] = Sub1.substHandle(h) } implicit class HandleInvariantEffectOps[F[_],+A](h: Handle[F,A]) { def invAwait1Async[A2>:A](implicit F: Async[F], A2: RealSupertype[A,A2]): Pull[F, Nothing, AsyncStep1[F,A2]] = self.await1Async(h) def invAwaitAsync[A2>:A](implicit F: Async[F], A2: RealSupertype[A,A2]): Pull[F, Nothing, AsyncStep[F,A2]] = self.awaitAsync(h) def receive1[O,B](f: Step[A,Handle[F,A]] => Pull[F,O,B]): Pull[F,O,B] = h.await1.flatMap(f) def receive[O,B](f: Step[Chunk[A],Handle[F,A]] => Pull[F,O,B]): Pull[F,O,B] = h.await.flatMap(f) }
果然在Handle提供的函数里有await,receive等这些读取函数。我们试着来实现一个简单的transducer:一个filter函数:
import scala.language.higherKinds

/** Builds a Pipe that forwards only the elements for which `f` returns true.
  *
  * @param f predicate applied to each element pulled from the input
  * @return a Pipe from Stream[F,A] to Stream[F,A] emitting the accepted elements
  */
def myFilter[F[_],A](f: A => Boolean): Pipe[F, A, A] = {
  // Awaits one Step from the handle; an accepted element is written with
  // Pull.output1 before recursing on the remaining handle, a rejected one is
  // dropped. (`handle.receive1 { ... }` is the shorthand for await1.flatMap.)
  def loop(handle: Stream.Handle[F,A]): Pull[F,A,Unit] =
    handle.await1.flatMap {
      case Step(elem, rest) if f(elem) => Pull.output1(elem) >> loop(rest)
      case Step(_, rest)               => loop(rest)
    }
  // Alternative spelling: source => source.open.flatMap { h => loop(h) }.close
  source => source.pull(loop _)
}                                                 //> myFilter: [F[_], A](f: A => Boolean)fs2.Pipe[F,A,A]

Stream.range(0,10).pure.through(myFilter(_ % 2 == 0)).toList
                                                  //> res17: List[Int] = List(0, 2, 4, 6, 8)
我们在Pull里用await1或者receive1把一个Step数据结构从Handle里扯(pull)出来,然后再output到Pull结构里。把这个Pull close后就得到我们需要的Stream。我们把例子使用的类型及函数签名陈列在下面:
// Pipe: alias for a function that turns a Stream[F,I] into a Stream[F,O].
type Pipe[F[_],-I,+O] = Stream[F,I] => Stream[F,O]
// await1: a function from a Handle to a Pull whose result is a Step[I,Handle[F,I]]
// (the body is elided as `{...}` in this excerpt).
def await1[F[_],I]: Handle[F,I] => Pull[F,Nothing,Step[I,Handle[F,I]]] = {...}
// receive1: calls await1 on the handle and flatMaps the resulting Step through `f`.
def receive1[F[_],I,O,R](f: Step[I,Handle[F,I]] => Pull[F,O,R]): Handle[F,I] => Pull[F,O,R] = _.await1.flatMap(f)
// pull: opens `s` to obtain a Handle, runs `using` on that Handle, and passes the
// combined Pull to Pull.close to produce a Stream[F2,B]. Sub1 is used to
// substitute the effect type F with F2 on both Pulls.
def pull[F[_],F2[_],A,B](s: Stream[F,A])(using: Handle[F,A] => Pull[F2,B,Any])(implicit S: Sub1[F,F2]) : Stream[F2,B] = Pull.close { Sub1.substPull(open(s)) flatMap (h => Sub1.substPull(using(h))) }
再示范另一个Pipe的实现:take
/** Builds a Pipe that emits elements from the input one at a time and stops
  * (via Pull.done) once the countdown starting at `n` reaches zero or below.
  *
  * @param n number of elements to let through; values <= 0 emit nothing
  * @return a Pipe from Stream[F,A] to Stream[F,A]
  */
def myTake[F[_],A](n: Int): Pipe[F,A,A] = {
  // `remaining` counts down by one for every element written to the output.
  def loop(remaining: Int): Stream.Handle[F,A] => Pull[F,A,Unit] = handle =>
    if (remaining > 0)
      handle.receive1 { case elem #: rest =>
        Pull.output1(elem).flatMap(_ => loop(remaining - 1)(rest))
      }
    else Pull.done
  source => source.pull(loop(n))
}                                                 //> myTake: [F[_], A](n: Int)fs2.Pipe[F,A,A]
Stream.range(0,10).pure.through(myTake(3)).toList //> res18: List[Int] = List(0, 1, 2)
我们曾经提过fs2功能提升的其中一项是增加了节组(Chunk)数据类型和相关的操作函数。Chunk是fs2内部使用的一种集合,这样fs2就可以一节一节(by chunks)来处理数据了。Chunk本身具备了完整的集合函数:
/** * Chunk represents a strict, in-memory sequence of `A` values. */ trait Chunk[+A] { self => def size: Int def uncons: Option[(A, Chunk[A])] =
if (size == 0) None else Some(apply(0) -> drop(1)) def apply(i: Int): A def copyToArray[B >: A](xs: Array[B]): Unit def drop(n: Int): Chunk[A] def take(n: Int): Chunk[A] def filter(f: A => Boolean): Chunk[A] def foldLeft[B](z: B)(f: (B,A) => B): B def foldRight[B](z: B)(f: (A,B) => B): B def indexWhere(p: A => Boolean): Option[Int] = { val index = iterator.indexWhere(p) if (index < 0) None else Some(index) } def isEmpty = size == 0 def toArray[B >: A: ClassTag]: Array[B] = { val arr = new Array[B](size) copyToArray(arr) arr } def toList = foldRight(Nil: List[A])(_ :: _) def toVector = foldLeft(Vector.empty[A])(_ :+ _) def collect[B](pf: PartialFunction[A,B]): Chunk[B] = { val buf = new collection.mut |