写点什么

springboot 项目集成 dolphinscheduler 调度器 实现 datax 数据同步任务

作者:刘大猫
  • 2025-07-13
    黑龙江
  • 本文字数:47995 字

    阅读完需:约 157 分钟


Datax 安装及基本使用请查看上一篇文章:https://blog.csdn.net/a924382407/article/details/120952339?spm=1001.2014.3001.5501@[TOC]

Datax 概述

1.概述

2.功能清单

3.==说明==:本项目只支持 mysql 及 hbase 之间的数据同步

代码模块

配置文件

pom.xml

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <parent>        <groupId>com.geespace.microservices.bd-platform</groupId>        <artifactId>all</artifactId>        <version>1.0-SNAPSHOT</version>    </parent>
<artifactId>data-sync-config</artifactId> <version>1.0-SNAPSHOT</version>
<properties> <java.version>1.8</java.version> <gson.version>2.8.1</gson.version> </properties>
<dependencies>
<dependency> <groupId>com.github.pagehelper</groupId> <artifactId>pagehelper-spring-boot-starter</artifactId> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>${gson.version}</version> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--elasticsearch--> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>6.8.12</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-client</artifactId> <version>6.8.12</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>6.8.12</version> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> </dependency> <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-core-asl</artifactId> <version>1.9.13</version> <scope>compile</scope> </dependency> <dependency> <groupId>com.geespace.microservices.bd-platform</groupId> <artifactId>data-config</artifactId> <version>1.0-SNAPSHOT</version> <scope>compile</scope> </dependency> <!--httpclient--> <dependency> <groupId>commons-httpclient</groupId> <artifactId>commons-httpclient</artifactId> <version>3.1</version> </dependency> </dependencies>
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.2</version> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>2.1.4.RELEASE</version> </dependency> </dependencies> <configuration> <keepDependenciesWithProvidedScope>false</keepDependenciesWithProvidedScope> <createDependencyReducedPom>true</createDependencyReducedPom> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.handlers</resource> </transformer> <transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer"> <resource>META-INF/spring.factories</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.schemas</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.geespace.microservices.dispatcher.DispatchApplication</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
复制代码

DataxDolphinschedulerController

