简单介绍 Rocket MQ 的消息的几种类型以及简单的使用,包括顺序消息,广播、延时消息、批量消息、过滤消息、事务消息。
顺序消息
RocketMQ 的顺序消息是保证消息局部有序而不是全局有序,全局有序的话相当于所有的消息都用一个 queue 那对于效率来说简直是灾难。简单来说顺序消息是指要保证有序的一批消息发送到一个 queue,针对这一个 queue 是有序的,对个 queue 之间是无序的。
 生产者 
              
              | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 
 | public class OrderProducer {public static void main(String[] args) throws Exception{
 DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
 producer.setNamesrvAddr("IP:9876");
 producer.start();
 for (int i = 0; i < 5; i++) {
 for (int j = 0; j < 10; j++) {
 Message message = new Message("Order", "TagA",("order_"+ i + "_step_" + j).getBytes(StandardCharsets.UTF_8));
 producer.send(message, new MessageQueueSelector() {
 @Override
 
 public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
 Integer id = (Integer) o;
 int index = id == 0 ? 0 : id % list.size();
 return list.get(index);
 }
 }, i);
 }
 }
 producer.shutdown();
 }
 }
 
 | 
 
            相当于分组发送,一个组就是一个 queue,相同的 queue 的消息是有序的。
 消费者 
              
              | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 
 | public class OrderConsumer {public static void main(String[] args) throws Exception{
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumer");
 consumer.setNamesrvAddr("IP:9876");
 consumer.subscribe("Order", "*");
 consumer.setMessageListener(new MessageListenerOrderly() {
 @Override
 public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
 for (int i = 0; i < list.size(); i++) {
 System.out.println(i + "_消息消费成功_" + new String(list.get(i).getBody()));
 }
 return ConsumeOrderlyStatus.SUCCESS;
 }
 });
 consumer.start();
 }
 }
 
 | 
 
            广播消息
广播消息的生产者没有额外的配置,对于消费者一条消息会发送给所有订阅了对应主题的消费者,不管消费者是不是同一个消费者组(同一个消费者组,一条消费只会消费一次)。
 广播消息 
              
              | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 
 | public class BroadcastConsumer {public static void main(String[] args) throws Exception{
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast_consumer");
 consumer.setNamesrvAddr("IP:9876");
 consumer.subscribe("best","*");
 consumer.setMessageModel(MessageModel.BROADCASTING);
 consumer.setMessageListener(new MessageListenerConcurrently() {
 @Override
 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
 for (int i = 0; i < list.size(); i++) {
 System.out.println(i + "消费成功_"+ new String(list.get(i).getBody()));
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 }
 });
 consumer.start();
 System.out.println("consumer run");
 
 }
 }
 
 | 
 
            对于消费者的消息模型其实有广播消息和集群消息,如果不像上面显式指定的话默认为CLUSTERING也就是集群模式。
延时消息
延时消息在业务场景中还是用的比较多的,比如创建订单半小时未支付自动取消等。延时消息对于消费者没有额外的配置
 生产者 
              
              | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 
 | public class ScheduleProducer {public static void main(String[] args) throws Exception{
 DefaultMQProducer producer = new DefaultMQProducer("ScheduleProducer");
 producer.setNamesrvAddr("IP:9876");
 producer.start();
 for (int i = 0; i < 10; i++){
 Message message = new Message("Schedule","Tags", (i+"_ScheduleProducer").getBytes(StandardCharsets.UTF_8) );
 message.setDelayTimeLevel(2);
 
 
 
 producer.send(message);
 System.out.println("消息发送成功_" + LocalTime.now());
 }
 producer.shutdown();
 }
 }
 
 | 
 
            从代码中看出设置延时时间的时候有三种不同的方式,可以根据自己的业务进行选择。当然延时也不是无限延长,默认支持最大延迟时间为 3天,可以根据 broken 配置:timerMaxDelaySec 修改。
