设为首页 加入收藏


Akka(17): Stream:数据流基础组件-Source,Flow,Sink简介(一)
2017-10-09 13:43:40 】 浏览:7667
Tags:Akka Stream 数据流 基础 组件 -Source Flow Sink 简介

    在大数据程序流行的今天,许多程序都面临着共同的难题:程序输入数据趋于无限大,抵达时间又不确定。一般的解决方法是采用回调函数(callback-function)来实现的,但这样的解决方案很容易造成“回调地狱(callback hell)”,即所谓的“goto-hell”:程序控制跳来跳去很难跟踪,特别是一些变量如果在回调函数中更改后产生不可预料的结果。数据流(stream)是一种解决问题的有效编程方式。Stream是一个抽象概念,能把程序数据输入过程和其它细节隐蔽起来,通过申明方式把数据处理过程描述出来,使整体程序逻辑更容易理解跟踪。当然,牺牲的是对一些运算细节的控制能力。我们在前面介绍过scalaz-stream,它与akka-stream的主要区别在于:





  /** * Helper to create [[Source]] from `Iterable`. * Example usage: `Source(Seq(1,2,3))` * * Starts a new `Source` from the given `Iterable`. This is like starting from an * Iterator, but every Subscriber directly attached to the Publisher of this * stream will see an individual flow of elements (always starting from the * beginning) regardless of when they subscribed. */ def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = single(iterable).mapConcat(ConstantFun.scalaIdentityFunction).withAttributes(DefaultAttributes.iterableSource) /** * Create a `Source` with one element. * Every connected `Sink` of this stream will see an individual stream consisting of one element. */ def single[T](element: T): Source[T, NotUsed] = fromGraph(new GraphStages.SingleSource(element)) /** * Helper to create [[Source]] from `Iterator`. * Example usage: `Source.fromIterator(() => Iterator.from(0))` * * Start a new `Source` from the given function that produces anIterator. * The produced stream of elements will continue until the iterator runs empty * or fails during eva luation of the `next()` method. * Elements are pulled out of the iterator in accordance with the demand coming * from the downstream transformation steps. */ def fromIterator[T](f: () ? Iterator[T]): Source[T, NotUsed] = apply(new immutable.Iterable[T] { override def iterator: Iterator[T] = f() override def toString: String = "() => Iterator" }) /** * Starts a new `Source` from the given `Future`. The stream will consist of * one element when the `Future` is completed with a successful value, which * may happen before or after materializing the `Flow`. * The stream terminates with a failure if the `Future` is completed with a failure. */ def fromFuture[T](future: Future[T]): Source[T, NotUsed] = fromGraph(new FutureSource(future)) /** * Helper to create [[Source]] from `Publisher`. * * Construct a transformation starting with given publisher. The transformation steps * are executed by a series of [[org.reactivestreams.Processor]] instances * that mediate the flow of elements downstream and the propagation of * back-pressure upstream. */ def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed] = fromGr
首页 上一页 1 2 3 4 5 下一页 尾页 1/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇为什么要创建开放源码的PlayScala.. 下一篇Akka(23): Stream:自定义流构..



Hot 文章


C 语言



