设为首页 加入收藏

TOP

FunDA(15)- 示范:任务并行运算 - user task parallel execution(三)
2017-10-09 14:26:39 】 浏览:10575
Tags:FunDA 示范 任务 并行 运算 user task parallel execution
00 rows in parallel in 614 seconds
//processing 100000 rows in a single thread in 2+ hrs
//processing 100000 rows in parallel in 3885 seconds

可以看出,数据集越大,并行运算带来的效率提升就越明显。下面是这次示范的源代码:

import slick.jdbc.meta._ import com.bayakala.funda._ import api._ import scala.language.implicitConversions import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.concurrent.{Await, Future} import scala.util.{Failure, Success} import slick.jdbc.H2Profile.api._ import Models._ import fs2.Strategy object ParallelTasks extends App { val db = Database.forConfig("h2db") //drop original table schema
  // Table metadata fetched once from the database; reused by later steps.
  val futVectorTables = db.run(MTable.getTables)

  // Drop the target table when the fetched metadata lists it; otherwise there
  // is nothing to do and an already-completed Future is returned.
  val futDropTable = futVectorTables.flatMap { tables =>
    val tableNames = tables.map(t => t.name.name)
    if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
      db.run(NORMAQMQuery.schema.drop)
    else
      // Explicit completed Future[Unit]: `Future()` only compiled through the
      // deprecated insertion of `()` into an empty argument list.
      Future.successful(())
  }.andThen {
    case Success(_) =>
      println(s"Table ${NORMAQMQuery.baseTableRow.tableName} dropped successfully! ")
    case Failure(e) =>
      println(s"Failed to drop Table ${NORMAQMQuery.baseTableRow.tableName}, it may not exist! Error: ${e.getMessage}")
  }
  // Block until the drop attempt has completed (Await.ready does not rethrow a failure).
  Await.ready(futDropTable, Duration.Inf)

  //create new table to refine AQMRawTable
  // Schema-creation action for the table described by NORMAQMQuery.
  val actionCreateTable = Models.NORMAQMQuery.schema.create

  // Run the creation and report the outcome on the console.
  val futCreateTable = db.run(actionCreateTable).andThen {
    case Success(_) =>
      println("Table created successfully!")
    case Failure(e) =>
      println(s"Table may exist already! Error: ${e.getMessage}")
  }
  // Carry on even if the table could not be created: Await.ready only waits
  // for completion and does not rethrow the failure.
  Await.ready(futCreateTable, Duration.Inf)

  //truncate data, only available in slick 3.2.1
  // Truncate the target table when the previously fetched table metadata lists it.
  // NOTE(review): futVectorTables was requested before the drop/create steps, so
  // this check reflects the table list from that earlier point — confirm this is intended.
  val futTruncateTable = futVectorTables.flatMap { tables =>
    val tableNames = tables.map(t => t.name.name)
    if (tableNames.contains(NORMAQMQuery.baseTableRow.tableName))
      db.run(NORMAQMQuery.schema.truncate)
    else
      // Explicit completed Future[Unit]: `Future()` only compiled through the
      // deprecated insertion of `()` into an empty argument list.
      Future.successful(())
  }.andThen {
    case Success(_) =>
      println(s"Table ${NORMAQMQuery.baseTableRow.tableName} truncated successfully!")
    case Failure(e) =>
      println(s"Failed to truncate Table ${NORMAQMQuery.baseTableRow.tableName}! Error: ${e.getMessage}")
  }
  // Wait for the truncate itself. The previous code awaited futDropTable here,
  // which had already completed, so the truncate was never waited on.
  Await.ready(futTruncateTable, Duration.Inf)

  //a conceived task for the purpose of resource consumption
  //getting id with corresponding name from STATES table
  def getStateID(state: String): Int = {
    // Converter handed to the view loader: turns a StateTable row into a StateModel.
    implicit def toState(row: StateTable#TableElementType) = StateModel(row.id, row.name)

    // Load the rows of StateQuery as typed rows and wrap them in a static source stream.
    val loader = FDAViewLoader(slick.jdbc.H2Profile)(toState _)
    val stateRows = loader.fda_typedRows(StateQuery.result)(db).toSeq
    val source = fda_staticSource(stateRows)()

    // Written by the user task below; remains -1 when no row matches.
    var foundId = -1

    // User task: on the first StateModel whose name contains `state`, record its
    // id and break out of the stream; every other row is skipped.
    def findId: FDAUserTask[FDAROW] = row =>
      row match {
        case StateModel(stid, stname) if stname.contains(state) =>
          foundId = stid
          fda_break
        case _ =>
          fda_skip
      }

    source.appendTask(findId).startRun
    foundId
  }
  //another conceived task for the purpose of resource consumption
  //getting id with corresponding names from COUNTIES table
  def getCountyID(state: String, county: String): Int = { //create a stream for county id with state name and county name
    implicit def toCounty(row:  CountyTable#TableElementType) = CountyModel(row.id,row.name) val countyLoader =
首页 上一页 1 2 3 4 下一页 尾页 3/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇FunDA(12)- 示范:强类型数据.. 下一篇FunDA(16)- 示范:整合并行运..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目