//obtain a combined stream with parallel loading with max of 4 open computation
val combinedStream = fda_par_load(statesStream,countiesA_KStream,countiesK_PStream,countiesP_ZStream)(4)
//define separate rows for different actions
case class StateActionRow(action: FDAAction) extends FDAROW case class CountyActionRow(action: FDAAction) extends FDAROW val actionRunner = FDAActionRunner(slick.jdbc.H2Profile) //user-task to catch rows of States type and transform them into db insert actions
def processStates: FDAUserTask[FDAROW] = row => { row match { //catch states row and transform it into insert action
case States(stateName) => //target row type
println(s"State name: ${stateName}") val action = StateQuery += StateModel(0,stateName) fda_next(StateActionRow(action)) case others@ _ => //pass other types to next user-defined-tasks
fda_next(others) } } //user-task to catch rows of Counties type and transform them into db insert actions
def processCounties: FDAUserTask[FDAROW] = row => { row match { //catch counties row and transform it into insert action
case Counties(stateName,countyName) => //target row type
println(s"County ${countyName} of ${stateName}") val action = CountyQuery += CountyModel(0,countyName+ " of "+stateName) fda_next(CountyActionRow(action)) case others@ _ => //pass other types to next user-defined-tasks
fda_next(others) } }
//user-task to catch States insert action rows and run them
def runStateAction: FDAUserTask[FDAROW] = row => { row match { case StateActionRow(action) => //this is a state action row type
println(s"runstate: ${action}") actionRunner.fda_execAction(action)(db_a) //run this query with db_a context
fda_skip case others@ _ => //otherwise pass alone to next user-defined-tasks
fda_next(others) } } //user-task to catch Counties insert action rows and run them
def runCountyAction: FDAUserTask[FDAROW] = row => { row match { case CountyActionRow(action) => //this is a county action row type
actionRunner.fda_execAction(action)(db_b) //run this query with db_b context
fda_skip case others@ _ => //otherwise pass alone to next user-defined-tasks
fda_next(others) } }
combinedStream.appendTask(processStates)
.appendTask(processCounties)
.appendTask(runStateAction)
.app