设为首页 加入收藏

TOP

kafka入门一:安装与使用
2019-05-06 14:29:06 】 浏览:46
Tags:kafka 入门 安装 使用

Kafak安装与使用

一、前言

kafka是Apache平台下的一种分布式发布/订阅消息系统,也就是消息中间件。在之前我使用的是ActiveMQ,初次接触Kafka,先从最基本的路数走起,后续再进行深入的学习。

二、Kafka下载与安装

Kafka版本:1.0.0

2.1 下载

下载地址:

https://www.apache.org/dyn/closer.cgipath=/kafka/1.0.0/kafka_2.12-1.0.0.tgz


以上为下载地址

2.2 linux下载命令,拿其中之一举例

$> wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz

2.3 单机安装

解压、安装

$> mv kafka_2.12-1.0.0.tgz /usr/local
$> tar -zxf kafka_2.12-1.0.0.tgz

Kafka强依赖于ZooKeeper,启动Kafka必须先启动ZooKeeper。单机时在configure/server.properties中的默认配置为zookeeper.connect=localhost:2181,暂不用作修改,直连本机ZooKeeper即可

启动服务(后台)

$> ./bin/kafka-server-start.sh ./config/server.properties &

停止时,先停止Kafka,再停止ZooKeeper

$> ./bin/kafka-server-stop.sh
$> cd /usr/local/zookeeper-3.4.11/
$> ./bin/zkServer.sh stop

测试

创建topic。创建一个名为“test”的topic,他只有一个分区--partition,一个副本--replication-factor

$> ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

验证topic是否创建成功

$> ./bin/kafka-topics.sh --list --zookeeper localhost:2181
test

在Console模式下,启动producer发送消息(ctrl c退出Console模式)

$> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
 > love
 > and
 > peace

在Console模式下,启动consumer消费消息

$> ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
love
and
peace

在使用Console时,会有如下warning。提示是可以直连borker-list的

Using the ConsoleConsumer/Producer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

$> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

2.4 集群部署

2.5 Kafka监控工具——KafkaOffsetMonitor安装部署

请自行百度安装包下载,需要注意一点的是,在jar包中有些js文件需要科学上网才能访问,相对应也有本地化修改的版本,我没有找到,如果你有,请联系我(正经脸)!

安装部署:

KafkaOffsetMonitor的所有运行资源已经打包为一个jar文件,我们可以新建一个单独的目录存放monitor文件。在同目录下,编写启动脚本:

vim monitorStart.sh

输入以下内容

#! /bin/bash
java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
 com.quantifind.kafka.offsetapp.OffsetGetterWeb \# 指明运行Web监控的类
 --zk localhost:2181 \# 连接的ZooKeeper服务器地址
 --port 8080 \# 监控器Web运行端口
 --refresh 10.seconds \# 页面数据刷新时间
 --retain 1.days # 页面数据保留时间

更改sh文件可执行权限

chmod u+x monitorStart.sh

后台启动监控程序,即可访问(注意是否该科学上网)

nohup ./monitorStart.sh &

访问虚拟机服务器地址,我的是192.168.81.129:8080,如下,第一次访问时为空白,进行一次消费即可



三、Java客户端

首先在kafka服务器上新生成一个topic

$> ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 6 --topic java-topic_test

Java端消息生产者

public class JavaKafkaProducer {
    private Logger logger = Logger.getLogger("JavaKafkaProducer");
    /**
     * 设置实例生产消息的总数
     */
    private static final int MSG_SIZE = 10;
    /**
     * 主题名称
     */
    public static final String TOPIC = "java-topic_test";
    /**
     * kafka服务器节点
     */
    private static final String BROKER_LIST = "192.168.81.129:9092";

    private static KafkaProducer<String,String> producer = null;

    static {
        Properties configs = initConfig();
        producer = new KafkaProducer<String, String>(configs);
    }

