写点什么

你不知道的开源分布式存储系统 Alluxio 源码完整解析(下篇)

作者:腾源会
  • 2021 年 11 月 15 日
  • 本文字数:11283 字

    阅读完需:约 37 分钟


在《Alluxio-源码简述-上》主要讲述了 Alluxio 本地环境搭建,源码项目结构,服务进程的启动流程和服务间 RPC 调用。


本篇将在上篇的基础上,继续为大家讲述 Alluxio 中重点类详解,Alluxio 中 Block 底层读写流程,Alluxio Client 调用流程和 Alluxo 内置的轻量级调度框架。

一、重点类详述


1.1. Journaled

Journaled 接口定义可被 Journaled 持久化维护的通用方法,通过 JournalEntryIterable#getJournalEntryIterator 获取 Journal 元素遍历信息,该接口提供默认 checkpoint 方法。Journaled 接口继承 Checkpointed、JournalEntryIterable,定义的方法包括:

  • getJournalEntryIterator:获取 Journal 所有元素;

  • getCheckpointName:获取 checkpoint class 类名称;

  • writeToCheckpoint:持久化写入所有状态的 checkpoint;

  • restoreFromCheckpoint:checkpoint 恢复;

  • processJournalEntry:处理指定的 Journal 元素,Journal 处理核心方法

  • resetState:重置 Journal 状态;

  • applyAndJournal:对 Journal 元素执行和应用 Journal 操作。


1.2. UnderFileSystem

Alluxio 管理和适配数据在底层各个存储系统执行操作,实现 UnderFileSystem 接口的底层存储可以作为 Alluxio 的合法 UFS。

1.2.1. 类图

UnderFileSystem 的类图如下所示,主要由抽象类 BaseUnderFileSystem 实现,而 BaseUnderFileSystem 下主要分为两大类:

  • ConsistentUnderFileSystem:具备一致性的 UFS 实现,主要包括:LocalUnderFileSystem、HdfsUnderFileSystem、CephFSUnderFileSystem 等;

  • ObjectUnderFileSystem:对象存储 UFS 实现,主要包括:S3AUnderFileSystem、COSUnderFileSystem、OSSUnderFileSystem 等。

1.2.2.  接口方法

在 UnderFileSystem 中有两类接口 API:

  • 存储系统通用操作,如:创建/删除文件,文件重命名;

  • 处理数据持久化最终一致性的操作(eventual consistency),如:解决当 AlluxioMaster 维护元数据成功时,但执行 UFS 操作失败的问题。

1.2.2.1. 存储系统操作

  • create:指定 path 路径,在 UFS 中创建数据文件(父目录不存在会自动创建),可通过 CreateOptions 设置创建文件的用户组和 ACL 策略;

  • deleteDirectory:删除指定目录,可通过 DeleteOptions 设置删除的策略和遍历方式;

  • deleteFile:删除指定文件;

  • getDirectoryStatus:获取 UFS 指定目录状态,需传入已存在的目录文件;

  • getFileStatus:获取 UFS 指定文件状态;

  • getStatus:获取 UFS 状态,可指定目录或文件;

  • isDirectory:判断指定路径在 UFS 是否是目录;

  • open:打开 UFS 上指定文件,可通过 OpenOptions 设置文件打开参数;

  • renameDirectory:UFS 上指定目录重命名;

  • renameFile:UFS 上指定文件重命名;

  • exists:判断指定的文件或目录是否存在;

  • getAclPair:获取 UFS 的 ACL 策略;

  • getBlockSizeByte:获取指定目录下 UFS 的每个 Block 文件大小,单位 bytes;

  • getFileLocations:获取指定路径在 UFS 关联的存储 Location 列表;

  • getFingerprint:计算并获取指定路径的文件标识(指纹),文件标识(指纹)的计算必须是确定且不可变的;

  • getOperationMode:获取底层 UFS 的操作模式,Alluxio 的底层存储可以由多种类型 UFS 组成,该方法用来确定底层 UFS 的操作模式,例子:底层 UFS 为:hdfs://ns1/,hdfs://ns2/,则返回结果:{hdfs://ns1/:NO_ACCESS,hdfs://ns2/:READ_WRITE};

  • getPhysicalStores:获取所有 UFS 类型,包括数据结构和对应权限

  • getSpace:通过制定 SpaceType 获取 UFS 中指定路径的存储空间信息,SpaceType 包括:SPACE_TOTAL、SPACE_FREE、SPACE_USED;

  • getUnderFSType:获取 UFS 类型,如 hdfs;

  • isFile:判断文件文件在 UFS 是否存在;

  • isObjectStorage:判断 UFS 是否是对象存储;

  • isSeekable:判断 UFS 是否支持搜索;

  • listStatus:指定 UFS 路径下的文件状态列表,该列表不保证顺序,可通过 ListOptions 设置是否支持遍历;

  • mkdirs:在 UFS 上创建指定目录,可通过 MkdirsOptions 设置目录创建规则,如 ACL 和递归父目录创建;

  • setAclEntries:指定路径,设置 UFS 的 ALC 策略集合;

  • setMode:指定路径,设置 UFS ALC Mode,如 0777;

  • setOwner:指定路径,设置 UFS ALC 的 user 和 group;

  • supportsFlush:判断 UFS 是否支持文件 Flush;

  • supportsActiveSync:判断 UFS 是否支持 ActiveSync(访问内部文件共享),ActiveSync 相关的接口包括:getActiveSyncInfo、startSync、stopSync、startActiveSyncPolling、stopActiveSyncPolling。

