1. 什么是生产端的可靠性投递
消费端:之前在做消费端的消息确认的时候有ACK和NACK的机制,来确保消息的消费应答
生产端:provider将消息发送给broker,在这个环节如何保证消息的可靠投递,(邮件到邮局)
如果因为投递不成功,那么消息的消费就更无从谈起
如何做到可靠性投递
- 首先要保障消息的成功发出
- 保障MQ节点(Broker)的成功接收
- 发送端发送消息后也要有一个broker确认应答
- 完善的消息补偿机制
- 如果要保证100%消息投递成功,就必须要有补偿机制
- 补偿就需要知道消息发送是否成功的状态
- 消息发送失败后,需要考虑重发或其他方式
生产端的可靠性投递,解决就是保证消息要到达消息队列中
2. 常用的可靠性投递方案有哪些
- 第一种:对消息状态进行标记
- 将消息的内容和消息的状态计入到DB中,比如发送中,发送成功
- 对于没有发送成功的进行轮询操作,然后重试
- 重新发送需要考虑消息的异常重发次数,如果异常一直存在,那么消息不停的发送就会占用服务带宽和计算资源,这个时候就需要将消息不再补偿了
- 第二种:通过消息的延迟投递,做二次确认,回调检查
3. 可靠性方案
消息落库进行状态标记保证可靠投递
可靠性方案流程设计图
0、一开始要有一个业务DB和MQDB(记录你消息体内容和消息状态的数据库,你自己创建的),前期如果系统规模比较小,就可以合并,MQ记录只是数据库中的一张表
1、第一步将orderInfo进行业务数据的落库,
2、第二步业务落库后就通过Sender发送消息给Broker,发送消息后就将消息体和消息状态记录到MQDB中(发送中,发送完成),并且会接收到Broker返回的消息投递确认状态(需要代码支持)
3、第三步消息发送方接收Broker返回的消息和网络状态的异常
4、第四步,如果消费发送确认失败应该调用业务来将MQDB消息的状态更新为发送失败,如果发送状态成功就将MQDB中的记录更新为成功
5、第五步,使用分布式定时Job来对我们的消息进行判断,只要是消息状态不是我们发送成功的都需要进行重新发送
6、第六步,定时Job可以直接将没有发送成功的消息体从MQDB中取出来给到Sender来进行重新发送,如果我们刚发送了一个消息,还没有接收到消息确认并修改消息状态时(可能网络问题),这个时候刚好轮询拿出这一条消息,这就需要对发送的消息做一个重试的时间间隔的验证(消息记录在落库的时候需要记录发送时间),记录拿出来之后根据发送时间和当前时间计算超过一个间隔时间长度才重新发送。所以发送中、发送失败的消息都要进行重试发送
7、还有一种极端的情况,routingkey被不小心删除了,肯定无法投递成功,这个时候就需要有个重试次数了,需要在消息的记录里有一个字段做计数器,重试一次加1一次,在进行重试的时候需要判断这个重试计数器,如果重试计数器大于我们要求的重试数量,就需要再更新一个字段(重试超次数的字段),如果这个字段状态为1,我们也不再进行重试,每次触发重试后都需要将发送时间更新成最新的时间,确保下次拿到这个数据时和当前时间比对时间间隔不会因为这个而重复发送
8、人工补偿:对于这些重试次数超过还没有发送成功的消息,就需要统计出来进行人工分析和补偿了
9、还需要消费端进行消息的NACK重回队列策略
10、消费端也需要将消息状态进行同步到MQDB中,如果成功就更新消息状态为接收成功,如果超过重试次数则更新状态为消费重试超标,这个消费超标的状态也需要最后统计出来进行人工补偿
11、至此方案一的发送和消费的100%可靠投递方案就完成
这个方案有什么问题吗?
- MQDB如果使用MySQL或其他的数据库进行存储,就导致表的数据量很大,数据库压力,导致时效性差
- 轮询查询的时候由于发送状态他的变化不大,所以数据会查询比较慢,这个时候就需要将发送成功的数据定期归档
消息延迟投递,二次确认保证可靠投递
方案二和方案一的核心区别在于:有消息的上下游了
Upstream是生产端、Downstream是消费端
0、首先还是业务数据先落库
1、业务数据落库完成开始我们的发送,将消息发送给下游消费者
2、当发送业务消息的同时发送一个延时业务消息(这个延时的长度取决于你认为的消息需要执行时间间隔)这个延时业务消息的内容和业务消息是要基本一致的
3、下游消费端接收消息
4、当下游消费端接收消息并完成消费后会发送一个新的消息给Broker
5、外部有一个callback service来接收下游发送的消费确认成功的消息
6、callback service接收到下游发送的消费确认成功的消息后会将消息消费的成功状态记录进消息日志DB中
7、这个时候恰巧延时消息到了callback service触发他的检查服务,根据这个消息的id去消息日志DB中查询是否消费成功了,如果消息日志DB中消息消费成功则完成此次消息发送。有个问题,如果消息根本投递不到Broker的话,消息ID哪里来? 要确保消息投递到broker,生产端要做ACK确认,消费端不用做ACK确认。
8、如果callback service检查监听没有发现消息执行成功的记录则会给消息发送的上游调用RPC来做Retry,监听就是定时机制?怎么确认消息发送成功,根据消息ID到消息日志DB检查吗,那消息ID又是哪里来
9、如果进行Retry,要么retry的时候就把消息体带上,要么从数据库中查询数据然后进行重新发送并发送延时消息
10、如果重试一直不成功,这个时候就可以通过callback service结合消息日志DB来做消息重试的记录,如果超过我们的要求数量就不再调用Retry
11、人工补偿:在消息日志DB中把重试次数超过我们要求的记录导出进行补偿
这个方案直接将发送和消费一并进行了可靠性投递保障
这个方案有什么问题?
- 个人感觉无法保证消息可靠性投递到MQ Broker,如果网络故障,step1,step2发送消息到MQ Broker都失败了,下面的步骤就无法执行了。如果方案可行的话,那就是落地的问题,解决我消息生产端可靠性投递到Broker的疑问,否则我觉得这个方案无法保证可靠性投递。
4. 消费端如何接收多个队列的消息
我们的消息队列就相当于一个数据暂存的容器,有点像一个表,正常使用中会在系统代码中创建交换机和队列吗?
- 一般不会在代码里创建,直接在消息队列控制台或提前创建好
- 我们之前是使用的一个序列化的对象来存储消息体内容的,实际工作中为了降低消息的复杂度,一般使用json格式的字符串进行传输,两端约定好json的格式然后解析就好
- 一般一个exchange绑定一个queue
消息按照字符串来接收,如果一个消费端需要接收多个队列的消息可以创建多个实现类
控制台创建order-exchange-1和order-exchange-2
控制台创建order-queue-1和order-queue-2
bindings
接受方,直接指定监听的queue就好了
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class OrderReceiver {
@RabbitListener(queues = "order-queue-1")
@RabbitHandler
public void onOrderMessage(@Payload String orderInfo, @Headers Map<String,Object> headers, Channel channel) throws Exception{
System.out.println("************消息接收开始***********");
System.out.println("Order Info: "+orderInfo);
Long deliverTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliverTag,false); // 确认签收
}
@RabbitListener(queues = "order-queue-2")
@RabbitHandler
public void onOrderMessage(@Payload String orderInfo, @Headers Map<String,Object> headers, Channel channel) throws Exception{
System.out.println("************消息接收开始***********");
System.out.println("Order Info: "+orderInfo);
Long deliverTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliverTag,false); // 确认签收
}
}
发送方
@Autowired
RabbitTemplate rabbitTemplate;
public void sendOrder(String orderInfo,String messageId){
CorrelationData correlationData = new CorrelationData();
correlationData.setId(messageId);
rabbitTemplate.convertAndSend("order-exchange-1","order.1.key",orderInfo,correlationData);
}
@Test
void testJson() {
String orderInfo = "消息队列RabbitMQ从入门到精通,order-exchange-1");
String messageId = "MS9999904-1";
try {
System.out.println("***********开始发送************");
orderSender.sendOrder(orderInfo,messageId);
System.out.println("-----------发送完成------------");
}catch (Exception ex){
ex.printStackTrace();
}
}
接受方结果
5. 发送端如何接收交换机返回的回调应答
1、发送方的application.yaml 需要打开确认响应开关
spring:
rabbitmq:
host: 39.98.151.138
username: guest
password: guest
virtual-host: /
connection-timeout: 15000
publisher-confirms: true
publisher-returns: true
template:
mandatory: true
2、发送消息的ConfirmCallback和ReturnCallback使用
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class OrderSender {
@Autowired
RabbitTemplate rabbitTemplate;
public void sendOrder(String orderInfo,String message_id) throws Exception{
/**
* exchange: 交换机名字
* routingkey: 队列关联的key
* object: 要传输的消息对象
* correlationData: 消息的唯一id
*/
CorrelationData correlationData = new CorrelationData();
correlationData.setId(message_id);
// 设置callback内容
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
rabbitTemplate.convertAndSend("order-exchange-2","order.3.key",orderInfo,correlationData);
}
// exchange返回调用
// 只有exchange交换机是正确存在的,消息都能发送到rabbitmq mq,routing key 错误了,消息就寻找不到正确的queue,消息就丢失了,但是对发送端来说消息是成功发送到MQ的,这不是异常,消息是先到达exchange层,再路由到queue,延时投递也是基于这个机制
// 总结:confirmCallback 只判断到exchange层
final ConfirmCallback confirmCallback = new ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("==========消息publish成功的ID: " + correlationData.getId());
System.out.println("==========消息是否发布成功: " + b);
System.out.println("==========失败的异常信息: " + s);
}
};
// 判断层到达routingkey,只有在exchange正确,routingkey不正确的情况下才会触发改方法
final ReturnCallback returnCallback = new ReturnCallback() {
@Override
public void returnedMessage(Message message, int replayCode, String replayText, String exchange, String routingkey) {
System.out.println("******replayCode: "+replayCode);
System.out.println("******replayText: "+replayText);
System.out.println("******exchange: "+exchange);
System.out.println("******routingkey: "+routingkey);
}
};
}
// 注意,ConfirmCallback和ReturnCallback的包都是org.springframework.amqp.rabbit.core.RabbitTemplate
测试
exchange不存在的:
routtingkey不存在的:
总结
结合前面的可靠性投递方案一,
- 在confirmcallback方法里如果返回true,修改消息状态为发送成功,如果false,修改消息状态为发送失败。
- 在returncallback方法里修改消息状态为发送失败。
6. 消费端幂等性问题解决
无论消息端接收到多少一样的消息,都只产生一个消费结果
为了确保发送端的投递成功,我们都会进行消息的补偿,有可能消息补偿的过程中会多发送几次消息导致重复
这个时候就需要提前考虑消费端的幂等问题
消费端的幂等性保障方案
- 唯一ID+业务码在数据库中做主键来做去重
- 为执行的内容做前置条件,类似MySQL的version乐观锁
- 利用Redis的原子性去实现
唯一ID+业务码做主键来做去重
- 利用的数据库里的主键约束
- 会对数据库产生异常压力
- 如果业务量并发较大会导致数据库夯住
- 优点:实现简单方便
- 缺点:高并发下数据库就会成为瓶颈
- 缺陷:只能用来做insert的幂等
- 解决方案:将ID进行分库分表通过hash路由的算法将压力分摊到多个数据库中
为执行的内容做前置条件
- 给数据更新前增加一个前置的条件
- 你的操作不是原子隔离的,当你的查询前置条件和更新的过程中有可能其他线程已经更新完成了
- 需要将拿到的前置条件做为更新的条件之一来做操作,如果在你之前已经更新了这个前置条件,那么你的更新就会不成功
利用Redis的原子性去实现
- 这里就涉及分布式锁的概念了
- 第一个进入并拿到锁的线程,在锁内部先判断是否已消费,如果没有消费则操作并记录这个消息ID已消费
- 并发过程中的线程如果拿不到锁,就直接中止消费,如果有延时消息过来,这个线程能拿到锁,但拿到后要去查询这个Measge_ID是否消费过,如果已经消费了,就中止消费
7. 消费端的消息可靠性如何保障
消费端面向的是消息队列中已经有的消息
- 如果消费端监听消费成功,并手动发送ACK响应,那么这个消息就肯定完成了消费
- 如果出现异常可以通过NACK将消息重回队尾变成Ready状态然后再次消费
- 可以通过NACK来进行Retry和重试次数验证,并且在验证的过程中如果超过次数就将这个消息发送到一个人工补偿消息队列或持久化对象中等待人工补偿
代码实践
当消息在客户端消费失败时,我们会将异常的消息加入到一个消息重试对象中,同时设置最大重试次数,并将消息重新推送到 MQ 消息中间件里,当重试次数超过最大值时,会将异常的消息存储到 MongoDB
数据库中,方便后续查询异常的信息。
1、创建一个消息实体类
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class MessageRetryDTO implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 原始消息body
*/
private String bodyMsg;
/**
* 消息来源ID
*/
private String sourceId;
/**
* 消息来源描述
*/
private String sourceDesc;
/**
* 交换器
*/
private String exchangeName;
/**
* 路由键
*/
private String routingKey;
/**
* 队列
*/
private String queueName;
/**
* 状态,1:初始化,2:成功,3:失败
*/
private Integer status = 1;
/**
* 最大重试次数
*/
private Integer maxTryCount = 3;
/**
* 当前重试次数
*/
private Integer currentRetryCount = 0;
/**
* 重试时间间隔(毫秒)
*/
private Long retryIntervalTime = 0L;
/**
* 任务失败信息
*/
private String errorMsg;
/**
* 创建时间
*/
private Date createTime;
/**
* 检查重试次数是否超过最大值
*
* @return
*/
public boolean checkRetryCount() {
retryCountCalculate();
//检查重试次数是否超过最大值
if (this.currentRetryCount < this.maxTryCount) {
return true;
}
return false;
}
/**
* 重新计算重试次数
*/
private void retryCountCalculate() {
this.currentRetryCount = this.currentRetryCount + 1;
}
}
2、服务抽象类
public abstract class CommonMessageRetryService {
private static final Logger log = LoggerFactory.getLogger(CommonMessageRetryService.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MongoTemplate mongoTemplate;
/**
* 初始化消息
*
* @param message
*/
public void initMessage(Message message) {
log.info("{} 收到消息: {},业务数据:{}", this.getClass().getName(), message.toString(), new String(message.getBody()));
try {
//封装消息
MessageRetryDTO messageRetryDto = buildMessageRetryInfo(message);
if (log.isInfoEnabled()) {
log.info("反序列化消息:{}", messageRetryDto.toString());
}
prepareAction(messageRetryDto);
} catch (Exception e) {
log.warn("处理消息异常,错误信息:", e);
}
}
/**
* 准备执行
*
* @param retryDto
*/
protected void prepareAction(MessageRetryDTO retryDto) {
try {
execute(retryDto);
doSuccessCallBack(retryDto);
} catch (Exception e) {
log.error("当前任务执行异常,业务数据:" + retryDto.toString(), e);
//执行失败,计算是否还需要继续重试
if (retryDto.checkRetryCount()) {
if (log.isInfoEnabled()) {
log.info("重试消息:{}", retryDto.toString());
}
retrySend(retryDto);
} else {
if (log.isWarnEnabled()) {
log.warn("当前任务重试次数已经到达最大次数,业务数据:" + retryDto.toString(), e);
}
doFailCallBack(retryDto.setErrorMsg(e.getMessage()));
}
}
}
/**
* 任务执行成功,回调服务(根据需要进行重写)
*
* @param messageRetryDto
*/
private void doSuccessCallBack(MessageRetryDTO messageRetryDto) {
try {
successCallback(messageRetryDto);
} catch (Exception e) {
log.warn("执行成功回调异常,队列描述:{},错误原因:{}", messageRetryDto.getSourceDesc(), e.getMessage());
}
}
/**
* 任务执行失败,回调服务(根据需要进行重写)
*
* @param messageRetryDto
*/
private void doFailCallBack(MessageRetryDTO messageRetryDto) {
try {
saveMessageRetryInfo(messageRetryDto.setErrorMsg(messageRetryDto.getErrorMsg()));
failCallback(messageRetryDto);
} catch (Exception e) {
log.warn("执行失败回调异常,队列描述:{},错误原因:{}", messageRetryDto.getSourceDesc(), e.getMessage());
}
}
/**
* 执行任务
*
* @param messageRetryDto
*/
protected abstract void execute(MessageRetryDTO messageRetryDto);
/**
* 成功回调
*
* @param messageRetryDto
*/
protected abstract void successCallback(MessageRetryDTO messageRetryDto);
/**
* 失败回调
*
* @param messageRetryDto
*/
protected abstract void failCallback(MessageRetryDTO messageRetryDto);
/**
* 构建消息补偿实体
* @param message
* @return
*/
private MessageRetryDTO buildMessageRetryInfo(Message message){
//如果头部包含补偿消息实体,直接返回
Map<String, Object> messageHeaders = message.getMessageProperties().getHeaders();
if(messageHeaders.containsKey("message_retry_info")){
Object retryMsg = messageHeaders.get("message_retry_info");
if(Objects.nonNull(retryMsg)){
return JSONObject.parseObject(String.valueOf(retryMsg), MessageRetryDTO.class);
}
}
//自动将业务消息加入补偿实体
MessageRetryDTO messageRetryDto = new MessageRetryDTO();
messageRetryDto.setBodyMsg(new String(message.getBody(), StandardCharsets.UTF_8));
messageRetryDto.setExchangeName(message.getMessageProperties().getReceivedExchange());
messageRetryDto.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
messageRetryDto.setQueueName(message.getMessageProperties().getConsumerQueue());
messageRetryDto.setCreateTime(new Date());
return messageRetryDto;
}
/**
* 异常消息重新入库
* @param retryDto
*/
private void retrySend(MessageRetryDTO retryDto){
//将补偿消息实体放入头部,原始消息内容保持不变
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setHeader("message_retry_info", JSONObject.toJSON(retryDto));
Message message = new Message(retryDto.getBodyMsg().getBytes(), messageProperties);
rabbitTemplate.convertAndSend(retryDto.getExchangeName(), retryDto.getRoutingKey(), message);
}
/**
* 将异常消息存储到mongodb中
* @param retryDto
*/
private void saveMessageRetryInfo(MessageRetryDTO retryDto){
try {
mongoTemplate.save(retryDto, "message_retry_info");
} catch (Exception e){
log.error("将异常消息存储到mongodb失败,消息数据:" + retryDto.toString(), e);
}
}
}
3、监听服务类
@Component
public class OrderServiceListener extends CommonMessageRetryService {
private static final Logger log = LoggerFactory.getLogger(OrderServiceListener.class);
/**
* 监听订单系统下单成功消息
* @param message
*/
@RabbitListener(queues = "mq.order.add")
public void consume(Message message) {
log.info("收到订单下单成功消息: {}", message.toString());
super.initMessage(message);
}
@Override
protected void execute(MessageRetryDTO messageRetryDto) {
//调用扣减库存服务,将业务异常抛出来
}
@Override
protected void successCallback(MessageRetryDTO messageRetryDto) {
//业务处理成功,回调
}
@Override
protected void failCallback(MessageRetryDTO messageRetryDto) {
//业务处理失败,回调
}
}
当消息消费失败,并超过最大次数时,会将消息存储到 mongodb 中,然后像常规数据库操作一样,可以通过 web 接口查询异常消息,并针对具体场景进行重试!