Kafka Operations Walkthrough
2018-11-13 16:37:07
Copyright notice: this is an original post by the author; reproduction without permission is prohibited. https://blog.csdn.net/u010649766/article/details/76057855


1. Install the JDK

sudo add-apt-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java7-installer
sudo apt-get install oracle-java7-set-default

To install Java 8 (i.e. Java 1.8) instead, replace java7 with java8 in the commands above.

2. Download Kafka

Go to the download page: http://kafka.apache.org/downloads.html

Choose a Binary download (a Source download must be compiled before it can be used).

3. Starting and Stopping

Start the Zookeeper server:

bin/zookeeper-server-start.sh config/zookeeper.properties &  

The trailing & runs the process in the background, so you get the command line back.

Start the Kafka server:

bin/kafka-server-start.sh config/server.properties & 

Stop the Kafka server:

bin/kafka-server-stop.sh 

Stop the Zookeeper server:

bin/zookeeper-server-stop.sh 

4. Single-Machine Connectivity Test

Create a topic

Create a topic named "test" with a single partition and a single replica.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

List and describe topics

bin/kafka-topics.sh --list --zookeeper localhost:2181
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Run the producer:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test  

In older Kafka versions, --broker-list localhost:9092 must be replaced with --zookeeper localhost:2181.

Run the consumer:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic test  

Type a string in the producer terminal and press Enter, then check that it appears on the consumer side.

Check Kafka consumption progress:


kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic topicName --group groupID

  • Column details:

    Group:   consumer group
    Topic:   topic name
    Pid:     partition ID
    Offset:  number of messages consumed so far
    logSize: total number of messages
    Lag:     number of messages not yet consumed
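The Lag column is simply logSize minus Offset, computed per partition. A tiny pure-Python illustration of that arithmetic (the sample rows and numbers are made up):

```python
# Lag for one partition is the number of produced messages (logSize)
# minus the number already consumed (Offset).
def consumer_lag(log_size, offset):
    return log_size - offset

# Hypothetical checker rows: (Pid, Offset, logSize)
rows = [(0, 246, 300), (1, 100, 100)]

# Total unconsumed messages across all partitions of the topic.
total_lag = sum(consumer_lag(log_size, offset) for _, offset, log_size in rows)
print(total_lag)  # → 54
```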

5. Distributed Connectivity Test

The Zookeeper server, Kafka server, and producer all run on server1, IP address 192.168.1.10.

The consumer runs on server2, IP address 192.168.1.12.

Run the producer on server1 and the consumer on server2.

Run the producer:

bin/kafka-console-producer.sh --broker-list 192.168.1.10:9092 --topic test

Run the consumer:

bin/kafka-console-consumer.sh --zookeeper 192.168.1.10:2181 --topic test --from-beginning  

After typing a string in the producer console, the consumer reports a Connection refused error:

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
[2017-07-24 15:50:16,777] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,shuzilm,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:112)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:61)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:96)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:68)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
[2017-07-24 15:50:16,793] WARN [console-consumer-19332_liujx-virtual-machine-1500882597887-d5c624e6-leader-finder-thread]: Failed to find leader for Set(test-0) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,shuzilm,9092))] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:75)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:96)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:68)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:112)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:61)
    ... 3 more

The broker, producer, and consumer all register with Zookeeper, and the producer and consumer parameters are specified explicitly. The problem lies in the broker's configuration file, server.properties:

    # Hostname the broker will bind to. If not set, the server will bind to all interfaces  
    #host.name=localhost  

Since host.name is not set, the broker effectively uses 127.0.0.1, so the consumer cannot fetch data from it. Setting it to 192.168.1.10 and restarting the service fixes the problem.
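Concretely, the fix is to uncomment and set host.name in config/server.properties on server1 (this applies to these older broker versions; advertised.host.name can additionally be set if clients must connect through a different address than the one the broker binds to):

    # config/server.properties on server1
    host.name=192.168.1.10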

Golang

  • The Golang Kafka client uses the asynchronous producer interface, but if messages are written too quickly, the Input() and Successes() channels can block each other and deadlock. So avoid the select model and use the goroutine model instead, which prevents one blocked channel inside a loop from starving the other channels.

  • When writing to Kafka in bulk, only 9069 of 35000 messages succeeded; all the others reported the error below. The workaround was to add sleep(1*time.Nanosecond) between sends.

    kafka: Failed to produce message to topic test: kafka server: Message was too large, server rejected it to avoid allocation error.
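The goroutine model recommended in the first point above — a dedicated reader that keeps draining the results channel so the write loop never blocks — can be sketched language-agnostically. Here is a pure-Python analog using threads and bounded queues; the names are illustrative and are not sarama's actual API:

```python
import queue
import threading

# Analog of an async producer: small bounded channels, like Go channels.
inputs = queue.Queue(maxsize=10)
successes = queue.Queue(maxsize=10)

def fake_broker():
    # Consumes from "inputs" and acknowledges each message on "successes".
    # If nobody drains "successes", this put() blocks -- and then the
    # write loop blocks too: that is the Input()/Successes() deadlock.
    while True:
        msg = inputs.get()
        if msg is None:           # sentinel: shut down
            successes.put(None)
            return
        successes.put(msg)

received = []
def drain_successes():
    # The dedicated "goroutine" that keeps the successes channel empty.
    while True:
        msg = successes.get()
        if msg is None:
            return
        received.append(msg)

threading.Thread(target=fake_broker, daemon=True).start()
drainer = threading.Thread(target=drain_successes)
drainer.start()

# Write far more messages than either queue can hold: no deadlock,
# because the drainer runs concurrently with this loop.
for i in range(1000):
    inputs.put(i)
inputs.put(None)
drainer.join()
print(len(received))  # → 1000
```

With a select-style single loop, a full successes queue would block the same loop that feeds inputs; splitting the drain into its own thread is exactly the fix the bullet above describes.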

Python

  • The timestamp field: with CreateTime it returns -1; with LogAppendTime it returns the broker's local time when the message was written. The Kafka broker config provides a parameter, log.message.timestamp.type, to set the timestamp type for every topic in the cluster. A different timestamp type can also be set for an individual topic by overriding the global config when the topic is created:

    bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 1 --replication-factor 1 --config message.timestamp.type=LogAppendTime
  • Version: with the kafka-python client, Kafka (0, 11, 0) supports the timestamp-based seek interface offsets_for_times to position offsets, but on Kafka (0, 10, 0) it fails with: [Error 35] UnsupportedVersionError: offsets_for_times API not supported for cluster version (0, 10, 0)

from kafka import TopicPartition

# Skip ahead until messages fall inside [start_stamp, end_stamp);
# `consumer`, `start_stamp`, and `end_stamp` are defined elsewhere.
for message in consumer:
    if start_stamp and message.timestamp < start_stamp:
        # Seek to the first offset whose timestamp is >= start_stamp.
        tp = TopicPartition(message.topic, message.partition)
        offsets = consumer.offsets_for_times({tp: start_stamp})
        consumer.seek(tp, offsets[tp].offset)
        continue

    if end_stamp and message.timestamp >= end_stamp:
        break