因为公司业务迁移需要,需要从数仓同步一张大表,数据总量大概三千多万,接近四千万的样子,当遇到这种数据量的时候,综合考虑之后,当前比较流行的框架都不能满足于生产需求,使用框架对性能的损耗过于严重,所以有了以下千万级数据量的插入方案。
当数据量达到一定规模的时候,假设一个语句为这样,还比较小的,只有三个字段。
INSERT INTO user_operation_min_temp(observer_id, access_id, access_name) VALUES (?, ?, ?)
复制代码
如果使用单线程,一次 INSET 一条的话,那么要插入千万次,并且提交千万次,众所周知,数据库有一定的瓶颈,大量的插入提交操作会严重损耗系统性能。
所以需要使用批量插入,Mybatis 的批量插入虽然是批量插入,但那只是业务层面的批量插入,真正执行的时候,还是会帮你分解成单条插入,所以必须要放弃 orm 框架。而使用原生的插入
批量插入的代码如下
/** * 数据存入新的临时表 * @param list */ private void batch(List<UserOperation> list) { String sql = "INSERT INTO user_operation_min_temp(observer_id, access_id, access_name) VALUES (?, ?, ?) "; Connection connection = null; PreparedStatement preparedStatement = null; try { connection = dataSource.getConnection(); connection.setAutoCommit(false); preparedStatement = connection.prepareStatement(sql); for (UserOperation obj : list) { preparedStatement.setInt(1, obj.getObserverId()); preparedStatement.setInt(2, obj.getAccessId()); preparedStatement.setString(3, obj.getAccessName()); preparedStatement.addBatch(); } preparedStatement.executeBatch(); connection.commit(); } catch (Exception e) { } finally { if (preparedStatement != null) { try { preparedStatement.close(); preparedStatement = null; } catch (Exception e) { } } if (connection != null) { try { connection.close(); connection = null; } catch (Exception e) { } } } }
复制代码
那么问题来了,我是否可以把三千多万条 list 直接传入到这个 batch 方法呢,答案是不是不可以,但是却是不太好,第一就是你的三千多万条 list 这个对象或者数组一直要等待插入完成,系统中没有地方使用才可以进行释放内存,对业务或者服务器的负荷比较大,并且容易造成 OOM,也就是内存溢出问题。
所以可以利用多线程对 list 进行分割,比如说 3000 万的话,可以按照每 100 万一个批次,也就是 30 个批次,使用多线程提交这三十个批次。具体的可以按照自己的情况来分割。
list 分割可以使用集合的工具类
List<List<UserOperation>> batch = Lists.partition(allList, 30);
复制代码
需要引入 guava 依赖
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>24.1-jre</version></dependency>
复制代码
批量执行
batch.parallelStream().forEach(list -> { //执行插入 batch(list);});
复制代码
然后,你以为这样就可以了吗,现实会告诉你,我不要面子的吗、这样缺失不行,第一,你的三千万条数据的对象还是三千万,还多分割了 30 个批次。内存不要钱吗。所以在读取数据的时候就要烤炉分割,而不会一下子把三千万的数据一次性拿到内存来,然后一批处理完成之后及时释放掉。
改造之后的方案。
1、读取数据,因为我这里读取数据是使用的接口进行调用。可以根据自己的业务进行改造
public void sync() throws SQLException { long startTime = System.currentTimeMillis(); //获取所有用户 List<User> users = syncService.all(); if(users == null || users.size() == 0) { return; } //复制表结构 syncService.like(); //按照200个人一个批次进行分 List<List<User>> batch = Lists.partition(users, 200); //并发处理 数据存入新表 batch.parallelStream().forEach(list -> { //使用线程安全的集合 List<UserOperation> userOperations = Collections.synchronizedList(new ArrayList<>()); for(User user : list) { Integer obId = user.getObId(); if(obId == null || "".equalsIgnoreCase(obId.toString())) { continue; } //调用接口获取每个用户的权限 List<UserOperation> temp = syncService.syncUserPermission(obId.toString()); //当到线程的list中 userOperations.addAll(temp); //logger.info("加入员工 obId:【{}】,姓名:【{}】", obId, user.getName()); } try { //执行保存逻辑 syncService.save(userOperations); logger.info("线程批次存入数据库: 当前线程:{}", Thread.currentThread()); } catch (SQLException e) { e.printStackTrace(); } finally { //执行释放 userOperations = null; } }); //数据写完之后,进行临时表的切换 //一个事物。 //1、删除旧表 //2、临时表重命名为旧表的名字 syncService.transactional(); long endTime = System.currentTimeMillis(); logger.info("耗时:{} ms", endTime - startTime); }
复制代码
说明一下,这里不使用清空表再进行插入是因为避免对业务系统的使用造成影响,保证这些操作是在一个事物中发生。
调用权限接口的代码
public List<UserOperation> syncUserPermission(String obId) { Map<String, Object> params = ImmutableMap.of("data", ImmutableMap.of("observerId", obId)); ResponseEntity<String> content = null; try { content = new RetryTemplate() { @Override protected ResponseEntity<String> doService() { return RestTemplateUtils.post("", params, String.class); } }.setRetryTime(3).setSleepTime(1000).execute(); } catch (InterruptedException e) { e.printStackTrace(); } if (content == null) { return new ArrayList<>(); } String body = content.getBody(); JSONArray jsonArray = JSONArray.parseArray(body); if (jsonArray == null || jsonArray.size() == 0) { return new ArrayList<>(); } List<UserOperation> operations = jsonArray.stream().map(p -> { JSONObject json = (JSONObject) p; UserOperation userOperation = new UserOperation(); userOperation.setObserverId(Integer.parseInt(obId)); userOperation.setAccessId(json.getInteger("iD")); userOperation.setAccessName(json.getString("name")); userOperation.setActive(1); return userOperation; }).collect(Collectors.toList()); return operations; }
复制代码
表结构操作的方法
@Transactional public void transactional() throws SQLException { drop(); reName(); } /** * 复制表结构 */ public void like() throws SQLException { String sql = "CREATE TABLE user_operation_min_temp LIKE user_operation_min;"; new QueryRunner(dataSource).update(sql); } /** * 删掉旧表 * @throws SQLException */ public void drop() throws SQLException { String sql = "drop table user_operation_min;"; new QueryRunner(dataSource).update(sql); } /** * 新表重命名 */ public void reName() throws SQLException { String sql = "ALTER TABLE user_operation_min_temp RENAME TO user_operation_min;"; new QueryRunner(dataSource).update(sql); }
复制代码
然后重点来了,之前提到过得批量 INSET 方法真的可以用吗?还需要进行一下处理,上面我的 INSET 语句是三个字段,也就是三个?,三个占位符,PreparedStatement ,一次提交的占位符不能超过 65535 个,65536 / 3 = 21845 所以我一个批次不能超过 21848,多于的还是要分割。这个因为上层已经分割过了,所以就按照最粗的粒度分割。
public void save(List<UserOperation> list) throws SQLException { if (list == null || list.size() == 0) { return; } pre(list); } /** * 预处理占位符问题 * @param list * @throws SQLException */ private void pre(List<UserOperation> list) throws SQLException { // 21845 * 3 = 65535 占位符不可以超过这个数 if (list.size() > 21845) { List<List<UserOperation>> parts = Lists.partition(list, 21845); parts.forEach(this::batch); return; } batch(list); } /** * 数据存入新的临时表 * @param list */ private void batch(List<UserOperation> list) { String sql = "INSERT INTO user_operation_min_temp(observer_id, access_id, access_name) VALUES (?, ?, ?) "; Connection connection = null; PreparedStatement preparedStatement = null; try { connection = dataSource.getConnection(); connection.setAutoCommit(false); preparedStatement = connection.prepareStatement(sql); for (UserOperation obj : list) { preparedStatement.setInt(1, obj.getObserverId()); preparedStatement.setInt(2, obj.getAccessId()); preparedStatement.setString(3, obj.getAccessName()); preparedStatement.addBatch(); } preparedStatement.executeBatch(); connection.commit(); } catch (Exception e) { } finally { if (preparedStatement != null) { try { preparedStatement.close(); preparedStatement = null; } catch (Exception e) { } } if (connection != null) { try { connection.close(); connection = null; } catch (Exception e) { } } } }
复制代码
代码中很多我业务中的类,这里只分享技巧和经验,在使用中还是要根据自己的数据来进行处理。
评论