1. Metadata Management
1.1 Overview
1. StarRocks persists metadata with a combination of EditLog and Checkpoint.
2. The Catalog class holds all metadata, which means all metadata lives in memory; persisting metadata essentially means persisting the various metadata objects held by the Catalog object (e.g., Cluster, Database, Job).
3. The Catalog class encapsulates all metadata management, and only the Master node can perform metadata updates (non-Master nodes forward metadata update requests to the Master; see MasterOpExecutor). The Master writes each change log into bdbje for persistence through EditLog.
4. bdbje's replication mechanism automatically ships these logs to non-Master nodes, i.e., the change logs are replicated into the bdbje database files on the Follower nodes.
5. On a non-Master node, the Catalog class starts a Replayer thread that watches the bdbje databases for new content. It computes the current maximum journalId in bdbje (specifically: sort the database names in ascending order, then take the last database name + that database's count - 1) to decide whether new logs exist; if so, it reads them and replays them into memory, i.e., into the Catalog object, achieving near-real-time metadata synchronization.
6. Every metadata update produces one log entry (with an auto-incrementing id, the JournalId). If the logs only ever grew with no cleanup mechanism, the data stored in bdbje would expand without bound. The Checkpoint concept was introduced for this; in the end, Checkpoint and EditLog cooperate through the JournalId to persist metadata.
7. How bdbje databases are split: as mentioned above, the FE master writes every change log into bdbje through EditLog, with the journalId as the key and the concrete operation as the value. EditLog spreads the change logs across multiple bdbje databases, by default one database per 50,000 log entries. Each database is named after the smallest journalId it contains; for example, the entry with id 50001 lives in the database named 50000 (see the sketch after this list).
8. The FE master periodically generates a Checkpoint, i.e., an image file (a full snapshot of the metadata). In essence, it creates a new Catalog object, loads the latest image, replays the editlog entries to fill in this catalog's metadata, and then calls Catalog's saveImage method to serialize all metadata into an image.${JournalId} file.
9. Every time the Master finishes generating an image file, it notifies the other FE nodes to pull the latest image file to their local disk.
10. After a Checkpoint succeeds, EditLog deletes the bdbje databases that have already been persisted into the image, which keeps the amount of data retained in bdbje small.
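To make the splitting rule in point 7 concrete, here is a minimal sketch (illustrative, not StarRocks code): locating the database that holds a given journalId amounts to picking the largest database name that does not exceed it, and the maximum journalId from point 5 falls out of the same sorted name list.

import java.util.List;

public class JournalDbLayout {
    // Locate the bdbje database holding `journalId`: each database is named after the
    // smallest journalId it contains, so we want the largest name <= journalId.
    static long findDatabaseName(List<Long> dbNamesAscending, long journalId) {
        long candidate = -1;
        for (long name : dbNamesAscending) {
            if (name > journalId) {
                break;
            }
            candidate = name;
        }
        return candidate;
    }

    // Max journalId per point 5: last database name + that database's entry count - 1.
    static long maxJournalId(List<Long> dbNamesAscending, long lastDbEntryCount) {
        long lastName = dbNamesAscending.get(dbNamesAscending.size() - 1);
        return lastName + lastDbEntryCount - 1;
    }
}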
The following two diagrams illustrate how EditLog and Checkpoint work in detail:
1.2 How EditLog entries are written and synchronized
1.3 How Checkpoint works
1.4 How an FE recovers its metadata
How does an FE load metadata at startup? Suppose it finds a local image.50001; then it must:
1. Deserialize the image.50001 file and load it into memory;
2. Replay all editlog entries with id greater than 50001 into memory. Concretely, it uses the database-splitting rule described above to find the corresponding databases, then reads out every journal with journalId greater than 50001 and replays it into memory (see the sketch below).
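A minimal sketch of this recovery loop, written only against the Journal/JournalCursor interfaces and the EditLog.loadJournal method shown later in this article (the image deserialization in step 1 is elided):

// Replay every journal newer than the image into the in-memory Catalog.
void recover(long imageJournalId, Journal journal, Catalog catalog) {
    // Step 1: deserialize image.${imageJournalId} into `catalog` (elided).
    // Step 2: read all journals with id > imageJournalId; toKey = -1 means Long.MAX_VALUE.
    JournalCursor cursor = journal.read(imageJournalId + 1, -1);
    try {
        JournalEntity entity;
        while ((entity = cursor.next()) != null) { // null means no more journals
            EditLog.loadJournal(catalog, entity);  // apply one change log to memory
        }
    } finally {
        cursor.close();
    }
}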
2. The role of bdbje
Summarizing the conclusions above, StarRocks relies on bdbje for the following:
1. Persisting the change logs of the various metadata held in the Catalog;
2. Near-real-time synchronization of metadata across multiple FE nodes, via bdbje's Replication mechanism plus the Replayer thread (which replays the metadata change logs);
3. High availability of the FE nodes, including Master election and listening for role-change events.
2.1 Classes that access bdbje
BDBEnvironment: wraps the basic configuration plus the open/remove database methods, and holds all databases that have been opened;
BDBJEJournal: wraps the operations against bdbje; the actual persistence and replay of data go through EditLog, which calls into BDBJEJournal;
BDBJournalCursor: wraps iterative access to the data in a given JournalId range; it is mainly used in the change-log replay scenario and is responsible for reading the change logs (EditLog then replays them into memory);
EditLog: persists the various metadata change logs into bdbje (which stores them in local files), and replays a single change log into memory;
BDBHA: implements the HAProtocol interface and is used to obtain the node and role information of the current cluster; it mainly backs the display of FE node information and is used by the ShowMetaInfoAction and HaAction actions.
Note: all of the classes above are StarRocks code, not bdbje code.
2.2 BDBEnvironment
BDBEnvironment's setup method mainly builds bdbje's ReplicatedEnvironment instance. BDBEnvironment also wraps the openDatabase and removeDatabase methods and holds the bdbje database instances that have already been opened.
public void setup(File envHome, String selfNodeName, String selfNodeHostPort,
                  String helperHostPort, boolean isElectable) {
    this.closing = false;

    // Almost never used, just in case the master can not restart
    if (Config.metadata_failure_recovery.equals("true")) {
        if (!isElectable) {
            LOG.error("Current node is not in the electable_nodes list. will exit");
            System.exit(-1);
        }
        DbResetRepGroup resetUtility = new DbResetRepGroup(envHome, STARROCKS_JOURNAL_GROUP, selfNodeName,
                selfNodeHostPort);
        resetUtility.reset();
        LOG.info("group has been reset.");
    }

    // set replication config
    replicationConfig = new ReplicationConfig();
    replicationConfig.setNodeName(selfNodeName);
    replicationConfig.setNodeHostPort(selfNodeHostPort);
    // At FE startup, the FE nodes that are already up are passed in as helper nodes;
    // the helpers are used to discover the current cluster's master node.
    replicationConfig.setHelperHosts(helperHostPort);
    replicationConfig.setGroupName(STARROCKS_JOURNAL_GROUP);
    replicationConfig.setConfigParam(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "10");
    replicationConfig.setMaxClockDelta(Config.max_bdbje_clock_delta_ms, TimeUnit.MILLISECONDS);
    replicationConfig.setConfigParam(ReplicationConfig.TXN_ROLLBACK_LIMIT,
            String.valueOf(Config.txn_rollback_limit));
    replicationConfig
            .setConfigParam(ReplicationConfig.REPLICA_TIMEOUT, Config.bdbje_heartbeat_timeout_second + " s");
    replicationConfig
            .setConfigParam(ReplicationConfig.FEEDER_TIMEOUT, Config.bdbje_heartbeat_timeout_second + " s");

    if (isElectable) {
        replicationConfig.setReplicaAckTimeout(Config.bdbje_replica_ack_timeout_second, TimeUnit.SECONDS);
        replicationConfig.setConfigParam(ReplicationConfig.REPLICA_MAX_GROUP_COMMIT, "0");
        replicationConfig.setConsistencyPolicy(new NoConsistencyRequiredPolicy());
    } else {
        replicationConfig.setNodeType(NodeType.SECONDARY);
        replicationConfig.setConsistencyPolicy(new NoConsistencyRequiredPolicy());
    }

    // set environment config
    environmentConfig = new EnvironmentConfig();
    environmentConfig.setTransactional(true);
    environmentConfig.setAllowCreate(true);
    environmentConfig.setCachePercent(MEMORY_CACHE_PERCENT);
    environmentConfig.setLockTimeout(Config.bdbje_lock_timeout_second, TimeUnit.SECONDS);
    if (isElectable) {
        Durability durability = new Durability(getSyncPolicy(Config.master_sync_policy),
                getSyncPolicy(Config.replica_sync_policy), getAckPolicy(Config.replica_ack_policy));
        environmentConfig.setDurability(durability);
    }

    // set database config
    dbConfig = new DatabaseConfig();
    dbConfig.setTransactional(true);
    if (isElectable) {
        dbConfig.setAllowCreate(true);
        dbConfig.setReadOnly(false);
    } else {
        dbConfig.setAllowCreate(false);
        dbConfig.setReadOnly(true);
    }

    // open environment and epochDB
    for (int i = 0; i < RETRY_TIME; i++) {
        try {
            // open the environment
            replicatedEnvironment = new ReplicatedEnvironment(envHome, replicationConfig, environmentConfig);

            // get replicationGroupAdmin object.
            Set<InetSocketAddress> adminNodes = new HashSet<InetSocketAddress>();
            // 1. add helper node
            InetSocketAddress helper = new InetSocketAddress(helperHostPort.split(":")[0],
                    Integer.parseInt(helperHostPort.split(":")[1]));
            adminNodes.add(helper);
            LOG.info("add helper[{}] as ReplicationGroupAdmin", helperHostPort);
            // 2. add self if is electable
            if (!selfNodeHostPort.equals(helperHostPort) && Catalog.getCurrentCatalog().isElectable()) {
                InetSocketAddress self = new InetSocketAddress(selfNodeHostPort.split(":")[0],
                        Integer.parseInt(selfNodeHostPort.split(":")[1]));
                adminNodes.add(self);
                LOG.info("add self[{}] as ReplicationGroupAdmin", selfNodeHostPort);
            }
            replicationGroupAdmin = new ReplicationGroupAdmin(STARROCKS_JOURNAL_GROUP, adminNodes);

            // get a BDBHA object and pass the reference to Catalog
            HAProtocol protocol = new BDBHA(this, selfNodeName);
            Catalog.getCurrentCatalog().setHaProtocol(protocol);

            // start state change listener
            StateChangeListener listener = new BDBStateChangeListener();
            replicatedEnvironment.setStateChangeListener(listener);

            // open epochDB. the first parameter null means auto-commit
            epochDB = new CloseSafeDatabase(replicatedEnvironment.openDatabase(null, "epochDB", dbConfig));
            break;
        } catch (InsufficientLogException insufficientLogEx) {
            NetworkRestore restore = new NetworkRestore();
            NetworkRestoreConfig config = new NetworkRestoreConfig();
            config.setRetainLogFiles(false); // delete obsolete log files.
            // Use the members returned by insufficientLogEx.getLogProviders()
            // to select the desired subset of members and pass the resulting
            // list as the argument to config.setLogProviders(), if the
            // default selection of providers is not suitable.
            restore.execute(insufficientLogEx, config);
        } catch (DatabaseException e) {
            if (i < RETRY_TIME - 1) {
                try {
                    Thread.sleep(5 * 1000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            } else {
                LOG.error("error to open replicated environment. will exit.", e);
                System.exit(-1);
            }
        }
    }
}
2.3 BDBJEJournal
Looking at the definition of the Journal interface, BDBJEJournal is mainly responsible for reading and writing bdbje data, as well as creating databases on the fly (rollJournal) and cleaning them up (deleteJournals):
public interface Journal {
    // Open the journal environment
    public void open();

    // Roll Edit file or database
    public void rollJournal();

    // Get the newest journal id
    public long getMaxJournalId();

    // Get the oldest journal id
    public long getMinJournalId();

    // Close the environment
    public void close();

    // Get the journal which id = journalId
    public JournalEntity read(long journalId);

    // Get all the journals whose id: fromKey <= id <= toKey
    // toKey = -1 means toKey = Long.Max_Value
    public JournalCursor read(long fromKey, long toKey);

    // Write a journal and sync to disk
    public void write(short op, Writable writable);

    // Delete journals whose max id is less than deleteToJournalId
    public void deleteJournals(long deleteJournalToId);

    // Current db's min journal id - 1
    public long getFinalizedJournalId();

    // Get all the dbs' name
    public List<Long> getDatabaseNames();
}
Note that the nextJournalId field controls the auto-increment of the journalId (the increment actually happens inside the write method). In the log-replay scenario, when multiple change logs need to be read, read actually returns a JournalCursor object, and JournalCursor.next() yields the individual change logs.
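As a hypothetical sketch of the write path (the real BDBJEJournal differs in details; the currentJournalDb handle and the key encoding here are assumptions), the journalId serves as the bdbje key and is bumped once per entry:

// Hypothetical sketch: journalId is the bdbje key; bump it after each write.
class JournalWriterSketch {
    private final Database currentJournalDb; // assumed handle to the newest bdbje database
    private long nextJournalId;              // next id to assign, as in BDBJEJournal

    JournalWriterSketch(Database db, long nextJournalId) {
        this.currentJournalDb = db;
        this.nextJournalId = nextJournalId;
    }

    public synchronized void write(short op, Writable writable) throws IOException {
        // Serialize with the same layout JournalEntity.write() uses: op code, then payload.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        out.writeShort(op);
        writable.write(out);

        DatabaseEntry key = new DatabaseEntry();
        TupleBinding.getPrimitiveBinding(Long.class).objectToEntry(nextJournalId, key);
        DatabaseEntry value = new DatabaseEntry(buffer.toByteArray());

        currentJournalDb.put(null, key, value); // null txn means auto-commit
        nextJournalId++;                        // journal ids are strictly increasing
    }
}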
2.4 BDBJournalCursor
Used to read multiple consecutive change logs, supporting the log-replay scenario.
// This class is like JDBC ResultSet.
public interface JournalCursor {
    // Return the next journal. return null when there is no more journals
    public JournalEntity next();

    public void close();
}
The OperationType class defines all metadata operation types that are written into the editlog. Each change-log implementation class implements the Writable interface to provide its concrete serialization logic, and also offers a read(DataInput in) method for deserializing the change log.
Take OperationType.OP_DROP_PARTITION as an example: the corresponding DropPartitionInfo class carries the necessary information such as the db, table, and partition.
public class DropPartitionInfo implements Writable {
    @SerializedName(value = "dbId")
    private Long dbId;
    @SerializedName(value = "tableId")
    private Long tableId;
    @SerializedName(value = "partitionName")
    private String partitionName;
    @SerializedName(value = "isTempPartition")
    private boolean isTempPartition = false;
    @SerializedName(value = "forceDrop")
    private boolean forceDrop = false;

    private void readFields(DataInput in) throws IOException {
        dbId = in.readLong();
        tableId = in.readLong();
        partitionName = Text.readString(in);
    }

    public static DropPartitionInfo read(DataInput in) throws IOException {
        if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_74) {
            DropPartitionInfo info = new DropPartitionInfo();
            info.readFields(in);
            return info;
        } else {
            String json = Text.readString(in);
            return GsonUtils.GSON.fromJson(json, DropPartitionInfo.class);
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        String json = GsonUtils.GSON.toJson(this);
        Text.writeString(out, json);
    }
}
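A small round-trip sketch of the write()/read() pair above, assuming an existing DropPartitionInfo instance info (its constructor is elided in the class above) and a journal version of at least VERSION_74, so the JSON branch of read() is taken:

ByteArrayOutputStream bytes = new ByteArrayOutputStream();
info.write(new DataOutputStream(bytes));                 // serialized as a JSON string via GsonUtils
DataInput in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
DropPartitionInfo replayed = DropPartitionInfo.read(in); // dispatches on the meta version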
The readFields method of the JournalEntity class defines how a metadata change log is deserialized based on its opType:
public class JournalEntity implements Writable {
    public static final Logger LOG = LogManager.getLogger(Checkpoint.class);

    private short opCode;
    private Writable data;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeShort(opCode);
        data.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        opCode = in.readShort();
        // set it to true after the entity is truly read,
        // to avoid someone forget to call read method.
        boolean isRead = false;
        switch (opCode) {
            ...
            case OperationType.OP_DROP_PARTITION: {
                data = DropPartitionInfo.read(in);
                isRead = true;
                break;
            }
            ...
        }
    }
}
2.5 EditLog
EditLog wraps the persistence of every kind of metadata change log; each change log is ultimately written into bdbje by the logEdit method.
When the number of written entries exceeds edit_log_roll_num (default 50,000), logEdit calls journal.rollJournal() to roll over to a new bdbje database; this is how the change logs end up spread across multiple bdbje databases.
private synchronized void logEdit(short op, Writable writable) {
    long start = System.currentTimeMillis();
    try {
        journal.write(op, writable);
    } catch (Exception e) {
        LOG.error("Fatal Error : write stream Exception", e);
        System.exit(-1);
    }

    // get a new transactionId
    txId++;

    // update statistics
    long end = System.currentTimeMillis();
    numTransactions++;
    totalTimeTransactions += (end - start);

    if (txId >= Config.edit_log_roll_num) {
        LOG.info("txId {} is equal to or larger than edit_log_roll_num {}, will roll edit.",
                txId, Config.edit_log_roll_num);
        rollEditLog();
        txId = 0;
    }
}
EditLog also wraps the replay of a single change log. Taking OperationType.OP_DROP_PARTITION as the example again, here is how a change log is replayed into memory.
The Replayer thread defined in Catalog continuously uses a JournalCursor object to parse the latest JournalEntity objects (each JournalEntity carries one concrete change log), and calls EditLog's loadJournal method once per JournalEntity to perform the replay:
// Replay one metadata change log into memory (into the Catalog).
public static void loadJournal(Catalog catalog, JournalEntity journal) {
    short opCode = journal.getOpCode();
    if (opCode != OperationType.OP_SAVE_NEXTID && opCode != OperationType.OP_TIMESTAMP) {
        LOG.debug("replay journal op code: {}", opCode);
    }
    try {
        switch (opCode) {
            ...
            case OperationType.OP_DROP_PARTITION: {
                DropPartitionInfo info = (DropPartitionInfo) journal.getData();
                LOG.info("Begin to unprotect drop partition. db = " + info.getDbId()
                        + " table = " + info.getTableId()
                        + " partitionName = " + info.getPartitionName());
                catalog.replayDropPartition(info);
                break;
            }
            ...
        }
The actual replay logic in Catalog looks like this:
public void replayDropPartition(DropPartitionInfo info) {
    Database db = this.getDb(info.getDbId());
    db.writeLock();
    try {
        OlapTable olapTable = (OlapTable) db.getTable(info.getTableId());
        if (info.isTempPartition()) {
            olapTable.dropTempPartition(info.getPartitionName(), true);
        } else {
            olapTable.dropPartition(info.getDbId(), info.getPartitionName(), info.isForceDrop());
        }
    } finally {
        db.writeUnlock();
    }
}
3. How high availability is implemented
3.1 StateChangeListener
As mentioned earlier, when bdbje's ReplicatedEnvironment instance is created, a StateChangeListener is registered to listen for state-change events of the current node:
public class BDBStateChangeListener implements StateChangeListener {
    public static final Logger LOG = LogManager.getLogger(EditLog.class);

    public BDBStateChangeListener() {
    }

    @Override
    public synchronized void stateChange(StateChangeEvent sce) throws RuntimeException {
        FrontendNodeType newType = null;
        switch (sce.getState()) {
            case MASTER: {
                newType = FrontendNodeType.MASTER;
                break;
            }
            case REPLICA: {
                // bdbje's REPLICA state maps to two FE roles: Follower and Observer
                if (Catalog.getCurrentCatalog().isElectable()) {
                    newType = FrontendNodeType.FOLLOWER;
                } else {
                    newType = FrontendNodeType.OBSERVER;
                }
                break;
            }
            case UNKNOWN: {
                newType = FrontendNodeType.UNKNOWN;
                break;
            }
            default: {
                String msg = "this node is " + sce.getState().name();
                LOG.warn(msg);
                Util.stdoutWithTime(msg);
                return;
            }
        }
        Preconditions.checkNotNull(newType);
        Catalog.getCurrentCatalog().notifyNewFETypeTransfer(newType);
    }
}
After obtaining the new role, the listener puts the new role information into Catalog's typeTransferQueue:
public void notifyNewFETypeTransfer(FrontendNodeType newType) {
    try {
        String msg = "notify new FE type transfer: " + newType;
        LOG.warn(msg);
        Util.stdoutWithTime(msg);
        this.typeTransferQueue.put(newType);
    } catch (InterruptedException e) {
        LOG.error("failed to put new FE type: {}", newType, e);
    }
}
Catalog's initialize method starts a listener thread that consumes the typeTransferQueue:
public void initialize(String[] args) throws Exception {
    ...
    // 6. start state listener thread
    createStateListener();
    listener.start();
}

public void createStateListener() {
    listener = new Daemon("stateListener", STATE_CHANGE_CHECK_INTERVAL_MS) {
        @Override
        protected synchronized void runOneCycle() {
            while (true) {
                FrontendNodeType newType = null;
                try {
                    newType = typeTransferQueue.take();
                } catch (InterruptedException e) {
                    LOG.error("got exception when take FE type from queue", e);
                    Util.stdoutWithTime("got exception when take FE type from queue. " + e.getMessage());
                    System.exit(-1);
                }
                Preconditions.checkNotNull(newType);
                LOG.info("begin to transfer FE type from {} to {}", feType, newType);
                if (feType == newType) {
                    return;
                }

                /*
                 * INIT -> MASTER: transferToMaster
                 * INIT -> FOLLOWER/OBSERVER: transferToNonMaster
                 * UNKNOWN -> MASTER: transferToMaster
                 * UNKNOWN -> FOLLOWER/OBSERVER: transferToNonMaster
                 * FOLLOWER -> MASTER: transferToMaster
                 * FOLLOWER/OBSERVER -> INIT/UNKNOWN: set isReady to false
                 */
                switch (feType) {
                    case INIT: {
                        switch (newType) {
                            case MASTER: {
                                transferToMaster();
                                break;
                            }
                            case FOLLOWER:
                            case OBSERVER: {
                                transferToNonMaster(newType);
                                break;
                            }
                            case UNKNOWN:
                                break;
                            default:
                                break;
                        }
                        break;
                    }
                    case UNKNOWN: {
                        switch (newType) {
                            case MASTER: {
                                transferToMaster();
                                break;
                            }
                            case FOLLOWER:
                            case OBSERVER: {
                                transferToNonMaster(newType);
                                break;
                            }
                            default:
                                break;
                        }
                        break;
                    }
                    case FOLLOWER: {
                        switch (newType) {
                            case MASTER: {
                                transferToMaster();
                                break;
                            }
                            case UNKNOWN: {
                                transferToNonMaster(newType);
                                break;
                            }
                            default:
                                break;
                        }
                        break;
                    }
                    case OBSERVER: {
                        switch (newType) {
                            case UNKNOWN: {
                                transferToNonMaster(newType);
                                break;
                            }
                            default:
                                break;
                        }
                        break;
                    }
                    case MASTER: {
                        // exit if master changed to any other type
                        String msg = "transfer FE type from MASTER to " + newType.name() + ". exit";
                        LOG.error(msg);
                        Util.stdoutWithTime(msg);
                        System.exit(-1);
                    }
                    default:
                        break;
                } // end switch formerFeType

                feType = newType;
                LOG.info("finished to transfer FE type to {}", feType);
            }
        } // end runOneCycle
    };
    listener.setMetaContext(metaContext);
}
As the code above shows, role transitions fall into two categories, handled by the transferToMaster and transferToNonMaster methods respectively.
3.2 transferToMaster()
1. Stop the replayer thread;
2. Open the editlog;
3. replayJournal, then rollEditLog;
4. startMasterOnlyDaemonThreads(): start the threads that run only on the master node. Among them is a timePrinter thread that writes the current timestamp into the editlog every 10 seconds. This timestamp is synchronized to the other (non-Master) FE nodes and recorded in their synchronizedTimeMs field. A non-Master node checks this timestamp in its Replayer thread; if it has not been updated for more than Config.meta_delay_toleration_second (default 5 minutes), the node marks its Catalog as unreadable (an FE whose metadata is unreadable forwards incoming requests to the Master FE), which prevents it from serving stale metadata (see the sketch after the code below);
5. startNonMasterDaemonThreads(): start the threads that run on all FE nodes.
private void transferToMaster() {
    // stop replayer
    if (replayer != null) {
        replayer.exit();
        try {
            replayer.join();
        } catch (InterruptedException e) {
            LOG.warn("got exception when stopping the replayer thread", e);
        }
        replayer = null;
    }

    // set this after replay thread stopped. to avoid replay thread modify them.
    isReady.set(false);
    canRead.set(false);

    editLog.open();

    if (!haProtocol.fencing()) {
        LOG.error("fencing failed. will exit.");
        System.exit(-1);
    }

    long replayStartTime = System.currentTimeMillis();
    // replay journals. -1 means replay all the journals larger than current journal id.
    replayJournal(-1);
    long replayEndTime = System.currentTimeMillis();
    LOG.info("finish replay in " + (replayEndTime - replayStartTime) + " msec");

    checkCurrentNodeExist();

    editLog.rollEditLog();

    // Log meta_version
    int communityMetaVersion = MetaContext.get().getMetaVersion();
    int starrocksMetaVersion = MetaContext.get().getStarRocksMetaVersion();
    if (communityMetaVersion < FeConstants.meta_version ||
            starrocksMetaVersion < FeConstants.starrocks_meta_version) {
        editLog.logMetaVersion(new MetaVersion(FeConstants.meta_version, FeConstants.starrocks_meta_version));
        MetaContext.get().setMetaVersion(FeConstants.meta_version);
        MetaContext.get().setStarRocksMetaVersion(FeConstants.starrocks_meta_version);
    }

    // Log the first frontend
    if (isFirstTimeStartUp) {
        // if isFirstTimeStartUp is true, frontends must contains this Node.
        Frontend self = frontends.get(nodeName);
        Preconditions.checkNotNull(self);
        // OP_ADD_FIRST_FRONTEND is emitted, so it can write to BDBJE even if canWrite is false
        editLog.logAddFirstFrontend(self);
    }

    if (!isDefaultClusterCreated) {
        initDefaultCluster();
    }

    // MUST set master ip before starting checkpoint thread.
    // because checkpoint thread need this info to select non-master FE to push image
    this.masterIp = FrontendOptions.getLocalHostAddress();
    this.masterRpcPort = Config.rpc_port;
    this.masterHttpPort = Config.http_port;
    MasterInfo info = new MasterInfo(this.masterIp, this.masterHttpPort, this.masterRpcPort);
    editLog.logMasterInfo(info);

    // start all daemon threads that only running on MASTER FE
    startMasterOnlyDaemonThreads();
    // start other daemon threads that should running on all FE
    startNonMasterDaemonThreads();

    MetricRepo.init();

    canRead.set(true);
    isReady.set(true);

    String msg = "master finished to replay journal, can write now.";
    Util.stdoutWithTime(msg);
    LOG.info(msg);

    // for master, there are some new thread pools need to register metric
    ThreadPoolManager.registerAllThreadPoolMetric();
}
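A minimal sketch (with the field names taken from step 4 above, everything else hypothetical) of the staleness check the Replayer thread performs on a non-Master FE:

// Mark the local Catalog unreadable when the master's timestamp is too old.
void checkMetaFreshness() {
    long delayMs = System.currentTimeMillis() - synchronizedTimeMs;
    if (delayMs > Config.meta_delay_toleration_second * 1000L) {
        canRead.set(false); // stale metadata: forward incoming requests to the Master FE
    } else {
        canRead.set(true);
    }
}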
3.3 transferToNonMaster()
When transitioning to a non-Master role, the node checks whether a replayer thread already exists and starts one if it does not.
private void transferToNonMaster(FrontendNodeType newType) {
    isReady.set(false);

    if (feType == FrontendNodeType.OBSERVER || feType == FrontendNodeType.FOLLOWER) {
        Preconditions.checkState(newType == FrontendNodeType.UNKNOWN);
        LOG.warn("{} to UNKNOWN, still offer read service", feType.name());
        // not set canRead here, leave canRead as what is was.
        // if meta out of date, canRead will be set to false in replayer thread.
        metaReplayState.setTransferToUnknown();
        return;
    }

    // transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER

    // add helper sockets
    if (Config.edit_log_type.equalsIgnoreCase("BDB")) {
        for (Frontend fe : frontends.values()) {
            if (fe.getRole() == FrontendNodeType.FOLLOWER || fe.getRole() == FrontendNodeType.REPLICA) {
                ((BDBHA) getHaProtocol()).addHelperSocket(fe.getHost(), fe.getEditLogPort());
            }
        }
    }

    if (replayer == null) {
        createReplayer();
        replayer.start();
    }

    startNonMasterDaemonThreads();

    MetricRepo.init();
}
4. Recap
1. Can every FE node update metadata?
2. How is metadata persisted?
3. How is metadata synchronized across multiple FE nodes?
4. What is the --helper parameter given at FE startup used for?
5. How do FE nodes achieve high availability?
6. Is the metadata on every FE node always readable?
5. References
https://docs.oracle.com/cd/E17277_02/html/ReplicationGuide/BerkeleyDB-JE-Replication.pdf
http://doris.incubator.apache.org/branch-0.11/zh-CN/internal/metadata-design.html#%E5%AE%9E%E7%8E%B0%E7%BB%86%E8%8A%82
https://www.bilibili.com/video/BV15i4y1Z7AL/?spm_id_from=333.788.videocard.18