写点什么

Python 实战之用内置模块来构建 REST 服务、RPC 服务

作者:山河已无恙
  • 2022 年 8 月 29 日
    内蒙古
  • 本文字数:13491 字

    阅读完需:约 44 分钟

写在前面



  • 和小伙伴们分享一些 Python 网络编程的一些笔记,博文为《Python Cookbook》读书后笔记整理

  • 博文涉及内容包括:

  • TCP/UDP 服务构建

  • 不使用框架创建一个REST风格的web服务

  • 基于XML-RP用【C实现简单的 RPC

  • 基于multiprocessing.connection实现简单的 RPC

  • python 实现作为客户端与 HTTP 服务交互

  • 理解不足小伙伴帮忙指正


傍晚时分,坐在屋檐下,看着天慢慢地黑下去,心里寂寞而凄凉,感到自己的生命被剥夺了。当时我是个年轻人,但我害怕这样生活下去,衰老下去。在我看来,这是比死亡更可怕的事。--------王小波




在 Python 中,构建一个静态 Web 服务器,只需要 python3 -m http.server 端口号( 端口号不指定默认是 8000) 这一条命令就可以搞定了,之前也有看到有公司内网中,一些安装包放到服务器上每次 FTP 麻烦,用 http 模块的方式很方便。


python 在网络方面封装一些内置模块,可以用很简洁的代码实现端到端的通信,比如 HTTP、RPC 服务等。


在编写 RPC 和 REST 服务之前,先来温习一下常见的的基于 Socket 模块的一些端到端的通信协议。不管是 RPC 还是 REST 都需要底层的通信协议来支持。


对于 TCP 和 UPD 协议,在常见的网络通信中,浏览器,邮件等一般应用程序在收发数据时都是通过 TCP 协议的,DNS 等收发较短的控制数据时一般会使用 UDP。

创建 TCP 服务

实现一个服务器,通过 TCP 协议和客户端通信。


创建一个 TCP 服务器的一个简单方法是使用socketserver库。一起来温习下面这个简单的 TCP 服务器


from socketserver import BaseRequestHandler, TCPServer
class EchoHandler(BaseRequestHandler): def handle(self): print('Got connection from', self.client_address) while True: #接收客户端发送的数据, 这次接收数据的最大字节数是8192 msg = self.request.recv(8192) # 接收的到数据在发送回去 if not msg: break self.request.send(msg)
if __name__ == '__main__': # 20000端口,默认IP为本地IP,监听到消息交个EchoHandler处理器 serv = TCPServer(('', 20000), EchoHandler) serv.serve_forever()
复制代码


代码很简单,指定 IP 暴露对应的端口,这里通过serv.serve_forever()来保证连接一直存在。


Got connection from ('127.0.0.1', 1675)


建立好服务端之后看下客户端


  • AF_INET:表示 ipv4

  • SOCK_STREAM: tcp 传输协议


>>> from socket import socket, AF_INET, SOCK_STREAM>>> s = socket(AF_INET, SOCK_STREAM)>>> s.connect(('localhost', 20000))>>> s.send(b'Hello')5>>> s.recv(8192)b'Hello'>>>
复制代码


socketserver 默认情况下这种服务器是单线程的,一次只能为一个客户端连接服务。如果想通过多线程处理多个客户端,可以初始化一个ForkingTCPServer 或者是ThreadingTCPServer对象。


from socketserver import ThreadingTCPServer
if __name__ == '__main__': serv = ThreadingTCPServer(('', 20000), EchoHandler) serv.serve_forever()
复制代码


使用 fork 或线程服务器有个潜在问题就是它们会为每个客户端连接创建一个新的进程或线程。由于客户端连接数是没有限制的,因此一个恶意的黑客可以同时发送大量的连接让的服务器奔溃


可以创建一个预先分配大小的 工作线程池或进程池,先创建一个普通的非线程服务器,然后在一个线程池中使用serve forever()方法来启动它们。


