1、Broker
MQ 可以单独启动的一个服务,也可以使用Broker内置到代码中不用单独部署。
把MQ服务嵌入到代码中:activemq.apache.org/how-do-i-embed-a-broker-inside-a-connection
在昨天的项目中
1、导入依赖包
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.11.0</version>
</dependency>
2、创建一个Broker server
import org.apache.activemq.broker.BrokerService;
public class EmbedBroker {
public static void main(String[] args) throws Exception {
// 1.创建服务
BrokerService broker = new BrokerService();
// 2、配置连接地址
// jmx是java的一个新框架
// 允许将我们所有的资源(软件和硬件)封装一个Java对象,直接暴露在服务中即可使用
broker.setUseJmx(true);
broker.addConnector("tcp://localhost:61616");
// 3、启动
broker.start();
}
}
启动
3、使用队列测试
发现和之前启动的MQ服务效果是一样的,这样子方便测试,不用独立部署一个MQ服务。
2、整合到SpringBoot
创建项目
新建一个springboot项目,可以使用spring initializer
1、导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
全局搜索自动配置类ActiveMQAutoConfiguration
点进JmsAutoConfiguration.class
里面创建了两个bean:jmsTemplate和jmsMessagingTemplate,后者是基于前者创建的bean,springboot中我们使用jmsMessagingTemplate 模板类发送消息
点进ActiveMQConnectionFactoryConfiguration.class
里面创建了一个bean:jmsConnectionFactory,连接activemq 服务
2、application.yaml配置
server:
port: 6666
spring:
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin # 登录activemq 控制台的用户密码
password: admin
jms:
pub-sub-domain: false # false 队列,true 主题,看JmsProperties文件,默认就是false,使用队列
myqueue: coding-queue
队列Queue
消息生产者
1、编写bean,定义一个myqueue
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
// 注意导包,util java中的队列
// JMS 下的队列
import javax.jms.Queue;
@Configuration
@EnableJms // 声明对 JMS 注解支持
public class ActiveMQConfig {
@Value("${myqueue}")
private String myqueue;
@Bean
public Queue myqueue(){
return new ActiveMQQueue(myqueue);
}
}
2、业务实现类QueueProduceService
@Service
public class QueueProduceService {
@Autowired
JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
Queue myqueue;
public void produceMsg() {
jmsMessagingTemplate.convertAndSend(myqueue,"生产者发送一个消息");
}
}
3、web层接口
@RestController
public class ActiveMQController {
@Autowired
QueueProduceService queueProduceService;
@RequestMapping("/queue/send")
public String queueSend() {
queueProduceService.produceMsg();
return "消息发送成功";
}
}
启动项目,访问http://localhost:8080/queue/send,消息发送成功
定时发送消息
1、编写定时发送消息的任务,业务实现类QueueProduceService 添加
// 每隔3秒 发送一次
@Scheduled(fixedDelay = 3000)
public void produceMsgScheduled() {
jmsMessagingTemplate.convertAndSend(myqueue,"系统定时发送一个消息");
System.out.println("系统正在定时发送消息.....");
}
2、启动类添加注解@EnableScheduling支持定时
@EnableScheduling
@SpringBootApplication
public class SpringbootActivemqApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootActivemqApplication.class, args);
}
}
启动项目测试
消息消费者
编写一个消费业务,使用@JmsListener监听某个队列,接收消息
@Service
public class QueueConsumerService {
// 通过监听注解实现消息接收。
// 具体消费 目的地:(队列还是主题) 接收到的消息
@JmsListener(destination= "${myqueue}")
public void receive(TextMessage textMessage) throws JMSException {
// 接收消息
System.out.println(textMessage.getText());
}
}
启动项目,前面消息生产者已发送了好几个消息,队列是默认持久化消息的,所以启动后消费者马上监听到之前的消息,并消费它
主题Topic
修改application.yaml配置文件
spring:
jms:
pub-sub-domain: true # 使用主题
mytopic: coding-topic
消息生产者
1、ActiveMQConfig 编写bean,定义mytopic
// 注意导包,JMS下的主题
import javax.jms.Topic;
@Value("${mytopic}")
private String mytopic;
@Bean
public Topic mytopic(){
return new ActiveMQTopic(mytopic);
}
2、业务实现类TopicProduceService
@Service
public class TopicProduceService {
@Autowired
JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
Topic mytopic; // 注意导包,JMS下的主题
public void produceMsg() {
jmsMessagingTemplate.convertAndSend(mytopic,"生产者发送一个主题消息");
}
}
3、web层接口ActiveMQController
@RequestMapping("/topic/send")
public String topicSend() {
topicProduceService.produceMsg();
return "主题消息发送成功";
}
消息消费者
// 启动两个订阅者监听
@Service
public class TopicConsumerService {
@JmsListener(destination= "${mytopic}")
public void receive1(TextMessage textMessage) throws JMSException {
// 接收消息
System.out.println("订阅者1号:" + textMessage.getText());
}
@JmsListener(destination= "${mytopic}")
public void receive2(TextMessage textMessage) throws JMSException {
// 接收消息
System.out.println("订阅者2号:" + textMessage.getText());
}
}
启动项目,这时消费者已经开始订阅主题了,浏览器访问http://localhost:8080/topic/send,生产者发送主题消息
两个订阅者都会收到消息:
小结
SpringBoot去集成任何一个第三方框架,套路都是相同的。
1、导入依赖
2、寻找配置 , 全局搜索xxxxAutoConfig 和 xxxxProperties
3、查看源码,
4、编写Bean,然后测试使用
3、ActiveMQ传输协议
activemq支持多种传输协议:amqp、mqtt、ssl、tcp、nio、udp、ssl、https、http等,具体看官网
http://activemq.apache.org/configuring-version-5-transports.html
配置说明
activemq的所有配置都在activemq.xml里,在activemq根目录conf下,配置连接方式如下图
uri格式: 协议名称://地址:端口号
发现其他的名称都是和协议对等的,但是 openwire 对应的是tcp。
ActiveMQ 默认的协议就是 openwire
登录ActiveMQ的控制台,看看当前activemq支持的协议
传输协议基本分析
-
TCP 默认协议
tcp://0.0.0.0:61616?key=value&key=value
activemq.xml里的配置名称叫openwire,http://activemq.apache.org/openwire
参数配置,官网地址http://activemq.apache.org/tcp-transport-reference
-
Nio
Nio 和 tcp 很类似。 NIO 更偏向底层的访问操作。
什么情况下使用NIO。
- 可能存在大量的客户端去连接消息队列服务器,默认tcp被限制的,一个tcp连接相当于一个线程,开启太多线程肯定不行,占有太多资源。
- 默认Tcp(3次握手),可能会有些迟钝,这时候NIO 效率也会比 Tcp高
activemq.xml 添加支持nio传输协议
<transportConnectors> <!--端口不要冲突--> <transportConnector name="nio" uri="nio://0.0.0.0:61617"/> </<transportConnectors>
修改后,重启activemq
nio的增强,让一个端口同时支持nio和ssl协议
-
AMQP
一个消息服务层的协议,很多语言都可以去实现它。我们基于这个协议,客户端和消息中间件传递消息。
-
STOMP
简单文本协议。
-
SSL
安全套接字协议
-
MQTT
Iot设备,物联网相关的传输协议
使用其他协议测试连接
启动springboot 项目,登录ActiveMQ的控制台查看当前的连接,默认是使用openwire tcp协议进行连接的
修改application.yaml配置文件
spring:
activemq:
broker-url: nio://127.0.0.1:61617
重启springboot项目,再登录ActiveMQ的控制台查看当前的连接,看它使用的是否使用nio协议进行连接的
云服务器安全组只支持开放tcp/udp协议端口,所以这个测试不了。
本地使用broker开启一个nio协议的mq服务,发现可以正常接收消息
4、消息持久化到DB
KahaDB(默认)
MQ 宕机了,数据会丢失,所以需要持久化!持久化的方式:JDBC 、KahaDB、LevelDB ,无论使用什么DB 来存储,道理相同,两步:
-
消息发送出去(将数据存储到本地的数据库文件中)
-
消息接收成功(删除这个记录)
在ActiveMQ的data文件夹下面,有个kahadb
ActiveMQ默认就是把消息持久化到kahadb,看配置文件activemq.xml
消息持久化更多信息,请看官网 http://activemq.apache.org/persistence.html
如果需要配置自己的持久化策略,就可以在这里进行配置。
KahaDB特性:
1、日志的方式存储的信息
2、B-Tree 索引,快速更新
3、可以快速恢复数据
点击kahadb,进入目录
-
db.data 保存数据的,B-tree 所用。
-
db.redo 恢复消息的,自动启动然后备份
-
lock 锁,表示 kahadb的读写权限。
核心就是几个配置文件组成,十分小巧和方便。
LevelDB
从官网上知道 ,ActiveMQ 5.8 版本之后推荐使用它持久化,也是基于文件来存储的,经过优化的比kahadb更快持久化文件。
JDBC配置持久化到Mysql
原理图:
如何配置
1、将mysql的驱动jar包放到activemq的lib目录下
2、修改配置文件conf/activemq.xml
查看官网的配置http://activemq.apache.org/persistence.html
发现JDBC持久化配置
查看官网 http://activemq.apache.org/jdbc-support
[root@helloworld conf]# vim activemq.xml
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
# 配置持久化策略
<persistenceAdapter>
<!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
# 数据源,注意这里是dbcp2,官网上的文档没更新
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://127.0.0.1:3306/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="123456"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
bean 不能放在broker里面
3、创建数据库activemq
4、重启MQ,观察数据库,会自动帮我们生成3个表
-
acks: 存储 订阅关系,持久化的 topic。
-
lock: 集群中才会生效,保证了只有一个 Broker 获取的消息, Master
-
msgs: 存储消息,Topic 和 Queue 都是 这个表中
队列测试
1、编写生产者
package com.coding.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce {
// 1、消息发送到哪里
// http://139.199.13.139:8161 控制台
// tcp://139.199.13.139:61616 程序通信协议
public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
public static void main(String[] args) throws JMSException {
// 2、创建一个连接工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 3、获得连接
Connection connection = factory.createConnection();
connection.start();
// 4、创建会话
// 参数:事务、签收 AUTO_ACKNOWLEDGE : 自动确认
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、模式,目的地(队列,主题)
// session.createTopic();
Queue queue = session.createQueue("jdbc-queue01");
// 6、发送消息(生成者)
MessageProducer producer = session.createProducer(queue);
// 开启持久化配置,否则不进入数据库
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 7、发送具体的消息到队列, 创建的消息类型是什么就些什么
// 简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
for (int i = 0; i < 3; i++) {
TextMessage message = session.createTextMessage("msg=>" + i);
// 8、发送消息
producer.send(message); // 积压,但是没有放入队列
}
// 9、用完记得关闭资源
producer.close();
session.close();
connection.close();
System.out.println("发送完毕");
}
}
2、持久化测试
执行生产者发送消息,先发送到ActiveMq中
在查询mysql数据库,消息已存放到msgs表
3、编写消费者
package com.coding.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsConsumer {
// 1、消息发送到哪里
// 39.105.61.80:8161 控制板
// 39.105.61.80:61616 程序通信tcp协议
public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
public static void main(String[] args) throws JMSException {
// 2、创建一个连接工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = factory.createConnection();
connection.start();
// true开启事务,需要session.commit()
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 5、模式,目的地(队列,主题)
// session.createTopic();
Queue queue = session.createQueue("jdbc-queue01");
// 6、消息接受者(角色变化)
MessageConsumer consumer = session.createConsumer(queue);
// 7、获取消息
while (true){
// tcp 等待接收消息的过程。 receive
// receive 阻塞等待
// receive(long timeout) 超过多少ms就消费结束,然后不等待了
TextMessage receive = (TextMessage) consumer.receive();
if (receive!=null){
System.out.println("接收到消息:"+ receive.getText());
}else {
break;
}
}
session.commit(); // 假设客户端开启了事务,业务提交事务,否则不成功的!
// 9、用完记得关闭资源
consumer.close();
session.close();
connection.close();
System.out.println("接收完毕");
}
}
4、查看效果
执行消费者,获取消息,mysql中msgs表的消息就会被删除
道理:持久化的时候将消息存储到数据库中,然后只要消息被消费成功,就从数据库中删除。
主题持久化测试
主题消息发送者
package com.coding.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduceTopic_Persist {
// 1、消息发送到哪里
// 39.105.61.80:8161 控制板
// 39.105.61.80:61616 程序通信tcp协议
public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
public static void main(String[] args) throws JMSException {
// 1、创建一个连接工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2、创建连接
Connection connection = factory.createConnection();
// 3、创建会话
// connection.createSession(是否开启事务,签收机制)
// true: 先执行send方法,然后在执行 commit 才是真正的放入队列。可以积压多个消息,然后一次发送
// false: send。直接进入队列,加入你关闭了事务,签收一定设置。
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 4、创建目的地
Topic topic = session.createTopic("jdbc-topic01-persist");
// 5、创建消息生产者
MessageProducer producer = session.createProducer(topic);
// 持久化策略开启后再连接(注意即可,消息发送就持久化)
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 6、开启连接
connection.start();
// 7、发送具体的消息到队列, 创建的消息类型是什么就些什么
// 简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
for (int i = 0; i < 6; i++) {
TextMessage message = session.createTextMessage("msg=>" + i);
producer.send(message);
}
session.commit();
// 8、用完记得关闭资源
producer.close();
session.close();
connection.close();
System.out.println("发送完毕");
}
}
主题消息消费者
package com.coding.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class JmsConsumerTopic_Persist {
// 1、消息发送到哪里
// 39.105.61.80:8161 控制板
// 39.105.61.80:61616 程序通信tcp协议
public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
public static void main(String[] args) throws JMSException, IOException {
// 1、创建一个连接工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2、创建连接
Connection connection = factory.createConnection();
// 设置连接ID,我是谁(id),要订阅谁(topic-coding-persist),(客户端信息)
connection.setClientID("icoding");
// 3、创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建目的地
Topic topic = session.createTopic("jdbc-topic01-persist");
// 5、创建订阅人
TopicSubscriber subscriber = session.createDurableSubscriber(topic, "学员-jude");
//TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "coding老师2");
// 6、开启连接
connection.start();
// 7、获取消息,可以写基本的聊天程序了,接收,会一直阻塞接收
Message message = topicSubscriber.receive();
while (message!=null){
TextMessage textMessage = (TextMessage) message;
System.out.println("收到持久化的消息:"+textMessage.getText());
// 自动取消订阅,默认永久的
message = topicSubscriber.receive(5000L);
}
// 8、用完记得关闭资源
session.close();
connection.close();
System.out.println("接收完毕");
}
}
先启动消费者订阅,在ActiveMQ的控制台看到订阅者
这个订阅者信息也会被写到数据库的acks表
这里的sub_name订阅者名称??要在持久化的mysql连接字符串加上characterEncoding=utf-8
注意:ActiveMQ的控制台把订阅者jude删除了,acks表也会清除订阅者记录。
发现问题
主题消息持久化,写到mysql后,即使被订阅者消费了也不会被删除,这是什么问题?
小结
1、队列持久化消息就是存在msg中
2、主题,维护一个订阅表,然后消息还是msg中存放。
5、集群
基于zookeeper的HA方案:
https://www.cnblogs.com/yjmyzz/p/activemq-ha-with-zookeeper.html;
3,这种方案搭建了zookeeper集群和activemq集群,总共6个服务,但是只有一个 activemq生效,master,其余两个不提供服务, 假设一个服务挂了,zk会自动在选举新的master来提供服务。
缺点:
1) 占用的节点数过多,1个zk集群至少3个节点,1个activemq集群也至少得3个节点,但其实正常运行时,只有一个master节点在对外响应,换句话说,花6个节点的成本只为了保证1个activemq master节点的高可用,太浪费资源了。
2) 性能下降太明显,比起单节点的activemq,性能下降了近1个数量级。
基于Networks of brokers的HA方案
https://www.cnblogs.com/yjmyzz/p/activemq-ha-using-networks-of-brokers.html
6、其他特性
异步投递
官方文档:http://activemq.apache.org/async-sends
什么是异步投递?
同步:发送确认接收后才能做下一个事情
异步:发送之后,就完事了,问题:如何确定这个消息发送成功了?使用回调函数确认
1、3种开启异步投递的方式
2、重点:异步发送如何确定你这个消息发送成功了?
由于咋们的消息不阻塞,这时候send差不多都会被发送到MQ。
MQ服务突然挂了。内存还没有被发送到MQ的消息就会丢失,需要重新发送
正确的异步发送一定是需要开启 回调函数的,确认消息的发送状态。
同步发送==> 等待接收成功了,
package com.coding.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.AsyncCallback;
import javax.jms.*;
import java.util.UUID;
public class JmsProduce {
// 开启异步投递(默认开启)
public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616?jms.useAsyncSend=true";
public static void main(String[] args) throws JMSException {
// 2、创建一个连接工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 3、获得连接
Connection connection = factory.createConnection();
connection.start();
// 4、创建会话
// 参数:事务、签收 AUTO_ACKNOWLEDGE : 自动确认
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、模式,目的地(队列,主题)
// session.createTopic();
Queue queue = session.createQueue("queue01");
// ((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
// 开启异步投递之后,我们可以使用 ActiveMQMessageProducer 来进行回调的接收
// 6、发送消息(生成者)
ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue);
// 开启持久化配置
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 7、发送具体的消息到队列, 创建的消息类型是什么就些什么
// 简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
for (int i = 0; i < 3; i++) {
TextMessage message = session.createTextMessage("msg=>" + i);
// 自己给消息定义一个消息的id
message.setJMSMessageID(UUID.randomUUID().toString()+"自定义id");
String messageID = message.getJMSMessageID();
// 8、发送消息,回调函数(很重要)。
producer.send(message, new AsyncCallback() {
@Override
public void onSuccess() { // 发送成功!那个消息发送成功了,发送的时候就知道了
System.out.println(messageID);
}
@Override
public void onException(JMSException exception) { // 发送失败!在针对这个消息重发即可。
// 处理失败的异常即可
System.out.println(messageID);
}
});
//producer.send(message); // 这里我们应该要确认消息是发送完毕的,这样才能保证效率。
}
// 9、用完记得关闭资源
producer.close();
session.close();
connection.close();
System.out.println("发送完毕");
}
}
定时和延时投递
SpringBoot就可以实现定时和延时投递。cron表达式。
可以通过在activemq.xml配置中将 broker 的schedulerSupport 这个值设置为true即可。
消息发送端可以给消息设置下面4个属性,来设定消息的延迟发送
- Springboot,我们现在直接使用springboot来继承 acitvemq即可操作定时和延时消息发送
// 每隔3秒 发送一次
@Scheduled(fixedDelay = 3000)
public void produceMsgScheduled(){
template.convertAndSend(topic,"produceMsgTopic");
System.out.println("系统正在定时发送消息.....");
}
- 原生代码,直接给消息增加延时设置即可
TextMessage message = session.createTextMessage("msg=>" + i);
// 消息的延时和定时发送
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *"); // 每分钟发送一次
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1000);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9);
消息重发
在哪些情况下,消息会重发?
1、客户端开启了事务,没有进行rollback().就会重发(消息还在队列中,没有被成功消费)
2、客户端开启了事务,没有进行commit(),
3、开启了签收模式,没有手动签收
消息重发的次数说明?
官网:http://activemq.apache.org/redelivery-policy
6次之后就会进入死信队列。
核心参数:
最大重传次数 | 默认 | 在activemq.xml修改为-1 |
---|---|---|
maximumRedeliveries | 6 | Sets the maximum number of times a message will be redelivered before it is considered a poisoned pill and returned to the broker so it can go to a Dead Letter Queue. Set to -1 for unlimited redeliveries. |
初始重发延时时间 | 1秒 | |
---|---|---|
redeliveryDelay |
1000L | The delivery delay if initialRedeliveryDelay=0 (v5.4). |
逻辑:
正常消息应该直接就可以接受成功,假设一个消息,被重复消费了6次,如果还没有签收成功了!
这个时候客户端将收不到这个消息了,这个消息就被转义到死信队列中。
如果要消费这个消息,就需要到死信队列中进行消费。
// 假设进入了死信队列,我们从死信队列取值即可
Queue queue = session.createQueue("ActiveMQ.DLQ");
死信队列
ActiveMQ 引入了 死信队列。DLQ,如果一个消息6次都没有被接收成功,这个时候 ActiveMQ就会自动将消息放入死信队列中。
人工干预,从死信队列中再次消费。
死信队列就是用来处理失败的消息的。
在我们生产环境中,在使用MQ的时候,一般都有两个队列:业务队列 + 死信队列。
可以在配置文件中修改对队列的名称,不使用缺省的死信队列
[root@helloworld conf]# vim activemq.xml
<!--默认配置的是队列,通常情况下死信通道就是队列。-->
<policyEntry queue=">">
<deadLetterStrategy>
<!--queuePrefix:设置死信队列前缀 useQueueForQueueMessages: 设置使用队列保存死信,还可以设置 useQueueForTopicMessages,使用Topic来保存死信-->
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
<!-- 配置其他的策略。 -->
<sharedDeadLetterStrategy processExpired="false" />
</deadLetterStrategy>
</policyEntry>
默认的情况下:Queue Topic
Queue 默认死信队列名称:ActiveMQ.DLQ.Queue
Topic 默认死信队列名称:ActiveMQ.DLQ.Topic
处理业务:用户业务 ActiveMQ.DLQ.Queue.User ActiveMQ.DLQ.Queue.Order。
7、面试
引入了消息队列之后,如何保证它的高可用性?
集群配置:zookeeper + 可复制的 levelDB store 主从集群!
了解 RocketMQ、RabbitMQ、Kafaka!的几个方面
-
消息的发送和接收
-
如何整合到项目中SpringBoot
-
配置研究
-
集群高可用
-
了解这个消息队列产品独有特性
看看面试题。