写点什么

odejs+Redis 实现简易消息队列

作者:coder2028
  • 2022-10-17
    浙江
  • 本文字数:3052 字

    阅读完需:约 1 分钟

前言

消息队列是存储数据的一个中间件,可以理解为一个容器。生产者生产消息投递 到队列中,消费者可以拉取消息进行消费,如果消费者目前没有消费的打算,则消息队列会保留消息,直到消费者有消费的打算。


设计思路

生产者

  • 连接 redis

  • 向指定通道 通过 lpush 消息

消费者

  • 连接 redis

  • 死循环通过 brpop 阻塞式获取消息

  • 拿到消息进行消费

  • 循环拿去下一个消息

Redis

安装及启动

此步骤各位道友随意就好,不一定要用 docker 。只要保证自己能连接到 redis 服务即可。


# 使用docker 拉取redis 镜像docker pull redis:latest
# 启动redis服务 # --name 后面是容器名字方便后续维护和管理 # -p 后面是指映射容器服务的 6379 端口到宿主机的 6379 端口docker run -itd --name redis-mq -p 6379:6379 redis

# ============ docker 常用基本操作(题外话) =================
# 拉取镜像docker pull 镜像名称
# 查看镜像docker images
# 删除镜像docker rmi 镜像名称
# 查看运行容器(仅为启动中的)docker ps
# 查看运行容器(包含未启动)docker ps -a
# 启动容器docker start 容器名称/容器id
# 停止容器docker stop 容器名称/容器id

复制代码

Nodejs 连接

初始化工程


# 创建文件夹并进入mkdir queue-node-redis && cd queue-node-redis
# yarn 初始化yarn init -y
# 下载redis包,# 指定版本的原因是尽量减少道友们的失败几率 毕竟前端的工具迭代太快了yarn add redis@4.2.0
复制代码


创建 lib 与 utils 目录


├── .gitignore├── lib├── package.json├── utils│   └── redis.js└── yarn.lock
复制代码


utils/redis.js


const redis = require("redis");
const redisCreateClient = async (config) => { try { const client = redis.createClient({ url: `redis://${config.host}:${config.port}`, }); await client.connect(); await client.select(config.db); console.log("redis connect success"); return client; } catch (err) { console.log("redis connect error"); throw err; }};
module.exports = { redisCreateClient,};
复制代码


index.js


在项目根目录下创建此文件,测试 redis 连接是否成功


const { redisCreateClient } = require("./utils/redis");const test = async () => {  const client = await redisCreateClient({    host: "127.0.0.1",    port: 6379,    db: 0,  });};test();

复制代码


出现如下图所示即可


minimist

轻量级的命令行参数解析引擎。


# 安装 minimistyarn add minimist@1.2.6
复制代码

使用方法

const systemArg = require("minimist")(process.argv.slice(2));console.log(systemArg);

复制代码


# 运行 node index.js --name test
# 输出{ _: [], name: 'test' }
复制代码

正文开始

从目录结构及文件创建,手把手教程


目录结构变更

├── config.js       # 配置文件├── lib│   └── index.js # 主目录入口文件├── package.json ├── utils                 # 工具函数库│   └── redis.js└── yarn.lock
复制代码

config.js

配置文件思路的重要性大于代码的实现

参考 nodejs 进阶视频讲解:进入学习


module.exports = {  // redis 配置  redis: {    default: {      host: "127.0.0.1",      port: 6379,      password: "",      db: 0,    },  },  // 消息队列频道设置  mqList: [    {      // 消息频道名称      name: "QUEUE_MY_MQ",      // 阻塞式取值超时配置      brPopTimeout: 100,      // 开启任务数 此配置需要 PM 启动生效      instances: 1,      // redis 配置key      redis: "default",    },  ],};
复制代码

lib/index.js

针对配置做程序异常处理


const systemArg = require("minimist")(process.argv.slice(2));const config = require("../config");const { bootstrap } = require("./core");
// 程序自检
// 判断是否输入了 频道名称if (!systemArg.name) { console.error("ERROR: channel name cannot be empty"); process.exit(99);}
// 频道队列配置const mqConfig = config.mqList.find((item) => item.name === systemArg.name) ?? false;
// 如果config不存在if (!mqConfig) { console.error("ERROR: configuration not obtained"); process.exit(99);}
// redis 配置const redisConfig = config.redis[mqConfig.redis];if (!redisConfig) { console.error("ERROR: redis configuration not obtained"); process.exit(99);}
// node index.js --name QUEUE_MY_MQbootstrap(mqConfig, redisConfig);

复制代码

lib/core.js

后面的核心逻辑写在此处


async function bootstrap(config) {  console.log(config);}
module.exports = { bootstrap,};

复制代码

核心逻辑

lib/core.js

const { redisCreateClient } = require("../utils/redis");async function bootstrap(mqConfig, redisConfig) {  try {    // 创建redis连接    const client = await redisCreateClient(redisConfig);    // 通过死循环阻塞程序    while (true) {      let res = null;      console.log("队列执行");      try {        // 从队列中获取任务, 采用阻塞式获取任务 最大阻塞时间为config.queue.timeout        res = await client.brPop(mqConfig.name, mqConfig.brPopTimeout);        if (res === null) {          continue;        }        console.log("TODO:: Task processing", res);      } catch (error) {        console.log("ERROR: redis brPop error", error);        continue;      }    }  } catch (err) {    // 处理程序异常    console.log("ERROR: ", err);    process.exit(1);  }}module.exports = {  bootstrap,};

复制代码

生成测试数据

为了接下来的测试,我们先生成一些测试数据


test/mockMq.js


const { redisCreateClient } = require("../utils/redis");const config = require("../config");
/** 生成 1000 条测试消息 */const mockMq = async (key) => { const client = await redisCreateClient(config.redis.default); for (let i = 0; i < 1000; i++) { // 向队列中 push 消息 await client.lPush(key, "test" + i); } // 获取队列长度 const count = await client.lLen(key); console.log(`生成1000条测试消息完成,目前共有${count}条消息`); // 关闭redis连接 client.quit();};
mockMq("QUEUE_MY_MQ");
复制代码

验证脚本有效性

# 执行消息生成命令node ./test/mockMq.js
# 程序输出# redis connect success# 生成 1000 条测试消息 完成,目前共有 1000 条消息
# 执行开启消费者node ./lib/index.js --name QUEUE_MY_MQ # TODO:: Task processing { key: 'QUEUE_MY_MQ', element: 'test0' }# TODO:: Task processing .......# TODO:: Task processing { key: 'QUEUE_MY_MQ', element: 'test999' }
复制代码

定义 Job

后记

到此为止建议队列就实现完成了,当然后面还有一些优化。例如通过配置文件 动态引入 Job 和如何使用 Pm2 启动消费队列并且可配置启动个数、启停控制。(ps:此处的坑会很快补上)


当然除了这些,目前这个简易队列还有很多不足。例如任务执行失败如何处理,消费后如何 ack , 没有用成熟的 topic 协议,没有实现延时队列。这些坑因为个人水平以及 redis 本身的特性 可能很长一段时间都不会填了。建议生产用成熟的套件 例如 Kafka RabbitMq 以及一些其他更适合当前语言的套件。


用户头像

coder2028

关注

还未添加个人签名 2022-09-08 加入

还未添加个人简介

评论

发布
暂无评论
odejs+Redis实现简易消息队列_node.js_coder2028_InfoQ写作社区