PD api 基础框架源码分析
作者: 薛港 - 移动云原文来源:https://tidb.net/blog/df7a1570
PD 提供两类 API,一类是 restful api ,一例是 grpc 服务。这两类服务都依赖于 ETCD。也就是依赖 ETCD 实例提供 restful 以及 grpc 服务。我们要做的就是启动 ETCD 的时候,参数配置里设置以下两类参数:
// Config holds the arguments for configuring an etcd
// server.
type Config struct {
// UserHandlers is for registering users handlers and only used
// for
// embedding etcd into other applications.
// The map key is the route path for the handler, and
// you must ensure it can’t be conflicted with etcd’s.
UserHandlers map[string]http.Handler json:"-"
// ServiceRegister is for registering users’ gRPC services. A
// simple usage example:
// cfg := embed.NewConfig()
// cfg.ServerRegister = func(s *grpc.Server) {
// pb.RegisterFooServer(s, &fooServer{})
// pb.RegisterBarServer(s, &barServer{})
// }
// embed.StartEtcd(cfg)
ServiceRegister func(*grpc.Server) json:"-"
}
ETCD 里这两个参数初始代码如下:
// CreateServer creates the UNINITIALIZED pd server with
// given configuration.
func CreateServer(ctx context.Context, cfg *config.Config,
serviceBuilders …HandlerBuilder) (*Server, error) {
s := &Server{
cfg: cfg,
persistOptions: config.NewPersistOptions(cfg),
member: &member.Member{},
ctx: ctx,
startTimestamp: time.Now().Unix(),
}
// Adjust etcd config.
etcdCfg, err := s.cfg.GenEmbedEtcdConfig()
if len(serviceBuilders) != 0 {
userHandlers, err := combineBuilderServerHTTPService(ctx,
s, serviceBuilders…)
etcdCfg.UserHandlers = userHandlers
}
etcdCfg.ServiceRegister = func(gs *grpc.Server) {
pdpb.RegisterPDServer(gs, s)
diagnosticspb.RegisterDiagnosticsServer(gs, s)
}
}
先分析 combineBuilderServerHTTPService(ctx, s, serviceBuilders…),这个函数生成 UserHanders 对象,用于提供 restful api 服务
这个函数接受 HandlerBuilder 参数,返回一个 map 结构,key 对应 api path, value 对应 http.handle 用于处理 restful 对应请求的处理逻辑。
这个函数本身没什么特别的,就是基于 ServiceGroup 信息,给一类 API 引入异常恢复中间件,依赖于第三方库(github.com/urfave/negroni)
func combineBuilderServerHTTPService(ctx context.Context, svr *Server, serviceBuilders …HandlerBuilder) (map[string]http.Handler, error) {
}
combineBuilderServerHTTPService 的重点是参数 serviceBuilders , 这是一个 slice,元素类型是函数定义,ServiceGroup 用于生成对应的 api 的 path,http.Handle 用于 restful 函数请求处理
// HandlerBuilder builds a server HTTP handler.
type HandlerBuilder func(context.Context, *Server) (http.Handler, ServiceGroup, error)
下面我们分析 serviceBuilders 参数实例化代码,
serviceBuilders := []server.HandlerBuilder{api.NewHandler,
swaggerserver.NewHandler, autoscaling.NewHandler}
serviceBuilders = append(serviceBuilders,
dashboard.GetServiceBuilders()…)
svr, err := server.CreateServer(ctx, cfg, serviceBuilders…)
serviceBuilders 是一个 slice, 每一个元素都是一个函数定义,返回 api 处理 handle,以及用于决定 api path 的数据结构 serviceGroup.
我们重点分析 slice 元素 api.NewHandler,然后一通都通, 其它 api 定义也是类似框架:
// NewHandler creates a HTTP handler for API.
func NewHandler(ctx context.Context, svr *server.Server) (http.Handler, server.ServiceGroup, error) {
group := server.ServiceGroup{
Name: “core”,
IsCore: true,
}
router := mux.NewRouter()
r := createRouter(ctx, apiPrefix, svr)
router.PathPrefix(apiPrefix).Handler(negroni.New(
serverapi.NewRuntimeServiceValidator(svr, group),
serverapi.NewRedirector(svr),
negroni.Wrap®),
)
重点分析 createRouter(ctx, apiPrefix, svr),这个函数生成一个 route, 包含 restful api 的 path 以及处理函数
func createRouter(ctx context.Context, prefix string, svr *server.Server) *mux.Router {
operatorHandler := newOperatorHandler(handler, rd)
apiRouter.HandleFunc(“/operators”, operatorHandler.List).Methods(“GET”)
apiRouter.HandleFunc(“/operators”, operatorHandler.Post).Methods(“POST”)
apiRouter.HandleFunc(“/operators/{region_id}“, operatorHandler.Get).Methods(“GET”)
apiRouter.HandleFunc(“/operators/{region_id}“, operatorHandler.Delete).Methods(“DELETE”)
…
}
有意思的是如下代码
router.PathPrefix(apiPrefix).Handler(negroni.New(
serverapi.NewRuntimeServiceValidator(svr,
group),
serverapi.NewRedirector(svr),
negroni.Wrap®),
)
上面代码 hander 接受一个 negroni 创建的 handle 对象,这个对象可以理解服务中间件拦截器,也就是任何一个 api 请求会依次经过前面两个 hanlde 拦截,才会被业务 route 处理。我们重点关注第二拦截器:
serverapi.NewRedirector(svr)。这个拦截器也很简单,主要检察当前接受请求的 pd 实例,是否是 leader, 如果不是,那么这个 pd 作为服务转发方,给 pd leader 转发 restful api 请求
func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) {
r.Header.Set(RedirectorHeader, h.s.Name())
下面分析 grpc 服务,主要通过 ETCD 参数注册 grpc 服务, 下面的代码表明 Server 对象提供 grpc 服务
etcdCfg.ServiceRegister = func(gs *grpc.Server) {
pdpb.RegisterPDServer(gs, s)
diagnosticspb.RegisterDiagnosticsServer(gs, s)
}
我们先关注一下 grpc 提供服务 api,然后后面分析每个模块细节时,一个个 api 开展深入分析
type PDServer interface {
// GetMembers get the member list of this cluster. It does not require
// the cluster_id in request matchs the id of this cluster.
GetMembers(context.Context, *GetMembersRequest) (*GetMembersResponse, error)
Tso(PD_TsoServer) error
Bootstrap(context.Context, *BootstrapRequest) (*BootstrapResponse, error)
IsBootstrapped(context.Context, *IsBootstrappedRequest) (*IsBootstrappedResponse, error)
AllocID(context.Context, *AllocIDRequest) (*AllocIDResponse, error)
GetStore(context.Context, *GetStoreRequest) (*GetStoreResponse, error)
PutStore(context.Context, *PutStoreRequest) (*PutStoreResponse, error)
GetAllStores(context.Context, *GetAllStoresRequest) (*GetAllStoresResponse, error)
StoreHeartbeat(context.Context, *StoreHeartbeatRequest) (*StoreHeartbeatResponse, error)
RegionHeartbeat(PD_RegionHeartbeatServer) error
GetRegion(context.Context, *GetRegionRequest) (*GetRegionResponse, error)
GetPrevRegion(context.Context, *GetRegionRequest) (*GetRegionResponse, error)
GetRegionByID(context.Context, *GetRegionByIDRequest) (*GetRegionResponse, error)
ScanRegions(context.Context, *ScanRegionsRequest) (*ScanRegionsResponse, error)
AskSplit(context.Context, *AskSplitRequest) (*AskSplitResponse, error)
ReportSplit(context.Context, *ReportSplitRequest) (*ReportSplitResponse, error)
AskBatchSplit(context.Context, *AskBatchSplitRequest) (*AskBatchSplitResponse, error)
ReportBatchSplit(context.Context, *ReportBatchSplitRequest) (*ReportBatchSplitResponse, error)
GetClusterConfig(context.Context, *GetClusterConfigRequest) (*GetClusterConfigResponse, error)
PutClusterConfig(context.Context, *PutClusterConfigRequest) (*PutClusterConfigResponse, error)
ScatterRegion(context.Context, *ScatterRegionRequest) (*ScatterRegionResponse, error)
GetGCSafePoint(context.Context, *GetGCSafePointRequest) (*GetGCSafePointResponse, error)
UpdateGCSafePoint(context.Context, *UpdateGCSafePointRequest) (*UpdateGCSafePointResponse, error)
UpdateServiceGCSafePoint(context.Context, *UpdateServiceGCSafePointRequest) (*UpdateServiceGCSafePointResponse, error)
SyncRegions(PD_SyncRegionsServer) error
GetOperator(context.Context, *GetOperatorRequest) (*GetOperatorResponse, error)
SyncMaxTS(context.Context, *SyncMaxTSRequest) (*SyncMaxTSResponse, error)
SplitRegions(context.Context, *SplitRegionsRequest) (*SplitRegionsResponse, error)
GetDCLocationInfo(context.Context, *GetDCLocationInfoRequest) (*GetDCLocationInfoResponse, error)
}
版权声明: 本文为 InfoQ 作者【TiDB 社区干货传送门】的原创文章。
原文链接:【http://xie.infoq.cn/article/1d047bb35c151442de0aeba03】。文章转载请联系作者。
评论