设为首页 加入收藏

TOP

FunDA(5)- Reactive Streams:Play with Iteratees(三)
2017-10-09 14:30:11 】 浏览:827
Tags:FunDA Reactive Streams Play with Iteratees
ure[Iteratee[E, A]]
= i.pureFoldNoEC { case Step.Cont(k) => k(Input.El(in.head)) case _ => i } } case _ => new Enumerator[E] { def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = enumerateSeq(in, i) } } ----- private def enumerateSeq[E, A]: (Seq[E], Iteratee[E, A]) => Future[Iteratee[E, A]] = { (l, i) => l.foldLeft(Future.successful(i))((i, e) => i.flatMap(it => it.pureFold { case Step.Cont(k) => k(Input.El(e)) case _ => it }(dec))(dec)) }

我们可以通过定义fold函数来获取不同功能的Iteratee。下面就是一个直接返回恒量值Iteratee的定义过程:

val doneIteratee = new Iteratee[String,Int] { def fold[B](folder: Step[String,Int] => Future[B])(implicit ec: ExecutionContext): Future[B] = { folder(Step.Done(21,Input.EOF)) } }

这个Iteratee不会消耗任何输入,直接就返回21。实际上我们可以直接用Done.apply来构建这个doneIteratee:

val doneIteratee = Done[String,Int](21,Input.Empty)

我们也可以定义一个只消耗一个输入元素的Iteratee:

val consumeOne = new Iteratee[String,String] { def fold[B](folder: Step[String,String] => Future[B])(implicit ec: ExecutionContext): Future[B] = { folder(Step.Cont { case Input.EOF => Done("OK",Input.EOF) case Input.Empty => this
        case Input.El(e) => Done(e,Input.EOF) }) } }

同样,我们也可以用Cont构建器来构建这个consumeOne:

val consumeOne1 = Cont[String,String](in => Done("OK",Input.EOF))

从上面这些例子里我们可以推敲folder函数应该是在Enumerator里定义的,看看下面这个Enumerator例子:

val enumerator = new Enumerator[String] { // some messages
    val items = 1 to 10 map (i => i.toString) var index = 0

    override def apply[A](i: Iteratee[String, A]): Future[Iteratee[String, A]] = { i.fold( // the folder
 { step => { step match { // iteratee is done, so no more messages // to send
            case Step.Done(result, remaining) => { println("Step.Done") Future(i) } // iteratee can consume more
            case Step.Cont(k: (Input[String] => Iteratee[String, A])) => { println("Step.Cont") // does enumerator have more messages ?
              if (index < items.size) { val item = items(index) println(s"El($item)") index += 1

                // get new state of iteratee
                val newIteratee = k(Input.El(item)) // recursive apply
 apply(newIteratee) } else { println("EOF") Future(k(Input.EOF)) } } // iteratee is in error state
            case Step.Error(message, input: Input[String]) => { println("Step.Error") Future(i) } } } }) } }

下面我们示范一个完整的例子: 

val userIteratee = new Iteratee[String, Unit] { override def fold[B](folder: (Step[String, Unit]) => Future[B]) (implicit ec: ExecutionContext): Future[B] = { // accumulator
    val buffer: ListBuffer[String] = ListBuffer() // the step function
    def stepFn(in: Input[String]): Iteratee[String, Unit] = { in match { case Input.Empty => this
        case Input.EOF => Done({ println(s"Result ${buffer.mkString("--")}") }, Input.Empty) case Input.El(el) => { buffer += el Cont(stepFn) } } } // initial state -> iteratee ready to accept input
 folder(Step.Cont(stepFn)) } } //> userIteratee : play.api.libs.iteratee.Iteratee[String,Unit] = demo.worksheet.iteratee2$$anonfun$main$1$$anon$3@4f063c0a
val usersEnum = Enumerator("Tiger","John","Jimmy","Kate","Chris") //> usersEnum : play.api.libs.iteratee.Enumerator[String] = play.api.libs.iteratee.Enumerator$$anon$19@51cdd8a
(usersEnum |>>> userIteratee)   //> Result Tiger--John--Ji
首页 上一页 1 2 3 4 5 下一页 尾页 3/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇FunDA(3)- 流动数据行操作:FD.. 下一篇FunDA(6)- Reactive Streams:..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目