1.2.2.2.  最终一致性操作

  • createNonexistingFile:创建不存在的文件,若文件存在,则退出;

  • deleteExistingDirectory:删除指定目录;

  • deleteExistingFile:删除指定文件;

  • getExistingDirectoryStatus:获取 UFS 指定目录状态;

  • getExistingFileStatus:获取 UFS 指定文件状态;

  • getExistingStatus:获取 UFS 状态,可指定目录或文件;

  • isExistingDirectory:判断指定路径在 UFS 是否是目录;

  • openExistingFile:打开 UFS 上指定文件,可通过 OpenOptions 设置文件打开参数;

  • renameRenamableDirectory:UFS 上指定目录重命名;

  • renameRenamableFile:UFS 上指定文件重命名。

1.2.2.3. 其他操作

  • cleanup:当数据文件创建时没有正常的成功结束或被抛弃处理,则对底层 UFS 清理;

  • connectFromMaster:指定 AlluxioMaster 主机地址,建立指定 Master 与 UFS 连接;

  • connectFromWorker:指定 AlluxioWorker 主机地址,建立指定 Worker 与 UFS 连接;

  • resolveUri:给定 Alluxio 基础 URI 和路径,返回拼装后的 Alluxio 路径。


1.3. UfsManager

Alluxio 中对底层 UFS(Under FileSystem)管理操作的通用统一接口类定义,定义的接口方法包括:

  • addMount:UFS 挂载到 Alluxio,该方法仅针对 Alluxio 处理,不对底层 UFS 操作;

  • removeMount:移除 Alluxio 中的 UFS 挂载;

  • get:根据 mountId 获取挂载的 UFS 信息;

  • getRoot:获取 Alluxio 上挂载的根目录信息;

  • getJournal:获取 Journal 的 Location 地址;

其中 AbstractUfsManager 抽象类对 UFS 管理接口进行基本实现。


1.3.1. UfsClient

维护底层 UFS 的 Client 连接信息和其他相关 UFS 的描述信息,基于 UfsClient 实现 Alluxio 对 UnderFileSystem 的操作。


1.4. BlockClient

BlockClient 抽象类定义调用方对 Block 基本的读写操作,其类图示意如下,主要包括:BlockWriterBlockReader


读写 Block 的定义的方法类:


1.5.  DefaultFileSystemMaster

Master 服务维护所有 FileSystem(文件系统)元数据变更的管理操作,DefaultFileSystemMaster 内部基于 InodeTree 维护文件系统结构,并将 InodeTree 持久化到日志文件(journal);除此之外,其内部维护多个管理操作,如:InodeLockManager、MasterUfsManager、MountTable 等;

备注:DefaultFil1.5.  DefaultFileSystemMastereSystemMaster 的启动 start 方法详情前面所述内容。

1.5.1. 接口方法

