设为首页 加入收藏

TOP

FunDA(14)- 示范:并行运算,并行数据库读取 - parallel data loading(三)
2017-10-09 14:26:40 浏览:2392
Tags:FunDA 示范 并行 运算 数据库 读取 parallel data loading
endTask(runCountyAction) .startRun

然后用startRun来正式运算这个程序。

下面就是本次示范的源代码:

import com.bayakala.funda._ import api._ import scala.language.implicitConversions import slick.jdbc.H2Profile.api._ import scala.concurrent.duration._ import scala.concurrent.{Await, Future} import scala.util.{Failure, Success} import Models._ import scala.concurrent.ExecutionContext.Implicits.global

object ParallelLoading extends App { //assume two distinct db objects
  // First database handle, built from the "h2db" configuration entry.
  val db_a = Database.forConfig("h2db") //another db object
  // Second handle, built from the same "h2db" entry; the demo treats it as a
  // distinct database object (see the "assume two distinct db objects" note).
  val db_b = Database.forConfig("h2db") //create STATE table
  // Schema-creation action for the STATE table, run on db_a.
  val actionCreateState = Models.StateQuery.schema.create
  // `andThen` only reports the outcome on the console; the future keeps the
  // result (or failure) of the underlying run.
  val futCreateState = db_a.run(actionCreateState).andThen {
    case Success(_) => println("State Table created successfully!")
    case Failure(e) => println(s"State Table may exist already! Error: ${e.getMessage}")
  }
  // Await.ready blocks until completion without rethrowing a failure, so the
  // program carries on even if the table could not be created.
  Await.ready(futCreateState, Duration.Inf)
  //create COUNTY table
  // Schema-creation action for the COUNTY table, run on db_a.
  val actionCreateCounty = Models.CountyQuery.schema.create
  // `andThen` only reports the outcome on the console; the future keeps the
  // result (or failure) of the underlying run.
  val futCreateCounty = db_a.run(actionCreateCounty).andThen {
    case Success(_) => println("County Table created successfully!")
    case Failure(e) => println(s"County Table may exist already! Error: ${e.getMessage}")
  }
  // Await.ready blocks until completion without rethrowing a failure, so the
  // program carries on even if the table could not be created.
  Await.ready(futCreateCounty, Duration.Inf)
  //define query for extracting State names from AQMRPT
  // Distinct, sorted state names taken from the AQMRPT table.
  val qryStates = AQMRPTQuery.map(_.state).distinct.sorted  // .distinctOn(r => r)
  // Row type carried by the state stream.
  case class States(name: String) extends FDAROW
  // Converts a raw query result (the state column value) into a States row.
  // The result type is spelled out so the implicit conversion does not depend
  // on inference.
  implicit def toStates(row: String): States = States(row)
  val stateLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toStates _)
  // Typed stream of States rows read through db_a.
  // NOTE(review): the (64,64) arguments look like fetch/queue sizes - confirm against FunDA.
  val statesStream = stateLoader.fda_typedStream(qryStates.result)(db_a)(64,64)()
  //define query for extracting County names from AQMRPT in separate chunks
  //query with state name >A and <K
  // (state, county) pairs for states whose upper-cased name compares strictly
  // between "A" and "K"; de-duplicated and ordered by state, then county.
  val qryCountiesA_K = AQMRPTQuery
    .filter(row => (row.state.toUpperCase > "A" && row.state.toUpperCase < "K"))
    .map(row => (row.state,row.county))
    .distinctOn(pair => (pair._1,pair._2))
    .sortBy(pair => (pair._1,pair._2))
  // Same projection for states whose upper-cased name compares strictly
  // between "K" and "P".
  val qryCountiesK_P = AQMRPTQuery
    .filter(row => (row.state.toUpperCase > "K" && row.state.toUpperCase < "P"))
    .map(row => (row.state,row.county))
    .distinctOn(pair => (pair._1,pair._2))
    .sortBy(pair => (pair._1,pair._2))
  //query with state name >P
  // (state, county) pairs for states whose upper-cased name compares greater
  // than "P"; de-duplicated and ordered by state, then county.
  val qryCountiesP_Z = AQMRPTQuery.filter(r => r.state.toUpperCase > "P")
    .map(r => (r.state,r.county))
    .distinctOn(r => (r._1,r._2))
    .sortBy(r => (r._1,r._2))
  // Row type carried by the county streams: state name plus county name.
  case class Counties(state: String, name: String) extends FDAROW
  // Converts a raw (state, county) query result into a Counties row.
  // The result type is spelled out so the implicit conversion does not depend
  // on inference.
  implicit def toCounties(row: (String,String)): Counties = Counties(row._1,row._2)
  val countyLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toCounties _)
  //3 separate streams to extract county names from the same database table AQMRPT
  // One typed stream per county query; all three read through db_b.
  val countiesA_KStream = countyLoader.fda_typedStream(qryCountiesA_K.result)(db_b)(64,64)()
  val countiesK_PStream = countyLoader.fda_typedStream(qryCountiesK_P.result)(db_b)(64,64)()
  val countiesP_ZStream = countyLoader.fda_typedStream(qryCountiesP_Z.result)(db_b)(64,64)()
  //obtain a combined stream with parallel loading with max of 4 open computation
  val combinedStream = fda_par_load(statesStream,countiesA_KStream,co
首页 上一页 1 2 3 4 下一页 尾页 3/4/4
【打印繁体】【投稿】【收藏】【推荐】【举报】【评论】【关闭】【返回顶部】
上一篇FunDA(11)- 数据库操作的并行.. 下一篇FunDA(12)- 示范:强类型数据..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目