设为首页 加入收藏

TOP

kafka运行时某个节点丢失
2019-04-24 02:32:44 】 浏览:77
Tags:kafka 行时 某个 节点 丢失
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/change_on/article/details/86674045

最近在开发的过程中用到kafka做消息中间件,在测试高并发的时候,发现了一系列的问题。这两天也明白一点,做出产品跟做好产品是两回事!程序能跑起来跟程序能稳定运行更是两回事!
开发的过程中,只要测试数据能否通,业务逻辑能不能被执行,只需要测试小数据量。但是程序开发完成后,性能测试是另一个工作。

  • 第一个问题:kafka生产者数据包丢失
    这个原因大多是生产者配置没配好,kafka的通信有接收到数据之后发出的确认接收的信号
    acks:
    (1)acks=0: 设置为0表示producer不需要等待任何确认收到的信息
    (2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入
    (3)acks=all:leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据
    request.required.acks
    (1)0: 表示producer从来不等待来自broker的确认信息
    (2)1:表示获得leader replica已经接收了数据的确认信息,这个选择时延较小同时确保了server确认接收成功。
    (3)-1:producer会获得所有同步replicas都收到数据的确认,同时时延最大

我给出自己的配置,在10000/s并发下,可以保证可靠性

props.put("compression.type", "gzip");
        props.put("linger.ms", "50");
        props.put("acks", "all");
        props.put("retries ", "30");
        props.put("reconnect.backoff.ms ", "20000");
        props.put("retry.backoff.ms", "20000");
		props.put("metadata.broker.list", brokerlist);
		props.put("serializer.class", "kafka.serializer.DefaultEncoder");
		props.put("producer.type", "async");
		props.put("key.serializer.class", "kafka.serializer.StringEncoder");
		props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
		props.put("request.required.acks", "1");
		props.put("queue.buffering.max.ms", "5000");
		props.put("queue.buffering.max.messages", "10000");
		props.put("queue.enqueue.timeout.ms", "-1");
		props.put("batch.num.messages", "200");
  • 第二个问题:kafka消费者掉线(丢失)
    报的错:
java.io.IOException: Failed to process transaction type: 1 error: KeeperErrorCode = NoNode for /....
        at org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:188)
        at org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:223)
        at org.apache.zookeeper.server.quorum.QuorumPeer.loadDataBase(QuorumPeer.java:417)
        at org.apache.zookeeper.server.quorum.QuorumPeer.start(QuorumPeer.java:409)
        at org.apache.zookeeper.server.quorum.QuorumPeerMain.runFromConfig(QuorumPeerMain.java:156)
        at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:116)
        at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:79)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /....
        at org.apache.zookeeper.server.persistence.FileTxnSnapLog.processTransaction(FileTxnSnapLog.java:250)
        at org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:186)

具体的原因还未查清,可以确定范围,在消费者的配置,经过不断调试和改进,这里分两步解决

(1)首先是消费者的配置文件,把zookeeper的会话超时时间设置长一些:

 //如果value合法,则自动提交偏移量
        props.put("enable.auto.commit", "false");
        //设置多久一次更新被消费消息的偏移量
        props.put("auto.commit.interval.ms", "1000");
        //设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
        props.put("session.timeout.ms", "60000");
        props.put("rebalance.max.retries", "5");
		props.put("rebalance.backoff.ms", "11000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

(2)代码上面,控制消费速度,我选择控制生产速度,避免一下子产生太大的数据量
即使调整消费者配置,有时还是会掉线,所以在业务最开始的地方加一个队列控制,来一个任务入队列,完成一个任务从队列中移除,当队列超过10000的时候,此后每加一个任务sleep1s,不要堆太多,这当然是没有办法的办法:

if(CmdQueue.PRE_CMD_QUEUE.size() > 10000) {
	try {
		Thread.sleep(1000L);
	} catch (InterruptedException e) {
		// TODO Auto-generated catch block
		e.printStackTrace();
	}
}
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇在一台虚拟机上做kafka集群,使用J.. 下一篇kafka实战教学

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目