然而Nacos的发布操作并不是上面我们想的那样通过代理去实现,通过下面的代码我们分析下:

public class NacosConfigurationPropertiesBindingPostProcessor implements BeanPostProcessor, ApplicationContextAware {

    /**
* The name of {@link NacosConfigurationPropertiesBindingPostProcessor} Bean
*/
public static final String BEAN_NAME = "nacosConfigurationPropertiesBindingPostProcessor"; private Properties globalNacosProperties; private NacosServiceFactory nacosServiceFactory; private Environment environment; private ApplicationEventPublisher applicationEventPublisher; private ConfigurableApplicationContext applicationContext;
// 类初始化之前,进行绑定监听的操作
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { NacosConfigurationProperties nacosConfigurationProperties = findAnnotation(bean.getClass(), NacosConfigurationProperties.class); if (nacosConfigurationProperties != null) {
bind(bean, beanName, nacosConfigurationProperties);
} return bean;
} private void bind(Object bean, String beanName, NacosConfigurationProperties nacosConfigurationProperties) { NacosConfigurationPropertiesBinder binder = new NacosConfigurationPropertiesBinder(applicationContext); binder.bind(bean, beanName, nacosConfigurationProperties); } @Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
return bean;
} @Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}
}

下面进行绑定监听操作!

protected void bind(final Object bean, final String beanName, final NacosConfigurationProperties properties) {

        Assert.notNull(bean, "Bean must not be null!");

        Assert.notNull(properties, "NacosConfigurationProperties must not be null!");

        final String dataId = properties.dataId();

        final String groupId = properties.groupId();

        final ConfigService configService = configServiceBeanBuilder.build(properties.properties());
// 这个就是注解里面的autoRefresh的属性是否自动刷新的值,ture的话就会触发下面的操作
if (properties.autoRefreshed()) { // Add a Listener if auto-refreshed try {
configService.addListener(dataId, groupId, new AbstractListener() {
@Override
public void receiveConfigInfo(String config) {
doBind(bean, beanName, dataId, groupId, properties, config, configService);
}
});
} catch (NacosException e) {
if (logger.isErrorEnabled()) {
logger.error(e.getMessage(), e);
}
}
} String content = getContent(configService, dataId, groupId); if (hasText(content)) {
doBind(bean, beanName, dataId, groupId, properties, content, configService);
}
}
 @Override
public void addListener(String dataId, String group, Listener listener) throws NacosException {
Listener listenerAdapter = new DelegatingEventPublishingListener(configService, dataId, group, applicationEventPublisher, executor, listener);
configService.addListener(dataId, group, listenerAdapter);
publishEvent(new NacosConfigListenerRegisteredEvent(configService, dataId, group, listener, true));
}
public class NacosConfigService implements ConfigService {
public static final Logger log = LogUtils.logger(NacosConfigService.class);
public final long POST_TIMEOUT = 3000L;
private ServerHttpAgent agent;
private ClientWorker worker;
private String namespace;
private String encode;
private ConfigFilterChainManager configFilterChainManager = new ConfigFilterChainManager(); public NacosConfigService(Properties properties) throws NacosException {
String encodeTmp = properties.getProperty("encode");
if (StringUtils.isBlank(encodeTmp)) {
this.encode = "UTF-8";
} else {
this.encode = encodeTmp.trim();
} String namespaceTmp = properties.getProperty("namespace");
if (StringUtils.isBlank(namespaceTmp)) {
this.namespace = TenantUtil.getUserTenant();
properties.put("namespace", this.namespace);
} else {
this.namespace = namespaceTmp;
properties.put("namespace", this.namespace);
} this.agent = new ServerHttpAgent(properties);
this.agent.start();
this.worker = new ClientWorker(this.agent, this.configFilterChainManager);
} public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {
return this.getConfigInner(this.namespace, dataId, group, timeoutMs);
} public void addListener(String dataId, String group, Listener listener) throws NacosException {
this.worker.addTenantListeners(dataId, group, Arrays.asList(listener));
}

ClientWorker.java

