// Excerpt of StreamingContext fields. These are class-body initializers and are evaluated in the
// order written; the checkpointDir initializer calls sc.setCheckpointDir, so the order matters.

// Counter starting at 0; the name suggests it hands out input-stream ids (confirm at call sites).
private val nextInputStreamId = new AtomicInteger(0)
// When a checkpoint is present: registers cp_.checkpointDir on the SparkContext and uses it.
// Otherwise null. Declared as a var, so it may be reassigned elsewhere in the class.
private[streaming] var checkpointDir: String = {
if (isCheckpointPresent) {
sc.setCheckpointDir(cp_.checkpointDir)
cp_.checkpointDir
} else {
null
}
}
// Taken from the checkpoint when one is present, otherwise graph.batchDuration.
private[streaming] val checkpointDuration: Duration = {
if (isCheckpointPresent) cp_.checkpointDuration else graph.batchDuration
}
// The next three objects receive `this` while the context is still being constructed.
// NOTE(review): confirm they do not read fields that are declared later in the class.
private[streaming] val scheduler = new JobScheduler(this)
private[streaming] val waiter = new ContextWaiter
private[streaming] val progressListener = new StreamingJobProgressListener(this)
// Streaming UI tab, created only when "spark.ui.enabled" is true (default true); otherwise None.
private[streaming] val uiTab: Option[StreamingTab] =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(new StreamingTab(this))
} else {
None
}
StreamingJobProgressListener 的实现(节选):
/**
 * Listener receiving both streaming (StreamingListener) and core (SparkListener) events and
 * keeping per-batch UI data for the streaming UI. Only the header and the first fields are
 * shown in this excerpt.
 */
private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
  extends StreamingListener with SparkListener {

  // Per-batch UI data keyed by batch time; by name, batches waiting to run and currently
  // running (confirm against the handlers that populate them).
  private val waitingBatchUIData = new HashMap[Time, BatchUIData]
  private val runningBatchUIData = new HashMap[Time, BatchUIData]
  // Completed batches in insertion order; presumably trimmed to batchUIDataLimit (TODO confirm).
  private val completedBatchUIData = new Queue[BatchUIData]
  // Read from "spark.streaming.ui.retainedBatches", default 1000.
  private val batchUIDataLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)

  // Running totals, all starting at 0; presumably updated by the event handlers.
  private var totalCompletedBatches = 0L
  private var totalReceivedRecords = 0L
  private var totalProcessedRecords = 0L
  // Receiver information keyed by an Int, presumably the receiver/stream id (TODO confirm).
  private val receiverInfos = new HashMap[Int, ReceiverInfo]
......
然后我自己实现了一个 Listener 类:当批处理发生阻塞(总延迟过大)时,它会发送邮件告警。以下实现比较简单。
import org.apache.spark.streaming.scheduler._
import streaming.test.email.EmailSender
import org.slf4j._
/**
 * StreamingListener that logs per-batch timing information and sends an alert mail through
 * `EmailSender.sendMail` when a completed batch's total delay reaches
 * `delayThresholdMultiplier` batch intervals.
 *
 * Callbacks that are not overridden here keep the default `StreamingListener` behaviour.
 *
 * @param appName                  application name, inserted into the alert title
 * @param duration                 batch interval; it is multiplied by 1000 before being compared
 *                                 with `BatchInfo.totalDelay`, so it is presumably in seconds
 *                                 (TODO confirm with the caller)
 * @param delayThresholdMultiplier number of batch intervals of total delay that trigger an
 *                                 alert; defaults to 10
 */
class BJJListener(
    private val appName: String,
    private val duration: Int,
    private val delayThresholdMultiplier: Int = 10)
  extends StreamingListener {

  private val logger = LoggerFactory.getLogger("BJJListener")

  // Alert threshold compared against totalDelay. It is computed in Long so that large
  // intervals cannot overflow Int arithmetic.
  private val alertThresholdMs: Long = delayThresholdMultiplier.toLong * duration * 1000L

  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
    // SLF4J only renders an argument where the message has a {} placeholder.
    logger.info("BJJListener batchTime : {}", batchSubmitted.batchInfo.batchTime)
  }

  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
    val processingStartTime = batchStarted.batchInfo.processingStartTime
    logger.info("BJJListener processingStartTime : {}", fmt(processingStartTime))
  }

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo
    val processingStartTime = batchInfo.processingStartTime
    val processingEndTime = batchInfo.processingEndTime
    val processingDelay = batchInfo.processingDelay
    val totalDelay = batchInfo.totalDelay

    // Alert only when totalDelay is known and at or above the threshold; an absent value
    // cannot be judged and must not throw.
    totalDelay.filter(_ >= alertThresholdMs).foreach { delay =>
      val monitorTitle = s"spark streaming $appName 程序阻塞异常警告"
      val monitorContent =
        s"BJJListener : processingStartTime -> ${fmt(processingStartTime)}, " +
          s"processingEndTime -> ${fmt(processingEndTime)} , " +
          s"processingDelay -> ${fmt(processingDelay)} , totalDelay -> $delay, 请及时检查!"
      sendAlert(monitorTitle, monitorContent)
    }

    logger.info("BJJListener processingEndTime : {}", fmt(processingEndTime))
    logger.info("BJJListener processingDelay : {}", fmt(processingDelay))
    logger.info("BJJListener totalDelay : {}", fmt(totalDelay))
  }

  /** Renders an optional timing value, or "N/A" when it is absent. */
  private def fmt(value: Option[Long]): String = value.fold("N/A")(_.toString)

  /**
   * Sends the alert mail. A non-fatal failure is logged rather than propagated, so the timing
   * logs in `onBatchCompleted` still run.
   * NOTE(review): the mail call runs synchronously inside the listener callback; if it can be
   * slow, consider handing it off to a separate executor.
   */
  private def sendAlert(title: String, content: String): Unit = {
    import scala.util.control.NonFatal
    try EmailSender.sendMail(title, content)
    catch {
      case NonFatal(e) => logger.error(s"BJJListener failed to send alert mail: $title", e)
    }
  }
}