服务端的示例

我们首先贴上我们的服务端的示例:

    public static void main(String[] args) {
ServerConfig serverConfig = new ServerConfig()
.setProtocol("bolt") // 设置一个协议,默认bolt
.setPort(12200) // 设置一个端口,默认12200
.setDaemon(false); // 非守护线程 ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
.setInterfaceId(HelloService.class.getName()) // 指定接口
.setRef(new HelloServiceImpl()) // 指定实现
.setServer(serverConfig); // 指定服务端 providerConfig.export(); // 发布服务
}

ProviderConfig#export

从示例入手我们设置好ServerConfig和ProviderConfig之后调用ProviderConfig的export方法进行暴露

ProviderConfig#export

    public synchronized void export() {
if (providerBootstrap == null) {
providerBootstrap = Bootstraps.from(this);
}
//DefaultProviderBootstrap
providerBootstrap.export();
}

Bootstraps#from这个方法我们在《源码分析---SOFARPC客户端服务引用》中已经分析过,所以这里就不分析了,里面就是调用SOFARPC自己的SPI机制返回的是DefaultProviderBootstrap实例。

DefaultProviderBootstrap#export

然后调用DefaultProviderBootstrap#export方法

    @Override
public void export() {
if (providerConfig.getDelay() > 0) { // 延迟加载,单位毫秒
Thread thread = factory.newThread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(providerConfig.getDelay());
} catch (Throwable ignore) { // NOPMD
}
doExport();
}
});
thread.start();
} else {
doExport();
}
}

这里有两个分支,如果在设置providerConfi的时候设置了延迟属性的话,那么就会调用NamedThreadFactory#newThread方法起一个线程,然后延迟调用doExport方法。

DefaultProviderBootstrap#doExport


/**
* 是否已发布
*/
protected transient volatile boolean exported;
/**
* 发布的服务配置
*/
protected final static ConcurrentMap<String, AtomicInteger> EXPORTED_KEYS = new ConcurrentHashMap<String, AtomicInteger>(); private void doExport() {
//校验一下,如果服务已经暴露过了那么就不再进行暴露
if (exported) {
return;
} // 检查参数
checkParameters(); String appName = providerConfig.getAppName(); //key is the protocol of server,for concurrent safe
Map<String, Boolean> hasExportedInCurrent = new ConcurrentHashMap<String, Boolean>();
// 将处理器注册到server
List<ServerConfig> serverConfigs = providerConfig.getServer();
for (ServerConfig serverConfig : serverConfigs) {
String protocol = serverConfig.getProtocol(); String key = providerConfig.buildKey() + ":" + protocol; if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId());
} // 注意同一interface,同一uniqueId,不同server情况
AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器
if (cnt == null) { // 没有发布过
cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
}
//计数器加一,如果计数器的值超过了设置的exportLimit,那么就会抛出异常
int c = cnt.incrementAndGet();
hasExportedInCurrent.put(serverConfig.getProtocol(), true);
int maxProxyCount = providerConfig.getRepeatedExportLimit();
if (maxProxyCount > 0) {
if (c > maxProxyCount) {
//计数器减一
decrementCounter(hasExportedInCurrent);
// 超过最大数量,直接抛出异常
throw new SofaRpcRuntimeException("Duplicate provider config with key " + key
+ " has been exported more than " + maxProxyCount + " times!"
+ " Maybe it's wrong config, please check it."
+ " Ignore this if you did that on purpose!");
} else if (c > 1) {
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Duplicate provider config with key {} has been exported!"
+ " Maybe it's wrong config, please check it."
+ " Ignore this if you did that on purpose!", key);
}
}
} } try {
// 构造请求调用器
providerProxyInvoker = new ProviderProxyInvoker(providerConfig);
// 初始化注册中心
if (providerConfig.isRegister()) {
//如果有设置注册中心的话就遍历注册中心
List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
if (CommonUtils.isNotEmpty(registryConfigs)) {
for (RegistryConfig registryConfig : registryConfigs) {
// 提前初始化Registry到ALL_REGISTRIES对象中
RegistryFactory.getRegistry(registryConfig);
}
}
}
//如果有多个配置则逐个遍历
// 将处理器注册到server
for (ServerConfig serverConfig : serverConfigs) {
try {
Server server = serverConfig.buildIfAbsent();
// 注册请求调用器
server.registerProcessor(providerConfig, providerProxyInvoker);
if (serverConfig.isAutoStart()) {
server.start();
} } catch (SofaRpcRuntimeException e) {
throw e;
} catch (Exception e) {
LOGGER.errorWithApp(appName, "Catch exception when register processor to server: "
+ serverConfig.getId(), e);
}
} providerConfig.setConfigListener(new ProviderAttributeListener());
// 注册到注册中心
register();
} catch (Exception e) {
decrementCounter(hasExportedInCurrent); if (e instanceof SofaRpcRuntimeException) {
throw (SofaRpcRuntimeException) e;
} else {
throw new SofaRpcRuntimeException("Build provider proxy error!", e);
}
} // 记录一些缓存数据
RpcRuntimeContext.cacheProviderConfig(this);
exported = true;
}

