scalaz-stream是一个泛函数据流配件库(functional stream combinator library),特别适用于函数式编程。scalar-stream是由一个以上各种状态的Process串联组成。stream代表一连串的元素,可能是自动产生或者由外部的源头输入,如:一连串鼠标位置;文件中的文字行;数据库记录;又或者一连串的HTTP请求等。Process就是stream转换器(transducer),它可以把一种stream转换成另一种stream。Process的类型款式如下:
sealed trait Process[+F[_], +O]
其中F是个高阶类,是一种算法,O是Process的运算值。从类型款式上看Process是个对O类型值进行F运算的节点,那么scalaz-stream就应该是个运算流了。Process包含以下几种状态:
case class Emit[+O](seq: Seq[O]) extends HaltEmitOrAwait[Nothing, O] with EmitOrAwait[Nothing, O] case class Await[+F[_], A, +O]( req: F[A] , rcv: (EarlyCause \/ A) => Trampoline[Process[F, O]] @uncheckedVariance , preempt : A => Trampoline[Process[F,Nothing]] @uncheckedVariance = (_:A) => Trampoline.delay(halt:Process[F,Nothing]) ) extends HaltEmitOrAwait[F, O] with EmitOrAwait[F, O] { ... } case class Halt(cause: Cause) extends HaltEmitOrAwait[Nothing, Nothing] with HaltOrStep[Nothing, Nothing] case class Append[+F[_], +O]( head: HaltEmitOrAwait[F, O] , stack: Vector[Cause => Trampoline[Process[F, O]]] @uncheckedVariance ) extends Process[F, O] { ... }
scalaz-stream是个主动读取模式的流(pull model stream),Process转换stream的方式不是以Stream[I] => Stream[O]这种函数方式,而是一种状态转换方式进行(state transition),所以这些状态就等于向一个驱动程序发出的请求:
Emit[+O]:请求发一个O值
Await[+F[_],A,+O]:要求运算F[A],得出F[A]的结果A后输入函数rcv再运算得出下一个Process状态。这个是flatMap函数的结构化版本
Halt:停止发送
Append:连接前后两个Process
可以看到Emit,Await,Halt,Append都是Process类型的结构化状态。其中Await就是flatMap函数的结构化,Emit就像Return,所以Process就是一个Free Monad。
Emit的作用是发出一个O值,Await的作用是运算F然后连接下一个Process, Append的作用则是把前一个Process的信息传递到下一个Process。Await和Append分别是不同方式的Process连接方式。
Process又分以下几类:
type Process0[+O] = Process[Nothing,O] /** * A single input stream transducer. Accepts input of type `I`, * and emits values of type `O`. */ type Process1[-I,+O] = Process[Env[I,Any]#Is, O] /** * A stream transducer that can read from one of two inputs, * the 'left' (of type `I`) or the 'right' (of type `I2`). * `Process1[I,O] <: Tee[I,I2,O]`. */ type Tee[-I,-I2,+O] = Process[Env[I,I2]#T, O] /** * A stream transducer that can read from one of two inputs, * non-deterministically. */ type Wye[-I,-I2,+O] = Process[Env[I,I2]#Y, O] /** * An effectful sink, to which we can send values. Modeled * as a source of effectful functions. */ type Sink[+F[_],-O] = Process[F, O => F[Unit]] /** * An effectful channel, to which we can send values and * get back responses. Modeled as a source of effectful * functions. */ type Channel[+F[_],-I,O] = Process[F, I => F[O]]
Process[F[_],O]:source:运算流源点,由此发送F[O]运算
Process0[+O]:>>>Process[Nothing,+O]:source:纯数据流源点,发送O类型元素
Process1[-I,+O]:一对一的数据转换节点:接收一个I类型输入,经过处理转换成O类型数据输出
Tee[-I1,-I2,+O]:二对一的有序输入数据转换节点:从左右两边一左一右有顺接受I1,I2类型输入后转换成O类型数据输出
Wye[-I1,-I2,+O]:二对一的无序输入数据转换节点:不按左右顺序,按上游数据发送情况接受I1,I2类型输入后转换成O类型数据输出
Sink[+F[_],-O]:运算终点,在此对O类型数据进行F运算,不返回值:O => F[Unit]
Channel[+F[_],-I,O]:运算终点,接受I类型输入,进行F运算后返回F[O]:I => F[O]
以下是一些简单的Process构建方法:
1 Process.emit(1) //> res0: scalaz.stream.Process0[Int] = Emit(Vector(1))
2 Process.emitAll(Seq(1,2,3)) //> res1: scalaz.stream.Process0[Int] = Emit(List(1, 2, 3))
3 Process.halt //> res2: scalaz.stream.Process0[Nothing] = Halt(End)
4 Process.range(1,2,3) //> res3: scalaz.stream.Process0[Int] = Append(Halt(End),Vector(<function1>))
这些是纯数据流的构