飞天班第42节:ActiveMQ消息队列实战-2

2020/05/31

1、Broker

MQ 可以单独启动的一个服务,也可以使用Broker内置到代码中不用单独部署。

把MQ服务嵌入到代码中:activemq.apache.org/how-do-i-embed-a-broker-inside-a-connection

在昨天的项目中

1、导入依赖包

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.11.0</version>
</dependency>

2、创建一个Broker server

import org.apache.activemq.broker.BrokerService;

public class EmbedBroker {
    public static void main(String[] args) throws Exception {
        // 1.创建服务
        BrokerService broker = new BrokerService();

        // 2、配置连接地址
        // jmx是java的一个新框架
        // 允许将我们所有的资源(软件和硬件)封装一个Java对象,直接暴露在服务中即可使用
        broker.setUseJmx(true);
        broker.addConnector("tcp://localhost:61616");

        // 3、启动
        broker.start();
    }
}

启动

3、使用队列测试

发现和之前启动的MQ服务效果是一样的,这样子方便测试,不用独立部署一个MQ服务。

2、整合到SpringBoot

创建项目

新建一个springboot项目,可以使用spring initializer

1、导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

全局搜索自动配置类ActiveMQAutoConfiguration

点进JmsAutoConfiguration.class

里面创建了两个bean:jmsTemplatejmsMessagingTemplate,后者是基于前者创建的bean,springboot中我们使用jmsMessagingTemplate 模板类发送消息

点进ActiveMQConnectionFactoryConfiguration.class

里面创建了一个bean:jmsConnectionFactory,连接activemq 服务

2、application.yaml配置

server:
  port: 6666
spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin  	# 登录activemq 控制台的用户密码
    password: admin
  jms:
    pub-sub-domain: false # false 队列,true 主题,看JmsProperties文件,默认就是false,使用队列

myqueue: coding-queue

队列Queue

消息生产者

1、编写bean,定义一个myqueue

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;

// 注意导包,util java中的队列
// JMS 下的队列
import javax.jms.Queue;

@Configuration
@EnableJms // 声明对 JMS 注解支持
public class ActiveMQConfig {
    @Value("${myqueue}")
    private String myqueue;

    @Bean
    public Queue myqueue(){
        return new ActiveMQQueue(myqueue);
    }
}

2、业务实现类QueueProduceService

@Service
public class QueueProduceService {
	@Autowired
	JmsMessagingTemplate jmsMessagingTemplate;

	@Autowired
	Queue myqueue;

	public void produceMsg() {
		jmsMessagingTemplate.convertAndSend(myqueue,"生产者发送一个消息");
	}
}

3、web层接口

@RestController
public class ActiveMQController {
	@Autowired
	QueueProduceService queueProduceService;

	@RequestMapping("/queue/send")
	public String queueSend() {
		queueProduceService.produceMsg();
		return "消息发送成功";
	}
}

启动项目,访问http://localhost:8080/queue/send,消息发送成功

定时发送消息

1、编写定时发送消息的任务,业务实现类QueueProduceService 添加

	// 每隔3秒 发送一次
	@Scheduled(fixedDelay = 3000)
	public void produceMsgScheduled() {
		jmsMessagingTemplate.convertAndSend(myqueue,"系统定时发送一个消息");
		System.out.println("系统正在定时发送消息.....");
	}

2、启动类添加注解@EnableScheduling支持定时

@EnableScheduling
@SpringBootApplication
public class SpringbootActivemqApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringbootActivemqApplication.class, args);
    }
}

启动项目测试

消息消费者

编写一个消费业务,使用@JmsListener监听某个队列,接收消息

@Service
public class QueueConsumerService {

	// 通过监听注解实现消息接收。
	// 具体消费  目的地:(队列还是主题)  接收到的消息
	@JmsListener(destination= "${myqueue}")
	public void receive(TextMessage textMessage) throws JMSException {
		// 接收消息
		System.out.println(textMessage.getText());
	}
}

