/**
 * A {@link StreamOperator} for executing {@link SinkFunction SinkFunctions}.
 */
@Internal
public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>
        implements OneInputStreamOperator<IN, Object> {...}
A StreamTask is executed through its invoke method. Roughly, invoke does the following:
 *  -- invoke()
 *        |
 *        +----> Create basic utils (config, etc) and load the chain of operators
 *        +----> operators.setup()
 *        +----> task specific init()
 *        +----> initialize-operator-states()
 *        +----> open-operators()
 *        +----> run()
 *        +----> close-operators()
 *        +----> dispose-operators()
 *        +----> common cleanup
 *        +----> task specific cleanup()
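The ordering in the diagram can be sketched in plain Java. This is only an illustration: `SketchOperator`, `RecordingOperator`, and `LifecycleSketch` are hypothetical names, not Flink classes, and the `try`/`finally` placement is an assumption about how disposal and cleanup are guaranteed when `run()` fails.

```java
import java.util.List;

// Hypothetical stand-in for one operator in the chain; not Flink's StreamOperator.
interface SketchOperator {
    void setup();
    void initializeState();
    void open();
    void close();
    void dispose();
}

// Appends the name of every lifecycle call to a shared log.
final class RecordingOperator implements SketchOperator {
    private final List<String> log;

    RecordingOperator(List<String> log) {
        this.log = log;
    }

    @Override public void setup() { log.add("setup"); }
    @Override public void initializeState() { log.add("initializeState"); }
    @Override public void open() { log.add("open"); }
    @Override public void close() { log.add("close"); }
    @Override public void dispose() { log.add("dispose"); }
}

final class LifecycleSketch {
    // Walks one operator through the phases of the diagram, in order.
    static void invoke(List<String> log, Runnable body) {
        SketchOperator operator = new RecordingOperator(log);
        boolean disposed = false;
        try {
            operator.setup();
            log.add("init");            // task specific init()
            operator.initializeState();
            operator.open();
            log.add("run");
            body.run();                 // the task's actual work
            operator.close();           // skipped if run() threw
            operator.dispose();
            disposed = true;
        } finally {
            // Assumption: resources are released even on failure.
            if (!disposed) {
                operator.dispose();
            }
            log.add("cleanup");
        }
    }
}
```

Calling `LifecycleSketch.invoke(log, () -> {})` fills `log` with the phases in diagram order. If the body throws, `close` is skipped but `dispose` and `cleanup` are still recorded.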
synchronized (lock) {
    // both the following operations are protected by the lock
    // so that we avoid race conditions in the case that initializeState()
    // registers a timer, that fires before the open() is called.
    initializeState();
    openAllOperators();
}

// final check to exit early before starting to run
if (canceled) {
    throw new CancelTaskException();
}

// let the task do its work
isRunning = true;
run();
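The race that the comment describes can be reproduced in a small standalone sketch. It assumes that timer callbacks acquire the same lock before touching operator state. `LockedStartupSketch` and its members are hypothetical names, and a plain `ScheduledExecutorService` stands in for the timer service.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Single-use sketch: the timer registered during initializeState() cannot
// observe the operator before open() has finished, because both run under one lock.
final class LockedStartupSketch {
    private final Object lock = new Object();
    private boolean opened = false;          // guarded by lock
    private boolean timerSawOpened = false;  // guarded by lock
    private final CountDownLatch fired = new CountDownLatch(1);
    private final ScheduledExecutorService timers =
            Executors.newSingleThreadScheduledExecutor();

    private void initializeState() {
        // Register a timer that is due immediately.
        timers.schedule(() -> {
            synchronized (lock) {            // blocks until startup releases the lock
                timerSawOpened = opened;
            }
            fired.countDown();
        }, 0, TimeUnit.MILLISECONDS);
    }

    private void openAllOperators() {
        try {
            Thread.sleep(50);                // widen the window in which the timer is due
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        opened = true;
    }

    // Returns what the timer callback observed for 'opened'.
    boolean startAndReport() {
        try {
            synchronized (lock) {
                initializeState();
                openAllOperators();
            }
            if (!fired.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timer never fired");
            }
            synchronized (lock) {
                return timerSawOpened;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            timers.shutdownNow();
        }
    }
}
```

With the `synchronized (lock)` block around both calls, the callback always sees `opened == true`. Removing that block would let the callback run during the sleep and observe an unopened operator.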
private void handleRestoredBucketState(State<T> restoredState) {
    Preconditions.checkNotNull(restoredState);

    for (BucketState<T> bucketState : restoredState.bucketStates.values()) {

        /******* `pendingFiles` should be empty once the checkpoint has succeeded *******/
        // we can clean all the pending files since they were renamed to
        // final files after this checkpoint was successful
        // (we re-start from the last **successful** checkpoint)
        bucketState.pendingFiles.clear();

        /******* Handle the file that was `in-progress` when the last checkpoint succeeded *******/
        handlePendingInProgressFile(bucketState.currentFile, bucketState.currentFileValidLength);

        // Now that we've restored the bucket to a valid state, reset the current file info
        bucketState.currentFile = null;
        bucketState.currentFileValidLength = -1;
        bucketState.isWriterOpen = false;

        /******* Handle the bucket state's `pendingFilesPerCheckpoint` entries *******/
        handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);

        bucketState.pendingFilesPerCheckpoint.clear();
    }
}
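To make the role of the valid length concrete, here is a minimal sketch of what handling an in-progress file on restore might look like on a local file system. It is an assumption-laden illustration, not the body of `handlePendingInProgressFile`: `RestoreSketch`, `finalizeInProgress`, and the `.in-progress` suffix are hypothetical, and it assumes the file system supports truncation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

final class RestoreSketch {
    // Hypothetical suffix marking a file that was still being written.
    static final String IN_PROGRESS_SUFFIX = ".in-progress";

    // Renames the in-progress file to its final name and drops every byte
    // written after the last successful checkpoint.
    static Path finalizeInProgress(Path inProgress, long validLength) {
        String name = inProgress.getFileName().toString();
        if (!name.endsWith(IN_PROGRESS_SUFFIX)) {
            throw new IllegalArgumentException("not an in-progress file: " + name);
        }
        String finalName = name.substring(0, name.length() - IN_PROGRESS_SUFFIX.length());
        Path finalPath = inProgress.resolveSibling(finalName);
        try {
            Files.move(inProgress, finalPath, StandardCopyOption.REPLACE_EXISTING);
            try (FileChannel channel = FileChannel.open(finalPath, StandardOpenOption.WRITE)) {
                if (channel.size() > validLength) {
                    channel.truncate(validLength);
                }
            }
            return finalPath;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Small demo: 5 valid bytes followed by data written after the checkpoint.
    static String demo() {
        try {
            Path dir = Files.createTempDirectory("restore-sketch");
            Path inProgress = dir.resolve("part-0" + IN_PROGRESS_SUFFIX);
            Files.write(inProgress, "validGARBAGE".getBytes(StandardCharsets.US_ASCII));
            Path restored = finalizeInProgress(inProgress, 5);
            return new String(Files.readAllBytes(restored), StandardCharsets.US_ASCII);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

`RestoreSketch.demo()` leaves a file named `part-0` that contains only the bytes covered by the checkpoint. After this step the bucket no longer has a current file, which is why the restored state resets `currentFile`, the valid length, and `isWriterOpen`.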
This method is called as a notification once a distributed checkpoint has been completed. Note that any exception during this method will not cause the checkpoint to fail any more.
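The interplay between snapshots and this notification can be sketched with an in-memory model. It assumes that files closed before a snapshot are recorded under that checkpoint id and become final once a checkpoint with an equal or higher id is confirmed. `PendingCommitSketch` and its methods are hypothetical; only the field names mirror the bucket state shown above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class PendingCommitSketch {
    // Files closed since the last snapshot.
    private final List<String> pendingFiles = new ArrayList<>();
    // Files waiting for a given checkpoint to be confirmed.
    private final NavigableMap<Long, List<String>> pendingFilesPerCheckpoint = new TreeMap<>();
    // Files that have been made final (a rename in a real sink).
    private final List<String> committed = new ArrayList<>();

    void closeFile(String name) {
        pendingFiles.add(name);
    }

    void snapshotState(long checkpointId) {
        pendingFilesPerCheckpoint.put(checkpointId, new ArrayList<>(pendingFiles));
        pendingFiles.clear();
    }

    void notifyCheckpointComplete(long checkpointId) {
        // View of all entries with id <= checkpointId; clearing it removes them from the map.
        Map<Long, List<String>> confirmed = pendingFilesPerCheckpoint.headMap(checkpointId, true);
        for (List<String> files : confirmed.values()) {
            committed.addAll(files);
        }
        confirmed.clear();
    }

    List<String> committed() {
        return committed;
    }

    int outstandingCheckpoints() {
        return pendingFilesPerCheckpoint.size();
    }
}
```

For example, after snapshots 1, 2, and 3 each record one closed file, `notifyCheckpointComplete(2)` commits the files of checkpoints 1 and 2 and leaves checkpoint 3 outstanding. Committing every id up to the notified one covers the case where an earlier notification was never delivered.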