写点什么

PD api 基础框架源码分析

  • 2022 年 7 月 11 日
  • 本文字数:4000 字

    阅读完需:约 13 分钟

作者: 薛港 - 移动云原文来源:https://tidb.net/blog/df7a1570


PD 提供两类 API,一类是 restful api ,一例是 grpc 服务。这两类服务都依赖于 ETCD。也就是依赖 ETCD 实例提供 restful 以及 grpc 服务。我们要做的就是启动 ETCD 的时候,参数配置里设置以下两类参数:


// Config holds the arguments for configuring an etcd


// server.


type Config struct {


// UserHandlers is for registering users handlers and only used


// for


// embedding etcd into other applications.


// The map key is the route path for the handler, and


// you must ensure it can’t be conflicted with etcd’s.


UserHandlers map[string]http.Handler json:"-"


// ServiceRegister is for registering users’ gRPC services. A


// simple usage example:


// cfg := embed.NewConfig()


// cfg.ServerRegister = func(s *grpc.Server) {


// pb.RegisterFooServer(s, &fooServer{})


// pb.RegisterBarServer(s, &barServer{})


// }


// embed.StartEtcd(cfg)


ServiceRegister func(*grpc.Server) json:"-"


}


ETCD 里这两个参数初始代码如下:


// CreateServer creates the UNINITIALIZED pd server with


// given configuration.


func CreateServer(ctx context.Context, cfg *config.Config,


serviceBuilders …HandlerBuilder) (*Server, error) {


s := &Server{


cfg: cfg,


persistOptions: config.NewPersistOptions(cfg),


member: &member.Member{},


ctx: ctx,


startTimestamp: time.Now().Unix(),


}


// Adjust etcd config.


etcdCfg, err := s.cfg.GenEmbedEtcdConfig()


if len(serviceBuilders) != 0 {


userHandlers, err := combineBuilderServerHTTPService(ctx,


s, serviceBuilders…)


etcdCfg.UserHandlers = userHandlers


}


etcdCfg.ServiceRegister = func(gs *grpc.Server) {


pdpb.RegisterPDServer(gs, s)


diagnosticspb.RegisterDiagnosticsServer(gs, s)


}


}


先分析 combineBuilderServerHTTPService(ctx, s, serviceBuilders…),这个函数生成 UserHanders 对象,用于提供 restful api 服务


这个函数接受 HandlerBuilder 参数,返回一个 map 结构,key 对应 api path, value 对应 http.handle 用于处理 restful 对应请求的处理逻辑。


这个函数本身没什么特别的,就是基于 ServiceGroup 信息,给一类 API 引入异常恢复中间件,依赖于第三方库(github.com/urfave/negroni)


func combineBuilderServerHTTPService(ctx context.Context, svr *Server, serviceBuilders …HandlerBuilder) (map[string]http.Handler, error) {


}


combineBuilderServerHTTPService 的重点是参数 serviceBuilders , 这是一个 slice,元素类型是函数定义,ServiceGroup 用于生成对应的 api 的 path,http.Handle 用于 restful 函数请求处理


// HandlerBuilder builds a server HTTP handler.


type HandlerBuilder func(context.Context, *Server) (http.Handler, ServiceGroup, error)


下面我们分析 serviceBuilders 参数实例化代码,


serviceBuilders := []server.HandlerBuilder{api.NewHandler,


swaggerserver.NewHandler, autoscaling.NewHandler}


serviceBuilders = append(serviceBuilders,


dashboard.GetServiceBuilders()…)


svr, err := server.CreateServer(ctx, cfg, serviceBuilders…)


serviceBuilders 是一个 slice, 每一个元素都是一个函数定义,返回 api 处理 handle,以及用于决定 api path 的数据结构 serviceGroup.


我们重点分析 slice 元素 api.NewHandler,然后一通都通, 其它 api 定义也是类似框架:


// NewHandler creates a HTTP handler for API.


func NewHandler(ctx context.Context, svr *server.Server) (http.Handler, server.ServiceGroup, error) {


group := server.ServiceGroup{


Name: “core”,


IsCore: true,


}


router := mux.NewRouter()


r := createRouter(ctx, apiPrefix, svr)


router.PathPrefix(apiPrefix).Handler(negroni.New(


serverapi.NewRuntimeServiceValidator(svr, group),


serverapi.NewRedirector(svr),


negroni.Wrap®),


)


  return router, group, nil}
复制代码


重点分析 createRouter(ctx, apiPrefix, svr),这个函数生成一个 route, 包含 restful api 的 path 以及处理函数


func createRouter(ctx context.Context, prefix string, svr *server.Server) *mux.Router {


operatorHandler := newOperatorHandler(handler, rd)


apiRouter.HandleFunc(“/operators”, operatorHandler.List).Methods(“GET”)


apiRouter.HandleFunc(“/operators”, operatorHandler.Post).Methods(“POST”)


apiRouter.HandleFunc(“/operators/{region_id}“, operatorHandler.Get).Methods(“GET”)


apiRouter.HandleFunc(“/operators/{region_id}“, operatorHandler.Delete).Methods(“DELETE”)


schedulerHandler := newSchedulerHandler(svr, rd)apiRouter.HandleFunc("/schedulers", schedulerHandler.List).Methods("GET")apiRouter.HandleFunc("/schedulers", schedulerHandler.Post).Methods("POST")apiRouter.HandleFunc("/schedulers/{name}", schedulerHandler.Delete).Methods("DELETE")apiRouter.HandleFunc("/schedulers/{name}", schedulerHandler.PauseOrResume).Methods("POST") 
复制代码



}


