设为首页 加入收藏

TOP

FunDA(15)- 示范:任务并行运算 - user task parallel execution(一)
2017-10-09 14:26:39 】 浏览:10563
Tags:FunDA 示范 任务 并行 运算 user task parallel execution

    FunDA的并行运算施用就是对用户自定义函数的并行运算。原理上就是把一个输入流截分成多个输入流并行地输入到一个自定义函数的多个运行实例。这些函数运行实例同时在各自不同的线程里同步运算直至耗尽所有输入。并行运算的具体函数实例数是用fs2-nondeterminism的算法根据CPU内核数、线程池配置和用户指定的最大运算实例数来决定的。我们在这次示范里可以对比一下同样工作内容的并行运算和串形运算效率。在前面示范里我们获取了一个AQMRPT表。但这个表不够合理化(normalized):state和county还没有实现编码与STATES和COUNTIES表的连接。在这次示范里我们就创建一个新表NORMAQM,把AQMRPT表内数据都搬进来。并在这个过程中把STATENAME和COUNTYNAME字段转换成STATES和COUNTIES表的id字段。下面就是NORMAQM表结构:

  case class NORMAQMModel(rid: Long , mid: Int , state: Int , county: Int , year: Int , value: Int , average: Int ) extends FDAROW class NORMAQMTable(tag: Tag) extends Table[NORMAQMModel](tag, "NORMAQM") { def rid = column[Long]("ROWID",O.AutoInc,O.PrimaryKey) def mid = column[Int]("MEASUREID") def state = column[Int]("STATID") def county = column[Int]("COUNTYID") def year = column[Int]("REPORTYEAR") def value = column[Int]("VALUE") def average = column[Int]("AVG") def * = (rid,mid,state,county,year,value,average) <> (NORMAQMModel.tupled, NORMAQMModel.unapply) } val NORMAQMQuery = TableQuery[NORMAQMTable]

下面是这个表的初始化铺垫代码: 

  val db = Database.forConfig("h2db") //drop original table schema
  val futVectorTables = db.run(MTable.getTables) val futDropTable = futVectorTables.flatMap{ tables => { val tableNames = tables.map(t => t.name.name) if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName)) db.run(NORMAQMQuery.schema.drop) else Future() } }.andThen { case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} dropped successfully! ") case Failure(e) => println(s"Failed to drop Table ${NORMAQMQuery.baseTableRow.tableName}, it may not exist! Error: ${e.getMessage}") } Await.ready(futDropTable,Duration.Inf) //create new table to refine AQMRawTable
  val actionCreateTable = Models.NORMAQMQuery.schema.create val futCreateTable = db.run(actionCreateTable).andThen { case Success(_) => println("Table created successfully!") case Failure(e) => println(s"Table may exist already! Error: ${e.getMessage}") } //would carry on even fail to create table
 Await.ready(futCreateTable,Duration.Inf) //truncate data, only available in slick 3.2.1
  val futTruncateTable = futVectorTables.flatMap{ tables => { val tableNames = tables.map(t => t.name.name) if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName)) db.run(NORMAQMQuery.schema.truncate) else Future() } }.andThen { case Success(_) => println(s"Table ${NORMAQMQuery.baseTableRow.tableName} truncated successfully!") case Failure(e) => println(s"Failed to truncate Table ${NORMAQMQuery.baseTableRow.tableName}! Error: ${e.getMessage}") } Await.ready(futDropTable,Duration.Inf)

我们需要设计一个函数从STATES表里用AQMRPT表的STATENAME查询ID。我故意把这个函数设计成一个完整的FunDA程序。这样可以模拟一个比较消耗io和计算资源的独立过程(不要理会任何合理性,目标是增加io和运算消耗): 

  //a conceived task for the purpose of resource consumption //getting id with corresponding name from STATES table
  def getStateID(state: String): Int = { //create a stream for state id with state name
    implicit def toState(row:  StateTable#TableElementType) = StateModel(row.id,row.name) val stateLoader = FDAViewLoader(slick.jdbc.H2Profile)(toState _) val stateSeq = stateLoa
首页 上一页 1 2 3 4 下一页 尾页 1/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇FunDA(12)- 示范:强类型数据.. 下一篇FunDA(16)- 示范:整合并行运..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目