设为首页 加入收藏

TOP

Akka(27): Stream:Use case-Connecting Slick-dbStream & Scalaz-stream-fs2(一)
2017-10-09 13:27:32 】 浏览:8877
Tags:Akka Stream Use case-Connecting Slick-dbStream Scalaz-stream-fs2

   在以前的博文中我们介绍了Slick,它是一种FRM(Functional Relation Mapper)。有别于ORM,FRM的特点是函数式的语法可以支持灵活的对象组合(Query Composition)实现大规模的代码重复利用,但同时这些特点又影响了编程人员群体对FRM的接受程度,阻碍了FRM成为广为流行的一种数据库编程方式。所以我们只能从小众心态来探讨如何改善Slick现状,希望通过与某些Stream库集成,在Slick FRM的基础上恢复一些人们熟悉的Recordset数据库光标(cursor)操作方式,希望如此可以降低FRM数据库编程对函数式编程水平要求,能够吸引更多的编程人员接受FRM。刚好,在这篇讨论里我们希望能介绍一些Akka-Stream和外部系统集成对接的实际用例,把Slick数据库数据载入连接到Akka-Stream形成streaming-dataset应该是一个挺好的想法。Slick和Akka-Stream可以说是自然匹配的一对,它们都是同一个公司产品,都支持Reactive-Specification。Reactive系统的集成对象之间是通过公共界面Publisher来实现对接的。Slick提供了个Dababase.stream函数可以构建这个Publisher:

 /** Create a `Publisher` for Reactive Streams which, when subscribed to, will run the specified * `DBIOAction` and return the result directly as a stream without buffering everything first. * This method is only supported for streaming actions. * * The Publisher itself is just a stub that holds a reference to the action and this Database. * The action does not actually start to run until the call to `onSubscribe` returns, after * which the Subscriber is responsible for reading the full response or cancelling the * Subscription. The created Publisher can be reused to serve a multiple Subscribers, * each time triggering a new execution of the action. * * For the purpose of combinators such as `cleanup` which can run after a stream has been * produced, cancellation of a stream by the Subscriber is not considered an error. For * example, there is no way for the Subscriber to cause a rollback when streaming the * results of `someQuery.result.transactionally`. * * When using a JDBC back-end, all `onNext` calls are done synchronously and the ResultSet row * is not advanced before `onNext` returns. This allows the Subscriber to access LOB pointers * from within `onNext`. If streaming is interrupted due to back-pressure signaling, the next * row will be prefetched (in order to buffer the next result page from the server when a page * boundary has been reached). */ final def stream[T](a: DBIOAction[_, Streaming[T], Nothing]): DatabasePublisher[T] = streamInternal(a, false)

这个DatabasePublisher[T]就是一个Publisher[T]:

/** A Reactive Streams `Publisher` for database Actions. */
abstract class DatabasePublisher[T] extends Publisher[T] { self => ... }

然后Akka-Stream可以通过Source.fromPublisher(publisher)构建Akka Source构件:

  /** * Helper to create [[Source]] from `Publisher`. * * Construct a transformation starting with given publisher. The transformation steps * are executed by a series of [[org.reactivestreams.Processor]] instances * that mediate the flow of elements downstream and the propagation of * back-pressure upstream. */ def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed] = fromGraph(new PublisherSource(publisher, DefaultAttributes.publisherSource, shape("PublisherSource")))

理论上Source.fromPublisher(db.stream(query))就可以构建一个Reactive-Stream-Source了。下面我们就建了例子来做示范:首先是Slick的铺垫代码boiler-code:

  val aqmraw = Models.AQMRawQuery val db = Database.forConfig("h2db") // aqmQuery.result returns Seq[(String,String,String,String)]
  val aqmQuery = aqmraw.map {r => (r.year,r.state,r.county,r.value)} // type alias
  type RowType = (String,String,String,String) // user designed strong typed resultset type. must extend FDAROW
  case class TypedRow(year: String, state: String, county: String, value: String) extends FDAROW // strong typed resultset conv
首页 上一页 1 2 3 4 下一页 尾页 1/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Akka(26): Stream:异常处理-E.. 下一篇spark-shell简单使用介绍(scala)

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目