设为首页 加入收藏

TOP

FunDA(15)- 示范:任务并行运算 - user task parallel execution(四)
2017-10-09 14:26:39 浏览:10589
Tags:FunDA 示范 任务 并行 运算 user task parallel execution
FDAViewLoader(slick.jdbc.H2Profile)(toCounty _) // NOTE(review): tail of a definition that begins before this excerpt
    val countySeq = countyLoader.fda_typedRows(CountyQuery.result)(db).toSeq
    //constructed a Stream[Task,String]
    val countyStream = fda_staticSource(countySeq)()
    // Holds the matched county id; stays -1 when no row matches.
    var id = -1
    // Scans county rows until one whose name contains both `state` and `county`.
    def getid: FDAUserTask[FDAROW] = row => {
      row match {
        case CountyModel(cid,cname) => //target row type
          if (cname.contains(state) && cname.contains(county)) {
            id = cid
            fda_break //exit
          } else fda_skip //take next row
        case _ => fda_skip
      }
    }
    // Run the stream first, then yield the captured id as the result.
    // These must be two separate statements: written on one line, `startRun id`
    // would parse as an infix call.
    countyStream.appendTask(getid).startRun
    id
  }

  //original table listing
  // Converts a raw AQMRPT table row into its AQMRPTModel counterpart.
  implicit def toAQMRPT(row: AQMRPTTable#TableElementType) =
    AQMRPTModel(row.rid,row.mid,row.state,row.county,row.year,row.value,row.total,row.valid)
  val AQMRPTLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)
  val AQMRPTStream = AQMRPTLoader.fda_typedStream(AQMRPTQuery.result)(db)(256,256)()

  /** User task: for each valid AQMRPTModel row, looks up the state and county
    * ids and emits an insert action into NORMAQMQuery wrapped in an
    * FDAActionRow. Invalid rows and rows of any other type are skipped.
    */
  def getIdsThenInsertAction: FDAUserTask[FDAROW] = row => {
    row match {
      case aqm: AQMRPTModel =>
        if (aqm.valid) {
          val stateId = getStateID(aqm.state)
          val countyId = getCountyID(aqm.state,aqm.county)
          val action = NORMAQMQuery += NORMAQMModel(0,aqm.mid, stateId, countyId, aqm.year,aqm.value,aqm.total)
          fda_next(FDAActionRow(action))
        } else fda_skip
      case _ => fda_skip
    }
  }

  val runner = FDAActionRunner(slick.jdbc.H2Profile)

  /** User task: executes the action carried by each FDAActionRow against `db`
    * and then skips the row, so nothing is passed further down the stream.
    * Rows of any other type are skipped untouched.
    */
  def runInsertAction: FDAUserTask[FDAROW] = row =>
    row match {
      case FDAActionRow(action) =>
        runner.fda_execAction(action)(db)
        fda_skip
      case _ => fda_skip
    }

  val cnt_start = System.currentTimeMillis()

  // Single-threaded baseline, kept for reference but disabled so that only the
  // parallel run below is timed against cnt_start.
  /*
  AQMRPTStream.take(100000)
    .appendTask(getIdsThenInsertAction)
    .appendTask(runInsertAction)
    .startRun
  //println(s"processing 10000 rows in a single thread in ${(System.currentTimeMillis - cnt_start)/1000} seconds")
  //processing 10000 rows in a single thread in 570 seconds
  //println(s"processing 20000 rows in a single thread in ${(System.currentTimeMillis - cnt_start)/1000} seconds")
  //processing 20000 rows in a single thread in 1090 seconds
  //println(s"processing 100000 rows in a single thread in ${(System.currentTimeMillis - cnt_start)/1000} seconds")
  //processing 100000 rows in a single thread in 2+ hrs
  */

  // Parallel version: getIdsThenInsertAction is run through fda_runPar, and the
  // resulting action rows are executed by runInsertAction.
  // NOTE(review): the (8) argument is presumably the parallelism level - confirm against FunDA.
  implicit val strategy = Strategy.fromCachedDaemonPool("cachedPool")
  fda_runPar(AQMRPTStream.take(100000).toPar(getIdsThenInsertAction))(8)
    .appendTask(runInsertAction)
    .startRun
  //println(s"processing 10000 rows parallelly in ${(System.currentTimeMillis - cnt_start)/1000} seconds")
  // processing 10000 rows parallelly in 316 seconds
  //println(s"processing 20000 rows parallelly in ${(System.currentTimeMillis - cnt_start)/1000} seconds")
  //processing 20000 rows parallelly in 614 seconds
  println(s"processing 100000 rows parallelly in ${(System.currentTimeMillis - cnt_start)/1000} seconds")
  //processing 100000 rows parallelly in 3885 seconds
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

首页 上一页 1 2 3 4 下一页 尾页 4/4/4
【打印繁体】【投稿】【收藏】【推荐】【举报】【评论】【关闭】【返回顶部】
上一篇FunDA(12)- 示范:强类型数据.. 下一篇FunDA(16)- 示范:整合并行运..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目