设为首页 加入收藏

TOP

Akka(17): Stream:数据流基础组件-Source,Flow,Sink简介(一)
2017-10-09 13:43:40 】 浏览:7647
Tags:Akka Stream 数据流 基础 组件 -Source Flow Sink 简介

    在大数据程序流行的今天,许多程序都面临着共同的难题:程序输入数据趋于无限大,抵达时间又不确定。一般的解决方法是采用回调函数(callback-function)来实现的,但这样的解决方案很容易造成“回调地狱(callback hell)”,即所谓的“goto-hell”:程序控制跳来跳去很难跟踪,特别是一些变量如果在回调函数中更改后产生不可预料的结果。数据流(stream)是一种解决问题的有效编程方式。Stream是一个抽象概念,能把程序数据输入过程和其它细节隐蔽起来,通过申明方式把数据处理过程描述出来,使整体程序逻辑更容易理解跟踪。当然,牺牲的是对一些运算细节的控制能力。我们在前面介绍过scalaz-stream,它与akka-stream的主要区别在于:

1、scalaz-stream是pull模式的,而akka-stream是push模式的。pull模式的缺点是接收数据效率问题,因为在这种模式里程序必须不断重复检测(polling)输入端口是否有数据存在。而push模式则会把数据推到输入端口后直接进入程序,但如果数据源头动作太快程序无法及时处理所有推送的数据时就会造成所谓的数据溢出问题,遗失数据。不过akka-stream实现了reactive-stream的back-pressure规范:数据发送方和接收方之间互动提示,使过快的数据产生能按接收方要求慢下来甚至暂时停下来。

2、scalaz-sstream和akka-stream的数据流都是一种申明式的数据处理流程描述,属于一种运算方案,最终都需要某种运算器来对数据流按运算方案进行具体的运算,得出运算结果和产生副作用。scalaz-stream的运算器是自备的函数式程序,特点是能很好的控制线程使用和进行并行运算。akka-stream的运算器是materializer。materializer在actor系统上运行,具备了actor模式程序的优点包括:消息驱动、集群运算、监管策略(SupervisorStrategy)等等。

akka-stream的数据流是由三类基础组件组合而成,不同的组合方式代表不同的数据处理及表达功能。三类组件分别是:

1、Source:数据源。akka-stream属于push模式,所以Source也就是Publisher(数据发布方),Source的形状SourceShape代表只有一个输出端口的形状。Source可以从单值、集合、某种Publisher或另一个数据流产生数据流的元素(stream-element),包括:

  /** * Helper to create [[Source]] from `Iterable`. * Example usage: `Source(Seq(1,2,3))` * * Starts a new `Source` from the given `Iterable`. This is like starting from an * Iterator, but every Subscriber directly attached to the Publisher of this * stream will see an individual flow of elements (always starting from the * beginning) regardless of when they subscribed. */ def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] = single(iterable).mapConcat(ConstantFun.scalaIdentityFunction).withAttributes(DefaultAttributes.iterableSource) /** * Create a `Source` with one element. * Every connected `Sink` of this stream will see an individual stream consisting of one element. */ def single[T](element: T): Source[T, NotUsed] = fromGraph(new GraphStages.SingleSource(element)) /** * Helper to create [[Source]] from `Iterator`. * Example usage: `Source.fromIterator(() => Iterator.from(0))` * * Start a new `Source` from the given function that produces anIterator. * The produced stream of elements will continue until the iterator runs empty * or fails during eva luation of the `next()` method. * Elements are pulled out of the iterator in accordance with the demand coming * from the downstream transformation steps. */ def fromIterator[T](f: () ? Iterator[T]): Source[T, NotUsed] = apply(new immutable.Iterable[T] { override def iterator: Iterator[T] = f() override def toString: String = "() => Iterator" }) /** * Starts a new `Source` from the given `Future`. The stream will consist of * one element when the `Future` is completed with a successful value, which * may happen before or after materializing the `Flow`. * The stream terminates with a failure if the `Future` is completed with a failure. */ def fromFuture[T](future: Future[T]): Source[T, NotUsed] = fromGraph(new FutureSource(future)) /** * Helper to create [[Source]] from `Publisher`. * * Construct a transformation starting with given publisher. The transformation steps * are executed by a series of [[org.reactivestreams.Processor]] instances * that mediate the flow of elements downstream and the propagation of * back-pressure upstream. */ def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed] = fromGr
首页 上一页 1 2 3 4 5 下一页 尾页 1/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇为什么要创建开放源码的PlayScala.. 下一篇Akka(23): Stream:自定义流构..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目