黄埔班第41节:消息队列应用实战-2

2020/02/26

1、RabbitMQ集群架构原理解析

主备模式

master-slave结构,可以理解为热备份,master负责读写,master宕机后就会切换到slave,对业务不会造成太大的影响,master和slave之间是通过复制(ActiveMQ是通过zookeeper),业务不会主动访问备份节点,除非宕机切换

RabbitMQ的主备复制是通过自己内部的机制来进行,消息是一并消费的。HAproxy相当于Nginx,备份节点只做备份

远程模式

将数据传输到外部节点,当数据处理不过来时可以用这个分流一部分数据,结构简单但配置复杂,使用的不多

主要应用的场景是:对不同数据中心进行复制时使用,可以跨地域的让两个MQ集群互联

镜像模式

业界对RabbtiMQ使用比较多的架构模式,就是镜像模式

高可用的数据复制过程

VIP: 虚拟ip

HAProxy:就是一个专门的提供负载均衡的中间件

keepalived:就是支持VRRP的一个心跳检测工具(VRRP:virtual router redundancy photocol 虚拟路由器冗余协议)

缺点

  • 镜像队列集群的缺陷是无法进行横向扩容,因为每个节点都是一个完整的互相复制的节点,保持数据的强一致性,

  • 并且镜像节点过多会使MQ负担增加,一个数据写入后复制到多个节点会使吞吐量下降。消息队列,天生的读写分离。
  • 消息的发送后会马上同步到其他镜像节点,消费也并一起消费的。

多活模式

个人理解:是远程模式和镜像模式的结合,解决跨地域的MQ集群数据一致性问题,通过DNS的解析服务,(就近寻址)使APP访问同一个地域的MQ集群,加快访问速度。同时单个MQ集群使用LBS中间件提高并发访问性能。

DNS:就是一个IP域名的解析服务

LBS:负载均衡服务(Nginx、HAProxy、Lvs、SLB)

Federation[ˌfedəˈreɪʃn]:在消息队列之间提供信息传输的高性能插件,使用AMQP协议,可以支持连接双方的不同版本MQ访问

2、RocketMQ集群架构原理解析

  • 支持集群、负载均衡,水平扩展
  • 亿级别的消息堆积能力
  • 采用的是zeroCopy零拷贝、顺序写盘
  • 用NameServer代替了zookeeper
  • 集群的架构模型
    • 单点模式
    • 主从模式
    • 双主模式
    • 多主多从模式

3、Kafka高性能原因分析

Kafka的介绍

直接内存消费,不落盘(空中接力)

  • kafka是linkedIn开源的分布式消息系统,归给Apache的顶级项目
  • kafka主要特点是基于Pull的模式来处理消息的消费的,追求高吞吐量,一开始的目的就是来做日志传输的收集的
  • kafka不支持事务,对消息的丢失,错误没有太严格的要求
  • 高并发业务场景很少有用到事务的

kafka的特点

  • 跨平台
  • 分布式
  • 实时性
  • 伸缩性

kafka高性能的原因

  • 顺序写:顺序写磁盘,可以提高磁盘的利用率。
  • Page Cache:页缓存(操作系统相关术语),来提升吞吐量
  • zeroCopy
  • 后台的异步操作,主动Flush
  • 预读的策略,IO的调度

Page Cache(页面缓存)

  • Page Cache是OS实现的一种主要的磁盘缓存机制,以此来减少对磁盘I/O的操作
  • 将磁盘文件缓存到内存中(MySQL->Redis 类似将mysql的数据放到redis中,提高访问速度)
  • 如果要从磁盘中读取1.txt,OS不是直接到磁盘而是到PageCache,如果PageCache里有就拿走了,如果没有就向磁盘发出请求,通过I/O获取数据,获取数据后先放入PageCache然后再给需要的请求
  • 写入数据的时候先判单PageCache里有没有,没有先写入PageCache,然后OS将PageCache的数据刷到磁盘中(MySQL-Redis,先写MySQL后写Redis)

传统的读取文件数据流程

解析一下步骤:

1.OS将数据从磁盘文件复制到内核空间的PageCache

2.应用程序将数据从PageCache复制到用户缓冲区

3.应用程序将数据复制到内核空间缓冲区

4.OS将数据从内核空间缓冲区复制到socket缓冲区

