CubeFS 提供了客户端工具用于创建卷,执行的命令如下
./build/bin/cfs-cli volume create ltptest ltptest
复制代码
在这条命令发生后,由 cfs-cli 作为客户端发出请求,向配置文件中的masterAddr
字段指定的服务端发起 http 请求, 服务端根据请求指令创建卷,在创建卷的过程中,CubeFS 做了什么。
1. 请求解析
CubeFS 接收到客户端请求时,首先将请求http.Request
路由到/admin/createVol
注册的服务端的 createVol()方法,进入到该方法后,CubeFS 首先生成 CubeFS 使用的 createVolReq 结构体信息,在这个过程中
信息映射:将 http.Request 字段中的信息通过extractUint
之类的方法进行相应映射,如映射卷名称:
if req.name, err = extractName(r); err != nil {
return
}
复制代码
2.合法性校验:校验请求中createVolReq
的参数合法性:如 volume 请求的容量大小size
不能等于 0
if req.capacity == 0 {
return fmt.Errorf("vol capacity can't be zero, %d", req.capacity)
}
复制代码
2. 创建卷使用的 Cluster 模块createVol
方法
2.1 创建卷结构体Vol
在请求解析之后,真正的创建核心逻辑由 Cluster 模块的createVol
方法根据请求解析生成的 req 执行,
if vol, err = m.cluster.createVol(req); err != nil {
sendErrReply(w, r, newErrHTTPReply(err))
return
}
复制代码
createVol
方法首先执行doCreateVol
方法,在该请求中,根据 req 参数生成一个 volValue 结构体, 字段信息如下:
type volValue struct {
ID uint64
Name string
ReplicaNum uint8
DpReplicaNum uint8
Status uint8
DataPartitionSize uint64
Capacity uint64
Owner string
FollowerRead bool
Authenticate bool
DpReadOnlyWhenVolFull bool
CrossZone bool
DomainOn bool
ZoneName string
OSSAccessKey string
OSSSecretKey string
CreateTime int64
DeleteLockTime int64
Description string
DpSelectorName string
DpSelectorParm string
DefaultPriority bool
DomainId uint64
VolType int
EbsBlkSize int
CacheCapacity uint64
CacheAction int
CacheThreshold int
CacheTTL int
CacheHighWater int
CacheLowWater int
CacheLRUInterval int
CacheRule string
EnablePosixAcl bool
EnableQuota bool
EnableTransaction bsProto.TxOpMask
TxTimeout int64
TxConflictRetryNum int64
TxConflictRetryInterval int64
TxOpLimit int
VolQosEnable bool
DiskQosEnable bool
IopsRLimit, IopsWLimit, FlowRlimit, FlowWlimit uint64
IopsRMagnify, IopsWMagnify, FlowRMagnify, FlowWMagnify uint32
ClientReqPeriod, ClientHitTriggerCnt uint32
TrashInterval int64
DisableAuditLog bool
}
复制代码
并通过allocateCommonID
方法生成 ID 填充volValue.ID
字段,再通过newVol
方法将volValue
结构体转换为Vol
结构体,该结构体中添加了dataPartitions
字段,dataPartitions
字段是一个*DataPartitionMap
类型数据,用于存储所有的数据分片信息,结构体信息如下:
// DataPartitionMap stores all the data partitionMap
type DataPartitionMap struct {
sync.RWMutex
partitionMap map[uint64]*DataPartition
readableAndWritableCnt int // number of readable and writable partitionMap
lastLoadedIndex uint64 // last loaded partition index
lastReleasedIndex uint64 // last released partition index
partitions []*DataPartition
responseCache []byte
lastAutoCreateTime time.Time
volName string
readMutex sync.RWMutex
}
复制代码
其中正在的数据存储在 DataPartition 结构体,该结构体信息如下:
// DataPartition represents the structure of storing the file contents.
type DataPartition struct {
PartitionID uint64
PartitionType int
PartitionTTL int64
LastLoadedTime int64
ReplicaNum uint8
Status int8
isRecover bool
Replicas []*DataReplica
LeaderReportTime int64
Hosts []string // host addresses
Peers []proto.Peer
offlineMutex sync.RWMutex
sync.RWMutex
total uint64
used uint64
MissingNodes map[string]int64 // key: address of the missing node, value: when the node is missing
VolName string
VolID uint64
modifyTime int64
createTime int64
lastWarnTime int64
OfflinePeerID uint64
FileInCoreMap map[string]*FileInCore
FilesWithMissingReplica map[string]int64 // key: file name, value: last time when a missing replica is found
SingleDecommissionStatus uint8
singleDecommissionChan chan bool
SingleDecommissionAddr string
RdOnly bool
addReplicaMutex sync.RWMutex
DecommissionRetry int
DecommissionStatus uint32
DecommissionSrcAddr string
DecommissionDstAddr string
DecommissionRaftForce bool
DecommissionSrcDiskPath string //
DecommissionTerm uint64 // only used for disk decommission
IsDiscard bool
}
复制代码
在获取Vol
结构体后,通过结构体私有方法refreshOSSSecure1
更新OSSAccessKey
,OSSSecretKey
字段信息
2.2 持久化卷信息
在创建Vol
结构体后,CubeFS 通过Cluster
模块调用syncAddVol
方法, 该方法调用syncPutVolInfo
方法,具体如下
// key=#vol#volID,value=json.Marshal(vv)
func (c *Cluster) syncAddVol(vol *Vol) (err error) {
return c.syncPutVolInfo(opSyncAddVol, vol)
}
复制代码
syncPutVolInfo
方法生成一个 metadata,metadata 使用 RaftCmd 生成一个指针,RaftCmd 有 3 个字段,如下所示
type RaftCmd struct {
Op uint32 `json:"op"`
K string `json:"k"`
V []byte `json:"v"`
}
复制代码
其中 Op 代表具体的操作,如这里为opSyncAddVol
添加卷操作,K 为#vol#volID
, value 为json.Marshal(vv)
,核心实现如下
func (c *Cluster) syncPutVolInfo(opType uint32, vol *Vol) (err error) {
metadata := new(RaftCmd)
metadata.Op = opType
metadata.K = volPrefix + strconv.FormatUint(vol.ID, 10)
vv := newVolValue(vol)
if metadata.V, err = json.Marshal(vv); err != nil {
return errors.New(err.Error())
}
return c.submit(metadata)
}
复制代码
通过调用submit
方法,最终调用 raftstore.Partition 提供的 Submit 方法将序列号后的 metadata 字段持久化到 rocksDB 里面,核心实现如下
func (c *Cluster) submit(metadata *RaftCmd) (err error) {
cmd, err := metadata.Marshal()
if err != nil {
return errors.New(err.Error())
}
if _, err = c.partition.Submit(cmd); err != nil {
msg := fmt.Sprintf("action[metadata_submit] err:%v", err.Error())
return errors.New(msg)
}
return
}
复制代码
在提交后,将Vol
结构体放入到 Cluster 模块维护的map[string]*Vol
结构体中
func (c *Cluster) putVol(vol *Vol) {
c.volMutex.Lock()
defer c.volMutex.Unlock()
if _, ok := c.vols[vol.Name]; !ok {
c.vols[vol.Name] = vol
}
}
复制代码
2.3 初始化元数据分片
在持久化卷 Volume 后,调用Volume
模块提供的initMetaPartitions
方法持久化元数据分片, 元数据分片最少为 3,最大为 100
if count < defaultInitMetaPartitionCount {
count = defaultInitMetaPartitionCount // 3
}
if count > defaultMaxInitMetaPartitionCount {
count = defaultMaxInitMetaPartitionCount // 100
}
复制代码
2.3.1 设置 MetaPartition 起始位置
通过调用vol.createMetaPartition(c, start, end)
方法创建分片,start,end 为分片数据的起始位置,start, end 之间相差的距离为 defaultMetaPartitionInodeIDStep(默认为 1<<22)
for index := 0; index < count; index++ {
if index != 0 {
start = end + 1
}
end = gConfig.MetaPartitionInodeIdStep * uint64(index+1)
if index == count-1 {
end = defaultMaxMetaPartitionInodeID
}
if err = vol.createMetaPartition(c, start, end); err != nil {
log.LogErrorf("action[initMetaPartitions] vol[%v] init meta partition err[%v]", vol.Name, err)
break
}
}
复制代码
2.3.2 生成 MetaPartition 结构体
实际的调用为createMetaPartition
方法,该方法调用doCreateMetaPartition
方法生成 MetaPartition 结构体,MetaPartition 结构体信息如下
type MetaPartition struct {
PartitionID uint64
Start uint64
End uint64
MaxInodeID uint64
InodeCount uint64
DentryCount uint64
FreeListLen uint64
TxCnt uint64
TxRbInoCnt uint64
TxRbDenCnt uint64
Replicas []*MetaReplica
LeaderReportTime int64
ReplicaNum uint8
Status int8
IsRecover bool
volID uint64
volName string
Hosts []string
Peers []proto.Peer
OfflinePeerID uint64
MissNodes map[string]int64
LoadResponse []*proto.MetaPartitionLoadResponse
offlineMutex sync.RWMutex
uidInfo []*proto.UidReportSpaceInfo
EqualCheckPass bool
sync.RWMutex
}
复制代码
2.3.3 MetaPatition
持久化
doCreateMetaPartition
方法通过getHostFromDomainZone
或者getHostFromNormalZone
方法选择hosts
和peers
,将创建的 MetaPartition,通过syncCreateMetaPartitionToMetaNode
方法将 MetaPartition 封装成一个 AdminTask 任务,通过调用 metaNode 提供的Sender.syncSendAdminTask(tasks[0])
将任务发出
func (c *Cluster) syncCreateMetaPartitionToMetaNode(host string, mp *MetaPartition) (err error) {
hosts := make([]string, 0)
hosts = append(hosts, host)
tasks := mp.buildNewMetaPartitionTasks(hosts, mp.Peers, mp.volName)
metaNode, err := c.metaNode(host)
if err != nil {
return
}
if _, err = metaNode.Sender.syncSendAdminTask(tasks[0]); err != nil {
return
}
return
}
复制代码
创建完成后通过 cluster 模块提供的syncAddMetaPartition
方法添加元数据信息
// key=#mp#volID#metaPartitionID,value=json.Marshal(metaPartitionValue)
func (c *Cluster) syncAddMetaPartition(mp *MetaPartition) (err error) {
return c.putMetaPartitionInfo(opSyncAddMetaPartition, mp)
}
复制代码
最后调用 Volume 模块提供的addMetaPartition
模块添加 MetaPartition 信息。
func (vol *Vol) addMetaPartition(mp *MetaPartition) {
vol.mpsLock.Lock()
defer vol.mpsLock.Unlock()
if _, ok := vol.MetaPartitions[mp.PartitionID]; !ok {
vol.MetaPartitions[mp.PartitionID] = mp
return
}
// replace the old partition in the map with mp
vol.MetaPartitions[mp.PartitionID] = mp
}
复制代码
2.3.4 MetaPartition 创建过程
整个过程如下创建 MetaPartition 过程如下
func (vol *Vol) createMetaPartition(c *Cluster, start, end uint64) (err error) {
var mp *MetaPartition
if mp, err = vol.doCreateMetaPartition(c, start, end); err != nil {
return
}
if err = c.syncAddMetaPartition(mp); err != nil {
return errors.NewError(err)
}
vol.addMetaPartition(mp)
return
}
复制代码
当创建的 metadata 分片数目不等于需要的 count 时,创建报错,返回信息
if len(vol.MetaPartitions) != count {
err = fmt.Errorf("action[initMetaPartitions] vol[%v] init meta partition failed,mpCount[%v],expectCount[%v],err[%v]",
vol.Name, len(vol.MetaPartitions), count, err)
}
复制代码
2.4 初始化对象数据分片
初始化对象分片调用卷提供的方法initDataPartitions
方法,该方法调用如下
func (vol *Vol) initDataPartitions(c *Cluster) (err error) {
// initialize k data partitionMap at a time
err = c.batchCreateDataPartition(vol, defaultInitDataPartitionCnt, true)
return
}
复制代码
batchCreateDataPartition
方法最终调用createDataPartition
方法,该方法通过allocateDataPartitionID
分配 partitionID,通过调用getHostFromDomainZone
或者getHostFromNormalZone
选择可用的 targetHosts 主机,作为存储 DataPartition 数据节点,关键代码如下:
2.4.1 选择目标主机
if c.isFaultDomain(vol) {
if targetHosts, targetPeers, err = c.getHostFromDomainZone(vol.domainId, TypeDataPartition, dpReplicaNum); err != nil {
goto errHandler
}
} else {
zoneNum := c.decideZoneNum(vol.crossZone)
if targetHosts, targetPeers, err = c.getHostFromNormalZone(TypeDataPartition, nil, nil, nil,
int(dpReplicaNum), zoneNum, zoneName); err != nil {
goto errHandler
}
}
复制代码
2.4.2 分配 partitionID
if partitionID, err = c.idAlloc.allocateDataPartitionID(); err != nil {
goto errHandler
}
复制代码
2.4.3 生成 DataPartition 结构体
dp = newDataPartition(partitionID, dpReplicaNum, volName, vol.ID, proto.GetDpType(vol.VolType, isPreload), partitionTTL)
dp.Hosts = targetHosts
dp.Peers = targetPeers
复制代码
2.4.4 数据分配到相应节点
通过调用 Cluster 模块执行syncCreateDataPartitionToDataNode
方法将数据放置到分配节点上。
func (c *Cluster) syncCreateDataPartitionToDataNode(host string, size uint64, dp *DataPartition,
peers []proto.Peer, hosts []string, createType int, partitionType int) (diskPath string, err error) {
log.LogInfof("action[syncCreateDataPartitionToDataNode] dp [%v] createtype[%v], partitionType[%v]", dp.PartitionID, createType, partitionType)
dataNode, err := c.dataNode(host)
if err != nil {
return
}
task := dp.createTaskToCreateDataPartition(host, size, peers, hosts, createType, partitionType, dataNode.getDecommissionedDisks())
var resp *proto.Packet
if resp, err = dataNode.TaskManager.syncSendAdminTask(task); err != nil {
return
}
return string(resp.Data), nil
}
复制代码
该方法调用 DataPartition 模块提供的 createTaskToCreateDataPartition 方法,将 partition 信息封装成 AdminTask 任务,
func (partition *DataPartition) createTaskToCreateDataPartition(addr string, dataPartitionSize uint64,
peers []proto.Peer, hosts []string, createType int, partitionType int, decommissionedDisks []string) (task *proto.AdminTask) {
leaderSize := 0
if createType == proto.DecommissionedCreateDataPartition {
leaderSize = int(partition.Replicas[0].Used)
}
task = proto.NewAdminTask(proto.OpCreateDataPartition, addr, newCreateDataPartitionRequest(
partition.VolName, partition.PartitionID, int(partition.ReplicaNum),
peers, int(dataPartitionSize), leaderSize, hosts, createType, partitionType, decommissionedDisks))
partition.resetTaskID(task)
return
}
复制代码
通过 dataNode 节点提供的 TaskManager 模块,调用syncSendAdminTask
方法以同步的方式创建数据分片。
3. 关联卷和用户的associateVolWithUser
方法
associateVolWithUser
方法将持久化后 Volume 通过volName
与userID
进行关联,
3.1 获取关联用户信息
当 userID 不存在时,通过 User 模块提供的 createKey 方法生成一个类型为UserTypeNormal
的普通用户,默认密码为CubeFSUser
的用户,
if err == proto.ErrUserNotExists {
var param = proto.UserCreateParam{
ID: userID,
Password: DefaultUserPassword,
Type: proto.UserTypeNormal,
}
if userInfo, err = m.user.createKey(¶m); err != nil {
return err
}
}
复制代码
在找到用户后,通过 User 模块提供的addOwnVol
方法,调用addUserToVol
方法,User 模块首先更新sync.Map类型
的volUser
结构体信息,该结构体信息如下
type VolUser struct {
Vol string `json:"vol"`
UserIDs []string `json:"user_id"`
Mu sync.RWMutex `json:"-" graphql:"-"`
}
复制代码
3.2 持久化卷和用户关联信息
通过调用用户模块提供的syncAddVolUser
方法,具体如下
// key = #voluser#volname, value = userIDs
func (u *User) syncAddVolUser(volUser *proto.VolUser) (err error) {
return u.syncPutVolUser(opSyncAddVolUser, volUser)
}
复制代码
syncPutVolUser
方法生成一个 userInfo,metadata 使用 RaftCmd 生成一个指针,RaftCmd 有 3 个字段,如下所示
type RaftCmd struct {
Op uint32 `json:"op"`
K string `json:"k"`
V []byte `json:"v"`
}
复制代码
其中 Op 代表具体的操作,如这里为opSyncAddVolUser
添加卷操作,K 为#voluser#volname
, value 为json.Marshal(volUser)
,核心实现如下
通过调用submit
方法,最终调用 raftstore.Partition 提供的 Submit 方法将序列号后的 metadata 字段持久化到 rocksDB 里面和卷持久化类似。
评论