接上一篇 dubbo-server 之后,再来看一下 dubbo-client 是如何工作的。

dubbo提供者服务示例, 其结构是这样的!


// 提供者:
public class Provider {
public static void main(String[] args) throws Exception {
//Prevent to get IPV6 address,this way only work in debug mode
//But you can pass use -Djava.net.preferIPv4Stack=true,then it work well whether in debug mode or not
System.setProperty("java.net.preferIPv4Stack", "true");
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-provider.xml"});
context.start(); // 这个比较巧妙, 只需停留在当前点,不按下enter键,服务就会一直存在,等待消费者连接,而且无需真正提供一个监听服务
System.in.read(); // press any key to exit
} } // 消费者:
public class Consumer {
public static void main(String[] args) {
//Prevent to get IPV6 address,this way only work in debug mode
//But you can pass use -Djava.net.preferIPv4Stack=true,then it work well whether in debug mode or not
System.setProperty("java.net.preferIPv4Stack", "true");
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
String hello = demoService.sayHello("world"); // call remote method, 调用远程服务和本地服务一样,这是其优势所在
System.out.println(hello); // get result

以上代码可以直接运行起来,但是建议还是个搭建一个Zookeeper来用用,因为线上基本都是Zk的,搭建也很简单,可参考:ZooKeeper 搭建笔记

初始化Context,如上一篇server过程! Rpc框架dubbo-server(v2.6.3) 源码阅读(一)

// DubboNamespaceHandler, provider与consumer主要的差别在于xml标签的获取不一致,
public class DubboNamespaceHandler extends NamespaceHandlerSupport { static {
} @Override
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
// provider解析
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
// consumer解析
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
} }

// 主要看获取bean的过程
DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy

