设为首页 加入收藏

TOP

Hadoop————Kafka强化
2019-03-14 14:28:47 】 浏览:13
Tags:Hadoop Kafka 强化
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/Jorocco/article/details/80807163

1、kafka的特点

分布式流处理平台。在系统之间构建实时数据流管道。以topic分类对记录进行存储,每个记录包含key-value+timestamp每秒钟百万消息吞吐量。

producer            //消息生产者
consumer            //消息消费者
consumer group      //消费者组
kafka server        //broker,kafka服务器
topic               //主题,副本数,分区.

这里写图片描述
这里写图片描述
2、安装kafaka

0.选择s202 ~ s204三台主机安装kafka

1.准备zk

2.jdk

3.tar文件

4.环境变量

5.配置kafka

[kafka/config/server.properties]
    ...
    broker.id=202
    ...
    listeners=PLAINTEXT://:9092
    ...
    log.dirs=/home/centos/kafka/logs
    ...
    zookeeper.connect=s201:2181,s202:2181,s203:2181

6.分发server.properties,同时修改每个文件的broker.id为IP地址最后一组,其他的也可以,只要保证唯一性即可。

7.启动kafka服务器

a)先启动zk

b)启动kafka

[s202 ~ s204]
$>bin/kafka-server-start.sh -daemon config/server.properties

c)验证kafka服务器是否启动

$>netstat -anop | grep 9092

8.创建主题

bin$> ./kafka-topics.sh --create --zookeeper s201:2181 --replication-factor 3 --partitions 3 --topic test

9.查看主题列表

bin$> ./kafka-topics.sh --list --zookeeper s201:2181

10.启动控制台生产者

bin$> ./kafka-console-producer.sh --broker-list s202:9092 --topic test

11.启动控制台消费者

bin$> ./kafka-console-consumer.sh --bootstrap-server s202:9092 --topic test --from-beginning --zookeeper s202:2181

12.在生产者控制台输入hello world

3、kafka集群在zk的配置

-----------------------
    /controller         ===>    {"version":1,"brokerid":202,"timestamp":"1490926369148"

    /controller_epoch   ===>    1

    /brokers
    /brokers/ids
    /brokers/ids/202    ===>    {"jmx_port":-1,"timestamp":"1490926370304","endpoints":["PLAINTEXT://s202:9092"],"host":"s202","version":3,"port":9092}
    /brokers/ids/203
    /brokers/ids/204    


    /brokers/topics/test/partitions/0/state ===>{"controller_epoch":1,"leader":203,"version":1,"leader_epoch":0,"isr":[203,204,202]}
    /brokers/topics/test/partitions/1/state ===>...
    /brokers/topics/test/partitions/2/state ===>...

    /brokers/seqid      ===> null

    /admin
    /admin/delete_topics/test       ===>标记删除的主题

    /isr_change_notification

    /consumers/xxxx/
    /config

创建主题

每个分区都有leader,follow,follow就是副本个数,它具有容错性,直到所有的副本都挂掉,否则它还能保持容错。

repliation_factor 2 partitions 5:表示2个副本,五个分区,即每个分区具有2个副本,它是以文件夹的形式存在的
$>kafka-topic.sh --zookeeper s202:2181 --replication_factor 2 --partitions 5 --create --topic test3

2 x 5 = 10 //10个文件夹

    [s202]
    test2-1         //
    test2-2         //
    test2-3         //

    [s203]
    test2-0
    test2-2
    test2-3
    test2-4

    [s204]
    test2-0
    test2-1
    test2-4

4、重新布局
这里写图片描述
重新布局分区和副本,手动再平衡

$>kafka-topics.sh --alter --zookeeper s202:2181 --topic test2 --replica-assignment 203:204,203:204,203:204,203:204,203:204

副本
broker存放消息以消息达到顺序存放。生产和消费都是副本感知的。
支持到n-1故障。每个分区都有leader,follow.
leader挂掉时,消息分区写入到本地log或者,向生产者发送消息确认回执之前,生产者向新的leader发送消息。
新leader的选举是通过isr进行,第一个注册的follower成为leader。

kafka支持副本模式

同步复制

1.producer联系zk识别leader
2.向leader发送消息
3.leadr收到消息写入到本地log
4.follower从leader pull消息
5.follower向本地写入log
6.follower向leader发送ack消息
7.leader收到所有follower的ack消息
8.leader向producer回传ack

这里写图片描述

异步副本
和同步复制的区别在与leader写入本地log之后,
直接向client回传ack消息,不需要等待所有follower复制完成。

5、通过java API实现消息生产者,发送消息

package cn.ctgu.kafkademo.test;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import org.junit.Test;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Created by Administrator on 2017/3/31.
 */
public class TestProducer {

    //创建生产者
    @Test
    public void testSend(){
        Properties props = new Properties();
        //broker列表
        props.put("metadata.broker.list", "s2:9092");
        //串行化
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        //
        props.put("request.required.acks", "1");

        //创建生产者配置对象
        ProducerConfig config = new ProducerConfig(props);

        //创建生产者
        Producer<String, String> producer = new Producer<String, String>(config);

        KeyedMessage<String, String> msg = new KeyedMessage<String, String>("test3","100" ,"hello world tomas100");
        producer.send(msg);
        System.out.println("send over!");
    }
    //创建消费者
    @Test
    public void testConsumer(){
        Properties props = new Properties();
        props.put("zookeeper.connect", "s2:2181");
        props.put("group.id", "g3");
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        //创建消费者配置对象
        ConsumerConfig config = new ConsumerConfig(props);
        //
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put("test", new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> msgs = Consumer.createJavaConsumerConnector(new ConsumerConfig(props)).createMessageStreams(map);
        List<KafkaStream<byte[], byte[]>> msgList = msgs.get("test");
        for(KafkaStream<byte[],byte[]> stream : msgList){
            ConsumerIterator<byte[],byte[]> it = stream.iterator();
            while(it.hasNext()){
                byte[] message = it.next().message();
                System.out.println(new String(message));
            }
        }
    }
}

6、flume集成kafka
这里写图片描述
这里写图片描述
6.1KafkaSink [生产者]

        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        a1.sources.r1.type=netcat
        a1.sources.r1.bind=localhost
        a1.sources.r1.port=8888

        a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
        a1.sinks.k1.kafka.topic = test3
        a1.sinks.k1.kafka.bootstrap.servers = s202:9092
        a1.sinks.k1.kafka.flumeBatchSize = 20
        a1.sinks.k1.kafka.producer.acks = 1

        a1.channels.c1.type=memory

        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

6.2 KafkaSource[消费者]

        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
        a1.sources.r1.batchSize = 5000
        a1.sources.r1.batchDurationMillis = 2000
        a1.sources.r1.kafka.bootstrap.servers = s202:9092
        a1.sources.r1.kafka.topics = test3
        a1.sources.r1.kafka.consumer.group.id = g4

        a1.sinks.k1.type = logger

        a1.channels.c1.type=memory

        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

6.3 Channel[生产者 + 消费者]

        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        a1.sources.r1.type = avro
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888

        a1.sinks.k1.type = logger

        a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
        a1.channels.c1.kafka.bootstrap.servers = s202:9092
        a1.channels.c1.kafka.topic = test3
        a1.channels.c1.kafka.consumer.group.id = g6

        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Kafka实战——flume中消息输出到K.. 下一篇kafka顺序写实现原理   参..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }