设为首页 加入收藏

TOP

Scalaz(52)- scalaz-stream: 并行运算-parallel processing concurrently by merging(三)
2017-10-10 12:12:23 】 浏览:10319
Tags:Scalaz scalaz-stream: 并行 运算 parallel processing concurrently merging
(source: Process[Task, Process[Task, A]])(
implicit S: Strategy): Process[Task, A] = scalaz.stream.nondeterminism.njoin(0, 0)(source)(S) /** * MergeN variant, that allows to specify maximum of open `source` processes. * If, the maxOpen is <= 0 it acts like standard mergeN, where the number of processes open is not limited. * However, when the maxOpen > 0, then at any time only `maxOpen` processes will be running at any time * * This allows for limiting the eventual concurrent processing of opened streams not only by supplied strategy, * but also by providing a `maxOpen` value. * * * @param maxOpen Max number of open (running) processes at a time * @param source source of processes to merge */ def mergeN[A](maxOpen: Int)(source: Process[Task, Process[Task, A]])(implicit S: Strategy): Process[Task, A] = scalaz.stream.nondeterminism.njoin(maxOpen, maxOpen)(source)(S)

mergeN的入参source类型款式是这样的:Process[Task,Process[Task,A]],意思是在Process里还有一个Process。这个内部Process是并行运算的。这样的类型款式也可以被理解为:内部的Process是读取数据库的记录(data),我们可以同时从多个源头读取数据,外部Process是数据库连接(connection)。应用在我们上面的例子里:内部Process就是vowlCount作业,因为我们希望对每条记录的vowlCount并行处理。那么我们先要进行类型款式转换:从Process[Task,A] 转换到 Process[Task,Process[Task,A]]:

1 val merged = (pa merge pb merge pc)               //> merged : scalaz.stream.Process[scalaz.concurrent.Task,String] = Append(Hal 2                                                   //| t(End),Vector(<function1>))
3 val par = merged.map {text => Task {vowlCount(text)} } 4           .map {task => Process.eva l(task)}       //> par : scalaz.stream.Process[scalaz.concurrent.Task,scalaz.stream.Process[scalaz.concurrent.Task,Map[Char,Int]]] = Append(Halt(End),Vector(<function1>))

这个par的类型是我们希望的了。现在我们可以看看mergeN运算的效率:

1 val cnt_start = System.currentTimeMillis          //> cnt_start : Long = 1470204623562
2 val merged = (pa merge pb merge pc)               //> merged : scalaz.stream.Process[scalaz.concurrent.Task,String] = Append(Halt(End),Vector(<function1>))
3 val par = merged.map {text => Task {vowlCount(text)} } 4           .map {task => Process.eva l(task)}       //> par : scalaz.stream.Process[scalaz.concurrent.Task,scalaz.stream.Process[scalaz.concurrent.Task,Map[Char,Int]]] = Append(Halt(End),Vector(<function1>))
5 val resm = merge.mergeN(par).runFoldMap(identity).run 6                                                   //> resm : Map[Char,Int] = Map(E -> 7330, U -> 1483, A -> 4531, I -> 4393, O -> 3748)
7 println(s"parallel calc vowl frequency in ${System.currentTimeMillis - cnt_start}ms") 8                                                   //> parallel calc vowl frequency in 6922ms

看看这个结果:从28646ms降到6922,约莫4倍效率的提高,够显著的了。如果我们把上面这个例子用在实际的数据库操作上:比如对几个数据库表里的所有在一定价格范围内商品购买次数进行统计等,我们是可以在scalaz-stream里实现这个场景并行运算的。

 

 

 

 

 

 

 

 

 

 

 

 

首页 上一页 1 2 3 下一页 尾页 3/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇scala学习手记29 - 偏应用函数 下一篇Spark入门到精通--(第一节)Spar..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目