.get() % 3) == 0) {
return ConsumeOrderlyStatus.ROLLBACK;
}
else if ((this.consumeTimes.get() % 4) == 0) {
return ConsumeOrderlyStatus.COMMIT;
}
else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
虽然提供了 Push 模式,RocketMQ 内部实际上还是 Pull 模式的 MQ,Push 模式的实现应该采用的是长轮询,这点与 Kafka 一样。使用该方式有几个注意的地方,
- 接收消息的监听类要使用
MessageListenerOrderly
;
ConsumeFromWhere
有几个参数,表示从头开始消费,从尾开始消费,还是从某个 TimeStamp 开始消费;
- 可以控制 offset 的提交,应该就是
context.setAutoCommit(false);
的作用;
控制 offset 提交这个特性非常有用,某种程度上扩展一下,就可以当做事务来用了,看代码 ConsumeMessageOrderlyService
的实现,其实并没有那么复杂,在不启用 AutoCommit 的时候,只有返回 COMMIT
才 commit offset;启用 AutoCommit 的时候,返回 COMMIT
、ROLLBACK
(这个比较扯)、SUCCESS
的时候,都 commit offset。
后来发现,commit offset 功能在 Kafka 里面也有提供,使用新的 API,调用 consumer.commitSync
。
再看一个 Push 模式乱序消费 + 消息过滤的例子,消费者的代码如下,
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
consumer.setNamesrvAddr("192.168.232.23:9876");
consumer.subscribe("TopicTest1", MessageFilterImpl.class.getCanonicalName());
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
这个例子与之前顺序消费不同的地方在于,
- 接收消息的监听类使用的是
MessageListenerConcurrently
;
- 回调方法中,使用的是自动 offset commit;
- 订阅的时候增加了消息过滤类
MessageFilterImpl
;
消息过滤类 MessageFilterImpl
的代码如下,
import com.alibaba.rocketmq.common.filter.MessageFilter;
import com.alibaba.rocketmq.common.message.MessageExt;
public class MessageFilterImpl implements MessageFilter {
@Override
public boolean match(MessageExt msg) {
String property = msg.getUserProperty("SequenceId");
if (property != null) {
int id = Integer.parseInt(property);
if ((id % 3) == 0 && (id > 10)) {
return true;
}
}
return false;
}
}
RocketMQ 执行过滤是在 Broker 端,Broker 所在的机器会启动多个 FilterServer 过滤进程;Consumer 启动后,会向 FilterServer 上传一个过滤的 Java 类;Consumer 从 FilterServer 拉消息,FilterServer 将请求转发给 Broker,FilterServer 从 Broker 收到消息后,按照 Consumer 上传的 Java 过滤程序做过滤,过滤完成后返回给 Consumer。这种过滤方法可