设为首页 加入收藏

TOP

Scalaz(53)- scalaz-stream: 程序运算器-application scenario(一)
2017-10-10 12:12:17 】 浏览:8281
Tags:Scalaz scalaz-stream: 程序 运算 application scenario

    从上面多篇的讨论中我们了解到scalaz-stream代表一串连续无穷的数据或者程序。对这个数据流的处理过程就是一个状态机器(state machine)的状态转变过程。这种模式与我们通常遇到的程序流程很相似:通过程序状态的变化来推进程序进展。传统OOP式编程可能是通过一些全局变量来记录当前程序状态,而FP则是通过函数组合来实现状态转变的。这个FP模式讲起来有些模糊和抽象,但实际上通过我们前面长时间对FP编程的学习了解到FP编程讲究避免使用任何局部中间变量,更不用说全局变量了。FP程序的数据A是包嵌在算法F[A]内的。FP编程模式提供了一整套全新的数据更新方法来实现对F[A]中数据A的操作。对许多编程人员来讲,FP的这种编程方式会显得很别扭、不容易掌握。如果我们仔细观察分析,会发觉scalaz-stream就是一种很好的FP编程工具:它的数据也是不可变的(immutable),并且是包嵌在高阶类型结构里的,是通过Process状态转变来标示数据处理过程进展的。scalaz-stream的数据处理是有序流程,这样可以使我们更容易分析理解程序的运算过程,它的三个大环节包括:数据源(source),数据传换(transducer)及数据终点(Sink/Channel)可以很形象地描绘一个程序运算的全过程。scalaz-stream在运算过程中的并行运算方式(parallel computaion)、安全资源使用(resource safety)和异常处理能力(exception handling)是实现泛函多线程编程最好的支持。我们先来看看scalaz-stream里的一个典型函数:

/** * Await the given `F` request and use its result. * If you need to specify fallback, use `awaitOr` */ def await[F[_], A, O](req: F[A])(rcv: A => Process[F, O]): Process[F, O] = awaitOr(req)(Halt.apply)(rcv) /** * Await a request, and if it fails, use `fb` to determine the next state. * Otherwise, use `rcv` to determine the next state. */ def awaitOr[F[_], A, O](req: F[A])(fb: EarlyCause => Process[F, O])(rcv: A => Process[F, O]): Process[F, O] = Await(req,(r: EarlyCause \/ A) => Trampoline.delay(Try(r.fold(fb,rcv))))

这个await函数可以说是一个代表完整程序流程的典范。注意,awaitOr里的Await是个数据结构。这样我们在递归运算await时可以避免StackOverflowError的发生。req: F[A]代表与外界交互的一个运算,如从外部获取输入、函数rcv对这个req产生的运算结果进行处理并设定程序新的状态。

1 import scalaz.stream._ 2 import scalaz.concurrent._ 3 object streamApps { 4 import Process._ 5   def getInput: Task[Int] = Task.delay { 3 }      //> getInput: => scalaz.concurrent.Task[Int]
6   val prg = await(getInput)(i => emit(i * 3))     //> prg : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@4973813a,<function1>,<function1>)
7   prg.runLog.run                                  //> res0: Vector[Int] = Vector(9)
8 }

这是一个一步计算程序。我们可以再加一步:

1  val add10 = await1[Int].flatMap{i => emit(i + 10)} 2                                                   //> add10 : scalaz.stream.Process[[x]scalaz.stream.Process.Env[Int,Any]#Is[x],Int] = Await(Left,<function1>,<function1>)
3   val prg1 = await(getInput)(i => emit(i * 3) |> add10) 4                                                   //> prg1 : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@6737fd8f,<function1>,<function1>)
5   prg1.runLog.run                                 //> res0: Vector[Int] = Vector(19)

add10是新增的一个运算步骤,是个transducer所以调用了Process1的函数await1,并用pipe(|>)来连接。实际上我们可以用组合方式(compose)把add10和prg组合起来:

1 val prg3 = prg |> add10                         //> prg3 : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append(Halt(End) ,Vector(<function1>))
2   prg3.runLog.run                               //> res1: Vector[Int] = Vector(19)

我们同样可以增加一步输出运算:

1  val outResult: Sink[Task,Int] = sink.lift { i => Task.delay{println(s"the result is: $i")}} 2                                                   //> outResult : scalaz.stream.Sink[scalaz.concurrent.Task,Int] = Append(Emit(Vector(<function1>)),Vector(<function1>))
3   val prg4 = prg1 to outResult                    //> prg4 : scalaz.stream.Process[[x]scalaz.concurrent.Task[x],Unit] = Append(Halt(End),Vector(<function1>, <function1>))
4   prg4.run.run                                    //> the result is: 19

 scalaz-stream的输出类型是Sink,我们用to来连接。那么如果需要不断重复运算呢:

 1 import scalaz._  2 import Scalaz._  3 import scalaz.concurrent._  4 import s
首页 上一页 1 2 3 下一页 尾页 1/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇scala学习手记32 - trait选择性混.. 下一篇geotrellis使用(七)记录一次惨..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目