写点什么

Elastic Job 简单使用

用户头像
赵镇
关注
发布于: 3 小时前

ElasticJob 单点使用

任务类


public class BackupJob implements SimpleJob {    public void execute(ShardingContext shardingContext) {            String selectSql = "select * from resume where state='未归档' limit 1";        List<Map<String, Object>> list =                JdbcUtil.executeQuery(selectSql);        if(list == null || list.size() == 0) {            return;        }        Map<String, Object> stringObjectMap = list.get(0);        long id = (long) stringObjectMap.get("id");        String name = (String) stringObjectMap.get("name");        String education = (String)                stringObjectMap.get("education");// 打印出这条记录        System.out.println("======>>>id:" + id + " name:" +                name + " education:" + education);// 更改状态        String updateSql = "update resume set state='已归档' where id=?";        JdbcUtil.executeUpdate(updateSql,id);// 归档这条记录        String insertSql = "insert into resume_bak select * from resume where id=?";        JdbcUtil.executeUpdate(insertSql,id);    }
}
复制代码


主要的任务就是将未归档的数据整理到归档的表中,表结构一样执行类


public class JobMain {    public static void main(String[] args) {        //初始化注册中心        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("127.0.0.1:2181","data-job");        CoordinatorRegistryCenter coordinatorRegistryCenter= new ZookeeperRegistryCenter(zookeeperConfiguration);        coordinatorRegistryCenter.init();        //创建任务        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("data-job","*/2 * * * * ?",1).build();        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration,BackupJob.class.getName());        //执行任务        new JobScheduler(coordinatorRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).build()).init();    }}
复制代码


这种情况下,启动两个任务类只会有一个在执行任务。但是当一个任务停止之后,另一个任务会立马开始接着执行任务,相当于其他中间件中的主备切换。但是这里的主备切换是依托 zk 进行的

多节点分布式任务调度

修改执行类的代码为


public class JobMain {    public static void main(String[] args) {        //初始化注册中心        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("127.0.0.1:2181","data-job");        CoordinatorRegistryCenter coordinatorRegistryCenter= new ZookeeperRegistryCenter(zookeeperConfiguration);        coordinatorRegistryCenter.init();        //创建任务        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("data-job","*/2 * * * * ?",3).build();        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration,BackupJob.class.getName());        //执行任务        new JobScheduler(coordinatorRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build()).init();    }}
复制代码


除了修改分片数还需要在执行任务的类中执行相应的分片参数,另外需要注意的是仅仅增加分票策略是不生效的,必须同时配置分片参数。另外如果使用同一个 job 来做执行的话。需要增加 overwrite 为 true 执行器代码为


  public class BackupJob implements SimpleJob {    public void execute(ShardingContext shardingContext) {        int shardingitem = shardingContext.getShardingItem();        System.out.println("当前分片"+shardingitem);        String shardingParamter = shardingContext.getShardingParameter();        System.out.println(shardingParamter);            String selectSql = "select * from resume where state='未归档' and name='"+shardingParamter+"' limit 1";        List<Map<String, Object>> list =                JdbcUtil.executeQuery(selectSql);        if(list == null || list.size() == 0) {            return;        }        Map<String, Object> stringObjectMap = list.get(0);        long id = (long) stringObjectMap.get("id");        String name = (String) stringObjectMap.get("name");        String education = (String)                stringObjectMap.get("education");// 打印出这条记录        System.out.println("======>>>id:" + id + " name:" +                name + " education:" + education);// 更改状态        String updateSql = "update resume set state='已归档' where id=?";        JdbcUtil.executeUpdate(updateSql,id);// 归档这条记录        String insertSql = "insert into resume_bak select * from resume where id=?";        JdbcUtil.executeUpdate(insertSql,id);    }
}
复制代码


测试结果为,当执行器未全部启动时,所有分片在一个执行器上运行。当三个执行器都启动时,会平均分配到三个执行器。


demo 代码地址为https://github.com/zhendiao/deme-code/tree/main/schedule_job

发布于: 3 小时前阅读数: 4
用户头像

赵镇

关注

还未添加个人签名 2017.12.20 加入

还未添加个人简介

评论

发布
暂无评论
Elastic Job简单使用