启动项目,前面消息生产者已发送了好几个消息,队列是默认持久化消息的,所以启动后消费者马上监听到之前的消息,并消费它

主题Topic

修改application.yaml配置文件

spring:
	jms:
    pub-sub-domain: true # 使用主题
mytopic: coding-topic    

消息生产者

1、ActiveMQConfig 编写bean,定义mytopic

// 注意导包,JMS下的主题
import javax.jms.Topic;

@Value("${mytopic}")
private String mytopic;

@Bean
public Topic mytopic(){
  return new ActiveMQTopic(mytopic);
}

2、业务实现类TopicProduceService

@Service
public class TopicProduceService {
	@Autowired
	JmsMessagingTemplate jmsMessagingTemplate;

	@Autowired
	Topic mytopic; // 注意导包,JMS下的主题

	public void produceMsg() {
		jmsMessagingTemplate.convertAndSend(mytopic,"生产者发送一个主题消息");
	}
}

3、web层接口ActiveMQController

@RequestMapping("/topic/send")
public String topicSend() {
  topicProduceService.produceMsg();
  return "主题消息发送成功";
}

消息消费者

// 启动两个订阅者监听
@Service
public class TopicConsumerService {
	@JmsListener(destination= "${mytopic}")
	public void receive1(TextMessage textMessage) throws JMSException {
		// 接收消息
		System.out.println("订阅者1号:" + textMessage.getText());
	}

	@JmsListener(destination= "${mytopic}")
	public void receive2(TextMessage textMessage) throws JMSException {
		// 接收消息
		System.out.println("订阅者2号:" + textMessage.getText());
	}
}

启动项目,这时消费者已经开始订阅主题了,浏览器访问http://localhost:8080/topic/send,生产者发送主题消息

两个订阅者都会收到消息:

小结

SpringBoot去集成任何一个第三方框架,套路都是相同的。

1、导入依赖

2、寻找配置 , 全局搜索xxxxAutoConfig 和 xxxxProperties

3、查看源码,

4、编写Bean,然后测试使用

3、ActiveMQ传输协议

activemq支持多种传输协议:amqp、mqtt、ssl、tcp、nio、udp、ssl、https、http等,具体看官网

http://activemq.apache.org/configuring-version-5-transports.html

配置说明

activemq的所有配置都在activemq.xml里,在activemq根目录conf下,配置连接方式如下图

uri格式: 协议名称://地址:端口号

发现其他的名称都是和协议对等的,但是 openwire 对应的是tcp。

ActiveMQ 默认的协议就是 openwire

登录ActiveMQ的控制台,看看当前activemq支持的协议

传输协议基本分析

  • TCP 默认协议

    tcp://0.0.0.0:61616?key=value&key=value
    

    activemq.xml里的配置名称叫openwire,http://activemq.apache.org/openwire

    参数配置,官网地址http://activemq.apache.org/tcp-transport-reference

  • Nio

    Nio 和 tcp 很类似。 NIO 更偏向底层的访问操作。

    什么情况下使用NIO。

    • 可能存在大量的客户端去连接消息队列服务器,默认tcp被限制的,一个tcp连接相当于一个线程,开启太多线程肯定不行,占有太多资源。
    • 默认Tcp(3次握手),可能会有些迟钝,这时候NIO 效率也会比 Tcp高

    activemq.xml 添加支持nio传输协议

    <transportConnectors>
      <!--端口不要冲突-->
      <transportConnector name="nio" uri="nio://0.0.0.0:61617"/>  </<transportConnectors>
    

    修改后,重启activemq

    nio的增强,让一个端口同时支持nio和ssl协议

  • AMQP

    一个消息服务层的协议,很多语言都可以去实现它。我们基于这个协议,客户端和消息中间件传递消息。

  • STOMP

    简单文本协议。

  • SSL

    安全套接字协议

  • MQTT

    Iot设备,物联网相关的传输协议

使用其他协议测试连接

启动springboot 项目,登录ActiveMQ的控制台查看当前的连接,默认是使用openwire tcp协议进行连接的

修改application.yaml配置文件

