1、MQ学习方式
中间件是什么
从架构上来讲,它就是在应用之间加了一层,可以独立部署,提供服务的应用,比如zookeeper、eureka、m q都是中间件
消息中间件
MQ = Message Queue = 消息中间件,处理消息的一层中间应用,也是消息的暂存地方,消息一定是有一个发送方,一个接受方的
主流的MQ产品
- Kafka
- RabbitMQ
- RocketMQ
- ActiveMQ
如何学习MQ?学习顺序,研究内容
1、消息如何发送,消息如何接受
2、MQ 如何实现高可用
3、MQ集群容错
4、MQ一定要持久化
5、延时投递、定时投递
6、签收机制
7、整合到Spring和SpringBoot中
8、JMS规范
9、高级-死信队列、消息重发、异步投递。。。。
2、MQ解决了什么问题
1、原来必须等待【异步】,使用mq 异步缓存
2、抵御洪流,保护主业务【消峰】
3、中间件,加一层【解耦】
生活场景举例:
一个面试官要面试100个人,如果让100个人一直排队等待那是非常浪费时间的,那100个人除了等待就什么事情都不干,相当于我必须面试完才能干别的事情,即使前面有99个人在面试,那显然不合理而且浪费时间。所以正常情况应该是,把面试的个人信息(消息)投递给面试的公司,然后面试官通知你才去面试,这样在等待面试的过程中,我可以继续做别的事情,不浪费时间(资源),这就是对应MQ生活中使用的例子。
系统场景举例:
1、系统接口耦合严重
2、大流量并发
一个前端下单的操作,后台其实由多个操作组成:读取订单、库存是否ok,库存冻结,余额检查,余额冻结,订单交易信息生成、余额-xx,库存-xx,流水号,余额解冻、库存解冻…. 等,这种情况下,高并发的下单操作,就容易将服务冲垮。
3、同步等待
我们上面说的3种情况,要是有一种技术能够解决。我们就一定会使用它? 那就是MQ
1、系统的解耦,我们要接入新的模块,可以将代码的改动做到最小。【能够解耦】
2、设置一个流量的缓冲池,保证我们的系统不被冲垮。【能够消峰】
3、优化系统调用的强弱关系。我们可以按照实际业务来进行操作。减少耗时【能够异步】
3、ActiveMQ
简介
官方地址:http://activemq.apache.org/
其它主流MQ
-
RabbitMQ https://www.rabbitmq.com/
-
Kafka kafka.apache.org
-
RocketMQ http://rocketmq.apache.org/
ActiveMQ 是最流行开源的,支持多协议的,Java基础的消息服务。它支持行业的标准协议,因此用户选择多种语言和平台的client客户端来使用它。你可以使用C,C++,python,.net还有更多的语言都可以连接ActiveMQ。它使用无处不在的AMQP协议集成到你的多平台应用中。websocket上面使用STOMP协议可以在你的Web应用间交互消息。使用MQTT协议在你的IOT设备上交互消息。它支持在你已存在的JMS规范的设施上使用。ActiveMQ提供强力的和弹性的性能支持任何的消息使用场景。
下载最新的5.x版本,下一代产品是Artemis
安装
Windows安装ActiveMQ
只需要一步,解压安装即可,
点击 examples,有各种协议例子代码
点击 webapps,有一个admin管理activemq的控制台应用
点击bin,根据自己系统是32位还是64位,进入win32/win64,直接点击activemq.bat文件启动
访问:http://127.0.0.1:8161/admin ,默认的用户名和密码都是 admin。点击根目录下的conf/jetty-realm.properties 文件,里面定义了访问控制台的用户,默认有两个admin/admin,user/user,所以可以在这里修改登录用户密码
Linux 安装ActiveMQ,推荐使用Docker,需要先安装docker,也可以下载mq解压安装
方式一:解压安装
# 1、解压
[root@helloworld opt]# tar -zxvf apache-activemq-5.15.9-bin.tar.gz
# 2、重命名
[root@helloworld opt]# mv apache-activemq-5.15.9 activemq
# 显示linux系统内核信息,32或64位
[root@helloworld bin]# cat /proc/version
[root@helloworld conf]# pwd
/opt/activemq/conf
# activemq控制台的登录用户
[root@helloworld conf]# vim jetty-realm.properties
# Defines users that can access the web (console, demo, etc.)
# username: password [,rolename ...]
admin: admin, admin
user: user, user
# 3、启动
[root@helloworld linux-x86-64]# ./activemq start
[root@helloworld linux-x86-64]# ./activemq status
# 4、停止,停不了,就使用kill命令
[root@helloworld linux-x86-64]# ./activemq stop
# 查看帮助
[root@helloworld bin]# ./activemq --help
4、远程访问测试(要开启linux的防火墙,阿里云的安全组放行端口,activemq默认是http访问端口8161和tcp程序通讯端口61616)
方式二:Docker安装
[root@helloworld opt]# docker -v
Docker version 19.03.5, build 633a0ea
# 1、搜索activemq
[root@helloworld opt]# docker search activemq
# 搜索到后,拉取下来
[root@helloworld opt]# docker pull webcenter/activemq
# 2、下载成功后,查看镜像
[root@helloworld opt]# docker images
# 3、启动容器,宿主端口容器端口绑定
[root@helloworld opt]# dokcer run -d --name activemq -p 61616:61616 -p 8161:8161 webcenter/activemq
# 4、启动成功,查看运行的容器
[root@helloworld opt]# docker ps -a
# 5、activemq的启动停止卸载删除,看docer篇笔记
5、进入activemq的容器镜像
docker exec -it 800ee68400ab /bin/bash
6、成功进入容器里activemq的根目录,发现与windows下安装的activemq根目录是一致的,可以在这里修改配置
使用文档的地址:http://activemq.apache.org/features
小结
1、什么是MQ
MQ是消息中间件,独立部署提供消息处理服务的中间层应用,消息可以持久化存储,必须有一个消息发送方和一个消息接受方。
2、MQ是干什么
从架构上来讲,MQ就是应用平台间加了一层,它解决了下面4个方面的问题:
-
异步缓冲
web应用间的消息,减少等待耗时,提高应用性能和用户体验。
- 在应用发生高并发访问的时候,可以将高峰时期的业务信息缓存到一个地方,放到后面非高峰时期逐一处理,保证系统不被冲垮,这就是削峰填谷
- 在应用系统间加了一层MQ,让它们由原来的强依赖关系降为弱依赖关系,降低系统间的耦合性,同时也能保证应用间消息数据的最终一致性,这就是服务解耦,举例 订单系统- MQ - 库存系统
- 点对点的消息通讯 (queue 队列)和发布订阅的方式多方消息通讯(topic 主题)
4、JMS规范
简介
Java 集成MQ如何发送消息,如何接受消息,它必须遵循JMS规范
它类似于JDBC(用来连接许多不同的关系数据库的API),JMS则是用来访问收发系统消息的API。回顾一下JDBC的流程:
1、注册驱动
2、连接
3、创建SQL执行
4、运行
5、处理结果
6、释放连接资源
而JMS的流程也比较相似:
图中的destination目的地,在ActiveMQ来说,那就是队列Queue和主题Topic
什么是JavaEE
JavaEE 是一个Java企业级开发使用。JavaEE提供了核心的了13个规范。
1、JDBC 关系数据库 连接
2、JNDI java命令和目录接口
3、EJB 企业级的JavaBean
4、RMI 远程方法调用
5、Java IDL, 接口定于语言,代码结构体系
6、JSP
7、Servlet
8、XML
**9、JMS java消息服务**
10、JTA java的事务API
11、JTS java的事务服务器
12、JavaMail Java邮件发送协议
13、JAF javabean actiion 框架
参考:https://blog.csdn.net/u012410733/article/details/72567195
JMS的组成元素
1、JMS Provider MQ服务器(服务本身)
2、JMS Prodiucer 生成者
3、JMS Consumer 消费者
4、JMS Message 消息 (消息头,消息属性,消息体)
Msg
消息头
生产者发送消息
/* @param destination 目的地:queue 或者 topic
* @param message
* @param deliveryMode 是否持久化模式
* @param priority the priority for this message 优先级 0-4普通消息,5-9紧急消息,默认4级
* @param timeToLive the message's lifetime (in milliseconds) 生存时间,过期时间
*/
@Override
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
this.send(destination, message, deliveryMode, priority, timeToLive, null);
}
消息体
// 发送具体的消息到队列, 创建的消息类型是什么就些什么
简单文本(TextMessage) String (重要)
可序列化的对象 (ObjectMessage) 对象,序列化的Java对象。
属性集合 (MapMessage) Map, key => String, Vlaue=> Java基本类型 (重点)
字节流 (BytesMessage) 二进制数组 byte[]
原始值流 (StreamMessage) Java标准流 Streem
还有无有效负载的消息 (Message)。 接口
注意点:接收消息和发送消息,格式一定要一致!
消息的属性
可以在消息中添加一些自己的属性信息,用来识别消息或者加密解密都可以,可以用来传递参数。
5、ActiveMQ基本使用
原生依赖
创建一个mave项目,导入依赖,我自己使用的5.15.9版本
<dependencies>
<!--尽量和activemq版本一致-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<!-- spring 和 其他-->
<!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.17</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
队列Queue
核心:
-
点对点发送接收消息
- 接收方不用去管发送方是否在运行状态,只需要去队列中去取即可
- 在队列中,这个信息一但被消费,则消息不见了,不再存储。
编写消息生产者
package com.coding.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce {
// 1、消息发送到哪里
// 39.105.61.80:8161 控制板
// 39.105.61.80:61616 程序通信tcp协议
public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
public static void main(String[] args) throws JMSException {
// 2、创建一个连接工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 3、获得连接
Connection connection = factory.createConnection();
connection.start();
// 4、创建会话
// 第一个参数:是否开启事务,不开启,默认自动提交,开启需要手动提交事务
// 参数:事务、签收 AUTO_ACKNOWLEDGE : 自动确认
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、模式,目的地(队列,主题)
// session.createTopic();
Queue queue = session.createQueue("queue01");
// 6、发送消息(生成者)
MessageProducer producer = session.createProducer(queue);
// 7、发送具体的消息到队列, 创建的消息类型是什么就些什么
// 简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
for (int i = 0; i < 3; i++) {
TextMessage message = session.createTextMessage("msg=>" + i);
// 8、发送消息
producer.send(message);
}
// 9、用完记得关闭资源
producer.close();
session.close();
connection.close();
System.out.println("发送完毕");
}
}
发送的消息对象支持
// 发送具体的消息到队列, 创建的消息类型是什么就些什么
简单文本(TextMessage) String (重要)
可序列化的对象 (ObjectMessage) 对象,序列化的Java对象。
属性集合 (MapMessage) Map, key => String, Vlaue=> Java基本类型 (重点)
字节流 (BytesMessage) 二进制数组 byte[]
原始值流 (StreamMessage) Java标准流 Streem
还有无有效负载的消息 (Message)。 接口
启动生产者,发现消息会发送到队列了:
编写消息消费者(接收者)
package com.coding.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsConsumer {
// 1、消息发送到哪里
// 39.105.61.80:8161 控制板
// 39.105.61.80:61616 程序通信tcp协议
public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
public static void main(String[] args) throws JMSException {
// 2、创建一个连接工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 3、获得连接
Connection connection = factory.createConnection();
connection.start();
// 4、创建会话
// 参数:事务、签收 AUTO_ACKNOWLEDGE : 自动确认
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、模式,目的地(队列,主题)
// session.createTopic();
Queue queue = session.createQueue("queue01");
// 6、消息接受者(角色变化)
MessageConsumer consumer = session.createConsumer(queue);
// 7、获取消息
while (true){
// tcp 等待接收消息的过程。 receive
TextMessage receive = (TextMessage) consumer.receive();
// receive 阻塞等待
// receive(long timeout) 超过多少ms,没有消息可以消费,就消费结束,然后不等待了
// TextMessage receive = (TextMessage) consumer.receive(1000);
if (receive!=null){
System.out.println("接收到消息:"+ receive.getText());
}else {
break;
}
}
// 9、用完记得关闭资源
consumer.close();
session.close();
connection.close();
System.out.println("接收完毕");
}
}
启动消费者,发现队列中的消息被消费了
还有消费者并没有接收完消息后就退出,使用consumer.receive()它会一直阻塞主线程main,等待接收消息,再启动一次生产者发送消息,消费者又很快接受到消息了:
测试发现:
-
先生成消息,消费者是否可以消费。Y
-
先生产消息,一号消费者消费完毕后,2号消费者是否可以消费。N,不能重复消费
-
先启动2个消费者(一直阻塞监听),然后生成消息,2个消费者轮询消费。
Queue队列,默认支持持久化
小结:
JMS规范基本的开发步骤相同的?对比上面的JMS规范流程图, 是的
1、创建 connection factory
2、通过 factory 创建 connection 连接
3、connection 创建 会话 session
4、session 创建目的地
5、创建发送者或者接收者接收消息 (目的地:queue,topic)
主题Topic
核心:
发布订阅,一对多发送接收消息
编写消息生产者(发送方)
package com.coding.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduceTopic {
// 1、消息发送到哪里
public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
public static void main(String[] args) throws JMSException {
// 2、创建一个连接工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 3、获得连接
Connection connection = factory.createConnection();
connection.start();
// 4、创建会话
// 参数:事务、签收 AUTO_ACKNOWLEDGE : 自动确认
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、创建主题 Topic
Topic topic = session.createTopic("topic01");
// 6、发送消息(生成者)
MessageProducer producer = session.createProducer(topic);
// 7、发送具体的消息到队列, 创建的消息类型是什么就些什么
// 简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
for (int i = 0; i < 6; i++) {
TextMessage message = session.createTextMessage("msg=>" + i);
// 8、发送消息
producer.send(message);
}
// 9、用完记得关闭资源
producer.close();
session.close();
connection.close();
System.out.println("发送完毕");
}
}
编写消息消费者(接收方)
package com.coding.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class JmsConsumerTopic {
// 1、消息发送到哪里
public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
public static void main(String[] args) throws JMSException, IOException {
// 2、创建一个连接工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 3、获得连接
Connection connection = factory.createConnection();
connection.start();
// 4、创建会话
// 参数:事务、签收 AUTO_ACKNOWLEDGE : 自动确认
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、创建主题 Topic
Topic topic = session.createTopic("topic01");
// 6、消息接受者(角色变化)
MessageConsumer consumer = session.createConsumer(topic);
// 7、获取消息,监听
consumer.setMessageListener((message) -> {
if (message!=null && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收到主题消息"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 卡住等待
System.in.read();
// 9、用完记得关闭资源
consumer.close();
session.close();
connection.close();
System.out.println("接收完毕");
}
}
测试
-
先启动生产者,发送消息,后启动消费者,消息没有被消费掉。
消息已发送到ActiveMQ,但消息并没有消费掉。
一定要先启动消费者订阅Topic,生产者再发送消息// 消费者,卡住等待,生产者发送消息 System.in.read();
-
先启动两个消费者,订阅topic,再启动生产者,发送消息,两个消费者都能收到全部消息
消息的持久化
队列Queue是默认持久化的
// 6、发送消息(生成者), 默认是持久化的,重启之后,数据还在。
MessageProducer producer = session.createProducer(topic);
// 练习测试:先发送消息,然后关掉MQ服务。(对比测试)
message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
主题持久化
-
消息生产者
public class JmsProducerTopicPersist { public static final String ACTIVEMQ_URL = "tcp://109.109.13.109:61616"; public static void main(String[] args) throws JMSException { // 1、创建一个连接工厂 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 2、创建连接,先不开启连接 Connection connection = factory.createConnection(); // 3、创建会话 // 第一个参数:是否开启事务,不开启,默认自动提交,开启需要手动提交事务 // 第二个参数:事务、签收 AUTO_ACKNOWLEDGE : 自动确认 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 4、创建目的地 Topic topic = session.createTopic("topic01"); // 5、创建消息生产者 MessageProducer producer = session.createProducer(topic); // 持久化策略开启后再连接(注意即可,消息发送就持久化) producer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); // 6、发送具体的消息到队列, 创建的消息类型是什么就些什么 for (int i = 24; i < 30; i++) { TextMessage textMessage = session.createTextMessage("msg=>" + i); // 8、发送消息 producer.send(textMessage); } // 9、用完记得关闭资源 producer.close(); session.close(); connection.close(); System.out.println("发送完毕"); } }
-
消息接收者
public class JmsConsumerTopicPersist { public static final String ACTIVEMQ_URL = "tcp://109.109.13.109:61616"; public static void main(String[] args) throws JMSException, IOException { // 1、创建一个连接工厂 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 2、创建连接 Connection connection = factory.createConnection(); // 设置连接ID,我是谁, connection.setClientID("icoding"); // 3、创建会话 // 第一个参数:是否开启事务,不开启,默认自动提交,开启需要手动提交事务 // 第二个参数:事务、签收 AUTO_ACKNOWLEDGE : 自动确认 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 4、创建目的地 Topic topic = session.createTopic("topic01-persist"); // 5、创建订阅人 TopicSubscriber subscriber = session.createDurableSubscriber(topic, "学员-jude"); // 开启连接 connection.start(); // 6、获取消息,可以写基本的聊天程序了 // 接收,会一直阻塞接收 Message message = subscriber.receive(); while (message != null){ TextMessage textMessage = (TextMessage) message; System.out.println("收到持久化消息" + textMessage.getText()); // 自动取消订阅,默认是永久的 message = subscriber.receive(5000); } // 9、用完记得关闭资源 subscriber.close(); session.close(); connection.close(); System.out.println("接收完毕"); } }
启动消费者
使用subscriber.receive(),它会一直阻塞等待消息
启动生产者,发送消息,消费者接收到消息,5秒钟后自动下线
2、先启动生产者发送消息,已订阅的下线消费者有6个消息等待消费
启动消费者上线,消息马上被消费
多个消费者
小结:
-
主题持久化可以完成完整的发布订阅模型,而非消息及时发送,一对多的模型。
- 一定要先订阅这个主题,然后才能接收到消息,(客户端:客户端id,用户名 服务端:消息是持久化)
- 主题删除后重新创建,之前订阅的消费者也要重新订阅才能接收到消息。
服务端(生产者)
// 持久化策略开启后再连接(注意即可,消息发送就持久化)
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
客户端(消费者)
// 设置连接ID,我是谁,
connection.setClientID("icoding");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("topic-coding-persist");
// 5、创建订阅人
TopicSubscriber subscriber = session.createDurableSubscriber(topic, "coding老师");
// 开启连接
connection.start();
事务模型
事务:要么同时成功,要么同时失败
继续使用上面主题持久化的代码,队列也是一样开启事务的
// true: 先执行send方法,然后执行 commit 才是真正的放入队列。可以积压多个消息,然后一次发送
// false: send,直接进入主题,如果你关闭了事务,签收一定要设置为自动签收。
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
上面代码中的消息生产者,开启事务,执行send发送消息,但是不执行commit,消息是没有发送到ActiveMQ的
提交事务
session.commit();
再启动生产者,消息成功发送到ActivMQ的topic
消费者开启事务,不提交事务,启动测试
它消费了消息,但没有提交,访问Activemq的控制台,依然显示它有6条消息等待消费
再启动消费者,它有消费了同样的消息,重复消费1次
发现我重复很多次,都是可以的。队列queue重复消费6次后,就会放入死信队列。主题topic不会吗
消费者提交事务消费后,就正常了
签收
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
创建会话的时候,第一个参数是关于事务的,第二个参数是关于消息签收确认的,点击Session的源码
public interface Session extends Runnable {
int AUTO_ACKNOWLEDGE = 1; // 自动签收
int CLIENT_ACKNOWLEDGE = 2; // 客户端确认签收
int DUPS_OK_ACKNOWLEDGE = 3; // 自动批量进行签收
int SESSION_TRANSACTED = 0; // 事务提交就签收
}
注意
-
如果true开启了事务,一定要进行事务提交,这个时候才会签收,否则事务回滚,消息会被MQ再次传送
-
false关闭事务,签收的参数值是CLIENT_ACKNOWLEDGE = 2,那则需要手动签收。
message.acknowledge(); // 消息确认签收
死信队列
上面的例子中消费者开启了事务,获取消息,但是没有提交事务,对ActiveMq来说,消息是没有消费成功的,所以消息还在队列中,导致可以重复获取该消息,但事务没有提交,消费其实的失败的,当消费失败达到6次,消失就会从原队列移除放到死信对列中。
如何消费死信队列中的消息
小结
1、ActiveMQ的消息通讯只有两种模型:
-
点对点模型
Queue:生成者 + 消费者 + 消息存在队列
通过队列实现异步传输完全解耦,两步:1、生成者=> 队列 2、消费者<= 队列
不需要关心生产者和消费者的启动顺序 -
发布订阅模型
Topic:消息发布者 + 消息订阅者,可以存在多个订阅者订阅同一个消息 + 消息存在Topic
非持久化模式:消费者一定先启动,然后才可以接收到消息,否则接收不到。
持久化模式:消费者需要先订阅才可以接收到消息,无论什么时候上线,只有有消息就可以看到。
2、事务是否开启,假设开启就一定要处理事务。如果事务处理失败,消息就没有签收。