本文以在SpringBoot下集成ElasticJob的方式对其进行浅析,仅仅是简单使用,不涉及源码级别研究。

事先必备:

注册中心——zookeeper

简略结构:

代码目录结构:

├─.idea
└─src
└─main
├─java
│ └─com
│ └─sakura
│ ├─configuration --SpringJobScheduler、ZookeeperRegistryCenter
│ ├─job
│ │ ├─jobEventConfig --Job事件监听器
│ │ └─jobListener --Job执行监听器
│ └─properties --Zookeeper、Job的配置信息
└─resources --Zookeeper、Job的配置信息

初始化注册中心:

@Configuration
@Slf4j
public class ZookeeperRegistry { @Bean(name = "registryCenter", initMethod = "init")
public ZookeeperRegistryCenter registryCenter(ZookeeperRegistryProperties registryProperties) {
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(
registryProperties.getServerLists(), registryProperties.getNamespace());
zookeeperConfiguration.setDigest(registryProperties.getDigest());
zookeeperConfiguration.setBaseSleepTimeMilliseconds(registryProperties.getBaseSleepTimeMilliseconds());
zookeeperConfiguration.setConnectionTimeoutMilliseconds(registryProperties.getConnectionTimeoutMilliseconds());
zookeeperConfiguration.setMaxRetries(registryProperties.getMaxRetries());
zookeeperConfiguration.setMaxSleepTimeMilliseconds(registryProperties.getMaxSleepTimeMilliseconds());
zookeeperConfiguration.setSessionTimeoutMilliseconds(zookeeperConfiguration.getSessionTimeoutMilliseconds());
log.info("elasticJob注册中心——Zookeeper初始化成功。serverLists={}。nameSpace={}", registryProperties.getServerLists(), registryProperties.getNamespace());
return new ZookeeperRegistryCenter(zookeeperConfiguration);
}
}

定义Job(以SimpleJob为例):

@Slf4j
@Component
public class MySimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info("------开始执行定时任务------");
log.info("jobName:{}", shardingContext.getJobName());
log.info("taskId:{}", shardingContext.getTaskId());
}
}

初始化SpringJobScheduler:

@Configuration
@Data
public class SpringJobSchedulerInit {
private final ZookeeperRegistryCenter registryCenter;
private final ZookeeperRegistryProperties zookeeperRegistryProperties;
private final SimpleJobProperties simpleJobProperties;
private final ElasticJob mySimpleJob;
private final JobEventConfiguration jobEventConfiguration; @Bean(initMethod = "init")
public SpringJobScheduler springJobScheduler() {
return new SpringJobScheduler(mySimpleJob, registryCenter, getLiteJobConfiguration(),
          //Job事件追踪,非必填
jobEventConfiguration,
          //Job执行监听器,非必填
new MySimpleJobListener());
} public LiteJobConfiguration getLiteJobConfiguration() { JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder(simpleJobProperties.getJobName(), simpleJobProperties.getCron()
, simpleJobProperties.getShardingTotalCount())
.failover(simpleJobProperties.isFailover())
.jobParameter(simpleJobProperties.getJobParameter())
.misfire(true)
.shardingItemParameters(simpleJobProperties.getShardingItemParameters())
.build();
JobTypeConfiguration jobTypeConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, MySimpleJob.class.getName()); return LiteJobConfiguration.newBuilder(jobTypeConfiguration)
.jobShardingStrategyClass(simpleJobProperties.getJobShardingStrategyClass())
.maxTimeDiffSeconds(simpleJobProperties.getMaxTimeDiffSeconds())
.monitorExecution(simpleJobProperties.isMonitorExecution())
.monitorPort(simpleJobProperties.getMonitorPort())
.maxTimeDiffSeconds(simpleJobProperties.getMaxTimeDiffSeconds())
//是否要用本地的配置覆盖掉远程的ElasticJob配置
.overwrite(false)
.build();
}
}

Job事件追踪——存储到数据库:

@Configuration
@Slf4j
@Data
public class JobEventConfig {
private final DataSource dataSource; @Bean
public JobEventConfiguration jobEventConfiguration() {
return new JobEventRdbConfiguration(dataSource);
}
}

Job执行监听器:

@Slf4j
public class MySimpleJobListener implements ElasticJobListener {
@Override
public void beforeJobExecuted(ShardingContexts shardingContexts) {
log.info("Job执行之前:{}", ReflectionToStringBuilder.toString(shardingContexts));
} @Override
public void afterJobExecuted(ShardingContexts shardingContexts) {
log.info("Job执行之后:{}", ReflectionToStringBuilder.toString(shardingContexts));
}
}

properties配置信息:

