设为首页 加入收藏

TOP

Kafka的多任务线程消费测试
2019-05-06 14:29:21 】 浏览:68
Tags:Kafka 任务 线程 消费 测试
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/linhaiyun_ytdx/article/details/83154675

代码:

package com.weichai.kafka;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

/**
 * 消费者的线程执行
 * @author lhy
 * @date 2018.10.09
 */
public class Consumer implements Runnable {

	private KafkaStream stream;
	private int threadNumber;
	
	public Consumer(KafkaStream stream, int threadNumber) {
		this.stream = stream;
		this.threadNumber = threadNumber;
	}

    /**
     * 线程执行
     */
	@Override
	public void run() {
		// TODO Auto-generated method stub
		ConsumerIterator<byte[], byte[]> it = stream.iterator();
		while(it.hasNext()){
			System.out.println("Thread " + threadNumber + ": " + new String(it.next().message()));
		}
		System.out.println("Shutting down Thread: " + threadNumber);
	}

}
package com.weichai.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Kafka消费者的多线程调用,提高吞吐效率
 * @author lhy
 * @date 2018.10.09
 */
public class ConsumerThread {

	private final ConsumerConnector consumer;
	private final String topic;
	private ExecutorService executor;
	
	public ConsumerThread(String a_zookeeper, String a_groupId, String a_topic) {
		consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));
		this.topic = a_topic;
	}
	
	public void shutdown() {
		if(consumer !=null){
			consumer.shutdown();
		}
		if(executor !=null){
			executor.shutdown();
		}
		try {
			if(!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)){
				System.out.println("消费者线程等待超时,直接退出!");
			}
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			System.out.println("系统异常中断,直接退出!");
			e.printStackTrace();
		}
	}
	
	public void run(int a_numThreads) {
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(topic, new Integer(a_numThreads));
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        
		executor = Executors.newFixedThreadPool(a_numThreads);
		
		int threadNumber = 0;
		for (final KafkaStream stream : streams) {
			executor.submit(new Consumer(stream, threadNumber));
			threadNumber++;
		}
	}
	private ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
		// TODO Auto-generated method stub
		Properties props = new Properties();
		props.put("zookeeper.connect", a_zookeeper);
		props.put("group.id", a_groupId);
		props.put("zookeeper.session.timeout.ms", "400");
		props.put("zookeeper.sync.time.ms", "200");
		props.put("auto.commit.interval.ms", "1000");
		
		return new ConsumerConfig(props);
	}
	
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		String zooKeeper = "localhost:2181";
		String groupId = "0";
		String topic = "SimpleNode";
		int threads = 5; // 启动的线程数
		
		ConsumerThread thread = new ConsumerThread(zooKeeper, groupId, topic);
		thread.run(threads);
		try {
			Thread.sleep(5000);      //线程休眠5秒后终止
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		thread.shutdown();
	}
}

运行截图:

此处重温了并发编程的内容,使用到了多线程框架,开启5个线程吸收Topic,大大提高了Kafka消费Topic的效率.

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spring Cloud Stream Kafka 特定.. 下一篇Kafka的安装和简单实例测试

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目