写点什么

Docker 下 RabbitMQ 四部曲之三:细说 java 开发

作者:程序员欣宸
  • 2022 年 5 月 28 日
  • 本文字数:5846 字

    阅读完需:约 19 分钟

Docker下RabbitMQ四部曲之三:细说java开发

欢迎访问我的 GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos


  • 本文是《Docker 下 RabbitMQ 四部曲》系列的第三篇,实战两个基于 SpringBoot 的工程,分别用来生产和消费 RabbitMQ 消息;

前文链接

  • 前两章的内容是体验 RabbitMQ 服务,以及制作 RabbitMQ 镜像:


  1. 《Docker下RabbitMQ四部曲之一:极速体验(单机和集群)》

  2. 《Docker下RabbitMQ四部曲之二:细说RabbitMQ镜像制作》

本文内容简述

  • 今天的实战由以下几部分组成:


  1. 开发 SpringBoot 工程 rabbitmqproducer,用来生产消息;

  2. 分析 docker-compose.yml 中对 rabbitmqproducer 镜像的使用;

  3. 开发 SpringBoot 工程 rabbitmqconsumer,用来消费消息;

  4. 分析 docker-compose.yml 中对 rabbitmqconsumer 镜像的使用;

源码下载

  • 您可以在 GitHub 下载本文涉及到的文件和源码,地址和链接信息如下表所示:



  • 这个 git 项目中有多个文件夹,本章创建的两个工程分别在 rabbitmqproducer、rabbitmqconsumer 这两个文件夹下,如下图红框所示:

如何将 SpringBoot 的 web 工程制作成 Docker 镜像

  • 如果您想了解制作 Docker 镜像的更多细节,请参考以下三篇文章:


  1. 《maven构建docker镜像三部曲之一:准备环境》

  2. 《maven构建docker镜像三部曲之二:编码和构建镜像》

  3. 《maven构建docker镜像三部曲之三:推送到远程仓库(内网和阿里云)》

生产消息的 rabbitmqproducer

  • rabbitmqproducer 是个普通的 SpringBoot 工程,pom.xml 内容如下:


<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  <modelVersion>4.0.0</modelVersion>
<groupId>com.bolingcavalry</groupId> <artifactId>rabbitmqproducer</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging>
<name>rabbitmqproducer</name> <description>Demo project for Spring Boot</description>
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent>
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
<build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> <plugin> <groupId>com.spotify</groupId> <artifactId>docker-maven-plugin</artifactId> <version>0.4.12</version> <!--docker镜像相关的配置信息--> <configuration> <!--镜像名,这里用工程名--> <imageName>bolingcavalry/${project.artifactId}</imageName> <!--TAG,这里用工程版本号--> <imageTags> <imageTag>${project.version}</imageTag> </imageTags> <!--镜像的FROM,使用java官方镜像--> <baseImage>java:8u111-jdk</baseImage> <!--该镜像的容器启动后,直接运行spring boot工程--> <entryPoint>["java", "-jar", "/${project.build.finalName}.jar"]</entryPoint> <!--构建镜像的配置信息--> <resources> <resource> <targetPath>/</targetPath> <directory>${project.build.directory}</directory> <include>${project.build.finalName}.jar</include> </resource> </resources> </configuration> </plugin> </plugins> </build></project>
复制代码


  • 上述 poml.xml 中,有以下两点要注意:


  1. 依赖 spring-boot-starter-amqp 这个 starter,用于 RabbitMQ;

  2. 使用插件 docker-maven-plugin 将工程制作为 docker 镜像;


  • 接下来看看关键源码:

  • application.properties 文件内容如下,是关于 RabbitMQ 的 vhost 和交换机名称的配置:


mq.rabbit.virtualHost=/mq.rabbit.exchange.name=bolingcavalryFanoutExchange
复制代码


  • RabbitConfig 类加了**@Configuration**注解,表示这是个配置类,代码如下:


@Bean(name = "connectionFactory")    public ConnectionFactory connectionFactory() {        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(mqRabbitVirtualHost); connectionFactory.setPublisherConfirms(true);
//该方法配置多个host,在当前连接host down掉的时候会自动去重连后面的host connectionFactory.setAddresses(address);
return connectionFactory; }
@Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //必须是prototype类型 public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; }
//Fanout交换器 @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(exchangeName); }
复制代码


