Scalaz (47) - scalaz-stream: A Deeper Look at Source (4)
2017-10-10 12:12:38
yncHttpClient : com.ning.http.client.AsyncHttpClient = com.ning.http.client.AsyncHttpClient@245b4bdc
  def get(s: String): Task[Response] = Task.async[Response] { callback =>
    asyncHttpClient.prepareGet(s).execute(
      new AsyncCompletionHandler[Unit] {
        // success: pass the response to the Task callback as a right value
        def onCompleted(r: Response): Unit = callback(r.right)

        // failure: pass the exception to the Task callback as a left value
        // (note: the failure hook declared by AsyncCompletionHandler is onThrowable;
        //  onError is kept here as written in the original)
        def onError(e: Throwable): Unit = callback(e.left)
      }
    )
  }                     //> get: (s: String)scalaz.concurrent.Task[com.ning.http.client.Response]
  val prcGet = Process.eval(get("http://sina.com"))
                        //> prcGet : scalaz.stream.Process[scalaz.concurrent.Task,com.ning.http.client.Response] = Await(scalaz.concurrent.Task@222545dc,<function1>,<function1>)
  prcGet.run.run        //> 12:25:27.852 [New I/O worker #1] DEBUG c.n.h.c.p.n.r.NettyConnectListener -Request using non cached Channel '[id: 0x23fa1307, /192.168.200.3:50569 =>sina.com/66.102.251.33:80]':
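
Since the snippet above needs a live HTTP endpoint, here is a minimal network-free sketch of the same callback-to-Task pattern that exercises both the success and the failure branch. The names `CallbackOutcomeSketch`, `fakeGet`, `outcome` and `asStream` are hypothetical and do not come from the article or from any library. The sketch assumes a scalaz and scalaz-stream version in which `Task#run` and `Process#runLog` behave as they do in the article's worksheet output.

```scala
import scalaz.{-\/, \/, \/-}
import scalaz.concurrent.Task
import scalaz.stream.Process

object CallbackOutcomeSketch {
  // Hypothetical stand-in for an HTTP call: succeeds for "ok", fails otherwise.
  def fakeGet(url: String): Task[String] = Task.async[String] { callback =>
    if (url == "ok") callback(\/-("200 OK"))
    else callback(-\/(new RuntimeException(s"cannot reach $url")))
  }

  // attempt turns a failed Task into a left value instead of throwing on run.
  def outcome(url: String): Throwable \/ String = fakeGet(url).attempt.run

  // The same Task lifted into a one-element stream.
  def asStream(url: String): Process[Task, String] = Process.eval(fakeGet(url))
}
```

Calling `outcome` makes the two callback paths visible as plain values, which is often easier to inspect than an exception thrown from `run`.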

We can also follow the callback shape that scalaz Task expects directly, def async[A](register: ((Throwable \/ A) => Unit) => Unit): Task[A]:

  def read(callback: (Throwable \/ Array[Byte]) => Unit): Unit = ???
                                 //> read: (callback: scalaz.\/[Throwable,Array[Byte]] => Unit)Unit
  val t: Task[Array[Byte]] = Task.async(read)     //> t : scalaz.concurrent.Task[Array[Byte]] = scalaz.concurrent.Task@1a677343
  val t2: Task[Array[Byte]] = for {
    bytes <- t
    moarBytes <- t
  } yield (bytes ++ moarBytes)          //> t2 : scalaz.concurrent.Task[Array[Byte]] = scalaz.concurrent.Task@15de0b3c
  val prct2 = Process.eval(t2)          //> prct2 : scalaz.stream.Process[scalaz.concurrent.Task,Array[Byte]] = Await(scalaz.concurrent.Task@15de0b3c,<function1>,<function1>)

  def asyncRead(succ: Array[Byte] => Unit, fail: Throwable => Unit): Unit = ???
                          //> asyncRead: (succ: Array[Byte] => Unit, fail: Throwable => Unit)Unit
  val t3: Task[Array[Byte]] = Task.async { callback =>
     asyncRead(b => callback(b.right), err => callback(err.left))
  }                      //> t3 : scalaz.concurrent.Task[Array[Byte]] = scalaz.concurrent.Task@489115ef
  val t4: Task[Array[Byte]] = t3.flatMap(b => Task(b))
                         //> t4 : scalaz.concurrent.Task[Array[Byte]] = scalaz.concurrent.Task@3857f613
  val prct4 = Process.eval(t4)      //> prct4 : scalaz.stream.Process[scalaz.concurrent.Task,Array[Byte]] = Await(scalaz.concurrent.Task@3857f613,<function1>,<function1>)
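
The worksheet above leaves `read` unimplemented (`???`), so nothing can actually be run. The sketch below fills it in with a hypothetical in-memory implementation to show one consequence of the for-comprehension: a Task is only a description, so using `t` twice invokes the underlying read twice. `ReadTwiceSketch`, the `calls` counter and the "chunk-N;" payload are illustrative assumptions, not part of the original article.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scalaz.{\/, \/-}
import scalaz.concurrent.Task
import scalaz.stream.Process

object ReadTwiceSketch {
  private val calls = new AtomicInteger(0)

  // Hypothetical callback API: every invocation delivers the next chunk.
  def read(callback: (Throwable \/ Array[Byte]) => Unit): Unit =
    callback(\/-(s"chunk-${calls.incrementAndGet()};".getBytes("UTF-8")))

  val t: Task[Array[Byte]] = Task.async(read)

  // t appears twice, so read is invoked twice when t2 is run.
  val t2: Task[Array[Byte]] = for {
    bytes     <- t
    moarBytes <- t
  } yield bytes ++ moarBytes

  // Lift the combined Task into a stream and decode the bytes for display.
  val source: Process[Task, String] =
    Process.eval(t2).map(bytes => new String(bytes, "UTF-8"))
}
```

Running `ReadTwiceSketch.source.runLog.run` a second time would call `read` two more times, since the stream re-evaluates the Task on every run.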

We can also use a timer to produce a Process[Task,A]:

  import scala.concurrent.duration._
  implicit val scheduler = java.util.concurrent.Executors.newScheduledThreadPool(3)
                  //> scheduler : java.util.concurrent.ScheduledExecutorService = java.util.concurrent.ScheduledThreadPoolExecutor@516be40f[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
  val fizz = time.awakeEvery(3.seconds).map(_ => "fizz")
                  //> fizz : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@5762806e,<function1>,<function1>)
  val fizz3 = fizz.take(3)   //> fizz3 : scalaz.stream.Process[scalaz.concurrent.Task,String] = Append(Halt(End),Vector(<function1>))
  fizz3.runLog.run           //> res9: Vector[String] = Vector(fizz, fizz, fizz)
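
The timer example discards the value of each tick with `_ => "fizz"`. As a small sketch of what the timer itself emits, the following keeps the ticks, which are the durations elapsed since the stream started, and shuts the scheduler down afterwards. `TimerSourceSketch`, the 100 ms interval and the pool size of 1 are illustrative choices; the sketch assumes a scalaz-stream version that provides `scalaz.stream.time.awakeEvery`, as the article's code does.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService}
import scala.concurrent.duration._
import scalaz.concurrent.Task
import scalaz.stream.{Process, time}

object TimerSourceSketch {
  // awakeEvery needs an implicit scheduler; the implicit Strategy it also
  // requires is resolved from the default in scalaz.concurrent.Strategy.
  implicit val scheduler: ScheduledExecutorService =
    Executors.newScheduledThreadPool(1)

  // Each tick carries the time elapsed since the stream started running.
  val ticks: Process[Task, Duration] = time.awakeEvery(100.millis).take(3)

  def main(args: Array[String]): Unit =
    try println(ticks.runLog.run)
    finally scheduler.shutdown() // release the pool threads when done
}
```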

Queue, Topic and Signal can all be used as builders for effectful data sources. Let's first look at how a Queue produces a data source:

  type BigStringResult = String
  val qJobResult = async.unboundedQueu
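
The article's own Queue example is cut off at this point, so the following is not the author's code. It is a minimal sketch, under the assumption of a scalaz-stream version whose `Queue` offers `enqueueOne`, `close` and `dequeue`, of how an unbounded queue can act as a data source. `QueueSourceSketch`, `produce`, `results` and the "job-N done" strings are hypothetical.

```scala
import scalaz.concurrent.Task
import scalaz.stream.{Process, async}

object QueueSourceSketch {
  type BigStringResult = String

  // An unbounded in-memory queue; the implicit Strategy comes from the default.
  val q = async.unboundedQueue[BigStringResult]

  // Producer side: push two results, then close the queue so that the
  // consumer stream halts once everything has been dequeued.
  val produce: Task[Unit] = for {
    _ <- q.enqueueOne("job-1 done")
    _ <- q.enqueueOne("job-2 done")
    _ <- q.close
  } yield ()

  // Consumer side: the queue exposed as a Process[Task, A].
  val results: Process[Task, BigStringResult] = q.dequeue

  def main(args: Array[String]): Unit = {
    produce.run
    println(results.runLog.run)
  }
}
```

Closing the queue matters here: without `q.close`, `results.runLog.run` would keep waiting for further elements instead of returning.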