飞天班第41节:ActiveMQ消息队列实战-1

2020/05/30

1、MQ学习方式

中间件是什么

从架构上来讲,它就是在应用之间加了一层,可以独立部署,提供服务的应用,比如zookeeper、eureka、m q都是中间件

消息中间件

MQ = Message Queue = 消息中间件,处理消息的一层中间应用,也是消息的暂存地方,消息一定是有一个发送方,一个接受方的

主流的MQ产品

  • Kafka
  • RabbitMQ
  • RocketMQ
  • ActiveMQ

如何学习MQ?学习顺序,研究内容

1、消息如何发送,消息如何接受

2、MQ 如何实现高可用

3、MQ集群容错

4、MQ一定要持久化

5、延时投递、定时投递

6、签收机制

7、整合到Spring和SpringBoot中

8、JMS规范

9、高级-死信队列、消息重发、异步投递。。。。

2、MQ解决了什么问题

1、原来必须等待【异步】,使用mq 异步缓存

2、抵御洪流,保护主业务【消峰】

3、中间件,加一层【解耦】

生活场景举例:

一个面试官要面试100个人,如果让100个人一直排队等待那是非常浪费时间的,那100个人除了等待就什么事情都不干,相当于我必须面试完才能干别的事情,即使前面有99个人在面试,那显然不合理而且浪费时间。所以正常情况应该是,把面试的个人信息(消息)投递给面试的公司,然后面试官通知你才去面试,这样在等待面试的过程中,我可以继续做别的事情,不浪费时间(资源),这就是对应MQ生活中使用的例子。

系统场景举例:

1、系统接口耦合严重

2、大流量并发

一个前端下单的操作,后台其实由多个操作组成:读取订单、库存是否ok,库存冻结,余额检查,余额冻结,订单交易信息生成、余额-xx,库存-xx,流水号,余额解冻、库存解冻…. 等,这种情况下,高并发的下单操作,就容易将服务冲垮。

3、同步等待

我们上面说的3种情况,要是有一种技术能够解决。我们就一定会使用它? 那就是MQ

1、系统的解耦,我们要接入新的模块,可以将代码的改动做到最小。【能够解耦】

2、设置一个流量的缓冲池,保证我们的系统不被冲垮。【能够消峰】

3、优化系统调用的强弱关系。我们可以按照实际业务来进行操作。减少耗时【能够异步】

3、ActiveMQ

简介

官方地址:http://activemq.apache.org/

其它主流MQ

ActiveMQ 是最流行开源的,支持多协议的,Java基础的消息服务。它支持行业的标准协议,因此用户选择多种语言和平台的client客户端来使用它。你可以使用C,C++,python,.net还有更多的语言都可以连接ActiveMQ。它使用无处不在的AMQP协议集成到你的多平台应用中。websocket上面使用STOMP协议可以在你的Web应用间交互消息。使用MQTT协议在你的IOT设备上交互消息。它支持在你已存在的JMS规范的设施上使用。ActiveMQ提供强力的和弹性的性能支持任何的消息使用场景。

下载最新的5.x版本,下一代产品是Artemis

安装

Windows安装ActiveMQ

只需要一步,解压安装即可,

点击 examples,有各种协议例子代码

点击 webapps,有一个admin管理activemq的控制台应用

点击bin,根据自己系统是32位还是64位,进入win32/win64,直接点击activemq.bat文件启动

访问:http://127.0.0.1:8161/admin ,默认的用户名和密码都是 admin。点击根目录下的conf/jetty-realm.properties 文件,里面定义了访问控制台的用户,默认有两个admin/admin,user/user,所以可以在这里修改登录用户密码

Linux 安装ActiveMQ,推荐使用Docker,需要先安装docker,也可以下载mq解压安装

方式一:解压安装

# 1、解压
[root@helloworld opt]# tar -zxvf apache-activemq-5.15.9-bin.tar.gz
# 2、重命名
[root@helloworld opt]# mv apache-activemq-5.15.9 activemq
# 显示linux系统内核信息,32或64位
[root@helloworld bin]# cat /proc/version

[root@helloworld conf]# pwd
/opt/activemq/conf
# activemq控制台的登录用户
[root@helloworld conf]# vim jetty-realm.properties 
# Defines users that can access the web (console, demo, etc.)
# username: password [,rolename ...]
admin: admin, admin
user: user, user
# 3、启动
[root@helloworld linux-x86-64]# ./activemq start
[root@helloworld linux-x86-64]# ./activemq status
# 4、停止,停不了,就使用kill命令
[root@helloworld linux-x86-64]# ./activemq stop
# 查看帮助
[root@helloworld bin]# ./activemq --help

