设为首页 加入收藏

TOP

kafka的partition如何分布到不同的broker上,consumerGroup组员和partition之间如何做负载均衡,kafka常用命令
2019-03-19 14:14:06 】 浏览:90
Tags:kafka partition 如何 分布 不同 broker consumerGroup 组员 之间 负载 均衡 常用 命令
版权声明:转发请注明,谢谢配合 https://blog.csdn.net/qq_31289187/article/details/80910775

1、partition如何分布到不同的broker上

下面给出kafka在实现分区分布到各个broker上的算法实现,可以通过创建topic,设置副本数验证

public void kafkaProducter(){
        //partitions创建的分区,比如我创建了一个topic,
        // 设置的副本是1时,partitions = partition * 1;
        // 设置的副本为2时,partitions = partition * 2;
        List<String> partitions = new LinkedList<>();
        partitions.add("p0");
        partitions.add("p1");
        partitions.add("p2");
        partitions.add("p3");
        partitions.add("p0");//副本
        partitions.add("p1");//副本
        partitions.add("p2");//副本
        partitions.add("p3");//副本
        //borkers是kafka集群
        List<String> brokers = new LinkedList<>();
        brokers.add("b1");
        brokers.add("b2");
        brokers.add("b3");
        for(int i = 0;i<partitions.size();i++){
            System.out.println("分区"+partitions.get(i)+"在:"+brokers.get(i%brokers.size())+"的broker上");
        }
    }

测试结果:


2、consumerGroup组员和partition之间如何做负载均衡

通过设置消费者的数量,验证下面的demo

public void kafkaConsumer(){
        List<String> partitions = new LinkedList<>();
        partitions.add("p0");
        partitions.add("p1");
        partitions.add("p2");
        partitions.add("p3");
        List<String> consumers = new LinkedList<>();
        consumers.add("c1");
        consumers.add("c2");
        consumers.add("c3");
        consumers.add("c4");
        consumers.add("c5");
        consumers.add("c6");
        //向上取整,计算每个消费者对应几个分区
        int m = (int) Math.ceil(partitions.size()*1.0/consumers.size());
        System.out.println("m:"+m);
        for (int i = 0;i<consumers.size();i++){
            System.out.println("消费者"+consumers.get(i)+",对应的分区:");
            for(int j=0;j<m;j++){
                //如果下标大于等于partitions的元素个数,break
                if(i*m+j >= partitions.size()){
                    break;
                }
                System.out.println(partitions.get(i*m+j));
            }
        }
    }

测试结果:


3、kafka常用命令

a、创建topic

bin/kafka-topics.sh --zookeeper ip:2181 --create --topic topic名字   --partitions 分区数量  --replication-factor 副本数量

b、删除topic,执行命令之后不是马上删除

bin/kafka-topics.sh --zookeeper ip:2181 --delete --topic topic名字

c、查看各个分区数据量

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ip:9092 --time -1 --topic topic名字

d、查看topic列表

bin/kafka-topics.sh --list --zookeeper ip:3181

e、查看topic表述

bin/kafka-topics.sh --zookeeper ip:3181 --topic topic名字 --describe

f、向对应topic发送消息

bin/kafka-console-producer.sh --broker-list ip:9092 --topic topic名字

g、消费消息

bin/kafka-console-consumer.sh --zookeeper ip:2181 --topic topic名字 --from-beginning


欢迎大家来吐槽,内容有问题,会及时修改,谢谢!!!


】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇flume 自定义Sink之kafkaSink 下一篇大数据系列之实时处理Storm(五)..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目