Scalaz (47) - scalaz-stream: A Deeper Look at Source (Part 3)
2017-10-10 12:12:38 | Views: 9715
Tags: Scalaz, scalaz-stream, Source
e[BigStringResult]
//> qJobResult : scalaz.stream.async.mutable.Queue[demo.ws.blogStream.BigStringResult] = scalaz.stream.async.mutable.Queue$$anon$1@25d250c6
def longGet(jobnum: Int): BigStringResult = {
  Thread.sleep(2000)
  s"Some large data sets from job#${jobnum}"
}                                                 //> longGet: (jobnum: Int)demo.ws.blogStream.BigStringResult

// multi-tasking
val start = System.currentTimeMillis()            //> start : Long = 1468556250797
Task.fork(qJobResult.enqueueOne(longGet(1))).unsafePerformAsync{case _ => ()}
Task.fork(qJobResult.enqueueOne(longGet(2))).unsafePerformAsync{case _ => ()}
Task.fork(qJobResult.enqueueOne(longGet(3))).unsafePerformAsync{case _ => ()}
val timemill = System.currentTimeMillis() - start //> timemill : Long = 17
Thread.sleep(3000)
qJobResult.close.run
val bigData = {
  //multi-tasking
  val j1 = qJobResult.dequeue
  val j2 = qJobResult.dequeue
  val j3 = qJobResult.dequeue
  for {
    r1 <- j1
    r2 <- j2
    r3 <- j3
  } yield r1 + "," + r2 + "," + r3
}                                                 //> bigData : scalaz.stream.Process[[x]scalaz.concurrent.Task[x],String] = Await(scalaz.concurrent.Task@778d1062,<function1>,<function1>)

bigData.runLog.run                                //> res9: Vector[String] = Vector(Some large data sets from job#2,Some large data sets from job#3,Some large data sets from job#1)
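
The listing above relies on `Thread.sleep(3000)` to give the three producers time to finish before the queue is closed. As a hedged alternative, the sketch below composes the producers into a single Task with `Task.gatherUnordered`, so the queue is closed only after every enqueue has completed. The names `QueueSketch` and `collectJobs` are hypothetical, `BigStringResult` is assumed to be an alias for `String`, and `unsafePerformSync` assumes scalaz 7.2 (on 7.1 the equivalent call is `run`).

```scala
import scalaz.concurrent.Task
import scalaz.stream.async

object QueueSketch {
  // Assumption: the article's BigStringResult is a plain String alias.
  type BigStringResult = String

  // Simulates a slow job, as in the listing above.
  def longGet(jobnum: Int): BigStringResult = {
    Thread.sleep(2000)
    s"Some large data sets from job#$jobnum"
  }

  // Hypothetical helper: run all jobs concurrently, then drain the queue.
  def collectJobs(jobs: Seq[Int]): Task[Vector[BigStringResult]] = {
    val q = async.unboundedQueue[BigStringResult]

    // Task.fork takes its argument by name, so longGet runs on a pool thread
    // only when the task itself is run.
    val producers: Seq[Task[Unit]] =
      jobs.map(n => Task.fork(q.enqueueOne(longGet(n))))

    for {
      _  <- Task.gatherUnordered(producers) // wait for every producer
      _  <- q.close                         // no further enqueues accepted
      rs <- q.dequeue.runLog                // drain what was enqueued
    } yield rs
  }

  def main(args: Array[String]): Unit =
    println(collectJobs(1 to 3).unsafePerformSync)
}
```

The order of the results depends on which job finishes first, so it should not be relied upon.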

Now let's look at a Topic example:

import scala.concurrent._
import scala.concurrent.duration._
import scalaz.stream.async.mutable._
import scala.concurrent.ExecutionContext.Implicits.global
val sharedData: Topic[BigStringResult] = async.topic()
     //> sharedData : scalaz.stream.async.mutable.Topic[demo.ws.blogStream.BigStringResult] = scalaz.stream.async.package$$anon$1@797badd3
val subscriber = sharedData.subscribe.runLog    //> subscriber : scalaz.concurrent.Task[Vector[demo.ws.blogStream.BigStringResult]] = scalaz.concurrent.Task@226a82c4
val otherThread = future {
  subscriber.run // Added this here - now subscriber is really attached to the topic
}                //> otherThread : scala.concurrent.Future[Vector[demo.ws.blogStream.BigStringResult]] = List()
// Need to give subscriber some time to start up.
// I doubt you'd do this in actual code.

// topics seem more useful for hooking up things like
// sensors that produce a continual stream of data,
// and where individual values can be dropped on
// floor.
Thread.sleep(100)

sharedData.publishOne(longGet(1)).run // don't just call publishOne; need to run the resulting task
sharedData.close.run // Don't just call close; need to run the resulting task

// Need to wait for the output
val result = Await.result(otherThread, Duration.Inf)
     //> result : Vector[demo.ws.blogStream.BigStringResult] = Vector(Some large data sets from job#1)
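
To make the contrast with Queue more concrete, here is a minimal sketch with two subscribers attached to one Topic. Each element of a Queue is delivered to a single consumer, whereas a Topic broadcasts every published value to all subscribers attached at that moment. The names `TopicSketch`, `start` and `broadcast` are hypothetical helpers, `unsafePerformSync` assumes scalaz 7.2, and the sketch keeps the same crude `Thread.sleep` wait for subscribers as the example above, so it is timing-dependent and not something to copy into production code.

```scala
import scala.concurrent.{Await, Promise, Future => SFuture}
import scala.concurrent.duration._
import scalaz.{-\/, \/-}
import scalaz.concurrent.Task
import scalaz.stream.async

object TopicSketch {
  // Hypothetical helper: start a Task in the background and expose its
  // outcome as a standard-library Future.
  def start[A](t: Task[A]): SFuture[A] = {
    val p = Promise[A]()
    Task.fork(t).unsafePerformAsync {
      case -\/(e) => p.failure(e); ()
      case \/-(a) => p.success(a); ()
    }
    p.future
  }

  // Hypothetical helper: publish the messages to two subscribers and
  // return what each of them collected.
  def broadcast(messages: Seq[String]): (Vector[String], Vector[String]) = {
    val topic = async.topic[String]()
    val sub1 = start(topic.subscribe.runLog)
    val sub2 = start(topic.subscribe.runLog)

    // Crude wait so both subscribers are attached before publishing;
    // values published earlier would simply be missed.
    Thread.sleep(500)

    messages.foreach(m => topic.publishOne(m).unsafePerformSync)
    topic.close.unsafePerformSync // ends both subscriptions

    (Await.result(sub1, 10.seconds), Await.result(sub2, 10.seconds))
  }

  def main(args: Array[String]): Unit =
    println(broadcast(Seq("a", "b", "c")))
}
```

Provided both subscribers attach before the first `publishOne`, each should collect the full sequence of messages; a subscriber that attaches late only sees what is published afterwards.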

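The above explains and demonstrates the various ways of producing a Source that may carry side effects. The other node types in scalaz-stream will be covered in depth in later discussions.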
