写点什么

springboot 项目集成 dolphinscheduler 调度器 可拖拽 spark 任务管理

作者:刘大猫
  • 2025-07-12
    黑龙江
  • 本文字数:14297 字

    阅读完需:约 47 分钟

dolphinscheduler 调度器接入注意事项等信息可参考我的上一篇博客进行了解,地址在这里 ->@[TOC]

一、功能清单

二、可拖拽 spark 任务管理

说明:任务管理实际是操作 dolphinscheduler 调度器中的项目中的用户下唯一工作流中的各种类型节点的管理操作

共用的依赖

<!--httpclient--><dependency>    <groupId>commons-httpclient</groupId>    <artifactId>commons-httpclient</artifactId>    <version>3.1</version></dependency>
复制代码

共用配置文件

dolphinscheduler.token=xxxdolphinscheduler.address=http://IP:12345
复制代码

共用代码

@Autowiredprivate RestTemplate restTemplate;@Value("${dolphinscheduler.token}")String token;@Value("${dolphinscheduler.address}")String address;public static final int ZERO = 0;public static final int SUCCESS = 200;@Autowiredprivate DragSparkTaskService dragSparkTaskService;@Value("${spark.main.class}")String mainClass;public static final String CREATE = "create";public static final String UPDATE = "update";public static final String ADD = "add";public static final String DELETE = "delete";public static final String ONLINE = "ONLINE";public static final String OFFLINE = "OFFLINE";public static final int ONE_THOUSAND_AND_FIVE_HUNDRED = 1500;public static final int SIX = 6;public static final int EIGHTY = 80;public static final int THREE = 3;@Autowiredprivate StringRedisTemplate redisTemplate;@Value("${drag.task.state}")String dragTaskState;@Autowiredprivate DragSparkTaskMapper dragSparkTaskMapper;
复制代码

1.创建可拖拽模型 spark 任务



请求参数举例:


{  "name": "测试任务模型1",  "describeInfo": "测试任务模型1",  "projectName": "spark线性回归模型",  "task": {    "edges": [      {        "id": "13889c99",        "index": 2,        "source": "356bc2be",        "target": "33838a0d",        "sourceAnchor": 2,        "targetAnchor": 0      }    ],    "nodes": [      {        "x": 482.671875,        "y": 89.125,        "id": "356bc2be",        "size": "72*72",        "type": "node",        "color": "#1890FF",        "index": 0,        "label": "数据源",        "shape": "flow-circle",        "config": {          "sourceType": "mysql",          "targetTable": "machine_learning_house_info2"        },        "inputs": [],        "outputs": [],        "modelType": "dataSource"      },      {        "x": 502.171875,        "y": 269.125,        "id": "33838a0d",        "size": "110*42",        "type": "node",        "color": "#66C35D",        "index": 1,        "label": "全表统计",        "shape": "flow-capsule",        "config": {          "selectColumns": "*"        },        "inputs": [],        "outputs": [],        "modelType": "fullTableStatistics"      }    ]  }}
复制代码



代码