上述代码将 RabbitTemplate 对象和交换机对象的实例化分别通过两个加了 @Bean 注解的方法实现;


  • RabbitConfig 类中有三个 @Value 注解配置的成员变量:address、username、password,这三个配置对应的值并没有写在 application.properties 中,因此 SpringBoot 会去系统环境变量中查找,找到后设置给它们;

  • SendController 类,在收到 http 请求后,会向 RabbitMQ 发送一条消息,代码如下,调用 rabbitTemplate.convertAndSend 即可发送:


@RequestMapping(value = "/send/{name}/{message}", method = RequestMethod.GET)    public @ResponseBody String send(@PathVariable("name") final String name, @PathVariable("message") final String message) {        SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");        String timeStr = simpleDateFormat.format(new Date());        String sendMessage = "hello, " + name + ", " + message  + ", " + timeStr;        rabbitTemplate.convertAndSend(exchangeName,"", sendMessage);        return "send message to [" +  name + "] success (" + timeStr + ")";    }
复制代码


  • 以上就是消息生产者的功能代码,在 pom.xml 所在目录下执行 mvn clean package -U -DskipTests docker:build 即可编译构建工程并生成 docker 镜像文件;

  • 接下来我们看看 docker-compose.yml 中是如何使用这个镜像的;

docker-compose.yml 中对 rabbitmqproducer 镜像的使用

  • 在集群版的 docker-compose.yml 文件中,关于镜像的配置如下:


producer:    image: bolingcavalry/rabbitmqproducer:0.0.2-SNAPSHOT    hostname: producer    depends_on:      - rabbit3    links:      - rabbit1:rabbitmqhost    ports:      - "18080:8080"    environment:      - mq.rabbit.address=rabbitmqhost:5672      - mq.rabbit.username=admin      - mq.rabbit.password=888888
复制代码


  • 这些配置中有以下几点需要注意:


  1. links 参数将 rabbit1 容器对应为别名 rabbitmqhost,在 environment 参数中,mq.rabbit.address 等于 rabbitmqhost:5672,也就是 rabbit1 容器的 IP 地址;

  2. mq.rabbit.address、mq.rabbit.username、mq.rabbit.password 都被设置到容器的环境变量中,因此 RabbitConfig.java 中的 address、username、password 就会被设置为 mq.rabbit.address、mq.rabbit.username、mq.rabbit.password 对应的值;

  3. 将当前电脑的 18080 端口映射到 8080 端口,假设当前电脑 IP 地址为 192.168.119.155,那么要向容器发送 http 请求时,向这个地址发起 http 请求即可:http://192.168.119.155:18080/send/aaa/bbb


  • 至此,消息生产者的开发已经完成,接下来开始开发消息消费者吧;

消费消息的 rabbitmqconsumer

  • rabbitmqconsumer 是个普通的 SpringBoot 工程,pom.xml 的内容和 rabbitmqproducer 的 pom.xml 是一样的(除了工程名不一样):

  • 接下来看看关键源码:

  • 配置类是 RabbitConfig.java,和 rabbitmqproducer 的 RabbitConfig.java 基本一致,不同的是多了个成员变量 queuename,用于表示队列名称,对应的值也是来自环境变量;

  • 作为 RabbitMQ 的消息消费者,rabbitmqconsumer 要主动连接到 RabbitMQ 的队列上,以保持随时可以消费消息,对应的绑定代码如下:


 @Bean    FanoutExchange fanoutExchange() {        return new FanoutExchange(exchangeName);    }
//队列A @Bean public Queue fanoutQueue() { return new Queue(queuename); }
//绑定对列到Fanout交换器 @Bean Binding bindingFanoutExchange(Queue fanoutQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue).to(fanoutExchange); }
复制代码


  • 收到消息后,对消息的消费操作由 FanoutReceiver 类负责,在日志中将消息打印出来:


@Component@RabbitListener(queues = "${mq.rabbit.queue.name}")public class FanoutReceiver {    private static final Logger logger = LoggerFactory.getLogger(FanoutReceiver.class);
@RabbitHandler public void process(String message) { logger.info("receive message : " + message); }}
复制代码


  • 以上就是消息消费者的功能代码,在 pom.xml 所在目录下执行 mvn clean package -U -DskipTests docker:build 即可编译构建工程并生成 docker 镜像文件;

特别注意的地方

  • 为了让 rabbitmqconsumer 的 Docker 镜像更通用,rabbitmq 的账号、密码、队列名称等参数都没有写死在工程中,而是从系统环境变量中获取,对应的环境变量的名成分别是:mq.rabbit.username、mq.rabbit.password、mq.rabbit.queue.name,这些环境变量的值是谁负责设置的呢?是 docker-compose.yml 中的 environment 参数配置的,容器启动后 docker 就会在该容器中设置这些环境变量;

  • 在实际使用过程中,经常会出现 rabbitmqconsumer 进程在连接 RabbitMQ 时经常报错提示:ACCESS_REFUSED - Login was refused using authen

  • 目前的推测是:启动 rabbitmqconsumer 进程和环境变量设置并没有严格的先后顺序,导致了 rabbitmqconsumer 在使用 mq.rabbit.username 这些系统环境变量的时候,可能 docker 还没有设置好这些环境变量,因此导致连接 RabbitMQ 失败;

  • 针对这个问题,我在工程的启动类 RabbitmqconsumerApplication 的 main 方法中,先做了 6 秒的延时,再启动 SpringApplication,目前经过多次测试问题已经不再出现,代码如下:


@SpringBootApplicationpublic class RabbitmqconsumerApplication {  public static void main(String[] args) {    try{      Thread.sleep(1000);    }catch(Exception e){      e.printStackTrace();    }    SpringApplication.run(RabbitmqconsumerApplication.class, args);  }}
复制代码


  • 为什么 rabbitmqproducer 应用没有这个问题?

  • 查看容器的日志,重点是**Attempting to connect to: [rabbitmqhost:5672]**这段信息,会发现对于 rabbitmqproducer 应用,只有在第一次发消息的时候才会去连接 RabbitMQ,这时候环境变量已经设置好了,所以连接不会有问题;

  • 但是 rabbitmqconsumer 应用由于是消费者,应用一启动就要主动去连接队列绑定交换机,所以一旦此时环境变量没有设置好,就会导致拿不到正确的参数信息;

  • 如果您不想使用延时这种笨办法,可以在 application.properties 中将 mq.rabbit.username、mq.rabbit.password、mq.rabbit.queue.name 这几个参数配置好,再去构建镜像,此时记得在 docker-compose.yml 中去掉对应的环境变量配置;

docker-compose.yml 中对 rabbitmqconsumer 镜像的使用

  • 在集群版的 docker-compose.yml 文件中,关于镜像的配置如下:


consumer1:    image: bolingcavalry/rabbitmqconsumer:0.0.5-SNAPSHOT    hostname: consumer1    depends_on:      - producer    links:      - rabbit2:rabbitmqhost    environment:     - mq.rabbit.address=rabbitmqhost:5672     - mq.rabbit.username=admin     - mq.rabbit.password=888888     - mq.rabbit.queue.name=consumer1.queue  consumer2:    image: bolingcavalry/rabbitmqconsumer:0.0.5-SNAPSHOT    hostname: consumer2    depends_on:      - consumer1    links:      - rabbit3:rabbitmqhost    environment:      - mq.rabbit.address=rabbitmqhost:5672      - mq.rabbit.username=admin      - mq.rabbit.password=888888      - mq.rabbit.queue.name=consumer2.queue
复制代码


  • 这些配置中有以下几点需要注意:


  1. 有两个消费者镜像 consumer1、consumer2;

  2. 由于交换器是 fanout 类型,生成者每生产一条消息,由 consumer1、consumer2 分别消费一次;

  3. consumer1、consumer2 各自连接一个队列,这一点在第一章的截图中有过体现,如下图:


  • 至此,消息生产和消费对应的 java 代码就全部实战完毕了,希望能帮助您快速的开发 RabbitMQ 相关的 java 应用,下一章我们将实战 RabbitMQ 的高可用,通过 docker stop 命令模拟生产环境中的宕机,看看部分机器的故障是否影响正常的 RabbitMQ 服务;

欢迎关注 InfoQ:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...

发布于: 刚刚阅读数: 3
用户头像

搜索"程序员欣宸",一起畅游Java宇宙 2018.04.19 加入

前腾讯、前阿里员工,从事Java后台工作,对Docker和Kubernetes充满热爱,所有文章均为作者原创,个人Github:https://github.com/zq2599/blog_demos

评论

发布
暂无评论
Docker下RabbitMQ四部曲之三:细说java开发_Java_程序员欣宸_InfoQ写作社区