写点什么

ShardingSphere Proxy 适配 MySQL addBatch/executeBatch 数组结果实战

作者:端小强
  • 2024-11-24
    江苏
  • 本文字数:8423 字

    阅读完需:约 28 分钟

ShardingSphere Proxy 适配 MySQL addBatch/executeBatch 数组结果实战

本文首发于个人博客 ShardingSphere Proxy 适配 MySQL addBatch/executeBatch 数组结果实战,转载请注明原始链接。

问题背景

MySQL JDBC 驱动支持使用 addBatchexecuteBatch 方法,进行批量写入操作。MySQL 提供了 allowMultiQueriesrewriteBatchedStatements 两个属性用于控制是否开启批量写入,如果用户在 JDBC URL 上开启 &allowMultiQueries=true&rewriteBatchedStatements=true 属性,那么 MySQL 驱动会将多条 INSERT 语句改写成多组 VALUES,将 DELETEUPDATE 语句改写成 ; 分隔的多语句。如果用户不配置 allowMultiQueriesrewriteBatchedStatements 属性,MySQL 驱动则会以单条 SQL 方式逐一请求。


ShardingSphere Proxy 实现了完整的 MySQL 协议,因此对 MySQL 批量写入也进行了兼容,但是笔者在开发 DBPlusEngine 全局索引功能时,发现新增的批量写入 Case 断言报错(如下图),executeBatch 返回的 int[] 不正确,需要进一步分析和适配。

问题分析

最小化 Demo 复现

首先,由于 E2E 程序不方便调试,我们编写一个最小化 Demo 复现这个异常,如下是最小化 Demo 的源码,使用了 PreparedStatement 方式创建预编译 SQL,然后再通过 addBatch() 添加多组参数,此处需要注意,只有当 batchCount > 3 时,MySQL 驱动才会将 SQL 改写为批量 SQL。


