设为首页 加入收藏

TOP

FunDA(9)- Stream Source:reactive data streams(二)
2017-10-09 14:30:07 】 浏览:9454
Tags:FunDA Stream Source reactive data streams
edQueue[Task,Option[SOURCE]](queSize)).flatMap { q => Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture() pipe.unNoneTerminate(q.dequeue) } } private def pushData[R](q: async.mutable.Queue[Task,Option[R]]): Iteratee[R,Unit] = Cont { case Input.EOF => { q.enqueue1(None).unsafeRun Done((), Input.Empty) } case Input.Empty => pushData(q) case Input.El(e) => { q.enqueue1(Some(e)).unsafeRun pushData(q) } } } object FDAStreamLoader { def apply[SOURCE, TARGET](slickProfile: JdbcProfile, converter: SOURCE => TARGET): FDAStreamLoader[SOURCE, TARGET] = new FDAStreamLoader[SOURCE, TARGET](slickProfile, converter) } }

FDADataStream对象内主要实现了fda_typedStream和fda_plainStream。fda_typedStream提供了SOURCE=>TARGET的转换。从Enumerator转换到Stream整个过程和原理我们在FunDA(7)里已经详细介绍过了。下面我们看看FunDA-Example中fda_typedStream的具体应用例子:

package com.bayakala.funda.fdasources.examples import slick.driver.H2Driver.api._ import com.bayakala.funda.fdasources.FDADataStream._ import com.bayakala.funda.samples._ import com.bayakala.funda.fdarows._ import com.bayakala.funda.fdapipes._ import FDANodes._ import FDAValves._ object Example2 extends App { val albums = SlickModels.albums val companies = SlickModels.companies //数据源query
   val albumsInfo = for { (a,c) <- albums join companies on (_.company === _.id) } yield (a.title,a.artist,a.year,c.name) //query结果强类型(用户提供)
  case class Album(title: String, artist: String, year: Int, publisher: String) extends FDAROW //转换函数(用户提供)
  def toTypedRow(row: (String, String, Option[Int], String)): Album = Album(row._1, row._2, row._3.getOrElse(2000), row._4) val db = Database.forConfig("h2db") val streamLoader = FDAStreamLoader(slick.driver.H2Driver, toTypedRow _) val albumStream = streamLoader.fda_typedStream(albumsInfo.result)(db)(512,128) //定义一个用户作业函数:列印数据内容
  def printAlbums: FDATask[FDAROW] = row => { row match { case album: Album => println("____________________") println(s"品名:${album.title}") println(s"演唱:${album.artist}") println(s"年份:${album.year}") println(s"发行:${album.publisher}") fda_next(album) case _ => fda_skip } } albumStream.through(fda_execUserTask(printAlbums)).run.unsafeRun }

运算结果:

品名:Keyboard Cat's Greatest Hits
演唱:Keyboard Cat 年份:1999 发行:Sony Music Inc ____________________ 品名:Spice 演唱:Spice Girls 年份:1999 发行:Columbia Records ____________________ 品名:Whenever You Need Somebody 演唱:Rick Astley 年份:1999 发行:Sony Music Inc ____________________ 品名:The Triumph of Steel 演唱:Manowar 年份:1999 发行:The K-Pops Singers ____________________ 品名:Believe 演唱:Justin Bieber 年份:1999 发行:Columbia Records Process finished with exit code 0

 

 

 

 

 

 

 

 

 

首页 上一页 1 2 下一页 尾页 2/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇FunDA(8)- Static Source:保.. 下一篇Scala 元组

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目