I. Introduction

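The previous post explained how ZooKeeper performs distributed coordination. This time I mainly cover how Quartz uses ZooKeeper for distributed computing. The previous post only explained the principles, while this one puts them into practice, so I changed part of the zookeeperService code; think of it as an integration and optimization pass.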

The system architecture is shown below:

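The figure above shows two parts: a failover cluster and a sharding feature. The failover cluster is built into Quartz, so I won't say much about it; the sharding feature is my own implementation. Here are some configuration notes for running Quartz as a failover cluster: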
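In short, going by the configuration used later in RunProgramRunExample, every instance has to meet three requirements. It must point at the same job-store database, set `quartz.jobStore.clustered` to true, and use its own `quartz.scheduler.instanceId`. Instances on different machines also need NTP-synchronized clocks. Quartz clustering generally also expects all nodes to share the same `quartz.scheduler.instanceName`.

The minimal sketch below isolates that rule. `ClusterConfigSketch`, `BuildClusterProperties` and `DifferingKeys` are hypothetical helpers for illustration, not part of Quartz.NET.

```csharp
using System.Collections.Generic;
using System.Collections.Specialized;

// Hypothetical helper: builds only the Quartz.NET properties that matter for clustering.
public static class ClusterConfigSketch
{
    public static NameValueCollection BuildClusterProperties(
        string instanceName, string instanceId, string connectionString)
    {
        return new NameValueCollection
        {
            // Same name on every node: they form one logical scheduler.
            ["quartz.scheduler.instanceName"] = instanceName,
            // Must be unique per node.
            ["quartz.scheduler.instanceId"] = instanceId,
            ["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz",
            ["quartz.jobStore.clustered"] = "true",
            ["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.MySQLDelegate, Quartz",
            ["quartz.jobStore.tablePrefix"] = "QRTZ_",
            ["quartz.jobStore.dataSource"] = "default",
            // Every node must use the same database.
            ["quartz.dataSource.default.connectionString"] = connectionString,
            ["quartz.dataSource.default.provider"] = "MySql",
            ["quartz.serializer.type"] = "json"
        };
    }

    // Lists the keys whose values differ between two property sets.
    public static string[] DifferingKeys(NameValueCollection a, NameValueCollection b)
    {
        var result = new List<string>();
        foreach (string key in a.AllKeys)
        {
            if (a[key] != b[key])
            {
                result.Add(key);
            }
        }
        return result.ToArray();
    }
}
```

For two correctly configured nodes, `DifferingKeys` should report only `quartz.scheduler.instanceId`. The resulting collection is the kind of input `new StdSchedulerFactory(properties)` receives in RunProgramRunExample.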

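Also, .NET Core does not support remoting. This matters a lot, because it means you need to build your own web service interface. This example does not implement one; it only uses the database to configure and track the Quartz service, which is the main part.

The API would provide the following:

- real-time start and stop of jobs,
- monitoring of Quartz host status,
- monitoring of shard host status.

These features are not available yet, but once you have worked through this article you can easily add them yourself.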

Thanks to the GitHub project weizhong1988/Weiz.TaskManager, a Quartz management project.

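My example is based on Linux and MySQL, while that project uses SQL Server, so I replaced all of the SQL and changed a few other things. I will publish all of the code later. The UI is shown below: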

OK, now let's look at the code.

II. Quartz failover and sharding

First, the project structure:

Then the Program entry method:

var host = new HostBuilder()
    .UseEnvironment(EnvironmentName.Development)
    .ConfigureAppConfiguration((hostContext, configApp) =>
    {
        configApp.SetBasePath(Directory.GetCurrentDirectory());
        configApp.AddJsonFile(
            $"appsettings.{hostContext.HostingEnvironment.EnvironmentName}.json",
            optional: true);
        configApp.AddEnvironmentVariables("PREFIX_");
        configApp.AddCommandLine(args);
        var rootConfiguration = configApp.Build();
        QuartzOpt = new QuartzOption();
        rootConfiguration.GetSection("Quartz").Bind(QuartzOpt); // bind the "Quartz" section to the Quartz options class
    })
    .ConfigureLogging((hostContext, configBuild) =>
    {
        configBuild.AddConfiguration(hostContext.Configuration.GetSection("Logging"));
        configBuild.AddConsole();
        configBuild.AddCustomizationLogger();
    })
    .ConfigureServices((hostContext, service) =>
    {
        service.AddKafka(KafkaBuilder =>
        {
            KafkaBuilder.AddConfiguration(hostContext.Configuration.GetSection("KafkaService"));
        });
        service.AddZookeeper(zookeeperBuilder =>
        {
            zookeeperBuilder.AddConfiguration(hostContext.Configuration.GetSection("zookeeperService"));
        });
        // Database service the application uses to read and update task records;
        // it is separate from the data source Quartz uses for its cluster.
        service.AddDbContext<QuartzDbContext>(option =>
            option.UseMySQL(hostContext.Configuration.GetConnectionString("QuartzDatabase")),
            ServiceLifetime.Transient, ServiceLifetime.Transient);
    })
    .Build();
Host = host;
ILoggerFactory loggerFact = host.Services.GetService<ILoggerFactory>();
LogProvider.SetCurrentLogProvider(new ConsoleLogProvider(loggerFact)); // hand the host's logging to Quartz's logging interface
var ischema = RunProgramRunExample(loggerFact); // builds the jobs from the database
host.WaitForShutdown(); // the .NET Core generic host
if (ischema != null)
{
    ischema.Shutdown(true).GetAwaiter().GetResult(); // Quartz's own scheduler
}

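Quartz's architecture does not fit the current .NET Core architecture. Quartz has no native support for .NET Core's pipeline-and-service component style, so you cannot pass any context into it: not the host context, the configuration context or the service context. So I used static properties instead: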

private ILoggerFactory _loggerFact;

public static IHost Host { get; set; }

public static String QUARTZ_INSTANCE_ID = "PREFIX_QUARTZ_INSTANCE_ID";

public static QuartzOption QuartzOpt { get; set; }

Inside the Quartz callbacks they are used like this:

ILoggerFactory loggerFact = Program.Host.Services.GetService<ILoggerFactory>();
var _logger = loggerFact.CreateLogger<ZookeeperService>();

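As for the missing remoting support mentioned above, the .NET Core answer is Kestrel. .NET Core has announced that WebHost will later be merged into the generic host.

.NET Core is evolving quickly, and its internal refactorings change the code substantially. When I extended the logging module last time, almost nothing from the previous version's design could be reused. For application developers this makes no difference, of course, because compatibility will certainly be maintained.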

Let's look at the Quartz logging module, ConsoleLogProvider:

using System;
using Microsoft.Extensions.Logging;
using Quartz.Logging;

namespace Walt.Framework.Quartz.Host
{
    public class ConsoleLogProvider : ILogProvider
    {
        private ILoggerFactory _logFactory;

        public ConsoleLogProvider(ILoggerFactory logFactory)
        {
            _logFactory = logFactory;
        }

        public Logger GetLogger(string name)
        {
            return (level, func, exception, parameters) =>
            {
                if (func != null)
                {
                    // string.Format throws on a null argument array, so only format when parameters exist.
                    string logInfo = parameters == null || parameters.Length == 0
                        ? func()
                        : string.Format(func(), parameters);
                    // Goes through the host's logger factory, i.e. the custom distributed log provider.
                    var log = _logFactory.CreateLogger<ConsoleLogProvider>();
                    log.LogDebug(exception, logInfo);
                }
                return true;
            };
        }

        public IDisposable OpenNestedContext(string message)
        {
            throw new NotImplementedException();
        }

        public IDisposable OpenMappedContext(string key, string value)
        {
            throw new NotImplementedException();
        }
    }
}

Next, the Quartz options class:

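namespace Walt.Framework.Quartz.Host
{
    public class QuartzOption
    {
        public string InsatanceId { get; set; }   // Very important: must differ between the instances of a cluster.
        public string InstanceName { get; set; }  // Quartz instance name, generally used for display.
        public bool IsClear { get; set; }         // Clear jobs at startup, because the cluster keeps historical data in the database.
        public bool IsSlave { get; set; }         // Whether this is a slave; reserved, currently unused.
        public int CustomerRecordCountForTest { get; set; } // For sharding: number of records each machine processes.
    }
}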

Then our main method, RunProgramRunExample:

private static IScheduler RunProgramRunExample(ILoggerFactory loggerFact)
{
    var log = loggerFact.CreateLogger<Program>();
    try
    {
        var config = Host.Services.GetService<IConfiguration>();
        // Grab the Scheduler instance from the Factory
        NameValueCollection properties = new NameValueCollection
        {
            ["quartz.scheduler.instanceName"] = QuartzOpt.InstanceName,
            ["quartz.scheduler.instanceId"] = QuartzOpt.InsatanceId,
            ["quartz.threadPool.type"] = "Quartz.Simpl.SimpleThreadPool, Quartz",
            ["quartz.threadPool.threadCount"] = "",
            ["quartz.jobStore.misfireThreshold"] = "",
            ["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz",
            ["quartz.jobStore.useProperties"] = "false",
            ["quartz.jobStore.dataSource"] = "default",
            ["quartz.jobStore.tablePrefix"] = "QRTZ_",
            ["quartz.jobStore.clustered"] = "true",
            ["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.MySQLDelegate, Quartz",
            ["quartz.dataSource.default.connectionString"] = config.GetConnectionString("QuatrzClustDatabase"),
            ["quartz.dataSource.default.provider"] = "MySql",
            ["quartz.serializer.type"] = "json",
            // The settings above are the cluster configuration: Quartz instances with different
            // instance IDs that share the same data source automatically run as a cluster.
            // If the instances run on different machines, configure an NTP server to keep their clocks in sync.
            // Nothing from the next line down (remoting export) is supported on .NET Core.
            ["quartz.scheduler.exporter.type"] = "Quartz.Simpl.RemotingSchedulerExporter, Quartz",
            ["quartz.scheduler.exporter.port"] = "",
            ["quartz.scheduler.exporter.bindName"] = "QuartzScheduler",
            ["quartz.scheduler.exporter.channelType"] = "tcp",
            ["quartz.scheduler.exporter.channelName"] = "httpQuartz",
            ["quartz.scheduler.exporter.rejectRemoteRequests"] = "true"
        };
        StdSchedulerFactory factory = new StdSchedulerFactory(properties);
        IScheduler scheduler = factory.GetScheduler().GetAwaiter().GetResult();
        string machine = Environment.MachineName; // current machine name
        QuartzDbContext db = Host.Services.GetService<QuartzDbContext>();
        var listQuartzTask = db.QuartzTask.Where(w => w.IsDelete == 0
                && w.MachineName == machine && w.InstanceId == QuartzOpt.InsatanceId)
            .ToListAsync().GetAwaiter().GetResult(); // the jobs of this machine and instance
        log.LogDebug("Task records loaded from the database: {0}", Newtonsoft.Json.JsonConvert.SerializeObject(listQuartzTask));
        Dictionary<string, Assembly> collAssembly = new Dictionary<string, Assembly>(); // loaded job assemblies
        foreach (var item in listQuartzTask) // first pass: load every assembly up front
        {
            if (!string.IsNullOrEmpty(item.AssemblyName) && !collAssembly.ContainsKey(item.AssemblyName))
            {
                try
                {
                    collAssembly[item.AssemblyName] =
                        AssemblyHelp.GetAssemblyByteByAssemblyName(
                            Path.Combine(Directory.GetCurrentDirectory(), "AssemblyColl"), item.AssemblyName);
                }
                catch (Exception ep)
                {
                    log.LogError(ep, "Assembly not found: {0}", item.AssemblyName);
                    continue;
                }
            }
        }
        // and start it off
        scheduler.Start().GetAwaiter().GetResult();
        // if (!QuartzOpt.IsSlave)
        // {
        var task = Task.Run(() =>
        {
            bool isClear = QuartzOpt.IsClear;
            log.LogInformation("Job monitor loop started, interval 15 seconds");
            // Poll the database so that added or modified jobs are re-added and run automatically.
            while (true)
            {
                try
                {
                    if (scheduler != null)
                    {
                        log.LogDebug("Checking whether the scheduler has started");
                        if (scheduler.IsStarted)
                        {
                            if (isClear) // clear once at startup
                            {
                                scheduler.Clear().GetAwaiter().GetResult();
                                isClear = false;
                            }
                            log.LogDebug("Scheduler has started");
                            db = Host.Services.GetService<QuartzDbContext>();
                            listQuartzTask = db.QuartzTask.Where(w => w.IsDelete == 0
                                    && w.MachineName == machine && w.InstanceId == QuartzOpt.InsatanceId)
                                .ToListAsync().GetAwaiter().GetResult(); // re-read this machine's and instance's jobs on every pass
                            log.LogDebug("Task records loaded from the database: {0}", Newtonsoft.Json.JsonConvert.SerializeObject(listQuartzTask));
                            foreach (var item in listQuartzTask)
                            {
                                // Pre-load the assembly of any newly added job.
                                if (!string.IsNullOrEmpty(item.AssemblyName) && !collAssembly.ContainsKey(item.AssemblyName))
                                {
                                    try
                                    {
                                        collAssembly[item.AssemblyName] =
                                            AssemblyHelp.GetAssemblyByteByAssemblyName(
                                                Path.Combine(Directory.GetCurrentDirectory(), "AssemblyColl"), item.AssemblyName);
                                    }
                                    catch (Exception ep)
                                    {
                                        log.LogError(ep, "Assembly not found: {0}", item.AssemblyName);
                                        continue;
                                    }
                                }
                                log.LogDebug("Checking task: {0}", Newtonsoft.Json.JsonConvert.SerializeObject(item));
                                var jobKey = new JobKey(item.TaskName, item.GroupName);
                                var triggerKey = new TriggerKey(item.TaskName, item.GroupName);
                                if (scheduler.CheckExists(jobKey).GetAwaiter().GetResult()) // already scheduled: act on its status
                                {
                                    // GetJobDetail/GetTrigger return Tasks, so their Status is the status of the
                                    // lookup call, not of the job. Query the trigger state instead.
                                    var triggerState = scheduler.GetTriggerState(triggerKey).GetAwaiter().GetResult();
                                    log.LogDebug("Task already in the scheduler. Database status: {0}, trigger state: {1}"
                                        , ((OperateStatus)item.OperateStatus).ToString(), triggerState.ToString());
                                    // Stopped in the database: delete the job. With a remote API this could happen in
                                    // real time and this code would be redundant, but it is worth keeping as a
                                    // fallback in case the remote call fails.
                                    if ((OperateStatus)item.OperateStatus == OperateStatus.Stop)
                                    {
                                        log.LogInformation("Deleting job from the scheduler: {0}", jobKey.ToString());
                                        if (!scheduler.DeleteJob(jobKey).GetAwaiter().GetResult())
                                        {
                                            log.LogError("Failed to delete job. name:{0}, group:{1}", jobKey.Name, jobKey.Group);
                                        }
                                    }
                                    else if (triggerState == TriggerState.Error)
                                    {
                                        // Faulted: mark the job as faulted and stopped in the database.
                                        // Likewise, with a remote API this serves as a fallback.
                                        log.LogError("Job faulted. name:{0}, group:{1}", jobKey.Name, jobKey.Group);
                                        item.Status = (int)TaskStatus.Faulted;
                                        item.OperateStatus = (int)OperateStatus.Stop;
                                        db.Update<QuartzTask>(item);
                                        db.SaveChanges();
                                    }
                                    else if (triggerState == TriggerState.Paused)
                                    {
                                        // Not running although the database says it should be: resume it.
                                        scheduler.ResumeTrigger(triggerKey).GetAwaiter().GetResult();
                                    }
                                    // Listeners live in memory only, so re-register them if missing.
                                    // The names must match the ones used at registration time.
                                    var triggerListener = scheduler.ListenerManager.GetTriggerListener("trigger" + item.TaskName);
                                    if (triggerListener == null)
                                    {
                                        triggerListener = new TriggerUpdateListens("trigger" + item.TaskName);
                                        IMatcher<TriggerKey> triggermatcher = KeyMatcher<TriggerKey>.KeyEquals(triggerKey);
                                        scheduler.ListenerManager.AddTriggerListener(triggerListener, triggermatcher);
                                    }
                                    var jobListener = scheduler.ListenerManager.GetJobListener("job" + item.TaskName);
                                    if (jobListener == null)
                                    {
                                        IJobListener jobUpdateListener = new JobUpdateListens("job" + item.TaskName);
                                        IMatcher<JobKey> jobmatcher = KeyMatcher<JobKey>.KeyEquals(jobKey);
                                        scheduler.ListenerManager.AddJobListener(jobUpdateListener, jobmatcher);
                                    }
                                }
                                else // not scheduled yet: add the job and run it
                                {
                                    log.LogInformation("New job found; checking whether its status is Stop.");
                                    if ((OperateStatus)item.OperateStatus != OperateStatus.Stop)
                                    {
                                        log.LogInformation("Adding new job");
                                        var assemblyName = item.AssemblyName;
                                        var className = item.ClassName;
                                        Type jobTaskType = null;
                                        try
                                        {
                                            jobTaskType = AssemblyHelp.GetTypeByAssemblyNameAndClassName(collAssembly[item.AssemblyName], className);
                                            log.LogInformation("Type found, type: {0}", className);
                                        }
                                        catch (Exception ep)
                                        {
                                            log.LogError(ep, "Type not found in the job assembly.");
                                        }
                                        if (jobTaskType == null) // fall back to the current assembly
                                        {
                                            try
                                            {
                                                jobTaskType = AssemblyHelp
                                                    .GetTypeByCurrentAssemblyNameAndClassName(className, Assembly.GetExecutingAssembly());
                                                if (jobTaskType == null)
                                                {
                                                    log.LogInformation("Type not found");
                                                    continue;
                                                }
                                                log.LogInformation("Type found, type: {0}", className);
                                            }
                                            catch (Exception ep)
                                            {
                                                log.LogError(ep, "Type not found.");
                                                continue;
                                            }
                                        }
                                        IJobDetail job = JobBuilder.Create(jobTaskType)
                                            .WithIdentity(item.TaskName, item.GroupName)
                                            .Build();
                                        ITrigger trigger = TriggerBuilder.Create()
                                            .WithIdentity(item.TaskName, item.GroupName)
                                            .StartNow()
                                            .WithCronSchedule(item.CronExpressionString)
                                            .Build();
                                        scheduler.ScheduleJob(job, trigger).GetAwaiter().GetResult();
                                        log.LogInformation("Job added, type: {0}", className);
                                        ITriggerListener triggerListener = new TriggerUpdateListens("trigger" + item.TaskName);
                                        IMatcher<TriggerKey> triggermatcher = KeyMatcher<TriggerKey>.KeyEquals(trigger.Key);
                                        scheduler.ListenerManager.AddTriggerListener(triggerListener, triggermatcher);
                                        IJobListener jobUpdateListener = new JobUpdateListens("job" + item.TaskName);
                                        IMatcher<JobKey> jobmatcher = KeyMatcher<JobKey>.KeyEquals(job.Key);
                                        scheduler.ListenerManager.AddJobListener(jobUpdateListener, jobmatcher);
                                    }
                                }
                            }
                        }
                        else
                        {
                            log.LogInformation("scheduler is not IsStarted");
                        }
                    }
                    else
                    {
                        log.LogInformation("scheduler is null");
                    }
                }
                catch (Exception ep)
                {
                    log.LogError(ep, "Task monitor loop failed.");
                }
                Thread.Sleep(15000); // poll every 15 seconds
            }
        });
// }
// else
// {
// db = Host.Services.GetService<QuartzDbContext>();
// listQuartzTask = db.QuartzTask.Where(w => w.IsDelete == 0
// && w.MachineName == machine
// && w.InstanceId == QuartzOpt.InsatanceId)
// .ToListAsync().GetAwaiter().GetResult();
// foreach (var item in listQuartzTask)
// {
// var jobKey = new JobKey(item.TaskName, item.GroupName);
// var triggerKey = new TriggerKey(item.TaskName, item.GroupName);
// // var jobItem = db.QuartzTask.FirstOrDefault(w => w.IsDelete == 0
// // && w.TaskName == jobKey.Name
// // && w.GroupName == jobKey.Group
// // && w.MachineName == machine
// // && w.InstanceId == scheduler.SchedulerInstanceId);
// // item.Status = (int)TaskStatus.Faulted;
// // item.OperateStatus = (int)OperateStatus.Stop;
// // db.Update<QuartzTask>(jobItem);
// // db.SaveChanges();
// if (scheduler.CheckExists(jobKey).Result)
// {
// var triggerListener = scheduler.ListenerManager.GetTriggerListener("triggerUpdate");
// if (triggerListener == null)
// {
// triggerListener = new TriggerUpdateListens();
// IMatcher<TriggerKey> triggermatcher = KeyMatcher<TriggerKey>.KeyEquals(triggerKey);
// scheduler.ListenerManager.AddTriggerListener(triggerListener, triggermatcher);
// }
// var jobListener = scheduler.ListenerManager.GetJobListener("jobupdateListens");
// if (jobListener == null)
// {
// IJobListener jobUpdateListener = new JobUpdateListens();
// IMatcher<JobKey> jobmatcher = KeyMatcher<JobKey>.KeyEquals(jobKey);
// scheduler.ListenerManager.AddJobListener(jobUpdateListener, jobmatcher);
// }
// }
// }
//}
        return scheduler;
        // Tell quartz to schedule the job using our trigger
        //await scheduler.ScheduleJob(job, trigger);
    }
    catch (SchedulerException sep)
    {
        log.LogError(sep, "Job execution error.");
    }
    return null;
}
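Stripped of the Quartz and EF Core calls, each pass of the polling loop reconciles the task table with the scheduler:

- a row marked Stop whose job exists should be deleted;
- a row not marked Stop whose job does not exist should be scheduled;
- everything else is left alone.

A minimal sketch of only that decision follows. It assumes hypothetical `DbTask`, `TaskOperation` and `JobReconciler` types in place of the article's `QuartzTask` entity and `OperateStatus` enum.

```csharp
using System.Collections.Generic;

// Hypothetical stand-ins for the article's OperateStatus enum and QuartzTask entity.
public enum TaskOperation { Run, Stop }

public sealed record DbTask(string TaskName, string GroupName, TaskOperation Operation);

public sealed record ReconcilePlan(
    IReadOnlyList<DbTask> ToSchedule,
    IReadOnlyList<(string Name, string Group)> ToDelete);

public static class JobReconciler
{
    // Compares database rows with the job keys already present in the scheduler.
    public static ReconcilePlan Plan(
        IEnumerable<DbTask> rows,
        ISet<(string Name, string Group)> scheduledKeys)
    {
        var toSchedule = new List<DbTask>();
        var toDelete = new List<(string Name, string Group)>();
        foreach (var row in rows)
        {
            (string Name, string Group) key = (row.TaskName, row.GroupName);
            bool exists = scheduledKeys.Contains(key);
            if (exists && row.Operation == TaskOperation.Stop)
            {
                toDelete.Add(key);      // stopped in the database: remove from the scheduler
            }
            else if (!exists && row.Operation != TaskOperation.Stop)
            {
                toSchedule.Add(row);    // new and not stopped: schedule it
            }
            // Otherwise (scheduled and running, or absent and stopped) nothing changes.
        }
        return new ReconcilePlan(toSchedule, toDelete);
    }
}
```

Keeping the decision in a pure function makes the rules testable without a database or a running scheduler. The loop would then call `DeleteJob` for each entry in `ToDelete` and `ScheduleJob` for each entry in `ToSchedule`.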

Now let's look at the trigger listener class and the job listener class:

using System;
using System.Threading;
using System.Threading.Tasks;
using Quartz;
using Quartz.Logging;
using Quartz.Impl;
using Walt.Framework.Service.Zookeeper;
using Microsoft.Extensions.DependencyInjection;
using org.apache.zookeeper;
using System.Linq;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
using static org.apache.zookeeper.KeeperException;

namespace Walt.Framework.Quartz.Host
{
    public class TriggerUpdateListens : ITriggerListener
    {
        public string Name { get; set; }

        public TriggerUpdateListens(string name)
        {
            Name = name;
        }

        // Set in TriggerFired, read in VetoJobExecution.
        private bool VoteJob { get; set; }

        public Task TriggerComplete(ITrigger trigger, IJobExecutionContext context, SchedulerInstruction triggerInstructionCode, CancellationToken cancellationToken = default(CancellationToken))
        {
            ILoggerFactory loggerFact = Program.Host.Services.GetService<ILoggerFactory>();
            var _logger = loggerFact.CreateLogger<ZookeeperService>();
            _logger.LogInformation("Executed successfully. name:{0} group:{1}", context.JobDetail.Key.Name, context.JobDetail.Key.Group);
            return Task.FromResult(true);
        }

        // Called when the trigger fires; this is the first callback of a job execution.
        public Task TriggerFired(ITrigger trigger, IJobExecutionContext context, CancellationToken cancellationToken = default(CancellationToken))
        {
            ILoggerFactory loggerFact = Program.Host.Services.GetService<ILoggerFactory>();
            var _logger = loggerFact.CreateLogger<ZookeeperService>();
            _logger.LogInformation("Starting job. name:{0} group:{1}", context.JobDetail.Key.Name, context.JobDetail.Key.Group);
            VoteJob = false; // reset the veto flag, otherwise one failure would veto every later run
            string machine = Environment.MachineName; // current machine name
            try
            {
                var customerAttri = context.JobDetail.JobType.GetCustomAttributes(false);
                foreach (var customer in customerAttri)
                {
                    if (customer is DistributingAttributes) // this attribute marks a sharded job
                    {
                        var distri = customer as DistributingAttributes;
                        var zookeeper = Program.Host.Services.GetService<IZookeeperService>();
                        string currentTempNodeName = string.Empty;
                        string fullPath = "/lock/" + context.JobDetail.Key.Name + context.JobDetail.Key.Group;
                        const int maxRetry = 3; // assumed retry limit
                        int flag = 0;
                    Repeat: // acquiring the lock can fail for transient reasons, so retry a few times
                        string jsonData = zookeeper.GetDataByLockNode(fullPath, "getlock"
                            , ZooDefs.Ids.OPEN_ACL_UNSAFE, out currentTempNodeName);
                        if (jsonData == null)
                        {
                            _logger.LogError("Failed to acquire lock. node:{0}, lock prefix:{1}, retry:{2}", fullPath, "getlock", flag);
                            if (flag < maxRetry)
                            {
                                flag = flag + 1;
                                goto Repeat;
                            }
                            VoteJob = true; // lock not acquired: veto this execution (read in VetoJobExecution)
                            //context.Scheduler.Interrupt(context.JobDetail.Key);
                            return Task.FromResult(false); // Quartz calls VetoJobExecution next regardless of this value
                        }

                        // Lock acquired: compute this host's shard and build the shard context.
                        QuartzDbContext db = Program.Host.Services.GetService<QuartzDbContext>();
                        var item = db.QuartzTask.Where(w => w.IsDelete == 0
                                && w.TaskName == context.JobDetail.Key.Name
                                && w.GroupName == context.JobDetail.Key.Group
                                && w.MachineName == machine
                                && w.InstanceId == context.Scheduler.SchedulerInstanceId).FirstOrDefault();
                        if (item != null)
                        {
                            //TODO: use the machine name to build the remote API URL, check whether the shard hosts are alive, and reassign suspended work.
                        }
                        // Shard flag: machine name + instance ID (the same values the query above filters on).
                        string distributeFlag = machine + context.Scheduler.SchedulerInstanceId;
                        List<DistributingData> distriData = new List<DistributingData>();
                        DistributingData currentDistriEntity = null;
                        if (string.IsNullOrEmpty(jsonData))
                        {
                            currentDistriEntity = new DistributingData // shard metadata
                            {
                                DistributeFlag = distributeFlag, // shard flag built from machine name and instance ID
                                PageIndex = 1, // assumed starting page
                                PageSize = Program.QuartzOpt.CustomerRecordCountForTest // configured number of records to process
                            };
                            distriData.Add(currentDistriEntity);
                        }
                        else
                        {
                            distriData = Newtonsoft.Json.JsonConvert.DeserializeObject<List<DistributingData>>(jsonData);
                            if (distriData == null || distriData.Count() < 1)
                            {
                                distriData = new List<DistributingData>();
                                currentDistriEntity = new DistributingData
                                {
                                    DistributeFlag = distributeFlag,
                                    PageIndex = 1, // assumed starting page
                                    PageSize = Program.QuartzOpt.CustomerRecordCountForTest // configured
                                };
                                distriData.Add(currentDistriEntity);
                            }
                            else
                            {
                                currentDistriEntity = distriData.Where(w => w.DistributeFlag == distributeFlag).SingleOrDefault();
                                var maxPageIndex = distriData.Max(w => w.PageIndex) + 1;
                                if (currentDistriEntity == null) // this host has no shard yet: add it to the shard set
                                {
                                    currentDistriEntity = new DistributingData
                                    {
                                        DistributeFlag = distributeFlag,
                                        PageIndex = maxPageIndex,
                                        PageSize = Program.QuartzOpt.CustomerRecordCountForTest // configured
                                    };
                                    distriData.Add(currentDistriEntity);
                                }
                                else // this host already has a shard: move it to the next unclaimed page
                                {
                                    currentDistriEntity.PageIndex = maxPageIndex;
                                }
                            }
                        }
                        if (item != null)
                        {
                            item.Remark = Newtonsoft.Json.JsonConvert.SerializeObject(currentDistriEntity);
                            db.Update(item);
                            db.SaveChanges();
                        }
                        string resultData = Newtonsoft.Json.JsonConvert.SerializeObject(distriData);
                        context.JobDetail.JobDataMap.Put("distriData", currentDistriEntity); // expose the shard data to the job
                        zookeeper.SetDataAsync(fullPath, resultData, false).GetAwaiter().GetResult();
                        zookeeper.DeleteNode(currentTempNodeName).GetAwaiter().GetResult(); // done: delete our node to release the lock
                        _logger.LogInformation("Shard assignment: {0}", resultData);
                    }
                }
            }
            catch (ConnectionLossException cle)
            {
                VoteJob = true;
                _logger.LogError(cle, "Error acquiring the distributed lock: connection lost.");
            }
            catch (SessionExpiredException sep)
            {
                VoteJob = true;
                _logger.LogError(sep, "Error acquiring the distributed lock: session expired.");
            }
            catch (KeeperException kep)
            {
                VoteJob = true;
                _logger.LogError(kep, "Error acquiring the distributed lock: ZooKeeper operation failed.");
            }
            catch (Exception ep)
            {
                try
                {
                    _logger.LogError(ep, "Sharding failed.");
                    //context.Scheduler.DeleteJob(context.JobDetail.Key).GetAwaiter().GetResult();
                    VoteJob = true;
                    QuartzDbContext db = Program.Host.Services.GetService<QuartzDbContext>();
                    var item = db.QuartzTask.Where(w => w.IsDelete == 0
                            && w.TaskName == context.JobDetail.Key.Name
                            && w.GroupName == context.JobDetail.Key.Group
                            && w.MachineName == machine
                            && w.InstanceId == context.Scheduler.SchedulerInstanceId).FirstOrDefault();
                    if (item == null)
                    {
                        _logger.LogError(ep, "Sharding failed, and the database record could not be found.");
                    }
                    else
                    {
                        item.Status = (int)TaskStatus.Canceled;
                        item.OperateStatus = (int)OperateStatus.Stop;
                        item.Remark = ep.ToString();
                        db.Update(item);
                        db.SaveChanges();
                    }
                }
                catch (Exception eep)
                {
                    _logger.LogError(eep, "Sharding failed, and updating the database failed.");
                }
            }
            return Task.FromResult(true);
        }

        public Task TriggerMisfired(ITrigger trigger, CancellationToken cancellationToken = default(CancellationToken))
        {
            return Task.FromResult(true);
        }

        // Quartz calls this right after TriggerFired; returning true vetoes this execution.
        public Task<bool> VetoJobExecution(ITrigger trigger, IJobExecutionContext context, CancellationToken cancellationToken = default(CancellationToken))
        {
            ILoggerFactory loggerFact = Program.Host.Services.GetService<ILoggerFactory>();
            var _logger = loggerFact.CreateLogger<ZookeeperService>();
            if (VoteJob)
            {
                _logger.LogInformation("Job execution vetoed. name:{0} group:{1}", context.JobDetail.Key.Name, context.JobDetail.Key.Group);
            }
            // VoteJob is set in TriggerFired and defaults to false; it becomes true when acquiring the lock fails.
            // Returning true means only JobUpdateListens.JobExecutionVetoed runs, and the job is skipped this time.
            return Task.FromResult(VoteJob);
        }
    }
}
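The page-assignment rules in `TriggerFired` are easier to follow without the ZooKeeper and database plumbing:

- the first host gets the starting page;
- a host that is new to the list gets max(PageIndex) + 1;
- a known host moves to max(PageIndex) + 1.

A minimal sketch follows. It assumes a hypothetical `ShardSlot` class in place of `DistributingData`, a hypothetical `ShardAllocator` helper, and a starting page index of 1; that starting value is itself an assumption.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the article's DistributingData.
public sealed class ShardSlot
{
    public string DistributeFlag { get; set; } = "";
    public int PageIndex { get; set; }
    public int PageSize { get; set; }
}

public static class ShardAllocator
{
    // Returns this host's slot after applying the allocation rules; mutates 'slots'.
    // firstPage = 1 is an assumed starting value.
    public static ShardSlot Allocate(List<ShardSlot> slots, string flag, int pageSize, int firstPage = 1)
    {
        if (slots.Count == 0)
        {
            var first = new ShardSlot { DistributeFlag = flag, PageIndex = firstPage, PageSize = pageSize };
            slots.Add(first);
            return first;
        }
        int next = slots.Max(s => s.PageIndex) + 1;
        var current = slots.SingleOrDefault(s => s.DistributeFlag == flag);
        if (current == null)
        {
            // Host not seen before: join the shard set on the next page.
            current = new ShardSlot { DistributeFlag = flag, PageIndex = next, PageSize = pageSize };
            slots.Add(current);
        }
        else
        {
            // Known host: move on to the next unclaimed page.
            current.PageIndex = next;
        }
        return current;
    }
}
```

The read-modify-write on the shared list is not atomic, so this step is only safe while holding the distributed lock. That is why `TriggerFired` acquires the lock before it reads or writes the node data.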

Next, the job listener, whose methods run before and after each job executes:

using System;
using System.Threading;
using System.Threading.Tasks;
using Quartz;
using Quartz.Logging;
using Quartz.Impl;
using Microsoft.Extensions.DependencyInjection;
using System.Linq;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;

namespace Walt.Framework.Quartz.Host
{
    public class JobUpdateListens : IJobListener
    {
        public string Name { get; set; }

        public JobUpdateListens(string name)
        {
            Name = name;
        }

        public Task JobExecutionVetoed(IJobExecutionContext context, CancellationToken cancellationToken = default(CancellationToken))
        {
            return Task.FromResult(true);
        }

        // Runs before the job executes: record the status in the database.
        public Task JobToBeExecuted(IJobExecutionContext context, CancellationToken cancellationToken = default(CancellationToken))
        {
            try
            {
                string machine = Environment.MachineName;
                QuartzDbContext db = Program.Host.Services.GetService<QuartzDbContext>();
                var item = db.QuartzTask.FirstOrDefault(w => w.IsDelete == 0
                    && w.TaskName == context.JobDetail.Key.Name
                    && w.GroupName == context.JobDetail.Key.Group
                    && w.MachineName == machine
                    && w.InstanceId == context.Scheduler.SchedulerInstanceId);
                if (item != null)
                {
                    item.Status = (int)TaskStatus.WaitingToRun;
                    db.Update<QuartzTask>(item);
                    db.SaveChanges();
                }
            }
            catch (Exception ep)
            {
                //context.Scheduler.Interrupt(context.JobDetail.Key);
                var logFaoctory = Program.Host.Services.GetService<ILoggerFactory>();
                var log = logFaoctory.CreateLogger<JobUpdateListens>();
                log.LogError(ep, "JobToBeExecuted: job execution error. name:{0}, Group:{1}", context.JobDetail.Key.Name, context.JobDetail.Key.Group);
            }
            return Task.FromResult(true);
        }

        // Runs after the job executes: store the result in the database and handle exceptions.
        public Task JobWasExecuted(IJobExecutionContext context, JobExecutionException jobException, CancellationToken cancellationToken = default(CancellationToken))
        {
            try
            {
                QuartzDbContext db = Program.Host.Services.GetService<QuartzDbContext>();
                var logFaoctory = Program.Host.Services.GetService<ILoggerFactory>();
                var log = logFaoctory.CreateLogger<JobUpdateListens>();
                string machine = Environment.MachineName;
                var item = db.QuartzTask.FirstOrDefault(w => w.IsDelete == 0
                    && w.TaskName == context.JobDetail.Key.Name
                    && w.GroupName == context.JobDetail.Key.Group
                    && w.MachineName == machine
                    && w.InstanceId == context.Scheduler.SchedulerInstanceId);
                if (jobException != null)
                {
                    log.LogError(jobException, "Job execution error. name:{0}, Group:{1}", context.JobDetail.Key.Name, context.JobDetail.Key.Group);
                }
                if (item == null)
                {
                    return Task.FromResult(true);
                }
                if (jobException != null)
                {
                    item.Status = (int)TaskStatus.Faulted;
                    item.Remark = Newtonsoft.Json.JsonConvert.SerializeObject(jobException);
                }
                else
                {
                    item.Status = (int)TaskStatus.RanToCompletion;
                    item.RecentRunTime = context.FireTimeUtc.DateTime;
                    if (context.NextFireTimeUtc.HasValue)
                    {
                        item.NextFireTime = context.NextFireTimeUtc.Value.DateTime;
                    }
                }
                db.Update<QuartzTask>(item);
                db.SaveChanges();
            }
            catch (Exception ep)
            {
                //context.Scheduler.Interrupt(context.JobDetail.Key);
                var logFaoctory = Program.Host.Services.GetService<ILoggerFactory>();
                var log = logFaoctory.CreateLogger<JobUpdateListens>();
                log.LogError(ep, "JobWasExecuted: job execution error. name:{0}, Group:{1}", context.JobDetail.Key.Name, context.JobDetail.Key.Group);
            }
            return Task.FromResult(true);
        }
    }
}
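`GetDataByLockNode`, called from `TriggerFired`, follows the usual ZooKeeper lock pattern. Each contender creates an EPHEMERAL_SEQUENTIAL child under the lock path, and the child with the smallest sequence number holds the lock.

The service below puts its watch on the smallest child. As a result, every waiter wakes up whenever the holder releases the lock (the "herd effect"). The lock recipe in the ZooKeeper documentation instead has each waiter watch only its immediate predecessor.

The minimal sketch below shows that predecessor-selection step. It is an alternative technique, not the article's code, and `LockOrdering`/`NodeToWatch` are hypothetical names.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for the predecessor-watch step of the ZooKeeper lock recipe.
public static class LockOrdering
{
    // children: child names under the lock node as returned by getChildren (no parent path),
    //           e.g. "getlock0000000003".
    // myNode:   child name of the EPHEMERAL_SEQUENTIAL node this client created.
    // Returns null if this client holds the lock, otherwise the child it should watch.
    public static string? NodeToWatch(IEnumerable<string> children, string myNode)
    {
        // ZooKeeper appends a zero-padded 10-digit counter, so with a shared prefix
        // ordinal string order matches creation order.
        List<string> ordered = children.OrderBy(c => c, StringComparer.Ordinal).ToList();
        int index = ordered.IndexOf(myNode);
        if (index < 0)
        {
            // Our ephemeral node is gone, for example because the session expired.
            throw new InvalidOperationException("Own lock node not found: " + myNode);
        }
        return index == 0 ? null : ordered[index - 1];
    }
}
```

With this variant, the deletion of one node wakes exactly one waiter. The woken waiter must still re-read the children, because its predecessor may have left without ever holding the lock.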

While using zookeeperservice this time, I optimized some of its code:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using org.apache.zookeeper;
using org.apache.zookeeper.data;
using static org.apache.zookeeper.ZooKeeper;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using static org.apache.zookeeper.Watcher.Event;
using Newtonsoft.Json;
using System.Collections.Concurrent;
using static org.apache.zookeeper.ZooDefs;
using static org.apache.zookeeper.KeeperException;

namespace Walt.Framework.Service.Zookeeper
{
    internal class WaitLockWatch : Watcher
    {
        private AutoResetEvent _autoResetEvent;
        private ManualResetEvent _mutex;
        private ILogger _logger;
        private string _path;

        public WaitLockWatch(AutoResetEvent autoResetEvent
            , ILogger logger, string path
            , ManualResetEvent mutex)
        {
            _autoResetEvent = autoResetEvent;
            _logger = logger;
            _path = path;
            _mutex = mutex;
        }

        // Fired when the watched lock node changes (e.g. is deleted): wake the waiting thread.
        public override Task process(WatchedEvent @event)
        {
            _mutex.Set();
            return Task.FromResult(true);
        }
    }

    internal class WaitConnWatch : Watcher
    {
        private AutoResetEvent _autoResetEvent;
        private ILogger _logger;
        private ManualResetEvent _mutex;

        public WaitConnWatch(AutoResetEvent autoResetEvent
            , ILogger logger
            , ManualResetEvent mutex)
        {
            _autoResetEvent = autoResetEvent;
            _logger = logger;
            _mutex = mutex;
        }

        public override Task process(WatchedEvent @event)
        {
            _logger.LogInformation("Watch fired, callback state: {0}", @event.getState().ToString());
            if (@event.getState() == KeeperState.SyncConnected
                || @event.getState() == KeeperState.ConnectedReadOnly)
            {
                _logger.LogInformation("Releasing the connection wait");
                _autoResetEvent.Set();
            }
            else
            {
                _logger.LogInformation("Connection lost; releasing the distributed-lock wait");
                _mutex.Set();
            }
            return Task.CompletedTask;
        }
    }

    public class ZookeeperService : IZookeeperService
    {
        private ZookeeperOptions _zookeeperOptions;
        private ZooKeeper _zookeeper;
        private static readonly byte[] NO_PASSWORD = new byte[0];
        public Watcher Wathcer { get; set; }
        public ILoggerFactory LoggerFac { get; set; }
        private ILogger _logger;
        internal Thread CurrThread { get; }
        // [0]: connection wait, [1]: lock wait
        AutoResetEvent[] autoResetEvent = new AutoResetEvent[]
            { new AutoResetEvent(false), new AutoResetEvent(false) };
        ManualResetEvent _manualReset = new ManualResetEvent(false);

        public ZookeeperService(IOptionsMonitor<ZookeeperOptions> zookeeperOptions
            , ILoggerFactory loggerFac)
        {
            LoggerFac = loggerFac;
            _logger = LoggerFac.CreateLogger<ZookeeperService>();
            _zookeeperOptions = zookeeperOptions.CurrentValue;
            _logger.LogInformation("Configuration: {0}", JsonConvert.SerializeObject(_zookeeperOptions));
            zookeeperOptions.OnChange((zookopt, s) =>
            {
                _zookeeperOptions = zookopt;
            });
            _logger.LogInformation("Connecting");
            Conn(_zookeeperOptions);
            CurrThread = System.Threading.Thread.CurrentThread;
        }

        private void Conn(ZookeeperOptions zookeeperOptions)
        {
            bool isReadOnly = default(Boolean);
            Wathcer = new WaitConnWatch(autoResetEvent[0], _logger, _manualReset);
            if (isReadOnly != zookeeperOptions.IsReadOnly)
            {
                isReadOnly = zookeeperOptions.IsReadOnly;
            }
            byte[] pwd = new byte[0];
            // No password and no session ID
            if (string.IsNullOrEmpty(zookeeperOptions.SessionPwd)
                && zookeeperOptions.SessionId == default(int))
            {
                _zookeeper = new ZooKeeper(zookeeperOptions.Connectstring, zookeeperOptions.SessionTimeout, Wathcer, isReadOnly);
            }
            else if (!string.IsNullOrEmpty(zookeeperOptions.SessionPwd))
            {
                pwd = System.Text.Encoding.Default.GetBytes(zookeeperOptions.SessionPwd);
                _zookeeper = new ZooKeeper(zookeeperOptions.Connectstring, zookeeperOptions.SessionTimeout, Wathcer, 0, pwd, isReadOnly);
            }
            else
            {
                _zookeeper = new ZooKeeper(zookeeperOptions.Connectstring
                    , zookeeperOptions.SessionTimeout, Wathcer, zookeeperOptions.SessionId, pwd, isReadOnly);
            }
            if (_zookeeper.getState() == States.CONNECTING)
            {
                _logger.LogInformation("Current state: CONNECTING. Blocking until connected");
                autoResetEvent[0].WaitOne();
            }
        }

        public Task<string> CreateZNode(string path, string data, CreateMode createMode, List<ACL> aclList)
        {
            ReConn();
            if (string.IsNullOrEmpty(path) || !path.StartsWith('/'))
            {
                _logger.LogInformation("Invalid path, parameter path: {0}", path);
                return Task.FromResult<string>(null); // callers check the result for null
            }
            byte[] dat = new byte[0];
            if (!string.IsNullOrEmpty(data))
            {
                dat = System.Text.Encoding.Default.GetBytes(data);
            }
            if (createMode == null)
            {
                _logger.LogInformation("createMode is null, defaulting to CreateMode.PERSISTENT");
                createMode = CreateMode.PERSISTENT;
            }
            return _zookeeper.createAsync(path, dat, aclList, createMode);
        }

        public async void Sync(string path)
        {
            try
            {
                await _zookeeper.sync(path);
                _logger.LogInformation("Sync succeeded");
            }
            catch (Exception ep)
            {
                _logger.LogError(ep, "Sync failed.");
            }
        }

        public async Task<DataResult> GetDataAsync(string path, Watcher watcher, bool isSync)
        {
            ReConn();
            if (await _zookeeper.existsAsync(path) == null)
            {
                _logger.LogInformation("Path does not exist");
                return null;
            }
            if (isSync)
            {
                _logger.LogInformation("About to sync.");
                try
                {
                    await _zookeeper.sync(path);
                    _logger.LogInformation("Sync succeeded");
                }
                catch (Exception ep)
                {
                    _logger.LogError(ep, "Sync failed.");
                }
            }
            return await _zookeeper.getDataAsync(path, watcher);
        }

        public async Task<Stat> SetDataAsync(string path, string data, bool isSync)
        {
            ReConn();
            if (await _zookeeper.existsAsync(path) == null)
            {
                _logger.LogInformation("Path does not exist");
                return null;
            }
            byte[] dat = new byte[0];
            if (!string.IsNullOrEmpty(data))
            {
                dat = System.Text.Encoding.Default.GetBytes(data);
            }
            return await _zookeeper.setDataAsync(path, dat);
        }

        public async Task<ChildrenResult> GetChildrenAsync(string path, Watcher watcher, bool isSync)
        {
            ReConn();
            if (await _zookeeper.existsAsync(path) == null)
            {
                _logger.LogInformation("Path does not exist");
                return null;
            }
            if (isSync)
            {
                _logger.LogInformation("About to sync.");
                try
                {
                    _logger.LogInformation("Starting sync");
                    await _zookeeper.sync(path);
                    _logger.LogInformation("Sync succeeded");
                }
                catch (Exception ep)
                {
                    _logger.LogError(ep, "Sync failed.");
                }
            }
            return await _zookeeper.getChildrenAsync(path, watcher);
        }

        public async Task DeleteNode(string path)
        {
            ReConn();
            if (await _zookeeper.existsAsync(path) == null)
            {
                _logger.LogDebug("Delete path: path does not exist");
                return;
            }
            try
            {
                _logger.LogDebug("Deleting node: {0}", path);
                await _zookeeper.deleteAsync(path);
            }
            catch (Exception ep)
            {
                _logger.LogError(ep, "Delete failed");
                return;
            }
        }

        public async Task<bool> SetWatcher(string path, Watcher watcher)
        {
            ReConn();
            var stat = await _zookeeper.existsAsync(path);
            if (stat == null)
            {
                _logger.LogDebug("Checking whether the path exists: path does not exist");
                return false;
            }
            try
            {
                _logger.LogDebug("Setting watch: {0}", path);
                await _zookeeper.getDataAsync(path, watcher);
                return true;
            }
            catch (Exception ep)
            {
                _logger.LogError(ep, "Error setting watch");
                return false;
            }
        }

        public string GetDataByLockNode(string path, string sequenceName, List<ACL> aclList, out string tempNodeOut)
        {
            _logger.LogInformation("Acquiring distributed lock.");
            string tempNode = string.Empty;
            tempNodeOut = string.Empty;
            try
            {
                ReConn();
                if (_zookeeper.existsAsync(path).Result == null)
                {
                    _logger.LogDebug("Path does not exist; creating it");
                    CreateZNode(path, "", CreateMode.PERSISTENT, aclList).GetAwaiter().GetResult();
                }
                tempNode = CreateZNode(path + "/" + sequenceName, "", CreateMode.EPHEMERAL_SEQUENTIAL, aclList).Result;
                _logger.LogDebug("Created node: {0}", tempNode);
                if (tempNode == null)
                {
                    _logger.LogDebug("Failed to create the ephemeral sequential node. path:{0}, data:{1}, CreateMode:{2}"
                        , path + "/" + sequenceName, "", CreateMode.EPHEMERAL_SEQUENTIAL);
                    return null;
                }
                _logger.LogInformation("Node created.");
// var taskGetData=Task.Run(async () =>{
// int circleCount = 0;
// while (true)
// {
// Thread.Sleep(200);
// circleCount++;
// _logger.LogInformation("Looping to acquire the lock. Iteration: {0}", circleCount);
// try
// {
// var childList =await GetChildrenAsync(path, null, true);
// if (childList == null || childList.Children == null || childList.Children.Count < 1)
// {
// _logger.LogWarning("Failed to get child nodes, count is zero. path:{0}", path);
// return null;
// }
// _logger.LogInformation("Children of path {0}: {1}", path, Newtonsoft.Json.JsonConvert.SerializeObject(childList.Children));
// var top = childList.Children.OrderBy(or => or).First();
// if (path + "/" + top == tempNode)
// {
// return tempNode;
// }
// }
// catch (Exception ep)
// {
// _logger.LogError(ep,"Error while looping to acquire the lock.");
// return null;
// }
// }
// });
// tempNode = taskGetData.GetAwaiter().GetResult();
// if (!string.IsNullOrEmpty(tempNode))
// {
// byte[] da = null;
// tempNodeOut = tempNode;
// da = GetDataAsync(path, null, true).Result.Data;
// if (da == null || da.Length < 1)
// {
// return string.Empty;
// }
// return System.Text.Encoding.Default.GetString(da);
// }
int clycleCount = 0;
GetChild: // retry point; guards against errors under concurrency
clycleCount++;
var childList = GetChildrenAsync(path, null, true).GetAwaiter().GetResult();
if (childList == null || childList.Children == null || childList.Children.Count < 1)
{
_logger.LogWarning("Failed to get child nodes, count is zero. path: {0}", path);
return null;
}
_logger.LogInformation("Children of path {0}: {1}", path, Newtonsoft.Json.JsonConvert.SerializeObject(childList.Children));
// as long as all children share the same sequenceName prefix, the zero-padded suffix makes the first name after sorting the lowest sequence number
var top = childList.Children.OrderBy(or => or).First();
if (path + "/" + top == tempNode)
{
tempNodeOut = tempNode;
var da = GetDataAsync(path, null, true).Result.Data;
if (da == null || da.Length < 1)
{
return string.Empty;
}
return System.Text.Encoding.Default.GetString(da);
}
// bool isSet=
// SetWatcher(path + "/" + top,).Result;
// if(!isSet)
// {
// goto GetChild;
// }
bool isSet= SetWatcher(path + "/" + top,new WaitLockWatch(autoResetEvent[], _logger, path,_manualReset)).Result;
if(!isSet)
{
_logger.LogWarning("Watcher was not set; running the check again.");
goto GetChild;
}
// block until _manualReset is signaled; it was handed to WaitLockWatch above
_manualReset.WaitOne();
childList = GetChildrenAsync(path, null, true).GetAwaiter().GetResult();
if (childList == null || childList.Children == null || childList.Children.Count < 1)
{
_logger.LogWarning("Failed again to get child nodes, count is zero. path: {0}", path);
return null;
}
_logger.LogInformation("Children of path {0} (second read): {1}", path, Newtonsoft.Json.JsonConvert.SerializeObject(childList.Children));
top = childList.Children.OrderBy(or => or).First();
if (path + "/" + top == tempNode)
{
_logger.LogDebug("Node acquired the lock.");
tempNodeOut = tempNode;
var da = GetDataAsync(path, null, true).Result.Data;
if (da == null || da.Length < 1)
{
return string.Empty;
}
return System.Text.Encoding.Default.GetString(da);
}
else
{
_logger.LogDebug("Lock not acquired, looping again. Iteration: {0}",clycleCount);
Thread.Sleep();
goto GetChild;
// Sync(path); //DeleteNode(tempNode).GetAwaiter().GetResult();
// DeleteNode(tempNode).GetAwaiter().GetResult();
// _logger.LogError("Lock not acquired; the Watcher has a problem, check the log.");
// if (_zookeeper.existsAsync(tempNode).Result== null)
// {
// _logger.LogWarning("tempNode:{0} exists but did not get the lock: while waiting, it was released from blocking by the thread-check routine (collateral damage)"
// ,tempNode);
// }
// else
// {
// _logger.LogError("Lock not acquired; the Watcher has a problem, check the log.");
// }
}
}
catch(ConnectionLossException cle)
{
_logger.LogError(cle, "Error acquiring the lock: connection lost");
}
catch(SessionExpiredException sep)
{
_logger.LogError(sep, "Error acquiring the lock: session expired");
}
catch(KeeperException kep)
{
_logger.LogError(kep, "Error acquiring the lock: ZooKeeper operation failed");
}
catch (Exception ep)
{
_logger.LogError(ep, "Error acquiring the lock.");
if (!string.IsNullOrEmpty(tempNode))
{
try{
DeleteNode(tempNode).GetAwaiter().GetResult();
}catch(Exception)
{ }
}
}
return null;
}

private void ReConn()
{
_logger.LogInformation("Checking connection state, status: {0}",_zookeeper.getState());
if(_zookeeper.getState()==States.CLOSED
||_zookeeper.getState()== States.NOT_CONNECTED)
{
_logger.LogInformation("Connection is closed, reconnecting.");
Conn(_zookeeperOptions);
}
}

public async void Close(string tempNode)
{
try
{
await _zookeeper.closeAsync();
}
catch (Exception ep)
{
_logger.LogError(ep, "Failed to close zookeeper.");
}
}
}
}
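After GetDataByLockNode lists the children, it has to decide whether our ephemeral sequential node is the lowest one. If it is not, it has to pick a node to wait on. That decision can be separated from the ZooKeeper client and tested on its own. What follows is a minimal sketch under assumptions: LockRecipe, SequenceOf and PredecessorToWatch are hypothetical names and do not come from the project or from any ZooKeeper library.

The sketch differs from the code above in one deliberate way. The lock recipe in the ZooKeeper documentation watches the node immediately preceding ours, not the lowest node. That way, releasing the lock wakes only the next waiter instead of every waiting client (the herd effect). The sketch also sorts by the 10-digit counter that ZooKeeper appends to sequential node names, rather than by the whole string.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: the ordering decision of the ZooKeeper lock recipe,
// with no dependency on a ZooKeeper client library.
public static class LockRecipe
{
    // ZooKeeper appends a 10-digit, zero-padded counter to sequential nodes,
    // e.g. "lock-0000000007" -> 7.
    public static long SequenceOf(string nodeName)
    {
        if (nodeName == null || nodeName.Length < 10)
            throw new ArgumentException("not a sequential node name", nameof(nodeName));
        return long.Parse(nodeName.Substring(nodeName.Length - 10));
    }

    // Returns null when ownNode has the lowest sequence number (the lock is held);
    // otherwise returns the child immediately before ownNode, which is the one to watch.
    public static string PredecessorToWatch(IEnumerable<string> children, string ownNode)
    {
        List<string> ordered = children.OrderBy(name => SequenceOf(name)).ToList();
        int index = ordered.IndexOf(ownNode);
        if (index < 0)
            throw new InvalidOperationException("own node is not among the children");
        return index == 0 ? null : ordered[index - 1];
    }
}
```

Children come back as names relative to the lock path. In the code above, however, CreateZNode returns a full path, since it is compared with `path + "/" + top`. The own-node argument would therefore be `tempNode.Substring(path.Length + 1)`. A null result means the lock is held. Otherwise, the returned name is the node to set the watch on.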

Now let's look at the results.

Let's shut down master2 and then check slave1:

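Look at the timestamps: master2 has not run for 2 minutes. For proper monitoring, it is still best to implement remote management, and that is not hard. Change the generic host to a WebHost and write a few APIs, and you can monitor the state of the Quartz hosts in real time. In addition, the sharding automatically drops failed hosts and redistributes the tasks. Let's look at the sharding: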

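master2 has failed but has not been removed from the list here. This does not affect subsequent tasks. However, if master2 had not finished its task when it failed, part of that data will be left unprocessed.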
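The redistribution described above can be illustrated with a small, deterministic assignment function. This is a sketch under assumptions, not the project's implementation. It assumes the set of live hosts is already known, for example from the ephemeral nodes each host registers in ZooKeeper. It also assumes shards are identified by the indexes 0..shardCount-1. ShardAssignment and Assign are hypothetical names.

Every host sorts the live list the same way, so each host computes the same assignment independently. Once master2 drops out, its shards move to the remaining hosts on the next run. The function does not recover the partially processed data mentioned above; that would require recording the progress of each shard.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: spreads shard indexes round-robin over the live hosts.
public static class ShardAssignment
{
    public static Dictionary<string, List<int>> Assign(IEnumerable<string> liveHosts, int shardCount)
    {
        if (shardCount < 0)
            throw new ArgumentOutOfRangeException(nameof(shardCount));
        // Ordinal sort so every host derives the same order from the same set of names.
        List<string> hosts = liveHosts.Distinct().OrderBy(h => h, StringComparer.Ordinal).ToList();
        if (hosts.Count == 0)
            throw new ArgumentException("no live hosts", nameof(liveHosts));

        var result = hosts.ToDictionary(h => h, h => new List<int>());
        for (int shard = 0; shard < shardCount; shard++)
            result[hosts[shard % hosts.Count]].Add(shard);
        return result;
    }
}
```

With the hosts master1, master2 and slave1 and 4 shards, master2 gets shard 1. After master2 fails, the same 4 shards are split between master1 and slave1 only.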

3 Summary

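When extending Quartz, pay attention to two points. First, handle exceptions properly in job listeners and trigger listeners, especially trigger listeners. If a trigger listener throws, the job gets out of control and stops firing, and its state becomes invalid, so it has to be shut down and run again. Second, the data source that the shards process must either grow by a regular auto-increment or be static; only then does paging meet the requirements of business sharding. If the keys are not auto-incrementing or grow randomly, Quartz would have to store the primary keys that need processing, but such requirements are the minority.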
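The requirement that the data source be auto-incrementing or static is what makes a split like the following possible. The key range is cut into contiguous slices, and each shard pages only through its own slice. This is a minimal sketch, assuming an integer auto-increment primary key whose minimum and maximum are read when the job fires. KeyRangeSharding and RangeFor are hypothetical names, not the project's code. If keys were random or inserted out of order, slices computed at one moment would not reliably cover rows added later. That is the case where, as noted above, the keys to process would have to be stored instead.

```csharp
using System;

// Hypothetical helper: splits an auto-increment key range into contiguous slices, one per shard.
public static class KeyRangeSharding
{
    // Returns the inclusive slice [Start, End] of [minId, maxId] owned by shardIndex.
    // The first (total % shardCount) shards get one extra key; a slice with End < Start is empty.
    public static (long Start, long End) RangeFor(long minId, long maxId, int shardCount, int shardIndex)
    {
        if (maxId < minId)
            throw new ArgumentException("maxId must not be less than minId", nameof(maxId));
        if (shardCount <= 0)
            throw new ArgumentOutOfRangeException(nameof(shardCount));
        if (shardIndex < 0 || shardIndex >= shardCount)
            throw new ArgumentOutOfRangeException(nameof(shardIndex));

        long total = maxId - minId + 1;
        long baseSize = total / shardCount;
        long remainder = total % shardCount;

        // Shards before shardIndex hold baseSize keys each, plus one extra for each of them below remainder.
        long start = minId + shardIndex * baseSize + Math.Min(shardIndex, remainder);
        long size = baseSize + (shardIndex < remainder ? 1 : 0);
        return (start, start + size - 1);
    }
}
```

For ids 1 to 10 and 3 shards, the slices are 1-4, 5-7 and 8-10. Each shard could then page through its slice in MySQL with a query such as `WHERE id BETWEEN @start AND @end ORDER BY id LIMIT @pageSize`, moving `@start` past the last id it read.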

Microservices series on GitHub: https://github.com/ck0074451665/Walt.Framework.git

Test examples: https://github.com/ck0074451665/Walt.MicroServicesTest.git

Management UI: https://pan.baidu.com/s/1gYNDX1j3-XctuPiejV2XPQ
