
StarRocks Metadata Management and FE High Availability

Author: 邸星星 (Di Xingxing)
  • February 23, 2022

1. Metadata Management


1.1 Overview

1. StarRocks persists metadata through EditLog plus Checkpoint;

2. The Catalog class holds all metadata, which means all metadata resides in memory; persisting metadata essentially means persisting the various metadata objects held by the Catalog object (e.g. Cluster, Database, Job);

3. The Catalog class encapsulates metadata management, and only the Master node can apply metadata updates (non-Master nodes forward metadata update requests to the Master; see MasterOpExecutor). The Master writes each change-log entry into bdbje via EditLog for persistence;

4. Through bdbje's replication mechanism, these log entries are automatically synchronized to non-Master nodes, i.e. the change logs are replicated into the bdbje database files on the Follower nodes;

5. On non-Master nodes, the Catalog starts a Replayer thread that watches the bdbje database contents for changes. It decides whether new log entries exist by computing the current maximum journalId in bdbje (concretely: sort the database names in ascending order and take the last database name + database.count - 1). If there are new entries, it reads them and replays them into memory, i.e. into the Catalog object, achieving near-real-time metadata synchronization;

6. Every metadata update produces one log entry (the log id, i.e. the JournalId, is auto-incremented). If the log grew forever without a cleanup mechanism, the amount of data kept in bdbje would grow without bound. The Checkpoint concept was introduced for this reason; Checkpoint and EditLog cooperate through the JournalId to implement metadata persistence;

7. How bdbje databases are split: as mentioned above, the FE master records every change-log entry into bdbje via EditLog, with the journalId as the key and the operation payload as the value. EditLog spreads the change log across multiple bdbje databases; by default a new database is cut every 50,000 log entries, and each database is named after the smallest journalId it contains. For example, the entry with id 50001 lives in the database named 50000 (see the sketch after this list);

8. The FE master periodically generates a Checkpoint, i.e. an image file (a full snapshot of the metadata). Essentially it creates a new Catalog object, loads the latest image, replays the editlog entries to populate that catalog's metadata, and then calls Catalog's saveImage method to serialize all metadata into a file named image.${JournalId};

9. Each time the Master finishes generating an image file, it notifies the other FE nodes to pull the latest image file to their local disk;

10. After a successful Checkpoint, EditLog deletes the bdbje databases whose entries have already been persisted into the image, ensuring that bdbje only keeps a small amount of data.
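To make the journalId arithmetic in points 5 and 7 concrete, here is a minimal, self-contained sketch (a hypothetical helper class, not StarRocks code) that derives the newest journalId from the bdbje database names and locates the database holding a given journalId, assuming each database is named after the smallest journalId it contains:

import java.util.List;

public class JournalLayoutSketch {
    // Newest journalId = largest database name + number of entries in that database - 1.
    // dbNames must be sorted ascending; countOfLastDb is the record count of the last database.
    static long maxJournalId(List<Long> dbNames, long countOfLastDb) {
        long lastDbName = dbNames.get(dbNames.size() - 1);
        return lastDbName + countOfLastDb - 1;
    }

    // The database holding journalId is the one with the largest name <= journalId.
    static long databaseFor(List<Long> dbNames, long journalId) {
        long result = dbNames.get(0);
        for (long name : dbNames) {
            if (name <= journalId) {
                result = name;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> dbNames = List.of(1L, 50000L);          // hypothetical layout
        System.out.println(databaseFor(dbNames, 50001L));  // prints 50000, matching the example above
        System.out.println(maxJournalId(dbNames, 2L));     // 50000 + 2 - 1 = 50001
    }
}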


The following two figures illustrate the detailed workflows of EditLog and Checkpoint:

1.2 EditLog Write and Synchronization Flow



1.3 Checkpoint Workflow



1.4 How the FE Recovers Metadata

How does the FE load metadata at startup? Suppose it finds a local image.50001 file; it then needs to:

1. Deserialize the image.50001 file and load it into memory;

2. Replay all editlog entries greater than 50001 into memory. Concretely, using the database-splitting rule described above, locate the relevant databases, read every journal entry whose journalId is greater than 50001, and replay it into memory (see the sketch below).
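A minimal sketch of this startup sequence, assuming a hypothetical loadImage helper on Catalog (the Journal, JournalCursor, and EditLog.loadJournal calls match the interfaces shown later in this article):

// Sketch of FE metadata recovery at startup (not the actual StarRocks implementation).
void recoverMetadata(Catalog catalog, Journal journal, long imageJournalId) throws IOException {
    // 1. Deserialize the full snapshot, e.g. image.50001, into memory.
    catalog.loadImage("image." + imageJournalId);   // hypothetical helper

    // 2. Replay every journal entry newer than the image; -1 means "up to the newest id".
    JournalCursor cursor = journal.read(imageJournalId + 1, -1);
    JournalEntity entity;
    while ((entity = cursor.next()) != null) {
        EditLog.loadJournal(catalog, entity);       // replay one change-log entry into the Catalog
    }
    cursor.close();
}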


2. The Role of bdbje

Summarizing the points above, StarRocks relies on bdbje for the following:

1. Persisting the change logs of the various metadata held in the Catalog;

2. Near-real-time synchronization of metadata across multiple FE nodes, via bdbje's Replication mechanism together with the Replayer thread (which replays the metadata change logs);

3. FE high availability, including Master election and listening for role-change events.

2.1 Classes That Access bdbje

BDBEnvironment: encapsulates the basic configuration plus the open/remove database methods, and holds all databases that have been opened;

BDBJEJournal: encapsulates the operations against bdbje; the actual persistence and replay of data are performed by EditLog calling into BDBJEJournal;

BDBJournalCursor: encapsulates iterating over the data in a given JournalId range; mainly used in the change-log replay scenario, where it is responsible for reading the change logs (EditLog then replays them into memory);

EditLog: persists the various metadata change logs into bdbje (i.e. into local files); also replays a single change-log entry into memory;

BDBHA: implements the HAProtocol interface; used to obtain the node and role information of the current cluster, mainly to support displaying FE node information; used by the ShowMetaInfoAction and HaAction actions;

Note: all of the classes above are StarRocks code, not bdbje code.


2.2 BDBEnvironment


The setup method of BDBEnvironment is mainly used to build bdbje's ReplicatedEnvironment instance;

  • The --helper "fe_master_host:edit_log_port" option specified at FE startup is actually used to build the bdbje replicationConfig; in bdbje's Replication mechanism, the helper hosts are used to discover the Master node of the cluster;

  • A BDBStateChangeListener instance is also registered, to listen for node role-change events;

  • ReplicationGroupAdmin and BDBHA objects are instantiated, used to quickly obtain FE node information;

  • For more on bdbje's Replication mechanism, see: https://docs.oracle.com/cd/E17277_02/html/ReplicationGuide/BerkeleyDB-JE-Replication.pdf


BDBEnvironment also encapsulates the openDatabase and removeDatabase methods, and holds the bdbje database instances that have been opened.

public void setup(File envHome, String selfNodeName, String selfNodeHostPort,
                  String helperHostPort, boolean isElectable) {
    this.closing = false;

    // Almost never used, just in case the master can not restart
    if (Config.metadata_failure_recovery.equals("true")) {
        if (!isElectable) {
            LOG.error("Current node is not in the electable_nodes list. will exit");
            System.exit(-1);
        }
        DbResetRepGroup resetUtility =
                new DbResetRepGroup(envHome, STARROCKS_JOURNAL_GROUP, selfNodeName, selfNodeHostPort);
        resetUtility.reset();
        LOG.info("group has been reset.");
    }

    // set replication config
    replicationConfig = new ReplicationConfig();
    replicationConfig.setNodeName(selfNodeName);
    replicationConfig.setNodeHostPort(selfNodeHostPort);
    // When starting an FE, an already-running FE node is passed as the helper node;
    // the helper is used to discover the current cluster's master node.
    replicationConfig.setHelperHosts(helperHostPort);
    replicationConfig.setGroupName(STARROCKS_JOURNAL_GROUP);
    replicationConfig.setConfigParam(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "10");
    replicationConfig.setMaxClockDelta(Config.max_bdbje_clock_delta_ms, TimeUnit.MILLISECONDS);
    replicationConfig.setConfigParam(ReplicationConfig.TXN_ROLLBACK_LIMIT,
            String.valueOf(Config.txn_rollback_limit));
    replicationConfig
            .setConfigParam(ReplicationConfig.REPLICA_TIMEOUT, Config.bdbje_heartbeat_timeout_second + " s");
    replicationConfig
            .setConfigParam(ReplicationConfig.FEEDER_TIMEOUT, Config.bdbje_heartbeat_timeout_second + " s");

    if (isElectable) {
        replicationConfig.setReplicaAckTimeout(Config.bdbje_replica_ack_timeout_second, TimeUnit.SECONDS);
        replicationConfig.setConfigParam(ReplicationConfig.REPLICA_MAX_GROUP_COMMIT, "0");
        replicationConfig.setConsistencyPolicy(new NoConsistencyRequiredPolicy());
    } else {
        replicationConfig.setNodeType(NodeType.SECONDARY);
        replicationConfig.setConsistencyPolicy(new NoConsistencyRequiredPolicy());
    }

    // set environment config
    environmentConfig = new EnvironmentConfig();
    environmentConfig.setTransactional(true);
    environmentConfig.setAllowCreate(true);
    environmentConfig.setCachePercent(MEMORY_CACHE_PERCENT);
    environmentConfig.setLockTimeout(Config.bdbje_lock_timeout_second, TimeUnit.SECONDS);
    if (isElectable) {
        Durability durability = new Durability(getSyncPolicy(Config.master_sync_policy),
                getSyncPolicy(Config.replica_sync_policy), getAckPolicy(Config.replica_ack_policy));
        environmentConfig.setDurability(durability);
    }

    // set database config
    dbConfig = new DatabaseConfig();
    dbConfig.setTransactional(true);
    if (isElectable) {
        dbConfig.setAllowCreate(true);
        dbConfig.setReadOnly(false);
    } else {
        dbConfig.setAllowCreate(false);
        dbConfig.setReadOnly(true);
    }

    // open environment and epochDB
    for (int i = 0; i < RETRY_TIME; i++) {
        try {
            // open the environment
            replicatedEnvironment = new ReplicatedEnvironment(envHome, replicationConfig, environmentConfig);

            // get replicationGroupAdmin object.
            Set<InetSocketAddress> adminNodes = new HashSet<InetSocketAddress>();
            // 1. add helper node
            InetSocketAddress helper = new InetSocketAddress(helperHostPort.split(":")[0],
                    Integer.parseInt(helperHostPort.split(":")[1]));
            adminNodes.add(helper);
            LOG.info("add helper[{}] as ReplicationGroupAdmin", helperHostPort);
            // 2. add self if is electable
            if (!selfNodeHostPort.equals(helperHostPort) && Catalog.getCurrentCatalog().isElectable()) {
                InetSocketAddress self = new InetSocketAddress(selfNodeHostPort.split(":")[0],
                        Integer.parseInt(selfNodeHostPort.split(":")[1]));
                adminNodes.add(self);
                LOG.info("add self[{}] as ReplicationGroupAdmin", selfNodeHostPort);
            }

            replicationGroupAdmin = new ReplicationGroupAdmin(STARROCKS_JOURNAL_GROUP, adminNodes);

            // get a BDBHA object and pass the reference to Catalog
            HAProtocol protocol = new BDBHA(this, selfNodeName);
            Catalog.getCurrentCatalog().setHaProtocol(protocol);

            // start state change listener
            StateChangeListener listener = new BDBStateChangeListener();
            replicatedEnvironment.setStateChangeListener(listener);

            // open epochDB. the first parameter null means auto-commit
            epochDB = new CloseSafeDatabase(replicatedEnvironment.openDatabase(null, "epochDB", dbConfig));
            break;
        } catch (InsufficientLogException insufficientLogEx) {
            NetworkRestore restore = new NetworkRestore();
            NetworkRestoreConfig config = new NetworkRestoreConfig();
            config.setRetainLogFiles(false); // delete obsolete log files.
            // Use the members returned by insufficientLogEx.getLogProviders()
            // to select the desired subset of members and pass the resulting
            // list as the argument to config.setLogProviders(), if the
            // default selection of providers is not suitable.
            restore.execute(insufficientLogEx, config);
        } catch (DatabaseException e) {
            if (i < RETRY_TIME - 1) {
                try {
                    Thread.sleep(5 * 1000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            } else {
                LOG.error("error to open replicated environment. will exit.", e);
                System.exit(-1);
            }
        }
    }
}


2.3 BDBJEJournal

Looking at the definition of the Journal interface, we can see that BDBJEJournal is responsible for reading and writing bdbje data, as well as dynamically creating databases (rollJournal) and cleaning up databases (deleteJournals):

public interface Journal {

    // Open the journal environment
    public void open();

    // Roll Edit file or database
    public void rollJournal();

    // Get the newest journal id
    public long getMaxJournalId();

    // Get the oldest journal id
    public long getMinJournalId();

    // Close the environment
    public void close();

    // Get the journal which id = journalId
    public JournalEntity read(long journalId);

    // Get all the journals whose id: fromKey <= id <= toKey
    // toKey = -1 means toKey = Long.Max_Value
    public JournalCursor read(long fromKey, long toKey);

    // Write a journal and sync to disk
    public void write(short op, Writable writable);

    // Delete journals whose max id is less than deleteToJournalId
    public void deleteJournals(long deleteJournalToId);

    // Current db's min journal id - 1
    public long getFinalizedJournalId();

    // Get all the dbs' name
    public List<Long> getDatabaseNames();
}


Note that the nextJournalId variable controls the auto-increment of the journalId (the increment actually happens inside the write method);

In the log-replay scenario, when multiple change-log entries need to be read, the read method returns a JournalCursor object, and JournalCursor.next() yields the individual change-log entries (a short usage sketch follows).
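As a rough caller-side sketch (hypothetical usage, not part of StarRocks; it assumes a Journal instance named journal and a DropPartitionInfo instance named dropPartitionInfo), each write persists one entry and advances the journal id internally:

// Hypothetical caller-side view of the write path: BDBJEJournal increments
// nextJournalId inside write(), so the newest id grows by one per entry.
long before = journal.getMaxJournalId();
journal.write(OperationType.OP_DROP_PARTITION, dropPartitionInfo);
long after = journal.getMaxJournalId();   // expected: before + 1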


2.4 BDBJournalCursor


Used to read multiple consecutive change-log entries, supporting the log-replay scenario.

// This class is like JDBC ResultSet.
public interface JournalCursor {

    // Return the next journal. return null when there is no more journals
    public JournalEntity next();

    public void close();
}


The OperationType class defines all the metadata operation types written to the editlog. Each change-log implementation class implements the Writable interface to provide its serialization logic, and also provides a read(DataInput in) method for deserializing the change log.

Taking the OperationType.OP_DROP_PARTITION operation as an example, the corresponding DropPartitionInfo class defines the necessary information such as db, table, and partition.

public class DropPartitionInfo implements Writable {
    @SerializedName(value = "dbId")
    private Long dbId;
    @SerializedName(value = "tableId")
    private Long tableId;
    @SerializedName(value = "partitionName")
    private String partitionName;
    @SerializedName(value = "isTempPartition")
    private boolean isTempPartition = false;
    @SerializedName(value = "forceDrop")
    private boolean forceDrop = false;

    private void readFields(DataInput in) throws IOException {
        dbId = in.readLong();
        tableId = in.readLong();
        partitionName = Text.readString(in);
    }

    public static DropPartitionInfo read(DataInput in) throws IOException {
        if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_74) {
            DropPartitionInfo info = new DropPartitionInfo();
            info.readFields(in);
            return info;
        } else {
            String json = Text.readString(in);
            return GsonUtils.GSON.fromJson(json, DropPartitionInfo.class);
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        String json = GsonUtils.GSON.toJson(this);
        Text.writeString(out, json);
    }
}

The readFields method of the JournalEntity class defines the logic for deserializing a metadata change-log entry according to its opCode:


public class JournalEntity implements Writable {
    public static final Logger LOG = LogManager.getLogger(Checkpoint.class);

    private short opCode;
    private Writable data;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeShort(opCode);
        data.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        opCode = in.readShort();
        // set it to true after the entity is truly read,
        // to avoid someone forget to call read method.
        boolean isRead = false;
        switch (opCode) {
            ...
            case OperationType.OP_DROP_PARTITION: {
                data = DropPartitionInfo.read(in);
                isRead = true;
                break;
            }
            ...
        }
    }
}

2.5 EditLog

EditLog encapsulates the persistence operations for the various metadata change logs; ultimately, the logEdit method writes the change-log entries to bdbje.

When the number of written entries exceeds edit_log_roll_num (default 50,000), journal.rollJournal() is called to roll the bdbje database; this is how the change log is spread across multiple bdbje databases.


    private synchronized void logEdit(short op, Writable writable) {
        long start = System.currentTimeMillis();

        try {
            journal.write(op, writable);
        } catch (Exception e) {
            LOG.error("Fatal Error : write stream Exception", e);
            System.exit(-1);
        }

        // get a new transactionId
        txId++;

        // update statistics
        long end = System.currentTimeMillis();
        numTransactions++;
        totalTimeTransactions += (end - start);

        if (txId >= Config.edit_log_roll_num) {
            LOG.info("txId {} is equal to or larger than edit_log_roll_num {}, will roll edit.",
                    txId, Config.edit_log_roll_num);
            rollEditLog();
            txId = 0;
        }
    }


EditLog also encapsulates the replay logic for a single log entry. Again using OperationType.OP_DROP_PARTITION as the example, let's look at how a change-log entry is replayed into memory.


The Replayer thread defined in Catalog continuously uses a JournalCursor object to parse the latest JournalEntity objects (each JournalEntity holds one concrete change-log entry); for every JournalEntity it calls EditLog's loadJournal method once to perform the replay:

// Replay a single metadata change-log entry into memory (i.e. into the catalog).
public static void loadJournal(Catalog catalog, JournalEntity journal) {
    short opCode = journal.getOpCode();
    if (opCode != OperationType.OP_SAVE_NEXTID && opCode != OperationType.OP_TIMESTAMP) {
        LOG.debug("replay journal op code: {}", opCode);
    }
    try {
        switch (opCode) {
            ...
            case OperationType.OP_DROP_PARTITION: {
                DropPartitionInfo info = (DropPartitionInfo) journal.getData();
                LOG.info("Begin to unprotect drop partition. db = " + info.getDbId()
                        + " table = " + info.getTableId()
                        + " partitionName = " + info.getPartitionName());
                catalog.replayDropPartition(info);
                break;
            ...
        }


The actual replay logic in Catalog is as follows:

public void replayDropPartition(DropPartitionInfo info) {
    Database db = this.getDb(info.getDbId());
    db.writeLock();
    try {
        OlapTable olapTable = (OlapTable) db.getTable(info.getTableId());
        if (info.isTempPartition()) {
            olapTable.dropTempPartition(info.getPartitionName(), true);
        } else {
            olapTable.dropPartition(info.getDbId(), info.getPartitionName(), info.isForceDrop());
        }
    } finally {
        db.writeUnlock();
    }
}

3. How High Availability Is Implemented

3.1 StateChangeListener

As mentioned earlier, when the bdbje ReplicatedEnvironment instance is created, a StateChangeListener is registered to listen for state-change events of the current node:

public class BDBStateChangeListener implements StateChangeListener {
    public static final Logger LOG = LogManager.getLogger(EditLog.class);

    public BDBStateChangeListener() {
    }

    @Override
    public synchronized void stateChange(StateChangeEvent sce) throws RuntimeException {
        FrontendNodeType newType = null;
        switch (sce.getState()) {
            case MASTER: {
                newType = FrontendNodeType.MASTER;
                break;
            }
            case REPLICA: {
                // bdbje's Replica role maps to two FE roles: Follower and Observer
                if (Catalog.getCurrentCatalog().isElectable()) {
                    newType = FrontendNodeType.FOLLOWER;
                } else {
                    newType = FrontendNodeType.OBSERVER;
                }
                break;
            }
            case UNKNOWN: {
                newType = FrontendNodeType.UNKNOWN;
                break;
            }
            default: {
                String msg = "this node is " + sce.getState().name();
                LOG.warn(msg);
                Util.stdoutWithTime(msg);
                return;
            }
        }
        Preconditions.checkNotNull(newType);
        Catalog.getCurrentCatalog().notifyNewFETypeTransfer(newType);
    }
}


After obtaining the new role, the listener writes the new role information into Catalog's typeTransferQueue:

public void notifyNewFETypeTransfer(FrontendNodeType newType) {
    try {
        String msg = "notify new FE type transfer: " + newType;
        LOG.warn(msg);
        Util.stdoutWithTime(msg);
        this.typeTransferQueue.put(newType);
    } catch (InterruptedException e) {
        LOG.error("failed to put new FE type: {}", newType, e);
    }
}


Catalog's initialize method starts a listener thread that consumes the data in typeTransferQueue:

public void initialize(String[] args) throws Exception {
    ...
    // 6. start state listener thread
    createStateListener();
    listener.start();
}

public void createStateListener() {
    listener = new Daemon("stateListener", STATE_CHANGE_CHECK_INTERVAL_MS) {
        @Override
        protected synchronized void runOneCycle() {
            while (true) {
                FrontendNodeType newType = null;
                try {
                    newType = typeTransferQueue.take();
                } catch (InterruptedException e) {
                    LOG.error("got exception when take FE type from queue", e);
                    Util.stdoutWithTime("got exception when take FE type from queue. " + e.getMessage());
                    System.exit(-1);
                }
                Preconditions.checkNotNull(newType);
                LOG.info("begin to transfer FE type from {} to {}", feType, newType);
                if (feType == newType) {
                    return;
                }

                /*
                 * INIT -> MASTER: transferToMaster
                 * INIT -> FOLLOWER/OBSERVER: transferToNonMaster
                 * UNKNOWN -> MASTER: transferToMaster
                 * UNKNOWN -> FOLLOWER/OBSERVER: transferToNonMaster
                 * FOLLOWER -> MASTER: transferToMaster
                 * FOLLOWER/OBSERVER -> INIT/UNKNOWN: set isReady to false
                 */
                switch (feType) {
                    case INIT: {
                        switch (newType) {
                            case MASTER: { transferToMaster(); break; }
                            case FOLLOWER:
                            case OBSERVER: { transferToNonMaster(newType); break; }
                            case UNKNOWN: break;
                            default: break;
                        }
                        break;
                    }
                    case UNKNOWN: {
                        switch (newType) {
                            case MASTER: { transferToMaster(); break; }
                            case FOLLOWER:
                            case OBSERVER: { transferToNonMaster(newType); break; }
                            default: break;
                        }
                        break;
                    }
                    case FOLLOWER: {
                        switch (newType) {
                            case MASTER: { transferToMaster(); break; }
                            case UNKNOWN: { transferToNonMaster(newType); break; }
                            default: break;
                        }
                        break;
                    }
                    case OBSERVER: {
                        switch (newType) {
                            case UNKNOWN: { transferToNonMaster(newType); break; }
                            default: break;
                        }
                        break;
                    }
                    case MASTER: {
                        // exit if master changed to any other type
                        String msg = "transfer FE type from MASTER to " + newType.name() + ". exit";
                        LOG.error(msg);
                        Util.stdoutWithTime(msg);
                        System.exit(-1);
                    }
                    default:
                        break;
                } // end switch formerFeType

                feType = newType;
                LOG.info("finished to transfer FE type to {}", feType);
            }
        } // end runOneCycle
    };

    listener.setMetaContext(metaContext);
}


As the code above shows, there are two main kinds of role transitions, handled by the transferToMaster and transferToNonMaster methods respectively.

3.2 transferToMaster()

1. Stop the replayer thread;

2. Open the editlog;

3. Call replayJournal and then rollEditLog;

4. Call startMasterOnlyDaemonThreads() to start the threads that only run on the master node. Among them is a timePrinter thread that writes the current timestamp into the editlog every 10 seconds. This timestamp is synchronized to the other, non-Master FE nodes and recorded in their synchronizedTimeMs attribute. Non-Master nodes check this timestamp in the Replayer thread; if it has not been updated for more than Config.meta_delay_toleration_second (default 5 minutes), the node marks its Catalog as unreadable (when an FE's metadata is unreadable, incoming requests are forwarded to the Master FE), which prevents the FE from serving stale metadata (see the sketch after this list);

5. Call startNonMasterDaemonThreads() to start the threads that run on all FE nodes.
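A minimal sketch of the staleness check described in step 4 (the field and config names follow the description above; this is illustrative, not the exact StarRocks implementation):

// Hypothetical check performed on a non-Master FE inside the Replayer loop:
// synchronizedTimeMs is the last timestamp replayed from the Master's timePrinter entries.
long staleMs = System.currentTimeMillis() - synchronizedTimeMs;
if (staleMs > Config.meta_delay_toleration_second * 1000L) {
    canRead.set(false);   // metadata too old: stop serving reads, forward requests to the Master
} else {
    canRead.set(true);
}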

    private void transferToMaster() {
        // stop replayer
        if (replayer != null) {
            replayer.exit();
            try {
                replayer.join();
            } catch (InterruptedException e) {
                LOG.warn("got exception when stopping the replayer thread", e);
            }
            replayer = null;
        }

        // set this after replay thread stopped. to avoid replay thread modify them.
        isReady.set(false);
        canRead.set(false);

        editLog.open();

        if (!haProtocol.fencing()) {
            LOG.error("fencing failed. will exit.");
            System.exit(-1);
        }

        long replayStartTime = System.currentTimeMillis();
        // replay journals. -1 means replay all the journals larger than current journal id.
        replayJournal(-1);
        long replayEndTime = System.currentTimeMillis();
        LOG.info("finish replay in " + (replayEndTime - replayStartTime) + " msec");

        checkCurrentNodeExist();

        editLog.rollEditLog();

        // Log meta_version
        int communityMetaVersion = MetaContext.get().getMetaVersion();
        int starrocksMetaVersion = MetaContext.get().getStarRocksMetaVersion();
        if (communityMetaVersion < FeConstants.meta_version ||
                starrocksMetaVersion < FeConstants.starrocks_meta_version) {
            editLog.logMetaVersion(new MetaVersion(FeConstants.meta_version, FeConstants.starrocks_meta_version));
            MetaContext.get().setMetaVersion(FeConstants.meta_version);
            MetaContext.get().setStarRocksMetaVersion(FeConstants.starrocks_meta_version);
        }

        // Log the first frontend
        if (isFirstTimeStartUp) {
            // if isFirstTimeStartUp is true, frontends must contains this Node.
            Frontend self = frontends.get(nodeName);
            Preconditions.checkNotNull(self);
            // OP_ADD_FIRST_FRONTEND is emitted, so it can write to BDBJE even if canWrite is false
            editLog.logAddFirstFrontend(self);
        }

        if (!isDefaultClusterCreated) {
            initDefaultCluster();
        }

        // MUST set master ip before starting checkpoint thread.
        // because checkpoint thread need this info to select non-master FE to push image
        this.masterIp = FrontendOptions.getLocalHostAddress();
        this.masterRpcPort = Config.rpc_port;
        this.masterHttpPort = Config.http_port;
        MasterInfo info = new MasterInfo(this.masterIp, this.masterHttpPort, this.masterRpcPort);
        editLog.logMasterInfo(info);

        // start all daemon threads that only running on MASTER FE
        startMasterOnlyDaemonThreads();
        // start other daemon threads that should running on all FE
        startNonMasterDaemonThreads();

        MetricRepo.init();

        canRead.set(true);
        isReady.set(true);

        String msg = "master finished to replay journal, can write now.";
        Util.stdoutWithTime(msg);
        LOG.info(msg);

        // for master, there are some new thread pools need to register metric
        ThreadPoolManager.registerAllThreadPoolMetric();
    }


3.3 transferToNonMaster()

When transitioning to a non-Master role, the node checks whether a replayer thread exists; if not, it starts one.

private void transferToNonMaster(FrontendNodeType newType) {
    isReady.set(false);

    if (feType == FrontendNodeType.OBSERVER || feType == FrontendNodeType.FOLLOWER) {
        Preconditions.checkState(newType == FrontendNodeType.UNKNOWN);
        LOG.warn("{} to UNKNOWN, still offer read service", feType.name());
        // not set canRead here, leave canRead as what is was.
        // if meta out of date, canRead will be set to false in replayer thread.
        metaReplayState.setTransferToUnknown();
        return;
    }

    // transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER

    // add helper sockets
    if (Config.edit_log_type.equalsIgnoreCase("BDB")) {
        for (Frontend fe : frontends.values()) {
            if (fe.getRole() == FrontendNodeType.FOLLOWER || fe.getRole() == FrontendNodeType.REPLICA) {
                ((BDBHA) getHaProtocol()).addHelperSocket(fe.getHost(), fe.getEditLogPort());
            }
        }
    }

    if (replayer == null) {
        createReplayer();
        replayer.start();
    }

    startNonMasterDaemonThreads();

    MetricRepo.init();
}


4. Review

1. Can every FE node update metadata?

2. How is metadata persisted?

3. How is metadata synchronized across multiple FE nodes?

4. What is the --helper parameter specified at FE startup used for?

5. How does the FE achieve high availability?

6. Is the metadata on every FE node always readable?

5. References

https://docs.oracle.com/cd/E17277_02/html/ReplicationGuide/BerkeleyDB-JE-Replication.pdf

http://doris.incubator.apache.org/branch-0.11/zh-CN/internal/metadata-design.html#%E5%AE%9E%E7%8E%B0%E7%BB%86%E8%8A%82

https://www.bilibili.com/video/BV15i4y1Z7AL/?spm_id_from=333.788.videocard.18

