Exchange概念
RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递到哪些队列中。
相反,**生产者只能将消息发送到交换机(Exchange)**,交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须明确知道如何处理收到的消息。是应该把这些消息你放到特定队列还是说应该丢弃它们。这就由交换机的类型来决定。
Exchange的类型
无名的Exchange
一开始我们使用的就是这种,用一个空的字符串代替,这是使用的默认的交换机。消息能够由路由器发送到队列其实就是由routingKey绑定的key指定,像使用默认的路由器,队列名称就是绑定的key;
Fanou(扇出)
也就是发布订阅模式,Fanout这种类型非常简单。正如从名称中猜到的那样,他是将接收到的所有消息刚播到他知道的所有丢列中,忽略routingKey,只要绑定了这个路由的队列,都会接收到消息。
生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| package com.bestrookie.rabbitmq.filve; import com.bestrookie.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.Channel; import java.util.Scanner; public class EmitLog { public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{ Channel channel = RabbitUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String msg = scanner.next(); channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes()); System.out.println("消息发送完毕:"+msg); }
} }
|
消费者01:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| package com.bestrookie.rabbitmq.filve; import com.bestrookie.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; import java.nio.charset.StandardCharsets; public class ReceiveLogs01 { public static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception{ Channel channel = RabbitUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue,EXCHANGE_NAME,"2"); DeliverCallback deliverCallback = (consumerTag, message)->{ System.out.println("接收消息 log01 :"+ new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(queue,true,deliverCallback,cancelCallback->{});
} }
|
消费者02:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package com.bestrookie.rabbitmq.filve; import com.bestrookie.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.nio.charset.StandardCharsets; public class ReceiveLogs02 { public static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception{ Channel channel = RabbitUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue,EXCHANGE_NAME,"1"); DeliverCallback deliverCallback = (consumerTag, message)->{ System.out.println("接收消息 log02 :"+ new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(queue,true,deliverCallback,cancelCallback->{});
} }
|
虽然绑定的key不一致,但是这两个消费者都会接收到生产者的消息。
Direct(直接)
上面说到,Fanout交换机会忽略绑定的routingkey,而这种交换机,会根据绑定的key来进行发消息,类似于Fanout的升级。解释一下绑定。绑定是交换机和队列指尖的桥梁。交换机支队它绑定的交换机的消息感兴趣。绑定参数:routingKey。
当然,使用Direct类型的交换机,也可以完成Ranout类型交换机完成的工作,只需要绑定的key相同即可。
生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| package com.bestrookie.rabbitmq.six; import com.bestrookie.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.Channel; import java.nio.charset.StandardCharsets; import java.util.Scanner; public class DirectLogs { public static final String EXCHANE_NAME = "direct_logs";
public static void main(String[] args) throws Exception { Channel channel = RabbitUtils.getChannel(); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String msg = scanner.next(); channel.basicPublish(EXCHANE_NAME,"info",null,msg.getBytes(StandardCharsets.UTF_8)); System.out.println("消息发送完毕"); } } }
|
消费者01:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| package com.bestrookie.rabbitmq.six;
import com.bestrookie.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
public class ReceivelDirect01 { public static final String EXCHANE_NAME = "direct_logs";
public static void main(String[] args) throws Exception{ Channel channel = RabbitUtils.getChannel(); channel.exchangeDeclare(EXCHANE_NAME,BuiltinExchangeType.DIRECT); channel.queueDeclare("console",false,false,false,null); channel.queueBind("console",EXCHANE_NAME,"info"); DeliverCallback deliverCallback = (consumerTag,message)->{ System.out.println("接收消息 消费者01:"+new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume("console",true,deliverCallback, cancelCallback->{});
}
}
|
消费者02:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package com.bestrookie.rabbitmq.six; import com.bestrookie.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.nio.charset.StandardCharsets; public class ReceivelDirect02 { public static final String EXCHANE_NAME = "direct_logs";
public static void main(String[] args) throws Exception{ Channel channel = RabbitUtils.getChannel(); channel.exchangeDeclare(EXCHANE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare("console2",false,false,false,null); channel.queueBind("console2",EXCHANE_NAME,"warning"); DeliverCallback deliverCallback = (consumerTag,message)->{ System.out.println("接收消息 消费者02:"+new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume("console2",true,deliverCallback, cancelCallback->{}); } }
|
消费者03:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package com.bestrookie.rabbitmq.six; import com.bestrookie.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.nio.charset.StandardCharsets; public class ReceivelDirect03 { public static final String EXCHANE_NAME = "direct_logs";
public static void main(String[] args) throws Exception{ Channel channel = RabbitUtils.getChannel(); channel.exchangeDeclare(EXCHANE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare("disk",false,false,false,null); channel.queueBind("disk",EXCHANE_NAME,"error"); DeliverCallback deliverCallback = (consumerTag,message)->{ System.out.println("接收消息 消费者03:"+new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume("disk",true,deliverCallback, cancelCallback->{}); } }
|
注意:一个队列可以绑定多个routngKey,玩法多样。
Topic(主题)
这个类型的交换机是对Direct类型的更一步升级。对routingKey的匹配进行了升级,所以对此添加了一些要求,它必须是一个单词列表,以.(符号)隔开。
***(星号)**可以代替一个单词
**#(井号)**可以代替零个或者多个单词
生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| package com.bestrookie.rabbitmq.seven; import com.bestrookie.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; public class EmitLogTopic { public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{ Channel channel = RabbitUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); Map<String, String> bindMap = new HashMap<>(); bindMap.put("quick.orange.rabbit","被队列Q1 Q2接收到"); bindMap.put("lazy.orange.eephant","被Q1 Q2接受"); bindMap.put("wqe.orange.asd","被Q1接受"); bindMap.put("lazy.asd","被Q2接受"); bindMap.put("asd.asd.rabbit","被Q2接受"); for (Map.Entry<String, String> mapEntry : bindMap.entrySet()){ String key = mapEntry.getKey(); String msg = mapEntry.getValue(); channel.basicPublish(EXCHANGE_NAME,key,null,msg.getBytes(StandardCharsets.UTF_8)); System.out.println("生产者发送消息"); } } }
|
消费者01:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| package com.bestrookie.rabbitmq.seven; import com.bestrookie.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.nio.charset.StandardCharsets; public class ReceiveLogsTopic01 { public static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception{ Channel channel = RabbitUtils.getChannel(); String queueName = "Q1"; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); channel.queueDeclare(queueName,false,false,false,null); channel.queueBind(queueName, EXCHANGE_NAME,"*.orange.*"); System.out.println("Q1等待接收消息"); DeliverCallback deliverCallback = (consumerTag,message)->{ System.out.println("Q1接收消息:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(queueName,false,deliverCallback, cancelCallback->{}); } }
|
消费者02
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package com.bestrookie.rabbitmq.seven; import com.bestrookie.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.nio.charset.StandardCharsets; public class ReceiveLogsTopic02 { public static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception{ Channel channel = RabbitUtils.getChannel(); String queueName = "Q2"; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); channel.queueDeclare(queueName,false,false,false,null); channel.queueBind(queueName, EXCHANGE_NAME,"*.*.rabbit"); channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#"); System.out.println("Q2等待接收消息"); DeliverCallback deliverCallback = (consumerTag,message)->{ System.out.println("Q2接收消息:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(queueName,false,deliverCallback, cancelCallback->{}); } }
|
这种类型的交换机使得消息的发送更加的多样性,更加的灵活。当然在实际的场景中还是需要按照自己的需求来使用。