写点什么

一文讲清数据库的分库分表

作者:王中阳Go
  • 2024-12-14
    湖南
  • 本文字数:5412 字

    阅读完需:约 18 分钟

一文讲清数据库的分库分表

想必大家在面试的时候都被问到过数据库的分库分表应该怎么做


分库分表指的是是将大型数据库分割成多个小型数据库或表格的技术,旨在通过分散数据来提升性能、增加可扩展性和简化管理。随着数据量的增长,传统的单体数据库可能会遭遇性能瓶颈,而分库分表能有效解决这些问题,支持系统线性扩展,确保高效的数据处理和响应速度,同时降低运维复杂度和成本。


今天我就分享一下我对此的一些见解。(如有错误,欢迎指正)

一、选择合适的数据库驱动和 ORM 框架(如果使用)

  1. 数据库驱动

  2. Golang 支持多种数据库驱动,如database/sql包提供了与数据库交互的标准接口。对于 MySQL,常用的驱动是github.com/go - sql - driver/mysql。确保在项目中正确导入和初始化驱动,例如:


      import (          "database/sql"          _ "github.com/go - sql - driver/mysql"      )            func main() {          db, err := sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/database_name")          if err!= nil {              // 处理错误          }          defer db.Close()      }
复制代码


  1. ORM 框架(可选)

  2. 如果项目使用 ORM 框架,如 GORM,它可以简化数据库操作,包括分库分表的实现。GORM 提供了方便的 API 来定义模型和执行数据库操作。导入 GORM 和相关的数据库驱动(以 MySQL 为例):


      import (          "gorm.io/driver/mysql"          "gorm.io/gorm"      )            func main() {          dsn := "user:password@tcp(127.0.0.1:3306)/database_name?charset=utf8mb4&parseTime=True&loc=Local"          db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})          if err!= nil {              // 处理错误          }      }
复制代码

二、确定分库分表策略

  1. 水平分表策略

  2. 按范围划分

  3. 例如,对于抽奖记录,按照时间范围进行分表。可以每月创建一张新表,表名可以采用lottery_records_202401(表示 2024 年 1 月的抽奖记录)这样的格式。在代码中,需要根据抽奖时间来确定操作哪一张表。

  4. 按哈希划分

  5. 对于用户表,按照用户 ID 进行哈希取模分表。假设要将用户数据分散到 10 张表中,可以计算user_id % 10,根据结果将用户数据存储到user_0user_1等对应的表中。在查询用户数据时,同样先计算哈希值,然后确定要查询的表。

  6. 垂直分库策略

  7. 按照业务模块划分数据库。例如,将用户信息存储在一个数据库(user_db)中,抽奖规则存储在另一个数据库(lottery_rule_db)中,抽奖结果存储在第三个数据库(lottery_result_db)等。在代码中,需要根据操作的业务模块来选择不同的数据库连接。

三、实现分库分表逻辑

  1. 基于 SQL 操作实现(不使用 ORM)

  2. 水平分表操作示例(按哈希划分用户表)

  3. 在查询用户数据时:


        func QueryUser(db *sql.DB, userID int) (*User, error) {            tableName := fmt.Sprintf("user_%d", userID%10)            querySQL := fmt.Sprintf("SELECT * FROM %s WHERE user_id =? ", tableName)            row := db.QueryRow(querySQL, userID)            user := &User{}            err := row.Scan(&user.ID, &user.Name, &user.Age)            if err!= nil {                return nil, err            }            return user, nil        }
复制代码


  - 在插入用户数据时:
复制代码


        func InsertUser(db *sql.DB, user *User) error {            tableName := fmt.Sprintf("user_%d", user.ID%10)            insertSQL := fmt.Sprintf("INSERT INTO %s (user_id, name, age) VALUES (?,?,?)", tableName)            stmt, err := db.Prepare(insertSQL)            if err!= nil {                return err            }            defer stmt.Close()            _, err = stmt.Exec(user.ID, user.Name, user.Age)            return err        }
复制代码


  1. 垂直分库操作示例(选择不同数据库连接)

  2. 假设已经有两个数据库连接userDBlotteryRuleDB


        func QueryUserInfo(userDB *sql.DB, userID int) (*UserInfo, error) {            querySQL := "SELECT * FROM user_info WHERE user_id =?"            row := userDB.QueryRow(querySQL, userID)            userInfo := &UserInfo{}            err := row.Scan(&userInfo.ID, &userInfo.Email, &userInfo.Address)            if err!= nil {                return nil, err            }            return userInfo, nil        }                func QueryLotteryRule(lotteryRuleDB *sql.DB, ruleID int) (*LotteryRule, error) {            querySQL := "SELECT * FROM lottery_rule WHERE rule_id =?"            row := lotteryRuleDB.QueryRow(querySQL, ruleID)            lotteryRule := &LotteryRule{}            err := row.Scan(&lotteryRule.ID, &lotteryRule.Probability, &lotteryRule.PrizeType)            if err!= nil {                return nil, err            }            return lotteryRule, nil        }
复制代码


  1. 基于 ORM 框架(如 GORM)实现

  2. 水平分表操作示例(按哈希划分用户表)

  3. 可以通过自定义 GORM 插件来实现分表逻辑。首先定义插件结构体:


        type ShardingPlugin struct{}
复制代码


  - 实现GORM的Plugin接口方法,在`Name`方法中返回插件名称,在`Initialize`方法中实现分表逻辑:  - ```Go    func (p ShardingPlugin) Name() string {        return "ShardingPlugin"    }    func (p ShardingPlugin) Initialize(db *gorm.DB) error {        // 根据用户ID计算表名        db.Callback().Query().Before("gorm:query").Register("sharding:query", func(db *gorm.DB) {            userID, ok := db.Statement.Vars["user_id"].(int)            if ok {                tableName := fmt.Sprintf("user_%d", userID%10)                db.Statement.Table(tableName)            }        })        db.Callback().Create().Before("gorm:create").Register("sharding:create", func(db *gorm.DB) {            userID, ok := db.Statement.Vars["user_id"].(int)            if ok {                tableName := fmt.Sprintf("user_%d", userID%10)                db.Statement.Table(tableName)            }        })        return nil    }
复制代码



- 在初始化GORM时注册这个插件:```Go func main() { dsn := "user:password@tcp(127.0.0.1:3306)/database_name?charset=utf8mb4&parseTime=True&loc=Local" db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{ Plugins: []gorm.Plugin{ShardingPlugin{}}, }) if err!= nil { // 处理错误 } }
复制代码


  1. 垂直分库操作示例(选择不同数据库连接)

  2. 在 GORM 中,可以通过定义不同的数据库连接实例来操作不同的数据库。假设已经定义了userDBlotteryRuleDB两个 GORM 数据库实例:


        func QueryUserInfo(userDB *gorm.DB, userID int) (*UserInfo, error) {            userInfo := &UserInfo{}            err := userDB.Where("user_id =?", userID).First(userInfo).Error            if err!= nil {                return nil, err            }            return userInfo, nil        }                func QueryLotteryRule(lotteryRuleDB *gorm.DB, ruleID int) (*LotteryRule, error) {            lotteryRule := &LotteryRule{}            err := lotteryRuleDB.Where("rule_id =?", ruleID).First(lotteryRule).Error            if err!= nil {                return nil, err            }            return lotteryRule, nil        }
复制代码

四、数据迁移和同步

  1. 初始数据迁移


  • 当实施分库分表策略时,需要将原有数据迁移到新的数据库结构中。如果是水平分表,可以编写数据迁移脚本,按照分表策略将数据从旧表复制到新表。例如,对于按时间范围分表的抽奖记录:


      func MigrateLotteryRecords() error {          oldDB, err := sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/old_database_name")          if err!= nil {              return err          }          defer oldDB.Close()                newDB, err := sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/new_database_name")          if err!= nil {              return err          }          defer newDB.Close()                rows, err := oldDB.Query("SELECT * FROM old_lottery_records")          if err!= nil {              return err          }          defer rows.Close()                for rows.Next() {              record := &LotteryRecord{}              err := rows.Scan(&record.ID, &record.UserID, &record.LotteryDate)              if err!= nil {                  return err              }              // 根据抽奖日期确定新表名              newTableName := fmt.Sprintf("lottery_records_%d", record.LotteryDate.Year()*100 + int(record.LotteryDate.Month()))              insertSQL := fmt.Sprintf("INSERT INTO %s (id, user_id, lottery_date) VALUES (?,?,?)", newTableName)              stmt, err := newDB.Prepare(insertSQL)              if err!= nil {                  return err              }              defer stmt.Close()              _, err = stmt.Exec(record.ID, record.UserID, record.LotteryDate)              if err!= nil {                  return err              }          }          return nil      }
复制代码


  1. 数据同步机制


  • 在分库分表后,可能需要建立数据同步机制,以确保数据的一致性。例如,在分布式系统中,当一个服务更新了用户表的数据,可能需要通过消息队列(如 Kafka)将更新事件发送到其他相关服务,其他服务收到消息后对相应的分表进行更新操作。以下是一个简单的示例,使用 Kafka 进行数据同步:


      import (          "github.com/Shopify/sarama"      )            func UpdateUserAndSync(userDB *sql.DB, kafkaProducer sarama.SyncProducer, user *User) error {          // 更新用户数据          err := UpdateUser(userDB, user)          if err!= nil {              return err          }          // 发送数据更新消息到Kafka          message := &sarama.ProducerMessage{              Topic: "user_update_topic",              Value: sarama.StringEncoder(fmt.Sprintf("user_id:%d", user.ID)),          }          _, _, err = kafkaProducer.SendMessage(message)          return err      }            func KafkaConsumerLoop(kafkaConsumer sarama.Consumer, userDB *sql.DB) {          consumer, err := kafkaConsumer.ConsumePartition("user_update_topic", 0, sarama.OffsetNewest)          if err!= nil {              // 处理错误          }          defer consumer.Close()          for message := range consumer.Messages() {              // 解析消息,获取用户ID              userIDStr := string(message.Value)              userID, err := strconv.Atoi(userIDStr[len("user_id:"):])              if err!= nil {                  // 处理错误              }              // 根据用户ID更新其他分表中的用户数据              user, err := QueryUser(userDB, userID)              if err!= nil {                  // 处理错误              }              // 更新其他分表...          }      }
复制代码

五、性能测试和优化

  1. 性能测试


  • 在实施分库分表后,需要对系统进行性能测试,以验证是否达到了预期的性能提升效果。可以使用性能测试工具,如go - bench来测试数据库操作的性能。例如,测试查询用户数据的性能:


      func BenchmarkQueryUser(b *testing.B) {          db, err := sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/database_name")          if err!= nil {              b.Fatal(err)          }          defer db.Close()          for i := 0; i < b.N; i++ {              userID := i              QueryUser(db, userID)          }      }
复制代码


  1. 优化调整

  2. 根据性能测试结果,对分库分表策略和代码进行优化调整。例如,如果发现某些查询操作仍然较慢,可以考虑优化索引策略、调整分片规则或者增加缓存机制等。如果是使用 ORM 框架,还可以优化 ORM 的配置,如调整 GORM 的PreloadJoins策略来减少不必要的数据库查询。

结语

今天就分享到这里,如果你对上面的内容有疑问或者你有更好的思路,欢迎在评论区留言!

欢迎关注 ❤

我们搞了一个免费的面试真题共享群,互通有无,一起刷题进步。


没准能让你能刷到自己意向公司的最新面试题呢。


感兴趣的朋友们可以加我微信:wangzhongyang1993,备注:面试群。

用户头像

王中阳Go

关注

靠敲代码在北京买房的程序员 2022-10-09 加入

【微信】wangzhongyang1993【公众号】程序员升职加薪之旅【成就】InfoQ专家博主👍掘金签约作者👍B站&掘金&CSDN&思否等全平台账号:王中阳Go

评论

发布
暂无评论
一文讲清数据库的分库分表_数据库_王中阳Go_InfoQ写作社区