设为首页 加入收藏

TOP

说说MQ之RocketMQ(二)(二)
2018-10-12 12:08:03 】 浏览:774
Tags:说说 RocketMQ
ublic static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); consumer.setNamesrvAddr("192.168.232.23:9876"); consumer.start(); Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1"); for (MessageQueue mq : mqs) { System.out.println("Consume from the queue: " + mq); SINGLE_MQ: while (true) { try { long offset = consumer.fetchConsumeOffset(mq, true); PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); if (null != pullResult.getMsgFoundList()) { for (MessageExt messageExt : pullResult.getMsgFoundList()) { System.out.print(new String(messageExt.getBody())); System.out.print(pullResult); System.out.println(messageExt); } } putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: // TODO break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } consumer.shutdown(); } private static void putMessageQueueOffset(MessageQueue mq, long offset) { offseTable.put(mq, offset); } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = offseTable.get(mq); if (offset != null) return offset; return 0; } }

这部分的 API 其实是和 Kafka 很相似的,唯一不同的是,RocketMQ 需要手工管理 offset 和指定分区,而 Kafka 可以自动管理(当然也可以手动管理),并且不需要指定分区(分区是在 Kafka 订阅的时候指定的)。例子中,RocketMQ 使用 HashMap 自行管理,也可以用 OffsetStore 接口,提供了两种管理方式,本地文件和远程 Broker。这部分感觉两者差不多。

下面再看看 Push 模式顺序消费,代码如下,

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("192.168.232.23:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(false);
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                this.consumeTimes.incrementAndGet();
                if ((this.consumeTimes.get() % 2) == 0) {
                    return ConsumeOrderlyStatus.SUCCESS;
                }
                else if ((this.consumeTimes
首页 上一页 1 2 3 4 下一页 尾页 2/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇说说MQ之RocketMQ(三) 下一篇JVM之ParNew和CMS日志分析

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目