@Testvoid assertGlobalIndex() throws SQLException {    try (        // Connection connection = DriverManager.getConnection("jdbc:shardingsphere:classpath:config/driver/foo-driver-fixture-global-index.yaml");        Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3307/sphereex_global_index?useSSL=false&useServerPrepStmts=true&useLocalSessionState=true&characterEncoding=utf-8&allowMultiQueries=true&rewriteBatchedStatements=true", "root", "root");        // Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/global_index?useSSL=false&useServerPrepStmts=true&useLocalSessionState=true&characterEncoding=utf-8&allowMultiQueries=true&rewriteBatchedStatements=true", "root", "123456");        PreparedStatement preparedStatement = connection.prepareStatement("DELETE FROM t_order WHERE order_id = ?")) {        // PreparedStatement preparedStatement = connection.prepareStatement("DELETE FROM t_order_1 WHERE order_id = ?")) {        connection.setAutoCommit(false);        preparedStatement.setObject(1, 1001);        preparedStatement.addBatch();        preparedStatement.setObject(1, 1101);        preparedStatement.addBatch();        preparedStatement.setObject(1, 999);        preparedStatement.addBatch();        preparedStatement.setObject(1, 998);        preparedStatement.addBatch();        final int[] ints = preparedStatement.executeBatch();        System.out.println(ints);        preparedStatement.clearBatch();                preparedStatement.setObject(1, 1000);        preparedStatement.addBatch();        preparedStatement.setObject(1, 1100);        preparedStatement.addBatch();        preparedStatement.setObject(1, 999);        preparedStatement.addBatch();        preparedStatement.setObject(1, 998);        preparedStatement.addBatch();        final int[] ints2 = preparedStatement.executeBatch();        System.out.println(ints2);        preparedStatement.clearBatch();        connection.rollback();    }}
复制代码


Demo 中我们连接的是 3307 端口,该端口指向的是 Proxy 服务,我们需要将如下的配置添加到 Proxy 配置文件中(如下展示的全局索引配置为 ShardingSphere 商业版功能,开源版本需要删除全局索引配置):


databaseName: sphereex_global_index
dataSources: global_index: url: jdbc:mysql://localhost:3306/global_index?serverTimezone=UTC&useSSL=false&characterEncoding=utf-8&allowPublicKeyRetrieval=true username: root password: 123456 connectionTimeoutMilliseconds: 30000 idleTimeoutMilliseconds: 60000 maxLifetimeMilliseconds: 1800000 maxPoolSize: 50 minPoolSize: 2
rules:- !SINGLE tables: - "*.*"- !SHARDING tables: t_order: actualDataNodes: global_index.t_order_${0..9} tableStrategy: standard: shardingColumn: order_id shardingAlgorithmName: t_order_inline globalIndexStrategy: globalIndexNames: - t_order_user_id_idx - t_order_merchant_id_idx consistencyLevel: STRONG globalIndexes: t_order_user_id_idx: actualDataNodes: global_index.t_order_user_id_idx_${0..9} databaseStrategy: none: tableStrategy: standard: shardingColumn: user_id shardingAlgorithmName: t_order_user_id_idx_inline coveringColumns: - order_id t_order_merchant_id_idx: actualDataNodes: global_index.t_order_merchant_id_idx_${0..9} databaseStrategy: none: tableStrategy: standard: shardingColumn: merchant_id shardingAlgorithmName: t_order_merchant_id_idx_inline coveringColumns: - order_id - creation_date shardingAlgorithms: t_order_inline: type: INLINE props: algorithm-expression: t_order_${order_id % 10} t_order_user_id_idx_inline: type: INLINE props: algorithm-expression: t_order_user_id_idx_${user_id % 10} t_order_merchant_id_idx_inline: type: INLINE props: algorithm-expression: t_order_merchant_id_idx_${merchant_id % 10}
复制代码


启动 Demo 程序,可以复现和 E2E 中相同的异常,下面我们就来分析下异常的具体原因。

返回多个 MySQLOKPacket

我们使用最小化 Demo 进行 Debug,可以发现 Proxy 多语句的入口类是 MySQLMultiStatementsHandler,该类目前返回的结果为 UpdateResponseHeader,UpdateResponseHeader 会被封装为单个 MySQLOKPacket,目前执行多语句时,只会简单地将 updated 进行了累加,因此断言时结果不正确。


private UpdateResponseHeader executeBatchedStatements(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext) throws SQLException {    boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();    ResourceMetaData resourceMetaData = metaDataContexts.getMetaData().getDatabase(connectionSession.getUsedDatabaseName()).getResourceMetaData();    JDBCExecutorCallback<int[]> callback = new BatchedJDBCExecutorCallback(resourceMetaData, sqlStatementSample, isExceptionThrown);    List<int[]> executeResults = jdbcExecutor.execute(executionGroupContext, callback);    int updated = 0;    for (int[] eachResult : executeResults) {        for (int each : eachResult) {            updated += each;        }    }    // TODO Each logic SQL should correspond to an OK Packet.    return new UpdateResponseHeader(sqlStatementSample, Collections.singletonList(new UpdateResult(updated, 0L)));}
复制代码


根据此处的 TODO 标记可以看出,MySQL 执行多语句时,需要返回批量的 MySQLOKPacket 集合,分别对应每条语句的执行结果。为了解决这个问题,需要增加一个 MultiStatementsUpdateResponseHeader 类进行封装,将多个 MySQLOKPacket 集合封装到其中:


/** * Multi statements update response header. */@RequiredArgsConstructor@Getterpublic final class MultiStatementsUpdateResponseHeader implements ResponseHeader {        private final Collection<UpdateResponseHeader> updateResponseHeaders;}
复制代码


然后在 MySQLComQueryPacketExecutor 执行器类中,对 MultiStatementsUpdateResponseHeader 进行处理,具体处理逻辑如下,根据 MultiStatementsUpdateResponseHeader 中维护的 UpdateResponseHeader 集合,将其组装为多个 MySQLOKPacket。


@Overridepublic Collection<DatabasePacket> execute() throws SQLException {    ResponseHeader responseHeader = proxyBackendHandler.execute();    if (responseHeader instanceof QueryResponseHeader) {        return processQuery((QueryResponseHeader) responseHeader);    }    responseType = ResponseType.UPDATE;    if (responseHeader instanceof MultiStatementsUpdateResponseHeader) {        return processMultiStatementsUpdate((MultiStatementsUpdateResponseHeader) responseHeader);    }    return processUpdate((UpdateResponseHeader) responseHeader);}
private Collection<DatabasePacket> processMultiStatementsUpdate(final MultiStatementsUpdateResponseHeader responseHeader) { Collection<DatabasePacket> result = new LinkedList<>(); int index = 0; for (UpdateResponseHeader each : responseHeader.getUpdateResponseHeaders()) { boolean lastPacket = ++index == responseHeader.getUpdateResponseHeaders().size(); result.addAll(ResponsePacketBuilder.buildUpdateResponsePackets(each, ServerStatusFlagCalculator.calculateFor(connectionSession, lastPacket))); } return result;}
复制代码


此时,我们再次进行测试,但是发现结果仍然不正确,这又是为什么呢?想要搞清楚 MySQL 内部协议的交互逻辑,我们需要通过 WireShark 进行抓包,对比原生 MySQL 批量语句执行和 Proxy 批量语句执行之间的差异。

ServerStatusFlag 增加 SERVER_MORE_RESULTS_EXISTS

为了搞清楚 Proxy 和 MySQL 之间的差异,我们分别执行 Demo 程序中的 Proxy 示例和 MySQL 示例,并使用 WireShark 进行抓包(WireShark 使用可参考使用 Wireshark 解决 BenchmarkSQL 压测 Proxy 异常)。首先,我们执行 MySQL 批量写入并进行抓包,如下记录了抓包的内容,包括了 1 次 Request 和 4 次 Response。






然后我们再执行 Proxy 批量写入,并使用 WireShark 抓包,如下记录了 Proxy 抓包的内容,只有 1 次 Request 和 1 次 Response。




对比 MySQL 和 Proxy 抓包的差异,可以发现 MySQL 直到最后一个 MySQLOKPacket Server Status 才变为 1,前三个 MySQLOKPacket Server Status 都为 9(8 多语句 SERVER_MORE_RESULTS_EXISTS + 1 事务中 SERVER_STATUS_IN_TRANS),而 Proxy 第一个 Response 就返回了 1,并且后续不再返回 Response。


排查 MySQL 驱动可以发现,如果 SQL Response 中的 ServerStatusFlag 不包含 MySQLStatusFlag.SERVER_MORE_RESULTS_EXISTS,MySQL 驱动就只会读取第一个 MySQLOKPacket,并填充到客户端 int[] 数组中。因此可以考虑在封装多语句 MySQLOKPacket 时,根据多语句是否为最后一条,决定该标记的设置,当 MySQLOKPacket 未遍历到最后一条记录时,应设置 SERVER_MORE_RESULTS_EXISTS 标记。


我们调整 ServerStatusFlagCalculator#calculateFor 方法的实现逻辑,根据传入的 lastPacket 标记,决定是否设置 SERVER_MORE_RESULTS_EXISTS,具体实现逻辑如下:


/** * Calculate server status flag for specified connection. * * @param connectionSession connection session * @param lastPacket last packet * @return server status flag */public static int calculateFor(final ConnectionSession connectionSession, final boolean lastPacket) {    int result = 0;    result |= connectionSession.isAutoCommit() ? MySQLStatusFlag.SERVER_STATUS_AUTOCOMMIT.getValue() : 0;    result |= connectionSession.getTransactionStatus().isInTransaction() ? MySQLStatusFlag.SERVER_STATUS_IN_TRANS.getValue() : 0;    result |= lastPacket ? 0 : MySQLStatusFlag.SERVER_MORE_RESULTS_EXISTS.getValue();    return result;}
复制代码


修改完成后,我们再次运行测试程序,发现此时直接出现了 NPE,需要进一步分析 NPE 的原因。

capabilityFlags 增加 CLIENT_MULTI_RESULTS/CLIENT_PS_MULTI_RESULTS



根据出现 NPE 的位置,我们大致可以定位到 NativeProtocol#readNextResultset 方法,通过 Debug 可以发现,在 MySQL 驱动获取下一个结果集时,currentProtocolEntity 为空导致了 NPE。排查 currentProtocolEntity 赋值的地方,发现是 serverSession.useMultiResults() 返回 false,导致 currentProtocolEntity 未赋值,而 useMultiResults 方法的判断逻辑如下,会从 clientParam 标记中获取 CLIENT_MULTI_RESULTSCLIENT_PS_MULTI_RESULTS


// /Users/duanzhengqiang/.m2/repository/com/mysql/mysql-connector-j/8.0.31/mysql-connector-j-8.0.31-sources.jar!/com/mysql/cj/protocol/a/NativeServerSession.java:220@Overridepublic boolean useMultiResults() {    return (this.clientParam & CLIENT_MULTI_RESULTS) != 0 || (this.clientParam & CLIENT_PS_MULTI_RESULTS) != 0;}
复制代码


可以看到该判断主要依赖 clientParam 变量,NativeAuthenticationProvider 方法会在登录认证通过后,调用 setClientParam 方法初始化该变量,具体代码逻辑位置如下。


// /Users/duanzhengqiang/.m2/repository/com/mysql/mysql-connector-j/8.0.31/mysql-connector-j-8.0.31-sources.jar!/com/mysql/cj/protocol/a/NativeAuthenticationProvider.java:201long clientParam = capabilityFlags & NativeServerSession.CLIENT_LONG_PASSWORD //| (this.propertySet.getBooleanProperty(PropertyKey.useAffectedRows).getValue() ? //        0 : capabilityFlags & NativeServerSession.CLIENT_FOUND_ROWS) //| capabilityFlags & NativeServerSession.CLIENT_LONG_FLAG //| (this.useConnectWithDb ? capabilityFlags & NativeServerSession.CLIENT_CONNECT_WITH_DB : 0) //| (this.propertySet.getBooleanProperty(PropertyKey.useCompression).getValue() ? //        capabilityFlags & NativeServerSession.CLIENT_COMPRESS : 0) //| (this.propertySet.getBooleanProperty(PropertyKey.allowLoadLocalInfile).getValue()        || this.propertySet.getStringProperty(PropertyKey.allowLoadLocalInfileInPath).isExplicitlySet() ? //                capabilityFlags & NativeServerSession.CLIENT_LOCAL_FILES : 0) //| capabilityFlags & NativeServerSession.CLIENT_PROTOCOL_41 //| (this.propertySet.getBooleanProperty(PropertyKey.interactiveClient).getValue() ? //        capabilityFlags & NativeServerSession.CLIENT_INTERACTIVE : 0) //| (this.propertySet.<SslMode>getEnumProperty(PropertyKey.sslMode).getValue() != SslMode.DISABLED ? //        capabilityFlags & NativeServerSession.CLIENT_SSL : 0) //| capabilityFlags & NativeServerSession.CLIENT_TRANSACTIONS // Required to get server status values.| NativeServerSession.CLIENT_SECURE_CONNECTION //| (this.propertySet.getBooleanProperty(PropertyKey.allowMultiQueries).getValue() ? //        capabilityFlags & NativeServerSession.CLIENT_MULTI_STATEMENTS : 0) //| capabilityFlags & NativeServerSession.CLIENT_MULTI_RESULTS // Always allow multiple result sets.| capabilityFlags & NativeServerSession.CLIENT_PS_MULTI_RESULTS // Always allow multiple result sets for SSPS.| NativeServerSession.CLIENT_PLUGIN_AUTH //| (NONE.equals(this.propertySet.getStringProperty(PropertyKey.connectionAttributes).getValue()) ? //        0 : capabilityFlags & NativeServerSession.CLIENT_CONNECT_ATTRS) //| capabilityFlags & NativeServerSession.CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA //| (this.propertySet.getBooleanProperty(PropertyKey.disconnectOnExpiredPasswords).getValue() ? //        0 : capabilityFlags & NativeServerSession.CLIENT_CAN_HANDLE_EXPIRED_PASSWORD) //| (this.propertySet.getBooleanProperty(PropertyKey.trackSessionState).getValue() ? //        capabilityFlags & NativeServerSession.CLIENT_SESSION_TRACK : 0) //| capabilityFlags & NativeServerSession.CLIENT_DEPRECATE_EOF //| capabilityFlags & NativeServerSession.CLIENT_QUERY_ATTRIBUTES //| capabilityFlags & NativeServerSession.CLIENT_MULTI_FACTOR_AUTHENTICATION;
sessState.setClientParam(clientParam);
复制代码


Proxy 端通过 MySQLAuthenticationEngine 处理 MySQL 登录认证,会将握手结果封装在 MySQLHandshakePacket 中,其中包含了 capabilityFlags 服务端能力标志位的信息。


@Overridepublic int handshake(final ChannelHandlerContext context) {    int result = ConnectionIdGenerator.getInstance().nextId();    connectionPhase = MySQLConnectionPhase.AUTH_PHASE_FAST_PATH;    boolean sslEnabled = ProxySSLContext.getInstance().isSSLEnabled();    if (sslEnabled) {        context.pipeline().addFirst(MySQLSSLRequestHandler.class.getSimpleName(), new MySQLSSLRequestHandler());    }    context.writeAndFlush(new MySQLHandshakePacket(result, sslEnabled, authPluginData));    MySQLStatementIdGenerator.getInstance().registerConnection(result);    return result;}
复制代码


参考 MySQL Client/Server Protocol 文档 - Capabilities Flags,能力标志位共 32 个 bit 位,每个 bit 位代表协议的一个可选功能,客户端和服务端的交集,共同决定了将使用协议的哪些可选部分。


按照功能属于高 16 位,还是低 16 位,需要分别将功能设置到 capabilityFlagsLowercapabilityFlagsUpper 中。查看 CLIENT_MULTI_RESULTSCLIENT_PS_MULTI_RESULTS,它们属于高位功能,因此在 calculateHandshakeCapabilityFlagsUpper 方法中增加 Flags 即可,如下是具体设置代码。


CLIENT_MULTI_RESULTS(0x00020000),
CLIENT_PS_MULTI_RESULTS(0x00040000),
/** * Get handshake capability flags upper bit. * * @return handshake capability flags upper bit */public static int calculateHandshakeCapabilityFlagsUpper() { return calculateCapabilityFlags(CLIENT_MULTI_STATEMENTS, CLIENT_PLUGIN_AUTH, CLIENT_MULTI_RESULTS, CLIENT_PS_MULTI_RESULTS) >> 16;}
复制代码


修改完成后,再次使用 Demo 程序测试,发现已经能够返回正确的结果,通过 JDBC 可以正常执行 addBatch/executeBatch 并返回 int[] 数组。

功能测试

最后,我们使用全局索引功能 E2E 再次进行测试,原先断言失败的 Case 现在终于可以通过,大家终于可以放心使用商业版全局索引功能。在此,也真心向大家推荐 SphereEx 的 DBPlusEngine,相比开源的 ShardingSphere,它具有更完善的企业级功能,不仅能够进行海量数据的分片管理,还可以用于数据安全加密和数据库替换等场景,更多信息可以查看 SphereEx 官网


结语

本文介绍了 E2E 测试 Proxy 发现批量写入返回结果错误后,如何一步步梳理 Proxy 代码,使用 Wireshark 抓包对比分析,以及排查 MySQL 驱动源码,最终完美解决了问题。提升 Proxy 对 MySQL 协议的兼容度,很直接的方法就是同测试用例比对,通过强大的 Wireshark 工具,我们可以很清晰地观测到请求过程中的差异,进而快速找到解决问题的方案。本案例的排查思路也适合其他 Proxy 接入端的问题,希望对大家有用,由于本人对 Wireshark 使用经验有限,如果问题也欢迎指正。

参考文档


欢迎关注


欢迎关注「端小强的博客」微信公众号,会不定期分享日常学习和工作经验,欢迎大家关注交流。



发布于: 刚刚阅读数: 4
用户头像

端小强

关注

行到水穷处,坐看云起时 2017-12-28 加入

大家好,我是端正强,目前是 Apache ShardingSphere 社区的 PMC 成员,并担任 SphereEx 高级中间件工程师职位。本人热爱开源,乐于分享,目前专注于 Apache ShardingSphere 内核模块开发。

评论

发布
暂无评论
ShardingSphere Proxy 适配 MySQL addBatch/executeBatch 数组结果实战_ShardingSphere_端小强_InfoQ写作社区