Copyright notice: This is an original post by the blogger (Aitemi Studio); reposting is welcome. https://blog.csdn.net/zhaozao5757/article/details/79712259
Message Driven / Using Kafka
1. Download ZooKeeper/Kafka
ZooKeeper
Kafka depends on ZooKeeper, a service-coordination framework: before starting the Kafka (2.11) server, ZooKeeper (3.4.8) must be started first.
Kafka
2. Start ZooKeeper
- Go to %Zookeeper_Home%\conf
- Copy zoo_sample.cfg and rename the copy to zoo.cfg
- Go to %Zookeeper_Home%\bin and open a command-line window
- Run the zkServer command to start ZooKeeper; it listens on port 2181 by default
3. Start Kafka
- Go to %kafka_Home%\bin\windows and open a command-line window
- By default Kafka starts with the server.properties file in %kafka_Home%\config
- Run kafka-server-start ../../config/server.properties to start Kafka; it listens on port 9092 by default
4. Create atm_kafka_client
4.1 Add the dependencies
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.9</version>
</dependency>
4.2 SendMessage
package com.atm.cloud;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Sends a message to the Kafka server.
 */
public class SendMessage {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Address of the Kafka broker
        props.put("bootstrap.servers", "localhost:9092");
        // Serialize both key and value as strings
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // Publish one record (key "userName", value "aitemi") to topic "my-topic"
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                "my-topic", "userName", "aitemi");
        producer.send(record);
        producer.close();
    }
}
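The record above carries a key ("userName"). Kafka's default partitioner hashes the key (with murmur2) to choose a partition, so records with the same key always land on the same partition. A rough plain-Java sketch of the idea, using String.hashCode() instead of Kafka's actual murmur2 purely for illustration (KeyPartitionSketch and partitionFor are made-up names, not Kafka API):

```java
public class KeyPartitionSketch {
    // Simplified stand-in for Kafka's default partitioner:
    // hash the key and take it modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3;
        int p1 = partitionFor("userName", partitions);
        int p2 = partitionFor("userName", partitions);
        // The same key always maps to the same partition
        System.out.println(p1 == p2); // true
        System.out.println(p1 >= 0 && p1 < partitions); // true
    }
}
```

Because ordering in Kafka is only guaranteed within a partition, keying related records together is how per-key ordering is preserved.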
- Run main
- Go to %kafka_Home%\bin\windows, open a command-line window, and type kafka-topics --list --zookeeper localhost:2181
- Check that the newly created topic is listed
- A Kafka topic is similar to a queue in RabbitMQ; we have just sent one message to the topic
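One difference from a RabbitMQ queue is worth noting: a Kafka topic is an append-only log, and each consumer group keeps its own read offset, so the same record can be read independently by several groups instead of being removed on delivery. A minimal plain-Java sketch of that model (TopicLog and its methods are invented for illustration; no Kafka classes are involved):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a single-partition topic: an append-only log
// plus one independent read offset per consumer group.
public class TopicLog {
    private final List<String> log = new ArrayList<String>();
    private final Map<String, Integer> groupOffsets = new HashMap<String, Integer>();

    void append(String record) {
        log.add(record);
    }

    // Each group reads from its own offset; reading does not
    // remove the record, so other groups still see it.
    List<String> poll(String groupId) {
        int offset = groupOffsets.getOrDefault(groupId, 0);
        List<String> records = new ArrayList<String>(log.subList(offset, log.size()));
        groupOffsets.put(groupId, log.size());
        return records;
    }

    public static void main(String[] args) {
        TopicLog topic = new TopicLog();
        topic.append("aitemi");
        System.out.println(topic.poll("groupA")); // [aitemi]
        System.out.println(topic.poll("groupB")); // [aitemi] - same record, different group
        System.out.println(topic.poll("groupA")); // [] - groupA already read past it
    }
}
```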
4.3 ReadMessage
package com.atm.cloud;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Consumer: subscribes to "my-topic" and prints its records.
 */
public class ReadMessage {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Consumers with the same group.id share the topic's partitions
        props.put("group.id", "test");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        while (true) {
            // Block for up to 100 ms waiting for new records
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("This is consumer A, key: " + record.key()
                        + ", value: " + record.value());
            }
        }
    }
}
4.4 Consumer groups
- Every consumer declares a consumer group for itself, and every record published to a topic is delivered to each subscribing consumer group
- If several consumer instances share the same consumer group, each record is delivered to only one of those instances
- If every consumer has a different consumer group, each record is broadcast to all of the consumers
- This mechanism is how Kafka implements load balancing
The group is set with a single property:
props.put("group.id", "test");
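The delivery rules above can be sketched in plain Java. In this toy illustration (GroupDeliverySketch is a made-up name, and records rotate round-robin among group members, whereas real Kafka assigns whole partitions to members), every group sees each record, but within a group only one member receives it:

```java
import java.util.Arrays;
import java.util.List;

public class GroupDeliverySketch {
    // Within one group, a record is delivered to exactly one member
    // (round-robin here; real Kafka assigns whole partitions instead).
    static String receiver(List<String> members, int recordIndex) {
        return members.get(recordIndex % members.size());
    }

    public static void main(String[] args) {
        List<String> testGroup = Arrays.asList("consumerA", "consumerB");
        List<String> auditGroup = Arrays.asList("consumerC");
        for (int i = 0; i < 4; i++) {
            // Every group receives the record; one member per group handles it
            System.out.println("r" + i + " -> test:" + receiver(testGroup, i)
                    + ", audit:" + receiver(auditGroup, i));
        }
    }
}
```

With two members in group "test", the records alternate between consumerA and consumerB (load balancing); group "audit" has a single member, so consumerC sees every record (broadcast across groups).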