场景说明
自建云服务器搭建 RocketMQ,因为是自建的服务器,所以走不了公网 IP,如果用 docker 部署的话涉及到端口的监听问题,所以现在在服务器上直接部署的方案。
RocketMQ安装及配置
相关命令
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| sudo yum update -y
sudo yum install java-1.8.0-openjdk -y
vim ~/.bashrc export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.412.b08-1.el7_9.x86_64 export PATH=$PATH:$JAVA_HOME/bin source ~/.bashrc
java -version
wget https://archive.apache.org/dist/rocketmq/5.3.1/rocketmq-all-5.3.1-bin-release.zip
mv rocketmq-all-5.3.1-bin-release.zip /opt/rocketmq unzip rocketmq-all-5.3.1-bin-release.zip
|
RocketMQ相关配置
配置信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| cd /opt/rocketmq/rocketmq-all-5.3.1-bin-release/ vim conf/broker.conf brokerIP1 = (公网ip) namesrvAddr = 127.0.0.1:9876 autoCreateTopicEnable = true
vim bin/runbroker.sh JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m" JAVA_OPT="${JAVA_OPT} -Xmn256m"
vim bin/runserver.sh JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
|
启动
相关命令
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
nohup sh bin/mqnamesrv &
公网ip, nameserver上记录的broker的ip地址仍然为私网ip,而非公网ip nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
sh bin/mqshutdown broker sh bin/mqshutdown namesrv
|
RocketMQ-Dashboard安装
相关命令操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| wget https://dist.apache.org/repos/dist/release/rocketmq/rocketmq-dashboard/2.0.0/rocketmq-dashboard-2.0.0-source-release.zip
rm rocketmq-dashboard-2.0.0-source-release.zip /opt/rocketmq-dashboard unzip rocketmq-dashboard-2.0.0-source-release.zip
vim ~/.bashrc export M2_HOME=/opt/maven/apache-maven-3.9.2 export PATH=$PATH:$JAVA_HOME/bin:$M2_HOME/bin source ~/.bashrc
mvn clean package -Dmaven.test.skip=true
java -jar /opt/rocketmq-dashboard/rocketmq-dashboard-2.0.0.jar --server.port=9090
nohup java -jar /opt/rocketmq-dashboard/rocketmq-dashboard-2.0.0.jar --server.port=9090 &
|
访问本地9090端口便可以查看dashboard界面
简单样例
创建一个简单的使用demo
首先创建一个maven工程,导入RocketMQ的依赖
1 2 3 4 5
| <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>5.1.3</version> </dependency>
|
创建一个生产者
代码样例
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class SyncProducer { public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("SyncProducer"); producer.setNamesrvAddr("IP:9876"); producer.start(); for (int i = 0; i < 2; i++) { Message message = new Message("best","Tags", (i+"_SyncProducer").getBytes(StandardCharsets.UTF_8)); SendResult send = producer.send(message); System.out.println("消息发送成功: " + i + send.toString() ); } producer.shutdown(); } }
|
创建一个消费者
代码样例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class SimpleConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SimpleConsumer"); consumer.setNamesrvAddr("IP:9876"); consumer.subscribe("best","*"); consumer.setMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (int i = 0; i < list.size(); i++) { System.out.println(i+"_消息消费成功" +" " + new String(list.get(i).getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("consumer run"); } }
|
消息发送成功

消息接收成功

dashboard 也会展示相应的数据
