/**
 * Callbacks that a [[GraphStage]] registers for one of its input ports.
 */
trait InHandler {

  /**
   * Invoked when a new element is available on the input port. The element itself is obtained by
   * calling [[GraphStageLogic.grab()]].
   */
  @throws(classOf[Exception])
  def onPush(): Unit

  /**
   * Invoked when the input port has finished. No further callbacks are made for this port
   * afterwards. The default implementation completes the currently active stage.
   */
  @throws(classOf[Exception])
  def onUpstreamFinish(): Unit = {
    val stage = GraphInterpreter.currentInterpreter.activeStage
    stage.completeStage()
  }

  /**
   * Invoked when the input port has failed. No further callbacks are made for this port
   * afterwards. The default implementation fails the currently active stage with `ex`.
   */
  @throws(classOf[Exception])
  def onUpstreamFailure(ex: Throwable): Unit = {
    val stage = GraphInterpreter.currentInterpreter.activeStage
    stage.failStage(ex)
  }
}

/**
 * Callbacks that a [[GraphStage]] registers for one of its output ports.
 */
trait OutHandler {

  /**
   * Invoked when the output port has been pulled and is therefore ready to emit an element, i.e.
   * [[GraphStageLogic.push()]] may now be called on this port.
   */
  @throws(classOf[Exception])
  def onPull(): Unit

  /**
   * Invoked when the output port will no longer accept new elements. No further callbacks are
   * made for this port afterwards. The default implementation completes the currently active
   * stage.
   */
  @throws(classOf[Exception])
  def onDownstreamFinish(): Unit =
    GraphInterpreter.currentInterpreter.activeStage.completeStage()
}
/**
 * A source stage that emits the given strings in order and endlessly: once the last element has
 * been pushed, emission wraps around to the first element again.
 *
 * When `chars` is empty there is nothing to cycle through, so the stage completes on the first
 * pull instead of failing with an `IndexOutOfBoundsException`.
 *
 * @param chars the elements to emit, in emission order
 */
class AlphaSource(chars: Seq[String]) extends GraphStage[SourceShape[String]] {
  val outport = Outlet[String]("output")
  val shape = SourceShape(outport)

  // Snapshot into a Vector so the per-pull lookup is effectively constant time even when the
  // caller passed a List (the default for `Seq(...)`), whose indexed access is linear.
  private val letters: Vector[String] = chars.toVector

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Index of the next element to emit; each materialization gets its own cursor.
      var pos: Int = 0

      setHandler(outport, new OutHandler {
        override def onPull(): Unit =
          if (letters.isEmpty) {
            // Nothing to emit: finish cleanly rather than indexing into an empty sequence.
            completeStage()
          } else {
            push(outport, letters(pos))
            // Advance and wrap back to the start after the last element.
            pos = (pos + 1) % letters.length
          }
      })
    }
}
// Abridged class hierarchy (bodies elided as `{...}` in this excerpt):
//  - GraphStage[S] is a GraphStageWithMaterializedValue[S, NotUsed], i.e. a stage whose
//    materialized-value type is fixed to NotUsed.
//  - GraphStageWithMaterializedValue[+S, +M] is itself a Graph[S, M], covariant in both the
//    shape type S and the materialized-value type M.
abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] {...} abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, M] {...}
// The custom stage, typed as a graph with a String source shape and no materialized value.
val sourceGraph: Graph[SourceShape[String], NotUsed] =
  new AlphaSource(Seq("A", "B", "C", "D"))

// Wrap the graph as a Source and apply a one-second delay using the backpressure overflow
// strategy.
val alphaSource =
  Source
    .fromGraph(sourceGraph)
    .delay(1.second, DelayOverflowStrategy.backpressure)

// Run the stream, printing every element it produces.
alphaSource.runWith(Sink.foreach(println))
/**
 * A sink stage that prints each incoming string to standard output in upper case.
 *
 * The stage pulls its inlet once on start and again after every element it has printed.
 */
class UppercaseSink extends GraphStage[SinkShape[String]] {
  val inport = Inlet[String]("input")
  val shape = SinkShape(inport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler {
      // The logic object itself serves as the handler for the inlet.
      setHandler(inport, this)

      /** Requests the first element as soon as the stage starts. */
      override def preStart(): Unit = {
        pull(inport)
      }

      /** Prints the received element in upper case, then requests the next one. */
      override def onPush(): Unit = {
        val received = grab(inport)
        println(received.toUpperCase)
        pull(inport)
      }
    }
}