FileSystemMaster 接口定义 master 中针对 FS 的操作方法,DefaultFileSystemMaster 继承 FileSystemMaster,其接口方法主要包括:

  • cleanupUfs:周期性清理底层 UFS;

  • getFileId:基于 Alluxio 路径 URI 获取文件 ID,若文件不缓存 Alluxio,则调用 UFS 获取;

  • getFileInfo:根据文件 ID 获取文件详情,该接口仅对内部服务开放,不对用户直接开放;

  • getPersistenceState:根据文件 ID,获取该文件的持久化状态;

  • listStatus:指定 Alluxio 路径,获取文件状态列表;

  • checkAccess:校验指定 Alluxio 路径的权限;

  • checkConsistency:校验指定 Alluxio 路径的文件数据一致性;

  • completeFile:关闭/结束指定 Alluxio 路径,关闭后,则该文件不可写;

  • createFile:基于指定 Alluxio 文件路径,创建文件 FileInfo;

  • getNewBlockIdForFile:指定 Alluxio 文件路径,获取下个待操作 Block 文件的 Block ID;

  • getMountPointInfoSummary:获取 Alluxio 中 mount(挂载)路径的快照信息;

  • getDisplayMountPointInfo:获取 Alluxio 用户展示的 Mount 信息;

  • delete:删除指定 Alluxio 路径的文件元信息;

  • getFileBlockInfoList:获取指定 Alluxio 路径下的所有 Block 列表信息;

  • getInAlluxioFiles:获取 Alluxio 中所有的文件列表路径;

  • getInMemoryFiles:获取 Alluxio 中所有缓存在内存的文件列表路径;

  • createDirectory:创建 Alluxio 对应的目录,并返回目录 ID;

  • rename:Alluxio 中文件重命名操作的元数据变更;

  • free:指定 Alluxio 目录下,释放所有 alluxio 缓存的 block 文件信息,支持目录下遍历的文件释放;

  • getPath:根据指定 FileId 获取 Alluxio URI 路径;

  • getPinIdList:获取被固定的 inode id 列表;

  • getUfsAddress:获取 master 所需的 UFS 地址;

  • getUfsInfo:根据挂载 ID 获取对应 UFS 信息;

  • getLostFiles:获取 worker 节点丢失的文件列表;

  • mount:核心操作,将 UFS 路径挂载在 Alluxio 指定路径;

  • unmount:取消指定 Alluxio 路径上的 UFS 挂载;

  • updateMount:更新指定 Alluxio 路径挂载信息;

  • setAcl:设置 Alluxio 路径 ACL;

  • updateUfsMode:设置底层 UFS Mode;

  • validateInodeBlocks:验证 inode block 信息是否具备完整性;

  • workerHeartbeat:指定 worker ID,通知对应 worker 进行文件的存储持久化;

  • getWorkerInfoList:获取所有 worker 节点信息列表;

  • getTimeSeries:获取 alluxio master 中元数据存储的时间版本信息;


1.6. DefaultBlockWorker

1.6.1. 接口

Worker Server 针对 Block 的管理操作,实现接口类:BlockWorker,其接口方法主要包括:

  • getWorkerId:获取 worker id;

  • abortBlock:丢弃 session 中临时创建的 block 文件;

  • accessBlock:访问指定 session 和 block id 下的 block 信息,该方法可能会在 block 缓存释放被访问;

  • commitBlock:提交 block 到 Alluxio 的管理空间,待提交的 block 必须是临时的,当 block 提交成功之前,block 是不支持读写访问;

  • commitBlockInUfs:将 block 提交到 UFS 持久化;

  • createBlock:在 Alluxio 管理空间创建 block,基于 BlockWriter 类可对 block 进行写操作,在 block commit 提交之前都是临时的;

  • getTempBlockMeta:获取临时 block 元数据;

  • createBlockWriter:基于 session 和 block id 创建 BlockWriter,用于 block 的写操作;

  • getReport:获取 worker 与 master 周期性心跳的报告;

  • getStoreMeta:获取整个 block 存储的元数据信息,包括 block 中每个存储目录映射和每层存储的容量情况;

  • getStoreMetaFull:与 getStoreMeta 相似,但包括完整的 blockId 列表,获取代价更高;

  • getVolatileBlockMeta:根据指定 blockId 获取 block 元数据信息;

  • lockBlock:对 block 进行加锁操作;

  • moveBlock:将 block 从当前存储 Location 移动到目标 Location;当前仅支持分层存储移动;

  • moveBlockToMedium:block 移动并指定对应的存储介质类型(MediumType);

  • createBlockReader:创建 BlockReader 进行 Block 读操作,可读取 Alluxio Block 和 UFS Block;

  • createUfsBlockReader:创建 BlockReader 进行 UFS Block 读操作;

  • removeBlock:从 Allxuio 管理空间移除 Block;

  • requestSpace:为指定 block 获取存储空间,该 block 必须为临时 block;

  • unlockBlock:对 block 去除锁操作;

  • asyncCache:提交异步缓存请求进行异步的缓存管理;

  • updatePinList:更新底层 block 存储占用的 pin 列表;

  • getFileInfo:基于指定 file id 获取文件信息。

