如果scalaz-stream真的是一个实用的数据流编程工具库的话,那它应该能处理同时从多个数据源获取数据以及把数据同时送到多个终点(Sink),最重要的是它应该可以实现高度灵活的多线程运算。但是:我们说Process代表了一串可能是无穷的元素。这个一串的意思是多个按序排列的元素。也就是说如果我们有一个Process(a,b,c),那么我们只能按顺序来进行运算:我们只能在完成了对a的运算后才能运算b。这样也说得过去:它让我们更容易理解scalaz-stream Process的运算过程。面对scalaz-stream这样的特性我们应该怎样去实现它的并行运算呢?实际上在很多应用场景中我们对运算结果的排列顺序并不关心,我们只对运算结果内容感兴趣。如:从数据库库存表中查询商品价格大于100的所有商品,这时我们对读出商品记录的顺序并不关心,我们只对每条记录的价格感兴趣。如果我们从很多源头(数据表)读取商品信息的话,可以同时对这些源头进行并行读取。scalaz-stream是通过merge来实现并行运算的。merge可以同时读取多个数据源然后产生一个合并的数据流。由于各个源头的滞后情况有所不同,所以merge产生结果的顺序是不可预测的(nondeterministic)。我们用个例子来示范有那些方法可以同时从三个文件中逐行读取文字然后再合并成一个多行文件:
1 al p1 = io linesR s"/Users/Tiger/Process.scala"
2 //> p1 : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@7494e528,<function1>,<function1>)
3 val p2 = io linesR s"/Users/Tiger/Wye.scala"
4 //> p2 : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@1f554b06,<function1>,<function1>)
5 val p3 = io linesR s"/Users/Tiger/Tee.scala"
6 //> p3 : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@694e1548,<function1>,<function1>)
p1,p2,p3是三个Source。它们分别从Process.scala, Wye.scala, Tee.scala中读取数据。我们可以模拟读取数据时可能遇到的延迟:
1 //假定读取数据造成不确定延迟
2 def readDelay(i: Int) = Thread.sleep( i/10 ) //> readDelay: (i: Int)Unit
3 val pa = p1.map{ s => readDelay(s.length); s} //> pa : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@7494e528,<function1>,<function1>)
4 val pb = p2.map{ s => readDelay(s.length); s} //> pb : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@1f554b06,<function1>,<function1>)
5 val pc = p3.map{ s => readDelay(s.length); s} //> pc : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@694e1548,<function1>,<function1>)
现在pa,pb,pc都按照所读文件中每行文字长度来产生滞延。下面我们先统计一下每个Process运算所需要的时间:
1 val pa_start = System.currentTimeMillis //> pa_start : Long = 1470051661503
2 val palines= pa.runFoldMap(_ => 1).run //> palines : Int = 1616
3 println(s"reading p1 $palines lines in ${System.currentTimeMillis - pa_start}ms") 4 //> reading p1 1616 lines in 6413ms
5 val pb_start = System.currentTimeMillis //> pb_start : Long = 1470051667917
6 val pblines=pb.runFoldMap(_ => 1).run //> pblines : Int = 901
7 println(s"reading p2 $pblines lines in ${System.currentTimeMillis - pb_start}ms") 8 //> reading p2 901 lines in 3275ms
9 val pc_start = System.currentTimeMillis //> pc_start : Long = 1470051671192
10 val pclines=pc.runFoldMap(_ => 1).run //> pclines : Int = 306
11 println(s"reading p3 $pclines lines in ${System.currentTimeMillis - pc_start}ms") 12 //> reading p3 306 lines in 1181ms
13 println(s"reading all ${palines+pblines+pclines} lines in ${System.currentTimeMillis - pa_start}ms") 14 //> reading all 2823 lines in 10870ms
三个文件总共有2823行,读取时间为10870ms。我们用append方式来连续运算:
1 val pl_start = System.currentTimeMillis //> pl_start : Long = 1470051672373
2 val plines = (pa ++ pb ++ pc).runFoldMap(_ =&g