设为首页 加入收藏

TOP

Scalaz(52)- scalaz-stream: 并行运算-parallel processing concurrently by merging(二)
2017-10-10 12:12:23 】 浏览:10320
Tags:Scalaz scalaz-stream: 并行 运算 parallel processing concurrently merging
t; 1).run 3 //> plines : Int = 2823 4 println(s"continue reading $plines in ${System.currentTimeMillis - pl_start}ms") 5 //> continue reading 2823 in 10501ms

连续运算所需时间10501ms,稍微短于分开运算结果。那么如果我们用merge来并行运算呢?

1 val par_start = System.currentTimeMillis          //> par_start : Long = 1470051682874
2 val parlines = (pa merge pb merge pc).runFoldMap(_ => 1).run 3                                                   //> parlines : Int = 2823
4 println(s"parallel reading $parlines in ${System.currentTimeMillis - par_start}ms") 5                                                   //> parallel reading 2823 in 6278ms

现在整个运算只需要6278ms,约莫是连续运算所需时间的60%。当然,如果我们需要从更多的源头读取数据的话,那么merge方法可以实现更高的效率提升。但是,由于stream可能是一串无穷的元素,我们更需要对一个stream无穷的元素实现并行运算。在上面的例子里我们用merge把三个源头的数据合并成为一个更长的数据串,如果我们对其中每条记录进行运算如抽取、对比筛选等的话,那么运算时间仍然与数据串的长度成直线正比。比如:在以上例子的基础上,我们需要对合并的数据进行统计:计算出使用元音(vowl)的频率的。我们可以先把每条记录中的vowl过滤出来;然后把所有筛选出来的记录加起来就能得出这个统计结果了:

 1 /c 是个vowl  2 def vowls(c: Char): Boolean = List('A','E','I','O','U').contains(c)  3                                                   //> vowls: (c: Char)Boolean  4 
 5 //返回Map代表每个字符频率, 测试使用了scalaz.Lens
 6 def vowlCount(text: String): Map[Char,Int] = {  7     text.toUpperCase.toList.filter(vowls).foldLeft(Map[Char,Int]()) { (b,a) =>
 8       if ((Lens.mapVLens(a) get b) == None) Lens.mapVLens(a) set(b,1.some)  9       else Lens.mapVLens(a).set(b, (Lens.mapVLens(a) get b).map(_ + 1)) 10  } 11  }                                                //> vowlCount: (text: String)Map[Char,Int] 12 //直接用scala标准库实现
13 def stdVowlsCount(text: String): Map[Char,Int] =
14   text.toUpperCase.toList.filter(vowls).groupBy(s => s).mapValues(_.size) 15                                                   //> stdVowlsCount: (text: String)Map[Char,Int]

我们先按序运算结果:

 1 //为runFoldMap提供一个Map[Char,Int]Monoid实例
 2 implicit object mapMonoid extends Monoid[Map[Char,Int]] {  3    def zero: Map[Char,Int] = Map()  4    def append(m1: Map[Char,Int], m2: => Map[Char,Int]): Map[Char,Int] = {  5      (m1.keySet ++ m2.keySet).map { k =>
 6        (k, m1.getOrElse(k,0) + m2.getOrElse(k,0))  7  }.toMap  8  }  9 } 10 
11 val cnt_start = System.currentTimeMillis          //> cnt_start : Long = 1470197392016
12 val merged = (pa merge pb merge pc) 13  .map(vowlCount) 14   .runFoldMap(identity).run                       //> merged : Map[Char,Int] = Map(E -> 7330, U -> 1483, A -> 4531, I -> 4393, O-> 3748)
15 println(s"calc vowl frequency in ${System.currentTimeMillis - cnt_start}ms") 16                                                   //> calc vowl frequency in 28646ms

整个运算需要28646ms。实际上这些运算不会依赖每条记录的排列位置,那么如果能够实现并行运算的话可能会提高效率。scalaz-stream提供了merge.mergeN方法来支持对一顺数据流进行并行运算。merge.mergeN函数的款式如下:

/** * Merges non-deterministically processes that are output of the `source` process. * * Merging stops when all processes generated by source have stopped, and all source process stopped as well. * Merging will also stop when resulting process terminated. In that case the cleanup of all `source` * processes is run, followed by cleanup of resulting process. * * When one of the source processes fails the mergeN process will fail with that reason. * * Merging is non-deterministic, but is fair in sense that every process is consulted, once it has `A` ready. * That means processes that are `faster` provide it's `A` more often than slower processes. * * Internally mergeN keeps small buffer that reads ahead up to `n` values of `A` where `n` equals to number * of active source streams. That does not mean that every `source` process is consulted in this read-ahead * cache, it just tries to be as much fair as possible when processes provide their `A` on almost the same speed. * */ def mergeN[A]
首页 上一页 1 2 3 下一页 尾页 2/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇scala学习手记29 - 偏应用函数 下一篇Spark入门到精通--(第一节)Spar..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目