env {
parallelism = 1
job.mode = "BATCH"
}
source {
Http {
plugin_output = "stock"
url = "https://ip/http/prd/query_sap_stock"
method = "POST"
headers {
Authorization = "Basic XXX"
Content-Type = "application/json"
}
body = """{"IT_WERKS": [{"VALUE": "1080"}]}"""
format = "json"
content_field = "$.ET_RETURN.*"
schema {
fields {
MATNR = "string"
MAKTX = "string"
WERKS = "string"
NAME1 = "string"
LGORT = "string"
LGOBE = "string"
CHARG = "string"
MEINS = "string"
LABST = "double"
UMLME = "double"
INSME = "double"
EINME = "double"
SPEME = "double"
RETME = "double"
}
}
}
}
# 此转换操作主要用于字段从命名等方便用途
transform {
Sql {
plugin_input = "stock"
plugin_output = "stock-tf-out"
query = "select MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME from stock"
}
}
# 连接starRocks 进行数据分区覆写,本例适用starRocks建表,按照分区insert overwrite 覆写
sink {
jdbc {
plugin_input = "stock-tf-out"
url = "jdbc:mysql://XXX:9030/scm?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "lab"
password = "XXX"
compatible_mode="starrocks"
query = """insert overwrite ods_sap_stock PARTITION (WERKS='1080') (MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"""
}
}
# connector-starrocks进行对接 (未看到支持sql语句进行数据insert overwrite,本例子场景不适合),比较适合表数据全部删除重建场景
// sink {
// StarRocks {
// plugin_input = "stock-tf-out"
// nodeUrls = ["ip:8030"]
// base-url = "jdbc:mysql://ip:9030/"
// username = "lab"
// password = "XXX"
// database = "scm"
// table = "ods_sap_stock"
// batch_max_rows = 1000
// data_save_mode="DROP_DATA"
// starrocks.config = {
// format = "JSON"
// strip_outer_array = true
// }
// schema_save_mode = "RECREATE_SCHEMA"
// save_mode_create_template="""
// CREATE TABLE IF NOT EXISTS `scm`.`ods_sap_stock` (
// MATNR STRING COMMENT '物料',
// WERKS STRING COMMENT '工厂',
// LGORT STRING COMMENT '库存地点',
// MAKTX STRING COMMENT '物料描述',
// NAME1 STRING COMMENT '工厂名称',
// LGOBE STRING COMMENT '地点描述',
// CHARG STRING COMMENT '批次编号',
// MEINS STRING COMMENT '单位',
// LABST DOUBLE COMMENT '非限制使用库存',
// UMLME DOUBLE COMMENT '在途库存',
// INSME DOUBLE COMMENT '质检库存',
// EINME DOUBLE COMMENT '受限制使用的库存',
// SPEME DOUBLE COMMENT '已冻结的库存',
// RETME DOUBLE COMMENT '退货'
// ) ENGINE=OLAP
// PRIMARY KEY ( MATNR,WERKS,LGORT)
// COMMENT 'sap库存'
// DISTRIBUTED BY HASH (WERKS) PROPERTIES (
// "replication_num" = "1"
// )
// """
// }
// }
评论