设为首页 加入收藏

TOP

Scalaz(50)- scalaz-stream: 安全的无穷运算-running infinite stream freely(一)
2017-10-10 12:12:32 】 浏览:5347
Tags:Scalaz scalaz-stream: 安全 无穷 运算 running infinite stream freely

   scalaz-stream支持无穷数据流(infinite stream),这本身是它强大的功能之一,试想有多少系统需要通过无穷运算才能得以实现。这是因为外界的输入是不可预料的,对于系统本身就是无穷的,比如键盘鼠标输入什么时候终止、网站上有多少网页、数据库中还有多少条记录等等。但对无穷数据流的运算又引发了新的挑战。我们知道,fp程序的主要运算方式是递归算法,这是个问题产生的源泉:极容易掉入StackOverflowError陷阱。相信许多人对scalaz-stream如何实现无穷数据的运算安全都充满了好奇和疑问,那我们就在本篇讨论中分析一下scalaz-stream的具体运算方式。

   scalaz-stream是由Process类型组件链接而成。Process是个状态机器(state machine)由Emit、Await、Append、Halt几个状态组成。值得注意的是这几个状态都是结构化的:

case class Emit[+O](seq: Seq[O]) extends HaltEmitOrAwait[Nothing, O] with EmitOrAwait[Nothing, O] case class Await[+F[_], A, +O]( req: F[A] , rcv: (EarlyCause \/ A) => Trampoline[Process[F, O]] @uncheckedVariance , preempt : A => Trampoline[Process[F,Nothing]] @uncheckedVariance = (_:A) => Trampoline.delay(halt:Process[F,Nothing]) ) extends HaltEmitOrAwait[F, O] with EmitOrAwait[F, O] { ... } case class Halt(cause: Cause) extends HaltEmitOrAwait[Nothing, Nothing] with HaltOrStep[Nothing, Nothing] case class Append[+F[_], +O]( head: HaltEmitOrAwait[F, O] , stack: Vector[Cause => Trampoline[Process[F, O]]] @uncheckedVariance ) extends Process[F, O] { ... }

首先这些结构代表了Process类型其中的某种状态,而且要注意Await和Append的连接函数运算结果是Trampoline类型的,说明运算这两个连接函数可以避免StackOverflowError,实现安全运行。同时仔细观察可以发现用这些状态结构是可以实现point和flatMap函数的:

  def point(o: O): Process[Nothing,O] = Emit(o) /** * Generate a `Process` dynamically for each output of this `Process`, and * sequence these processes using `append`. */ final def flatMap[F2[x] >: F[x], O2](f: O => Process[F2, O2]): Process[F2, O2] = { // Util.debug(s"FMAP $this")
    this match { case Halt(_) => this.asInstanceOf[Process[F2, O2]] case Emit(os) if os.isEmpty => this.asInstanceOf[Process[F2, O2]] case Emit(os) => os.tail.foldLeft(Try(f(os.head)))((p, n) => p ++ Try(f(n))) case aw@Await(_, _, _) => aw.extend(_ flatMap f) case ap@Append(p, n) => ap.extend(_ flatMap f) } }  

以上证实了Process就是Free Monad。Free Monad可以实现函数结构化,通过heap置换stack,可以在固定的堆栈空间内运行任何规模的程序,有效解决运行递归算法造成的StackOverflowError问题。值得注意的是不但Await和Append这两个状态转换方式是结构化的,它们的连接函数(continuation)运算结果也是包嵌在Trampoline里的。也就是说这样的设计保证了无论在翻译多层的Process状态组合或者运算超长Process链接的stream都可以避免StackOverflowError。

我们来详细了解一下具体的scalaz-stream程序实现方式:在之前的讨论里介绍了通过Free Monad编程的特点是算式/算法关注分离。我们可以说用Process组合成stream就是所谓的算式:对程序功能的描述。而算法具体来说应该由两部分组成:程序翻译和运算,把程序功能描述翻译成Free Monad结构然后运算这些结构里的函数。连续的算法会被翻译成多层的结构。那么翻译和运算就可能会同时进行:翻译一层即运算一层。所以我称算法(interpreter)为译算器:代表翻译和运算。对于无穷运算程序,compiler只能用Process类型的构建器(constructor)把程序翻译成Process的初始状态,然后译算器(interpreter)会一边继续进一步翻译一边运算结果。我们先从分析Process的运算器(runner)Process.runLog作业模式开始:

/** * Collect the outputs of this `Process[F,O]`, given a `Monad[F]` in * which we can catch exceptions. This function is not tail recursive and * relies on the `Monad[F]` to ensure stack safety. */ final def runLog[F2[x] >: F[x], O2 >: O](implicit F: Monad[F2], C: Catchable[F2]): F2[Vector[O2]] = { runFoldMap[F2, Vector[O2]](Vector(_))( F, C, // workaround for performance bug in Vector ++
      Monoid.instance[Vector[O2]]((a, b) => a fast_++ b, Vector()) ) }

runLog是runFoldMap函数的一个特殊施用:

/** * Collect the outputs of this `Process[F,O]` into a Monoid `B`, given a `Monad[F]` in * which we can catch exceptions. This function is not tail recursive and * relies on the `Monad[F]` to ensure stack safety. */ final def runFoldMap[
首页 上一页 1 2 3 4 下一页 尾页 1/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇scala学习手记23 - 函数值 下一篇scala学习手记24 - 多参数函数值

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目