以前使用 elastic-job 都是使用 *.xml 配置文件的形式,但是现在的 springboot 项目都倾向于使用 Java Bean 的形式,所以本文介绍了如何使用 Java Bean 的形式使用 elastic-job。
1. 引入 maven 依赖
首先是 elastic-job maven 依赖关于 springboot 的依赖这里省略掉了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>2.1.5</version> </dependency> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>2.1.5</version> </dependency>
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.10</version> </dependency>
|
2. ElasticJobConfig 配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
@Configuration public class ElasticJobConfig {
@Resource private ElasticJobProperties properties;
@Bean public ZookeeperConfiguration zkConfig() { ZookeeperConfiguration config = new ZookeeperConfiguration(properties.getServerlists(), properties.getNamespace()); config.setBaseSleepTimeMilliseconds(properties.getBaseSleepTimeMilliseconds()); config.setMaxRetries(properties.getMaxRetries()); config.setMaxSleepTimeMilliseconds(properties.getMaxSleepTimeMilliseconds()); config.setConnectionTimeoutMilliseconds(properties.getConnectionTimeoutMilliseconds()); return config; }
@Bean(initMethod = "init") public ZookeeperRegistryCenter regCenter(ZookeeperConfiguration config) { return new ZookeeperRegistryCenter(config); } }
|
ElasticJobProperties 为自定义配置属性类,用于映射 zookeeper 配置信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
|
@Data @Configuration @ConfigurationProperties(prefix = "elasticjob.zookeeper") public class ElasticJobProperties {
private String serverlists;
private String namespace;
private int baseSleepTimeMilliseconds = 1000;
private int maxSleepTimeMilliseconds = 3000;
private int maxRetries = 3;
private int sessionTimeoutMilliseconds;
private int connectionTimeoutMilliseconds;
private String digest;
}
|
application.yaml
1 2 3 4 5 6 7 8 9
| elasticjob: zookeeper: serverlists: 192.168.160.128:2181 namespace: springboot_elasticjob connectionTimeoutMilliseconds: 50000 sessionTimeoutMilliseconds: 50000 maxRetries: 3 maxSleepTimeMilliseconds: 50000 baseSleepTimeMilliseconds: 50000
|
3. 使用自定义注解来配置定时任务
我们自定义注解 TaskJob 来配置任务信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
|
@Component @Documented @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface TaskJob {
String cron() default "";
String jobName() default "";
int shardingTotalCount() default 1;
String shardingItemParameters() default "";
String jobParameter() default "";
String dataSource() default "";
String description() default "";
boolean disabled() default false;
boolean overwrite() default true;
boolean failover() default true;
boolean monitorExecution() default true; }
|
4. 注册定时任务
通过实现 Spring 的 ApplicationContextAware 来获取 ApplicationContext 上下文信息,获取注册到Spring中的Bean。通过实现 InitializingBean 在给Bean注册到 Spring 之后,利用 ApplicationContext 获取使用 @TaskJob 注解的 Bean,并注册到 elastic-job 中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
|
@Component @Slf4j public class RegisterJobCenter implements ApplicationContextAware, InitializingBean {
private ApplicationContext applicationContext;
@Resource private ZookeeperRegistryCenter regCenter;
@Override public void afterPropertiesSet() { Map<String, Object> registerJobs = applicationContext.getBeansWithAnnotation(TaskJob.class); for (Map.Entry<String, Object> entry : registerJobs.entrySet()) { try { Object object = entry.getValue(); if (!(object instanceof ElasticJob)) { throw new ClassCastException("[" + object.getClass().getName() + "] The class type is not com.dangdang.ddframe.job.api.ElasticJob"); } TaskJob task = AnnotationUtils.findAnnotation(object.getClass(), TaskJob.class); assert task != null; SpringJobScheduler springJobScheduler = new SpringJobScheduler((ElasticJob) object, regCenter, getJobConfiguration(task, object)); springJobScheduler.init(); log.info("JobName: {} Register Successfully .", task.jobName()); } catch (Exception e) { log.error(e.getMessage(), e); } } }
private LiteJobConfiguration getJobConfiguration(TaskJob taskJob, Object object) { Assert.notNull(taskJob.jobName(), "The jobName cannot be null !"); Assert.notNull(taskJob.cron(), "The cron cannot be null !"); Assert.notNull(taskJob.description(), "The description cannot be null !"); SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(JobCoreConfiguration .newBuilder(taskJob.jobName(), taskJob.cron(), taskJob.shardingTotalCount()) .shardingItemParameters( StringUtils.isEmpty(taskJob.shardingItemParameters()) ? null : taskJob.shardingItemParameters()) .description(taskJob.description()).failover(taskJob.failover()) .jobParameter(StringUtils.isEmpty(taskJob.jobParameter()) ? null : taskJob.jobParameter()).build(), object.getClass().getName()); return LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(taskJob.overwrite()) .monitorExecution(true).build(); }
@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; }
}
|
5. 创建定时任务
到此,基于 Spring Bean 的基本结构就完成了,接下来就是使用其实现我们的定时任务了。示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Component @Slf4j @TaskJob(jobName = "my-monitored-job", cron = "/20 * * * * ? *", description = "自定义 actuator job 监控", shardingItemParameters = "0=0-1-2-3,1=4-5-6-7,2=8-9-10-11,3=12-13-14-15", shardingTotalCount = 4 ) public class MyJob implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { log.info("分片数信息:" + shardingContext.getShardingItem()); log.info("分片参数:" + shardingContext.getShardingParameter()); } }
|