本文首发于个人博客 ShardingSphere Proxy 适配 MySQL addBatch/executeBatch 数组结果实战,转载请注明原始链接。
问题背景
MySQL JDBC 驱动支持使用 addBatch 和 executeBatch 方法,进行批量写入操作。MySQL 提供了 allowMultiQueries 和 rewriteBatchedStatements 两个属性用于控制是否开启批量写入,如果用户在 JDBC URL 上开启 &allowMultiQueries=true&rewriteBatchedStatements=true 属性,那么 MySQL 驱动会将多条 INSERT 语句改写成多组 VALUES,将 DELETE 和 UPDATE 语句改写成 ; 分隔的多语句。如果用户不配置 allowMultiQueries 和 rewriteBatchedStatements 属性,MySQL 驱动则会以单条 SQL 方式逐一请求。
ShardingSphere Proxy 实现了完整的 MySQL 协议,因此对 MySQL 批量写入也进行了兼容,但是笔者在开发 DBPlusEngine 全局索引功能时,发现新增的批量写入 Case 断言报错(如下图),executeBatch 返回的 int[] 不正确,需要进一步分析和适配。
问题分析
最小化 Demo 复现
首先,由于 E2E 程序不方便调试,我们编写一个最小化 Demo 复现这个异常,如下是最小化 Demo 的源码,使用了 PreparedStatement 方式创建预编译 SQL,然后再通过 addBatch() 添加多组参数,此处需要注意,只有当 batchCount > 3 时,MySQL 驱动才会将 SQL 改写为批量 SQL。
@Testvoid assertGlobalIndex() throws SQLException { try ( // Connection connection = DriverManager.getConnection("jdbc:shardingsphere:classpath:config/driver/foo-driver-fixture-global-index.yaml"); Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3307/sphereex_global_index?useSSL=false&useServerPrepStmts=true&useLocalSessionState=true&characterEncoding=utf-8&allowMultiQueries=true&rewriteBatchedStatements=true", "root", "root"); // Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/global_index?useSSL=false&useServerPrepStmts=true&useLocalSessionState=true&characterEncoding=utf-8&allowMultiQueries=true&rewriteBatchedStatements=true", "root", "123456"); PreparedStatement preparedStatement = connection.prepareStatement("DELETE FROM t_order WHERE order_id = ?")) { // PreparedStatement preparedStatement = connection.prepareStatement("DELETE FROM t_order_1 WHERE order_id = ?")) { connection.setAutoCommit(false); preparedStatement.setObject(1, 1001); preparedStatement.addBatch(); preparedStatement.setObject(1, 1101); preparedStatement.addBatch(); preparedStatement.setObject(1, 999); preparedStatement.addBatch(); preparedStatement.setObject(1, 998); preparedStatement.addBatch(); final int[] ints = preparedStatement.executeBatch(); System.out.println(ints); preparedStatement.clearBatch(); preparedStatement.setObject(1, 1000); preparedStatement.addBatch(); preparedStatement.setObject(1, 1100); preparedStatement.addBatch(); preparedStatement.setObject(1, 999); preparedStatement.addBatch(); preparedStatement.setObject(1, 998); preparedStatement.addBatch(); final int[] ints2 = preparedStatement.executeBatch(); System.out.println(ints2); preparedStatement.clearBatch(); connection.rollback(); }}
复制代码
Demo 中我们连接的是 3307 端口,该端口指向的是 Proxy 服务,我们需要将如下的配置添加到 Proxy 配置文件中(如下展示的全局索引配置为 ShardingSphere 商业版功能,开源版本需要删除全局索引配置):
databaseName: sphereex_global_index
dataSources: global_index: url: jdbc:mysql://localhost:3306/global_index?serverTimezone=UTC&useSSL=false&characterEncoding=utf-8&allowPublicKeyRetrieval=true username: root password: 123456 connectionTimeoutMilliseconds: 30000 idleTimeoutMilliseconds: 60000 maxLifetimeMilliseconds: 1800000 maxPoolSize: 50 minPoolSize: 2
rules:- !SINGLE tables: - "*.*"- !SHARDING tables: t_order: actualDataNodes: global_index.t_order_${0..9} tableStrategy: standard: shardingColumn: order_id shardingAlgorithmName: t_order_inline globalIndexStrategy: globalIndexNames: - t_order_user_id_idx - t_order_merchant_id_idx consistencyLevel: STRONG globalIndexes: t_order_user_id_idx: actualDataNodes: global_index.t_order_user_id_idx_${0..9} databaseStrategy: none: tableStrategy: standard: shardingColumn: user_id shardingAlgorithmName: t_order_user_id_idx_inline coveringColumns: - order_id t_order_merchant_id_idx: actualDataNodes: global_index.t_order_merchant_id_idx_${0..9} databaseStrategy: none: tableStrategy: standard: shardingColumn: merchant_id shardingAlgorithmName: t_order_merchant_id_idx_inline coveringColumns: - order_id - creation_date shardingAlgorithms: t_order_inline: type: INLINE props: algorithm-expression: t_order_${order_id % 10} t_order_user_id_idx_inline: type: INLINE props: algorithm-expression: t_order_user_id_idx_${user_id % 10} t_order_merchant_id_idx_inline: type: INLINE props: algorithm-expression: t_order_merchant_id_idx_${merchant_id % 10}
复制代码
启动 Demo 程序,可以复现和 E2E 中相同的异常,下面我们就来分析下异常的具体原因。
返回多个 MySQLOKPacket
我们使用最小化 Demo 进行 Debug,可以发现 Proxy 多语句的入口类是 MySQLMultiStatementsHandler,该类目前返回的结果为 UpdateResponseHeader,UpdateResponseHeader 会被封装为单个 MySQLOKPacket,目前执行多语句时,只会简单地将 updated 进行了累加,因此断言时结果不正确。
private UpdateResponseHeader executeBatchedStatements(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext) throws SQLException { boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown(); ResourceMetaData resourceMetaData = metaDataContexts.getMetaData().getDatabase(connectionSession.getUsedDatabaseName()).getResourceMetaData(); JDBCExecutorCallback<int[]> callback = new BatchedJDBCExecutorCallback(resourceMetaData, sqlStatementSample, isExceptionThrown); List<int[]> executeResults = jdbcExecutor.execute(executionGroupContext, callback); int updated = 0; for (int[] eachResult : executeResults) { for (int each : eachResult) { updated += each; } } // TODO Each logic SQL should correspond to an OK Packet. return new UpdateResponseHeader(sqlStatementSample, Collections.singletonList(new UpdateResult(updated, 0L)));}
复制代码
根据此处的 TODO 标记可以看出,MySQL 执行多语句时,需要返回批量的 MySQLOKPacket 集合,分别对应每条语句的执行结果。为了解决这个问题,需要增加一个 MultiStatementsUpdateResponseHeader 类进行封装,将多个 MySQLOKPacket 集合封装到其中:
/** * Multi statements update response header. */@RequiredArgsConstructor@Getterpublic final class MultiStatementsUpdateResponseHeader implements ResponseHeader { private final Collection<UpdateResponseHeader> updateResponseHeaders;}
复制代码
然后在 MySQLComQueryPacketExecutor 执行器类中,对 MultiStatementsUpdateResponseHeader 进行处理,具体处理逻辑如下,根据 MultiStatementsUpdateResponseHeader 中维护的 UpdateResponseHeader 集合,将其组装为多个 MySQLOKPacket。
@Overridepublic Collection<DatabasePacket> execute() throws SQLException { ResponseHeader responseHeader = proxyBackendHandler.execute(); if (responseHeader instanceof QueryResponseHeader) { return processQuery((QueryResponseHeader) responseHeader); } responseType = ResponseType.UPDATE; if (responseHeader instanceof MultiStatementsUpdateResponseHeader) { return processMultiStatementsUpdate((MultiStatementsUpdateResponseHeader) responseHeader); } return processUpdate((UpdateResponseHeader) responseHeader);}
private Collection<DatabasePacket> processMultiStatementsUpdate(final MultiStatementsUpdateResponseHeader responseHeader) { Collection<DatabasePacket> result = new LinkedList<>(); int index = 0; for (UpdateResponseHeader each : responseHeader.getUpdateResponseHeaders()) { boolean lastPacket = ++index == responseHeader.getUpdateResponseHeaders().size(); result.addAll(ResponsePacketBuilder.buildUpdateResponsePackets(each, ServerStatusFlagCalculator.calculateFor(connectionSession, lastPacket))); } return result;}
复制代码
此时,我们再次进行测试,但是发现结果仍然不正确,这又是为什么呢?想要搞清楚 MySQL 内部协议的交互逻辑,我们需要通过 WireShark 进行抓包,对比原生 MySQL 批量语句执行和 Proxy 批量语句执行之间的差异。
ServerStatusFlag 增加 SERVER_MORE_RESULTS_EXISTS
为了搞清楚 Proxy 和 MySQL 之间的差异,我们分别执行 Demo 程序中的 Proxy 示例和 MySQL 示例,并使用 WireShark 进行抓包(WireShark 使用可参考使用 Wireshark 解决 BenchmarkSQL 压测 Proxy 异常)。首先,我们执行 MySQL 批量写入并进行抓包,如下记录了抓包的内容,包括了 1 次 Request 和 4 次 Response。
然后我们再执行 Proxy 批量写入,并使用 WireShark 抓包,如下记录了 Proxy 抓包的内容,只有 1 次 Request 和 1 次 Response。
对比 MySQL 和 Proxy 抓包的差异,可以发现 MySQL 直到最后一个 MySQLOKPacket Server Status 才变为 1,前三个 MySQLOKPacket Server Status 都为 9(8 多语句 SERVER_MORE_RESULTS_EXISTS + 1 事务中 SERVER_STATUS_IN_TRANS),而 Proxy 第一个 Response 就返回了 1,并且后续不再返回 Response。
排查 MySQL 驱动可以发现,如果 SQL Response 中的 ServerStatusFlag 不包含 MySQLStatusFlag.SERVER_MORE_RESULTS_EXISTS,MySQL 驱动就只会读取第一个 MySQLOKPacket,并填充到客户端 int[] 数组中。因此可以考虑在封装多语句 MySQLOKPacket 时,根据多语句是否为最后一条,决定该标记的设置,当 MySQLOKPacket 未遍历到最后一条记录时,应设置 SERVER_MORE_RESULTS_EXISTS 标记。
我们调整 ServerStatusFlagCalculator#calculateFor 方法的实现逻辑,根据传入的 lastPacket 标记,决定是否设置 SERVER_MORE_RESULTS_EXISTS,具体实现逻辑如下:
/** * Calculate server status flag for specified connection. * * @param connectionSession connection session * @param lastPacket last packet * @return server status flag */public static int calculateFor(final ConnectionSession connectionSession, final boolean lastPacket) { int result = 0; result |= connectionSession.isAutoCommit() ? MySQLStatusFlag.SERVER_STATUS_AUTOCOMMIT.getValue() : 0; result |= connectionSession.getTransactionStatus().isInTransaction() ? MySQLStatusFlag.SERVER_STATUS_IN_TRANS.getValue() : 0; result |= lastPacket ? 0 : MySQLStatusFlag.SERVER_MORE_RESULTS_EXISTS.getValue(); return result;}
复制代码
修改完成后,我们再次运行测试程序,发现此时直接出现了 NPE,需要进一步分析 NPE 的原因。
capabilityFlags 增加 CLIENT_MULTI_RESULTS/CLIENT_PS_MULTI_RESULTS
根据出现 NPE 的位置,我们大致可以定位到 NativeProtocol#readNextResultset 方法,通过 Debug 可以发现,在 MySQL 驱动获取下一个结果集时,currentProtocolEntity 为空导致了 NPE。排查 currentProtocolEntity 赋值的地方,发现是 serverSession.useMultiResults() 返回 false,导致 currentProtocolEntity 未赋值,而 useMultiResults 方法的判断逻辑如下,会从 clientParam 标记中获取 CLIENT_MULTI_RESULTS 和 CLIENT_PS_MULTI_RESULTS。
// /Users/duanzhengqiang/.m2/repository/com/mysql/mysql-connector-j/8.0.31/mysql-connector-j-8.0.31-sources.jar!/com/mysql/cj/protocol/a/NativeServerSession.java:220@Overridepublic boolean useMultiResults() { return (this.clientParam & CLIENT_MULTI_RESULTS) != 0 || (this.clientParam & CLIENT_PS_MULTI_RESULTS) != 0;}
复制代码
可以看到该判断主要依赖 clientParam 变量,NativeAuthenticationProvider 方法会在登录认证通过后,调用 setClientParam 方法初始化该变量,具体代码逻辑位置如下。
// /Users/duanzhengqiang/.m2/repository/com/mysql/mysql-connector-j/8.0.31/mysql-connector-j-8.0.31-sources.jar!/com/mysql/cj/protocol/a/NativeAuthenticationProvider.java:201long clientParam = capabilityFlags & NativeServerSession.CLIENT_LONG_PASSWORD //| (this.propertySet.getBooleanProperty(PropertyKey.useAffectedRows).getValue() ? // 0 : capabilityFlags & NativeServerSession.CLIENT_FOUND_ROWS) //| capabilityFlags & NativeServerSession.CLIENT_LONG_FLAG //| (this.useConnectWithDb ? capabilityFlags & NativeServerSession.CLIENT_CONNECT_WITH_DB : 0) //| (this.propertySet.getBooleanProperty(PropertyKey.useCompression).getValue() ? // capabilityFlags & NativeServerSession.CLIENT_COMPRESS : 0) //| (this.propertySet.getBooleanProperty(PropertyKey.allowLoadLocalInfile).getValue() || this.propertySet.getStringProperty(PropertyKey.allowLoadLocalInfileInPath).isExplicitlySet() ? // capabilityFlags & NativeServerSession.CLIENT_LOCAL_FILES : 0) //| capabilityFlags & NativeServerSession.CLIENT_PROTOCOL_41 //| (this.propertySet.getBooleanProperty(PropertyKey.interactiveClient).getValue() ? // capabilityFlags & NativeServerSession.CLIENT_INTERACTIVE : 0) //| (this.propertySet.<SslMode>getEnumProperty(PropertyKey.sslMode).getValue() != SslMode.DISABLED ? // capabilityFlags & NativeServerSession.CLIENT_SSL : 0) //| capabilityFlags & NativeServerSession.CLIENT_TRANSACTIONS // Required to get server status values.| NativeServerSession.CLIENT_SECURE_CONNECTION //| (this.propertySet.getBooleanProperty(PropertyKey.allowMultiQueries).getValue() ? // capabilityFlags & NativeServerSession.CLIENT_MULTI_STATEMENTS : 0) //| capabilityFlags & NativeServerSession.CLIENT_MULTI_RESULTS // Always allow multiple result sets.| capabilityFlags & NativeServerSession.CLIENT_PS_MULTI_RESULTS // Always allow multiple result sets for SSPS.| NativeServerSession.CLIENT_PLUGIN_AUTH //| (NONE.equals(this.propertySet.getStringProperty(PropertyKey.connectionAttributes).getValue()) ? // 0 : capabilityFlags & NativeServerSession.CLIENT_CONNECT_ATTRS) //| capabilityFlags & NativeServerSession.CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA //| (this.propertySet.getBooleanProperty(PropertyKey.disconnectOnExpiredPasswords).getValue() ? // 0 : capabilityFlags & NativeServerSession.CLIENT_CAN_HANDLE_EXPIRED_PASSWORD) //| (this.propertySet.getBooleanProperty(PropertyKey.trackSessionState).getValue() ? // capabilityFlags & NativeServerSession.CLIENT_SESSION_TRACK : 0) //| capabilityFlags & NativeServerSession.CLIENT_DEPRECATE_EOF //| capabilityFlags & NativeServerSession.CLIENT_QUERY_ATTRIBUTES //| capabilityFlags & NativeServerSession.CLIENT_MULTI_FACTOR_AUTHENTICATION;
sessState.setClientParam(clientParam);
复制代码
Proxy 端通过 MySQLAuthenticationEngine 处理 MySQL 登录认证,会将握手结果封装在 MySQLHandshakePacket 中,其中包含了 capabilityFlags 服务端能力标志位的信息。
@Overridepublic int handshake(final ChannelHandlerContext context) { int result = ConnectionIdGenerator.getInstance().nextId(); connectionPhase = MySQLConnectionPhase.AUTH_PHASE_FAST_PATH; boolean sslEnabled = ProxySSLContext.getInstance().isSSLEnabled(); if (sslEnabled) { context.pipeline().addFirst(MySQLSSLRequestHandler.class.getSimpleName(), new MySQLSSLRequestHandler()); } context.writeAndFlush(new MySQLHandshakePacket(result, sslEnabled, authPluginData)); MySQLStatementIdGenerator.getInstance().registerConnection(result); return result;}
复制代码
参考 MySQL Client/Server Protocol 文档 - Capabilities Flags,能力标志位共 32 个 bit 位,每个 bit 位代表协议的一个可选功能,客户端和服务端的交集,共同决定了将使用协议的哪些可选部分。
按照功能属于高 16 位,还是低 16 位,需要分别将功能设置到 capabilityFlagsLower 和 capabilityFlagsUpper 中。查看 CLIENT_MULTI_RESULTS 和 CLIENT_PS_MULTI_RESULTS,它们属于高位功能,因此在 calculateHandshakeCapabilityFlagsUpper 方法中增加 Flags 即可,如下是具体设置代码。
CLIENT_MULTI_RESULTS(0x00020000),
CLIENT_PS_MULTI_RESULTS(0x00040000),
/** * Get handshake capability flags upper bit. * * @return handshake capability flags upper bit */public static int calculateHandshakeCapabilityFlagsUpper() { return calculateCapabilityFlags(CLIENT_MULTI_STATEMENTS, CLIENT_PLUGIN_AUTH, CLIENT_MULTI_RESULTS, CLIENT_PS_MULTI_RESULTS) >> 16;}
复制代码
修改完成后,再次使用 Demo 程序测试,发现已经能够返回正确的结果,通过 JDBC 可以正常执行 addBatch/executeBatch 并返回 int[] 数组。
功能测试
最后,我们使用全局索引功能 E2E 再次进行测试,原先断言失败的 Case 现在终于可以通过,大家终于可以放心使用商业版全局索引功能。在此,也真心向大家推荐 SphereEx 的 DBPlusEngine,相比开源的 ShardingSphere,它具有更完善的企业级功能,不仅能够进行海量数据的分片管理,还可以用于数据安全加密和数据库替换等场景,更多信息可以查看 SphereEx 官网。
结语
本文介绍了 E2E 测试 Proxy 发现批量写入返回结果错误后,如何一步步梳理 Proxy 代码,使用 Wireshark 抓包对比分析,以及排查 MySQL 驱动源码,最终完美解决了问题。提升 Proxy 对 MySQL 协议的兼容度,很直接的方法就是同测试用例比对,通过强大的 Wireshark 工具,我们可以很清晰地观测到请求过程中的差异,进而快速找到解决问题的方案。本案例的排查思路也适合其他 Proxy 接入端的问题,希望对大家有用,由于本人对 Wireshark 使用经验有限,如果问题也欢迎指正。
参考文档
欢迎关注
欢迎关注「端小强的博客」微信公众号,会不定期分享日常学习和工作经验,欢迎大家关注交流。
评论