5.OS从socket缓冲区复制数据到网卡接口,数据网络发送给客户端

ZeroCopy(零拷贝)

通常情况下,Kafka的消息会有多个订阅者,生产者发布的消息会被不同的消费者多次消费,为了优化这个流程,Kafka使用了“零拷贝技术”,如下图所示:

零拷贝就是将数据从磁盘文件读取到内核空间的PageCache的一次,然后直接复制给网卡接口网络发送给客户端;当消费同样数据消息时就可以自己从PageCache读取,省去中间在磁盘内存中来回拷贝复制的过程,降低了上下文来回切换的过程,大大提高了应用程序的性能

Kafka的集群模式

  • 通过zookeeper来实现集群的配置
  • 大部分都是内存级别的副本复制,磁盘是异步做的一个备份

4、RabbitMQ内部核心概念掌握

  • RabbitMQ是采用Erlang开发的:Erlang有着和原生Socket一样的延迟,性能非常高
  • 提供可靠的消息传统模式:confirm(确认)、return(返回)
  • 与SpringAMQP完美结合
  • 集群模式比较丰富(主备模式、远程模式、镜像模式、多活模式),
  • 保证数据不丢失的情况下做高可靠(数据是否完整)和高可用

AMQP(Advanced Message Queue Protocol)高级消息队列协议各核心部件之间的关系

  • 消息的生产者把消息投递到server上,经过virtual host(相当于路由空间,就是一个文件夹)到Exchage(路由)就可以了,不需要关心把消息投到那个队列的。
  • 消费者只需要和Message Queue进行监听和绑定即可,这就实现了消费者和生产者的解耦
  • 生产者和消费者是通过路由(routingkey)来关联的
  • 通过routingkey来关联Exchage和Message Queue,routingkey相当于桥梁
  • 消息的生产者将消息发送给Exchage上的某个Routingkey,消费者在队列中订阅某个Routingkey的消息,这个时候Exchage就将两个Routingkey一样的生产和消费连接在了一起

AMQP名词解释

  • Server:又称Broker,就是我们的rabbitmq服务
  • Connection:就是连接
  • Channel:网络通信,每个Channel代表一个会话(session)
  • Message:消息体
  • Virtual host:虚拟地址,可以理解为一个工作空间,或者一个文件夹,进行逻辑隔离
    • 一个Virtural host下可以有多个Exchange和queue
    • 同一个Virtual host下不能有相同名称的Exchage和queue存在
  • Exchange:交换机,接收消息进行队列数据的绑定,一个路由的概念
  • Binding:Exchange和Queue之间的虚拟连接,Binding中包含Routingkey(相当于网线)
  • Routingkey:路由规则,消息的暗号
  • Queue:存放消息的具体地方

RabbitMQ消息的流转

5、RabbitMQ服务搭建

安装rabbitmq

操作系统使用centOS 7.x

# 1.安装需要的辅助工具
yum -y install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

# 2.下载安装包
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

# 3.按照顺序安装rpm包,使用rpm一键安装将环境变量也一并处理好
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm 
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

# 4.修改用户登录与心跳检测的频率,这里也可以修改默认用户guest的密码
vi /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
# 将{loopback_users, [<<"guest">>] }, 
# 修改为{loopback_users, [guest]},
# 心跳,将heartbeat修改为10秒
 {heartbeat, 10},

# 5.启动Rabbitmq 默认端口5672,使用rpm安装,已带有上下文。
/etc/init.d/rabbitmq-server start | stop | status | restart
# 后台启动
rabbitmq-server start &
nohup rabbitmq-server start > rabbitmq-log.file 2>&1 &

# 6.安装一个端口访问工具
yum -y install lsof
# 查看rabbitmq服务是否已启动
lsof -i:5672

# 7.启动控制台 默认端口15672
cd /usr/lib/rabbitmq/bin
rabbitmq-plugins list # 查看rabbitmq都有哪些插件
rabbitmq-plugins enable rabbitmq_management # 启动管理控制台插件 
# 管理控制台的端口是15672,查看是否已启动
lsof -i:15672 # 管理控制台的端口是15672

# 8.登录访问控制台,默认用户密码guest/guest,登录控制台后修改
http://192.168.1.100:15672

