设为首页 加入收藏

TOP

Akka(17): Stream:数据流基础组件-Source,Flow,Sink简介(二)
2017-10-09 13:43:40 】 浏览:7671
Tags:Akka Stream 数据流 基础 组件 -Source Flow Sink 简介
aph(
new PublisherSource(publisher, DefaultAttributes.publisherSource, shape("PublisherSource"))) /** * A graph with the shape of a source logically is a source, this method makes * it so also in type. */ def fromGraph[T, M](g: Graph[SourceShape[T], M]): Source[T, M] = g match { case s: Source[T, M] ? s case s: javadsl.Source[T, M] ? s.asScala case other ? new Source( LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right), other.shape) }

下面还有几个特殊的Source:

  /** * A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`. */ def empty[T]: Source[T, NotUsed] = _empty private[this] val _empty: Source[Nothing, NotUsed] = Source.fromGraph(EmptySource) /** * Create a `Source` that will continually emit the given element. */ def repeat[T](element: T): Source[T, NotUsed] = { val next = Some((element, element)) unfold(element)(_ ? next).withAttributes(DefaultAttributes.repeat) } /** * Creates [[Source]] that will continually produce given elements in specified order. * * Starts a new 'cycled' `Source` from the given elements. The producer stream of elements * will continue infinitely by repeating the sequence of elements provided by function parameter. */ def cycle[T](f: () ? Iterator[T]): Source[T, NotUsed] = { val iterator = Iterator.continually { val i = f(); if (i.isEmpty) throw new IllegalArgumentException("empty iterator") else i }.flatten fromIterator(() ? iterator).withAttributes(DefaultAttributes.cycledSource) }

2、Sink:数据终端。属于数据元素的使用方,主要作用是消耗数据流中的元素。SinkShape是有一个输入端的数据流形状。Sink消耗流元素的例子有:

  /** * A `Sink` that will consume the stream and discard the elements. */ def ignore: Sink[Any, Future[Done]] = fromGraph(GraphStages.IgnoreSink) /** * A `Sink` that will invoke the given procedure for each received element. The sink is materialized * into a [[scala.concurrent.Future]] will be completed with `Success` when reaching the * normal end of the stream, or completed with `Failure` if there is a failure signaled in * the stream.. */ def foreach[T](f: T ? Unit): Sink[T, Future[Done]] = Flow[T].map(f).toMat(Sink.ignore)(Keep.right).named("foreachSink")

注意,akka-stream实际是在actor上进行运算的。actor的内部状态最终可以形成运算结果。上面的例子可以得出Sink的运算结果是Future[??]类型的。

3、Flow:数据处理节点。对通过输入端口输入数据流的元素进行转变处理(transform)后经过输出端口输出。FlowShape有一个输入端和一个输出端。

在akka-stream里数据流组件一般被称为数据流图(graph)。我们可以用许多数据流图组成更大的stream-graph。

akka-stream最简单的完整(或者闭合)线性数据流(linear-stream)就是直接把一个Source和一个Sink相接。这种方式代表一种对数据流所有元素的直接表现,如:source.runWith(Sink.foreach(println))。我们可以用Source.via来连接Flow,用Source.to连接Sink:

  override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = viaMat(flow)(Keep.left) override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ? Mat3): Source[T, Mat3] = { val toAppend =
      if (flow.traversalBuilder eq Flow.identityTraversalBuilder) LinearTraversalBuilder.empty() else flow.traversalBuilder new Source[T, Mat3]( traversalBuilder.append(toAppend, flow.shape, combine), SourceShape(flow.shape.out)) } /** * Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.scaladsl.Sink]], * concatenating the processing steps of both. */ def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): RunnableGraph[Mat] = toMat(sink)(Keep.left) /** * Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.s
首页 上一页 1 2 3 4 5 下一页 尾页 2/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇为什么要创建开放源码的PlayScala.. 下一篇Akka(23): Stream:自定义流构..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目