4、远程访问测试(要开启linux的防火墙,阿里云的安全组放行端口,activemq默认是http访问端口8161和tcp程序通讯端口61616)

方式二:Docker安装

[root@helloworld opt]# docker -v
Docker version 19.03.5, build 633a0ea
# 1、搜索activemq
[root@helloworld opt]# docker search activemq
# 搜索到后,拉取下来
[root@helloworld opt]# docker pull webcenter/activemq
# 2、下载成功后,查看镜像
[root@helloworld opt]# docker images
# 3、启动容器,宿主端口容器端口绑定
[root@helloworld opt]# dokcer run -d  --name activemq -p 61616:61616 -p 8161:8161 webcenter/activemq
# 4、启动成功,查看运行的容器
[root@helloworld opt]# docker ps -a
# 5、activemq的启动停止卸载删除,看docer篇笔记

5、进入activemq的容器镜像

docker exec -it 800ee68400ab /bin/bash

6、成功进入容器里activemq的根目录,发现与windows下安装的activemq根目录是一致的,可以在这里修改配置

使用文档的地址:http://activemq.apache.org/features

小结

1、什么是MQ

MQ是消息中间件,独立部署提供消息处理服务的中间层应用,消息可以持久化存储,必须有一个消息发送方和一个消息接受方。

2、MQ是干什么

从架构上来讲,MQ就是应用平台间加了一层,它解决了下面4个方面的问题:

  • 异步缓冲

    web应用间的消息,减少等待耗时,提高应用性能和用户体验。

  • 在应用发生高并发访问的时候,可以将高峰时期的业务信息缓存到一个地方,放到后面非高峰时期逐一处理,保证系统不被冲垮,这就是削峰填谷
  • 在应用系统间加了一层MQ,让它们由原来的强依赖关系降为弱依赖关系,降低系统间的耦合性,同时也能保证应用间消息数据的最终一致性,这就是服务解耦,举例 订单系统- MQ - 库存系统
  • 点对点的消息通讯 (queue 队列)和发布订阅的方式多方消息通讯(topic 主题)

4、JMS规范

简介

Java 集成MQ如何发送消息,如何接受消息,它必须遵循JMS规范

它类似于JDBC(用来连接许多不同的关系数据库的API),JMS则是用来访问收发系统消息的API。回顾一下JDBC的流程:

1、注册驱动

2、连接

3、创建SQL执行

4、运行

5、处理结果

6、释放连接资源

而JMS的流程也比较相似:

图中的destination目的地,在ActiveMQ来说,那就是队列Queue和主题Topic

什么是JavaEE

JavaEE 是一个Java企业级开发使用。JavaEE提供了核心的了13个规范。

1、JDBC 关系数据库 连接

2、JNDI java命令和目录接口

3、EJB 企业级的JavaBean

4、RMI 远程方法调用

5、Java IDL, 接口定于语言,代码结构体系

6、JSP

7、Servlet

8、XML

**9、JMS java消息服务**

10、JTA java的事务API

11、JTS java的事务服务器

12、JavaMail Java邮件发送协议

13、JAF javabean actiion 框架

参考:https://blog.csdn.net/u012410733/article/details/72567195

JMS的组成元素

1、JMS Provider MQ服务器(服务本身)

2、JMS Prodiucer 生成者

3、JMS Consumer 消费者

4、JMS Message 消息 (消息头,消息属性,消息体)

Msg

消息头

生产者发送消息

/* @param destination 目的地:queue 或者 topic
     * @param message 
     * @param deliveryMode 是否持久化模式
     * @param priority the priority for this message 优先级 0-4普通消息,5-9紧急消息,默认4级
     * @param timeToLive the message's lifetime (in milliseconds) 生存时间,过期时间
     */
    @Override
    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
        this.send(destination, message, deliveryMode, priority, timeToLive, null);
    }

消息体

// 发送具体的消息到队列, 创建的消息类型是什么就些什么
简单文本(TextMessage)            String  (重要)
可序列化的对象 (ObjectMessage)    对象,序列化的Java对象。
属性集合 (MapMessage)            Map, key => String,   Vlaue=> Java基本类型 (重点)
字节流 (BytesMessage)           二进制数组  byte[]
原始值流 (StreamMessage)        Java标准流 Streem
还有无有效负载的消息 (Message)。   接口

注意点:接收消息和发送消息,格式一定要一致!

消息的属性

可以在消息中添加一些自己的属性信息,用来识别消息或者加密解密都可以,可以用来传递参数。

5、ActiveMQ基本使用