卸载rabbitmq

# 1、查看rpm安装包
[root@helloworld ~]# rpm -qa|grep rabbitmq
rabbitmq-server-3.6.5-1.noarch

# 2、卸载
rpm -e --nodeps rabbitmq-server-3.6.5-1.noarch

访问控制台

云服务器要开启安全组和防火墙端口5672,15672

disc就是指磁盘存储消息,如果想要内存方式存储,在启动rabbitmq的时候加上–ram即可

集群配置的时候可以导入导出rabbitmq的 配置

Admin

可以在这个界面修改密码,添加用户

点击guest用户,它默认是administrators的权限,发现它可以访问所有的virtual host,也可以配置用户访问Virtual Hosts的权限

查看添加Virtual Hosts

RabbitMQ常用命令

# --help查看帮助命令
rabbitmqctl --help
# 关闭应用
rabbitmqctl stop_app
# 启动应用
rabbitmqctl start_app
# 节点状态
rabbitmqctl status
# 添加用户密码
rabbitmqctl add_user username password
# 修改用户密码
rabbitmqctl change_password username newpassword
# 列出所有用户
rabbitmqctl list_users
# 删除用户
rabbitmqctl delete_user username
# 列出用户权限
rabbitmqctl list_user_permissions username
# 清除用户权限
rabbitmqctl clear_permissions -p vhostpath username
# 设置用户权限
# 三个*对应:configure write read
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
rabbitmqctl set_permissions -p / gavin ".*" ".*" ".*"
# 列出所有虚拟主机
rabbitmqctl list_vhosts
# 创建虚拟主机
rabbitmqctl add_vhost vhostpath
# 列出虚拟主机的权限
rabbitmqctl list_permissions -p vhostpath
# 删除虚拟主机
rabbitmqctl delete_vhost vhostpath
# 查看所有队列
rabbitmqctl list_queues
# 清除队列里的消息
rabbitmqctl -p vhostpath purge_queue queueName
# 清除所有数据
rabbitmqctl reset # 这个动作最好在MQ服务停掉后操作

6、Springboot整合RabbitMQ

创建工程

  • rabbitmq-common项目:提供消息对象

    实体类Orderinfo.java,必须序列化,因为要跨项目传输

    import lombok.Data;
    import java.io.Serializable;
      
    @Data
    public class OrderInfo implements Serializable {
      	//消息对象需要序列化
        private static final long serialVersionUID = 4084996990296644842L;
        private String id;
        private String orderName;
        //消息id是用来生成一个消息的唯一id,通过消息id能找到这个消息的业务信息
        private String messageId;
    }
    
  • rabbitmq-provider项目:消息发送方
  • rabbitmq-receiver项目:消息接收方

项目的结构:

发送方的设置

1、pom.xml导入依赖

 spirngboot 版本使用2.2.5
<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.2.5.RELEASE</version>
  <relativePath></relativePath><!-- lookup parent from repository -->
</parent>

<dependency>
  <groupId>com.jude</groupId>
  <artifactId>rabbitmq-common</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、application.yml配置文件

spring:
  rabbitmq:
    host: 39.99.219.219
    port: 5672  # 默认5672
    username: guest  # 访问的用户密码
    password: guest
    virtual-host: /  # exchange 和 queue所在的虚拟空间目录
    connection-timeout: 15000  # 连接超时时间15秒

3、编写发送类OrderSender.java

spring是如何简化开发的,主要通过以下4点:

  1. Spring Bean,生命周期由spring 容器管理的ava对象
  2. IOC,控制反转的思想,所有的对象都去spring容器getbean
  3. AOP,切面编程降低侵入
  4. xxxTemplate模版技术,如RestTemplate,RedisTemplate
import com.icoding.basic.po.OrderInfo;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

// @Service
@Component
public class OrderSender {
    // spring的4大特点之一,使用xxxTemplate模版类,让调用变得简单,
    @Autowired
    RabbitTemplate rabbitTemplate;

    public void sendOrder(OrderInfo orderInfo) throws Exception{
        /** 可以放在application.xml配置,通过@Value加载
         * exchange: 交换机名字,就是个你自己定义的字符串,这里是order-exchange
         * routingkey: 队列关联的key,是个你自己定义的字符串,这里是order.updates
         * object: 要传输的消息对象
         * correlationData: 消息的唯一id
         */
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(orderInfo.getMessage_id());
        rabbitTemplate.convertAndSend("order-exchange","order.update",orderInfo,correlationData);
    }
}

