/** * A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`. */ def empty[T]: Source[T, NotUsed] = _empty private[this] val _empty: Source[Nothing, NotUsed] = Source.fromGraph(EmptySource) /** * Create a `Source` that will continually emit the given element. */ def repeat[T](element: T): Source[T, NotUsed] = { val next = Some((element, element)) unfold(element)(_ ? next).withAttributes(DefaultAttributes.repeat) } /** * Creates [[Source]] that will continually produce given elements in specified order. * * Starts a new 'cycled' `Source` from the given elements. The producer stream of elements * will continue infinitely by repeating the sequence of elements provided by function parameter. */ def cycle[T](f: () ? Iterator[T]): Source[T, NotUsed] = { val iterator = Iterator.continually { val i = f(); if (i.isEmpty) throw new IllegalArgumentException("empty iterator") else i }.flatten fromIterator(() ? iterator).withAttributes(DefaultAttributes.cycledSource) }
/** * A `Sink` that will consume the stream and discard the elements. */ def ignore: Sink[Any, Future[Done]] = fromGraph(GraphStages.IgnoreSink) /** * A `Sink` that will invoke the given procedure for each received element. The sink is materialized * into a [[scala.concurrent.Future]] will be completed with `Success` when reaching the * normal end of the stream, or completed with `Failure` if there is a failure signaled in * the stream.. */ def foreach[T](f: T ? Unit): Sink[T, Future[Done]] = Flow[T].map(f).toMat(Sink.ignore)(Keep.right).named("foreachSink")
override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = viaMat(flow)(Keep.left) override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ? Mat3): Source[T, Mat3] = { val toAppend =
if (flow.traversalBuilder eq Flow.identityTraversalBuilder) LinearTraversalBuilder.empty() else flow.traversalBuilder new Source[T, Mat3]( traversalBuilder.append(toAppend, flow.shape, combine), SourceShape(flow.shape.out)) } /** * Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.scaladsl.Sink]], * concatenating the processing steps of both. */ def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): RunnableGraph[Mat] = toMat(sink)(Keep.left) /** * Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.s