设为首页 加入收藏

TOP

Kafka JavaApi中消费者与生产者的配置
2019-04-23 14:32:03 】 浏览:55
Tags:Kafka JavaApi 消费者 生产者 配置

文件目录如下:
在这里插入图片描述

1.ConsumerDemo配置

package com.course.test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {

    public static void main(String[] args) {

        Properties pros = new Properties();
        pros.put("bootstrap.servers","localhost:9092");
        pros.put("group.id","test");  // 用来表示consumer进程所在组的一个字符串,如果设置同样的group_id,表示这些进程都是属于同一个consumer——group
        pros.put("enable.auto.commit","true"); // 如果设置为true,consumer所接收到的消息的offset将会自动同步到zookeeper
        pros.put("auto.commit.interval.ms","1000"); // consumer向zookeeper提交offset的频率,单位是秒
        pros.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        pros.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pros);
        consumer.subscribe(Arrays.asList("my_test"));

        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String,String>record:records){
                System.out.printf("offset = %d, key = %s , value = %s%n",record.offset(),record.key(),record.value());
            }
        }
    }
}

2.ProducerDemo配置

package com.course.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerDemo {

    public static void main(String[] args) {
        Properties prop = new Properties();

        prop.put("bootstrap.servers","localhost:9092");
        prop.put("acks","all");  // 生产者需要server接收到数据之后,要发出一个确认接收的信号
                                    // 0 producer不需要等待任何确认的消息
                                    // 1 意味着至少要等待leader已经成功将数据写入本地log,并不意味着所有follower已经写入
                                    // all 意味着leader需要等待所有备份都成功写入到日志中

        prop.put("retries",0); // 重试次数

        // 比如有两条消息, 1 和 2 。1先来,但是如果1发送失败了,重试次数为1.2就会接着发送数据,然后1再发一次,这样会改变消息发送的顺序

        prop.put("buffer.memory",33554432); // 缓存大小
        prop.put("batch.size",1000); // producer试图批量处理消息记录。目的是减少请求次数,改善客户端和服务端之间的性能。
        // 这个配置是控制批量处理消息的字节数。如果设置为0,则禁用批处理。如果设置过大,会占用内存空间.

        prop.put("linger.ms",1);
        prop.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(prop);
        for (int i = 0 ; i < 100 ; i++){
            producer.send(new ProducerRecord<String, String>("my_test", Integer.toString(i+1),Integer.toString(i)));
        }

        producer.close();
    }
}

然后我们可以先运行ProducerDemo,再运行ConsumerDemo,可以看到生产者发送的信息。
在这里插入图片描述

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Kafka Shell基本命令(包括topic.. 下一篇kafka lag 监控脚本

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目