1.6.2. TieredBlockStore

BlockStore 定义 block 的存储接口,用于管理本地 block 存储,其接口核心目的:具体实现 BlockWorker 中定义的方法类,接口如下:

TieredBlockStore 是 BlockStore 的实现类,实现了 Alluxio 中核心功能点:分层存储,使得对应的存储对象可基于 block 形式进行分层存储管理,并对外暴露提供 API 进行 block 管理。TieredBlockStore 中内置分配算法确定新 block 的存取和旧 block 的释放,基于 BlockMetadataManager 维护分层存储状态、block 读写锁管理等元数据信息。

TieredBlockStore 是线程安全的,所有基于 block 级别的操作都需要调用 BlockLockManager 来获取对应的读写锁,保证该 block 下的元数据操作和 I/O 操作是线程安全的。任何 block 的元数据操作都需要基于 BlockMetadataManager 来获取元数据的 ReentrantReadWriteLock 读写锁。

Allocator 接口定义 Alluxio 中数据管理的分配策略,接口方法:allocateBlockWithView,目前内部有三种实现类:

  • RoundRobinAllocator:基于 round-robin 轮训分配,默认从最高层开始分配,当最高层存储空间不足则会到下一层,该分配策略不支持指定存储具体的分层。

  • MaxFreeAllocator:分配到存储中最大剩余空间,当没有指定具体存储分层,默认从最高层开始分配;

  • GreedyAllocator:返回满足存储 block 大小的第一层存储空间,是存储分配的示例类;

其中 BlockStoreLocation 定义存储 block 的 location 地址和分层信息,描述了三个存储维度:存储层别名、对应存储层目录地址,存储层介质信息。

1.6.2.1. createBlock

当存在可用空间(space)时,基于 block 分配算法创建临时 block;特别的:创建 block 不会触发其他 block 的销毁释放,通过 BlockMetadataAllocatorView 获取只读的 Block 元数据信息,为 Allocator 调度提供数据来源,Allocator 分配调度后返回 StorageDirView 对象并创建 TempBlockMeta 并通过 BlockMetadataManager 管理。存储分配后的元数据会基于 createBlockFile 方法持久化到 Block 元文件。


Allocator 接口定义 Alluxio 中数据管理的分配策略,接口方法:allocateBlockWithView,目前内部有三种实现类:

  • RoundRobinAllocator:基于 round-robin 轮询分配,默认从最高层开始分配,当最高层存储空间不足则会到下一层,该分配策略不支持指定存储具体的分层。

  • MaxFreeAllocator:分配到存储中最大剩余空间,当没有指定具体存储分层,默认从最高层开始分配;

  • GreedyAllocator:返回满足存储 block 大小的第一层存储空间,是存储分配的示例类;

其中 BlockStoreLocation 定义存储 block 的 location 地址和分层信息,描述了三个存储维度:存储层别名、对应存储层目录地址,存储层介质信息。

1.6.2.2.  freeSpace

同步方法执行 Block 缓存存储空间执行立刻删除释放,当所有存储分层的空间释放操作结束后才能支持新 Block 创建。根据 BlockMetadataEvictorView 获取 Block 存储中可移除的 Block 信息。判断当前缓存存储中是否满足最小连续空间和最小可用空间,若同时满足,则不进行后续空间清理操作;若不满足,则遍历 Block 信息,判断是否可清理,若可以清理,则删除对应的 Block 文件及元数据,通过 BlockStoreEventListener 事件监听器同步 Block 释放操作。


