写点什么

CubeFS 源码分析系列(一) 创建 Volume

  • 2024-02-04
    浙江
  • 本文字数:7524 字

    阅读完需:约 25 分钟

CubeFS源码分析系列(一) 创建Volume

CubeFS 提供了客户端工具用于创建卷,执行的命令如下


./build/bin/cfs-cli volume create ltptest ltptest
复制代码


在这条命令发生后,由 cfs-cli 作为客户端发出请求,向配置文件中的masterAddr字段指定的服务端发起 http 请求, 服务端根据请求指令创建卷,在创建卷的过程中,CubeFS 做了什么。

1. 请求解析

CubeFS 接收到客户端请求时,首先将请求http.Request路由到/admin/createVol注册的服务端的 createVol()方法,进入到该方法后,CubeFS 首先生成 CubeFS 使用的 createVolReq 结构体信息,在这个过程中


  1. 信息映射:将 http.Request 字段中的信息通过extractUint之类的方法进行相应映射,如映射卷名称:


if req.name, err = extractName(r); err != nil {    return}
复制代码


​2.合法性校验:校验请求中createVolReq的参数合法性:如 volume 请求的容量大小size不能等于 0


if req.capacity == 0 {    return fmt.Errorf("vol capacity can't be zero, %d", req.capacity)}
复制代码

2. 创建卷使用的 Cluster 模块createVol方法

2.1 创建卷结构体Vol

在请求解析之后,真正的创建核心逻辑由 Cluster 模块的createVol方法根据请求解析生成的 req 执行,


if vol, err = m.cluster.createVol(req); err != nil {    sendErrReply(w, r, newErrHTTPReply(err))    return}
复制代码


createVol方法首先执行doCreateVol方法,在该请求中,根据 req 参数生成一个 volValue 结构体, 字段信息如下:


type volValue struct {    ID                    uint64    Name                  string    ReplicaNum            uint8    DpReplicaNum          uint8    Status                uint8    DataPartitionSize     uint64    Capacity              uint64    Owner                 string    FollowerRead          bool    Authenticate          bool    DpReadOnlyWhenVolFull bool
CrossZone bool DomainOn bool ZoneName string OSSAccessKey string OSSSecretKey string CreateTime int64 DeleteLockTime int64 Description string DpSelectorName string DpSelectorParm string DefaultPriority bool DomainId uint64 VolType int
EbsBlkSize int CacheCapacity uint64 CacheAction int CacheThreshold int CacheTTL int CacheHighWater int CacheLowWater int CacheLRUInterval int CacheRule string
EnablePosixAcl bool EnableQuota bool
EnableTransaction bsProto.TxOpMask TxTimeout int64 TxConflictRetryNum int64 TxConflictRetryInterval int64 TxOpLimit int
VolQosEnable bool DiskQosEnable bool IopsRLimit, IopsWLimit, FlowRlimit, FlowWlimit uint64 IopsRMagnify, IopsWMagnify, FlowRMagnify, FlowWMagnify uint32 ClientReqPeriod, ClientHitTriggerCnt uint32 TrashInterval int64 DisableAuditLog bool}
复制代码


并通过allocateCommonID方法生成 ID 填充volValue.ID字段,再通过newVol方法将volValue结构体转换为Vol结构体,该结构体中添加了dataPartitions字段,dataPartitions字段是一个*DataPartitionMap类型数据,用于存储所有的数据分片信息,结构体信息如下:


// DataPartitionMap stores all the data partitionMaptype DataPartitionMap struct {  sync.RWMutex  partitionMap           map[uint64]*DataPartition  readableAndWritableCnt int    // number of readable and writable partitionMap  lastLoadedIndex        uint64 // last loaded partition index  lastReleasedIndex      uint64 // last released partition index  partitions             []*DataPartition  responseCache          []byte  lastAutoCreateTime     time.Time  volName                string  readMutex              sync.RWMutex}
复制代码


其中正在的数据存储在 DataPartition 结构体,该结构体信息如下:


// DataPartition represents the structure of storing the file contents.type DataPartition struct {  PartitionID      uint64  PartitionType    int  PartitionTTL     int64  LastLoadedTime   int64  ReplicaNum       uint8  Status           int8  isRecover        bool  Replicas         []*DataReplica  LeaderReportTime int64  Hosts            []string // host addresses  Peers            []proto.Peer  offlineMutex     sync.RWMutex  sync.RWMutex  total                    uint64  used                     uint64  MissingNodes             map[string]int64 // key: address of the missing node, value: when the node is missing  VolName                  string  VolID                    uint64  modifyTime               int64  createTime               int64  lastWarnTime             int64  OfflinePeerID            uint64  FileInCoreMap            map[string]*FileInCore  FilesWithMissingReplica  map[string]int64 // key: file name, value: last time when a missing replica is found  SingleDecommissionStatus uint8  singleDecommissionChan   chan bool  SingleDecommissionAddr   string  RdOnly                   bool  addReplicaMutex          sync.RWMutex  DecommissionRetry        int  DecommissionStatus       uint32  DecommissionSrcAddr      string  DecommissionDstAddr      string  DecommissionRaftForce    bool  DecommissionSrcDiskPath  string //  DecommissionTerm         uint64 // only used for disk decommission  IsDiscard                bool}
复制代码


在获取Vol结构体后,通过结构体私有方法refreshOSSSecure1更新OSSAccessKey,OSSSecretKey字段信息

2.2 持久化卷信息

在创建Vol结构体后,CubeFS 通过Cluster模块调用syncAddVol方法, 该方法调用syncPutVolInfo方法,具体如下


// key=#vol#volID,value=json.Marshal(vv)func (c *Cluster) syncAddVol(vol *Vol) (err error) {  return c.syncPutVolInfo(opSyncAddVol, vol)}
复制代码


syncPutVolInfo方法生成一个 metadata,metadata 使用 RaftCmd 生成一个指针,RaftCmd 有 3 个字段,如下所示


type RaftCmd struct {    Op uint32 `json:"op"`    K  string `json:"k"`    V  []byte `json:"v"`}
复制代码


其中 Op 代表具体的操作,如这里为opSyncAddVol添加卷操作,K 为#vol#volID, value 为json.Marshal(vv),核心实现如下


func (c *Cluster) syncPutVolInfo(opType uint32, vol *Vol) (err error) {  metadata := new(RaftCmd)  metadata.Op = opType  metadata.K = volPrefix + strconv.FormatUint(vol.ID, 10)  vv := newVolValue(vol)  if metadata.V, err = json.Marshal(vv); err != nil {    return errors.New(err.Error())  }  return c.submit(metadata)}
复制代码


通过调用submit方法,最终调用 raftstore.Partition 提供的 Submit 方法将序列号后的 metadata 字段持久化到 rocksDB 里面,核心实现如下


func (c *Cluster) submit(metadata *RaftCmd) (err error) {  cmd, err := metadata.Marshal()  if err != nil {    return errors.New(err.Error())  }  if _, err = c.partition.Submit(cmd); err != nil {    msg := fmt.Sprintf("action[metadata_submit] err:%v", err.Error())    return errors.New(msg)  }  return}
复制代码


在提交后,将Vol结构体放入到 Cluster 模块维护的map[string]*Vol结构体中


func (c *Cluster) putVol(vol *Vol) {  c.volMutex.Lock()  defer c.volMutex.Unlock()  if _, ok := c.vols[vol.Name]; !ok {    c.vols[vol.Name] = vol  }}
复制代码

2.3 初始化元数据分片

在持久化卷 Volume 后,调用Volume模块提供的initMetaPartitions方法持久化元数据分片, 元数据分片最少为 3,最大为 100


