# Job-level execution settings.
env {
  # Run the pipeline with a single parallel task.
  parallelism = 1
  # Bounded, run-once job (as opposed to a continuous streaming job).
  job.mode = "BATCH"
}
# Reads stock records from an HTTP endpoint and exposes them as the "stock" dataset.
source {
  Http {
    # Name under which downstream plugins reference this source's rows.
    plugin_output = "stock"
    url = "https://ip/http/prd/query_sap_stock"
    method = "POST"
    headers {
      Authorization = "Basic XXX"
      Content-Type = "application/json"
    }
    # Request payload; it asks for IT_WERKS value "1080", the same value the
    # sink uses as its WERKS partition.
    body = """{"IT_WERKS": [{"VALUE": "1080"}]}"""
    format = "json"
    # JSONPath selecting every element under ET_RETURN in the response;
    # each selected element becomes one row.
    content_field = "$.ET_RETURN.*"
    # Column names and types extracted from each selected element.
    schema {
      fields {
        MATNR = "string"
        MAKTX = "string"
        WERKS = "string"
        NAME1 = "string"
        LGORT = "string"
        LGOBE = "string"
        CHARG = "string"
        MEINS = "string"
        LABST = "double"
        UMLME = "double"
        INSME = "double"
        EINME = "double"
        SPEME = "double"
        RETME = "double"
      }
    }
  }
}
# This transform is mainly a convenient place for field renaming and similar
# adjustments; as written it selects every column unchanged.
transform {
  Sql {
    plugin_input = "stock"
    # Dataset name consumed by the sink.
    plugin_output = "stock-tf-out"
    query = "select MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME from stock"
  }
}
# Connects to StarRocks over JDBC and overwrites data per partition. This example
# targets a table created in StarRocks and replaces the WERKS='1080' partition
# with INSERT OVERWRITE.
sink {
  jdbc {
    plugin_input = "stock-tf-out"
    url = "jdbc:mysql://XXX:9030/scm?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "lab"
    password = "XXX"
    compatible_mode = "starrocks"
    # NOTE(review): the statement is executed once per JDBC batch; if the source
    # returns more rows than one batch holds, a later INSERT OVERWRITE may replace
    # rows written by an earlier batch. Confirm the batch size covers the full
    # result set (TODO verify against the connector's batching behaviour).
    query = """insert overwrite ods_sap_stock PARTITION (WERKS='1080') (MATNR, MAKTX, WERKS,NAME1,LGORT,LGOBE,CHARG,MEINS,LABST,UMLME,INSME,EINME,SPEME,RETME) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"""
  }
}
# Alternative: integrate through connector-starrocks. It does not appear to support
# running a SQL statement such as INSERT OVERWRITE, so it does not fit this example;
# it is better suited to scenarios where the whole table is dropped and rebuilt.
// sink {
//   StarRocks {
//     plugin_input = "stock-tf-out"
//     nodeUrls = ["ip:8030"]
//     base-url = "jdbc:mysql://ip:9030/"
//     username = "lab"
//     password = "XXX"
//     database = "scm"
//     table = "ods_sap_stock"
//     batch_max_rows = 1000
//     data_save_mode="DROP_DATA"
//     starrocks.config = {
//       format = "JSON"
//       strip_outer_array = true
//     }
//     schema_save_mode = "RECREATE_SCHEMA"
//     save_mode_create_template="""
//       CREATE TABLE IF NOT EXISTS `scm`.`ods_sap_stock` (
//         MATNR STRING COMMENT '物料',
//         WERKS STRING COMMENT '工厂',
//         LGORT STRING COMMENT '库存地点',
//         MAKTX STRING COMMENT '物料描述',
//         NAME1 STRING COMMENT '工厂名称',
//         LGOBE STRING COMMENT '地点描述',
//         CHARG STRING COMMENT '批次编号',
//         MEINS STRING COMMENT '单位',
//         LABST DOUBLE COMMENT '非限制使用库存',
//         UMLME DOUBLE COMMENT '在途库存',
//         INSME DOUBLE COMMENT '质检库存',
//         EINME DOUBLE COMMENT '受限制使用的库存',
//         SPEME DOUBLE COMMENT '已冻结的库存',
//         RETME DOUBLE COMMENT '退货'
//       ) ENGINE=OLAP
//       PRIMARY KEY ( MATNR,WERKS,LGORT)
//       COMMENT 'sap库存'
//       DISTRIBUTED BY HASH (WERKS) PROPERTIES (
//       "replication_num" = "1"
//       )
//     """
//   }
// }
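# 评论 ("Comments") — stray text from the page this config was copied from; not part of the job configuration.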