写点什么

数据迁移脚本优化过程:从 MySQL 到 Django 模型表

  • 2024-06-27
    湖南
  • 本文字数:2100 字

    阅读完需:约 7 分钟

在大规模的数据迁移过程中,性能问题往往是开发者面临的主要挑战之一。本文将分析一个数据迁移脚本的优化过程,展示如何从 MySQL 数据库迁移数据到 Django 模型表,并探讨优化前后的性能差异。

优化前的脚本分析

优化前的脚本按批次从 MySQL 数据库中读取数据,并将其插入到 Django 模型表中。每次读取的数据量由 batch_size 确定。以下是优化前的关键部分:

fetch_sql = f"""    SELECT search_rank, search_term,  `period`, report_date     FROM hot_search_terms_table     WHERE period = '{period}'     LIMIT %s OFFSET %s;"""
复制代码

每次查询使用 LIMIT 和 OFFSET 子句,OFFSET 指定从哪一行开始读取。然而,随着数据量的增加,OFFSET 会导致性能显著下降,因为数据库必须扫描更多行来确定结果集的起点。

优化后的脚本分析

优化后的脚本通过使用递增的主键 ID 进行分页查询,避免了 OFFSET 带来的性能问题。以下是优化后的关键部分:

fetch_sql = f"""    SELECT id, search_rank, search_term,`period`, report_date     FROM hot_search_terms_table     WHERE period = %s AND id > %s     ORDER BY id ASC     LIMIT %s;"""
复制代码

通过 WHERE id > %s 和 ORDER BY id ASC,我们可以确保每次查询的结果集都是按主键 ID 排序的,性能大大提高,因为数据库可以直接从上一次查询结束的地方开始读取数据。

优化前后的性能比较

优化前的性能问题

  • 查询性能下降:随着 OFFSET 值的增加,查询性能会显著下降。数据库需要扫描所有的行,直到达到指定的偏移量,然后返回后续的行。

  • 长时间等待:当数据量较大时,随着偏移量的增加,每次查询所需的时间会变得越来越长。

优化后的性能改进

  • 高效的分页查询:使用递增的主键 ID 进行分页查询,避免了扫描大量无关行的数据。

  • 稳定的查询时间:每次查询都只需读取新的数据,无需扫描之前已经处理过的数据行,查询时间稳定且较快。

实施细节

优化前的实现

优化前的实现通过读取偏移量文件来记录上次处理的位置,每次查询都从该位置开始,读取一批数据并插入到 Django 模型表中:

class Command(BaseCommand):    # 省略部分代码...
def handle(self, *args, **kwargs): try: # 连接数据库 mysql_conn = mysql.connector.connect(**mysql_config) mysql_cursor = mysql_conn.cursor() # 批次处理 while True: self.stdout.write(self.style.SUCCESS(f"正在获取 {offset} - {offset + batch_size} 行的数据")) zhilin_cursor.execute(fetch_sql, (batch_size, offset)) batch_data = zhilin_cursor.fetchall() if not batch_data: break
# 转换并插入数据 objects = [HotSearchTermsReportABA(...) for row in batch_data] with transaction.atomic(): HotSearchTermsReportABA.objects.bulk_create(objects) offset += batch_size total_rows_transferred += len(batch_data) self.update_last_offset(offset)
except Error as e: # 错误处理 self.stdout.write(self.style.ERROR(f"传输过程中出现异常:{e}"))
复制代码

优化后的实现

优化后的实现使用主键 ID 进行分页查询,并记录上次处理的最大 ID:

class Command(BaseCommand):    # 省略部分代码...
def handle(self, *args, **kwargs): try: # 连接数据库 mysql_conn = mysql.connector.connect(**mysql_config) mysql_cursor = mysql_conn.cursor() # 批次处理 while True: self.stdout.write(self.style.SUCCESS(f"正在获取 ID 大于 {last_id} 的 {self.batch_size} 行数据")) zhilin_cursor.execute(fetch_sql, (period, last_id, self.batch_size)) batch_data = zhilin_cursor.fetchall() if not batch_data: break
# 转换并插入数据 objects = [HotSearchTermsReportABA(...) for row in batch_data] with transaction.atomic(): HotSearchTermsReportABA.objects.bulk_create(objects) last_id = batch_data[-1][0] total_rows_transferred += len(batch_data) self.update_last_id(last_id)
except Error as e: # 错误处理 self.stdout.write(self.style.ERROR(f"传输过程中出现异常:{e}"))
复制代码

总结

通过上述优化过程解决了数据量增大导致的查询性能下降问题。具体优化策略包括:

  1. 使用主键 ID 进行分页查询,避免 OFFSET 带来的性能问题。

  2. 确保每次查询只读取新的数据,减少数据库扫描的行数。


作者:pycode

链接:https://juejin.cn/post/7384697662614519823

用户头像

欢迎关注,一起学习,一起交流,一起进步 2020-06-14 加入

公众号:做梦都在改BUG

评论

发布
暂无评论
数据迁移脚本优化过程:从 MySQL 到 Django 模型表_Python_我再BUG界嘎嘎乱杀_InfoQ写作社区