4、编写测试类

@Autowired
OrderSender orderSender;

@Test
void contextLoads() {
  OrderInfo orderInfo = new OrderInfo();
  orderInfo.setId("10004");
  orderInfo.setOrderName("消息队列RabbitMQ从入门到精通");
  orderInfo.setMessageId("MS9999904");
  try {
    System.out.println("***********开始发送************");
    orderSender.sendOrder(orderInfo);
    System.out.println("-----------发送完成------------");
  }catch (Exception ex){
    ex.printStackTrace();
  }
}

跑一跑测试 能跑通,发送成功

但是登录Rabbitmq控制台,并没有发现有消息

为什么?因为你还没有创建exchange和queue,可以在控制台创建exchange和queue

Rabbitmq 控制台创建exchange和queue

登录Rabbitmq 控制台

1.创建exchange

  • Type是exchage的routingkey的绑定类型:fanout、 headers、direct、topic(常用)
  • Durability:消息是否持久化,Durable持久化
  • Auto delete:如果设置为yes则当exchange最后一个绑定的队列被删除后,就会自动删除
  • Internal:如果设置为yes,是RabbitMQ的内部使用,不提供给外部,自己编写erlang语言做扩展时使用
  • Arguments:扩展AMQP协议的自定义参数

2.创建queue

3.在exchange里创建Binding并输入routingkey

这里的routingkey就是我们的一个接收规则,exchange 只是一个路由的概念,queue就是存放消息的容器,通过routingkey 绑定转发,那样provider就可以把消息发送给具体的队列,一个exchange同一个routingkey可以绑定多个queue

在queue里也可以创建binding绑定exchange

重新发送消息

发送成功,登录控制台,查看queues

点进去看具体消息

注意

接收方的设置

exchange和queue是由消息消费端建立的。

1、pom.xml导入依赖,与发送方一样

2、application.yaml 配置文件,接收方同时也可以是发送方,一切皆配置

spring:
  rabbitmq:
    host: 39.99.219.219
    username: guest
    password: guest
    virtual-host: /  # exchange 和 queue所在的虚拟空间目录,发送方和接收方一致
    connection-timeout: 15000
    listener: # 消费端配置,监听queue
      simple:
        concurrency: 5 # 初始化并发数
        max-concurrency: 10 # 最大并发数
        auto-startup: true # 自动开启监听,默认开启
        prefetch: 1 # 每个并发连接同一时间最多处理几个消息,限流设置(消费端)
        acknowledge-mode: manual # 签收模式,设置为手动

看看其它配置的属性

3、编写接收的实现类

上面我们是登录控制台手动创建exchange和queue,创建binding,指定routingkey 绑定交换机和队列的,也可以通过消息接收端在代码里创建(没有就创建,有就使用)。我们删除刚刚手动创建的exchange和queue,下面通过代码创建

import com.icoding.basic.po.OrderInfo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;

@Component
public class OrderReceiver {
		// 这个很好理解,就是创建queue和exchange,然后通过key绑定,配置是跟控制台是一样的
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "order-queue",durable = "true",autoDelete = "false"),
            exchange = @Exchange(value = "order-exchange",durable = "true",type = "topic"),
            key = "order.update"
        )
    )
    // @Payload 看上面的截图知道是具体的消息体
    @RabbitHandler
    public void onOrderMessage(@Payload OrderInfo orderInfo, @Headers Map<String,Object> headers, Channel channel) throws Exception{
        System.out.println("************消息接收开始***********");
        System.out.println("Order Name: "+orderInfo.getOrder_name());
    }
}

启动项目,登录Rabbitmq控制台,发现exchange和queue都自动创建了

启动消费发送方,接收方就会监听到,输出消息

登录控制台,发现order-queue有一个未确认的消息,为什么是Unacked,因为上面application.yml设置的是manual手动签收,

我们的接收方法里还没有手动签收,没确认签收的消息,Rabbitmq会重复发送给接收端,只要接收端重启会再次接收该消息,看下篇笔记如何确认签收。

不确认会造成消息的堆积

Post Directory