What is Kafka
Apache Kafka is a kind of message middleware: a distributed streaming platform that can serve as a high-throughput, high-performance message queue. For details, see the official Apache Kafka website. Below is a quick walkthrough of implementing a Kafka producer and consumer in Go.
Installing Kafka
Download
Official download page: http://kafka.apache.org/downloads
```shell
wget http://mirror.bit.edu.cn/apache/kafka/0.10.2.1/kafka_2.12-0.10.2.1.tgz
tar -zxvf kafka_2.12-0.10.2.1.tgz
cd kafka_2.12-0.10.2.1
```
Start the services
```shell
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
```
Create a topic
```shell
# Create the topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# List topics to check that it was created
bin/kafka-topics.sh --list --zookeeper localhost:2181
```
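Note: the `--zookeeper` flag matches the 0.10.x release used in this post. In newer Kafka releases (2.2 and later) the topic tools talk to the broker directly, so the equivalent commands use `--bootstrap-server` instead:

```shell
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```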
Send messages
Send some messages to the test topic we just created (this is the producer side):
```shell
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
```
Create a consumer
Subscribe to the test topic and consume it:
```shell
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
```
If both the producer and the consumer are working, the consumer will receive all of the producer's messages when it starts up.
Implementing a producer and consumer in Go
Download the Kafka client library for Go
```shell
go get github.com/Shopify/sarama
```
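Note: the `github.com/Shopify/sarama` import path matches the time of writing; the project has since moved, and on current Go toolchains you would fetch it from its new home instead:

```shell
go get github.com/IBM/sarama
```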
Official client libraries are available for other languages as well.
Producer implementation
```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true          // required by SyncProducer
	config.Producer.RequiredAcks = sarama.WaitForAll // wait for all in-sync replicas to ack
	config.Producer.Partitioner = sarama.NewRandomPartitioner

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	msg := &sarama.ProducerMessage{
		Topic: "testGo",
		Key:   sarama.StringEncoder("key"),
	}

	// Read lines from stdin and send each one as a message.
	inputReader := bufio.NewReader(os.Stdin)
	for {
		value, err := inputReader.ReadString('\n')
		if err != nil {
			panic(err)
		}
		value = strings.Replace(value, "\n", "", -1)
		msg.Value = sarama.ByteEncoder(value)

		partition, offset, err := producer.SendMessage(msg)
		if err != nil {
			fmt.Println("Send Message Fail:", err)
			continue
		}
		fmt.Printf("Partition = %d, offset = %d\n", partition, offset)
	}
}
```
Consumer implementation
```go
package main

import (
	"fmt"
	"sync"

	"github.com/Shopify/sarama"
)

var wg sync.WaitGroup

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	partitionList, err := consumer.Partitions("testGo")
	if err != nil {
		panic(err)
	}

	// Start one goroutine per partition.
	for _, partition := range partitionList {
		pc, err := consumer.ConsumePartition("testGo", partition, sarama.OffsetNewest)
		if err != nil {
			panic(err)
		}
		defer pc.AsyncClose()

		wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n",
					msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
			}
		}(pc)
	}
	wg.Wait()
}
```
PS: If your code is not running on the same machine as Kafka (localhost), you need to change the host in the listeners setting of the config/server.properties file; otherwise the Kafka broker will reject connections from non-localhost clients. Restart the Kafka service after changing it.
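When clients connect from other machines, the broker has to listen on an address those clients can reach. A sketch of the relevant config/server.properties line (the IP address is a placeholder for your broker's actual address):

```properties
# config/server.properties -- restart the broker after editing
listeners=PLAINTEXT://192.168.1.10:9092
```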
Test run
Run the producer
```shell
go run producer/main.go
```
Run the consumer
```shell
go run consumer/main.go
```