邮箱中的 Qt 线程设计
邮箱(deepin-mail)主要使用 Qt 框架开发,是一个有大量并行任务且需要监控进度和结果的项目,任务的优先级调整和支持取消回滚也是必不可少。Qt 已经为我们提供了多种线程设计的方法,可以满足绝大部分使用场景,但是每一种方案单拎出来都不能很恰到好处的在项目中使用,本文主要对项目中的线程设计进行介绍,通过将多个 Qt 线程方案相结合,衍生出一种适合邮箱这类型项目的应用方法。
浅谈 Qt 中的线程方法
QThread
基于 QThread 的方案,需要子类化 QThread 实现 run()方法,并在合适的时候调用 start(),在这之前需要做好数据交换,并在线程执行过程中通过跨线程的 connect()方法将数据传出,这里需要注意不能将连接参数设置为 Qt::DirectConnection,因为线程的数据需要通过事件循环传递来保证安全,在跨线程的 connect()方法中参数即使使用了引用 Qt 也依然会在 emit 时帮你额外触发一次参数拷贝,影响性能。
如果不是开销非常大的任务像这样直接继承其实不受推荐,现在更倾向于使用组合的方式,子类化 QObject 来创建一个 Worker 类,添加一个 QThread 成员变量,将 Worker 对象移动到启动后的 QThread 对象线程中,这样 Worker 信号触发的槽函数都会在 QThread 对象启动的线程中运行。由于 QThread 也是 QObject 派生类, 偷懒的人可以让 Worker 直接继承 QThread 然后 moveToThread(this),当然这样做需要对 QThread 的理解更 透彻一些,所以官方也不建议这种写法因为 QThread 是被设计成一个管理线程的类不应参与过多业务逻辑。
基于 QThread 的方案尤其要注意定时器和套接字的析构问题,确保他们在创建的线程中使用和析构,QThread 的使用者会不注意的将他们在线程中创建和使用,却在 QThread 自己的析构函数中析构(QThread 对象的析构是在他所在的线程而不是自己开启的线程)。
QRunnable
基于 QRunnable 的方案,将任务分解成 QRunnable 对象直接放入 QThreadPool 中执行,使用上和 QThread 一样我们完成他的 run()方法,但是能且只能通过 QThreadPool 来执行,自身不包含其他的功能特性,开销很小。但就是因为这样,他和外界交换数据或者线程同步就变得相当麻烦,由于 QRunnable 不是 QObject 派生类,无法使用 Qt 的信号槽,不做一些处理很难优雅地将进度和结果抛出,如果用同时继承 QRunnable 和 QObject 的方式进行来添加信号槽机制不如直接使用 QThread 了。还有一种方式,在 QRunnable 对象中保存一些传入的引用或指针来做消息传递,这样数据可以通过原子 变量或互斥锁来实现更新,指针可以使用元对象系统中的方法 QMetaObject::invokeMethod()通过事件循环传出消息,但实际运用起来依旧麻烦,每个数据包括指针你都要考虑他的跨线程竞争问题,不得不控制参数的数量,将每个任务尽可能的切割成更小的任务。
QtConcurrent
还有一种基于 QtConcurrent 框架的方案,是 Qt 的高级别线程特性,支持多种线程管理的方法, 只要把方法或者 lambda 表达式传入 run()并指定一个线程池(默认是全局线程池 QThreadPool::globalInstance())就完成了开线程执行直到返回结果的一系列流程,它会自动将任务分配到可用的处理器核心上执行,最大化利用现在核心越来越多的 cpu 架构。它的 run()方法重载数量非常多,包括 异步和同步等待线程执行完成的方法。选择其中的异步方法,就可以通过监控返回的 QFuture 对象来得到线程状态和结果,主要方法的官方描述如下:
Concurrent Map and Map-ReduceQtConcurrent::map() applies a function to every item in a container, modifying the items in-place.QtConcurrent::mapped() is like map(), except that it returns a new container with the modifications.QtConcurrent::mappedReduced() is like mapped(), except that the modified results are reduced or folded into a single result.Concurrent Filter and Filter-ReduceQtConcurrent::filter() removes all items from a container based on the result of a filter function.QtConcurrent::filtered() is like filter(), except that it returns a new container with the filtered results.QtConcurrent::filteredReduced() is like filtered(), except that the filtered results are reduced or folded into a single result.Concurrent RunQtConcurrent::run() runs a function in another thread.
横向比较以上方案
以上方案并没有一种是完全优于另一种的,每种都有它适应的场景,灵活运用满足项目的需要是最重要的。
邮箱中的线程方案
心路历程
QtConcurrent 方案中基于 future 的机制非常适合邮箱项目做前后端分离,后端只要提供 QFuture 对象出去被前端监控,其他的工作包括任务调度都在后端内部完成,实现解耦。但总体上它的设计初衷更多是为了使用简单而不是功能强大,对并行处理的线程增加了映射、过滤和规约算法却削弱了对线程内部的控制和线程之间的消息传递、同步能力,而 QFuture 对象提供了 pause()和 cancel()等方法 只能影响线程池中还未执行的任务。
希望使用 QtConcurrent 特性的同时还要解决一些实际问题,要能够在线程池中对不同优先级的线程进行排序,要能掌握耗时任务的实时进度并能够对其暂停和取消,操作的影响具体到任务中操作数据库、读写文件或者和服务器交互中的某一步,这样在部分取消后可以做一些回滚来保证任务的原子性。
既然 Qt 使用了 future 来命名,其实 Qt 已经实现了 future-promise 机制,还在自己的源码中大量的使用 。如果观察 QFuture 和 QThreadPool 的源码,时不时就会看到一个叫 QFutureInterface 的类, Qt 的帮助文档中不包含相关资料,但是别看他叫做"interface",其实他是可以直接使用的,并且拥有着满足项目需要的方法。有兴趣的同学可以阅读相关源码来了解,如果在源码中看到以下的描述不要紧张,一直追溯到 Qt5 的最后一个版本,这些接口也是存在并且稳定的。
方案改造
为了更好的利用这个特性,需要对它进行了改造以接地气一些,我们通过使用 QThreadPool +QRunnable 方案中的设计思路来控制线程池任务 。
首先继承 QRunnable 创建一个类模板用于后面衍生出各式各样的任务:
template <typename T>
class AbstractTask : public QRunnable
{
public:
QFuture<T> future();
protected:
inline void reportStarted();
inline void setProgressRange(int minimum, int maximum);
inline void setProgressValueAndText(int progressValue, const QString &progressText =
"");
inline void reportResult(const T &result, const int &index = -1);
inline void reportFinished(const T *result = 0);
virtual void run() = 0;
private:
QFutureInterface<T> d;
};
模板参数是每个任务想要对外提供的返回结果,可以只返回错误码和错误描述用于表示执行果,也可以添加更多的参数比如同时将下载的文件通过 future 返回。不用担心额外的拷贝开销,因为另外一个让人省心的地方是,QFutureInterface 已经通过引用计数为自己实现了隐式共享。
template <typename T>
class QFutureInterface : public QFutureInterfaceBase
{
public:
QFutureInterface(State initialState = NoState)
: QFutureInterfaceBase(initialState)
{
refT();
}
QFutureInterface(const QFutureInterface &other)
: QFutureInterfaceBase(other)
{
refT();
}
~QFutureInterface()
{
if (!derefT())
resultStoreBase().template clear<T>();
}
...
}
QFutureInterface 通过原子变量来实现引用计数,提供一个平台无关的原子操作,但并不是所有的处理器都支持 QAtomicInt,如今国产芯片百家争鸣,如果你是特殊的架构,使用前检测一下当前处理器是否支持某个 API 是很重要的。使用原子变量会比使用互斥锁的方式更加简单和高效:
class RefCount
{
public:
inline RefCount(int r = 0, int rt = 0): m_refCount(r), m_refCountT(rt) {}
inline bool ref() { return m_refCount.ref(); }
inline bool deref() { return m_refCount.deref(); }
inline int load() const { return m_refCount.load(); }inline bool refT() { return m_refCountT.ref(); }
inline bool derefT() { return m_refCountT.deref(); }
inline int loadT() const { return m_refCountT.load(); }
private:
QAtomicInt m_refCount;
QAtomicInt m_refCountT;
};
QFutureInterface 通过 future()方法创建出的 QFuture 都会存有一份自己的引用实例,参与了引用计数的计算。只有当所有的 QFutureInterface 对象都被析构(包括 QFuture 中的),他们所指向的 result()结果空间才会释放。出于灵活同一个任务是可以返回多个 QFuture 对象分发到不同的模块以被监控的, 但在任务完成后记得重置 QFuture 以释放内存,赋值为一个 QFuture()即可。
template <typename T>
class QFuture
{
public:
explicit QFuture(QFutureInterface<T> *p)
: d(*p)
{ }
mutable QFutureInterface<T> d;
}
template <typename T>
class QFutureInterface : public QFutureInterfaceBase
{
inline QFuture<T> future()
{
return QFuture<T>(this);
}
}
使用案例
上图是任务流转的一个简要流程,结合这个流程下面将给出项目中的一个实现,一个为自己的账号创建邮件目录的任务:
class CreateMailFolderTask : public AbstractTask<ResultResponse>
{
public:
CreateMailFolderTask(QSharedPointer<ProtocolEngine> engine, QSharedPointer<ProtocolCache> cache);
void run();
void initParam(const QString &folderName);
private:
QSharedPointer<ProtocolEngine> m_engine;
QSharedPointer<ProtocolCache> m_cache;
QString m_folderName;
};
...
void CreateMailFolderTask::run()
{
setProgressRange(0, 100);
reportProgressValueAndText(0,"Started");
//do something
const ResultResponse &result = m_engine->createMailFolder(m_folderName);
reportProgressValueAndText(50,"Ongoing");
//different processing according to d.isPaused() , d.isCanceled()
if (RESULT_SUCCESS == result.type)
m_cache->createFolderId(m_folderName);//do something
reportProgressValueAndText(100,"Finished");
reportResult(result);
reportFinished();
}
首先按我们的模板实现一个创建目录的任务,在任务的 run()方法中实现相关功能,这时候就可以根据需要自由的通过多种 report()方法将进度、状态描述和结果抛出,以便外部可以在每个节点获取当前任务的状态,根据是否被暂停或者被取消等通过 QFuture 设置的状态来做出不同的处理,如 果有必要比如在邮箱项目中,我们传递了一个 QAtomic 原子变量到任务甚至子任务中,进行更加精 准的控制。类中有两个成员变量 m_engine 和 m_cache,这个是项目中用于执行邮件协议和本地缓 存代码的控制类,线程安全,不做过多扩展说明。接下来是使用:
QFuture<ResultResponse> createFolder(const QString &folderId){
CreateMailFolderTask *task = new CreateMailFolderTask(m_protocolEngine, m_protocolCache);
task->initParam(folderId);
QFuture<PrepareResponse> future = task->future();
emit sigNewTask(task);
return future;
}
我们创建了一个任务,但是任务并不需要立刻开始,而是通过信号将 task 抛出等待处理,可以在合适的时候通过线程池 pool->tryStart(task)来执行,可以丢在数据结构中保存下来进行优先级排序后等待唤起,还可以格式化存储到文件中保存退出等待下次应用启动后继续执行。拿到 QFuture 对象的模块立刻就能够进行监控,是否开始、是否结束和进度都可以通过 QFuture 的方法获取或使用 QFutursSynchronizer 组合监控,也可以通过 QFutureWatcher 监控 QFuture 实现被动处理,这个具体看看官方说明即可:
QFuture represents the result of an asynchronous computation.
QFutureIterator allows iterating through results available via QFuture.
QFutureWatcher allows monitoring a QFuture using signals-and-slots.
QFutureSynchronizer is a convenience class that automatically synchronizes several
QFutures.
QFutureWatcher<ResultResponse> watcher;
watcher.setFuture(future);
QObject::connect(&watcher, &QFutureWatcher<ResultResponse>::progressValueChanged,
[ = ](int progressValue) {
progressValue;//do something});
任务的返回可以通过 QFuture 的 result()方法获取,如果是逐步抛出结果的批处理任务,可以通过 results()或者 resultAt(int index)方法获取已取得的结果列表。result()的提前调用不会产生错误,它会 阻塞当前的线程,等待任务完成后得到结果才会继续向下执行,也可以主动调用 waitForFinished() 方法阻塞等待任务完成达到一样的效果。阻塞等待可以不用为了耗时极短的任务去写监控代码,也为写单元测试代码带来了非常大的便利性:
#include <gtest/gtest.h>
TEST_F(ut_session, createFolder){
QFuture<ResultResponse> future = session->createFolder("MyBox");
future.waitForFinished();
EXPECT_TRUE(ResultCode::Success == future.result().code);
}
小结
总结一下,Qt future-promise 结合 QRunnable 的方案十分灵活,其实还有很多特性没有在此演示,我们已经将它落地在邮箱项目中,接口稳定运行,取得了不错的效果。
评论