BlockStoreEventListener 监听 BlockStore 中元数据变化成功结束的触发事件,主要包括的接口方法类:

  • onAccessBlock:访问 Block 事件触发;

  • onAbortBlock:清理和释放临时 Block 事件触发;

  • onCommitBlock:提交临时 Block 并关联 Block 的存储信息 BlockStoreLocation 事件触发;

  • onMoveBlockByClient:基于 Client 移动 Block 的 BlockStoreLocation 事件触发;

  • onMoveBlockByWorker:基于 Worker 移动 Block 的 BlockStoreLocation 事件触发;

  • onRemoveBlockByClient:基于 Client 移除并释放 Block 的 BlockStoreLocation 事件触发;

  • onRemoveBlock:移除并释放 Block 事件触发;

  • onBlockLost:Block 丢失 事件触发;

  • onStorageLost:存储目录丢失 事件触发。


1.7. PlanDefinition

Alluxio 中内置轻量级角度系统的 Job 执行计划定义,有两个核心部分,1. PlanDefinition#selectExecutors:该方法在 Master 节点调用,用于选择执行任务的 AlluxioJobWorker,2.PlanDefinition#runTask:在 JobWorker 中运行指定作业计划。PlanDefinition 主要包括的作业定义实现有:

  • MoveDefinition:在 FileSystemMaster 校验层级上触发 Block 的移动操作;

  • ReplicateDefinition:在 FileSystemMaster 校验层级上触发 Block 的复制操作;

  • EvictDefinition:在 FileSystemMaster 校验层级上触发 Block 释放操作;

  • PersistDefinition:将 Alluxio Block 缓存存储持久化到底层 UFS;

  • CompactDefinition:在指定目录下降结构化表的数据文件进行压缩;

  • MigrateDefinition:Block 移动,源和目标 Block 可以挂载在不同的 UFS 节点;

  • LoadDefinition:实现简单的 Block 文件的 Load 操作。

1.7.1. TaskExecutorManager

管理 JobWorker Task 执行器,真正的执行任务通过线程池调用 TaskExecutor#run,而 TaskExecutor#run 底层通过 PlanDefinition#runTask 实现同时 TaskExecutorManager 内部也管理 Task 的执行容量和 Task 生命周期管理,如:获取执行的线程池,对任务执行限流/解除限流,任务启停。


二、Block 读写操作

2.1. 读操作

BlockWorker RPC 服务提供的客户端的读操作,大致流程如下:

  • BlockWorkerClientServiceHandler.readBlock 方法定义 Block 读取,默认创建请求参数 StreamObserver<ReadResponse> responseObserver 创建 CallStreamObserver;若支持零拷贝,则使用 DataMessageServerStreamObserver

  • 基于 CallStreamObserver 创建 BlockReadHandler,并调用 BlockReadHandler#onReady 开启数据读取,基于线程池提交创建 DataReader 线程执行;

  • DataReader 是 Alluxio 用于 I/O 数据读取的线程类,封装了核心的 Alluxio 读操作逻辑,(1).获取 Alluxio 数据输入流 DataBuffer;(2)调用 CallStreamObserver.onNext 触发和监听数据流读取;

  • DataReader 获取 DataBuffer 是整个读取处理的核心逻辑,判断数据读取来源:Local、UFS,是否进行 Block 移动实现短路读;

    创建打开 Block,若请求需要加速(promote=true)则操作 BlockWorker.moveBlock,将 Block 移动到存储更高层;

    调用 DefaultBlockWorker#createBlockReader 创建 BlockReader,判断本地 Worker 是否可以直接访问,若支持则返回 LocalFileBlockReader;若为 UFS 中,则调用 UnderFileSystemBlockReader

    调用 BlockReader.transferTo 读取数据,并将 I/O 封装为 NettyDataBuffer 返回。


2.1.1. UnderFileSystemBlockReader

UnderFileSystemBlockReader 类实现直接从 UFS 读取并将读取的信息缓存到读取的 Worker Block 中,大致流程如下:

  • UfsInputStreamCache.acquire 根据 ufs、路径、blockId 获取输入流 InputStream,若 InputStream 在缓存中直接获取,若不存在,则根据 ufs.openExistingFile 获取底层 UFS 的文件输入流 InputStream;

  • 获取并更新 BlockWriter,判断是否存在有对应 Block 存在,不存在则调用 BlockStore.createBlock 新建临时 Block,并返回对应 BlockWriter;

  • 根据第一步骤获取的输入流 InputStream 和参数 offset 读取文件,读取的数据:(1).通过 BlockWriter 写入 Block 缓存对应 Worker;(2).返回调用方读取信息。

