基于上篇博客的工程
1. 消息接收的应答模式ACK和NACK
-
ACK就是手动签收的标示,如果消息的签收模式设置成为了手工模式,在MQ没有接收到ACK信息时都是Unacked的状态,并且消息还在队列中,这个时候消息不会重试,不会再主动发给消费者,除非消费者重启(重新监听),mq才会主动再次发送
如果在进行业务操作的时候,我们系统业务流程中出现了未知业务异常,比如里面某个服务环节出现网络超时的情况,这个时候如果签收,是不是业务根本没有达成,消息还消费掉了,这就无法补偿了,如果我没签收,这个时候消息就停留在这个队列里了,这个时候如果想要重试再次接收消息难道要重启服务吗?有可能是网络原因导致的
建议配置手动签收,代码如下:
// 消息消费完后,签收 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "order-queue",durable = "true",autoDelete = "false"), exchange = @Exchange(value = "order-exchange",type = "topic"), key = "order.update" )) @RabbitHandler public void onOrderMessage(@Payload OrderInfo orderInfo, @Headers Map<String,Object> headers, Channel channel) throws IOException { System.out.println("*********消息接收开始*********"); System.out.println("Order name:" + orderInfo.getOrderName()); Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG); // ACK进行签收,第一个参数是标识,第二个参数是否批量接收,false否 channel.basicAck(deliveryTag,false); }
-
NACK:将消息重回队列,如果我们发现异常,就可以调用NACK来将消息重回队列,他会重回到队尾(不影响消息的执行顺序)
- 比如说执行的过程中发现异常,我们可以在catch里进行重回队列让消息再次执行
- 一般在业务中,我们重回队列执行的过程中会设置一个最大重回次数(重回计数可以使用redis)避免服务器内存溢出,如果超过这个次数就执行ACK并进行记录,记录这个消息没有执行成功,放到DB ,代码如下
import com.icoding.basic.po.OrderInfo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class OrderReceiver {
int flag = 0;
// 监听队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "order-queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "order-exchange",durable = "true",type = "topic"),
key = "order.*"
)
)
@RabbitHandler
public void onOrderMessage(@Payload OrderInfo orderInfo, @Headers Map<String,Object> headers, Channel channel) throws Exception{
System.out.println("************消息接收开始***********");
System.out.println("Order Name: "+orderInfo.getOrder_name());
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
try{
// ACK进行签收,第一个参数是标识,第二个参数是否批量接收,false否
channel.basicAck(deliveryTag,false);
flag = 0;
}catch (Exception e){
if(flag>2){
//说明执行了3次都没有成功
//消息确认
channel.basicAck(deliveryTag,false);
//记录这个消息的日志或数据到DB/Redis/file
}else {
//可以设置时延迟几秒,
flag = flag+1;
//前两个参数和上面ACK一样,第三个参数是否重回队列,模拟发送异常,让消息重回队列尾部
channel.basicNack(deliveryTag, false, true);
}
}
}
}
上面模拟消息接收消费异常,让它回到队列尾部,重复消费3次以后,消息还没确认签收,就确认签收并放入数据库,避免消息堆积。
2. Exchange交换机Type详解
-
direct : 点对点直连的概念,比如我在binding里设置了一个routingkey是order.update,这个时候我们发送消息的routingkey就必须是order.update,如果不是order.update,这个消息就接收不到(一对一)
// 发送方 public void sendOrder(OrderInfo orderInfo){ /** 可以放在application.xml配置,通过@Value加载 * exchange: 交换机名字,就是个你自己定义的字符串,这里是order-exchange * routingkey: 队列关联的key,是个你自己定义的字符串,这里是order.updates, * object: 要传输的消息对象 * correlationData: 消息的唯一id */ CorrelationData correlationData = new CorrelationData(); correlationData.setId(orderInfo.getMessageId()); rabbitTemplate.convertAndSend("order-exchange","order.update",orderInfo,correlationData); } // 接收方 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "order-queue",durable = "true",autoDelete = "false"), exchange = @Exchange(value = "order-exchange",type = "direct"), key = "order.update" )) @RabbitHandler public void onOrderMessage(@Payload OrderInfo orderInfo, @Headers Map<String,Object> headers, Channel channel) throws IOException { .... }
- topic : 点对点直连的概念,但是他支持routingkey的模糊匹配,可以在我们的routingke写匹配符(一对多)
- *:代表一个单词 ,如routingkey: order.insert
- #:代表没有或多个单词,如routingkey: order.insert.app
这个匹配符用在我们binding里,既可以写在routingkey的前面也可以写在routingkey的后面(order.*或者#.insert)
// 接收方 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "order-queue",durable = "true",autoDelete = "false"), exchange = @Exchange(value = "order-exchange",type = "topic"), key = "order.*" ))
-
fanout:只要exchange进行binding了消息队列,就直接将消息传给消息队列了,因为不绑定任何的routingkey所以是转发消息最快的(广播方式)
发送方,routingkey为空
// 接收方,可以同一个exchange绑定多个queue实现广播,fanout类型不用routingkey,前面两种类型需要绑定routingkey @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "order-queue",durable = "true",autoDelete = "false"), exchange = @Exchange(value = "order-exchange",type = "fanout"), key = "" ))
启动接收方,登录rabbitmq控制台
- header:根据消息header头来判断,较少使用
3. 消息队列的TTL设置和使用
什么是TTL:Time To Live,也就是生存时间
队列设置TTL(整体)
- RabbitMQ是支持消息过期机制的,如果你设置了消息的TTL,这个消息没有及时消费,这个消息就丢了,或者说消失了
- 队列整体的消息过期时间,就是一个Time box,给这个队列设置一个过期时间,那么这个队列里的消息从进入队列开始计算,达到了这个时间如果还没有消费就直接丢掉了
- x-message-ttl : 消息队列的整体消息的TTL,单位是毫秒,把消息删除
- x-expires :消息队列空闲的时间,如果空闲超过这个时间就会自动删除,单位毫秒,把队列删除
- x-max-length :消息队列存放消息的总消息数(条数),如果超过会挤掉最早的那个数据
- x-max-length-bytes :消息队列的最大容量,新消息过来如果容量不够会删除最早的消息,如果还不够,再删一条次最早的消息
登录rabbitmq控制台,创建新的消息队列时(队列不允许创建后修改),增加参数
消息本身设置TTL
如果我只想某个消息按时间过期,那么就不能使用消息队列的TTL,就要对消息本身进行TTL
- 在发送端进行消息过期设置
import com.icoding.basic.po.OrderInfo;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class OrderSender {
@Autowired
RabbitTemplate rabbitTemplate;
public void sendOrder(OrderInfo orderInfo) throws Exception{
/**
* exchange: 交换机名字
* routingkey: 队列关联的key
* object: 要传输的消息对象
* correlationData: 消息的唯一id
*/
CorrelationData correlationData = new CorrelationData();
correlationData.setId(orderInfo.getMessage_id());
//在这里设置message的TTL
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");
return message;
}
};
//将messagePostProcessor加入到参数中
rabbitTemplate.convertAndSend("order-exchange","order.update",orderInfo,messagePostProcessor,correlationData);
// 函数式接口,使用lambda表达式
rabbitTemplate.convertAndSend("delay-exchange","order.update",orderInfo,message -> {
message.getMessageProperties().setExpiration("5000");
return message;
},correlationData);
}
}
4. 死信队列详解及进入死信队列的机制
首先要看一下什么是死信:当一个消息无法被消费时就变成了死信
死信是怎么形成的:消息在没有被消费前就失效的就属于死信
死信队列的作用:失效的消息进入到死信队列中,转人工补偿消费
一个系统中是没有无缘无故失效的消息,如果这个消息失效了没有了(设置TTL),是不是可能导致业务损失,如果这种消息我们需要记录或补偿,将这种消息失效的时候放到一个队列中,待我们人工补偿和消费,这个放死信的队列就是死信队列
我们的死信队列其实也是一个正常队列,只是赋予了他这个概念。
创建死信队列
1、dead-exchange
2、dead-queue
3、binding(dead-exchange ->routing key -> dead-queue)
4、order-queue
设置死信相关的参数:dead letter exchange和dead letter routing key
x-dead-letter-exchange :这个参数就是指定死信队列的Exchange的名字,这个Exchange就是一个普通的Exchange,需要手工创建
x-dead-letter-routing-key :这个参数就是指定死信队列的Routingkey,这个routingkey需要自己创建好队列和Exchange进行binding时填入,相当于把消息转发到dead-exchange,通过routing key再转发到dead-queue(因为做了binding)
5、bind(order-exchange -> routing key ->order-queue)
6、测试消息过期没消费进入死信队列
发送方发送消息,消息设置TTL=5秒
// ttl 设置消息本身的过去时间,单位毫秒
public void sendOrder(OrderInfo orderInfo,String ttl){
CorrelationData correlationData = new CorrelationData();
correlationData.setId(orderInfo.getMessageId());
// 支持函数式接口
rabbitTemplate.convertAndSend("order-exchange","order.update",orderInfo,message -> {
message.getMessageProperties().setExpiration(ttl);
return message;
},correlationData);
}
// 单元测试
@Test
void contextLoads() {
OrderInfo orderInfo = new OrderInfo();
orderInfo.setId("10004");
orderInfo.setOrderName("消息队列RabbitMQ从入门到精通");
orderInfo.setMessageId("MS9999904");
try {
System.out.println("***********开始发送************");
//orderSender.sendOrder(orderInfo);
orderSender.sendOrder(orderInfo,"5000");
System.out.println("-----------发送完成------------");
}catch (Exception ex){
ex.printStackTrace();
}
}
启动发送消息,登录rabbitmq控制台,查看order-queue有一条消息
5秒后,消息失效,rabbitmq把它移动到了dead-queue
场景:同理,消息数超过了队列设置的限制或者容量,被删除的最早消息也会移动到死信队列dead-queue
死信队列的另一作用:延迟消息,就是把消息发送没有人消费的队列,队列有设置TTL失效时间,并设置失效后转到死信队列,达到延迟的效果
应用场景:用户下了订单,立马生成一个消息,30分钟后失效并触发死信队列,用来检查30分钟后还没付款,把订单删除掉。
死信队列和普通队列一样,也可以监听获取队列中的消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "dead-queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "dead-exchange",type = "topic"),
key = "dead.*"
))
@RabbitHandler
public void onOrderMessage(@Payload OrderInfo orderInfo, @Headers Map<String,Object> headers, Channel channel) throws IOException {
System.out.println("*********消息接收开始*********");
System.out.println("Order name:" + orderInfo.getOrderName());
}
redis的keyevent通过pub/sub机制来订阅信息的,如果sub端(订阅端)在pub(发布端)发布信息之后订阅就会导致信息丢失,而我们的死信因为是队列所以无所谓什么时候消费
5. RabbitMQ镜像集群构建
我们再来看一下镜像集群的结构,每个节点都是完整的互相复制,
所有的RabbitMQ都是平行结构,在搭建的过程中有一个加入主机(Master)的概念,管理集群上要有一个Master的概念,但在数据读写上每一个节点都是平等的,因为镜像模式是互相复制的。
搭建镜像集群
所有机器的服务和控制台都应该提前安装好
# 单机安装参考上次课程,每个机子都安装rabbitmq
# 0.给集群中的服务设置hostname和IP的映射
# 三台服务器:RMQ146、RMQ159、RMQ160都修改hostname
vi /etc/hostname
RMQ146
vi /etc/hostname
RMQ159
vi /etc/hostname
RMQ160
# 修改/etc/hosts能互相识别,找到对方
vi /etc/hosts
192.168.0.146 RMQ146
192.168.0.159 RMQ159
192.168.0.160 RMQ160
# 修改/etc/hosts之后正常情况应该是保存之后立即生效的,如果不生效,可以重启机器,或者重启服务
不同系统的生效方法:
Ubuntu: $sudo /etc/init.d/networking restart
Gentoo: /etc/init.d/net.eth0 restart
# 注意单机上不要配置rabbitmq集群
# 1.先停掉三个节点的服务
rabbitmqctl stop
# 2.进行文件同步将RMQ146上的文件复制给159和160,相当于服务中的cookie文件
# 三台服务器上的erlang.cookie都不一致,我们要让它们保持一致(使用Master RMQ146的cookie文件)
scp /var/lib/rabbitmq/.erlang.cookie root@192.168.0.159:/var/lib/rabbitmq/
scp /var/lib/rabbitmq/.erlang.cookie root@192.168.0.160:/var/lib/rabbitmq/
[root@helloworld rabbitmq]# cat .erlang.cookie
AMSVSNNNZMXGKOMLEMXS
# 3.三台机器启动服务,集群方式启动,后台启动
rabbitmq-server -detached
# 4.其他节点加入集群,相当于寻址操作,slave加入master
# 159服务器上执行
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@RMQ146
rabbitmqctl start_app
# 160服务器上执行
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@RMQ146
rabbitmqctl start_app
# 5.查看集群的状态,可以在任何节点上执行
rabbitmqctl cluster_status
# 6.设置集群节点名称
rabbitmqctl set_cluster_name cluster_name_0812
集群启动后,登录rabbitmq控制台,查看集群节点
DISC:落盘
查看集群的状态,可以在任何节点上执行
注意:
加入集群的时候以IP方式可能会报错:join_cluster rabbit@192.168.0.146 ,这个地方必须是hostname,在局域网内
最重要的一步:还需要一个镜像队列数据复制开通的命令
- 将集群中所有队列设置成镜像队列,让数据进行互相复制,状态一致
- master节点上只要有一个消息,其他节点上也会同步
# 在集群的任何节点执行都可以,开启策略ha-mode
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
测试
Springboot 项目的application.yml配置不用修过,还是连接一个rabbitmq
spring:
rabbitmq:
host: 139.199.13.139
username: guest
password: guest
virtual-host: /
connection-timeout: 15000
接收方监听
@Component
public class OrderReceiver {
// 这个很好理解,就是创建queue和exchange,然后通过key绑定,配置是跟控制台是一样的
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "order-queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "order-exchange",durable = "true",type = "topic"),
key = "order.update"
)
)
// @Payload 看上面的截图知道是具体的消息体
@RabbitHandler
public void onOrderMessage(@Payload OrderInfo orderInfo, @Headers Map<String,Object> headers, Channel channel) throws Exception{
System.out.println("************消息接收开始***********");
System.out.println("Order Name: "+orderInfo.getOrder_name());
}
}
启动接收方,登录三个rabbitmq的控制台,发现都创建了order-exchange和order-queue,因为镜像集群互相复制数据
ha-all:高可用
消息发送方和接收方随便连接集群内的任一节点都可以,数据写入其中一个节点并同时复制给其它节点完成后才返回写入成功,相当于保证数据的强一致性,这是同步的,不是异步的。
消息队列是解偶的,它允许数据写入队列后,读取有延时。
镜像集群模式下,应该有个nginx或者haproxy进行负载均衡,springboot项目应该连接的是nginx或者haproxy而不是具体的某个节点。
挂掉一个节点后
# 停掉RMQ159的rabbitmq服务
rabbitmqctl stop_app
rabbitmq控制台马上看到该节点not running,节点间是有心跳保持连接的,发送消息就不会同步到down掉的节点
# 在做消费填谷的时候,不要马上启动停掉的rabbitmq
rabbitmqctl start_app
将节点设置成内存模式
# 停掉服务后进行修改落盘为内存模式,消息转发会更快
# 在需要修改的机器上
rabbitmqctl stop_app
rabbitmqctl change_cluster_node_type ram
rabbitmqctl start_app
将节点从集群中去掉
# 1、先停掉自己,在自己机器上执行
rabbitmqctl stop_app
# 2、去其他活着的节点上将刚刚停掉的机器移除
rabbitmqctl forget_cluster_node rabbit@RMQ159
# 3、在加入集群的时候就设置成RAM模式
# 在要加入的节点上执行
rabbitmqctl join_cluster --ram rabbit@RMQ146
rabbitmqctl start_app
6. 集群核心参数配置
以下为可选参数,集群的时候要注意,避免数据存不进rabbitmq,导致丢失
-
tcp_listerners : 端口,
-
disk_free_limit : 磁盘的低水位线,高于磁盘水平线就不能写入数据了,单位是byte,rabbitmq存储数据的可用空间限制,当低于该值的时候,将触发流量限制
三台机器如果一台20G,一台15G,一台30G,他们是镜像关系,最小的机器就是磁盘的底线:15G
-
vm_memory_high_watermark : 内存的最高水位线,默认值0.4,为内存总量的40%,
如果将节点设置成内存模式,三台机器的内存不一样,一台16G,一台8G,一台32G,最高水位线=8G * 0.4=3G
# rpm安装的rabbitmq,配置文件在
vi /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
配置属性描述
官网说明:www.rabbitmq.com/configure.html#config-items
查看rabbitmq的有效配置
rabbitmqctl environment
属性 | 描述 | 默认值 |
---|---|---|
listeners | 要监听 AMQP 0-9-1 and AMQP 1.0 的端口 | listeners.tcp.default = 5672 |
num_acceptors.tcp | 接受tcp连接的erlang 进程数 | num_acceptors.tcp = 10 |
handshake_timeout | AMQP 0-9-1 超时时间,也就是最大的连接时间,单位毫秒 | handshake_timeout = 10000 |
listeners.ssl | 启用TLS的协议 | 默认值为none |
num_acceptors.ssl | 接受基于TLS协议的连接的erlang 进程数 | num_acceptors.ssl = 10 |
ssl_options | TLS 配置 | ssl_options =none |
ssl_handshake_timeout | TLS 连接超时时间 单位为毫秒 | ssl_handshake_timeout = 5000 |
vm_memory_high_watermark | 触发流量控制的内存阈值,可以为相对值(0.5),或者绝对值 vm_memory_high_watermark.relative = 0.6 ,vm_memory_high_watermark.absolute = 2GB | 默认vm_memory_high_watermark.relative = 0.4 |
vm_memory_calculation_strategy | 内存使用报告策略,assigned:使用Erlang内存分配器统计信息 rss:使用操作系统RSS内存报告。这使用特定于操作系统的方法,并可能启动短期子进程。legacy:使用遗留内存报告(运行时认为将使用多少内存)。这种策略相当不准确。erlang 与legacy一样 是为了向后兼容 | vm_memory_calculation_strategy = allocated |
vm_memory_high_watermark_paging_ratio | 当内存的使用达到了50%后,队列开始将消息分页到磁盘 | vm_memory_high_watermark_paging_ratio = 0.5 |
total_memory_available_override_value | 该参数用于指定系统的可用内存总量,一般不使用,适用于在容器等一些获取内存实际值不精确的环境 | 默认未设置 |
disk_free_limit | Rabbitmq存储数据的可用空间限制,当低于该值的时候,将触发流量限制,设置可参考vm_memory_high_watermark参数 | disk_free_limit.absolute = 50MB |
log.file.level | 控制记录日志的等级,有info,error,warning,debug | log.file.level = info |
channel_max | 最大通道数,但不包含协议中使用的特殊通道号0,设置为0表示无限制,不建议使用该值,容易出现channel泄漏 | channel_max = 2047 |
channel_operation_timeout | 通道操作超时,单位为毫秒 | channel_operation_timeout = 15000 |
heartbeat | 表示连接参数协商期间服务器建议的心跳超时的值。如果两端都设置为0,则禁用心跳,不建议禁用 | heartbeat = 60 |
default_vhost | rabbitmq安装后启动创建的虚拟主机 | default_vhost = / |
default_user | 默认创建的用户名 | default_user = guest |
default_pass | 默认用户的密码 | default_pass = guest |
default_user_tags | 默认用户的标签 | default_user_tags.administrator = true |
default_permissions | 在创建默认用户是分配给默认用户的权限 | default_permissions.configure = .* default_permissions.read = .* default_permissions.write = .* |
loopback_users | 允许通过回环地址连接到rabbitmq的用户列表,如果要允许guest用户远程连接(不安全)请将该值设置为none,如果要将一个用户设置为仅localhost连接的话,配置loopback_users.username =true(username要替换成用户名) | loopback_users.guest = true(默认为只能本地连接) |
cluster_formation.classic_config.nodes | 设置集群节点cluster_formation.classic_config.nodes.1 = rabbit@hostname1 | |
cluster_formation.classic_config.nodes.2 = rabbit@hostname2 | 默认为空,未设置 | |
collect_statistics | 统计收集模式,none 不发出统计信息事件,coarse每个队列连接都发送统计一次,fine每发一条消息的统计数据 | collect_statistics = none |
collect_statistics_interval | 统计信息收集间隔,以毫秒为单位 | collect_statistics_interval = 5000 |
delegate_count | 用于集群内通信的委托进程数。在多核的服务器上我们可以增加此值 | delegate_count = 16 |
tcp_listen_options | 默认的套接字选项 | tcp_listen_options.backlog = 128 ….. |
hipe_compile | 设置为true以使用HiPE预编译RabbitMQ的部分,HiPE是Erlang的即时编译器,启用HiPE可以提高吞吐量两位数,但启动时会延迟几分钟。Erlang运行时必须包含HiPE支持。如果不是,启用此选项将不起作用。HiPE在某些平台上根本不可用,尤其是Windows。 | hipe_compile = false |
cluster_keepalive_interval | 节点应该多长时间向其他节点发送keepalive消息(以毫秒为单位),keepalive的消息丢失不会被视为关闭 | cluster_keepalive_interval = 10000 |
queue_index_embed_msgs_below | 消息的字节大小,低于该大小,消息将直接嵌入队列索引中 bytes | queue_index_embed_msgs_below = 4096 |
mnesia_table_loading_retry_timeout | 等待集群中Mnesia表可用的超时时间,单位毫秒 | mnesia_table_loading_retry_timeout = 30000 |
mnesia_table_loading_retry_limit | 集群启动时等待Mnesia表的重试次数,不适用于Mnesia升级或节点删除。 | mnesia_table_loading_retry_limit = 10 |
mirroring_sync_batch_size | 要在队列镜像之间同步的消息的批处理大小 | mirroring_sync_batch_size = 4096 |
queue_master_locator | 队列主节点的策略,有三大策略 min-masters,client-local,random | queue_master_locator = client-local |
proxy_protocol | 如果设置为true ,则连接需要通过反向代理连接,不能直连接 | proxy_protocol = false |
management.listener.port | rabbitmq web管理界面使用的端口 | management.listener.port = 15672 |