CubeFS 提供了客户端工具用于创建卷,执行的命令如下
./build/bin/cfs-cli volume create ltptest ltptest
复制代码
在这条命令发生后,由 cfs-cli 作为客户端发出请求,向配置文件中的masterAddr字段指定的服务端发起 http 请求, 服务端根据请求指令创建卷,在创建卷的过程中,CubeFS 做了什么。
1. 请求解析
CubeFS 接收到客户端请求时,首先将请求http.Request路由到/admin/createVol注册的服务端的 createVol()方法,进入到该方法后,CubeFS 首先生成 CubeFS 使用的 createVolReq 结构体信息,在这个过程中
信息映射:将 http.Request 字段中的信息通过extractUint之类的方法进行相应映射,如映射卷名称:
if req.name, err = extractName(r); err != nil { return}
复制代码
2.合法性校验:校验请求中createVolReq的参数合法性:如 volume 请求的容量大小size不能等于 0
if req.capacity == 0 { return fmt.Errorf("vol capacity can't be zero, %d", req.capacity)}
复制代码
2. 创建卷使用的 Cluster 模块createVol方法
2.1 创建卷结构体Vol
在请求解析之后,真正的创建核心逻辑由 Cluster 模块的createVol方法根据请求解析生成的 req 执行,
if vol, err = m.cluster.createVol(req); err != nil { sendErrReply(w, r, newErrHTTPReply(err)) return}
复制代码
createVol方法首先执行doCreateVol方法,在该请求中,根据 req 参数生成一个 volValue 结构体, 字段信息如下:
type volValue struct { ID uint64 Name string ReplicaNum uint8 DpReplicaNum uint8 Status uint8 DataPartitionSize uint64 Capacity uint64 Owner string FollowerRead bool Authenticate bool DpReadOnlyWhenVolFull bool
CrossZone bool DomainOn bool ZoneName string OSSAccessKey string OSSSecretKey string CreateTime int64 DeleteLockTime int64 Description string DpSelectorName string DpSelectorParm string DefaultPriority bool DomainId uint64 VolType int
EbsBlkSize int CacheCapacity uint64 CacheAction int CacheThreshold int CacheTTL int CacheHighWater int CacheLowWater int CacheLRUInterval int CacheRule string
EnablePosixAcl bool EnableQuota bool
EnableTransaction bsProto.TxOpMask TxTimeout int64 TxConflictRetryNum int64 TxConflictRetryInterval int64 TxOpLimit int
VolQosEnable bool DiskQosEnable bool IopsRLimit, IopsWLimit, FlowRlimit, FlowWlimit uint64 IopsRMagnify, IopsWMagnify, FlowRMagnify, FlowWMagnify uint32 ClientReqPeriod, ClientHitTriggerCnt uint32 TrashInterval int64 DisableAuditLog bool}
复制代码
并通过allocateCommonID方法生成 ID 填充volValue.ID字段,再通过newVol方法将volValue结构体转换为Vol结构体,该结构体中添加了dataPartitions字段,dataPartitions字段是一个*DataPartitionMap类型数据,用于存储所有的数据分片信息,结构体信息如下:
// DataPartitionMap stores all the data partitionMaptype DataPartitionMap struct { sync.RWMutex partitionMap map[uint64]*DataPartition readableAndWritableCnt int // number of readable and writable partitionMap lastLoadedIndex uint64 // last loaded partition index lastReleasedIndex uint64 // last released partition index partitions []*DataPartition responseCache []byte lastAutoCreateTime time.Time volName string readMutex sync.RWMutex}
复制代码
其中正在的数据存储在 DataPartition 结构体,该结构体信息如下:
// DataPartition represents the structure of storing the file contents.type DataPartition struct { PartitionID uint64 PartitionType int PartitionTTL int64 LastLoadedTime int64 ReplicaNum uint8 Status int8 isRecover bool Replicas []*DataReplica LeaderReportTime int64 Hosts []string // host addresses Peers []proto.Peer offlineMutex sync.RWMutex sync.RWMutex total uint64 used uint64 MissingNodes map[string]int64 // key: address of the missing node, value: when the node is missing VolName string VolID uint64 modifyTime int64 createTime int64 lastWarnTime int64 OfflinePeerID uint64 FileInCoreMap map[string]*FileInCore FilesWithMissingReplica map[string]int64 // key: file name, value: last time when a missing replica is found SingleDecommissionStatus uint8 singleDecommissionChan chan bool SingleDecommissionAddr string RdOnly bool addReplicaMutex sync.RWMutex DecommissionRetry int DecommissionStatus uint32 DecommissionSrcAddr string DecommissionDstAddr string DecommissionRaftForce bool DecommissionSrcDiskPath string // DecommissionTerm uint64 // only used for disk decommission IsDiscard bool}
复制代码
在获取Vol结构体后,通过结构体私有方法refreshOSSSecure1更新OSSAccessKey,OSSSecretKey字段信息
2.2 持久化卷信息
在创建Vol结构体后,CubeFS 通过Cluster模块调用syncAddVol方法, 该方法调用syncPutVolInfo方法,具体如下
// key=#vol#volID,value=json.Marshal(vv)func (c *Cluster) syncAddVol(vol *Vol) (err error) { return c.syncPutVolInfo(opSyncAddVol, vol)}
复制代码
syncPutVolInfo方法生成一个 metadata,metadata 使用 RaftCmd 生成一个指针,RaftCmd 有 3 个字段,如下所示
type RaftCmd struct { Op uint32 `json:"op"` K string `json:"k"` V []byte `json:"v"`}
复制代码
其中 Op 代表具体的操作,如这里为opSyncAddVol添加卷操作,K 为#vol#volID, value 为json.Marshal(vv),核心实现如下
func (c *Cluster) syncPutVolInfo(opType uint32, vol *Vol) (err error) { metadata := new(RaftCmd) metadata.Op = opType metadata.K = volPrefix + strconv.FormatUint(vol.ID, 10) vv := newVolValue(vol) if metadata.V, err = json.Marshal(vv); err != nil { return errors.New(err.Error()) } return c.submit(metadata)}
复制代码
通过调用submit方法,最终调用 raftstore.Partition 提供的 Submit 方法将序列号后的 metadata 字段持久化到 rocksDB 里面,核心实现如下
func (c *Cluster) submit(metadata *RaftCmd) (err error) { cmd, err := metadata.Marshal() if err != nil { return errors.New(err.Error()) } if _, err = c.partition.Submit(cmd); err != nil { msg := fmt.Sprintf("action[metadata_submit] err:%v", err.Error()) return errors.New(msg) } return}
复制代码
在提交后,将Vol结构体放入到 Cluster 模块维护的map[string]*Vol结构体中
func (c *Cluster) putVol(vol *Vol) { c.volMutex.Lock() defer c.volMutex.Unlock() if _, ok := c.vols[vol.Name]; !ok { c.vols[vol.Name] = vol }}
复制代码
2.3 初始化元数据分片
在持久化卷 Volume 后,调用Volume模块提供的initMetaPartitions方法持久化元数据分片, 元数据分片最少为 3,最大为 100
if count < defaultInitMetaPartitionCount { count = defaultInitMetaPartitionCount // 3}if count > defaultMaxInitMetaPartitionCount { count = defaultMaxInitMetaPartitionCount // 100}
复制代码
2.3.1 设置 MetaPartition 起始位置
通过调用vol.createMetaPartition(c, start, end)方法创建分片,start,end 为分片数据的起始位置,start, end 之间相差的距离为 defaultMetaPartitionInodeIDStep(默认为 1<<22)
for index := 0; index < count; index++ { if index != 0 { start = end + 1 } end = gConfig.MetaPartitionInodeIdStep * uint64(index+1) if index == count-1 { end = defaultMaxMetaPartitionInodeID } if err = vol.createMetaPartition(c, start, end); err != nil { log.LogErrorf("action[initMetaPartitions] vol[%v] init meta partition err[%v]", vol.Name, err) break }}
复制代码
2.3.2 生成 MetaPartition 结构体
实际的调用为createMetaPartition方法,该方法调用doCreateMetaPartition方法生成 MetaPartition 结构体,MetaPartition 结构体信息如下
type MetaPartition struct { PartitionID uint64 Start uint64 End uint64 MaxInodeID uint64 InodeCount uint64 DentryCount uint64 FreeListLen uint64 TxCnt uint64 TxRbInoCnt uint64 TxRbDenCnt uint64 Replicas []*MetaReplica LeaderReportTime int64 ReplicaNum uint8 Status int8 IsRecover bool volID uint64 volName string Hosts []string Peers []proto.Peer OfflinePeerID uint64 MissNodes map[string]int64 LoadResponse []*proto.MetaPartitionLoadResponse offlineMutex sync.RWMutex uidInfo []*proto.UidReportSpaceInfo EqualCheckPass bool sync.RWMutex}
复制代码
2.3.3 MetaPatition持久化
doCreateMetaPartition方法通过getHostFromDomainZone或者getHostFromNormalZone方法选择hosts和peers,将创建的 MetaPartition,通过syncCreateMetaPartitionToMetaNode方法将 MetaPartition 封装成一个 AdminTask 任务,通过调用 metaNode 提供的Sender.syncSendAdminTask(tasks[0])将任务发出
func (c *Cluster) syncCreateMetaPartitionToMetaNode(host string, mp *MetaPartition) (err error) { hosts := make([]string, 0) hosts = append(hosts, host) tasks := mp.buildNewMetaPartitionTasks(hosts, mp.Peers, mp.volName) metaNode, err := c.metaNode(host) if err != nil { return } if _, err = metaNode.Sender.syncSendAdminTask(tasks[0]); err != nil { return } return}
复制代码
创建完成后通过 cluster 模块提供的syncAddMetaPartition方法添加元数据信息
// key=#mp#volID#metaPartitionID,value=json.Marshal(metaPartitionValue)func (c *Cluster) syncAddMetaPartition(mp *MetaPartition) (err error) { return c.putMetaPartitionInfo(opSyncAddMetaPartition, mp)}
复制代码
最后调用 Volume 模块提供的addMetaPartition模块添加 MetaPartition 信息。
func (vol *Vol) addMetaPartition(mp *MetaPartition) { vol.mpsLock.Lock() defer vol.mpsLock.Unlock() if _, ok := vol.MetaPartitions[mp.PartitionID]; !ok { vol.MetaPartitions[mp.PartitionID] = mp return } // replace the old partition in the map with mp vol.MetaPartitions[mp.PartitionID] = mp}
复制代码
2.3.4 MetaPartition 创建过程
整个过程如下创建 MetaPartition 过程如下
func (vol *Vol) createMetaPartition(c *Cluster, start, end uint64) (err error) { var mp *MetaPartition if mp, err = vol.doCreateMetaPartition(c, start, end); err != nil { return } if err = c.syncAddMetaPartition(mp); err != nil { return errors.NewError(err) } vol.addMetaPartition(mp) return}
复制代码
当创建的 metadata 分片数目不等于需要的 count 时,创建报错,返回信息
if len(vol.MetaPartitions) != count { err = fmt.Errorf("action[initMetaPartitions] vol[%v] init meta partition failed,mpCount[%v],expectCount[%v],err[%v]", vol.Name, len(vol.MetaPartitions), count, err)}
复制代码
2.4 初始化对象数据分片
初始化对象分片调用卷提供的方法initDataPartitions方法,该方法调用如下
func (vol *Vol) initDataPartitions(c *Cluster) (err error) { // initialize k data partitionMap at a time err = c.batchCreateDataPartition(vol, defaultInitDataPartitionCnt, true) return}
复制代码
batchCreateDataPartition方法最终调用createDataPartition方法,该方法通过allocateDataPartitionID分配 partitionID,通过调用getHostFromDomainZone或者getHostFromNormalZone选择可用的 targetHosts 主机,作为存储 DataPartition 数据节点,关键代码如下:
2.4.1 选择目标主机
if c.isFaultDomain(vol) { if targetHosts, targetPeers, err = c.getHostFromDomainZone(vol.domainId, TypeDataPartition, dpReplicaNum); err != nil { goto errHandler } } else { zoneNum := c.decideZoneNum(vol.crossZone) if targetHosts, targetPeers, err = c.getHostFromNormalZone(TypeDataPartition, nil, nil, nil, int(dpReplicaNum), zoneNum, zoneName); err != nil { goto errHandler } }
复制代码
2.4.2 分配 partitionID
if partitionID, err = c.idAlloc.allocateDataPartitionID(); err != nil { goto errHandler}
复制代码
2.4.3 生成 DataPartition 结构体
dp = newDataPartition(partitionID, dpReplicaNum, volName, vol.ID, proto.GetDpType(vol.VolType, isPreload), partitionTTL)dp.Hosts = targetHostsdp.Peers = targetPeers
复制代码
2.4.4 数据分配到相应节点
通过调用 Cluster 模块执行syncCreateDataPartitionToDataNode方法将数据放置到分配节点上。
func (c *Cluster) syncCreateDataPartitionToDataNode(host string, size uint64, dp *DataPartition, peers []proto.Peer, hosts []string, createType int, partitionType int) (diskPath string, err error) { log.LogInfof("action[syncCreateDataPartitionToDataNode] dp [%v] createtype[%v], partitionType[%v]", dp.PartitionID, createType, partitionType) dataNode, err := c.dataNode(host) if err != nil { return } task := dp.createTaskToCreateDataPartition(host, size, peers, hosts, createType, partitionType, dataNode.getDecommissionedDisks()) var resp *proto.Packet if resp, err = dataNode.TaskManager.syncSendAdminTask(task); err != nil { return } return string(resp.Data), nil}
复制代码
该方法调用 DataPartition 模块提供的 createTaskToCreateDataPartition 方法,将 partition 信息封装成 AdminTask 任务,
func (partition *DataPartition) createTaskToCreateDataPartition(addr string, dataPartitionSize uint64, peers []proto.Peer, hosts []string, createType int, partitionType int, decommissionedDisks []string) (task *proto.AdminTask) {
leaderSize := 0 if createType == proto.DecommissionedCreateDataPartition { leaderSize = int(partition.Replicas[0].Used) }
task = proto.NewAdminTask(proto.OpCreateDataPartition, addr, newCreateDataPartitionRequest( partition.VolName, partition.PartitionID, int(partition.ReplicaNum), peers, int(dataPartitionSize), leaderSize, hosts, createType, partitionType, decommissionedDisks))
partition.resetTaskID(task) return}
复制代码
通过 dataNode 节点提供的 TaskManager 模块,调用syncSendAdminTask方法以同步的方式创建数据分片。
3. 关联卷和用户的associateVolWithUser方法
associateVolWithUser方法将持久化后 Volume 通过volName与userID进行关联,
3.1 获取关联用户信息
当 userID 不存在时,通过 User 模块提供的 createKey 方法生成一个类型为UserTypeNormal的普通用户,默认密码为CubeFSUser的用户,
if err == proto.ErrUserNotExists {
var param = proto.UserCreateParam{ ID: userID, Password: DefaultUserPassword, Type: proto.UserTypeNormal, }
if userInfo, err = m.user.createKey(¶m); err != nil { return err } }
复制代码
在找到用户后,通过 User 模块提供的addOwnVol方法,调用addUserToVol方法,User 模块首先更新sync.Map类型的volUser结构体信息,该结构体信息如下
type VolUser struct { Vol string `json:"vol"` UserIDs []string `json:"user_id"` Mu sync.RWMutex `json:"-" graphql:"-"`}
复制代码
3.2 持久化卷和用户关联信息
通过调用用户模块提供的syncAddVolUser方法,具体如下
// key = #voluser#volname, value = userIDsfunc (u *User) syncAddVolUser(volUser *proto.VolUser) (err error) { return u.syncPutVolUser(opSyncAddVolUser, volUser)}
复制代码
syncPutVolUser方法生成一个 userInfo,metadata 使用 RaftCmd 生成一个指针,RaftCmd 有 3 个字段,如下所示
type RaftCmd struct { Op uint32 `json:"op"` K string `json:"k"` V []byte `json:"v"`}
复制代码
其中 Op 代表具体的操作,如这里为opSyncAddVolUser添加卷操作,K 为#voluser#volname, value 为json.Marshal(volUser),核心实现如下
通过调用submit方法,最终调用 raftstore.Partition 提供的 Submit 方法将序列号后的 metadata 字段持久化到 rocksDB 里面和卷持久化类似。
评论