设为首页 加入收藏

TOP

FunDA(14)- 示范:并行运算,并行数据库读取 - parallel data loading(四)
2017-10-09 14:26:40 浏览:2394
Tags:FunDA 示范 并行 运算 数据库 读取 parallel data loading
untiesK_PStream,countiesP_ZStream)(4)
  // NOTE(review): the line above is the tail of a statement whose beginning is
  // not part of this listing — confirm against the preceding page of the article.

  // Separate row types so that state and county insert actions can be told
  // apart by pattern matching in the tasks below.
  case class StateActionRow(action: FDAAction) extends FDAROW
  case class CountyActionRow(action: FDAAction) extends FDAROW

  val actionRunner = FDAActionRunner(slick.jdbc.H2Profile)

  /** User task: catch rows of `States` type and transform them into db insert actions.
    *
    * A `States` row is logged and replaced by a [[StateActionRow]] wrapping the
    * insert action; every other row is passed on unchanged.
    */
  def processStates: FDAUserTask[FDAROW] = row =>
    row match {
      // catch states row and transform it into an insert action
      case States(stateName) => // target row type
        println(s"State name: ${stateName}")
        val action = StateQuery += StateModel(0, stateName)
        fda_next(StateActionRow(action))
      case others => // pass other types along to the next user-defined tasks
        fda_next(others)
    }

  /** User task: catch rows of `Counties` type and transform them into db insert actions.
    *
    * A `Counties` row is logged and replaced by a [[CountyActionRow]] wrapping the
    * insert action; every other row is passed on unchanged.
    */
  def processCounties: FDAUserTask[FDAROW] = row =>
    row match {
      // catch counties row and transform it into an insert action
      case Counties(stateName, countyName) => // target row type
        println(s"County ${countyName} of ${stateName}")
        val action = CountyQuery += CountyModel(0, s"$countyName of $stateName")
        fda_next(CountyActionRow(action))
      case others => // pass other types along to the next user-defined tasks
        fda_next(others)
    }

  /** User task: catch `States` insert action rows and run them against `db_a`. */
  def runStateAction: FDAUserTask[FDAROW] = row =>
    row match {
      case StateActionRow(action) => // this is a state action row type
        println(s"runstate: ${action}")
        actionRunner.fda_execAction(action)(db_a) // run this query with db_a context
        fda_skip // presumably nothing is emitted downstream for this row — confirm fda_skip semantics
      case others => // otherwise pass along to the next user-defined tasks
        fda_next(others)
    }

  /** User task: catch `Counties` insert action rows and run them against `db_b`. */
  def runCountyAction: FDAUserTask[FDAROW] = row =>
    row match {
      case CountyActionRow(action) => // this is a county action row type
        actionRunner.fda_execAction(action)(db_b) // run this query with db_b context
        fda_skip
      case others => // otherwise pass along to the next user-defined tasks
        fda_next(others)
    }

  /** User task: print `States` and `Counties` rows and skip every row.
    *
    * Not appended to the pipeline below in this listing.
    */
  def showRows: FDAUserTask[FDAROW] = row =>
    row match {
      case States(nm) =>
        println("")
        println(s"State: $nm")
        println("************")
        fda_skip
      case Counties(s, c) =>
        println("")
        println(s"County: $c")
        println(s"state of $s")
        println("------------")
        fda_skip
      case _ =>
        fda_skip
    }

  // Transform rows into insert actions first, then execute those actions.
  combinedStream
    .appendTask(processStates)
    .appendTask(processCounties)
    .appendTask(runStateAction)
    .appendTask(runCountyAction)
    .startRun
}

 

 

 

 

 

 

 

 

 

 

 

 

 

首页 上一页 1 2 3 4 下一页 尾页 4/4/4
【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部】
上一篇FunDA(11)- 数据库操作的并行.. 下一篇FunDA(12)- 示范:强类型数据..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目