Sorry, your browser cannot access this site
This page requires browser support (enable) JavaScript
Learn more >

Exchange概念

RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递到哪些队列中。

相反,**生产者只能将消息发送到交换机(Exchange)**,交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须明确知道如何处理收到的消息。是应该把这些消息你放到特定队列还是说应该丢弃它们。这就由交换机的类型来决定。

Exchange的类型

无名的Exchange

一开始我们使用的就是这种,用一个空的字符串代替,这是使用的默认的交换机。消息能够由路由器发送到队列其实就是由routingKey绑定的key指定,像使用默认的路由器,队列名称就是绑定的key;

Fanou(扇出)

也就是发布订阅模式,Fanout这种类型非常简单。正如从名称中猜到的那样,他是将接收到的所有消息刚播到他知道的所有丢列中,忽略routingKey,只要绑定了这个路由的队列,都会接收到消息。

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.bestrookie.rabbitmq.filve;
import com.bestrookie.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
public class EmitLog {
public static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String msg = scanner.next();
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
System.out.println("消息发送完毕:"+msg);
}

}
}

消费者01:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.bestrookie.rabbitmq.filve;
import com.bestrookie.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import java.nio.charset.StandardCharsets;
public class ReceiveLogs01 {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//声明一个临时对垒
String queue = channel.queueDeclare().getQueue();
//绑定交换机与队列
channel.queueBind(queue,EXCHANGE_NAME,"2");
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("接收消息 log01 :"+ new String(message.getBody(), StandardCharsets.UTF_8));
};
channel.basicConsume(queue,true,deliverCallback,cancelCallback->{});

}
}

消费者02:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.bestrookie.rabbitmq.filve;
import com.bestrookie.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
public class ReceiveLogs02 {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//声明一个临时对垒
String queue = channel.queueDeclare().getQueue();
//绑定交换机与队列
channel.queueBind(queue,EXCHANGE_NAME,"1");
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("接收消息 log02 :"+ new String(message.getBody(), StandardCharsets.UTF_8));
};
channel.basicConsume(queue,true,deliverCallback,cancelCallback->{});

}
}

虽然绑定的key不一致,但是这两个消费者都会接收到生产者的消息。

Direct(直接)

上面说到,Fanout交换机会忽略绑定的routingkey,而这种交换机,会根据绑定的key来进行发消息,类似于Fanout的升级。解释一下绑定。绑定是交换机和队列指尖的桥梁。交换机支队它绑定的交换机的消息感兴趣。绑定参数:routingKey。

当然,使用Direct类型的交换机,也可以完成Ranout类型交换机完成的工作,只需要绑定的key相同即可。

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.bestrookie.rabbitmq.six;
import com.bestrookie.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
public class DirectLogs {
public static final String EXCHANE_NAME = "direct_logs";

public static void main(String[] args) throws Exception {
Channel channel = RabbitUtils.getChannel();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String msg = scanner.next();
channel.basicPublish(EXCHANE_NAME,"info",null,msg.getBytes(StandardCharsets.UTF_8));//测试是更改“info” routingKey
System.out.println("消息发送完毕");
}
}
}

消费者01:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.bestrookie.rabbitmq.six;

import com.bestrookie.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.nio.charset.StandardCharsets;

/**
* @author bestrookie
* @version 1.0
* @date 2022/1/8 21:35
*/
public class ReceivelDirect01 {
public static final String EXCHANE_NAME = "direct_logs";

public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANE_NAME,BuiltinExchangeType.DIRECT);
channel.queueDeclare("console",false,false,false,null);
channel.queueBind("console",EXCHANE_NAME,"info");
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("接收消息 消费者01:"+new String(message.getBody(), StandardCharsets.UTF_8));
};
channel.basicConsume("console",true,deliverCallback, cancelCallback->{});

}

}

消费者02:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.bestrookie.rabbitmq.six;
import com.bestrookie.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
public class ReceivelDirect02 {
public static final String EXCHANE_NAME = "direct_logs";

public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare("console2",false,false,false,null);
channel.queueBind("console2",EXCHANE_NAME,"warning");
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("接收消息 消费者02:"+new String(message.getBody(), StandardCharsets.UTF_8));
};
channel.basicConsume("console2",true,deliverCallback, cancelCallback->{});
}
}

消费者03:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.bestrookie.rabbitmq.six;
import com.bestrookie.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
public class ReceivelDirect03 {
public static final String EXCHANE_NAME = "direct_logs";

public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare("disk",false,false,false,null);
channel.queueBind("disk",EXCHANE_NAME,"error");
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("接收消息 消费者03:"+new String(message.getBody(), StandardCharsets.UTF_8));
};
channel.basicConsume("disk",true,deliverCallback, cancelCallback->{});
}
}

注意:一个队列可以绑定多个routngKey,玩法多样。

Topic(主题)

这个类型的交换机是对Direct类型的更一步升级。对routingKey的匹配进行了升级,所以对此添加了一些要求,它必须是一个单词列表,以.(符号)隔开

***(星号)**可以代替一个单词

**#(井号)**可以代替零个或者多个单词

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.bestrookie.rabbitmq.seven;
import com.bestrookie.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class EmitLogTopic {
public static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
Map<String, String> bindMap = new HashMap<>();
bindMap.put("quick.orange.rabbit","被队列Q1 Q2接收到");
bindMap.put("lazy.orange.eephant","被Q1 Q2接受");
bindMap.put("wqe.orange.asd","被Q1接受");
bindMap.put("lazy.asd","被Q2接受");
bindMap.put("asd.asd.rabbit","被Q2接受");
for (Map.Entry<String, String> mapEntry : bindMap.entrySet()){
String key = mapEntry.getKey();
String msg = mapEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,key,null,msg.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发送消息");
}
}
}

消费者01:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.bestrookie.rabbitmq.seven;
import com.bestrookie.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
public class ReceiveLogsTopic01 {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
String queueName = "Q1";
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName, EXCHANGE_NAME,"*.orange.*");
System.out.println("Q1等待接收消息");
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("Q1接收消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
};
channel.basicConsume(queueName,false,deliverCallback, cancelCallback->{});
}
}

消费者02

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.bestrookie.rabbitmq.seven;
import com.bestrookie.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
public class ReceiveLogsTopic02 {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
String queueName = "Q2";
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName, EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
System.out.println("Q2等待接收消息");
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("Q2接收消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
};
channel.basicConsume(queueName,false,deliverCallback, cancelCallback->{});
}
}

这种类型的交换机使得消息的发送更加的多样性,更加的灵活。当然在实际的场景中还是需要按照自己的需求来使用。

评论