
FunDA(16) - Demo: Integrated Parallel Computation - total parallelism solution (Part 2)
2017-10-09 14:23:36
Tags: FunDA, demo, integration, parallel computation, total parallelism solution
  = -1
    def getid: FDAUserTask[FDAROW] = row => {
      row match {
        case CountyModel(cid,cname) => //target row type
          if (cname.contains(state) && cname.contains(county)) {
            id = cid
            fda_break  //exit
          }
          else fda_skip  //take next row
        case _ => fda_skip
      }
    }
    countyStream.appendTask(getid).startRun
    id
  }
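
getid controls the run through its return values. fda_skip moves on to the next row and fda_break stops the whole stream, so the lookup ends at the first matching county and id keeps either the value found or -1. The plain-Scala sketch below imitates that control flow. The `Step`, `Next`, `Skip` and `Break` names, the `run` driver and the `County` row type are hypothetical stand-ins, not FunDA's API.

```scala
// Hypothetical model of FunDA-style row control; these names are NOT the FunDA API.
sealed trait Step[+R]
final case class Next[+R](row: R) extends Step[R] // pass a row downstream (cf. fda_next)
case object Skip extends Step[Nothing]            // drop this row, continue (cf. fda_skip)
case object Break extends Step[Nothing]           // stop the whole run (cf. fda_break)

object RowControlSketch {
  // Apply `task` to each row in order, collecting emitted rows until a Break.
  def run[A, R](rows: Iterator[A])(task: A => Step[R]): List[R] = {
    val out = List.newBuilder[R]
    var stopped = false
    while (!stopped && rows.hasNext) {
      task(rows.next()) match {
        case Next(r) => out += r
        case Skip    => ()
        case Break   => stopped = true
      }
    }
    out.result()
  }

  final case class County(id: Int, name: String) // hypothetical row type

  // Mirrors getid/getCountyID: remember the first matching id, then break.
  def countyId(counties: Seq[County], state: String, county: String): Int = {
    var id = -1
    run[County, Unit](counties.iterator) { c =>
      if (c.name.contains(state) && c.name.contains(county)) { id = c.id; Break }
      else Skip
    }
    id
  }
}
```

The driver checks `stopped` before pulling another row. That check is what lets a lookup like getid return early instead of scanning the whole county table.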

There are also two user-defined functions:

  //process input row and produce action row to insert into NORMAQM
  def getIdsThenInsertAction: FDAUserTask[FDAROW] = row => {
    row match {
      case aqm: AQMRPTModel =>
        if (aqm.valid) {
          val stateId = getStateID(aqm.state)
          val countyId = getCountyID(aqm.state, aqm.county)
          val action = NORMAQMQuery += NORMAQMModel(0, aqm.mid, stateId, countyId, aqm.year, aqm.value, aqm.total)
          fda_next(FDAActionRow(action))
        }
        else fda_skip
      case _ => fda_skip
    }
  }

  //runner for the action rows
  val runner = FDAActionRunner(slick.jdbc.H2Profile)
  def runInsertAction: FDAUserTask[FDAROW] = row =>
    row match {
      case FDAActionRow(action) =>
        runner.fda_execAction(action)(db)
        fda_skip
      case _ => fda_skip
    }
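
getIdsThenInsertAction does not write to the database itself. It only wraps the Slick insert in an FDAActionRow and passes it on with fda_next. runInsertAction later executes that action through the FDAActionRunner. The sketch below keeps the same split between building an action and running it, in plain Scala. Report, NormRow, InsertAction and the in-memory ArrayBuffer table are hypothetical stand-ins for the demo's models, NORMAQM and Slick.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for AQMRPTModel / NORMAQMModel; not the demo's classes.
final case class Report(mid: Int, state: String, county: String, year: Int, value: Int, valid: Boolean)
final case class NormRow(mid: Int, stateId: Int, countyId: Int, year: Int, value: Int)

// An "action row": a description of an insert, built in one stage and run in another.
final case class InsertAction(row: NormRow)

object ActionRowSketch {
  // Stage 1 (cf. getIdsThenInsertAction): a valid row becomes an action, others are skipped.
  def toAction(stateId: String => Int, countyId: (String, String) => Int)(r: Report): Option[InsertAction] =
    if (r.valid) Some(InsertAction(NormRow(r.mid, stateId(r.state), countyId(r.state, r.county), r.year, r.value)))
    else None

  // Stage 2 (cf. runInsertAction): execute the action against an in-memory table.
  def runAction(table: ArrayBuffer[NormRow])(a: InsertAction): Unit = {
    table += a.row
    ()
  }
}
```

Keeping the two stages apart means each one can later be turned into its own parallel task with its own degree of parallelism.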

Next comes the code added in this installment. First we build a stream of all the years:

  //create parallel sources
  //get a stream of years
  val qryYears = AQMRPTQuery.map(_.year).distinct
  case class Years(year: Int) extends FDAROW
  implicit def toYears(y: Int) = Years(y)
  val yearViewLoader = FDAViewLoader(slick.jdbc.H2Profile)(toYears _)
  val yearSeq = yearViewLoader.fda_typedRows(qryYears.result)(db).toSeq
  val yearStream = fda_staticSource(yearSeq)()
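
The year list is small, and `.toSeq` materializes it before fda_staticSource wraps it. Each element is therefore just a Years row carrying one distinct year. The same step over an in-memory table looks like this. Rpt is a hypothetical stand-in for the AQMRPT row, and an explicit `map` replaces the implicit toYears conversion.

```scala
// Hypothetical in-memory row standing in for AQMRPT; only `year` matters here.
final case class Rpt(year: Int, value: Int)
// Same role as the demo's Years row type (there it extends FDAROW).
final case class Years(year: Int)

object YearSourceSketch {
  // cf. AQMRPTQuery.map(_.year).distinct, then one Years row per distinct year.
  def yearRows(table: Seq[Rpt]): Seq[Years] =
    table.map(_.year).distinct.map(Years(_))
}
```

`distinct` on a Scala Seq keeps first-occurrence order. A SQL DISTINCT without ORDER BY makes no ordering promise, which is harmless here because each year is processed independently.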

Below is a function that reads the rows of one year from the AQMRPT table:

  //strong row type
  implicit def toAQMRPT(row: AQMRPTTable#TableElementType) =
    AQMRPTModel(row.rid, row.mid, row.state, row.county, row.year, row.value, row.total, row.valid)
  //shared stream loader when operate in parallel mode
  val AQMRPTLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)

  //loading rows with year yr
  def loadRowsInYear(yr: Int) = {
    //a new query
    val query = AQMRPTQuery.filter(row => row.year === yr)
    //reuse same loader
    AQMRPTLoader.fda_typedStream(query.result)(db)(256, 256)()
  }
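
One AQMRPTLoader can be shared across all the per-year calls only if the loader holds no per-query state. Each call to loadRowsInYear builds its own query and hands it to the same loader. The sketch below shows that shape with plain Scala collections. SharedLoader and AqmRow are hypothetical, and it does not model the (256, 256) arguments of fda_typedStream.

```scala
// Hypothetical row type standing in for AQMRPTModel.
final case class AqmRow(rid: Int, state: String, year: Int, value: Int)

// One loader instance, shared by every per-year query. It keeps no mutable state,
// so concurrent calls from several threads cannot interfere with each other.
final class SharedLoader(table: Vector[AqmRow]) {
  def load(query: AqmRow => Boolean): Iterator[AqmRow] = table.iterator.filter(query)
}

object PerYearLoadSketch {
  private val table = Vector(AqmRow(1, "AL", 2013, 10), AqmRow(2, "AK", 2014, 20), AqmRow(3, "AZ", 2013, 30))
  val loader = new SharedLoader(table) // created once, like AQMRPTLoader

  // cf. loadRowsInYear: a new "query" (predicate) per call, the same loader every time.
  def loadRowsInYear(yr: Int): Iterator[AqmRow] = loader.load(_.year == yr)
}
```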

We can expect multiple instances of loadRowsInYear to share the same FDAStreamLoader, AQMRPTLoader. A user-defined data-loading function has the type FDASourceLoader. Here is an example FDASourceLoader:

  //loading rows by year
  def loadRowsByYear: FDASourceLoader = row => {
    row match {
      case Years(y) => loadRowsInYear(y) //produce stream of the year
      case _ => fda_appendRow(FDANullRow)
    }
  }
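
An FDASourceLoader maps one input row to a whole stream of rows. A Years row expands into all of that year's records, and any other row yields a single FDANullRow. The sketch below models that contract with a plain function type. SourceLoader, Row, Data and NullRow are hypothetical names, and Iterator stands in for the library's stream type.

```scala
sealed trait Row
final case class Years(year: Int) extends Row
final case class Data(year: Int, value: Int) extends Row
case object NullRow extends Row // stand-in for FDANullRow

object SourceLoaderSketch {
  // Hypothetical analogue of FDASourceLoader: one row in, a stream of rows out.
  type SourceLoader = Row => Iterator[Row]

  private val table = Vector(Data(2013, 1), Data(2014, 2), Data(2013, 3))

  def loadRowsByYear: SourceLoader = {
    case Years(y) => table.iterator.filter(_.year == y) // all rows of that year
    case _        => Iterator.single(NullRow)          // cf. fda_appendRow(FDANullRow)
  }
}
```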

We use toParSource to build a parallel data source:

  //get parallel source constructor
  val parSource = yearStream.toParSource(loadRowsByYear)
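
The code comment calls toParSource a "parallel source constructor", which suggests it loads nothing yet. It only pairs each row of yearStream with loadRowsByYear, and the per-year loading happens when the result is run. The sketch below assumes that lazy behavior and models a parallel source as a list of deferred loaders (`() => Iterator`). toParSource here is a hypothetical plain-Scala function, not the FunDA method.

```scala
import java.util.concurrent.atomic.AtomicInteger

object ParSourceSketch {
  // Counts how many per-year loads have actually been run.
  val loads = new AtomicInteger(0)

  def loadRowsInYear(y: Int): Iterator[String] = {
    loads.incrementAndGet()
    Iterator(s"$y-a", s"$y-b")
  }

  // Hypothetical analogue of toParSource: one deferred loader per year.
  // Nothing is loaded here; each thunk runs only when a consumer calls it.
  def toParSource(years: Seq[Int])(loader: Int => Iterator[String]): Seq[() => Iterator[String]] =
    years.map(y => () => loader(y))
}
```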

fda_par_source then merges the parallel data source into a single data stream:

  //produce a stream from parallel sources
  val source = fda_par_source(parSource)(3)
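
fda_par_source runs the per-year sources concurrently and merges their rows into one stream. The second argument, 3 here, presumably limits how many sources are open at the same time. The sketch below imitates that with Scala Futures on a fixed pool of `maxOpen` threads. It is a simplification, not FunDA's implementation. Each source is read to the end before its rows are merged, and the result keeps source order. A real streaming merge interleaves rows as they arrive. parMerge is a hypothetical name.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParMergeSketch {
  // Hypothetical analogue of fda_par_source(parSource)(maxOpen): run the deferred
  // sources on a pool of `maxOpen` threads and merge every row into one sequence.
  def parMerge[R](sources: Seq[() => Iterator[R]], maxOpen: Int): Seq[R] = {
    val pool = Executors.newFixedThreadPool(maxOpen)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // each source is fully materialized inside its own Future
      val running = sources.map(src => Future(src().toList))
      Await.result(Future.sequence(running), 1.minute).flatten
    } finally pool.shutdown()
  }
}
```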

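source is an FDAPipeLine. It can be run directly with source.startRun, or more stages can be attached after it. Below we convert the other two user-defined functions into parallel tasks and chain them after source: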

  //the following is a process of composition of stream combinators
  //get parallel source constructor
  val parSource = yearStream.toParSource(loadRowsByYear)
  //implicit val strategy = Strategy.fromCachedDaemonPool("cachedPool")
  //produce a stream from parallel sources
  val source = fda_par_source(parSource)(3)
  //turn getIdsThenInsertAction into parallel task
  val parTasks = source.toPar(getIdsThenInsertAction)
  //runPar to produce a new stream
  val actionStream = fda_runPar(parTasks)(3)
  //turn runInsertAction into parallel task
  val parRun = actionStream.toPar(runInsertAction)
  //runPar and carry out by startRun
  fda_runPar(parRun)(2).startRun
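
This composition chains three parallel steps, each with its own degree of parallelism:
- the merged source (3)
- getIdsThenInsertAction through toPar/fda_runPar (3)
- runInsertAction (2)

The plain-Scala sketch below has the same shape. runPar is a hypothetical helper standing in for toPar followed by fda_runPar. Some passes a row on like fda_next, and None drops it like fda_skip. Each stage uses its own fixed thread pool instead of the execution strategy FunDA uses.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParPipelineSketch {
  // Hypothetical analogue of toPar + fda_runPar(n): apply `task` to every row on a
  // pool of `n` threads. Some(b) passes b on (cf. fda_next); None drops the row (cf. fda_skip).
  def runPar[A, B](rows: Seq[A], n: Int)(task: A => Option[B]): Seq[B] = {
    val pool = Executors.newFixedThreadPool(n)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try Await.result(Future.traverse(rows)(a => Future(task(a))), 1.minute).flatten
    finally pool.shutdown()
  }

  // Same shape as the demo: source -> build actions (par 3) -> run actions (par 2).
  def demo(): (Seq[Int], Int) = {
    val inserted = new AtomicInteger(0) // stands in for the target table
    val source = Seq(1, 2, 3, 4, 5, 6)  // stands in for fda_par_source(parSource)(3)
    // stage 1: even ("valid") rows become actions, the rest are skipped
    val actions = runPar[Int, Int](source, 3)(r => if (r % 2 == 0) Some(r * 100) else None)
    // stage 2: execute every action; nothing is emitted downstream
    runPar[Int, Unit](actions, 2) { a => inserted.addAndGet(a); None }
    (actions, inserted.get)
  }
}
```

In this sketch every stage finishes before the next one starts. In the streaming original, rows can flow into the insert stage while other years are still loading.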

Below is the complete source code of this demo:

import slick.jdbc.meta._
import com.bayakala.funda._
impor