设为首页 加入收藏

TOP

FunDA(2)- Streaming Data Operation:流式数据操作(二)
2017-10-10 12:10:44 】 浏览:5918
Tags:FunDA Streaming Data Operation 流式 数据 操作
elete, tableB.delete,
70 insertAAction, 71 insertBAction) 72 db.run(cleanInsert).andThen { 73 case Success(_) => println("Data insert completed.") 74 case Failure(e) => println(s"Data insert failed [${e.getMessage}]") 75 } 76 } 77 78 Await.ready(db.run(sql"DROP TABLE TA; DROP TABLE TB".as[String]),Duration.Inf) 79 80 val initResult = createSchemaIfNotExists().flatMap {_ => insertInitialData()} 81 Await.ready(initResult,Duration.Inf) 82 83 84 85 86 }

用join query先把这两个表相关的字段值搬到内存转成强类型行FDADataRow: 

 1 val selectAB = for {  2      a <- tableA  3      b <- tableB  4      if (a.id === b.id)  5    } yield (a.id,b.id,a.status,b.status)  6 
 7    case class ABRow (id: Int, asts: Int, bsts: Int)  8    def toABRow(raw: (Int,Int,Int,Int)) = ABRow(raw._1,raw._3,raw._4)  9 
10  import com.bayakala.funda.rowtypes.DataRowType 11   
12    val loader = FDADataRow(slick.driver.H2Driver, toABRow _) 13    loader.getTypedRows(selectAB.result)(db).foreach {dataRow =>
14      println(s"ID:${dataRow.id} Status A = ${dataRow.asts}, B = ${dataRow.bsts}") 15    }

初始结果如下:

ID:1 Status A = 0, B = 1 ID:2 Status A = 3, B = 2 ID:3 Status A = 1, B = 3 ID:4 Status A = 0, B = 4

现在我们把每条数据行DataRow转成动作行ActionRow。然后把每条DataRow的asts字段值替换成bsts的字段值:

 1 import com.bayakala.funda.rowtypes.ActionType.FDAAction  2    def updateAStatus(row: ABRow): FDAAction[Int] = {  3      tableA.filter{r => r.id === row.id}  4  .map(_.status)  5  .update(row.asts)  6  }  7 
 8 
 9    loader.getTypedRows(selectAB.result)(db).map(updateAStatus(_)).foreach { 10      actionRow =>
11        println(s"${actionRow.toString}") 12    }

显示结果如下:

slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@492691d7
slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@27216cd
slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@558bdf1f
slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@8576fa0

现在每条DataRow已经被转化成jdbc action类型了。

下一步我们只需要运行这些ActionRow就可以完成任务了:

1   def execAction(act: FDAAction[Int]) = db.run(act) 2   
3  loader.getTypedRows(selectAB.result)(db) 4  .map(updateAStatus(_)) 5        .map(execAction(_))

现在再看看数据库中的TA表状态:

  loader.getTypedRows(selectAB.result)(db).foreach {dataRow => println(s"ID:${dataRow.id} Status A = ${dataRow.asts}, B = ${dataRow.bsts}") } 结果: ID:1 Status A = 1, B = 1 ID:2 Status A = 2, B = 2 ID:3 Status A = 3, B = 3 ID:4 Status A = 4, B = 4

我们看到已经正确更新了TA的status字段值。

在这个示范中明显有很多不足之处:如果a.status=b.status应该省略更新步骤。这是因为foreach只能模拟最基本的数据流动。如果我们使用了具备强大功能的Stream工具库如scalaz-stream-fs2,就可以更好控制数据元素的流动。更重要的是scalaz-stream-fs2支持并行运算,那么上面所描述的流程:

Database => Query -> Collection => Streaming -> DataRow => QueryAction(DataRow) -> ActionRow => execAction(ActionRow) -> Database

几个 => 环节:Query、Streaming、QueryAction、execAction将可以并行运算,从而实现充分利用多核CPU硬件资源,提高运算效率的目的。

下面是这次讨论涉及的源代码:

 1 package com.bayakala.funda.rowtypes  2 
 3 import scala.concurrent.duration._  4 import scala.concurrent.Await  5 import slick.driver.JdbcProfile  6 
 7 object DataRowType {  8   class FDADataRow[SOURCE, TARGET](slickProfile: JdbcProfile,convert: SOURCE  => TARGET){  9  import slickProfile.api._ 10 
11     def getTypedRows(slickAction: DBIO[Iterable[SOURCE]])(slickDB: Database): Iterable[TARGET] =
12       Await.result(slickDB.run(slick
首页 上一页 1 2 3 4 下一页 尾页 2/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇FunDA(1)- Query Result Row:.. 下一篇scala练习题1 基础知识

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目