RabbitMQ 进阶2(消费者可靠性+延迟消息)

米饭5个月前行业资讯281

三、消费者可靠性

当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:


消息投递的过程中出现了网络故障

消费者接收到消息后突然宕机

消费者接收到消息后,因处理不当导致异常

一旦发生上述情况,消息也会丢失。因此,RabbitMQ 必须知道消费者的处理状态,一旦消息处理失败才能重新投递消息。


但问题来了:RabbitMQ 如何得知消费者的处理状态呢?


接下来我们一起来学习一下消费者处理消息时的可靠性解决方案。


3.1 消费者确认机制:

为了确认消费者是否成功处理消息,RabbitMQ 提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 自己消息处理状态。回执有三种可选值:


ack:成功处理消息,RabbitMQ 从队列中删除该消息

nack:消息处理失败,RabbitMQ 需要再次投递消息

reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息

一般 reject 方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回 ack,处理失败时返回 nack.


由于消息回执的处理代码比较统一,因此 SpringAMQP 帮我们实现了消息确认。并允许我们通过配置文件设置处理方式,有三种模式:


none:不处理。即消息投递给消费者后立刻 ack,消息会立刻从 MQ 删除。非常不安全,不建议使用

manual:手动模式。需要自己在业务代码中调用 api,发送ack或reject,存在业务入侵,但更灵活

auto:自动模式。SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:

如果是业务异常,会自动返回nack;

如果是消息处理或校验异常,自动返回reject;

通过下面的配置可以修改 SpringAMQP 的 ACK 处理方式:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 采用自动模式

我们一般使用 auto 模式。


3.2 失败重试机制:

当消费者出现异常后,消息会不断 requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue 到队列,再次投递,直到消息处理成功为止。


极端情况就是消费者一直无法执行成功,那么消息 requeue 就会无限循环,导致 mq 的消息处理飙升,带来不必要的压力:


当然,上述极端情况发生的概率还是非常低的,不过不怕一万就怕万一。为了应对上述情况 Spring 又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的 requeue 到 mq 队列。


修改 consumer 服务的 application.yml 文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

配置完后需要重启 consumer 服务。


开启失败重试机制的效果如下:开启本地重试时,消息处理过程中抛出异常,不会 requeue 到队列,而是在消费者本地重试,重试达到最大次数后,Spring 会返回 reject,消息会被丢弃。


3.3 失败处理策略:

本地达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。


因此 Spring 允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有 3 个不同实现:


RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。


RepublishMessageRecoverer使用示例如下:


在 consumer 服务中定义处理失败消息的交换机和队列

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
  1. 定义一个 RepublishMessageRecoverer,关联队列和交换机

  2. @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
  3. 3.4 业务幂等性:

  4. 幂等在程序开发中,是指同一个业务,执行一次或多次对业务状态的影响是一致的。


  5. 例如:根据 id 删除数据,查询数据。


  6. 但是有的操作不是幂等的,例如扣减余额。


  7. 因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:


  8. 唯一消息 ID

  9. 业务状态判断

  10. 3.4.1 唯一消息 ID:

  11. 这个思路非常简单:


  12. 每一条消息都生成一个唯一的 ID,与消息一起投递给消费者。

  13. 消费者接收到消息后处理自己的业务,业务处理成功后将消息 ID 保存到数据库。

  14. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

  15. 如何给消息添加唯一 ID 呢?


  16. 其实很简单,SpringAMQP 的 MessageConverter 自带了 MessageID 的功能,我们只要开启这个功能即可。

  17. @Bean
    public MessageConverter messageConverter(){
        // 1.定义消息转换器
        Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
        // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
        jjmc.setCreateMessageIds(true);
        return jjmc;
    }

3.4.2 业务判断:

业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。


例如要把订单状态从未支付修改成已支付,我们可以在执行业务时判断订单状态是不是未支付,如果不是未支付,说明订单已经被处理过了,无需重复处理。


3.5 兜底方案:

虽然我们利用各种机制尽可能增加了消息的可靠性,但也不好说能保证消息 100% 的可靠。万一真的 MQ 通知失败该怎么办呢?


有没有什么兜底的方案?


解决思路:既然 MQ 通知不一定发送到消费者服务,那么消费者服务就必须自己主动去查询。这样即便消费者服务的 MQ 通知失败,我们依然能通过主动查询来保证业务状态的一致。


这里又出现一个问题:消费者什么时候主动查询?总不能一开始就查吧。


这个时间是无法确定的,因此,通常我们采取的措施就是利用定时任务定期查询,例如每隔 20 秒就查询一次,并判断发送者业务状态。如果发现发送者业务状态已经修改,则立刻更新消费者业务状态即可。


至此,消息可靠性的问题已经解决了。

四、延迟消息

在电商的支付业务中,对于一些库存有限的商品,为了更好的用户体验,通常都会在用户下单时立刻扣减商品库存。例如电影院购票、高铁购票,下单后就会锁定座位资源,其他人无法重复购买。


但是这样就存在一个问题,假如用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户利益受损!


因此,电商中通常的做法就是:对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存。


像这种在一段时间以后才执行的任务,我们称之为延迟任务,而要实现延迟任务,最简单的方案就是利用 MQ 的延迟消息了。


在 RabbitMQ 中实现延迟消息也有两种方案:


死信交换机 + TTL

