设为首页 加入收藏

TOP

FunDA(2)- Streaming Data Operation:流式数据操作(一)
2017-10-10 12:10:44 】 浏览:5907
Tags:FunDA Streaming Data Operation 流式 数据 操作

   在上一集的讨论里我们介绍并实现了强类型返回结果行。使用强类型主要的目的是当我们把后端数据库SQL批次操作搬到内存里转变成数据流式按行操作时能更方便、准确、高效地选定数据字段。在上集讨论示范里我们用集合的foreach方式模拟了一个最简单的数据流,并把从数据库里批次读取的数据集转换成一串连续的数据行来逐行使用。一般来说完整的流式数据处理流程包括了从数据库中读取数据、根据读取的每行数据状态再对后台数据库进行更新,包括:插入新数据、更新、删除等。那么在上篇中实现的流式操作基础上再添加一种指令行类型就可以完善整个数据处理流程了,就像下面这个图示:

Database => Query -> Collection => Streaming -> DataRow => QueryAction(DataRow) -> ActionRow => execAction(ActionRow) -> Database 

如果我们还是以Slick为目标FRM,那么这个ActionRow的类型就是Slick的DBIO[T]了:

1 package com.bayakala.funda.rowtypes 2 import slick.dbio._ 3 object ActionType { 4   type FDAAction[T] = DBIO[T] 5 }

记得有一次在一个Scala讨论区里遇到这样一个问题:如何把a表里的status字段更新成b表的status字段值,转化成SQL语句如下:

 update a,b set a.status=b.status where a.id=b.id

那位哥们的问题是如何用Slick来实现对a表的更新,不能用sql"???" interpolation 直接调用SQL语句,可能因为要求compile time语法check保障吧。这个问题用Slick Query还真的不太容易解决(能不能解决就不想费功夫去想了),这是因为FRM的SQL批次处理弱点。如果用FunDA的流式操作思路就会很容易解决了,只要用join Query把b.status读出来再用b.id=a.id逐个更新a.status。刚好,下面我们就示范通过ActionRow来解决这个问题。先用下面这段代码来设置测试数据:

 1 import slick.dbio.DBIO  2 import slick.driver.H2Driver.api._  3 
 4 import scala.concurrent.duration._  5 import scala.concurrent.{Await, Future}  6 import scala.util.{Failure, Success}  7 import scala.concurrent.ExecutionContext.Implicits.global
 8 import slick.jdbc.meta.MTable  9 object ActionRowTest extends App { 10 
11   class ATable(tag: Tag) extends Table[(Int,String,Int)](tag,"TA") { 12     def id = column[Int]("id",O.PrimaryKey) 13     def flds = column[String]("aflds") 14     def status = column[Int]("status") 15     def * = (id,flds,status) 16  } 17   val tableA = TableQuery[ATable] 18 
19   class BTable(tag: Tag) extends Table[(Int,String,Int)](tag,"TB") { 20     def id = column[Int]("id",O.PrimaryKey) 21     def flds = column[String]("bflds") 22     def status = column[Int]("status") 23     def * = (id,flds,status) 24  } 25   val tableB = TableQuery[BTable] 26 
27   val insertAAction =
28     tableA ++= Seq ( 29         (1,"aaa",0), 30         (2,"bbb",3), 31         (3,"ccc",1), 32         (4,"ddd",0), 33         (16,"kkk",16) 34  ) 35    val insertBAction =
36      tableB ++= Seq ( 37        (1,"aaa",1), 38        (2,"bbb",2), 39        (3,"ccc",3), 40        (4,"ddd",4), 41        (5,"kkk",5) 42  ) 43 
44    val db = Database.forConfig("h2db") 45 
46 
47    def tableExists(tables: Vector[MTable], tblname: String) =
48     tables.exists {t =>t.name.toString.contains(tblname)} 49 
50    def createSchemaIfNotExists(): Future[Unit] = { 51  db.run(MTable.getTables).flatMap { 52       case tables if !tableExists(tables,".TA") && !tableExists(tables,".TB") =>
53         println("Creating schemas for TA and TB...") 54         db.run((tableA.schema ++ tableB.schema).create) 55       case tables if !tableExists(tables,".TA") =>
56         println("Creating schema for TA ...") 57  db.run(tableA.schema.create) 58       case tables if !tableExists(tables,".TB") =>
59         println("Creating schema for TB ...") 60  db.run(tableB.schema.create) 61       case _ =>
62         println("Schema for TA, TB already created.") 63  Future.successful() 64  } 65  } 66 
67    def insertInitialData(): Future[Unit] = { 68     val cleanInsert = DBIO.seq( 69  tableA.d
首页 上一页 1 2 3 4 下一页 尾页 1/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇FunDA(1)- Query Result Row:.. 下一篇scala练习题1 基础知识

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目