if __name__ == '__main__':    from threading import Thread    NWORKERS = 16    serv = TCPServer(('', 20000), EchoHandler)    for n in range(NWORKERS):        t = Thread(target=serv.serve_forever)        t.daemon = True        t.start()    serv.serve_forever()
复制代码


一般来讲,一个TCPServer在实例化的时候会绑定并激活相应的socket,如果 bind_and_activate 为真,则构造方法会自动调用server_bind() 和 server_activate()方法。其余参数传给基类 BaseServer,有时候想通过设置某些选项去调整底下的 socket,可以设置参数bind_and_activate=False


if __name__ == '__main__':    serv = TCPServer(('', 20000), EchoHandler, bind_and_activate=False)    # Set up various socket options    serv.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)    # Bind and activate    serv.server_bind()    serv.server_activate()    serv.serve_forever()
复制代码


socket.SO_REUSEADDR允许服务器重新绑定一个之前使用过的端口号。由于要被经常使用到,它被放置到类变量中,可以直接在 TCPServer上面设置


当然,也可以不使用一个工具类,直接使用原生的Socket的编写TCP服务端


from socket import socket, AF_INET, SOCK_STREAM

def echo_handler(address, client_sock): print('Got connection from {}'.format(address)) while True: msg = client_sock.recv(8192) if not msg: break client_sock.sendall(msg) client_sock.close()
def echo_server(address, backlog=5): sock = socket(AF_INET, SOCK_STREAM) sock.bind(address) sock.listen(backlog) while True: client_sock, client_addr = sock.accept() echo_handler(client_addr, client_sock)
if __name__ == '__main__': echo_server(('', 20000))
复制代码


了解了 TCP 的服务通信,在来看一下 UDP 的

创建 UDP 服务

实现一个基于 UDP 协议的服务器来与客户端通信。


跟 TCP 一样,UDP 服务器也可以通过使用socketserver库很容易的被创建。例如,下面是一个简单的时间服务器:


from socketserver import BaseRequestHandler, UDPServerimport timeclass TimeHandler(BaseRequestHandler):    def handle(self):        print('Got connection from', self.client_address)        # Get message and client socket request 属性是一个包含了数据报和底层 socket 对象的元组        msg, sock = self.request        resp = time.ctime()        sock.sendto(resp.encode('ascii'), self.client_address)
if __name__ == '__main__': serv = UDPServer(('', 20000), TimeHandler) serv.serve_forever()
复制代码


测试一下


>>> from socket import socket, AF_INET, SOCK_DGRAM>>> s = socket(AF_INET, SOCK_DGRAM)>>> s.sendto(b'', ('localhost', 20000))0>>> s.recvfrom(8192)(b'Tue May  3 11:48:53 2022', ('127.0.0.1', 20000))>>>
复制代码


对于UPD协议而言,对于数据报的传送,应该使用 socket 的sendto() 和 recvfrom() 方法,因为是面向无连接的,没有建立连接的步骤,但是要在发生时跟着接受方


了解基本的通信协议之后,回到今天要讲的 ERST 接口。REST 接口是基于 HTTP 协议的,而 HTTP 是直接依赖 TCP 的协议栈,负责约束表示层

创建一个简单的 REST 接口

使用一个简单的 REST 接口通过网络远程控制或访问的应用程序,但是又不想自己去安装一个完整的 web 框架。



可以构建一个 REST 风格的接口,最简单的方法是创建一个基于 WSGI 标准(Web服务网关接口,PEP 3333)的很小的库。类似支持 REST 风格的 Python Web 框架 Flask。


#!/usr/bin/env python# -*- encoding: utf-8 -*-"""@File    :   app.py@Time    :   2022/05/03 14:43:56@Author  :   Li Ruilong@Version :   1.0@Contact :   1224965096@qq.com@Desc    :   None"""
# here put the import lib
import timeimport cgi

