Spring AMQP与RabbitMQ深度整合指南:从基础到高级应用(3)

米饭8个月前行业资讯504

3.5 声明交换机和队列:

在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。


因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。


**声明交换机和队列一般是消息的接收者来做的。**这里的背景是微服务,消息的发送者和接收者一般是在不同的项目里面。


注意:如果项目中没有消费者(使用 @RabbitListener),可能创建交换机和队列会失败(MQ 上显示不出来,但是 spring 中对象成功创建),至于为什么会这样,我也不清楚,去网上找,问 AI 都没有这方面的资料。这个问题是我在项目中遇到的,解决了很久,算是比较小众的问题,如果有相同情况的友友,可以试试)。


3.5.1 使用 API:

SpringAMQP 提供了一个 Queue 类,用来创建队列:

image-20241128152047559

SpringAMQP 还提供了一个 Exchange 接口,来表示所有不同类型的交换机:

image-20241128152125575

我们可以自己创建队列和交换机,不过 SpringAMQP 还提供了 ExchangeBuilder 来简化这个过程:

image-20241128152158873

而在绑定队列和交换机时,则需要使用 BindingBuilder 来创建 Binding 对象:

image-20241128152250590

3.5.1.1 fanout 示例:
package com.itheima.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("hmall.fanout");
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}


3.5.1.2 direct 示例:

direct 模式由于要绑定多个 KEY,会非常麻烦,每一个 Key 都要编写一个 binding:

package com.itheima.consumer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {

    /**
     * 声明交换机
     * @return Direct类型交换机
     */
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange("hmall.direct").build();
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue directQueue1(){
        return new Queue("direct.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue directQueue2(){
        return new Queue("direct.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
    }
}

由于 Topic 交换机的声明和 Direct 差不多,大家照着上面的 Direct ,修改一下类型就能成功创建,这里就不演示了。


3.5.2 使用注解:

基于 @Bean 的方式声明队列和交换机比较麻烦,尤其是 direct 交换机,Spring 还提供了基于注解方式来声明。


3.5.2.1 fanout 示例:
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "fanout.queue1"),
        exchange = @Exchange(name = "hmall.fanout", type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue1(String msg){
    System.out.println("消费者1接收到fanout.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "fanout.queue2"),
        exchange = @Exchange(name = "hmall.fanout", type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue2(String msg){
    System.out.println("消费者2接收到fanout.queue2的消息:【" + msg + "】");
}
3.5.2.2 direct 示例:
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

3.6 消息转化器:

convertAndSend 方法可以任意类型的消息。

7944a32270d00665d9ace3ff5a5bdbba.png

而在数据传输时,它会把你发送的消息序列化为字节发送给 MQ,接收消息的时候,还会把字节反序列化为 Java 对象。


只不过,默认情况下 Spring 采用的序列化方式是 JDK 序列化。JDK 序列化存在下列问题:


数据体积过大

有安全漏洞

可读性差

所以我们需要换一个消息转化器。


这里采用 JSON 方式来做序列化和反序列化。


在发送者和接受者两个服务中都引入依赖。

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

配置消息转换器,在发送者和接受者两个服务的启动类中添加一个 Bean 即可:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

消息转换器中添加的 messageId 可以便于我们将来做幂等性判断。

参考文献:

  • 黑马程序员


本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!  

云掣基于多年在运维领域的丰富时间经验,编写了《云运维服务白皮书》,欢迎大家互相交流学习:

《云运维服务白皮书》下载地址:https://fs80.cn/v2kbbq

想了解更多大数据运维托管服务、数据库运维托管服务、应用系统运维托管服务的的客户,欢迎点击云掣官网沟通咨询:https://yunche.pro/?t=shequ


相关文章

Linux 环境下Docker将镜像打包导出到本地,上传至内网服务器(八)

Linux 环境下Docker将镜像打包导出到本地,上传至内网服务器(八)

背景docker将镜像导出到本地,上传至内网服务器上背景:在企业中往往出现了内网不能和外网相通,不能使用docker pull命令来拉取镜像,这个时候我们就可以考虑在有所需镜像的服务器上导出...

【Docker】0.空间资源隔离

【Docker】0.空间资源隔离

NameSpacedd + option : 可以从标准输入或文件中读取数据,根据指定格式来转换数据,再输出到文件、设备或标准输出 --help 显...

大数据存储方案

大数据存储方案

1 结构布局目前大数据存储有两种方案可供选择:行存储和列存储。业界对两种存储方案有很多争持,集中焦点是:谁能够更有效地处理海量数据,且兼顾安全、可靠、完整性。从目前发展情况看,关系数据库已经不适应这种...

MyBatisPlus从零到一:快速入门与核心功能详解(2)

MyBatisPlus从零到一:快速入门与核心功能详解(2)

二、核心功能刚才的案例中都是以 id 为条件的简单 CRUD,一些复杂条件的 SQL 语句就要用到一些更高级的功能了。2.1 条件构造器:除了新增以外,修改、删除、查询的 SQL 语句都需要指定 wh...

Docker 基础与实战指南(4)

Docker 基础与实战指南(4)

2.4 网络:默认情况下,所有容器都是以 bridge 方式连接到 Docker 的一个虚拟网桥上:容器在同一个网桥上就可以相互访问。下图就是我的 linux 上的默认网桥。下面我们来测试一下。首先,...

Docker 基础与实战指南(1)

Docker 基础与实战指南(1)

Docker 可以使项目的部署变得简单,大大减少了运维工作量。即便你对 Linux 不熟悉,你也能轻松部署各种常见软件、Java项目。linux 上安装 docker 这里就不进行讲解,我使用的 li...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。