文件目录如下:
1.ConsumerDemo配置
package com.course.test;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerDemo {
public static void main(String[] args) {
Properties pros = new Properties();
pros.put("bootstrap.servers","localhost:9092");
pros.put("group.id","test"); // 用来表示consumer进程所在组的一个字符串,如果设置同样的group_id,表示这些进程都是属于同一个consumer——group
pros.put("enable.auto.commit","true"); // 如果设置为true,consumer所接收到的消息的offset将会自动同步到zookeeper
pros.put("auto.commit.interval.ms","1000"); // consumer向zookeeper提交offset的频率,单位是秒
pros.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
pros.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pros);
consumer.subscribe(Arrays.asList("my_test"));
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String,String>record:records){
System.out.printf("offset = %d, key = %s , value = %s%n",record.offset(),record.key(),record.value());
}
}
}
}
2.ProducerDemo配置
package com.course.test;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class ProducerDemo {
public static void main(String[] args) {
Properties prop = new Properties();
prop.put("bootstrap.servers","localhost:9092");
prop.put("acks","all"); // 生产者需要server接收到数据之后,要发出一个确认接收的信号
// 0 producer不需要等待任何确认的消息
// 1 意味着至少要等待leader已经成功将数据写入本地log,并不意味着所有follower已经写入
// all 意味着leader需要等待所有备份都成功写入到日志中
prop.put("retries",0); // 重试次数
// 比如有两条消息, 1 和 2 。1先来,但是如果1发送失败了,重试次数为1.2就会接着发送数据,然后1再发一次,这样会改变消息发送的顺序
prop.put("buffer.memory",33554432); // 缓存大小
prop.put("batch.size",1000); // producer试图批量处理消息记录。目的是减少请求次数,改善客户端和服务端之间的性能。
// 这个配置是控制批量处理消息的字节数。如果设置为0,则禁用批处理。如果设置过大,会占用内存空间.
prop.put("linger.ms",1);
prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(prop);
for (int i = 0 ; i < 100 ; i++){
producer.send(new ProducerRecord<String, String>("my_test", Integer.toString(i+1),Integer.toString(i)));
}
producer.close();
}
}
然后我们可以先运行ProducerDemo,再运行ConsumerDemo,可以看到生产者发送的信息。