写点什么

tinyrpc 源码阅读

作者:骑牛上青山
  • 2023-01-14
    上海
  • 本文字数:6433 字

    阅读完需:约 21 分钟

tinyrpc是一个高性能的基于protocol buffer的 rpc 框架。项目代码非常少,很适合初学者进行 golang 的学习。

tinyrpc 功能

tinyrpc基于 TCP 协议,支持各种压缩格式,基于protocol buffer的序列化协议。其 rpc 是基于 golang 原生的net/rpc开发而成。

tinyrpc 项目结构

tinyrpc基于net/rpc开发而成,在此基础上集成了额外的能力。项目结构如图:



功能目录如下:


  • codec 编码模块

  • compressor 压缩模块

  • header 请求/响应头模块

  • protoc-gen-tinyrpc 代码生成插件

  • serializer 序列化模块

tinyrpc 源码解读

客户端和服务端构建

客户端是以net/rpcrpc.Client为基础构建,在此基础上定义了Option以配置压缩方式和序列化方式:


type Option func(o *options)
type options struct { compressType compressor.CompressType serializer serializer.Serializer}
复制代码


在创建客户端的时候将配置好的压缩算法和序列化方式作为创建客户端的参数:


func NewClient(conn io.ReadWriteCloser, opts ...Option) *Client {    options := options{        compressType: compressor.Raw,        serializer:   serializer.Proto,    }    for _, option := range opts {        option(&options)    }    return &Client{rpc.NewClientWithCodec(        codec.NewClientCodec(conn, options.compressType, options.serializer))}}
复制代码


服务端是以net/rpcrpc.Server为基础构建,在此基础上扩展了Server的定义:


type Server struct {    *rpc.Server    serializer.Serializer}
复制代码


在创建客户端和开启服务时传入序列化方式:


func NewServer(opts ...Option) *Server {    options := options{        serializer: serializer.Proto,    }    for _, option := range opts {        option(&options)    }
return &Server{&rpc.Server{}, options.serializer}}
func (s *Server) Serve(lis net.Listener) { log.Printf("tinyrpc started on: %s", lis.Addr().String()) for { conn, err := lis.Accept() if err != nil { continue } go s.Server.ServeCodec(codec.NewServerCodec(conn, s.Serializer)) }}
复制代码

压缩算法 compressor

压缩算法的实现中首先是定义了压缩的接口:


type Compressor interface {    Zip([]byte) ([]byte, error)    Unzip([]byte) ([]byte, error)}
复制代码


压缩的接口包含压缩和解压方法。


压缩算法使用的是uint类型,使用iota来初始化,并且使用 map 来进行所有压缩算法实现的管理:


type CompressType uint16
const ( Raw CompressType = iota Gzip Snappy Zlib)
// Compressors which supported by rpcvar Compressors = map[CompressType]Compressor{ Raw: RawCompressor{}, Gzip: GzipCompressor{}, Snappy: SnappyCompressor{}, Zlib: ZlibCompressor{},}
复制代码

序列化 serializer

序列化部分代码非常简单,提供了一个接口:


type Serializer interface {    Marshal(message interface{}) ([]byte, error)    Unmarshal(data []byte, message interface{}) error}
复制代码


目前只有ProtoSerializer一个实现,ProtoSerializer内部的实现是基于"google.golang.org/protobuf/proto"来实现的,并没有什么特殊的处理,因此就不花费笔墨详述了。

请求/响应头 header

tinyrpc定义了自己的请求头和响应头:


// RequestHeader request header structure looks like:// +--------------+----------------+----------+------------+----------+// | CompressType |      Method    |    ID    | RequestLen | Checksum |// +--------------+----------------+----------+------------+----------+// |    uint16    | uvarint+string |  uvarint |   uvarint  |  uint32  |// +--------------+----------------+----------+------------+----------+type RequestHeader struct {    sync.RWMutex    CompressType compressor.CompressType    Method       string    ID           uint64    RequestLen   uint32    Checksum     uint32}
复制代码


请求头由压缩类型,方法,id,请求长度和校验码组成。


// ResponseHeader request header structure looks like:// +--------------+---------+----------------+-------------+----------+// | CompressType |    ID   |      Error     | ResponseLen | Checksum |// +--------------+---------+----------------+-------------+----------+// |    uint16    | uvarint | uvarint+string |    uvarint  |  uint32  |// +--------------+---------+----------------+-------------+----------+type ResponseHeader struct {    sync.RWMutex    CompressType compressor.CompressType    ID           uint64    Error        string    ResponseLen  uint32    Checksum     uint32}
复制代码


响应头由压缩类型,id,错误信息,返回长度和校验码组成。


为了实现头的重用,tinyrpc为头构建了缓存池:


var (    RequestPool  sync.Pool    ResponsePool sync.Pool)
func init() { RequestPool = sync.Pool{New: func() interface{} { return &RequestHeader{} }} ResponsePool = sync.Pool{New: func() interface{} { return &ResponseHeader{} }}}
复制代码


在使用时 get 出来,生命周期结束后放回池子,并且在 put 之前需要进行重置:


    h := header.RequestPool.Get().(*header.RequestHeader)    defer func() {        h.ResetHeader()        header.RequestPool.Put(h)    }()
复制代码


// ResetHeader reset request headerfunc (r *RequestHeader) ResetHeader() {    r.Lock()    defer r.Unlock()    r.ID = 0    r.Checksum = 0    r.Method = ""    r.CompressType = 0    r.RequestLen = 0}
// ResetHeader reset response headerfunc (r *ResponseHeader) ResetHeader() { r.Lock() defer r.Unlock() r.Error = "" r.ID = 0 r.CompressType = 0 r.Checksum = 0 r.ResponseLen = 0}
复制代码


搞清楚了头的结构以及对象池的复用逻辑,那么具体的头的编码与解码就是很简单的拆装工作,就不在此一行一行解析了,大家有兴趣可以自行去阅读。

编码 codec

由于tinyrpc是基于net/rpc开发,那么其codec模块自然也是依赖于net/rpcClientCodecServerCodec接口来实现的。

客户端实现

客户端是基于ClientCodec实现的能力:


type ClientCodec interface {    WriteRequest(*Request, any) error    ReadResponseHeader(*Response) error    ReadResponseBody(any) error
Close() error}
复制代码


client定义了一个clientCodec类型,并且实现了ClientCodec的接口方法:


type clientCodec struct {    r io.Reader    w io.Writer    c io.Closer
compressor compressor.CompressType // rpc compress type(raw,gzip,snappy,zlib) serializer serializer.Serializer response header.ResponseHeader // rpc response header mutex sync.Mutex // protect pending map pending map[uint64]string}
复制代码


WriteRequest实现:


// WriteRequest Write the rpc request header and body to the io streamfunc (c *clientCodec) WriteRequest(r *rpc.Request, param interface{}) error {    c.mutex.Lock()    c.pending[r.Seq] = r.ServiceMethod    c.mutex.Unlock()
if _, ok := compressor.Compressors[c.compressor]; !ok { return NotFoundCompressorError } reqBody, err := c.serializer.Marshal(param) if err != nil { return err } compressedReqBody, err := compressor.Compressors[c.compressor].Zip(reqBody) if err != nil { return err } h := header.RequestPool.Get().(*header.RequestHeader) defer func() { h.ResetHeader() header.RequestPool.Put(h) }() h.ID = r.Seq h.Method = r.ServiceMethod h.RequestLen = uint32(len(compressedReqBody)) h.CompressType = compressor.CompressType(c.compressor) h.Checksum = crc32.ChecksumIEEE(compressedReqBody)
if err := sendFrame(c.w, h.Marshal()); err != nil { return err } if err := write(c.w, compressedReqBody); err != nil { return err }
c.w.(*bufio.Writer).Flush() return nil}
复制代码


可以看到代码的实现还是比较清晰的,主要分为几个步骤:


  1. 将数据进行序列化构成请求体

  2. 选择相应的压缩算法进行压缩

  3. 从 Pool 中获取请求头实例将数据全部填入其中构成最后的请求头

  4. 分别通过 io 操作发送处理过的请求头和请求体


ReadResponseHeader实现:


// ReadResponseHeader read the rpc response header from the io streamfunc (c *clientCodec) ReadResponseHeader(r *rpc.Response) error {    c.response.ResetHeader()    data, err := recvFrame(c.r)    if err != nil {        return err    }    err = c.response.Unmarshal(data)    if err != nil {        return err    }    c.mutex.Lock()    r.Seq = c.response.ID    r.Error = c.response.Error    r.ServiceMethod = c.pending[r.Seq]    delete(c.pending, r.Seq)    c.mutex.Unlock()    return nil}
复制代码


此方法作用是读取返回的响应头,并解析成具体的结构体


ReadResponseBody实现:


func (c *clientCodec) ReadResponseBody(param interface{}) error {    if param == nil {        if c.response.ResponseLen != 0 {            if err := read(c.r, make([]byte, c.response.ResponseLen)); err != nil {                return err            }        }        return nil    }
respBody := make([]byte, c.response.ResponseLen) err := read(c.r, respBody) if err != nil { return err }
if c.response.Checksum != 0 { if crc32.ChecksumIEEE(respBody) != c.response.Checksum { return UnexpectedChecksumError } }
if c.response.GetCompressType() != c.compressor { return CompressorTypeMismatchError }
resp, err := compressor.Compressors[c.response.GetCompressType()].Unzip(respBody) if err != nil { return err }
return c.serializer.Unmarshal(resp, param)}
复制代码