原生依赖

创建一个mave项目,导入依赖,我自己使用的5.15.9版本

<dependencies>
  <!--尽量和activemq版本一致-->
  <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.9</version>
  </dependency>
  <!--  spring 和 其他-->
  <!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring -->
  <dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>4.17</version>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.12</version>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.30</version>
  </dependency>
</dependencies>

队列Queue

核心:

  • 点对点发送接收消息

  • 接收方不用去管发送方是否在运行状态,只需要去队列中去取即可
  • 在队列中,这个信息一但被消费,则消息不见了,不再存储。

编写消息生产者

package com.coding.queue;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class JmsProduce {
    // 1、消息发送到哪里
    // 39.105.61.80:8161  控制板
    // 39.105.61.80:61616 程序通信tcp协议
    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";

    public static void main(String[] args) throws JMSException {
        // 2、创建一个连接工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

        // 3、获得连接
        Connection connection = factory.createConnection();
        connection.start();

        // 4、创建会话
      	// 第一个参数:是否开启事务,不开启,默认自动提交,开启需要手动提交事务
        // 参数:事务、签收  AUTO_ACKNOWLEDGE : 自动确认
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5、模式,目的地(队列,主题)
        // session.createTopic();
        Queue queue = session.createQueue("queue01");

        // 6、发送消息(生成者)
        MessageProducer producer = session.createProducer(queue);

        // 7、发送具体的消息到队列, 创建的消息类型是什么就些什么
        // 简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
        for (int i = 0; i < 3; i++) {
            TextMessage message = session.createTextMessage("msg=>" + i);
            // 8、发送消息
            producer.send(message);
        }

        // 9、用完记得关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("发送完毕");
    }
}

发送的消息对象支持

// 发送具体的消息到队列, 创建的消息类型是什么就些什么
简单文本(TextMessage)            String  (重要)
可序列化的对象 (ObjectMessage)    对象,序列化的Java对象。
属性集合 (MapMessage)            Map, key => String,   Vlaue=> Java基本类型 (重点)
字节流 (BytesMessage)           二进制数组  byte[]
原始值流 (StreamMessage)        Java标准流 Streem
还有无有效负载的消息 (Message)。   接口

启动生产者,发现消息会发送到队列了:

编写消息消费者(接收者)

package com.coding.queue;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class JmsConsumer {
    // 1、消息发送到哪里
    // 39.105.61.80:8161  控制板
    // 39.105.61.80:61616 程序通信tcp协议
    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";

    public static void main(String[] args) throws JMSException {
        // 2、创建一个连接工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

        // 3、获得连接
        Connection connection = factory.createConnection();
        connection.start();

        // 4、创建会话
        // 参数:事务、签收  AUTO_ACKNOWLEDGE : 自动确认
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5、模式,目的地(队列,主题)
        // session.createTopic();
        Queue queue = session.createQueue("queue01");

        // 6、消息接受者(角色变化)
        MessageConsumer consumer = session.createConsumer(queue);

        // 7、获取消息
        while (true){
            // tcp  等待接收消息的过程。 receive
            TextMessage receive = (TextMessage) consumer.receive();
          // receive 阻塞等待
					// receive(long timeout) 超过多少ms,没有消息可以消费,就消费结束,然后不等待了
					// TextMessage receive = (TextMessage) consumer.receive(1000);
            if (receive!=null){
                System.out.println("接收到消息:"+ receive.getText());
            }else {
                break;
            }
        }

        // 9、用完记得关闭资源
        consumer.close();
        session.close();
        connection.close();
        System.out.println("接收完毕");
    }
}

启动消费者,发现队列中的消息被消费了

还有消费者并没有接收完消息后就退出,使用consumer.receive()它会一直阻塞主线程main,等待接收消息,再启动一次生产者发送消息,消费者又很快接受到消息了:

测试发现

  • 先生成消息,消费者是否可以消费。Y

  • 先生产消息,一号消费者消费完毕后,2号消费者是否可以消费。N,不能重复消费

  • 先启动2个消费者(一直阻塞监听),然后生成消息,2个消费者轮询消费。

    Queue队列,默认支持持久化

小结:

JMS规范基本的开发步骤相同的?对比上面的JMS规范流程图, 是的

1、创建 connection factory

2、通过 factory 创建 connection 连接

3、connection 创建 会话 session

4、session 创建目的地

5、创建发送者或者接收者接收消息 (目的地:queue,topic)

主题Topic

核心:

发布订阅,一对多发送接收消息

编写消息生产者(发送方)

