package com.bayakala.funda.fdasources.examples

import slick.driver.H2Driver.api._
import com.bayakala.funda.fdasources.FDADataStream._
import com.bayakala.funda.samples._
import com.bayakala.funda.fdarows._
import com.bayakala.funda.fdapipes._
import FDAValves._
import com.bayakala.funda.fdarows.FDARowTypes._
import scala.concurrent.duration._

/**
  * Example program: loads the result of an album/company join as a stream of
  * typed rows, then appends three user tasks to that stream and runs it:
  * `updateYear`, `runActions` and `printAlbums`, in that order.
  */
object Example2 extends App {

  val albums = SlickModels.albums
  val companies = SlickModels.companies

  // Data-source query: joins albums to companies on album.company === company.id
  // and projects (title, artist, year, company name).
  val albumsInfo = for {
    (album, company) <- albums join companies on (_.company === _.id)
  } yield (album.title, album.artist, album.year, company.name)

  /** Strongly typed form of one query result row (user-supplied). */
  case class Album(title: String, artist: String, year: Int, publisher: String) extends FDAROW

  /**
    * Conversion function (user-supplied): turns one raw result tuple into an [[Album]].
    *
    * @param row (title, artist, optional year, publisher name)
    * @return the typed row; a missing year is replaced by 2000
    */
  def toTypedRow(row: (String, String, Option[Int], String)): Album = {
    val (title, artist, year, publisher) = row
    Album(title, artist, year.getOrElse(2000), publisher)
  }

  val db = Database.forConfig("h2db")
  val streamLoader = FDAStreamLoader(slick.driver.H2Driver, toTypedRow _)
  // NOTE(review): the meaning of (10.minutes, 512, 128) and of the two empty
  // parameter lists is defined by fda_typedStream, which is not visible here —
  // presumably a timeout plus fetch/queue sizes; TODO confirm.
  val albumStream = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()

  /**
    * User task that prints every [[Album]] row to stdout and hands the row on
    * unchanged, so it does not alter the data flowing through the stream.
    * Rows of any other type are forwarded without printing.
    */
  def printAlbums: FDATask[FDAROW] = row =>
    row match {
      case album: Album =>
        Seq(
          "____________________",
          s"品名:${album.title}",
          s"演唱:${album.artist}",
          s"年份:${album.year}",
          s"发行:${album.publisher}"
        ).foreach(println)
        // Pass the row downstream untouched.
        // fda_skip
        fda_next(album)
      case other =>
        fda_next(other)
    }

  // albumStream.appendTask(printAlbums).appendTask(printAlbums).startRun

  /**
    * User task that, for every [[Album]] row, builds a Slick update action setting
    * `year` to `Some(2017)` on all albums whose title equals the row's title.
    * Rows of any other type are forwarded as-is.
    */
  def updateYear: FDATask[FDAROW] = row =>
    row match {
      case album: Album =>
        val setYear = albums
          .filter(rec => rec.title === album.title)
          .map(_.year)
          .update(Some(2017))
        // NOTE(review): the value of this first fda_next call is discarded — only the
        // last expression of the branch is returned. Unless fda_next has a side effect
        // not visible here, the FDAActionRow never reaches runActions. Confirm intent.
        fda_next(FDAActionRow(setYear))
        fda_next(album)
      case other =>
        fda_next(other)
    }

  val runner = FDAActionRunner(slick.driver.H2Driver)

  /**
    * User task that executes the action wrapped in each [[FDAActionRow]] against `db`
    * through `runner.fda_execAction`, then returns `fda_skip` for that row
    * (presumably so nothing is emitted downstream — TODO confirm).
    * Rows of any other type are forwarded as-is.
    */
  def runActions: FDATask[FDAROW] = row =>
    row match {
      case FDAActionRow(action) =>
        runner.fda_execAction(action)(db)
        fda_skip
      case other =>
        fda_next(other)
    }

  // Tasks are appended in this order: updateYear, runActions, printAlbums.
  albumStream
    .appendTask(updateYear)
    .appendTask(runActions)
    .appendTask(printAlbums)
    .startRun
}