def notfound_404(environ, start_response): start_response('404 Not Found', [('Content-type', 'text/plain')]) return [b'Not Found']
# 核心控制器,用于路由注册class PathDispatcher: def __init__(self): # 映射字典 self.pathmap = {} # 核心控制器的回调 def __call__(self, environ, start_response): # 获取路由 path = environ['PATH_INFO'] # 获取请求参数 params = cgi.FieldStorage(environ['wsgi.input'], environ=environ) # 获取请求方法 method = environ['REQUEST_METHOD'].lower() environ['params'] = {key: params.getvalue(key) for key in params} # 找到映射的函数 handler = self.pathmap.get((method, path), notfound_404) # 返回函数 return handler(environ, start_response)
def register(self, method, path, function): # 请求方法和路由作为K,执行函数为V self.pathmap[method.lower(), path] = function return function

_hello_resp = "wo jiao {name}"

def hello_world(environ, start_response): start_response('200 OK', [('Content-type', 'text/html')]) params = environ['params'] resp = _hello_resp.format(name=params.get('name')) yield resp.encode('utf-8')

_localtime_resp = "dang qian shjian {t}"
# 路由的回调def localtime(environ, start_response): start_response('200 OK', [('Content-type', 'application/xml')]) resp = _localtime_resp.format(t=time.localtime()) yield resp.encode('utf-8')

if __name__ == '__main__': from wsgiref.simple_server import make_server # 创建一个核心控制器,用于路由注册 dispatcher = PathDispatcher() # 注册路由,对应的回调方法 dispatcher.register('GET', '/hello', hello_world) dispatcher.register('GET', '/localtime', localtime) # Launch a basic server 监听8080端口,注入核心控制器 httpd = make_server('', 8080, dispatcher) print('Serving on port 8080...') httpd.serve_forever()
复制代码


测试一下


┌──[root@liruilongs.github.io]-[~]└─$coproc (./app.py)[2] 130447
复制代码


curl localhost:8080/hello


┌──[root@liruilongs.github.io]-[~]└─$curl  localhost:8080/hello127.0.0.1 - - [03/May/2022 16:09:12] "GET /hello HTTP/1.1" 200 12wo jiao None
复制代码


curl localhost:8080/hello?name=liruilong


┌──[root@liruilongs.github.io]-[~]└─$curl  localhost:8080/hello?name=liruilong127.0.0.1 - - [03/May/2022 16:09:47] "GET /hello?name=liruilong HTTP/1.1" 200 17wo jiao liruilong┌──[root@liruilongs.github.io]-[~]└─$jobs....[2]-  运行中               coproc COPROC ( ./app.py ) &
复制代码


实现一个简单的 REST 接口,只需让的程序代码满足 Python 的 WSGI标准即可。WSGI 被标准库支持,同时也被绝大部分第三方 web 框架支持


这里感觉Python WebWSGI标准Java Web 体系的Servlet规范特别接近,但是Servlet是侵入式的,同时需要特定的 Web 容器(Tomcat)支持,而 WSGI 好像对代码的影响很少...感兴趣小伙伴可以研究下.


另一方面,通过上面的代码,可以对当下这种Web端MVC的设计模式流程(Flask,Django,SpringMVC)有一个基本的认识,当然实际的框架要复杂的多。但是基本构建思路一样。

关于 WSGI 标准,简单来分析一下

以一个可调用对象形式来实现路由匹配要操作的方法


import cgi
def wsgi_app(environ, start_response): pass
复制代码


environ 属性是一个字典,包含了从 web 服务器如 Apache[参考 Internet RFC 3875]提供的 CGI 接口中获取的值。要将这些不同的值提取出来,可以像这么这样写:


def wsgi_app(environ, start_response):    method = environ['REQUEST_METHOD']    path = environ['PATH_INFO']    # Parse the query parameters    params = cgi.FieldStorage(environ['wsgi.input'], environ=environ)
复制代码


start_response 参数是一个为了初始化一个请求对象而必须被调用的函数。第一个参数是返回的 HTTP 状态值,第二个参数是一个 (名, 值) 元组列表,用来构建返回的 HTTP 头


