设为首页 加入收藏

TOP

java Kafka 简单应用实例
2019-01-21 02:25:56 】 浏览:72
Tags:java Kafka 简单 应用 实例

使用kafka有多种安装方式
1.单机模式 (分为windows下的模式linux下的模式);
2.伪分布式模式;
3.分布式模式;
具体搭建方式参考:https://blog.csdn.net/xlgen157387/article/details/77312569utm_source=blogxgwz0

下面是Linux下的单机模式

1、安装zookeeper

下载zookeeper-3.4.9.tar;

解压tar -zxvf zookeeper-3.4.9.tar;

进入zookeeper-3.4.9/conf目录创建zoo.cfg文件,内容如下:

tickTime=2000

dataDir=/usr/myenv/zookeeper-3.4.8/data        ##(填写自己的data目录)

dataLogDir=/usr/myenv/zookeeper-3.4.8/logs

clientPort=2181

启动zookeeper:

./yourZookeeperDir/bin/zkServer.sh start

2、安装kafka

下载kafka:http://kafka.apache.org/downloads;

解压kafka:tar -zxvf kafka_2.10-0.8.2.1.tar

修改config/server.propertie配置文件中zookeeper的host配置,由于zookeeper是在本地启动所以不需要修改:

server.propertie配置

启动kafka
./yourKafkaDir/bin/kafka-server-start.sh /yourKafkaDir/config/server.properties

3、kafka java 应用demo

maven依赖包(请注意kafka的版本)

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>0.11.0.1</version>
		</dependency>

kafka Producer:

package kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {

	
	// Topic
    private static final String topic = "kafkaTopic";

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.11.100:9092");
        props.put("acks", "0");
        props.put("group.id", "1111");
        props.put("retries", "0");
         //设置key和value序列化方式
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //生产者实例
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        int i = 1;

        // 发送业务消息
        // 读取文件 读取内存数据库 读socket端口
        while (true) {
            Thread.sleep(1000);
            producer.send(new ProducerRecord<String, String>(topic, "key:" + i, "value:" + i));
            System.out.println("key:" + i + " " + "value:" + i);
            i++;
        }
    }



}

kafka consumer:


package kafka;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerDemo {

	private static final Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);
    private static final String topic = "kafkaTopic";

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.xx.xxx:9092");//单节点,kafka多节点时候使用,逗号隔开
        props.put("group.id", "1111"); //定义消费组
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
       
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        consumer.subscribe(Arrays.asList(topic));//订阅主题

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }


}

结果:
生产者不断生产
在这里插入图片描述
消费者不断消费:
在这里插入图片描述


参考文章:https://www.jianshu.com/p/0e378e51b442
https://blog.csdn.net/xlgen157387/article/details/77312569utm_source=blogxgwz0
https://blog.csdn.net/Evankaka/article/details/52494412utm_source=blogxgwz0
https://blog.csdn.net/evankaka/article/details/52421314

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇kafka查询最新producer offset的.. 下一篇CentOs 6.5安装Kafka集群步骤

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目