设为首页 加入收藏

TOP

Scalaz(47)- scalaz-stream: 深入了解-Source(二)
2017-10-10 12:12:38 】 浏览:9729
Tags:Scalaz scalaz-stream: 深入 了解 -Source
eam.Process0[Int] = Emit(Vector(0))
3 emitAll(Seq(1,2,3)) //> res1: scalaz.stream.Process0[Int] = Emit(List(1, 2, 3)) 4 Process(1,2,3) //> res2: scalaz.stream.Process0[Int] = Emit(WrappedArray(1, 2, 3)) 5 Process() //> res3: scalaz.stream.Process0[Nothing] = Emit(List())

以上都是Process0的构建方式,也算是数据源。但它们只是代表了内存中的一串值,对我们来说没什么意义,因为我们希望从外设获取这些值,比如从文件或者数据库里读取数据,也就是说需要F运算效果。Process0[O] >>> Process[Nothing,O],而我们需要的是Process[F,O]。那么我们这样写如何呢?

1 val p: Process[Task,Int] = emitAll(Seq(1,2,3)) 2    //> p : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Emit(List(1, 2, 3))
3 
4 emitAll(Seq(1,2,3)).toSource 5    //> res4: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Emit(List(1, 2, 3))
6                                                   

类型倒是匹配了,但表达式Emit(...)里没有任何Task的影子,这个无法满足我们对Source的需要。看来只有以下这种方式了:

1 await(Task.delay{3})(emit) 2 //> res5: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@57855c9a,<function1>,<function1>)
3 eva l(Task.delay{3}) 4 //> res6: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@63e2203c,<function1>,<function1>)

现在不但类型匹配,而且表达式里还包含了Task运算。我们通过Task.delay可以进行文件读取等带有副作用的运算,这是因为Await将会运行req:F[E] >>> Task[Int]。这正是我们需要的Source。那我们能不能用这个Source来发出一串数据呢?

 1 def emitSeq[A](xa: Seq[A]):Process[Task,A] =
 2  xa match {  3     case h :: t => await(Task.delay {h})(emit) ++ emitSeq(t)  4     case Nil => halt  5   }                                     //> emitSeq: [A](xa: Seq[A])scalaz.stream.Process[scalaz.concurrent.Task,A]
 6 val es1 = emitSeq(Seq(1,2,3))           //> es1 : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append(Await(scalaz.concurrent.Task@2d6eabae,<function1>,<function1>),Vector(<function1>))
 7 val es2 = emitSeq(Seq("a","b","c"))     //> es2 : scalaz.stream.Process[scalaz.concurrent.Task,String] = Append(Await(
 8 scalaz.concurrent.Task@4e7dc304,<function1>,<function1>),Vector(<function1>))  9 es1.runLog.run                          //> res7: Vector[Int] = Vector(1, 2, 3)
10 es2.runLog.run                          //> res8: Vector[String] = Vector(a, b, c)

以上示范中我们用await运算了Task,然后返回了Process[Task,?],一个可能带副作用运算的Source。实际上我们在很多情况下都需要从外部的源头用Task来获取一些数据,通常这些数据源都对数据获取进行了异步(asynchronous)运算处理,然后通过callback方式来提供这些数据。我们可以用Task.async函数来把这些callback函数转变成Task,下一步我们只需要用Process.eva l或者await就可以把这个Task升格成Process[Task,?]。我们先看个简单的例子:假如我们用scala.concurrent.Future来进行异步数据读取,可以这样把Future转换成Process:

 1 def getData(dbName: String): Task[String] = Task.async { cb =>
 2  import scala.concurrent._  3    import scala.concurrent.ExecutionContext.Implicits.global
 4  import scala.util.{Success,Failure}  5    Future { s"got data from $dbName" }.onComplete {  6      case Success(a) => cb(a.right)  7      case Failure(e) => cb(e.left)  8  }  9 }                                        //> getData: (dbName: String)scalaz.concurrent.Task[String]
10 val procGetData = eva l(getData("MySQL")) //> procGetData : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@dd3b207,<function1>,<function1>)
11 procGetData.runLog.run                   //> res9: Vector[String] = Vector(got data from MySQL)

我们也可以把java的callback转变成Task:

 1  import com.ning.http.client._  2   val asyncHttpClient = new AsyncHttpClient()     //> as
首页 上一页 1 2 3 4 下一页 尾页 2/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇scala学习手记7 - 运算符重载 下一篇scala学习手记21 - 传递变长参数

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目