延迟消息插件

4.1 死信交换机和延迟消息:

4.1.1 死信交换机:

什么是死信?


当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):


消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为 false

消息是一个过期消息,超时无人消费

要投递的队列消息满了,无法投递

如果一个队列中的消息已经成为死信,并且这个队列通过**dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机**(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。


4.1.2 延迟消息:

前面两种作用场景可以看做是把死信交换机当做一种消息处理的最终兜底方案,与消费者重试时讲的RepublishMessageRecoverer作用类似。


而最后一种场景,大家设想一下这样的场景:


如图,有一组绑定的交换机(ttl.fanout)和队列(ttl.queue)。但是ttl.queue没有消费者监听,而是设定了死信交换机hmall.direct,而队列direct.queue1则与死信交换机绑定,RoutingKey是 blue:

image-20241206132453029

假如我们现在发送一条消息到ttl.fanout,RoutingKey 为 blue,并设置消息的有效期为 5000 毫秒:

image-20241206132601232

注意:尽管这里的ttl.fanout不需要RoutingKey,但是当消息变为死信并投递到死信交换机时,会沿用之前的RoutingKey,这样hmall.direct才能正确路由消息。


消息肯定会被投递到ttl.queue之后,由于没有消费者,因此消息无人消费。5秒之后,消息的有效期到期,成为死信,死信被再次投递到死信交换机hmall.direct,并沿用之前的 RoutingKey,也就是blue,由于direct.queue1与hmall.direct绑定的 key 是 blue,因此最终消息被成功路由到direct.queue1,如果此时有消费者与direct.queue1绑定, 也就能成功消费消息了。但此时已经是 5 秒钟以后了。


也就是说,publisher 发送了一条消息,但最终 consumer 在 5 秒后才收到消息。我们成功实现了延迟消息。


注意:


RabbitMQ 的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。


当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的 TTL 时间不一定准确。


4.2 DelayExchange 插件:

基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此 RabbitMQ 社区提供了一个延迟消息插件来实现相同的效果。


官方文档说明:


https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq


下面演示使用,不演示下载。

4.2.1 基于注解方式声明延迟交换机:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}

4.2.2 基于 @Bean 的方式:

@Bean
public DirectExchange delayExchange(){
    return ExchangeBuilder
        .directExchange("delay.direct") // 指定交换机类型和名称
        .delayed() // 设置delay的属性为true
        .durable(true) // 持久化
        .build();
}

4.2.3 发送延迟消息:

@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}

注意:


延迟消息插件内部会维护一个本地数据库表,同时使用 Elang Timers 功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的 CPU 开销,同时延迟消息的时间会存在误差。


因此,不建议设置延迟时间过长的延迟消息。


参考文献:


黑马程序员

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!  

云掣基于多年在运维领域的丰富时间经验,编写了《云运维服务白皮书》,欢迎大家互相交流学习:

《云运维服务白皮书》下载地址:https://fs80.cn/v2kbbq

想了解更多大数据运维托管服务、数据库运维托管服务、应用系统运维托管服务的的客户,欢迎点击云掣官网沟通咨询:https://yunche.pro/?t=shequ


相关文章

【JavaEE初阶】网络编程TCP协议实现回显服务器以及如何处理多个客户端的响应

【JavaEE初阶】网络编程TCP协议实现回显服务器以及如何处理多个客户端的响应

1.TCP相关API 和前一期的UDP基本是大差不差的,但是这里提供的方法来模拟对于网卡的操作是有一定的区别的,所示API如下:ServerSocket是Socket类对应到网卡给服务器使用...

SRE(站点可靠性工程)介绍

SRE(站点可靠性工程)介绍

概述站点可靠性工程(SRE)是 IT 运维的软件工程方案。SRE 团队使用软件作为工具,来管理系统、解决问题并实现运维任务自动化。SRE 执行的任务以前通常由运维团队手动执行,或者交给使用软件和自动化...

深入了解Linux命名空间中cgroups相关概念:打开容器技术的黑匣子

深入了解Linux命名空间中cgroups相关概念:打开容器技术的黑匣子

一、cgroups概念cgroup全称是control groups,被整合在了linux内核当中,把进程(tasks)放到组里面,对组设置权限,对进程进行控制。可以理解为用户和组的概念,用户会继承它...

Docker 基础与实战指南(3)

Docker 基础与实战指南(3)

2.2 数据卷:容器是隔离环境,容器内程序的文件、配置、运行时产生的容器都在容器内部,我们要读写容器内的文件非常不方便。大家思考几个问题:如果要升级 MySQL 版本,需要销毁旧容器,那么数据岂不是跟...

Linux 环境下Docker将镜像打包导出到本地,上传至内网服务器(八)

Linux 环境下Docker将镜像打包导出到本地,上传至内网服务器(八)

背景docker将镜像导出到本地,上传至内网服务器上背景:在企业中往往出现了内网不能和外网相通,不能使用docker pull命令来拉取镜像,这个时候我们就可以考虑在有所需镜像的服务器上导出...

MySQL数据库运维篇

MySQL数据库运维篇

一、日志1.1、错误日志它记录了当mysqld启动和停止时,以及服务器在运行过程中发生任何严重错误时的相关信息。当数据库出现任何故障导致无法正常使用时,建议首先查看此日志。该日志是默认开启的,默认存放...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。