Sorry, your browser cannot access this site
This page requires browser support (enable) JavaScript
Learn more >

Redis发布订阅

Redis发布订阅(pub/sub)是一种==消息通信模式==:发送者(pub)发送消息,订阅者(sub)接受消息。

Redis客户端可以订阅任意数量的频道。

订阅/发布消息图

第一个:消息发送者,第二个:频道 第三个:消息订阅者

image-20210728142141250

下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:

img

当有新消息通过PUBLISH命令发送给频道channel1时,这个消息就会被发送到订阅的其他客户

img

命令

序号 命令及描述
PSUBSCRIBE 订阅一个或者多个给定模式的频道
PUBSUB 查看订阅与发布系统
PUBLISH 将信息发送到指定的频道
PUNSUBSCRIBE 退订所有给定模式的频道
SUBSCRIBE 订阅给定一个或者多个频道的信息
UNSUBSCRIBE 退订给定的频道

这些命令被广泛用于构建及时通信应用,比如网络聊天室和实时广播、实时提醒。

测试

订阅端:

1
2
3
4
5
6
7
8
9
10
11
12
13
bestrookie@bestrookiedeMac-mini ~ % redis-cli
127.0.0.1:6379> SUBSCRIBE bestrookie
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "bestrookie"
3) (integer) 1
#等待消息推送
1) "message" #消息
2) "bestrookie" #频道名称
3) "hello" #消息内容
1) "message"
2) "bestrookie"
3) "world"

发送端:

1
2
3
4
127.0.0.1:6379> PUBLISH bestrookie "hello" #发布者发送消息到频道
(integer) 1
127.0.0.1:6379> PUBLISH bestrookie "world"
(integer) 1

SpringBoot使用Redis实现消息订阅

这里就简单实现一下

首先搭建一个SpringBoot项目 这个应该不用多说了吧

创建Redis配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* @author bestrookie
* @date 2021/7/28 3:54 下午
*/
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory){
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
redisTemplate.setConnectionFactory(factory);
//key序列化
RedisSerializer keySerializer = new StringRedisSerializer();
RedisSerializer valueSerializer = new GenericJackson2JsonRedisSerializer();
redisTemplate.setKeySerializer(keySerializer);
//value序列化
redisTemplate.setValueSerializer(valueSerializer);
//hash key 序列化
redisTemplate.setHashKeySerializer(keySerializer);
//hash value 序列化
redisTemplate.setHashValueSerializer(valueSerializer);
//redis初始化
redisTemplate.afterPropertiesSet();
return redisTemplate;
}

}

定义发布者

使用Redis实现的发布,其实就是通过redisTemplate.converAndSend()方法实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* @author bestrookie
* @date 2021/7/28 3:58 下午
*/
@Component
public class MsgPublisher {
private Logger logger = LoggerFactory.getLogger(MsgPublisher.class);
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Resource
private ChannelTopic topic;
public void sendMsg(String msg){
redisTemplate.convertAndSend(topic.getTopic(),"Message:"+msg+" ;TIME"+ Calendar.getInstance().getTime());
}
}

发布者配置

主要定义了一个发布者的主题,该主题可以直接在Msgpublisher类中通过new方法创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* @author bestrookie
* @date 2021/7/28 4:05 下午
*/
@Configuration
public class PubConfig {
/**
* 订阅者发布的主题
*/
@Bean
ChannelTopic topic(){
return new ChannelTopic("bestrookie");
}
}

定义订阅者

使用Redis实现的订阅者,需要两个步骤:第一,定义消息监视器,需要实现MessageListener接口,第二,需要配置RedisMessageListenerContainer容器,即把消息监听器添加到容器中。

监听器

主要是实现了MessageListener接口的onMessage()方法,该方法即是用来处理监听结果的代码块,当然我们就是简单的实现一下

1
2
3
4
5
6
7
8
9
10
/**
* @author bestrookie
* @date 2021/7/28 4:08 下午
*/
public class MsgListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
System.out.println("Msg receiver: "+message.toString() );
}
}

订阅者配置

主要定义了RedisMessageListenerContainer容器,并把监听器放到容器中,并添加监听器监听的主题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* @author bestrookie
* @date 2021/7/28 4:09 下午
*/
@Configuration
public class SubConfig {
@Bean
MessageListenerAdapter messageListener(){
return new MessageListenerAdapter(new MsgListener());
}

@Bean
RedisMessageListenerContainer redisContainer(RedisConnectionFactory factory){
final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.addMessageListener(messageListener(),new ChannelTopic("bestrookie"));
return container;
}

}

好了我们进行测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* @author bestrookie
* @date 2021/7/28 4:15 下午
*/
@RequestMapping("/msg")
@RestController
public class MsgController {
@Autowired
private MsgPublisher msgPublisher;
@GetMapping("/send")
public String senMsg(String msg){
msgPublisher.sendMsg(msg);
return "发送成功";

}

[localhost:8080/msg/send?msg=”hello bestrookie”](http://localhost:8080/msg/send?msg="hello bestrookie”)

image-20210729092811904

评论