写点什么

升级遇到坑?一文带你搞定 DolphinScheduler 2.0 到 3.0 升级

作者:白鲸开源
  • 2025-05-14
    天津
  • 本文字数:21224 字

    阅读完需:约 70 分钟

升级遇到坑?一文带你搞定DolphinScheduler 2.0到3.0升级

1.升级背景

因项目需要使用数据质量模块功能,可以为数仓提供良好的数据质量监控功能。故要对已有 2.0 版本升级到 3.0 版本以上,此次选择测试了 3.0.1 和 3.1.1 两个版本,对进行同数据等任务调度暂停等操作测试,最后选择 3.0.1 版本。


原因:


  1. 3.1.1 在测试 sql 任务时 ,同时启动上百 sql 任务时,会出现 sql 任务报错,导致大量任务无法正常运行,询问社区大佬,这是 DS 本身 bug 导致,虽然此现象在 3.0.1 也有出现,不过出现几率较小。

  2. DS3.0.1 以上版本 zookeeper 的依赖版本进行了更新,查看驱动版本是 3.8 版本。我们生产不打算升级 zk,故选择使用 3.0.1 版本。


此版本测试还是比较稳定的,功能比较完善,满足我们使用需求。


此次升级已经验证可行性,已在生产环境验证上线,对已有的问题,并给出了合理的解决方便,故写此篇文章,供各位同学参考。

2.升级的方案

选定方案:


采用数据库表同步的方式进行任务迁移,前期 3.0 版本 和 2.0 版本同时运行,任务再验证没问题后,再逐步停止 2.0 版本。


原因:直接使用官网提供的升级脚本,无法正常运行,有较多问题,目前我们改造后,升级的数据库信息没问题,运行时数据信息有损坏,导致较多问题,所以为安全稳定,不直接使用官网提方案。

3.升级准备

1.首先对已有数据库进行备份,此项非常重要,


备份原始DS库: mysqldump -h ip -P 3306 -u 用户 -p 密码 数据库名  > /opt/new_dolphinscheduler.sql
恢复到新库:  mysql -u 用户 –p 密码 数据库名 < 备份文件.sql
复制代码


2.对已有备份表进行表结构变更