spring:
  activemq:
    broker-url: nio://127.0.0.1:61617

重启springboot项目,再登录ActiveMQ的控制台查看当前的连接,看它使用的是否使用nio协议进行连接的

云服务器安全组只支持开放tcp/udp协议端口,所以这个测试不了。

本地使用broker开启一个nio协议的mq服务,发现可以正常接收消息

4、消息持久化到DB

KahaDB(默认)

MQ 宕机了,数据会丢失,所以需要持久化!持久化的方式:JDBC 、KahaDB、LevelDB ,无论使用什么DB 来存储,道理相同,两步:

  • 消息发送出去(将数据存储到本地的数据库文件中)

  • 消息接收成功(删除这个记录)

在ActiveMQ的data文件夹下面,有个kahadb

ActiveMQ默认就是把消息持久化到kahadb,看配置文件activemq.xml

消息持久化更多信息,请看官网 http://activemq.apache.org/persistence.html

如果需要配置自己的持久化策略,就可以在这里进行配置。

KahaDB特性:

1、日志的方式存储的信息

2、B-Tree 索引,快速更新

3、可以快速恢复数据

点击kahadb,进入目录

  • db.data 保存数据的,B-tree 所用。

  • db.redo 恢复消息的,自动启动然后备份

  • lock 锁,表示 kahadb的读写权限。

核心就是几个配置文件组成,十分小巧和方便。

LevelDB

从官网上知道 ,ActiveMQ 5.8 版本之后推荐使用它持久化,也是基于文件来存储的,经过优化的比kahadb更快持久化文件。

JDBC配置持久化到Mysql

原理图:

如何配置

1、将mysql的驱动jar包放到activemq的lib目录下

2、修改配置文件conf/activemq.xml

查看官网的配置http://activemq.apache.org/persistence.html

发现JDBC持久化配置

查看官网 http://activemq.apache.org/jdbc-support

[root@helloworld conf]# vim activemq.xml 
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:

http://activemq.apache.org/persistence.html
-->
# 配置持久化策略
<persistenceAdapter>
    <!--  <kahaDB directory="${activemq.data}/kahadb"/> -->
    <jdbcPersistenceAdapter dataSource="#mysql-ds"/> 
</persistenceAdapter>
# 数据源,注意这里是dbcp2,官网上的文档没更新
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://127.0.0.1:3306/activemq?relaxAutoCommit=true"/>
    <property name="username" value="root"/>
    <property name="password" value="123456"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>

bean 不能放在broker里面

3、创建数据库activemq

4、重启MQ,观察数据库,会自动帮我们生成3个表

  • acks: 存储 订阅关系,持久化的 topic。

  • lock: 集群中才会生效,保证了只有一个 Broker 获取的消息, Master

  • msgs: 存储消息,Topic 和 Queue 都是 这个表中

队列测试

1、编写生产者

package com.coding.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class JmsProduce {
    // 1、消息发送到哪里
 	// http://139.199.13.139:8161 控制台
	// tcp://139.199.13.139:61616 程序通信协议
    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";

    public static void main(String[] args) throws JMSException {
        // 2、创建一个连接工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

        // 3、获得连接
        Connection connection = factory.createConnection();
        connection.start();

        // 4、创建会话
        // 参数:事务、签收  AUTO_ACKNOWLEDGE : 自动确认
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5、模式,目的地(队列,主题)
        // session.createTopic();
        Queue queue = session.createQueue("jdbc-queue01");

        // 6、发送消息(生成者)
        MessageProducer producer = session.createProducer(queue);

        // 开启持久化配置,否则不进入数据库
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        // 7、发送具体的消息到队列, 创建的消息类型是什么就些什么
        // 简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
        for (int i = 0; i < 3; i++) {
            TextMessage message = session.createTextMessage("msg=>" + i);
            // 8、发送消息
            producer.send(message); // 积压,但是没有放入队列
        }

        // 9、用完记得关闭资源
        producer.close();
        session.close();
        connection.close();

        System.out.println("发送完毕");
    }
}

