golang内置net/rpc功能的使用

in #cn29 days ago

关于go sdk中的rpc的实现源代码参见google open source go/net/rpc

远程过程调用(Remote Procedure Call,RPC)是一个计算机通信协议, 该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而开发者无需额外地为这个交互作用编程。

关键字 通信协议 / 数据编码

SDK 内置rpc 支持

  1. golang官方的net/rpc库使用encoding/gob进行编解码,支持tcp和http数据传输方式,由于其他语言不支持gob编解码方式,所以golang的RPC只支持golang开发的服务器与客户端之间的交互。

  2. 官方还提供了net/rpc/jsonrpc库实现RPC方法,jsonrpc采用JSON进行数据编解码,因而支持跨语言调用,目前jsonrpc库是基于tcp协议实现的,暂不支持http传输方式。

  • 对于传输数据格式
    两端要约定好数据包的格式,成熟的RPC框架会有自定义传输协议。
    假设自定义数据包格式:【 4个字节消息头+ 消息体】
 // 网络字节流
  ----------------  ---------------- -------------
|header uint32  | data []byte             
  ----------------  -----------------------------
   

根据定义好的数据流格式,声明一个基础net.conn连接的数据写入和读取,将对应的header和body数据正常取出。

type Session struct {
    conn net.Conn
}

func NewSession(conn net.Conn) *Session {
    return &Session{conn: conn}
}

// 向一个net.conn连接写入数据
func (s *Session) Write(data []byte) error {
    // 定义写数据的格式, 4字节头部 + 可变长度数据
    buf := make([]byte, 4+len(data))
    // 写入头部,记录数据长度
    binary.BigEndian.PutUint32(buf[:4], uint32(len(data)))

    // 将整个数据,放入后面
    copy(buf[4:], data)
    _, err := s.conn.Write(buf)
    if err != nil {
        return err
    }
    return nil
}

// 从一个net.conn连接读取数据
func (s *Session) Read() ([]byte, error) {
    // 读取头部,记录数据长度
    header := make([]byte, 4)
    _, err := io.ReadFull(s.conn, header)
    if err != nil {
        return nil, err
    }
    dataLength := binary.BigEndian.Uint32(header)
    // 读取数据
    data := make([]byte, dataLength)
    _, err = io.ReadFull(s.conn, data)
    if err != nil {
        return nil, err
    }
    return data, nil
}


  • 定义好rpc交互API数据结构(字节流 -> Struct)
type RPCData struct {
    Name string // 访问函数名字
    // 访问的参数
    Args []interface{}
}

// 编码(dto -> []bytes)
func encode(data RPCData) ([]byte, error) {
    // 得到字节数组的编码器
    var buf bytes.Buffer
    bufEnc := gob.NewEncoder(&buf)
    if err := bufEnc.Encode(data);err != nil {
        return nil, err
    }
    return buf.Bytes(),nil
}

// 解码 ([]bytes -> dto)
func decode(b []byte)(RPCData,error) {
    buf := bytes.NewBuffer(b)
    bufDec := gob.NewDecoder(buf)
    var data RPCData
    if err := bufDec.Decode(&data);err != nil {
        return data,err
    }
    return data,nil
}

内置rpc实现服务端

  1. 服务端接收到的数据需要包括什么?
    调用的函数名、参数列表,还有一个返回值error类型。

  2. 服务端需要解决的问题是什么?
    Map维护客户端传来调用函数,服务端知道去调哪一个服务功能。

  3. 服务端的核心功能有哪些?
    维护函数map ; 客户端传来的请求数据进行解析; 函数的返回值打包,传给客户端。

总的来说,rpc的交互,就是client端调用一个server的method。在设计和交互中,都围绕方法function进行处理,并且要将两端在网络中交互的字节流数据进行序列化和反序列化处理。

type Server struct {
    addr  string
    funcs map[string]reflect.Value
}

func NewServer(addr string) *Server {
    return &Server{
        addr:  addr,
        funcs: make(map[string]reflect.Value),
    }
}

// rpcName : 参数函数名
// f: 传入真正的函数
func (s *Server) Register(rpcName string, f interface{}) {
    if _, ok := s.funcs[rpcName]; ok {
        return
    }
    s.funcs[rpcName] = reflect.ValueOf(f)
}

// 服务启动,等待调用
func (s *Server) Run() {
    lis, err := net.Listen("tcp", s.addr)
    if err != nil {
        fmt.Printf("监听 %s err :%v", s.addr, err)
        return
    }
    for {
        conn, err := lis.Accept()
        if err != nil {
            fmt.Printf("Accept err :%v", err)
            return
        }

        // 创建自定义rpc解析器,得到一个连接会话句柄
        serSession := NewSession(conn)
        // 使用rpc方式读取数据
        b, err := serSession.Read() // 从当前conn中拆分出header和body
        if err != nil {
            return
        }

        // 得到字节流数据后,进行反序列化到具体object处理
        rpcData, err := Decode(b)
        if err != nil {
            return
        }

        // 根据得到rpc object 得到调用方法名
        f, ok := s.funcs[rpcData.Name]
        if !ok {
            fmt.Printf("函数 %s 不存在", rpcData.Name)
            return
        }

        // 调用服务端方法
        // 1. 遍历解析客户端传来的参数,放切片里
        inArgs := make([]reflect.Value, len(rpcData.Args))
        for _, arg := range rpcData.Args {
            inArgs = append(inArgs, reflect.ValueOf(arg))
        }

        // 2. 通过反射调用方法,out是方法处理返回结果 []value类型
        out := f.Call(inArgs)
        // 3. 遍历结果,得到数据
        outArgs := make([]interface{}, len(out))
        for _, v := range out {
            outArgs = append(outArgs, v.Interface())
        }

        // 4. 数据编码,转换成字节流,返回客户端
        respRpcData := RPCData{rpcData.Name, outArgs}
        b, err = Encode(respRpcData)
        if err != nil {
            return
        }

        // 通过相同的conn连接写入字节流数据返回
        err = serSession.Write(b)
        if err != nil {
            return
        }

    }
}

