设为首页 加入收藏

TOP

分布式日志传送工具kafka
2018-11-13 16:36:59 】 浏览:85
Tags:分布式 日志 传送 工具 kafka

要点:

  1. 什么是kafka, 使用场景有哪些
  2. kafka优缺点
  3. 如何使用kafka( shell接口, java api )

point1: 简介

1, Kafka是一个分布式发布-订阅消息系统

最初由LinkedIn公司开发,之后成为Apache项目的一部分 (Kafka是高吞吐量的,可分区的,冗余备份的,可插拔式扩展的,持久性的日志服务,主要用于处理活跃的流式数据)

( kafka 采用linux Zero-Copy提高发送性能, 实现高吞吐量)

2, 什么是发布-订阅消息系统?

Java消息服务JMS( java Message Service),是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。 JMS规范目前支持两种消息模型:点对点(point to point, queue)和发布/订阅(publish/subscribe,topic),这两种模式主要区别就是发送到队列的消息能否重复消费(多次订阅)

点对点: 生产者生产消息,发送到queue中,然后消息消费者从queue中取出并且消费消息。消息被消费以后,queue中不再有存储,所以消费者不能消费已经被消费完的消息( queue 支持多个消费者,但是对一个消息而言,只会有一个消费者可以消费 )

发布/订阅:生产者生产消息,发布到topic中,同时有多个消息消费者(订阅)消费该消息(发布到topic的消息会被所有订阅者消费 )

3, Kafka client 消息接收的三种模式( 对JMS的消息传输担保的实现)

at most once: 最多一次

----->props.put("enable.auto.commit", "true"), client不调用commitSync()

at least once: 消息至少发送一次

----->props.put("enable.auto.commit", "false"), consumer.commitAsync()

exactly once: 消息只会发送一次

----->props.put("enable.auto.commit", "false"),catch异常时:consumer.seek(topicPartition,record.offset())

point2:优缺点

特点1:kafka 只能能保证一个Partition内, 消息的有序性(默认为异步发送消息, 若topic有多个partition, 在一个消费者组内若只有一个consumer就不能保证消息的有序性了==>此时,要实现消息有序,只能改变topic的分区数为1即可)

特点2: 实时性, 消息被生成者线程生产就能马上被消费者线程消费

特点3: 提供分区数据的复制和备份,确保至少有一份数据是可用的,不会丢失数据

特点4: 结合zookeeper使用,在Controller出现异常的情况之下,会从Broker里面自动地选择一个Broker成为新的Controller,而Controller的主要职责是管理整个集群的分区和副本的状态,所以当出现“脑裂”就会造成数据混乱的问题

point3:使用场景

1,网站活动跟踪场景: 按照用户的行为(注册、登录、购买)等进行切分为不同的topic, 进行数据分析

2,构建日志分析平台: 结合flume使用, 采集日志的时候业务是无感知无侵入的,数据可以sink到hive、hbase的hadoop组件

3,数据的多路转发: 充当flume的channel角色,多个channel可以实现多个sink路径

point4: kafka api的使用(shell, java)

1, shell接口: 安装kafka----> tar -xzvf kafka_2.9.2-0.8.1.1.tgz ; cd kafka_2.9.2-0.8.1.1

单机版kafka:

#启动zk, kafka:  
  bin/zookeeper-server-start.sh -daemon config/zookeeper.properties ; bin/kafka-server-start.sh -daemon config/server.properties

#创建topic :
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic t1

# 查看topic :
    bin/kafka-topics.sh --list --zookeeper localhost:2181)

#启动生产者:
     bin/kafka-console-producer.sh --broker-list localhost:9092 --topic t1 (阻塞时,可以输入消息)

#启动消费者: 
     bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t1 --from-beginning

集群版kafka: 配置config/server.properties

kafka1机器:  broker.id=101
             zookeeper.connect=zk1:2181,zk2:2181

kafka2机器:  broker.id=102
             zookeeper.connect=zk1:2181,zk2:2181

使用idea,编写生产者/消费者

         <dependency>
                   <groupId>org.apache.kafka</groupId>
                   <artifactId>kafka_2.12</artifactId>
                   <version>1.1.0</version>
        </dependency>

1,启动入口

 public static void main(String[] args) throws InterruptedException {
        //kafka消费, 生产
        produce();
        consume();
  }

2,生产者

private static void produce() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers","localhost:9092");
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        //生产者
        final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        final String topic="t1";
        final RecordHeader header1 = new RecordHeader("city", "bj".getBytes());
        final RecordHeader header2 = new RecordHeader("location", "haidian".getBytes());
        final RecordHeaders headers = new RecordHeaders(new RecordHeader[]{header1, header2});

        new Thread(){
            @Override
            public void run() {
                int a=0;
                while (true){
                    a++;
                    //分区内生产:(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
                    //String topic, K key, V value) {
                    ProducerRecord<String, String> record =
                            new ProducerRecord<String, String>(topic,0, "key"+ a, "val"+a,  headers  );
                    producer.send(record);
                }
            }
        }.start();
    }

3,消费者

private static void consume() {
        Properties props = new Properties();
        props.put("bootstrap.servers","localhost:9092");
        props.put("group.id", "test");//必须指定组:否则报错 ERROR AbstractCoordinator:The configured groupId is invalid
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        String topic="t1";

        //消费
        final KafkaConsumer consumer = new KafkaConsumer(props) ;
        consumer.subscribe(Collections.singletonList(topic));
        System.out.println("Subscribed to topic " + "t1");

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + ", val= " + record.value()+ ", partition= "+record.partition());
            }
        }
    }

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Kafka Stream 类库的使用入门 下一篇Storm 0.9.3的新特性

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目