package github.com.AllenDuke.rpc.customer;

import github.com.AllenDuke.rpc.netty.NettyClient;
import github.com.AllenDuke.rpc.publicSource.Calculator;
import github.com.AllenDuke.rpc.publicSource.HelloService; /**
* @description 客户端启动类,从NettyClient处获取代理对象,发起RPC
* @contact AllenDuke@163.com
* @since 2020/2/11
*/
public class ClientBootstrap { //netty消费者
private static final NettyClient customer = new NettyClient(); //自定义的消息协议为: #interfaceName#methodName#2#arg1#arg2
//这里用了fastjson public static void main(String[] args) throws Exception { //创建代理对象
HelloService service = (HelloService) customer.getServiceImpl(HelloService.class);
String res = service.hello("Allen","Duke",2);
Calculator calculator=(Calculator) customer.getServiceImpl(Calculator.class);
calculator.add(1,2);
calculator.multipy(3,6);
}
}
package github.com.AllenDuke.rpc.netty;

import com.alibaba.fastjson.JSON;
import github.com.AllenDuke.concurrentTest.threadPoolTest.ThreadPoolService;
import github.com.AllenDuke.rpc.publicSource.Message;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder; import java.lang.reflect.Proxy;
import java.util.concurrent.FutureTask; /**
* @description netty消费者
* @contact AllenDuke@163.com
* @since 2020/2/11
*/
public class NettyClient { //创建自己的线程池
private static ThreadPoolService executor = new ThreadPoolService(); public static NettyClientHandler clientHandler; /**
* @description: 返回一个代理对象,其中该代理类中的InvokeHandler中的invoke方法(jdk动态代理的知识)的作用是,
* 将调用信息封装成任务,提交到线程池,任务的返回值为 netty线程接收到的返回值
* @param serivceClass 要实现的接口
* @return: java.lang.Object
* @date: 2020/2/12
*/
public Object getServiceImpl(final Class<?> serivceClass) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class<?>[]{serivceClass}, (proxy, method, args) -> {
//lamda表达式,匿名内部类实现InvokeInhandler接口,重写invoke方法 if(method.getName().equals("toString")) {
return "这是代理类的toString方法,避免递归调用";
}
if(clientHandler==null) init();//懒加载
String className = serivceClass.getName();
className=className.substring(className.lastIndexOf(".")+1)+"Impl";//去掉包名 // //自定义消息协议
// String msg=className+"#"+method.getName()+"#"+args.length;
// for (Object arg : args) {
// if(arg!=null) msg+="#"+arg;
// } //利用fastjson
Message message=new Message(className,method.getName(),args);
FutureTask<Object> task=new FutureTask<Object>(message);
executor.execute(task);
Object result=task.get();//主线程在这阻塞,等待结果
return JSON.parseObject((String) result,method.getReturnType());//将json文本转为对象
});
} //初始化netty客户端
private static void init() {
clientHandler=new NettyClientHandler();
//创建EventLoopGroup
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());//inbound编码器
pipeline.addLast(new StringEncoder());//outbound解码器
pipeline.addLast(clientHandler);//业务处理器
}
}
); try {
bootstrap.connect("127.0.0.1", 7000).sync();
} catch (Exception e) {
e.printStackTrace();
}
}
}
package github.com.AllenDuke.rpc.netty;

