当 SeldonDeployment 服务部署完成后,将会在 kubernetes 集群节点上创建服务容器,而服务容器内将会启动 seldon-core-microservices 进程。该进程负责加载模型,并对外提供 http 服务。接下来,我们将介绍 seldon-core-microservices 的启动过程及它所提供的相关 http 服务。
1 seldon-core-microservices 启动过程
通过如下命令进入模型服务容器
kubectl exec -it seldon-model-example-0-classifier-xxxx bash
复制代码
查看相关进程,如下:
/usr/local/bin/seldon-core-microservice MyModel REST --service-type MODEL --persistence 0
复制代码
这与 Dockerfile 文件定义的 cmd 一致,seldon-core-microservice 脚本负责启动服务。
查看该脚本文件内容,如下:
#!/usr/local/bin/python
# EASY-INSTALL-ENTRY-SCRIPT: 'seldon-core','console_scripts','seldon-core-microservice'
__requires__ = 'seldon-core'
import re
import sys
from pkg_resources import load_entry_point
if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0])
sys.exit(
load_entry_point('seldon-core', 'console_scripts', 'seldon-core-microservice')()
)
复制代码
即这是 seldon-core 这个 python 包安装的启动脚本。启动时,它会执行 console_scripts 中定义的 seldon-core-microservice 所对应的方法。
接下来我们需要分析 seldon-core 源码包,看看启动脚本的执行内容。
按图索骥,我们找到安装文件 seldon-core/python/setup.py,其中相关内容如下:
from setuptools import find_packages, setup
setup(
entry_points={
"console_scripts": [
"seldon-core-microservice = seldon_core.microservice:main",
"seldon-batch-processor = seldon_core.batch_processor:run_cli",
]
},
)
复制代码
由此可见,seldon-core-microservice 所对应的是 seldon_core/microservice.py 模块的 main()方法。
查看 seldon-core/python/seldon_core/microservice.py 源码文件,其中 main()方法主要执行如下内容:
def main():
# 1定义命令行参数
parser = argparse.ArgumentParser()
parser.add_argument("interface_name", type=str, help="Name of the user interface.")
parser.add_argument(
"--service-type",
type=str,
choices=["MODEL", "ROUTER", "TRANSFORMER", "COMBINER", "OUTLIER_DETECTOR"],
default="MODEL",
)
parser.add_argument("--persistence", nargs="?", default=0, const=1, type=int)
parser.add_argument(
"--parameters", type=str, default=os.environ.get(PARAMETERS_ENV_NAME, "[]")
)
# 1.1解析命令行参数
args, remaining = parser.parse_known_args()
# 2加载模型
interface_file = importlib.import_module(args.interface_name)
user_class = getattr(interface_file, args.interface_name)
if args.persistence:
user_object = persistence.restore(user_class, parameters)
persistence.persist(user_object, parameters.get("push_frequency"))
else:
user_object = user_class(**parameters)
# 3定义rest服务
def rest_prediction_server():
# rest服务参数:ip、端口、进程数,超时时间,pid文件等
options = {}
app = seldon_microservice.get_rest_microservice(user_object, seldon_metrics)
UserModelApplication(
app,
user_object,
jaeger_extra_tags,
args.interface_name,
options=options,
).run()
server1_func = rest_prediction_server
# 4使用multiprocessing启动rest进程
start_servers(server1_func, server2_func, server3_func, metrics_server_func)
复制代码
如代码所示,启动过程很清晰简洁的,主要包含如下 4 个步骤:
定义及解析命令行参数
加载模型
定义 Rest 服务
启动进程
注:
--service-type 参数,当前代码中并未使用到。该参数中声明的几种类型,在后面定义的 rest 服务有相对应的 api 接口。因暂未使用到,本文不展开讨论。
--persistence 参数,如果开启,则会启动一个后台进程,定期地将 user_model 使用 pickle 的方式序列化并保存在 redis 中。后续启动时可以从 redis 中获取 pickle 数据并加载。因非核心功能,本文不展开讨论。
2 如何加载模型
首先我们看未使用 persistence 的模型加载方式。主要代码如下:
interface_file = importlib.import_module(args.interface_name)
user_class = getattr(interface_file, args.interface_name)
user_object = user_class(**parameters)
复制代码
使用 importlib.import_module()加载模块,模块名称是由命令行参数指定的 interface_name(在本例中就是 MyModel)。加载完成后,从该模块获取属性名称 interface_name(即 MyModel)的类(class)。最后使用该类(即 MyModel)创建模型对象(user_object)。
这样就完成了模型加载及初始化。通常在模型定义类(class)的__init__()方法负责创建模型,并加载模型的权重参数(load)。加载权重参数的另一个可选方式,是将其定义在模型类的 load()方法内。
在 Rest 服务定义时,创建的 UserModelApplication(继承 gunicorn 的 BaseApplication)在启动时会调用其 load()方法,而该方法会调用模型类的 load()方法。其内容如下:
class UserModelApplication(StandaloneApplication):
def load(self):
try:
self.user_object.load()
except (NotImplementedError, AttributeError):
logger.debug("No load method in user model")
return self.application
复制代码
3 Rest 服务如何处理 predict 请求
由上文可知,Rest 服务是由 get_rest_microservice()定义的,其主要内容如下:
def get_rest_microservice(user_model, seldon_metrics):
app = Flask(__name__, static_url_path="")
@app.route("/predict", methods=["GET", "POST"])
@app.route("/api/v1.0/predictions", methods=["POST"])
@app.route("/api/v0.1/predictions", methods=["POST"])
def Predict():
requestJson = get_request(skip_decoding=PAYLOAD_PASSTHROUGH)
response = seldon_core.seldon_methods.predict(
user_model, requestJson, seldon_metrics
)
json_response = jsonify(response, skip_encoding=PAYLOAD_PASSTHROUGH)
return json_response
# 其他api接口定义,如/health, /route, /aggregate等
return app
复制代码
这是 Flask 处理 Rest 请求的基本过程:
从 http 请求 body 序列化 json 数据 requestJson
使用 requestJson 请求数据,调用 user_model 进行推理,得到 response
返回将 response 序列化成 json 数据并返回
那么 seldon_methods.predict()如何调用 user_model 进行推理呢?继续查看代码,predict()方法会先后检查模型类(class)是否实现 predict_rest(), predict_grpc(), predict_raw()方法。若都没有,则调用默认实现模型类(class)的 predict()方法,这里仅介绍默认情况,如下:
def predict(
user_model: Any,
request: Union[prediction_pb2.SeldonMessage, List, Dict, bytes],
seldon_metrics: SeldonMetrics,
) -> Union[prediction_pb2.SeldonMessage, List, Dict, bytes]:
(features, meta, datadef, data_type) = extract_request_parts_json(request)
class_names = datadef["names"] if datadef and "names" in datadef else []
client_response = client_predict(
user_model, features, class_names, meta=meta
)
return construct_response_json(
user_model,
False,
request,
client_response.data,
meta,
metrics,
client_response.tags,
)
复制代码
实现过程也是三段式:
而 client_predict()其实就是直接调用 user_model.predict()
try:
client_response = user_model.predict(features, feature_names, **kwargs)
except TypeError:
client_response = user_model.predict(features, feature_names)
return SeldonResponse.create(client_response)
复制代码
综述整个 Rest 服务的推理过程,就是做了三件事情:
格式化请求数据,转换成模型推理需要的格式 features
调用 user_model.predict(features)进行推理,得到推理结果 client_response
格式化 client_response,并将其序列化后返回
总结一下,本文主要介绍了一个典型的 python 模型服务,使用 seldon-core-microservices 是如何启动,如何通过 REST 方式响应模型推理请求的。
后续我们将介绍 seldon-core 所提供的其他类型推理服务。
评论