整理一下 RocketMQ 相关知识点,这里简单介绍一下生产者和消费者常用的常模的模式,包括生产者发送消息的模式(同步发送、异步发送、和单向发送)、消费者的推模式和拉模式。
生产者的同步发送
同步发送的模式是三种发送模式中效率最低的一种,但是安全性是最高的一种。
 同步发送 
              
              | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 
 | public class SyncProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
 DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
 producer.setNamesrvAddr("IP:9876");
 producer.start();
 for (int i = 0; i < 2; i++) {
 Message message = new Message("best","Tags", (i+"_SyncProducer").getBytes(StandardCharsets.UTF_8));
 SendResult send = producer.send(message);
 System.out.println("消息发送成功: " + i + send.toString() );
 }
 producer.shutdown();
 }
 }
 
 | 
 
            同步发送之后,会有返回值,相当于一问一答,消息发送完之后需要,消费者应答,所以效率相对来说相对低。但是我们这会收到应答的结果,可以根据结果来进行业务处理,可靠性是最高的。
生产者的异步发送
异步发送的效率相对较高,因为会有异步回调,安全性处于中等
 异步发送 
              
              | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 
 | public class AsyncProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
 DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
 producer.setNamesrvAddr("IP:9876");
 producer.start();
 CountDownLatch countDownLatch = new CountDownLatch(100);
 for (int i = 0; i < 100; i++) {
 final int index = i;
 Message mes= new Message("best","TagA", (i+"_AsyncProducer").getBytes(StandardCharsets.UTF_8));
 producer.send(mes, new SendCallback() {
 @Override
 public void onSuccess(SendResult sendResult) {
 System.out.println("消息发送成功: " + index + sendResult.toString() );
 countDownLatch.countDown();
 }
 
 @Override
 public void onException(Throwable throwable) {
 System.out.println("消息发送失败: " + index + throwable.getMessage());
 countDownLatch.countDown();
 }
 });
 }
 countDownLatch.await(5, TimeUnit.SECONDS);
 producer.shutdown();
 }
 }
 
 | 
 
            针对消息的回调我们可以针对业务进行处理,这种方式在业务处理中使用的比较多。
生产者的单向发送
因为单向发只负责发送,效率是最高的,但是可靠性是最低的,在业务中常用于日志的记录等对消息丢失容忍度较高的场景。
 单向发送 
              
              | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 
 | public class OneWayProducer {public static void main(String[] args) throws Exception{
 DefaultMQProducer producer = new DefaultMQProducer("OneWayProducer");
 producer.setNamesrvAddr("IP:9876");
 producer.start();
 for (int i = 0; i < 10; i++) {
 Message message = new Message("best", "Tags", (i+"_OneWayProducer").getBytes(StandardCharsets.UTF_8));
 producer.sendOneway(message);
 }
 producer.shutdown();
 }
 }
 
 | 
 
            三种方式各有优点和缺点,我们需要针对业务场景来原则最合适的发送方式。
消费者推模式和拉模式
这里用的是 5.1.3 版本的 RocketMQ,SimpleConsumer 已被移出,取而代之的是 DefaultMQPushConsumer 和DefaultMQPullConsumer
消费者基础类型分为推模式(push)和拉模式(pull)。
消费者的推模式
消费者的推(push)模式 也是常用的一个模式。只要生产者推送消息消费者就可以接受消息进行消费。
 PUSH 
              
              | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 
 | public class SimpleConsumer {public static void main(String[] args) throws MQClientException {
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast_consumer");
 consumer.setNamesrvAddr("IP:9876");
 consumer.subscribe("best","*");
 consumer.setMessageListener(new MessageListenerConcurrently() {
 @Override
 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
 for (int i = 0; i < list.size(); i++) {
 System.out.println(i+"_消息消费成功" +" " + new String(list.get(i).getBody()));
 }
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 }
 });
 consumer.start();
 System.out.println("consumer run");
 }
 
 | 
 
            在监听器中进行消息的状态的应答处理,在业务处理中可以进行异常处理,对异常信息回复失败状态,生产者重新发送消息
消费者的拉模式
消费者的拉模式分为两种,一种是随机选择一个 queue 进行消息消费
 随机获取 queue 
              
              | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 
 | public class LitePullConsumer {public static void main(String[] args) throws MQClientException {
 DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("LitePullConsumer");
 consumer.setNamesrvAddr("IP:9876");
 consumer.subscribe("Order");
 consumer.start();
 while (true){
 List<MessageExt> poll = consumer.poll();
 System.out.println("消息拉取成功");
 poll.forEach(messageExt -> {
 System.out.println("消息消费成功:"+new String(messageExt.getBody()));
 });
 }
 }
 }
 
 | 
 
            还有一种是指定 queue,需要管理消息的进度也就是 offset
 指定 queue 
              
              | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 
 | public class LitePullConsumerAssign {public static void main(String[] args) throws Exception{
 DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("LitePullConsumer");
 consumer.setNamesrvAddr("IP:9876");
 consumer.start();
 Collection<MessageQueue> best = consumer.fetchMessageQueues("best");
 ArrayList<MessageQueue> msg = new ArrayList<>(best);
 consumer.assign(msg);
 consumer.seek(msg.get(0), 10);
 System.out.println("consumer start");
 while (true){
 List<MessageExt> poll = consumer.poll();
 System.out.println("消息拉取成功");
 for (MessageExt messageExt : poll) {
 System.out.println("消息消费成功:"+new String(messageExt.getBody()));
 }
 }
 }
 }
 
 | 
 
            指定 queue 的拉模式多数用于开发者维护消息队列