Skip to content

Commit

Permalink
update rabbitMQ
Browse files Browse the repository at this point in the history
  • Loading branch information
rbmonster committed Sep 24, 2022
1 parent 98da0aa commit 3bf01c2
Showing 1 changed file with 175 additions and 4 deletions.
179 changes: 175 additions & 4 deletions src/main/java/com/other/middleware/MESSAGEQUEUE.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
- [Kafka 概述:深入理解架构](https://juejin.cn/post/6844904050064883725#heading-8)
- [冰河-字节跳动面试官这样问消息队列:高可用、不重复消费、可靠传输、顺序消费、消息堆积,我整理了下](https://segmentfault.com/a/1190000039274777)
- [面试官让我重构 Kafka,懵了…… Pulsar](https://mp.weixin.qq.com/s/_RIvZwK1sJJP8xnUDyAk1Q)
- [【RabbitMQ的那点事】Exchange类型(超详细)](https://www.jianshu.com/p/d9561f13e28b)
- [RabbitMQ](https://www.jianshu.com/p/78847c203b76)

为什么使用消息队列?
- 异步、解耦、削峰、统一管理所有的消息
Expand Down Expand Up @@ -64,9 +66,9 @@

### 保证消息不丢失
- 生产者端:不论是同步还是异步发送消息,同步和异步回调都需要做好try-catch,妥善的处理响应,如果Broker返回写入失败等错误消息,需要重试发送。当多次发送失败需要作报警,日志记录等。
- 消息队列端:存储消息阶段需要在消息刷盘之后再给生产者响应,防止突然宕机导致消息丢失。
> 如果Broker是集群部署,有多副本机制,即消息不仅仅要写入当前Broker,还需要写入副本机中。那配置成至少写入两台机子后再给生产者响应。
- 消费端:消费者真正执行完业务逻辑之后,再发送给Broker消费成功
- 消息队列端:存储消息阶段需要在消息刷盘之后再给生产者响应,防止突然宕机导致消息丢失。
> 如果Broker是集群部署,有多副本机制,即消息不仅仅要写入当前Broker,还需要写入副本机中。那配置成至少写入两台机子后再给生产者响应。
**消息可靠性增强了,性能就下降**

Expand Down Expand Up @@ -642,7 +644,7 @@ RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协

## 基本概念

![avatar](https://raw.githubusercontent.com/rbmonster/file-storage/main/learning-note/other/rabbitmq.png)
![avatar](https://raw.githubusercontent.com/rbmonster/file-storage/main/learning-note/other/mq/rabbitmq.png)

1. Channel(信道):多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,复用TCP连接的通道。
2. Producer(消息的生产者):向消息队列发布消息的客户端应用程序。
Expand All @@ -653,4 +655,173 @@ RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协
7. Exchange(交换器|路由器):提供Producer到Queue之间的匹配,接收生产者发送的消息并将这些消息按照路由规则转发到消息队列。交换器用于转发消息,它不会存储消息 ,如果没有 Queue绑定到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。交换器有四种消息调度策略(下面会介绍),分别是fanout, direct, topic, headers。
8. Binding(绑定):用于建立Exchange和Queue之间的关联。一个绑定就是基于Binding Key将Exchange和Queue连接起来的路由规则,所以可以将交换器理解成一个由Binding构成的路由表。
9. Binding Key(绑定键):Exchange与Queue的绑定关系,用于匹配Routing Key。最大长度255 字节。
10. Broker:RabbitMQ Server,服务器实体。
10. Broker:RabbitMQ Server,服务器实体。



Producer 通过channel 绑定队列Queue、exchange以及routing Key的关系
```java
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();

String exchangeName = "X";

/**
* 绑定Exchange,按routingKey对Queue进行绑定:
* 参数1为Queue name, 参数2是Exchange Name, 参数3是routing key
* queueBind可以在UI上做,也可以放在各自的Consumer中进行绑定:
*/
channel.queueBind("Q1", exchangeName, "orange");
channel.queueBind("Q2", exchangeName, "black");
channel.queueBind("Q2", exchangeName, "green");

// basicPublish 将消息发送到指定的交换机
channel.basicPublish(exchangeName, "orange", null, "Orange Msg!".getBytes());
channel.basicPublish(exchangeName, "black", null, "Black Msg!".getBytes());
channel.basicPublish(exchangeName, "green", null, "Green Msg!".getBytes());

channel.close();
connection.close();
}
}
```

Consumer 无需关心exchange 关系,只管消费就行。
```java
public class MyConsumer1 {
public static void main(String[] args) throws Exception {
// 创建connection/channel,略,同上#1.4
// 创建消费者,并接收消息,略,同上#1.4

// 开始获取消息,push模式:queue = "Q1", autoAct = true
channel.basicConsume("Q1", true, consumer);
}
}
```

## 消费端如何限流

当海量消息瞬间推送过来,单个客户端无法同时处理那么多数据,严重会导致系统宕机。这时,需要削峰。

RabbitMQ提供了一种qos(服务质量保证)功能。即在非自动确认消息的前提下(非ACK),如果一定数目的消息(通过基于consume或者channel设置qos的值)未被确认前,不进行消费新的消息。

```text
// prefetchSize:消息体大小限制;0为不限制
// prefetchCount:RabbitMQ同时给一个消费者推送的消息个数。即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack。默认是1.
// global:限流策略的应用级别。consumer[false]、channel[true]。
void BasicQos(unit prefetchSize, unshort prefetchCount, bool global);
channel.basicQos(...);
```

## Exchange类型

Exchange分发消息时,根据类型的不同分发策略有区别。目前共四种类型:**direct****fanout****topic**、headers(headers匹配AMQP消息的header而不是路由键(Routing-key),此外headers交换器和direct交换器完全一致,但是性能差了很多,目前几乎用不到了,所以直接看另外三种类型)。

### Direct模式

![avatar](https://raw.githubusercontent.com/rbmonster/file-storage/main/learning-note/other/mq/rabbitmq-direct.png)

消息中的路由键(routing key)如果和Binding中的binding key一致,交换器就将消息发到对应的队列中。路由键与队列名完全匹配。


direct模式下,如果有两个Consumer同时订阅一个queue?
> RabbitMQ的设定:**当多个消费者同时监听一个队列时,他们并不能同时消费一条消息,而是随机消费消息,即一个队列中一条消息,只能被一个消费者消费。**\
> Direct exchange经常用来循环分发任务给多个工作者(workers)。当这样做的时候,我们需要明白一点,在AMQP 0-9-1中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。
![avatar](https://raw.githubusercontent.com/rbmonster/file-storage/main/learning-note/other/mq/rabbitmq-direct-multi.png)

也就是官网workingQueue模式

![avatar](https://raw.githubusercontent.com/rbmonster/file-storage/main/learning-note/other/mq/rabbitmq-direct-workingqueue.png)


### Fanout模式

![avatar](https://raw.githubusercontent.com/rbmonster/file-storage/main/learning-note/other/mq/rabbitmq-fanout.png)

每个发到fanout类型交换器的消息都会分到所有绑定的队列上去。fanout交换器不处理该路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。

扇型交换机投递消息的拷贝到所有绑定到它的队列,所以他的应用案例都极其相似:
- 大规模多用户在线(MMO)游戏可以使用它来处理排行榜更新等全局事件
- 体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端
- 分发系统使用它来广播各种状态和配置更新

> **在fanout的模式下,不需要指定routing key**\
> Fanout exchange会忽略routing key。在Producer端,只需要关心将消息发送给哪个fanout exchange。在Consumer端,会负责将queue和上述的fanout exchange绑定起来,而且订阅该queue。

### Topic模式
Topic exchange通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。

routing key可以有通配符:
- 星号(`*`):表示准确的匹配一个单词
- 井号(`#`):则表示匹配0个或者多个单词。

![avatar](https://raw.githubusercontent.com/rbmonster/file-storage/main/learning-note/other/mq/rabbitmq-topic.png)

> 如果一个message在发送的时候,routing key是quick.orange.rabbit,那么两个queue都能收到。\
如果routing key是lazy.orange.elephant,那么两个queue也都能收到。\
如果routing key是quick.orange.fox,那么只有Q1能收到。\
如果routing是lazy.brown.fox,那么只有Q2能收到。\
如果routing key是quick.brown.fox,那么没有queue会收到。


Topic exchange的功能非常强大,也可以通过设置不同的routing key,达到别的exchange的用法:
- 如果queue绑定在`routing key = “#”`时,那么它可以匹配所有的routing key,就像**fanout exchange**
- 如果queue绑定的时候不使用“*”或者“#”,那么此时的topic exchange就像是**direct exchange**,是完全匹配模式。


使用案例:
- 分发有关于特定地理位置的数据,例如销售点由多个工作者(workers)完成的
- 后台任务,每个工作者负责处理某些特定的任务
- 股票价格更新(以及其他类型的金融数据更新)
- 涉及到分类或者标签的新闻更新(例如,针对特定的运动项目或者队伍)


## 死信队列DLX

TTL(Time To Live):生存时间。RabbitMQ支持消息的过期时间,一共两种。
- **在消息发送时可以进行指定**。通过配置消息体的properties,可以指定当前消息的过期时间。
- **在创建Exchange时可进行指定**。从进入消息队列开始计算,只要超过了队列的超时时间配置,那么消息会自动清除。

死信队列(DLX Dead-Letter-Exchange):利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。

消息变成死信的几种情况:
- 消息被拒绝(basic.reject/basic.nack)并且requeue=false
- 消息TTL过期
- 队列达到最大长度


死信队列设置:需要设置死信队列的exchange和queue,然后通过routing key进行绑定。**只不过我们需要在队列加上一个参数即可**

```text
Map<String, Object> arguments = Maps.newHashMapWithExpectedSize(3);
arguments.put("x-message-ttl", dlx-ttl);
arguments.put("x-dead-letter-exchange","exchange-name");
arguments.put("x-dead-letter-routing-key", "routing-key");
Queue ret = QueueBuilder.durable("queue-name".withArguments(arguments).build();
```

## 延迟队列

### 基于死信队列实现
RabbitMQ 实现延迟队列可以是通过使用消息的 `TTL` 属性,将过期的消息转发到死信队列中,业务监听死信队列的消息就行了。

TTL 的设置有两种设置方式
- **设置消息过期时间****在消息发送时可以进行指定**
- **设置队列过期时间****在创建Exchange时可进行指定**

死信队列实现延迟队列问题:
- 设置消息过期时间实现方式缺陷:⚠️由于队列先进先出的属性,如果使用**设置消息过期时间** 的方式,无法正常的实现延迟队列的功能。主要是因为如果 **延迟时间长的Msg** 在队列头部等待,会阻塞队列后面 **延迟消息短的Msg**
- 设置队列过期时间实现方式缺陷:需要创建1个普通队列加1个对应的死信队列,创建的队列过多


### Delayed Message 插件

`DLX + TTL` 的模式,消息首先会路由到一个正常的队列,根据设置的 TTL 进入死信队列,与之不同的是通过 x-delayed-message 声明的交换机,它的消息在发布之后不会立即进入队列,先将消息保存至 `Mnesia`(一个分布式数据库管理系统,适合于电信和其它需要持续运行和具备软实时特性的 Erlang 应用。)

> 局限性:目前该插件的当前设计并不真正适合包含大量延迟消息(例如数十万或数百万)的场景,详情参见 #/issues/72 另外该插件的一个可变性来源是依赖于 Erlang 计时器,在系统中使用了一定数量的长时间计时器之后,它们开始争用调度程序资源。

0 comments on commit 3bf01c2

Please sign in to comment.