def wsgi_app(environ, start_response):    pass    start_response('200 OK', [('Content-type', 'text/plain')])
复制代码


为了返回数据,一个 WSGI 程序必须返回一个字节字符串序列。可以像下面这样使用一个列表来完成


def wsgi_app(environ, start_response):    pass    start_response('200 OK', [('Content-type', 'text/plain')])    resp = []    resp.append(b'Hello World\n')    resp.append(b'Goodbye!\n')    return resp
复制代码


或者,还可以使用 yield


def wsgi_app(environ, start_response):    pass    start_response('200 OK', [('Content-type', 'text/plain')])    yield b'Hello World\n'    yield b'Goodbye!\n'
复制代码


最后返回的必须是字节字符串。如果返回结果包含文本字符串,必须先将其编码成字节。图片也是 OK 的


class WSGIApplication:    def __init__(self):        ...    def __call__(self, environ, start_response)        ...
复制代码


PathDispatcher 类。这个分发器仅仅只是管理一个字典,将 (方法, 路径) 对映射到处理器函数上面。当一个请求到来时,它的方法和路径被提取出来,然后被分发到对应的处理器上面去。


    dispatcher = PathDispatcher()    # 注册路由,对应的回调    dispatcher.register('GET', '/hello', hello_world)    dispatcher.register('GET', '/localtime', localtime)
复制代码


任何查询变量会被解析后放到一个字典中,以 environ['params'] 形式存储。后面这个步骤太常见,所以建议在分发器里面完成,这样可以省掉很多重复代码。使用分发器的时候,只需简单的创建一个实例,然后通过它注册各种 WSGI 形式的函数。编写这些函数应该超级简单了,只要遵循 start_response() 函数的编写规则,并且最后返回字节字符串即可。


WSGI 本身是一个很小的标准。因此它并没有提供一些高级的特性比如认证、cookies、重定向、全局的异常处理等。这些自己实现起来也不难。不过如果想要更多的支持,可以考虑第三方库


上面服务端的构建,我们使用了curl工具来访问,那么作为客户端 Python 有哪些交互方式?

作为客户端与 HTTP 服务交互

需要通过 HTTP 协议以客户端的方式访问多种服务。例如,下载数据或者与基于 REST 的 API 进行交互。


对于简单的事情来说,通常使用urllib.request模块就够了.一个 Get 请求的 Demo


