TOP

说说MQ之RocketMQ(二)(三)
2018-10-12 12:08:03 】 浏览:862
Tags:说说 RocketMQ

.get() % 3) == 0) { return ConsumeOrderlyStatus.ROLLBACK; } else if ((this.consumeTimes.get() % 4) == 0) { return ConsumeOrderlyStatus.COMMIT; } else if ((this.consumeTimes.get() % 5) == 0) { context.setSuspendCurrentQueueTimeMillis(3000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }

虽然提供了 Push 模式,RocketMQ 内部实际上还是 Pull 模式的 MQ,Push 模式的实现应该采用的是长轮询,这点与 Kafka 一样。使用该方式有几个注意的地方,

  1. 接收消息的监听类要使用 MessageListenerOrderly
  2. ConsumeFromWhere 有几个参数,表示从头开始消费,从尾开始消费,还是从某个 TimeStamp 开始消费;
  3. 可以控制 offset 的提交,应该就是 context.setAutoCommit(false); 的作用;

控制 offset 提交这个特性非常有用,某种程度上扩展一下,就可以当做事务来用了,看代码 ConsumeMessageOrderlyService 的实现,其实并没有那么复杂,在不启用 AutoCommit 的时候,只有返回 COMMIT 才 commit offset;启用 AutoCommit 的时候,返回 COMMITROLLBACK(这个比较扯)、SUCCESS 的时候,都 commit offset。

后来发现,commit offset 功能在 Kafka 里面也有提供,使用新的 API,调用 consumer.commitSync

再看一个 Push 模式乱序消费 + 消息过滤的例子,消费者的代码如下,

import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
        consumer.setNamesrvAddr("192.168.232.23:9876");
        consumer.subscribe("TopicTest1", MessageFilterImpl.class.getCanonicalName());
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

这个例子与之前顺序消费不同的地方在于,

  1. 接收消息的监听类使用的是 MessageListenerConcurrently
  2. 回调方法中,使用的是自动 offset commit;
  3. 订阅的时候增加了消息过滤类 MessageFilterImpl

消息过滤类 MessageFilterImpl 的代码如下,

import com.alibaba.rocketmq.common.filter.MessageFilter;
import com.alibaba.rocketmq.common.message.MessageExt;
public class MessageFilterImpl implements MessageFilter {
    @Override
    public boolean match(MessageExt msg) {
        String property = msg.getUserProperty("SequenceId");
        if (property != null) {
            int id = Integer.parseInt(property);
            if ((id % 3) == 0 && (id > 10)) {
                return true;
            }
        }
        return false;
    }
}

RocketMQ 执行过滤是在 Broker 端,Broker 所在的机器会启动多个 FilterServer 过滤进程;Consumer 启动后,会向 FilterServer 上传一个过滤的 Java 类;Consumer 从 FilterServer 拉消息,FilterServer 将请求转发给 Broker,FilterServer 从 Broker 收到消息后,按照 Consumer 上传的 Java 过滤程序做过滤,过滤完成后返回给 Consumer。这种过滤方法可
说说MQ之RocketMQ(二)(三) https://www.cppentry.com/bencandy.php?fid=76&id=171264

首页 上一页 1 2 3 4 下一页 尾页 3/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇说说MQ之RocketMQ(三) 下一篇JVM之ParNew和CMS日志分析