####此脚本是官网脚本改的,添加了字段/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements.  See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License.  You may obtain a copy of the License at * *    http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.*/
SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));
-- uc_dolphin_T_t_ds_alert_R_signdrop PROCEDURE if EXISTS uc_dolphin_T_t_ds_alert_R_sign;delimiter d//CREATE PROCEDURE uc_dolphin_T_t_ds_alert_R_sign()BEGIN IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS WHERE TABLE_NAME='t_ds_alert' AND TABLE_SCHEMA=(SELECT DATABASE()) AND COLUMN_NAME='sign') THENALTER TABLE `t_ds_alert` ADD COLUMN `sign` char(40) NOT NULL DEFAULT '' COMMENT 'sign=sha1(content)' after `id`;ALTER TABLE `t_ds_alert` ADD INDEX `idx_sign` (`sign`) USING BTREE;END IF;END;
d//
delimiter ;CALL uc_dolphin_T_t_ds_alert_R_sign;DROP PROCEDURE uc_dolphin_T_t_ds_alert_R_sign;
-- add unique key to t_ds_relation_project_userdrop PROCEDURE if EXISTS add_t_ds_relation_project_user_uk_uniq_uid_pid;delimiter d//CREATE PROCEDURE add_t_ds_relation_project_user_uk_uniq_uid_pid()BEGIN IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_NAME='t_ds_relation_project_user' AND TABLE_SCHEMA=(SELECT DATABASE()) AND INDEX_NAME='uniq_uid_pid') THENALTER TABLE t_ds_relation_project_user ADD UNIQUE KEY uniq_uid_pid(user_id, project_id);END IF;END;
d//
delimiter ;# CALL add_t_ds_relation_project_user_uk_uniq_uid_pid;DROP PROCEDURE add_t_ds_relation_project_user_uk_uniq_uid_pid;
-- drop t_ds_relation_project_user key user_id_indexdrop PROCEDURE if EXISTS drop_t_ds_relation_project_user_key_user_id_index;delimiter d//CREATE PROCEDURE drop_t_ds_relation_project_user_key_user_id_index()BEGIN IF EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_NAME='t_ds_relation_project_user' AND TABLE_SCHEMA=(SELECT DATABASE()) AND INDEX_NAME='user_id_index') THENALTER TABLE `t_ds_relation_project_user` DROP KEY `user_id_index`;END IF;END;d//delimiter ;CALL drop_t_ds_relation_project_user_key_user_id_index;DROP PROCEDURE drop_t_ds_relation_project_user_key_user_id_index;
-- add unique key to t_ds_projectdrop PROCEDURE if EXISTS add_t_ds_project_uk_unique_name;delimiter d//CREATE PROCEDURE add_t_ds_project_uk_unique_name()BEGIN IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_NAME='t_ds_project' AND TABLE_SCHEMA=(SELECT DATABASE()) AND INDEX_NAME='unique_name') THENALTER TABLE t_ds_project ADD UNIQUE KEY unique_name(name);END IF;END;d//delimiter ;CALL add_t_ds_project_uk_unique_name;DROP PROCEDURE add_t_ds_project_uk_unique_name;
drop PROCEDURE if EXISTS add_t_ds_project_uk_unique_code;delimiter d//CREATE PROCEDURE add_t_ds_project_uk_unique_code()BEGIN IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_NAME='t_ds_project' AND TABLE_SCHEMA=(SELECT DATABASE()) AND INDEX_NAME='unique_code') THENALTER TABLE t_ds_project ADD UNIQUE KEY unique_code(code);END IF;END;d//delimiter ;CALL add_t_ds_project_uk_unique_code;DROP PROCEDURE add_t_ds_project_uk_unique_code;
-- add unique key to t_ds_queuedrop PROCEDURE if EXISTS add_t_ds_queue_uk_unique_queue_name;delimiter d//CREATE PROCEDURE add_t_ds_queue_uk_unique_queue_name()BEGIN IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_NAME='t_ds_queue' AND TABLE_SCHEMA=(SELECT DATABASE()) AND INDEX_NAME='unique_queue_name') THENALTER TABLE t_ds_queue ADD UNIQUE KEY unique_queue_name(queue_name);END IF;END;d//delimiter ;CALL add_t_ds_queue_uk_unique_queue_name;DROP PROCEDURE add_t_ds_queue_uk_unique_queue_name;
-- add unique key to t_ds_udfsdrop PROCEDURE if EXISTS add_t_ds_udfs_uk_unique_func_name;delimiter d//CREATE PROCEDURE add_t_ds_udfs_uk_unique_func_name()BEGIN IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_NAME='t_ds_udfs' AND TABLE_SCHEMA=(SELECT DATABASE()) AND INDEX_NAME='unique_func_name') THENALTER TABLE t_ds_udfs ADD UNIQUE KEY unique_func_name(func_name);END IF;END;d//delimiter ;CALL add_t_ds_udfs_uk_unique_func_name;DROP PROCEDURE add_t_ds_udfs_uk_unique_func_name;
-- add unique key to t_ds_tenantdrop PROCEDURE if EXISTS add_t_ds_tenant_uk_unique_tenant_code;delimiter d//CREATE PROCEDURE add_t_ds_tenant_uk_unique_tenant_code()BEGIN IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_NAME='t_ds_tenant' AND TABLE_SCHEMA=(SELECT DATABASE()) AND INDEX_NAME='unique_tenant_code') THENALTER TABLE t_ds_tenant ADD UNIQUE KEY unique_tenant_code(tenant_code);END IF;END;d//delimiter ;CALL add_t_ds_tenant_uk_unique_tenant_code;DROP PROCEDURE add_t_ds_tenant_uk_unique_tenant_code;
-- ALTER TABLE `t_ds_task_instance` ADD INDEX `idx_code_version` (`task_code`, `task_definition_version`) USING BTREE;drop PROCEDURE if EXISTS add_t_ds_task_instance_uk_idx_code_version;delimiter d//CREATE PROCEDURE add_t_ds_task_instance_uk_idx_code_version()BEGIN IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_NAME='t_ds_task_instance' AND TABLE_SCHEMA=(SELECT DATABASE()) AND INDEX_NAME='idx_code_version') THENALTER TABLE `t_ds_task_instance` ADD INDEX `idx_code_version` (`task_code`, `task_definition_version`) USING BTREE;END IF;END;d//delimiter ;CALL add_t_ds_task_instance_uk_idx_code_version;DROP PROCEDURE add_t_ds_task_instance_uk_idx_code_version;
-- ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `task_params` longtext COMMENT 'job custom parameters' AFTER `app_link`;drop PROCEDURE if EXISTS modify_t_ds_task_instance_col_task_params;delimiter d//CREATE PROCEDURE modify_t_ds_task_instance_col_task_params()BEGIN IF EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME='t_ds_task_instance' AND TABLE_SCHEMA=(SELECT DATABASE()) AND COLUMN_NAME ='task_params') THENALTER TABLE `t_ds_task_instance` MODIFY COLUMN `task_params` longtext COMMENT 'job custom parameters' AFTER `app_link`;END IF;END;d//delimiter ;CALL modify_t_ds_task_instance_col_task_params;DROP PROCEDURE modify_t_ds_task_instance_col_task_params;
-- ALTER TABLE `t_ds_task_instance` ADD COLUMN `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id';drop PROCEDURE if EXISTS add_t_ds_task_instance_col_task_group_id;delimiter d//CREATE PROCEDURE add_t_ds_task_instance_col_task_group_id()BEGIN IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME='t_ds_task_instance' AND TABLE_SCHEMA=(SELECT DATABASE()) AND COLUMN_NAME ='task_group_id') THENALTER TABLE `t_ds_task_instance` ADD COLUMN `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id' after `var_pool`;END IF;END;d//delimiter ;CALL add_t_ds_task_instance_col_task_group_id;DROP PROCEDURE add_t_ds_task_instance_col_task_group_id;
-- ALTER TABLE `t_ds_process_task_relation` ADD KEY `idx_code` (`project_code`, `process_definition_code`) USING BTREE;drop PROCEDURE if EXISTS add_t_ds_process_task_relation_key_idx_code;delimiter d//CREATE PROCEDURE add_t_ds_process_task_relation_key_idx_code()BEGIN IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_NAME='t_ds_process_task_relation' AND TABLE_SCHEMA=(SELECT DATABASE()) AND INDEX_NAME='idx_code') THENALTER TABLE `t_ds_process_task_relation` ADD KEY `idx_code` (`project_code`, `process_definition_code`) USING BTREE;END IF;END;d//delimiter ;CALL add_t_ds_process_task_relation_key_idx_code;DROP PROCEDURE add_t_ds_process_task_relation_key_idx_code;
-- ALTER TABLE `t_ds_process_task_relation` ADD KEY `idx_pre_task_code_version` (`pre_task_code`,`pre_task_version`);drop PROCEDURE if EXISTS add_t_ds_process_task_relation_key_idx_pre_task_code_version;delimiter d//CREATE PROCEDURE add_t_ds_process_task_relation_key_idx_pre_task_code_version()BEGIN IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_NAME='t_ds_process_task_relation' AND TABLE_SCHEMA=(SELECT DATABASE()) AND INDEX_NAME='idx_pre_task_code_version') THENALTER TABLE `t_ds_process_task_relation` ADD KEY `idx_pre_task_code_version` (`pre_task_code`,`pre_task_version`);END IF;END;d//delimiter ;CALL add_t_ds_process_task_relation_key_idx_pre_task_code_version;DROP PROCEDURE add_t_ds_process_task_relation_key_idx_pre_task_code_version;
-- ALTER TABLE `t_ds_process_task_relation` ADD KEY `idx_post_task_code_version` (`post_task_code`,`post_task_version`);drop PROCEDURE if EXISTS add_t_ds_process_task_relation_key_idx_post_task_code_version;delimiter d//CREATE PROCEDURE add_t_ds_process_task_relation_key_idx_post_task_code_version()BEGIN IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_NAME='t_ds_process_task_relation' AND TABLE_SCHEMA=(SELECT DATABASE()) AND INDEX_NAME='idx_post_task_code_version') THENALTER TABLE `t_ds_process_task_relation` ADD KEY `idx_post_task_code_version` (`post_task_code`,`post_task_version`);END IF;END;d//delimiter ;CALL add_t_ds_process_task_relation_key_idx_post_task_code_version;DROP PROCEDURE add_t_ds_process_task_relation_key_idx_post_task_code_version;
-- ALTER TABLE `t_ds_process_task_relation_log` ADD KEY `idx_process_code_version` (`process_definition_code`,`process_definition_version`) USING BTREE;drop PROCEDURE if EXISTS add_t_ds_process_task_relation_key_idx_process_code_version;delimiter d//CREATE PROCEDURE add_t_ds_process_task_relation_key_idx_process_code_version()BEGIN IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_NAME='t_ds_process_task_relation_log' AND TABLE_SCHEMA=(SELECT DATABASE()) AND INDEX_NAME='idx_process_code_version') THENALTER TABLE `t_ds_process_task_relation_log` ADD KEY `idx_process_code_version` (`process_definition_code`,`process_definition_version`) USING BTREE;END IF;END;d//delimiter ;CALL add_t_ds_process_task_relation_key_idx_process_code_version;DROP PROCEDURE add_t_ds_process_task_relation_key_idx_process_code_version;
-- ALTER TABLE `t_ds_task_definition_log` ADD INDEX `idx_project_code` (`project_code`) USING BTREE;drop PROCEDURE if EXISTS add_t_ds_task_definition_log_key_idx_process_code_version;delimiter d//CREATE PROCEDURE add_t_ds_task_definition_log_key_idx_process_code_version()BEGIN IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_NAME='t_ds_task_definition_log' AND TABLE_SCHEMA=(SELECT DATABASE()) AND INDEX_NAME='idx_project_code') THENALTER TABLE `t_ds_task_definition_log` ADD INDEX `idx_project_code` (`project_code`) USING BTREE;END IF;END;d//delimiter ;CALL add_t_ds_task_definition_log_key_idx_process_code_version;DROP PROCEDURE add_t_ds_task_definition_log_key_idx_process_code_version;
-- ALTER TABLE `t_ds_task_definition_log` ADD INDEX `idx_code_version` (`code`,`version`) USING BTREE;drop PROCEDURE if EXISTS add_t_ds_task_definition_log_key_idx_code_version;delimiter d//CREATE PROCEDURE add_t_ds_task_definition_log_key_idx_code_version()BEGIN IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_NAME='t_ds_task_definition_log' AND TABLE_SCHEMA=(SELECT DATABASE()) AND INDEX_NAME='idx_code_version') THENALTER TABLE `t_ds_task_definition_log` ADD INDEX `idx_code_version` (`code`,`version`) USING BTREE;END IF;END;d//delimiter ;CALL add_t_ds_task_definition_log_key_idx_code_version;DROP PROCEDURE add_t_ds_task_definition_log_key_idx_code_version;
-- alter table t_ds_task_definition_log add `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `resource_ids`;drop PROCEDURE if EXISTS add_t_ds_task_definition_log_col_task_group_id;delimiter d//CREATE PROCEDURE add_t_ds_task_definition_log_col_task_group_id()BEGIN IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME='t_ds_task_definition_log' AND TABLE_SCHEMA=(SELECT DATABASE()) AND COLUMN_NAME='task_group_id') THENalter table t_ds_task_definition_log add `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `resource_ids`;END IF;END;d//delimiter ;CALL add_t_ds_task_definition_log_col_task_group_id;DROP PROCEDURE add_t_ds_task_definition_log_col_task_group_id;
-- alter table t_ds_task_definition_log add `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `resource_ids`;drop PROCEDURE if EXISTS add_t_ds_task_definition_col_task_group_id;delimiter d//CREATE PROCEDURE add_t_ds_task_definition_col_task_group_id()BEGIN IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME='t_ds_task_definition' AND TABLE_SCHEMA=(SELECT DATABASE()) AND COLUMN_NAME='task_group_id') THENalter table t_ds_task_definition add `task_group_id` int DEFAULT NULL COMMENT 'task group id';END IF;END;d//delimiter ;CALL add_t_ds_task_definition_col_task_group_id;DROP PROCEDURE add_t_ds_task_definition_col_task_group_id;
-- alter table t_ds_task_definition_log add `task_group_priority` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `task_group_id`;drop PROCEDURE if EXISTS add_t_ds_task_definition_log_col_task_group_priority;delimiter d//CREATE PROCEDURE add_t_ds_task_definition_log_col_task_group_priority()BEGIN IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME='t_ds_task_definition_log' AND TABLE_SCHEMA=(SELECT DATABASE()) AND COLUMN_NAME='task_group_priority') THENalter table t_ds_task_definition_log add `task_group_priority` tinyint DEFAULT '0' COMMENT 'task group priority' AFTER `task_group_id`;END IF;END;d//delimiter ;CALL add_t_ds_task_definition_log_col_task_group_priority;DROP PROCEDURE add_t_ds_task_definition_log_col_task_group_priority;
-- alter table t_ds_task_definition add `task_group_priority` int(11) DEFAULT '0' COMMENT 'task group id' AFTER `task_group_id`;drop PROCEDURE if EXISTS add_t_ds_task_definition_col_task_group_priority;delimiter d//CREATE PROCEDURE add_t_ds_task_definition_col_task_group_priority()BEGIN IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME='t_ds_task_definition' AND TABLE_SCHEMA=(SELECT DATABASE()) AND COLUMN_NAME='task_group_priority') THENalter table t_ds_task_definition add `task_group_priority` tinyint DEFAULT '0' COMMENT 'task group priority' AFTER `task_group_id`;END IF;END;d//delimiter ;CALL add_t_ds_task_definition_col_task_group_priority;DROP PROCEDURE add_t_ds_task_definition_col_task_group_priority;
-- ALTER TABLE `t_ds_user` ADD COLUMN `time_zone` varchar(32) DEFAULT NULL COMMENT 'time zone';drop PROCEDURE if EXISTS add_t_ds_user_col_time_zone;delimiter d//CREATE PROCEDURE add_t_ds_user_col_time_zone()BEGIN IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME='t_ds_user' AND TABLE_SCHEMA=(SELECT DATABASE()) AND COLUMN_NAME='time_zone') THENALTER TABLE `t_ds_user` ADD COLUMN `time_zone` varchar(32) DEFAULT NULL COMMENT 'time zone';END IF;END;d//delimiter ;CALL add_t_ds_user_col_time_zone;DROP PROCEDURE add_t_ds_user_col_time_zone;
-- ALTER TABLE `t_ds_alert` ADD COLUMN `warning_type` tinyint(4) DEFAULT '2' COMMENT '1 process is successfully, 2 process/task is failed';drop PROCEDURE if EXISTS add_t_ds_alert_col_warning_type;delimiter d//CREATE PROCEDURE add_t_ds_alert_col_warning_type()BEGIN IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME='t_ds_alert' AND TABLE_SCHEMA=(SELECT DATABASE()) AND COLUMN_NAME='warning_type') THENALTER TABLE `t_ds_alert` ADD COLUMN `warning_type` tinyint(4) DEFAULT '2' COMMENT '1 process is successfully, 2 process/task is failed';END IF;END;d//delimiter ;CALL add_t_ds_alert_col_warning_type;DROP PROCEDURE add_t_ds_alert_col_warning_type;
-- ALTER TABLE `t_ds_alert` ADD INDEX `idx_status` (`alert_status`) USING BTREE;drop PROCEDURE if EXISTS add_t_ds_alert_idx_idx_status;delimiter d//CREATE PROCEDURE add_t_ds_alert_idx_idx_status()BEGIN IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_NAME='t_ds_alert' AND TABLE_SCHEMA=(SELECT DATABASE()) AND INDEX_NAME='idx_status') THENALTER TABLE `t_ds_alert` ADD INDEX `idx_status` (`alert_status`) USING BTREE;END IF;END;d//delimiter ;CALL add_t_ds_alert_idx_idx_status;DROP PROCEDURE add_t_ds_alert_idx_idx_status;
-- ALTER TABLE `t_ds_alert` ADD COLUMN `project_code` bigint DEFAULT NULL COMMENT 'project_code';-- ALTER TABLE `t_ds_alert` ADD COLUMN `process_definition_code` bigint DEFAULT NULL COMMENT 'process_definition_code';-- ALTER TABLE `t_ds_alert` ADD COLUMN `process_instance_id` int DEFAULT NULL COMMENT 'process_instance_id';-- ALTER TABLE `t_ds_alert` ADD COLUMN `alert_type` int DEFAULT NULL COMMENT 'alert_type';drop PROCEDURE if EXISTS add_t_ds_alert_col_project_code;delimiter d//CREATE PROCEDURE add_t_ds_alert_col_project_code()BEGIN IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME='t_ds_alert' AND TABLE_SCHEMA=(SELECT DATABASE()) AND COLUMN_NAME='project_code') THENALTER TABLE `t_ds_alert` ADD COLUMN `project_code` bigint DEFAULT NULL COMMENT 'project_code';ALTER TABLE `t_ds_alert` ADD COLUMN `process_definition_code` bigint DEFAULT NULL COMMENT 'process_definition_code';ALTER TABLE `t_ds_alert` ADD COLUMN `process_instance_id` int DEFAULT NULL COMMENT 'process_instance_id';ALTER TABLE `t_ds_alert` ADD COLUMN `alert_type` int DEFAULT NULL COMMENT 'alert_type';END IF;END;d//delimiter ;# CALL add_t_ds_alert_col_project_code;DROP PROCEDURE add_t_ds_alert_col_project_code;
-- t_ds_task_instancedrop PROCEDURE if EXISTS alter_t_ds_task_instance_col_log_path;delimiter d//CREATE PROCEDURE alter_t_ds_task_instance_col_log_path()BEGIN IF EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME='t_ds_task_instance' AND TABLE_SCHEMA=(SELECT DATABASE()) AND COLUMN_NAME='log_path') THENALTER TABLE `t_ds_task_instance` MODIFY COLUMN `log_path` longtext DEFAULT NULL COMMENT 'task log path';END IF;END;d//delimiter ;CALL alter_t_ds_task_instance_col_log_path;DROP PROCEDURE alter_t_ds_task_instance_col_log_path;
---- Table structure for table `t_ds_dq_comparison_type`--DROP TABLE IF EXISTS `t_ds_dq_comparison_type`;CREATE TABLE `t_ds_dq_comparison_type` ( `id` int(11) NOT NULL AUTO_INCREMENT, `type` varchar(100) NOT NULL, `execute_sql` text DEFAULT NULL, `output_table` varchar(100) DEFAULT NULL, `name` varchar(100) DEFAULT NULL, `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, `is_inner_source` tinyint(1) DEFAULT '0', PRIMARY KEY (`id`))ENGINE=InnoDB DEFAULT CHARSET=utf8;
---- Table structure for table t_ds_dq_execute_result--DROP TABLE IF EXISTS `t_ds_dq_execute_result`;CREATE TABLE `t_ds_dq_execute_result` ( `id` int(11) NOT NULL AUTO_INCREMENT, `process_definition_id` int(11) DEFAULT NULL, `process_instance_id` int(11) DEFAULT NULL, `task_instance_id` int(11) DEFAULT NULL, `rule_type` int(11) DEFAULT NULL, `rule_name` varchar(255) DEFAULT NULL, `statistics_value` double DEFAULT NULL, `comparison_value` double DEFAULT NULL, `check_type` int(11) DEFAULT NULL, `threshold` double DEFAULT NULL, `operator` int(11) DEFAULT NULL, `failure_strategy` int(11) DEFAULT NULL, `state` int(11) DEFAULT NULL, `user_id` int(11) DEFAULT NULL, `comparison_type` int(11) DEFAULT NULL, `error_output_path` text DEFAULT NULL, `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
---- Table structure for table t_ds_dq_rule--DROP TABLE IF EXISTS `t_ds_dq_rule`;CREATE TABLE `t_ds_dq_rule` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(100) DEFAULT NULL, `type` int(11) DEFAULT NULL, `user_id` int(11) DEFAULT NULL, `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
---- Table structure for table t_ds_dq_rule_execute_sql--DROP TABLE IF EXISTS `t_ds_dq_rule_execute_sql`;CREATE TABLE `t_ds_dq_rule_execute_sql` ( `id` int(11) NOT NULL AUTO_INCREMENT, `index` int(11) DEFAULT NULL, `sql` text DEFAULT NULL, `table_alias` varchar(255) DEFAULT NULL, `type` int(11) DEFAULT NULL, `is_error_output_sql` tinyint(1) DEFAULT '0', `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
---- Table structure for table t_ds_dq_rule_input_entry--DROP TABLE IF EXISTS `t_ds_dq_rule_input_entry`;CREATE TABLE `t_ds_dq_rule_input_entry` ( `id` int(11) NOT NULL AUTO_INCREMENT, `field` varchar(255) DEFAULT NULL, `type` varchar(255) DEFAULT NULL, `title` varchar(255) DEFAULT NULL, `value` varchar(255) DEFAULT NULL, `options` text DEFAULT NULL, `placeholder` varchar(255) DEFAULT NULL, `option_source_type` int(11) DEFAULT NULL, `value_type` int(11) DEFAULT NULL, `input_type` int(11) DEFAULT NULL, `is_show` tinyint(1) DEFAULT '1', `can_edit` tinyint(1) DEFAULT '1', `is_emit` tinyint(1) DEFAULT '0', `is_validate` tinyint(1) DEFAULT '1', `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
---- Table structure for table t_ds_dq_task_statistics_value--DROP TABLE IF EXISTS `t_ds_dq_task_statistics_value`;CREATE TABLE `t_ds_dq_task_statistics_value` ( `id` int(11) NOT NULL AUTO_INCREMENT, `process_definition_id` int(11) DEFAULT NULL, `task_instance_id` int(11) DEFAULT NULL, `rule_id` int(11) NOT NULL, `unique_code` varchar(255) NULL, `statistics_name` varchar(255) NULL, `statistics_value` double NULL, `data_time` datetime DEFAULT NULL, `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
---- Table structure for table t_ds_relation_rule_execute_sql--DROP TABLE IF EXISTS `t_ds_relation_rule_execute_sql`;CREATE TABLE `t_ds_relation_rule_execute_sql` ( `id` int(11) NOT NULL AUTO_INCREMENT, `rule_id` int(11) DEFAULT NULL, `execute_sql_id` int(11) DEFAULT NULL, `create_time` datetime NULL, `update_time` datetime NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
---- Table structure for table t_ds_relation_rule_input_entry--DROP TABLE IF EXISTS `t_ds_relation_rule_input_entry`;CREATE TABLE `t_ds_relation_rule_input_entry` ( `id` int(11) NOT NULL AUTO_INCREMENT, `rule_id` int(11) DEFAULT NULL, `rule_input_entry_id` int(11) DEFAULT NULL, `values_map` text DEFAULT NULL, `index` int(11) DEFAULT NULL, `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ------------------------------ Table structure for t_ds_k8s-- ----------------------------DROP TABLE IF EXISTS `t_ds_k8s`;CREATE TABLE `t_ds_k8s` ( `id` int(11) NOT NULL AUTO_INCREMENT, `k8s_name` varchar(100) DEFAULT NULL, `k8s_config` text DEFAULT NULL, `create_time` datetime DEFAULT NULL COMMENT 'create time', `update_time` datetime DEFAULT NULL COMMENT 'update time', PRIMARY KEY (`id`)) ENGINE= INNODB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8;
-- ------------------------------ Table structure for t_ds_k8s_namespace-- ----------------------------DROP TABLE IF EXISTS `t_ds_k8s_namespace`;CREATE TABLE `t_ds_k8s_namespace` ( `id` int(11) NOT NULL AUTO_INCREMENT, `limits_memory` int(11) DEFAULT NULL, `namespace` varchar(100) DEFAULT NULL, `online_job_num` int(11) DEFAULT NULL, `user_id` int(11) DEFAULT NULL, `pod_replicas` int(11) DEFAULT NULL, `pod_request_cpu` decimal(14,3) DEFAULT NULL, `pod_request_memory` int(11) DEFAULT NULL, `limits_cpu` decimal(14,3) DEFAULT NULL, `k8s` varchar(100) DEFAULT NULL, `create_time` datetime DEFAULT NULL COMMENT 'create time', `update_time` datetime DEFAULT NULL COMMENT 'update time', PRIMARY KEY (`id`), UNIQUE KEY `k8s_namespace_unique` (`namespace`,`k8s`)) ENGINE= INNODB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8;
-- ------------------------------ Table structure for t_ds_relation_namespace_user-- ----------------------------DROP TABLE IF EXISTS `t_ds_relation_namespace_user`;CREATE TABLE `t_ds_relation_namespace_user` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key', `user_id` int(11) NOT NULL COMMENT 'user id', `namespace_id` int(11) DEFAULT NULL COMMENT 'namespace id', `perm` int(11) DEFAULT '1' COMMENT 'limits of authority', `create_time` datetime DEFAULT NULL COMMENT 'create time', `update_time` datetime DEFAULT NULL COMMENT 'update time', PRIMARY KEY (`id`), UNIQUE KEY `namespace_user_unique` (`user_id`,`namespace_id`)) ENGINE=InnoDB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8;
-- ------------------------------ Table structure for t_ds_alert_send_status-- ----------------------------DROP TABLE IF EXISTS t_ds_alert_send_status;CREATE TABLE t_ds_alert_send_status ( `id` int(11) NOT NULL AUTO_INCREMENT, `alert_id` int(11) NOT NULL, `alert_plugin_instance_id` int(11) NOT NULL, `send_status` tinyint(4) DEFAULT '0', `log` text, `create_time` datetime DEFAULT NULL COMMENT 'create time', PRIMARY KEY (`id`), UNIQUE KEY `alert_send_status_unique` (`alert_id`,`alert_plugin_instance_id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ------------------------------ Table structure for t_ds_audit_log-- ----------------------------DROP TABLE IF EXISTS `t_ds_audit_log`;CREATE TABLE `t_ds_audit_log` ( `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT'key', `user_id` int(11) NOT NULL COMMENT 'user id', `resource_type` int(11) NOT NULL COMMENT 'resource type', `operation` int(11) NOT NULL COMMENT 'operation', `time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT 'create time', `resource_id` int(11) NULL DEFAULT NULL COMMENT 'resource id', PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT= 1 DEFAULT CHARSET=utf8;
-- ------------------------------ Table structure for t_ds_task_group-- ----------------------------DROP TABLE IF EXISTS `t_ds_task_group`;CREATE TABLE `t_ds_task_group` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT'key', `name` varchar(100) DEFAULT NULL COMMENT 'task_group name', `description` varchar(200) DEFAULT NULL, `group_size` int (11) NOT NULL COMMENT'group size', `use_size` int (11) DEFAULT '0' COMMENT 'used size', `user_id` int(11) DEFAULT NULL COMMENT 'creator id', `project_code` bigint(20) DEFAULT 0 COMMENT 'project code', `status` tinyint(4) DEFAULT '1' COMMENT '0 not available, 1 available', `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP, `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY(`id`)) ENGINE= INNODB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8;
-- ------------------------------ Table structure for t_ds_task_group_queue-- ----------------------------DROP TABLE IF EXISTS `t_ds_task_group_queue`;CREATE TABLE `t_ds_task_group_queue` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT'key', `task_id` int(11) DEFAULT NULL COMMENT 'taskintanceid', `task_name` varchar(100) DEFAULT NULL COMMENT 'TaskInstance name', `group_id` int(11) DEFAULT NULL COMMENT 'taskGroup id', `process_id` int(11) DEFAULT NULL COMMENT 'processInstace id', `priority` int(8) DEFAULT '0' COMMENT 'priority', `status` tinyint(4) DEFAULT '-1' COMMENT '-1: waiting 1: running 2: finished', `force_start` tinyint(4) DEFAULT '0' COMMENT 'is force start 0 NO ,1 YES', `in_queue` tinyint(4) DEFAULT '0' COMMENT 'ready to get the queue by other task finish 0 NO ,1 YES', `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP, `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY( `id` ))ENGINE= INNODB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8;

ALTER TABLE t_ds_process_instance ADD`next_process_instance_id` int(11) DEFAULT '0' COMMENT 'serial queue next processInstanceId';ALTER TABLE t_ds_process_instance ADD `restart_time` datetime DEFAULT NULL COMMENT 'process instance restart time';ALTER TABLE t_ds_process_definition ADD `execution_type` tinyint(4) DEFAULT '0' COMMENT 'execution_type 0:parallel,1:serial wait,2:serial discard,3:serial priority';ALTER TABLE t_ds_process_definition_log ADD `execution_type` tinyint(4) DEFAULT '0' COMMENT 'execution_type 0:parallel,1:serial wait,2:serial discard,3:serial priority';
复制代码

4. 安装 3.0.1 DS

具体安装参考官网,创建新库,这里不做介绍。


introduction (apache.org)

5. 把 DS 2.0 数据导入数据库

需要重点注意:


次脚本导入时:


  1. 定时调度 状态全设置为下线状态;定时表 中 t_ds_schedules release_state全设置为 0 了,为了防止和原调度冲突,测试完成后需要手动上线即可。

  2. worker 分组组信息需要重新设置,和原先保持一致即可。

  3. 告警实例管理需要重新添加,然后在告警组管理中重新绑定。


#项目表truncate table t_ds_project;
insert into t_ds_projectselect id, name, code, description, user_id, flag, create_time, update_timefrom dolphin201.t_ds_project;
#任务过程定义####唯一dagtruncate table t_ds_process_definition;
insert into t_ds_process_definitionselect id, code, name, version, description, project_code, release_state, user_id, global_params, flag, locations, warning_group_id, timeout, tenant_id, execution_type, create_time, update_time from dolphin201.t_ds_process_definition;

#任务过程定义日志truncate table t_ds_process_definition_log;
insert into t_ds_process_definition_logselect t1.id, t1.code, t1.name, t1.version, t1.description, t1.project_code, t1.release_state, t1.user_id, t1.global_params, t1.flag, t1.locations, t1.warning_group_id, t1.timeout, t1.tenant_id, t1.execution_type, t1.operator, t1.operate_time, t1.create_time, t1.update_time
from dolphin201.t_ds_process_definition_log t1 inner join (select code, project_code, max(version) version from dolphin201.t_ds_process_definition_log group by project_code, code) t2 on t1.code = t2.code and t1.project_code = t2.project_code and t1.version = t2.version ;
###进行任务相关性表truncate table t_ds_process_task_relation;
insert into t_ds_process_task_relationselect id, name, project_code, process_definition_code, process_definition_version, pre_task_code, pre_task_version, post_task_code, post_task_version, condition_type, condition_params, create_time, update_timefrom dolphin201.t_ds_process_task_relation;
###进行任务相关性表日志truncate table t_ds_process_task_relation_log;
insert into t_ds_process_task_relation_logselect t3.id, t3.name, t3.project_code, t3.process_definition_code, t3.process_definition_version, t3.pre_task_code, t3.pre_task_version, t3.post_task_code, t3.post_task_version, t3.condition_type, t3.condition_params, t3.operator, t3.operate_time, t3.create_time, t3.update_time # from t_ds_process_task_relation;from dolphin201.t_ds_process_task_relation_log t3 inner join (select project_code, process_definition_code, post_task_code, max(process_definition_version) process_definition_version, max(post_task_version) AS post_task_version from dolphin201.t_ds_process_task_relation_log group by project_code, process_definition_code, post_task_code) t4 on t3.project_code = t4.project_code and t3.process_definition_code = t4.process_definition_code and t3.post_task_code = t4.post_task_code and t3.process_definition_version = t4.process_definition_version and t3.post_task_version = t4.post_task_version;
###任务定义truncate table t_ds_task_definition;
insert into t_ds_task_definitionselect id, code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, task_group_id, task_group_priority, create_time, update_timefrom dolphin201.t_ds_task_definition;
###任务定义日志truncate table t_ds_task_definition_log;
insert into t_ds_task_definition_logselect t5.id, t5.code, t5.name, t5.version, t5.description, t5.project_code, t5.user_id, t5.task_type, t5.task_params, t5.flag, t5.task_priority, t5.worker_group, t5.environment_code, t5.fail_retry_times, t5.fail_retry_interval, t5.timeout_flag, t5.timeout_notify_strategy, t5.timeout, t5.delay_time, t5.resource_ids, t5.operator, t5.task_group_id, t5.task_group_priority, t5.operate_time, t5.create_time, t5.update_timefrom dolphin201.t_ds_task_definition_log t5;
###定时计划truncate table t_ds_schedules;
insert into t_ds_schedulesselect id, process_definition_code, start_time, end_time, timezone_id, crontab, failure_strategy, user_id, '0' AS release_state, warning_type, warning_group_id, process_instance_priority, worker_group, environment_code, create_time, update_timefrom dolphin201.t_ds_schedules;
###告警组 truncate table t_ds_alertgroup;
insert into t_ds_alertgroup select id, alert_instance_ids, create_user_id, group_name, description, create_time, update_time from dolphin201.t_ds_alertgroup;
## ###告警组信息 ,这张表信息有冲突故不导入,后续自己配置下# truncate table t_ds_alert_plugin_instance;
# insert into t_ds_alert_plugin_instance# select id, plugin_define_id, plugin_instance_params, create_time, update_time, instance_name# from dolphin201.t_ds_alert_plugin_instance;
###用户truncate table t_ds_user;
insert into t_ds_userselect id, user_name, user_password, user_type, email, phone, tenant_id, create_time, update_time, queue, state, time_zonefrom dolphin201.t_ds_user;
###租户truncate table t_ds_tenant;
insert into t_ds_tenantselect id, tenant_code, description, queue_id, create_time, update_timefrom dolphin201.t_ds_tenant;

####资源中心truncate table t_ds_resources;
insert into t_ds_resourcesselect id, alias, file_name, description, user_id, type, size, create_time, update_time, pid, full_name, is_directoryfrom dolphin201.t_ds_resources;

####数据源truncate table t_ds_datasource;
insert into t_ds_datasourceselect id, name, note, type, user_id, connection_params, create_time, update_timefrom dolphin201.t_ds_datasource;

###环境情况truncate table t_ds_environment;
insert into t_ds_environmentselect id, code, name, config, description, operator, create_time, update_timefrom dolphin201.t_ds_environment;
复制代码

6. 遇到问题和解决方案

  • 任务无法打开



如直接使用官网任务脚本会报次错误,这保存可以直接看日志,是由于字段缺少导致的,上面 ddl 语句中已经添加相关字段语句。


  • 会出现少量任务无法保存问题,主要原因主键冲突造成的


最简单的解决方案:重新复制一个任务,在新任务中进行更改保存。


  • 用户授权的任务需要进行重新授权。


转载自缤纷的世界


原文链接:https://blog.csdn.net/m0_46571744/article/details/128175021

用户头像

白鲸开源

关注

一家开源原生的DataOps商业公司。 2022-03-18 加入

致力于打造下一代开源原生的DataOps 平台,助力企业在大数据和云时代,智能化地完成多数据源、多云及信创环境的数据集成、调度开发和治理,以提高企业解决数据问题的效率,提升企业分析洞察能力和决策能力。

评论

发布
暂无评论
升级遇到坑?一文带你搞定DolphinScheduler 2.0到3.0升级_开源_白鲸开源_InfoQ写作社区