设为首页 加入收藏

TOP

FunDA(16)- 示范:整合并行运算 - total parallelism solution(三)
2017-10-09 14:23:36 】 浏览:5204
Tags:FunDA 示范 整合 并行 运算 total parallelism solution
// NOTE(review): this excerpt opens in the middle of a task definition. Its `def` header and the
// enclosing method (which presumably supplies `state`, `county`, the mutable `id` and
// `countyStream`) lie before the visible text - confirm against the full listing. The leading
// fragment is kept token-for-token; only the lost line structure has been restored so that
// `//` comments no longer swallow the code that follows them.
id: FDAUserTask[FDAROW] = row => {
      row match {
        case CountyModel(cid,cname) =>   //target row type
          if (cname.contains(state) && cname.contains(county)) {
            id = cid
            fda_break    //exit
          }
          else fda_skip  //take next row
        case _ => fda_skip
      }
    }
    // run the county stream through the lookup task, then yield whatever id was captured
    countyStream.appendTask(getid).startRun
    id
  }

  /** Processes an input row and produces an action row to insert into NORMAQM.
    *
    * For a valid `AQMRPTModel` row the state and county ids are looked up through
    * `getStateID` / `getCountyID`, an insert action `NORMAQMQuery += NORMAQMModel(...)` is built,
    * and the action is passed downstream wrapped in an `FDAActionRow`.
    * Invalid rows and rows of any other type are skipped.
    */
  def getIdsThenInsertAction: FDAUserTask[FDAROW] = row => {
    row match {
      case aqm: AQMRPTModel =>
        if (aqm.valid) {
          val stateId = getStateID(aqm.state)
          val countyId = getCountyID(aqm.state,aqm.county)
          val action = NORMAQMQuery += NORMAQMModel(0,aqm.mid, stateId, countyId, aqm.year,aqm.value,aqm.total)
          fda_next(FDAActionRow(action))
        }
        else fda_skip
      case _ => fda_skip
    }
  }

  //runner for the action rows
  val runner = FDAActionRunner(slick.jdbc.H2Profile)

  /** Executes the action carried by an `FDAActionRow` against `db` and emits nothing downstream.
    * Rows of any other type are skipped.
    */
  def runInsertAction: FDAUserTask[FDAROW] = row =>
    row match {
      case FDAActionRow(action) =>
        runner.fda_execAction(action)(db)
        fda_skip
      case _ => fda_skip
    }

  //create parallel sources
  //get a stream of years
  val qryYears = AQMRPTQuery.map(_.year).distinct

  /** Row type wrapping a single year value so it can travel through a stream as an `FDAROW`. */
  case class Years(year: Int) extends FDAROW

  /** Converts a raw year into its `Years` row. */
  implicit def toYears(y: Int) = Years(y)

  // load the distinct years as typed rows and expose them as a static source
  val yearViewLoader = FDAViewLoader(slick.jdbc.H2Profile)(toYears _)
  val yearSeq = yearViewLoader.fda_typedRows(qryYears.result)(db).toSeq
  val yearStream = fda_staticSource(yearSeq)()

  //strong row type
  /** Converts a raw `AQMRPTTable` row into the strongly typed `AQMRPTModel`. */
  implicit def toAQMRPT(row: AQMRPTTable#TableElementType) =
    AQMRPTModel(row.rid, row.mid, row.state, row.county, row.year, row.value, row.total, row.valid)

  //shared stream loader when operate in parallel mode
  val AQMRPTLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)

  /** Loads the rows whose `year` equals `yr` as a typed stream.
    *
    * @param yr the year to filter on
    */
  //loading rows with year yr
  def loadRowsInYear(yr: Int) = {
    //a new query
    val query = AQMRPTQuery.filter(row => row.year === yr)
    //reuse same loader
    // NOTE(review): (256, 256) are presumably fetch/queue sizes - confirm against the FunDA API
    AQMRPTLoader.fda_typedStream(query.result)(db)(256, 256)()
  }

  /** Source loader keyed by year: a `Years(y)` row yields the stream of rows for year `y`;
    * any other row yields `fda_appendRow(FDANullRow)`.
    */
  //loading rows by year
  def loadRowsByYear: FDASourceLoader = row => {
    row match {
      case Years(y) => loadRowsInYear(y)  //produce stream of the year
      case _ => fda_appendRow(FDANullRow)
    }
  }

  //start counter
  val cnt_start = System.currentTimeMillis()

  /** Debugging task: prints `Years`, `AQMRPTModel` and `FDAActionRow` rows to stdout.
    * Every row is skipped afterwards, so nothing is passed downstream.
    */
  def showRecord: FDAUserTask[FDAROW] = row => {
    row match {
      case Years(y) => println(y); fda_skip
      case aqm: AQMRPTModel =>
        println(s"${aqm.year}  $aqm")
        fda_skip
      case FDAActionRow(action) =>
        println(s"${action}")
        fda_skip
      case _ => fda_skip
    }
  }

  //the following is a process of composition of stream combinators
  //get parallel source constructor
  val parSource = yearStream.toParSource(loadRowsByYear)
  //implicit val strategy = Strategy.fromCachedDaemonPool("cachedPool")
  //produce a stream from parallel sources
  // NOTE(review): the numeric arguments below (3, 3, 2) are presumably degrees of parallelism -
  // confirm against the FunDA API
  val source = fda_par_source(parSource)(3)
  //turn getIdsThenInsertAction into parallel task
  val parTasks = source.toPar(getIdsThenInsertAction)
  //runPar to produce a new stream
  val actionStream = fda_runPar(parTasks)(3)
  //turn runInsertAction into parallel task
  val parRun = actionStream.toPar(runInsertAction)
  //runPar and carry out by startRun
  fda_runPar(parRun)(2).startRun

  // elapsed time uses integer division, so the report is in whole seconds
  println(s"processing 219400 rows parallelly in ${(System.currentTimeMillis - cnt_start)/1000} seconds")
}

 

 

 

 

 

 

 

 

 

首页 上一页 1 2 3 4 下一页 尾页 3/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇FunDA(15)- 示范:任务并行运.. 下一篇FunDA(17)- 示范:异常处理与..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目