写点什么

基于 Calcite 的分布式多数据源查询

作者:尹千觞
  • 2022 年 1 月 26 日
  • 本文字数:6053 字

    阅读完需:约 20 分钟

基于Calcite的分布式多数据源查询

在本文中,我们将实践 GBase8sMySQL 的跨数据源联合查询,案例中 MySQL 数据源中存放商品信息,GBase8s 数据源中存放订单信息。 整体架构如下,


好了,我们开始吧。

环境准备

GBase8s

安装镜像 docker pull liaosnet/gbase8s启动容器 docker run -itd -p 19088:9088 liaosnet/gbase8s容器基本信息:

JDBC JAR:/home/gbase/gbasedbtjdbc_3.3.0_2.jar类名:com.gbasedbt.jdbc.DriverURL:jdbc:gbasedbt-sqli://IPADDR:19088/testdb:GBASEDBTSERVER=gbase01;DB_LOCALE=zh_CN.utf8;CLIENT_LOCALE=zh_CN.utf8;IFX_LOCK_MODE_WAIT=30;用户:gbasedbt密码:GBase123其中:IPADDR为docker所在机器的IP地址,同时需要放通19088端口。
复制代码

MySQL

安装镜像 docker pull liaosnet/gbase8s启动容器 docker run -p 3306:3306 --name mysql -e MYSQL_ROOT_PASSWORD=dafei1288 -d mysql

数据准备

GBase8s

CREATE TABLE order_table ( oid INTEGER NOT NULL, iid INTEGER, icount INTEGER, PRIMARY KEY (oid) CONSTRAINT order_table_pk);
INSERT INTO order_table (oid, iid, icount) VALUES(1, 1, 10);INSERT INTO order_table (oid, iid, icount) VALUES(2, 3, 30);

复制代码

MySQL

create table item(    i_id    int auto_increment        primary key,    catalog varchar(20) null,    pname   varchar(20) null,    price   float       null,    constraint item_i_id_uindex        unique (i_id));
INSERT INTO test.item (i_id, catalog, pname, price) VALUES (1, '游戏', '大航海时代IV', 300);INSERT INTO test.item (i_id, catalog, pname, price) VALUES (2, '游戏', '马车8', 300);INSERT INTO test.item (i_id, catalog, pname, price) VALUES (3, '食品', '青椒豆腐乳西瓜', 20);

复制代码

工程准备

创建 maven 工程,目录如下图所示:


添加依赖

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>    <artifactId>calcite_multi_database</artifactId>    <version>1.0-SNAPSHOT</version>
    <properties>        <maven.compiler.source>17</maven.compiler.source>        <maven.compiler.target>17</maven.compiler.target>    </properties>    <dependencies>        <dependency>            <groupId>org.apache.calcite</groupId>            <artifactId>calcite-core</artifactId>            <version>1.29.0</version>        </dependency>        <dependency>            <groupId>mysql</groupId>            <artifactId>mysql-connector-java</artifactId>            <version>8.0.28</version>        </dependency>        <dependency>            <groupId>com.google.code.gson</groupId>            <artifactId>gson</artifactId>            <version>2.8.9</version>        </dependency>        <dependency>            <groupId>gbase</groupId>            <artifactId>gbasedbt</artifactId>            <version>330</version>            <scope>system</scope>            <systemPath>${project.basedir}/libs/gbasedbtjdbc_3.3.0_2_36477d.jar</systemPath>        </dependency>    </dependencies></project>
复制代码

添加数据源配置文件 multiSource.json

