1 | 作者: 夜泊1990 |
第一章 RabbitMQ介绍
第1节 MQ是什么
1 | 1. 消息队列(Message Queue),又叫做消息中间件 |
第2节 MQ 的主要特点
1 | 1. 是一个独立运行的服务.生产者发送消息,消费者接收消费,需要先跟服务器建立连接 |
第3节 RabbitMQ介绍
- 官网地址
1 | https://www.rabbitmq.com/ |
1 | 官网介绍: RabbitMQ is the most widely deployed open source message broker |
第二章 RabbitMQ安装(Docker)
第1节 安装步骤
1 | 1. 首先就是查询镜像,我们要用带有management版本的,此版本是带有管理界面的 |
第2节 用户管理
1 | 给RabbitMQ服务器添加新用户并且赋予权限 |
第三章 RabbitMQ架构及通讯范式
第1节 RabbitMQ的整体架构图
架构 |
---|
- Producer
1 | 消息发布者,主要用来进行消息发布 |
- Exchange
1 | 交换器,消息发布者将消息发送给交换器,交换器在通过路由发送到队列中 |
- Queue
1 | 保存消息 |
- Customer
1 | 消息消费者 |
一个消息(message)从开始到结束的过程
1 | 消息(message)被发布者(Producer)发送给交换器(exchange)然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)最后将消息投递给订阅了此队列的消费者(consumer),或者消费者按照需求自行获取 |
第四章 快速入门(RabbitMQ-HelloWorld)
1 | RabbitMQ快速入门,使用Java语言来向RabbitMQ服务器发送和消费消息 |
第1节 原理图
HelloWorld |
---|
- P: producer 生产者
- C: consumer 消费者
- 中间红色的部分是一个队列,在RabbitMQ中代表消息缓冲区
第2节 客户端依赖
在编写Java代码之前首先需要创建Java项目并且添加maven依赖,使用的是5.8.0版本的MQ客户端
1 | <dependency> |
第3节 消息发布
- 代码
1 | /** |
- 控制台查看
管理页面 |
---|
第4节 消息消费
- 代码
1 | /** |
- 控制台查看
1 | 当消费完成在回到RabbitMQ的web管理页面查看消息,会发队列中的消息已经被消费 |
- 注意
1
2在启动代码的时候先启动消费者,后启动提供者,先启动消费者创建队列,后启动发布者向队列中发布数据
当前入门的例子,因为手动创建了具体的队列所以不会出现发送消息到指定队列不存在的问题,但是接下来后面的练习可能会出错.
第五章 工作队列(RabbitMQ-Work)
第1节 原理图
第2节 Work(工作队列)介绍
1 | HelloWorld入门的例子介绍了我们提供者向队列中发送一个消息,消费者从队列中取出消息消费;接下来我们将要创建一个工作队列,用来在多个消费者(consumer)之间分发任务,然后观察,我们分发的任务在多个工作者之间是怎么进行消费的 |
- 工作队列(任务队列)
1
例如消息提供者将消息发送到任务队列中,根据上面图示,任务队列会被多个消费者(consumer)连接,这时候如果我们向任务队列中发送多个任务,这些任务会被所有的消费者(consumer)共享,那么问题来了,这个任务队列是怎么将任务合理的分配给消费者(consumer)进行消费的.
第3节 Work(工作队列)实现
- 需求举例
1 | 消息发布者: 向队列中发送多条消息(比如我这里发送10条消息) |
- 消息发布代码
1 | /** |
- 消息消费代码(下面是两个消费者)
1 | //消费者1 |
- 结果展示
1 | 上面代码所得的结果显示: |
- 结果分析
1
2
3
4
5
6
7
8
1. 通过上面的结果可以看出发布者发布消息会平均分配给每一个消费者,采用默认的任务分发机制(轮询)
2. 这种方式有优点当然也有缺点
2.1 优点: 可以轻易的并行工作,如果我们积压很多任务,我们可以通过增加工作者(consumer)来解决这一问题,使得系统的伸缩性更强
2.2 缺点: 这种分发机制没有考虑处理处理任务的时间问题(因为他分配任务的时候是一次性分配,并非是一个一个分配),按照轮询的方式将任务等分给了两个消费者,可能某一个消费者性能比较差,累积任务会越来越多,所以一直忙个不停;而另一个消费者性能比较好,处理任务块,可能闲的不行,这就造成了资源浪费
怎么解决这个问题呢? ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓ 向下看 ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
第4节 Fair dispatch(公平分发)
4.1 原理图
公平分发 |
---|
4.2 公平分发原则
1 | 上面的轮询机制会造成资源浪费的问题,原因是因为RabbitMQ在分发任务的时候盲目的一次性平均分配任务,它不看消费者是否应答(分发给消费者的任务后,不看消费者是否完成,直接在分配,这样就造成了累积),为了解决这个问题,RabbitMQ提供了一个方法来解决 |
4.3 代码实现
- 消息发布代码
1 | /** |
- 消息消费代码
1 | // 消费者1 |
- 结果展示
1 | 运行上面的代码打印出来的消息为: |
第5节 消息队列的持久化
5.1 消息持久化介绍
1 | 上面介绍的队列都是在RabbitMQ退出或者崩溃时就会消失的队列,如果当时队列里面还有消息未被消费,那么异常退出会造成任务丢失,怎么保证就算RabbitMQ意外退出,也不会造成队列中的任务丢失呢,这回就用到了消息队列的持久化. |
5.2 消息持久化设置
1 | 设置持久化队列比较简单只需要在提供者和消费者都设置 |
5.3 消息持久化实现
- 消息发送代码
1 | /** |
- 消息消费代码
1 | /** |
第六章 发布/订阅(RabbitMQ-Publish&Subscribe)
第1节 Publish&Subscribe原理图
Publish&Subscribe |
---|
第2节 Publish&Subscribe介绍
1 | 在前面的例子中,我们创建了一个工作队列,都是一个任务只交给一个消费者.这次我们做一些完全不同的事儿.将消息发送给多个消费者,这种模式叫做 "发布/订阅" |
第3节 Exchanges(交换器)
1 | 要想实现上面的发布/订阅这种模式,需要使用到交换器 |
3.1 交换器是干什么的
前面的例子,我们都是基于一个队列发送和接收消息.现在介绍一下完整的消息传递模式
1 | RabbitMQ消息模式的核心理念: |
3.2 交换器的常见类型
1 | 常见的交换器类型有: direct、topic、headers 和 fanout |
3.3 交换器的创建
1 | channel.exchangeDeclare("logs", "fanout"); //使用channel对象创建一个名为logs,类型为fanout交换器 |
3.4 匿名交换器和临时队列
- 匿名交换器
1
2
3
4
5上面两个例子我们没有使用交换器,但是也可以将消息发送到队列,那是因为我们使用了默认的交换器,交换器名字为空字符串
代码: channel.basicPublish("", "hello", null, message.getBytes()); 第1个参数空字符串就是交换器名称,现在创建了一个类型为fanout名字为logs的交换器,可以指定交换器的名字了
代码: channel.basicPublish("logs","",null,message.getBytes()); - 临时队列
1 | 我们上面的例子在使用队列的时候都会指定一个名字,队列有名字对我们来说是非常重要的因为我们需要为消费者指定同一个队列去消费消息,但是接下来对于我们要完成的日志系统的例子来说指定具体队列不是我们所必须的,我们只关心消息发布出来之后消费者全部都接收到,不需要指定一个具体队列,使用临时队列即可.首先每当我们连接到RabbitMQ的时候需要为我们需要创建一个随机名字的空队列,其次,一旦消费者断开连接,队列将自动删除. |
3.5 队列绑定
队列绑定 |
---|
第4节 Publish&Subscribe实现
练习 消息的发布/订阅
1 | 构建一个简单日志系统.它包含2段程序:第一个是消息提供者,将发布日志消息,第二个是消息消费者接受并打印消息,当然需要启动多个消费者,验证多个消费者同时订阅消息,这个模式类似于我们的广播(一个电台播放音乐,每个人订阅这个电台频道,都可以收到这个电台放的音乐,对应我们的需求,就是发布者发布消息,可以被所有的消费者接收到) |
- 发布消息(消息提供者)代码
1 | /** |
1 | 生产者声明了一个广播模式的交换器,订阅这个交换器的消费者都可以收到每一条消息.可以看到在生产者中,没有声明队列.这也验证了之前说的.生产者其实只关心exchange,至于exchange会把消息转发给哪些队列,并不是生产者关心的 |
- 订阅消息(消息消费者)代码
1 | //在这里使用两个消费者测试,一个消费者将订阅到的消息直接打印到控制台,另一个消费者将订阅到的消息保存为本地日志文件 |
1 | 2个消费者,一个打印日志,一个写入文件,消费者实例启动后,会创建一个随机队列,这个在管理页面可以看到(如下图).而消费者实例关闭后,随机队列也会自动删除(所以需要先启动消费者创建随机队列,后启动生产者) |
- 管理控制台查看
管理页面 |
---|
第七章 路由(RabbitMQ-Route)
第1节 原理图
Route |
---|
第2节 Route(路由)介绍
- Route的功能
1 | 1. 在上个消息"发布/订阅"案例中我们建立了一个简单的日志系统,可以广播消息给多个消费者 |
- 交换器的选择
1 | 前面讲到我们的日志系统广播消息给所有的消费者.我们想对其扩展,根据消息的严重性来过滤消息.我们使用的fanout交换器,不能给我们太多的灵活性.它仅仅只是盲目的广播而已.我们使用direct交换器进行代替,其背后的算法很简单,消息会被推送至绑定键(routingKey)和消息发布附带的选择键(routingKey)完全匹配的队列 |
- Route的配置
1
2
3
4
5
6
71. 消息发布
1.1 交换器类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT)
1.2 发布消息的参数 channel.basicPublish(EXCHANGE_NAME,"routingKey",null,message.getBytes())
2. 消息订阅
2.1 交换器类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT)
2.2 队列绑定 channel.queueBind(queueName, EXCHANGE_NAME, severity) //可以进行多重绑定
第3节 Route(路由)实现
- 发布消息(消息提供者)代码
1 | /** |
- 订阅消息(消息消费者)代码
1 | //客户端1 |
1 | 上面说了很多,其实就为了做一件事,我们可以使用Direct exchange+routingKey来过滤自己感兴趣的消息,一个队列可以绑定多个routingKey,这就是有选择 |
第八章 主题(RabbitMQ-Topic)
第1节 Topic(主题)原理图
Topic |
---|
第2节 Topic(主题)介绍
- Topic(主题)交换器的功能
1 | 在上一节中我们使用了direct类型的交换器改进了日志系统,但是还是具有一定的局限性,不能根据多重条件进行路由选择,在我们的日志系统中,我们可能不仅仅根据日志严重性(info/warning/error)订阅日志,也想根据日志来源(auth/cron/kern)订阅日志,这将给我们带来更大的灵活性.比如我们可以订阅auth来源的error级别日志,还可以订阅cron来源的所有级别日志,这就需要我们强大的主题类型的交换器 |
- Topic(主题)交换器使用
1 | 我们主题交换器的使用和上面的Route(路由交换器)使用是类似的,都需要指定routingKey,只是发送给主题交换器的消息不能是任意设置的routingKey,必须是用小数点隔开的一系列的标识符.这些标识符可以是随意,但是通常跟消息的某些特性相关联.合法的routingKey 比如"socket.usd.nyse","nyse.vmw","quick.orange.rabbit",你愿意用多少单词都可以,只要不超过上限的255个字节,关于routingKey有两种特殊的情况:*(星号)可以代替任意一个标识符; #(井号)可以代替零个或多个标识符(单词) |
1 | 消息发布: |
第3节 Topic(主题)实现
- 发布消息(消息提供者)代码
1 | /** |
- 订阅消息(消息消费者)代码
1 | // 消费者1 |
- 结果
1 | 通过上面的例子的结果 |
第九章 SpringBoot整合RabbitMQ
1 | 创建SpringBoot项目,并且选择RabbitMQ的依赖,由于不同的时间段可能SpringBoot和RabbitMQ的版本略微不同 |
第1节 SpringBoot和RabbitMQ整合并配置
1 | 在application.properties配置文件中配置基本的配置信息 |
第2节 代码实现
2.1 不设置交换器
- RabbitMQ configuration配置
1 | @Configuration |
- 消息发布和消费
- 定义一个User的POJO类用于作为消息发送
1 | //类一定序列化 |
- 消息发布
1 | @Component |
- 消息订阅
1 | @Component |
- 测试
1 | //测试的时候只需要测试消息发布,不需要写消息接受的测试,因为消息消费是监听器(RabbitListener),是时时的,在我们发送完成之后,直接就会消费,不需要在写测试类,去消费。 |
2.2 设置交换器
- RabbitMQ configuration配置
1 | @Configuration |
- 消息发布和消费
- 消息发布
1 | @Component |
- 消息消费
1 | @Component |
- 测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17@RunWith(SpringRunner.class)
@SpringBootTest
public class ProducerMessageTest{
@Resource
private ProducerMessage producerMessage;
@Test
public void sendLevelMessage(){
producerMessage.sendLevelMessage();
}
}
//日志中的打印结果:
/**
* 获取到的error消息为:我app1.error下的消息
* 获取到的error消息为:我app2.error下的消息
* 获取到的error消息为:我app3.error下的消息
*/
- 测试
第十章 RabbitMQ事务&Confirm
第1节 RabbitMQ事务机制
1 | 1. 通过上面的学习大家都知道我们的RabbitMQ的队列是可以持久化保存数据的,就算我们的MQ服务器挂掉或者重启也不会造成数据丢失,但是如果我们的消息还没有到队列就丢失了怎么办呢? |
第2节 AMQP事物机制
2.1 AMQP事物机制介绍
1 | 该模式与数据库的事务非常相似。RabbitMQ中与事务机制有关的方法有txSelect(),txCommit()以及txRollback()。txSelect用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务。在通过txSelect开启事务之后,我们便可以发布消息给broker了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了 |
2.2 AMQP事物机制实现
- 代码如下
1 | public class ProducerAMQP { |
第3节 Confirm机制
1 | Confirm发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的,并且Confirm支持异步. |
- 消息提供者代码如下
1 | public class ProducerConfirm { |
- 消息消费者代码如下
1 | public class ConsumerConfirm { |
以上为RabbitMQ的常见操作,信息来源于官网,如果有什么翻译或者讲解错误,欢迎发送邮件指正.