此方法是用于读取返回的响应结构体,流程如下:


  1. 读取流获取响应体

  2. 根据响应头中的校验码来比对响应体是否完整

  3. 根据压缩算法来解压具体的结构体

  4. 进行反序列化

服务端实现

服务端是基于ServerCodec实现的能力:


type ServerCodec interface {    ReadRequestHeader(*Request) error    ReadRequestBody(any) error    WriteResponse(*Response, any) error
// Close can be called multiple times and must be idempotent. Close() error}
复制代码


和客户端类似,server定义了一个serverCodec类型,并且实现了ServerCodec的接口方法:


type serverCodec struct {    r io.Reader    w io.Writer    c io.Closer
request header.RequestHeader serializer serializer.Serializer mutex sync.Mutex // protects seq, pending seq uint64 pending map[uint64]*reqCtx}
复制代码


ReadRequestHeader实现:


// ReadRequestHeader read the rpc request header from the io streamfunc (s *serverCodec) ReadRequestHeader(r *rpc.Request) error {    s.request.ResetHeader()    data, err := recvFrame(s.r)    if err != nil {        return err    }    err = s.request.Unmarshal(data)    if err != nil {        return err    }    s.mutex.Lock()    s.seq++    s.pending[s.seq] = &reqCtx{s.request.ID, s.request.GetCompressType()}    r.ServiceMethod = s.request.Method    r.Seq = s.seq    s.mutex.Unlock()    return nil}
复制代码


此方法用于读取请求头并解析成结构体


ReadRequestBody实现:


// ReadRequestBody read the rpc request body from the io streamfunc (s *serverCodec) ReadRequestBody(param interface{}) error {    if param == nil {        if s.request.RequestLen != 0 {            if err := read(s.r, make([]byte, s.request.RequestLen)); err != nil {                return err            }        }        return nil    }
reqBody := make([]byte, s.request.RequestLen)
err := read(s.r, reqBody) if err != nil { return err }
if s.request.Checksum != 0 { if crc32.ChecksumIEEE(reqBody) != s.request.Checksum { return UnexpectedChecksumError } }
if _, ok := compressor. Compressors[s.request.GetCompressType()]; !ok { return NotFoundCompressorError }
req, err := compressor. Compressors[s.request.GetCompressType()].Unzip(reqBody) if err != nil { return err }
return s.serializer.Unmarshal(req, param)}
复制代码


此方法用于读取请求体,流程和读取响应体差不多,大致如下:


  1. 读取流并解析成请求体

  2. 根据请求头中的校验码进行校验

  3. 根据压缩算法进行解压

  4. 反序列化


WriteResponse实现:


// WriteResponse Write the rpc response header and body to the io streamfunc (s *serverCodec) WriteResponse(r *rpc.Response, param interface{}) error {    s.mutex.Lock()    reqCtx, ok := s.pending[r.Seq]    if !ok {        s.mutex.Unlock()        return InvalidSequenceError    }    delete(s.pending, r.Seq)    s.mutex.Unlock()
if r.Error != "" { param = nil } if _, ok := compressor. Compressors[reqCtx.compareType]; !ok { return NotFoundCompressorError }
var respBody []byte var err error if param != nil { respBody, err = s.serializer.Marshal(param) if err != nil { return err } }
compressedRespBody, err := compressor. Compressors[reqCtx.compareType].Zip(respBody) if err != nil { return err } h := header.ResponsePool.Get().(*header.ResponseHeader) defer func() { h.ResetHeader() header.ResponsePool.Put(h) }() h.ID = reqCtx.requestID h.Error = r.Error h.ResponseLen = uint32(len(compressedRespBody)) h.Checksum = crc32.ChecksumIEEE(compressedRespBody) h.CompressType = reqCtx.compareType
if err = sendFrame(s.w, h.Marshal()); err != nil { return err }
if err = write(s.w, compressedRespBody); err != nil { return err } s.w.(*bufio.Writer).Flush() return nil}
复制代码


此方法用于写入响应体,大致与写入请求体差不多,流程如下:


  1. 将响应体序列化

  2. 使用压缩算法将响应体进行压缩

  3. 使用 Pool 管理响应头

  4. 分别发送返回头和返回体

总结

tinyrpc是基于golang原生的net/rpc包实现,在此基础上实现了压缩和序列化等能力扩展。整体来看tinyrpc的代码非常简单,比较适合刚接触golang的程序员来进行阅读学习,学习一些golang的基础的开发技巧和一些语言特性。


发布于: 刚刚阅读数: 3
用户头像

还未添加个人签名 2021-05-18 加入

还未添加个人简介

评论

发布
暂无评论
tinyrpc源码阅读_golang_骑牛上青山_InfoQ写作社区