批量消息
批量消息也是业务中常用一个类型,为了减少网络的 IO,将多个消息合并成一个消息一次发送出去,提高了系统的吞吐量。批量消息对消费者也没有额外的要求,只需要修改生产者即可。
 生产者 
              
              | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 
 | public class BatchProducer {public static void main(String[] args) throws Exception{
 DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
 producer.setNamesrvAddr("IP:9876");
 producer.start();
 List<Message> messages = new ArrayList<>();
 for (int i = 0; i < 1000; i++) {
 Message message = new Message("best", "TagA", (i + "_BatchProducer").getBytes());
 messages.add(message);
 }
 SendResult send = producer.send(subList);
 System.out.println("消息发送成功: " + i + " ");
 producer.shutdown();
 }
 }
 
 | 
 
            但是批量消息也不是没有限制的,官方文档上写消息的大小不能超过 1M,但是实际上使用不超过 4M 即可,但是文档上的 1M 应该是整体性能最好的一个大小。当数据量很大时我们需要把这一批数据分成多批发送,就需要改造一下生产者的发送方式。
 切割消息发送 
              
              | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 
 | public class BatchProducer {public static void main(String[] args) throws Exception{
 DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
 producer.setNamesrvAddr("100.77.140.42:9876");
 producer.start();
 List<Message> messages = new ArrayList<>();
 for (int i = 0; i < 100000; i++) {
 Message message = new Message("best", "TagA", (i + "_BatchProducer").getBytes());
 messages.add(message);
 }
 ListSplitter listSplitter = new ListSplitter(messages);
 int i = 0;
 while (listSplitter.hasNext()){
 List<Message> subList = listSplitter.next();
 SendResult send = producer.send(subList);
 System.out.println("消息发送成功: " + i + " ");
 i++;
 }
 producer.shutdown();
 }
 }
 
 class ListSplitter implements Iterator<List<Message>> {
 private static final int SIZE_LIMIT = 1024 * 1024;
 private final List<Message> messages;
 private int currentIndex;
 
 ListSplitter(List<Message> messages){
 this.messages = messages;
 }
 
 
 @Override
 public boolean hasNext() {
 return currentIndex < messages.size();
 }
 
 @Override
 public List<Message> next() {
 int nextIndex = currentIndex;
 int totalSize = 0;
 for (; nextIndex < messages.size(); nextIndex++){
 Message message = messages.get(nextIndex);
 int messageSize = message.getBody().length + message.getTopic().length();
 int sum = message.getProperties().entrySet().stream().mapToInt(entry -> entry.getKey().length() + entry.getValue().length()).sum();
 messageSize += sum + 20;
 if (messageSize > SIZE_LIMIT){
 if (nextIndex == currentIndex){
 nextIndex++;
 }
 break;
 }
 if (messageSize + totalSize > SIZE_LIMIT){
 break;
 }else {
 totalSize = totalSize + messageSize;
 }
 }
 List<Message> msg = this.messages.subList(currentIndex, nextIndex);
 currentIndex = nextIndex;
 return msg;
 }
 }
 
 | 
 
            这里我们重写一个迭代器对我们的消息体进行切割,将一个巨大消息几个切割成 多个 1M 的消息体分批发送。在使用批量消息时还需要注意:
- 批量消息要同一个 topic
- 要用相同的 waitStoreMsgOk
- 不能是延迟消息、事务消息等
过滤消息
过滤消息只能用于消费者推模式,拉模式是无法使用的,过滤消息分为两种一种是针对 TAG过滤一种是根据 SQL过滤,第二种的开启需要在 .conf配置文件中开启
| 1
 | enablePropertyFilter = true
 | 
note bug red 这个地方一定要注意格式空格别漏了,我就踩坑了我设置为enablePropertyFilter=true,重启之后配置一直没生效排查了很久才发现是格式问题
针对 TAG的过滤消息
 生产者 demo 
              
              | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 
 | public class FilterTagProducer {public static void main(String[] args) throws Exception{
 DefaultMQProducer producer = new DefaultMQProducer("FilterGroup");
 producer.setNamesrvAddr("IP:9876");
 producer.start();
 String[] tags = new String[]{"TAG-A","TAG-B","TAG-C"};
 for (int i = 0; i < 10; i++){
 Message message = new Message("FilterTagTopic",tags[i%tags.length],("filter_msg_" + i).getBytes(StandardCharsets.UTF_8));
 producer.send(message);
 System.out.println("消息发送成功_" + tags[i%tags.length] + " " + i);
 }
 }
 }
 
 | 
 
             消费者针对 TAG 
              
              | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 
 | public class FilterConsumer {public static void main(String[] args) throws Exception{
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("FilterConsumer");
 consumer.setNamesrvAddr("IP:9876");
 consumer.subscribe("FilterTagTopic","TAG-A ||TAG-C");
 consumer.setMessageListener(new MessageListenerConcurrently() {
 @Override
 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
 for (int i = 0; i < list.size(); i++) {
 System.out.println(i + "消息消费成功_" + new String(list.get(i).getBody()));
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 }
 });
 consumer.start();
 System.out.println("consumer start");
 
 }
 }
 
 | 
 
            针对 SQL 的过滤方式
