摘要:实践解析如何利用 SarkSQL 高并发进行读取数据库和存储数据到数据库。
本文分享自华为云社区《SarkSQL高并发读取数据库和存储数据到数据库》,作者:Copy 工程师 。
1. SparkSql 高并发读取数据库
SparkSql 连接数据库读取数据给了三个 API:
//Construct a DataFrame representing the database table accessible via JDBC URL url named table and connection properties.
Dataset<Row> jdbc(String url, String table, java.util.Properties properties)
//Construct a DataFrame representing the database table accessible via JDBC URL url named table using connection properties.
Dataset<Row> jdbc(String url, String table, String[] predicates, java.util.Properties connectionProperties)
//Construct a DataFrame representing the database table accessible via JDBC URL url named table.
Dataset<Row> jdbc(String url, String table, String columnName, long lowerBound, long upperBound, int numPartitions, java.util.Properties connectionProperties)
复制代码
三个 API 介绍:
1、单个分区,单个 task 执行,无并发
遇到数据量很大的表,抽取速度慢。
实例:
SparkSession sparkSession = SparkSession.builder().appName("SPARK_FENGDING_TASK1").master("local").config("spark.testing.memory", 471859200).getOrCreate();
// 配置连接属性
Properties dbProps = new Properties();
dbProps.put("user","user");
dbProps.put("password","pwd");
dbProps.put("driver","oracle.jdbc.driver.OracleDriver");
// 连接数据库 获取数据 要使用自己的数据库连接串
Dataset<Row> tableDf = sparkSession.read().jdbc("jdbc:oracle:thin:@IP:1521:DEMO", "TABLE_DEMO", dbProps);
// 返回1
tableDf.rdd().getPartitions();
复制代码
该 API 的并发数为 1,单分区,不管你留给该任务节点多少资源,都只有一个 task 执行任务
2、任意字段分区
该 API 是第二个 API,根据设置的分层条件设置并发度:
def jdbc(
url: String,
table: String,
predicates: Array[String], #这个是分层的条件,一个数组
connectionProperties: Properties): DataFrame = {
val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) =>
JDBCPartition(part, i) : Partition
}
jdbc(url, table, parts, connectionProperties)
}
复制代码
实例:
// 设置分区条件 通过入库时间 把 10月和11月 的数据 分两个分区
String[] patitions = {"rksj >= '1569859200' and rksj < '1572537600'","rksj >= '1572537600' and rksj < '1575129600'"};
// 根据StudentId 分15个分区,就会有15个task抽取数据
Dataset<Row> tableDf3 = sparkSession.read().jdbc("jdbc:oracle:thin:@IP:1521:DEMO", "TABLE_DEMO",patitions,dbProps);
// 返回2
tableDf3.rdd().getPartitions();
复制代码
该 API 操作相对自由,就是设置分区条件麻烦一点。
3、根据 Long 类型字段分区该 API 是第三个 API,根据设置的分区数并发抽取数据:
def jdbc(
url: String,
table: String,
columnName: String, # 根据该字段分区,需要为整形,比如id等
lowerBound: Long, # 分区的下界
upperBound: Long, # 分区的上界
numPartitions: Int, # 分区的个数
connectionProperties: Properties): DataFrame = {
val partitioning = JDBCPartitioningInfo(columnName, lowerBound, upperBound, numPartitions)
val parts = JDBCRelation.columnPartition(partitioning)
jdbc(url, table, parts, connectionProperties)
}
复制代码
实例:
// 根据StudentId 分15个分区,就会有15个task抽取数据
Dataset<Row> tableDf2 = sparkSession.read().jdbc("jdbc:oracle:thin:@IP:1521:DEMO", "TABLE_DEMO", "studentId",0,1500,15,dbProps);
// 返回10
tableDf2.rdd().getPartitions();
复制代码
该操作根据分区数设置并发度,缺点是只能用于 Long 类型字段。
2. 存储数据到数据库
存储数据库 API 给了 Class DataFrameWriter<T>类,该类有存储到文本,Hive,数据库的 API。这里只说数据库的 API,提一句,如果保存到 Text 格式,只支持保存一列。。。就很难受。
实例:
有三种写法
// 第一张写法,指定format类型,使用save方法存储数据库
jdbcDF.write()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save();
// 第二种写法 使用jdbc写入数据库
jdbcDF2.write()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// 第三种写法,也是使用jdbc,只不过添加createTableColumnTypes,创建表的时候使用该属性字段创建表字段
jdbcDF.write()
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
复制代码
当我们的表已经存在的时候,使用上面的语句就会报错表已存在,这是因为我们没有指定存储模式,默认是 ErrorIfExists
保存模式:
所以一般都是这样用:
tableDf3.write().mode(SaveMode.Append).jdbc("jdbc:oracle:thin:@IP:1521:DEMO", "TABLE_DEMO", connectionProperties);
对于 connectionProperties 还有很多其他选项:
这里面的 truncate 就是说当使用 SaveMode.Overwrite 的时候,设置 truncate 为 true,就会对表进行 truncate 语句清理表,不再是删除表在重建表的操作。
点击关注,第一时间了解华为云新鲜技术~
评论