springboot 项目集成大数据第三方 dolphinscheduler 调度器 执行 / 停止任务
作者:刘大猫
- 2025-07-17 黑龙江
本文字数:6456 字
阅读完需:约 21 分钟
dolphinscheduler 调度器接入注意事项等信息可参考我的上一篇博客进行了解,链接见文末“三、本人相关其他文章链接”。
一、功能清单
二、执行/停止任务
说明:大数据平台执行可拖拽 spark 任务时,实际执行的只是 dolphinscheduler 调度器中某项目下某工作流里的某一个节点,而不是整个工作流。
共用的依赖
<!-- Apache Commons HttpClient 3.1: supplies the HttpClient / PostMethod / NameValuePair classes used by the code in this article -->
<dependency>
    <groupId>commons-httpclient</groupId>
    <artifactId>commons-httpclient</artifactId>
    <version>3.1</version>
</dependency>
复制代码
共用配置文件
# Token sent in the "token" request header on calls to the DolphinScheduler API
dolphinscheduler.token=xxx
# Base URL of the DolphinScheduler API server; request paths are appended to it
dolphinscheduler.address=http://IP:12345
复制代码
共用代码
/** REST client used for the JSON (GET) calls to DolphinScheduler. */
@Autowired
private RestTemplate restTemplate;

/** Value sent in the {@code token} request header; bound to {@code dolphinscheduler.token}. */
@Value("${dolphinscheduler.token}")
String token;

/** Base URL that every DolphinScheduler request path is appended to; bound to {@code dolphinscheduler.address}. */
@Value("${dolphinscheduler.address}")
String address;

/** Response {@code code} value that the methods in this class treat as success. */
public static final int ZERO = 0;

public static final int SUCCESS = 200;

@Autowired
private DragSparkTaskService dragSparkTaskService;

/** Bound to {@code spark.main.class}. */
@Value("${spark.main.class}")
String mainClass;

public static final String CREATE = "create";
public static final String UPDATE = "update";
public static final String ADD = "add";
public static final String DELETE = "delete";

/** Release-state literals; {@code OFFLINE} is compared against a process definition's release state. */
public static final String ONLINE = "ONLINE";
public static final String OFFLINE = "OFFLINE";

public static final int ONE_THOUSAND_AND_FIVE_HUNDRED = 1500;
public static final int SIX = 6;
public static final int EIGHTY = 80;
public static final int THREE = 3;

/** Redis client used to cache the run state of a drag Spark task. */
@Autowired
private StringRedisTemplate redisTemplate;

/** Redis key prefix; the task id is appended to form the key. Bound to {@code drag.task.state}. */
@Value("${drag.task.state}")
String dragTaskState;