 public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) {
group = null2defaultGroup(group);
String tenant = agent.getTenant();
CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
for (Listener listener : listeners) {
cache.addListener(listener);
}
}

DEBUG就能看出来初始化的时候,client把每个config封装成一个cache,每个cache再增加监听listener;

CacheData.java

 public void addListener(Listener listener) {
if (null == listener) {
throw new IllegalArgumentException("listener is null");
}
ManagerListenerWrap wrap = new ManagerListenerWrap(listener);
if (listeners.addIfAbsent(wrap)) {
LOGGER.info("[{}] [add-listener] ok, tenant={}, dataId={}, group={}, cnt={}", name, tenant, dataId, group,
listeners.size());
}
}
 DelegatingEventPublishingListener(ConfigService configService, String dataId, String groupId,
ApplicationEventPublisher applicationEventPublisher,
Executor executor, Listener delegate) {
this.configService = configService;
this.dataId = dataId;
this.groupId = groupId;
this.applicationEventPublisher = applicationEventPublisher;
this.executor = executor;
this.delegate = delegate;
}
    private void doBind(Object bean, String beanName, String dataId, String groupId,
NacosConfigurationProperties properties, String content, ConfigService configService) {
PropertyValues propertyValues = resolvePropertyValues(bean, content);
doBind(bean, properties, propertyValues);
publishBoundEvent(bean, beanName, dataId, groupId, properties, content, configService);
publishMetadataEvent(bean, beanName, dataId, groupId, properties);
} private void publishMetadataEvent(Object bean, String beanName, String dataId, String groupId,
NacosConfigurationProperties properties) { NacosProperties nacosProperties = properties.properties(); NacosConfigMetadataEvent metadataEvent = new NacosConfigMetadataEvent(properties); // Nacos Metadata
metadataEvent.setDataId(dataId);
metadataEvent.setGroupId(groupId);
Properties resolvedNacosProperties = configServiceBeanBuilder.resolveProperties(nacosProperties);
Map<String, Object> nacosPropertiesAttributes = getAnnotationAttributes(nacosProperties);
metadataEvent.setNacosPropertiesAttributes(nacosPropertiesAttributes);
metadataEvent.setNacosProperties(resolvedNacosProperties); // Bean Metadata
Class<?> beanClass = bean.getClass();
metadataEvent.setBeanName(beanName);
metadataEvent.setBean(bean);
metadataEvent.setBeanType(beanClass);
metadataEvent.setAnnotatedElement(beanClass); // Publish event
applicationEventPublisher.publishEvent(metadataEvent);
}

这样就自动发布了事件,然后我们的监听就收到了事件,然后触发相应的操作;下面我们结合Nacos一起debug下,看下效果!