import java.io.IOException;import java.io.UnsupportedEncodingException;import java.net.URLEncoder;import java.util.ArrayList;import java.util.List;import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import com.alibaba.fastjson.JSONArray;import com.alibaba.fastjson.JSONObject;import com.geespace.microservices.builder.dto.ProcessDto;import com.geespace.microservices.builder.dto.SyncConfigDto;import com.geespace.microservices.builder.enums.DictionaryEnum;import com.geespace.microservices.builder.request.ConfigAddForm;import com.geespace.microservices.builder.request.ConfigSelectForm;import com.geespace.microservices.builder.request.ConfigUpdateForm;import com.geespace.microservices.builder.response.BizCode;import com.geespace.microservices.builder.response.DolphinschedulerResponse;import com.geespace.microservices.builder.response.Msg;import com.geespace.microservices.builder.response.PageResult;import com.geespace.microservices.builder.response.ReturnResult;import com.geespace.microservices.builder.service.SyncConfigService;import com.geespace.microservices.builder.tools.JsonTools;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.httpclient.HttpException;import org.apache.commons.httpclient.NameValuePair;import org.apache.commons.httpclient.methods.PostMethod;import org.springframework.beans.BeanUtils;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.http.HttpEntity;import org.springframework.http.HttpHeaders;import org.springframework.http.HttpMethod;import org.springframework.http.ResponseEntity;import org.springframework.transaction.annotation.Transactional;import org.springframework.util.StringUtils;import org.springframework.validation.annotation.Validated;import org.springframework.web.bind.annotation.DeleteMapping;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.PutMapping;import org.springframework.web.bind.annotation.RequestBody;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestMethod;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;import org.springframework.web.client.RestTemplate;
/** * 迁移dolphinscheduler调度器 * * @author: liudz * @date: 2021/5/7 */@Slf4j@RestController@RequestMapping("/dolphinscheduler/v1")public class DataxDolphinschedulerController { @Autowired private RestTemplate restTemplate; @Value("${dolphinscheduler.token}") String token; @Value("${dolphinscheduler.address}") String address; public static final int ZERO = 0; public static final int SUCCESS = 200; public static final String CREATE = "create"; public static final String UPDATE = "update"; public static final String ADD = "add"; public static final String DELETE = "delete"; public static final String ONLINE = "ONLINE"; public static final String OFFLINE = "OFFLINE"; public static final int ONE_THOUSAND_AND_FIVE_HUNDRED = 1500; public static final int SIX = 6; public static final int EIGHTY = 80; public static final int THREE = 3; @Autowired private SyncConfigService syncConfigService;

/** * 创建任务-创建用户下唯一工作流,无则创建有则并排添加 * @param request request * @param form 任务参数 * @author liudz * @date 2021/5/8 * @return 执行结果 **/ @PostMapping("/project/process/datax") @Transactional(rollbackFor = Exception.class) public ReturnResult operatorDataxTask(HttpServletRequest request, @RequestBody @Validated ConfigAddForm form) { Long userId = Long.valueOf(request.getUserPrincipal().getName()); form.setUserId(userId);
ReturnResult<SyncConfigDto> dataxTaskReturnResult = syncConfigService.addConfig(form); if (dataxTaskReturnResult.getCode() != SUCCESS) { return dataxTaskReturnResult; } log.info("--(1)addDataxTaskResult--success"); form.setId(dataxTaskReturnResult.getData().getId()); if (dataxTaskReturnResult.getCode() == SUCCESS) { Boolean verifyResult = verifyProcessExist(userId + "-dataxTask", form.getProjectName()); log.info("--(2)verifyProcessExist--success:{}", verifyResult); if (!verifyResult) { ProcessDto processDto = packageProcessParam( "create", userId + "-dataxTask", dataxTaskReturnResult.getData(), null); log.info("--(3)packageProcessParam--success"); processDto.setProjectName(form.getProjectName()); processDto.setProjectId(form.getProjectId()); dataxTaskReturnResult = createProcess(processDto); } else { //获取用户下唯一工作流ID DolphinschedulerResponse processInfoList = getUserProcess(form.getProjectName()); JSONObject processJson = new JSONObject(); log.info("--(3)getUserProcess--success:{}", processInfoList); List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData(); for (Map<String, Object> map : list) { if (map.get("name").equals(userId + "-dataxTask")) { processJson.fluentPutAll(map); } } ProcessDto processDto = packageProcessParam( "add", userId + "-dataxTask", dataxTaskReturnResult.getData(), processJson); processDto.setId(processJson.getInteger("id")); log.info("--(4)packageProcessParam--success"); if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) { releaseProcessDefinition(form.getProjectName(), userId + "-dataxTask", processDto.getId(), 0); log.info("--(5)releaseProcessDefinition--OFFLINE--success"); } dataxTaskReturnResult = updateProcess(form, processDto); } } return dataxTaskReturnResult; } /** * 更新任务 * @param request request * @param form 任务参数 * @author liudz * @date 2021/5/8 * @return 执行结果 **/ @PutMapping("/project/process/datax") @Transactional(rollbackFor = Exception.class) public ReturnResult updateDataxTask(HttpServletRequest request, @RequestBody @Validated ConfigUpdateForm form) { Long userId = Long.valueOf(request.getUserPrincipal().getName()); form.setUserId(userId); ReturnResult<SyncConfigDto> dataxTaskReturnResult = syncConfigService.updateConfig(form); log.info("--(1)updateDataxTaskResult--mysql--success"); if (dataxTaskReturnResult.getCode() == SUCCESS) { //获取用户下唯一工作流ID DolphinschedulerResponse processInfoList = getUserProcess(form.getProjectName()); JSONObject processJson = new JSONObject(); log.info("--(2)getUserProcess--success:{}", processInfoList); List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData(); for (Map<String, Object> map : list) { if (map.get("name").equals(userId + "-dataxTask")) { processJson.fluentPutAll(map); } } ProcessDto processDto = packageProcessParam( "update", userId + "-dataxTask", dataxTaskReturnResult.getData(), processJson); processDto.setProjectName(form.getProjectName()); processDto.setProjectId(form.getProjectId()); processDto.setId(processJson.getInteger("id")); log.info("--(3)packageProcessParam--success"); if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) { releaseProcessDefinition(form.getProjectName(), userId + "-dataxTask", processDto.getId(), 0); log.info("--(4)releaseProcessDefinition--OFFLINE--success"); } ConfigAddForm configAddForm = new ConfigAddForm(); BeanUtils.copyProperties(form, configAddForm); return updateProcess(configAddForm, processDto); } return dataxTaskReturnResult; } /** * 删除任务 * @param request request * @param projectName 项目名称 * @param id 任务ID * @author liudz * @date 2021/5/8 * @return 执行结果 **/ @DeleteMapping("/project/process/datax/{projectName}/{id}") @Transactional(rollbackFor = Exception.class) public ReturnResult deleteDataxTask(HttpServletRequest request, @PathVariable("projectName") String projectName, @PathVariable("id") Long id) { Long userId = Long.valueOf(request.getUserPrincipal().getName()); SyncConfigDto syncConfigDto = new SyncConfigDto(); syncConfigDto.setId(id); ConfigAddForm configAddForm = new ConfigAddForm(); configAddForm.setProjectName(projectName); ReturnResult dataxTaskReturnResult = syncConfigService.delete(id, userId); log.info("--(1)deleteDataxTask--mysql--success"); if (dataxTaskReturnResult.getCode() == SUCCESS) { //获取用户下唯一工作流ID DolphinschedulerResponse processInfoList = getUserProcess(projectName); JSONObject processJson = new JSONObject(); log.info("--(2)getUserProcess--success:{}", processInfoList); List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData(); for (Map<String, Object> map : list) { if (map.get("name").equals(userId + "-dataxTask")) { processJson.fluentPutAll(map); } } ProcessDto processDto = packageProcessParam( "delete", userId + "-dataxTask", syncConfigDto, processJson); processDto.setId(processJson.getInteger("id")); log.info("--(3)packageProcessParam--success"); if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) { releaseProcessDefinition(projectName, userId + "-dataxTask", processDto.getId(), 0); log.info("--(4)releaseProcessDefinition--OFFLINE--success"); } if (JSONObject.parseObject(processDto.getLocations()).keySet().size() == 0) { //删除工作流 deleteProcess(configAddForm, processDto); } else { //更新工作流 updateProcess(configAddForm, processDto); } } return dataxTaskReturnResult; }
/** * 校验工作流是否存在 * * @param processName * 工作流名称 * @param projectName 项目名称 * @author liudz * @date 2021/5/8 * @return boolean **/ public Boolean verifyProcessExist(String processName, String projectName) { HttpHeaders headers = new HttpHeaders(); headers.set("token", token); headers.set("Content-Type", "application/json"); HttpEntity requestEntity = new HttpEntity(headers); ResponseEntity<DolphinschedulerResponse> returnResult = restTemplate.exchange(address + "/dolphinscheduler/projects/" + projectName + "/process/verify-name?name=" + processName, HttpMethod.GET, requestEntity, DolphinschedulerResponse.class); if (returnResult.getBody().getCode() == ZERO) { return false; } return true; }
/** * 创建工作流 * @param processDto processDto * @author liudz * @date 2021/5/7 * @return 执行结果 **/ public ReturnResult createProcess(ProcessDto processDto) { try { String postURL = address + "/dolphinscheduler/projects/" + URLEncoder.encode(processDto.getProjectName(), "utf-8") + "/process/save"; PostMethod postMethod = new PostMethod(postURL); postMethod.setRequestHeader("Content-Type", "application/x-www-form-urlencoded;charset=utf-8"); postMethod.setRequestHeader("token", token); NameValuePair[] data = {new NameValuePair("connects", processDto.getConnects()), new NameValuePair("name", processDto.getName()), new NameValuePair("locations", processDto.getLocations()), new NameValuePair("processDefinitionJson", processDto.getProcessDefinitionJson())}; postMethod.setRequestBody(data); org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient(); httpClient.executeMethod(postMethod); JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString()); log.info("--(5)httpCreateProcess:{}", result); if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) { return ReturnResult.error(BizCode.DB_INSERT_ERROR, result.getString("msg")); } } catch (Exception e) { log.info("请求异常:{}", e); } return ReturnResult.success(); }
/** * 更新工作流 * @param vo vo * @param processDto processDto * @author liudz * @date 2021/5/7 * @return 执行结果 **/ public ReturnResult updateProcess(ConfigAddForm vo, ProcessDto processDto) { try {
String postURL = address + "/dolphinscheduler/projects/" + URLEncoder.encode(vo.getProjectName(), "utf-8") + "/process/update"; PostMethod postMethod = new PostMethod(postURL); postMethod.setRequestHeader("Content-Type", "application/x-www-form-urlencoded;charset=utf-8"); postMethod.setRequestHeader("token", token); // 参数设置,需要注意的就是里边不能传NULL,要传空字符串 NameValuePair[] data = {new NameValuePair("connects", processDto.getConnects()), new NameValuePair("name", processDto.getName()), new NameValuePair("locations", processDto.getLocations()), new NameValuePair("id", processDto.getId().toString()), new NameValuePair("processDefinitionJson", processDto.getProcessDefinitionJson())}; postMethod.setRequestBody(data); org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient(); httpClient.executeMethod(postMethod); JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString()); log.info("--(5)httpUpdateProcess:{}", result); if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) { return ReturnResult.error(BizCode.DB_INSERT_ERROR, result.getString("msg")); } } catch (Exception e) { log.info("请求异常:{}", e); } return ReturnResult.success(); } /** * 删除工作流 * @param dto dto * @param processDto processDto * @author liudz * @date 2021/5/7 * @return 执行结果 **/ public DolphinschedulerResponse deleteProcess(ConfigAddForm dto, ProcessDto processDto) { HttpHeaders headers = new HttpHeaders(); headers.set("token", token); headers.set("Content-Type", "application/json"); HttpEntity requestEntity = new HttpEntity(headers); ResponseEntity<DolphinschedulerResponse> returnResult = restTemplate.exchange(address + "/dolphinscheduler/projects/" + dto.getProjectName() + "/process/delete?processDefinitionId=" + processDto.getId(), HttpMethod.GET, requestEntity, DolphinschedulerResponse.class); log.info("--(5)httpDeleteProcess:{}", returnResult); return returnResult.getBody(); }
/** * 获取dolphinscheduler上的资源spark可拖拽jar的id * * @author liudz * @date 2021/5/8 * @return id **/ public Integer getSparkResourceJarId() { Integer resourceId = null; HttpHeaders headers = new HttpHeaders(); headers.set("token", token); headers.set("Content-Type", "application/json"); HttpEntity requestEntity = new HttpEntity(headers); ResponseEntity<DolphinschedulerResponse> returnResult = restTemplate.exchange(address + "/dolphinscheduler/resources/authorize-resource-tree?userId=1", HttpMethod.GET, requestEntity, DolphinschedulerResponse.class); List<Map<String, Object>> list = (List<Map<String, Object>>) returnResult.getBody().getData(); for (Map<String, Object> map : list) { if (map.get("name").equals("big_data02.jar")) { resourceId = Integer.valueOf(map.get("id").toString()); } } return resourceId; } /** * 获取dolphinscheduler上的某用户下唯一工作流 * @param projectName 项目名称 * @author liudz * @date 2021/5/8 * @return id **/ public DolphinschedulerResponse getUserProcess(String projectName) { HttpHeaders headers = new HttpHeaders(); headers.set("token", token); headers.set("Content-Type", "application/json"); HttpEntity requestEntity = new HttpEntity(headers); ResponseEntity<DolphinschedulerResponse> returnResult = restTemplate.exchange(address + "/dolphinscheduler/projects/" + projectName + "/process/list", HttpMethod.GET, requestEntity, DolphinschedulerResponse.class); return returnResult.getBody(); } /** * 封装参数 * @param type 操作类型 * @param processName 用户工作流名称 * @param dto 任务参数 * @param processJson 工作流json * @author liudz * @date 2021/5/13 * @return ProcessDto **/ public ProcessDto packageProcessParam(String type, String processName, SyncConfigDto dto, JSONObject processJson) { ProcessDto processDto = new ProcessDto(); processDto.setConnects("[]"); processDto.setName(processName); JSONObject locationsOne = new JSONObject(); JSONObject locationsTwo = new JSONObject(); locationsTwo.fluentPut("name", "datax-" + dto.getId()).fluentPut("targetarr", "").fluentPut("nodenumber", "0"); locationsTwo.fluentPut("x", 0).fluentPut("y", 0); locationsOne.put("datax-" + dto.getId(), locationsTwo);
// 创建工作流 if (CREATE.equals(type)) { processDto = packageProcessParamOfCreate(processDto, dto, locationsOne); } else if (ADD.equals(type)) { //工作流添加节点 processDto = packageProcessParamOfAdd(processDto, dto, processJson, locationsOne, locationsTwo); } else if (UPDATE.equals(type)) { //更新工作流-只更新参数processDefinitionJson的tasks参数 processDto = packageProcessParamOfUpdate(processDto, processJson, dto); } else if (DELETE.equals(type)) { //更新工作流或删除工作流-更新则删除参数processDefinitionJson的tasks参数 processDto = packageProcessParamOfDelete(processDto, processJson, dto); } return processDto; } /** * packageProcessParamOfCreate * @param processDto 工作流参数 * @param dto 任务参数 * @param locationsOne locationsOne * @author liudz * @date 2021/5/7 * @return ProcessDto **/ public ProcessDto packageProcessParamOfCreate(ProcessDto processDto, SyncConfigDto dto, JSONObject locationsOne) { processDto.setLocations(locationsOne.toString()); JSONObject processOne = new JSONObject(); processOne.fluentPut("globalParams", new ArrayList<>()).fluentPut("tenantId", THREE).fluentPut("timeout", 0); JSONObject processTwo = new JSONObject(); processTwo.fluentPut("type", "DATAX").fluentPut("id", "datax-" + dto.getId()); processTwo.fluentPut("name", "datax-" + dto.getId()).fluentPut("description", "");
String taskJsonString = dto.getContent().toString(); processTwo.put("params", JSONObject.parseObject("{\"localParams\":[],\"customConfig\":1," + "\"json\":\"" + taskJsonString.replace("\"", "\\\"") + "\"}"));
JSONObject jsonTimeout = new JSONObject(); jsonTimeout.fluentPut("strategy", "").fluentPut("interval", null).fluentPut("enable", false); processTwo.fluentPut("timeout", jsonTimeout).fluentPut("runFlag", "NORMAL"); JSONObject processTree = new JSONObject(); processTree.fluentPut("successNode", new ArrayList<>()).fluentPut("failedNode", new ArrayList<>()); JSONObject jsonconditionResult = new JSONObject(); jsonconditionResult.put("successNode", new ArrayList<>()); jsonconditionResult.put("failedNode", new ArrayList<>()); processTwo.fluentPut("conditionResult", jsonconditionResult).fluentPut("dependence", new JSONObject()); processTwo.fluentPut("maxRetryTimes", "0").fluentPut("retryInterval", "1"); processTwo.fluentPut("taskInstancePriority", "MEDIUM").fluentPut("workerGroup", "default"); processTwo.fluentPut("preTasks", new ArrayList<>()); JSONArray processTaskArray = new JSONArray(); processTaskArray.add(processTwo); processOne.put("tasks", processTaskArray); processDto.setProcessDefinitionJson(processOne.toString()); return processDto; } /** * packageProcessParamOfAdd * @param processDto 工作流参数 * @param locationsOne locationsOne * @param locationsTwo locationsTwo * @param dto 任务参数 * @param processJson 工作流json * @author liudz * @date 2021/5/7 * @return ProcessDto **/ public ProcessDto packageProcessParamOfAdd(ProcessDto processDto, SyncConfigDto dto, JSONObject processJson, JSONObject locationsOne, JSONObject locationsTwo) { String maxTaskKey = JsonTools.getJsonStringMaxKey(processJson.getString("locations")); Integer x = processJson.getJSONObject("locations").getJSONObject(maxTaskKey).getInteger("x"); Integer y = processJson.getJSONObject("locations").getJSONObject(maxTaskKey).getInteger("y"); if (x < ONE_THOUSAND_AND_FIVE_HUNDRED) { locationsTwo.fluentPut("x", x + EIGHTY).fluentPut("y", y); } else if (x >= ONE_THOUSAND_AND_FIVE_HUNDRED) { locationsTwo.fluentPut("y", y + EIGHTY).fluentPut("x", 0); } locationsOne = processJson.getJSONObject("locations").fluentPut("datax-" + dto.getId(), locationsTwo); processDto.setLocations(locationsOne.toString()); processDto.setId(processJson.getInteger("id")); JSONObject processTwo = new JSONObject(); processTwo.fluentPut("type", "DATAX").fluentPut("id", "datax-" + dto.getId()); processTwo.fluentPut("name", "datax-" + dto.getId()).fluentPut("description", ""); String taskJsonString = dto.getContent().toString().replace("}}", "} }").replace("{{", "{ {"); processTwo.put("params", JSONObject.parseObject("{\"localParams\":[],\"customConfig\":1," + "\"json\":\"" + taskJsonString.replace("\"", "\\\"") + "\"}")); JSONObject jsonTimeout = new JSONObject(); jsonTimeout.fluentPut("strategy", "").fluentPut("interval", null).fluentPut("enable", false); processTwo.fluentPut("timeout", jsonTimeout).fluentPut("runFlag", "NORMAL"); JSONObject processTree = new JSONObject(); processTree.fluentPut("successNode", new ArrayList<>()).fluentPut("failedNode", new ArrayList<>()); JSONObject jsonconditionResult = new JSONObject(); jsonconditionResult.put("successNode", new ArrayList<>()); jsonconditionResult.put("failedNode", new ArrayList<>()); processTwo.fluentPut("conditionResult", jsonconditionResult).fluentPut("dependence", new JSONObject()); processTwo.fluentPut("maxRetryTimes", "0").fluentPut("retryInterval", "1"); processTwo.fluentPut("taskInstancePriority", "MEDIUM").fluentPut("workerGroup", "default"); processTwo.fluentPut("preTasks", new ArrayList<>()); JSONObject jsonNew = processJson.getJSONObject("processDefinitionJson"); JSONArray jsonArray = jsonNew.getJSONArray("tasks"); jsonArray.add(processTwo); jsonNew.put("tasks", jsonArray); processDto.setProcessDefinitionJson(jsonNew.toString()); return processDto; } /** * packageProcessParamOfUpdate * @param processDto 工作流参数 * @param dto 任务参数 * @param processJson 工作流json * @author liudz * @date 2021/5/7 * @return ProcessDto **/ public ProcessDto packageProcessParamOfUpdate(ProcessDto processDto, JSONObject processJson, SyncConfigDto dto) { processDto.setLocations(processJson.getString("locations")); processDto.setId(processJson.getInteger("id")); JSONArray jsonTasksArray = processJson.getJSONObject("processDefinitionJson").getJSONArray("tasks"); JSONArray copyJsonTasksArray = new JSONArray(); copyJsonTasksArray.addAll(jsonTasksArray); JSONObject processDefinitionJson = new JSONObject(); String taskJsonString = dto.getContent().toString(); for (Object object : jsonTasksArray) { JSONObject jsonObject = JSONObject.parseObject(object.toString()); if (Long.valueOf(jsonObject.getString("id").substring(SIX)) == dto.getId()) { String json = jsonObject.getString("json"); json = taskJsonString; copyJsonTasksArray.remove(jsonObject); jsonObject.getJSONObject("params").put("json", json); copyJsonTasksArray.add(jsonObject); processDefinitionJson = processJson.getJSONObject("processDefinitionJson"); processDefinitionJson.put("tasks", copyJsonTasksArray); } } processDto.setProcessDefinitionJson(processDefinitionJson.toString()); return processDto; } /** * packageProcessParamOfDelete * @param processDto 工作流参数 * @param dto 任务参数 * @param processJson 工作流json * @author liudz * @date 2021/5/7 * @return ProcessDto **/ public ProcessDto packageProcessParamOfDelete(ProcessDto processDto, JSONObject processJson, SyncConfigDto dto) { processDto.setId(processJson.getInteger("id")); JSONObject locationsJson = processJson.getJSONObject("locations"); JSONObject processDefinitionJson = processJson.getJSONObject("processDefinitionJson"); JSONArray processDefinitionArray = processDefinitionJson.getJSONArray("tasks"); JSONArray copyProcessDefinitionArray = new JSONArray(); copyProcessDefinitionArray.addAll(processDefinitionArray); if (locationsJson.containsKey(DictionaryEnum.DATAX.getFiledString() + dto.getId())) { locationsJson.remove("datax-" + dto.getId()); for (Object object : processDefinitionArray) { if (JSONObject.parseObject(object.toString()).getString("id").equals("datax-" + dto.getId())) { copyProcessDefinitionArray.remove(object); } } processDefinitionJson.put("tasks", copyProcessDefinitionArray); } processDto.setLocations(locationsJson.toString()); processDto.setProcessDefinitionJson(processDefinitionJson.toString()); return processDto; }
/** * 工作流【上线或者下线】 * @param projectName 项目名称 * @param processName 用户工作流名称 * @param processId 工作流ID * @param releaseState 上下线状态操作【0:下线,1:上线】 * @author liudz * @date 2021/5/7 * @return 执行结果 **/ public ReturnResult releaseProcessDefinition(String projectName, String processName, Integer processId, Integer releaseState) { try { String postURL = address + "/dolphinscheduler/projects/" + URLEncoder.encode(projectName, "utf-8") + "/process/release"; PostMethod postMethod = new PostMethod(postURL); postMethod.setRequestHeader("Content-Type", "application/x-www-form-urlencoded;charset=utf-8"); postMethod.setRequestHeader("token", token); // 参数设置,需要注意的就是里边不能传NULL,要传空字符串 NameValuePair[] data = {new NameValuePair("name", processName), new NameValuePair("processId", processId.toString()), new NameValuePair("releaseState", releaseState.toString())}; postMethod.setRequestBody(data); org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient(); httpClient.executeMethod(postMethod); JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString()); if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) { return ReturnResult.error(BizCode.DB_INSERT_ERROR, result.getString("msg")); } } catch (Exception e) { log.info("请求异常:{}", e); } return ReturnResult.success(); } /** * 运行流程实例 * @param projectName 项目名称 * @param request request * @param id 数据同步任务ID * @author liudz * @date 2021/5/7 * @return 执行结果 **/ @GetMapping("/project/process/datax/start") public DolphinschedulerResponse startProcessDataxTask( @RequestParam("projectName") String projectName, @RequestParam("id") Integer id, HttpServletRequest request) { try { Long userId = Long.valueOf(request.getUserPrincipal().getName()); DolphinschedulerResponse processInfoList = getUserProcess(projectName); if (processInfoList.getCode() != ZERO) { return processInfoList; } JSONObject processJson = new JSONObject(); log.info("--(1)getUserProcess--success:{}", processInfoList); List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData(); for (Map<String, Object> map : list) { if (map.get("name").equals(userId + "-dataxTask")) { processJson.fluentPutAll(map); } } if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(OFFLINE)) { releaseProcessDefinition(projectName, userId + "-dataxTask", processJson.getInteger("id"), 1); log.info("--(2)releaseProcessDefinition--ONLINE--success"); } String postURL = address + "/dolphinscheduler/projects/" + URLEncoder.encode(projectName, "utf-8") + "/executors/start-process-instance"; PostMethod postMethod = new PostMethod(postURL); postMethod.setRequestHeader("Content-Type", "application/x-www-form-urlencoded;charset=utf-8"); postMethod.setRequestHeader("token", token); // 参数设置,需要注意的就是里边不能传NULL,要传空字符串 NameValuePair[] data = packageNameValuePair(processJson, id); postMethod.setRequestBody(data); org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient(); httpClient.executeMethod(postMethod); JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString()); log.info("--(2)startProcessInstance--result:{}", result); if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) { return DolphinschedulerResponse.error(result.getInteger("code"), result.getString("msg")); } } catch (Exception e) { log.info("请求异常:{}", e); } return DolphinschedulerResponse.success(); } /** * packageNameValuePair封装参数 * @param processJson 工作流json * @param dragSparkTaskId 任务ID * @author liudz * @date 2021/5/14 * @return NameValuePair **/ public NameValuePair[] packageNameValuePair(JSONObject processJson, Integer dragSparkTaskId) { NameValuePair[] data = { new NameValuePair("failureStrategy", "CONTINUE"), new NameValuePair("processDefinitionId", processJson.getString("id")), new NameValuePair("processInstancePriority", "MEDIUM"), new NameValuePair("warningGroupId", "0"), new NameValuePair("warningType", "NONE"), new NameValuePair("runMode", "RUN_MODE_SERIAL"), new NameValuePair("startNodeList", "datax-" + dragSparkTaskId), new NameValuePair("taskDependType", "TASK_POST"), new NameValuePair("workerGroup", "default")}; return data; }

/** * stopProcessDataxTask * @param id id * @param executeType executeType * @param projectName 项目名称 * @return ReturnResult * @author: liudz * @author: lty update 2020/5/27 * @date: 2020/4/28 10:31 */ @GetMapping(value = "/project/process/datax/execute/{projectName}/{id}/{executeType}") public DolphinschedulerResponse<String> stopProcessDataxTask(@PathVariable("projectName") String projectName, @PathVariable("id") Long id, @PathVariable("executeType") String executeType) { log.info("--(1)stopProcessDataxTask--begin--projectName:{},id:{},executeType:{}", projectName, id, executeType); try { HttpHeaders headers = new HttpHeaders(); headers.set("token", token); headers.set("Content-Type", "application/json"); HttpEntity requestEntity = new HttpEntity(headers); ResponseEntity<JSONObject> returnResult = restTemplate.exchange(address + "/" + "dolphinscheduler/projects/" + projectName + "/task-instance/list-paging?" + "pageNo=1&pageSize=100&taskName=datax-" + id, HttpMethod.GET, requestEntity, JSONObject.class); List<Map<String, Object>> list = (List<Map<String, Object>>) returnResult.getBody().getJSONObject("data").get("totalList"); Integer processInstanceId = null; for (Map<String, Object> map : list) { if (map.get("state").equals("RUNNING_EXEUTION")) { processInstanceId = Integer.valueOf(map.get("processInstanceId").toString()); }
} if (StringUtils.isEmpty(processInstanceId)) { return DolphinschedulerResponse.error(Msg.TASK_HAS_BEEN_STOPPED); } log.info("--(2)getProcessInstanceId--success--:{}", processInstanceId); String postURL = address + "/dolphinscheduler/projects/" + URLEncoder.encode(projectName, "utf-8") + "/executors/execute"; PostMethod postMethod = new PostMethod(postURL); postMethod.setRequestHeader("Content-Type", "application/x-www-form-urlencoded;charset=utf-8"); postMethod.setRequestHeader("token", token); NameValuePair[] data = {new NameValuePair("executeType", executeType), new NameValuePair("processInstanceId", processInstanceId.toString())}; postMethod.setRequestBody(data); org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient(); httpClient.executeMethod(postMethod); JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString()); if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) { return DolphinschedulerResponse.error(result.getInteger("code"), result.getString("msg")); } log.info("--(3)stopProcessSparkTask--success--:{}", result); } catch (UnsupportedEncodingException e) { log.info("UnsupportedEncodingException:{}", e); } catch (HttpException e) { log.info("HttpException:{}", e); } catch (IOException e) { log.info("IOException:{}", e); } return DolphinschedulerResponse.success(); }
/** * 查询全部同步任务配置(分页) * * @param form * name * @param request * 含有用户id * @return 分页结果 */ @RequestMapping(value = "/project/process/datax/list", method = RequestMethod.POST) public ReturnResult<PageResult<SyncConfigDto>> findAll(@RequestBody @Validated ConfigSelectForm form, HttpServletRequest request) { Long userId = Long.valueOf(request.getUserPrincipal().getName()); return syncConfigService.list(form, userId); }
/** * 获取同步任务配置 * * @param id * 配置id * @param request * 用户id * @return 添加结果 */ @RequestMapping(value = "/project/process/datax", method = RequestMethod.GET) public ReturnResult<SyncConfigDto> findById(@RequestParam Long id, HttpServletRequest request) { Long userId = Long.valueOf(request.getUserPrincipal().getName()); return syncConfigService.findById(id, userId); }}
复制代码

ConfigAddForm

package com.geespace.microservices.builder.request;
import javax.validation.constraints.NotEmpty;import javax.validation.constraints.NotNull;
import com.alibaba.fastjson.JSONArray;import com.alibaba.fastjson.JSONObject;
import lombok.Data;
/** * @Author: zjr * @Date: 2020-05-06 09:42 * @Version 1.0 */@Datapublic class ConfigAddForm {
/** * 配置名称 */ @NotEmpty(message = "name不能为空") private String name; /** * 配置描述 */ private String description; /** * 实时/全量/增量 */ @NotNull(message = "同步方式不能为空") private int syncType; /** * reader 选择的数据源id */ @NotNull(message = "读取数据源id不能为空") private Long readerConfigId; /** * reader */ @NotEmpty(message = "读取参数不能为空") private JSONObject readerParam; /** * writer 选择的数据源id */ @NotNull(message = "写入数据源id不能为空") private Long writerConfigId; /** * writer */ @NotEmpty(message = "写入参数不能为空") private JSONObject writerParam; /** * reader:column left,writer:column right */ @NotEmpty(message = "字段对照表不能为空") private JSONArray columnMap;
private Long userId; /** * 项目名称 **/ String projectName; /** * 项目id **/ @NotNull(message = "projectId not null") Long projectId; Long id;}
复制代码

ConfigUpdateForm

package com.geespace.microservices.builder.request;
import javax.validation.constraints.NotEmpty;import javax.validation.constraints.NotNull;
import com.alibaba.fastjson.JSONArray;import com.alibaba.fastjson.JSONObject;
import lombok.Data;
/** * @Author: zjr * @Date: 2020-05-06 09:42 * @Version 1.0 */@Datapublic class ConfigUpdateForm { @NotNull(message = "同步配置id不能为空") private Long id; /** * 配置名称 */ @NotEmpty(message = "name不能为空") private String name; /** * 配置描述 */ private String description; /** * 实时/全量/增量 */ @NotNull(message = "同步方式不能为空") private int syncType; /** * reader 选择的数据源id */ @NotNull(message = "读取数据源id不能为空") private Long readerConfigId; /** * reader */ @NotEmpty(message = "读取参数不能为空") private JSONObject readerParam; /** * writer 选择的数据源id */ @NotNull(message = "写入数据源id不能为空") private Long writerConfigId; /** * writer */ @NotEmpty(message = "写入参数不能为空") private JSONObject writerParam; /** * reader:column left,writer:column right */ @NotEmpty(message = "字段对照表不能为空") private JSONArray columnMap;
private Long userId; /** * 项目id **/ @NotNull(message = "projectId not null") Long projectId; /** * 项目名称 **/ String projectName;
}
复制代码

ProcessDto

package com.geespace.microservices.builder.dto;
import lombok.Data;import lombok.EqualsAndHashCode;import lombok.ToString;
/** * dolphinscheduler调度器中工作流参数 * @Author: liudz * @Date: 2020-03-23 **/@Data@EqualsAndHashCode(callSuper = false)@ToString(callSuper = true)public class ProcessDto { /** * 流程定义ID **/ private Integer id; /** * 流程定义节点图标连接信息(json格式) **/ private String connects; /** * 流程定义节点坐标位置信息(json格式) **/ private String locations; /** * 流程定义名称 **/ private String name; /** * 流程定义详细信息(json格式) **/ private String processDefinitionJson; /** * 项目名称 **/ String projectName; /** * 项目id **/ Long projectId;}
复制代码

SyncConfigDto

package com.geespace.microservices.builder.dto;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
/** * @Author: zjr * @Date: 2020-05-05 17:03 * @Version 1.0 */@Datapublic class SyncConfigDto { private Long id;
/** * 配置名称 */ private String name;
/** * 配置描述 */ private String description;
/** * 实时/全量/增量 */ private int syncType; /** * json base64 */ private JSONObject content; /** * 项目名称 **/ String projectName; /** * 项目id **/ Long projectId;}
复制代码

SyncConfigService

package com.geespace.microservices.builder.service;
import com.geespace.microservices.builder.dto.SyncConfigDto;import com.geespace.microservices.builder.request.ConfigAddForm;import com.geespace.microservices.builder.request.ConfigSelectForm;import com.geespace.microservices.builder.request.ConfigUpdateForm;import com.geespace.microservices.builder.response.PageResult;import com.geespace.microservices.builder.response.ReturnResult;
/** * @Author: zjr * @Date: 2020-05-05 13:59 * @Version 1.0 */public interface SyncConfigService { /** * 添加同步任务配置 * * @param form * 任务配置参数 * @return 添加结果 */ ReturnResult<SyncConfigDto> addConfig(ConfigAddForm form);
/** * 修改同步任务配置 * * @param form * 任务配置参数(含id) * @return 修改结果 */ ReturnResult<SyncConfigDto> updateConfig(ConfigUpdateForm form);
/** * 查找同步任务配置 * * @param id * 同步任务配置id * @param userId * 用户id * @return 查询结果 */ ReturnResult<SyncConfigDto> findById(Long id, Long userId);
/** * 删除同步任务配置 * * @param id * 任务配置id * @param userId * 用户id * @return 删除结果 */ ReturnResult delete(Long id, Long userId);
/** * 查询全部同步任务配置(分页) * * @param form * name * @param userId * 用户id * @return 分页结果 */ ReturnResult<PageResult<SyncConfigDto>> list(ConfigSelectForm form, Long userId);}
复制代码

SyncConfigServiceImpl

package com.geespace.microservices.builder.service.impl;
import java.util.ArrayList;import java.util.List;import java.util.stream.Collectors;
import com.alibaba.fastjson.JSONArray;import com.alibaba.fastjson.JSONObject;import com.geespace.microservices.builder.biz.Contants;import com.geespace.microservices.builder.dao.SyncConfigMapper;import com.geespace.microservices.builder.dto.ColumnMap;import com.geespace.microservices.builder.dto.SyncConfigDto;import com.geespace.microservices.builder.entity.SyncConfig;import com.geespace.microservices.builder.factory.BaseParamTool;import com.geespace.microservices.builder.factory.ParamToolFactory;import com.geespace.microservices.builder.request.ConfigAddForm;import com.geespace.microservices.builder.request.ConfigSelectForm;import com.geespace.microservices.builder.request.ConfigUpdateForm;import com.geespace.microservices.builder.response.BizCode;import com.geespace.microservices.builder.response.PageResult;import com.geespace.microservices.builder.response.ReturnResult;import com.geespace.microservices.builder.service.SyncConfigService;import com.geespace.microservices.datasource.dto.JdbcDataSourceDto;import com.geespace.microservices.datasource.response.Response;import com.geespace.microservices.datasource.service.JdbcDataSourceService;import com.github.pagehelper.PageHelper;import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import org.springframework.util.CollectionUtils;
/** * @Author: zjr * @Date: 2020-05-05 13:59 * @Version 1.0 */@Service@Slf4jpublic class SyncConfigServiceImpl implements SyncConfigService { public static final int ZERO = 0; public static final String HBASE = "hbase"; @Autowired private SyncConfigMapper syncConfigMapper;
@Autowired private JdbcDataSourceService dataSourceService;
@Override public ReturnResult<SyncConfigDto> addConfig(ConfigAddForm form) { Integer checkResult = syncConfigMapper.checkNameUnique(form.getUserId(), form.getName(), null); if (checkResult == ZERO) { ColumnMap columnMap = makeColumnMap(form.getColumnMap()); if (columnMap == null) { return ReturnResult.error(BizCode.COLUMN_MATCHING_ERROR); } // 查询reader数据源 填充reader JSONObject reader = packageReader(form.getReaderConfigId(), form.getReaderParam(), columnMap.getReader()); // 查询writer数据源 填充writer JSONObject writer = packageWriter(form.getWriterConfigId(), form.getWriterParam(), columnMap.getWriter()); JSONArray contentArray = new JSONArray(); JSONObject content = new JSONObject(); content.put("reader", reader); content.put("writer", writer); contentArray.add(content); SyncConfig syncConfig = new SyncConfig(); syncConfig.setContent(packageJob(contentArray)); syncConfig.setName(form.getName()); syncConfig.setDescription(form.getDescription()); syncConfig.setSyncType(form.getSyncType()); syncConfig.setCreatedTimestamp(System.currentTimeMillis()); syncConfig.setCreatedUser(form.getUserId()); syncConfig.setModifiedTimestamp(System.currentTimeMillis()); syncConfig.setProjectName(form.getProjectName()); syncConfig.setProjectId(form.getProjectId()); syncConfigMapper.insert(syncConfig); return ReturnResult.success(entityToDto(syncConfig)); } log.error("SyncConfigServiceImpl--addConfig--NAME_IS_EXIST!"); return ReturnResult.error(BizCode.NAME_IS_EXIST); }
@Override public ReturnResult<SyncConfigDto> updateConfig(ConfigUpdateForm form) { SyncConfig syncConfig = syncConfigMapper.findById(form.getId()); if (syncConfig == null || syncConfig.getCreatedUser() != form.getUserId()) { return ReturnResult.error(BizCode.UPDATE_OBJECT_NOT_EXIST); } Integer checkResult = syncConfigMapper.checkNameUnique(form.getUserId(), form.getName(), form.getId()); if (checkResult == ZERO) { ColumnMap columnMap = makeColumnMap(form.getColumnMap()); JSONObject reader = packageReader(form.getReaderConfigId(), form.getReaderParam(), columnMap.getReader()); JSONObject writer = packageWriter(form.getWriterConfigId(), form.getWriterParam(), columnMap.getWriter()); JSONArray contentArray = new JSONArray(); JSONObject content = new JSONObject(); content.put("reader", reader); content.put("writer", writer); contentArray.add(content); syncConfig.setContent(packageJob(contentArray)); syncConfig.setName(form.getName()); syncConfig.setDescription(form.getDescription()); syncConfig.setSyncType(form.getSyncType()); syncConfig.setModifiedTimestamp(System.currentTimeMillis()); syncConfig.setProjectName(form.getProjectName()); syncConfig.setProjectId(form.getProjectId()); syncConfigMapper.update(syncConfig); return ReturnResult.success(entityToDto(syncConfig)); } log.error("SyncConfigServiceImpl--updateConfig--NAME_IS_EXIST!"); return ReturnResult.error(BizCode.NAME_IS_EXIST); }
@Override public ReturnResult<SyncConfigDto> findById(Long id, Long userId) { SyncConfig syncConfig = syncConfigMapper.findById(id); if (syncConfig == null || syncConfig.getCreatedUser() != userId) { return ReturnResult.success(new SyncConfigDto()); } return ReturnResult.success(entityToDto(syncConfig)); }
@Override public ReturnResult delete(Long id, Long userId) { log.debug("****id:{},userId:{}****", id, userId); SyncConfig syncConfig = syncConfigMapper.findById(id); log.debug("****syncConfig:{}****", syncConfig); log.debug("****syncConfig != null:{}", syncConfig != null); log.debug("****syncConfig.getCreatedUser():{},userId:{},syncConfig.getCreatedUser().equals(userId):{}", syncConfig.getCreatedUser(), userId, syncConfig.getCreatedUser().equals(userId)); if (syncConfig != null && syncConfig.getCreatedUser().equals(userId)) { syncConfigMapper.delete(id); log.debug("****delete success!"); } return ReturnResult.success(); }
@Override public ReturnResult<PageResult<SyncConfigDto>> list(ConfigSelectForm form, Long userId) { SyncConfig syncConfig = new SyncConfig(); syncConfig.setCreatedUser(userId); syncConfig.setName(form.getName()); syncConfig.setProjectId(form.getProjectId()); PageHelper.startPage(form.getPageNum(), form.getPageSize()); PageInfo<SyncConfig> configPageInfo = new PageInfo<>(syncConfigMapper.list(syncConfig)); PageResult<SyncConfigDto> result = new PageResult<>(); result.setPageNum(configPageInfo.getPageNum()); result.setPageSize(configPageInfo.getPageSize()); result.setTotalCount(configPageInfo.getTotal()); result.setTotalPage(configPageInfo.getPages()); List<SyncConfigDto> dtoList = configPageInfo.getList().stream().map(this::entityToDto).collect(Collectors.toList()); result.setList(dtoList); return ReturnResult.success(result); }
/** * 将reader writer对照list查分成2个独立list(保持顺序) * * @param columnMap * [{"reader":"col l1","writer":"col r1"},{"reader":"col l2","writer":"col r2"}] * @return object contants reader(list<String>) and writer(list<String>) */ private ColumnMap makeColumnMap(JSONArray columnMap) { List<String> readerColumns = new ArrayList<>(); List<String> writerColumns = new ArrayList<>(); for (int i = 0; i < columnMap.size(); i++) { JSONObject column = columnMap.getJSONObject(i); readerColumns.add(column.getString("reader")); writerColumns.add(column.getString("writer")); } if (CollectionUtils.isEmpty(readerColumns) || CollectionUtils.isEmpty(writerColumns)) { return null; } ColumnMap column = new ColumnMap(); column.setReader(readerColumns); column.setWriter(writerColumns); return column; }
/** * 封装reader json * * @param readerConfigId * 数据源id * @param readerParam * 页面填写reader 配置属性信息(table、where...) * @param readerColumns * 选择的数据字段 * @return reader json */ private JSONObject packageReader(Long readerConfigId, JSONObject readerParam, List<String> readerColumns) { Response<JdbcDataSourceDto> descrypt = dataSourceService.findDescrypt(readerConfigId); if (!descrypt.responseSuccess()) { return null; } JdbcDataSourceDto jdbcDataSource = descrypt.getInfo(); String sourceType = jdbcDataSource.getSourceType(); BaseParamTool baseParamTool = ParamToolFactory.getByType(sourceType); JSONObject reader = baseParamTool.makeReaderJson(jdbcDataSource, readerParam, readerColumns); return reader; }
/** * 封装writer json * * @param writerConfigId * 数据源id * @param writerParam * 页面填写writer 配置属性信息(table、where...) * @param writerColumns * 选择的映射字段 * @return writer json */ private JSONObject packageWriter(Long writerConfigId, JSONObject writerParam, List<String> writerColumns) { Response<JdbcDataSourceDto> descrypt = dataSourceService.findDescrypt(writerConfigId); if (!descrypt.responseSuccess()) { return null; } JdbcDataSourceDto jdbcDataSource = descrypt.getInfo(); String sourceType = jdbcDataSource.getSourceType(); BaseParamTool baseParamTool = ParamToolFactory.getByType(sourceType); JSONObject writer = baseParamTool.makeWriterJson(jdbcDataSource, writerParam, writerColumns); return writer; }
/** * 封装执行job json * * @param content * reader and writer * @return job */ private JSONObject packageJob(JSONArray content) { JSONObject job = new JSONObject(); JSONObject setting = new JSONObject(); JSONObject speed = new JSONObject(); speed.put("channel", 1); JSONObject errorLimit = new JSONObject(); errorLimit.put("record", 0); errorLimit.put("percentage", Contants.PERCENTAGE); setting.put("speed", speed); setting.put("errorLimit", errorLimit); job.put("setting", setting); job.put("content", content); JSONObject jobContent = new JSONObject(); jobContent.put("job", job); return jobContent; }
/** * entity转dto * * @param syncConfig * entity * @return dto */ private SyncConfigDto entityToDto(SyncConfig syncConfig) { SyncConfigDto configDto = new SyncConfigDto(); BeanUtils.copyProperties(syncConfig, configDto); return configDto; }}
复制代码

SyncConfigMapper

package com.geespace.microservices.builder.dao;
import java.util.List;
import com.geespace.microservices.builder.entity.SyncConfig;
import org.apache.ibatis.annotations.Delete;import org.apache.ibatis.annotations.Insert;import org.apache.ibatis.annotations.Mapper;import org.apache.ibatis.annotations.Result;import org.apache.ibatis.annotations.ResultMap;import org.apache.ibatis.annotations.Results;import org.apache.ibatis.annotations.Select;import org.apache.ibatis.annotations.SelectKey;import org.apache.ibatis.annotations.SelectProvider;import org.apache.ibatis.annotations.Update;import org.apache.ibatis.type.JdbcType;
/** * @Author: zjr * @Date: 2020-05-05 10:40 * @Version 1.0 */@Mapperpublic interface SyncConfigMapper { /** * 插入一条数据 * * @param syncConfig * 插入对象 * @return 结果 */ @Insert({"insert into sync_config (name, description, content, sync_type, created_timestamp, created_user, ", "modified_timestamp,project_id,project_name) values (#{name,jdbcType=VARCHAR},#{description,jdbcType=VARCHAR},", "#{content,jdbcType=OTHER, typeHandler=com.geespace.microservices.builder.handler.MySqlJsonHandler}, ", "#{syncType,jdbcType=TINYINT}, #{createdTimestamp,jdbcType=BIGINT}, #{createdUser,jdbcType=BIGINT}, ", "#{modifiedTimestamp,jdbcType=BIGINT},#{projectId},#{projectName})"}) @SelectKey(statement = "SELECT LAST_INSERT_ID()", keyProperty = "id", before = false, resultType = Long.class) Long insert(SyncConfig syncConfig);
/** * 更新数据 * * @param syncConfig * 插入对象 * @return 结果 */ @Update({"update sync_config set ", "name = #{name,jdbcType=VARCHAR}, description = #{description,jdbcType=VARCHAR}, ", "sync_type = #{syncType,jdbcType=TINYINT}, content = #{content,jdbcType=OTHER,", "typeHandler=com.geespace.microservices.builder.handler.MySqlJsonHandler},project_id=#{projectId}, ", "created_timestamp = #{createdTimestamp,jdbcType=BIGINT}, created_user = #{createdUser,jdbcType=BIGINT}, ", "modified_timestamp = #{modifiedTimestamp,jdbcType=BIGINT},project_name=#{projectName},project_id=#{projectId}", " where id = #{id,jdbcType=BIGINT}"}) int update(SyncConfig syncConfig);
/** * 删除数据 * * @param id * config id * @return 影响行数 */ @Delete("delete from sync_config where id = #{id,jdbcType=BIGINT}") int delete(Long id);
/** * 查询 * * @param syncConfig * name * @return list结果 */ @SelectProvider(type = SyncConfigSqlProvider.class, method = "select") @Results(id = "resultMap", value = {@Result(column = "id", property = "id", jdbcType = JdbcType.BIGINT, id = true), @Result(column = "name", property = "name", jdbcType = JdbcType.VARCHAR), @Result(column = "description", property = "description", jdbcType = JdbcType.VARCHAR), @Result(column = "sync_type", property = "syncType", jdbcType = JdbcType.TINYINT), @Result(column = "content", property = "content", jdbcType = JdbcType.OTHER, typeHandler = com.geespace.microservices.builder.handler.MySqlJsonHandler.class), @Result(column = "created_timestamp", property = "createdTimestamp", jdbcType = JdbcType.BIGINT), @Result(column = "created_user", property = "createdUser", jdbcType = JdbcType.BIGINT), @Result(column = "project_id", property = "projectId", jdbcType = JdbcType.BIGINT), @Result(column = "project_name", property = "projectName", jdbcType = JdbcType.VARCHAR), @Result(column = "modified_timestamp", property = "modifiedTimestamp", jdbcType = JdbcType.BIGINT)}) List<SyncConfig> list(SyncConfig syncConfig);
/** * id 查询 * * @param id * id * @return 结果 */ @Select({"select id,project_id,project_name,name, description, sync_type, content," + " created_timestamp, created_user, modified_timestamp ", "from sync_config where id = #{id,jdbcType=BIGINT}"}) @ResultMap("resultMap") SyncConfig findById(Long id);
/** * 校验任务名称唯一性,用于新增功能 * @author: liudz * @param createdUser 用户ID * @param name 任务名称 * @param id 任务ID * @date: 2020/7/23 * @return SparkTask */ @SelectProvider(type = SyncConfigSqlProvider.class, method = "checkNameUnique") Integer checkNameUnique(Long createdUser, String name, Long id);}
复制代码

SyncConfigSqlProvider

package com.geespace.microservices.builder.dao;
import com.geespace.microservices.builder.entity.SyncConfig;
import org.apache.commons.lang3.StringUtils;import org.apache.ibatis.jdbc.SQL;
/** * @Author: zjr * @Date: 2020-05-22 13:35 * @Version 1.0 */public class SyncConfigSqlProvider { /** * 条件查询 * * @param syncConfig * name * @return sql */ public String select(SyncConfig syncConfig) { SQL sql = new SQL(); sql.SELECT("id,project_id,project_name,name, description, sync_type, content, created_timestamp," + " created_user, modified_timestamp"); sql.FROM("sync_config"); sql.WHERE("created_user = #{createdUser,jdbcType=BIGINT}"); if (!org.springframework.util.StringUtils.isEmpty(syncConfig.getProjectId())) { sql.WHERE("project_id=#{projectId}"); } if (!StringUtils.isBlank(syncConfig.getName())) { sql.WHERE("name like concat('%', #{name,jdbcType=VARCHAR}, '%')"); } sql.ORDER_BY("id desc"); return sql.toString(); }
/** * 校验任务名称唯一性,用于新增功能 * * @author: liudz * @date 2019/12/3 * @author: liudz * @param createdUser 用户ID * @param name 任务名称 * @param id 任务ID * @return sql */ public String checkNameUnique(Long createdUser, String name, Long id) { SQL sql = new SQL(); sql.SELECT("COUNT(name)"); sql.FROM("sync_config"); if (!org.springframework.util.StringUtils.isEmpty(id)) { sql.WHERE("id != #{id}"); } sql.WHERE("created_user=#{createdUser} and name=#{name}"); return sql.toString(); }}
复制代码

JdbcDataSourceService

package com.geespace.microservices.datasource.service;
import java.util.List;
import com.geespace.microservices.datasource.dto.JdbcDataSourceDto;import com.geespace.microservices.datasource.form.datasource.DataSourceAddForm;import com.geespace.microservices.datasource.form.datasource.DataSourceSelectForm;import com.geespace.microservices.datasource.form.datasource.DataSourceUpdateForm;import com.geespace.microservices.datasource.response.PageResult;import com.geespace.microservices.datasource.response.Response;
/** * @Author: zjr * @Date: 2020-04-07 17:44 * @Version 1.0 */public interface JdbcDataSourceService { /** * 添加数据源信息 * * @param dataSourceAddForm * 数据源信息 * @return 添加成功的信息 */ Response<JdbcDataSourceDto> addDataSource(DataSourceAddForm dataSourceAddForm);
/** * 修改数据源信息 * * @param dataSourceUpdateForm * 数据源信息 * @return 修改后的信息 */ Response<JdbcDataSourceDto> updateDataSource(DataSourceUpdateForm dataSourceUpdateForm);
/** * 删除数据源信息 * * @param id * 数据源id * @return 删除是否成功 */ Response deleteDataSource(Long id);
/** * 数据源列表查询-全量 * * @param creator * 创建者 * @return 全量列表 */ Response<List<JdbcDataSourceDto>> list(Long creator);
/** * 数据源列表查询-按类型查询 * * @param type * 数据源类型 * @param creator * 创建者 * @return 全量列表 */ Response<List<JdbcDataSourceDto>> listByType(Long creator, List<String> type);
/** * 内部数据源列表查询-全量 * * @return 全量列表 */ Response<List<JdbcDataSourceDto>> listMeta();
/** * 数据源列表查询-分页 * * @param form * 查询条件 * @return 分页列表 */ Response<PageResult<JdbcDataSourceDto>> select(DataSourceSelectForm form);
/** * 通过id查找数据源 * * @param id * 数据源id * @return 查询结果 */ Response<JdbcDataSourceDto> find(Long id);
/** * 通过id查找元数据源 * * @param id * 数据源id * @return 查询结果 */ Response<JdbcDataSourceDto> findMetaDataSource(Long id);
/** * 通过id查找数据源-明文 * * @param id * 数据源id * @return 查询结果 */ Response<JdbcDataSourceDto> findDescrypt(Long id);}
复制代码

JdbcDataSourceServiceImpl

package com.geespace.microservices.datasource.service.impl;
import java.util.Date;import java.util.List;import java.util.stream.Collectors;
import com.geespace.microservices.datasource.dao.DataSourceMapper;import com.geespace.microservices.datasource.dao.MetaDataSourceMapper;import com.geespace.microservices.datasource.dto.JdbcDataSourceDto;import com.geespace.microservices.datasource.entity.JdbcDataSource;import com.geespace.microservices.datasource.enums.JdbcDataSourceStatusEnum;import com.geespace.microservices.datasource.form.datasource.DataSourceAddForm;import com.geespace.microservices.datasource.form.datasource.DataSourceSelectForm;import com.geespace.microservices.datasource.form.datasource.DataSourceUpdateForm;import com.geespace.microservices.datasource.response.Msg;import com.geespace.microservices.datasource.response.PageResult;import com.geespace.microservices.datasource.response.Response;import com.geespace.microservices.datasource.service.JdbcDataSourceService;import com.geespace.microservices.datasource.util.AesUtil;import com.geespace.microservices.datasource.util.LocalCacheUtil;import com.github.pagehelper.PageHelper;import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;import org.springframework.beans.BeanUtils;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;
/** * @Author: zjr * @Date: 2020-04-07 17:45 * @Version 1.0 */@Slf4j@Servicepublic class JdbcDataSourceServiceImpl implements JdbcDataSourceService { @Autowired private DataSourceMapper dataSourceMapper;
@Autowired private MetaDataSourceMapper metaDataSourceMapper;
@Override public Response<JdbcDataSourceDto> addDataSource(DataSourceAddForm dataSourceAddForm) { JdbcDataSource dataSource = new JdbcDataSource(); BeanUtils.copyProperties(dataSourceAddForm, dataSource); JdbcDataSource exist = dataSourceMapper.nameExist(dataSource); if (exist != null) { return Response.error(Msg.DATASOURCE_NAME_ALREAD_EXIST); } String userName = AesUtil.decrypt(dataSource.getUserName()); // 判断账密是否为密文 if (userName == null) { dataSource.setUserName(AesUtil.encrypt(dataSource.getUserName())); } String pwd = AesUtil.decrypt(dataSource.getPassword()); if (pwd == null) { dataSource.setPassword(AesUtil.encrypt(dataSource.getPassword())); } dataSource.setCreateTime(new Date()); dataSource.setUpdateTime(new Date()); dataSource.setStatus(JdbcDataSourceStatusEnum.USING.getStatus()); dataSourceMapper.insert(dataSource); JdbcDataSourceDto dataSourceDto = new JdbcDataSourceDto(); BeanUtils.copyProperties(dataSource, dataSourceDto); return Response.success(dataSourceDto); }
@Override public Response<JdbcDataSourceDto> updateDataSource(DataSourceUpdateForm dataSourceUpdateForm) { JdbcDataSource dataSource = dataSourceMapper.find(dataSourceUpdateForm.getId()); if (dataSource == null || dataSourceUpdateForm.getCreator() != dataSource.getCreator()) { return Response.error(Msg.DATASOURCE_NOT_EXIST); } String userName = AesUtil.decrypt(dataSourceUpdateForm.getUserName()); // 判断账密是否为密文 if (userName == null) { dataSourceUpdateForm.setUserName(AesUtil.encrypt(dataSourceUpdateForm.getUserName())); } String pwd = AesUtil.decrypt(dataSourceUpdateForm.getPassword()); if (pwd == null) { dataSourceUpdateForm.setPassword(AesUtil.encrypt(dataSourceUpdateForm.getPassword())); } String originName = dataSource.getSourceName(); // 注意copyProperties是将source中的属性全部copy到target中 BeanUtils.copyProperties(dataSourceUpdateForm, dataSource); JdbcDataSource exist = dataSourceMapper.nameExist(dataSource); if (exist != null && !exist.getSourceName().equals(originName)) { return Response.error(Msg.DATASOURCE_NAME_ALREAD_EXIST); } dataSource.setUpdateTime(new Date()); dataSourceMapper.update(dataSource); LocalCacheUtil.remove(dataSource.getCreator() + originName); JdbcDataSourceDto dataSourceDto = new JdbcDataSourceDto(); BeanUtils.copyProperties(dataSource, dataSourceDto); return Response.success(dataSourceDto); }
@Override public Response deleteDataSource(Long id) { dataSourceMapper.delete(id); return Response.success(); }
@Override public Response<List<JdbcDataSourceDto>> list(Long creator) { List<JdbcDataSource> list = dataSourceMapper.list(creator); List<JdbcDataSourceDto> listDto = list.stream().map(this::getDto).collect(Collectors.toList()); return Response.success(listDto); }
@Override public Response<List<JdbcDataSourceDto>> listByType(Long creator, List<String> type) { List<JdbcDataSource> list = dataSourceMapper.listByType(creator, type); List<JdbcDataSourceDto> listDto = list.stream().map(this::getDto).collect(Collectors.toList()); return Response.success(listDto); }
@Override public Response<List<JdbcDataSourceDto>> listMeta() { List<JdbcDataSource> list = metaDataSourceMapper.list(); List<JdbcDataSourceDto> listDto = list.stream().map(this::getDto).collect(Collectors.toList()); return Response.success(listDto); }
@Override public Response<PageResult<JdbcDataSourceDto>> select(DataSourceSelectForm form) { JdbcDataSource select = new JdbcDataSource(); select.setSourceName(form.getSourceName()); select.setCreator(form.getCreator()); PageHelper.startPage(form.getPageNum(), form.getPageSize()); List<JdbcDataSource> list = dataSourceMapper.select(select); PageInfo<JdbcDataSource> pageInfo = new PageInfo<>(list); PageResult<JdbcDataSourceDto> pageResult = new PageResult<>(); pageResult.setPageNum(pageInfo.getPageNum()); pageResult.setPageSize(pageInfo.getPageSize()); pageResult.setTotalPage(pageInfo.getPages()); pageResult.setTotalCount(pageInfo.getTotal()); pageResult.setList(pageInfo.getList().stream().map(this::getDto).collect(Collectors.toList())); return Response.success(pageResult); }
@Override public Response<JdbcDataSourceDto> find(Long id) { JdbcDataSource jdbcDataSource = dataSourceMapper.find(id); if (jdbcDataSource == null) { return Response.error(Msg.DATASOURCE_NOT_EXIST); } return Response.success(getDto(jdbcDataSource)); }
@Override public Response<JdbcDataSourceDto> findMetaDataSource(Long id) { JdbcDataSource jdbcDataSource = this.metaDataSourceMapper.find(id); if (jdbcDataSource == null) { return Response.error(Msg.DATASOURCE_NOT_EXIST); } return Response.success(getDto(jdbcDataSource)); }
@Override public Response<JdbcDataSourceDto> findDescrypt(Long id) { JdbcDataSource jdbcDataSource = dataSourceMapper.find(id); if (jdbcDataSource == null) { return Response.error(Msg.DATASOURCE_NOT_EXIST); } if (!StringUtils.isBlank(jdbcDataSource.getUserName())) { jdbcDataSource.setUserName(AesUtil.decrypt(jdbcDataSource.getUserName())); } if (!StringUtils.isBlank(jdbcDataSource.getPassword())) { jdbcDataSource.setPassword(AesUtil.decrypt(jdbcDataSource.getPassword())); } return Response.success(getDto(jdbcDataSource)); }
/** * 获取dto * * @param jdbcDataSource * source * @return dto */ private JdbcDataSourceDto getDto(JdbcDataSource jdbcDataSource) { JdbcDataSourceDto dto = new JdbcDataSourceDto(); BeanUtils.copyProperties(jdbcDataSource, dto); return dto; }}
复制代码

DataSourceMapper

package com.geespace.microservices.datasource.dao;
import java.util.List;
import com.geespace.microservices.datasource.entity.JdbcDataSource;
import org.apache.ibatis.annotations.Delete;import org.apache.ibatis.annotations.Insert;import org.apache.ibatis.annotations.Mapper;import org.apache.ibatis.annotations.Options;import org.apache.ibatis.annotations.Param;import org.apache.ibatis.annotations.Result;import org.apache.ibatis.annotations.ResultMap;import org.apache.ibatis.annotations.Results;import org.apache.ibatis.annotations.Select;import org.apache.ibatis.annotations.SelectKey;import org.apache.ibatis.annotations.SelectProvider;import org.apache.ibatis.annotations.Update;import org.apache.ibatis.type.JdbcType;
/** * @Author: zjr * @Date: 2020-04-07 17:05 * @Version 1.0 */@Mapperpublic interface DataSourceMapper { /** * 添加数据源信息 * * @param source * 数据源 * @return id */ @Insert({ "insert into ge_jdbc_datasource (source_type, source_name, jdbc_url, user_name, password, zk_address, znode, ", "database_name, jdbc_driver_class, remark, creator, create_time, update_time, status)", "values (#{sourceType,jdbcType=VARCHAR}, #{sourceName,jdbcType=VARCHAR}, #{jdbcUrl,jdbcType=VARCHAR}, ", "#{userName,jdbcType=VARCHAR}, #{password,jdbcType=VARCHAR}, #{zkAddress,jdbcType=VARCHAR}, ", "#{znode,jdbcType=VARCHAR}, #{databaseName,jdbcType=VARCHAR}, #{jdbcDriverClass,jdbcType=VARCHAR}, ", "#{remark,jdbcType=VARCHAR}, #{creator,jdbcType=BIGINT}, #{createTime,jdbcType=TIMESTAMP}, ", "#{updateTime,jdbcType=TIMESTAMP}, #{status,jdbcType=TINYINT})"}) @SelectKey(statement = "SELECT LAST_INSERT_ID()", keyProperty = "id", before = false, resultType = Long.class) int insert(JdbcDataSource source);
/** * 修改数据源 * * @param source * 全量修改 * @return 影响行数 */ @Update({"update ge_jdbc_datasource", "set source_type = #{sourceType,jdbcType=VARCHAR}, source_name = #{sourceName,jdbcType=VARCHAR}, ", "jdbc_url = #{jdbcUrl,jdbcType=VARCHAR}, user_name = #{userName,jdbcType=VARCHAR}, ", "password = #{password,jdbcType=VARCHAR}, zk_address = #{zkAddress,jdbcType=VARCHAR}, ", "znode = #{znode,jdbcType=VARCHAR}, database_name = #{databaseName,jdbcType=VARCHAR}, ", "jdbc_driver_class = #{jdbcDriverClass,jdbcType=VARCHAR}, remark = #{remark,jdbcType=VARCHAR}, ", "update_time = #{updateTime,jdbcType=TIMESTAMP}, status = #{status,jdbcType=TINYINT}", "where id = #{id,jdbcType=BIGINT} and status = 1"}) int update(JdbcDataSource source);
/** * 删除数据源配置 * * @param id * 数据源id * @return 影响行数 */ @Delete("delete from ge_jdbc_datasource where id = #{id,jdbcType=BIGINT}") int delete(Long id);
/** * 查询用户数据源配置 * * @param creator * 创建者id * @return 数据源列表 */ @Select({"select * from ge_jdbc_datasource where creator = #{creator,jdbcType=BIGINT} and status = 1", " order by id desc"}) @Results(id = "resultMap", value = {@Result(column = "id", property = "id", jdbcType = JdbcType.BIGINT, id = true), @Result(column = "source_type", property = "sourceType", jdbcType = JdbcType.VARCHAR), @Result(column = "source_name", property = "sourceName", jdbcType = JdbcType.VARCHAR), @Result(column = "jdbc_url", property = "jdbcUrl", jdbcType = JdbcType.VARCHAR), @Result(column = "user_name", property = "userName", jdbcType = JdbcType.VARCHAR), @Result(column = "password", property = "password", jdbcType = JdbcType.VARCHAR), @Result(column = "zk_address", property = "zkAddress", jdbcType = JdbcType.VARCHAR), @Result(column = "znode", property = "znode", jdbcType = JdbcType.VARCHAR), @Result(column = "database_name", property = "databaseName", jdbcType = JdbcType.VARCHAR), @Result(column = "jdbc_driver_class", property = "jdbcDriverClass", jdbcType = JdbcType.VARCHAR), @Result(column = "remark", property = "remark", jdbcType = JdbcType.VARCHAR), @Result(column = "creator", property = "creator", jdbcType = JdbcType.BIGINT), @Result(column = "create_time", property = "createTime", jdbcType = JdbcType.TIMESTAMP), @Result(column = "update_time", property = "updateTime", jdbcType = JdbcType.TIMESTAMP), @Result(column = "status", property = "status", jdbcType = JdbcType.TINYINT)}) List<JdbcDataSource> list(Long creator);
/** * 查询用户数据源配置 * * @param creator * 创建者id * @param type * 数据源类型 * @return 数据源列表 */ @Select({"<script>", "select * from ge_jdbc_datasource where creator = #{creator,jdbcType=BIGINT} ", "and status = 1 and source_type in ", "<foreach item='item' index='index' collection='type' open='(' separator=',' close=')'>", "#{item,jdbcType=VARCHAR}", "</foreach>", " order by id desc", "</script>"}) @ResultMap("resultMap") List<JdbcDataSource> listByType(@Param("creator") Long creator, @Param("type") List<String> type);
/** * 条件查询数据源 * * @param jdbcDataSource * 查询条件 * @return 查询结果 */ @SelectProvider(type = DataSourceSqlProvider.class, method = "select") @ResultMap("resultMap") List<JdbcDataSource> select(JdbcDataSource jdbcDataSource);
/** * id 查找 * * @param id * id * @return 数据源 */ @Options(flushCache = Options.FlushCachePolicy.TRUE) @Select("select * from ge_jdbc_datasource where id = #{id,jdbcType=BIGINT} and status = 1") @ResultMap("resultMap") JdbcDataSource find(Long id);
/** * 数据源名称是否存在 * * @param jdbcDataSource * 数据源名称 * @return 数据源 */ @Select({"select * from ge_jdbc_datasource ", "where source_name = #{sourceName,jdbcType=VARCHAR} and creator = #{creator,jdbcType=BIGINT}"}) @ResultMap("resultMap") JdbcDataSource nameExist(JdbcDataSource jdbcDataSource);}
复制代码

DataSourceSqlProvider

package com.geespace.microservices.datasource.dao;
import com.geespace.microservices.datasource.entity.JdbcDataSource;
import org.apache.commons.lang.StringUtils;import org.apache.ibatis.jdbc.SQL;
/** * @Author: zjr * @Date: 2020-04-09 14:20 * @Version 1.0 */public class DataSourceSqlProvider { /** * 条件查询sql语句生成 * * @param jdbcDataSource * 查询条件 * @return sql语句 */ public String select(JdbcDataSource jdbcDataSource) { SQL sql = new SQL(); sql.SELECT("*"); sql.FROM("ge_jdbc_datasource"); sql.WHERE("status = 1"); if (jdbcDataSource.getCreator() != null) { sql.WHERE("creator = #{creator,jdbcType=BIGINT}"); } if (!StringUtils.isBlank(jdbcDataSource.getSourceName())) { sql.WHERE("source_name like concat('%', #{sourceName,jdbcType=VARCHAR}, '%')"); } sql.ORDER_BY("id desc"); return sql.toString(); }}
复制代码

MetaDataSourceMapper

package com.geespace.microservices.datasource.dao;
import java.util.List;
import com.geespace.microservices.datasource.entity.JdbcDataSource;
import org.apache.ibatis.annotations.Mapper;import org.apache.ibatis.annotations.Result;import org.apache.ibatis.annotations.ResultMap;import org.apache.ibatis.annotations.Results;import org.apache.ibatis.annotations.Select;import org.apache.ibatis.type.JdbcType;
/** * 内部数据源,系统配置,和外部数据源保持一致 * @Author: zjr * @Date: 2020-04-07 17:05 * @Version 1.0 */@Mapperpublic interface MetaDataSourceMapper { /** * 查询用户数据源配置 * * @return 数据源列表 */ @Select("select * from ge_meta_datasource ") @Results(id = "resultMap", value = {@Result(column = "id", property = "id", jdbcType = JdbcType.BIGINT, id = true), @Result(column = "source_type", property = "sourceType", jdbcType = JdbcType.VARCHAR), @Result(column = "source_name", property = "sourceName", jdbcType = JdbcType.VARCHAR), @Result(column = "jdbc_url", property = "jdbcUrl", jdbcType = JdbcType.VARCHAR), @Result(column = "user_name", property = "userName", jdbcType = JdbcType.VARCHAR), @Result(column = "password", property = "password", jdbcType = JdbcType.VARCHAR), @Result(column = "zk_address", property = "zkAddress", jdbcType = JdbcType.VARCHAR), @Result(column = "znode", property = "znode", jdbcType = JdbcType.VARCHAR), @Result(column = "database_name", property = "databaseName", jdbcType = JdbcType.VARCHAR), @Result(column = "jdbc_driver_class", property = "jdbcDriverClass", jdbcType = JdbcType.VARCHAR), @Result(column = "remark", property = "remark", jdbcType = JdbcType.VARCHAR), @Result(column = "creator", property = "creator", jdbcType = JdbcType.BIGINT), @Result(column = "create_time", property = "createTime", jdbcType = JdbcType.TIMESTAMP), @Result(column = "update_time", property = "updateTime", jdbcType = JdbcType.TIMESTAMP), @Result(column = "status", property = "status", jdbcType = JdbcType.TINYINT)}) List<JdbcDataSource> list(); /** * id 查找 * * @param id * id * @return 数据源 */ @Select("select * from ge_meta_datasource where id = #{id,jdbcType=BIGINT} ") @ResultMap("resultMap") JdbcDataSource find(Long id);}
复制代码

DataSourceSelectForm

package com.geespace.microservices.datasource.form.datasource;
import javax.validation.constraints.NotNull;
import lombok.Data;
/** * @Author: zjr * @Date: 2020-04-09 14:13 * @Version 1.0 */@Datapublic class DataSourceSelectForm { /** * 数据源名称模糊查询 */ private String sourceName; /** * 创建者 */ private Long creator; /** * 页码 */ @NotNull(message = "pageSize不能为空") private int pageSize; /** * 每页数据量 */ @NotNull(message = "pageNum不能为空") private int pageNum;}
复制代码

DataSourceAddForm

package com.geespace.microservices.datasource.form.datasource;
import javax.validation.constraints.NotBlank;
import lombok.Data;
/** * @Author: zjr * @Date: 2020-04-07 17:46 * @Version 1.0 */@Datapublic class DataSourceAddForm { /** * 数据源类型 */ @NotBlank(message = "数据源类型不能为空") private String sourceType; /** * 数据源名称 */ @NotBlank(message = "数据源名称不能为空") private String sourceName; /** * jdbc url */ private String jdbcUrl; /** * 用户名 */ private String userName; /** * 密码 */ private String password; /** * zk地址 */ private String zkAddress; /** * hbase znode */ private String znode; /** * 数据库名称 */ private String databaseName; /** * 驱动类 */ private String jdbcDriverClass; /** * 备注 */ private String remark; /** * 创建者 */ private Long creator;
}
复制代码

DataSourceUpdateForm

package com.geespace.microservices.datasource.form.datasource;
import javax.validation.constraints.NotBlank;import javax.validation.constraints.NotNull;
import lombok.Data;
/** * @Author: zjr * @Date: 2020-04-07 17:46 * @Version 1.0 */@Datapublic class DataSourceUpdateForm { /** * id */ @NotNull(message = "id不能为空") private Long id; /** * 数据源类型 */ @NotBlank(message = "数据源类型不能为空") private String sourceType; /** * 数据源名称 */ @NotBlank(message = "数据源名称不能为空") private String sourceName; /** * jdbc url */ private String jdbcUrl; /** * 用户名 */ private String userName; /** * 密码 */ private String password; /** * zk地址 */ private String zkAddress; /** * hbase znode */ private String znode; /** * 数据库名称 */ private String databaseName; /** * 驱动类 */ private String jdbcDriverClass; /** * 备注 */ private String remark; /** * 创建者 */ private Long creator;}
复制代码

YAPI 测试用例

5.1 查询全部同步任务配置(分页)


{  "pageNum": 1,  "pageSize": 10,  "projectId": 28,  "name": "测试"}
复制代码

5.2 创建同步任务配置-mysql->mysql

{  "name": "测试同步任务配置-mysql-mysql-1",  "description": "测试同步任务配置-mysql-mysql-1",  "projectName": "test测试1",  "projectId": 28,  "syncType": 2,  "readerConfigId": 1,  "readerParam": {    "table": "test_test"  },  "writerConfigId": 1,  "writerParam": {    "table": "test_test_1"  },  "columnMap": [    {      "reader": "id",      "writer": "id"    },    {      "reader": "name",      "writer": "name"    }  ]}
复制代码

5.6 创建同步任务配置-hbase->hbase

{  "name": "测试同步任务配置-hbase-hbase-1",  "description": "测试同步任务配置-hbase-hbase-1",  "projectName": "test测试1",  "projectId": 28,  "syncType": 2,  "readerConfigId": 130,  "readerParam": {    "table": "test_test"  },  "writerConfigId": 130,  "writerParam": {    "table": "test_test_1",    "rowkeyColumns": [      "f:id",      "f:name"    ]  },  "columnMap": [    {      "reader": "f:id",      "writer": "f:id"    },    {      "reader": "f:name",      "writer": "f:name"    }  ]}
复制代码

5.7 创建同步任务配置-mysql->hbase

{  "name": "测试同步任务配置-mysql-hbase-1",  "description": "测试同步任务配置mysql-hbase-1",  "projectName": "test测试1",  "projectId": 28,  "syncType": 2,  "readerConfigId": 1,  "readerParam": {    "table": "test_test"  },  "writerConfigId": 130,  "writerParam": {    "table": "test_test_1",    "rowkeyColumns": [      "f:id",      "f:name"    ]  },  "columnMap": [    {      "reader": "id",      "writer": "f:id"    },    {      "reader": "name",      "writer": "f:name"    }  ]}
复制代码

5.8 创建同步任务配置-hbase->mysql

{  "name": "测试同步任务配置-hbase-mysql-1",  "description": "测试同步任务配置-hbase-mysql-1",  "projectName": "test测试1",  "projectId": 28,  "syncType": 2,  "readerConfigId": 130,  "readerParam": {    "table": "test_test",    "rowkeyColumns": [      "f:id",      "f:name"    ]  },  "writerConfigId": 1,  "writerParam": {    "table": "test_test_1"  },  "columnMap": [    {      "reader": "f:id",      "writer": "id"    },    {      "reader": "f:name",      "writer": "name"    }  ]}
复制代码

5.3 更新同步任务配置

{  "id": 82,  "name": "测试同步任务配置-mysql-3",  "description": "测试同步任务配置-mysql-3",  "projectName": "数据同步任务",  "projectId": 19,  "syncType": 2,  "readerConfigId": 1,  "readerParam": {    "table": "test_test"  },  "writerConfigId": 1,  "writerParam": {    "table": "test_test_1"  },  "columnMap": [    {      "reader": "id",      "writer": "id"    },    {      "reader": "name",      "writer": "name"    }  ]}
复制代码

5.5 删除同步任务配置

5.4 查询同步任务配置

3.3 执行数据同步任务

3.4 停止数据同步任务

三、本人相关其他文章链接

1.springboot 项目集成 dolphinscheduler 调度器 可拖拽 spark 任务管理:https://blog.csdn.net/a924382407/article/details/117119831


2.springboot 项目集成 dolphinscheduler 调度器 实现 datax 数据同步任务:https://blog.csdn.net/a924382407/article/details/120951230


3.springboot 项目集成 dolphinscheduler 调度器 项目管理:https://blog.csdn.net/a924382407/article/details/117118931


4.springboot 项目集成大数据第三方 dolphinscheduler 调度器 执行/停止任务https://blog.csdn.net/a924382407/article/details/117121181


5.springboot 项目集成大数据第三方 dolphinscheduler 调度器[https://blog.csdn.net/a924382407/article/details/117113848](https://blog.csdn.

用户头像

刘大猫

关注

还未添加个人签名 2022-08-23 加入

还未添加个人简介

评论

发布
暂无评论
springboot项目集成dolphinscheduler调度器 实现datax数据同步任务_人工智能_刘大猫_InfoQ写作社区