一、带版本控制的注册中心RPC框架

  服务端(Server)

  

//注册中心接口
/**
 * Registry abstraction used by the server side to publish a service address.
 */
public interface IRegisterCenter {

    /**
     * Registers one address under the given service name.
     *
     * @param serviceName    name the service is published under
     * @param serviceAddress address of the instance being registered
     */
    void register(String serviceName, String serviceAddress);
}

 

//实现类
package zoorpc.zk; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode; public class RegisterCenter implements IRegisterCenter { private CuratorFramework curatorFramework; public RegisterCenter() {
curatorFramework = CuratorFrameworkFactory.builder().connectString(ZooConfig.CONNECTION_STR)
.connectionTimeoutMs(4000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
curatorFramework.start();
}
@Override
public void register(String serviceName, String serviceAddress) { // 注册相应服务
String Servicepath = ZooConfig.ZK_REGISTER_PATH + "/" + serviceName;
try { //判断服务/registrys/product-service/是否存在,否则创建
if (curatorFramework.checkExists().forPath(Servicepath) == null) {
curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
.forPath(Servicepath,"0".getBytes());
}
//创建服务iP节点
String adressPath = Servicepath+"/"+serviceAddress;
String rsNode = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
.forPath(adressPath,"0".getBytes());
System.out.println("服务节点创建成功:"+rsNode);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} }

  

//常量类
package zoorpc.zk;

/**
 * Constants shared by the ZooKeeper-related classes of this package.
 */
public class ZooConfig {

    /** Connect string handed to the Curator client builder. */
    static final String CONNECTION_STR = "192.168.25.129:2181,192.168.25.130:2181,192.168.25.131:2181";

    /** Root path below which service nodes are created. */
    static final String ZK_REGISTER_PATH = "/registrys";
}

  

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Type-level, runtime-visible annotation naming the interface a service class is published as.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RpcAnnotation {

    /**
     * Interface under which the annotated class is published.
     *
     * @return the published interface type
     */
    Class<?> value();

    /**
     * Version tag for multi-version support.
     *
     * @return the version string; the empty string when none is given
     */
    String version() default "";
}

  

//服务接口
/**
 * Sample service interface used by the demo.
 */
public interface IHelloWorld {

    /**
     * Produces a greeting for the given message.
     *
     * @param msg text supplied by the caller
     * @return the greeting string
     */
    String sayHello(String msg);
}

  

//服务接口实现类1,不带版本控制
package zoorpc;

import anno.RpcAnnotation;

/**
 * {@link IHelloWorld} implementation published without a version tag.
 */
@RpcAnnotation(IHelloWorld.class)
public class HelloWorldServiceImpl implements IHelloWorld {

    /**
     * Returns the fixed prefix {@code "HelloWorld,8080"} followed by {@code msg}.
     *
     * @param msg text appended to the prefix
     * @return the concatenated greeting
     */
    @Override
    public String sayHello(String msg) {
        final String greeting = "HelloWorld,8080" + msg;
        return greeting;
    }
}

  

//服务接口实现类2,带版本控制
import anno.RpcAnnotation;

/**
 * {@link IHelloWorld} implementation published with version {@code "2.0"}.
 */
@RpcAnnotation(value = IHelloWorld.class, version = "2.0")
public class HelloWorldServiceImpl2 implements IHelloWorld {

    /**
     * Returns the fixed prefix {@code "HelloWorld2,8081"} followed by {@code msg}.
     *
     * @param msg text appended to the prefix
     * @return the concatenated greeting
     */
    @Override
    public String sayHello(String msg) {
        final String greeting = "HelloWorld2,8081" + msg;
        return greeting;
    }
}
//服务发布类
package zoorpc; import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import anno.RpcAnnotation;
import zoorpc.zk.IRegisterCenter;

/**
 * Blocking socket server that publishes bound services to a registry and dispatches
 * each accepted connection to a {@link ProcessorHandler} on a cached thread pool.
 */
public class RpcServer {

    private static final ExecutorService executorService = Executors.newCachedThreadPool();

    /** Registry the bound services are published to. */
    private IRegisterCenter registerCenter;

    /** Address this server is published under, in {@code host:port} form. */
    private String serviceAddress;

    /** Maps service name (interface name, optionally suffixed with {@code -version}) to its implementation. */
    Map<String, Object> handlerMap = new HashMap<String, Object>();

    /**
     * @param registerCenter registry used by {@link #publisher()}
     * @param serviceAddress address in {@code host:port} form; the port is the one listened on
     */
    public RpcServer(IRegisterCenter registerCenter, String serviceAddress) {
        this.registerCenter = registerCenter;
        this.serviceAddress = serviceAddress;
    }

    /**
     * Binds each service object under the interface name taken from its {@link RpcAnnotation},
     * suffixed with {@code "-" + version} when the annotation carries a non-empty version.
     *
     * @param services annotated service implementations
     * @throws IllegalArgumentException if a service is {@code null} or lacks {@link RpcAnnotation}
     */
    public void bind(Object... services) {
        for (Object service : services) {
            if (service == null) {
                throw new IllegalArgumentException("service must not be null");
            }
            RpcAnnotation rpcAnnotation = service.getClass().getAnnotation(RpcAnnotation.class);
            if (rpcAnnotation == null) {
                // Without the annotation there is no interface name to publish under.
                throw new IllegalArgumentException(
                        service.getClass().getName() + " is not annotated with @RpcAnnotation");
            }
            String serviceName = rpcAnnotation.value().getName();
            String version = rpcAnnotation.version();
            if (version != null && !version.equals("")) {
                serviceName = serviceName + "-" + version;
            }
            handlerMap.put(serviceName, service);
        }
    }

    /**
     * Opens a server socket on the port parsed from the service address, registers every bound
     * service name with the registry, then accepts connections in an endless loop. Any exception
     * is printed and ends the loop; the server socket is closed on exit.
     */
    public void publisher() {
        try (ServerSocket serverSocket = new ServerSocket(Integer.parseInt(serviceAddress.split(":")[1]))) {
            for (String interfaceName : handlerMap.keySet()) {
                registerCenter.register(interfaceName, serviceAddress);
                System.out.println("服务注册成功:" + interfaceName + "->" + serviceAddress);
            }
            while (true) {
                Socket socket = serverSocket.accept();
                executorService.execute(new ProcessorHandler(socket, handlerMap));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

  

package zoorpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;
import java.util.Map;

/**
 * Serves one client connection: reads a single {@link RpcRequest}, invokes the matching
 * bound service by reflection and writes the result back.
 */
public class ProcessorHandler implements Runnable {

    private Socket socket;
    private Map<String, Object> handlerMap;

    /**
     * @param socket     accepted client connection
     * @param handlerMap service name to service object mapping
     */
    public ProcessorHandler(Socket socket, Map<String, Object> handlerMap) {
        this.socket = socket;
        this.handlerMap = handlerMap;
    }

    /**
     * Processes the request on the socket. Exceptions are printed; the socket is always closed.
     */
    @Override
    public void run() {
        // try-with-resources closes the socket even when a stream constructor fails.
        // NOTE(review): this uses native Java deserialization on data read from the network;
        // consider an ObjectInputFilter or another wire format if peers are not trusted.
        try (Socket connection = socket;
             ObjectInputStream objectInputStream = new ObjectInputStream(connection.getInputStream())) {
            RpcRequest request = (RpcRequest) objectInputStream.readObject();
            Object result = invoke(request);
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(connection.getOutputStream());
            objectOutputStream.writeObject(result);
            objectOutputStream.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Looks up the service for the request (class name, plus {@code "-" + version} when a
     * non-empty version is set) and invokes the requested method on it.
     */
    private Object invoke(RpcRequest request) throws Exception {
        Object[] args = request.getParameters();
        if (args == null) {
            // A dynamic proxy passes null instead of an empty array for no-argument methods.
            args = new Object[0];
        }
        String version = request.getVersion();
        String serviceName = request.getClassName();
        if (version != null && !version.equals("")) {
            serviceName = request.getClassName() + "-" + version;
        }
        Object service = handlerMap.get(serviceName);
        if (service == null) {
            throw new IllegalStateException("No service bound for " + serviceName);
        }
        Method method = resolveMethod(service.getClass(), request.getMethodName(), args);
        return method.invoke(service, args);
    }

    /**
     * Finds the public method to call. The exact runtime-class signature is tried first; if that
     * fails, or an argument is null, the first public method with the same name and arity whose
     * parameter types accept the arguments is used.
     */
    private static Method resolveMethod(Class<?> serviceClass, String methodName, Object[] args)
            throws NoSuchMethodException {
        Class<?>[] exactTypes = new Class<?>[args.length];
        boolean allTypesKnown = true;
        for (int i = 0; i < args.length; i++) {
            if (args[i] == null) {
                allTypesKnown = false;
                break;
            }
            exactTypes[i] = args[i].getClass();
        }
        if (allTypesKnown) {
            try {
                return serviceClass.getMethod(methodName, exactTypes);
            } catch (NoSuchMethodException ignored) {
                // Parameters may be declared as supertypes or primitives; fall back to a compatibility scan.
            }
        }
        for (Method candidate : serviceClass.getMethods()) {
            if (candidate.getName().equals(methodName) && accepts(candidate.getParameterTypes(), args)) {
                return candidate;
            }
        }
        throw new NoSuchMethodException(serviceClass.getName() + "." + methodName);
    }

    /** Tells whether every argument can be passed for the corresponding parameter type. */
    private static boolean accepts(Class<?>[] parameterTypes, Object[] args) {
        if (parameterTypes.length != args.length) {
            return false;
        }
        for (int i = 0; i < args.length; i++) {
            if (args[i] == null) {
                if (parameterTypes[i].isPrimitive()) {
                    return false;
                }
            } else if (!boxed(parameterTypes[i]).isInstance(args[i])) {
                return false;
            }
        }
        return true;
    }

    /** Maps a primitive type to its wrapper class; other types are returned unchanged. */
    private static Class<?> boxed(Class<?> type) {
        if (!type.isPrimitive()) {
            return type;
        }
        if (type == int.class) {
            return Integer.class;
        }
        if (type == long.class) {
            return Long.class;
        }
        if (type == boolean.class) {
            return Boolean.class;
        }
        if (type == double.class) {
            return Double.class;
        }
        if (type == float.class) {
            return Float.class;
        }
        if (type == char.class) {
            return Character.class;
        }
        if (type == byte.class) {
            return Byte.class;
        }
        if (type == short.class) {
            return Short.class;
        }
        return Void.class;
    }
}

  

//传输类
package zoorpc; import java.io.Serializable;
/**
* 传输对象
* @author admin
*
*/
public class RpcRequest implements Serializable {

    private static final long serialVersionUID = 6351477854838485391L;

    /** Name of the target class/interface. */
    private String className;
    /** Name of the method to invoke. */
    private String methodName;
    /** Arguments for the invocation. */
    private Object[] parameters;
    /** Requested service version; may be unset. */
    private String version;

    /** Creates a request with every field unset. */
    public RpcRequest() {
    }

    /** Creates a request without a version. */
    public RpcRequest(String className, String methodName, Object[] parameters) {
        this(className, methodName, parameters, null);
    }

    /** Creates a fully populated request. */
    public RpcRequest(String className, String methodName, Object[] parameters, String version) {
        this.className = className;
        this.methodName = methodName;
        this.parameters = parameters;
        this.version = version;
    }

    public String getClassName() {
        return this.className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return this.methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Object[] getParameters() {
        return this.parameters;
    }

    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }

    public String getVersion() {
        return this.version;
    }

    public void setVersion(String version) {
        this.version = version;
    }
}

  

//发布服务
package zoorpc; import java.io.IOException; import zoorpc.zk.IRegisterCenter;
import zoorpc.zk.RegisterCenter;

/**
 * Demo entry point: publishes both hello-world implementations from one server on 127.0.0.1:8080.
 */
public class ServerDemo {

    /**
     * Binds the two services, then starts publishing.
     *
     * @param args unused
     * @throws IOException if reading from standard input fails
     */
    public static void main(String[] args) throws IOException {
        IHelloWorld plainService = new HelloWorldServiceImpl();
        IHelloWorld versionedService = new HelloWorldServiceImpl2();
        RpcServer server = new RpcServer(new RegisterCenter(), "127.0.0.1:8080");
        server.bind(plainService, versionedService);
        server.publisher();
        System.in.read();
    }
}

  客户端

package zoorpc.zk;

/**
 * Client-side lookup of a service address.
 */
public interface IDiscovery {

    /**
     * Resolves the address to call for the requested service.
     *
     * @param serviceName name of the requested service
     * @return the address to invoke
     */
    String Discovery(String serviceName);
}
package zoorpc.zk;

import java.util.ArrayList;
import java.util.List; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry; import zoorpc.loadbalance.ILoadBalance;
import zoorpc.loadbalance.RandomLoadBalance;

/**
 * ZooKeeper-backed {@link IDiscovery}: reads the children of a service node and picks one
 * through a {@link RandomLoadBalance}.
 */
public class Discovery implements IDiscovery {

    private CuratorFramework curatorFramework;

    /** Address list used by the most recent lookup; kept as a field for existing package-level readers. */
    volatile List<String> repos = new ArrayList<>();

    private String adresses;

    /** Last known address list per service path, refreshed by lookups and by child-change events. */
    private final java.util.concurrent.ConcurrentMap<String, List<String>> addressesByPath =
            new java.util.concurrent.ConcurrentHashMap<String, List<String>>();

    /** One child watcher per service path, so repeated lookups do not pile up caches. */
    private final java.util.concurrent.ConcurrentMap<String, PathChildrenCache> watchers =
            new java.util.concurrent.ConcurrentHashMap<String, PathChildrenCache>();

    /**
     * Builds and starts a Curator client for the given connect string.
     *
     * @param adresses ZooKeeper connect string
     */
    public Discovery(String adresses) {
        this.adresses = adresses;
        curatorFramework = CuratorFrameworkFactory.builder().connectString(adresses)
                .connectionTimeoutMs(4000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        curatorFramework.start();
    }

    /**
     * Reads the children of {@code <ZK_REGISTER_PATH>/<serviceName>} and lets the load balancer
     * choose one. If the read fails, the exception is printed and the last list known for that
     * path (or an empty list) is passed to the load balancer instead.
     *
     * @param serviceName name of the service node to look up
     * @return whatever the load balancer selects from the address list
     */
    @Override
    public String Discovery(String serviceName) {
        String path = ZooConfig.ZK_REGISTER_PATH + "/" + serviceName;
        try {
            addressesByPath.put(path, curatorFramework.getChildren().forPath(path));
            registerWatcher(path);
        } catch (Exception e) {
            e.printStackTrace();
        }
        List<String> known = addressesByPath.get(path);
        List<String> candidates = (known != null) ? known : new ArrayList<String>();
        repos = candidates;
        ILoadBalance randomLoadBalance = new RandomLoadBalance();
        return randomLoadBalance.selectHost(candidates);
    }

    /** Starts a child watcher for the path the first time it is seen; later calls are no-ops. */
    private void registerWatcher(final String path) throws Exception {
        if (watchers.containsKey(path)) {
            return;
        }
        PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, path, true);
        if (watchers.putIfAbsent(path, pathChildrenCache) != null) {
            // Another thread registered first; this cache was never started, so it can be dropped.
            return;
        }
        PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                // Refresh only this path's entry so events for one service never affect another.
                addressesByPath.put(path, client.getChildren().forPath(path));
            }
        };
        pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
        try {
            pathChildrenCache.start();
        } catch (Exception e) {
            // Allow a later lookup to retry the registration.
            watchers.remove(path, pathChildrenCache);
            throw e;
        }
    }
}

  

package zoorpc.zk;

/**
 * Constants shared by the client-side ZooKeeper classes.
 */
public class ZooConfig {

    /** Connect string handed to the Curator client builder. */
    public static final String CONNECTION_STR = "192.168.25.129:2181,192.168.25.130:2181,192.168.25.131:2181";

    /** Root path below which service nodes are looked up. */
    public static final String ZK_REGISTER_PATH = "/registrys";
}
package zoorpc;

/**
 * Client-side copy of the sample service interface.
 */
public interface IHelloWorld {

    /**
     * Produces a greeting for the given message.
     *
     * @param msg text supplied by the caller
     * @return the greeting string
     */
    String sayHello(String msg);
}

  

package zoorpc;

import java.io.Serializable;
/**
* 传输对象
* @author admin
*
*/
public class RpcRequest implements Serializable {

    private static final long serialVersionUID = 6351477854838485391L;

    /** Name of the target class/interface. */
    private String className;
    /** Name of the method to invoke. */
    private String methodName;
    /** Arguments for the invocation. */
    private Object[] parameters;
    /** Requested service version; may be unset. */
    private String version;

    /** Creates a request with every field unset. */
    public RpcRequest() {
    }

    /** Creates a request without a version. */
    public RpcRequest(String className, String methodName, Object[] parameters) {
        this.className = className;
        this.methodName = methodName;
        this.parameters = parameters;
    }

    public String getClassName() {
        return this.className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return this.methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Object[] getParameters() {
        return this.parameters;
    }

    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }

    public String getVersion() {
        return this.version;
    }

    public void setVersion(String version) {
        this.version = version;
    }
}
package zoorpc;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import zoorpc.zk.IDiscovery;

/**
 * Factory for client-side dynamic proxies whose calls go through a {@link RemoteInvocationHandler}.
 */
public class RpcClientProxy {

    private IDiscovery discovery;

    /**
     * @param discovery lookup passed on to every handler created by this factory
     */
    public RpcClientProxy(IDiscovery discovery) {
        this.discovery = discovery;
    }

    /**
     * Creates a proxy implementing {@code interfaceCls}.
     *
     * @param interfaceCls interface the proxy implements
     * @param version      version handed to the invocation handler
     * @param <T>          interface type
     * @return the proxy instance
     */
    public <T> T clientProxy(final Class<T> interfaceCls, String version) {
        ClassLoader loader = interfaceCls.getClassLoader();
        Class<?>[] exposed = new Class[] {interfaceCls};
        InvocationHandler handler = new RemoteInvocationHandler(version, discovery);
        // The proxy is created for exactly interfaceCls, so the cast to T holds.
        @SuppressWarnings("unchecked")
        T proxy = (T) Proxy.newProxyInstance(loader, exposed, handler);
        return proxy;
    }
}

  

package zoorpc;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.net.Socket;
import zoorpc.zk.IDiscovery;

/**
 * Invocation handler that turns each proxied call into an {@link RpcRequest}, resolves a
 * provider address through {@link IDiscovery} and sends the request over a {@link TcpTransport}.
 */
public class RemoteInvocationHandler implements InvocationHandler {

    /** Requested service version; null or empty means unversioned. */
    private String version;
    private IDiscovery discovery;

    /**
     * @param version   service version to request; may be null or empty
     * @param discovery lookup used to resolve the provider address
     */
    public RemoteInvocationHandler(String version, IDiscovery discovery) {
        this.discovery = discovery;
        this.version = version;
    }

    /**
     * Builds the request from the invoked method, looks up an address and returns the remote result.
     *
     * @throws IllegalStateException if discovery returns no address for the service
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequest request = new RpcRequest();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameters(args);
        request.setVersion(version);
        // The server registers versioned services as "<interface>-<version>", so the lookup
        // must use the same name; otherwise the call may go to a host without this version.
        String registryName = registryName(request.getClassName());
        String serviceAddress = discovery.Discovery(registryName);
        if (serviceAddress == null) {
            throw new IllegalStateException("No provider found for " + registryName);
        }
        TcpTransport trans = new TcpTransport(serviceAddress);
        return trans.send(request);
    }

    /** Returns the name the service is registered under for this handler's version. */
    private String registryName(String className) {
        if (version == null || version.equals("")) {
            return className;
        }
        return className + "-" + version;
    }
}

  

package zoorpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;

/**
 * One-shot TCP transport: opens a connection to a {@code host:port} address, writes a
 * serialized {@link RpcRequest} and reads back one serialized result object.
 */
public class TcpTransport {

    private String serviceAddress;

    /**
     * @param serviceAddress target address in {@code host:port} form
     */
    public TcpTransport(String serviceAddress) {
        super();
        this.serviceAddress = serviceAddress;
    }

    /**
     * Opens a socket to the configured address.
     *
     * @return the connected socket
     * @throws RuntimeException if the address cannot be parsed or the connection fails; the cause is attached
     */
    Socket newSocket() {
        System.out.println("创建一个连接");
        try {
            String[] split = serviceAddress.split(":");
            return new Socket(split[0], Integer.parseInt(split[1]));
        } catch (Exception e) {
            throw new RuntimeException("连接建立失败!", e);
        }
    }

    /**
     * Sends the request and waits for the response object.
     *
     * @param request request to serialize to the peer
     * @return the object read back from the peer
     * @throws RuntimeException on any connection, I/O or deserialization failure; the cause is attached
     */
    public Object send(RpcRequest request) {
        // Closing the socket also closes both of its streams, so only the socket is managed here.
        try (Socket socket = newSocket()) {
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            objectOutputStream.writeObject(request);
            objectOutputStream.flush();
            ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
            return objectInputStream.readObject();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("连接建立失败!", e);
        }
    }
}

  

package zoorpc.loadbalance;

import java.util.List;

/**
 * Strategy for choosing one address out of a list.
 */
public interface ILoadBalance {

    /**
     * Chooses one entry from the given list.
     *
     * @param repos candidate addresses
     * @return the chosen address
     */
    String selectHost(List<String> repos);
}

  

package zoorpc.loadbalance;

import java.util.List;

public abstract class LoadBalance implements ILoadBalance{

    @Override
public String selectHost(List<String> repos) {
if(repos.size()<){
return null;
}else if(repos.size() ==){
return repos.get();
}else{
return doSelect(repos);
}
} protected abstract String doSelect(List<String> repos);
}
package zoorpc.loadbalance;

import java.util.List;
import java.util.Random;

/**
 * Load balancer that picks a uniformly random entry.
 */
public class RandomLoadBalance extends LoadBalance {

    /**
     * Returns the entry at a random index in {@code [0, repos.size())}.
     *
     * @param repos candidate addresses
     * @return the randomly chosen address
     */
    @Override
    protected String doSelect(List<String> repos) {
        final int bound = repos.size();
        final int index = new Random().nextInt(bound);
        return repos.get(index);
    }
}
package zoorpc;

import zoorpc.zk.Discovery;
import zoorpc.zk.IDiscovery;
import zoorpc.zk.ZooConfig; public class ClientDemo {
public static void main(String[] args) {
IDiscovery discovery = new Discovery(ZooConfig.CONNECTION_STR);
RpcClientProxy rpcClientProxy = new RpcClientProxy(discovery);
         //IHelloWorld hello = rpcClientProxy.clientProxy(IHelloWorld.class,"");结果:HelloWorld,8080lf
IHelloWorld hello = rpcClientProxy.clientProxy(IHelloWorld.class,"2.0");结果:HelloWorld2,8081lf
System.out.println(hello.sayHello("lf"));
}
}

  二、模拟集群

  新增发布类:

  

package zoorpc;

import java.io.IOException;

import zoorpc.zk.IRegisterCenter;
import zoorpc.zk.RegisterCenter;

/**
 * First node of the simulated cluster: publishes {@link HelloWorldServiceImpl} on 127.0.0.1:8080.
 */
public class LBServerDemo1 {

    /**
     * Binds the service and starts publishing.
     *
     * @param args unused
     * @throws IOException if reading from standard input fails
     */
    public static void main(String[] args) throws IOException {
        IHelloWorld service = new HelloWorldServiceImpl();
        RpcServer server = new RpcServer(new RegisterCenter(), "127.0.0.1:8080");
        server.bind(service);
        server.publisher();
        System.in.read();
    }
}
package zoorpc;

import java.io.IOException;

import zoorpc.zk.IRegisterCenter;
import zoorpc.zk.RegisterCenter;

/**
 * Second node of the simulated cluster: publishes {@link HelloWorldServiceImpl2} on 127.0.0.1:8081.
 */
public class LBServerDemo2 {

    /**
     * Binds the service and starts publishing.
     *
     * @param args unused
     * @throws IOException if reading from standard input fails
     */
    public static void main(String[] args) throws IOException {
        IHelloWorld service = new HelloWorldServiceImpl2();
        RpcServer server = new RpcServer(new RegisterCenter(), "127.0.0.1:8081");
        server.bind(service);
        server.publisher();
        System.in.read();
    }
}

修改服务实现类 HelloWorldServiceImpl2 的注解:去掉版本号,使两个实现注册到同一个服务节点下

package zoorpc;

import anno.RpcAnnotation;

//@RpcAnnotation(value = IHelloWorld.class,version = "2.0")
/**
 * {@link IHelloWorld} implementation published without a version tag.
 */
@RpcAnnotation(value = IHelloWorld.class)
public class HelloWorldServiceImpl2 implements IHelloWorld {

    /**
     * Returns the fixed prefix {@code "HelloWorld2,8081"} followed by {@code msg}.
     *
     * @param msg text appended to the prefix
     * @return the concatenated greeting
     */
    @Override
    public String sayHello(String msg) {
        final String greeting = "HelloWorld2,8081" + msg;
        return greeting;
    }
}

  运行发布类1,2

  在 Linux 下通过 ZooKeeper 客户端查看节点,显示如下:

[zk: localhost:2181(CONNECTED) 13] ls /registrys/zoorpc.IHelloWorld
[127.0.0.1:8081, 127.0.0.1:8080]
[zk: localhost:2181(CONNECTED) 14]

 客户端

package zoorpc;

import zoorpc.zk.Discovery;
import zoorpc.zk.IDiscovery;
import zoorpc.zk.ZooConfig; public class LBClientDemo {
public static void main(String[] args) throws InterruptedException {
IDiscovery discovery = new Discovery(ZooConfig.CONNECTION_STR);
RpcClientProxy rpcClientProxy = new RpcClientProxy(discovery);
for (int i = ; i < ; i++) {
IHelloWorld hello = rpcClientProxy.clientProxy(IHelloWorld.class,null);
System.out.println(hello.sayHello("lf"));
Thread.sleep();
}
}
}

运行结果:

创建一个连接
HelloWorld,8080lf
创建一个连接
HelloWorld,8080lf
创建一个连接
HelloWorld2,8081lf
创建一个连接
HelloWorld2,8081lf
创建一个连接
HelloWorld2,8081lf
创建一个连接
HelloWorld2,8081lf
创建一个连接
HelloWorld2,8081lf
创建一个连接
HelloWorld,8080lf
创建一个连接
HelloWorld,8080lf
创建一个连接
HelloWorld2,8081lf

实现原理图:

四、集群扩容

  一、停机扩容,修改配置

  二、逐台扩容,一台台重启

最新文章

  1. Using Internal EEPROM of PIC Microcontroller
  2. (转载)JavaWeb学习总结(五十一)——邮件的发送与接收原理
  3. 用 python实现简单EXCEL数据统计
  4. Dubbo入门
  5. oracle 连接查询,和(+)符号的用法
  6. hibernate连接查询
  7. js判断输入框的范围,并且只能输入数字
  8. excel表格公式出现#REF是什么意思
  9. win7下jdk安装环境变量配置
  10. Android Sdk 国内镜像下载地址
  11. 核心类生成-Mybatis Generator的使用
  12. 小程序md5加密
  13. PHP的swoole框架/扩展socket聊天示例
  14. Elasticsearch学习笔记(一)cat API
  15. 项目开发中关于jquery中出现问题小结(textarea,disabled,关键字等)
  16. DL服务器主机环境配置(ubuntu14.04+GTX1080+cuda8.0)解决桌面重复登录
  17. 第十九节:Java基本数据类型,循环结构与分支循环
  18. Ubuntn16.04.3配置root权限及启用root用户
  19. spark使用hadoop native库
  20. Python3 itchat实现微信定时发送群消息

热门文章

  1. qt编程
  2. js写法【2】
  3. Observable讲解
  4. Java事件监听器的四种实现方式
  5. bzoj5043: 密码破译
  6. JavaScript之图片操作5
  7. HDOJ 2003 求绝对值
  8. Activity的启动模式--总结
  9. header头参数不能带下划线
  10. C# 知识特性 Attribute,XMLSerialize,