设为首页 加入收藏

TOP

Kafka 的配置及应用
2018-11-13 16:37:02 】 浏览:86
Tags:Kafka 配置 应用
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_1290259791/article/details/79720337

Kafka 简介

1.简介
Kafka 对消息保存根据 Topic 进行归类,发送者称为 Producer,消息接受者称为 Consumer,Kafka 集群中有多个 Kafka 实例组成,每个实例称为 broker。无论是 kafka 集群,还是 producer 和 consumer 都依赖于 zookeeper 来保证系统可用性集群保存一些 meta 信息。
使用 Scala 语言编写,统一的信息收集平台,实时的收集反馈信息,能支撑较大的数据量,具备良好的容错能力。
这里写图片描述

2.Kafka 的特性

  1. 消息持久化:采用时间复杂度O(1)的磁盘结构顺序存储
  2. 高吞吐量:支持每秒百万级别的消息
  3. 易扩展:新增机器,集群无需停机,自动感知
  4. 高容错:通过多分区,多副本提供高容错性
  5. 多客户端支持:Java、PHP、Python等
  6. 实时性:进入到Kafka的消息能够立即被消费者消费

3.Topics
一个 topic 可以认为是一类消息,每个 topic 将被分成多个 partition,每个 partition 在存储层面是 append log 文件。任何发布到partition 的消息都会被直接追加到 log 文件的尾部,每条消息在文件中的位置称为 offset(偏移量),offset 为一个 long 型数字,是唯一标记一条消息。
这里写图片描述
在 Kafka 中即使消息被消费,消息仍然不会被立即删除.日志文件将会根据broker中的配置要求,保留一定的时间之后删除;比如log文件保留2天,那么两天后,文件会被清除,无论其中的消息是否被消费.kafka通过这种简单的手段,来释放磁盘空间,以及减少消息消费之后对文件内容改动的磁盘IO开支.
consumer而言,它需要保存消费消息的offset,对于offset的保存和使用,有consumer来控制。
kafka集群几乎不需要维护任何consumer和producer状态信息,这些信息有zookeeper保存;因此producer和consumer的实现非常轻量级,它们可以随意离开,而不会对集群造成额外的影响.

4.生产者
负载均衡: producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何”路由层”.事实上,消息被路由到哪个partition上,有producer决定.比如可以采用”random”“key-hash”“轮询”等,如果一个topic中有多个partitions,那么在producer端实现”消息均衡分发”是必要的.

其中partition leader的位置(host:port)注册在zookeeper中,producer作为zookeeper client,已经注册了watch用来监听partition leader的变更事件.
异步发送:将多条消息暂且在客户端buffer起来,并将他们批量的发送到broker,小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率。不过这也有一定的隐患,比如说当producer失效时,那些尚未发送的消息将会丢失。

5.消费者
consumer端向broker发送”fetch”请求,并告知其获取消息的offset;此后consumer将会获得一定条数的消息;consumer端也可以重置offset来重新消费消息.

在JMS实现中,Topic模型基于push方式,即broker将消息推送给consumer端.不过在kafka中,采用了pull方式,即consumer在和broker建立连接之后,主动去pull(或者说fetch)消息;这中模式有些优点,首先consumer端可以根据自己的消费能力适时的去fetch消息并处理,且可以控制消息消费的进度(offset);此外,消费者可以良好的控制消息消费的数量,batch fetch.

Kafka 的配置

1.集群规划
使用三台机器部署,分别是 node1、node2、node3
2.下载 Kafka 安装包
https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/
3.使用 hadoop 用户将安装包上传到 node机器,并解压到/home/hadoop/apps 目录下
4.修改配置文件

 cd /home/hadoop/apps/kafka_2.11-0.10.2.1/config
 vim server.properties
server.properties配置文件添加修改的内容如下
 每个borker的id是唯一的,多个broker要设置不同的id broker.id=0
 访问端口号 port=9092
 访问地址 host.name=192.168.183.102
 允许删除topic delete.topic.enable=true
 存储数据路径,默认是在/tmp目录下,需要修改 log.dirs=/home/hadoop/apps/kafka_2.11-0.10.2.1/kafka-logs
 创建topic默认分区数 num.partitions=1
 数据保存时间,默认7天,单位小时 log.retention.hours=168
 Zookeeper访问地址,多个地址用逗号隔开 zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181

配置文件及 IDEA 下代码下载地址:
https://download.csdn.net/download/qq_1290259791/10312363

5.在/home/hadoop/apps/kafka_2.11-0.10.2.1创建kafka-logs文件夹 mkdir /home/hadoop/apps/kafka_2.11-0.10.2.1/kafka-logs

