前文回顾
译注:cstack 在 github 维护了一个简单的、类似 sqlite 的数据库实现,通过这个简单的项目,可以很好的理解数据库是如何运行的。本文是第九篇,主要是实现 B-tree 的二叉搜索并解决主键重复问题
Part 9 二叉搜索与主键重复
上次注意到我们的 B 树存储 key 时仍然是非排序的。现在我们来解决这个问题,并检测和拒绝主键的重复(插入)。
现在我们的 execute_insert() 函数在插入时,选择的位置是在表的结尾处。作为替换,我们需要搜索表(树)中正确的插入位置,然后把 key 插入到这个位置上。如果 key 在这个位置上已经存在,那么就返回(键重复)报错。
ExecuteResult execute_insert(Statement* statement, Table* table) { void* node = get_page(table->pager, table->root_page_num);- if ((*leaf_node_num_cells(node) >= LEAF_NODE_MAX_CELLS)) {+ uint32_t num_cells = (*leaf_node_num_cells(node));+ if (num_cells >= LEAF_NODE_MAX_CELLS) { return EXECUTE_TABLE_FULL; }
Row* row_to_insert = &(statement->row_to_insert);- Cursor* cursor = table_end(table);+ uint32_t key_to_insert = row_to_insert->id;+ Cursor* cursor = table_find(table, key_to_insert);++ if (cursor->cell_num < num_cells) {+ uint32_t key_at_index = *leaf_node_key(node, cursor->cell_num);+ if (key_at_index == key_to_insert) {+ return EXECUTE_DUPLICATE_KEY;+ }+ }
leaf_node_insert(cursor, row_to_insert->id, row_to_insert);
复制代码
这样的话,我们就不再需要 table_end() 函数了。
-Cursor* table_end(Table* table) {- Cursor* cursor = malloc(sizeof(Cursor));- cursor->table = table;- cursor->page_num = table->root_page_num;-- void* root_node = get_page(table->pager, table->root_page_num);- uint32_t num_cells = *leaf_node_num_cells(root_node);- cursor->cell_num = num_cells;- cursor->end_of_table = true;-- return cursor;-}
复制代码
我们用一个方法替换 table_end() 函数,方法实现使用给定的 key 来搜索 Btree。
+/*+Return the position of the given key.+If the key is not present, return the position+where it should be inserted+*/+Cursor* table_find(Table* table, uint32_t key) {+ uint32_t root_page_num = table->root_page_num;+ void* root_node = get_page(table->pager, root_page_num);++ if (get_node_type(root_node) == NODE_LEAF) {+ return leaf_node_find(table, root_page_num, key);+ } else {+ printf("Need to implement searching an internal node\n");+ exit(EXIT_FAILURE);+ }+}
复制代码
我先把内部节点实现的分支存根,因为我还没有实现内部节点(internal node)。我们可以在叶子节点中使用二叉搜索法来进行搜索。
+Cursor* leaf_node_find(Table* table, uint32_t page_num, uint32_t key) {+ void* node = get_page(table->pager, page_num);+ uint32_t num_cells = *leaf_node_num_cells(node);++ Cursor* cursor = malloc(sizeof(Cursor));+ cursor->table = table;+ cursor->page_num = page_num;++ // Binary search+ uint32_t min_index = 0;+ uint32_t one_past_max_index = num_cells;+ while (one_past_max_index != min_index) {+ uint32_t index = (min_index + one_past_max_index) / 2;+ uint32_t key_at_index = *leaf_node_key(node, index);+ if (key == key_at_index) {+ cursor->cell_num = index;+ return cursor;+ }+ if (key < key_at_index) {+ one_past_max_index = index;+ } else {+ min_index = index + 1;+ }+ }++ cursor->cell_num = min_index;+ return cursor;+}
复制代码
这将返回:
因为我们现在会检查节点的类型,所以需要一个函数在节点中获取和设置节点的类型值。
+NodeType get_node_type(void* node) {+ uint8_t value = *((uint8_t*)(node + NODE_TYPE_OFFSET));+ return (NodeType)value;+}++void set_node_type(void* node, NodeType type) {+ uint8_t value = type;+ *((uint8_t*)(node + NODE_TYPE_OFFSET)) = value;+}
复制代码
需要先把值转为 uint8_t 类型来确保它作为一个序列化的单字节。 还需要初始化节点的类型。
-void initialize_leaf_node(void* node) { *leaf_node_num_cells(node) = 0; }+void initialize_leaf_node(void* node) {+ set_node_type(node, NODE_LEAF);+ *leaf_node_num_cells(node) = 0;+}
复制代码
最后,我们需要创建并处理一个新的错误码。
-enum ExecuteResult_t { EXECUTE_SUCCESS, EXECUTE_TABLE_FULL };+enum ExecuteResult_t {+ EXECUTE_SUCCESS,+ EXECUTE_DUPLICATE_KEY,+ EXECUTE_TABLE_FULL+};case (EXECUTE_SUCCESS): printf("Executed.\n"); break;+ case (EXECUTE_DUPLICATE_KEY):+ printf("Error: Duplicate key.\n");+ break;case (EXECUTE_TABLE_FULL): printf("Error: Table full.\n"); break;
复制代码
有了这些修改,我们的测试可以更改为检查排序顺序:
"db > Executed.","db > Tree:","leaf (size 3)",- " - 0 : 3",- " - 1 : 1",- " - 2 : 2",+ " - 0 : 1",+ " - 1 : 2",+ " - 2 : 3","db > "])end
复制代码
并且我们能够为重复键增加一个新的测试:
+ it 'prints an error message if there is a duplicate id' do+ script = [+ "insert 1 user1 person1@example.com",+ "insert 1 user1 person1@example.com",+ "select",+ ".exit",+ ]+ result = run_script(script)+ expect(result).to match_array([+ "db > Executed.",+ "db > Error: Duplicate key.",+ "db > (1, user1, person1@example.com)",+ "Executed.",+ "db > ",+ ])+ end
复制代码
就是这样!接下来:实现拆分叶节点和创建内部节点。
评论