 // spring, 获取bean, 通过 org.springframework.beans.factory.support.DefaultListableBeanFactory, 获取
public Object getBean(String name) throws BeansException {
return getBeanFactory().getBean(name);
} protected <T> T doGetBean(
final String name, final Class<T> requiredType, final Object[] args, boolean typeCheckOnly)
throws BeansException { final String beanName = transformedBeanName(name);
Object bean; // Eagerly check singleton cache for manually registered singletons.
// 通过该方法获取bean代理实例,com.alibaba.dubbo.config.spring.ReferenceBean
Object sharedInstance = getSingleton(beanName);
if (sharedInstance != null && args == null) {
if (logger.isDebugEnabled()) {
if (isSingletonCurrentlyInCreation(beanName)) {
logger.debug("Returning eagerly cached instance of singleton bean '" + beanName +
"' that is not fully initialized yet - a consequence of a circular reference");
else {
logger.debug("Returning cached instance of singleton bean '" + beanName + "'");
bean = getObjectForBeanInstance(sharedInstance, name, beanName, null);
} else {
// Fail if we're already creating this bean instance:
// We're assumably within a circular reference.
if (isPrototypeCurrentlyInCreation(beanName)) {
throw new BeanCurrentlyInCreationException(beanName);
} // Check if bean definition exists in this factory.
BeanFactory parentBeanFactory = getParentBeanFactory();
if (parentBeanFactory != null && !containsBeanDefinition(beanName)) {
// Not found -> check parent.
String nameToLookup = originalBeanName(name);
if (args != null) {
// Delegation to parent with explicit args.
return (T) parentBeanFactory.getBean(nameToLookup, args);
else {
// No args -> delegate to standard getBean method.
return parentBeanFactory.getBean(nameToLookup, requiredType);
} if (!typeCheckOnly) {
} try {
final RootBeanDefinition mbd = getMergedLocalBeanDefinition(beanName);
checkMergedBeanDefinition(mbd, beanName, args); // Guarantee initialization of beans that the current bean depends on.
String[] dependsOn = mbd.getDependsOn();
if (dependsOn != null) {
for (String dep : dependsOn) {
if (isDependent(beanName, dep)) {
throw new BeanCreationException(mbd.getResourceDescription(), beanName,
"Circular depends-on relationship between '" + beanName + "' and '" + dep + "'");
registerDependentBean(dep, beanName);
try {
catch (NoSuchBeanDefinitionException ex) {
throw new BeanCreationException(mbd.getResourceDescription(), beanName,
"'" + beanName + "' depends on missing bean '" + dep + "'", ex);
} // Create bean instance.
if (mbd.isSingleton()) {
sharedInstance = getSingleton(beanName, new ObjectFactory<Object>() {
public Object getObject() throws BeansException {
try {
return createBean(beanName, mbd, args);
catch (BeansException ex) {
// Explicitly remove instance from singleton cache: It might have been put there
// eagerly by the creation process, to allow for circular reference resolution.
// Also remove any beans that received a temporary reference to the bean.
throw ex;
bean = getObjectForBeanInstance(sharedInstance, name, beanName, mbd);
} else if (mbd.isPrototype()) {
// It's a prototype -> create a new instance.
Object prototypeInstance = null;
try {
prototypeInstance = createBean(beanName, mbd, args);
finally {
bean = getObjectForBeanInstance(prototypeInstance, name, beanName, mbd);
} else {
String scopeName = mbd.getScope();
final Scope scope = this.scopes.get(scopeName);
if (scope == null) {
throw new IllegalStateException("No Scope registered for scope name '" + scopeName + "'");
try {
Object scopedInstance = scope.get(beanName, new ObjectFactory<Object>() {
public Object getObject() throws BeansException {
try {
return createBean(beanName, mbd, args);
finally {
bean = getObjectForBeanInstance(scopedInstance, name, beanName, mbd);
catch (IllegalStateException ex) {
throw new BeanCreationException(beanName,
"Scope '" + scopeName + "' is not active for the current thread; consider " +
"defining a scoped proxy for this bean if you intend to refer to it from a singleton",
catch (BeansException ex) {
throw ex;
} // Check if required type matches the type of the actual bean instance.
if (requiredType != null && bean != null && !requiredType.isInstance(bean)) {
try {
return getTypeConverter().convertIfNecessary(bean, requiredType);
catch (TypeMismatchException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to convert bean '" + name + "' to required type '" +
ClassUtils.getQualifiedName(requiredType) + "'", ex);
throw new BeanNotOfRequiredTypeException(name, requiredType, bean.getClass());
return (T) bean;

// ApplicationConfig 初始化时,调用dubbo实例进行初始化

    // org.springframework.beans.BeanUtils.instantiateClass 初始化 ReferenceConfig 类,通过反射调用
// public com.alibaba.dubbo.config.RegistryConfig()
public static <T> T instantiateClass(Constructor<T> ctor, Object... args) throws BeanInstantiationException {
Assert.notNull(ctor, "Constructor must not be null");
try {
// 生成新的实例
return ctor.newInstance(args);
catch (InstantiationException ex) {
throw new BeanInstantiationException(ctor, "Is it an abstract class?", ex);
catch (IllegalAccessException ex) {
throw new BeanInstantiationException(ctor, "Is the constructor accessible?", ex);
catch (IllegalArgumentException ex) {
throw new BeanInstantiationException(ctor, "Illegal arguments for constructor", ex);
catch (InvocationTargetException ex) {
throw new BeanInstantiationException(ctor, "Constructor threw exception", ex.getTargetException());

// 通过getBean, 转移到 ReferenceBean.getObject() 触发代理初始化

    //     org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean, 触发 创建bean操作,触发 getObject() 进行初始化
protected <T> T doGetBean(
final String name, final Class<T> requiredType, final Object[] args, boolean typeCheckOnly)
throws BeansException { final String beanName = transformedBeanName(name);
Object bean; // Eagerly check singleton cache for manually registered singletons.
Object sharedInstance = getSingleton(beanName);
if (sharedInstance != null && args == null) {
if (logger.isDebugEnabled()) {
if (isSingletonCurrentlyInCreation(beanName)) {
logger.debug("Returning eagerly cached instance of singleton bean '" + beanName +
"' that is not fully initialized yet - a consequence of a circular reference");
else {
logger.debug("Returning cached instance of singleton bean '" + beanName + "'");
bean = getObjectForBeanInstance(sharedInstance, name, beanName, null);
} else {
// Fail if we're already creating this bean instance:
// We're assumably within a circular reference.
if (isPrototypeCurrentlyInCreation(beanName)) {
throw new BeanCurrentlyInCreationException(beanName);
} // Check if bean definition exists in this factory.
BeanFactory parentBeanFactory = getParentBeanFactory();
if (parentBeanFactory != null && !containsBeanDefinition(beanName)) {
// Not found -> check parent.
String nameToLookup = originalBeanName(name);
if (args != null) {
// Delegation to parent with explicit args.
return (T) parentBeanFactory.getBean(nameToLookup, args);
else {
// No args -> delegate to standard getBean method.
return parentBeanFactory.getBean(nameToLookup, requiredType);
} if (!typeCheckOnly) {
} try {
final RootBeanDefinition mbd = getMergedLocalBeanDefinition(beanName);
checkMergedBeanDefinition(mbd, beanName, args); // Guarantee initialization of beans that the current bean depends on.
String[] dependsOn = mbd.getDependsOn();
if (dependsOn != null) {
for (String dep : dependsOn) {
if (isDependent(beanName, dep)) {
throw new BeanCreationException(mbd.getResourceDescription(), beanName,
"Circular depends-on relationship between '" + beanName + "' and '" + dep + "'");
registerDependentBean(dep, beanName);
try {
catch (NoSuchBeanDefinitionException ex) {
throw new BeanCreationException(mbd.getResourceDescription(), beanName,
"'" + beanName + "' depends on missing bean '" + dep + "'", ex);
} // Create bean instance.
if (mbd.isSingleton()) {
// 此处进行bean创建
sharedInstance = getSingleton(beanName, new ObjectFactory<Object>() {
public Object getObject() throws BeansException {
try {
return createBean(beanName, mbd, args);
catch (BeansException ex) {
// Explicitly remove instance from singleton cache: It might have been put there
// eagerly by the creation process, to allow for circular reference resolution.
// Also remove any beans that received a temporary reference to the bean.
throw ex;
bean = getObjectForBeanInstance(sharedInstance, name, beanName, mbd);
} else if (mbd.isPrototype()) {
// It's a prototype -> create a new instance.
Object prototypeInstance = null;
try {
prototypeInstance = createBean(beanName, mbd, args);
finally {
bean = getObjectForBeanInstance(prototypeInstance, name, beanName, mbd);
} else {
String scopeName = mbd.getScope();
final Scope scope = this.scopes.get(scopeName);
if (scope == null) {
throw new IllegalStateException("No Scope registered for scope name '" + scopeName + "'");
try {
Object scopedInstance = scope.get(beanName, new ObjectFactory<Object>() {
public Object getObject() throws BeansException {
try {
return createBean(beanName, mbd, args);
finally {
bean = getObjectForBeanInstance(scopedInstance, name, beanName, mbd);
catch (IllegalStateException ex) {
throw new BeanCreationException(beanName,
"Scope '" + scopeName + "' is not active for the current thread; consider " +
"defining a scoped proxy for this bean if you intend to refer to it from a singleton",
catch (BeansException ex) {
throw ex;
} // Check if required type matches the type of the actual bean instance.
if (requiredType != null && bean != null && !requiredType.isInstance(bean)) {
try {
return getTypeConverter().convertIfNecessary(bean, requiredType);
catch (TypeMismatchException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to convert bean '" + name + "' to required type '" +
ClassUtils.getQualifiedName(requiredType) + "'", ex);
throw new BeanNotOfRequiredTypeException(name, requiredType, bean.getClass());
return (T) bean;
} // org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton
public Object getSingleton(String beanName, ObjectFactory<?> singletonFactory) {
Assert.notNull(beanName, "'beanName' must not be null");
synchronized (this.singletonObjects) {
Object singletonObject = this.singletonObjects.get(beanName);
if (singletonObject == null) {
if (this.singletonsCurrentlyInDestruction) {
throw new BeanCreationNotAllowedException(beanName,
"Singleton bean creation not allowed while singletons of this factory are in destruction " +
"(Do not request a bean from a BeanFactory in a destroy method implementation!)");
if (logger.isDebugEnabled()) {
logger.debug("Creating shared instance of singleton bean '" + beanName + "'");
boolean newSingleton = false;
boolean recordSuppressedExceptions = (this.suppressedExceptions == null);
if (recordSuppressedExceptions) {
this.suppressedExceptions = new LinkedHashSet<Exception>();
try {
singletonObject = singletonFactory.getObject();
newSingleton = true;
catch (IllegalStateException ex) {
// Has the singleton object implicitly appeared in the meantime ->
// if yes, proceed with it since the exception indicates that state.
singletonObject = this.singletonObjects.get(beanName);
if (singletonObject == null) {
throw ex;
catch (BeanCreationException ex) {
if (recordSuppressedExceptions) {
for (Exception suppressedException : this.suppressedExceptions) {
throw ex;
finally {
if (recordSuppressedExceptions) {
this.suppressedExceptions = null;
if (newSingleton) {
// ref是在此处进行初始化的,有点神奇
addSingleton(beanName, singletonObject);
return (singletonObject != NULL_OBJECT ? singletonObject : null);
} // org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.addSingleton
protected void addSingleton(String beanName, Object singletonObject) {
// 就是该 synchronized 方法,触发了 ReferenceConfig.ref 的初始化,实际上是触发了 AbstractConfig.toString() 方法
synchronized (this.singletonObjects) {
this.singletonObjects.put(beanName, (singletonObject != null ? singletonObject : NULL_OBJECT));

通过 ReferenceBean.getObject() 方法,获取代理:

 // 由 com.alibaba.dubbo.config.spring.ReferenceBean.getObject 进行获取代理,从而进行代理创建
public Object getObject() throws Exception {
return get();
} // 调用父类 com.alibaba.dubbo.config.ReferenceConfig.get() 方法, 获取实例
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("Already destroyed!");
if (ref == null) {
// 未初始化时,触发一次初始化
return ref;


    // 初始化代理,并设值到 ref 变量中
private void init() {
if (initialized) {
initialized = true;
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
// 在此处添加堆栈打印,排除路径,如下
new Throwable("*******************************ReferenceConfig.init trace dump").printStackTrace();
new Throwable().printStackTrace();
// get consumer's global configuration
if (getGeneric() == null && getConsumer() != null) {
if (ProtocolUtils.isGeneric(getGeneric())) {
interfaceClass = GenericService.class;
} else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
checkInterfaceAndMethods(interfaceClass, methods);
String resolve = System.getProperty(interfaceName);
String resolveFile = null;
if (resolve == null || resolve.length() == 0) {
resolveFile = System.getProperty("dubbo.resolve.file");
if (resolveFile == null || resolveFile.length() == 0) {
File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
if (userResolveFile.exists()) {
resolveFile = userResolveFile.getAbsolutePath();
if (resolveFile != null && resolveFile.length() > 0) {
Properties properties = new Properties();
FileInputStream fis = null;
try {
fis = new FileInputStream(new File(resolveFile));
} catch (IOException e) {
throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
} finally {
try {
if (null != fis) fis.close();
} catch (IOException e) {
logger.warn(e.getMessage(), e);
resolve = properties.getProperty(interfaceName);
if (resolve != null && resolve.length() > 0) {
url = resolve;
if (logger.isWarnEnabled()) {
if (resolveFile != null) {
logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");
} else {
logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");
if (consumer != null) {
if (application == null) {
application = consumer.getApplication();
if (module == null) {
module = consumer.getModule();
if (registries == null) {
registries = consumer.getRegistries();
if (monitor == null) {
monitor = consumer.getMonitor();
if (module != null) {
if (registries == null) {
registries = module.getRegistries();
if (monitor == null) {
monitor = module.getMonitor();
if (application != null) {
if (registries == null) {
registries = application.getRegistries();
if (monitor == null) {
monitor = application.getMonitor();
// 检查mock配置情况,尝试调用mock实例
Map<String, String> map = new HashMap<String, String>();
Map<Object, Object> attributes = new HashMap<Object, Object>();
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
if (!isGeneric()) {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
} // 获取wrapper方法
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put("methods", Constants.ANY_VALUE);
} else {
map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
map.put(Constants.INTERFACE_KEY, interfaceName);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, consumer, Constants.DEFAULT_KEY);
appendParameters(map, this);
String prefix = StringUtils.getServiceKey(map);
if (methods != null && !methods.isEmpty()) {
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
appendAttributes(attributes, method, prefix + "." + method.getName());
checkAndConvertImplicitConfig(method, map, attributes);
} String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
if (hostToRegistry == null || hostToRegistry.length() == 0) {
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
map.put(Constants.REGISTER_IP_KEY, hostToRegistry); //attributes are stored by system context.
ref = createProxy(map);
ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);


    // AbstractInterfaceConfig.checkStubAndMock()
protected void checkStubAndMock(Class<?> interfaceClass) {
if (ConfigUtils.isNotEmpty(local)) {
Class<?> localClass = ConfigUtils.isDefault(local) ? ReflectUtils.forName(interfaceClass.getName() + "Local") : ReflectUtils.forName(local);
if (!interfaceClass.isAssignableFrom(localClass)) {
throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceClass.getName());
try {
ReflectUtils.findConstructor(localClass, interfaceClass);
} catch (NoSuchMethodException e) {
throw new IllegalStateException("No such constructor \"public " + localClass.getSimpleName() + "(" + interfaceClass.getName() + ")\" in local implementation class " + localClass.getName());
if (ConfigUtils.isNotEmpty(stub)) {
Class<?> localClass = ConfigUtils.isDefault(stub) ? ReflectUtils.forName(interfaceClass.getName() + "Stub") : ReflectUtils.forName(stub);
if (!interfaceClass.isAssignableFrom(localClass)) {
throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceClass.getName());
try {
ReflectUtils.findConstructor(localClass, interfaceClass);
} catch (NoSuchMethodException e) {
throw new IllegalStateException("No such constructor \"public " + localClass.getSimpleName() + "(" + interfaceClass.getName() + ")\" in local implementation class " + localClass.getName());
if (ConfigUtils.isNotEmpty(mock)) {
if (mock.startsWith(Constants.RETURN_PREFIX)) {
String value = mock.substring(Constants.RETURN_PREFIX.length());
try {
} catch (Exception e) {
throw new IllegalStateException("Illegal mock json value in <dubbo:service ... mock=\"" + mock + "\" />");
} else {
Class<?> mockClass = ConfigUtils.isDefault(mock) ? ReflectUtils.forName(interfaceClass.getName() + "Mock") : ReflectUtils.forName(mock);
if (!interfaceClass.isAssignableFrom(mockClass)) {
throw new IllegalStateException("The mock implementation class " + mockClass.getName() + " not implement interface " + interfaceClass.getName());
try {
mockClass.getConstructor(new Class<?>[0]);
} catch (NoSuchMethodException e) {
throw new IllegalStateException("No such empty constructor \"public " + mockClass.getSimpleName() + "()\" in mock implementation class " + mockClass.getName());


    // com.alibaba.dubbo.common.bytecode.Wrapper
public static Wrapper getWrapper(Class<?> c) {
while (ClassGenerator.isDynamicClass(c)) // can not wrapper on dynamic class.
c = c.getSuperclass(); if (c == Object.class)
return OBJECT_WRAPPER; Wrapper ret = WRAPPER_MAP.get(c);
if (ret == null) {
ret = makeWrapper(c);
WRAPPER_MAP.put(c, ret);
return ret;
} private static Wrapper makeWrapper(Class<?> c) {
if (c.isPrimitive())
throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c); String name = c.getName();
ClassLoader cl = ClassHelper.getClassLoader(c); StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");
StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");
StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ "); c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }"); Map<String, Class<?>> pts = new HashMap<String, Class<?>>(); // <property name, property types>
Map<String, Method> ms = new LinkedHashMap<String, Method>(); // <method desc, Method instance>
List<String> mns = new ArrayList<String>(); // method names.
List<String> dmns = new ArrayList<String>(); // declaring method names. // get all public field.
for (Field f : c.getFields()) {
String fn = f.getName();
Class<?> ft = f.getType();
if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers()))
continue; c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }");
c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }");
pts.put(fn, ft);
} Method[] methods = c.getMethods();
// get all public method.
boolean hasMethod = hasMethods(methods);
if (hasMethod) {
c3.append(" try{");
for (Method m : methods) {
if (m.getDeclaringClass() == Object.class) //ignore Object's method.
continue; String mn = m.getName();
c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");
int len = m.getParameterTypes().length;
c3.append(" && ").append(" $3.length == ").append(len); boolean override = false;
for (Method m2 : methods) {
if (m != m2 && m.getName().equals(m2.getName())) {
override = true;
if (override) {
if (len > 0) {
for (int l = 0; l < len; l++) {
c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"")
} c3.append(" ) { "); if (m.getReturnType() == Void.TYPE)
c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;");
c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");"); c3.append(" }"); mns.add(mn);
if (m.getDeclaringClass() == c)
ms.put(ReflectUtils.getDesc(m), m);
if (hasMethod) {
c3.append(" } catch(Throwable e) { ");
c3.append(" throw new java.lang.reflect.InvocationTargetException(e); ");
c3.append(" }");
} c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }"); // deal with get/set method.
Matcher matcher;
for (Map.Entry<String, Method> entry : ms.entrySet()) {
String md = entry.getKey();
Method method = (Method) entry.getValue();
if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
String pn = propertyName(matcher.group(1));
c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");
pts.put(pn, method.getReturnType());
} else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) {
String pn = propertyName(matcher.group(1));
c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");
pts.put(pn, method.getReturnType());
} else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
Class<?> pt = method.getParameterTypes()[0];
String pn = propertyName(matcher.group(1));
c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(").append(arg(pt, "$3")).append("); return; }");
pts.put(pn, pt);
c1.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in class " + c.getName() + ".\"); }");
c2.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in class " + c.getName() + ".\"); }"); // make class
long id = WRAPPER_CLASS_COUNTER.getAndIncrement();
ClassGenerator cc = ClassGenerator.newInstance(cl);
cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id);
cc.setSuperClass(Wrapper.class); cc.addDefaultConstructor();
cc.addField("public static String[] pns;"); // property name array.
cc.addField("public static " + Map.class.getName() + " pts;"); // property type map.
cc.addField("public static String[] mns;"); // all method name array.
cc.addField("public static String[] dmns;"); // declared method name array.
for (int i = 0, len = ms.size(); i < len; i++)
cc.addField("public static Class[] mts" + i + ";"); cc.addMethod("public String[] getPropertyNames(){ return pns; }");
cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }");
cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }");
cc.addMethod("public String[] getMethodNames(){ return mns; }");
cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }");
cc.addMethod(c3.toString()); try {
Class<?> wc = cc.toClass();
// setup static field.
wc.getField("pts").set(null, pts);
wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));
wc.getField("mns").set(null, mns.toArray(new String[0]));
wc.getField("dmns").set(null, dmns.toArray(new String[0]));
int ix = 0;
for (Method m : ms.values())
wc.getField("mts" + ix++).set(null, m.getParameterTypes());
return (Wrapper) wc.newInstance();
} catch (RuntimeException e) {
throw e;
} catch (Throwable e) {
throw new RuntimeException(e.getMessage(), e);
} finally {

// InvokerInvocationHandler, 代理所有的dubbo请求处理

// InvokerInvocationHandler, 代理所有的dubbo请求处理
public class InvokerInvocationHandler implements InvocationHandler { private final Invoker<?> invoker; public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
} @Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
return invoker.invoke(new RpcInvocation(method, args)).recreate();
} }

// 小插曲,日志打印

//FailsafeLogger.warn(), 格式为: No spring extension(bean) named:defaultCompiler, try to find an extension(bean) of type java.lang.String, dubbo version: , current host:

public void warn(String msg, Throwable e) {
try {
logger.warn(appendContextMessage(msg), e);
} catch (Throwable t) {
private String appendContextMessage(String msg) {
return " [DUBBO] " + msg + ", dubbo version: " + Version.getVersion() + ", current host: " + NetUtils.getLocalHost();
} private Class<?> createAdaptiveExtensionClass() {
String code = createAdaptiveExtensionClassCode();
ClassLoader classLoader = findClassLoader();
com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);

// wrapper0

    // 生成的包装类如下: com.alibaba.dubbo.common.bytecode.Wrapper0
package com.alibaba.dubbo.common.bytecode; import com.alibaba.dubbo.common.bytecode.ClassGenerator.DC;
import com.alibaba.dubbo.demo.DemoService;
import java.lang.reflect.InvocationTargetException;
import java.util.Map; public class Wrapper0 extends Wrapper implements DC {
public static String[] pns;
public static Map pts;
public static String[] mns;
public static String[] dmns;
public static Class[] mts0; public String[] getPropertyNames() {
return pns;
} public boolean hasProperty(String var1) {
return pts.containsKey(var1);
} public Class getPropertyType(String var1) {
return (Class)pts.get(var1);
} public String[] getMethodNames() {
return mns;
} public String[] getDeclaredMethodNames() {
return dmns;
} public void setPropertyValue(Object var1, String var2, Object var3) {
try {
DemoService var4 = (DemoService)var1;
} catch (Throwable var6) {
throw new IllegalArgumentException(var6);
} throw new NoSuchPropertyException("Not found property \"" + var2 + "\" filed or setter method in class com.alibaba.dubbo.demo.DemoService.");
} public Object getPropertyValue(Object var1, String var2) {
try {
DemoService var3 = (DemoService)var1;
} catch (Throwable var5) {
throw new IllegalArgumentException(var5);
} throw new NoSuchPropertyException("Not found property \"" + var2 + "\" filed or setter method in class com.alibaba.dubbo.demo.DemoService.");
} public Object invokeMethod(Object var1, String var2, Class[] var3, Object[] var4) throws InvocationTargetException {
DemoService var5;
try {
var5 = (DemoService)var1;
} catch (Throwable var8) {
throw new IllegalArgumentException(var8);
} try {
if ("sayHello".equals(var2) && var3.length == 1) {
return var5.sayHello((String)var4[0]);
} catch (Throwable var9) {
throw new InvocationTargetException(var9);
} throw new NoSuchMethodException("Not found method \"" + var2 + "\" in class com.alibaba.dubbo.demo.DemoService.");
} public Wrapper0() {

// createProxy

    // com.alibaba.dubbo.common.bytecode.ClassGenerator, 生成class实例
public Class<?> toClass() {
return toClass(ClassHelper.getClassLoader(ClassGenerator.class), getClass().getProtectionDomain());
} public Class<?> toClass(ClassLoader loader, ProtectionDomain pd) {
if (mCtc != null)
long id = CLASS_NAME_COUNTER.getAndIncrement();
try {
CtClass ctcs = mSuperClass == null ? null : mPool.get(mSuperClass);
if (mClassName == null)
mClassName = (mSuperClass == null || javassist.Modifier.isPublic(ctcs.getModifiers())
? ClassGenerator.class.getName() : mSuperClass + "$sc") + id;
mCtc = mPool.makeClass(mClassName);
if (mSuperClass != null)
// 添加dubbo ClassGenerator 生成的动态类的标志,接口方法为空
mCtc.addInterface(mPool.get(DC.class.getName())); // add dynamic class tag.
if (mInterfaces != null)
for (String cl : mInterfaces) mCtc.addInterface(mPool.get(cl));
if (mFields != null)
for (String code : mFields) mCtc.addField(CtField.make(code, mCtc));
if (mMethods != null) {
for (String code : mMethods) {
if (code.charAt(0) == ':')
mCtc.addMethod(CtNewMethod.copy(getCtMethod(mCopyMethods.get(code.substring(1))), code.substring(1, code.indexOf('(')), mCtc, null));
mCtc.addMethod(CtNewMethod.make(code, mCtc));
if (mDefaultConstructor)
if (mConstructors != null) {
for (String code : mConstructors) {
if (code.charAt(0) == ':') {
mCtc.addConstructor(CtNewConstructor.copy(getCtConstructor(mCopyConstructors.get(code.substring(1))), mCtc, null));
} else {
String[] sn = mCtc.getSimpleName().split("\\$+"); // inner class name include $.
mCtc.addConstructor(CtNewConstructor.make(code.replaceFirst(SIMPLE_NAME_TAG, sn[sn.length - 1]), mCtc));
return mCtc.toClass(loader, pd);
} catch (RuntimeException e) {
throw e;
} catch (NotFoundException e) {
throw new RuntimeException(e.getMessage(), e);
} catch (CannotCompileException e) {
throw new RuntimeException(e.getMessage(), e);
} // 创建操作代理类
// ref = createProxy(map); 生成新proxy {methods=sayHello, timestamp=1537527140342, dubbo=2.0.2, register.ip=, application=demo-consumer, check=false, side=consumer, pid=8776, interface=com.alibaba.dubbo.demo.DemoService, qos.port=33333}
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
if (isInjvm() == null) {
if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
isJvmRefer = false;
} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
// by default, reference local service if there is
isJvmRefer = true;
} else {
isJvmRefer = false;
} else {
isJvmRefer = isInjvm().booleanValue();
} if (isJvmRefer) {
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
} else {
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
} else { // assemble URL from register center's configuration
// 最后得到的地址是这样的: registry://
List<URL> us = loadRegistries(false);
if (us != null && !us.isEmpty()) {
for (URL u : us) {
// 加载监控地址,以便进行上报数据
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
} if (urls.size() == 1) {
// Protocol$Adaptive, 动态生成的代理类,调用远程方法 com.alibaba.dubbo.remoting.transport.AbstractClient(), 先将自己注册到注册中心,再调用提供者方法
"main@1" prio=5 tid=0x1 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at com.alibaba.dubbo.remoting.transport.AbstractClient.<init>(AbstractClient.java:80)
at com.alibaba.dubbo.remoting.transport.netty.NettyClient.<init>(NettyClient.java:59)
at com.alibaba.dubbo.remoting.transport.netty.NettyTransporter.connect(NettyTransporter.java:37)
at com.alibaba.dubbo.remoting.Transporter$Adaptive.connect(Transporter$Adaptive.java:-1)
at com.alibaba.dubbo.remoting.Transporters.connect(Transporters.java:75)
at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchanger.connect(HeaderExchanger.java:39)
at com.alibaba.dubbo.remoting.exchange.Exchangers.connect(Exchangers.java:109)
at com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.initClient(DubboProtocol.java:417)
at com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.getSharedClient(DubboProtocol.java:384)
- locked <0xb4f> (a java.lang.Object)
at com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.getClients(DubboProtocol.java:355)
at com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.refer(DubboProtocol.java:337)
at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper.refer(ProtocolFilterWrapper.java:108)
at com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper.refer(ProtocolListenerWrapper.java:67)
at com.alibaba.dubbo.rpc.Protocol$Adaptive.refer(Protocol$Adaptive.java:-1)
at com.alibaba.dubbo.registry.integration.RegistryDirectory.toInvokers(RegistryDirectory.java:387)
at com.alibaba.dubbo.registry.integration.RegistryDirectory.refreshInvoker(RegistryDirectory.java:253)
at com.alibaba.dubbo.registry.integration.RegistryDirectory.notify(RegistryDirectory.java:223)
- locked <0xb2f> (a com.alibaba.dubbo.registry.integration.RegistryDirectory)
at com.alibaba.dubbo.registry.support.AbstractRegistry.notify(AbstractRegistry.java:414)
at com.alibaba.dubbo.registry.support.FailbackRegistry.doNotify(FailbackRegistry.java:280)
at com.alibaba.dubbo.registry.support.FailbackRegistry.notify(FailbackRegistry.java:266)
at com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry.doSubscribe(ZookeeperRegistry.java:190)
at com.alibaba.dubbo.registry.support.FailbackRegistry.subscribe(FailbackRegistry.java:196)
at com.alibaba.dubbo.registry.integration.RegistryDirectory.subscribe(RegistryDirectory.java:159)
at com.alibaba.dubbo.registry.integration.RegistryProtocol.doRefer(RegistryProtocol.java:307)
at com.alibaba.dubbo.registry.integration.RegistryProtocol.refer(RegistryProtocol.java:288)
at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper.refer(ProtocolFilterWrapper.java:106)
at com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper.refer(ProtocolListenerWrapper.java:65)
// 从此处开始进行调用
at com.alibaba.dubbo.rpc.Protocol$Adaptive.refer(Protocol$Adaptive.java:-1)
at com.alibaba.dubbo.config.ReferenceConfig.createProxy(ReferenceConfig.java:395)
at com.alibaba.dubbo.config.ReferenceConfig.init(ReferenceConfig.java:334)
at com.alibaba.dubbo.config.ReferenceConfig.get(ReferenceConfig.java:163)
- locked <0x76b> (a com.alibaba.dubbo.config.spring.ReferenceBean)
at com.alibaba.dubbo.config.spring.ReferenceBean.getObject(ReferenceBean.java:66)
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:170)
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.getObjectFromFactoryBean(FactoryBeanRegistrySupport.java:103)
- locked <0x790> (a java.util.concurrent.ConcurrentHashMap)
at org.springframework.beans.factory.support.AbstractBeanFactory.getObjectForBeanInstance(AbstractBeanFactory.java:1640)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:254)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:1080)
at com.alibaba.dubbo.demo.consumer.Consumer.main(Consumer.java:30)
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
if (registryURL != null) { // registry url is available
// use AvailableCluster only when register's cluster is available
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
} Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
if (c == null) {
c = true; // default true
if (c && !invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
// create service proxy
return (T) proxyFactory.getProxy(invoker);
} // loadRegistries protected List<URL> loadRegistries(boolean provider) {
// 加载前,先检查是否已初始化
List<URL> registryList = new ArrayList<URL>();
if (registries != null && !registries.isEmpty()) {
for (RegistryConfig config : registries) {
String address = config.getAddress();
if (address == null || address.length() == 0) {
address = Constants.ANYHOST_VALUE;
String sysaddress = System.getProperty("dubbo.registry.address");
if (sysaddress != null && sysaddress.length() > 0) {
address = sysaddress;
if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
Map<String, String> map = new HashMap<String, String>();
appendParameters(map, application);
appendParameters(map, config);
map.put("path", RegistryService.class.getName());
map.put("dubbo", Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
if (!map.containsKey("protocol")) {
if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
map.put("protocol", "remote");
} else {
map.put("protocol", "dubbo");
// 解析注册中心地址列表
List<URL> urls = UrlUtils.parseURLs(address, map);
for (URL url : urls) {
url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
return registryList;


    // com.alibaba.dubbo.rpc.Protocol$Adaptive, 动态生成的类,去调用远程方法

public class Protocol$Adaptive implements Protocol {
public void destroy() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
} public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
} public Invoker refer(Class var1, URL var2) throws RpcException {
if (var2 == null) {
throw new IllegalArgumentException("url == null");
} else {
String var4 = var2.getProtocol() == null ? "dubbo" : var2.getProtocol();
if (var4 == null) {
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + var2.toString() + ") use keys([protocol])");
} else {
Protocol var5 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(var4);
return var5.refer(var1, var2);
} public Exporter export(Invoker var1) throws RpcException {
if (var1 == null) {
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
} else if (var1.getUrl() == null) {
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
} else {
URL var2 = var1.getUrl();
String var3 = var2.getProtocol() == null ? "dubbo" : var2.getProtocol();
if (var3 == null) {
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + var2.toString() + ") use keys([protocol])");
} else {
Protocol var4 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(var3);
return var4.export(var1);
} public Protocol$Adaptive() {
    // com.alibaba.dubbo.registry.integration.RegistryDirectory.subscribe, 订阅服务
public void subscribe(URL url) {
registry.subscribe(url, this);
} // 以zookeeper订阅为例,看一下订阅过程,com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry.doSubscribe()
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
zkListener = listeners.get(listener);
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && !services.isEmpty()) {
for (String service : services) {
service = URL.decode(service);
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
} else {
List<URL> urls = new ArrayList<URL>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
zkListener = listeners.get(listener);
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
notify(url, listener, urls);
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
} // com.alibaba.dubbo.registry.support.AbstractRegistry.notify, 获取服务提供者地址
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
if ((urls == null || urls.isEmpty())
&& !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
result.put(category, categoryList);
if (result.size() == 0) {
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
categoryNotified = notified.get(url);
// 通知观察者
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
    // 获取registry,以决定调用哪个协议实现
public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceString();
// Lock the registry access process to ensure a single instance of the registry
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
// 创建registry, 该方法为抽象方法只能由具体的实现类实现
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
} // 该方法为在生成代理时自动生成实现
protected abstract Registry createRegistry(URL url); // 生成的registry代理类如下
package com.alibaba.dubbo.registry; import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader; public class RegistryFactory$Adaptive implements RegistryFactory {
public Registry getRegistry(URL var1) {
if (var1 == null) {
throw new IllegalArgumentException("url == null");
} else {
String var3 = var1.getProtocol() == null ? "dubbo" : var1.getProtocol();
if (var3 == null) {
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" + var1.toString() + ") use keys([protocol])");
} else {
RegistryFactory var4 = (RegistryFactory)ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(var3);
return var4.getRegistry(var1);
} public RegistryFactory$Adaptive() {
    // 创建 Registry, todo: 返回xxxRegistry 实例
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
} // com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.refer,
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// create rpc invoker. 初始化调用远程方法
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
return invoker;
} //
private ExchangeClient[] getClients(URL url) {
// whether to share connection
boolean service_share_connect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// if not configured, connection is shared, otherwise, one connection for one service
if (connections == 0) {
service_share_connect = true;
connections = 1;
} ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
clients[i] = getSharedClient(url);
} else {
clients[i] = initClient(url);
return clients;

// netty 连接远程服务过程

    // 初始化调用端,此处开始连接netty
private ExchangeClient initClient(URL url) { // client type setting.
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // BIO is not allowed since it has severe performance issue.
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
} ExchangeClient client;
try {
// connection should be lazy
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
client = Exchangers.connect(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
return client;
} // com.alibaba.dubbo.remoting.exchange.Exchangers.connect
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
if (handler == null) {
throw new IllegalArgumentException("handler == null");
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).connect(url, handler);
} // com.alibaba.dubbo.remoting.Transporters.connect
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
return getTransporter().connect(url, handler);
} // com.alibaba.dubbo.remoting.transport.netty
public class NettyTransporter implements Transporter { public static final String NAME = "netty"; @Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
} @Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
} } // com.alibaba.dubbo.remoting.transport.AbstractClient
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler); send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false); shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT); // The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800); try {
// 模板方法,由子类实现
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
try {
// connect.
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
} catch (RemotingException t) {
if (url.getParameter(Constants.CHECK_KEY, true)) {
throw t;
} else {
logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
} executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
.getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
// 将连接步骤固化,调用实现类的 doConnect
protected void connect() throws RemotingException {
try {
if (isConnected()) {
if (!isConnected()) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: Connect wait timeout: " + getTimeout() + "ms.");
} else {
if (logger.isInfoEnabled()) {
logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", channel is " + this.getChannel());
} catch (RemotingException e) {
throw e;
} catch (Throwable e) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: " + e.getMessage(), e);
} finally {
} // com.alibaba.dubbo.remoting.transport.netty.NettyClient.
public class NettyClient extends AbstractClient { private static final Logger logger = LoggerFactory.getLogger(NettyClient.class); // ChannelFactory's closure has a DirectMemory leak, using static to avoid
// https://issues.jboss.org/browse/NETTY-424
private static final ChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientBoss", true)),
Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientWorker", true)),
private ClientBootstrap bootstrap; private volatile Channel channel; // volatile, please copy reference to use public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
} @Override
protected void doOpen() throws Throwable {
bootstrap = new ClientBootstrap(channelFactory);
// config
// @see org.jboss.netty.channel.socket.SocketChannelConfig
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout());
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
} @Override
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS); if (ret && future.isSuccess()) {
Channel newChannel = future.getChannel();
try {
// Close old channel
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
} finally {
} finally {
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
} finally {
NettyClient.this.channel = null;
} else {
NettyClient.this.channel = newChannel;
} else if (future.getCause() != null) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());
} else {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
} finally {
if (!isConnected()) {
} @Override
protected void doDisConnect() throws Throwable {
try {
} catch (Throwable t) {
} @Override
protected void doClose() throws Throwable {
/*try {
} catch (Throwable t) {
} @Override
protected com.alibaba.dubbo.remoting.Channel getChannel() {
Channel c = channel;
if (c == null || !c.isConnected())
return null;
return NettyChannel.getOrAddChannel(c, getUrl(), this);
} } // com.alibaba.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper.getProxy
@SuppressWarnings({"unchecked", "rawtypes"})
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
T proxy = proxyFactory.getProxy(invoker);
if (GenericService.class != invoker.getInterface()) {
String stub = invoker.getUrl().getParameter(Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY));
if (ConfigUtils.isNotEmpty(stub)) {
Class<?> serviceType = invoker.getInterface();
if (ConfigUtils.isDefault(stub)) {
if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) {
stub = serviceType.getName() + "Stub";
} else {
stub = serviceType.getName() + "Local";
try {
Class<?> stubClass = ReflectUtils.forName(stub);
if (!serviceType.isAssignableFrom(stubClass)) {
throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + serviceType.getName());
try {
Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);
proxy = (T) constructor.newInstance(new Object[]{proxy});
//export stub service
URL url = invoker.getUrl();
if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)) {
url = url.addParameter(Constants.STUB_EVENT_METHODS_KEY, StringUtils.join(Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ","));
url = url.addParameter(Constants.IS_SERVER_KEY, Boolean.FALSE.toString());
try {
export(proxy, (Class) invoker.getInterface(), url);
} catch (Exception e) {
LOGGER.error("export a stub service error.", e);
} catch (NoSuchMethodException e) {
throw new IllegalStateException("No such constructor \"public " + stubClass.getSimpleName() + "(" + serviceType.getName() + ")\" in stub implementation class " + stubClass.getName(), e);
} catch (Throwable t) {
LOGGER.error("Failed to create stub implementation class " + stub + " in consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", cause: " + t.getMessage(), t);
// ignore
return proxy;
* AbstractProxyFactory com.alibaba.dubbo.rpc.proxy.AbstractProxyFactory
public abstract class AbstractProxyFactory implements ProxyFactory { @Override
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
return getProxy(invoker, false);
} @Override
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
Class<?>[] interfaces = null;
String config = invoker.getUrl().getParameter("interfaces");
if (config != null && config.length() > 0) {
String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
if (types != null && types.length > 0) {
interfaces = new Class<?>[types.length + 2];
interfaces[0] = invoker.getInterface();
interfaces[1] = EchoService.class;
for (int i = 0; i < types.length; i++) {
interfaces[i + 1] = ReflectUtils.forName(types[i]);
if (interfaces == null) {
interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
} if (!invoker.getInterface().equals(GenericService.class) && generic) {
int len = interfaces.length;
Class<?>[] temp = interfaces;
interfaces = new Class<?>[len + 1];
System.arraycopy(temp, 0, interfaces, 0, len);
interfaces[len] = GenericService.class;
} return getProxy(invoker, interfaces);
} public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types); } // com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory, 调用 wrapper 去处理
public class JavassistProxyFactory extends AbstractProxyFactory { @Override
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
} @Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
} } // 保存调用路径
public static boolean initConsumerModel(String serviceName, ConsumerModel consumerModel) {
if (consumedServices.putIfAbsent(serviceName, consumerModel) != null) {
logger.warn("Already register the same consumer:" + serviceName);
return false;
return true;

// String hello = demoService.sayHello("world"); // call remote method,
// 启用该远程方法,其实是调用的一个代理类,动态生成

"main@1" prio=5 tid=0x1 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:38)
at com.alibaba.dubbo.common.bytecode.proxy0.sayHello(proxy0.java:-1)
at com.alibaba.dubbo.demo.consumer.Consumer.main(Consumer.java:35)
// 调用动态代理类的方法,其实为调用 InvocationHandler方法
public class proxy0 implements DC, EchoService, DemoService {
public static Method[] methods;
private InvocationHandler handler; public String sayHello(String var1) {
Object[] var2 = new Object[]{var1};
Object var3 = this.handler.invoke(this, methods[0], var2);
return (String)var3;
} public Object $echo(Object var1) {
Object[] var2 = new Object[]{var1};
Object var3 = this.handler.invoke(this, methods[1], var2);
return (Object)var3;
} public proxy0() {
} public proxy0(InvocationHandler var1) {
this.handler = var1;
// com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke 方法,使用反射调用 invoker方法
public class InvokerInvocationHandler implements InvocationHandler { private final Invoker<?> invoker; public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
} @Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
// 调用通用方法
return invoker.invoke(new RpcInvocation(method, args)).recreate();
} } // com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.
public Result invoke(Invocation invocation) throws RpcException {
Result result = null; String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || value.equalsIgnoreCase("false")) {
//no mock, 调用真实代理
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
//force:direct mock
result = doMockInvoke(invocation, null);
} else {
try {
result = this.invoker.invoke(invocation);
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
} else {
if (logger.isWarnEnabled()) {
logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
result = doMockInvoke(invocation, e);
return result;
} // com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke
public Result invoke(final Invocation invocation) throws RpcException {
LoadBalance loadbalance = null;
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && !invokers.isEmpty()) {
// 获取负载均衡策略,默认为 random
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
// 如果是异步调用,会先创建一个调用id,以便在需要的时候使用
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
} // 模板方法
protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
LoadBalance loadbalance) throws RpcException; // 快速失败 invoker 调用
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> { private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class); public FailoverClusterInvoker(Directory<T> directory) {
} @Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
copyinvokers = list(invocation);
// check again
checkInvokers(copyinvokers, invocation);
// 调用AbstractClusterInvoker的负载均衡算法
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + invocation.getMethodName()
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
+ invocation.getMethodName() + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
} }

// 负载均衡

protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.isEmpty())
return null;
String methodName = invocation == null ? "" : invocation.getMethodName(); boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
//ignore overloaded method
if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
stickyInvoker = null;
//ignore concurrency problem
if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
if (availablecheck && stickyInvoker.isAvailable()) {
return stickyInvoker;
Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected); if (sticky) {
stickyInvoker = invoker;
return invoker;
} private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.isEmpty())
return null;
// 如果只有一个提供者,就不用负载均衡了
if (invokers.size() == 1)
return invokers.get(0);
if (loadbalance == null) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation); //If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect.
if ((selected != null && selected.contains(invoker))
|| (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
try {
Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
if (rinvoker != null) {
invoker = rinvoker;
} else {
//Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
int index = invokers.indexOf(invoker);
try {
//Avoid collision
invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);
} catch (Exception e) {
logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
} catch (Throwable t) {
logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
return invoker;
} // 随机负载均衡策略的选择算法
public class RandomLoadBalance extends AbstractLoadBalance { public static final String NAME = "random"; private final Random random = new Random(); @Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // Number of invokers
int totalWeight = 0; // The sum of weights
boolean sameWeight = true; // Every invoker has the same weight?
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
totalWeight += weight; // Sum
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false;
if (totalWeight > 0 && !sameWeight) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
int offset = random.nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(random.nextInt(length));
} } // filter 过滤器调用
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() { @Override
public Class<T> getInterface() {
return invoker.getInterface();
} @Override
public URL getUrl() {
return invoker.getUrl();
} @Override
public boolean isAvailable() {
return invoker.isAvailable();
} @Override
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
} @Override
public void destroy() {
} @Override
public String toString() {
return invoker.toString();
return last;

// filter调用链

@Activate(group = Constants.CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter { @Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
.setLocalAddress(NetUtils.getLocalHost(), 0)
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
try {
// 同步异步调用
RpcResult result = (RpcResult) invoker.invoke(invocation);
return result;
} finally {
} } // com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.invoke
@Activate(group = Constants.CONSUMER)
public class FutureFilter implements Filter { protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class); @Override
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation); fireInvokeCallback(invoker, invocation);
// need to configure if there's return value before the invocation in order to help invoker to judge if it's
// necessary to return future.
// 此处为链式调用,先调用 monitor, 再调用具体的方法 DubboInvoker
Result result = invoker.invoke(invocation);
if (isAsync) {
// 如果url参数中标识为异步调用,则执行异步调用
asyncCallback(invoker, invocation);
} else {
// 否则同步调用
syncCallback(invoker, invocation, result);
return result;
} private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) {
if (result.hasException()) {
fireThrowCallback(invoker, invocation, result.getException());
} else {
fireReturnCallback(invoker, invocation, result.getValue());
} private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
Future<?> f = RpcContext.getContext().getFuture();
if (f instanceof FutureAdapter) {
ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
future.setCallback(new ResponseCallback() {
public void done(Object rpcResult) {
if (rpcResult == null) {
logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
///must be rpcResult
if (!(rpcResult instanceof Result)) {
logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
Result result = (Result) rpcResult;
// 结果通知回调
if (result.hasException()) {
fireThrowCallback(invoker, invocation, result.getException());
} else {
fireReturnCallback(invoker, invocation, result.getValue());
} @Override
public void caught(Throwable exception) {
fireThrowCallback(invoker, invocation, exception);
} private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY));
final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY)); if (onInvokeMethod == null && onInvokeInst == null) {
if (onInvokeMethod == null || onInvokeInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
if (!onInvokeMethod.isAccessible()) {
} Object[] params = invocation.getArguments();
try {
onInvokeMethod.invoke(onInvokeInst, params);
} catch (InvocationTargetException e) {
fireThrowCallback(invoker, invocation, e.getTargetException());
} catch (Throwable e) {
fireThrowCallback(invoker, invocation, e);
} private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) {
final Method onReturnMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY));
final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY)); //not set onreturn callback, 回调设置了 onreturn 方法
if (onReturnMethod == null && onReturnInst == null) {
} if (onReturnMethod == null || onReturnInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
if (!onReturnMethod.isAccessible()) {
} Object[] args = invocation.getArguments();
Object[] params;
Class<?>[] rParaTypes = onReturnMethod.getParameterTypes();
if (rParaTypes.length > 1) {
if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
params = new Object[2];
params[0] = result;
params[1] = args;
} else {
params = new Object[args.length + 1];
params[0] = result;
System.arraycopy(args, 0, params, 1, args.length);
} else {
params = new Object[]{result};
try {
onReturnMethod.invoke(onReturnInst, params);
} catch (InvocationTargetException e) {
fireThrowCallback(invoker, invocation, e.getTargetException());
} catch (Throwable e) {
fireThrowCallback(invoker, invocation, e);
} private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) {
final Method onthrowMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_METHOD_KEY));
final Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_INSTANCE_KEY)); //onthrow callback not configured
if (onthrowMethod == null && onthrowInst == null) {
if (onthrowMethod == null || onthrowInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
if (!onthrowMethod.isAccessible()) {
Class<?>[] rParaTypes = onthrowMethod.getParameterTypes();
if (rParaTypes[0].isAssignableFrom(exception.getClass())) {
try {
Object[] args = invocation.getArguments();
Object[] params; if (rParaTypes.length > 1) {
if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
params = new Object[2];
params[0] = exception;
params[1] = args;
} else {
params = new Object[args.length + 1];
params[0] = exception;
System.arraycopy(args, 0, params, 1, args.length);
} else {
params = new Object[]{exception};
onthrowMethod.invoke(onthrowInst, params);
} catch (Throwable e) {
logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), e);
} else {
logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception);


    // 调用具体的远程方法类
public class DubboInvoker<T> extends AbstractInvoker<T> { private final ExchangeClient[] clients; private final AtomicPositiveInteger index = new AtomicPositiveInteger(); private final String version; private final ReentrantLock destroyLock = new ReentrantLock(); private final Set<Invoker<?>> invokers; public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients) {
this(serviceType, url, clients, null);
} public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients, Set<Invoker<?>> invokers) {
super(serviceType, url, new String[]{Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY});
this.clients = clients;
// get version.
this.version = url.getParameter(Constants.VERSION_KEY, "0.0.0");
this.invokers = invokers;
} // 实际调用
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
// 单向调用,立马返回
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return new RpcResult();
} else if (isAsync) {
// 异步调用,立马返回,后续接收结果
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
// 一般调用都是这个,同步+超时调用
return (Result) currentClient.request(inv, timeout).get();
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} @Override
public boolean isAvailable() {
if (!super.isAvailable())
return false;
for (ExchangeClient client : clients) {
if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)) {
//cannot write == not Available ?
return true;
return false;
} }



1. 面向接口编程,全局使用代理模式进行rpc方法调用,使rpc访问更加透明如本地方法;

2. 使用模板方法模式,由抽象父类实现框架式方法,调用由子类实现其中某个特有功能,更好的封装;

3. 使用外观模式,将多个实现不一的类,统一暴露为一个统一的接口,更易于使用方的调用;

4. 使用字节码生成技术,动态生成各种代理类,使实现更灵活;

5. 懒加载的应用,单例模式的应用;

6. 借助spring进行bean管理,更符合市场需要;

7. 使用观察者模式,进行提供者消费者通知,使变更能够周知需要的监听者;

8. filter的应用,责任链模式,更易于扩展辅助功能;

9. 使用策略模式,使多个负载均衡统一调度,这也大量在SPI机制中体现;