import com.alibaba.fastjson.JSON;
import github.com.AllenDuke.rpc.publicSource.Message;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; /**
* @description netty消费者的业务处理器
* @contact AllenDuke@163.com
* @since 2020/2/11
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter{ private ChannelHandlerContext context; private Object result; //与服务器的连接创建后,就会被调用, 这个方法是第一个被调用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.context=ctx;
} //netty线程收到消息,调用handler的chaanneltRead方法,唤醒线程池中的工作线程
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = msg;
notify(); //唤醒等待的工作线程
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
} public synchronized void sendMsg(Message message) throws InterruptedException {
context.writeAndFlush(JSON.toJSONString(message));//netty线程发送json文本
wait(); //工作线程阻塞wait,等待channelRead方法获取到服务器的结果后,唤醒
System.out.println("服务端返回结果:"+result);
} public Object getResult(){
return result;
} }
package github.com.AllenDuke.rpc.provider;

import github.com.AllenDuke.rpc.netty.NettyServer;

/**
* @description 服务端启动类,启动netty服务提供者
* @contact AllenDuke@163.com
* @since 2020/2/11
*/
public class ServerBootstrap { public static void main(String[] args) { NettyServer.startServer("127.0.0.1", 7000);
}
}
package github.com.AllenDuke.rpc.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder; /**
* @description netty服务提供者
* @contact AllenDuke@163.com
* @since 2020/2/11
*/
public class NettyServer { public static void startServer(String hostName, int port) {
startServer0(hostName, port);
} private static void startServer0(String hostname, int port) { EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());//inbound编码器
pipeline.addLast(new StringEncoder());//outbound解码器
pipeline.addLast(new NettyServerHandler());//业务处理器
}
} ); ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();
System.out.println("服务提供方开始提供服务~~");
channelFuture.channel().closeFuture().sync();//同步方法,直到有结果才往下执行
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
} }
}
package github.com.AllenDuke.rpc.netty;

import com.alibaba.fastjson.JSON;
import github.com.AllenDuke.rpc.publicSource.Message;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map; /**
* @description netty服务提供者的业务处理器
* @contact AllenDuke@163.com
* @since 2020/2/11
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter { //实现类所在的包名,可把类都先加载到一个HashMap中
private static String packageName = "github.com.AllenDuke.rpc.serviceImpl."; //key为实现方法的全限定名
private static final Map<String, Method> serviceMap = new HashMap<>(); //key为实现类的全限定名
private static final Map<String, Class> classMap = new HashMap<>(); @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取客户端发送的消息,并调用服务
System.out.println("服务器收到信息:" + msg + " 准备解码,调用服务");
//将json文本转换为对象
Message message=JSON.parseObject((String) msg, Message.class); // //解析自定义消息协议
// StringBuilder message = new StringBuilder(msg.toString());
// //解析类名
// int classIndex = message.indexOf("#");
// String className = message.substring(0, classIndex) + "Impl";
// message.delete(0, classIndex + 1);
// //解析方法名
// int methodIndex = message.indexOf("#");
// String methodName = message.substring(0, methodIndex);
// message.delete(0, methodIndex + 1);
// //解析参数个数
// int argNumIndex = message.indexOf("#");
// int argNum=Integer.valueOf(message.substring(0,argNumIndex));
// message.delete(0,argNumIndex+1);
// Object[] args=new Object[argNum];
// //解析参数,类型转换?
// for (int i = 0; i < argNum; i++) {
// if(i==argNum-1) args[i]=message.toString();
// else{
// int argIndex=message.indexOf("#");
// args[i]=message.substring(0,argIndex);
// message.delete(0,argIndex+1);
// }
// } String className=message.getClassName();
String methodName=message.getMethodName();
Object[] args=message.getArgs();
Object result = null;//返回结果 Class serviceImpl = classMap.get(packageName + className);
//这里forName去寻找类,也可以一开始就把包下的类都加载进来
if(serviceImpl==null) serviceImpl= Class.forName(packageName + className);
//类中对应的方法
Method service = serviceMap.get(packageName + className + "." + methodName);
if (service == null)
for (Method method : serviceImpl.getMethods()) {
if (method.getName().equals(methodName)) {
service=method;
serviceMap.put(packageName + className + "." + methodName, method);//找到后加入hashMap
break;
} }
result = service.invoke(serviceImpl.newInstance(), args );//serviceImpl无参构造要public
ctx.writeAndFlush(JSON.toJSONString(result));//转换为json文本
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
package github.com.AllenDuke.rpc.publicSource;

import github.com.AllenDuke.rpc.netty.NettyClient;
import github.com.AllenDuke.rpc.netty.NettyClientHandler; import java.util.concurrent.Callable; /**
* @description 消息体
* @contact AllenDuke@163.com
* @since 2020/2/11
*/
public class Message implements Callable { private static NettyClientHandler handler= NettyClient.clientHandler; private String className;//要调用的类名
private String methodName;//要调用的方法名
private Object[] args;//方法的参数 public Message(String className,String methodName,Object[] args){
this.className=className;
this.methodName=methodName;
this.args=args;
} public String getClassName() {
return className;
} public void setClassName(String className) {
this.className = className;
} public String getMethodName() {
return methodName;
} public void setMethodName(String methodName) {
this.methodName = methodName;
} public Object[] getArgs() {
return args;
} public void setArgs(Object[] args) {
this.args = args;
} //封装成任务后,由线程池的工作线程调用
public Object call() throws Exception {
handler.sendMsg(this);
return handler.getResult();
} }
package github.com.AllenDuke.rpc.publicSource;