2、持久化测试

执行生产者发送消息,先发送到ActiveMq中

在查询mysql数据库,消息已存放到msgs表

3、编写消费者

package com.coding.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsConsumer {


    // 1、消息发送到哪里
    // 39.105.61.80:8161  控制板
    // 39.105.61.80:61616 程序通信tcp协议
    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";

    public static void main(String[] args) throws JMSException {
        // 2、创建一个连接工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = factory.createConnection();
        connection.start();

      // true开启事务,需要session.commit()
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

        // 5、模式,目的地(队列,主题)
        // session.createTopic();
        Queue queue = session.createQueue("jdbc-queue01");

        // 6、消息接受者(角色变化)
        MessageConsumer consumer = session.createConsumer(queue);

        // 7、获取消息
        while (true){
            // tcp  等待接收消息的过程。 receive
            // receive 阻塞等待
            // receive(long timeout) 超过多少ms就消费结束,然后不等待了
            TextMessage receive = (TextMessage) consumer.receive();

            if (receive!=null){
                System.out.println("接收到消息:"+ receive.getText());
            }else {
                break;
            }
        }

        session.commit(); // 假设客户端开启了事务,业务提交事务,否则不成功的!

        // 9、用完记得关闭资源
        consumer.close();
        session.close();
        connection.close();

        System.out.println("接收完毕");
    }
}

4、查看效果

执行消费者,获取消息,mysql中msgs表的消息就会被删除

道理:持久化的时候将消息存储到数据库中,然后只要消息被消费成功,就从数据库中删除。

主题持久化测试

主题消息发送者

package com.coding.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class JmsProduceTopic_Persist {
  // 1、消息发送到哪里
  // 39.105.61.80:8161  控制板
  // 39.105.61.80:61616 程序通信tcp协议
  public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";

  public static void main(String[] args) throws JMSException {
    // 1、创建一个连接工厂
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
    // 2、创建连接
    Connection connection = factory.createConnection();
   
    // 3、创建会话
    // connection.createSession(是否开启事务,签收机制)
    // true: 先执行send方法,然后在执行 commit 才是真正的放入队列。可以积压多个消息,然后一次发送
    // false:  send。直接进入队列,加入你关闭了事务,签收一定设置。
    Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
   	// 4、创建目的地
    Topic topic = session.createTopic("jdbc-topic01-persist");
    // 5、创建消息生产者
    MessageProducer producer = session.createProducer(topic);
    // 持久化策略开启后再连接(注意即可,消息发送就持久化)
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    // 6、开启连接
    connection.start();

    // 7、发送具体的消息到队列, 创建的消息类型是什么就些什么
    // 简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
    for (int i = 0; i < 6; i++) {
      TextMessage message = session.createTextMessage("msg=>" + i);
      producer.send(message);
    }

    session.commit();
    
    // 8、用完记得关闭资源
    producer.close();
    session.close();
    connection.close();

    System.out.println("发送完毕");
  }
}

主题消息消费者

package com.coding.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;

public class JmsConsumerTopic_Persist {
  // 1、消息发送到哪里
  // 39.105.61.80:8161  控制板
  // 39.105.61.80:61616 程序通信tcp协议
  public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";

  public static void main(String[] args) throws JMSException, IOException {
    // 1、创建一个连接工厂
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
    // 2、创建连接
    Connection connection = factory.createConnection();
    // 设置连接ID,我是谁(id),要订阅谁(topic-coding-persist),(客户端信息)
    connection.setClientID("icoding");
    // 3、创建会话
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // 4、创建目的地
    Topic topic = session.createTopic("jdbc-topic01-persist");
    // 5、创建订阅人
    TopicSubscriber subscriber = session.createDurableSubscriber(topic, "学员-jude");
    //TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "coding老师2");
    
    // 6、开启连接
    connection.start(); 

    // 7、获取消息,可以写基本的聊天程序了,接收,会一直阻塞接收
    Message message = topicSubscriber.receive();
    while (message!=null){
      TextMessage textMessage = (TextMessage) message;
      System.out.println("收到持久化的消息:"+textMessage.getText());
      // 自动取消订阅,默认永久的
      message = topicSubscriber.receive(5000L);
    }

    // 8、用完记得关闭资源
    session.close();
    connection.close();

    System.out.println("接收完毕");
  }
}