6.使用scp将配置好的kafka安装包拷贝到node2和node3两个节点

7.分别修改node04和node05的配置文件server.properties
node2的server.properties修改项
broker.id=1
host.name=192.168.183.103
node3的server.properties修改项
broker.id=2
host.name=192.168.183.104

8.分别在node03、node04、node05启动kafka
cd /home/hadoop/apps/kafka_2.11-1.0.1
启动的时候使用-daemon选项,则kafka将以守护进程的方式启动
bin/kafka-server-start.sh -daemon config/server.properties

Kafka 的使用

  1. 启动 Kafka

执行代码:

bin/kafka-server-start.sh -daemon config/server.properties
[hadoop@node1 kafka_2.11-1.0.1]$ bin/kafka-server-start.sh -daemon config/server.properties
[hadoop@node1 kafka_2.11-1.0.1]$ jps
25715 DataNode
25443 JournalNode
25605 NameNode
3271 QuorumPeerMain
30551 Kafka
30568 Jps
26026 DFSZKFailoverController

多了 Kafka 进程

  1. 主题添加
    使用kafka自带的Producer客户端创建topic,创建topic名称为topictest1,3个分区,每个分区有2个副本
bin/kafka-topics.sh --create --zookeeper 192.168.56.101:2181 --replication-factor 2 --partitions 3 --topic topictest1
  1. 主题查看
    使用list查看Kafka 中已创建的主题列表
bin/kafka-topics.sh --list --zookeeper 192.168.56.101:2181 topictest1
[hadoop@node1 kafka_2.11-1.0.1]$ bin/kafka-topics.sh --list --zookeeper 192.168.56.101:2181 topictest1
__consumer_offsets
topictest1
[hadoop@node1 kafka_2.11-1.0.1]$ 

4.查看主题信息
使用describe查看主题信息

bin/kafka-topics.sh --describe --zookeeper 192.168.56.101:2181 --topic topictest1
[hadoop@node1 kafka_2.11-1.0.1]$ bin/kafka-topics.sh --describe --zookeeper 192.168.56.101:2181 --topic topictest1
Topic:topictest1    PartitionCount:5    ReplicationFactor:2 Configs:
    Topic: topictest1   Partition: 0    Leader: 2   Replicas: 0,2   Isr: 2,0
    Topic: topictest1   Partition: 1    Leader: 1   Replicas: 1,0   Isr: 1,0
    Topic: topictest1   Partition: 2    Leader: 2   Replicas: 2,1   Isr: 2,1
[hadoop@node1 kafka_2.11-1.0.1]$ 

5.给指定主题增加分区,不支持减少分区的操作
使用alter给topictest1主题分区由最初创建的3个分区,增加到5个分区

bin/kafka-topics.sh --alter --zookeeper 192.168.56.101:2181 --topic topictest1 --partitions 5

查看主题信息

[hadoop@node1 kafka_2.11-1.0.1]$ bin/kafka-topics.sh --describe --zookeeper 192.168.56.101:2181 --topic topictest1
Topic:topictest1    PartitionCount:5    ReplicationFactor:2 Configs:
    Topic: topictest1   Partition: 0    Leader: 2   Replicas: 0,2   Isr: 2,0
    Topic: topictest1   Partition: 1    Leader: 1   Replicas: 1,0   Isr: 1,0
    Topic: topictest1   Partition: 2    Leader: 2   Replicas: 2,1   Isr: 2,1
    Topic: topictest1   Partition: 3    Leader: 2   Replicas: 0,2   Isr: 2,0
    Topic: topictest1   Partition: 4    Leader: 1   Replicas: 1,0   Isr: 1,0
[hadoop@node1 kafka_2.11-1.0.1]$ 

6.删除主题
使用 delete 删除主题

./kafka-topics.sh --delete --zookeeper 192.168.56.101:2181 --topic topictest1

7.生产者向主题发送消息
使用Kafka自带的生产者客户端脚本向topictest1主题发送消息

