// Another contrived task, added purely for the purpose of consuming resources:
// get the id that corresponds to the given names from the COUNTIES table.
/** Looks up a county id by scanning the rows produced by `CountyQuery`.
  *
  * Every call reloads the rows through `CountyQuery.result` and walks them
  * with a user task; the walk is ended with `fda_break` on the first row whose
  * name contains both `state` and `county`.
  *
  * @param state  state name that must occur in the row's name
  * @param county county name that must occur in the row's name
  * @return the id of the first matching row, or -1 when no row matches
  */
def getCountyID(state: String, county: String): Int = {
  // Converts a raw table row into the CountyModel used by the user task below.
  implicit def toCounty(row: CountyTable#TableElementType) = CountyModel(row.id, row.name)

  val countyLoader = FDAViewLoader(slick.jdbc.H2Profile)(toCounty _)
  // Materialize the query result as a sequence, then wrap it as a source.
  val countyRows = countyLoader.fda_typedRows(CountyQuery.result)(db).toSeq
  val countySource = fda_staticSource(countyRows)()

  // Written by the user task when a match is found; -1 means "not found".
  var foundId = -1

  val findCounty: FDAUserTask[FDAROW] = {
    // Target row type whose name mentions both the state and the county.
    case CountyModel(cid, cname) if cname.contains(state) && cname.contains(county) =>
      foundId = cid
      fda_break // exit
    // Non-matching county rows and rows of any other type: take next row.
    case _ => fda_skip
  }

  countySource.appendTask(findCounty).startRun
  foundId
}
// Listing of the original (source) table.
// Converts a raw AQMRPT table row into an AQMRPTModel, field by field.
implicit def toAQMRPT(r: AQMRPTTable#TableElementType) =
  AQMRPTModel(
    r.rid,
    r.mid,
    r.state,
    r.county,
    r.year,
    r.value,
    r.total,
    r.valid
  )

// Loader that applies the converter above to every row it reads.
val AQMRPTLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)

// Typed stream over the result of AQMRPTQuery.
// NOTE(review): the two 256 arguments are presumably fetch/buffer sizes —
// confirm against the fda_typedStream signature.
val AQMRPTStream =
  AQMRPTLoader.fda_typedStream(AQMRPTQuery.result)(db)(256, 256)()
/** User task that turns a valid AQMRPT row into an insert action.
  *
  * For an `AQMRPTModel` with `valid` set, the state and county ids are
  * resolved via `getStateID` / `getCountyID`, and an insert into
  * `NORMAQMQuery` is passed downstream wrapped in an `FDAActionRow`.
  * Invalid rows and rows of any other type are skipped.
  */
def getIdsThenInsertAction: FDAUserTask[FDAROW] = {
  case aqm: AQMRPTModel if aqm.valid =>
    val stateId = getStateID(aqm.state)
    val countyId = getCountyID(aqm.state, aqm.county)
    val normalized =
      NORMAQMModel(0, aqm.mid, stateId, countyId, aqm.year, aqm.value, aqm.total)
    val action = NORMAQMQuery += normalized
    fda_next(FDAActionRow(action))
  // Invalid AQMRPT rows and every other row type produce nothing.
  case _ => fda_skip
}

// Executes the actions carried by FDAActionRow rows.
val runner = FDAActionRunner(slick.jdbc.H2Profile)

/** User task that executes the action carried by an `FDAActionRow` against
  * `db` and emits nothing downstream; all other rows are skipped.
  */
def runInsertAction: FDAUserTask[FDAROW] = {
  case FDAActionRow(action) =>
    runner.fda_execAction(action)(db)
    fda_skip
  case _ => fda_skip
}
// Sequential pipeline: take 10000 rows, build the insert actions, run them.
AQMRPTStream
  .take(10000)
  .appendTask(getIdsThenInsertAction)
  .appendTask(runInsertAction)
  .startRun

// Parallel form of the same user task. The value of this expression is not
// bound or run here; the actual parallel run follows below.
AQMRPTStream.toPar(getIdsThenInsertAction)

// Strategy that must be in implicit scope for the parallel run below.
implicit val strategy = Strategy.fromCachedDaemonPool("cachedPool")

// Parallel pipeline over 100000 rows with at most 8 open computations;
// the resulting insert actions are then executed by runInsertAction.
fda_runPar(AQMRPTStream.take(100000).toPar(getIdsThenInsertAction))(8)
  .appendTask(runInsertAction)
  .startRun
// Measured timings:
//   processing 10000 rows in a single thread: 570 seconds
//   processing 10000 rows in parallel: 316 seconds
//   processing 20000 rows in a single thread: 1090 seconds
//processing 200