先启动消费者订阅,在ActiveMQ的控制台看到订阅者

这个订阅者信息也会被写到数据库的acks表

这里的sub_name订阅者名称??要在持久化的mysql连接字符串加上characterEncoding=utf-8

注意:ActiveMQ的控制台把订阅者jude删除了,acks表也会清除订阅者记录。

发现问题

主题消息持久化,写到mysql后,即使被订阅者消费了也不会被删除,这是什么问题?

小结

1、队列持久化消息就是存在msg中

2、主题,维护一个订阅表,然后消息还是msg中存放。

5、集群

基于zookeeper的HA方案:

https://www.cnblogs.com/yjmyzz/p/activemq-ha-with-zookeeper.html

3,这种方案搭建了zookeeper集群和activemq集群,总共6个服务,但是只有一个 activemq生效,master,其余两个不提供服务, 假设一个服务挂了,zk会自动在选举新的master来提供服务。

缺点:

1) 占用的节点数过多,1个zk集群至少3个节点,1个activemq集群也至少得3个节点,但其实正常运行时,只有一个master节点在对外响应,换句话说,花6个节点的成本只为了保证1个activemq master节点的高可用,太浪费资源了

2) 性能下降太明显,比起单节点的activemq,性能下降了近1个数量级。

基于Networks of brokers的HA方案

https://www.cnblogs.com/yjmyzz/p/activemq-ha-using-networks-of-brokers.html

6、其他特性

异步投递

官方文档:http://activemq.apache.org/async-sends

默认就是异步模式来发送消息,提高系统性能

什么是异步投递?

同步:发送确认接收后才能做下一个事情

异步:发送之后,就完事了,问题:如何确定这个消息发送成功了?使用回调函数确认

1、3种开启异步投递的方式

2、重点:异步发送如何确定你这个消息发送成功了?

由于咋们的消息不阻塞,这时候send差不多都会被发送到MQ。

MQ服务突然挂了。内存还没有被发送到MQ的消息就会丢失,需要重新发送

正确的异步发送一定是需要开启 回调函数的,确认消息的发送状态。

同步发送==> 等待接收成功了,

package com.coding.queue;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.AsyncCallback;

import javax.jms.*;
import java.util.UUID;

public class JmsProduce {
  	// 开启异步投递(默认开启)
    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616?jms.useAsyncSend=true";

    public static void main(String[] args) throws JMSException {
        // 2、创建一个连接工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

        // 3、获得连接
        Connection connection = factory.createConnection();
        connection.start();

        // 4、创建会话
        // 参数:事务、签收  AUTO_ACKNOWLEDGE : 自动确认
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5、模式,目的地(队列,主题)
        // session.createTopic();
        Queue queue = session.createQueue("queue01");

        // ((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
        // 开启异步投递之后,我们可以使用 ActiveMQMessageProducer 来进行回调的接收
        // 6、发送消息(生成者)
        ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue);

        // 开启持久化配置
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        // 7、发送具体的消息到队列, 创建的消息类型是什么就些什么
        // 简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
        for (int i = 0; i < 3; i++) {
            TextMessage message = session.createTextMessage("msg=>" + i);
            // 自己给消息定义一个消息的id
            message.setJMSMessageID(UUID.randomUUID().toString()+"自定义id");
            String messageID = message.getJMSMessageID();
            // 8、发送消息,回调函数(很重要)。
            producer.send(message, new AsyncCallback() {
                @Override
                public void onSuccess() { // 发送成功!那个消息发送成功了,发送的时候就知道了
                    System.out.println(messageID);
                }

                @Override
                public void onException(JMSException exception) { // 发送失败!在针对这个消息重发即可。
                    // 处理失败的异常即可
                    System.out.println(messageID);
                }
            });
            //producer.send(message); // 这里我们应该要确认消息是发送完毕的,这样才能保证效率。
        }
        // 9、用完记得关闭资源
        producer.close();
        session.close();
        connection.close();

