EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));

跟着代码一步步往下看:

public class EventDispatcher {

    /**
* add event listener
*/
static public void addEventListener(AbstractEventListener listener) {
for (Class<? extends Event> type : listener.interest()) {
getEntry(type).listeners.addIfAbsent(listener);
}
} /**
* fire event, notify listeners.
*/
static public void fireEvent(Event event) {
if (null == event) {
throw new IllegalArgumentException();
} for (AbstractEventListener listener : getEntry(event.getClass()).listeners) {
try {
listener.onEvent(event);
} catch (Exception e) {
log.error(e.toString(), e);
}
}
}

这个getEntry(event.getClass()).listeners需要重点看下

/**
* get event listener for eventType. Add Entry if not exist.
*/
static Entry getEntry(Class<? extends Event> eventType) {
for (; ; ) {
for (Entry entry : LISTENER_HUB) {
if (entry.eventType == eventType) {
return entry;
}
} Entry tmp = new Entry(eventType);
/**
* false means already exists
*/
if (LISTENER_HUB.addIfAbsent(tmp)) {
return tmp;
}
}
}
    static final CopyOnWriteArrayList<Entry> LISTENER_HUB = new CopyOnWriteArrayList<Entry>();

这个LISTENER_HUB 会预先把几个相应的Entry 加载进去(这个我们后面分析),然后就开始onEvent,通知Listeners了;

@Service
public class AsyncNotifyService extends AbstractEventListener { @Override
public List<Class<? extends Event>> interest() {
List<Class<? extends Event>> types = new ArrayList<Class<? extends Event>>();
// 触发配置变更同步通知
types.add(ConfigDataChangeEvent.class);
return types;
} @Override
public void onEvent(Event event) { // 并发产生 ConfigDataChangeEvent
if (event instanceof ConfigDataChangeEvent) {
ConfigDataChangeEvent evt = (ConfigDataChangeEvent)event;
long dumpTs = evt.lastModifiedTs;
String dataId = evt.dataId;
String group = evt.group;
String tenant = evt.tenant;
String tag = evt.tag;
   // listen的服务地址
List<?> ipList = serverListService.getServerList(); // 其实这里任何类型队列都可以
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
for (int i = 0; i < ipList.size(); i++) {
queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String)ipList.get(i), evt.isBeta));
}
EXCUTOR.execute(new AsyncTask(httpclient, queue));
}
}

把这个配置关联的服务一个个通知下,建议大家可以结合观察者模式来理解这个;

    private static final Executor EXCUTOR = Executors.newScheduledThreadPool(100, new NotifyThreadFactory());

然后把这个通知服务扔到县线程池,放到另外一个线程中去执行通知任务;

 class AsyncTask implements Runnable {

        public AsyncTask(CloseableHttpAsyncClient httpclient, Queue<NotifySingleTask> queue) {
this.httpclient = httpclient;
this.queue = queue;
} @Override
public void run() { executeAsyncInvoke(); } private void executeAsyncInvoke() {
while (!queue.isEmpty()) { NotifySingleTask task = queue.poll();
String targetIp = task.getTargetIP();
if (serverListService.getServerList().contains(
targetIp)) {
// 启动健康检查且有不监控的ip则直接把放到通知队列,否则通知
if (serverListService.isHealthCheck()
&& ServerListService.getServerListUnhealth().contains(targetIp)) {
// target ip 不健康,则放入通知列表中
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
task.getLastModified(),
LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target);
// get delay time and set fail count to the task
int delay = getDelayTime(task);
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
queue.add(task);
AsyncTask asyncTask = new AsyncTask(httpclient, queue);
((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
} else {
HttpGet request = new HttpGet(task.url);
request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,
String.valueOf(task.getLastModified()));
request.setHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, LOCAL_IP);
if (task.isBeta) {
request.setHeader("isBeta", "true");
}
httpclient.execute(request, new AyscNotifyCallBack(httpclient, task));
}
}
}
} private Queue<NotifySingleTask> queue;
private CloseableHttpAsyncClient httpclient; }

上面这段代码明天再慢慢解析吧,加油!

最新文章

  1. sql数据库获取表名称和表列名
  2. C#设计模式 - 观察者模式(使用委托)
  3. iOS-NSDate
  4. SqlSever基础 rtrim与ltrim嵌套使用 除去字符串左右两边的空格
  5. Mysql Partition 理论知识总结
  6. Struts+Spring+Hibernate进阶开端(一)
  7. JVM 指令集合
  8. 我的MYSQL学习心得(七)
  9. Java 静态内部类注意点
  10. 笔记13 AOP中After和AfterReturning的区别
  11. python实现购物车
  12. 开发Canvas 绘画应用(二):实现绘画
  13. Kali学习笔记26:OWASP_ZAP
  14. 通过父元素的hover控制子元素的显示
  15. CMM:软件成熟度模型
  16. web项目,美工和前台配合,页面路径访问问题
  17. 让linux history命令显示命令的运行时间、在哪个机器运行的这个命令
  18. 最新zencart支付宝插件(支持1.5)
  19. redis基本操作命令
  20. vim乱码的解决

热门文章

  1. 【html学习整理】meta,img,表格,表单
  2. LoadRunner监控图表与配置(三)对系统与网络资源进行监控
  3. JQuery调用iframe子页面函数/对象的方法
  4. 反向ssh
  5. linux命令学习笔记(46):vmstat命令
  6. 小心transform
  7. RPM包构建
  8. vue文件名规范
  9. 千万别用MongoDB?真的吗?!
  10. MOVE降低高水位 HWM