写点什么

Python 项目实战│ Python 实现线程池工作模式

作者:TiAmo
  • 2023-04-25
    江苏
  • 本文字数:3953 字

    阅读完需:约 13 分钟

Python项目实战│ Python实现线程池工作模式

简介: Python 项目实战│ Python 实现线程池工作模式

01、客户机/服务器通信逻辑


客户机与服务器通信逻辑如图 1 所示。



■ 图 1 智能桌面 App 的客户机/服务器通信逻辑


02、数据交换协议


客户机与服务器之间一次信息往返的协议会话过程,定义为图 2 所示的逻辑时序。



协议会话逻辑解析:

(1)消息交换基于消息头机制。消息头中包含消息类型和消息长度。消息类型包含图像消息与下线消息。

(2)用 Json 格式的数据表示消息头。图像数据用 base64 编码与解码。

(3)发送数据分两个步骤完成,首先发送消息头,然后发送消息内容。

(4)接收数据分两个步骤完成,首先接收消息头,然后接收消息内容。


消息头的结构设计如图 3 所示,消息头的固定长度为 128 字节,包含消息类型(msg_type)与消息内容长度(msg_len)两个字段。



消息类型包括:

(1)CLIENT_IMAGE:表示收到来自客户机的图像数据。

(2)CLIENT_MESSAGE:表示收到来自客户机的下线消息。


消息内容长度用消息包含的字符数表示。对于图像数据而言,因为采用 base64 编码,其传输的数据也是字符消息。

消息头的长度在服务器与客户机两端均约定为 128 字节,用常量 MSG_HEADER_LEN 定义。发送消息头之前,需要检查消息的长度,如果不足 128 字节,其左侧用字节型空格字符填充。


03、服务器主体逻辑


根据图 1 描述的服务器逻辑,完成服务器的主体逻辑设计,如程序段 P7.1 所示。



第 32~39 行定义服务器端的主循环,处理客户机连接,采用的是一客户一线程模式。服务器会话线程定义为 handle_client 模块,主线程向会话线程传递三个参数:

(1)client_socket: 会话套接字

(2)client_addr: 客户机地址

(3)model: 用于预测的智能模型


运行服务器程序,观察输出结果,此时服务器虽然处于侦听连接的状态,但是由于 handle_client 模块还没有实现,故无法处理来自客户机的各种消息。


04、服务器会话线程


服务器会话线程包括接收数据与发送数据两个模块,对应图 1 中的内循环。服务器完成数据接收后,需要回送预测结果或者确认消息给客户机,所以将接收数据与发送数据的逻辑定义在同一函数模块 handle_client 中,收发数据的逻辑流程如图 4 所示。



■ 图 4 服务器收发数据会话线程逻辑


会话线程的主逻辑是一个循环,循环条件为远程客户机是否结束会话,逻辑流程解析如下:

(1)如果客户机断开了与服务器的连接,会话线程结束。

(2)在连接正常的情况下,服务器首先接收来自客户机的消息头,解析消息头,根据消息类型,分为一般消息与图像消息。

(3)如果是图像消息,则通过一个循环,根据图像的大小完成数据接收,然后经过 base64 解码、图像变换(调整颜色模式、归一化、缩放)、模型预测、重构预测结果、定义消息头、回送消息头、回送预测结果。回到步骤(1)。

(4)如果是一般消息,则继续判断是否为下线消息。

(5)如果是下线消息,则更新连接数量,定义下线消息(原消息加上时间戳),定义消息头,回送消息头,回送消息内容,会话线程结束。

(6)如果不是下线消息,则做其他消息处理,为简化设计,其他消息处理模块暂不编程,留作扩展。回到步骤(1)。


会话线程 handle_client 的逻辑实现如程序段 P7.2 所示。



第 46 行–第 51 行定义的循环结构,根据图像数据的长度 msg_len 完成数据接收工作。


运行服务器程序,输出结果为:

服务器开始在('192.168.0.102', 5050)侦听...
复制代码

待客户机程序完成后,再做联合测试。


