TOP

FunDA(9)- Stream Source:reactive data streams(一)
2017-10-09 14:30:07 】 浏览:9393
Tags:FunDA Stream Source reactive data streams

    上篇我们讨论了静态数据源(Static Source, snapshot)。这种方式只能在预知数据规模有限的情况下使用,对于超大型的数据库表也可以说是不安全的资源使用方式。Slick3.x已经增加了支持Reactive-Streams功能,可以通过Reactive-Streams API来实现有限内存空间内的无限规模数据读取,这正符合了FunDA的设计理念:高效、便捷、安全的后台数据处理工具库。我们在前面几篇讨论里介绍了Iteratee模式,play-iteratees支持Reactive-Streams并且提供与Slick3.x的接口API,我们就在这篇讨论里介绍如何把Slick-Reactive-Streams转换成fs2-Streams。根据Slick官方文档:Slick可以通过db.stream函数用Reactive-Stream方式来读取后台数据,具体的配置如下:

  val disableAutocommit = SimpleDBIO(_.connection.setAutoCommit(false)) val action = queryAction.withStatementParameters(fetchSize = 512) val publisher = db.stream(disableAutocommit andThen action)

首先,我们需要取消自动提交(disableAutocommit)。fetchSize是缓存数据页长度(每批次读取数据字数),然后用db.stream来构成一个Reactive-Streams标准的数据源publisher。Slick官方网页只提供了下面这个使用publisher的例子:

  val fut = publisher.foreach(s => println(s)) Await.ready(fut,Duration.Inf)

除了数据枚举外就没什么用处,也无法提供更细节点的示范。FunDA的具体解决方案是把publisher转换成play-iteratee的Enumerator。play-iteratee支持Reactive-Streams,所以这个Enumerator应该具备协调后台数据和内存缓冲之间关系(back-pressure)的功能。play-iteratee是如下构建Enumerator的;

import play.api.libs.iteratee._ val enumerator = streams.IterateeStreams.publisherToEnumerator(publisher)

enumerator从后台数据库表中产生的数据源通过Iteratee把数据元素enqueue推送给一个fs2的queue:

    private def pushData[R](q: async.mutable.Queue[Task,Option[R]]): Iteratee[R,Unit] = Cont { case Input.EOF => { q.enqueue1(None).unsafeRun Done((), Input.Empty) } case Input.Empty => pushData(q) case Input.El(e) => { q.enqueue1(Some(e)).unsafeRun pushData(q) } }

然后fs2进行dequeue后生成fs2的Stream:

      Stream.eva l(async.boundedQueue[Task,Option[SOURCE]](queSize)).flatMap { q => Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture() pipe.unNoneTerminate(q.dequeue) }

整个构建Stream的过程在FunDA的fdasources包是这样定义的:

package com.bayakala.funda.fdasources import fs2._ import play.api.libs.iteratee._ import com.bayakala.funda.fdapipes._ import slick.driver.JdbcProfile object FDADataStream { class FDAStreamLoader[SOURCE, TARGET](slickProfile: JdbcProfile, convert: SOURCE => TARGET) { import slickProfile.api._ def fda_typedStream(action: DBIOAction[Iterable[SOURCE],Streaming[SOURCE],Effect.Read])(slickDB: Database)(fetchSize: Int, queSize: Int): FDAPipeLine[TARGET] = { val disableAutocommit = SimpleDBIO(_.connection.setAutoCommit(false)) val action_ = action.withStatementParameters(fetchSize = fetchSize) val publisher = slickDB.stream(disableAutocommit andThen action) val enumerator = streams.IterateeStreams.publisherToEnumerator(publisher) Stream.eva l(async.boundedQueue[Task,Option[SOURCE]](queSize)).flatMap { q => Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture() pipe.unNoneTerminate(q.dequeue).map {row => convert(row)} } } def fda_plainStream(action: DBIOAction[Iterable[SOURCE],Streaming[SOURCE],Effect.Read])(slickDB: Database)(fetchSize: Int, queSize: Int): FDAPipeLine[SOURCE] = { val disableAutocommit = SimpleDBIO(_.connection.setAutoCommit(false)) val action_ = action.withStatementParameters(fetchSize = fetchSize) val publisher = slickDB.stream(disableAutocommit andThen action) val enumerator = streams.IterateeStreams.publisherToEnumerator(publisher) Stream.eva l(async.bound  
		
FunDA(9)- Stream Source:reactive data streams(一) https://www.cppentry.com/bencandy.php?fid=90&id=124697

首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇FunDA(8)- Static Source:保.. 下一篇Scala 元组