设为首页 加入收藏

TOP

FunDA(11)- 数据库操作的并行运算:Parallel data processing(一)
2017-10-09 14:26:41 】 浏览:3086
Tags:FunDA 数据库 操作 并行 运算 Parallel data processing

   FunDA最重要的设计目标之一就是能够实现数据库操作的并行运算。我们先重温一下fs2是如何实现并行运算的。我们用interleave、merge、either这几种方式来同时处理两个Stream里的元素。interleave保留了固定的交叉排列顺序,而merge和either则会产生不特定顺序,这个现象可以从下面的例子里看到:

implicit val strategy = Strategy.fromFixedDaemonPool(4) implicit val scheduler = Scheduler.fromFixedDaemonPool(2) //当前元素跟踪显示
def log[A](pre: String): Pipe[Task,A,A] = _.eva lMap { row => Task.delay {println(s"${pre}>${row}");row} } def randomDelay[A](max: FiniteDuration): Pipe[Task,A,A] = _.eva lMap { a => { val delay: Task[Int] = Task.delay {scala.util.Random.nextInt(max.toMillis.toInt)} delay.flatMap {d => Task.now(a).schedule(d.millis)} } } val s1: Stream[Task,Int] = Stream(1,2,3,4,5).through(randomDelay(100.millis)) val s2 = Stream(11,22,33,44,55,66).through(randomDelay(30.millis)) val s3: Stream[Task,String] = Stream("a","b","c").through(randomDelay(200.millis)) (s1 interleave s2).through(log("")).run.unsafeRun //> >1 //| >11 //| >2 //| >22 //| >3 //| >33 //| >4 //| >44 //| >5 //| >55
 (s1 merge s2).through(log("")).run.unsafeRun      //> >11 //| >1 //| >22 //| >2 //| >33 //| >44 //| >3 //| >55 //| >4 //| >5 //| >66
(s1 either s3).through(log("")).run.unsafeRun     //> >Left(1) //| >Left(2) //| >Right(a) //| >Right(b) //| >Left(3) //| >Left(4) //| >Left(5) //| >Right(c)

从上面的例子我们可以看到merge产生的不规则顺序。fs2的nondeterministic算法可以保证两个队列元素处理顺序的合理分配最大化。如果我们需要对两个以上数据流进行并行处理的话,fs2提供了join(mergeN)函数:

def join[F[_],O](maxOpen: Int)(outer: Stream[F,Stream[F,O]])(implicit F: Async[F]): Stream[F,O] = {...}

从这个函数的款式我们看到它的入参数outer是个Stream[F,Stream[F,O]]类型,是个内外两层的流。现实场景如外层是多个数据库连接(connections),内层是多个客户端(clients)。在FunDA的功能描述里外层是多个数据源(sources),内层是多个读取函数(reader),又或者外层是多个数据行(元素),内层是数据处理函数。我们先看看如何实现多个数据源的并行产生:

val ss: Stream[Task,Stream[Task,Int]] = Stream(s1,s2,s1,s2) //> ss : fs2.Stream[fs2.Task,fs2.Stream[fs2.Task,Int]] = Segment(Emit(Chunk(Seg

从ss的类型款式来看,我们可以直接用Stream构建器来生成这个Stream[Task,Stream[Task,A]]类型。在前面我们已经掌握了用Slick来产生Stream[Task,FDAROW]的方法,例如:

  val albumStream1 = streamLoader.fda_typedStream(albumsInfo.result)(db)(10.minutes, 512, 128)()()

albumStream1是个Reactive-Stream数据源。这样我们可以在FunDA里增加一个并行Source构建函数:

  def fda_par_load(sources: FDAPipeLine[FDAROW]*)(maxOpen: Int) = { concurrent.join(maxOpen)(Stream(sources: _*)) }

maxOpen代表最多可以同时运行的运算数,最好取小于机器内核数的一个数。用这个函数来并行构建数据源:

package com.bayakala.funda.fdapars.examples
import slick.driver.H2Driver.api._
import com.bayakala.funda.samples._
import com.bayakala.funda.fdarows.FDAROW
import com.bayakala.funda.fdasources.FDADataStream._
import scala.concurrent.duration._
import com.bayakala.funda.fdapipes._
import FDAValves._
import com.bayakala.funda.fdapars.FDAPars._
object Example1 extends App {
  val albums = SlickModels.albums
  val companies = SlickModels.companies

  //数据源query
  val albumsInfo = for {
    (a,c) <- albums join companies on (_.company === _.id)
  } yield (a.title,a.artist,a.year,c.name)

  //query结果强类型(用户提供)
  case class Album(title: String, artist: String, year: Int, publisher: String) extends FDAROW
  //强类型转换函数(用户提供)
  def toTypedRow(row: (String, String, Option[Int], S
首页 上一页 1 2 3 4 5 下一页 尾页 1/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇FunDA(10)- 用户功能函数模式.. 下一篇FunDA(14)- 示范:并行运算,..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目