设为首页 加入收藏

TOP

Scalaz(47)- scalaz-stream: 深入了解-Source(一)
2017-10-10 12:12:38 】 浏览:9712
Tags:Scalaz scalaz-stream: 深入 了解 -Source

   scalaz-stream库的主要设计目标是实现函数式的I/O编程(functional I/O)。这样用户就能使用功能单一的基础I/O函数组合成为功能完整的I/O程序。还有一个目标就是保证资源的安全使用(resource safety):使用scalaz-stream编写的I/O程序能确保资源的安全使用,特别是在完成一项I/O任务后自动释放所有占用的资源包括file handle、memory等等。我们在上一篇的讨论里笼统地解释了一下scalaz-stream核心类型Process的基本情况,不过大部分时间都用在了介绍Process1这个通道类型。在这篇讨论里我们会从实际应用的角度来介绍整个scalaz-stream链条的设计原理及应用目的。我们提到过Process具有Emit/Await/Halt三个状态,而Append是一个链接stream节点的重要类型。先看看这几个类型在scalaz-stream里的定义:

case class Emit[+O](seq: Seq[O]) extends HaltEmitOrAwait[Nothing, O] with EmitOrAwait[Nothing, O] case class Await[+F[_], A, +O]( req: F[A] , rcv: (EarlyCause \/ A) => Trampoline[Process[F, O]] @uncheckedVariance , preempt : A => Trampoline[Process[F,Nothing]] @uncheckedVariance = (_:A) => Trampoline.delay(halt:Process[F,Nothing]) ) extends HaltEmitOrAwait[F, O] with EmitOrAwait[F, O] case class Halt(cause: Cause) extends HaltEmitOrAwait[Nothing, Nothing] with HaltOrStep[Nothing, Nothing] case class Append[+F[_], +O]( head: HaltEmitOrAwait[F, O] , stack: Vector[Cause => Trampoline[Process[F, O]]] @uncheckedVariance ) extends Process[F, O] 

我们看到Process[F,O]被包嵌在Trampoline类型里,所以Process是通过Trampoline来实现函数结构化的,可以有效解决大量stream运算堆栈溢出问题(StackOverflowError)。撇开Trampoline等复杂的语法,以上类型可以简化成以下理论结构:

 1 rait Process[+F[_],+O]  2 case object Cause  3 
 4 case class Emit[O](out: O) extends Process[Nothing, O]  5 
 6 case class Halt(cause: Cause) extends Process[Nothing,Nothing]  7 
 8 case class Await[+F[_],E,+O](  9  req: F[E], 10   rcv: E => Process[F,O], 11   preempt: E => Process[F,Nothing] = Halt) extends Process[F,O] 12 
13 case class Append[+F[_],+O]( 14  head: Process[F,O], 15   stack: Vector[Cause => Process[F,O]]) extends Process[F,O]  

我们来说明一下:

Process[F[_],O]:从它的类型可以推断出scalaz-stream可以在输出O类型元素的过程中进行可能含副作用的F类型运算。

Emit[O](out: O):发送一个O类型元素;不可能进行任何附加运算

Halt(cause: Cause):停止发送;cause是停止的原因:End-完成发送,Err-出错终止,Kill-强行终止

Await[+F[_],E,+O]:这个是运算流的核心Process状态。先进行F运算req,得出结果E后输入函数rcv转换到下一个Process状态,完成后执行preempt这个事后清理函数。这不就是个flatMap函数结构版嘛。值得注意的是E类型是个内部类型,由F运算产生后输入rcv后就不再引用了。我们可以在preepmt函数里进行资源释放。如果我们需要构建一个运算流,看来就只有使用这个Await类型了

Append[+F[_],+O]:Append是一个Process[F,O]链接类型。首先它不但担负了元素O的传送,更重要的是它还可以把上一节点的F运算传到下一个节点。这样才能在下面节点时运行对上一个节点的事后处置函数(finalizer)。Append可以把多个节点结成一个大节点:head是第一个节点,stack是一串函数,每个函数接受上一个节点完成状态后运算出下一个节点状态

一个完整的scalaz-stream由三个类型的节点组成Source(源点)/Transducer(传换点)/Sink(终点)。节点间通过Await或者Append来链接。我们再来看看Source/Transducer/Sink的类型款式:

上游:Source       >>> Process0[O]   >>> Process[F[_],O]

中游:Transduce    >>> Process1[I,O] 

下游:Sink/Channel >>> Process[F[_],O => F[Unit]], Channel >>> Process[F[_],I => F[O]]

我们可以用一个文件处理流程来描述完整scalaz-stream链条的作用:

Process[F[_],O],用F[O]方式读取文件中的O值,这时F是有副作用的 

>>> Process[I,O],I代表从文件中读取的原始数据,O代表经过筛选、处理产生的输出数据

>>> O => F[Unit]是一个不返回结果的函数,代表对输入的O类型数据进行F运算,如把O类型数据存写入一个文件

/>> I => F[O]是个返回结果的函数,对输入I进行F运算后返回O,如把一条记录写入数据库后返回写入状态

以上流程简单描述:从文件中读出数据->加工处理读出数据->写入另一个文件。虽然从描述上看起来很简单,但我们的目的是资源安全使用:无论在任何终止情况下:正常读写、中途强行停止、出错终止,scalaz-stream都会主动关闭开启的文件、停止使用的线程、释放占用的内存等其它资源。这样看来到不是那么简单了。我们先试着分析Source/Transducer/Sink这几种类型的作用:

1 import Process._ 2 emit(0)                        //> res0: scalaz.str
首页 上一页 1 2 3 4 下一页 尾页 1/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇scala学习手记7 - 运算符重载 下一篇scala学习手记21 - 传递变长参数

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目