if count < defaultInitMetaPartitionCount {  count = defaultInitMetaPartitionCount  // 3}if count > defaultMaxInitMetaPartitionCount {  count = defaultMaxInitMetaPartitionCount // 100}
复制代码

2.3.1 设置 MetaPartition 起始位置

通过调用vol.createMetaPartition(c, start, end)方法创建分片,start,end 为分片数据的起始位置,start, end 之间相差的距离为 defaultMetaPartitionInodeIDStep(默认为 1<<22)


for index := 0; index < count; index++ {    if index != 0 {      start = end + 1    }    end = gConfig.MetaPartitionInodeIdStep * uint64(index+1)    if index == count-1 {      end = defaultMaxMetaPartitionInodeID    }    if err = vol.createMetaPartition(c, start, end); err != nil {      log.LogErrorf("action[initMetaPartitions] vol[%v] init meta partition err[%v]", vol.Name, err)      break    }}
复制代码

2.3.2 生成 MetaPartition 结构体

实际的调用为createMetaPartition方法,该方法调用doCreateMetaPartition方法生成 MetaPartition 结构体,MetaPartition 结构体信息如下


type MetaPartition struct {    PartitionID      uint64    Start            uint64    End              uint64    MaxInodeID       uint64    InodeCount       uint64    DentryCount      uint64    FreeListLen      uint64    TxCnt            uint64    TxRbInoCnt       uint64    TxRbDenCnt       uint64    Replicas         []*MetaReplica    LeaderReportTime int64    ReplicaNum       uint8    Status           int8    IsRecover        bool    volID            uint64    volName          string    Hosts            []string    Peers            []proto.Peer    OfflinePeerID    uint64    MissNodes        map[string]int64    LoadResponse     []*proto.MetaPartitionLoadResponse    offlineMutex     sync.RWMutex    uidInfo          []*proto.UidReportSpaceInfo    EqualCheckPass   bool    sync.RWMutex}
复制代码

2.3.3 MetaPatition持久化

doCreateMetaPartition方法通过getHostFromDomainZone或者getHostFromNormalZone方法选择hostspeers,将创建的 MetaPartition,通过syncCreateMetaPartitionToMetaNode方法将 MetaPartition 封装成一个 AdminTask 任务,通过调用 metaNode 提供的Sender.syncSendAdminTask(tasks[0])将任务发出


func (c *Cluster) syncCreateMetaPartitionToMetaNode(host string, mp *MetaPartition) (err error) {  hosts := make([]string, 0)  hosts = append(hosts, host)  tasks := mp.buildNewMetaPartitionTasks(hosts, mp.Peers, mp.volName)  metaNode, err := c.metaNode(host)  if err != nil {    return  }  if _, err = metaNode.Sender.syncSendAdminTask(tasks[0]); err != nil {    return  }  return}
复制代码


创建完成后通过 cluster 模块提供的syncAddMetaPartition方法添加元数据信息


// key=#mp#volID#metaPartitionID,value=json.Marshal(metaPartitionValue)func (c *Cluster) syncAddMetaPartition(mp *MetaPartition) (err error) {  return c.putMetaPartitionInfo(opSyncAddMetaPartition, mp)}
复制代码


最后调用 Volume 模块提供的addMetaPartition模块添加 MetaPartition 信息。


func (vol *Vol) addMetaPartition(mp *MetaPartition) {  vol.mpsLock.Lock()  defer vol.mpsLock.Unlock()  if _, ok := vol.MetaPartitions[mp.PartitionID]; !ok {    vol.MetaPartitions[mp.PartitionID] = mp    return  }  // replace the old partition in the map with mp  vol.MetaPartitions[mp.PartitionID] = mp}
复制代码

2.3.4 MetaPartition 创建过程

整个过程如下创建 MetaPartition 过程如下


func (vol *Vol) createMetaPartition(c *Cluster, start, end uint64) (err error) {  var mp *MetaPartition  if mp, err = vol.doCreateMetaPartition(c, start, end); err != nil {    return  }  if err = c.syncAddMetaPartition(mp); err != nil {    return errors.NewError(err)  }  vol.addMetaPartition(mp)  return}
复制代码


当创建的 metadata 分片数目不等于需要的 count 时,创建报错,返回信息


if len(vol.MetaPartitions) != count {    err = fmt.Errorf("action[initMetaPartitions] vol[%v] init meta partition failed,mpCount[%v],expectCount[%v],err[%v]",      vol.Name, len(vol.MetaPartitions), count, err)}
复制代码

2.4 初始化对象数据分片

初始化对象分片调用卷提供的方法initDataPartitions方法,该方法调用如下


func (vol *Vol) initDataPartitions(c *Cluster) (err error) {  // initialize k data partitionMap at a time  err = c.batchCreateDataPartition(vol, defaultInitDataPartitionCnt, true)  return}
复制代码


batchCreateDataPartition方法最终调用createDataPartition方法,该方法通过allocateDataPartitionID分配 partitionID,通过调用getHostFromDomainZone或者getHostFromNormalZone选择可用的 targetHosts 主机,作为存储 DataPartition 数据节点,关键代码如下:

2.4.1 选择目标主机

if c.isFaultDomain(vol) {    if targetHosts, targetPeers, err = c.getHostFromDomainZone(vol.domainId, TypeDataPartition, dpReplicaNum); err != nil {      goto errHandler    }  } else {    zoneNum := c.decideZoneNum(vol.crossZone)    if targetHosts, targetPeers, err = c.getHostFromNormalZone(TypeDataPartition, nil, nil, nil,      int(dpReplicaNum), zoneNum, zoneName); err != nil {      goto errHandler    }  }
复制代码

2.4.2 分配 partitionID

if partitionID, err = c.idAlloc.allocateDataPartitionID(); err != nil {    goto errHandler}
复制代码

2.4.3 生成 DataPartition 结构体

dp = newDataPartition(partitionID, dpReplicaNum, volName, vol.ID, proto.GetDpType(vol.VolType, isPreload), partitionTTL)dp.Hosts = targetHostsdp.Peers = targetPeers
复制代码

2.4.4 数据分配到相应节点

通过调用 Cluster 模块执行syncCreateDataPartitionToDataNode方法将数据放置到分配节点上。


func (c *Cluster) syncCreateDataPartitionToDataNode(host string, size uint64, dp *DataPartition,  peers []proto.Peer, hosts []string, createType int, partitionType int) (diskPath string, err error) {  log.LogInfof("action[syncCreateDataPartitionToDataNode] dp [%v] createtype[%v], partitionType[%v]", dp.PartitionID, createType, partitionType)  dataNode, err := c.dataNode(host)  if err != nil {    return  }  task := dp.createTaskToCreateDataPartition(host, size, peers, hosts, createType, partitionType, dataNode.getDecommissionedDisks())  var resp *proto.Packet  if resp, err = dataNode.TaskManager.syncSendAdminTask(task); err != nil {    return  }  return string(resp.Data), nil}
复制代码


该方法调用 DataPartition 模块提供的 createTaskToCreateDataPartition 方法,将 partition 信息封装成 AdminTask 任务,


func (partition *DataPartition) createTaskToCreateDataPartition(addr string, dataPartitionSize uint64,  peers []proto.Peer, hosts []string, createType int, partitionType int, decommissionedDisks []string) (task *proto.AdminTask) {
leaderSize := 0 if createType == proto.DecommissionedCreateDataPartition { leaderSize = int(partition.Replicas[0].Used) }
task = proto.NewAdminTask(proto.OpCreateDataPartition, addr, newCreateDataPartitionRequest( partition.VolName, partition.PartitionID, int(partition.ReplicaNum), peers, int(dataPartitionSize), leaderSize, hosts, createType, partitionType, decommissionedDisks))
partition.resetTaskID(task) return}
复制代码


通过 dataNode 节点提供的 TaskManager 模块,调用syncSendAdminTask方法以同步的方式创建数据分片。

3. 关联卷和用户的associateVolWithUser方法

associateVolWithUser方法将持久化后 Volume 通过volNameuserID进行关联,

3.1 获取关联用户信息

当 userID 不存在时,通过 User 模块提供的 createKey 方法生成一个类型为UserTypeNormal的普通用户,默认密码为CubeFSUser的用户,


if err == proto.ErrUserNotExists {
var param = proto.UserCreateParam{ ID: userID, Password: DefaultUserPassword, Type: proto.UserTypeNormal, }
if userInfo, err = m.user.createKey(&param); err != nil { return err } }
复制代码


在找到用户后,通过 User 模块提供的addOwnVol方法,调用addUserToVol方法,User 模块首先更新sync.Map类型volUser结构体信息,该结构体信息如下


type VolUser struct {    Vol     string       `json:"vol"`    UserIDs []string     `json:"user_id"`    Mu      sync.RWMutex `json:"-" graphql:"-"`}
复制代码

3.2 持久化卷和用户关联信息

通过调用用户模块提供的syncAddVolUser方法,具体如下


// key = #voluser#volname, value = userIDsfunc (u *User) syncAddVolUser(volUser *proto.VolUser) (err error) {  return u.syncPutVolUser(opSyncAddVolUser, volUser)}
复制代码


syncPutVolUser方法生成一个 userInfo,metadata 使用 RaftCmd 生成一个指针,RaftCmd 有 3 个字段,如下所示


type RaftCmd struct {    Op uint32 `json:"op"`    K  string `json:"k"`    V  []byte `json:"v"`}
复制代码


其中 Op 代表具体的操作,如这里为opSyncAddVolUser添加卷操作,K 为#voluser#volname, value 为json.Marshal(volUser),核心实现如下


通过调用submit方法,最终调用 raftstore.Partition 提供的 Submit 方法将序列号后的 metadata 字段持久化到 rocksDB 里面和卷持久化类似。

发布于: 刚刚阅读数: 4
用户头像

自顶向下 2020-11-01 加入

还未添加个人简介

评论

发布
暂无评论
CubeFS源码分析系列(一) 创建Volume_总想做点什么_InfoQ写作社区