env { parallelism = 1 job.mode = "BATCH"}
source { Http { plugin_output = "stock" url = "https://ip/http/prd/query_sap_stock" method = "POST" headers { Authorization = "Basic XXX" Content-Type = "application/json" } body = """{"IT_WERKS": [{"VALUE": "1080"}]}""" format = "json" content_field = "$.ET_RETURN.*" schema { fields { MATNR = "string" MAKTX = "string" WERKS = "string" NAME1 = "string" LGORT = "string" LGOBE = "string" CHARG = "string" MEINS = "string" LABST = "double" UMLME = "double" INSME = "double" EINME = "double" SPEME = "double" RETME = "double" } } }}
# 此转换操作主要用于字段从命名等方便用途transform { Sql { plugin_input = "stock" plugin_output = "stock-tf-out" query = "select MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME from stock" }}
# 连接starRocks 进行数据分区覆写,本例适用starRocks建表,按照分区insert overwrite 覆写sink { jdbc { plugin_input = "stock-tf-out" url = "jdbc:mysql://XXX:9030/scm?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "lab" password = "XXX" compatible_mode="starrocks" query = """insert overwrite ods_sap_stock PARTITION (WERKS='1080') (MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)""" }}
# connector-starrocks进行对接 (未看到支持sql语句进行数据insert overwrite,本例子场景不适合),比较适合表数据全部删除重建场景// sink {// StarRocks {// plugin_input = "stock-tf-out"// nodeUrls = ["ip:8030"]// base-url = "jdbc:mysql://ip:9030/"// username = "lab"// password = "XXX"// database = "scm"// table = "ods_sap_stock"// batch_max_rows = 1000// data_save_mode="DROP_DATA"// starrocks.config = {// format = "JSON"// strip_outer_array = true// }// schema_save_mode = "RECREATE_SCHEMA"// save_mode_create_template="""// CREATE TABLE IF NOT EXISTS `scm`.`ods_sap_stock` (// MATNR STRING COMMENT '物料',// WERKS STRING COMMENT '工厂',// LGORT STRING COMMENT '库存地点',// MAKTX STRING COMMENT '物料描述',// NAME1 STRING COMMENT '工厂名称',// LGOBE STRING COMMENT '地点描述',// CHARG STRING COMMENT '批次编号',// MEINS STRING COMMENT '单位',// LABST DOUBLE COMMENT '非限制使用库存',// UMLME DOUBLE COMMENT '在途库存',// INSME DOUBLE COMMENT '质检库存',// EINME DOUBLE COMMENT '受限制使用的库存',// SPEME DOUBLE COMMENT '已冻结的库存',// RETME DOUBLE COMMENT '退货'// ) ENGINE=OLAP// PRIMARY KEY ( MATNR,WERKS,LGORT)// COMMENT 'sap库存'// DISTRIBUTED BY HASH (WERKS) PROPERTIES (// "replication_num" = "1"// )// """// }// }
评论