bin/kafka-console-producer.sh --broker-list 192.168.56.101:9092 --topic topictest1
```![这里写图片描述](https://img-blog.csdn.net/20180327223206323watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzEyOTAyNTk3OTE=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)
8.消费者从主题拉取消息
    使用kafka自带的消费者客户端脚本从topictest1主题拉取消息




<div class="se-preview-section-delimiter"></div>

bin/kafka-console-consumer.sh –zookeeper 192.168.56.102:2181 –from-beginning –topic topictest1






<div class="se-preview-section-delimiter"></div>

## IDEA下 Kafka 的消费者和生产者创建 ##

1.ProducerClient的创建





<div class="se-preview-section-delimiter"></div>

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

/**
* Created by hubo on 2018/3/27
*/
public class ProducerClient {
public static void main(String[] args) {

    Properties properties = new Properties();
    //Kafka broker列表
    properties.put("bootstrap.servers","192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092");
    properties.put("acks","1");
    //key 和 value 字符串序列化
    properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    Producer<String,String> producer = new KafkaProducer<String,String>(properties);

    //用户产生随机数,模拟消息产生
    Random random = new Random();
    for(int i=0;i<20;i++){
        String ip = "192.168.1." + random.nextInt(255);
        long runtime = new Date().getTime();
        String msg = runtime + "-----" + ip;
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("send to kafka ->key:" + ip + "value:"+msg);
        producer.send(new ProducerRecord<String, String>("topictest1",ip,msg));
    }
    producer.close();
}

}

2.ConsumerClient的创建





<div class="se-preview-section-delimiter"></div>

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
* Created by hubo on 2018/3/27
*/
public class ConsumerClient {

/**
 * 手动提交
 */
public static void manualCommintClient(){

    Properties properties = new Properties();

    //kafka broker 列表
    properties.put("bootstrap.servers","192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092");
    properties.put("group.id","newautocget1");
    //自动提交
    properties.put("enable.auto.commit","false");
    // earliest表示从最早的偏移量开始拉取,
    // latest表示从最新的偏移量开始拉取,默认值latest
    // none表示如果没有发现该Consumer组之前拉取的偏移量则抛异常
    properties.put("auto.offset.reset","earliest");
    //key 和 value 字符串的反序列化
    properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
    consumer.subscribe(Arrays.asList("topictest1"));
    //每次最少处理10条消息后才提交
    final int minBatchSize = 10;
    //用于保存消息的 List
    List<ConsumerRecord<String,String>> bufferList = new ArrayList<ConsumerRecord<String, String>>();
    while(true){
        System.out.printf("---------start pull message---------");
        long starttime = System.currentTimeMillis();
        //poll 需要传入一个超时时间,当没有可拉取的消息时先等待
        //如果遇到已超时时间还没有可拉取的消息则进行下一轮拉取,单位毫秒
        ConsumerRecords<String,String> records = consumer.poll(1000);
        long endtime = System.currentTimeMillis();
        long tm = (endtime-starttime) / 1000;
        System.out.println("------------end pull message and times " + tm + "s-----");

        for(ConsumerRecord<String,String> record : records){
            System.out.printf("partition = %d,offset = %d,key = %s,value = %s%n",record.partition(),record.offset(),record.key(),record.value());
            bufferList.add(record);
        }
        System.out.println("-----------buffer size->" + bufferList.size());

        //如果读取的消息满足10条,进行处理
        if(bufferList.size() >= minBatchSize){
            System.out.println("********start deal message *******");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("manual commint offset start...");
            //处理完之后进行提交
            consumer.commitSync();
            //清除 list,继续接受
            bufferList.clear();
            System.out.println("manual commint offset end....");
        }
    }
}
/**
 * 自动提交便宜变量
 */
public static void autoCommintClient(){
    //Properties配置变量
    Properties properties = new Properties();
    //kafka broker 列表
    properties.put("bootstrap.servers","192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092");
    properties.put("group.id","newautocget1");
    //自动提交
    properties.put("enable.auto.commit","true");

    properties.put("auto.commit.interval.ms","1000");
    properties.put("auto.offset.reset","earliest");
    //key 和 value 字符串的反序列化
    properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

    //consumer 订阅topictest1主题,同时消费多个多个主题用,隔开
    consumer.subscribe(Arrays.asList("topictest1"));
    while(true){
        //poll 方法需要传入一个超时时间,当没有可以拉去的消息时先等待
        //如果已经超时还没有可以拉取的消息则进行下一轮拉取,单位毫秒
        ConsumerRecords<String, String> records = consumer.poll(1000);
        //处理拉取来的消息
        for(ConsumerRecord<String, String> record : records){
            System.out.printf("partition = %d,offset = %d,key = %s,value = %s%n",record.partition(),record.offset(),record.key(),record.value());
        }

    }


}

public static void main(String[] args) {
    //自动提交 offset
   // autoCommintClient();
    //手动提交 offset
    //consumer 获取 topic 的数据,一遍获取一遍写到数据库,但是获取完之后自动提交 offset,写入数据库的进程断了。所以要等完全写进数据库再提交 offset
    manualCommintClient();

}

}

“`
先启动消费者,然后启动生产者,两者会产生通信
这里写图片描述
这里写图片描述

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Kafka知识点总结二 下一篇Kafka Stream 类库的使用入门

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目