版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u012447842/article/details/84823029
kafka官网:http://kafka.apache.org/quickstart
目录
kafka简单介绍:
实现方式
1:kafka分区
2: 实现结果
3:kafka的consumer代码
4:kafka生产者
kafka简单介绍(网上找的):
实现方式
必要条件:
kafka配置:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic TEST
只有topic存在多个分区(--partitions 5),才能让同一消费组内的多个consumer同时消费这个topic。注意:同一消费组内consumer的数量不应超过partitions的数量,多出来的consumer分配不到分区,只会空闲浪费。
误区:在多个线程中共用同一个消费者实例。KafkaConsumer不是线程安全的,不能被多个线程同时访问。
实现方式:
方法一: 开启多个进程消费者, 在每个进程里使用线程池异步做业务处理。
方法二:创建多个Consumer,每一个Consumer运行在自己的线程里。
这里主要讲的是方法一;方法二可参考这篇博客:http://www.cnblogs.com/qizhelongdeyang/p/7355309.html
1:kafka分区
2: 实现结果
开启5个进程如下:
开启一个进程:
3:kafka的consumer代码
Kafka_Consumer.java
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
 * Polls the {@code TEST} topic on one thread and hands every non-empty batch to a
 * fixed-size worker pool, keeping message receipt separate from business processing.
 *
 * <p>{@link KafkaConsumer} is not safe for multi-threaded access. Once
 * {@link #execute()} has started, only the thread running it touches the consumer, and
 * that same thread closes it when the poll loop ends. {@link #shutdown()} may be called
 * from any thread; it only flips a flag and stops the worker pool.
 *
 * <p>NOTE(review): offsets are auto-committed by the consumer rather than after a worker
 * finishes, so a batch still queued in the pool when the process dies is presumably not
 * redelivered - confirm this is acceptable for the workload.
 */
public final class Kafka_Consumer {
    /** Number of worker threads that process polled batches. */
    private static final int WORKER_THREADS = 6;
    /** Upper bound, in milliseconds, on one blocking poll; also bounds shutdown latency. */
    private static final long POLL_TIMEOUT_MS = 100L;
    /** How long {@link #shutdown()} waits for in-flight batches before reporting a timeout. */
    private static final long POOL_DRAIN_TIMEOUT_SECONDS = 10L;

    private final KafkaConsumer<String, String> consumer;
    /** Guards {@link #executorService} and orders task submission against pool shutdown. */
    private final Object lifecycleLock = new Object();
    /** Set once shutdown was requested or the poll loop ended; read by the poll loop. */
    private volatile boolean closed;
    private ExecutorService executorService;

    /**
     * Creates the consumer with the hard-coded connection settings and subscribes it to
     * the {@code TEST} topic. Polling does not start until {@link #execute()} is called.
     */
    public Kafka_Consumer() {
        Properties props = new Properties();
        // Kafka broker host:port
        props.put("bootstrap.servers", "180.108.64.146:9099");
        props.put("group.id", "12334");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "100");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("TEST"));
    }

    /**
     * Runs the poll loop on the calling thread until {@link #shutdown()} is requested or
     * polling fails, then closes the consumer on this same thread.
     *
     * @throws IllegalStateException if this instance has already been shut down
     */
    public void execute() {
        final ExecutorService pool;
        synchronized (lifecycleLock) {
            if (closed) {
                throw new IllegalStateException("Kafka_Consumer has already been shut down");
            }
            pool = Executors.newFixedThreadPool(WORKER_THREADS);
            executorService = pool;
        }
        try {
            while (!closed) {
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
                // An idle poll yields an empty batch; do not queue a no-op task for it.
                if (records == null || records.isEmpty()) {
                    continue;
                }
                ConsumerThread task = new ConsumerThread(records);
                boolean submitted;
                synchronized (lifecycleLock) {
                    // Checked under the lock so submit() cannot race with pool.shutdown().
                    submitted = !closed;
                    if (submitted) {
                        pool.submit(task);
                    }
                }
                if (!submitted) {
                    // The pool is already shut down: process the last batch inline
                    // instead of dropping records that were already fetched.
                    task.run();
                }
            }
        } finally {
            closed = true;
            // Close on the polling thread, the only thread allowed to use the consumer.
            consumer.close();
        }
    }

    /**
     * Requests the poll loop to stop, shuts the worker pool down and waits up to ten
     * seconds for queued batches to finish, printing {@code Timeout} if they do not.
     *
     * <p>Safe to call from any thread and before {@link #execute()} was ever started.
     * The poll loop notices the request within one poll timeout and closes the consumer
     * itself; this method does not wait for that.
     */
    public void shutdown() {
        final ExecutorService pool;
        synchronized (lifecycleLock) {
            boolean alreadyClosed = closed;
            closed = true;
            pool = executorService;
            if (pool == null) {
                // execute() never started, so no polling thread owns the consumer.
                if (!alreadyClosed) {
                    consumer.close();
                }
                return;
            }
            pool.shutdown();
        }
        try {
            if (!pool.awaitTermination(POOL_DRAIN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                System.out.println("Timeout");
            }
        } catch (InterruptedException ignored) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Worker task run by the thread pool: handles one polled batch so that receiving
 * Kafka messages stays separate from the business processing.
 */
class ConsumerThread implements Runnable {
    private final ConsumerRecords<String, String> records;

    /**
     * @param records the batch this task will print when it runs
     */
    public ConsumerThread(ConsumerRecords<String, String> records) {
        this.records = records;
    }

    /** Prints one line per record of the batch to standard output. */
    @Override
    public void run() {
        records.forEach(entry -> System.out.println(describe(entry)));
    }

    /** Builds the output line: current thread, offset, topic, partition and value. */
    private static String describe(ConsumerRecord<String, String> entry) {
        StringBuilder line = new StringBuilder();
        line.append("当前线程:").append(Thread.currentThread()).append(",");
        line.append("偏移量:").append(entry.offset()).append(",");
        line.append("主题:").append(entry.topic()).append(",");
        line.append("分区:").append(entry.partition()).append(",");
        line.append("获取的消息:").append(entry.value());
        return line.toString();
    }
}
ConsumerMain.java
/**
 * Command-line entry point that drives a single {@code Kafka_Consumer}.
 */
public class ConsumerMain {
    /**
     * Creates the consumer, runs it on the main thread and always shuts it down afterwards.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final Kafka_Consumer worker = new Kafka_Consumer();
        try {
            consumeThenPause(worker);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        } finally {
            worker.shutdown();
        }
    }

    /**
     * Runs the consumer on the calling thread, then sleeps for 20 seconds.
     * The sleep is reached only once {@code execute()} has returned.
     */
    private static void consumeThenPause(Kafka_Consumer worker) throws InterruptedException {
        worker.execute();
        Thread.sleep(20000);
    }
}
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <!-- Build descriptor for the analyticCore jar that contains the Kafka consumer. -->
  <modelVersion>4.0.0</modelVersion>
  <groupId>data</groupId>
  <artifactId>analyticCore</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>analyticCore</name>
  <url>http://maven.apache.org</url>
  <properties>
    <!-- Single version shared by all drools artifacts below. -->
    <drools.version>5.3.1.Final</drools.version>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.eclipse.paho</groupId>
      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
      <version>1.0.2</version>
    </dependency>
    <dependency>
      <groupId>org.drools</groupId>
      <artifactId>drools-core</artifactId>
      <version>${drools.version}</version>
    </dependency>
    <dependency>
      <groupId>org.drools</groupId>
      <artifactId>drools-compiler</artifactId>
      <version>${drools.version}</version>
    </dependency>
    <!-- required for drools and spring integration -->
    <dependency>
      <groupId>org.drools</groupId>
      <artifactId>drools-spring</artifactId>
      <version>${drools.version}</version>
    </dependency>
    <dependency>
      <groupId>com.thoughtworks.xstream</groupId>
      <artifactId>xstream</artifactId>
      <version>1.2.2</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.2.4</version>
    </dependency>
    <!-- NOTE(review): the consumer code shown with this POM only imports
         org.apache.kafka.clients.*; the kafka-clients artifact alone may suffice - confirm. -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.12</artifactId>
      <version>1.0.1</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.47</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
      <version>1.1.9</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.35</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.25</version>
    </dependency>
    <!-- NOTE(review): test scope keeps this SLF4J binding off the runtime classpath -
         confirm that is intended. -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.25</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
    <dependency>
      <groupId>org.json</groupId>
      <artifactId>json</artifactId>
      <version>20180130</version>
    </dependency>
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpcore</artifactId>
      <version>4.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.3.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpcore-nio</artifactId>
      <version>4.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpasyncclient</artifactId>
      <version>4.1.1</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-resources-plugin</artifactId>
        <version>3.1.0</version>
        <configuration>
          <encoding>utf-8</encoding>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.0</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
          <encoding>utf-8</encoding>
          <!-- NOTE(review): skipTests is a Surefire option and does not appear among the
               compiler plugin's documented parameters - confirm whether it has any effect here. -->
          <skipTests>true</skipTests>
          <verbose>true</verbose>
          <showWarnings>true</showWarnings>
          <fork>true</fork>
          <meminitial>128m</meminitial>
          <maxmem>512m</maxmem>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.2.1</version>
        <configuration>
          <!-- Class names are case-sensitive; the entry point class is ConsumerMain.
               NOTE(review): the listing shows no package declaration - confirm the class
               actually lives in data.analyticCore. -->
          <mainClass>data.analyticCore.ConsumerMain</mainClass>
        </configuration>
      </plugin>
    </plugins>
    <resources>
      <resource>
        <directory>src/main/rules</directory>
      </resource>
      <resource>
        <directory>src/main/resources</directory>
      </resource>
    </resources>
  </build>
</project>
4:kafka生产者
python代码:
from kafka import KafkaProducer
import json
import time
import random
import threading
# Shared module-level producer: every value is JSON-encoded and sent as UTF-8 bytes.
_BROKERS = ['180.108.64.146:9099']
producer = KafkaProducer(
    bootstrap_servers=_BROKERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

# Timestamp taken once at import time; send_msg prints it as the start time.
sj = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
def send_msg(Num):
    """Publish ``Num`` sample records to the ``TEST`` topic, one per second.

    Each record is a small dict whose ``id`` is the loop index. It goes through the
    module-level ``producer``, which serializes it to JSON. After the loop the
    module-level start timestamp ``sj`` and the current time are printed.

    Args:
        Num: number of records to send.
    """
    for i in range(Num):
        time.sleep(1)  # throttle to one record per second
        data = {
            "name": "李四",
            "age": 23,
            "gender": "男",
            "id": i
        }
        producer.send('TEST', data)
        print("============%s" % i)
    end = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    print("===开始时间:%s" % sj)
    print("=====截止时间: %s" % end)
    # Flush rather than close: the producer is shared at module level, so closing it
    # here would break any other thread (see thread_start) that is still sending.
    # flush() blocks until all buffered records have been delivered.
    producer.flush()
def thread_start():
    """Start ``num`` threads that each call ``send_msg(Num)``; does not join them."""
    num = 1
    Num = 2000
    workers = [
        threading.Thread(target=send_msg, args=(Num,))
        for _ in range(num)
    ]
    for worker in workers:
        worker.start()
if __name__ == "__main__":
    # Script entry point: send 100000 records on the main thread.
    # thread_start() is defined above but is not invoked here.
    send_msg(Num=100000)