ClientWorker.java
  /**
* groupKey -> cacheData
*/
AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(
new HashMap<String, CacheData>());

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager; executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
}); executorService = Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling" + agent.getName());
t.setDaemon(true);
return t;
}
}); executor.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
public void checkConfigInfo() {
// 分任务
int listenerSize = cacheMap.get().size();
// 向上取整为批数
int longingTaskCount = (int)Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int)currentLongingTaskCount; i < longingTaskCount; i++) {
// 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
线程任务!
class LongPollingRunnable implements Runnable {
private int taskId; public LongPollingRunnable(int taskId) {
this.taskId = taskId;
} public void run() {
try {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
// check failover config
for (CacheData cacheData : cacheMap.get().values()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
LOGGER.error("get local config info error", e);
}
}
} List<String> inInitializingCacheList = new ArrayList<String>();
// check server config
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
String content = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(content);
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}",
agent.getName(), dataId, group, tenant, cache.getMd5(),
ContentUtils.truncateContent(content));
} catch (NacosException ioe) {
String message = String.format(
"[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
agent.getName(), dataId, group, tenant);
LOGGER.error(message, ioe);
}
}
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
} catch (Throwable e) {
LOGGER.error("longPolling error", e);
} finally {
executorService.execute(this);
}
}
}
private void checkLocalConfig(CacheData cacheData) {
final String dataId = cacheData.dataId;
final String group = cacheData.group;
final String tenant = cacheData.tenant;
File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant); // 没有 -> 有
if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
String md5 = MD5.getInstance().getMD5String(content);
cacheData.setUseLocalConfigInfo(true);
cacheData.setLocalConfigInfoVersion(path.lastModified());
cacheData.setContent(content); LOGGER.warn("[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",
agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
return;
} // 有 -> 没有。不通知业务监听器,从server拿到配置后通知。
if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
cacheData.setUseLocalConfigInfo(false);
LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(),
dataId, group, tenant);
return;
} // 有变更
if (cacheData.isUseLocalConfigInfo() && path.exists()
&& cacheData.getLocalConfigInfoVersion() != path.lastModified()) {
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
String md5 = MD5.getInstance().getMD5String(content);
cacheData.setUseLocalConfigInfo(true);
cacheData.setLocalConfigInfoVersion(path.lastModified());
cacheData.setContent(content);
LOGGER.warn("[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",
agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
return;
}
}
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
if (!md5.equals(wrap.lastCallMd5)) {
safeNotifyListener(dataId, group, content, md5, wrap);
}
}
}
private void safeNotifyListener(final String dataId, final String group, final String content,
final String md5, final ManagerListenerWrap listenerWrap) {
final Listener listener = listenerWrap.listener; Runnable job = new Runnable() {
public void run() {
ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
ClassLoader appClassLoader = listener.getClass().getClassLoader();
try {
if (listener instanceof AbstractSharedListener) {
AbstractSharedListener adapter = (AbstractSharedListener)listener;
adapter.fillContext(dataId, group);
LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
}
// 执行回调之前先将线程classloader设置为具体webapp的classloader,以免回调方法中调用spi接口是出现异常或错用(多应用部署才会有该问题)。
Thread.currentThread().setContextClassLoader(appClassLoader); ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setGroup(group);
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
String contentTmp = cr.getContent();
listener.receiveConfigInfo(contentTmp);
listenerWrap.lastCallMd5 = md5;
LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
listener);
} catch (NacosException de) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name,
dataId, group, md5, listener, de.getErrCode(), de.getErrMsg());
} catch (Throwable t) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group,
md5, listener, t.getCause());
} finally {
Thread.currentThread().setContextClassLoader(myClassLoader);
}
}
}; final long startNotify = System.currentTimeMillis();
try {
if (null != listener.getExecutor()) {
listener.getExecutor().execute(job);
} else {
job.run();
}
} catch (Throwable t) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId, group,
md5, listener, t.getCause());
}
final long finishNotify = System.currentTimeMillis();
LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
name, (finishNotify - startNotify), dataId, group, md5, listener);
}

listener.receiveConfigInfo(contentTmp);

DelegatingEventPublishingListener.java就会发布事件,一个闭环就形成了!
  public void receiveConfigInfo(String content) {
this.publishEvent(content);
this.onReceived(content);
}

客户端会有个长轮询不断去Nacos平台去取值,更新缓存;所以并不是Nacos推过来,而是client主动去取值的!

但是我们还有个核心的问题没搞明白,我们取值的时候,怎么去cacheData里面取值的?

这些成熟的产品都是经过N次迭代,不断的打磨才完善的,多人多日,现在我们自己看的时候有疑惑是很正常的,想办法,加油!

下面是在网上查的资料

  • BeanFactoryPostProcessor.postProcessBeanFactory接口函数处理注解@NacosPropertySource和xml property source配置,获取配置,插入spring配置源,注册配置变更监听回调函数
  • InstantiationAwareBeanPostProcessor.postProcessPropertyValues接口函数
    处理@NacosInjected注入configservice服务
    处理@NacosValue注入属性的配置值信息,
  • BeanPostProcessor.postProcessBeforeInitialization
    处理@NacosConfigurationProperties,注入配置到目标bean,注册配置变更监听回调更新目标bean
    处理@NacosValue,缓存标记的配置项用做后续动态更新
  • ContextRefreshedEvent监听,spring初始化完成时触发。
    处理@NacosConfigListener, 注册配置变更监听回调。
  • ApplicationListener.onApplicationEvent监听spring事件NacosConfigReceivedEvent,每次从配置服务端获取配置时都会触发该事件,用于动态变更@NacosValue注解目标。

