Spring AMQP与RabbitMQ深度整合指南:从基础到高级应用(3)
3.5 声明交换机和队列:
在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。
因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。
**声明交换机和队列一般是消息的接收者来做的。**这里的背景是微服务,消息的发送者和接收者一般是在不同的项目里面。
注意:如果项目中没有消费者(使用 @RabbitListener),可能创建交换机和队列会失败(MQ 上显示不出来,但是 spring 中对象成功创建),至于为什么会这样,我也不清楚,去网上找,问 AI 都没有这方面的资料。这个问题是我在项目中遇到的,解决了很久,算是比较小众的问题,如果有相同情况的友友,可以试试)。
3.5.1 使用 API:
SpringAMQP 提供了一个 Queue 类,用来创建队列:
我们可以自己创建队列和交换机,不过 SpringAMQP 还提供了 ExchangeBuilder 来简化这个过程:
而在绑定队列和交换机时,则需要使用 BindingBuilder 来创建 Binding 对象:
3.5.1.1 fanout 示例:
package com.itheima.consumer.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutConfig { /** * 声明交换机 * @return Fanout类型交换机 */ @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("hmall.fanout"); } /** * 第1个队列 */ @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } /** * 第2个队列 */ @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
3.5.1.2 direct 示例:
package com.itheima.consumer.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DirectConfig { /** * 声明交换机 * @return Direct类型交换机 */ @Bean public DirectExchange directExchange(){ return ExchangeBuilder.directExchange("hmall.direct").build(); } /** * 第1个队列 */ @Bean public Queue directQueue1(){ return new Queue("direct.queue1"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){ return BindingBuilder.bind(directQueue1).to(directExchange).with("red"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){ return BindingBuilder.bind(directQueue1).to(directExchange).with("blue"); } /** * 第2个队列 */ @Bean public Queue directQueue2(){ return new Queue("direct.queue2"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){ return BindingBuilder.bind(directQueue2).to(directExchange).with("red"); } /** * 绑定队列和交换机 */ @Bean public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){ return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow"); } }
由于 Topic 交换机的声明和 Direct 差不多,大家照着上面的 Direct ,修改一下类型就能成功创建,这里就不演示了。
3.5.2 使用注解:
基于 @Bean 的方式声明队列和交换机比较麻烦,尤其是 direct 交换机,Spring 还提供了基于注解方式来声明。
3.5.2.1 fanout 示例:
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "fanout.queue1"), exchange = @Exchange(name = "hmall.fanout", type = ExchangeTypes.FANOUT) )) public void listenFanoutQueue1(String msg){ System.out.println("消费者1接收到fanout.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "fanout.queue2"), exchange = @Exchange(name = "hmall.fanout", type = ExchangeTypes.FANOUT) )) public void listenFanoutQueue2(String msg){ System.out.println("消费者2接收到fanout.queue2的消息:【" + msg + "】"); }
3.5.2.2 direct 示例:
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"} )) public void listenDirectQueue1(String msg){ System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】"); }
3.6 消息转化器:
而在数据传输时,它会把你发送的消息序列化为字节发送给 MQ,接收消息的时候,还会把字节反序列化为 Java 对象。
只不过,默认情况下 Spring 采用的序列化方式是 JDK 序列化。JDK 序列化存在下列问题:
数据体积过大
有安全漏洞
可读性差
所以我们需要换一个消息转化器。
这里采用 JSON 方式来做序列化和反序列化。
在发送者和接受者两个服务中都引入依赖。
<dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency>
@Bean public MessageConverter messageConverter(){ // 1.定义消息转换器 Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息 jackson2JsonMessageConverter.setCreateMessageIds(true); return jackson2JsonMessageConverter; }
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
云掣基于多年在运维领域的丰富时间经验,编写了《云运维服务白皮书》,欢迎大家互相交流学习:
《云运维服务白皮书》下载地址:https://fs80.cn/v2kbbq
想了解更多大数据运维托管服务、数据库运维托管服务、应用系统运维托管服务的的客户,欢迎点击云掣官网沟通咨询:https://yunche.pro/?t=shequ