内置rpc实现客户端

  1. 客户端只有函数原型,使用reflect.MakeFunc() 可以完成原型到函数的调用
  2. reflect.MakeFunc()是Client从函数原型到网络调用的关键。
type Client struct {
    conn net.Conn
}

func NewClient(conn net.Conn) *Client {
    return &Client{conn: conn}
}

// 实现通用的rpc客户端,抽象调用的方法函数
// fPtr 指向函数原型
func (c *Client) CallRPC(rpcName string, fPtr interface{}) {
    // 通过反射,获取fPtr指向的函数原型
    fn := reflect.ValueOf(fPtr).Elem()
    // 需要另一个函数,作用是对第一个函数参数操作
    argsf := func(args []reflect.Value) []reflect.Value {
        // 处理参数
        inArgs := make([]interface{}, 0, len(args))
        for _, arg := range args {
            inArgs = append(inArgs, arg.Interface())
        }

        // 连接
        cliSession := NewSession(c.conn)
        // 1. 编码数据
        reqRpc := RPCData{Name: rpcName, Args: inArgs}
        b, err := Encode(reqRpc)
        if err != nil {
            panic(err)
        }

        // 2. 写数据,发送数据,字节流
        err = cliSession.Write(b)
        if err != nil {
            panic(err)
        }

        // 3. 服务端响应数据处理
        respBytes, err := cliSession.Read()
        if err != nil {
            panic(err)
        }

        // 3.1 处理服务端返回数据,进行解码
        respRpc, err := Decode(respBytes)
        if err != nil {
            panic(err)
        }
        outArgs := make([]reflect.Value, 0, len(respRpc.Args))
        for i, arg := range respRpc.Args {
            // // 必须进行nil转换
            if arg == nil {
                //reflect.Zero()会返回类型的零值的value
                // .out()会返回函数输出的参数类型
                outArgs = append(outArgs, reflect.Zero(fn.Type().Out(i)))
                continue
            }
            outArgs = append(outArgs, reflect.ValueOf(arg))
        }
        return outArgs
    }
    
    // 完成原型到函数的内部转换
    // 参数1是reflect.type , 参数2 f是函数类型,是对于参数1 fn函数的操作
    // fn是定义,argsf是具体操作
    v := reflect.MakeFunc(fn.Type(),argsf)
    // 为函数fPtr赋值
    fn.Set(v)
}

调用实现例子

给服务端注册一个查询用户的方法,客户端使用rpc方式调用。

对于调用的过程,要结合client和server的connection连接的阻塞式的readwrite 时刻进行穿插理解。

// 给服务端注册一个查询用户的方法,客户端使用rpc方式调用
type User struct {
    Name string
    Age  int
}

func queryUser(uid int) (User, error) {
    user := make(map[int]User)

    // mock data
    user[0] = User{"zs", 20}
    user[1] = User{"xs", 21}
    user[2] = User{"ps", 22}

    // mock query user
    if u, ok := user[uid]; ok {
        return u, nil
    }
    return User{}, fmt.Errorf("%d err", uid)
}

func TestRPC(t *testing.T) {
    gob.Register(User{})

    rpcCallMethodName := "queryUser"
    addr := "127.0.0.1:8000"
    // init server
    srv := NewServer(addr)

    // 注册服务端方法
    srv.Register(rpcCallMethodName, queryUser)

    // 启动服务端,等待调用
    go srv.Run()

    // 客户端获取连接
    conn, err := net.Dial("tcp", addr)
    if err != nil {
        return
    }

    // init client, 与服务端建立了连接,但是还未写入请求数据,阻塞
    client := NewClient(conn)
    // client 声明函数原型
    var query func(int) (User, error)

    // 类似于实现了本地queryMethod方法的封装,函数名,入参数,返回数据
    // 定义好了client与server方法交互的hook逻辑,还未正式调用触发hook
    client.CallRPC(rpcCallMethodName, &query)

    // 调用本地函数,触发hook函数,实现函数入参写入connect,请求CallRPC.argsf逻辑
    u, err := query(1)
    if err != nil {
        return
    }
    fmt.Println(u)

}

Coin Marketplace

STEEM 0.29
TRX 0.13
JST 0.033
BTC 63295.30
ETH 3053.22
USDT 1.00
SBD 3.70