有意思的是如下代码


router.PathPrefix(apiPrefix).Handler(negroni.New(


serverapi.NewRuntimeServiceValidator(svr,


group),


serverapi.NewRedirector(svr),


negroni.Wrap®),


)


上面代码 hander 接受一个 negroni 创建的 handle 对象,这个对象可以理解服务中间件拦截器,也就是任何一个 api 请求会依次经过前面两个 hanlde 拦截,才会被业务 route 处理。我们重点关注第二拦截器:


serverapi.NewRedirector(svr)。这个拦截器也很简单,主要检察当前接受请求的 pd 实例,是否是 leader, 如果不是,那么这个 pd 作为服务转发方,给 pd leader 转发 restful api 请求


func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) {


r.Header.Set(RedirectorHeader, h.s.Name())


  leader := h.s.GetMember().GetLeader()  if leader == nil {    http.Error(w, "no leader", http.StatusServiceUnavailable)    return  }
urls, err := config.ParseUrls(strings.Join(leader.GetClientUrls(), ","))
client := h.s.GetHTTPClient() NewCustomReverseProxies(client, urls).ServeHTTP(w, r)}
复制代码


下面分析 grpc 服务,主要通过 ETCD 参数注册 grpc 服务, 下面的代码表明 Server 对象提供 grpc 服务


etcdCfg.ServiceRegister = func(gs *grpc.Server) {


pdpb.RegisterPDServer(gs, s)


diagnosticspb.RegisterDiagnosticsServer(gs, s)


}


我们先关注一下 grpc 提供服务 api,然后后面分析每个模块细节时,一个个 api 开展深入分析


type PDServer interface {


// GetMembers get the member list of this cluster. It does not require


// the cluster_id in request matchs the id of this cluster.


GetMembers(context.Context, *GetMembersRequest) (*GetMembersResponse, error)


Tso(PD_TsoServer) error


Bootstrap(context.Context, *BootstrapRequest) (*BootstrapResponse, error)


IsBootstrapped(context.Context, *IsBootstrappedRequest) (*IsBootstrappedResponse, error)


AllocID(context.Context, *AllocIDRequest) (*AllocIDResponse, error)


GetStore(context.Context, *GetStoreRequest) (*GetStoreResponse, error)


PutStore(context.Context, *PutStoreRequest) (*PutStoreResponse, error)


GetAllStores(context.Context, *GetAllStoresRequest) (*GetAllStoresResponse, error)


StoreHeartbeat(context.Context, *StoreHeartbeatRequest) (*StoreHeartbeatResponse, error)


RegionHeartbeat(PD_RegionHeartbeatServer) error


GetRegion(context.Context, *GetRegionRequest) (*GetRegionResponse, error)


GetPrevRegion(context.Context, *GetRegionRequest) (*GetRegionResponse, error)


GetRegionByID(context.Context, *GetRegionByIDRequest) (*GetRegionResponse, error)


ScanRegions(context.Context, *ScanRegionsRequest) (*ScanRegionsResponse, error)


AskSplit(context.Context, *AskSplitRequest) (*AskSplitResponse, error)


ReportSplit(context.Context, *ReportSplitRequest) (*ReportSplitResponse, error)


AskBatchSplit(context.Context, *AskBatchSplitRequest) (*AskBatchSplitResponse, error)


ReportBatchSplit(context.Context, *ReportBatchSplitRequest) (*ReportBatchSplitResponse, error)


GetClusterConfig(context.Context, *GetClusterConfigRequest) (*GetClusterConfigResponse, error)


PutClusterConfig(context.Context, *PutClusterConfigRequest) (*PutClusterConfigResponse, error)


ScatterRegion(context.Context, *ScatterRegionRequest) (*ScatterRegionResponse, error)


GetGCSafePoint(context.Context, *GetGCSafePointRequest) (*GetGCSafePointResponse, error)


UpdateGCSafePoint(context.Context, *UpdateGCSafePointRequest) (*UpdateGCSafePointResponse, error)


UpdateServiceGCSafePoint(context.Context, *UpdateServiceGCSafePointRequest) (*UpdateServiceGCSafePointResponse, error)


SyncRegions(PD_SyncRegionsServer) error


GetOperator(context.Context, *GetOperatorRequest) (*GetOperatorResponse, error)


SyncMaxTS(context.Context, *SyncMaxTSRequest) (*SyncMaxTSResponse, error)


SplitRegions(context.Context, *SplitRegionsRequest) (*SplitRegionsResponse, error)


GetDCLocationInfo(context.Context, *GetDCLocationInfoRequest) (*GetDCLocationInfoResponse, error)


}


发布于: 刚刚阅读数: 2
用户头像

TiDB 社区官网:https://tidb.net/ 2021.12.15 加入

TiDB 社区干货传送门是由 TiDB 社区中布道师组委会自发组织的 TiDB 社区优质内容对外宣布的栏目,旨在加深 TiDBer 之间的交流和学习。一起构建有爱、互助、共创共建的 TiDB 社区 https://tidb.net/

评论

发布
暂无评论
PD api基础框架源码分析_TiDB 底层架构_TiDB 社区干货传送门_InfoQ写作社区