doExport方法里面主要做了以下几件事:

  1. 检查参数是否正确

    1. 检查注入的对象是否是接口的实现类
    2. providerConfig是否有设置server参数
    3. 检查方法,是否有重名的方法,对方法进行黑白名单的过滤(对配置的include和exclude方法进行过滤)
  2. 遍历设置的serverConfigs
  3. 对要发布的接口进行计数,如果超过了设置的repeatedExportLimit那么就抛出异常
  4. 构造请求调用器
  5. 初始化注册中心
  6. 注册请求调用器
  7. 启动服务
  8. 设置监听
  9. 注册到注册中心

接下来我们挨个分析上面的步骤

参数检查DefaultProviderBootstrap#checkParameters
    protected void checkParameters() {
// 检查注入的ref是否接口实现类
Class proxyClass = providerConfig.getProxyClass();
String key = providerConfig.buildKey();
T ref = providerConfig.getRef();
if (!proxyClass.isInstance(ref)) {
throw ExceptionUtils.buildRuntime("provider.ref",
ref == null ? "null" : ref.getClass().getName(),
"This is not an instance of " + providerConfig.getInterfaceId()
+ " in provider config with key " + key + " !");
}
// server 不能为空
if (CommonUtils.isEmpty(providerConfig.getServer())) {
throw ExceptionUtils.buildRuntime("server", "NULL", "Value of \"server\" is not specified in provider" +
" config with key " + key + " !");
}
checkMethods(proxyClass);
}

这个方法里面主要做了一下几件事:

  1. 调用providerConfig.getProxyClass();获取接口class,在我们这个示例中是interface com.alipay.sofa.rpc.quickstart.HelloService
  2. 调用providerConfig.getRef();获取接口实现类引用,我们这里对应的是HelloServiceImpl
  3. 调用proxyClass.isInstance判断ref是否是接口的实现类,如果不是的话抛出异常
  4. 校验server不能为空
  5. 调用checkMethods方法校验方法

进入到checkMethods方法中

    protected void checkMethods(Class<?> itfClass) {
ConcurrentHashMap<String, Boolean> methodsLimit = new ConcurrentHashMap<String, Boolean>();
for (Method method : itfClass.getMethods()) {
String methodName = method.getName();
if (methodsLimit.containsKey(methodName)) {
// 重名的方法
if (LOGGER.isWarnEnabled(providerConfig.getAppName())) {
LOGGER.warnWithApp(providerConfig.getAppName(), "Method with same name \"" + itfClass.getName()
+ "." + methodName + "\" exists ! The usage of overloading method in rpc is deprecated.");
}
}
// 判断服务下方法的黑白名单
Boolean include = methodsLimit.get(methodName);
if (include == null) {
//对配置的include和exclude方法进行过滤
// 检查是否在黑白名单中
include = inList(providerConfig.getInclude(), providerConfig.getExclude(), methodName);
methodsLimit.putIfAbsent(methodName, include);
}
}
providerConfig.setMethodsLimit(methodsLimit);
}