@ConfigurationProperties(prefix = "simple.job")
@Data
public class SimpleJobProperties {
  //执行Job的cron表达式
private String cron;
  //Job分片总数
private int shardingTotalCount;
  //分片序列号和个性化参数对照表
  //分片序列号和参数用等号分隔,多个键值对用逗号分隔
  //分片序列号从0开始,不可大于或等于Job分片总数
  //如:0=a,1=b,2=c
   private String shardingItemParameters;
  //Job自定义参数
private String jobParameter;
  //是否开启失效转移。
  //只有对monitorExecution的情况下才可以开启失效转移。
  private boolean failover;
  //监控Job执行时状态。每次Job执行时间和间隔时间均非常短的情况,建议不监控作业运行时状态以提升效率, 因为是瞬时状态, 所以无必要监控。
private boolean monitorExecution;
  //作业辅助监控端口
private int monitorPort;
  //最大容忍的本机与注册中心的时间误差秒数,如果时间误差超过配置秒数则作业启动时将抛异常。
  //设置为-1表示不进行检查。
   private int maxTimeDiffSeconds;
  //作业分片策略实现类全路径
private String jobShardingStrategyClass;
  //Job的名称
private String jobName;
}
@ConfigurationProperties(prefix = "elastic.job.zk")
@Data
public class ZookeeperRegistryProperties {
  //服务地址,ip:port,多个地址用逗号分隔
private String serverLists;
  //命名空间
private String namespace;
  //最大重试次数
private int maxRetries = ;
  //连接超时时间,毫秒
private int connectionTimeoutMilliseconds = ;
  //会话超时时间,毫秒
private int sessionTimeoutMilliseconds = ;
  //等待重试的间隔时间的初始值,毫秒
private int baseSleepTimeMilliseconds = ;
  //等待重试的间隔时间的最大值,毫秒
private int maxSleepTimeMilliseconds = ;
  //连接zk的权限令牌,缺省为不需要权限验证。
private String digest = ""; }
server.port=
spring.application.name=elasticJobTest #ZK
elastic.job.zk.serverLists=192.168.204.140:,192.168.204.141:,192.168.204.142:
elastic.job.zk.namespace=elastic-job #ElasticJob
simple.job.jobName=simpleJob
simple.job.cron=/ * * * * ?
simple.job.shardingTotalCount=
simple.job.shardingItemParameters==beijing,=shanghai,=changchun
simple.job.job-parameter=source1=public,source2=private
simple.job.failover=true
simple.job.monitor-execution=true
simple.job.monitor-port=
simple.job.max-time-diff-seconds=-
simple.job.job-sharding-strategy-class=com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy spring.datasource.url=jdbc:mysql://localhost:3306/elasticjob?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC&useSSL=false
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.username=root
spring.datasource.password=

pom.xml依赖:

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- elastic-job dependency -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.</version>
</dependency>
</dependencies>

启动类:

@SpringBootApplication(scanBasePackages = {"com.sakura.*"})
@EnableConfigurationProperties(value = {ZookeeperRegistryProperties.class, SimpleJobProperties.class})
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class);
}

分片:

ElasticJob提供了三种分片策略。

  1. 基于平均分配算法的分片策略。如果分片不能整除, 则不能整除的多余分片将依次追加到序号小的服务器。
    1. 如果有3台服务器, 分成9片, 则每台服务器分到的分片是: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8]。
    2. 如果有3台服务器, 分成8片, 则每台服务器分到的分片是: 1=[0,1,6], 2=[2,3,7], 3=[4,5]。
    3. 如果有3台服务器, 分成10片, 则每台服务器分到的分片是: 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]。
  2. 根据作业名的哈希值奇偶数决定IP升降序算法的分片策略。作业名的哈希值为奇数则IP升序。作业名的哈希值为偶数则IP降序。
    1. 如果有3台服务器, 分成2片, 作业名称的哈希值为奇数, 则每台服务器分到的分片是: 1=[0], 2=[1], 3=[]。
    2. 如果有3台服务器, 分成2片, 作业名称的哈希值为偶数, 则每台服务器分到的分片是: 3=[0], 2=[1], 1=[]。
  3. 根据作业名的哈希值对服务器列表进行轮转的分片策略。

为什么要进行分片:

将一个任务拆分为多个可以并行执行的子任务(分片),每个服务器负责处理一定量的子任务,提高效率。

在本实例代码中一共有三个服务器,进行了三个分片,所以在执行Job时会看到如下的日志打印:

------开始执行定时任务------JobParameter:source1=public,source2=private,ShardingItem:,ShardingParameter:beijing
------开始执行定时任务------JobParameter:source1=public,source2=private,ShardingItem:,ShardingParameter:changchun
------开始执行定时任务------JobParameter:source1=public,source2=private,ShardingItem:,ShardingParameter:shanghai

可以根据Job的ShardingParameter不同做区分,让其处理不同的子任务。

最新文章

  1. AFNetworking 3.0 源码解读(三)之 AFURLRequestSerialization
  2. web框架django初探
  3. Search in Rotated Sorted Array
  4. Docker 不能被外网正常访问
  5. iOS之小功能模块--彩虹动画进度条学习和自主封装改进
  6. Ajax在调用含有SoapHeader的webservice方法
  7. mysql5.6优化建议
  8. 如何修改因Informatica 8.6服务器IP而造资料库无法访问的问题
  9. 注册Model类
  10. 网络流(最大流) CodeForces 546E:Soldier and Traveling
  11. Starting nagios:This account is currently not available nagios
  12. activiti笔记二:用户任务
  13. 实用的两款jquery树形tree插件
  14. day1:java学习第一天之eclipse安装
  15. 复制ASP.NET的ASHX、aspx文件的注意事项
  16. 解决ajax异步请求数据后swiper不能循环轮播(loop失效)问题、滑动后不能轮播的问题。
  17. js setInterval参数设置
  18. BZOJ4415: [Shoi2013]发牌 树状数组+二分
  19. (转)MYSQL线程池总结(一)
  20. Linux和Docker常用命令

热门文章

  1. 前段人员必藏的7 个 CSS 好用的属性绝对干货
  2. Chive CTF 2020 - Tiki
  3. 震惊!慎老师怒吃pks并大呼:一口就吃完了!
  4. day20 函数收尾+面向过程+模块
  5. 方正璞华Java面试总结(武汉)
  6. DVWA学习记录 PartⅨ
  7. vue axios接口封装、Promise封装、简单的axios方法封装、vue接口方法封装、vue post、get、patch、put方法封装
  8. Hadoop进阶命令使用介绍
  9. 操作句柄Handle(7)
  10. P4017 最大食物链计数 (拓扑排序)