0%

分布式定时任务 elastic-job 自定义注解使用

以前使用 elastic-job 都是使用 *.xml 配置文件的形式,但是现在的 springboot 项目都倾向于使用 Java Bean 的形式,所以本文介绍了如何使用 Java Bean 的形式使用 elastic-job。

1. 引入 maven 依赖

首先引入 elastic-job 的 maven 依赖,关于 springboot 的依赖这里省略掉了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<!-- Elastic-Job Lite core artifact. -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<!-- Elastic-Job Lite Spring integration artifact; kept on the same version as the core artifact. -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
<!-- Elastic-Job depends on ZooKeeper. -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.10</version>
</dependency>

2. ElasticJobConfig 配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * Spring configuration that exposes the ZooKeeper registry center used by Elastic-Job.
 *
 * @author admin
 */
@Configuration
public class ElasticJobConfig {

    @Resource
    private ElasticJobProperties properties;

    /**
     * Builds the ZooKeeper connection settings from the bound {@link ElasticJobProperties}.
     *
     * <p>Every property exposed by {@link ElasticJobProperties} is copied across, including the
     * session timeout and the digest, so that values set in the application configuration are
     * not silently dropped.
     *
     * @return the ZookeeperConfiguration describing how to reach the registry center
     */
    @Bean
    public ZookeeperConfiguration zkConfig() {
        ZookeeperConfiguration config =
                new ZookeeperConfiguration(properties.getServerlists(), properties.getNamespace());
        // Retry policy.
        config.setBaseSleepTimeMilliseconds(properties.getBaseSleepTimeMilliseconds());
        config.setMaxSleepTimeMilliseconds(properties.getMaxSleepTimeMilliseconds());
        config.setMaxRetries(properties.getMaxRetries());
        // Timeouts; an unset property is passed through as 0.
        config.setSessionTimeoutMilliseconds(properties.getSessionTimeoutMilliseconds());
        config.setConnectionTimeoutMilliseconds(properties.getConnectionTimeoutMilliseconds());
        // Access token; null when no digest is configured.
        config.setDigest(properties.getDigest());
        return config;
    }

    /**
     * Creates the registry center; Spring calls its {@code init} method after construction.
     *
     * @param config ZookeeperConfiguration produced by {@link #zkConfig()}
     * @return ZookeeperRegistryCenter built from {@code config}
     */
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(ZookeeperConfiguration config) {
        return new ZookeeperRegistryCenter(config);
    }
}

ElasticJobProperties 为自定义配置属性类,用于映射 zookeeper 配置信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

/**
 * Custom configuration properties holding the ZooKeeper connection settings,
 * bound from the {@code elasticjob.zookeeper} prefix.
 *
 * @author admin
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "elasticjob.zookeeper")
public class ElasticJobProperties {
/**
 * Registry center server list, e.g. localhost:2181
 */
private String serverlists;
/**
 * Namespace.
 */
private String namespace;
/**
 * Initial value of the interval to wait between retries.
 * Unit: milliseconds.
 */
private int baseSleepTimeMilliseconds = 1000;

/**
 * Maximum value of the interval to wait between retries.
 * Unit: milliseconds.
 */
private int maxSleepTimeMilliseconds = 3000;

/**
 * Maximum number of retries.
 */
private int maxRetries = 3;

/**
 * Session timeout.
 * Unit: milliseconds.
 */
private int sessionTimeoutMilliseconds;

/**
 * Connection timeout.
 * Unit: milliseconds.
 */
private int connectionTimeoutMilliseconds;

/**
 * Access token used when connecting to ZooKeeper.
 * By default no authentication is required.
 */
private String digest;

}

application.yaml

1
2
3
4
5
6
7
8
9
# ZooKeeper registry-center settings, bound to ElasticJobProperties
# through the "elasticjob.zookeeper" prefix. The nesting below is required:
# without indentation every key would be a separate top-level entry.
elasticjob:
  zookeeper:
    serverlists: 192.168.160.128:2181
    namespace: springboot_elasticjob
    # Timeouts, in milliseconds.
    connectionTimeoutMilliseconds: 50000
    sessionTimeoutMilliseconds: 50000
    # Retry policy: retry count and min/max wait between retries (milliseconds).
    maxRetries: 3
    maxSleepTimeMilliseconds: 50000
    baseSleepTimeMilliseconds: 50000

3. 使用自定义注解来配置定时任务

我们自定义注解 TaskJob 来配置任务信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
 * Custom annotation describing an elastic-job task.
 * It targets types, is retained at runtime, and is itself meta-annotated with
 * {@code @Component}.
 *
 * @author admin
 */
