写点什么

Java 多线程消费消息

  • 2023-11-21
    福建
  • 本文字数:6554 字

    阅读完需:约 22 分钟

多线程消费消息


关键词:Java,多线程,消息队列,rocketmq


多线程一个用例之一就是消息的快速消费,比如我们有一个消息队列我们希望以更快的速度消费消息,假如我们用的是 rocketmq,我们从中获取消息,然后使用多线程处理。


代码地址Github


实现思路


  1. 不停的拉取消息


  1. 将拉取的消息分片


  1. 多个线程一起消费每一片消息


  1. 将所有消息消费完成后,接着拉取新的消息


代码


CrazyTask


这是一个抽象类,针对不同的任务可能有不同的处理逻辑,对于不同的任务去继承这个 CrazyTask 实现他的 process 方法。


package crazyConsumer;
import com.google.common.collect.Lists;
import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;
/** * {@code @author:} keboom * {@code @date:} 2023/11/17 * {@code @description:} */public abstract class CrazyTask { String taskName; int threadNum; volatile boolean isTerminated; // every partition data num. // for example: I receive 5 messages, partitionDataNum is 2, then i will partition 5 messages to 3 parts, 2,2,1 int partitionDataCount = 2;
abstract void process(Message message);
void doExecute(SimpleConsumer consumer) { while (true) { // 此消费者每次主动拉取消息队列中消息 List<Message> messages = consumer.receive(); if (messages.isEmpty()) { try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); } continue; } // 获取处理此topic或者说处理此类型task的线程池 ExecutorService executor = CrazyTaskUtil.getOrInitExecutor(taskName, threadNum); // 将消息分片,每个线程处理一部分消息 List<List<Message>> partition = Lists.partition(messages, partitionDataCount); // 以消息分片数初始化CountDownLatch,每个线程处理完一片消息,countDown一次 // 当countDownLatch为0时,说明所有消息都处理完了,countDownLatch.await();继续向下执行 CountDownLatch countDownLatch = new CountDownLatch(partition.size());
partition.forEach(messageList -> { executor.execute(() -> { messageList.forEach(message -> { process(message); consumer.ack(message); }); countDownLatch.countDown(); }); }); try { countDownLatch.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } if (isTerminated) { break; } } // 释放线程池 CrazyTaskUtil.shutdownThreadPool(taskName); }
void terminate() { isTerminated = true; System.out.println(); System.out.println(taskName + " shut down"); }
public String getTaskName() { return taskName; }}
复制代码


PhoneTask


package crazyConsumer;
/** * {@code @author:} keboom * {@code @date:} 2023/11/17 * {@code @description:} */public class PhoneTask extends CrazyTask {
public PhoneTask(String taskName, int threadNum) { this.taskName = taskName; // default thread num this.threadNum = threadNum; this.isTerminated = false; }
@Override void process(Message message) { System.out.println(Thread.currentThread().getName() +" process "+ message.toString()); try { Thread.sleep(30); } catch (InterruptedException e) { throw new RuntimeException(e); } }
@Override public String toString() { return "PhoneTask{" + "taskName='" + taskName + '\'' + ", threadNum=" + threadNum + ", isTerminated=" + isTerminated + '}'; }}
复制代码


EmailTask


package crazyConsumer;
/** * {@code @author:} keboom * {@code @date:} 2023/11/17 * {@code @description:} */public class EmailTask extends CrazyTask{
public EmailTask(String taskName, int threadNum) { this.taskName = taskName; // default thread num this.threadNum = threadNum; this.isTerminated = false; }
@Override void process(Message message) { // do something System.out.println(Thread.currentThread().getName() +" process "+ message.toString()); try { Thread.sleep(20); } catch (InterruptedException e) { throw new RuntimeException(e); } }
@Override public String toString() { return "EmailTask{" + "taskName='" + taskName + '\'' + ", threadNum=" + threadNum + ", isTerminated=" + isTerminated + '}'; }}
复制代码


CrazyTaskUtil


创建销毁线程池的工具类


package crazyConsumer;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Map;import java.util.concurrent.*;
/** * {@code @author:} keboom * {@code @date:} 2023/11/17 * {@code @description:} */public class CrazyTaskUtil {
private static final Map<String, ExecutorService> executors = new ConcurrentHashMap<>();
public static ExecutorService getOrInitExecutor(String taskName, int threadNum) { ExecutorService executorService = executors.get(taskName); if (executorService == null) { synchronized (CrazyTaskUtil.class) { executorService = executors.get(taskName); if (executorService == null) { executorService = initPool(taskName, threadNum); executors.put(taskName, executorService); } } } return executorService; }
private static ExecutorService initPool(String taskName, int threadNum) { // init pool return new ThreadPoolExecutor(threadNum, threadNum, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setNameFormat(taskName + "-%d").build()); }
public static void shutdownThreadPool(String taskName) { ExecutorService remove = executors.remove(taskName); if (remove != null) { remove.shutdown(); } }
}
复制代码


Main


程序入口


package crazyConsumer;
import java.util.ArrayList;
/** * {@code @author:} keboom * {@code @date:} 2023/11/17 * {@code @description:} */public class Main {
/** * 一种多线程消费场景。比如我们有一个消费队列,里面有各种消息,我们需要尽快的消费他们,不同的消息对应不同的业务 * * @param args */ public static void main(String[] args) throws InterruptedException {
// 比方说我们这个有rocketmq不同主题的consumer /* List<MessageView> messageViewList = null; try { messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30)); messageViewList.forEach(messageView -> { System.out.println(messageView); //消费处理完成后,需要主动调用ACK提交消费结果。 try { simpleConsumer.ack(messageView); } catch (ClientException e) { e.printStackTrace(); } }); } catch (ClientException e) { //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。 e.printStackTrace(); }
*/
// 想要实现多线程消费消息,我们希望有一个任务,此任务能够不停的拉取消息,然后创建子线程池去消费消息。 // 停止任务后,需要将任务中的消息消费完后,再关闭任务。
ArrayList<CrazyTask> tasks = new ArrayList<>(); tasks.add(new PhoneTask("phoneTask", 2)); tasks.add(new EmailTask("emailTask", 3));
for (CrazyTask task : tasks) { new Thread(() -> { task.doExecute(new SimpleConsumer("topic"+task.getTaskName().charAt(0), "tagA")); }).start(); }
// task running Thread.sleep(150);
// task terminated tasks.forEach(CrazyTask::terminate); }}
复制代码


最终执行结果


receive message: [Message{messageBody='topice-tagA-0-1700470193487'}, Message{messageBody='topice-tagA-1-1700470193487'}, Message{messageBody='topice-tagA-2-1700470193487'}, Message{messageBody='topice-tagA-3-1700470193487'}, Message{messageBody='topice-tagA-4-1700470193487'}]receive message: [Message{messageBody='topicp-tagA-0-1700470193487'}, Message{messageBody='topicp-tagA-1-1700470193487'}, Message{messageBody='topicp-tagA-2-1700470193487'}, Message{messageBody='topicp-tagA-3-1700470193487'}, Message{messageBody='topicp-tagA-4-1700470193487'}]phoneTask-0  process  Message{messageBody='topicp-tagA-0-1700470193487'}emailTask-1  process  Message{messageBody='topice-tagA-2-1700470193487'}emailTask-0  process  Message{messageBody='topice-tagA-0-1700470193487'}phoneTask-1  process  Message{messageBody='topicp-tagA-2-1700470193487'}emailTask-2  process  Message{messageBody='topice-tagA-4-1700470193487'}ack message: Message{messageBody='topice-tagA-2-1700470193487'}emailTask-1  process  Message{messageBody='topice-tagA-3-1700470193487'}ack message: Message{messageBody='topice-tagA-4-1700470193487'}ack message: Message{messageBody='topice-tagA-0-1700470193487'}emailTask-0  process  Message{messageBody='topice-tagA-1-1700470193487'}ack message: Message{messageBody='topicp-tagA-2-1700470193487'}ack message: Message{messageBody='topicp-tagA-0-1700470193487'}phoneTask-0  process  Message{messageBody='topicp-tagA-1-1700470193487'}phoneTask-1  process  Message{messageBody='topicp-tagA-3-1700470193487'}ack message: Message{messageBody='topice-tagA-1-1700470193487'}ack message: Message{messageBody='topice-tagA-3-1700470193487'}receive message: [Message{messageBody='topice-tagA-0-1700470193570'}, Message{messageBody='topice-tagA-1-1700470193570'}, Message{messageBody='topice-tagA-2-1700470193570'}, Message{messageBody='topice-tagA-3-1700470193570'}, Message{messageBody='topice-tagA-4-1700470193570'}]emailTask-0  process  Message{messageBody='topice-tagA-2-1700470193570'}emailTask-2  process  Message{messageBody='topice-tagA-0-1700470193570'}emailTask-1  process  Message{messageBody='topice-tagA-4-1700470193570'}ack message: Message{messageBody='topicp-tagA-3-1700470193487'}ack message: Message{messageBody='topicp-tagA-1-1700470193487'}phoneTask-1  process  Message{messageBody='topicp-tagA-4-1700470193487'}ack message: Message{messageBody='topice-tagA-0-1700470193570'}ack message: Message{messageBody='topice-tagA-4-1700470193570'}ack message: Message{messageBody='topice-tagA-2-1700470193570'}emailTask-0  process  Message{messageBody='topice-tagA-3-1700470193570'}emailTask-2  process  Message{messageBody='topice-tagA-1-1700470193570'}ack message: Message{messageBody='topicp-tagA-4-1700470193487'}receive message: [Message{messageBody='topicp-tagA-0-1700470193618'}, Message{messageBody='topicp-tagA-1-1700470193618'}, Message{messageBody='topicp-tagA-2-1700470193618'}, Message{messageBody='topicp-tagA-3-1700470193618'}, Message{messageBody='topicp-tagA-4-1700470193618'}]phoneTask-0  process  Message{messageBody='topicp-tagA-0-1700470193618'}phoneTask-1  process  Message{messageBody='topicp-tagA-2-1700470193618'}ack message: Message{messageBody='topice-tagA-1-1700470193570'}ack message: Message{messageBody='topice-tagA-3-1700470193570'}receive message: [Message{messageBody='topice-tagA-0-1700470193634'}, Message{messageBody='topice-tagA-1-1700470193634'}, Message{messageBody='topice-tagA-2-1700470193634'}, Message{messageBody='topice-tagA-3-1700470193634'}, Message{messageBody='topice-tagA-4-1700470193634'}]emailTask-1  process  Message{messageBody='topice-tagA-0-1700470193634'}emailTask-0  process  Message{messageBody='topice-tagA-4-1700470193634'}emailTask-2  process  Message{messageBody='topice-tagA-2-1700470193634'}ack message: Message{messageBody='topicp-tagA-2-1700470193618'}ack message: Message{messageBody='topicp-tagA-0-1700470193618'}phoneTask-0  process  Message{messageBody='topicp-tagA-1-1700470193618'}phoneTask-1  process  Message{messageBody='topicp-tagA-3-1700470193618'}
phoneTask shut down
emailTask shut downack message: Message{messageBody='topice-tagA-0-1700470193634'}ack message: Message{messageBody='topice-tagA-2-1700470193634'}emailTask-1 process Message{messageBody='topice-tagA-1-1700470193634'}ack message: Message{messageBody='topice-tagA-4-1700470193634'}emailTask-2 process Message{messageBody='topice-tagA-3-1700470193634'}ack message: Message{messageBody='topicp-tagA-3-1700470193618'}ack message: Message{messageBody='topicp-tagA-1-1700470193618'}phoneTask-1 process Message{messageBody='topicp-tagA-4-1700470193618'}ack message: Message{messageBody='topice-tagA-3-1700470193634'}ack message: Message{messageBody='topice-tagA-1-1700470193634'}ack message: Message{messageBody='topicp-tagA-4-1700470193618'}
复制代码


可以看到结果是,当每次收到的消息消费完后会拉取新的消息。当执行 shutdown 任务时,会将当前任务执行完后再销毁线程池。


文章转载自:KeBoom

原文链接:https://www.cnblogs.com/keboom/p/17844431.html

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
Java多线程消费消息_Java_不在线第一只蜗牛_InfoQ写作社区