设为首页 加入收藏

TOP

Scalaz(59)- scalaz-stream: fs2-程序并行运算,fs2 running effects in parallel(一)
2017-10-10 12:10:54 】 浏览:7994
Tags:Scalaz scalaz-stream: fs2- 程序 并行 运算 fs2 running effects parallel

    scalaz-stream-fs2是一种函数式的数据流编程工具。fs2的类型款式是:Stream[F[_],O],F[_]代表一种运算模式,O代表Stream数据元素的类型。实际上F就是一种延迟运算机制:F中间包含的类型如F[A]的A是一个可能会产生副作用不纯代码(impure code)的运算结果类型,我们必须用F对A运算的延迟机制才能实现编程过程中的函数组合(compositionality),这是函数式编程的标准做法。如果为一个Stream装备了F[A],就代表这个Stream会在处理数据元素O的过程中对O施用运算A,如果这个运算A会与外界交互(interact with outside world)如:文件、数据库、网络等的读写操作,那么这个Stream有数据元素I/O功能的需求。我们可以通过fs2 Stream的状态机器特性(state machine)及F[A]与外界交互功能来编写完整的数据处理(data processing)程序。如果能够在数据库程序编程中善用fs2的多线程运算模式来实现对数据库存取的并行运算,将会大大提高数据处理的效率。我们将在本篇着重讨论fs2在实现I/O程序中的有关方式方法。

首先,我们需要以整体Stream为程序运算框架,把与外界交互的运算A串联起来,然后通过Stream的节点来代表程序状态。我们首先需要某种方式把F[A]与Stream[F,A]关联起来,也就是我们所说的把一个F[A]升格成Stream[F,A]。fs2提供了Stream.eva l函数,我们看看它的类型款式:

def eva l[F[_], A](fa: F[A]): Stream[F, A] = attempteva l(fa) flatMap { _ fold(fail, emit) }

很明显,提供一个F[A],eva l返回Stream[F,A]。这个返回结果Stream[F,A]的元素A是通过运算F[A]获取的:在一个数据库程序应用场景里这个A可能是个数据库连接(connection),那么F[A]就是一个连接数据库的操作函数,返回的A是个连接connection。这次我们来模拟一个对数据库表进行新纪录存储的场景。一般来说我们会按以下几个固定步骤进行:

1、连接数据库,获取connection连接

2、产生新数据(在其它场景里可能是读取数据然后更新)。这可能是一个循环的操作

3、将数据写入数据库

这三个步骤可以用Stream的三种状态来表示:一个源头(source)、传转(pipe transducer)、终点(sink)。

我们先示范如何构建源头:这是一种占用资源的操作,会产生副作用,所以我们必须用延迟运算方式来编程:

1 //用Map模拟数据库表
2 import scala.collection.mutable.Map 3 type DataStore = Map[Long, String] 4 val dataStore: DataStore = Map()       ?//> dataStore : fs2eva l.DataStore = Map()
5 case class Connection(id: String, store: DataStore) 6 def src(producer: String): Stream[Task,Connection] =
7  Stream.eva l(Task.delay { Connection(producer,dataStore)}) 8       ?          //> src: (producer: String)fs2.Stream[fs2.Task,fs2eva l.Connection]

这个示范用了一个mutable map类型来模拟会产生副作用的数据库表。我们把具体产生数据的源头用Connection.id传下去便于在并行运算示范里进行跟踪。在这个环节里我们模拟了连接数据库dataStore操作。

产生数据是在内存里进行的,不会使用到connection,但我们依然需要把这个connection传递到下个环节:

1 case class Row(conn: Connection, key: Long, value: String) 2 val recId = new java.util.concurrent.atomic.AtomicLong(1) 3                                          ?//> recId : java.util.concurrent.atomic.AtomicLong = 1
4 def createData(conn: Connection): Row =
5    Row(conn, recId.incrementAndGet, s"Producer $conn.id: at ${System.currentTimeMillis}") 6                                          ?//> createData: (conn: fs2eva l.Connection)fs2eva l.Row
7 val trans: Pipe[Task,Connection,Row] = _.map {conn => createData(conn)} 8                 ? //> trans : fs2.Pipe[fs2.Task,fs2eva l.Connection,fs2eva l.Row] = <function1>

trans是个Pipe。我们可以用through把它连接到src。

向数据库读写都会产生副作用。下一个环节我们模拟把trans传递过来的Row写入数据库。这里我们需要用延迟运算机制:

1 def log: Pipe[Task, Row, Row] = _.eva lMap { r =>
2  Task.delay {println(s"saving row pid:${r.conn.id}, rid:${r.key}"); r}} 3 def saveRow(row: Row) = row.conn.store += (row.key -> row.value) 4 
5 val snk: Sink[Task,Row] = _.eva lMap { r =>
6   Task.delay { saveRow(r); () } }

增加了个跟踪函数log。从上面的代码可以看出:实际上Sink就是Pipe,只不过返回了()。

我们试试把这几个步骤连接起来运算一下:

1 val sprg = src("001").through(trans).repeat.take(3).through(log).to(snk) 2     ?//> sprg : fs2.Stream[fs2.Task,Unit] = eva lScope(Scope(Bind(eva l(Snapshot),<function1>))).flatMap(<function1>).flatMap(<function1>).flatMap(<function1>).flatMap(<function1>)
3 sprg.run.unsafeRun                                //> saving row pid:001, rid:2 4                                                   //| saving row pid:001, rid:3 5                                                   //| saving row pid:001, rid:4
6 println(dataStore)       //>
首页 上一页 1 2 3 下一页 尾页 1/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Scalaz(58)- scalaz-stream: f.. 下一篇Scala _ 下划线

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目