@Autowired
private DragSparkTaskMapper dragSparkTaskMapper;
复制代码
1.执行任务
代码:
/** * 运行流程实例 * @param projectName 项目名称 * @param request request * @param dragSparkTaskId 任务ID * @author liudz * @date 2021/5/7 * @return 执行结果 **/ @GetMapping("/project/process/start") public DolphinschedulerResponse startProcessInstance( @RequestParam("projectName") String projectName, @RequestParam("dragSparkTaskId") Integer dragSparkTaskId, HttpServletRequest request) { try { Long userId = Long.valueOf(request.getUserPrincipal().getName()); DolphinschedulerResponse processInfoList = getUserProcess(projectName); if (processInfoList.getCode() != ZERO) { return processInfoList; } JSONObject processJson = new JSONObject(); log.info("--(1)getUserProcess--success:{}", processInfoList); List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData(); for (Map<String, Object> map : list) { if (map.get("name").equals(userId + "-dragSparkTask")) { processJson.fluentPutAll(map); } } if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(OFFLINE)) { releaseProcessDefinition(projectName, userId + "-dragSparkTask", processJson.getInteger("id"), 1); log.info("--(2)releaseProcessDefinition--ONLINE--success"); } String postURL = address + "/dolphinscheduler/projects/" + URLEncoder.encode(projectName, "utf-8") + "/executors/start-process-instance"; PostMethod postMethod = new PostMethod(postURL); postMethod.setRequestHeader("Content-Type", "application/x-www-form-urlencoded;charset=utf-8"); postMethod.setRequestHeader("token", token); // 参数设置,需要注意的就是里边不能传NULL,要传空字符串 NameValuePair[] data = packageNameValuePair(processJson, dragSparkTaskId); postMethod.setRequestBody(data); org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient(); httpClient.executeMethod(postMethod); JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString()); log.info("--(2)startProcessInstance--result:{}", result); if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) { return 
DolphinschedulerResponse.error(result.getInteger("code"), result.getString("msg")); } redisTemplate.opsForValue().set(dragTaskState + dragSparkTaskId, "1", 1, TimeUnit.HOURS); DragSparkTask drag = new DragSparkTask(); drag.setId(Long.valueOf(dragSparkTaskId)); drag.setState("1"); drag.setCreateId(userId); dragSparkTaskMapper.updateDragSparkTask(drag); log.info("--(3)----updateDragSparkTask--success!"); } catch (Exception e) { log.info("请求异常:{}", e); } return DolphinschedulerResponse.success(); }
复制代码
/**
 * Builds the form fields posted to the {@code start-process-instance} endpoint.
 *
 * <p>Only two fields vary: {@code processDefinitionId} is the {@code id} of the given
 * workflow JSON and {@code startNodeList} is {@code spark-<dragSparkTaskId>}. All other
 * fields are fixed literals.
 *
 * @param processJson     workflow (process definition) JSON; its {@code id} is read
 * @param dragSparkTaskId id of the drag Spark task
 * @return the nine form fields, in a fixed order
 * @author liudz
 * @date 2021/5/14
 **/
public NameValuePair[] packageNameValuePair(JSONObject processJson, Integer dragSparkTaskId) {
    String processDefinitionId = processJson.getString("id");
    String startNode = "spark-" + dragSparkTaskId;
    return new NameValuePair[] {
        new NameValuePair("failureStrategy", "CONTINUE"),
        new NameValuePair("processDefinitionId", processDefinitionId),
        new NameValuePair("processInstancePriority", "MEDIUM"),
        new NameValuePair("warningGroupId", "0"),
        new NameValuePair("warningType", "NONE"),
        new NameValuePair("runMode", "RUN_MODE_SERIAL"),
        new NameValuePair("startNodeList", startNode),
        new NameValuePair("taskDependType", "TASK_POST"),
        new NameValuePair("workerGroup", "default")
    };
}
复制代码
/**
 * Walks every edge/node pair of the model and records each edge on the nodes it touches,
 * then rewrites every node's {@code name} and {@code label}.
 *
 * <p>For an edge whose source is a {@code breakUp} node: a {@code fraction} above 0.5 from
 * the edge config is copied into the node config, and the edge target is added to
 * {@code firstOutputs} if that list is still empty, otherwise to {@code secondOutputs}.
 * For any other pairing, the target is added to the source node's {@code outputs}, or
 * the source to the target node's {@code inputs}.
 *
 * @param jsonObject model task JSON holding the {@code edges} and {@code nodes} arrays
 * @return the same {@code nodes} array, updated in place
 * @author liudz
 * @date 2020/12/10
 **/
