版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/m0_38001814/article/details/82628101
一、先上pom依赖
<!-- Apache Kafka Java client library (org.apache.kafka:kafka-clients), pinned to version 0.10.2.0. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
二、消息生产者
package uyun.hornet.ticket.impl.service;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import uyun.hornet.ticket.impl.common.DataCallback;
import java.util.Properties;
/**
 * Minimal Kafka producer demo: publishes a fixed batch of string messages to {@link #TOPIC}
 * and logs each delivery result through a {@link DataCallback}.
 *
 * Created by xujia on 2018/9/11
 */
public class KafkaProducter {
    /** Topic every demo message is published to. */
    public final static String TOPIC = "itsm-test";

    private final KafkaProducer<String, String> producer;

    /** Builds the underlying producer with String key/value serializers. */
    private KafkaProducter() {
        Properties properties = new Properties();
        // Kafka broker address list in host:port form (placeholder value).
        properties.put("bootstrap.servers", "xxxxx");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // "request.required.acks" is the legacy Scala-producer key and is not a setting of the
        // Java KafkaProducer; the Java-client key is "acks". "all" is equivalent to "-1".
        properties.put("acks", "all");
        producer = new KafkaProducer<>(properties);
    }

    /**
     * Sends 100 messages, keyed "100" through "199", to {@link #TOPIC}.
     * Sending is asynchronous; the outcome of each record is reported to its {@link DataCallback}.
     */
    void produce() {
        int firstMessageNo = 100;
        int endMessageNo = 200;
        for (int messageNo = firstMessageNo; messageNo < endMessageNo; messageNo++) {
            String key = String.valueOf(messageNo);
            String data = "hello kafka message " + key;
            long startTime = System.currentTimeMillis();
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, data);
            producer.send(record, new DataCallback(startTime, data));
            System.out.println(data);
        }
    }

    public static void main(String[] args) {
        KafkaProducter demo = new KafkaProducter();
        try {
            demo.produce();
        } finally {
            // send() only queues records. close() waits for the queued sends to complete,
            // so nothing is dropped when the JVM exits right after produce() returns.
            demo.producer.close();
        }
    }
}
DataCallback为自定义回调类,如下
package uyun.hornet.ticket.impl.common;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Kafka producer callback that logs the outcome of a single send.
 *
 * Created by xujia on 2018/9/7
 */
public class DataCallback implements Callback {
    private static final Logger logger = LoggerFactory.getLogger(DataCallback.class);

    private final long startTime;
    private final String message;

    /**
     * @param startTime time in milliseconds (as from {@link System#currentTimeMillis()}) at which
     *                  the send was started; used to compute the elapsed time that is logged
     * @param message   the payload that was sent; used only in log output
     */
    public DataCallback(long startTime, String message) {
        this.startTime = startTime;
        this.message = message;
    }

    /**
     * Invoked by the producer once the send has been acknowledged by the broker or has failed.
     *
     * @param recordMetadata metadata (partition, offset) of the record that was sent
     * @param e              the exception raised while sending, or null if the send succeeded
     */
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        long elapsedMs = System.currentTimeMillis() - startTime;

        // Decide on the exception rather than on the metadata: newer client versions pass a
        // non-null placeholder metadata object on failure, which would be logged as a success.
        if (e != null) {
            logger.error("callback failed, message(" + message + ") was not sent after "
                    + elapsedMs + " ms", e);
            return;
        }
        if (recordMetadata == null) {
            // Defensive: neither metadata nor an exception was supplied.
            logger.warn("callback invoked without metadata or exception, message(" + message + ")");
            return;
        }
        logger.info("callback success, message(" + message + ") send to partition("
                + recordMetadata.partition() + "),offset(" + recordMetadata.offset()
                + ") in " + elapsedMs + " ms");
    }
}
三、消息消费者
package uyun.hornet.ticket.impl.service;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
 * Minimal Kafka consumer demo: subscribes to a single topic and prints the value of every
 * record it receives.
 *
 * Created by xujia on 2018/9/11
 */
public class KafkaConsumer22 {
    /** Topic this demo subscribes to. */
    private static final String TOPIC = "itsm-test";

    private final Consumer<String, String> consumer;

    /** Builds the underlying consumer with String key/value deserializers and subscribes it. */
    private KafkaConsumer22() {
        Properties props = new Properties();
        // Broker address as ip:port; separate the brokers of a cluster with commas.
        props.put("bootstrap.servers", "xxxxxx");
        props.put("group.id", "test");
        // Offsets are committed automatically, at a 1000 ms interval.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC));
    }

    /**
     * Polls forever and prints each received message value to standard output.
     * The loop only ends when poll (or printing) throws; the consumer is closed in that case.
     */
    void consume() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                // Iterating an empty batch is a no-op, so no explicit count check is needed.
                for (ConsumerRecord<String, String> record : records) {
                    String message = record.value();
                    System.out.println("从kafka接收到的消息是:" + message);
                }
            }
        } finally {
            // Release the consumer's resources if the loop is left by an exception.
            consumer.close();
        }
    }

    public static void main(String[] args) {
        new KafkaConsumer22().consume();
    }
}
该demo所传消息只是简单的字符串,若为对象可用ObjectMapper先转为json串发送,在消费端接收的时候转回来即可。