Spring AMQP与RabbitMQ深度整合指南:从基础到高级应用(3)

米饭6个月前行业资讯436

3.5 声明交换机和队列:

在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。


因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。


**声明交换机和队列一般是消息的接收者来做的。**这里的背景是微服务,消息的发送者和接收者一般是在不同的项目里面。


注意:如果项目中没有消费者(使用 @RabbitListener),可能创建交换机和队列会失败(MQ 上显示不出来,但是 spring 中对象成功创建),至于为什么会这样,我也不清楚,去网上找,问 AI 都没有这方面的资料。这个问题是我在项目中遇到的,解决了很久,算是比较小众的问题,如果有相同情况的友友,可以试试)。


3.5.1 使用 API:

SpringAMQP 提供了一个 Queue 类,用来创建队列:

image-20241128152047559

SpringAMQP 还提供了一个 Exchange 接口,来表示所有不同类型的交换机:

image-20241128152125575

我们可以自己创建队列和交换机,不过 SpringAMQP 还提供了 ExchangeBuilder 来简化这个过程:

image-20241128152158873

而在绑定队列和交换机时,则需要使用 BindingBuilder 来创建 Binding 对象:

image-20241128152250590

3.5.1.1 fanout 示例:
package com.itheima.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("hmall.fanout");
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}


3.5.1.2 direct 示例:

direct 模式由于要绑定多个 KEY,会非常麻烦,每一个 Key 都要编写一个 binding:

package com.itheima.consumer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {

    /**
     * 声明交换机
     * @return Direct类型交换机
     */
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange("hmall.direct").build();
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue directQueue1(){
        return new Queue("direct.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue directQueue2(){
        return new Queue("direct.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
    }
}

由于 Topic 交换机的声明和 Direct 差不多,大家照着上面的 Direct ,修改一下类型就能成功创建,这里就不演示了。


3.5.2 使用注解:

基于 @Bean 的方式声明队列和交换机比较麻烦,尤其是 direct 交换机,Spring 还提供了基于注解方式来声明。


3.5.2.1 fanout 示例:
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "fanout.queue1"),
        exchange = @Exchange(name = "hmall.fanout", type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue1(String msg){
    System.out.println("消费者1接收到fanout.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "fanout.queue2"),
        exchange = @Exchange(name = "hmall.fanout", type = ExchangeTypes.FANOUT)
))
public void listenFanoutQueue2(String msg){
    System.out.println("消费者2接收到fanout.queue2的消息:【" + msg + "】");
}
3.5.2.2 direct 示例:
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

3.6 消息转化器:

convertAndSend 方法可以任意类型的消息。

7944a32270d00665d9ace3ff5a5bdbba.png

而在数据传输时,它会把你发送的消息序列化为字节发送给 MQ,接收消息的时候,还会把字节反序列化为 Java 对象。


只不过,默认情况下 Spring 采用的序列化方式是 JDK 序列化。JDK 序列化存在下列问题:


数据体积过大

有安全漏洞

可读性差

所以我们需要换一个消息转化器。


这里采用 JSON 方式来做序列化和反序列化。


在发送者和接受者两个服务中都引入依赖。

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

配置消息转换器,在发送者和接受者两个服务的启动类中添加一个 Bean 即可:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

消息转换器中添加的 messageId 可以便于我们将来做幂等性判断。

参考文献:

  • 黑马程序员


本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!  

云掣基于多年在运维领域的丰富时间经验,编写了《云运维服务白皮书》,欢迎大家互相交流学习:

《云运维服务白皮书》下载地址:https://fs80.cn/v2kbbq

想了解更多大数据运维托管服务、数据库运维托管服务、应用系统运维托管服务的的客户,欢迎点击云掣官网沟通咨询:https://yunche.pro/?t=shequ


相关文章

CMP?MSP?1+1才能大于2

CMP?MSP?1+1才能大于2

CMP与MSP都已经出现有一段时间了,而业界对于两个名词略有混淆,CMP和MSP到底是什么,能做什么,互相边界是什么,是互补还是互相竞争傻傻分不清楚。笔者试从自己十多年的云计算实践以及对于Gartne...

Dockerfile和docker-compose详解

Dockerfile和docker-compose详解

一、Dockerfile1. Dockerfile简介Dockerfile是一个用来构建镜像的文本文件, 文本内容包含了一条条构建镜像所需的指令和说明。例如我们要在含python3的cent...

【计算机网络】详解数据链路层数据帧&Mac地址&ARP协议

【计算机网络】详解数据链路层数据帧&Mac地址&ARP协议

一、以太网帧         "以太网" 不是一种具体的网络,而是一种技术标准;既包含了数据链路层的内容,也包含了一些物理层的内容...

Docker:技术架构的演进之路(下)

Docker:技术架构的演进之路(下)

冷热分离架构六、冷热分离架构为了进一步提高系统的并发性能,系统进入冷热分离架构阶段。在这个阶段,引入缓存,实现冷热分离。将热点数据放入缓存中,冷数据放入数据库中,实现数据库的快速响应。这种架构的优点是...

Spring AOP 实战指南:从入门到精通(5)

Spring AOP 实战指南:从入门到精通(5)

四、代理模式Spring AOP 是基于动态代理来实现 AOP 的。代理模式,也叫委托模式。定义:为其他对象提供一种代理,以控制对这个对象的访问。它的作用就是通过提供一个代理类,让我们在调用目标方法的...

中国式IT运维,趟出自己的“长期主义”河流

2018年,Gartner曾做过一次长达6个月的调查问卷。在这个主题为AIOps的问卷中,Gartner选取了500家各行业头部的企业,针对其当下的IT架构来判断调研企业未来是否会加大对于AIOps的...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。