1、RabbitMQ集群架构原理解析
主备模式
master-slave结构,可以理解为热备份,master负责读写,master宕机后就会切换到slave,对业务不会造成太大的影响,master和slave之间是通过复制(ActiveMQ是通过zookeeper),业务不会主动访问备份节点,除非宕机切换
RabbitMQ的主备复制是通过自己内部的机制来进行,消息是一并消费的。HAproxy相当于Nginx,备份节点只做备份
远程模式
将数据传输到外部节点,当数据处理不过来时可以用这个分流一部分数据,结构简单但配置复杂,使用的不多
主要应用的场景是:对不同数据中心进行复制时使用,可以跨地域的让两个MQ集群互联
镜像模式
业界对RabbtiMQ使用比较多的架构模式,就是镜像模式
高可用的数据复制过程
VIP: 虚拟ip
HAProxy:就是一个专门的提供负载均衡的中间件
keepalived:就是支持VRRP的一个心跳检测工具(VRRP:virtual router redundancy photocol 虚拟路由器冗余协议)
缺点
-
镜像队列集群的缺陷是无法进行横向扩容,因为每个节点都是一个完整的互相复制的节点,保持数据的强一致性,
- 并且镜像节点过多会使MQ负担增加,一个数据写入后复制到多个节点会使吞吐量下降。消息队列,天生的读写分离。
- 消息的发送后会马上同步到其他镜像节点,消费也并一起消费的。
多活模式
个人理解:是远程模式和镜像模式的结合,解决跨地域的MQ集群数据一致性问题,通过DNS的解析服务,(就近寻址)使APP访问同一个地域的MQ集群,加快访问速度。同时单个MQ集群使用LBS中间件提高并发访问性能。
DNS:就是一个IP域名的解析服务
LBS:负载均衡服务(Nginx、HAProxy、Lvs、SLB)
Federation[ˌfedəˈreɪʃn]:在消息队列之间提供信息传输的高性能插件,使用AMQP协议,可以支持连接双方的不同版本MQ访问
2、RocketMQ集群架构原理解析
- 支持集群、负载均衡,水平扩展
- 亿级别的消息堆积能力
- 采用的是zeroCopy零拷贝、顺序写盘
- 用NameServer代替了zookeeper
- 集群的架构模型
- 单点模式
- 主从模式
- 双主模式
- 多主多从模式
3、Kafka高性能原因分析
Kafka的介绍
直接内存消费,不落盘(空中接力)
- kafka是linkedIn开源的分布式消息系统,归给Apache的顶级项目
- kafka主要特点是基于Pull的模式来处理消息的消费的,追求高吞吐量,一开始的目的就是来做日志传输的收集的
- kafka不支持事务,对消息的丢失,错误没有太严格的要求
- 高并发业务场景很少有用到事务的
kafka的特点
- 跨平台
- 分布式
- 实时性
- 伸缩性
kafka高性能的原因
- 顺序写:顺序写磁盘,可以提高磁盘的利用率。
- Page Cache:页缓存(操作系统相关术语),来提升吞吐量
- zeroCopy
- 后台的异步操作,主动Flush
- 预读的策略,IO的调度
Page Cache(页面缓存)
- Page Cache是OS实现的一种主要的磁盘缓存机制,以此来减少对磁盘I/O的操作
- 将磁盘文件缓存到内存中(MySQL->Redis 类似将mysql的数据放到redis中,提高访问速度)
- 如果要从磁盘中读取1.txt,OS不是直接到磁盘而是到PageCache,如果PageCache里有就拿走了,如果没有就向磁盘发出请求,通过I/O获取数据,获取数据后先放入PageCache然后再给需要的请求
- 写入数据的时候先判单PageCache里有没有,没有先写入PageCache,然后OS将PageCache的数据刷到磁盘中(MySQL-Redis,先写MySQL后写Redis)
传统的读取文件数据流程
解析一下步骤:
1.OS将数据从磁盘文件复制到内核空间的PageCache
2.应用程序将数据从PageCache复制到用户缓冲区
3.应用程序将数据复制到内核空间缓冲区
4.OS将数据从内核空间缓冲区复制到socket缓冲区
5.OS从socket缓冲区复制数据到网卡接口,数据网络发送给客户端
ZeroCopy(零拷贝)
通常情况下,Kafka的消息会有多个订阅者,生产者发布的消息会被不同的消费者多次消费,为了优化这个流程,Kafka使用了“零拷贝技术”,如下图所示:
零拷贝就是将数据从磁盘文件读取到内核空间的PageCache的一次,然后直接复制给网卡接口网络发送给客户端;当消费同样数据消息时就可以自己从PageCache读取,省去中间在磁盘内存中来回拷贝复制的过程,降低了上下文来回切换的过程,大大提高了应用程序的性能
Kafka的集群模式
- 通过zookeeper来实现集群的配置
- 大部分都是内存级别的副本复制,磁盘是异步做的一个备份
4、RabbitMQ内部核心概念掌握
- RabbitMQ是采用Erlang开发的:Erlang有着和原生Socket一样的延迟,性能非常高
- 提供可靠的消息传统模式:confirm(确认)、return(返回)
- 与SpringAMQP完美结合
- 集群模式比较丰富(主备模式、远程模式、镜像模式、多活模式),
- 保证数据不丢失的情况下做高可靠(数据是否完整)和高可用
AMQP(Advanced Message Queue Protocol)高级消息队列协议各核心部件之间的关系
- 消息的生产者把消息投递到server上,经过virtual host(相当于路由空间,就是一个文件夹)到Exchage(路由)就可以了,不需要关心把消息投到那个队列的。
- 消费者只需要和Message Queue进行监听和绑定即可,这就实现了消费者和生产者的解耦
- 生产者和消费者是通过路由(routingkey)来关联的
- 通过routingkey来关联Exchage和Message Queue,routingkey相当于桥梁
- 消息的生产者将消息发送给Exchage上的某个Routingkey,消费者在队列中订阅某个Routingkey的消息,这个时候Exchage就将两个Routingkey一样的生产和消费连接在了一起
AMQP名词解释
- Server:又称Broker,就是我们的rabbitmq服务
- Connection:就是连接
- Channel:网络通信,每个Channel代表一个会话(session)
- Message:消息体
- Virtual host:虚拟地址,可以理解为一个工作空间,或者一个文件夹,进行逻辑隔离
- 一个Virtural host下可以有多个Exchange和queue
- 同一个Virtual host下不能有相同名称的Exchage和queue存在
- Exchange:交换机,接收消息进行队列数据的绑定,一个路由的概念
- Binding:Exchange和Queue之间的虚拟连接,Binding中包含Routingkey(相当于网线)
- Routingkey:路由规则,消息的暗号
- Queue:存放消息的具体地方
RabbitMQ消息的流转
5、RabbitMQ服务搭建
安装rabbitmq
操作系统使用centOS 7.x
# 1.安装需要的辅助工具
yum -y install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
# 2.下载安装包
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
# 3.按照顺序安装rpm包,使用rpm一键安装将环境变量也一并处理好
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
# 4.修改用户登录与心跳检测的频率,这里也可以修改默认用户guest的密码
vi /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
# 将{loopback_users, [<<"guest">>] },
# 修改为{loopback_users, [guest]},
# 心跳,将heartbeat修改为10秒
{heartbeat, 10},
# 5.启动Rabbitmq 默认端口5672,使用rpm安装,已带有上下文。
/etc/init.d/rabbitmq-server start | stop | status | restart
# 后台启动
rabbitmq-server start &
nohup rabbitmq-server start > rabbitmq-log.file 2>&1 &
# 6.安装一个端口访问工具
yum -y install lsof
# 查看rabbitmq服务是否已启动
lsof -i:5672
# 7.启动控制台 默认端口15672
cd /usr/lib/rabbitmq/bin
rabbitmq-plugins list # 查看rabbitmq都有哪些插件
rabbitmq-plugins enable rabbitmq_management # 启动管理控制台插件
# 管理控制台的端口是15672,查看是否已启动
lsof -i:15672 # 管理控制台的端口是15672
# 8.登录访问控制台,默认用户密码guest/guest,登录控制台后修改
http://192.168.1.100:15672
卸载rabbitmq
# 1、查看rpm安装包
[root@helloworld ~]# rpm -qa|grep rabbitmq
rabbitmq-server-3.6.5-1.noarch
# 2、卸载
rpm -e --nodeps rabbitmq-server-3.6.5-1.noarch
访问控制台
云服务器要开启安全组和防火墙端口5672,15672
disc就是指磁盘存储消息,如果想要内存方式存储,在启动rabbitmq的时候加上–ram即可
集群配置的时候可以导入导出rabbitmq的 配置
Admin
可以在这个界面修改密码,添加用户
点击guest用户,它默认是administrators的权限,发现它可以访问所有的virtual host,也可以配置用户访问Virtual Hosts的权限
查看添加Virtual Hosts
RabbitMQ常用命令
# --help查看帮助命令
rabbitmqctl --help
# 关闭应用
rabbitmqctl stop_app
# 启动应用
rabbitmqctl start_app
# 节点状态
rabbitmqctl status
# 添加用户密码
rabbitmqctl add_user username password
# 修改用户密码
rabbitmqctl change_password username newpassword
# 列出所有用户
rabbitmqctl list_users
# 删除用户
rabbitmqctl delete_user username
# 列出用户权限
rabbitmqctl list_user_permissions username
# 清除用户权限
rabbitmqctl clear_permissions -p vhostpath username
# 设置用户权限
# 三个*对应:configure write read
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
rabbitmqctl set_permissions -p / gavin ".*" ".*" ".*"
# 列出所有虚拟主机
rabbitmqctl list_vhosts
# 创建虚拟主机
rabbitmqctl add_vhost vhostpath
# 列出虚拟主机的权限
rabbitmqctl list_permissions -p vhostpath
# 删除虚拟主机
rabbitmqctl delete_vhost vhostpath
# 查看所有队列
rabbitmqctl list_queues
# 清除队列里的消息
rabbitmqctl -p vhostpath purge_queue queueName
# 清除所有数据
rabbitmqctl reset # 这个动作最好在MQ服务停掉后操作
6、Springboot整合RabbitMQ
创建工程
-
rabbitmq-common项目:提供消息对象
实体类Orderinfo.java,必须序列化,因为要跨项目传输
import lombok.Data; import java.io.Serializable; @Data public class OrderInfo implements Serializable { //消息对象需要序列化 private static final long serialVersionUID = 4084996990296644842L; private String id; private String orderName; //消息id是用来生成一个消息的唯一id,通过消息id能找到这个消息的业务信息 private String messageId; }
- rabbitmq-provider项目:消息发送方
- rabbitmq-receiver项目:消息接收方
项目的结构:
发送方的设置
1、pom.xml导入依赖
spirngboot 版本使用2.2.5
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.5.RELEASE</version>
<relativePath></relativePath><!-- lookup parent from repository -->
</parent>
<dependency>
<groupId>com.jude</groupId>
<artifactId>rabbitmq-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、application.yml配置文件
spring:
rabbitmq:
host: 39.99.219.219
port: 5672 # 默认5672
username: guest # 访问的用户密码
password: guest
virtual-host: / # exchange 和 queue所在的虚拟空间目录
connection-timeout: 15000 # 连接超时时间15秒
3、编写发送类OrderSender.java
spring是如何简化开发的,主要通过以下4点:
- Spring Bean,生命周期由spring 容器管理的ava对象
- IOC,控制反转的思想,所有的对象都去spring容器getbean
- AOP,切面编程降低侵入
- xxxTemplate模版技术,如RestTemplate,RedisTemplate
import com.icoding.basic.po.OrderInfo;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
// @Service
@Component
public class OrderSender {
// spring的4大特点之一,使用xxxTemplate模版类,让调用变得简单,
@Autowired
RabbitTemplate rabbitTemplate;
public void sendOrder(OrderInfo orderInfo) throws Exception{
/** 可以放在application.xml配置,通过@Value加载
* exchange: 交换机名字,就是个你自己定义的字符串,这里是order-exchange
* routingkey: 队列关联的key,是个你自己定义的字符串,这里是order.updates
* object: 要传输的消息对象
* correlationData: 消息的唯一id
*/
CorrelationData correlationData = new CorrelationData();
correlationData.setId(orderInfo.getMessage_id());
rabbitTemplate.convertAndSend("order-exchange","order.update",orderInfo,correlationData);
}
}
4、编写测试类
@Autowired
OrderSender orderSender;
@Test
void contextLoads() {
OrderInfo orderInfo = new OrderInfo();
orderInfo.setId("10004");
orderInfo.setOrderName("消息队列RabbitMQ从入门到精通");
orderInfo.setMessageId("MS9999904");
try {
System.out.println("***********开始发送************");
orderSender.sendOrder(orderInfo);
System.out.println("-----------发送完成------------");
}catch (Exception ex){
ex.printStackTrace();
}
}
跑一跑测试 能跑通,发送成功
但是登录Rabbitmq控制台,并没有发现有消息
为什么?因为你还没有创建exchange和queue,可以在控制台创建exchange和queue
Rabbitmq 控制台创建exchange和queue
登录Rabbitmq 控制台
1.创建exchange
- Type是exchage的routingkey的绑定类型:fanout、 headers、direct、topic(常用)
- Durability:消息是否持久化,Durable持久化
- Auto delete:如果设置为yes则当exchange最后一个绑定的队列被删除后,就会自动删除
- Internal:如果设置为yes,是RabbitMQ的内部使用,不提供给外部,自己编写erlang语言做扩展时使用
- Arguments:扩展AMQP协议的自定义参数
2.创建queue
3.在exchange里创建Binding并输入routingkey
这里的routingkey就是我们的一个接收规则,exchange 只是一个路由的概念,queue就是存放消息的容器,通过routingkey 绑定转发,那样provider就可以把消息发送给具体的队列,一个exchange同一个routingkey可以绑定多个queue
在queue里也可以创建binding绑定exchange
重新发送消息
发送成功,登录控制台,查看queues
点进去看具体消息
注意
接收方的设置
exchange和queue是由消息消费端建立的。
1、pom.xml导入依赖,与发送方一样
2、application.yaml 配置文件,接收方同时也可以是发送方,一切皆配置
spring:
rabbitmq:
host: 39.99.219.219
username: guest
password: guest
virtual-host: / # exchange 和 queue所在的虚拟空间目录,发送方和接收方一致
connection-timeout: 15000
listener: # 消费端配置,监听queue
simple:
concurrency: 5 # 初始化并发数
max-concurrency: 10 # 最大并发数
auto-startup: true # 自动开启监听,默认开启
prefetch: 1 # 每个并发连接同一时间最多处理几个消息,限流设置(消费端)
acknowledge-mode: manual # 签收模式,设置为手动
看看其它配置的属性
3、编写接收的实现类
上面我们是登录控制台手动创建exchange和queue,创建binding,指定routingkey 绑定交换机和队列的,也可以通过消息接收端在代码里创建(没有就创建,有就使用)。我们删除刚刚手动创建的exchange和queue,下面通过代码创建
import com.icoding.basic.po.OrderInfo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class OrderReceiver {
// 这个很好理解,就是创建queue和exchange,然后通过key绑定,配置是跟控制台是一样的
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "order-queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "order-exchange",durable = "true",type = "topic"),
key = "order.update"
)
)
// @Payload 看上面的截图知道是具体的消息体
@RabbitHandler
public void onOrderMessage(@Payload OrderInfo orderInfo, @Headers Map<String,Object> headers, Channel channel) throws Exception{
System.out.println("************消息接收开始***********");
System.out.println("Order Name: "+orderInfo.getOrder_name());
}
}
启动项目,登录Rabbitmq控制台,发现exchange和queue都自动创建了
启动消费发送方,接收方就会监听到,输出消息
登录控制台,发现order-queue有一个未确认的消息,为什么是Unacked,因为上面application.yml设置的是manual手动签收,
我们的接收方法里还没有手动签收,没确认签收的消息,Rabbitmq会重复发送给接收端,只要接收端重启会再次接收该消息,看下篇笔记如何确认签收。
不确认会造成消息的堆积