Sorry, your browser cannot access this site
This page requires browser support (enable) JavaScript
Learn more >

整理一下 RocketMQ 相关知识点,这里简单介绍一下生产者和消费者常用的常模的模式,包括生产者发送消息的模式(同步发送、异步发送、和单向发送)、消费者的推模式和拉模式。

生产者的同步发送

同步发送的模式是三种发送模式中效率最低的一种,但是安全性是最高的一种。

同步发送
1
2
3
4
5
6
7
8
9
10
11
12
13
public class SyncProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
producer.setNamesrvAddr("IP:9876");
producer.start();
for (int i = 0; i < 2; i++) {
Message message = new Message("best","Tags", (i+"_SyncProducer").getBytes(StandardCharsets.UTF_8));
SendResult send = producer.send(message);
System.out.println("消息发送成功: " + i + send.toString() );
}
producer.shutdown();
}
}

同步发送之后,会有返回值,相当于一问一答,消息发送完之后需要,消费者应答,所以效率相对来说相对低。但是我们这会收到应答的结果,可以根据结果来进行业务处理,可靠性是最高的。

生产者的异步发送

异步发送的效率相对较高,因为会有异步回调,安全性处于中等

异步发送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class AsyncProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
producer.setNamesrvAddr("IP:9876");
producer.start();
CountDownLatch countDownLatch = new CountDownLatch(100);
for (int i = 0; i < 100; i++) {
final int index = i;
Message mes= new Message("best","TagA", (i+"_AsyncProducer").getBytes(StandardCharsets.UTF_8));
producer.send(mes, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("消息发送成功: " + index + sendResult.toString() );
countDownLatch.countDown();
}

@Override
public void onException(Throwable throwable) {
System.out.println("消息发送失败: " + index + throwable.getMessage());
countDownLatch.countDown();
}
});
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}

针对消息的回调我们可以针对业务进行处理,这种方式在业务处理中使用的比较多。

生产者的单向发送

因为单向发只负责发送,效率是最高的,但是可靠性是最低的,在业务中常用于日志的记录等对消息丢失容忍度较高的场景。

单向发送
1
2
3
4
5
6
7
8
9
10
11
12
public class OneWayProducer {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("OneWayProducer");
producer.setNamesrvAddr("IP:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("best", "Tags", (i+"_OneWayProducer").getBytes(StandardCharsets.UTF_8));
producer.sendOneway(message);
}
producer.shutdown();
}
}

三种方式各有优点和缺点,我们需要针对业务场景来原则最合适的发送方式。

消费者推模式和拉模式

这里用的是 5.1.3 版本的 RocketMQ,SimpleConsumer 已被移出,取而代之的是 DefaultMQPushConsumer 和DefaultMQPullConsumer

消费者基础类型分为推模式(push)和拉模式(pull)。

消费者的推模式

消费者的推(push)模式 也是常用的一个模式。只要生产者推送消息消费者就可以接受消息进行消费。

PUSH
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class SimpleConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast_consumer");
consumer.setNamesrvAddr("IP:9876");
consumer.subscribe("best","*");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (int i = 0; i < list.size(); i++) {
System.out.println(i+"_消息消费成功" +" " + new String(list.get(i).getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("consumer run");
}

在监听器中进行消息的状态的应答处理,在业务处理中可以进行异常处理,对异常信息回复失败状态,生产者重新发送消息

消费者的拉模式

消费者的拉模式分为两种,一种是随机选择一个 queue 进行消息消费

随机获取 queue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class LitePullConsumer {
public static void main(String[] args) throws MQClientException {
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("LitePullConsumer");
consumer.setNamesrvAddr("IP:9876");
consumer.subscribe("Order");
consumer.start();
while (true){
List<MessageExt> poll = consumer.poll();
System.out.println("消息拉取成功");
poll.forEach(messageExt -> {
System.out.println("消息消费成功:"+new String(messageExt.getBody()));
});
}
}
}

还有一种是指定 queue,需要管理消息的进度也就是 offset

指定 queue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class LitePullConsumerAssign {
public static void main(String[] args) throws Exception{
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("LitePullConsumer");
consumer.setNamesrvAddr("IP:9876");
consumer.start();
Collection<MessageQueue> best = consumer.fetchMessageQueues("best");
ArrayList<MessageQueue> msg = new ArrayList<>(best);
consumer.assign(msg);
consumer.seek(msg.get(0), 10);
System.out.println("consumer start");
while (true){
List<MessageExt> poll = consumer.poll();
System.out.println("消息拉取成功");
for (MessageExt messageExt : poll) {
System.out.println("消息消费成功:"+new String(messageExt.getBody()));
}
}
}
}

指定 queue 的拉模式多数用于开发者维护消息队列

评论