// Benchmark: read pa, pb and pc merged together and report how many elements came through and how
// long it took (wall-clock ms). runFoldMap(_ => 1) maps every element to 1 and folds, so the result
// is the element count (presumably lines, per the message). `.run` forces the fold so the elapsed
// time covers the whole read (assumed to be a blocking scalaz Task run — confirm).
val par_start = System.currentTimeMillis          //> par_start : Long = 1470051682874
val parlines = (pa merge pb merge pc).runFoldMap(_ => 1).run
                                                  //> parlines : Int = 2823
println(s"parallel reading $parlines in ${System.currentTimeMillis - par_start}ms")
                                                  //> parallel reading 2823 in 6278ms
/** Is `c` a vowel?
  *
  * Only the upper-case letters 'A', 'E', 'I', 'O' and 'U' match. Callers in this file upper-case
  * their text before filtering with this predicate.
  */
def vowls(c: Char): Boolean = "AEIOU".indexOf(c) >= 0
                                                  //> vowls: (c: Char)Boolean
/** Returns how often each vowel occurs in `text`, keyed by the upper-case vowel.
  *
  * Matching is effectively case-insensitive because the text is upper-cased first. `Locale.ROOT`
  * is used so the result does not depend on the JVM default locale (e.g. in a Turkish locale
  * 'i' would upper-case to 'İ' and be missed). This version deliberately uses scalaz.Lens.
  */
def vowlCount(text: String): Map[Char,Int] =
  text.toUpperCase(java.util.Locale.ROOT).filter(vowls).foldLeft(Map.empty[Char, Int]) { (counts, v) =>
    // mapVLens(v) focuses on the Option[Int] stored under key v: None until v is first seen.
    val entry = Lens.mapVLens[Char, Int](v)
    entry.set(counts, Some(entry.get(counts).fold(1)(_ + 1)))
  }                                               //> vowlCount: (text: String)Map[Char,Int]
// The same count implemented directly with the Scala standard library:
/** Returns how often each vowel occurs in `text`, using only the Scala standard library.
  *
  * The text is upper-cased with `Locale.ROOT` so the result does not depend on the JVM default
  * locale. The counts are built strictly with `map`, because `mapValues` returns a lazy view that
  * recomputes the size on every lookup.
  */
def stdVowlsCount(text: String): Map[Char,Int] =
  text.toUpperCase(java.util.Locale.ROOT).filter(vowls).groupBy(identity).map {
    case (v, occurrences) => v -> occurrences.length
  }                                               //> stdVowlsCount: (text: String)Map[Char,Int]
/** Monoid[Map[Char,Int]] instance so that runFoldMap can combine per-element count maps.
  *
  * `zero` is the empty map. `append` yields the union of both key sets, summing the counts of
  * keys present in both maps.
  */
implicit object mapMonoid extends Monoid[Map[Char,Int]] {
  def zero: Map[Char,Int] = Map()
  def append(m1: Map[Char,Int], m2: => Map[Char,Int]): Map[Char,Int] = {
    // m2 is a by-name parameter: bind it once so it is evaluated a single time instead of on
    // every reference (the previous version re-evaluated it once per key).
    val right = m2
    right.foldLeft(m1) { case (acc, (k, n)) => acc.updated(k, acc.getOrElse(k, 0) + n) }
  }
}
// Benchmark: count vowels across pa, pb and pc merged together. Each element is turned into its
// own vowel-count Map by vowlCount, and runFoldMap(identity) folds those Maps using the implicit
// Monoid[Map[Char,Int]] in scope (presumably mapMonoid). The elapsed wall-clock time is printed in ms.
val cnt_start = System.currentTimeMillis          //> cnt_start : Long = 1470197392016
val merged = (pa merge pb merge pc)
  .map(vowlCount)
  .runFoldMap(identity)
  .run                                            //> merged : Map[Char,Int] = Map(E -> 7330, U -> 1483, A -> 4531, I -> 4393, O-> 3748)
println(s"calc vowl frequency in ${System.currentTimeMillis - cnt_start}ms")
                                                  //> calc vowl frequency in 28646ms
/**
 * Non-deterministically merges the processes that are output by the `source` process.
 *
 * Merging stops when all processes generated by `source` have stopped and the `source` process has stopped as well.
 * Merging also stops when the resulting process terminates. In that case the cleanup of all `source`
 * processes is run, followed by the cleanup of the resulting process.
 *
 * When one of the source processes fails, the mergeN process fails with that reason.
 *
 * Merging is non-deterministic, but it is fair in the sense that every process is consulted once it has an `A` ready.
 * That means processes that are faster provide their `A` more often than slower processes.
 *
 * Internally, mergeN keeps a small buffer that reads ahead up to `n` values of `A`, where `n` equals the number
 * of active source streams. That does not mean that every `source` process is consulted in this read-ahead
 * cache; it just tries to be as fair as possible when processes provide their `A` at almost the same speed.
 */
def mergeN[A]