设为首页 加入收藏

TOP

rabbitmq转存入Kafka
2019-04-23 14:31:31 】 浏览:53
Tags:rabbitmq 存入 Kafka

主要功能:rabbitmq实时数据接收后,转存入kafka中,用于实施从kafka消费数据

主要代码如下【Kafka生产】:

package kafka;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;

import com.hxy.protobuf.DSFusion;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import cetc.common.constant.ConstantInfo;

public class Rabbit2Kafka {
	
	private static final String MY_TOPIC = "TOPIC_MQ2KAFKA";
	private KafkaProducer<Integer, byte[]> producer;

	public Rabbit2Kafka() throws IOException, TimeoutException {
		super();
		initKafkaProducer();
	}


	public static void main(String[] args) throws IOException, TimeoutException {
		// TODO Auto-generated method stub
		Rabbit2Kafka rabbit2Kafka = new Rabbit2Kafka();
		rabbit2Kafka.mqReceiver();
	}
	
	/*
	*链接rabbitmq,并且接受其中的数据。
	*此处使用的是rabbitqm中最新的api
	*/
	public void mqReceiver() throws IOException, TimeoutException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("192.168.xx.xxx");//rabbitmq的ip
		factory.setPort(5672);
		factory.setUsername("admin");
		factory.setPassword("admin");
		
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		
		channel.queueDeclare(ConstantInfo.sQueueDyStatic_mq2kafka, false, true, true, null);
		channel.queueBind(ConstantInfo.sQueueDyStatic_mq2kafka, ConstantInfo.sMQExchange_out, ConstantInfo.sMQRoutingKey_dsFusion);
		
        if (channel != null && channel.isOpen() ==true) {
			System.out.println("-------------channel .isOpen() : "+ channel.isOpen() );
		}
        
        MyConsumer myConsumer = new MyConsumer(channel);
        channel.basicConsume(ConstantInfo.sQueueDyStatic_mq2kafka, true, myConsumer);
		
	}
	
	/**
	 * @Description:定义一个接收自己数组并解析后发送有效信息
	 * @data 2018年10月26日
	 * @enCodeType: utf-8
	 */
    class MyConsumer extends DefaultConsumer {
        public MyConsumer(Channel channel) {
            super(channel);
        }
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            if (body.length > 0) {
            	DSFusion.DSFusionMessage dFusionMessage = DSFusion.DSFusionMessage.parseFrom(body);
//            	String key = String.valueOf(dFusionMessage.getFusionNum());
            	Integer key =dFusionMessage.getFusionNum();
            	String value = dFusionMessage.getMmsi() + ";"+dFusionMessage.getDynamicUTC();
//            	System.out.println(key);
            	
            	ProducerRecord<Integer,byte[] > producerRecord = new ProducerRecord<Integer, byte[]>(MY_TOPIC,key, body);
            	producer.send(producerRecord);
            	System.out.println("producer :"+producerRecord.key()+" | "+producerRecord.value());
                
            }
        }
    }
	
	public void initKafkaProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.xx.100:9092,192.168.xx.101:9092,192.168.xx.102:9092");
        props.put("acks", "0");
        props.put("retries", "0");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
//        props.put("key.serializer", IntegerDeserializer.class.getName());
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
//        props.put("value.serializer", ByteArrayDeserializer.class.getName());

        //生产者实例
        producer = new KafkaProducer<Integer, byte[]>(props);
        
        if (producer != null) {
			System.out.println("----------------producer has estiblished!!");
		}else {
			System.out.println("producer error");
		}
        
      
	}

}

Kafka消费者:

package kafka;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.protobuf.InvalidProtocolBufferException;
import com.hxy.protobuf.DSFusion;

public class Rabbit2KafkaConsumer {

	private static final Logger logger = LoggerFactory.getLogger(Rabbit2KafkaConsumer.class);
	private static final String MY_TOPIC = "TOPIC_MQ2KAFKA";

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.xx.100:9092,192.168.xx.101:9092,192.168.xx.102:9092");
        props.put("group.id", "group_111");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("max.poll.records", 1000);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", IntegerDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());


        KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<Integer, byte[]>(props);

        consumer.subscribe(Arrays.asList(MY_TOPIC));

        while (true) {
        	System.out.println("----");
            ConsumerRecords<Integer, byte[]> records = consumer.poll(1000);
            for (ConsumerRecord<Integer, byte[]> record : records) {
            	int key = record.key();
            	byte[] value = record.value();
            	System.out.println( "----"+record.offset()+" | "+record.partition()+" | "+record.key()+" | "+record.value());
/*	try {
	    //此处使用Protobuffer解析接受的byte[]
		DSFusion.DSFusionMessage dFusionMessage = DSFusion.DSFusionMessage.parseFrom(value);
		System.out.println("*********** "+dyStaBean);

	} catch (InvalidProtocolBufferException e) {
		// TODO Auto-generated catch block
		e.printStackTrace();
	}*/
			
            }
        }
        
    }
    

}

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇kafka(十一):Kafka Java Consu.. 下一篇Kafka - 新消费者

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目