这个在生产者发送消息的时候需要设置UserProperty,是以 map 的形式添加进去。
 生产者 
              
              | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 
 | public class FilterSqlProducer {public static void main(String[] args) throws Exception{
 DefaultMQProducer producer = new DefaultMQProducer("FilterConsumer");
 producer.setNamesrvAddr("100.77.140.42:9876");
 producer.start();
 String[] tags = new String[]{"TAG-A","TAG-B","TAG-C"};
 for (int i = 0; i < 10; i++) {
 Message message = new Message("FilterTagTopic",tags[i%tags.length], (i+"_filter_sql").getBytes(StandardCharsets.UTF_8));
 message.putUserProperty("name","bestrookie");
 message.putUserProperty("age",String.valueOf(i));
 producer.send(message);
 System.out.println("消息发送成功_" + tags[i%tags.length] + " " + i);
 }
 producer.shutdown();
 }
 }
 
 | 
 
             消费者 
              
              | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 
 | public class FilterSqlConsumer {public static void main(String[] args) throws Exception{
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("FilterConsumer");
 consumer.setNamesrvAddr("IP:9876");
 consumer.subscribe("FilterTagTopic", MessageSelector.bySql("TAGS is not null and TAGS IN ('TAG-A','TAG-C')" +
 "and (name is not null and age between 5 and 6)"));
 consumer.setMessageListener(new MessageListenerConcurrently() {
 @Override
 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
 for (int i = 0; i < list.size(); i++) {
 System.out.println("消息消费成功_" +i + ":" + new String(list.get(i).getBody()) + " " + LocalTime.now());
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 }
 });
 consumer.start();
 System.out.println("consumer run");
 }
 }
 
 | 
 
            过滤 sql 的语法类似 sql 的查询,具体需要查看文档,基本上是一致的。
事务消息
 这种消息在我们业务场景中的使用比较频繁,也是 RocketMQ 一个特色。事务消息是实在分布式系统中保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两个操作的原子性,两个操作要么一起成功要么一起失败。
事务消息发送的时候,生产者会给消息设置三种状态,提交、回滚、无状态。其中提交说明消息发送没问题,消费者可以正常收到消息。回滚,说明执行业务逻辑出现异常,消息并不会发送。无状态,过一段时间会执行回查本地事务,根据回查执行事务定义的业务执行逻辑发送状态,决定这条消息是否成功。当然这种回查也不是没有限制的,执行一定次数 Broker会默认消息执行失败进行丢弃。
 事务监听器 
              
              | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 
 | public class TransactionListenerImpl implements TransactionListener {@Override
 
 public LocalTransactionState executeLocalTransaction(Message message, Object o) {
 String tags = message.getTags();
 if (StringUtils.contains("TAG-A", tags)){
 return LocalTransactionState.COMMIT_MESSAGE;
 }
 if (StringUtils.contains("TAG-B", tags)){
 return LocalTransactionState.ROLLBACK_MESSAGE;
 }else {
 return LocalTransactionState.UNKNOW;
 }
 }
 @Override
 
 public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
 String tags = messageExt.getTags();
 if (StringUtils.contains("TAG-C", tags)){
 return LocalTransactionState.COMMIT_MESSAGE;
 }
 if (StringUtils.contains("TAG-D", tags)){
 return LocalTransactionState.ROLLBACK_MESSAGE;
 }else {
 return LocalTransactionState.UNKNOW;
 }
 }
 }
 
 | 
 
            这里写一个简单的 demo,根据 TAG 的类型决定消息是否发送成功。
 生产者 
              
              | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 
 | public class TransactionProcter {public static void main(String[] args) throws Exception{
 TransactionMQProducer producer = new TransactionMQProducer("transaction_group");
 producer.setNamesrvAddr("IP:9876");
 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000), new ThreadFactory() {
 @Override
 public Thread newThread(Runnable r) {
 Thread thread = new Thread(r);
 thread.setName("bestrookie");
 return thread;
 }
 });
 producer.setExecutorService(threadPoolExecutor);
 producer.setTransactionListener(new TransactionListenerImpl());
 producer.start();
 String[] tags = new String[]{"TAG-A","TAG-B","TAG-C","TAG-D","TAG-E"};
 for (int i = 0; i < 10; i++) {
 Message message = new Message("TransactionTopic",tags[i%tags.length],(tags[i%tags.length] + "_transaction_msg_" + i).getBytes(StandardCharsets.UTF_8));
 TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
 System.out.println("消息发送成功_" + tags[i%tags.length] + " " + i);
 Thread.sleep(10);
 }
 Thread.sleep(1000000);
 producer.shutdown();
 }
 }
 
 | 
 
            对于消费者没有额外的额配置
 消费者 
              
              | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 
 | public class TransactionConsumer {public static void main(String[] args) throws Exception{
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TransactionConsumer");
 consumer.setNamesrvAddr("IP:9876");
 consumer.subscribe("TransactionTopic","*");
 consumer.setMessageListener(new MessageListenerConcurrently() {
 @Override
 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
 for (int i = 0; i < list.size(); i++) {
 System.out.println("消息消费成功_" +i + ":" + new String(list.get(i).getBody()) + " " + LocalTime.now());
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 }
 });
 consumer.start();
 System.out.println("consumer start");
 }
 }
 
 |