    // 1.kafka producer参数设置
    private static Properties initConfig() {
        Properties props = new Properties();
        // broker列表
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        // 设置序列化的类
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        /**
         * 0        不等待结果返回
         * 1        等待至少有一个服务器返回数据接收标识
         * -1或all   表示必须接受到所有的服务器返回标识,及同步写入
         */
        props.put("request.required.acks", "0");
        /**
         * 内部发送数据是异步还是同步
         * sync 同步,默认
         * async异步
         */
        props.put("producer.type", "async");

        // bootstrap.servers地址,必须指定
        props.put("bootstrap.servers", "192.168.81.129:9092");

        // 设置分区类,可以使用自定义分区类
//        props.put("partitioner.class", "JavaKafkaProducerPartitioner");

        // 延迟发送时间
//        props.put("linger.ms","1");

        // 重试次数
        props.put("message.send.max.retries", 0);

        // 异步提交的时候(async),并发提交的记录数
        props.put("batch.num.message", 200);

        // 设置缓冲区大小,默认10kb
        props.put("send.buffere.bytes", "102400");
        return props;
    }

    /**
     * 产生一个消息
     */
    private static String generateMessage() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 5; i++) {
            sb.append((new Random()).nextInt(20)).append(" ");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        ProducerRecord<String, String> record = null;
        String message = null;
        try {
            int num = 0;
            for (int i = 0; i<MSG_SIZE; i++) {
                message = generateMessage();
                System.out.println(message);
                record = new ProducerRecord<String, String>(TOPIC,"SCDN", message);
                producer.send(record, new Callback() {
                    public void onCompletion (RecordMetaData recordMetadata, Exception e) {
                        if(e != null) {
                            e.printStackTrace();
                            System.out.println("发送消息失败!");
                        } else {
                            System.out.println("发送消息成功!");
                        }
                    }
);
                if(num++ % 10 == 0) {
                    Thread.sleep(2000);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
                producer = null;
            }
        }
    }
}

自定义分区器

public class JavaKafkaProducerPartitioner implements Partitioner {

    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

    /**
     * 无参构造
     */
    public JavaKafkaProducerPartitioner() {
        this(new VerifiableProperties());
    }

    /**
     * 构造函数,必须给定
     *
     * @param properties
     */
    public JavaKafkaProducerPartitioner(VerifiableProperties properties) {
        // nothing
    }

    private static int toPositive(int number) {
        return number & 0x7fffffff;
    }
// 分区方法
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        // 调试使用
        System.out.println("key is " + key);
        System.out.println("value is " + new String(valueBytes)); // 此方法和下面的方法都是打印value的值
        System.out.println("value is " + value);

        return new Random().nextInt(100) % numPartitions; // 返回分区
    }

    public void close() {

    }

    public void configure(Map<String, > map) {

    }
}

在kafka服务器中使用如下命令查看相应主题下的offset偏移量日志

$> cd $kafka
$> ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/java-topic_test-0/00000000000000000000.log --print-data-log

Java端消费者

public class JavaKafkaConsumer {

    static Properties props = new Properties();

    static {
        props.put("bootstrap.servers", "192.168.81.129:9092");
        props.put("group.id", "test");
        props.put("client.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    }

    public static void main(String[] args) {
        // 1.初始化消费者
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // 2.订阅主题,指定一个监听器,用于在消费者发生平衡操作时回调响应的业务处理
        consumer.subscribe(Arrays.asList("java-topic_test"), new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                consumer.commitAsync(); // 提交偏移量
            }

            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // 获取该分区下已消费的偏移量
                long commitedOffset = -1;
                for (TopicPartition topicPartition : partitions) {
                    // 获取该分区下已消费的偏移量
                    commitedOffset = consumer.committed(topicPartition).offset();
                    // 重置偏移量到上一次提交的偏移量下一个位置处开始消费
                    consumer.seek(topicPartition, commitedOffset + 1);
                }
            }
        });

        try {
            while (true) {
                // 长轮询拉取消息
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String , String> record : records) {
                    System.out.printf("partition = %d, offset = %d,key= %s value = %s%n",
                            record.partition(), record.offset(),
                            record.key(),record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

ProducerConsole

121411218
key is CSDN0
value is 121411218
partition is 3
-----------
消息发送成功!

ComsumerConsole

partition = 3, offset = 42,key= CSDN0 value = 121411218
成功生产和消费



】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Kafka的安装和简单实例测试 下一篇spark+kafka+idea+sbt+scala踩坑

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目