设为首页 加入收藏

TOP

Kafka 生产者消费者 Java API 编程
2019-04-23 14:27:18 】 浏览:39
Tags:Kafka 生产者 消费者 Java API 编程
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/HG_Harvey/article/details/79198496

我们先创建一个topic,然后启动生产者和消费者,进行消息通信,然后在使用Kafka API编程的方式实现,笔者使用的ZK和Kafka都是单节点,你也可以使用集群方式。

启动Zookeeper

zkServer.sh start

启动Kafka

kafka-server-start.sh $KAFKA_HOME/config/server.properties

创建topic

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_api

查看topic详细信息

[hadoop@Master ~]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafka_api
Topic:kafka_api PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: kafka_api    Partition: 0    Leader: 0   Replicas: 0 Isr: 0

启动生产者和消费者,测试消息通信

# 生产者
kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_api

这里写图片描述

# 消费者
kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka_api

这里写图片描述

Java API 编程实现
1.创建maven项目,pom.xml中引入kafka依赖
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.11.0.0</version>
    </dependency>
</dependencies>
2.创建KafkaProperties类,配置Kafka相关属性
package com.bigdata.kafka;

/**
 * Kafka 相关属性配置类
 */
public interface KafkaProperties {

    // zookeeper连接,与server.properties中的zookeeper.connect属性一致,多个用逗号隔开,例如:zk01:2181,zk02:2181
    public static final String ZK = "Master:2181";

    // 如果是多个blocker,用逗号分隔即可,例如:kafka01::9092,kafka02:9093
    public static final String BLOCK_LIST = "Master:9092";

    // 主题
    public static final String TOPIC = "kafka_api";
}
3.Kafka Producer API 开发

生产者API中常用的类如下
Producer:生产者
ProducerConfig:生产者对应的配置
KeyedMessage:封装的消息对象

创建KafkaProducer类,代码如下

package com.bigdata.kafka;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

/**
 * Kafka 生产者
 */
public class KafkaProducer extends Thread {

    private String topic;

    private Producer<Integer, String> producer;

    public KafkaProducer(String topic) {
        this.topic = topic;

        Properties properties = new Properties();
        properties.put("metadata.broker.list", KafkaProperties.BLOCK_LIST);
        properties.put("serializer.class", "kafka.serializer.StringEncoder");

        producer = new Producer<Integer, String>(new ProducerConfig(properties));
    }

    @Override
    public void run() {
        int messageNo = 1;

        while(true) {
            String message = "message_" + messageNo;

            System.out.println("Send:" + message);

            producer.send(new KeyedMessage<Integer, String>(topic, message));

            messageNo ++;

            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new KafkaProducer(KafkaProperties.TOPIC).start();
    }
}

运行上述代码,在控制台中使用命令启动一个消费者,观察控制台是否能接收到消息

这里写图片描述

4.Kafka Consumer API 开发

消费者API中常用的类如下
Consumer:消费者
ConsumerConnector:消费者连接器
ConsumerConfig:消费者对应的配置
KafkaStream:数据流

创建KafkaConsumer类,代码如下

package com.bigdata.kafka;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Kafuka 消费者
 */
public class KafkaConsumer extends Thread{

    private String topic;

    public KafkaConsumer(String topic) {
        this.topic = topic;
    }

    private ConsumerConnector createConsumer() {
        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", KafkaProperties.ZK);
        properties.setProperty("group.id", "testGroup");

        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }

    @Override
    public void run() {
        // 创建Consumer
        ConsumerConnector consumer = createConsumer();

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);

        // 获取每次接受到的数据
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);

        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

        // 不停地从stream中读取最新接收到的数据
        while(iterator.hasNext()){
            String message = String.valueOf(iterator.next().message());

            System.out.println("message:" + message);
        }
    }

    public static void main(String[] args) {
        new KafkaConsumer(KafkaProperties.TOPIC).start();
    }
}

运行生产者及消费者代码,观察控制台

生产者控制台(部分结果):

Send:message_5
Send:message_6
Send:message_7
Send:message_8
Send:message_9
Send:message_10

消费者控制台(部分结果):只接收最新的数据

message:message_7
message:message_8
message:message_9
message:message_10
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇kafka集群安装,常用命令 下一篇kafka消费者分区分配策略及自定义..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目