RabbitMQ 进阶2(消费者可靠性+延迟消息)

米饭7小时前行业资讯6

三、消费者可靠性

当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:


消息投递的过程中出现了网络故障

消费者接收到消息后突然宕机

消费者接收到消息后,因处理不当导致异常

一旦发生上述情况,消息也会丢失。因此,RabbitMQ 必须知道消费者的处理状态,一旦消息处理失败才能重新投递消息。


但问题来了:RabbitMQ 如何得知消费者的处理状态呢?


接下来我们一起来学习一下消费者处理消息时的可靠性解决方案。


3.1 消费者确认机制:

为了确认消费者是否成功处理消息,RabbitMQ 提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 自己消息处理状态。回执有三种可选值:


ack:成功处理消息,RabbitMQ 从队列中删除该消息

nack:消息处理失败,RabbitMQ 需要再次投递消息

reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息

一般 reject 方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回 ack,处理失败时返回 nack.


由于消息回执的处理代码比较统一,因此 SpringAMQP 帮我们实现了消息确认。并允许我们通过配置文件设置处理方式,有三种模式:


none:不处理。即消息投递给消费者后立刻 ack,消息会立刻从 MQ 删除。非常不安全,不建议使用

manual:手动模式。需要自己在业务代码中调用 api,发送ack或reject,存在业务入侵,但更灵活

auto:自动模式。SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:

如果是业务异常,会自动返回nack;

如果是消息处理或校验异常,自动返回reject;

通过下面的配置可以修改 SpringAMQP 的 ACK 处理方式:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 采用自动模式

我们一般使用 auto 模式。


3.2 失败重试机制:

当消费者出现异常后,消息会不断 requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue 到队列,再次投递,直到消息处理成功为止。


极端情况就是消费者一直无法执行成功,那么消息 requeue 就会无限循环,导致 mq 的消息处理飙升,带来不必要的压力:


当然,上述极端情况发生的概率还是非常低的,不过不怕一万就怕万一。为了应对上述情况 Spring 又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的 requeue 到 mq 队列。


修改 consumer 服务的 application.yml 文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

配置完后需要重启 consumer 服务。


开启失败重试机制的效果如下:开启本地重试时,消息处理过程中抛出异常,不会 requeue 到队列,而是在消费者本地重试,重试达到最大次数后,Spring 会返回 reject,消息会被丢弃。


3.3 失败处理策略:

本地达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。


因此 Spring 允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有 3 个不同实现:


RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。


RepublishMessageRecoverer使用示例如下:


在 consumer 服务中定义处理失败消息的交换机和队列

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
  1. 定义一个 RepublishMessageRecoverer,关联队列和交换机

  2. @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
  3. 3.4 业务幂等性:

  4. 幂等在程序开发中,是指同一个业务,执行一次或多次对业务状态的影响是一致的。


  5. 例如:根据 id 删除数据,查询数据。


  6. 但是有的操作不是幂等的,例如扣减余额。


  7. 因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:


  8. 唯一消息 ID

  9. 业务状态判断

  10. 3.4.1 唯一消息 ID:

  11. 这个思路非常简单:


  12. 每一条消息都生成一个唯一的 ID,与消息一起投递给消费者。

  13. 消费者接收到消息后处理自己的业务,业务处理成功后将消息 ID 保存到数据库。

  14. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

  15. 如何给消息添加唯一 ID 呢?


  16. 其实很简单,SpringAMQP 的 MessageConverter 自带了 MessageID 的功能,我们只要开启这个功能即可。

  17. @Bean
    public MessageConverter messageConverter(){
        // 1.定义消息转换器
        Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
        // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
        jjmc.setCreateMessageIds(true);
        return jjmc;
    }

3.4.2 业务判断:

业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。


例如要把订单状态从未支付修改成已支付,我们可以在执行业务时判断订单状态是不是未支付,如果不是未支付,说明订单已经被处理过了,无需重复处理。


3.5 兜底方案:

虽然我们利用各种机制尽可能增加了消息的可靠性,但也不好说能保证消息 100% 的可靠。万一真的 MQ 通知失败该怎么办呢?


有没有什么兜底的方案?


解决思路:既然 MQ 通知不一定发送到消费者服务,那么消费者服务就必须自己主动去查询。这样即便消费者服务的 MQ 通知失败,我们依然能通过主动查询来保证业务状态的一致。


这里又出现一个问题:消费者什么时候主动查询?总不能一开始就查吧。


这个时间是无法确定的,因此,通常我们采取的措施就是利用定时任务定期查询,例如每隔 20 秒就查询一次,并判断发送者业务状态。如果发现发送者业务状态已经修改,则立刻更新消费者业务状态即可。


至此,消息可靠性的问题已经解决了。

四、延迟消息

在电商的支付业务中,对于一些库存有限的商品,为了更好的用户体验,通常都会在用户下单时立刻扣减商品库存。例如电影院购票、高铁购票,下单后就会锁定座位资源,其他人无法重复购买。


但是这样就存在一个问题,假如用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户利益受损!


因此,电商中通常的做法就是:对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存。