备注

  • LocalFileBlockReader:基于 FileChannel.map 方法的 I/O 操作读取文本文件信息

  • RemoteBlockReader:基于远端的 Worker(非本地 Worker)读取,暂不支持;

  • DelegatingBlockReader 根据不同的使用场景,判断和选择使用的 BlockReader 实现类。

2.1.2. ShortCircuitBlockReadHandler

ShortCircuitBlockReadHandler 类是 RPC 服务实现提供短路读能力,首先 Grpc 的 StreamObserver(观察者模式),一次 onNext 调用说明一次消息读取,大致的执行流程:

  • 根据 OpenLocalBlockRequest 获取是否进行加速读取,若加速(promote=true)则调用 BlockWorker.moveBlock 将存储移动更高层存储分层;

  • 调用 BlockWorker.lockBlock 获取 Block 的读写操作锁,最后 BlockWorker.accessBlock 获取访问 Block。

2.2. 写操作

BlockWorker RPC 服务提供的客户端的写操作,大致流程如下:

  • BlockWorkerClientServiceHandler.writeBlock 方法定义 Block 写入,默认创建请求参数 StreamObserver<WriteResponse> responseObserver 创建 CallStreamObserver;若支持零拷贝,则使用  BlockWorkerClientServiceHandler;

  • 基于 CallStreamObserver 创建 DelegationWriteHandler,并调用 DelegationWriteHandler#onCancel 关闭数据写操作;调用 onNext 方法进行数据流监听写操作;

  • DelegationWriteHandler 根据请求 Command 类型获取对应的 AbstractWriteHandler 实现类:

    ALLUXIO_BLOCK:BlockWriteHandler,数据仅写入 Alluxio Block,基于 BlockWriter 实现写操作;

    UFS_FILE:UfsFileWriteHandler,数据仅写入 UFS,基于 UFS Client 创建目录文件并进行 I/O 操作;

    UFS_FALLBACK_BLOCK:UfsFallbackBlockWriteHandler,先基于 BlockWriteHandler 写入 Alluxio Block 再写入 UFS;


AbstractWriteHandler 抽象类关系如下:


2.2.1.  LocalFileBlockWriter

基于本地 Worker 写入 Block 文件信息,调用 FileChannel.map

2.2.2.  ShortCircuitBlockWriteHandler

ShortCircuitBlockWriterHandler 实现短路读的创建本地 Block 能力,基于 onNext 调用,大致执行流程:

  • 若仅申请空间资源,则基于 BlockWorker.requestSpace 获取 Block 创建的请求空间资源;

  • 若需创建临时 Block,则调用 BlockWorker.createBlock 创建 Block 并返回对应 Block 路径。

三、Catalog 管理

AlluxioCatalog 进行 Alluxio 中 Catalog 管理对象,封装和维护了 Alluxio 中注册的 DB 信息及各个 DB 下的 Table 等元数据信息,其基本的方法操作如下,包括:获取数据库 db 信息,db 元数据同步,db 绑定/解绑等操作。


  • attachDatabase:将绑定的 db 元数据信息维护在内存中并同步持久化到 Journal 中;

  • syncDatabase:会基于底层 udb 获取最新元数据 database 信息,如 Hive 则调用 HMS 客户端接口方法 IMetaStoreClient#getDatabase 获取数据库信息。

四、Client 操作


4.1. Client

Client 接口抽象定义 Alluxio 中 Client 操作,其继承和实现类如下所示,封装了对接各个组件的 RPC 接口

  • FileSystemMasterClient:封装 FileSystemMasterClientServiceHandler 相关 RPC 调用,进行元数据管理操作

  • BlockMasterClient:封装 BlockMasterClientServiceHandler 相关 RPC 调用,进行 Block 管理操作

  • TableMasterClient:封装 TableMasterClientServiceHandler 相关 RPC 调用,进行 Alluxio Table Catalog 管理操作

  • MetaMasterClient:封装 MetaMasterClientServiceHandler 相关 RPC 调用

  • MetaMasterConfigClient:

    封装 MetaMasterConfigurationServiceHandler 相关 RPC 调用

  • JobMasterClient:封装 JobMasterClientServiceHandler 相关 RPC 调用,进行 Alluxio Job 的调用操作;

