写点什么

HiJobQueue:一个简单的线程安全任务队列

作者:EquatorCoco
  • 2025-01-23
    福建
  • 本文字数:3568 字

    阅读完需:约 12 分钟

概述


HiJobQueue 是一个线程安全的任务队列,用于在多线程环境中管理和执行异步任务。它的设计参考了 Cobalt 项目中的 JobQueue,并做了适当的简化。HiJobQueue 提供了任务推送(push)、任务弹出(pop)、队列退出(quit)等功能,适用于需要异步任务调度的场景。


核心功能


1、线程安全

使用 std::mutex 和 std::condition_variable 实现线程安全的任务队列。


2、任务调度

支持任务的异步推送和弹出。


3、退出机制

提供 quit() 方法,用于安全地停止任务队列。


4、跨平台

使用 C++ 标准库实现,不依赖平台特定的 API。


实现代码


以下是 HiJobQueue 的实现代码:


#pragma once
#include <mutex>#include <functional>#include <queue>#include <condition_variable>
/** * @brief 线程安全的任务队列,用于管理和执行异步任务。 */class HiJobQueue final {public: using Job = std::function<void()>; // 任务类型
public: HiJobQueue() : is_exit_(false) {}
/** * @brief 推送任务到队列。 * @param job 要执行的任务。 * @return 如果队列已退出,返回 false;否则返回 true。 */ bool push(Job job);
/** * @brief 从队列中弹出任务。 * @param job 用于存储弹出的任务。 * @return 如果队列为空且已退出,返回 false;否则返回 true。 */ bool pop(Job& job);
/** * @brief 获取队列中的任务数量。 * @return 队列中的任务数量。 */ size_t size();
/** * @brief 退出队列,停止任务处理。 */ void quit();
/** * @brief 检查队列是否已退出。 * @return 如果队列已退出,返回 true;否则返回 false。 */ bool is_quited();
// 禁用拷贝构造函数和赋值运算符 HiJobQueue(HiJobQueue&) = delete; HiJobQueue(const HiJobQueue&) = delete;
private: bool is_exit_; // 队列退出标志 std::mutex mutex_; // 互斥锁,保护队列访问 std::condition_variable cond_; // 条件变量,用于任务通知 std::queue<Job> queue_; // 任务队列};
// 实现
bool HiJobQueue::push(Job job) { std::lock_guard<std::mutex> locker(mutex_); if (is_exit_) { return false; } queue_.push(std::move(job)); cond_.notify_one(); return true;}
bool HiJobQueue::pop(Job& job) { std::unique_lock<std::mutex> locker(mutex_); cond_.wait(locker, [this]() { return is_exit_ || !queue_.empty(); }); if (is_exit_ && queue_.empty()) { return false; } job = std::move(queue_.front()); queue_.pop(); return true;}
size_t HiJobQueue::size() { std::lock_guard<std::mutex> locker(mutex_); return queue_.size();}
void HiJobQueue::quit() { std::lock_guard<std::mutex> locker(mutex_); is_exit_ = true; cond_.notify_all();}
bool HiJobQueue::is_quited() { std::lock_guard<std::mutex> locker(mutex_); return is_exit_;}
复制代码


测试用例


为了验证 HiJobQueue 的正确性和线程安全性,我们设计了以下测试用例:


测试代码


#include <gtest/gtest.h>#include <future>#include <atomic>#include <thread>#include <chrono>#include "hi_job_queue.h"
class TestCls {public: void test(const char* text, int i) { printf("%s-%d\n", text, i); }};
TEST(HiJobQueueTest, ConcurrentPushPop) { HiJobQueue queue; TestCls cls;
std::atomic<int> job_count{0};
auto f1 = std::async(std::launch::async, [&] { HiJobQueue::Job job; while (queue.pop(job)) { job(); job_count++; } });
auto f2 = std::async(std::launch::async, [&] { HiJobQueue::Job job; while (queue.pop(job)) { job(); job_count++; } });
auto f3 = std::async(std::launch::async, [&] { for (int i = 0; i < 200; i++) { queue.push(std::bind(&TestCls::test, &cls, "test1", i)); std::this_thread::sleep_for(std::chrono::milliseconds(5)); // 跨平台休眠 } });
auto f4 = std::async(std::launch::async, [&] { for (int i = 0; i < 200; i++) { queue.push(std::bind(&TestCls::test, &cls, "test2", i)); std::this_thread::sleep_for(std::chrono::milliseconds(5)); // 跨平台休眠 } });
f3.wait(); f4.wait();
queue.quit();
f1.wait(); f2.wait();
// 验证所有任务被执行 EXPECT_EQ(job_count.load(), 400); // 200 (test1) + 200 (test2)}
TEST(HiJobQueueTest, QuitBehavior) { HiJobQueue queue;
auto consumer = std::async(std::launch::async, [&] { HiJobQueue::Job job; while (queue.pop(job)) { job(); } });
for (int i = 0; i < 10; i++) { queue.push([]() {}); }
queue.quit();
consumer.wait();
EXPECT_TRUE(queue.is_quited());
EXPECT_FALSE(queue.push([]() {}));}
TEST(HiJobQueueTest, EmptyQueueBehavior) { HiJobQueue queue;
HiJobQueue::Job job; bool pop_result = false;
auto pop_thread = std::async(std::launch::async, [&] { pop_result = queue.pop(job); });
std::this_thread::sleep_for(std::chrono::milliseconds(100));
EXPECT_FALSE(pop_result);
queue.quit();
pop_thread.wait();
EXPECT_FALSE(pop_result);
EXPECT_FALSE(queue.pop(job));}
复制代码


测试用例说明


1、ConcurrentPushPop

  • 测试多线程环境下 push 和 pop 的并发行为。

  • 验证所有任务是否被正确执行。


2、QuitBehavior

  • 测试队列退出时的行为。

  • 验证退出后是否不再接受新任务。


3、EmptyQueueBehavior

  • 测试队列为空时的行为。

  • 验证退出后 pop 的行为。


适用场景


HiJobQueue 适用于以下场景:


1、多线程任务调度

在需要将任务分发到多个工作线程执行的场景中,HiJobQueue 可以作为任务调度器使用。

例如:线程池中的任务队列。


2、事件驱动架构

在事件驱动的系统中,HiJobQueue 可以用于存储和处理事件。

例如:GUI 应用中的事件队列。


3、异步任务处理

在需要异步执行任务的场景中,HiJobQueue 可以用于存储任务并由后台线程处理。

例如:日志系统的异步写入。


4、生产者-消费者模型

在生产者-消费者模型中,HiJobQueue 可以作为共享的任务缓冲区。

例如:多线程下载任务的分发。


优缺点分析

优点


1、线程安全

使用 std::mutex 和 std::condition_variable 确保多线程环境下的安全性。


2、简单易用

提供了简洁的接口(pushpopquit),易于集成到现有项目中。


3、跨平台

基于 C++ 标准库实现,不依赖平台特定的 API,具有良好的可移植性。


4、退出机制

提供 quit() 方法,可以安全地停止任务队列,避免资源泄漏。


5、轻量级

代码简洁,性能开销小,适合对性能要求较高的场景。


缺点


1、功能单一

仅支持基本的任务队列功能,不支持优先级调度或任务取消。


2、性能瓶颈

在高并发场景下,std::mutex 可能成为性能瓶颈。

如果需要更高的性能,可以考虑无锁队列(如 boost::lockfree::queue)。


3、任务类型限制

任务类型为 std::function<void()>,不支持返回值或参数传递。

如果需要更复杂的任务类型,需要自行扩展。


4、缺乏任务状态管理

不支持任务的状态管理(如任务完成通知或错误处理)。


5、退出时未执行任务

在调用 quit() 方法退出队列时,如果仍有未执行的任务,这些任务会被直接丢弃。

如果任务中存在阻塞操作(如等待 I/O、锁、条件变量等),可能导致进程无法及时退出。


例如:在希望进程退出时,如果任务队列中有未执行的任务,且任务中存在阻塞操作,进程可能会卡住,无法正常退出。


总结


HiJobQueue 是一个简单但功能强大的线程安全任务队列,适用于多线程环境中的异步任务调度。通过参考 Cobalt 项目中的 JobQueue,我们实现了一个更轻量级的版本,并通过单元测试验证了其正确性和线程安全性。希望这篇文章能帮助你理解和使用 HiJobQueue


文章转载自:荣--

原文链接:https://www.cnblogs.com/Rong-/p/18687042

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

EquatorCoco

关注

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
HiJobQueue:一个简单的线程安全任务队列_c++_EquatorCoco_InfoQ写作社区