import com.bayakala.funda._ import api._ import scala.language.implicitConversions import slick.jdbc.H2Profile.api._ import scala.concurrent.duration._ import scala.concurrent.{Await, Future} import scala.util.{Failure, Success} import Models._ import scala.concurrent.ExecutionContext.Implicits.global
object ParallelLoading extends App { //assume two distinct db objects
// Handle used further down for schema creation and for streaming the State names.
val db_a = Database.forConfig(path = "h2db")

// Second handle built from the same "h2db" configuration entry; the County
// streams further down read through this one.
val db_b = Database.forConfig(path = "h2db")

// create STATE table
// DDL action that creates the table behind Models.StateQuery.
val actionCreateState = Models.StateQuery.schema.create

// Run the DDL on db_a and print the outcome. A failure is only reported, never
// rethrown: the table may simply exist already from an earlier run.
val futCreateState = db_a.run(actionCreateState).andThen {
  case Failure(e) =>
    println(s"State Table may exist already! Error: ${e.getMessage}")
  case Success(_) =>
    println("State Table created successfully!")
}

// Wait for the attempt to finish. Await.ready does not rethrow a failed future,
// so execution carries on even when the table could not be created.
Await.ready(futCreateState, Duration.Inf)

// create COUNTY table
// DDL action that creates the table behind Models.CountyQuery.
val actionCreateCounty = Models.CountyQuery.schema.create

// Run the DDL on db_a and print the outcome. A failure is only reported, never
// rethrown: the table may simply exist already from an earlier run.
val futCreateCounty = db_a.run(actionCreateCounty).andThen {
  case Failure(e) =>
    println(s"County Table may exist already! Error: ${e.getMessage}")
  case Success(_) =>
    println("County Table created successfully!")
}

// Wait for the attempt to finish. Await.ready does not rethrow a failed future,
// so execution carries on even when the table could not be created.
Await.ready(futCreateCounty, Duration.Inf)

// define query for extracting State names from AQMRPT
// Distinct State names found in AQMRPT, in sorted order.
// Alternative left disabled by the original author: .distinctOn(r => r)
val qryStates = AQMRPTQuery.map(_.state).distinct.sorted

/** Typed row wrapping one State name produced by `qryStates`. */
case class States(name: String) extends FDAROW

/** Converts a raw `qryStates` result row (the State name) into a [[States]] row.
  *
  * The result type is declared explicitly: implicit definitions without one are
  * flagged by Scala 2 linting and rejected by Scala 3.
  *
  * NOTE(review): this is an implicit conversion from plain `String`, visible
  * throughout the enclosing object — confirm nothing relies on it implicitly
  * before narrowing its scope.
  *
  * @param row the State name as returned by the query
  * @return the name wrapped as a [[States]] row
  */
implicit def toStates(row: String): States = States(row)

// Loader that turns each raw String row into a States row via toStates.
val stateLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toStates _)

// Typed stream of State names read through db_a.
// NOTE(review): the (64, 64) arguments are passed through unchanged; presumably
// fetch/queue sizes — confirm against the FunDA API.
val statesStream = stateLoader.fda_typedStream(qryStates.result)(db_a)(64, 64)()

// define query for extracting County names from AQMRPT in separate chunks
// query with state name >A and <K
// County extraction is split into three State-name ranges (compared on the
// upper-cased name) so that each range can be read by its own stream.
// NOTE(review): every bound is exclusive, so a State value exactly equal to "K"
// or "P" (or one that is not greater than "A") matches none of the three
// queries — confirm that this is intended.

// State names > "A" and < "K": distinct (state, county) pairs, ordered by both.
val qryCountiesA_K = AQMRPTQuery
  .filter(r => r.state.toUpperCase > "A" && r.state.toUpperCase < "K")
  .map(r => (r.state, r.county))
  .distinctOn(r => (r._1, r._2))
  .sortBy(r => (r._1, r._2))

// State names > "K" and < "P".
val qryCountiesK_P = AQMRPTQuery
  .filter(r => r.state.toUpperCase > "K" && r.state.toUpperCase < "P")
  .map(r => (r.state, r.county))
  .distinctOn(r => (r._1, r._2))
  .sortBy(r => (r._1, r._2))

// State names > "P".
val qryCountiesP_Z = AQMRPTQuery
  .filter(r => r.state.toUpperCase > "P")
  .map(r => (r.state, r.county))
  .distinctOn(r => (r._1, r._2))
  .sortBy(r => (r._1, r._2))

/** Typed row pairing a State name with one County name from that State. */
case class Counties(state: String, name: String) extends FDAROW

/** Converts a raw `(state, county)` result row into a [[Counties]] row.
  *
  * The result type is declared explicitly: implicit definitions without one are
  * flagged by Scala 2 linting and rejected by Scala 3.
  *
  * @param row tuple of (State name, County name) as returned by the queries above
  * @return the pair wrapped as a [[Counties]] row
  */
implicit def toCounties(row: (String, String)): Counties = Counties(row._1, row._2)

// Loader that turns each raw (String, String) row into a Counties row via toCounties.
val countyLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toCounties _)

// 3 separate streams to extract county names from the same database table AQMRPT
// One typed County stream per State-name range; all three read through db_b.
// NOTE(review): the (64, 64) arguments are passed through unchanged; presumably
// fetch/queue sizes — confirm against the FunDA API.
val countiesA_KStream =
  countyLoader.fda_typedStream(qryCountiesA_K.result)(db_b)(64, 64)()
val countiesK_PStream =
  countyLoader.fda_typedStream(qryCountiesK_P.result)(db_b)(64, 64)()
val countiesP_ZStream =
  countyLoader.fda_typedStream(qryCountiesP_Z.result)(db_b)(64, 64)()

// obtain a combined stream with parallel loading with max of 4 open computation
val combinedStream = fda_par_load(statesStream,countiesA_KStream,co