Kafka cluster installation and common commands

Official documentation: http://kafka.apache.org/documentation/

Other references: http://blog.csdn.net/code52/article/details/50475511

http://www.infoq.com/cn/articles/kafka-analysis-part-1

http://www.orchome.com/kafka/index

https://blog.csdn.net/qq_34834325/article/details/78743490

https://blog.csdn.net/gongxinju/article/details/53415051


Environment:

  • slave1 192.168.255.121
  • slave2 192.168.255.122
  • slave3 192.168.255.123
  • Java
  • (optional) an existing ZooKeeper installation

Installation steps

1. Download and extract

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.1/kafka_2.12-1.0.1.tgz -P /opt/software

[root@slave1] /usr/local$ tar -zxvf kafka_2.12-1.0.1.tgz -C /usr/local

[root@slave1] /usr/local$ ln -s kafka_2.12-1.0.1 kafka
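slave2 and slave3 need the same tree; either repeat the two commands above on each node, or copy it over, for example:

scp -r /usr/local/kafka_2.12-1.0.1 root@192.168.255.122:/usr/local/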

2. Configure ZooKeeper

Kafka depends on ZooKeeper, so build the ZooKeeper cluster first (a ZooKeeper ensemble needs at least 3 nodes). Here we simply use the ZooKeeper bundled with Kafka.

Edit zookeeper.properties; the configuration is identical on all nodes:

vim /usr/local/kafka/config/zookeeper.properties

dataDir=/usr/local/kafka/zookeeper  # data directory; holds the myid file; must be created manually
dataLogDir=/var/log/kafka/zookeeper  # log directory; must be created manually
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

server.1=192.168.255.121:2888:3888
server.2=192.168.255.122:2888:3888
server.3=192.168.255.123:2888:3888
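Neither path is created automatically. The log directory can be made on every node with, for example:

[root@slave1] ~$ mkdir -p /var/log/kafka/zookeeper

The data directory is created in the next step.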

Create the myid file: go into /usr/local/kafka/zookeeper and create a file named myid on each node, containing 1, 2, and 3 respectively. (myid is the identifier ZooKeeper quorum members use to recognize each other; it must exist, and it must be different on every node.)

[root@slave1] /usr/local/kafka$ mkdir zookeeper
[root@slave1] /usr/local/kafka$ echo 1 > zookeeper/myid    # write 2 on slave2, 3 on slave3
[root@slave1] /usr/local/kafka$ cat zookeeper/myid 
1

3. Configure Kafka

vim /usr/local/kafka/config/server.properties, adjusting the values that differ per node:

broker.id=0 # unique per broker; assign 0, 1, 2 in order
port=9092 # the stock file has a listeners line; this setup uses port instead
host.name=192.168.255.121    # the stock file has advertised.host.name; this setup uses host.name, set to each node's own IP
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/log/kafka # must be created manually; Kafka does not create it from the config file
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.255.121:2181,192.168.255.122:2181,192.168.255.123:2181
zookeeper.connection.timeout.ms=6000
delete.topic.enable=true
auto.create.topics.enable=false
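log.dirs must likewise exist before startup, and broker.id and host.name have to differ per node; with the layout used here:

[root@slave1] ~$ mkdir -p /var/log/kafka    # run on all three nodes
# slave2: broker.id=1, host.name=192.168.255.122
# slave3: broker.id=2, host.name=192.168.255.123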

4. Start the services

(1) Start ZooKeeper first; run this on every node. Note that while bringing up the first and second nodes you may see "Cannot open channel" errors; this is normal, because the other quorum members are not up yet.

bin/zookeeper-server-start.sh config/zookeeper.properties

[2018-11-13 07:57:41,263] INFO Reading configuration from: /usr/local/kafka/config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2018-11-13 07:57:41,285] INFO Resolved hostname: 192.168.255.123 to address: /192.168.255.123 (org.apache.zookeeper.server.quorum.QuorumPeer)
[2018-11-13 07:57:41,285] INFO Resolved hostname: 192.168.255.122 to address: /192.168.255.122 (org.apache.zookeeper.server.quorum.QuorumPeer)
[2018-11-13 07:57:41,286] INFO Resolved hostname: 192.168.255.121 to address: 

......

[2018-11-13 07:57:53,719] INFO Getting a diff from the leader 0x400000036 (org.apache.zookeeper.server.quorum.Learner)
[2018-11-13 07:57:55,889] INFO Received connection request /192.168.255.123:50880 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
[2018-11-13 07:57:55,896] INFO Notification: 1 (message format version), 3 (n.leader), 0x400000036 (n.zxid), 0x1 (n.round), LOOKING (n.state), 3 (n.sid), 0x4 (n.peerEpoch) FOLLOWING (my state) (org.apache.zookeeper.server.quorum.FastLeaderElection)
[2018-11-13 07:57:55,903] INFO Notification: 1 (message format version), 2 (n.leader), 0x400000036 (n.zxid), 0x3 (n.round), LOOKING (n.state), 3 (n.sid), 0x5 (n.peerEpoch) FOLLOWING (my state) (org.apache.zookeeper.server.quorum.FastLeaderElection)
[2018-11-13 07:58:53,549] WARN Got zxid 0x600000001 expected 0x1 (org.apache.zookeeper.server.quorum.Learner)
[2018-11-13 07:58:53,549] INFO Creating new log file: log.600000001 (org.apache.zookeeper.server.persistence.FileTxnLog)
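The command above runs ZooKeeper in the foreground (the logs shown come from that session); the script also accepts a -daemon flag to background it, and nohup (as used for Kafka below) works too:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties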

(2) Then start Kafka, again on every node.

[root@slave1] /var/log/kafka$ nohup /usr/local/kafka/bin/kafka-server-start.sh  /usr/local/kafka/config/server.properties > /var/log/kafka/startup.log 2>&1 &
[1] 41595
[root@slave1] /var/log/kafka$ cat startup.log 
[2018-11-13 08:27:02,738] INFO KafkaConfig values: 
	advertised.host.name = null
	advertised.listeners = null
	advertised.port = null
	alter.config.policy.class.name = null
	authorizer.class.name = 

	......

[2018-11-13 08:27:04,121] INFO [ProducerId Manager 0]: Acquired new producerId block (brokerId:0,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1 (kafka.coordinator.transaction.ProducerIdManager)
[2018-11-13 08:27:04,162] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2018-11-13 08:27:04,261] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2018-11-13 08:27:04,274] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2018-11-13 08:27:04,490] INFO Creating /brokers/ids/0 (is it secure false) (kafka.utils.ZKCheckedEphemeral)
[2018-11-13 08:27:04,500] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2018-11-13 08:27:04,502] INFO Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint(192.168.255.121,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
[2018-11-13 08:27:04,503] WARN No meta.properties file under dir /var/log/kafka/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2018-11-13 08:27:04,580] INFO Kafka version : 1.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2018-11-13 08:27:04,580] INFO Kafka commitId : c0518aa65f25317e (org.apache.kafka.common.utils.AppInfoParser)
[2018-11-13 08:27:04,597] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
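Once every node is started, jps (bundled with the JDK) is a quick sanity check; each node should list both a QuorumPeerMain (ZooKeeper) process and a Kafka process:

[root@slave1] ~$ jps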

5. Testing

(1) Create a topic named test on slave1

[root@slave1] ~$ /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.255.121:2181,192.168.255.122:2181,192.168.255.123:2181 --replication-factor 3 --partitions 3 --topic test
Created topic "test".

(2) List topics

[root@slave1] /usr/local/kafka$ bin/kafka-topics.sh --list --zookeeper 192.168.255.121:2181
test
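For more detail, --describe shows each partition's leader, replicas, and in-sync replicas:

[root@slave1] /usr/local/kafka$ bin/kafka-topics.sh --describe --zookeeper 192.168.255.121:2181 --topic test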

(3) Create consumers on slave2 and slave3

[root@slave2] /usr/local/kafka$ bin/kafka-console-consumer.sh --zookeeper 192.168.255.121:2181 --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].


[root@slave3] /usr/local/kafka$ bin/kafka-console-consumer.sh --zookeeper 192.168.255.121:2181 --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].


(4) Create a producer on slave1 and type a few messages

[root@slave1] /usr/local/kafka$ bin/kafka-console-producer.sh --broker-list 192.168.255.121:9092 --topic test
>111
>222
>

(5) The messages can be seen arriving on the consumer nodes:

[root@slave2] /usr/local/kafka$ bin/kafka-console-consumer.sh --zookeeper 192.168.255.121:2181 --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
111
222


[root@slave3] /usr/local/kafka$ bin/kafka-console-consumer.sh --zookeeper 192.168.255.121:2181 --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
111
222

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

This message means: the ConsoleConsumer being used relies on the old consumer, which is deprecated and will be removed in a future major release. Switch to the new consumer by passing --bootstrap-server (the address of a broker, port 9092, not ZooKeeper) instead of --zookeeper, i.e. create consumers with:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.255.121:9092 --topic test --from-beginning

Other common commands

Stop Kafka

/usr/local/kafka_2.12-1.0.1/bin/kafka-server-stop.sh
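ZooKeeper has a matching stop script:

/usr/local/kafka_2.12-1.0.1/bin/zookeeper-server-stop.sh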

Delete a topic
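With delete.topic.enable=true set in server.properties above, a topic can be deleted like this:

/usr/local/kafka_2.12-1.0.1/bin/kafka-topics.sh --delete --zookeeper 192.168.255.121:2181 --topic test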

Troubleshooting

1. Startup fails with: failed; error='Cannot allocate memory' (errno=12)

Cause: not enough memory.

Fixes:

  • Add more memory
  • Kill processes that are hogging memory
  • Edit kafka-server-start.sh, changing export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" to export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
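free -m gives a quick view of how much memory is actually available before choosing new heap sizes:

[root@slave1] ~$ free -m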

2. Creating a topic fails with: Replication factor: 3 larger than available brokers: 0.

Fix:

Check that the ZooKeeper address in the create command is correct; it must match the zookeeper.connect setting in the config file.
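If the address is right, also confirm that the brokers actually registered themselves in ZooKeeper; one way, using the zookeeper-shell.sh shipped with Kafka, is:

bin/zookeeper-shell.sh 192.168.255.121:2181 ls /brokers/ids

With all three brokers up, the output should end with [0, 1, 2].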

What is Kafka?

Apache Kafka is a kind of message middleware. Many people don't know what message middleware is, so before getting started, here is a rough explanation of the concept; note that Kafka can by now do a great deal more than this.

Take producers and consumers as an example. A producer produces eggs and a consumer eats them: for every egg produced, one gets eaten. Now suppose the consumer chokes on an egg (the system crashes) while the producer keeps producing; the newly produced eggs are lost. Or suppose the producer is very strong (a high-traffic scenario) and produces 100 eggs per second while the consumer can only eat 50; before long the consumer can't keep up (messages pile up and the system eventually times out), refuses to eat any more, and again "eggs" are lost. At this point we put a basket between them: every egg produced goes into the basket, and the consumer takes eggs out of the basket. No eggs are lost any more; they are all in the basket. That basket is "Kafka".
The eggs are really "data streams"; systems talk to each other through data streams (over TCP, HTTP, and so on), also called packets, or "messages".
A full message queue is simply a full basket with no room for more "eggs", so you quickly add a few more baskets; that is Kafka scaling out.
So now you know what Kafka is for: it is that "basket".

(Source: http://www.orchome.com/kafka/index)

Terminology

1. Broker: a server in the Kafka cluster
2. Topic: a category of messages; Kafka is message-oriented
3. Partition: a physical partitioning concept. A topic contains one or more partitions, and Kafka allocates in units of partitions. Each partition corresponds to a directory on disk that stores that partition's messages and index files. For example, if you create two topics, topic1 with 5 partitions and topic2 with 10, the cluster ends up with 5 + 10 = 15 such directories.
4. Replica: a copy of a partition, used to keep the partition highly available
5. Offset: a message's unique sequence number within its partition
6. Producer: publishes messages
7. Consumer: consumes messages from the cluster
8. Consumer group: every consumer must belong to exactly one consumer group; each message can be consumed by multiple consumer groups, but by only one consumer within a given group
9. Zookeeper: stores the cluster's metadata, handles leader election, fault tolerance, etc.

Typical use cases

  • Good fit:
    • Decoupling upstream and downstream systems through message middleware with a publish/subscribe model, where the same data is subscribed to and consumed multiple times
    • Very large message volumes where reliability requirements are not strict and occasional message loss or duplication in extreme cases is acceptable
  • Poor fit:
    • Critical business systems that demand very high message reliability and cannot tolerate any loss, e.g. payments or orders; ActiveMQ or RocketMQ is a better choice there