public JSONArray parseLineAndNode(JSONObject jsonObject) {
    JSONArray edges = jsonObject.getJSONArray("edges");
    JSONArray nodes = jsonObject.getJSONArray("nodes");
    for (int e = 0; e < edges.size(); e++) {
        JSONObject edge = edges.getJSONObject(e);
        for (int n = 0; n < nodes.size(); n++) {
            JSONObject node = nodes.getJSONObject(n);
            String sourceId = edge.getString("source");
            String targetId = edge.getString("target");
            boolean isBreakUp = "breakUp".equals(node.getString("modelType"));
            if (isBreakUp && sourceId.equals(node.getString("id"))) {
                double fraction = edge.getJSONObject("config").getDoubleValue("fraction");
                if (fraction > 0.5d) {
                    node.getJSONObject("config").put("fraction", fraction);
                }
                // The first outgoing edge fills firstOutputs; later ones go to secondOutputs.
                String branchKey = node.getJSONArray("firstOutputs").isEmpty()
                        ? "firstOutputs" : "secondOutputs";
                appendNodeId(node, branchKey, targetId);
            } else if (sourceId.equals(node.getString("id"))) {
                appendNodeId(node, "outputs", targetId);
            } else if (targetId.equals(node.getString("id"))) {
                appendNodeId(node, "inputs", sourceId);
            }
        }
    }
    for (int n = 0; n < nodes.size(); n++) {
        JSONObject node = nodes.getJSONObject(n);
        String id = node.getString("id");
        // Both values are computed before either is stored: the new name uses the old label.
        String newLabel = node.getString("modelType") + "_" + id;
        String newName = node.getString("label") + "_" + id;
        node.put("name", newName);
        node.put("label", newLabel);
    }
    return nodes;
}

/**
 * Replaces the array stored under {@code key} with the result of
 * {@code JavaTools.arrayInsert(currentValues, nodeId)}.
 * NOTE(review): arrayInsert is presumed to return a copy with {@code nodeId} added — confirm.
 */
private void appendNodeId(JSONObject node, String key, String nodeId) {
    JSONArray current = node.getJSONArray(key);
    String[] extended = JavaTools.arrayInsert(current.toArray(new String[current.size()]), nodeId);
    node.put(key, extended);
}
复制代码
2.停止任务
代码:
/**
 * Applies an execute action (for example a stop) to the running process instance of a drag Spark task.
 *
 * <p>Steps:
 * <ol>
 *   <li>Query the project's task instances named {@code spark-<id>} (first page, 100 rows)
 *       and take the process instance id of the last one in state {@code RUNNING_EXEUTION}.</li>
 *   <li>POST {@code executeType} and that process instance id to {@code /executors/execute}.</li>
 *   <li>On success, store state {@code "0"} for the task in Redis (1 hour TTL) and in the database.</li>
 * </ol>
 *
 * <p>An error response is returned when no running instance is found or when the HTTP
 * call fails; previously the first case threw an uncaught NullPointerException and the
 * second was logged and answered with success. NOTE(review): the generic error code 500
 * used for local failures is a placeholder; align it with the project's own error codes.
 *
 * @param id          id of the drag Spark task
 * @param executeType action forwarded unchanged as the {@code executeType} form field
 * @param projectName DolphinScheduler project name
 * @return success, or an error response carrying a code and message
 * @author: liudz
 * @author: lty update 2020/5/27
 * @date: 2020/4/28 10:31
 */