05、客户机主体逻辑


新建主程序 MyClient.py。根据图 1 描述的客户机逻辑,完成客户机的主体逻辑设计,其主要模块如图 5 所示。


模块 send_image_data 发送图像数据,模块 send_down_msg 发送下线消息,模块 recv_message 是用于接收服务器消息的会话线程,类模块 GUI(QMainWindow)负责构建客户机图形化界面。主程序完成主控逻辑设计。



客户机的消息结构定义如图 3 所示,与服务器保持一致。消息的收发逻辑,如图 2 所示,亦与服务器保持一致。


客户机主体逻辑如程序段 P7.3 所示。



首先运行服务器程序,然后运行测试客户机程序。目前客户机还做不了具体工作,输入字符 Q 退出客户机主循环。


06、客户机发送数据


客户机向服务器发送的数据有两种类型,一是图像数据,一是下线消息。发送图像数据的流程如图 6 所示。



程序段 P7.4 描述了发送图像数据模块 send_image_data 的完整逻辑。



07、客户机接收数据


客户机定义了线程函数 recv_message,用于接收两类数据,一是普通消息(下线消息等),二是预测消息(预测结果)。消息处理流程如图 7 所示,分步描述如下。

(1)进入消息循环,接收消息头。

(2)如果消息头为空,转到步骤(1)。

(3)如果消息头非空,则解析消息头,获取消息类型与消息长度。

(4)如果是普通消息,则接收消息内容,进一步判断是否为下线消息。

(5)如果是下线消息,则跳出消息循环,转到步骤(9)。

(6)如果非下线消息,则转到步骤(1)。

(7)如果不是普通消息,则判断是否为预测消息,如果不是预测消息,则转到步骤(1)。

(8)如果是预测消息,则接收消息内容,解析消息内容,将预测结果存入队列中,显示预测结果。转到步骤(1)。

(9)显示下线消息,消息接收线程结束。



■ 图 7 客户机接收消息逻辑流程


程序段 P7.6 描述了接收消息线程函数 recv_message 的完整逻辑。



将\dataset\images 目录下的图像文件 Test_0.jpg、Test_7.jpg 拷贝到根目录下。


运行服务器程序,然后运行客户机程序,做联合测试。


客户机输入待遇测的图像文件名称 Test_0.jpg,回车后发送图像数据,服务器返回预测结果。客户机输入字符 Q,结束客户机。完成此次客户机与服务器的通信后,服务器与客户机的状态信息如图 8 所示。



此时服务器工作于一客户一线程模式,启动多个客户端,可做联合测试。


08、客户机界面设计


为了增强客户机的可操作性,基于 PyQt5 框架为客户机设计图形化界面,界面布局及其控件名称如图 9 所示。



定义图形化界面类 GUI(QMainWindow)封装图 9 所示的控件及其事件函数。


运行服务器,然后运行客户机,从 chapter7 的根目录中加载图像 Test_0.jpg,观察图像特点。然后单击“预测”按钮,观察服务器反馈的预测结果,如图 10 所示。



09、线程池


服务器现有的工作模式为一客户一线程,即为每一个连接到服务器的客户机创建独立的会话线程,当客户机并发量较大时,服务器往往面临资源枯竭的挑战。


线程池模式可以有效平衡服务器负载能力,与一客户一线程模式相比,其主要优点有:

(1)通过重用已存在的线程,降低线程创建和销毁造成的额外消耗。

(2)提高系统响应速度,当有新任务到达时,通过复用已存在的线程便能立即执行,无需等待新线程的创建。

(3)控制资源消耗,将并发线程数量限制在合理的区间。

(4)针对工作线程提供了更多的控制能力,例如线程延时、定时等。


Python 的线程池定义在 concurrent.futures 包中,使用 ThreadPoolExecutor 类创建线程池。线程池调度任务过程如图 11 所示。



将一客户一线程模式修改为线程池模式,只需做以下改动:

(1)导入线程池类 ThreadPoolExecutor。在服务器端添加语句:

