设为首页 加入收藏

TOP

FunDA(15)- 示范:任务并行运算 - user task parallel execution(二)
2017-10-09 14:26:39 】 浏览:10578
Tags:FunDA 示范 任务 并行 运算 user task parallel execution
der.fda_typedRows(StateQuery.result)(db).toSeq
//constructed a Stream[Task,String] val stateStream = fda_staticSource(stateSeq)() var id = -1 def getid: FDAUserTask[FDAROW] = row => { row match { case StateModel(stid,stname) => //target row type if (stname.contains(state)) { id = stid fda_break //exit } else fda_skip //take next row case _ => fda_skip } } stateStream.appendTask(getid).startRun id }

可以看到getStateID函数每次运算都重复构建stateStream。这样可以达到增加io操作的目的。

同样,我们也需要设计另一个函数来从COUNTIES表里获取id字段:

  //another conceived task for the purpose of resource consumption //getting id with corresponding names from COUNTIES table
  def getCountyID(state: String, county: String): Int = { //create a stream for county id with state name and county name
    implicit def toCounty(row:  CountyTable#TableElementType) = CountyModel(row.id,row.name) val countyLoader = FDAViewLoader(slick.jdbc.H2Profile)(toCounty _) val countySeq = countyLoader.fda_typedRows(CountyQuery.result)(db).toSeq //constructed a Stream[Task,String]
    val countyStream = fda_staticSource(countySeq)() var id  = -1 def getid: FDAUserTask[FDAROW] = row => { row match { case CountyModel(cid,cname) =>   //target row type
          if (cname.contains(state) && cname.contains(county)) { id = cid fda_break //exit
 } else fda_skip   //take next row
        case _ => fda_skip } } countyStream.appendTask(getid).startRun id }

我们可以如下这样获取这个程序的数据源:

  //original table listing
  implicit def toAQMRPT(row: AQMRPTTable#TableElementType) = AQMRPTModel(row.rid,row.mid,row.state,row.county,row.year,row.value,row.total,row.valid) val AQMRPTLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _) val AQMRPTStream = AQMRPTLoader.fda_typedStream(AQMRPTQuery.result)(db)(256,256)()

按照正常的FunDA流程我们设计了两个用户自定义函数:一个根据数据行内的state和county字段调用函数getStateID和getCountyID获取相应id后构建一条新的NORMAQM表插入指令行,然后传给下个自定义函数。下个自定义函数就直接运算收到的动作行:

  def getIdsThenInsertAction: FDAUserTask[FDAROW] = row => { row match { case aqm: AQMRPTModel =>
        if (aqm.valid) { val stateId = getStateID(aqm.state) val countyId = getCountyID(aqm.state,aqm.county) val action = NORMAQMQuery += NORMAQMModel(0,aqm.mid, stateId, countyId, aqm.year,aqm.value,aqm.total) fda_next(FDAActionRow(action)) } else fda_skip case _ => fda_skip } } val runner = FDAActionRunner(slick.jdbc.H2Profile) def runInsertAction: FDAUserTask[FDAROW] = row => row match { case FDAActionRow(action) => runner.fda_execAction(action)(db) fda_skip case _ => fda_skip }

像前面几篇示范那样我们把这两个用户自定义函数与数据源组合起来成为完整的FunDA程序后startRun就可以得到实际效果了:

    AQMRPTStream.take(10000) .appendTask(getIdsThenInsertAction) .appendTask(runInsertAction) .startRun

这个程序运算了579秒,不过这是个单一线程运算。我们想知道并行运算结果。那么我们首先要把这个getIdsThenInsertAction转成一个并行运算函数FDAParTask:

AQMRPTStream.toPar(getIdsThenInsertAction)

FunDA提供了并行运算器fda_runPar:

      implicit val strategy = Strategy.fromCachedDaemonPool("cachedPool") fda_runPar(AQMRPTStream.take(100000).toPar(getIdsThenInsertAction))(8)  //max 8 open computations
 .appendTask(runInsertAction) .startRun

我们可以自定义线程池。fda_runPar返回标准的FunDA FDAPipeLine,所以我们可以在后面挂上runInsertAction函数。下面是不同行数的运算时间对比结果:

    //processing 10000 rows in a single thread in 570 seconds // processing 10000 rows parallelly in 316 seconds //processing 20000 rows in a single thread in 1090 seconds //processing 200
首页 上一页 1 2 3 4 下一页 尾页 2/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇FunDA(12)- 示范:强类型数据.. 下一篇FunDA(16)- 示范:整合并行运..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目