1. RPC 简介

远程过程调用(Remote Procedure Call,RPC)是一个计算机通信协议

该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程

如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用

2. 流行 RPC 框架的对比

3. golang 中如何实现 RPC

golang 中实现 RPC 非常简单,官方提供了封装好的库,还有一些第三方的库

golang 官方的 net/rpc 库使用 encoding/gob 进行编解码,支持 tcp 和 http 数据传输方式,由于其他语言不支持 gob 编解码方式,所以 golang 的 RPC 只支持 golang 开发的服务器与客户端之间的交互

官方还提供了net/rpc/jsonrpc 库实现RPC 方法,jsonrpc 采用JSON 进行数据编解码,因而支持跨语言调用,目前 jsonrpc 库是基于 tcp 协议实现的,暂不支持 http 传输方式

golang 的 RPC 必须符合 4 个条件才可以

结构体字段首字母要大写,要跨域访问,所以大写

函数名必须首字母大写(可以序列号导出的)

函数第一个参数是接收参数,第二个参数是返回给客户端参数,必须是指针类型

函数必须有一个返回值 error

例题:golang 实现 RPC 程序,实现求矩形面积和周长

   server端

package main

import (
"fmt"
"log"
"net/http"
"net/rpc"
) // 服务端,求矩形面积和周长 // 声明矩形对象
type Rect struct {
} // 声明参数结构体,字段首字母大写
type Params struct {
// 长和宽
Width, Height int
} // 定义求矩形面积的方法
func (r *Rect) Area(p Params, ret *int) error {
*ret = p.Width * p.Height
return nil
} //定义求矩形周长的方法
func (r *Rect) Perimeter(p Params, ret *int) error {
*ret = (p.Width + p.Height) * 2
return nil
} func main() {
// 1. 注册服务
rect := new(Rect)
rpc.Register(rect)
// 2.把服务处理绑定到http协议上
rpc.HandleHTTP()
fmt.Println("------ rpc service is already on ------")
// 3. 监听服务,等待客户端调用求周长和面积的方法
err := http.ListenAndServe(":8080", nil)
if err != nil {
log.Fatal(err)
}
}

 客户端

package main

import (
"fmt"
"log"
"net/rpc"
) type Params struct {
Width, Height int
} // 调用服务
func main(){
// 1. 连接远程RPC服务
rp, err := rpc.DialHTTP("tcp","182.254.179.186:8080")
if err != nil {
log.Fatal(err)
}
// 2.调用远程方法
// 定义接收服务器传回来的计算结果的值
ret := 0
err2 := rp.Call("Rect.Area",Params{50,100},&ret)
if err2!=nil {
log.Fatal(err2)
}
fmt.Println("面积",ret) //求周长
err3:= rp.Call("Rect.Perimeter",Params{50,100},&ret)
if err3!=nil {
log.Fatal(err3)
}
fmt.Println("周长",ret) }

练习:模仿前面例题,自己实现 RPC 程序,服务端接收 2 个参数,可以做乘法运算,也可以做商和余数的运算,客户端进行传参和访问,得到结果如下:

package main

import (
"errors"
"fmt"
"log"
"net/http"
"net/rpc"
) type Arith struct {
}
// 声明接收的参数结构体
type ArithRequest struct {
A,B int
}
// 声明返回客户端的参数结构体
type ArithResponse struct {
//乘积
Pro int
// 商
Quo int
// 余数
Rem int
}
//乘法
func (this *Arith)Multiply(req ArithRequest,res *ArithResponse)error {
res.Pro = req.A * req.B
return nil
}
// 商和余数
func (this *Arith)Divide(req *ArithRequest, res *ArithResponse)error{
if req.B == 0 {
return errors.New("出书不能为0")
}
//商
res.Quo = req.A / req.B
//余数
res.Rem = req.A % req.B
return nil
} func main() {
//注册服务
rpc.Register(new(Arith))
// 采用http作为rpc载体
rpc.HandleHTTP()
fmt.Println("------ rpc service is already on ------")
// 监听服务,等待客户端调用响应的方法
err := http.ListenAndServe(":8081",nil)
if err!=nil {
log.Fatal(err)
}
}

