
From Startup to Shutdown | SeaTunnel 2.1.1 Source Code Analysis

  • October 8, 2022

Light up a ⭐️ Star · Light the way for open source


GitHub: https://github.com/apache/incubator-seatunnel




Table of Contents


This article is reposted from Adobee Chen's blog on CSDN. See if anything in it interests you!


If there are any mistakes, corrections are welcome.


I. Startup Script Analysis

II. Source Code Analysis

01 Entry Point

02 The execute() Core Method

  1. BaseSource, BaseTransform, and BaseSink are all interfaces that extend the Plugin interface; their implementation classes are the concrete plugin types

  2. Moving further down execute(), an execution environment is created

  3. plugin.prepare(env) is called

  4. Finally, execution.start(sources, transforms, sinks) is launched to run the Flink program

  5. And everything is closed at the end

I. Startup Script Analysis

In /bin/start-seatunnel-flink.sh:


#!/bin/bash

function usage() {
  echo "Usage: start-seatunnel-flink.sh [options]"
  echo "  options:"
  echo "    --config, -c FILE_PATH        Config file"
  echo "    --variable, -i PROP=VALUE     Variable substitution, such as -i city=beijing, or -i date=20190318"
  echo "    --check, -t                   Check config"
  echo "    --help, -h                    Show this help message"
}

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]] || [[ $# -le 1 ]]; then
  usage
  exit 0
fi

is_exist() {
    if [ -z $1 ]; then
      usage
      exit -1
    fi
}

PARAMS=""
while (( "$#" )); do
  case "$1" in
    -c|--config)
      CONFIG_FILE=$2
      is_exist ${CONFIG_FILE}
      shift 2
      ;;

    -i|--variable)
      variable=$2
      is_exist ${variable}
      java_property_value="-D${variable}"
      variables_substitution="${java_property_value} ${variables_substitution}"
      shift 2
      ;;

    *) # preserve positional arguments
      PARAMS="$PARAMS $1"
      shift
      ;;

  esac
done

if [ -z ${CONFIG_FILE} ]; then
  echo "Error: The following option is required: [-c | --config]"
  usage
  exit -1
fi

# set positional arguments in their proper place
eval set -- "$PARAMS"

BIN_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
APP_DIR=$(dirname ${BIN_DIR})
CONF_DIR=${APP_DIR}/config
PLUGINS_DIR=${APP_DIR}/lib
DEFAULT_CONFIG=${CONF_DIR}/application.conf
CONFIG_FILE=${CONFIG_FILE:-$DEFAULT_CONFIG}

assemblyJarName=$(find ${PLUGINS_DIR} -name seatunnel-core-flink*.jar)

if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
    source ${CONF_DIR}/seatunnel-env.sh
fi

string_trim() {
    echo $1 | awk '{$1=$1;print}'
}

export JVM_ARGS=$(string_trim "${variables_substitution}")

exec ${FLINK_HOME}/bin/flink run \
    ${PARAMS} \
    -c org.apache.seatunnel.SeatunnelFlink \
    ${assemblyJarName} --config ${CONFIG_FILE}


The startup script accepts --config, --variable, --check (not yet supported), and --help.


Anything that is not a --config or --variable argument is collected into PARAMS; when the flink run command is executed at the end, PARAMS is passed straight through as Flink arguments.


The class org.apache.seatunnel.SeatunnelFlink is the main entry point.

II. Source Code Analysis

01 Entry Point

public class SeatunnelFlink {

    public static void main(String[] args) throws Exception {
        FlinkCommandArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
        Seatunnel.run(flinkArgs);
    }

}


The command-line arguments are parsed into FlinkCommandArgs:


public static FlinkCommandArgs parseFlinkArgs(String[] args) {
    FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
    JCommander.newBuilder()
        .addObject(flinkCommandArgs)
        .build()
        .parse(args);
    return flinkCommandArgs;
}
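To see what JCommander binds those arguments to, here is a minimal sketch of FlinkCommandArgs (the flag names follow the startup script's options; field names and defaults are illustrative assumptions, not a verbatim copy of the class):

import com.beust.jcommander.Parameter;

import java.util.Collections;
import java.util.List;

public class FlinkCommandArgs extends CommandArgs {

    // bound from -c/--config in the launcher script
    @Parameter(names = {"-c", "--config"}, description = "config file", required = true)
    private String configFile;

    // bound from -i/--variable; may be given several times
    @Parameter(names = {"-i", "--variable"}, description = "variable substitution")
    private List<String> variables = Collections.emptyList();

    // bound from -t/--check
    @Parameter(names = {"-t", "--check"}, description = "check config")
    private boolean checkConfig = false;

    public String getConfigFile() {
        return configFile;
    }

    public boolean isCheckConfig() {
        return checkConfig;
    }
}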


Step into Seatunnel.run(flinkArgs).


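What run() does can be pieced together from the rest of the walkthrough: it asks CommandFactory for an engine-specific Command and executes it. A minimal sketch (the exact signature and error handling are assumptions):

public static <T extends CommandArgs> void run(T commandArgs) throws Exception {
    // choose a Flink or Spark command according to the engine type (next snippet)
    Command<T> command = CommandFactory.createCommand(commandArgs);
    // dispatches to FlinkConfValidateCommand or FlinkTaskExecuteCommand
    command.execute(commandArgs);
}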


Step into CommandFactory.createCommand(commandArgs).

The Command is chosen according to the engine type.

We are following the Flink branch:


public static <T extends CommandArgs> Command<T> createCommand(T commandArgs) {
    switch (commandArgs.getEngineType()) {
        case FLINK:
            return (Command<T>) new FlinkCommandBuilder().buildCommand((FlinkCommandArgs) commandArgs);
        case SPARK:
            return (Command<T>) new SparkCommandBuilder().buildCommand((SparkCommandArgs) commandArgs);
        default:
            throw new RuntimeException(String.format("engine type: %s is not supported", commandArgs.getEngineType()));
    }
}


Step into buildCommand.

Depending on whether we only want to check the config, a different implementation class is returned:


public Command buildCommand(FlinkCommandArgs commandArgs) {
    return commandArgs.isCheckConfig() ? new FlinkConfValidateCommand() : new FlinkTaskExecuteCommand();
}


FlinkConfValidateCommand and FlinkTaskExecuteCommand both implement the Command interface, and each exposes a single execute() method:


public class FlinkConfValidateCommand implements Command<FlinkCommandArgs>

public class FlinkTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommandArgs, FlinkEnvironment>
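For reference, the Command abstraction they implement boils down to a single-method interface (a simplified sketch; the real interface may carry extra context):

public interface Command<T extends CommandArgs> {

    // either validate the config (FlinkConfValidateCommand)
    // or run the whole job (FlinkTaskExecuteCommand)
    void execute(T commandArgs) throws Exception;
}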


Back in Seatunnel.run(flinkArgs), execution proceeds into command.execute(commandArgs).

Let's look first at the execute() method of FlinkTaskExecuteCommand.

02 The execute() Core Method

public void execute(FlinkCommandArgs flinkCommandArgs) {
    // flink
    EngineType engine = flinkCommandArgs.getEngineType();
    // --config
    String configFile = flinkCommandArgs.getConfigFile();
    // turn the config file path (a String) into a Config object
    Config config = new ConfigBuilder<>(configFile, engine).getConfig();
    // parse the execution context
    ExecutionContext executionContext = new ExecutionContext<>(config, engine);
    // parse the source section
    List<BaseSource> sources = executionContext.getSources();
    // parse the transform section
    List<BaseTransform> transforms = executionContext.getTransforms();
    // parse the sink section
    List<BaseSink> sinks = executionContext.getSinks();

    baseCheckConfig(sources, transforms, sinks);
    showAsciiLogo();

    try (Execution<BaseSource,
            BaseTransform,
            BaseSink,
            FlinkEnvironment> execution = new ExecutionFactory<>(executionContext).createExecution()) {
        // prepare
        prepare(executionContext.getEnvironment(), sources, transforms, sinks);
        // start
        execution.start(sources, transforms, sinks);
        // close
        close(sources, transforms, sinks);
    } catch (Exception e) {
        throw new RuntimeException("Execute Flink task error", e);
    }
}


1. BaseSource, BaseTransform, and BaseSink are all interfaces, and all of them extend the Plugin interface; their implementation classes are the concrete plugin types.

If our source and sink are both Kafka, the corresponding source implementation is KafkaTableStream and the sink is KafkaSink, as sketched below.
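A condensed sketch of that hierarchy (simplified: method lists are trimmed, and the exact generic bounds are assumptions):

// The plugin SPI described above, trimmed down. E stands for the engine's
// runtime environment (FlinkEnvironment or SparkEnvironment).
public interface Plugin<E extends RuntimeEnv> extends AutoCloseable {

    void setConfig(Config pluginConfig);  // the plugin's block of the user config

    Config getConfig();

    void prepare(E env);                  // called once before the job starts

    @Override
    default void close() throws Exception {
        // most plugins have nothing to release
    }
}

// Sources, transforms, and sinks are sub-interfaces of Plugin; concrete
// connectors such as KafkaTableStream and KafkaSink implement them.
public interface BaseSource<E extends RuntimeEnv> extends Plugin<E> { }
public interface BaseTransform<E extends RuntimeEnv> extends Plugin<E> { }
public interface BaseSink<E extends RuntimeEnv> extends Plugin<E> { }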


2. Moving further down execute(), an execution environment is created.

Step into createExecution() in ExecutionFactory:


public Execution<BaseSource, BaseTransform, BaseSink, ENVIRONMENT> createExecution() {
    Execution execution = null;
    switch (executionContext.getEngine()) {
        case SPARK:
            SparkEnvironment sparkEnvironment = (SparkEnvironment) executionContext.getEnvironment();
            switch (executionContext.getJobMode()) {
                case STREAMING:
                    execution = new SparkStreamingExecution(sparkEnvironment);
                    break;
                case STRUCTURED_STREAMING:
                    execution = new StructuredStreamingExecution(sparkEnvironment);
                    break;
                default:
                    execution = new SparkBatchExecution(sparkEnvironment);
            }
            break;
        case FLINK:
            FlinkEnvironment flinkEnvironment = (FlinkEnvironment) executionContext.getEnvironment();
            switch (executionContext.getJobMode()) {
                case STREAMING:
                    execution = new FlinkStreamExecution(flinkEnvironment);
                    break;
                default:
                    execution = new FlinkBatchExecution(flinkEnvironment);
            }
            break;
        default:
            throw new IllegalArgumentException("No suitable engine");
    }
    LOGGER.info("current execution is [{}]", execution.getClass().getName());
    return (Execution<BaseSource, BaseTransform, BaseSink, ENVIRONMENT>) execution;
}


Step into FlinkStreamExecution, where you can see that in the end a Flink execution environment is what gets created:


private final FlinkEnvironment flinkEnvironment;

public FlinkStreamExecution(FlinkEnvironment streamEnvironment) {
    this.flinkEnvironment = streamEnvironment;
}
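FlinkEnvironment wraps Flink's native environments. A rough sketch (the getter names come from their uses later in this post; how and when the environments are created is an assumption):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkEnvironment implements RuntimeEnv {

    private StreamExecutionEnvironment environment;
    private StreamTableEnvironment tableEnvironment;
    private String jobName = "seatunnel";

    // assumption: called while the ExecutionContext is being built
    public void prepare() {
        environment = StreamExecutionEnvironment.getExecutionEnvironment();
        tableEnvironment = StreamTableEnvironment.create(environment);
    }

    public StreamExecutionEnvironment getStreamExecutionEnvironment() {
        return environment;
    }

    public StreamTableEnvironment getStreamTableEnvironment() {
        return tableEnvironment;
    }

    public String getJobName() {
        return jobName;
    }
}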


3. Call plugin.prepare(env)


protected final void prepare(E env, List<? extends Plugin<E>>... plugins) {
    for (List<? extends Plugin<E>> pluginList : plugins) {
        pluginList.forEach(plugin -> plugin.prepare(env));
    }
}


Take Kafka -> Kafka as an example.

KafkaTableStream's prepare:


public void prepare(FlinkEnvironment env) {
    topic = config.getString(TOPICS);
    PropertiesUtil.setProperties(config, kafkaParams, consumerPrefix, false);
    tableName = config.getString(RESULT_TABLE_NAME);
    if (config.hasPath(ROWTIME_FIELD)) {
        rowTimeField = config.getString(ROWTIME_FIELD);
        if (config.hasPath(WATERMARK_VAL)) {
            watermark = config.getLong(WATERMARK_VAL);
        }
    }
    String schemaContent = config.getString(SCHEMA);
    format = FormatType.from(config.getString(SOURCE_FORMAT).trim().toLowerCase());
    schemaInfo = JSONObject.parse(schemaContent, Feature.OrderedField);
}
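Both prepare() methods rely on PropertiesUtil.setProperties to copy prefixed keys ("consumer." here, "producer." in KafkaSink below) from the user config into the Kafka parameter map. A behavioral sketch, inferred from the call sites rather than copied from the utility:

import java.util.Properties;

import com.typesafe.config.Config;

public final class PropertiesUtil {

    // Copy every "<prefix>xxx" entry of the plugin config into properties,
    // stripping the prefix unless keepPrefix is true (inferred semantics).
    public static void setProperties(Config config, Properties properties,
                                     String prefix, boolean keepPrefix) {
        config.entrySet().forEach(entry -> {
            String key = entry.getKey();
            if (key.startsWith(prefix)) {
                String target = keepPrefix ? key : key.substring(prefix.length());
                properties.put(target, String.valueOf(entry.getValue().unwrapped()));
            }
        });
    }
}

Under that reading, a key such as consumer.bootstrap.servers in the config file ends up as bootstrap.servers in kafkaParams.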


KafkaSink's prepare:


public void prepare(FlinkEnvironment env) {
    topic = config.getString("topics");
    if (config.hasPath("semantic")) {
        semantic = config.getString("semantic");
    }
    String producerPrefix = "producer.";
    PropertiesUtil.setProperties(config, kafkaParams, producerPrefix, false);
    kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    kafkaParams.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
}


4. Start: execution.start(sources, transforms, sinks)

From step 2 we already know that execution is created per engine and job mode; for our Kafka streaming job it is FlinkStreamExecution, so we look up the start() method in FlinkStreamExecution.


5. Run the Flink program

Here source.getData resolves to the getData method of KafkaTableStream, and the sink side to the outputStream method of KafkaSink:


public void start(List<FlinkStreamSource> sources,
                  List<FlinkStreamTransform> transforms,
                  List<FlinkStreamSink> sinks) throws Exception {
    List<DataStream> data = new ArrayList<>();

    for (FlinkStreamSource source : sources) {
        DataStream dataStream = source.getData(flinkEnvironment);
        data.add(dataStream);
        registerResultTable(source, dataStream);
    }

    DataStream input = data.get(0);

    for (FlinkStreamTransform transform : transforms) {
        DataStream stream = fromSourceTable(transform.getConfig()).orElse(input);
        input = transform.processStream(flinkEnvironment, stream);
        registerResultTable(transform, input);
        transform.registerFunction(flinkEnvironment);
    }

    for (FlinkStreamSink sink : sinks) {
        DataStream stream = fromSourceTable(sink.getConfig()).orElse(input);
        sink.outputStream(flinkEnvironment, stream);
    }

    try {
        LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
        flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
    } catch (Exception e) {
        LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
        throw e;
    }
}
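Note the fromSourceTable(...).orElse(input) pattern: a transform or sink may name a previously registered table as its input, and otherwise consumes the previous stage's output. A hedged sketch of that lookup inside FlinkStreamExecution (the option key and the Table-to-DataStream conversion are assumptions):

// Hedged sketch: resolves a plugin's declared input table against tables
// registered earlier via registerResultTable, falling back to the previous
// stream when no table is named.
private Optional<DataStream<Row>> fromSourceTable(Config pluginConfig) {
    if (!pluginConfig.hasPath("source_table_name")) {
        return Optional.empty(); // .orElse(input) then wires in the previous stream
    }
    StreamTableEnvironment tableEnv = flinkEnvironment.getStreamTableEnvironment();
    Table table = tableEnv.from(pluginConfig.getString("source_table_name"));
    return Optional.of(tableEnv.toAppendStream(table, Row.class));
}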


6. Finally, close


protected final void close(List<? extends Plugin<E>>... plugins) {
    PluginClosedException exceptionHolder = null;
    for (List<? extends Plugin<E>> pluginList : plugins) {
        for (Plugin<E> plugin : pluginList) {
            try (Plugin<E> closed = plugin) {
                // ignore
            } catch (Exception e) {
                exceptionHolder = exceptionHolder == null ?
                        new PluginClosedException("below plugins closed error:") : exceptionHolder;
                exceptionHolder.addSuppressed(new PluginClosedException(
                        String.format("plugin %s closed error", plugin.getClass()), e));
            }
        }
    }
    if (exceptionHolder != null) {
        throw exceptionHolder;
    }
}
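One detail worth noting: the loop assigns each plugin to a try-with-resources variable with an empty body, which guarantees close() is called and lets a failure be recorded without aborting the remaining plugins. The trick in isolation:

// Try-with-resources over an existing object: the empty body means the only
// effect is the implicit close() (plus exception capture) at block exit.
try (AutoCloseable closed = plugin) {
    // intentionally empty
} catch (Exception e) {
    // record the failure and keep closing the remaining plugins
}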

Apache SeaTunnel

Come and grow together with the community!

Apache SeaTunnel (Incubating) is a distributed, high-performance, easily extensible data integration platform for the synchronization and transformation of massive amounts of data (offline and real-time).

Repository: https://github.com/apache/incubator-seatunnel

Website: https://seatunnel.apache.org/

Proposal: https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelProposal

Apache SeaTunnel (Incubating) 2.1.0 download: https://seatunnel.apache.org/download

We warmly welcome more people to join!

We believe that, guided by The Apache Way of "Community Over Code", "Open and Cooperation", "Meritocracy", and "Diversity and Consensus Decision-Making", we will build a more diverse and inclusive community ecosystem, and together advance the technical progress that the open-source spirit brings!

We sincerely invite everyone who wants to see home-grown open source take root globally to join the SeaTunnel contributor family and build open source together!


Submit issues and suggestions: https://github.com/apache/incubator-seatunnel/issues

Contribute code: https://github.com/apache/incubator-seatunnel/pulls

Subscribe to the community development mailing list: dev-subscribe@seatunnel.apache.org

Development mailing list: dev@seatunnel.apache.org

Join Slack: https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1cmonqu2q-ljomD6bY1PQ~oOzfbxxXWQ

Follow us on Twitter: https://twitter.com/ASFSeaTunnel


< 🐬🐬 >
