写点什么

ScheduleMaster 分布式任务调度中心基本使用和原理

作者:神秘码农
  • 2022 年 4 月 24 日
  • 本文字数:6536 字

    阅读完需:约 21 分钟

@[toc]

一、ScheduleMaster 核心概念

  • 概念统一执多个系统的任务【回收超时订单,清理垃圾信息 】,如图:

二、ScheduleMaster 应用场景

  • 场景主要应用在微服务系统中。如图:

三、ScheduleMaster 项目落地

  • 工具

  • ScheduleMaster 网盘下载地址:链接:https://pan.baidu.com/s/1LcCHS__zRRJv_HHwsza3cg 提取码:eyup

  • Demo 项目

  • 步骤

  • 运行 ScheduleMaster

  • 启动命令


      #进入Hos.ScheduleMaster.Web\bin\Release\netcoreapp3.1\publish目录中启动      #备注:默认数据库为sqlserver,如果其他数据库需要在appsettings.json中修改数据库类型和数据库连接地址        dotnet Hos.ScheduleMaster.Web.dll      #进入Hos.ScheduleMaster.QuartzHost\bin\Release\netcoreapp3.1\publish 目录中启动        dotnet Hos.ScheduleMaster.QuartzHost.dll --urls http://*:30001
复制代码


  运行结果如图:  ![在这里插入图片描述](https://img-blog.csdnimg.cn/716d83fd5f2743e0af3fa46a5a7f302d.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAQEDnpZ7lhpzlhpnku6PnoIE=,size_20,color_FFFFFF,t_70,g_se,x_16#pic_center)
复制代码



  浏览器运行结果:http://localhost:30000,用户名:admin 密码:111111  如图:  ![在这里插入图片描述](https://img-blog.csdnimg.cn/23484427f22b4e47a67282876800bdd4.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAQEDnpZ7lhpzlhpnku6PnoIE=,size_20,color_FFFFFF,t_70,g_se,x_16#pic_center)
复制代码



  • Demo 项目

  • 新建一个订单回收 API 接口


        private readonly ILogger<HomeController> logger;        public HomeController(ILogger<HomeController> _logger) {            logger = _logger;        }         /// <summary>        /// 超时订单回收接口        /// </summary>        /// <returns></returns>        [HttpPost]        public IActionResult OrderCancel()         {            logger.LogInformation("回收订单任务");            return Ok("回收订单任务");        }
复制代码


  • 新建任务

  • 点击任务列表如图:


  • 点击创建任务按钮->选择 “基础信息配置”如图:


  • 选择“元数据配置”,在点击“保存”即可,如图:


  • 运行结果(每隔 5 秒进行调用订单回收接口),如图:


四、ScheduleMaster 运行原理

  • 原理

  • Master 概念主节点:协调 Hos.ScheduleMaster.Web

  • Node 概念工作节点:执行业务 Hos.ScheduleMaster.QuartzHost

  • 数据库用来存储任务信息。

  • 全局架构如图:


  • 执行过程客户端---->Hos.ScheduleMaster.Web(master 节点)---->Hos.ScheduleMaster.QuartzHost(工作节点)---->订单回收接口

  • master 节点的核心

  • 选择工作节点

  • 指定工作节点,执行任务

  • 工作节点的核心

  • 取出任务配置信息

  • 使用 Quartz 根据配置运行任务

  • 使用 HttpClient 调用接口

  • 使用反射调用程序集方法

五、ScheduleMaster 程序集任务

  • 工具

  • 控制台 Demo 项目

  • ScheduleMaster

  • 步骤

  • 新建一个 Console 项目


      //项目安装        ScheduleMaster
复制代码


新建OrderServer类继承
复制代码


       public  class OrderServer:TaskBase       {           public override void Run(TaskContext context)         {            //超时订单逻辑            context.WriteLog("订单开始回收.......成功");         }       }
复制代码


- 控制台项目打包  项目编译后,进入 bin文件件,将Hos.ScheduleMaster.Base.dll除外的所有的文件打包成压缩文件,如图:  ![在这里插入图片描述](https://img-blog.csdnimg.cn/e5845a029d364a71b773bf7e533be9f2.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAQEDnpZ7lhpzlhpnku6PnoIE=,size_20,color_FFFFFF,t_70,g_se,x_16#pic_center)
复制代码


  • ScheduleMaster 配置

  • 点击“任务列表”目录,点击“创建任务”按钮,任务类型选择“程序集任务”,如图:

六、ScheduleMaster API 接口任务(使用代码自定义创建任务)

  • 实现代码如下:


        /// <summary>        /// 通过代码创建任务        /// </summary>        /// <returns></returns>        [HttpPost("CreateTask")]        public async Task<IActionResult> CreateTask()        {            HttpClient client = new HttpClient();            //登录 设置用户名和密码            client.DefaultRequestHeaders.Add("ms_auth_user", "admin");            client.DefaultRequestHeaders.Add("ms_auth_secret", MD5($"admin{MD5("111111")}admin"));            List<KeyValuePair<string, string>> args = new List<KeyValuePair<string, string>>();            args.Add(new KeyValuePair<string, string>("MetaType", "2"));            args.Add(new KeyValuePair<string, string>("RunLoop", "true"));            args.Add(new KeyValuePair<string, string>("CronExpression", "0/5 * * * * ?"));            args.Add(new KeyValuePair<string, string>("Remark", "By Xunit Tester Created"));            args.Add(new KeyValuePair<string, string>("StartDate", DateTime.Today.ToString("yyyy-MM-dd HH:mm:ss")));            args.Add(new KeyValuePair<string, string>("Title", "order_cancel_http_api"));            args.Add(new KeyValuePair<string, string>("HttpRequestUrl", "http://localhost:5000/Order"));            args.Add(new KeyValuePair<string, string>("HttpMethod", "POST"));            args.Add(new KeyValuePair<string, string>("HttpContentType", "application/json"));            args.Add(new KeyValuePair<string, string>("HttpHeaders", "[]"));            args.Add(new KeyValuePair<string, string>("HttpBody", "{ \"Posts\": [{ \"PostId\": 666, \"Title\": \"tester\", \"Content\":\"testtesttest\" }], \"BlogId\": 111, \"Url\":\"qweqrrttryrtyrtrtrt\" }"));            HttpContent reqContent = new FormUrlEncodedContent(args);            var response = await client.PostAsync("http://localhost:30000/api/Task/Create", reqContent);            var content = await response.Content.ReadAsStringAsync();            logger.LogInformation(content);             return Ok("创建成功");         }         /// <summary>        /// MD5加密        /// </summary>        /// <param name="prestr"></param>        /// <returns></returns>        public static string MD5(string prestr)        {            StringBuilder sb = new StringBuilder(32);            MD5 md5 = new MD5CryptoServiceProvider();            byte[] t = md5.ComputeHash(Encoding.GetEncoding("UTF-8").GetBytes(prestr));            for (int i = 0; i < t.Length; i++)            {                sb.Append(t[i].ToString("x").PadLeft(2, '0'));            }            return sb.ToString();        } 
复制代码


  • 官方 API

  • API Server 对接流程对于开放接口来说,使用签名验证已经是必不可少的一环,这是保证系统安全性的重要手段。看一下核心对接流程:

  • 在控制台中创建好专用的 API 对接用户账号。

  • 使用对接账号的用户名设置为 http header 中的ms_auth_user值。

  • 使用经过哈希运算过的秘钥设置为 http header 中的ms_auth_secret值,计算规则:按{用户名}{hash(密码)}{用户名}的格式拼接得到字符串 str,然后再对 str 做一次 hash 运算即得到最终秘钥,hash 函数是小写的 32 位 MD5 算法。

  • 使用 form 格式发起 http 调用,如果非法用户会返回 401-Unauthorized。代码示例:


        HttpClient client = new HttpClient();        client.DefaultRequestHeaders.Add("ms_auth_user", "admin");        client.DefaultRequestHeaders.Add("ms_auth_secret", SecurityHelper.MD5($"admin{SecurityHelper.MD5("111111")}}admin"));
复制代码


签名验证这块设计的比较简单,具体源码逻辑可以参考Hos.ScheduleMaster.Web.Filters.AccessControlFilter


  • API 返回格式

  • 所有接口采用统一的返回格式,字段如下:


  • 创建程序集任务

  • 接口地址:http://yourip:30000/api/task/create

  • 请求类型:POST

  • 参数格式:application/x-www-form-urlencoded

  • 返回结果:创建成功返回任务 id

  • 参数列表:


  • ScheduleParam:


  • 代码示例:


        HttpClient client = new HttpClient();        List<KeyValuePair<string, string>> args = new List<KeyValuePair<string, string>>();        args.Add(new KeyValuePair<string, string>("MetaType", "1"));        args.Add(new KeyValuePair<string, string>("RunLoop", "true"));        args.Add(new KeyValuePair<string, string>("CronExpression", "33 0/8 * * * ?"));        args.Add(new KeyValuePair<string, string>("Remark", "By Xunit Tester Created"));        args.Add(new KeyValuePair<string, string>("StartDate", DateTime.Today.ToString("yyyy-MM-dd HH:mm:ss")));        args.Add(new KeyValuePair<string, string>("Title", "程序集接口测试任务"));        args.Add(new KeyValuePair<string, string>("AssemblyName", "Hos.ScheduleMaster.Demo"));        args.Add(new KeyValuePair<string, string>("ClassName", "Hos.ScheduleMaster.Demo.Simple"));        args.Add(new KeyValuePair<string, string>("CustomParamsJson", "[{\"ParamKey\":\"k1\",\"ParamValue\":\"1111\",\"ParamRemark\":\"r1\"},{\"ParamKey\":\"k2\",\"ParamValue\":\"2222\",\"ParamRemark\":\"r2\"}]"));        args.Add(new KeyValuePair<string, string>("Keepers", "1"));        args.Add(new KeyValuePair<string, string>("Keepers", "2"));        //args.Add(new KeyValuePair<string, string>("Nexts", ""));        //args.Add(new KeyValuePair<string, string>("Executors", ""));        HttpContent reqContent = new FormUrlEncodedContent(args);        var response = await client.PostAsync("http://localhost:30000/api/Task/Create", reqContent);        var content = await response.Content.ReadAsStringAsync();        Debug.WriteLine(content);
复制代码


 要提一下的是,使用API创建任务的方式不支持上传程序包,所以在任务需要启动时要确保程序包已通过其他方式上传,否则会启动失败。
复制代码


  • 创建 HTTP 任务

  • 接口地址:http://yourip:30000/api/task/create

  • 请求类型:POST

  • 参数格式:application/x-www-form-urlencoded

  • 返回结果:创建成功返回任务 id

  • 参数列表:


  • 代码示例:


        HttpClient client = new HttpClient();        List<KeyValuePair<string, string>> args = new List<KeyValuePair<string, string>>();        args.Add(new KeyValuePair<string, string>("MetaType", "2"));        args.Add(new KeyValuePair<string, string>("RunLoop", "true"));        args.Add(new KeyValuePair<string, string>("CronExpression", "22 0/8 * * * ?"));        args.Add(new KeyValuePair<string, string>("Remark", "By Xunit Tester Created"));        args.Add(new KeyValuePair<string, string>("StartDate", DateTime.Today.ToString("yyyy-MM-dd HH:mm:ss")));        args.Add(new KeyValuePair<string, string>("Title", "Http接口测试任务"));        args.Add(new KeyValuePair<string, string>("HttpRequestUrl", "http://localhost:56655/api/1.0/value/jsonpost"));        args.Add(new KeyValuePair<string, string>("HttpMethod", "POST"));        args.Add(new KeyValuePair<string, string>("HttpContentType", "application/json"));        args.Add(new KeyValuePair<string, string>("HttpHeaders", "[]"));        args.Add(new KeyValuePair<string, string>("HttpBody", "{ \"Posts\": [{ \"PostId\": 666, \"Title\": \"tester\", \"Content\":\"testtesttest\" }], \"BlogId\": 111, \"Url\":\"qweqrrttryrtyrtrtrt\" }"));        HttpContent reqContent = new FormUrlEncodedContent(args);        var response = await client.PostAsync("http://localhost:30000/api/Task/Create", reqContent);        var content = await response.Content.ReadAsStringAsync();        Debug.WriteLine(content);
复制代码


  • 创建延时任务

  • 接口地址:http://yourip:30000/api/delaytask/create

  • 请求类型:POST

  • 参数格式:application/x-www-form-urlencoded

  • 返回结果:创建成功返回任务 id

  • 参数列表:


  • 代码示例:


        for (int i = 0; i < 5; i++)        {            int rndNum = new Random().Next(20, 500);            List<KeyValuePair<string, string>> args = new List<KeyValuePair<string, string>>();            args.Add(new KeyValuePair<string, string>("SourceApp", "TestApp"));            args.Add(new KeyValuePair<string, string>("Topic", "TestApp.Trade.TimeoutCancel"));            args.Add(new KeyValuePair<string, string>("ContentKey", i.ToString()));            args.Add(new KeyValuePair<string, string>("DelayTimeSpan", rndNum.ToString()));            args.Add(new KeyValuePair<string, string>("DelayAbsoluteTime", DateTime.Now.AddSeconds(rndNum).ToString("yyyy-MM-dd HH:mm:ss")));            args.Add(new KeyValuePair<string, string>("NotifyUrl", "http://localhost:56655/api/1.0/value/delaypost"));            args.Add(new KeyValuePair<string, string>("NotifyDataType", "application/json"));            args.Add(new KeyValuePair<string, string>("NotifyBody", "{ \"Posts\": [{ \"PostId\": 666, \"Title\": \"tester\", \"Content\":\"testtesttest\" }], \"BlogId\": 111, \"Url\":\"qweqrrttryrtyrtrtrt\" }"));            HttpContent reqContent = new FormUrlEncodedContent(args);            var response = await client.PostAsync("http://localhost:30000/api/DelayTask/Create", reqContent);            var content = await response.Content.ReadAsStringAsync();            Debug.WriteLine(content);        }
复制代码

七、ScheduleMaster 集群和集群原理

  • 主要针对工作节点使用集群

  • 步骤

  • 进入工作节点项目根目录下【ScheduleMasterCore\Hos.ScheduleMaster.QuartzHost\bin\Release\netcoreapp3.1\publish】,更改配置文件【appsettings.json】, 因为要启动多个节点,只需要更改节点名称和端口号即可【切记:节点名称不能够重复】,如下:


          "NodeSetting": {            "IdentityName": "worker1",  //节点名称            "Role": "worker",            "Protocol": "http",            "IP": "localhost",            "Port": 30001,  //端口号            "Priority": 1,            "MaxConcurrency": 20          }
复制代码


- 启动
复制代码


        #进入Hos.ScheduleMaster.QuartzHost\bin\Release\netcoreapp3.1\publish 目录中启动        dotnet Hos.ScheduleMaster.QuartzHost.dll --urls http://*:30002
复制代码


- 运行结果,如图:  ![在这里插入图片描述](https://img-blog.csdnimg.cn/de6e5b35737d464faffef63b068f00a5.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAQEDnpZ7lhpzlhpnku6PnoIE=,size_20,color_FFFFFF,t_70,g_se,x_16#pic_center)
复制代码


  • 场景

  • 如果当前的工作节点宕机了,会不会转移到其他的工作节点上?需要更改任务的执行节点,选择多个执行节点,如图:


  • 有两个工作节点,30001 和 30002,其中把 30001 宕机了[有个心跳检测的 API 接口,定时任务不断的检测当前的节点是否可用],会直接转移到其他节点执行任务,运行结果如下:


发布于: 刚刚阅读数: 4
用户头像

神秘码农

关注

还未添加个人签名 2022.03.14 加入

好好学习,天天向上!

评论

发布
暂无评论
ScheduleMaster分布式任务调度中心基本使用和原理_神秘码农_InfoQ写作社区