// Construct a DatabasePublisher from db.stream.
val dbPublisher: DatabasePublisher[RowType] = db.stream[RowType](aqmQuery.result)

// Construct an akka-stream Source from the publisher.
val source: Source[RowType, NotUsed] = Source.fromPublisher[RowType](dbPublisher)

// Implicit infrastructure required to materialize and run the stream.
// Implicit definitions carry explicit types so implicit resolution does not
// depend on inference.
implicit val actorSys: ActorSystem = ActorSystem("actor-system")
implicit val ec: scala.concurrent.ExecutionContextExecutor = actorSys.dispatcher
implicit val mat: ActorMaterializer = ActorMaterializer()

// Take the first 6 rows, convert each one with toTypedRow, and print its
// state / county / year / value fields followed by a separator line.
source
  .take(6)
  .map { row => toTypedRow(row) }
  .runWith(Sink.foreach { qmr =>
    println(s"州名: ${qmr.state}")
    println(s"县名:${qmr.county}")
    println(s"年份:${qmr.year}")
    println(s"取值:${qmr.value}")
    println("-------------")
  })

// Wait for a line on stdin before shutting down the actor system
// (presumably so the asynchronously running stream has time to finish).
scala.io.StdIn.readLine()
actorSys.terminate()
州名: Alabama
县名:Elmore
年份:1999
取值:5
-------------
州名: Alabama
县名:Jefferson
年份:1999
取值:39
-------------
州名: Alabama
县名:Lawrence
年份:1999
取值:28
-------------
州名: Alabama
县名:Madison
年份:1999
取值:31
-------------
州名: Alabama
县名:Mobile
年份:1999
取值:32
-------------
州名: Alabama
县名:Montgomery
年份:1999
取值:15
-------------
/**
 * Asynchronous queue interface. Operations are all nonblocking in their
 * implementations, but may be 'semantically' blocking. For instance,
 * a queue may have a bound on its size, in which case enqueuing may
 * block until there is an offsetting dequeue.
 */
trait Queue[F[_], A] { self =>

  /**
   * Enqueues one element in this `Queue`.
   * If the queue is `full` this waits until queue is empty.
   *
   * This completes after `a` has been successfully enqueued to this `Queue`
   */
  def enqueue1(a: A): F[Unit]

  /**
   * Enqueues each element of the input stream to this `Queue` by
   * calling `enqueue1` on each element.
   */
  def enqueue: Sink[F, A] = _.evalMap(enqueue1)

  /** Dequeues one `A` from this queue. Completes once one is ready. */
  def dequeue1: F[A]

  /**
   * Repeatedly calls `dequeue1` forever.
   *
   * Implemented by bracketing `cancellableDequeue1` (a member that is not
   * shown in this excerpt): the first component of its result is evaluated
   * to obtain the element, the second is used as the release action, and the
   * whole stream is repeated.
   */
  def dequeue: Stream[F, A] =
    Stream.bracket(cancellableDequeue1)(d => Stream.eval(d._1), d => d._2).repeat

  // ... (the remaining members of the trait are elided in this excerpt)
}
class FS2Gate[T](q: fs2.async.mutable.Queue[Task,Option[T]]) extends GraphStage[SinkShape[T]] { val in = Inlet[T]("inport") val shape = SinkShape.of(in) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler { override def preStart(): Unit = { pull(in) //initiate stream elements movement
super.preStart() } override def onPush(): Unit = { q.enqueue1(Some(grab(in))).uns