        System.out.println("发送完毕");
    }
}

定时和延时投递

SpringBoot就可以实现定时和延时投递。cron表达式。

可以通过在activemq.xml配置中将 broker 的schedulerSupport 这个值设置为true即可。

消息发送端可以给消息设置下面4个属性,来设定消息的延迟发送

  • Springboot,我们现在直接使用springboot来继承 acitvemq即可操作定时和延时消息发送
// 每隔3秒 发送一次
@Scheduled(fixedDelay = 3000)
public void produceMsgScheduled(){
    template.convertAndSend(topic,"produceMsgTopic");
    System.out.println("系统正在定时发送消息.....");
}
  • 原生代码,直接给消息增加延时设置即可
TextMessage message = session.createTextMessage("msg=>" + i);
// 消息的延时和定时发送
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *"); // 每分钟发送一次
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1000);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9);

消息重发

在哪些情况下,消息会重发?

1、客户端开启了事务,没有进行rollback().就会重发(消息还在队列中,没有被成功消费)

2、客户端开启了事务,没有进行commit(),

3、开启了签收模式,没有手动签收

消息重发的次数说明?

官网:http://activemq.apache.org/redelivery-policy

6次之后就会进入死信队列。

核心参数:

最大重传次数 默认 在activemq.xml修改为-1
maximumRedeliveries 6 Sets the maximum number of times a message will be redelivered before it is considered a poisoned pill and returned to the broker so it can go to a Dead Letter Queue. Set to -1 for unlimited redeliveries.
初始重发延时时间 1秒  
redeliveryDelay 1000L The delivery delay if initialRedeliveryDelay=0 (v5.4).

逻辑:

正常消息应该直接就可以接受成功,假设一个消息,被重复消费了6次,如果还没有签收成功了!

这个时候客户端将收不到这个消息了,这个消息就被转义到死信队列中。

如果要消费这个消息,就需要到死信队列中进行消费。

// 假设进入了死信队列,我们从死信队列取值即可
Queue queue = session.createQueue("ActiveMQ.DLQ");

死信队列

ActiveMQ 引入了 死信队列。DLQ,如果一个消息6次都没有被接收成功,这个时候 ActiveMQ就会自动将消息放入死信队列中。

人工干预,从死信队列中再次消费。

死信队列就是用来处理失败的消息的。

在我们生产环境中,在使用MQ的时候,一般都有两个队列:业务队列 + 死信队列。

可以在配置文件中修改对队列的名称,不使用缺省的死信队列

[root@helloworld conf]# vim activemq.xml 
<!--默认配置的是队列,通常情况下死信通道就是队列。-->
<policyEntry queue=">">
    <deadLetterStrategy>
        <!--queuePrefix:设置死信队列前缀 useQueueForQueueMessages: 设置使用队列保存死信,还可以设置		   useQueueForTopicMessages,使用Topic来保存死信-->
        <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
        <!-- 配置其他的策略。 -->
        <sharedDeadLetterStrategy processExpired="false" />
    </deadLetterStrategy>
</policyEntry>

默认的情况下:Queue Topic

Queue 默认死信队列名称:ActiveMQ.DLQ.Queue

Topic 默认死信队列名称:ActiveMQ.DLQ.Topic

处理业务:用户业务 ActiveMQ.DLQ.Queue.User ActiveMQ.DLQ.Queue.Order。

7、面试

引入了消息队列之后,如何保证它的高可用性?

集群配置:zookeeper + 可复制的 levelDB store 主从集群!

了解 RocketMQ、RabbitMQ、Kafaka!的几个方面

  • 消息的发送和接收

  • 如何整合到项目中SpringBoot

  • 配置研究

  • 集群高可用

  • 了解这个消息队列产品独有特性

看看面试题。

8、ActiveMQ知识点总结

Post Directory