@Component
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface TaskJob {

/**
 * Cron expression of the job. Defaults to an empty string.
 */
String cron() default "";

/**
 * Job name. Defaults to an empty string.
 */
String jobName() default "";

/**
 * Total number of shards. Defaults to 1.
 */
int shardingTotalCount() default 1;

/**
 * Sharding item parameters. Defaults to an empty string.
 */
String shardingItemParameters() default "";

/**
 * Job parameter. Defaults to an empty string.
 */
String jobParameter() default "";

/**
 * Data source name. Defaults to an empty string.
 * NOTE(review): intended use is not visible in this file - confirm.
 */
String dataSource() default "";

/**
 * Job description. Defaults to an empty string.
 */
String description() default "";

/**
 * Whether the job is disabled. Defaults to false.
 */
boolean disabled() default false;

/**
 * Overwrite flag handed to the LiteJobConfiguration builder. Defaults to true.
 */
boolean overwrite() default true;

/**
 * Failover flag handed to the JobCoreConfiguration builder. Defaults to true.
 * NOTE(review): the original comment described this as "fail fast"; the
 * attribute is passed to failover(...), so it presumably means failover - confirm.
 */
boolean failover() default true;

/**
 * Whether job execution is monitored. Defaults to true.
 */
boolean monitorExecution() default true;
}

4. 注册定时任务

通过实现 Spring 的 ApplicationContextAware 接口获取 ApplicationContext 上下文,从而可以拿到注册到 Spring 中的 Bean;通过实现 InitializingBean 接口,在当前 Bean 的属性注入完成之后,利用 ApplicationContext 获取所有使用 @TaskJob 注解的 Bean,并将它们注册到 elastic-job 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
* 获取 Task 注解的 job 信息并注入到 Elastic-Job 中
* @author admin
*/
@Component
@Slf4j
public class RegisterJobCenter implements ApplicationContextAware, InitializingBean {

private ApplicationContext applicationContext;

@Resource
private ZookeeperRegistryCenter regCenter;

@Override
public void afterPropertiesSet() {
Map<String, Object> registerJobs = applicationContext.getBeansWithAnnotation(TaskJob.class);
for (Map.Entry<String, Object> entry : registerJobs.entrySet()) {
try {
Object object = entry.getValue();
if (!(object instanceof ElasticJob)) {
throw new ClassCastException("[" + object.getClass().getName()
+ "] The class type is not com.dangdang.ddframe.job.api.ElasticJob");
}
TaskJob task = AnnotationUtils.findAnnotation(object.getClass(), TaskJob.class);
assert task != null;
SpringJobScheduler springJobScheduler = new SpringJobScheduler((ElasticJob) object, regCenter,
getJobConfiguration(task, object));
springJobScheduler.init();
log.info("JobName: {} Register Successfully .", task.jobName());
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}

/**
* description: 构建 Job 配置
*/
private LiteJobConfiguration getJobConfiguration(TaskJob taskJob, Object object) {
// 参数校验
Assert.notNull(taskJob.jobName(), "The jobName cannot be null !");
Assert.notNull(taskJob.cron(), "The cron cannot be null !");
Assert.notNull(taskJob.description(), "The description cannot be null !");
// 构建配置
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(JobCoreConfiguration
.newBuilder(taskJob.jobName(), taskJob.cron(), taskJob.shardingTotalCount())
.shardingItemParameters(
StringUtils.isEmpty(taskJob.shardingItemParameters()) ? null : taskJob.shardingItemParameters())
.description(taskJob.description()).failover(taskJob.failover())
.jobParameter(StringUtils.isEmpty(taskJob.jobParameter()) ? null : taskJob.jobParameter()).build(),
object.getClass().getName());
return LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(taskJob.overwrite())
.monitorExecution(true).build();
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

}

5. 创建定时任务

到此,基于 Spring Bean 的基本结构就完成了,接下来就是使用其实现我们的定时任务了。示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Sample job split into four shards; each run logs the shard item and the shard parameter
 * it was invoked with.
 */
@Component
@Slf4j
@TaskJob(jobName = "my-monitored-job",
        // NOTE(review): leading "/20" has no start value - presumably every 20 seconds; confirm.
        cron = "/20 * * * * ? *",
        description = "自定义 actuator job 监控",
        shardingItemParameters = "0=0-1-2-3,1=4-5-6-7,2=8-9-10-11,3=12-13-14-15",
        shardingTotalCount = 4
)
public class MyJob implements SimpleJob {

    /**
     * Logs the sharding item and sharding parameter of the current invocation.
     *
     * @param shardingContext context supplied for this shard's execution
     */
    @Override
    public void execute(ShardingContext shardingContext) {
        // Placeholders instead of string concatenation; the emitted text is unchanged.
        log.info("分片数信息:{}", shardingContext.getShardingItem());
        log.info("分片参数:{}", shardingContext.getShardingParameter());
    }
}
------ 本文结束------