Redis发布订阅
Redis发布订阅(pub/sub)是一种==消息通信模式==:发送者(pub)发送消息,订阅者(sub)接受消息。
Redis客户端可以订阅任意数量的频道。
订阅/发布消息图
第一个:消息发送者,第二个:频道 第三个:消息订阅者
下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:
当有新消息通过PUBLISH命令发送给频道channel1时,这个消息就会被发送到订阅的其他客户
命令
序号 |
命令及描述 |
PSUBSCRIBE |
订阅一个或者多个给定模式的频道 |
PUBSUB |
查看订阅与发布系统 |
PUBLISH |
将信息发送到指定的频道 |
PUNSUBSCRIBE |
退订所有给定模式的频道 |
SUBSCRIBE |
订阅给定一个或者多个频道的信息 |
UNSUBSCRIBE |
退订给定的频道 |
这些命令被广泛用于构建及时通信应用,比如网络聊天室和实时广播、实时提醒。
测试
订阅端:
1 2 3 4 5 6 7 8 9 10 11 12 13
| bestrookie@bestrookiedeMac-mini ~ % redis-cli 127.0.0.1:6379> SUBSCRIBE bestrookie Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "bestrookie" 3) (integer) 1 #等待消息推送 1) "message" #消息 2) "bestrookie" #频道名称 3) "hello" #消息内容 1) "message" 2) "bestrookie" 3) "world"
|
发送端:
1 2 3 4
| 127.0.0.1:6379> PUBLISH bestrookie "hello" #发布者发送消息到频道 (integer) 1 127.0.0.1:6379> PUBLISH bestrookie "world" (integer) 1
|
SpringBoot使用Redis实现消息订阅
这里就简单实现一下
首先搭建一个SpringBoot项目 这个应该不用多说了吧
创建Redis配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
@Configuration public class RedisConfig { @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory){ RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>(); redisTemplate.setConnectionFactory(factory); RedisSerializer keySerializer = new StringRedisSerializer(); RedisSerializer valueSerializer = new GenericJackson2JsonRedisSerializer(); redisTemplate.setKeySerializer(keySerializer); redisTemplate.setValueSerializer(valueSerializer); redisTemplate.setHashKeySerializer(keySerializer); redisTemplate.setHashValueSerializer(valueSerializer); redisTemplate.afterPropertiesSet(); return redisTemplate; }
}
|
定义发布者
使用Redis实现的发布,其实就是通过redisTemplate.converAndSend()方法实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
@Component public class MsgPublisher { private Logger logger = LoggerFactory.getLogger(MsgPublisher.class); @Autowired private RedisTemplate<String, Object> redisTemplate; @Resource private ChannelTopic topic; public void sendMsg(String msg){ redisTemplate.convertAndSend(topic.getTopic(),"Message:"+msg+" ;TIME"+ Calendar.getInstance().getTime()); } }
|
发布者配置
主要定义了一个发布者的主题,该主题可以直接在Msgpublisher类中通过new方法创建
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
@Configuration public class PubConfig {
@Bean ChannelTopic topic(){ return new ChannelTopic("bestrookie"); } }
|
定义订阅者
使用Redis实现的订阅者,需要两个步骤:第一,定义消息监视器,需要实现MessageListener接口,第二,需要配置RedisMessageListenerContainer容器,即把消息监听器添加到容器中。
监听器
主要是实现了MessageListener接口的onMessage()方法,该方法即是用来处理监听结果的代码块,当然我们就是简单的实现一下
1 2 3 4 5 6 7 8 9 10
|
public class MsgListener implements MessageListener { @Override public void onMessage(Message message, byte[] bytes) { System.out.println("Msg receiver: "+message.toString() ); } }
|
订阅者配置
主要定义了RedisMessageListenerContainer容器,并把监听器放到容器中,并添加监听器监听的主题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
@Configuration public class SubConfig { @Bean MessageListenerAdapter messageListener(){ return new MessageListenerAdapter(new MsgListener()); }
@Bean RedisMessageListenerContainer redisContainer(RedisConnectionFactory factory){ final RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(factory); container.addMessageListener(messageListener(),new ChannelTopic("bestrookie")); return container; }
}
|
好了我们进行测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
@RequestMapping("/msg") @RestController public class MsgController { @Autowired private MsgPublisher msgPublisher; @GetMapping("/send") public String senMsg(String msg){ msgPublisher.sendMsg(msg); return "发送成功";
}
|
[localhost:8080/msg/send?msg=”hello bestrookie”](http://localhost:8080/msg/send?msg="hello bestrookie”)