设为首页 加入收藏

TOP

Akka(27): Stream:Use case-Connecting Slick-dbStream & Scalaz-stream-fs2(二)
2017-10-09 13:27:32 】 浏览:8865
Tags:Akka Stream Use case-Connecting Slick-dbStream Scalaz-stream-fs2
ersion function. declared implicit to remind during compilation
implicit def toTypedRow(row: RowType): TypedRow = TypedRow(row._1,row._2,row._3,row._4)

我们需要的其实就是aqmQuery,用它来构建DatabasePublisher:

  // construct DatabasePublisher from db.stream
  val dbPublisher: DatabasePublisher[RowType] = db.stream[RowType](aqmQuery.result) // construct akka source
  val source: Source[RowType,NotUsed] = Source.fromPublisher[RowType](dbPublisher)

有了dbPublisher就可以用Source.fromPublisher函数构建source了。现在我们试着运算这个Akka-Stream:

  implicit val actorSys = ActorSystem("actor-system") implicit val ec = actorSys.dispatcher implicit val mat = ActorMaterializer() source.take(6).map{row => toTypedRow(row)}.runWith( Sink.foreach(qmr => { println(s"州名: ${qmr.state}") println(s"县名:${qmr.county}") println(s"年份:${qmr.year}") println(s"取值:${qmr.value}") println("-------------") })) scala.io.StdIn.readLine() actorSys.terminate()

下面是运算结果:

州名: Alabama 县名:Elmore 年份:1999 取值:5
------------- 州名: Alabama 县名:Jefferson 年份:1999 取值:39
------------- 州名: Alabama 县名:Lawrence 年份:1999 取值:28
------------- 州名: Alabama 县名:Madison 年份:1999 取值:31
------------- 州名: Alabama 县名:Mobile 年份:1999 取值:32
------------- 州名: Alabama 县名:Montgomery 年份:1999 取值:15
-------------

显示我们已经成功的连接了Slick和Akka-Stream。

现在我们有了Reactive stream source,它是个akka-stream,该如何对接处于下游的scalaz-stream-fs2呢?我们知道:akka-stream是Reactive stream,而scalaz-stream-fs2是纯“拖式”pull-model stream,也就是说上面这个Reactive stream source必须被动等待下游的scalaz-stream-fs2来读取数据。按照Reactive-Stream规范,下游必须通过backpressure信号来知会上游是否可以发送数据状态,也就是说我们需要scalaz-stream-fs2来产生backpressure。scalaz-stream-fs2 async包里有个Queue结构:

/** * Asynchronous queue interface. Operations are all nonblocking in their * implementations, but may be 'semantically' blocking. For instance, * a queue may have a bound on its size, in which case enqueuing may * block until there is an offsetting dequeue. */ trait Queue[F[_], A] { self =>
  /** * Enqueues one element in this `Queue`. * If the queue is `full` this waits until queue is empty. * * This completes after `a` has been successfully enqueued to this `Queue` */ def enqueue1(a: A): F[Unit] /** * Enqueues each element of the input stream to this `Queue` by * calling `enqueue1` on each element. */ def enqueue: Sink[F, A] = _.eva lMap(enqueue1) /** Dequeues one `A` from this queue. Completes once one is ready. */ def dequeue1: F[A] /** Repeatedly calls `dequeue1` forever. */ def dequeue: Stream[F, A] = Stream.bracket(cancellableDequeue1)(d => Stream.eva l(d._1), d => d._2).repeat ... }

这个结构支持多线程操作,也就是说enqueue和dequeue可以在不同的线程里操作。值得关注的是:enqueue会block,只有在完成了dequeue后才能继续。这个dequeue就变成了抵消backpressure的有效方法了。具体操作方法是:上游在一个线程里用enqueue发送一个数据元素,然后等待下游完成在另一个线程里的dequeue操作,完成这个循环后再进行下一个元素的enqueue。enqueue代表akka-stream向scalaz-stream-fs2发送数据,可以用akka-stream的Sink构件来实现:

 class FS2Gate[T](q: fs2.async.mutable.Queue[Task,Option[T]]) extends GraphStage[SinkShape[T]] { val in = Inlet[T]("inport") val shape = SinkShape.of(in) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler { override def preStart(): Unit = { pull(in)          //initiate stream elements movement
 super.preStart() } override def onPush(): Unit = { q.enqueue1(Some(grab(in))).uns
首页 上一页 1 2 3 4 下一页 尾页 2/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Akka(26): Stream:异常处理-E.. 下一篇spark-shell简单使用介绍(scala)

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目