server.go

package main

import (
"fmt"
"log"
"net/rpc"
) type Arith struct {
}
// 声明接收的参数结构体
type ArithRequest struct {
A,B int
}
// 声明返回客户端的参数结构体
type ArithResponse struct {
//乘积
Pro int
// 商
Quo int
// 余数
Rem int
} func main() {
conn, err := rpc.DialHTTP("tcp", "182.254.179.186:8081")
//conn, err := rpc.DialHTTP("tcp", "127.0.0.1:8081")
if err!= nil{
log.Fatal(err)
}
req := ArithRequest{9, 2}
var res ArithResponse
err2 := conn.Call("Arith.Multiply",req, &res)
if err2 != nil {
log.Println(err2)
}
fmt.Printf("%d * %d = %d\n", req.A,req.B,res.Pro)
err3 := conn.Call("Arith.Divide",req, &res)
if err3 != nil {
log.Println(err3)
}
fmt.Printf("%d / %d = %d,%d", req.A,req.B,res.Quo,res.Rem)
}

client.go

另外,net/rpc/jsonrpc 库通过 json 格式编解码,支持跨语言调用

package main

import (
"errors"
"fmt"
"log"
"net"
"net/rpc"
"net/rpc/jsonrpc"
) type Arith struct {
} // 声明接收的参数结构体
type ArithRequest struct {
A, B int
} // 声明返回客户端的参数结构体
type ArithResponse struct {
//乘积
Pro int
// 商
Quo int
// 余数
Rem int
} //乘法
func (this *Arith) Multiply(req ArithRequest, res *ArithResponse) error {
res.Pro = req.A * req.B
return nil
} // 商和余数
func (this *Arith) Divide(req *ArithRequest, res *ArithResponse) error {
if req.B == 0 {
return errors.New("出书不能为0")
}
//商
res.Quo = req.A / req.B
//余数
res.Rem = req.A % req.B
return nil
} func main() {
//注册服务
rpc.Register(new(Arith))
// 采用http作为rpc载体
fmt.Println("------ jsonrpc service is already on ------")
// 监听服务,等待客户端调用响应的方法
listener, err := net.Listen("tcp","127.0.0.1:8081")
if err != nil {
log.Fatal(err)
}
//循环监听服务
for {
conn, err := listener.Accept()
if err != nil {
continue
}
//协程
go func(conn net.Conn) {
fmt.Println("a new client visit -----")
jsonrpc.ServeConn(conn)
}(conn)
}
}

server.go

package main