在这个方法里首先会去遍历实现类的所有的方法。

  1. 判断一下这个实现类里面是否有同名的方法,如果有的话会打印提示,所以看到这里我们就可以知道,我们在使用定义接口的时候最好不要定义重载的方法,官方不提倡这么做
  2. 如果设置了Include和Exclude参数的话,会根据这两个参数对要发布的对象进行方法级别的过滤,默认Include是*,Exclude参数默认是一个空的字符串
初始化注册中心

往下我们可以看到会调用RegistryFactory.getRegistry(registryConfig);对注册中心进行初始化

    public static synchronized Registry getRegistry(RegistryConfig registryConfig) {
// 超过3次 是不是配错了?
if (ALL_REGISTRIES.size() > 3) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Size of registry is greater than 3, Please check it!");
}
}
try {
// 注意:RegistryConfig重写了equals方法,如果多个RegistryConfig属性一样,则认为是一个对象
Registry registry = ALL_REGISTRIES.get(registryConfig);
if (registry == null) {
//通过spi根据protocol来获取注册中心实例
ExtensionClass<Registry> ext = ExtensionLoaderFactory.getExtensionLoader(Registry.class)
.getExtensionClass(registryConfig.getProtocol());
if (ext == null) {
throw ExceptionUtils.buildRuntime("registry.protocol", registryConfig.getProtocol(),
"Unsupported protocol of registry config !");
}
registry = ext.getExtInstance(new Class[]{RegistryConfig.class}, new Object[]{registryConfig});
ALL_REGISTRIES.put(registryConfig, registry);
}
return registry;
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Throwable e) {
throw new SofaRpcRuntimeException(e.getMessage(), e);
}
}

这个方法里面主要是通过SPI实例化相应的注册中心,然后放入到ALL_REGISTRIES对象当中去。

初始化server

接下来我们会调用serverConfig.buildIfAbsent();初始化server

    public synchronized Server buildIfAbsent() {
if (server != null) {
return server;
}
// 提前检查协议+序列化方式
// ConfigValueHelper.check(ProtocolType.valueOf(getProtocol()),
// SerializationType.valueOf(getSerialization())); //通过工厂获取server实例
server = ServerFactory.getServer(this);
return server;
} public synchronized static Server getServer(ServerConfig serverConfig) {
try {
//根据端口获取实例
Server server = SERVER_MAP.get(Integer.toString(serverConfig.getPort()));
if (server == null) {
// 算下网卡和端口
resolveServerConfig(serverConfig); ExtensionClass<Server> ext = ExtensionLoaderFactory.getExtensionLoader(Server.class)
.getExtensionClass(serverConfig.getProtocol());
if (ext == null) {
throw ExceptionUtils.buildRuntime("server.protocol", serverConfig.getProtocol(),
"Unsupported protocol of server!");
}
//通过SPI获取server实例
server = ext.getExtInstance();
//初始化server里面具体的参数
server.init(serverConfig);
SERVER_MAP.put(serverConfig.getPort() + "", server);
}
return server;
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Throwable e) {
throw new SofaRpcRuntimeException(e.getMessage(), e);
}
}

然后我们会根据不同的server实现类调用不同的init方法来初始化参数,这里我们使用的是BoltServer

    public void init(ServerConfig serverConfig) {
this.serverConfig = serverConfig;
// 启动线程池
bizThreadPool = initThreadPool(serverConfig);
boltServerProcessor = new BoltServerProcessor(this);
}

调用完buildIfAbsent方法后回到doExport方法中继续往下走,调用server.registerProcessor。这个方法里面主要是把对应的Invoker实例和实例所对应的方法缓存起来。

