设为首页 加入收藏

TOP

说说 MQ 之 Kafka(二)(二)
2018-10-28 10:11:10 】 浏览:504
Tags:说说 Kafka
ache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; import java.util.concurrent.ExecutionException; public class SimpleProducerDemo { public static void main(String[] args){ Properties props = new Properties(); props.put("bootstrap.servers", "192.168.232.23:9092,192.168.232.23:9093,192.168.232.23:9094"); props.put("zookeeper.connect", "192.168.232.23:2181"); props.put("client.id", "DemoProducer"); props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<Integer, String> producer = new KafkaProducer<>(props); String topic = "topic1"; Boolean isAsync = false; int messageNo = 1; while (true) { String messageStr = "Message_" + String.format("%05d",messageNo); long startTime = System.currentTimeMillis(); if (isAsync) { // Send asynchronously producer.send(new ProducerRecord<>(topic, messageNo, messageStr), new DemoCallBack(startTime, messageNo, messageStr)); } else { // Send synchronously try { producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get(); System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } try { Thread.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } ++messageNo; } } } class DemoCallBack implements Callback { private final long startTime; private final int key; private final String message; public DemoCallBack(long startTime, int key, String message) { this.startTime = startTime; this.key = key; this.message = message; } public void onCompletion(RecordMetadata metadata, Exception exception) { long elapsedTime = System.currentTimeMillis() - startTime; if (metadata != null) { System.out.println( "Send message: (" + String.format("%05d",key) + ", " + message + ") at offset "+ metadata.offset() + " to partition(" + metadata.partition() + ") in " + elapsedTime + " ms"); } else { exception.printStackTrace(); } } }

上例中使用了同步和异步发送两种方式。在多副本的情况下,如果要指定同步复制还是异步复制,可以使用 acks 参数,详细参考官方文档 Producer Configs 部分的内容;在多分区的情况下,如果要指定发送到哪个分区,可以使用 partitioner.class 参数,其值是一个实现了 org.apache.kafka.clients.producer.Partitioner 接口的类,用于根据不同的消息指定分区6。消费者的 API 有几种,比较新的 API 如下,

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class SimpleConsumer {
    public static void main(String[] args){
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.232.23:9092");
        props.put("group.id", "test");
        props.put("enable.auto.c
首页 上一页 1 2 3 4 5 下一页 尾页 2/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Git内部原理之Git引用 下一篇说说 MQ 之 Kafka(一)

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目