┌──[root@liruilongs.github.io]-[~]└─$python3Python 3.6.8 (default, Nov 16 2020, 16:55:22)[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linuxType "help", "copyright", "credits" or "license" for more information.>>> from urllib import request, parse>>> url = 'http://httpbin.org/get'>>> parms = {...     'name1': 'value1',...     'name2': 'value2'... }>>> querystring = parse.urlencode(parms)>>> querystring'name1=value1&name2=value2'>>> request.urlopen(url+'?' + querystring)<http.client.HTTPResponse object at 0x7ffa0ef0f710>>>> u = request.urlopen(url+'?' + querystring)>>> u.read()b'{    "args": {        "name1": "value1",        "name2": "value2"    },    "headers": {        "Accept-Encoding": "identity",        "Host": "httpbin.org",        "User-Agent": "Python-urllib/3.6",        "X-Amzn-Trace-Id": "Root=1-62707b15-41a1169c0897c9001a07f948"    },    "origin": "39.154.13.139",    "url": "http://httpbin.org/get?name1=value1&name2=value2"}'
复制代码


如果需要使用POST方法在请求主体中发送查询参数,可以将参数编码后作为可选参数提供给urlopen()函数,就像这样:


>>> from urllib import request, parse>>> url = 'http://httpbin.org/post'>>> parms = {... 'name1' : 'value1',... 'name2' : 'value2'... }>>> querystring = parse.urlencode(parms)>>> querystring.encode('ascii')b'name1=value1&name2=value2'>>> u = request.urlopen(url, querystring.encode('ascii'))>>> resp = u.read()>>> respb'{    "args": {},    "data": "",    "files": {},    "form": {        "name1": "value1",        "name2": "value2"    },    "headers": {        "Accept-Encoding": "identity",        "Content-Length": "25",        "Content-Type": "application/x-www-form-urlencoded",        "Host": "httpbin.org",        "User-Agent": "Python-urllib/3.6",        "X-Amzn-Trace-Id": "Root=1-62707d24-15e9944760d3bbaa36c3714a"    },    "json": null,    "origin": "39.154.13.139",    "url": "http://httpbin.org/post"}'>>>
复制代码


在发出的请求中提供一些自定义的 HTTP 请求首部,创建一个 Request 实例然后将其传给urlopen()


>>> from urllib import request, parse>>> headers = {... 'User-agent' : 'none/ofyourbusiness',... 'Spam' : 'Eggs'... }>>> req = request.Request(url, querystring.encode('ascii'), headers=headers)>>> u = request.urlopen(req)>>> u.read()b'{    "args": {},    "data": "",    "files": {},    "form": {        "name1": "value1",        "name2": "value2"    },    "headers": {        "Accept-Encoding": "identity",        "Content-Length": "25",        "Content-Type": "application/x-www-form-urlencoded",        "Host": "httpbin.org",        "Spam": "Eggs",        "User-Agent": "none/ofyourbusiness",        "X-Amzn-Trace-Id": "Root=1-62707f0e-308a8137555e15797d950018"    },    "json": null,    "origin": "39.154.13.139",    "url": "http://httpbin.org/post"}'>>>
复制代码


如果需要交互的服务,可以使用 requests 模块, 这个不是自带模块,需要安装python3 -m pip install requests


>>> import requests>>> url = 'http://httpbin.org/post'>>> parms = {... 'name1' : 'value1',... 'name2' : 'value2'... }>>> headers = {... 'User-agent' : 'none/ofyourbusiness',... 'Spam' : 'Eggs'... }>>> resp = requests.post(url, data=parms, headers=headers)>>> resp.text'{\n  "args": {}, \n  "data": "", \n  "files": {}, \n  "form": {\n    "name1": "value1", \n    "name2": "value2"\n  }, \n  "headers": {\n    "Accept": "*/*", \n    "Accept-Encoding": "gzip, deflate", \n    "Content-Length": "25", \n    "Content-Type": "application/x-www-form-urlencoded", \n    "Host": "httpbin.org", \n    "Spam": "Eggs", \n    "User-Agent": "none/ofyourbusiness", \n    "X-Amzn-Trace-Id": "Root=1-62708080-7a14319e699baa2e35a352fb"\n  }, \n  "json": null, \n  "origin": "39.154.13.139", \n  "url": "http://httpbin.org/post"\n}\n'>>> resp.json(){'args': {}, 'data': '', 'files': {}, 'form': {'name1': 'value1', 'name2': 'value2'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '25', 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'httpbin.org', 'Spam': 'Eggs', 'User-Agent': 'none/ofyourbusiness', 'X-Amzn-Trace-Id': 'Root=1-62708080-7a14319e699baa2e35a352fb'}, 'json': None, 'origin': '39.154.13.139', 'url': 'http://httpbin.org/post'}>>>
复制代码


requests 模块支持很多种数据的返会方式,可以直接返回以 Unicode 解码的响应文本,也可以返回 JSON 数据


利用 requests 库发起一个 HEAD 请求


>>> import requests>>> resp = requests.head( 'http://httpbin.org/post')>>> resp<Response [405]>>>> resp = requests.head( 'http://httpbin.org/')>>> resp<Response [200]>>>> resp.status_code200>>> resp.headers['content-length']'9593'>>> resp.headers['content-type']'text/html; charset=utf-8'>>> resp.text''>>>
复制代码


如果决定坚持使用标准的程序库而不考虑像requests这样的第三方库,可以使用底层的 http.client 模块来实现自己的代码。


from http.client import HTTPConnectionfrom urllib import parse

c = HTTPConnection('www.python.org', 80)c.request('HEAD', '/index.html')resp = c.getresponse()print('Status', resp.status)for name, value in resp.getheaders(): print(name, value)
复制代码


测试 HTTP 客户端,考虑使用httpbin服务(http://httpbin.org)。这个站点会接收发出的请求,然后以JSON 的形式将相应信息回传回来。


>>> import requests>>> r = requests.get('http://httpbin.org/get?name=Dave&n=37',... headers = { 'User-agent': 'goaway/1.0' })>>> resp = r.json()>>> resp['headers']{'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'User-Agent': 'goaway/1.0', 'X-Amzn-Trace-Id': 'Root=1-62708c06-7c7d8cc4441479c65faea5b4'}>>>
复制代码

通过 XML-RPC 实现简单的远程调用

RPC,通俗的讲,想找到一个方式去运行在远程机器上面的 Python 程序中的函数或方法。


实现一个远程方法调用的最简单方式是使用 XML-RPC。下面实现了键 值存储功能的简单 RPC 服务器:


#!/usr/bin/env python# -*- encoding: utf-8 -*-"""@File    :   app.py@Time    :   2022/05/03 17:07:02@Author  :   Li Ruilong@Version :   1.0@Contact :   1224965096@qq.com@Desc    :   None"""
# here put the import libfrom xmlrpc.server import SimpleXMLRPCServer
class KeyValueServer: _rpc_methods_ = ['get', 'set', 'delete', 'exists', 'keys']
def __init__(self, address): self._data = {} self._serv = SimpleXMLRPCServer(address, allow_none=True) # 注册方法 for name in self._rpc_methods_: self._serv.register_function(getattr(self, name))
def get(self, name): return self._data[name]
def set(self, name, value): self._data[name] = value
def delete(self, name): del self._data[name]
def exists(self, name): return name in self._data
def keys(self): return list(self._data)
def serve_forever(self): self._serv.serve_forever()
# Exampleif __name__ == '__main__': kvserv = KeyValueServer(('', 15001)) kvserv.serve_forever()
复制代码


RPC 客户端测试


PS E:\docker> pythonPython 3.9.0 (tags/v3.9.0:9cf6752, Oct  5 2020, 15:23:07) [MSC v.1927 32 bit (Intel)] on win32Type "help", "copyright", "credits" or "license" for more information.>>> from xmlrpc.client import ServerProxy>>> s = ServerProxy('http://localhost:15001', allow_none=True)>>> s.set('foo','bar')>>> s.set('spam', [1, 2, 3])>>> s.keys()['foo', 'spam']>>> s.get('foo')'bar'>>> s.get('spam')[1, 2, 3]>>> s.delete('spam')>>> s.exists('spam')False>>>
复制代码


XML-RPC 可以让很容易的构造一个简单的远程调用服务。所需要做的仅仅是创建一个服务器实例,通过它的方法register_function()来注册函数,然后使用方法serve_forever()启动它。在上面将这些步骤放在一起写到一个类中


这并不是必须的。还可以像下面这样创建一个服务器:


from xmlrpc.server import SimpleXMLRPCServer
from xmlrpc.server import SimpleXMLRPCServerdef add(x,y): return x+yserv = SimpleXMLRPCServer(('', 15000))serv.register_function(add)serv.serve_forever()
复制代码


XML-RPC 暴露出来的函数只能适用于部分数据类型,比如字符串、整形、列表和字典,不应该将 XML-RPC 服务以公共 API 的方式暴露出来。


XML-RPC 的一个缺点是它的性能。SimpleXMLRPCServer 的实现是单线程的,所以它不适合于大型程序


由于 XML-RPC 将所有数据都序列化为 XML 格式,所以它会比其他的方式运行的慢一些。但是它也有优点,这种方式的编码可以被绝大部分其他编程语言支持。通过使用这种方式,其他语言的客户端程序都能访问的服务。

通过 multiprocessing 实现 RPC 调用

在一个消息传输层如 sockets、multiprocessing.connections或zeroMQ的基础之上实现一个简单的远程过程调用(RPC)


函数请求参数返回值使用pickle编码后,在不同的解释器直接传送pickle字节字符串,可以很容易的实现 RPC。下面是一个简单的PRC处理器,可以被整合到一个服务器中去:


RPC 服务端


#!/usr/bin/env python# -*- encoding: utf-8 -*-"""@File    :   rpcserver.py@Time    :   2022/07/08 20:16:21@Author  :   Li Ruilong@Version :   1.0@Contact :   1224965096@qq.com@Desc    :   远程调用服务"""
# here put the import libimport picklefrom multiprocessing.connection import Listenerfrom threading import Thread

"""@Time : 2022/07/08 20:28:02@Author : Li Ruilong@Version : 1.0@Desc : None Args: 远程调用处理器 Returns: void"""
class RPCHandler:
def __init__(self): self._functions = {}
""" @Time : 2022/07/08 20:16:47 @Author : Li Ruilong @Version : 1.0 @Desc : 函数注册 Args: func Returns: void """
def register_function(self, func): self._functions[func.__name__] = func
""" @Time : 2022/07/08 20:17:51 @Author : Li Ruilong @Version : 1.0 @Desc : 调用函数 Args: connection Returns: void """ def handle_connection(self, connection): try: while True: func_name, args, kwargs = pickle.loads(connection.recv()) try: print("调用函数:",(func_name, args, kwargs)) r = self._functions[func_name](*args,**kwargs) print("返回结果:",r) connection.send(pickle.dumps(r)) except Exception as e: connection.send(pickle.dumps(e)) except Exception as e: pass

def rpc_server(handler, address, authkey): sock = Listener(address, authkey=authkey) while True: client = sock.accept() t = Thread(target=handler.handle_connection, args=(client,)) t.daemon = True print("函数开始执行") t.start()

def add(x, y): return x + y

def sub(x, y): return x - y
if __name__ == '__main__': print(format("开始加载RPC处理器",'》<20')) handler = RPCHandler() print(format("处理器加载完成,注册函数",'》<20')) handler.register_function(add) handler.register_function(sub) print(format("函数注册成功,服务启动",'》<20')) rpc_server(handler, ('localhost', 17000), authkey=b'peekaboo')
复制代码


RPC 客户端


import pickle
from multiprocessing.connection import Client

class RPCProxy: def __init__(self, connection): self._connection = connection def __getattr__(self, name): print("开始调用函数",name) def do_rpc(*args, **kwargs): self._connection.send(pickle.dumps((name, args, kwargs))) result = pickle.loads(self._connection.recv()) print("返回结果",result) if isinstance(result, Exception): raise result return result
return do_rpc

c = Client(('localhost', 17000), authkey=b'peekaboo')print(format("建立连接,创建RPC代理",'》<30'),c)proxy = RPCProxy(c)print(format("创建代理成功",'》<30'))print("add(2, 3) = ",proxy.add(2, 3) )print("sub(2, 3) = ", proxy.sub(2, 3))

复制代码


D:\python\Python310\python.exe D:/python/code/rabbit_mq_demo/rpcserver.py开始加载RPC处理器》》》》》》》》》》处理器加载完成,注册函数》》》》》》》》函数注册成功,服务启动》》》》》》》》》函数开始执行调用函数: ('add', (2, 3), {})返回结果: 5调用函数: ('sub', (2, 3), {})返回结果: -1

==============D:\python\Python310\python.exe D:/python/code/rabbit_mq_demo/RPC.py建立连接,创建RPC代理》》》》》》》》》》》》》》》》》》 <multiprocessing.connection.Connection object at 0x00DFACA0>创建代理成功》》》》》》》》》》》》》》》》》》》》》》》》开始调用函数 add返回结果 5add(2, 3) = 5开始调用函数 sub返回结果 -1sub(2, 3) = -1
Process finished with exit code 0
复制代码


RPCHandler和RPCProxy的基本思路是很比较简单的。


如果一个客户端想要调用一个远程函数,比如 foo(1,2,z=3),代理类创建一个包含了函数名和参数的元组(foo',(1,2),{'z':3})。这个元组被 pickle 序列化后通过网络连接发生出去。


由于底层需要依赖 pickle,那么安全问题就需要考虑了(因为一个聪明的黑客可以创建特定的消息,能够让任意函数通过 pickle 反序列化后被执行)。


因此永远不要允许来自不信任或未认证的客户端的RPC。特别是绝对不要允许来自 Internet 的任意机器的访问,这种只能在内部被使用,位于防火墙后面并且不要对外暴露。


作为 pickle 的替代,也许可以考虑使用 JSON、XML 或一些其他的编码格式来序列化消息。


例如,本机实例可以很容易的改写成 JSON 编码方案。还需要将pickle.1oads()和pickle.dumps()替换成json.1oads()和json.dumps()即可:



# here put the import libimport json........ def handle_connection(self, connection): try: while True: # 反序列化 func_name, args, kwargs = json.loads(connection.recv()) try: print("调用函数:",(func_name, args, kwargs)) r = self._functions[func_name](*args,**kwargs) print("返回结果:",r) # 序列化发送 connection.send(json.dumps(r)) except Exception as e: connection.send(json.dumps(e)) except Exception as e: pass
......
复制代码


import json
from multiprocessing.connection import Client

class RPCProxy: def __init__(self, connection): self._connection = connection def __getattr__(self, name): print("开始调用函数",name) def do_rpc(*args, **kwargs): print("JSON 序列化后的值",json.dumps((name, args, kwargs))) self._connection.send(json.dumps((name, args, kwargs))) result = json.loads(self._connection.recv()) print("返回结果",result) if isinstance(result, Exception): raise result return result
return do_rpc

c = Client(('localhost', 17000), authkey=b'peekaboo')print(format("建立连接,创建RPC代理",'》<30'),c)proxy = RPCProxy(c)print(format("创建代理成功",'》<30'))print("add(2, 3) = ",proxy.add(2, 3) )print("sub(2, 3) = ", proxy.sub(2, 3))
复制代码


可以看到序列化后的结果


D:\python\Python310\python.exe D:/python/code/rabbit_mq_demo/RPC.py建立连接,创建RPC代理》》》》》》》》》》》》》》》》》》 <multiprocessing.connection.Connection object at 0x0078AD30>创建代理成功》》》》》》》》》》》》》》》》》》》》》》》》开始调用函数 addJSON 序列化后的值 ["add", [2, 3], {}]返回结果 5add(2, 3) =  5开始调用函数 subJSON 序列化后的值 ["sub", [2, 3], {}]返回结果 -1sub(2, 3) =  -1
复制代码


实现 RPC 的一个比较复杂的问题是如何去处理异常。至少,当方法产生异常时服务器不应该奔溃。因此,返回给客户端的异常所代表的含义就要好好设计了。


如果使用 pickle,异常对象实例在客户端能被反序列化并抛出。如果使用其他的协议,那得想想另外的方法了。不过至少,应该在响应中返回异常字符串。在 JSON 的例子中就是使用的这种方式。

博文内容参考




发布于: 刚刚阅读数: 3
用户头像

CSDN博客专家,华为云云享专家,RHCE/CKA认证 2022.01.04 加入

Java 后端一枚,技术不高,前端、Shell、Python 也可以写一点.纯种屌丝,不热爱生活,热爱学习,热爱工作,喜欢一直忙,不闲着。喜欢篆刻,喜欢吃好吃的,喜欢吃饱了晒太阳。

评论

发布
暂无评论
Python实战之用内置模块来构建REST服务、RPC服务_RPC_山河已无恙_InfoQ写作社区