package com.coding.topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class JmsProduceTopic {
    // 1、消息发送到哪里
    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";

    public static void main(String[] args) throws JMSException {
        // 2、创建一个连接工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

        // 3、获得连接
        Connection connection = factory.createConnection();
        connection.start();

        // 4、创建会话
        // 参数:事务、签收  AUTO_ACKNOWLEDGE : 自动确认
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5、创建主题 Topic
        Topic topic = session.createTopic("topic01");

        // 6、发送消息(生成者)
        MessageProducer producer = session.createProducer(topic);

        // 7、发送具体的消息到队列, 创建的消息类型是什么就些什么
        // 简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
        for (int i = 0; i < 6; i++) {
            TextMessage message = session.createTextMessage("msg=>" + i);
            // 8、发送消息
            producer.send(message);
        }

        // 9、用完记得关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("发送完毕");
    }
}

编写消息消费者(接收方)

package com.coding.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.IOException;

public class JmsConsumerTopic {
    // 1、消息发送到哪里
    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";

    public static void main(String[] args) throws JMSException, IOException {
        // 2、创建一个连接工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

        // 3、获得连接
        Connection connection = factory.createConnection();
        connection.start();

        // 4、创建会话
        // 参数:事务、签收  AUTO_ACKNOWLEDGE : 自动确认
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5、创建主题 Topic
        Topic topic = session.createTopic("topic01");

        // 6、消息接受者(角色变化)
        MessageConsumer consumer = session.createConsumer(topic);

        // 7、获取消息,监听
        consumer.setMessageListener((message) -> {
            if (message!=null && message instanceof TextMessage){
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接收到主题消息"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

        // 卡住等待
        System.in.read();

        // 9、用完记得关闭资源
        consumer.close();
        session.close();
        connection.close();
        System.out.println("接收完毕");
    }
}

测试

  • 先启动生产者,发送消息,后启动消费者,消息没有被消费掉。

    消息已发送到ActiveMQ,但消息并没有消费掉。

    一定要先启动消费者订阅Topic,生产者再发送消息
    // 消费者,卡住等待,生产者发送消息
    System.in.read();
    

  • 先启动两个消费者,订阅topic,再启动生产者,发送消息,两个消费者都能收到全部消息

消息的持久化

队列Queue是默认持久化的

// 6、发送消息(生成者), 默认是持久化的,重启之后,数据还在。
MessageProducer producer = session.createProducer(topic);

// 练习测试:先发送消息,然后关掉MQ服务。(对比测试)
message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);

主题持久化

  • 消息生产者

    public class JmsProducerTopicPersist {
      
    	public  static final String ACTIVEMQ_URL = "tcp://109.109.13.109:61616";
      
    	public static void main(String[] args) throws JMSException {
    		// 1、创建一个连接工厂
    		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
      
    		// 2、创建连接,先不开启连接
    		Connection connection = factory.createConnection();
      
    		// 3、创建会话
    		// 第一个参数:是否开启事务,不开启,默认自动提交,开启需要手动提交事务
    		// 第二个参数:事务、签收  AUTO_ACKNOWLEDGE : 自动确认
    		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      
    		// 4、创建目的地
    		Topic topic = session.createTopic("topic01");
      
    		// 5、创建消息生产者
    		MessageProducer producer = session.createProducer(topic);
    		// 持久化策略开启后再连接(注意即可,消息发送就持久化)
    		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    		connection.start();
      
    		// 6、发送具体的消息到队列, 创建的消息类型是什么就些什么
    		for (int i = 24; i < 30; i++) {
    			TextMessage textMessage = session.createTextMessage("msg=>" + i);
    			// 8、发送消息
    			producer.send(textMessage);
      
    		}
      
    		// 9、用完记得关闭资源
    		producer.close();
    		session.close();
    		connection.close();
      
    		System.out.println("发送完毕");
    	}
    }
    
  • 消息接收者

    public class JmsConsumerTopicPersist {
    	public  static final String ACTIVEMQ_URL = "tcp://109.109.13.109:61616";
      
    	public static void main(String[] args) throws JMSException, IOException {
    		// 1、创建一个连接工厂
    		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
      
    		// 2、创建连接
    		Connection connection = factory.createConnection();
    		// 设置连接ID,我是谁,
    		connection.setClientID("icoding");
      
    		// 3、创建会话
    		// 第一个参数:是否开启事务,不开启,默认自动提交,开启需要手动提交事务
    		// 第二个参数:事务、签收  AUTO_ACKNOWLEDGE : 自动确认
    		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      
    		// 4、创建目的地
    		Topic topic = session.createTopic("topic01-persist");
      
    		// 5、创建订阅人
    		TopicSubscriber subscriber = session.createDurableSubscriber(topic, "学员-jude");
      
    		// 开启连接
    		connection.start();
      
    		// 6、获取消息,可以写基本的聊天程序了
    		// 接收,会一直阻塞接收
    		Message message = subscriber.receive();
    		while (message != null){
    			TextMessage textMessage = (TextMessage) message;
    			System.out.println("收到持久化消息" + textMessage.getText());
    			// 自动取消订阅,默认是永久的
    			message = subscriber.receive(5000);
    		}
      
    		// 9、用完记得关闭资源
        subscriber.close();
    		session.close();
    		connection.close();
      
    		System.out.println("接收完毕");
    	}
    }
    

    启动消费者

    使用subscriber.receive(),它会一直阻塞等待消息

    启动生产者,发送消息,消费者接收到消息,5秒钟后自动下线

2、先启动生产者发送消息,已订阅的下线消费者有6个消息等待消费

启动消费者上线,消息马上被消费

多个消费者

小结:

  • 主题持久化可以完成完整的发布订阅模型,而非消息及时发送,一对多的模型。

  • 一定要先订阅这个主题,然后才能接收到消息,(客户端:客户端id,用户名 服务端:消息是持久化)
  • 主题删除后重新创建,之前订阅的消费者也要重新订阅才能接收到消息。

服务端(生产者)

// 持久化策略开启后再连接(注意即可,消息发送就持久化)
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();

客户端(消费者)

// 设置连接ID,我是谁,
connection.setClientID("icoding");

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("topic-coding-persist");
// 5、创建订阅人
TopicSubscriber subscriber = session.createDurableSubscriber(topic, "coding老师");
// 开启连接
connection.start();

事务模型

事务:要么同时成功,要么同时失败

继续使用上面主题持久化的代码,队列也是一样开启事务的

// true: 先执行send方法,然后执行 commit 才是真正的放入队列。可以积压多个消息,然后一次发送
// false: send,直接进入主题,如果你关闭了事务,签收一定要设置为自动签收。
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

上面代码中的消息生产者,开启事务,执行send发送消息,但是不执行commit,消息是没有发送到ActiveMQ的

提交事务

session.commit();

再启动生产者,消息成功发送到ActivMQ的topic

消费者开启事务,不提交事务,启动测试

它消费了消息,但没有提交,访问Activemq的控制台,依然显示它有6条消息等待消费

再启动消费者,它有消费了同样的消息,重复消费1次

发现我重复很多次,都是可以的。队列queue重复消费6次后,就会放入死信队列。主题topic不会吗

消费者提交事务消费后,就正常了

签收

Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

创建会话的时候,第一个参数是关于事务的,第二个参数是关于消息签收确认的,点击Session的源码

public interface Session extends Runnable {
    int AUTO_ACKNOWLEDGE = 1;   // 自动签收
    int CLIENT_ACKNOWLEDGE = 2;  // 客户端确认签收
    int DUPS_OK_ACKNOWLEDGE = 3; // 自动批量进行签收
    int SESSION_TRANSACTED = 0;  // 事务提交就签收   
}

注意

  • 如果true开启了事务,一定要进行事务提交,这个时候才会签收,否则事务回滚,消息会被MQ再次传送

  • false关闭事务,签收的参数值是CLIENT_ACKNOWLEDGE = 2,那则需要手动签收。

    message.acknowledge();  // 消息确认签收
    

死信队列

上面的例子中消费者开启了事务,获取消息,但是没有提交事务,对ActiveMq来说,消息是没有消费成功的,所以消息还在队列中,导致可以重复获取该消息,但事务没有提交,消费其实的失败的,当消费失败达到6次,消失就会从原队列移除放到死信对列中。

如何消费死信队列中的消息

小结

1、ActiveMQ的消息通讯只有两种模型:

  • 点对点模型

    Queue:生成者 + 消费者 + 消息存在队列

    通过队列实现异步传输完全解耦,两步:1、生成者=> 队列 2、消费者<= 队列

    不需要关心生产者和消费者的启动顺序
  • 发布订阅模型

    Topic:消息发布者 + 消息订阅者,可以存在多个订阅者订阅同一个消息 + 消息存在Topic

    非持久化模式

    :消费者一定先启动,然后才可以接收到消息,否则接收不到。

    持久化模式

    :消费者需要先订阅才可以接收到消息,无论什么时候上线,只有有消息就可以看到。

2、事务是否开启,假设开启就一定要处理事务。如果事务处理失败,消息就没有签收。

6、主流MQ产品的比较

Post Directory