设为首页 加入收藏

TOP

FunDA(14)- 示范:并行运算,并行数据库读取 - parallel data loading(二)
2017-10-09 14:26:40 】 浏览:2391
Tags:FunDA 示范 并行 运算 数据库 读取 parallel data loading
county)) .distinctOn(r
=> (r._1,r._2)) .sortBy(r => (r._1,r._2)) case class Counties(state: String, name: String) extends FDAROW implicit def toCounties(row: (String,String)) = Counties(row._1,row._2) val countyLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toCounties _) //3 separate streams to extract county names from the same database table AQMRPT val countiesA_KStream = countyLoader.fda_typedStream(qryCountiesA_K.result)(db_b)(64,64)() val countiesK_PStream = countyLoader.fda_typedStream(qryCountiesK_P.result)(db_b)(64,64)() val countiesP_ZStream = countyLoader.fda_typedStream(qryCountiesP_Z.result)(db_b)(64,64)()

然后对这四个数据源进行并行读取:

  //obtain a combined stream with parallel loading with max of 4 open computation
  val combinedStream = fda_par_load(statesStream,countiesA_KStream,countiesK_PStream,countiesP_ZStream)(4)

现在这个组合的数据流里最少有两种不同的数据元素,分别是:case class States和case class Counties。我们可以在combinedStream上连接两个用户自定义函数(user-defined-task)分别截取States和Counties数据行并且把它们转化成各自的插入数据指令行(ActionRow):

  //define separate rows for different actions
  case class StateActionRow(action: FDAAction) extends FDAROW case class CountyActionRow(action: FDAAction) extends FDAROW val actionRunner = FDAActionRunner(slick.jdbc.H2Profile) //user-task to catch rows of States type and transform them into db insert actions
  def processStates: FDAUserTask[FDAROW] = row => { row match { //catch states row and transform it into insert action
      case States(stateName) =>  //target row type
        println(s"State name: ${stateName}") val action = StateQuery += StateModel(0,stateName) fda_next(StateActionRow(action)) case others@ _ => //pass other types to next user-defined-tasks
 fda_next(others) } } //user-task to catch rows of Counties type and transform them into db insert actions
  def processCounties: FDAUserTask[FDAROW] = row => { row match { //catch counties row and transform it into insert action
      case Counties(stateName,countyName) =>  //target row type
        println(s"County ${countyName} of ${stateName}") val action = CountyQuery += CountyModel(0,countyName+ " of "+stateName) fda_next(CountyActionRow(action)) case others@ _ => //pass other types to next user-defined-tasks
 fda_next(others) } }

经过processStates和processCounties两个自定义函数处理后combinedStream里又多了两种不同的元素:StateActionRow和CountyActionRow。同样,我们可以用两个自定义函数来运算这两种动作行:

  //user-task to catch States insert action rows and run them
  def runStateAction: FDAUserTask[FDAROW] = row  => { row match { case StateActionRow(action) => //this is a state action row type
        println(s"runstate: ${action}") actionRunner.fda_execAction(action)(db_a) //run this query with db_a context
 fda_skip case others@ _ => //otherwise pass alone to next user-defined-tasks
 fda_next(others) } } //user-task to catch Counties insert action rows and run them
  def runCountyAction: FDAUserTask[FDAROW] = row  => { row match { case CountyActionRow(action) => //this is a county action row type
        actionRunner.fda_execAction(action)(db_b)  //run this query with db_b context
 fda_skip case others@ _ => //otherwise pass alone to next user-defined-tasks
 fda_next(others) } }

好了,现在我们可以把这四个自定义函数在combinedStream上组合起来成为一个完整功能的程序:

  combinedStream.appendTask(processStates)
    .appendTask(processCounties)
    .appendTask(runStateAction)
    .app
首页 上一页 1 2 3 4 下一页 尾页 2/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇FunDA(11)- 数据库操作的并行.. 下一篇FunDA(12)- 示范:强类型数据..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目