设为首页 加入收藏

TOP

spark streaming 应用程序 监控 邮件提醒
2018-11-13 11:58:06 浏览:25
Tags:spark streaming 应用程序 监控 邮件 提醒

Spark Streaming 应用程序上线后,如何监测它是否发生阻塞?
虽然 Spark 提供了 Web UI 可以查看,但作为开发人员总不能天天盯着 Web UI 页面。
查阅官方文档发现,可以通过请求 Spark 自带的 Jetty 服务器获取相关的监控统计信息,例如:

http://host:8088/proxy/application_1517299288666_7058/streaming/

返回的数据是 HTML 页面,需要用正则表达式解析出所需信息,很不方便。能不能在 Spark Streaming 发生阻塞时自动发送邮件提醒,甚至钉钉提醒呢?

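这个方法比较笨,有没有更好的方法呢?跟踪 StreamingContext 的源码可以发现,Web UI 上的统计信息来自一个名为 StreamingJobProgressListener 的监听器,它以 StreamingListener 的形式挂在 StreamingContext 上: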

spark streaming StreamingJobProgressListener 监控器

// Excerpt of member definitions from Spark's StreamingContext. The enclosing class header and the
// definitions of isCheckpointPresent, cp_, sc, graph and conf are not shown in this excerpt.
// NOTE(review): presumably a counter for assigning input-stream ids — confirm in the full source.
private val nextInputStreamId = new AtomicInteger(0)

  // Checkpoint directory: when a checkpoint is present, reuse its directory and also register it
  // on the SparkContext via sc.setCheckpointDir; otherwise null.
  private[streaming] var checkpointDir: String = {
    if (isCheckpointPresent) {
      sc.setCheckpointDir(cp_.checkpointDir)
      cp_.checkpointDir
    } else {
      null
    }
  }

  // Checkpoint interval: taken from the checkpoint when present, otherwise the DStream graph's
  // batch duration.
  private[streaming] val checkpointDuration: Duration = {
    if (isCheckpointPresent) cp_.checkpointDuration else graph.batchDuration
  }

  // Job scheduler for this context (constructed with `this`).
  private[streaming] val scheduler = new JobScheduler(this)

  // NOTE(review): presumably the helper that awaitTermination blocks on — not shown here.
  private[streaming] val waiter = new ContextWaiter

  // The listener behind the streaming statistics this article wants to monitor; it is both a
  // StreamingListener and a SparkListener, and is constructed with this context.
  private[streaming] val progressListener = new StreamingJobProgressListener(this)

  // Streaming tab for the web UI: only created when "spark.ui.enabled" is true (default true).
  private[streaming] val uiTab: Option[StreamingTab] =
    if (conf.getBoolean("spark.ui.enabled", true)) {
      Some(new StreamingTab(this))
    } else {
      None
    }

StreamingJobProgressListener实现:

private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
  extends StreamingListener with SparkListener {

  private val waitingBatchUIData = new HashMap[Time, BatchUIData]
  private val runningBatchUIData = new HashMap[Time, BatchUIData]
  private val completedBatchUIData = new Queue[BatchUIData]
  private val batchUIDataLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
  private var totalCompletedBatches = 0L
  private var totalReceivedRecords = 0L
  private var totalProcessedRecords = 0L
  private val receiverInfos = new HashMap[Int, ReceiverInfo]
  ......

参照它,我自己实现了一个 StreamingListener,在批处理发生阻塞时发送邮件提醒。实现比较简单,如下:


import org.apache.spark.streaming.scheduler._
import streaming.test.email.EmailSender
import org.slf4j._


/**
  * Streaming listener that logs batch lifecycle events and e-mails an alert when a completed
  * batch's `totalDelay` reaches a configurable threshold.
  *
  * @param appName               application name, included in the alert subject
  * @param duration              batch interval in seconds (the example registers it with 10
  *                              alongside `Seconds(10)`)
  * @param delayThresholdBatches alert once `totalDelay >= delayThresholdBatches * duration` seconds.
  *                              The default of 10 equals the effective threshold of the original
  *                              condition (`>= 6x && >= 10x` reduces to `>= 10x`).
  */
class BJJListener(private val appName: String,
                  private val duration: Int,
                  private val delayThresholdBatches: Int = 10) extends StreamingListener {

  import scala.util.control.NonFatal

  private val logger = LoggerFactory.getLogger("BJJListener")

  /** Alert threshold in milliseconds; computed in Long so large settings cannot overflow Int. */
  private val alertThresholdMs: Long = delayThresholdBatches.toLong * duration * 1000L

  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit =
    // SLF4J only renders arguments that have a matching `{}` placeholder.
    logger.info("BJJListener  batchTime : {}", batchSubmitted.batchInfo.batchTime)

  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit =
    logger.info("BJJListener  processingStartTime : {}", show(batchStarted.batchInfo.processingStartTime))

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo
    val processingStartTime = batchInfo.processingStartTime
    val processingEndTime = batchInfo.processingEndTime
    val processingDelay = batchInfo.processingDelay
    val totalDelay = batchInfo.totalDelay

    // `exists` is false for None, so a batch without timing information never triggers an alert
    // (the original called `.get`, which would throw instead).
    if (totalDelay.exists(_ >= alertThresholdMs)) {
      val monitorTitle = s"spark streaming $appName 程序阻塞异常警告"
      val monitorContent = s"BJJListener : processingStartTime -> ${show(processingStartTime)}, processingEndTime -> ${show(processingEndTime)} , " +
        s"processingDelay -> ${show(processingDelay)} , totalDelay -> ${show(totalDelay)}, 请及时检查!"
      sendAlert(monitorTitle, monitorContent)
    }
    logger.info("BJJListener  processingEndTime : {}", show(processingEndTime))
    logger.info("BJJListener  processingDelay : {}", show(processingDelay))
    logger.info("BJJListener  totalDelay : {}", show(totalDelay))
  }

  /**
    * Sends the alert mail without letting a failure escape into the caller.
    *
    * NOTE(review): EmailSender.sendMail is synchronous (it ends in Transport.send), so a slow SMTP
    * server delays this listener callback; every qualifying batch also sends its own mail.
    * Consider offloading or rate-limiting if that matters.
    */
  private def sendAlert(title: String, content: String): Unit =
    try EmailSender.sendMail(title, content)
    catch {
      case NonFatal(e) => logger.error(s"BJJListener failed to send alert mail: $title", e)
    }

  /** Renders an optional millisecond value, using "N/A" when it is absent. */
  private def show(value: Option[Long]): String = value.fold("N/A")(_.toString)
}

然后通过 ssc.addStreamingListener,以可插拔的方式注册自定义的 listener:

    ssc.addStreamingListener(new BJJListener(appName, 10))

Spark Streaming 程序测试示例:

import kafka.serializer.StringDecoder
import org.ansj.splitWord.analysis.NlpAnalysis
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Local demo for [[BJJListener]]: reads news messages from Kafka, segments them with Ansj,
  * counts words per batch, and registers the listener so that slow batches trigger an alert mail.
  *
  * Created by Amy on 2018/2/2.
  */
object test {
  def main(args: Array[String]): Unit = {

    // Local Windows setup: points Hadoop at a local installation (presumably for winutils.exe).
    System.setProperty("hadoop.home.dir", "D:\\mcyarn\\hadoop-common-2.2.0-bin-master")
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    val appName = "spark Steaming test"
    // Single source of truth for the batch interval: the listener's alert threshold is expressed
    // in multiples of it, so both must agree.
    val batchSeconds = 10
    val conf = new SparkConf().setMaster("local[2]").setAppName("test")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(batchSeconds))

    val brokerList = "localhost:9092"
    val zookeeperConnect = "localhost:2181"
    val groupId = "baasdf20180302"
    val newsTopic = "test"

    val kafkaParams = Map("metadata.broker.list" -> brokerList, "group.id" -> groupId,
      "zookeeper.connect" -> zookeeperConnect,
      "auto.offset.reset" -> kafka.api.OffsetRequest.LargestTimeString)

    // The direct stream yields (key, message) pairs. The news text is the message (_._2); the key
    // (_._1, used previously) may be null when producers send unkeyed messages.
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics = Set(newsTopic)).map(_._2)

    kafkaStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Four copies of the batch, presumably to make processing slow enough to exercise the
        // blocking alert.
        val rdds = rdd.union(rdd).union(rdd).union(rdd)
        // Segment each message into space-separated words and count individual words (the
        // original counted each whole segmented sentence as a single key and kept null messages).
        val words = rdds
          .filter(_ != null)
          .flatMap(news => NlpAnalysis.parse(news).toStringWithOutNature(" ").split(" "))
          .filter(_.nonEmpty)

        val wordCount = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCount.foreach(println)
        println(rdd.count())
      }
    }

    ssc.addStreamingListener(new BJJListener(appName, batchSeconds))

    ssc.start()
    ssc.awaitTermination()

  }
}

阻塞到什么程度才发送提醒,可以自己决定;提醒方式既可以是邮件,也可以是钉钉等,比较方便。

EmailSendInfo 类



import java.util.Properties;

/**
 * Mutable holder for everything needed to send one e-mail: SMTP server, sender and recipient,
 * credentials, subject, body and (optional) attachment file names.
 */
public class EmailSendInfo {

    // SMTP server host name or IP address.
    private String mailServerHost;
    // SMTP server port; defaults to "25".
    private String mailServerPort = "25";
    // Sender address.
    private String fromAddress;
    // Recipient address.
    private String toAddress;
    // User name and password for logging in to the SMTP server.
    private String userName;
    private String password;
    // Whether the SMTP server requires authentication.
    private boolean validate = false;
    // Mail subject.
    private String subject;
    // Plain-text mail body.
    private String content;
    // File names of mail attachments.
    private String[] attachFileNames;

    /**
     * Builds the JavaMail session properties for this mail.
     *
     * <p>Sets {@code mail.smtp.host}, {@code mail.smtp.port} and {@code mail.smtp.auth}
     * ({@code "true"} or {@code "false"} following {@link #isValidate()}).
     *
     * @return a new {@link Properties} instance
     * @throws NullPointerException if the host or port is null, because {@link Properties}
     *         (a {@link java.util.Hashtable}) rejects null values
     */
    public Properties getProperties() {
        Properties p = new Properties();
        p.put("mail.smtp.host", this.mailServerHost);
        p.put("mail.smtp.port", this.mailServerPort);
        p.put("mail.smtp.auth", validate ? "true" : "false");
        return p;
    }

    /** @return the SMTP server host */
    public String getMailServerHost() {
        return mailServerHost;
    }

    /**
     * @param mailServerHost SMTP server host name or IP address
     */
    public void setMailServerHost(String mailServerHost) {
        this.mailServerHost = mailServerHost;
    }

    /** @return the SMTP server port (defaults to "25") */
    public String getMailServerPort() {
        return mailServerPort;
    }

    /**
     * @param mailServerPort SMTP server port
     */
    public void setMailServerPort(String mailServerPort) {
        this.mailServerPort = mailServerPort;
    }

    /** @return whether SMTP authentication is required */
    public boolean isValidate() {
        return validate;
    }

    /**
     * @param validate true if the SMTP server requires authentication
     */
    public void setValidate(boolean validate) {
        this.validate = validate;
    }

    /** @return attachment file names, or null if none were set */
    public String[] getAttachFileNames() {
        return attachFileNames;
    }

    /**
     * @param fileNames attachment file names (stored as given, not copied)
     */
    public void setAttachFileNames(String[] fileNames) {
        this.attachFileNames = fileNames;
    }

    /** @return the sender address */
    public String getFromAddress() {
        return fromAddress;
    }

    /**
     * @param fromAddress sender address
     */
    public void setFromAddress(String fromAddress) {
        this.fromAddress = fromAddress;
    }

    /** @return the SMTP login password */
    public String getPassword() {
        return password;
    }

    /**
     * @param password SMTP login password
     */
    public void setPassword(String password) {
        this.password = password;
    }

    /** @return the recipient address */
    public String getToAddress() {
        return toAddress;
    }

    /**
     * @param toAddress recipient address
     */
    public void setToAddress(String toAddress) {
        this.toAddress = toAddress;
    }

    /** @return the SMTP login user name */
    public String getUserName() {
        return userName;
    }

    /**
     * @param userName SMTP login user name
     */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /** @return the mail subject */
    public String getSubject() {
        return subject;
    }

    /**
     * @param subject mail subject
     */
    public void setSubject(String subject) {
        this.subject = subject;
    }

    /** @return the plain-text mail body */
    public String getContent() {
        return content;
    }

    /**
     * @param textContent plain-text mail body
     */
    public void setContent(String textContent) {
        this.content = textContent;
    }
}

EmailAuthenticator 类


import javax.mail.*;

public class EmailAuthenticator extends Authenticator{

    private String userName;
    private String password;

    public EmailAuthenticator(){}

    public EmailAuthenticator(String username, String password) {
        this.userName = username;
        this.password = password;
    }

    protected PasswordAuthentication getPasswordAuthentication(){
        return new PasswordAuthentication(userName, password);
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

}

EmailSender 类


import javax.mail.*;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import java.util.Date;
import java.util.Properties;

/**
 * Sends plain-text alert e-mails over SMTP using JavaMail.
 *
 * <p>Connection settings can be overridden with JVM system properties
 * ({@code monitor.mail.host}, {@code monitor.mail.port}, {@code monitor.mail.user},
 * {@code monitor.mail.password}, {@code monitor.mail.from}, {@code monitor.mail.to},
 * {@code monitor.mail.debug}). Unset properties fall back to the placeholder values below, so
 * existing behavior is unchanged when nothing is configured.
 */
public class EmailSender {

    /**
     * Sends one plain-text (UTF-8) mail described by {@code mailInfo}.
     *
     * @param mailInfo server, credentials, addresses, subject and body; {@code toAddress} may be
     *                 a comma-separated list of recipients
     * @return true if {@link Transport#send} completed, false if a {@link MessagingException}
     *         occurred (its stack trace is printed)
     */
    private static boolean sendTextMail(EmailSendInfo mailInfo) {
        Properties props = mailInfo.getProperties();
        // Only provide credentials when the server requires authentication.
        EmailAuthenticator authenticator = mailInfo.isValidate()
                ? new EmailAuthenticator(mailInfo.getUserName(), mailInfo.getPassword())
                : null;
        Session session = Session.getInstance(props, authenticator);
        // SMTP protocol tracing is for debugging only; enable it with -Dmonitor.mail.debug=true.
        session.setDebug(Boolean.getBoolean("monitor.mail.debug"));
        try {
            MimeMessage message = new MimeMessage(session);
            message.setFrom(new InternetAddress(mailInfo.getFromAddress()));
            // parse() accepts a single address or a comma-separated list of recipients.
            message.setRecipients(Message.RecipientType.TO, InternetAddress.parse(mailInfo.getToAddress()));
            message.setSubject(mailInfo.getSubject(), "UTF-8");
            message.setSentDate(new Date());
            message.setText(mailInfo.getContent(), "UTF-8");
            Transport.send(message);
            return true;
        } catch (MessagingException ex) {
            ex.printStackTrace();
            return false;
        }
    }

    /**
     * Sends an alert mail with the given subject and body using the configured SMTP settings.
     * Failures are reported on stderr and never thrown.
     *
     * @param monitorTitle   mail subject
     * @param monitorContent plain-text mail body
     */
    public static void sendMail(String monitorTitle, String monitorContent) {
        EmailSendInfo mailInfo = new EmailSendInfo();
        mailInfo.setMailServerHost(System.getProperty("monitor.mail.host", "smtp.exmail.qq.com"));
        // The original declared this port but never applied it; it is now actually used.
        mailInfo.setMailServerPort(System.getProperty("monitor.mail.port", "25"));
        mailInfo.setValidate(true);
        // Placeholder credentials; prefer supplying real ones via system properties rather than source.
        mailInfo.setUserName(System.getProperty("monitor.mail.user", "xxxxxg@yqzbw.com"));
        mailInfo.setPassword(System.getProperty("monitor.mail.password", "12345678"));
        mailInfo.setFromAddress(System.getProperty("monitor.mail.from", "xxxx@yqzbw.com"));
        mailInfo.setToAddress(System.getProperty("monitor.mail.to", "xxxx@yqzbw.com"));
        mailInfo.setSubject(monitorTitle);
        mailInfo.setContent(monitorContent);
        // Send as a plain-text mail.
        if (!sendTextMail(mailInfo)) {
            System.err.println("EmailSender: failed to send alert mail: " + monitorTitle);
        }
    }

}

以上实现比较简单,欢迎有更好方案的朋友补充,可以私信我。

【打印繁体】【投稿】【收藏】【推荐】【举报】【评论】【关闭】【返回顶部】
上一篇Spark连接到MySQL并执行查询为什.. 下一篇从RDD的角度来看Spark内部原理

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目