from concurrent.futures import ThreadPoolExecutor # 线程池类
复制代码

(2)在服务器主线程的 while 循环前面添加创建线程池的语句:

pool = ThreadPoolExecutor(max_workers=5) # 创建线程池,指定工作线程数量为5
复制代码

此处如果省略参数 max_workers,则线程池默认工作线程数量是 CPU 数量的 5 倍。考虑到线程池往往应用于需要大量 I/O 交换的场景,而不是 CPU 计算密集型的场景,故工作线程的数量应该超过 CPU 的数量。

(3)用线程池调度语句替换原有的线程创建语句。

# 建立与客户机会话的线程,一客户一线程client_thread = threading.Thread(target=handle_client, args=(new_socket, new_addr, model))client_thread.start()
复制代码

替换为:

pool.submit(handle_client,new_socket, new_addr, model) # 创建线程任务,提交到线程池
复制代码

(4)在主程序末尾,while 循环外部,添加关闭线程池的语句,释放资源:

pool.shutdown(wait=True) # 关闭线程池
复制代码

执行 shutdown 后,线程池将不再接受新任务。参数 wait 默认为 True,表示关闭线程池之前需要等待所有工作线程结束。


10、联合测试


为便于观察,将服务器线程池的工作线程数量调整为 2。启动服务器,然后启动四个客户机,标识为客户机 1、客户机 2、客户机 3、客户机 4。


四个客户机从 dataset\images 目录中选择四幅不同的测试图片,


假定客户机 1 选择的图片是 Test_17.jpg,客户机 2 选择的是 Test_152.jpg,客户机 3 选择的是 Test_190.jpg,客户机 4 选择的是 Test_1572.jpg,然后依次点击客户机 1、客户机 2、客户机 3、客户机 4 的“预测”按钮,观察预测结果。


可以看到,只有客户机 1、客户机 2 立即反馈了预测结果,而客户机 3、客户机 4 虽然已经连接到服务器,却并没有立即得到预测结果,原因是服务器线程池大小为 2,客户机 3、客户机 4 需要在任务队列等待。


客户机 1 显示结果如图 12 所示。



客户机 2 显示结果图 13 所示。



客户机 3 显示结果如图 14 所示。由于服务器线程池大小为 2,所以客户机 1 与客户机 2 占用工作线程后,客户机 3 只能进入任务队列等待。



客户机 4 显示结果如图 15 所示。同样,客户机 4 也只能进入服务器的任务队列等待。



关闭客户机 1,则会自动释放客户机 1 占用的工作线程,此时排队中的客户机 3 会立即得到相应,其结果如图 16 所示。



此时只有客户机 4 仍处于等待中。如果继续关闭客户机 2,则客户机 4 会得到立即响应,其预测结果如图 17 所示。



关闭客户机 3、关闭客户机 4。整个会话期间,服务器状态监控界面的信息提示如下:



仔细阅读服务器的状态提示信息,与客户机的操作相对照,可以更精准地把握客户机与服务器的全程会话逻辑。


11、小结


本文基于 Socket 通信方法,自定义数据交换协议,围绕苹果树病虫害识别需求,迭代构建了客户机/服务器模式的智能桌面 App。图像数据的发送采用 base64 编码方式,消息头、消息内容采用 Json 数据格式。服务器端采用一客户一线程和线程池技术支持并发访问,客户机采用基于 PyQt5 的图像化界面技术提高其可操作性。基于 Socket 技术的网络编程,在客户机与服务器两端提供了更多的设计灵活性。

发布于: 刚刚阅读数: 3
用户头像

TiAmo

关注

有能力爱自己,有余力爱别人! 2022-06-16 加入

CSDN全栈领域优质创作者,万粉博主;阿里云专家博主、星级博主、技术博主、阿里云问答官,阿里云MVP;华为云享专家;华为Iot专家;亚马逊人工智能自动驾驶(大众组)吉尼斯世界纪录获得者

评论

发布
暂无评论
Python项目实战│ Python实现线程池工作模式_Python_TiAmo_InfoQ写作社区