当 SeldonDeployment 服务部署完成后,将会在 kubernetes 集群节点上创建服务容器,而服务容器内将会启动 seldon-core-microservices 进程。该进程负责加载模型,并对外提供 http 服务。接下来,我们将介绍 seldon-core-microservices 的启动过程及它所提供的相关 http 服务。
1 seldon-core-microservices 启动过程
通过如下命令进入模型服务容器
kubectl exec -it seldon-model-example-0-classifier-xxxx bash
复制代码
查看相关进程,如下:
/usr/local/bin/seldon-core-microservice MyModel REST --service-type MODEL --persistence 0
复制代码
这与 Dockerfile 文件定义的 cmd 一致,seldon-core-microservice 脚本负责启动服务。
查看该脚本文件内容,如下:
#!/usr/local/bin/python# EASY-INSTALL-ENTRY-SCRIPT: 'seldon-core','console_scripts','seldon-core-microservice'__requires__ = 'seldon-core'import reimport sysfrom pkg_resources import load_entry_point
if __name__ == '__main__': sys.argv[0] = re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0]) sys.exit( load_entry_point('seldon-core', 'console_scripts', 'seldon-core-microservice')() )
复制代码
即这是 seldon-core 这个 python 包安装的启动脚本。启动时,它会执行 console_scripts 中定义的 seldon-core-microservice 所对应的方法。
接下来我们需要分析 seldon-core 源码包,看看启动脚本的执行内容。
按图索骥,我们找到安装文件 seldon-core/python/setup.py,其中相关内容如下:
from setuptools import find_packages, setupsetup( entry_points={ "console_scripts": [ "seldon-core-microservice = seldon_core.microservice:main", "seldon-batch-processor = seldon_core.batch_processor:run_cli", ] },)
复制代码
由此可见,seldon-core-microservice 所对应的是 seldon_core/microservice.py 模块的 main()方法。
查看 seldon-core/python/seldon_core/microservice.py 源码文件,其中 main()方法主要执行如下内容:
def main(): # 1定义命令行参数 parser = argparse.ArgumentParser() parser.add_argument("interface_name", type=str, help="Name of the user interface.") parser.add_argument( "--service-type", type=str, choices=["MODEL", "ROUTER", "TRANSFORMER", "COMBINER", "OUTLIER_DETECTOR"], default="MODEL", ) parser.add_argument("--persistence", nargs="?", default=0, const=1, type=int) parser.add_argument( "--parameters", type=str, default=os.environ.get(PARAMETERS_ENV_NAME, "[]") ) # 1.1解析命令行参数 args, remaining = parser.parse_known_args() # 2加载模型 interface_file = importlib.import_module(args.interface_name) user_class = getattr(interface_file, args.interface_name) if args.persistence: user_object = persistence.restore(user_class, parameters) persistence.persist(user_object, parameters.get("push_frequency")) else: user_object = user_class(**parameters) # 3定义rest服务 def rest_prediction_server(): # rest服务参数:ip、端口、进程数,超时时间,pid文件等 options = {} app = seldon_microservice.get_rest_microservice(user_object, seldon_metrics) UserModelApplication( app, user_object, jaeger_extra_tags, args.interface_name, options=options, ).run() server1_func = rest_prediction_server # 4使用multiprocessing启动rest进程 start_servers(server1_func, server2_func, server3_func, metrics_server_func)
复制代码
如代码所示,启动过程很清晰简洁的,主要包含如下 4 个步骤:
定义及解析命令行参数
加载模型
定义 Rest 服务
启动进程
注:
--service-type 参数,当前代码中并未使用到。该参数中声明的几种类型,在后面定义的 rest 服务有相对应的 api 接口。因暂未使用到,本文不展开讨论。
--persistence 参数,如果开启,则会启动一个后台进程,定期地将 user_model 使用 pickle 的方式序列化并保存在 redis 中。后续启动时可以从 redis 中获取 pickle 数据并加载。因非核心功能,本文不展开讨论。
2 如何加载模型
首先我们看未使用 persistence 的模型加载方式。主要代码如下:
interface_file = importlib.import_module(args.interface_name)user_class = getattr(interface_file, args.interface_name)user_object = user_class(**parameters)
复制代码
使用 importlib.import_module()加载模块,模块名称是由命令行参数指定的 interface_name(在本例中就是 MyModel)。加载完成后,从该模块获取属性名称 interface_name(即 MyModel)的类(class)。最后使用该类(即 MyModel)创建模型对象(user_object)。
这样就完成了模型加载及初始化。通常在模型定义类(class)的__init__()方法负责创建模型,并加载模型的权重参数(load)。加载权重参数的另一个可选方式,是将其定义在模型类的 load()方法内。
在 Rest 服务定义时,创建的 UserModelApplication(继承 gunicorn 的 BaseApplication)在启动时会调用其 load()方法,而该方法会调用模型类的 load()方法。其内容如下:
class UserModelApplication(StandaloneApplication): def load(self): try: self.user_object.load() except (NotImplementedError, AttributeError): logger.debug("No load method in user model") return self.application
复制代码
3 Rest 服务如何处理 predict 请求
由上文可知,Rest 服务是由 get_rest_microservice()定义的,其主要内容如下:
def get_rest_microservice(user_model, seldon_metrics): app = Flask(__name__, static_url_path="") @app.route("/predict", methods=["GET", "POST"]) @app.route("/api/v1.0/predictions", methods=["POST"]) @app.route("/api/v0.1/predictions", methods=["POST"]) def Predict(): requestJson = get_request(skip_decoding=PAYLOAD_PASSTHROUGH) response = seldon_core.seldon_methods.predict( user_model, requestJson, seldon_metrics )
json_response = jsonify(response, skip_encoding=PAYLOAD_PASSTHROUGH) return json_response # 其他api接口定义,如/health, /route, /aggregate等 return app
复制代码
这是 Flask 处理 Rest 请求的基本过程:
从 http 请求 body 序列化 json 数据 requestJson
使用 requestJson 请求数据,调用 user_model 进行推理,得到 response
返回将 response 序列化成 json 数据并返回
那么 seldon_methods.predict()如何调用 user_model 进行推理呢?继续查看代码,predict()方法会先后检查模型类(class)是否实现 predict_rest(), predict_grpc(), predict_raw()方法。若都没有,则调用默认实现模型类(class)的 predict()方法,这里仅介绍默认情况,如下:
def predict( user_model: Any, request: Union[prediction_pb2.SeldonMessage, List, Dict, bytes], seldon_metrics: SeldonMetrics,) -> Union[prediction_pb2.SeldonMessage, List, Dict, bytes]: (features, meta, datadef, data_type) = extract_request_parts_json(request) class_names = datadef["names"] if datadef and "names" in datadef else [] client_response = client_predict( user_model, features, class_names, meta=meta ) return construct_response_json( user_model, False, request, client_response.data, meta, metrics, client_response.tags, )
复制代码
实现过程也是三段式:
而 client_predict()其实就是直接调用 user_model.predict()
try: client_response = user_model.predict(features, feature_names, **kwargs)except TypeError: client_response = user_model.predict(features, feature_names)return SeldonResponse.create(client_response)
复制代码
综述整个 Rest 服务的推理过程,就是做了三件事情:
格式化请求数据,转换成模型推理需要的格式 features
调用 user_model.predict(features)进行推理,得到推理结果 client_response
格式化 client_response,并将其序列化后返回
总结一下,本文主要介绍了一个典型的 python 模型服务,使用 seldon-core-microservices 是如何启动,如何通过 REST 方式响应模型推理请求的。
后续我们将介绍 seldon-core 所提供的其他类型推理服务。
评论