像这种在一段时间以后才执行的任务,我们称之为延迟任务,而要实现延迟任务,最简单的方案就是利用 MQ 的延迟消息了。


在 RabbitMQ 中实现延迟消息也有两种方案:


死信交换机 + TTL

延迟消息插件

4.1 死信交换机和延迟消息:

4.1.1 死信交换机:

什么是死信?


当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):


消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为 false

消息是一个过期消息,超时无人消费

要投递的队列消息满了,无法投递

如果一个队列中的消息已经成为死信,并且这个队列通过**dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机**(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。


4.1.2 延迟消息:

前面两种作用场景可以看做是把死信交换机当做一种消息处理的最终兜底方案,与消费者重试时讲的RepublishMessageRecoverer作用类似。


而最后一种场景,大家设想一下这样的场景:


如图,有一组绑定的交换机(ttl.fanout)和队列(ttl.queue)。但是ttl.queue没有消费者监听,而是设定了死信交换机hmall.direct,而队列direct.queue1则与死信交换机绑定,RoutingKey是 blue:

image-20241206132453029

假如我们现在发送一条消息到ttl.fanout,RoutingKey 为 blue,并设置消息的有效期为 5000 毫秒:

image-20241206132601232

注意:尽管这里的ttl.fanout不需要RoutingKey,但是当消息变为死信并投递到死信交换机时,会沿用之前的RoutingKey,这样hmall.direct才能正确路由消息。


消息肯定会被投递到ttl.queue之后,由于没有消费者,因此消息无人消费。5秒之后,消息的有效期到期,成为死信,死信被再次投递到死信交换机hmall.direct,并沿用之前的 RoutingKey,也就是blue,由于direct.queue1与hmall.direct绑定的 key 是 blue,因此最终消息被成功路由到direct.queue1,如果此时有消费者与direct.queue1绑定, 也就能成功消费消息了。但此时已经是 5 秒钟以后了。


也就是说,publisher 发送了一条消息,但最终 consumer 在 5 秒后才收到消息。我们成功实现了延迟消息。


注意:


RabbitMQ 的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。


当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的 TTL 时间不一定准确。


4.2 DelayExchange 插件:

基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此 RabbitMQ 社区提供了一个延迟消息插件来实现相同的效果。


官方文档说明:


https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq


下面演示使用,不演示下载。

4.2.1 基于注解方式声明延迟交换机:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}

4.2.2 基于 @Bean 的方式:

@Bean
public DirectExchange delayExchange(){
    return ExchangeBuilder
        .directExchange("delay.direct") // 指定交换机类型和名称
        .delayed() // 设置delay的属性为true
        .durable(true) // 持久化
        .build();
}

4.2.3 发送延迟消息:

@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}

注意:


延迟消息插件内部会维护一个本地数据库表,同时使用 Elang Timers 功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的 CPU 开销,同时延迟消息的时间会存在误差。


因此,不建议设置延迟时间过长的延迟消息。


参考文献:


黑马程序员

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!  

云掣基于多年在运维领域的丰富时间经验,编写了《云运维服务白皮书》,欢迎大家互相交流学习:

《云运维服务白皮书》下载地址:https://fs80.cn/v2kbbq

想了解更多大数据运维托管服务、数据库运维托管服务、应用系统运维托管服务的的客户,欢迎点击云掣官网沟通咨询:https://yunche.pro/?t=shequ


相关文章

Linux 安装Docker完整教程(六)

Linux 安装Docker完整教程(六)

背景近些年随着云原生的发展,Docker在云原生中的作用使得它也蓬勃发展起来。今天这篇文章就带大家一起实现一下在Linux操作系统下Docker的部署过程,收藏起来,以备不时之需。当然,如果对Dock...

gitlab和jenkins连接

gitlab和jenkins连接

一:jenkins 配置安装gitlab插件   生成密钥id_rsa 要上传到jenkins,id_rsa.pub要上传到gitlabcat /root/.ssh/id_rsa复...

【Docker】在 Ubuntu 上安装 Docker 的详细指南

【Docker】在 Ubuntu 上安装 Docker 的详细指南

Docker 是一个开源的平台,可以让开发者打包应用及其依赖项为一个可移植的容器。本文将详细介绍如何在 Ubuntu 上安装 Docker,包括安装步骤、常见命令以及一些注意事项。一、准备工...

Dockerfile和docker-compose详解

Dockerfile和docker-compose详解

一、Dockerfile1. Dockerfile简介Dockerfile是一个用来构建镜像的文本文件, 文本内容包含了一条条构建镜像所需的指令和说明。例如我们要在含python3的cent...

ubuntu下如何查看显卡及显卡驱动

ubuntu下如何查看显卡及显卡驱动

ubuntu下如何查看显卡及显卡驱动使用nvidia-smi 工具查看查看显卡型号nvida-smi -L$ nvidia-smi -L GPU 0:&nbs...

【网络】NAT、代理服务、内网穿透(1)

【网络】NAT、代理服务、内网穿透(1)

1.NAT技术NAT(Network Address Translation,网络地址转换)技术,是解决IP地址不足的主要手段,并且能够有效地避免来自网络外部的攻击,隐藏并保护网络内部的计算机。1.1...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。