/**
* @description sayHello服务
* @contact AllenDuke@163.com
* @since 2020/2/11
*/
public interface HelloService { String hello(String name1,String name2,Integer num);
}
package github.com.AllenDuke.rpc.publicSource;

/**
* @description 计算器服务
* @contact AllenDuke@163.com
* @since 2020/2/11
*/
public interface Calculator { int add(int a,int b);
int multipy(int a,int b);
}
package github.com.AllenDuke.rpc.serviceImpl;

import github.com.AllenDuke.rpc.publicSource.Calculator;

/**
* @description 计算器服务实现
* @contact AllenDuke@163.com
* @since 2020/2/11
*/
public class CalculatorImpl implements Calculator { @Override
public int add(int a, int b) {
return a+b;
} @Override
public int multipy(int a, int b) {
return a*b;
}
}

线程池的代码在前两篇文章中。

总结:

此次实现总体上看难度不大,但有三个点可能不容易想到。

1.动态代理,这里要站在框架使用者的角度思考,应该做到用户无感知,像是Mybatis的getMapper,还要注意invoke方法的实现。

2.一条消息也是一个任务,工作线程既是执行任务也是发送消息。将消息封装成一个任务,要从接口方法的有返回值来思考。

3.使用fastjson,一开始是自定义消息协议,然后解析,参数还要类型转换,而fastjson刚好可以做到。

当然,实现的方法有千百种,但在可以实现了的条件下,应该考虑怎样的设计看起来更合理,更容易理解,本次模拟实现肯定有很多不妥的地方,还望朋友不吝赐教,共同进步。

最新文章

  1. sharepoint 计算列 年龄
  2. 使用httpclient 调用selenium webdriver
  3. python3代码
  4. java排序算法-选择排序
  5. mySQL简单操作(一)
  6. Android使用Fiddler模拟弱网络环境测试
  7. 2190 ACM 数学概率论的乘法和加法原则
  8. idea js改来改去无效问题的解决
  9. centos 7 删除yum安装的openjdk
  10. Python中的对象行为与特殊方法(二)类型检查与抽象基类
  11. CentOS Minimal Install
  12. win下Apache2.4的下载与安装
  13. logstash-input-jdbc同时同步多个表
  14. go语言开发环境、goland、IDE
  15. 【codeforces div3】【E. Cyclic Components】
  16. Linux vim 编辑命令
  17. COM中的HRESULT
  18. Scrum立会报告+燃尽图(十月二十三日总第十四次)
  19. Deep Learning基础--理解LSTM/RNN中的Attention机制
  20. “全栈2019”Java第八十一章:外部类能否访问嵌套接口里的成员?

热门文章

  1. IntelliJ IDEA安装教程及使用方法
  2. shell正则表达式和cut命令
  3. Java 中级 学习笔记 1 JVM的理解以及新生代GC处理流程
  4. 入门Grunt前端构建工具
  5. Qt中设置窗口图标
  6. Cesium本地影像与地形服务发布
  7. UGUI之MaskableGraphic
  8. 使用内存映射文件MMF实现大数据量导出时的内存优化
  9. Frogger POJ - 2253(求两个石头之间”所有通路中最长边中“的最小边)
  10. 微信小程序修改checkbox的样式