设为首页 加入收藏

TOP

Kafka 编写自己的producer、partitioner和consumer
2019-02-12 02:29:39 】 浏览:33
Tags:Kafka 编写 自己 producer partitioner consumer

1. 简单的Producer

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;

/**
 * Minimal Kafka producer example.
 *
 * <p>Sends 100 string records to the topic {@code mytopic} through the broker at
 * {@code centos1:9092}. Partition selection is delegated to
 * {@code org.kafka.practice.MyPartitioner}.
 */
public class MyProducer {

	/**
	 * Builds the producer configuration, sends records with keys {@code "0"}..{@code "99"}
	 * and values {@code "7777-0"}..{@code "7777-99"}, then closes the producer.
	 */
	@Test
	public void testProducer() {
		Properties props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos1:9092");
		props.put(ProducerConfig.ACKS_CONFIG, "all");
		props.put(ProducerConfig.RETRIES_CONFIG, 0);
		props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
		props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
		props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		// Route every record through the custom partitioner instead of the default one.
		props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.kafka.practice.MyPartitioner");

		// try-with-resources guarantees close() runs even when send() throws,
		// so the producer is never leaked.
		try (Producer<String, String> producer = new KafkaProducer<>(props)) {
			for (int i = 0; i < 100; i++) {
				producer.send(new ProducerRecord<String, String>("mytopic", Integer.toString(i), "7777-" + i));
			}
		}
	}
}

简单的partitioner

package org.kafka.practice;

import java.util.Map;

import org.apache.kafka.clients.producer.
		    

Partitioner; import org.apache.kafka.common.Cluster; public class MyPartitioner implements Partitioner{ @Override public void configure(Map<String, > configs) { } @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { return 1; } @Override public void close() { } }

结果:

所发送的消息全部写到编号为1的分区上,查看log文件 /tmp/kafka-logs/mytopic-1/00000000000000000000.log

2. 实现了callback函数的producer

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.Test;

/**
 * Kafka producer example that registers a completion callback for every record.
 *
 * <p>Sends 5 string records to the topic {@code mytopic} through the broker at
 * {@code centos1:9092}. Partition selection is delegated to
 * {@code org.kafka.practice.MyPartitioner}.
 */
public class MyProducer {

	/**
	 * Sends records with keys {@code "0"}..{@code "4"} and values {@code "222-0"}..{@code "222-4"}.
	 * Prints {@code send message!!!} after each {@code send} call returns and
	 * {@code received ack!!!} from the callback of each successfully completed send.
	 */
	@Test
	public void testProducer() {
		Properties props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos1:9092");
		props.put(ProducerConfig.ACKS_CONFIG, "all");
		props.put(ProducerConfig.RETRIES_CONFIG, 0);
		props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
		props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
		props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		// Route every record through the custom partitioner instead of the default one.
		props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.kafka.practice.MyPartitioner");

		// try-with-resources guarantees close() runs even when send() throws,
		// so the producer is never leaked.
		try (Producer<String, String> producer = new KafkaProducer<>(props)) {
			for (int i = 0; i < 5; i++) {
				ProducerRecord<String, String> record =
						new ProducerRecord<String, String>("mytopic", Integer.toString(i), "222-" + i);
				producer.send(record, new Callback() {
					@Override
					public void onCompletion(RecordMetadata metadata, Exception exception) {
						// A non-null exception means the send failed; do not report it as an ack.
						if (exception != null) {
							System.err.println("send failed: " + exception);
							return;
						}
						System.out.println("received ack!!!");
					}
				});
				// send() only hands the record to the producer; the callback fires later.
				System.out.println("send message!!!");
			}
		}
	}
}

运行结果:

send message!!!

send message!!!

send message!!!

send message!!!

send message!!!

17/05/18 15:23:40 INFO producer.KafkaProducer: Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

received ack!!!

received ack!!!

received ack!!!

received ack!!!

received ack!!!

3. 简单的consumer-自动提交

	/**
	 * Auto-commit consumer example: subscribes to {@code mytopic} as part of group
	 * {@code group1} and prints every received record, pausing 3 seconds after each one.
	 *
	 * <p>The poll loop never terminates on its own; it only ends when an exception is thrown.
	 *
	 * @throws Exception if the thread is interrupted while sleeping or the consumer fails
	 */
	@Test
	public void testConsumer() throws Exception {
		Properties props = new Properties();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos1:9092");
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // commit offsets automatically
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");

		// try-with-resources closes the consumer when the loop is left through an exception.
		try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
			consumer.subscribe(Arrays.asList("mytopic"));
			while (true) {
				ConsumerRecords<String, String> records = consumer.poll(100);
				for (ConsumerRecord<String, String> record : records) {
					Date now = new Date();
					// Pass the timestamp as an argument rather than splicing it into the format string.
					System.out.printf("%s offset = %d, key = %s, value = %s%n", now, record.offset(), record.key(), record.value());
					Thread.sleep(3000);
				}
			}
		}
	}

4. 简单的consumer-手动提交

	/**
	 * Manual-commit consumer example: subscribes to {@code mytopic} as part of group
	 * {@code group1} with auto-commit disabled, buffers received records, and calls
	 * {@code commitSync()} once at least 200 records have been collected.
	 *
	 * <p>The poll loop never terminates on its own; it only ends when an exception is thrown.
	 */
	public void testConsumer2() {
		Properties props = new Properties();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");

		final int minBatchSize = 200;
		List<ConsumerRecord<String, String>> buffer = new ArrayList<>();

		// try-with-resources closes the consumer when the loop is left through an exception.
		try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
			consumer.subscribe(Arrays.asList("mytopic"));
			while (true) {
				ConsumerRecords<String, String> records = consumer.poll(100);
				for (ConsumerRecord<String, String> record : records) {
					buffer.add(record);
				}
				if (buffer.size() >= minBatchSize) {
					// Placeholder: process the buffered batch here (e.g. insert it into a
					// database) before committing, so offsets are only committed after the work.
					consumer.commitSync();
					buffer.clear();
				}
			}
		}
	}

参考:

Kafka参数说明:http://www.cnblogs.com/rilley/p/5391268.html


编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇spark读取kafka数据(两种方式比.. 下一篇kafka 消息服务

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(217) }