写点什么

SarkSQL 高并发:读取存储数据库

  • 2022 年 1 月 24 日
  • 本文字数:2320 字

    阅读完需:约 8 分钟

摘要:实践解析如何利用 SarkSQL 高并发进行读取数据库和存储数据到数据库。

 

本文分享自华为云社区《SarkSQL高并发读取数据库和存储数据到数据库》,作者:Copy 工程师 。

1. SparkSql 高并发读取数据库

SparkSql 连接数据库读取数据给了三个 API:

 

//Construct a DataFrame representing the database table accessible via JDBC URL url named table and connection properties.Dataset<Row> 	jdbc(String url, String table, java.util.Properties properties)//Construct a DataFrame representing the database table accessible via JDBC URL url named table using connection properties.Dataset<Row> 	jdbc(String url, String table, String[] predicates, java.util.Properties connectionProperties)//Construct a DataFrame representing the database table accessible via JDBC URL url named table.Dataset<Row> 	jdbc(String url, String table, String columnName, long lowerBound, long upperBound, int numPartitions, java.util.Properties connectionProperties)
复制代码

 

三个 API 介绍:

 

1、单个分区,单个 task 执行,无并发

遇到数据量很大的表,抽取速度慢。

实例:

 

SparkSession sparkSession = SparkSession.builder().appName("SPARK_FENGDING_TASK1").master("local").config("spark.testing.memory", 471859200).getOrCreate();// 配置连接属性Properties dbProps = new Properties();dbProps.put("user","user");dbProps.put("password","pwd");dbProps.put("driver","oracle.jdbc.driver.OracleDriver");// 连接数据库 获取数据 要使用自己的数据库连接串Dataset<Row> tableDf = sparkSession.read().jdbc("jdbc:oracle:thin:@IP:1521:DEMO", "TABLE_DEMO", dbProps);// 返回1tableDf.rdd().getPartitions();
复制代码

 

该 API 的并发数为 1,单分区,不管你留给该任务节点多少资源,都只有一个 task 执行任务

 

2、任意字段分区

该 API 是第二个 API,根据设置的分层条件设置并发度:

 

def jdbc(    url: String,    table: String,    predicates: Array[String], #这个是分层的条件,一个数组    connectionProperties: Properties): DataFrame = {    val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) =>        JDBCPartition(part, i) : Partition    }    jdbc(url, table, parts, connectionProperties)}
复制代码

 

实例:

 

// 设置分区条件 通过入库时间 把 10月和11月 的数据 分两个分区String[] patitions = {"rksj >= '1569859200' and rksj < '1572537600'","rksj >= '1572537600' and rksj < '1575129600'"};// 根据StudentId 分15个分区,就会有15个task抽取数据Dataset<Row> tableDf3 = sparkSession.read().jdbc("jdbc:oracle:thin:@IP:1521:DEMO", "TABLE_DEMO",patitions,dbProps);// 返回2tableDf3.rdd().getPartitions();
复制代码

 

该 API 操作相对自由,就是设置分区条件麻烦一点。

 

3、根据 Long 类型字段分区该 API 是第三个 API,根据设置的分区数并发抽取数据:

 

def jdbc(    url: String,    table: String,    columnName: String,    # 根据该字段分区,需要为整形,比如id等    lowerBound: Long,      # 分区的下界    upperBound: Long,      # 分区的上界    numPartitions: Int,    # 分区的个数    connectionProperties: Properties): DataFrame = {    val partitioning = JDBCPartitioningInfo(columnName, lowerBound, upperBound, numPartitions)    val parts = JDBCRelation.columnPartition(partitioning)    jdbc(url, table, parts, connectionProperties)}
复制代码

 

实例:

 

// 根据StudentId 分15个分区,就会有15个task抽取数据Dataset<Row> tableDf2 = sparkSession.read().jdbc("jdbc:oracle:thin:@IP:1521:DEMO", "TABLE_DEMO", "studentId",0,1500,15,dbProps);// 返回10tableDf2.rdd().getPartitions();
复制代码

 

该操作根据分区数设置并发度,缺点是只能用于 Long 类型字段。

2. 存储数据到数据库

存储数据库 API 给了 Class DataFrameWriter<T>类,该类有存储到文本,Hive,数据库的 API。这里只说数据库的 API,提一句,如果保存到 Text 格式,只支持保存一列。。。就很难受。

 

实例:

有三种写法

 

// 第一张写法,指定format类型,使用save方法存储数据库jdbcDF.write()  .format("jdbc")  .option("url", "jdbc:postgresql:dbserver")  .option("dbtable", "schema.tablename")  .option("user", "username")  .option("password", "password")  .save();// 第二种写法 使用jdbc写入数据库jdbcDF2.write()  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// 第三种写法,也是使用jdbc,只不过添加createTableColumnTypes,创建表的时候使用该属性字段创建表字段jdbcDF.write() .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
复制代码

 

当我们的表已经存在的时候,使用上面的语句就会报错表已存在,这是因为我们没有指定存储模式,默认是 ErrorIfExists

保存模式:

 

所以一般都是这样用:

tableDf3.write().mode(SaveMode.Append).jdbc("jdbc:oracle:thin:@IP:1521:DEMO", "TABLE_DEMO", connectionProperties);

对于 connectionProperties 还有很多其他选项:

 




 

这里面的 truncate 就是说当使用 SaveMode.Overwrite 的时候,设置 truncate 为 true,就会对表进行 truncate 语句清理表,不再是删除表在重建表的操作。

 

点击关注,第一时间了解华为云新鲜技术~

发布于: 刚刚阅读数: 2
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
SarkSQL高并发:读取存储数据库