{  "defaultSchema": "gbasedbt",  "schemas": [    {      "factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",      "name": "mysql",      "operand": {        "jdbcDriver": "com.mysql.cj.jdbc.Driver",        "jdbcUrl": "jdbc:mysql://localhost:3306/test",        "jdbcUser": "root",        "jdbcPassword": "dafei1288"      },      "type": "custom"    },    {      "factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",      "name": "gbasedbt",      "operand": {        "jdbcDriver": "com.gbasedbt.jdbc.Driver",        "jdbcUrl": "jdbc:gbasedbt-sqli://localhost:19088/testdb:GBASEDBTSERVER=gbase01;DB_LOCALE=zh_CN.utf8;CLIENT_LOCALE=zh_CN.utf8;IFX_LOCK_MODE_WAIT=30;",        "jdbcUser": "gbasedbt",        "jdbcPassword": "GBase123"      },      "type": "custom"    }  ],  "version": "1.0"}
复制代码

创建执行程序 MultiSource

使用上面的配置文件,创建 Calcite Jdbc 链接

String filepath = "E:\\working\\GBase\\writting\\calcite_multi_database_select\\calcite_multi_database\\src\\main\\resources\\multiSource.json";Properties config = new Properties();config.put("model",filepath);config.put("lex", "MYSQL");
复制代码

这里 config.put("lex", "MYSQL"); 用于解析外层 SQL ,所以必须保留。使用查询语句 SELECT o.oid,o.iid,o.icount,i.catalog,i.pname,i.price FROM gbasedbt.order_table AS o join mysql.item AS i on o.iid = i.i_id 进行数据查询。

除了执行结果,其实我们也会对执行的逻辑计划感兴趣,那么我们来看看如何将该 SQL 的逻辑计划打印出来

public static void printLogicPlan(String modelPath , String sql) throws Exception{
        String modelJsonStr = Files.readAllLines(Paths.get(modelPath)).stream().collect(Collectors.joining("\n"));        HashMap map = new Gson().fromJson(modelJsonStr, HashMap.class);        List<Map> schemas = (List<Map>) map.get("schemas");
        SchemaPlus rootSchema = Frameworks.createRootSchema(true);        Schema gbasedbt = JdbcSchema.create(rootSchema, "gbasedbt" , (Map<String,Object>)schemas.get(1).get("operand"));        Schema mysql = JdbcSchema.create(rootSchema, "mysql" , (Map<String,Object>)schemas.get(0).get("operand"));        rootSchema.add("gbasedbt",gbasedbt);        rootSchema.add("mysql",mysql);
        SqlParser.Config insensitiveParser = SqlParser.configBuilder()                .setCaseSensitive(false)                .build();
        FrameworkConfig config = Frameworks.newConfigBuilder()                .parserConfig(insensitiveParser)                .defaultSchema(rootSchema)                .build();
        Planner planner = Frameworks.getPlanner(config);        SqlNode sqlNode = planner.parse(sql);        SqlNode sqlNodeValidated = planner.validate(sqlNode);        RelRoot relRoot = planner.rel(sqlNodeValidated);        RelNode relNode = relRoot.project();
        System.out.println(sqlNode.toSqlString(MysqlSqlDialect.DEFAULT));        System.out.println();        System.out.println(relNode.explain());}
复制代码

下面是逻辑计划打印的结果,我们不难看出,这里是使用了 2 个全表扫描,然后再通过 Join 算子,然后进行 project算子的。其实这个执行不能说效率很高吧,只能说非常慢,如果想做优化,我们以后再开一篇文章。

LogicalProject(OID=[$0], IID=[$1], ICOUNT=[$2], CATALOG=[$4], PNAME=[$5], PRICE=[$6])  LogicalJoin(condition=[=($1, $3)], joinType=[inner])    JdbcTableScan(table=[[gbasedbt, order_table]])    JdbcTableScan(table=[[mysql, item]])
复制代码

SQL 查询结果如下

oid iid icount catalog pname price 1 1 10 游戏 大航海时代IV 300.0 2 3 30 食品 青椒豆腐乳西瓜 20.0 
复制代码

执行截图


完整代码清单:

package wang.datahub;
import com.google.gson.Gson;import org.apache.calcite.adapter.jdbc.JdbcSchema;import org.apache.calcite.rel.RelNode;import org.apache.calcite.rel.RelRoot;import org.apache.calcite.rel.RelWriter;import org.apache.calcite.rel.externalize.RelWriterImpl;import org.apache.calcite.schema.Schema;import org.apache.calcite.schema.SchemaPlus;import org.apache.calcite.sql.SqlExplainLevel;import org.apache.calcite.sql.SqlNode;import org.apache.calcite.sql.dialect.MysqlSqlDialect;import org.apache.calcite.sql.parser.SqlParser;import org.apache.calcite.tools.FrameworkConfig;import org.apache.calcite.tools.Frameworks;import org.apache.calcite.tools.Planner;

import java.io.PrintWriter;import java.nio.file.Files;import java.nio.file.Paths;import java.sql.*;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;import java.util.stream.Collectors;
public class MultiSource {    public static void main(String[] args)  throws Exception{        String filepath = "E:\\working\\GBase\\writting\\calcite_multi_database_select\\calcite_multi_database\\src\\main\\resources\\multiSource.json";        Properties config = new Properties();        config.put("model",filepath);        config.put("lex", "MYSQL");        String sql =                "SELECT o.oid,o.iid,o.icount,i.catalog,i.pname,i.price FROM gbasedbt.order_table AS o join mysql.item AS i on o.iid = i.i_id";
        try (Connection con = DriverManager.getConnection("jdbc:calcite:", config)) {            try (Statement stmt = con.createStatement()) {                try (ResultSet rs = stmt.executeQuery(sql)) {                    //打印逻辑计划                    printLogicPlan(filepath,sql);                    //打印查询结果                    printRs(rs);                }            }        }
    }
    public static void printRs(ResultSet rs) throws Exception {        ResultSetMetaData rsmd = rs.getMetaData();        int count = rsmd.getColumnCount();
        for(int i = 1; i <= count; i++){            System.out.print(rsmd.getColumnName(i)+"\t");        }        System.out.println();
        while(rs.next()){            for(int i = 1; i <= count; i++){                System.out.print(rs.getString(i)+"\t");            }            System.out.println();        }    }
    public static void printLogicPlan(String modelPath , String sql) throws Exception{
        String modelJsonStr = Files.readAllLines(Paths.get(modelPath)).stream().collect(Collectors.joining("\n"));        HashMap map = new Gson().fromJson(modelJsonStr, HashMap.class);        List<Map> schemas = (List<Map>) map.get("schemas");
        SchemaPlus rootSchema = Frameworks.createRootSchema(true);        Schema gbasedbt = JdbcSchema.create(rootSchema, "gbasedbt" , (Map<String,Object>)schemas.get(1).get("operand"));        Schema mysql = JdbcSchema.create(rootSchema, "mysql" , (Map<String,Object>)schemas.get(0).get("operand"));        rootSchema.add("gbasedbt",gbasedbt);        rootSchema.add("mysql",mysql);
        SqlParser.Config insensitiveParser = SqlParser.configBuilder()                .setCaseSensitive(false)                .build();
        FrameworkConfig config = Frameworks.newConfigBuilder()                .parserConfig(insensitiveParser)                .defaultSchema(rootSchema)                .build();
        Planner planner = Frameworks.getPlanner(config);        SqlNode sqlNode = planner.parse(sql);        SqlNode sqlNodeValidated = planner.validate(sqlNode);        RelRoot relRoot = planner.rel(sqlNodeValidated);        RelNode relNode = relRoot.project();
        System.out.println(sqlNode.toSqlString(MysqlSqlDialect.DEFAULT));        System.out.println();        System.out.println(relNode.explain());    }}
复制代码

好了,现在我们初步完成了基于 GBase8s 的跨数据源查询工作,下一篇文章我们来说说查询优化。提前祝广大读者朋友,新春快乐,虎虎生威 ...

发布于: 刚刚阅读数: 2
用户头像

尹千觞

关注

还未添加个人签名 2018.04.27 加入

还未添加个人简介

评论

发布
暂无评论
基于Calcite的分布式多数据源查询