id: FDAUserTask[FDAROW] = row => { row match { case CountyModel(cid,cname) => //target row type
if (cname.contains(state) && cname.contains(county)) { id = cid fda_break //exit
} else fda_skip //take next row
case _ => fda_skip } } countyStream.appendTask(getid).startRun id } //process input row and produce action row to insert into NORMAQM
// Row task: for every valid AQMRPTModel row, look up the state and county ids,
// build an insert into NORMAQMQuery and emit it downstream as an FDAActionRow.
// Invalid AQMRPTModel rows and rows of any other type are skipped.
def getIdsThenInsertAction: FDAUserTask[FDAROW] = incoming =>
  incoming match {
    // the guard replaces an inner if/else: an invalid row falls through to the skip case
    case aqm: AQMRPTModel if aqm.valid =>
      val sid = getStateID(aqm.state)
      val cid = getCountyID(aqm.state, aqm.county)
      // NOTE(review): the leading 0 is presumably a placeholder for a generated row id - confirm against NORMAQMModel
      val insertAction =
        NORMAQMQuery += NORMAQMModel(0, aqm.mid, sid, cid, aqm.year, aqm.value, aqm.total)
      fda_next(FDAActionRow(insertAction))
    case _ =>
      fda_skip
  }
//runner for the action rows
// Action runner built on the H2 profile; used by runInsertAction below.
val runner = FDAActionRunner(slick.jdbc.H2Profile)

// Row task: executes the action carried by each FDAActionRow against db.
// The result of fda_execAction is not used, and nothing is passed downstream:
// every row, whatever its type, ends in fda_skip.
def runInsertAction: FDAUserTask[FDAROW] = incoming =>
  incoming match {
    case FDAActionRow(pending) =>
      runner.fda_execAction(pending)(db)
      fda_skip
    case _ =>
      fda_skip
  }
//create parallel sources
//get a stream of years
// Query selecting the distinct year values found in AQMRPTQuery.
val qryYears = AQMRPTQuery.map(_.year).distinct

// Row type wrapping a single year so it can travel through a stream as an FDAROW.
case class Years(year: Int) extends FDAROW

// Converter from a raw year value to its row type.
// The result type is declared explicitly: an implicit definition with an inferred
// type is resolved in an order-dependent way by Scala 2 and is rejected by
// Scala 3 outside local blocks.
implicit def toYears(y: Int): Years = Years(y)

// View loader that converts the loaded year values through toYears.
val yearViewLoader = FDAViewLoader(slick.jdbc.H2Profile)(toYears _)

// Run the query against db and collect the typed rows into a Seq.
val yearSeq = yearViewLoader.fda_typedRows(qryYears.result)(db).toSeq

// Static source stream built from the collected year rows.
val yearStream = fda_staticSource(yearSeq)()
//strong row type
// Converter from a raw AQMRPTTable row to the strongly typed AQMRPTModel.
// The result type is declared explicitly: an implicit definition with an inferred
// type is resolved in an order-dependent way by Scala 2 and is rejected by
// Scala 3 outside local blocks.
implicit def toAQMRPT(row: AQMRPTTable#TableElementType): AQMRPTModel =
  AQMRPTModel(row.rid, row.mid, row.state, row.county, row.year, row.value, row.total, row.valid)

//shared stream loader when operate in parallel mode
val AQMRPTLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)
//loading rows with year yr
// Builds a stream of the AQMRPT rows whose year equals yr.
// A fresh query is created on every call, while the shared AQMRPTLoader is reused.
def loadRowsInYear(yr: Int) = {
  //a new query
  val rowsOfYear = AQMRPTQuery.filter(_.year === yr)
  //reuse same loader
  // NOTE(review): (256, 256) are presumably fetch size and queue size - confirm against FDAStreamLoader
  AQMRPTLoader.fda_typedStream(rowsOfYear.result)(db)(256, 256)()
}
//loading rows by year
// Source loader: maps a Years row to the stream of AQMRPT rows of that year.
// Any other row type yields fda_appendRow(FDANullRow).
def loadRowsByYear: FDASourceLoader = incoming =>
  incoming match {
    //produce stream of the year
    case Years(y) =>
      loadRowsInYear(y)
    case _ =>
      fda_appendRow(FDANullRow)
  }
//start counter
// Wall-clock start time in milliseconds, read back when the elapsed time is reported.
val cnt_start = System.currentTimeMillis()

// Row task for inspection: prints Years, AQMRPTModel and FDAActionRow rows to
// stdout and passes nothing downstream (every branch ends in fda_skip).
def showRecord: FDAUserTask[FDAROW] = incoming =>
  incoming match {
    case Years(y) =>
      println(y)
      fda_skip
    case aqm: AQMRPTModel =>
      println(s"${aqm.year} $aqm")
      fda_skip
    case FDAActionRow(action) =>
      println(s"${action}")
      fda_skip
    case _ =>
      fda_skip
  }
//the following is a process of composition of stream combinators
//get parallel source constructor
// Each year row from yearStream is expanded by loadRowsByYear into its own source.
val parSource = yearStream.toParSource(loadRowsByYear)
//implicit val strategy = Strategy.fromCachedDaemonPool("cachedPool")

//produce a stream from parallel sources
// NOTE(review): the numeric arguments (3, 3, 2) below presumably bound the degree of parallelism - confirm
val source = fda_par_source(parSource)(3)

//turn getIdsThenInsertAction into parallel task
val parTasks = source.toPar(getIdsThenInsertAction)

//runPar to produce a new stream
val actionStream = fda_runPar(parTasks)(3)

//turn runInsertAction into parallel task
val parRun = actionStream.toPar(runInsertAction)

//runPar and carry out by startRun
fda_runPar(parRun)(2).startRun

// Report elapsed whole seconds since cnt_start (integer division drops the fraction).
println(s"processing 219400 rows parallelly in ${(System.currentTimeMillis - cnt_start)/1000} seconds")
}
|