/**     * 创建任务-创建用户下唯一工作流,无则创建有则并排添加     * @param request request     * @param vo 任务参数     * @author liudz     * @date 2021/5/8     * @return 执行结果     **/    @PostMapping("/project/process")    @Transactional(rollbackFor = Exception.class)    public Response operatorDragSparkTask(HttpServletRequest request, @RequestBody DragSparkTaskVo vo) {        Long userId = Long.valueOf(request.getUserPrincipal().getName());        vo.setCreateId(userId);        vo.setUpdateId(userId);        if (vo == null || org.apache.commons.lang3.StringUtils.isBlank(vo.getName()) || vo.getDescribeInfo() == null            || vo.getCreateId() == null) {            log.error("--DragSparkTaskController--addDragSparkTask--PARAM_ERROR!--");            return Response.error(Msg.PARAM_ERROR);        }        Response<DragSparkTaskVo> dragSparkTaskResponse = dragSparkTaskService.addDragSparkTask(vo);        log.info("--(1)addDragSparkTask--success");        if (dragSparkTaskResponse.getCode() == SUCCESS) {            Boolean verifyResult = verifyProcessExist(userId + "-dragSparkTask", vo.getProjectName());            log.info("--(2)verifyProcessExist--success:{}", verifyResult);            if (!verifyResult) {                ProcessDto processDto = packageProcessParam(                        "create", userId + "-dragSparkTask", vo, null);                log.info("--(3)packageProcessParam--success");                dragSparkTaskResponse =  createProcess(vo, processDto);            } else {                //获取用户下唯一工作流ID                DolphinschedulerResponse processInfoList = getUserProcess(vo.getProjectName());                JSONObject processJson = new JSONObject();                log.info("--(3)getUserProcess--success:{}", processInfoList);                List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();                for (Map<String, Object> map : list) {                    if (map.get("name").equals(userId + "-dragSparkTask")) {                        processJson.fluentPutAll(map);                    }                }                ProcessDto processDto = packageProcessParam(                        "add", userId + "-dragSparkTask", vo, processJson);                processDto.setId(processJson.getInteger("id"));                log.info("--(4)packageProcessParam--success");                if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) {                    releaseProcessDefinition(vo.getProjectName(), userId + "-dragSparkTask",                            processDto.getId(), 0);                    log.info("--(5)releaseProcessDefinition--OFFLINE--success");                }                dragSparkTaskResponse =  updateProcess(vo, processDto);            }        }        return dragSparkTaskResponse;    }
复制代码


/**     * 校验工作流是否存在     *      * @param processName     *            工作流名称     * @param projectName 项目名称     * @author liudz     * @date 2021/5/8     * @return boolean     **/    public Boolean verifyProcessExist(String processName, String projectName) {        HttpHeaders headers = new HttpHeaders();        headers.set("token", token);        headers.set("Content-Type", "application/json");        HttpEntity requestEntity = new HttpEntity(headers);        ResponseEntity<DolphinschedulerResponse> response =            restTemplate.exchange(address + "/dolphinscheduler/projects/" + projectName                            + "/process/verify-name?name=" + processName,                HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);        if (response.getBody().getCode() == ZERO) {            return false;        }        return true;    }
复制代码


/**     * 获取dolphinscheduler上的某用户下唯一工作流     * @param projectName 项目名称     * @author liudz     * @date 2021/5/8     * @return id     **/    public DolphinschedulerResponse getUserProcess(String projectName) {        HttpHeaders headers = new HttpHeaders();        headers.set("token", token);        headers.set("Content-Type", "application/json");        HttpEntity requestEntity = new HttpEntity(headers);        ResponseEntity<DolphinschedulerResponse> response =            restTemplate.exchange(address + "/dolphinscheduler/projects/" + projectName + "/process/list",                HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);        return response.getBody();    }
复制代码


/**     *  封装参数     * @param type 操作类型     * @param processName 用户工作流名称     * @param vo 任务参数     * @param processJson 工作流json     * @author liudz     * @date 2021/5/13     * @return ProcessDto     **/    public ProcessDto packageProcessParam(String type, String processName, DragSparkTaskVo vo, JSONObject processJson) {        ProcessDto processDto = new ProcessDto();        processDto.setConnects("[]");        processDto.setName(processName);        JSONObject locationsOne = new JSONObject();        JSONObject locationsTwo = new JSONObject();        locationsTwo.fluentPut("name", "spark-" + vo.getId()).fluentPut("targetarr", "").fluentPut("nodenumber", "0");        locationsTwo.fluentPut("x", 0).fluentPut("y", 0);        locationsOne.put("tasks-" + vo.getId(), locationsTwo);
// 创建工作流 if (CREATE.equals(type)) { processDto = packageProcessParamOfCreate(processDto, vo, locationsOne); } else if (ADD.equals(type)) { //工作流添加节点 processDto = packageProcessParamOfAdd(processDto, vo, processJson, locationsOne, locationsTwo); } else if (UPDATE.equals(type)) { //更新工作流-只更新参数processDefinitionJson的tasks参数 processDto = packageProcessParamOfUpdate(processDto, processJson, vo); } else if (DELETE.equals(type)) { //更新工作流或删除工作流-更新则删除参数processDefinitionJson的tasks参数 processDto = packageProcessParamOfDelete(processDto, processJson, vo); } return processDto; }
复制代码


/**     * packageProcessParamOfCreate     * @param processDto 工作流参数     * @param vo 任务参数     * @param locationsOne locationsOne     * @author liudz     * @date 2021/5/7     * @return ProcessDto     **/    public ProcessDto packageProcessParamOfCreate(ProcessDto processDto, DragSparkTaskVo vo, JSONObject locationsOne) {        processDto.setLocations(locationsOne.toString());        JSONObject processOne = new JSONObject();        processOne.fluentPut("globalParams", new ArrayList<>()).fluentPut("tenantId", THREE).fluentPut("timeout", 0);        JSONObject processTwo = new JSONObject();        processTwo.fluentPut("type", "SPARK").fluentPut("id", "tasks-" + vo.getId());        processTwo.fluentPut("name", "spark-" + vo.getId()).fluentPut("description", "");        JSONArray nodesArray = parseLineAndNode(vo.getTask());        JSONObject json = new JSONObject();        json.put("id", vo.getId());        json.put("name", vo.getName());        json.put("describeInfo", vo.getDescribeInfo());        json.put("nodes", nodesArray);        String taskJsonString = json.toString().replace("}}", "} }").replace("{{", "{ {");        processTwo.put("params", JSONObject.parseObject("{\"mainClass\":\"" + mainClass + "\","               + "\"mainJar\":{\"id\":" + getSparkResourceJarId() + "},\"deployMode\":\"cluster\","               + "\"resourceList\":[],\"localParams\":[],\"driverCores\":1,\"driverMemory\":\"1G\","               + "\"numExecutors\":\"1\",\"executorMemory\":\"1G\",\"executorCores\":\"1\",\"mainArgs\":\"\\\""               + taskJsonString.replace("\"", "\\\\\\\"") + "\\\"\",\"others\":\"\","               + "\"programType\":\"JAVA\",\"sparkVersion\":\"SPARK2\"}"));        JSONObject jsonTimeout = new JSONObject();        jsonTimeout.fluentPut("strategy", "").fluentPut("interval", null).fluentPut("enable", false);        processTwo.fluentPut("timeout", jsonTimeout).fluentPut("runFlag", "NORMAL");        JSONObject processTree = new JSONObject();        processTree.fluentPut("successNode", new ArrayList<>()).fluentPut("failedNode", new ArrayList<>());        JSONObject jsonconditionResult = new JSONObject();        jsonconditionResult.put("successNode", new ArrayList<>());        jsonconditionResult.put("failedNode", new ArrayList<>());        processTwo.fluentPut("conditionResult", jsonconditionResult).fluentPut("dependence", new JSONObject());        processTwo.fluentPut("maxRetryTimes", "0").fluentPut("retryInterval", "1");        processTwo.fluentPut("taskInstancePriority", "MEDIUM").fluentPut("workerGroup", "default");        processTwo.fluentPut("preTasks", new ArrayList<>());        JSONArray processTaskArray = new JSONArray();        processTaskArray.add(processTwo);        processOne.put("tasks", processTaskArray);        processDto.setProcessDefinitionJson(processOne.toString());        return processDto;    }
复制代码


/**     * packageProcessParamOfAdd     * @param processDto 工作流参数     * @param locationsOne locationsOne     * @param locationsTwo locationsTwo     * @param vo 任务参数     * @param processJson 工作流json     * @author liudz     * @date 2021/5/7     * @return ProcessDto     **/    public ProcessDto packageProcessParamOfAdd(ProcessDto processDto, DragSparkTaskVo vo, JSONObject processJson,                                               JSONObject locationsOne, JSONObject locationsTwo) {        String maxTaskKey = JsonTools.getJsonStringMaxKey(processJson.getString("locations"));        Integer x = processJson.getJSONObject("locations").getJSONObject(maxTaskKey).getInteger("x");        Integer y = processJson.getJSONObject("locations").getJSONObject(maxTaskKey).getInteger("y");        if (x < ONE_THOUSAND_AND_FIVE_HUNDRED) {            locationsTwo.fluentPut("x", x + EIGHTY).fluentPut("y", y);        } else if (x >= ONE_THOUSAND_AND_FIVE_HUNDRED) {            locationsTwo.fluentPut("y", y + EIGHTY).fluentPut("x", 0);        }        locationsOne = processJson.getJSONObject("locations").fluentPut("tasks-" + vo.getId(), locationsTwo);        processDto.setLocations(locationsOne.toString());        processDto.setId(processJson.getInteger("id"));        JSONObject processTwo = new JSONObject();        processTwo.fluentPut("type", "SPARK").fluentPut("id", "tasks-" + vo.getId());        processTwo.fluentPut("name", "spark-" + vo.getId()).fluentPut("description", "");        JSONArray nodesArray = parseLineAndNode(vo.getTask());        JSONObject json = new JSONObject();        json.put("id", vo.getId());        json.put("name", vo.getName());        json.put("describeInfo", vo.getDescribeInfo());        json.put("nodes", nodesArray);        String taskJsonString = json.toString().replace("}}", "} }").replace("{{", "{ {");        processTwo.put("params", JSONObject.parseObject("{\"mainClass\":\"" + mainClass + "\","               + "\"mainJar\":{\"id\":" + getSparkResourceJarId() + "},\"deployMode\":\"cluster\","               + "\"resourceList\":[],\"localParams\":[],\"driverCores\":1,\"driverMemory\":\"1G\","               + "\"numExecutors\":\"1\",\"executorMemory\":\"1G\",\"executorCores\":\"1\",\"mainArgs\":\"\\\""               + taskJsonString.replace("\"", "\\\\\\\"") + "\\\"\",\"others\":\"\","               + "\"programType\":\"JAVA\",\"sparkVersion\":\"SPARK2\"}"));        JSONObject jsonTimeout = new JSONObject();        jsonTimeout.fluentPut("strategy", "").fluentPut("interval", null).fluentPut("enable", false);        processTwo.fluentPut("timeout", jsonTimeout).fluentPut("runFlag", "NORMAL");        JSONObject processTree = new JSONObject();        processTree.fluentPut("successNode", new ArrayList<>()).fluentPut("failedNode", new ArrayList<>());        JSONObject jsonconditionResult = new JSONObject();        jsonconditionResult.put("successNode", new ArrayList<>());        jsonconditionResult.put("failedNode", new ArrayList<>());        processTwo.fluentPut("conditionResult", jsonconditionResult).fluentPut("dependence", new JSONObject());        processTwo.fluentPut("maxRetryTimes", "0").fluentPut("retryInterval", "1");        processTwo.fluentPut("taskInstancePriority", "MEDIUM").fluentPut("workerGroup", "default");        processTwo.fluentPut("preTasks", new ArrayList<>());        JSONObject jsonNew = processJson.getJSONObject("processDefinitionJson");        JSONArray jsonArray = jsonNew.getJSONArray("tasks");        jsonArray.add(processTwo);        jsonNew.put("tasks", jsonArray);        processDto.setProcessDefinitionJson(jsonNew.toString());        return processDto;    }
复制代码


/**     * 工作流【上线或者下线】     * @param projectName 项目名称     * @param processName 用户工作流名称     * @param processId 工作流ID     * @param releaseState 上下线状态操作【0:下线,1:上线】     * @author liudz     * @date 2021/5/7     * @return 执行结果     **/    public Response releaseProcessDefinition(String projectName, String processName, Integer processId,                  Integer releaseState) {        try {            String postURL = address + "/dolphinscheduler/projects/"                   + URLEncoder.encode(projectName, "utf-8") + "/process/release";            PostMethod postMethod = new PostMethod(postURL);            postMethod.setRequestHeader("Content-Type",                    "application/x-www-form-urlencoded;charset=utf-8");            postMethod.setRequestHeader("token", token);            // 参数设置,需要注意的就是里边不能传NULL,要传空字符串            NameValuePair[] data = {new NameValuePair("name", processName),                    new NameValuePair("processId", processId.toString()),                    new NameValuePair("releaseState", releaseState.toString())};            postMethod.setRequestBody(data);            org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();            httpClient.executeMethod(postMethod);            JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());            if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {                return Response.error(result.getInteger("code"), result.getString("msg"));            }        } catch (Exception e) {            log.info("请求异常:{}", e);        }        return Response.success();    }
复制代码

2.更新可拖拽模型 spark 任务



请求参数举例:


{  "name": "测试任务模型1",  "describeInfo": "测试任务模型1",  "projectName": "spark线性回归模型","id": 111,  "task": {    "edges": [      {        "id": "13889c99",        "index": 2,        "source": "356bc2be",        "target": "33838a0d",        "sourceAnchor": 2,        "targetAnchor": 0      }    ],    "nodes": [      {        "x": 482.671875,        "y": 89.125,        "id": "356bc2be",        "size": "72*72",        "type": "node",        "color": "#1890FF",        "index": 0,        "label": "数据源",        "shape": "flow-circle",        "config": {          "sourceType": "mysql",          "targetTable": "machine_learning_house_info2"        },        "inputs": [],        "outputs": [],        "modelType": "dataSource"      },      {        "x": 502.171875,        "y": 269.125,        "id": "33838a0d",        "size": "110*42",        "type": "node",        "color": "#66C35D",        "index": 1,        "label": "全表统计",        "shape": "flow-capsule",        "config": {          "selectColumns": "*"        },        "inputs": [],        "outputs": [],        "modelType": "fullTableStatistics"      }    ]  }}
复制代码



代码:


/**     * 更新任务     * @param request request     * @param vo 任务参数     * @author liudz     * @date 2021/5/8     * @return 执行结果     **/    @PutMapping("/project/process")    @Transactional(rollbackFor = Exception.class)    public Response updateDragSparkTask(HttpServletRequest request, @RequestBody DragSparkTaskVo vo) {        Long userId = Long.valueOf(request.getUserPrincipal().getName());        vo.setCreateId(userId);        vo.setUpdateId(userId);        if (vo == null || org.apache.commons.lang3.StringUtils.isBlank(vo.getName()) || vo.getDescribeInfo() == null            || vo.getCreateId() == null) {            log.error("--DragSparkTaskController--updateDragSparkTask--PARAM_ERROR!--");            return Response.error(Msg.PARAM_ERROR);        }        Response<DragSparkTaskVo> dragSparkTaskResponse = dragSparkTaskService.updateDragSparkTask(vo);        log.info("--(1)updateDragSparkTask--mysql--success");        if (dragSparkTaskResponse.getCode() == SUCCESS) {            //获取用户下唯一工作流ID            DolphinschedulerResponse processInfoList = getUserProcess(vo.getProjectName());            JSONObject processJson = new JSONObject();            log.info("--(2)getUserProcess--success:{}", processInfoList);            List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();            for (Map<String, Object> map : list) {                if (map.get("name").equals(userId + "-dragSparkTask")) {                    processJson.fluentPutAll(map);                }            }            ProcessDto processDto = packageProcessParam(                    "update", userId + "-dragSparkTask", vo, processJson);            processDto.setId(processJson.getInteger("id"));            log.info("--(3)packageProcessParam--success");            if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) {                releaseProcessDefinition(vo.getProjectName(), userId + "-dragSparkTask",                        processDto.getId(), 0);                log.info("--(4)releaseProcessDefinition--OFFLINE--success");            }            return updateProcess(vo, processDto);        }        return dragSparkTaskResponse;    }
复制代码


/**     * packageProcessParamOfUpdate     * @param processDto 工作流参数     * @param vo 任务参数     * @param processJson 工作流json     * @author liudz     * @date 2021/5/7     * @return ProcessDto     **/    public ProcessDto packageProcessParamOfUpdate(ProcessDto processDto, JSONObject processJson, DragSparkTaskVo vo) {        processDto.setLocations(processJson.getString("locations"));        processDto.setId(processJson.getInteger("id"));        JSONArray jsonTasksArray = processJson.getJSONObject("processDefinitionJson").getJSONArray("tasks");        JSONArray copyJsonTasksArray = new JSONArray();        copyJsonTasksArray.addAll(jsonTasksArray);        JSONObject processDefinitionJson = new JSONObject();        JSONArray nodesArray = parseLineAndNode(vo.getTask());        JSONObject json = new JSONObject();        json.put("id", vo.getId());        json.put("name", vo.getName());        json.put("describeInfo", vo.getDescribeInfo());        json.put("nodes", nodesArray);        String taskJsonString = json.toString().replace("}}", "} }").replace("{{", "{ {");        for (Object object : jsonTasksArray) {            JSONObject jsonObject = JSONObject.parseObject(object.toString());            if (Long.valueOf(jsonObject.getString("id").substring(SIX)) == vo.getId()) {                String mainArgs = jsonObject.getString("mainArgs");                mainArgs = "\"" + taskJsonString.replace("\"", "\\\"") + "\"";                copyJsonTasksArray.remove(jsonObject);                jsonObject.getJSONObject("params").put("mainArgs", mainArgs);                copyJsonTasksArray.add(jsonObject);                processDefinitionJson = processJson.getJSONObject("processDefinitionJson");                processDefinitionJson.put("tasks", copyJsonTasksArray);            }        }        processDto.setProcessDefinitionJson(processDefinitionJson.toString());        return processDto;    }
复制代码


 /**     * 更新工作流     * @param vo vo     * @param processDto processDto     * @author liudz     * @date 2021/5/7     * @return 执行结果     **/    public Response updateProcess(DragSparkTaskVo vo, ProcessDto processDto) {        try {
String postURL = address + "/dolphinscheduler/projects/" + URLEncoder.encode(vo.getProjectName(), "utf-8") + "/process/update"; PostMethod postMethod = new PostMethod(postURL); postMethod.setRequestHeader("Content-Type", "application/x-www-form-urlencoded;charset=utf-8"); postMethod.setRequestHeader("token", token); // 参数设置,需要注意的就是里边不能传NULL,要传空字符串 NameValuePair[] data = {new NameValuePair("connects", processDto.getConnects()), new NameValuePair("name", processDto.getName()), new NameValuePair("locations", processDto.getLocations()), new NameValuePair("id", processDto.getId().toString()), new NameValuePair("processDefinitionJson", processDto.getProcessDefinitionJson())}; postMethod.setRequestBody(data); org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient(); httpClient.executeMethod(postMethod); JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString()); log.info("--(5)httpUpdateProcess:{}", result); if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) { return Response.error(result.getInteger("code"), result.getString("msg")); } } catch (Exception e) { log.info("请求异常:{}", e); } return Response.success(); }
复制代码

3.删除可拖拽模型 spark 任务


代码:


/**     * 删除任务     * @param request request     * @param projectName 项目名称     * @param id 任务ID     * @author liudz     * @date 2021/5/8     * @return 执行结果     **/    @DeleteMapping("/project/process/{projectName}/{id}")    @Transactional(rollbackFor = Exception.class)    public Response deleteDragSparkTask(HttpServletRequest request, @PathVariable("projectName") String projectName,                                        @PathVariable("id") Long id) {        DragSparkTaskVo vo = new DragSparkTaskVo();        Long userId = Long.valueOf(request.getUserPrincipal().getName());        vo.setId(id);        vo.setCreateId(userId);        vo.setProjectName(projectName);        if (vo == null || vo.getId() == null || vo.getCreateId() == null) {            log.error("--deleteDragSparkTask--PARAM_ERROR!--");            return Response.error(Msg.PARAM_ERROR);        }        Response<DragSparkTaskVo> dragSparkTaskResponse = dragSparkTaskService.deleteDragSparkTask(vo);        log.info("--(1)deleteDragSparkTask--mysql--success");        if (dragSparkTaskResponse.getCode() == SUCCESS) {            //获取用户下唯一工作流ID            DolphinschedulerResponse processInfoList = getUserProcess(vo.getProjectName());            JSONObject processJson = new JSONObject();            log.info("--(2)getUserProcess--success:{}", processInfoList);            List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();            for (Map<String, Object> map : list) {                if (map.get("name").equals(userId + "-dragSparkTask")) {                    processJson.fluentPutAll(map);                }            }            ProcessDto processDto = packageProcessParam(                    "delete", userId + "-dragSparkTask", vo, processJson);            processDto.setId(processJson.getInteger("id"));            log.info("--(3)packageProcessParam--success");            if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) {                releaseProcessDefinition(vo.getProjectName(), userId + "-dragSparkTask",                        processDto.getId(), 0);                log.info("--(4)releaseProcessDefinition--OFFLINE--success");            }            if (JSONObject.parseObject(processDto.getLocations()).keySet().size() == 0) {                //删除工作流                deleteProcess(vo, processDto);            } else {                //更新工作流                updateProcess(vo, processDto);            }        }        return dragSparkTaskResponse;    }
复制代码


/**     * 删除工作流     * @param vo vo     * @param processDto processDto     * @author liudz     * @date 2021/5/7     * @return 执行结果     **/    public DolphinschedulerResponse deleteProcess(DragSparkTaskVo vo, ProcessDto processDto) {            HttpHeaders headers = new HttpHeaders();            headers.set("token", token);            headers.set("Content-Type", "application/json");            HttpEntity requestEntity = new HttpEntity(headers);            ResponseEntity<DolphinschedulerResponse> response =                    restTemplate.exchange(address + "/dolphinscheduler/projects/" + vo.getProjectName()                                   + "/process/delete?processDefinitionId=" + processDto.getId(),                            HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);            log.info("--(5)httpDeleteProcess:{}", response);        return response.getBody();    }
复制代码

三、本人相关其他文章链接

1.springboot 项目集成 dolphinscheduler 调度器 可拖拽 spark 任务管理:https://blog.csdn.net/a924382407/article/details/117119831


2.springboot 项目集成 dolphinscheduler 调度器 实现 datax 数据同步任务:https://blog.csdn.net/a924382407/article/details/120951230


3.springboot 项目集成 dolphinscheduler 调度器 项目管理:https://blog.csdn.net/a924382407/article/details/117118931


4.springboot 项目集成大数据第三方 dolphinscheduler 调度器 执行/停止任务https://blog.csdn.net/a924382407/article/details/117121181


5.springboot 项目集成大数据第三方 dolphinscheduler 调度器https://blog.csdn.net/a924382407/article/details/117113848

用户头像

刘大猫

关注

还未添加个人签名 2022-08-23 加入

还未添加个人简介

评论

发布
暂无评论
springboot项目集成dolphinscheduler调度器 可拖拽spark任务管理_人工智能_刘大猫_InfoQ写作社区