boltServer#registerProcessor

    public void registerProcessor(ProviderConfig providerConfig, Invoker instance) {
// 缓存Invoker对象 包路径名+类名+版本号 com.alipay.sofa.rpc.quickstart.HelloService:1.0
String key = ConfigUniqueNameGenerator.getUniqueName(providerConfig);
invokerMap.put(key, instance);
// 把这个实例所对应的类加载器缓存到SERVICE_CLASSLOADER_MAP中
ReflectCache.registerServiceClassLoader(key, providerConfig.getProxyClass().getClassLoader());
// 缓存接口的方法
for (Method m : providerConfig.getProxyClass().getMethods()) {
ReflectCache.putOverloadMethodCache(key, m);
}
}

然后调用boltServer#start方法启动服务

    public void start() {
//如果已经启动了,那么直接返回
if (started) {
return;
}
synchronized (this) {
//双重检查锁
if (started) {
return;
}
// 生成Server对象,返回的是RpcServer实例
remotingServer = initRemotingServer();
try {
//调用bolt包里面的内容,通过netty启动服务
if (remotingServer.start()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Bolt server has been bind to {}:{}", serverConfig.getBoundHost(),
serverConfig.getPort());
}
} else {
throw new SofaRpcRuntimeException("Failed to start bolt server, see more detail from bolt log.");
}
//设置started参数为true
started = true; if (EventBus.isEnable(ServerStartedEvent.class)) {
EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));
} } catch (SofaRpcRuntimeException e) {
throw e;
} catch (Exception e) {
throw new SofaRpcRuntimeException("Failed to start bolt server!", e);
}
}
}
注册服务

接下来就是调用register方法注册服务

   protected void register() {
if (providerConfig.isRegister()) {
List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
if (registryConfigs != null) {
for (RegistryConfig registryConfig : registryConfigs) {
//得到注册中心对象
Registry registry = RegistryFactory.getRegistry(registryConfig);
//初始化注册中心
registry.init();
registry.start();
try {
registry.register(providerConfig);
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Throwable e) {
String appName = providerConfig.getAppName();
if (LOGGER.isWarnEnabled(appName)) {
LOGGER.warnWithApp(appName, "Catch exception when register to registry: "
+ registryConfig.getId(), e);
}
}
}
}
}
}

这里主要是通过RegistryFactory获取不同的注册中心实现类

目前来说sofarpc主要实现了一下几种注册中心:

SOFARegistry

Zookeeper

本地文件

Consul

Nacos

注册中心的内容我打算放在接下来的文章中讲解,所以这里我们略过。

以上就是SOFARPC服务端暴露的全部内容

最新文章

  1. wampserver的安装以及使用
  2. 关于Unity3D中Resources动态加载NGUI图片的方法
  3. gcc 的visibility 使用
  4. ASP.NET MVC 第四回 向View传值
  5. GIS 地图中术语解释
  6. IOS设计模式学习(11)中介者
  7. 浅尝JavaScript document对象
  8. asp.net mvc 接入最新支付宝支付+退款 alipay-sdk-NET-20170615110549
  9. gRPC异步处理应答
  10. Docker基本架构
  11. 版本控制工具——Git常用操作(下)
  12. YCD 软件更新方法
  13. RedisGeo
  14. Ocelot中文文档-Configuration
  15. 最近公共祖先(LCA)模板
  16. vi/vim 命令速查手册
  17. 找DEV,欢迎挑战高薪 --方向:互联网金融,地点广州
  18. Delphi中静态方法重载还是覆盖的讨论
  19. [webapp]移动平台各浏览器的分辨率适配
  20. 使用Hystrix进行微服务降级管理

热门文章

  1. maven的下载与安装,卸载替换eclipse自带的maven
  2. oh-my-zsh自定义配置
  3. POJ 1966:Cable TV Network(最小点割集)***
  4. Docker笔记(二):Docker管理的对象
  5. 扒一扒那些教程中不常被提及的JavaScript小技巧
  6. java学习笔记(基础篇)—java数组
  7. 洛谷P1129 [ZJOI2007]矩阵游戏 题解
  8. 简单题[期望DP]
  9. VBox on 14.04: Kernel driver not installed (rc=-1908) [duplicate]
  10. SpringBoot热部署报错(BeanCreationException: Error creating bean with name 'classPathFileSystemWatcher' d)