import (
"fmt"
"log"
"net/rpc/jsonrpc"
) type Arith struct {
}
// 声明接收的参数结构体
type ArithRequest struct {
A,B int
}
// 声明返回客户端的参数结构体
type ArithResponse struct {
//乘积
Pro int
// 商
Quo int
// 余数
Rem int
} func main() {
conn, err := jsonrpc.Dial("tcp", "182.254.179.186:8081")
//conn, err := jsonrpc.Dial("tcp", "127.0.0.1:8081")
if err!= nil{
log.Fatal(err)
}
req := ArithRequest{9, 2}
var res ArithResponse
err2 := conn.Call("Arith.Multiply",req, &res)
if err2 != nil {
log.Println(err2)
}
fmt.Printf("%d * %d = %d\n", req.A,req.B,res.Pro)
err3 := conn.Call("Arith.Divide",req, &res)
if err3 != nil {
log.Println(err3)
}
fmt.Printf("%d / %d = %d,%d", req.A,req.B,res.Quo,res.Rem)

client.go

4. RPC 调用流程

微服务架构下数据交互一般是对内 RPC,对外 REST

将业务按功能模块拆分到各个微服务,具有提高项目协作效率、降低模块耦合度、提高系统可用性等优点,但是开发门槛比较高,比如 RPC  框架的使用、后期的服务监控等工作

一般情况下,我们会将功能代码在本地直接调用,微服务架构下,我们需要将这个函数作为单独的服务运行,客户端通过网络调用

5. 网络传输数据格式

成熟的 RPC 框架会有自定义传输协议,这里网络传输格式定义如下,前面是固定长度消息头,后面是变长消息体

6. 实现 RPC 服务端

服务端接收到的数据需要包括什么?

调用的函数名、参数列表

一般会约定函数的第二个返回值是 error 类型

通过反射实现

服务端需要解决的问题是什么?

Client 调用时只传过来函数名,需要维护函数名到函数之间的 map 映射

服务端的核心功能有哪些?

维护函数名到函数反射值的 map

client 端传函数名、参数列表后,服务端要解析为反射值,调用执行

函数的返回值打包,并通过网络返回给客户端

7. 实现 RPC 客户端

客户端只有函数原型,使用reflect.MakeFunc()  可以完成原型到函数的调用

reflect.MakeFunc()是 Client 从函数原型到网络调用的关键

8. 实现 RPC 通信测试

给服务端注册一个查询用户的方法,客户端去 RPC 调用

package main

import (
"encoding/binary"
"io"
"log"
"net"
) // 会话连接的结构体
type Session struct {
conn net.Conn
} // 创建新连接
func NewSession(conn net.Conn) *Session {
return &Session{conn:conn}
} // 向连接中写数据
func (s *Session)Write(data []byte)error{
// 4字节头 + 数据长度的切片
buf := make([]byte, 4+len(data))
// 写入头部数据,记录数据长度
// binary只认固定长度的类型,所以使用uint32,而不是直接写入
binary.BigEndian.PutUint32(buf[:4],uint32(len(data)))
// 写入
copy(buf[4:], data)
_, err := s.conn.Write(buf)
if err != nil{
log.Fatal(err)
}
return nil
} // 从连接中读取数据
func (s *Session)Read()([]byte, error) {
// 读取头部长度
header := make([]byte,4)
// 按头部长度,读取头部数据
_,err := io.ReadFull(s.conn, header)
if err != nil{
log.Fatal(err)
}
// 读取数据长度
datalen := binary.BigEndian.Uint32(header)
// 按照数据长度去读取数据
data := make([]byte, datalen)
_,err = io.ReadFull(s.conn, data)
if err != nil{
log.Fatal(err)
}
return data,nil
}

session.go

package main

import (
"bytes"
"encoding/gob"
) // 定义数据格式和编解码 // 定义RPC交互的数据格式
type RPCData struct {
// 访问的函数
Name string
// 访问时传的参数
Args []interface{}
} // 编码
func encode(data RPCData)([]byte,error) {
var buf bytes.Buffer
// 得到字节数组的编码器
bufEnc := gob.NewEncoder(&buf)
// 对数据编码
if err := bufEnc.Encode(data);err!=nil {
return nil,err
}
return buf.Bytes(), nil
}
//解码
func decode(b []byte)(RPCData,error) {
buf := bytes.NewReader(b)
// 返回字节数组解码器
bufDec := gob.NewDecoder(buf)
var data RPCData
// 对数据解码
if err := bufDec.Decode(&data); err!= nil{
return data,err
}
return data, nil
}

codec.go

package main

import (
"fmt"
"net"
"reflect"
) //声明服务器
type Server struct {
// 地址
addr string
// 服务端维护的函数名到函数反射值的map
funcs map[string]reflect.Value
}
// 创建服务端对象
func NewServer(addr string)*Server {
return &Server{addr,make(map[string]reflect.Value)}
} // 服务端绑定注册方法
// 将函数名与函数真正实现对应起来
// 第一个参数为函数名,第二个传入真正的函数
func (s *Server)Register(rpcName string, f interface{}){
if _, ok := s.funcs[rpcName];ok{
return
}
// map中没有值,则映射添加进map,便于调用
fVal := reflect.ValueOf(f)
s.funcs[rpcName] = fVal
}
//服务端等待调用
func (s *Server)Run() {
// 监听
lis, err := net.Listen("tcp",s.addr)
if err!=nil {
fmt.Printf("监听 %s err:%v", s.addr, err)
return
}
for {
//拿到连接
conn,err := lis.Accept()
if err != nil {
fmt.Printf("accept err:%v",err)
}
// 创建会话
srvSession := NewSession(conn)
// RPC读取数据
b, err := srvSession.Read()
if err != nil {
fmt.Printf("read err:%v", err)
}
// 对数据解码
rpcData, err := decode(b)
if err != nil {
fmt.Printf("read err:%v", err)
}
// 根据读取到的数据的Name,得到调用的函数名
f, ok := s.funcs[rpcData.Name]
if !ok {
fmt.Printf("函数 %s 不存在", rpcData.Name)
return
}
// 解析遍历客户端出来的参数,放到一个数组中
inArgs := make([]reflect.Value, 0, len(rpcData.Args))
for _, arg := range rpcData.Args {
inArgs = append(inArgs, reflect.ValueOf(arg))
}
// 反射调用方法,传入参数
out := f.Call(inArgs)
// 解析遍历结果,放到一个数组中
outArgs := make([]interface{}, 0, len(out))
for _, o := range out {
outArgs = append(outArgs, o.Interface())
}
// 包装数据,返回给客户端
respRPCData := RPCData{rpcData.Name, outArgs}
// 编码
respBytes, err := encode(respRPCData)
if err != nil {
fmt.Printf("encode err:%v", err)
return
}
// 使用rpc写出数据
err = srvSession.Write(respBytes)
if err != nil {
fmt.Printf("session write err:%v", err)
return
}
}
}

server.go

package main

import (
"net"
"reflect"
) // 声明客户端
type Client struct {
conn net.Conn
} // 创建客户端对象
func NewClient(conn net.Conn) *Client {
return &Client{conn: conn}
} // 实现通用的RPC客户端
// 绑定RPC访问的方法
// 传入访问的函数名 // 函数具体实现在Server端,Client只有函数原型
// 使用MakeFunc()完成原型到函数的调用 //fPtr指向函数原型
// xxx.callRPC("queryUser", &query)
func (c *Client) callRPC(rpcName string, fPtr interface{}) {
// 通过反射,获取fPtr未初始化的函数原型
fn := reflect.ValueOf(fPtr).Elem()
// 另一个函数,作用是对第一个函数参数操作
f := func(args []reflect.Value) []reflect.Value {
// 处理输入的参数
inArgs := make([]interface{}, 0, len(args))
for _, arg := range args {
inArgs = append(inArgs, arg.Interface())
}
// 创建连接
cliSession := NewSession(c.conn)
// 编码数据
reqRPC := RPCData{rpcName, inArgs}
b, err := encode(reqRPC)
if err != nil {
panic(err)
}
// 写出数据
err = cliSession.Write(b)
if err != nil {
panic(err)
}
// 读取响应数据
respBytes, err := cliSession.Read()
if err != nil {
panic(err)
}
// 解码数据
respRPC, err := decode(respBytes)
if err != nil {
panic(err)
}
// 处理服务端返回的数据
outArgs := make([]reflect.Value, 0, len(respRPC.Args))
for i, arg := range respRPC.Args {
// 必须进行nil转换
if arg == nil {
// 必须填充一个真正的类型,不能是nil
outArgs = append(outArgs, reflect.Zero(fn.Type().Out(i)))
continue
}
outArgs = append(outArgs, reflect.ValueOf(arg))
}
return outArgs
}
// 参数1:一个未初始化函数的方法值,类型是reflect.Type
// 参数2:另一个函数,作用是对第一个函数参数操作
// 返回reflect.Value类型
// MakeFunc 使用传入的函数原型,创建一个绑定 参数2的新函数
v := reflect.MakeFunc(fn.Type(), f)
// 为函数fPtr赋值
fn.Set(v)
}

client.go

  • simple_rpc_test.go
package main

import (
"encoding/gob"
"fmt"
"net"
"testing"
) // 用户查询 //用于测试的结构体
// 字段首字母必须大写
type User struct {
Name string
Age int
} // 用于测试的查询用户的方法
func queryUser(uid int)(User,error) {
user := make(map[int]User)
user[0] = User{"张三",20}
user[1] = User{"李四",21}
user[2] = User{"王五",22}
// 模拟查询用户
if u, ok := user[uid];ok {
return u,nil
}
return User{},fmt.Errorf("id %d not in user db",uid)
} // 测试方法
func TestRPC(t *testing.T) {
// 需要对interface{}可能产生的类型进行注册
gob.Register(User{})
addr := "127.0.0.1:8080"
// 创建服务端
srv := NewServer(addr)
srv.Register("queryUser",queryUser)
// 服务端等待调用
go srv.Run()
// 客户端获取连接
conn, err := net.Dial("tcp",addr)
if err != nil {
t.Error(err)
}
// 创建客户端
cli := NewClient(conn)
// 声明函数原型
var query func(int)(User,error)
cli.callRPC("queryUser",&query)
// 得到查询结果
u,err := query(1)
if err!=nil {
t.Fatal(err)
}
fmt.Println(u)
}

  • 测试读写
package main

import (
"fmt"
"log"
"net"
"sync"
"testing"
) // 测试读写
func TestSession_ReadWrite(t *testing.T) {
// 定义监听IP和端口
addr := "127.0.0.1:8000"
//定义传输的数据
my_data := "hello"
// 等待组
wg := sync.WaitGroup{}
// 协程1个读,1个写
wg.Add(2)
// 写数据协程
go func() {
defer wg.Done()
//创建rpc连接
listener,err := net.Listen("tcp",addr)
if err!=nil {
log.Fatal(err)
}
conn,_ := listener.Accept()
s := Session{conn:conn}
// 写数据
err = s.Write([]byte(my_data))
if err!=nil {
log.Fatal(err)
}
}()
// 读数据协程
go func() {
defer wg.Done()
conn,err := net.Dial("tcp",addr)
if err!=nil {
log.Fatal(err)
}
s := Session{conn:conn}
// 读数据
data, err := s.Read()
if err!=nil {
log.Fatal(err)
}
if string(data) != my_data {
t.Fatal(err)
}
fmt.Println(string(data))
}()
wg.Wait()
}

最新文章

  1. repo 修改邮箱地址
  2. PHP CURL模拟提交数据 攻击N次方
  3. Android基于mAppWidget实现手绘地图(七)–根据坐标添加地图对象
  4. ie下如果已经有缓存,load方法的效果就无法执行.的解决方法
  5. pagefile.sys and heberfil.sys
  6. jquery垂直展开折叠手风琴二级菜单
  7. 使用IGP和BGP的配合达到降低路由容量目的的实验与总结
  8. jQuery学习笔记一
  9. 「造个轮子」——cicada 设计一个配置模块
  10. Python3 实现简易局域网视频聊天工具
  11. sockaddr_in 与 in_addr的区别
  12. redis在游戏服务器中的使用初探(二) 客户端开源库选择
  13. easyUI中textbox或number的数值大小校验
  14. Java基础(basis)-----代码块详解
  15. 逆袭之旅DAY16.东软实训.Oracle.索引
  16. 【Linux】shell编程案例
  17. 计蒜客 31447 - Fantastic Graph - [有源汇上下界可行流][2018ICPC沈阳网络预赛F题]
  18. Leetcode题库——12.整数转罗马数字
  19. 【CF835D】Palindromic characteristics 加强版 解题报告
  20. 20165203《Java程序设计》第九周学习总结

热门文章

  1. 根据注释生成xml和从nuget包中复制xml显示到swagger
  2. C#.NET编程小考30题错题纠错
  3. PMP变更流程
  4. Notepad++ 常用功能:批量取消替换换行、强制刷新数据
  5. 日历的种类(Project)
  6. ASP.NET MVC 导入Excel文件(完整版)
  7. Nginx加载新的模块,编译报错记录
  8. 【LeetCode】1437. 是否所有 1 都至少相隔 k 个元素 Check If All 1s Are at Least Length K Places Away
  9. 【LeetCode】1019. Next Greater Node In Linked List 解题报告 (Python&C++)
  10. 【九度OJ】题目1163:素数 解题报告