4.1.1. FileSystem

Client 中定义的文件系统操作接口类,用于元数据管理和数据管理,用户可根据其实现类 BaseFileSystem 扩展 Client 文件操作行为。

FileSystem 中定义的接口方法主要包括以下几类:

  • checkAccess:检查指定路径权限;

  • createDirectory:基于 AlluxioURI 创建文件目录;

  • createFile:基于 AlluxioURI 创建数据文件;

  • delete:基于 AlluxioURI 删除指定文件/目录;

  • exists:基于 AlluxioURI 判断指定文件/目录是否存在;

  • free:基于 AlluxioURI 释放 Alluxio 空间,但不删除 UFS 数据文件了;

  • listStatus:列出 AlluxioURI 文件/目录信息;

  • mount/updateMount/unmount:挂载/更新/取消挂载指定 AlluxioURI 目录;

  • openFile:打开并读取 AlluxioURI 文件输入流;

  • persist:将 Alluxio 中缓存的数据异步持久化底层 UFS;

  • rename:Alluxio 文件重命名。

4.1.2. FileSystemContext

维护 Alluxio 基于 Client 进行文件系统操作的上下文信息,通常的,一个 Client JVM 进程会使用同个 FileSystem 连接 Alluxio,因此 Client 对象会在不同线程中共享。FileSystemContext 只有当用户需要个性化配置和认证时才被创建,线程共享的 Client 会针对 FileSystemContext 维护独立的线程空间,FileSystemContext 线程不共享(线程安全)会增加 Client 连接的资源使用,因此当用户停止 Alluxio 操作后,需要关闭 FileSystemContext 释放资源。

4.1.3. FileInStream/FileOutStream

Client 中定义基于 Alluxio 文件操作的输入/输出流,如下所示:

  • 输出流:AlluxioFileOutStream,Alluxio 输出流写入,底层操作 BlockOutStream

  • 输入流:AlluxioFileInStream:Alluxio 输入流读取,封装了本地/远端节点数据读取,或者直接基于底层 UFS;底层操作 BlockInStream,LocalCacheFileInStream,AlluxioHdfsInputStream


4.2. AbstractShell

Client 的功能可以通过 Shell 对外提供操作,AbstractShell 抽象类定义 Alluxio 中 Shell 命令操作,其继承子类包括:

  • FileSystemShell:Alluxio Shell 文件操作入口类;

  • FileSystemAdminShell:Alluxio 文件系统管理操作;

  • CollectInfo:Alluxio 中从所有 Woker 节点采集信息命令;

  • TableShell:Alluxio 表管理操作;

  • JobShell:Alluxio 执行 job 管理操作。


4.2.1. CatCommand

以 CatCommand 为例,简述 Alluxio Client 进行文件读取的大致流程如:

  • FileSystemShell 接收 shell 命令,执行"cat"打开文件操作,调用 CatCommand.run 命令,shell 命令支持正则和多目录,对每个指定目录执行自定义实现的 runPlainPath 操作;

  • CatCommand#runPlainPath 方法通过 getStatus 判断文件类型,若为目录则退出,若为文件则基于 FileSystem 打开文件获取客户端输入流对象 FileInStream(AlluxioFileInStream);

  • 基于 AlluxioFileInStream#read 读取文件内容,URIStatus 维护 Alluxio 中目录和文件元数据快照信息,基于 URIStatus 获取指定 Alluxio 文件对应 Block 信息,通过 Client AlluxioBlockStore 中维护的 Block 信息获取 BlockInStream(Block 输入流);

  • 基于 BlockInStream 调用输入流读取操作,底层基于 Block 的数据读取接口 DataReader 实现,基于 DataReader 读取 Block 详情下述的 Block 读操作。


4.2.2. TouchCommand