基本上就是这样一个逻辑,

NacosValueAnnotationBeanPostProcessor.java
public void onApplicationEvent(NacosConfigReceivedEvent event) {
String content = event.getContent();
if (content != null) {
Properties configProperties = NacosUtils.toProperties(content);
Iterator var4 = configProperties.keySet().iterator(); while(true) {
String propertyKey;
List beanPropertyList;
do {
if (!var4.hasNext()) {
return;
} Object key = var4.next();
propertyKey = (String)key;
beanPropertyList = (List)this.placeholderNacosValueTargetMap.get(propertyKey);
} while(beanPropertyList == null); String propertyValue = configProperties.getProperty(propertyKey);
Iterator var9 = beanPropertyList.iterator(); while(var9.hasNext()) {
NacosValueAnnotationBeanPostProcessor.NacosValueTarget nacosValueTarget = (NacosValueAnnotationBeanPostProcessor.NacosValueTarget)var9.next();
if (nacosValueTarget.method == null) {
this.setField(nacosValueTarget, propertyValue);
} else {
this.setMethod(nacosValueTarget, propertyValue);
}
}
}
}
}

他会变更这个property对应的key值;我们结合上面的代码看下:

下面标色的这些类复杂,很多我们专注也业务端代码的同学们可能都没见过,后面我们就一一分析归类

public class NacosConfigBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware, BeanFactoryAware 
public class NacosPropertySourcePostProcessor implements BeanDefinitionRegistryPostProcessor, BeanFactoryPostProcessor, EnvironmentAware, Ordered 
public abstract class AbstractNacosPropertySourceBuilder<T extends BeanDefinition> implements EnvironmentAware, BeanFactoryAware, BeanClassLoaderAware, ApplicationContextAware, InitializingBean 
public class CacheableEventPublishingNacosServiceFactory implements NacosServiceFactory, ApplicationContextAware 
public class NacosConfigurationPropertiesBindingPostProcessor implements BeanPostProcessor, ApplicationContextAware {

最新文章

  1. 深入理解DOM事件类型系列第二篇——键盘事件
  2. C# 读写excel 用于导入数据库 批量导入导出excel
  3. shell中如何判断某一命令是否存在
  4. Android开发之自定义圆形的ImageView的实现
  5. 【CF】196 Div.2 D. Book of Evil
  6. 解决IE11不能进行webTest脚本录制的方法
  7. Hibernate 缓存 关于注解方式
  8. HDU2647(拓扑排序+反向建图)
  9. Linux 压缩解压命令
  10. 简单选择排序 Selection Sort 和树形选择排序 Tree Selection Sort
  11. [自制操作系统] 原子操作&核间中断&读写锁&PRWLock
  12. 【Tomcat】Tomcat配置JVM参数步骤
  13. d3.js d3.transform 方法移除的解决方案
  14. Java类更改常量后编译不生效
  15. js的map遍历和array遍历
  16. 第七篇——Struts2的接收参数
  17. 从实践出发:微服务布道师告诉你Spring Cloud与Spring Boot他如何选择
  18. git config --global user.email
  19. 题解 P1378 【油滴扩展】
  20. 题目1460:Oil Deposit(递归遍历图)

热门文章

  1. PHP获取一年中每个星期的开始和结束日期的方法
  2. linux命令学习笔记(55):traceroute命令
  3. ACM学习历程—Hihocoder 1289 403 Forbidden(字典树 || (离线 &amp;&amp; 排序 &amp;&amp; 染色))
  4. bzoj 3572: [Hnoi2014]世界树 虚树
  5. spark减少提交jar包处理
  6. bzoj 4823 &amp; 洛谷 P3756 老C的方块 —— 最小割
  7. 第四章——Lock的使用
  8. 利用src.rpm包修改源码后重新制作rpm包
  9. 关于 vs 2012 键盘无法输入的问题
  10. VisualGDB系列2:VisualGDB对Linux平台的支持特性