本篇为【写给 go 开发者的 gRPC 教程】系列第三篇
第一篇:protobuf基础
第二篇:通信模式
第三篇:拦截器
gRPC
的拦截器和其他框架的拦截器(也称 middleware)作用是一样的。利用拦截器我们可以在不侵入业务逻辑的前提下修改或者记录服务端或客户端的请求与响应,利用拦截器我们可以实现诸如日志记录、权限认证、限流等诸多功能
上一篇提到gRPC
的通信模式分为unary
和streaming
几种模式,拦截器也分为两种:unary interceptors
和streaming interceptors
,两种拦截器可以分别应用在服务端和客户端,所以 gRPC 总共为我们提供了四种拦截器。它们已经被定义成了 go 中的接口,我们创建的拦截器只要实现这些接口即可
服务端拦截器
服务端的拦截器从请求开始按顺序执行拦截器,在执行完对应 RPC 的逻辑之后,再按反向的顺序执行拦截器中对响应的处理逻辑
unary interceptors
对于unary
服务的拦截器只需实现UnaryServerInterceptor
接口即可
func(ctx context.Context, req interface{},
info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
复制代码
ctx context.Context
:单个请求的上下文
req interface{}
:RPC 服务的请求结构体
info *UnaryServerInfo
:RPC 的服务信息
handler UnaryHandler
:它包装了服务实现,通过调用它我们可以完成 RPC 并获取到响应
参数看不懂没关系,我们来看一个例子
示例
// 实现 unary interceptors
func orderUnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// Pre-processing logic
s := time.Now()
// Invoking the handler to complete the normal execution of a unary RPC.
m, err := handler(ctx, req)
// Post processing logic
log.Printf("Method: %s, req: %s, resp: %s, latency: %s\n",
info.FullMethod, req, m, time.Now().Sub(s))
return m, err
}
func main() {
s := grpc.NewServer(
// 使用 unary interceptors
grpc.UnaryInterceptor(orderUnaryServerInterceptor),
)
pb.RegisterOrderManagementServer(s, &OrderManagementImpl{})
// ...
}
复制代码
完整代码参考:https://github.com/liangwt/grpc-example/tree/main/06-interceptors/server。下同
假设我们的客户端请求了GetOrder
,根据示例再重新看下拦截器接口的每一个参数
🌲 req interface{}
RPC 服务的请求结构体,对于GetOrder
来说就是orderId *wrapperspb.StringValue
🌲 info *UnaryServerInfo
包含两个字段:
FullMethod
是请求的 method 名字(例如/ecommerce.OrderManagement/getOrder
);
Server
就是服务实现(就是示例RegisterOrderManagementServer
中的&OrderManagementImpl{}
)
🌲 handler
包装了服务实现
所以在调用它之前我们可以进行改写req
或ctx
、记录逻辑开始时间等操作
调用完handler
即完成了 RPC 并获取到响应,我们不仅可以记录响应还可以改写响应
总结
这张图大致展示了UnaryServerInterceptor
接口的每个参数的含义
streaming interceptors
对于stream
服务的拦截器只要实现StreamServerInterceptor
接口即可。它适用于我们上一篇介绍的
服务器端流式 RPC
客户端流式 RPC
双向流式 RPC
func(srv interface{}, ss ServerStream,
info *StreamServerInfo, handler StreamHandler) error
复制代码
srv interface{}
:服务实现
ss ServerStream
:服务端视角的流。怎么理解呢?无论是哪一种流式 RPC 对于服务端来说发送(SendMsg
)就代表着响应数据,接收(RecvMsg
)就代表着请求数据,不同的流式 RPC 的区别就在于是多次发送数据(服务器端流式 RPC)还是多次接收数据(客户端流式 RPC)或者两者均有(双向流式 RPC)。因此仅使用这一个抽象就代表了所有的流式 RPC 场景
info *StreamServerInfo
:RPC 的服务信息
handler StreamHandler
:它包装了服务实现,通过调用它我们可以完成 RPC
示例
我们来看一个例子
func orderStreamServerInterceptor(srv interface{},
ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// Pre-processing logic
s := time.Now()
// Invoking the StreamHandler to complete the execution of RPC invocation
err := handler(srv, ss)
// Post processing logic
log.Printf("Method: %s, latency: %s\n", info.FullMethod, time.Now().Sub(s))
return err
}
func main() {
s := grpc.NewServer(
grpc.StreamInterceptor(orderStreamServerInterceptor),
)
pb.RegisterOrderManagementServer(s, &OrderManagementImpl{})
//...
}
复制代码
根据示例再重新看下拦截器接口的参数
🌲 srv interface{}
服务实现(就是示例RegisterOrderManagementServer
中的&OrderManagementImpl{}
)
🌲 ss grpc.ServerStream
服务端发送和接收数据的接口,注意它是一个接口
🌲 info *grpc.StreamServerInfo
包含三个字段:
FullMethod
是请求的 method 名字(例如/ecommerce.OrderManagement/updateOrders
);
IsClientStream
是否是客户端流
IsServerStream
是否是服务端流
🌲 handler
包装了服务实现
所以在调用它之前我们可以进行改写数据流、记录逻辑开始时间等操作
调用完handler
即完成了 RPC,因为是流式调用所以不会返回响应数据,只有error
流式拦截器既没有请求字段,handler
也不会返回响应,该如何记录、修改请求响应呢?
如果想劫持流数据,答案就在ss ServerStream
。再重复一遍它的含义:服务端视角的流,它是一个接口。无论是哪一种流式 RPC 对于服务端来说发送(SendMsg
)就代表着响应数据,接收(RecvMsg
)就代表着请求数据,不同的流式 RPC 的区别就在于是多次发送数据(服务器端流式 RPC)还是多次接收数据(客户端流式 RPC)或者两者均有(双向流式 RPC)。因此可以对ss
进行包装,只要传入handler
的类型实现ServerStream
即可
// SendMsg method call.
type wrappedStream struct {
Recv []interface{}
Send []interface{}
grpc.ServerStream
}
func (w *wrappedStream) RecvMsg(m interface{}) error {
err := w.ServerStream.RecvMsg(m)
w.Recv = append(w.Recv, m)
return err
}
func (w *wrappedStream) SendMsg(m interface{}) error {
err := w.ServerStream.SendMsg(m)
w.Send = append(w.Send, m)
return err
}
func newWrappedStream(s grpc.ServerStream) *wrappedStream {
return &wrappedStream{
make([]interface{}, 0),
make([]interface{}, 0),
s,
}
}
func orderStreamServerInterceptor(srv interface{},
ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// Pre-processing logic
s := time.Now()
// Invoking the StreamHandler to complete the execution of RPC invocation
nss := newWrappedStream(ss)
err := handler(srv, nss)
// Post processing logic
log.Printf("Method: %s, req: %+v, resp: %+v, latency: %s\n",
info.FullMethod, nss.Recv, nss.Send, time.Now().Sub(s))
return err
}
复制代码
客户端拦截器
客户端拦截器和服务端拦截器类似,从请求开始按顺序执行拦截器,在获取到服务端响应之后,再按反向的顺序执行拦截器中对响应的处理逻辑
unary interceptors
client 端要实现UnaryClientInterceptor
接口实现的接口如下
func(ctx context.Context, method string, req, reply interface{},
cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
复制代码
你可以在调用远程函数前拦截 RPC,通过获取 RPC 相关信息,如参数,上下文,函数名,请求等,你甚至可以修改原始的远程调用
ctx context.Context
:单个请求的上下文
method string
:请求的 method 名字(例如/ecommerce.OrderManagement/getOrder
)
req, reply interface{}
:请求和响应数据
cc *ClientConn
:客户端与服务端的链接
invoker UnaryInvoker
:通过调用它我们可以完成 RPC 并获取到响应
opts ...CallOption
:RPC 调用的所有配置项,包含设置到 conn 上的,也包含配置在每一个调用上的
示例
func orderUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// Pre-processor phase
s := time.Now()
// Invoking the remote method
err := invoker(ctx, method, req, reply, cc, opts...)
// Post-processor phase
log.Printf("method: %s, req: %s, resp: %s, latency: %s\n",
method, req, reply, time.Now().Sub(s))
return err
}
func main() {
conn, err := grpc.Dial("127.0.0.1:8009",
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(orderUnaryClientInterceptor),
)
if err != nil {
panic(err)
}
c := pb.NewOrderManagementClient(conn)
// ...
}
复制代码
根据示例再重新看下拦截器接口的参数
🌲 cc *grpc.ClientConn
客户端与服务端的链接
这里的cc
就是示例代码中c := pb.NewOrderManagementClient(conn)
的conn
🌲 invoker grpc.UnaryInvoker
包装了服务实现
调用完invoker
即完成了 RPC,所以我们可以改写req
或者在获取到reply
之后修改响应
streaming interceptors
要实现的接口StreamClientInterceptor
func(ctx context.Context, desc *StreamDesc, cc *ClientConn,
method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
复制代码
和 serve 端类似的参数类似,重点关注下面几个参数
示例
这里不再赘述,可以参考服务端拦截器
func orderStreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc,
cc *grpc.ClientConn, method string, streamer grpc.Streamer,
opts ...grpc.CallOption) (grpc.ClientStream, error) {
// Pre-processing logic
s := time.Now()
cs, err := streamer(ctx, desc, cc, method, opts...)
// Post processing logic
log.Printf("method: %s, latency: %s\n", method, time.Now().Sub(s))
return cs, err
}
func main() {
conn, err := grpc.Dial("127.0.0.1:8009",
grpc.WithInsecure(),
grpc.WithStreamInterceptor(orderStreamClientInterceptor),
)
if err != nil {
panic(err)
}
c := pb.NewOrderManagementClient(conn)
// ...
}
复制代码
如何记录或者修改流拦截器的请求响应数据?
和服务端stream interceptor
同样的道理,通过包装ClientStream
即可做到
// SendMsg method call.
type wrappedStream struct {
method string
grpc.ClientStream
}
func (w *wrappedStream) RecvMsg(m interface{}) error {
err := w.ClientStream.RecvMsg(m)
log.Printf("method: %s, res: %s\n", w.method, m)
return err
}
func (w *wrappedStream) SendMsg(m interface{}) error {
err := w.ClientStream.SendMsg(m)
log.Printf("method: %s, req: %s\n", w.method, m)
return err
}
func newWrappedStream(method string, s grpc.ClientStream) *wrappedStream {
return &wrappedStream{
method,
s,
}
}
func orderStreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc,
cc *grpc.ClientConn, method string, streamer grpc.Streamer,
opts ...grpc.CallOption) (grpc.ClientStream, error) {
// Pre-processing logic
s := time.Now()
cs, err := streamer(ctx, desc, cc, method, opts...)
// Post processing logic
log.Printf("method: %s, latency: %s\n", method, time.Now().Sub(s))
return newWrappedStream(method, cs), err
}
复制代码
拦截器链
服务器只能配置一个 unary interceptor
和 stream interceptor
,否则会报错,客户端也是,虽然不会报错,但是只有最后一个才起作用。
// 服务端拦截器
s := grpc.NewServer(
grpc.UnaryInterceptor(orderUnaryServerInterceptor),
grpc.StreamInterceptor(orderStreamServerInterceptor),
)
复制代码
// 客户端拦截器
conn, err := grpc.Dial("127.0.0.1:8009",
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(orderUnaryClientInterceptor),
grpc.WithStreamInterceptor(orderStreamClientInterceptor),
)
复制代码
如果你想配置多个,可以使用拦截器链或者自己实现一个。
// 服务端拦截器
s := grpc.NewServer(
grpc.ChainUnaryInterceptor(
orderUnaryServerInterceptor1,
orderUnaryServerInterceptor2,
),
grpc.ChainStreamInterceptor(
orderServerStreamInterceptor1,
orderServerStreamInterceptor2,
),
)
复制代码
// 客户端拦截器
conn, err := grpc.Dial("127.0.0.1:8009",
grpc.WithInsecure(),
grpc.WithChainUnaryInterceptor(
orderUnaryClientInterceptor1,
orderUnaryClientInterceptor2,
),
grpc.WithChainStreamInterceptor(
orderStreamClientInterceptor1,
orderStreamClientInterceptor2,
),
)
复制代码
生态
除了可以自己实现拦截器外,gRPC 生态也提供了一系列的开源的拦截器可供使用,覆盖权限、日志、监控等诸多方面
https://github.com/grpc-ecosystem/go-grpc-middleware
Auth
Logging
grpc_ctxtags
- a library that adds a Tag
map to context, with data populated from request body
grpc_zap
- integration of zap logging library into gRPC handlers.
grpc_logrus
- integration of logrus logging library into gRPC handlers.
grpc_kit
- integration of go-kit/log logging library into gRPC handlers.
grpc_grpc_logsettable
- a wrapper around grpclog.LoggerV2
that allows to replace loggers in runtime (thread-safe).
Monitoring
grpc_prometheus
⚡ - Prometheus client-side and server-side monitoring middleware
otgrpc
⚡ - OpenTracing client-side and server-side interceptors
grpc_opentracing
- OpenTracing client-side and server-side interceptors with support for streaming and handler-returned tags
Client
Server
grpc_validator
- codegen inbound message validation from .proto
options
grpc_recovery
- turn panics into gRPC errors
ratelimit
- grpc rate limiting by your own limiter
示例代码
https://github.com/liangwt/grpc-example/tree/main/06-interceptors
微信公众号【凉凉的知识库】同步更新,欢迎关注获取最新最有用的后端知识 ✨
评论