设为首页 加入收藏

TOP

FunDA(6)- Reactive Streams:Play with Iteratees、Enumerator and Enumeratees(一)
2017-10-09 14:30:10 】 浏览:3732
Tags:FunDA Reactive Streams Play with Iteratees Enumerator and Enumeratees

    在上一节我们介绍了Iteratee。它的功能是消耗从一些数据源推送过来的数据元素,不同的数据消耗方式代表了不同功能的Iteratee。所谓的数据源就是我们这节要讨论的Enumerator。Enumerator是一种数据源:它会根据下游数据消耗方(Iteratee)的具体状态主动向下推送数据元素。我们已经讨论过Iteratee的状态Step类型:

trait Step[E,+A] case class Done[+A,E](a: A, remain: Input[E]) extends Step[E,A] case class Cont[E,+A](k: Input[E] => InputStreamHandler[E,A]) extends Step[E,A] case class Error[E](msg: String, loc:Input[E]) extends Step[E,Nothing]

这其中Iteratee通过Cont状态通知Enumerator可以发送数据元素,并提供了k函数作为Enumerator的数据推送函数。Enumerator推送的数据元素,也就是Iteratee的输入Input[E],除单纯数据元素之外还代表着数据源状态: 

trait Input[+E] case class EL[E](e: E) extends Input[E] case object EOF extends Input[Nothing] case object Empty extends Input[Nothing]

Enumerator通过Input[E]来通知Iteratee当前数据源状态,如:是否已经完成所有数据推送(EOF),或者当前推送了什么数据元素(El[E](e:E))。Enumerator主动向Iteratee输出数据然后返回新状态的Iteratee。我们可以从Enumerator的类型款式看得出:

trait Enumerator[E] { /** * Apply this Enumerator to an Iteratee */ def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] }

这个Future的目的主要是为了避免占用线程。实际上我们可以最终通过调用Iteratee的fold函数来实现Enumerator功能,如:

 /** * Creates an enumerator which produces the one supplied * input and nothing else. This enumerator will NOT * automatically produce Input.EOF after the given input. */ def enumInput[E](e: Input[E]) = new Enumerator[E] { def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = i.fold { case Step.Cont(k) => eagerFuture(k(e)) case _ => Future.successful(i) }(dec) }

又或者通过构建器(constructor, apply)来构建Eumerator:

/** * Create an Enumerator from a set of values * * Example: * {{{ * val enumerator: Enumerator[String] = Enumerator("kiki", "foo", "bar") * }}} */ def apply[E](in: E*): Enumerator[E] = in.length match { case 0 => Enumerator.empty case 1 => new Enumerator[E] { def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = i.pureFoldNoEC { case Step.Cont(k) => k(Input.El(in.head)) case _ => i } } case _ => new Enumerator[E] { def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = enumerateSeq(in, i) } } /** * Create an Enumerator from any TraversableOnce like collection of elements. * * Example of an iterator of lines of a file : * {{{ * val enumerator: Enumerator[String] = Enumerator( scala.io.Source.fromFile("myfile.txt").getLines ) * }}} */ def enumerate[E](traversable: TraversableOnce[E])(implicit ctx: scala.concurrent.ExecutionContext): Enumerator[E] = { val it = traversable.toIterator Enumerator.unfoldM[scala.collection.Iterator[E], E](it: scala.collection.Iterator[E])({ currentIt =>
      if (currentIt.hasNext) Future[Option[(scala.collection.Iterator[E], E)]]({ val next = currentIt.next Some((currentIt -> next)) })(ctx) else Future.successful[Option[(scala.collection.Iterator[E], E)]]({ None }) })(dec) } /** * An empty enumerator */ def empty[E]: Enumerator[E] = new Enumerator[E] { def apply[A](i: Iteratee[E, A]) = Future.successful(i) } private def enumerateSeq[E, A]: (Seq[E], Iteratee[E, A]) => Future[Iteratee[E, A]] = { (l, i) => l.foldLeft(Future.successful(i))((i, e) => i.flatMap(it => it.pureFold { case Step.Cont(k) => k(Input.El(e)) case _ => it }(dec))(dec)) }

下面是个直接构建Enumerator的例子: 

 val enumUsers: Enumerator[String] = { Enumerator("Tiger","Hover","Grand","John") //> enumUsers : play.api.libs.i
首页 上一页 1 2 3 4 下一页 尾页 1/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇FunDA(5)- Reactive Streams:.. 下一篇FunDA(7)- Reactive Streams t..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目