前阵子工作中遇到了一个很麻烦的问题。
本人所在的项目组做了一个机遇quartz集群的任务系统。通俗点讲就是用quartz框架(quartz是一款能跑定时任务的框架支持复杂的时间表达式)
来执行定时任务。但是这里定时任务的并发数很多,就出现了一个问题,同一个trigger被多个机器重复的触发了,这就造成了执行的任务数目
比预期的多很多。领导就让我处理这个问题。
开始我以为是这个框架本身的配置有问题,结果翻了很多资料还是没解决(这里不过多讲这个,有兴趣可以留言)。那么问题出在哪里呢?
quartz的任务工作的方式是这样的。当任务达到触发条件的时候(当这条任务满足qrtz_cron_triggers表中定义的相关的时间表达式的时候)
qrtz_triggers表对应的这条记录的状态发生改变,同时下次触发时间根据时间表达式做出改变,同时根据sched_name找到qrtz_job_details
表中的具体job去执行,下面就是具体的业务了。我这里的问题就出在同一时间内(前后相差几ms)多台机器触发了同一条trigger。然而这个
我是没有办法解决的或者说不想动quartz的源码(有朋友能从这一步就把问题搞定的可以交流一下),所以我就顺着quartz的工作流程继续往下
到了job这里,由于多个trigger被触发所以执行了多次job,那么我是否能通过让他只执行一次job来防止重复执行呢。如果有一种方法可以
让这个job执行一次就可以达到我的要求了(选择在job这一步处理其实还是因为不想动源码,到java这里我就好办了)。正好我对zookeeper
有一些了解,zookeeper恰好有一种注册机制可以解决这个问题。
回顾一下zookeeper关于节点注册的用法:
zookeeper只可以注册一个同名节点如果节点已经存在则返回nodeexits.
那么运用到我这个场景就是当任务进入job之后用job id(同时触发的这几个job的id是一样的)去向zookeeper完成注册,由于
id是一样的那么只能有一个注册成功,只要在注册成功的条件下我才允许task。这样就保证了不做重复的运算。
具体如下:

public class PlatformQuartzJobBean extends QuartzJobBean {
private String path = "/zk_triggerID";
private String lock = "/zk_lock";
private static ZooKeeper zk = null;
static{
try {
zk = new ZooKeeper(PropsUtil.get("zooKeeperUrl")+":"+PropsUtil.get("zooKeeperPort"), 50000,new ZKWatcher());
} catch (IOException e) {
logger.error(e.getMessage(),e);
}
}
//任务执行的具体逻辑
protected void executeInternal(JobExecutionContext jobexecutioncontext)
throws JobExecutionException {
Trigger trigger = jobexecutioncontext.getTrigger();
String triggerName = trigger.getKey().getName();//triggername是唯一的
boolean createSuccess = false;
boolean doTask = false; //不对zookeeper注册执行任务
int childrens = 0;
List<String> children = null;
boolean onDelete = false;//是否获取删除节点的权限
try {
zk.create(path+"/"+triggerName, triggerName.getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);//如果注册出现问题说明节点存在是重复的任务
createSuccess = true;
children = zk.getChildren(path, false);
if(children != null){
childrens = children.size();
}
if(childrens>99){//节点个数达到100个就执行删除操作
try {
zk.create(lock+"/dodelete", "dodelete".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
onDelete = true;
} catch (KeeperException e1) {
if(e1 instanceof NodeExistsException){
logger.info("already on delete!");
}else{
logger.error(e1.getMessage(),e1);
}
} catch (InterruptedException e1) {
logger.error(e1.getMessage(),e1);
} }
//执行具体的任务
execuTask(trigger,triggerName,jobexecutioncontext,af);
} catch (KeeperException e) {
if(e instanceof NodeExistsException){
logger.info("already on do");
}else if(e instanceof ConnectionLossException){
logger.info("ConnectionLoss ,do task without registered!!");
doTask = true;
}else if(e instanceof SessionExpiredException){
logger.info("session expired ,do task without registered!!");
doTask = true;
try {
zk = new ZooKeeper(PropsUtil.get("zooKeeperUrl")+":"+PropsUtil.get("zooKeeperPort"), 50000,new ZKWatcher());
} catch (IOException e1) {
logger.error(e1.getMessage(),e1);
}
}else{
logger.error(e.getMessage(),e);
}
} catch (InterruptedException e) {
logger.error(e.getMessage(),e);
}
if(createSuccess && onDelete){//如果创建成功并且root下有执行删除的权利
try {
for(String str:children){
zk.delete(path+"/"+str, -1);
}
} catch (KeeperException e1) {
logger.error(e1.getMessage(),e1);
} catch (InterruptedException e1) {
logger.error(e1.getMessage(),e1);
}finally{
if(onDelete){
try {
zk.delete(lock+"/dodelete", -1);
} catch (InterruptedException e) {
logger.error(e.getMessage(),e);
} catch (KeeperException e) {
if(e instanceof ConnectionLossException){
logger.info("ConnectionLoss ,reconnect zk!!");
try {
zk.close();//人为失效,删除dodelete节点
zk = new ZooKeeper(PropsUtil.get("zooKeeperUrl")+":"+PropsUtil.get("zooKeeperPort"), 50000,new ZKWatcher());
} catch (InterruptedException e1) {
e1.printStackTrace();
} catch (IOException e1) {
e1.printStackTrace();
} }else{
logger.error(e.getMessage(),e);
} }
}
} }
//如果出现connectloss和sessionexpired 可能是网络有点问题找不到zookeeper就不管重复问题了完成任务为最重要的
if(doTask){//如果出现connectloss和sessionexpired 就直接执行任务
execuTask(trigger,triggerName,jobexecutioncontext,af);
} } }

整个过程就是:当job开始的时候去向zookeeper申请注册,只有当注册成功的时候才执行业务,失败则退出job。同时由于我这里是每天循环的
定时任务所以当zookeeper下的节点数目达到一定的个数的时候加一个删除锁(就是向zookeeper create一个ondetele节点),同时删除之前
的triggername节点,这样保证了明天这些任务可以继续完成。至此,任务重复执行的问题就解决了。下一篇博客将简单的介绍一下zookeeper和
zookeeper的布置,虽然网上这方面东西很多,不过自己写出来(自己实践过可以用的),以后可以直接拿来用。。

最新文章

  1. 安装devtoolset
  2. ahjesus Ubuntu配置svn服务器
  3. Spring学习笔记之整合struts
  4. curl命令具体解释
  5. linux命令后台运行[转]
  6. lintcode:二叉树的中序遍历
  7. Mac下使用Fiddler
  8. jQuery的自定义事件——滚轮
  9. 博客换了,比较少用这个博客(http://loid.cf)
  10. 将项目上传到bitbucket仓库
  11. LeetCode OJ 217.Contains Duplicate
  12. Swift 学习笔记(四)
  13. java mvc框架系列总结ssh,ssm,servlet
  14. Ant-打增量包
  15. iOS旋钮动画-CircleKnob
  16. Logback日志基础配置以及自定义配置
  17. Linux命令大全 欢迎补充 评论添加~
  18. [css选择器]总结:IE6不支持的CSS选择符
  19. Openwrt 3g模块
  20. ClamAV学习【2】——clamscan入口函数浏览

热门文章

  1. Android 访问权限设置
  2. PHPCMS v9修改栏目或者单页没有权限
  3. Python交互模式下方向键出现乱码
  4. python实现tailf
  5. oldboy第一天学习
  6. 避免Block的循环引用
  7. win8 开启administrator 管理员帐户
  8. android 截取指定位置字符串
  9. Misha and Changing Handles
  10. springMVC学习(1)