设为首页 加入收藏

TOP

FunDA(16)- 示范:整合并行运算 - total parallelism solution(四)
2017-10-09 14:23:36 浏览:5207
Tags:FunDA 示范 整合 并行 运算 total parallelism solution
t api._ import scala.language.implicitConversions import scala.concurrent.ExecutionContext.Implicits.
global import scala.concurrent.duration._ import scala.concurrent.{Await, Future} import scala.util.{Failure, Success} import slick.jdbc.H2Profile.api._ import Models._ import fs2.Strategy object ParallelExecution extends App { val db = Database.forConfig("h2db") //drop original table schema val futVectorTables = db.run(MTable.getTables) val futDropTable = futVectorTables.flatMap{ tables => { val tableNames = tables.map(t => t.name.name) if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName)) db.run(NORMAQMQuery.schema.drop) else Future() } }.andThen { case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} dropped successfully! ") case Failure(e) => println(s"Failed to drop Table ${NORMAQMQuery.baseTableRow.tableName}, it may not exist! Error: ${e.getMessage}") } Await.ready(futDropTable,Duration.Inf) //create new table to refine AQMRawTable val actionCreateTable = Models.NORMAQMQuery.schema.create val futCreateTable = db.run(actionCreateTable).andThen { case Success(_) => println("Table created successfully!") case Failure(e) => println(s"Table may exist already! Error: ${e.getMessage}") } //would carry on even fail to create table Await.ready(futCreateTable,Duration.Inf) //truncate data, only available in slick 3.2.1 val futTruncateTable = futVectorTables.flatMap{ tables => { val tableNames = tables.map(t => t.name.name) if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName)) db.run(NORMAQMQuery.schema.truncate) else Future() } }.andThen { case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} truncated successfully!") case Failure(e) => println(s"Failed to truncate Table ${NORMAQMQuery.baseTableRow.tableName}! 
Error: ${e.getMessage}") } Await.ready(futDropTable,Duration.Inf) //a conceived task for the purpose of resource consumption //getting id with corresponding name from STATES table def getStateID(state: String): Int = { //create a stream for state id with state name implicit def toState(row: StateTable#TableElementType) = StateModel(row.id,row.name) val stateLoader = FDAViewLoader(slick.jdbc.H2Profile)(toState _) val stateSeq = stateLoader.fda_typedRows(StateQuery.result)(db).toSeq //constructed a Stream[Task,String] val stateStream = fda_staticSource(stateSeq)() var id = -1 def getid: FDAUserTask[FDAROW] = row => { row match { case StateModel(stid,stname) => //target row type if (stname.contains(state)) { id = stid fda_break //exit } else fda_skip //take next row case _ => fda_skip } } stateStream.appendTask(getid).startRun id } //another conceived task for the purpose of resource consumption //getting id with corresponding names from COUNTIES table def getCountyID(state: String, county: String): Int = { //create a stream for county id with state name and county name implicit def toCounty(row: CountyTable#TableElementType) = CountyModel(row.id,row.name) val countyLoader = FDAViewLoader(slick.jdbc.H2Profile)(toCounty _) val countySeq = countyLoader.fda_typedRows(CountyQuery.result)(db).toSeq //constructed a Stream[Task,String] val countyStream = fda_staticSource(countySeq)() var id = -1 def get
首页 上一页 1 2 3 4 下一页 尾页 4/4/4
【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部】
上一篇FunDA(15)- 示范:任务并行运.. 下一篇FunDA(17)- 示范:异常处理与..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目