Spring AMQP与RabbitMQ深度整合指南:从基础到高级应用(3)

米饭10个月前行业资讯851

3.5 声明交换机和队列:

在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。


因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。


**声明交换机和队列一般是消息的接收者来做的。**这里的背景是微服务,消息的发送者和接收者一般是在不同的项目里面。


注意:如果项目中没有消费者(使用 @RabbitListener),可能创建交换机和队列会失败(MQ 上显示不出来,但是 spring 中对象成功创建),至于为什么会这样,我也不清楚,去网上找,问 AI 都没有这方面的资料。这个问题是我在项目中遇到的,解决了很久,算是比较小众的问题,如果有相同情况的友友,可以试试)。


3.5.1 使用 API:

SpringAMQP 提供了一个 Queue 类,用来创建队列:

image-20241128152047559

SpringAMQP 还提供了一个 Exchange 接口,来表示所有不同类型的交换机:

image-20241128152125575

我们可以自己创建队列和交换机,不过 SpringAMQP 还提供了 ExchangeBuilder 来简化这个过程:

image-20241128152158873

而在绑定队列和交换机时,则需要使用 BindingBuilder 来创建 Binding 对象:

image-20241128152250590

3.5.1.1 fanout 示例:
package com.itheima.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("hmall.fanout");
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}


3.5.1.2 direct 示例:

direct 模式由于要绑定多个 KEY,会非常麻烦,每一个 Key 都要编写一个 binding:

package com.itheima.consumer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {

    /**
     * 声明交换机
     * @return Direct类型交换机
     */
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange("hmall.direct").build();
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue directQueue1(){
        return new Queue("direct.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue directQueue2(){
        return new Queue("direct.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
    }
}

由于 Topic 交换机的声明和 Direct 差不多,大家照着上面的 Direct ,修改一下类型就能成功创建,这里就不演示了。


3.5.2 使用注解:

基于 @Bean 的方式声明队列和交换机比较麻烦,尤其是 direct 交换机,Spring 还提供了基于注解方式来声明。


3.5.2.1 fanout 示例:
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "fanout.queue1"),
        exchange = @Exchange(name = "hmall.fanout", type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue1(String msg){
    System.out.println("消费者1接收到fanout.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "fanout.queue2"),
        exchange = @Exchange(name = "hmall.fanout", type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue2(String msg){
    System.out.println("消费者2接收到fanout.queue2的消息:【" + msg + "】");
}
3.5.2.2 direct 示例:
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

3.6 消息转化器:

convertAndSend 方法可以任意类型的消息。

7944a32270d00665d9ace3ff5a5bdbba.png

而在数据传输时,它会把你发送的消息序列化为字节发送给 MQ,接收消息的时候,还会把字节反序列化为 Java 对象。


只不过,默认情况下 Spring 采用的序列化方式是 JDK 序列化。JDK 序列化存在下列问题:


数据体积过大

有安全漏洞

可读性差

所以我们需要换一个消息转化器。


这里采用 JSON 方式来做序列化和反序列化。


在发送者和接受者两个服务中都引入依赖。

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

配置消息转换器,在发送者和接受者两个服务的启动类中添加一个 Bean 即可:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

消息转换器中添加的 messageId 可以便于我们将来做幂等性判断。

参考文献:

  • 黑马程序员


本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!  

云掣基于多年在运维领域的丰富时间经验,编写了《云运维服务白皮书》,欢迎大家互相交流学习:

《云运维服务白皮书》下载地址:https://fs80.cn/v2kbbq

想了解更多大数据运维托管服务、数据库运维托管服务、应用系统运维托管服务的的客户,欢迎点击云掣官网沟通咨询:https://yunche.pro/?t=shequ


相关文章

MyBatisPlus从零到一:快速入门与核心功能详解(6)

MyBatisPlus从零到一:快速入门与核心功能详解(6)

四、分页插件MybatisPlus 提供了很多的插件功能,进一步拓展其功能。目前已有的插件有:PaginationInnerInterceptor:自动分页TenantLineInnerInterce...

Linux 配置MySQL环境(三)

Linux 配置MySQL环境(三)

一、下载1. 官网下载MySQL官网:https://www.mysql.com/进入官网之后点击 DOWNLOADS进入页面 ( 在这里我们选择社区版) ,点击 MySQL Community (G...

Docker:LXC容器操作实战

Docker:LXC容器操作实战

前言通过LXC来完成容器的创建、体会什么是容器。利用LXC容器技术来隔离特定的应用,提供虚拟执行环境,从而优化资源管理和部署效率。什么是LXC?LXC为Linux Container的简写,是一种可以...

运维初入门之认识运维,运维日常的工作都在干什么

运维初入门之认识运维,运维日常的工作都在干什么

运维是指系统运维,是指负责维护、管理和优化计算机系统和网络设备的工作。运维日常的工作主要包括系统监控、故障处理、性能调优、安全防护、备份和恢复、资源规划等多个方面。以下是对运维日常工作进行详细说明的2...

Docker全攻略:从入门到精通,掌握容器构建关键技能

Docker全攻略:从入门到精通,掌握容器构建关键技能

引言        Dockerfile 是构建 Docker 镜像的核心文件。它定义了如何将应用程序及其依赖打包成一个可以跨平台运行的容器。本篇博客将从基础概...

Spring AOP 实战指南:从入门到精通(1)

Spring AOP 实战指南:从入门到精通(1)

Spring 框架有两大核心 IoC,AOP。在前面我们已经学习过了 IoC 的相关知识,今天就让我们开始 AOP 的学习。一、AOP 概述Aspect Oriented Programming(面向...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。