eam.Process0[Int] = Emit(Vector(0))
3 emitAll(Seq(1,2,3)) //> res1: scalaz.stream.Process0[Int] = Emit(List(1, 2, 3))
4 Process(1,2,3) //> res2: scalaz.stream.Process0[Int] = Emit(WrappedArray(1, 2, 3))
5 Process() //> res3: scalaz.stream.Process0[Nothing] = Emit(List())
以上都是Process0的构建方式,也算是数据源。但它们只是代表了内存中的一串值,对我们来说没什么意义,因为我们希望从外设获取这些值,比如从文件或者数据库里读取数据,也就是说需要F运算效果。Process0[O] >>> Process[Nothing,O],而我们需要的是Process[F,O]。那么我们这样写如何呢?
1 val p: Process[Task,Int] = emitAll(Seq(1,2,3)) 2 //> p : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Emit(List(1, 2, 3))
3
4 emitAll(Seq(1,2,3)).toSource 5 //> res4: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Emit(List(1, 2, 3))
6
类型倒是匹配了,但表达式Emit(...)里没有任何Task的影子,这个无法满足我们对Source的需要。看来只有以下这种方式了:
1 await(Task.delay{3})(emit) 2 //> res5: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@57855c9a,<function1>,<function1>)
3 eva l(Task.delay{3}) 4 //> res6: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@63e2203c,<function1>,<function1>)
现在不但类型匹配,而且表达式里还包含了Task运算。我们通过Task.delay可以进行文件读取等带有副作用的运算,这是因为Await将会运行req:F[E] >>> Task[Int]。这正是我们需要的Source。那我们能不能用这个Source来发出一串数据呢?
1 def emitSeq[A](xa: Seq[A]):Process[Task,A] =
2 xa match { 3 case h :: t => await(Task.delay {h})(emit) ++ emitSeq(t) 4 case Nil => halt 5 } //> emitSeq: [A](xa: Seq[A])scalaz.stream.Process[scalaz.concurrent.Task,A]
6 val es1 = emitSeq(Seq(1,2,3)) //> es1 : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append(Await(scalaz.concurrent.Task@2d6eabae,<function1>,<function1>),Vector(<function1>))
7 val es2 = emitSeq(Seq("a","b","c")) //> es2 : scalaz.stream.Process[scalaz.concurrent.Task,String] = Append(Await(
8 scalaz.concurrent.Task@4e7dc304,<function1>,<function1>),Vector(<function1>)) 9 es1.runLog.run //> res7: Vector[Int] = Vector(1, 2, 3)
10 es2.runLog.run //> res8: Vector[String] = Vector(a, b, c)
以上示范中我们用await运算了Task,然后返回了Process[Task,?],一个可能带副作用运算的Source。实际上我们在很多情况下都需要从外部的源头用Task来获取一些数据,通常这些数据源都对数据获取进行了异步(asynchronous)运算处理,然后通过callback方式来提供这些数据。我们可以用Task.async函数来把这些callback函数转变成Task,下一步我们只需要用Process.eva l或者await就可以把这个Task升格成Process[Task,?]。我们先看个简单的例子:假如我们用scala.concurrent.Future来进行异步数据读取,可以这样把Future转换成Process:
1 def getData(dbName: String): Task[String] = Task.async { cb =>
2 import scala.concurrent._ 3 import scala.concurrent.ExecutionContext.Implicits.global
4 import scala.util.{Success,Failure} 5 Future { s"got data from $dbName" }.onComplete { 6 case Success(a) => cb(a.right) 7 case Failure(e) => cb(e.left) 8 } 9 } //> getData: (dbName: String)scalaz.concurrent.Task[String]
10 val procGetData = eva l(getData("MySQL")) //> procGetData : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@dd3b207,<function1>,<function1>)
11 procGetData.runLog.run //> res9: Vector[String] = Vector(got data from MySQL)
我们也可以把java的callback转变成Task:
1 import com.ning.http.client._ 2 val asyncHttpClient = new AsyncHttpClient() //> as |