@GetMapping(value = "/project/execute/{projectName}/{id}/{executeType}")
public Response<String> stopProcessSparkTask(@PathVariable("projectName") String projectName,
        @PathVariable("id") Long id, @PathVariable("executeType") String executeType) {
    log.info("--(1)stopProcessSparkTask--begin--projectName:{},id:{},executeType:{}", projectName, id,
            executeType);
    final int internalError = 500;
    PostMethod postMethod = null;
    try {
        HttpHeaders headers = new HttpHeaders();
        headers.set("token", token);
        headers.set("Content-Type", "application/json");
        HttpEntity<Void> requestEntity = new HttpEntity<>(headers);
        ResponseEntity<JSONObject> response = restTemplate.exchange(address + "/" + "dolphinscheduler/projects/"
                + projectName + "/task-instance/list-paging?" + "pageNo=1&pageSize=100&taskName=spark-" + id,
                HttpMethod.GET, requestEntity, JSONObject.class);

        // Guard each level: an empty body or missing "data" must not throw.
        JSONObject body = response.getBody();
        JSONObject pageData = body == null ? null : body.getJSONObject("data");
        @SuppressWarnings("unchecked")
        List<Map<String, Object>> list = pageData == null
                ? null : (List<Map<String, Object>>) pageData.get("totalList");

        Integer processInstanceId = null;
        if (list != null) {
            for (Map<String, Object> map : list) {
                // The spelling "RUNNING_EXEUTION" is kept exactly as the original code compares it.
                // NOTE(review): confirm it matches the state name your DolphinScheduler version returns.
                if ("RUNNING_EXEUTION".equals(map.get("state"))) {
                    processInstanceId = Integer.valueOf(map.get("processInstanceId").toString());
                }
            }
        }
        if (processInstanceId == null) {
            return Response.error(internalError, "no running process instance found for task spark-" + id);
        }
        log.info("--(2)getProcessInstanceId--success--:{}", processInstanceId);

        String postURL = address + "/dolphinscheduler/projects/"
                + URLEncoder.encode(projectName, "utf-8") + "/executors/execute";
        postMethod = new PostMethod(postURL);
        postMethod.setRequestHeader("Content-Type", "application/x-www-form-urlencoded;charset=utf-8");
        postMethod.setRequestHeader("token", token);
        NameValuePair[] data = {new NameValuePair("executeType", executeType),
            new NameValuePair("processInstanceId", processInstanceId.toString())};
        postMethod.setRequestBody(data);
        org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();
        httpClient.executeMethod(postMethod);
        JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());
        if (result == null) {
            return Response.error(internalError, "empty response from DolphinScheduler");
        }
        // Null-safe comparison: a missing code is treated as a failure.
        if (!Integer.valueOf(ZERO).equals(result.get(DictionaryEnum.CODE.getFiledString()))) {
            return Response.error(result.getInteger("code"), result.getString("msg"));
        }
        log.info("--(3)stopProcessSparkTask--success--:{}", result);

        // Record the state "0" in Redis and in the database.
        redisTemplate.opsForValue().set(dragTaskState + id, "0", 1, TimeUnit.HOURS);
        DragSparkTaskVo dragSparkTaskVo = new DragSparkTaskVo();
        dragSparkTaskVo.setId(id);
        dragSparkTaskVo.setState("0");
        dragSparkTaskService.updateDragSparkTask(dragSparkTaskVo);
        log.info("--(4)updateDragSparkTask--success");
        return Response.success();
    } catch (UnsupportedEncodingException e) {
        log.error("UnsupportedEncodingException:{}", e.getMessage(), e);
        return Response.error(internalError, "failed to execute process instance action");
    } catch (HttpException e) {
        log.error("HttpException:{}", e.getMessage(), e);
        return Response.error(internalError, "failed to execute process instance action");
    } catch (IOException e) {
        log.error("IOException:{}", e.getMessage(), e);
        return Response.error(internalError, "failed to execute process instance action");
    } finally {
        // Commons HttpClient 3.x connections must be released explicitly.
        if (postMethod != null) {
            postMethod.releaseConnection();
        }
    }
}
复制代码
三、本人相关其他文章链接
1.springboot 项目集成 dolphinscheduler 调度器 可拖拽 spark 任务管理:https://blog.csdn.net/a924382407/article/details/117119831
2.springboot 项目集成 dolphinscheduler 调度器 实现 datax 数据同步任务:https://blog.csdn.net/a924382407/article/details/120951230
3.springboot 项目集成 dolphinscheduler 调度器 项目管理:https://blog.csdn.net/a924382407/article/details/117118931
4.springboot 项目集成大数据第三方 dolphinscheduler 调度器 执行/停止任务:https://blog.csdn.net/a924382407/article/details/117121181
5.springboot 项目集成大数据第三方 dolphinscheduler 调度器:https://blog.csdn.net/a924382407/article/details/117113848
划线
评论
复制
发布于: 刚刚阅读数: 2
刘大猫
关注
还未添加个人签名 2022-08-23 加入
还未添加个人简介









评论