以 TouchCommand 为例,简述 Alluxio Client 进行文件写入的大致流程如:

  • FileSystemShell 接收 shell 命令,执行"touch"打开文件操作,调用 TouchCommand.run 命令,shell 命令支持正则和多目录,对每个指定目录执行自定义实现的 runPlainPath 操作;

  • TouchCommand#runPlainPath 方法调用 FileSystem.createFile 创建文件并在结束后关闭该连接;

  • FileSystem.createFile 的方法详解如下:

    基于 FileSystemMasterClient 获取 FileSystemMasterClientServiceHandler 远程的 RPC 连接信息;

    基于 FileSystemMasterClient 调用 RPC 接口创建数据文件(createFile),将新建 Alluxio 文件元数据信息同步 Alluxio Master;

    FileSystem 新建 Client 端的 Alluxio 文件输出流对象:AlluxioFileOutStream,其底层调用 Block 的 DataWriter 对象进行文件处理;

    输出流完成后,执行 AlluxioFileOutStream#close 方法,调用 FileSystemMasterClient#completeFile 判断是否已执行完成,最终基于 RPC 接口实现 completeFile;


五、轻量级调度

Alluxio 内部基于 AlluxioJobMaster 和 AlluxioJobWoker 实现轻量级内置的 Alluxio 操作调度,Master 负责作业的调度管理,而 Worker 真正执行作业操作。


5.1. 调度管理

由前文 AlluxioJobMaster 启动流程可知,AlluxioJobMaster 在启动时会触发 JobMaster Server 启动,JobMaster 内部维护执行计划(plan)的管理追踪器:PlanTracker,用于创建、移除、访问任务作业集合,每个作业都有对应的 PlanCoordinator 用于分布式作业执行协调。外部服务可通过 HTTP 和 RPC 方式调用 JobMaster.run 方法根据作业配置(JobConfig)启动并进行作业调度(同步/线程安全的)。JobConfig 定义作业配置接口,分为两类:PlanConfig(单作业执行)、WorkflowConfig(一组作业流执行)。

JobMaster 中作业调度管理的大致流程如下:

  • 外部接口可调用 JobMaster.run 方法触发作业执行,以 Plan 作业类型为例,调用 PlanTracker 执行 run 方法;

  • PlanTracker 先校验并移除已完成的作业,并基于 PlanCoordinator 创建新的作业实例并启动该作业实例;

  • PlanCoordinator 作业启动流程:

    基于 JobConfig 获取对应的 PlanDefinition;

    根据可用的 Worker 列表和 PlanDefinition,调用 selectExecutors 方法获取待执行作业 Worker 列表;

    调用 CommandManager 提交作业,将作业及待执行作业 worker 列表信息维护在内存队列中;

  • 最后,Job Master 和 Job Worker 节点通过 RPC 心跳检测,下发具体的作业信息给 Worker 执行。


5.2. 作业执行

由前文 AlluxioJobWorker 启动流程可知,AlluxioJobWorker 启动时会触发心跳检测线程 CommandHandlingExecutor,对接收到的作业执行调度处理,每个作业启动一个线程执行,作业执行大致流程如下:

  • CommandHandlingExecutor 线程启动与 JobMaster 进行心跳检测,基于 JobMasterClient.heartbeat 方法获取所有的待执行作业列表;

  • 遍历待执行作业列表,从线程池调用 CommandHandler.run 线程类执行作业调度,包括的作业类型:启动、取消、注册作业;

  • CommandHandler 启动作业会调用 TaskExecutorManager 执行作业,以 Future 执行 TaskExecutor 进行线程级别作业调度;

  • TaskExecutor 真正执行作业调度:

    对应作业参数进行反序列化操作;

    根据 PlanDefinitionRegistry 获取执行 Job 的 PlanDefinition 并调动 runTask 执行作业;

PersistDefinition 为例,大致说明 Job Executor 操作,将 Alluxio Block 存储持久化到底层 UFS:

  • 获取 Alluxio 的数据存储 URI,读取对应的数据输入流 in;

  • 获取指定的 UFS 目标路径,根据 UfsClient 判断该路径是否存在,若不存在则创建,并基于 UnderFileSystem 创建输出流 out;

  • 根据 I/O 操作工具类,将数据从数据流拷贝输出流,持久化到 UFS。


用户头像

腾源会

关注

Believe in Open Source 2021.08.04 加入

腾源会是腾讯云成立的汇聚开源项目、开源爱好者、开源领导者的开放社区,致力于帮助开源项目健康成长、开源爱好者能交流协助、开源领导者能发挥领袖价值,让全球开源生态变得更加繁荣。

评论

发布
暂无评论
你不知道的开源分布式存储系统 Alluxio 源码完整解析(下篇)