Skip to content

一、Spring AMQP

Spring Amqp 的官方地址:Spring AMQP

  • 将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。
  • 由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。
  • 任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。
  • 但是,RabbitMQ 官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。
  • 而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

Spring AMQP 提供了三个功能:

  • 自动声明队列交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

二、快速入门

  • spring-boot-example-10-rabbitmq:父工程,管理项目依赖
    • spring-boot-example-publisher:消息的发送者
    • spring-boot-example-consumer:消息的消费者
  • pom.xml,已经配置好了SpringAMQP相关的依赖:
xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.calvin</groupId>
  <artifactId>spring-boot-example-10-rabbitmq</artifactId>
  <version>1.0.0-SNAPSHOT</version>
  <modules>
    <module>spring-boot-example-publisher</module>
    <module>spring-boot-example-consumer</module>
  </modules>
  <packaging>pom</packaging>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.12</version>
    <relativePath/>
  </parent>

  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
  </properties>

  <dependencies>

    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>

    <!-- START: AMQP (RabbitMQ) -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- END: AMQP (RabbitMQ)  -->

    <!-- START: 单元测试 -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <!-- END: 单元测试 -->

  </dependencies>
</project>

1. 消息发送

  • pom.xml
xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.calvin</groupId>
        <artifactId>spring-boot-example-10-rabbitmq</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spring-boot-example-publisher</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

</project>
  • 首先配置MQ地址,在publisher服务的application.yml中添加配置:
yaml
spring:
  rabbitmq:
    host: 127.0.0.1       # 你的虚拟机IP
    port: 5672            # 端口
    virtual-host: /calvin # 虚拟主机
    username: admin       # 用户名
    password: 123456      # 密码
  • 然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:
java
package com.itheima.publisher.amqp;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}
  • 为了方便测试,我们现在控制台新建一个队列:simple.queue
  • 添加成功:
  • 接下来,我们就可以利用Java代码收发消息了。

  • 打开控制台,可以看到消息已经发送到队列中:

接下来,我们再来实现消息接收。

2. 消息接收

  • pom.xml
xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring-boot-example-10-rabbitmq</artifactId>
        <groupId>com.calvin</groupId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spring-boot-example-consumer</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

</project>
  • 首先配置MQ地址,在consumer服务的application.yml中添加配置:
yaml
spring:
  rabbitmq:
    host: 192.168.150.101 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码
  • 然后在consumer服务的com.itheima.consumer.listener包中新建一个类SpringRabbitListener,代码如下:
java
package com.itheima.consumer.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitListener {
	  // 利用RabbitListener来声明要监听的队列信息
    // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
    // 可以看到方法体中接收的就是消息体的内容
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}

这种模式一般【测试使用】,很少在生产中使用,在之前的案例中,我们都是经过交换机发送消息到队列,不过有时候为了测试方便, 我们也可以直接向队列发送消息,跳过交换机。 在入门案例中,我们就演示这样的简单模型,如图: image

  • 发布者直接发送消息到队列
  • 消费者监听并处理队列中的消息

3. 测试消息【发布/接收】是否正常

启动consumer服务,然后在publisher服务中运行测试代码,发送MQ消息。最终consumer收到消息: image.png