设为首页 加入收藏

TOP

kafka 多消费者实现
2018-12-05 18:19:00 】 浏览:35
Tags:kafka 消费者 实现
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u012447842/article/details/84823029

kafka官网:http://kafka.apache.org/quickstart

目录

kafka简单介绍:

实现方式

1:kafka分区

2: 实现结果

3:kafka的consumer代码

4:kafka生产者


kafka简单介绍(网上找的):

实现方式

必要条件:

kafka配置:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic TEST

topic存在多个分区(--partitions 5), 才会实现多个consumer消费一个topic, 注意:consumer的数量应小于partitions数量, 要不然会浪费。

误区:在多个线程中共享同一个消费者实例——KafkaConsumer 不是线程安全的,不能被多个线程并发使用。

实现方式:

方法一: 开启多个进程消费者, 在每个进程里使用线程池异步做业务处理。

方法二:创建多个 Consumer,且每一个 Consumer 各自运行在自己的线程中。

这里主要讲的方法一, 方法二(优秀人的博客):http://www.cnblogs.com/qizhelongdeyang/p/7355309.html

1:kafka分区

2: 实现结果

开启5个进程如下:

开启一个进程:

3:kafka的consumer代码

Kafka_Consumer.java

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;


public final class Kafka_Consumer {

    /**
     * Consumes messages from the "TEST" topic and hands each polled batch to a
     * worker pool for asynchronous processing.
     *
     * KafkaConsumer is NOT thread-safe: every poll() happens on the single
     * thread that calls execute(); only record processing is offloaded.
     */
    private final KafkaConsumer<String, String> consumer;
    private ExecutorService executorService;

    public Kafka_Consumer() {
        Properties props = new Properties();
        // Kafka broker address (host:port).
        props.put("bootstrap.servers",
                "180.108.64.146:9099");
        props.put("group.id", "12334");
        // NOTE(review): with enable.auto.commit=true, offsets may be committed
        // before the worker pool finishes processing a batch, so messages can
        // be lost if the process dies mid-batch — confirm this is acceptable.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "100");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("TEST"));
    }

    /**
     * Polls the topic forever, submitting each non-empty batch to the pool.
     * This method never returns; call shutdown() from another thread to stop.
     */
    public void execute() {
        executorService = Executors.newFixedThreadPool(6); // async processing pool
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            // BUG FIX: poll() never returns null — it returns an EMPTY
            // ConsumerRecords when no data arrived, so the old "null != records"
            // check submitted a useless task on every idle poll. Check isEmpty().
            if (!records.isEmpty()) {
                executorService.submit(new ConsumerThread(records));
            }
        }
    }

    /**
     * Closes the consumer and shuts down the worker pool, waiting up to
     * 10 seconds for in-flight tasks to finish.
     */
    public void shutdown() {
        try {
            if (consumer != null) {
                consumer.close();
            }
            // BUG FIX: awaitTermination() was previously called outside the
            // null guard, throwing NPE when execute() had never been called.
            if (executorService != null) {
                executorService.shutdown();
                if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.out.println("Timeout");
                }
            }
        } catch (InterruptedException ignored) {
            // Restore the interrupt flag so callers can observe it.
            Thread.currentThread().interrupt();
        }
    }
}



/**
* 线程池做业务处理, 将kakfa接收消息和业务分离开来
*/
/**
 * Runnable that handles one batch of Kafka records, decoupling message
 * consumption from business processing.
 */
class ConsumerThread implements Runnable {

    private final ConsumerRecords<String, String> records;

    public ConsumerThread(ConsumerRecords<String, String> records) {
        this.records = records;
    }

    @Override
    public void run() {
        for (ConsumerRecord<String, String> record : records) {
            String summary = "当前线程:" + Thread.currentThread() + ","
                    + "偏移量:" + record.offset() + "," + "主题:"
                    + record.topic() + "," + "分区:" + record.partition()
                    + "," + "获取的消息:" + record.value();
            System.out.println(summary);
        }
    }
}

ConsumerMain.java

public class ConsumerMain {

    /**
     * Entry point: starts the consumer, lets it run for 20 seconds, then shuts
     * it down.
     *
     * BUG FIX: Kafka_Consumer.execute() contains an infinite poll loop and
     * never returns, so in the original code Thread.sleep() and shutdown()
     * were unreachable. execute() now runs on its own daemon thread so the
     * main thread can wait out the timeout and trigger a clean shutdown.
     */
    public static void main(String[] args) {
        Kafka_Consumer kafka_Consumer = new Kafka_Consumer();
        Thread poller = new Thread(kafka_Consumer::execute, "kafka-poller");
        poller.setDaemon(true); // do not keep the JVM alive after main exits
        try {
            poller.start();
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            kafka_Consumer.shutdown();
        }
    }
}

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>data</groupId>
    <artifactId>analyticCore</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>analyticCore</name>
    <url>http://maven.apache.org</url>

    <properties>
        <drools.version>5.3.1.Final</drools.version>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.drools</groupId>
            <artifactId>drools-core</artifactId>
            <version>${drools.version}</version>
        </dependency>
        <dependency>
            <groupId>org.drools</groupId>
            <artifactId>drools-compiler</artifactId>
            <version>${drools.version}</version>
        </dependency>
        <!-- required for drools and spring integration -->
        <dependency>
            <groupId>org.drools</groupId>
            <artifactId>drools-spring</artifactId>
            <version>${drools.version}</version>
        </dependency>
        <dependency>
            <groupId>com.thoughtworks.xstream</groupId>
            <artifactId>xstream</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.2.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>1.0.1</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.9</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.35</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20180130</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore-nio</artifactId>
            <version>4.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpasyncclient</artifactId>
            <version>4.1.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf-8</encoding>
                    <skipTests>true</skipTests>
                    <verbose>true</verbose>
                    <showWarnings>true</showWarnings>
                    <fork>true</fork>
                    <meminitial>128m</meminitial>
                    <maxmem>512m</maxmem>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <configuration>
                    <mainClass>data.analyticCore.ConsumerMain</mainClass>
                </configuration>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/rules</directory>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
    </build>
</project>

4:kafka生产者

python代码:

from kafka import KafkaProducer
import json
import time
import random
import threading

# Shared producer: JSON-encodes every value and sends to the broker below.
producer = KafkaProducer(
                            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                            bootstrap_servers=['180.108.64.146:9099']
                         )
# Start timestamp, captured at import time for the summary printed by send_msg().
sj = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))


def send_msg(Num):
    """Send `Num` JSON test messages to the 'TEST' topic, one per second,
    then print the start/end timestamps and close the shared producer."""
    for msg_id in range(Num):
        time.sleep(1)
        payload = {
            "name": "李四",
            "age": 23,
            "gender": "男",
            "id": msg_id,
        }
        producer.send('TEST', payload)
        print("============%s" % msg_id)
    end = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    print("===开始时间:%s" % sj)
    print("=====截止时间: %s" % end)
    producer.close()


def thread_start():
    """Spawn worker threads that each push a fixed number of messages
    via send_msg()."""
    thread_count = 1
    Num = 2000
    workers = [
        threading.Thread(target=send_msg, args=(Num,))
        for _ in range(thread_count)
    ]
    for worker in workers:
        worker.start()

if __name__ == "__main__":
    # Sends 100000 messages sequentially; note thread_start() above is unused here.
    send_msg(100000)


】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Kafka遇到的坑-- Error while fet.. 下一篇Kafka->Mongodb->Es

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目