整理一下 RocketMQ 相关知识点,这里简单介绍一下生产者和消费者常用的常模的模式,包括生产者发送消息的模式(同步发送、异步发送、和单向发送)、消费者的推模式和拉模式。
生产者的同步发送
同步发送的模式是三种发送模式中效率最低的一种,但是安全性是最高的一种。
同步发送
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class SyncProducer { public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("SyncProducer"); producer.setNamesrvAddr("IP:9876"); producer.start(); for (int i = 0; i < 2; i++) { Message message = new Message("best","Tags", (i+"_SyncProducer").getBytes(StandardCharsets.UTF_8)); SendResult send = producer.send(message); System.out.println("消息发送成功: " + i + send.toString() ); } producer.shutdown(); } }
|
同步发送之后,会有返回值,相当于一问一答,消息发送完之后需要,消费者应答,所以效率相对来说相对低。但是我们这会收到应答的结果,可以根据结果来进行业务处理,可靠性是最高的。
生产者的异步发送
异步发送的效率相对较高,因为会有异步回调,安全性处于中等
异步发送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class AsyncProducer { public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer"); producer.setNamesrvAddr("IP:9876"); producer.start(); CountDownLatch countDownLatch = new CountDownLatch(100); for (int i = 0; i < 100; i++) { final int index = i; Message mes= new Message("best","TagA", (i+"_AsyncProducer").getBytes(StandardCharsets.UTF_8)); producer.send(mes, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("消息发送成功: " + index + sendResult.toString() ); countDownLatch.countDown(); }
@Override public void onException(Throwable throwable) { System.out.println("消息发送失败: " + index + throwable.getMessage()); countDownLatch.countDown(); } }); } countDownLatch.await(5, TimeUnit.SECONDS); producer.shutdown(); } }
|
针对消息的回调我们可以针对业务进行处理,这种方式在业务处理中使用的比较多。
生产者的单向发送
因为单向发只负责发送,效率是最高的,但是可靠性是最低的,在业务中常用于日志的记录等对消息丢失容忍度较高的场景。
单向发送
1 2 3 4 5 6 7 8 9 10 11 12
| public class OneWayProducer { public static void main(String[] args) throws Exception{ DefaultMQProducer producer = new DefaultMQProducer("OneWayProducer"); producer.setNamesrvAddr("IP:9876"); producer.start(); for (int i = 0; i < 10; i++) { Message message = new Message("best", "Tags", (i+"_OneWayProducer").getBytes(StandardCharsets.UTF_8)); producer.sendOneway(message); } producer.shutdown(); } }
|
三种方式各有优点和缺点,我们需要针对业务场景来原则最合适的发送方式。
消费者推模式和拉模式
这里用的是 5.1.3 版本的 RocketMQ,SimpleConsumer 已被移出,取而代之的是 DefaultMQPushConsumer 和DefaultMQPullConsumer
消费者基础类型分为推模式(push)和拉模式(pull)。
消费者的推模式
消费者的推(push)模式 也是常用的一个模式。只要生产者推送消息消费者就可以接受消息进行消费。
PUSH
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class SimpleConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast_consumer"); consumer.setNamesrvAddr("IP:9876"); consumer.subscribe("best","*"); consumer.setMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (int i = 0; i < list.size(); i++) { System.out.println(i+"_消息消费成功" +" " + new String(list.get(i).getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("consumer run"); }
|
在监听器中进行消息的状态的应答处理,在业务处理中可以进行异常处理,对异常信息回复失败状态,生产者重新发送消息
消费者的拉模式
消费者的拉模式分为两种,一种是随机选择一个 queue 进行消息消费
随机获取 queue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class LitePullConsumer { public static void main(String[] args) throws MQClientException { DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("LitePullConsumer"); consumer.setNamesrvAddr("IP:9876"); consumer.subscribe("Order"); consumer.start(); while (true){ List<MessageExt> poll = consumer.poll(); System.out.println("消息拉取成功"); poll.forEach(messageExt -> { System.out.println("消息消费成功:"+new String(messageExt.getBody())); }); } } }
|
还有一种是指定 queue,需要管理消息的进度也就是 offset
指定 queue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class LitePullConsumerAssign { public static void main(String[] args) throws Exception{ DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("LitePullConsumer"); consumer.setNamesrvAddr("IP:9876"); consumer.start(); Collection<MessageQueue> best = consumer.fetchMessageQueues("best"); ArrayList<MessageQueue> msg = new ArrayList<>(best); consumer.assign(msg); consumer.seek(msg.get(0), 10); System.out.println("consumer start"); while (true){ List<MessageExt> poll = consumer.poll(); System.out.println("消息拉取成功"); for (MessageExt messageExt : poll) { System.out.println("消息消费成功:"+new String(messageExt.getBody())); } } } }
|
指定 queue 的拉模式多数用于开发者维护消息队列