Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

Vinllen Chen


烛昭

golang net/rpc源码分析

  为什么需要rpc框架?一次rpc需要指定调用的方法,参数,接收返回值。如果没有rpc框架,裸写tcp,什么时候知道报文传递完毕的界限。最简单我们可以搞个私有协议,TLV格式指定:T(type)指定类型,L(length)指定长度,V(Value)指定值,但是这个也会带入一些问题,比如规范问题,不同服务提供不同协议,这不乱套了吗;另外还有效率问题,比如我要传递一个数组怎么传?基于以上几个问题,rpc框架出现了,rpc框架采用序列化操作将请求和返回在发送端进行序列化,然后在接收端进行解序列化达到目的,如下图所示,图片来自博客
pic1.gif
服务调用流程如下:

  1. client调用client stub,这是一次本地过程调用
  2. client stub将参数打包成一个消息,然后发送这个消息。打包过程(序列化)也叫做 marshalling
  3. client所在的系统将消息发送给server
  4. server的的系统将收到的包传给server stub
  5. server stub解包得到参数。 解包(解序列化)也被称作 unmarshalling
  6. 最后server stub调用服务过程. 返回结果按照相反的步骤传给client

  net/rpc是go自带的rpc框架,采用gob进行序列化。现在rpc框架有许多,比如跨语言调用的grpc,thrift等,服务治理框架dubbo,RPCX,go-micro等。rpc不同于RESTful API,前者可以基于HTTP,也可以基于TCP,UDP,主要注重方法,而后者为HTTP,主要为资源操作(增删改查)。关于rpc部分,可以查看这篇rpcx作者写的博客。本文的编写也参考了Go官方库RPC开发指南,这篇已经对net/rpc分析有个大概的轮廓了,只不过有些细节没有深究,本文我来扣一扣总结一下。
  本文框架:首先给出服务端和客户端调用的例子,然后介绍服务端代码,然后介绍客户端,最后总结一下。

1. 调用的例子

1.1 服务端调用

package server

import "errors"

type Args struct {  
    A, B int
}

type Quotient struct {  
    Quo, Rem int
}

type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {  
    *reply = args.A * args.B
    return nil
}

func (t *Arith) Divide(args *Args, quo *Quotient) error {  
    if args.B == 0 {
        return errors.New("divide by zero")
    }
    quo.Quo = args.A / args.B
    quo.Rem = args.A % args.B
    return nil
}

  上面给出了服务端的例子,Arith提供了2个函数:Multiply相乘函数和Divide相除,格式规范为:第一个参数为传入的参数,第二个参数为返回的参数,返回值是error。这个也是定义rpc的必备约束:

  • the method's type is exported.
  • the method is exported.
  • the method has two arguments, both exported (or builtin) types.
  • the method's second argument is a pointer.
  • the method has return type error.

也就是说,一个合格的RPC调用接口应该长这样:

func (t *T) MethodName(argType T1, replyType *T2) error  

  服务端启动服务完整代码,具体调用流程在下面小节分析:

package main

import(  
    "net"
    "net/rpc"
    "net/http/httptest"
    "errors"
    "log"
    "sync"
)

type Args struct {  
    A, B int
}

type Quotient struct {  
    Quo, Rem int
}

type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {  
    *reply = args.A * args.B
    return nil
}

func (t *Arith) Divide(args *Args, quo *Quotient) error {  
    if args.B == 0 {
        return errors.New("divide by zero")
    }
    quo.Quo = args.A / args.B
    quo.Rem = args.A % args.B
    return nil
}

func listenTCP() (net.Listener, string) {  
    l, e := net.Listen("tcp", "127.0.0.1:10011")
    if e != nil {
        log.Fatalf("net.Listen tcp :0: %v", e)
    }
    return l, l.Addr().String()
}

func startHttpServer() {  
    server := httptest.NewServer(nil)
    httpServerAddr := server.Listener.Addr().String()
    log.Println("Test HTTP RPC server listening on", httpServerAddr)
}

func main() {  
    rpc.Register(new(Arith)) //注册服务
    var l net.Listener
    l, serverAddr := listenTCP() //监听TCP连接
    log.Println("Test RPC server listening on", serverAddr)
    go rpc.Accept(l)

    rpc.HandleHTTP() //监听HTTP连接

    var httpOnce sync.Once
    httpOnce.Do(startHttpServer)

    select{}
}

1.2 客户端调用

  客户端分别调用异步和同步连接到TCP和HTTP2个接口。

package main

import(  
    "net/rpc"
    "log"
    "fmt"
)

type Args struct {  
    A, B int
}

type Quotient struct {  
    Quo, Rem int
}

func main() {  
    client, err := rpc.DialHTTP("tcp", "127.0.0.1:64120") //64120为服务端启动服务的端口
    if err != nil {
        log.Fatal("dialing:", err)
    }

    // Synchronous call
    args := &Args{7,8}
    var reply int
    err = client.Call("Arith.Multiply", args, &reply)
    if err != nil {
        log.Fatal("arith error:", err)
    }
    fmt.Printf("Arith: %d*%d=%d\n", args.A, args.B, reply)

    // Asynchronous call
    clientTCP, err := rpc.Dial("tcp", "127.0.0.1:10011")
    if err != nil {
        log.Fatal("dialing:", err)
    }

    quotient := new(Quotient)
    divCall := clientTCP.Go("Arith.Divide", args, quotient, nil)
    replyCall := <-divCall.Done    // will be equal to divCall
    if replyCall.Error != nil {
        fmt.Println(replyCall.Error)
    } else {
        fmt.Printf("Arith: %d/%d=%d...%d\n", args.A, args.B, quotient.Quo, quotient.Rem)
    }
}

2.服务端代码分析

  我们先来看一下服务端代码中的流程:注册(rpc.Register(new(Arith)))、启动监听(listenTCP())、协程处理TCP连接(rpc.Accept(l))、处理HTTP连接(rpc.HandleHTTP())。

2.1 注册(rpc.Register(new(Arith))

  首先是服务端代码调用rpc.Register(new(Arith)),然后是对应的Register代码:

// Register publishes the receiver's methods in the DefaultServer.
func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }  

其中,DefaultServer是全局变量var DefaultServer = NewServer(),也就是说如果注册多次,都是挂在同一个Server下面(除非new一个新的Server,有相应的接口)。
  调用Server中的Register:

// Register publishes in the server the set of methods of the
// receiver value that satisfy the following conditions:
//    - exported method of exported type
//    - two arguments, both of exported type
//    - the second argument is a pointer
//    - one return value, of type error
// It returns an error if the receiver is not an exported type or has
// no suitable methods. It also logs the error using package log.
// The client accesses each method using a string of the form "Type.Method",
// where Type is the receiver's concrete type.
func (server *Server) Register(rcvr interface{}) error {  
    return server.register(rcvr, "", false)
}

  接下来是具体的register函数:

func (server *Server) register(rcvr interface{}, name string, useName bool) error {  
    s := new(service)
    s.typ = reflect.TypeOf(rcvr)
    s.rcvr = reflect.ValueOf(rcvr)
    sname := reflect.Indirect(s.rcvr).Type().Name()
    if useName {
        sname = name
    }
    if sname == "" {
        s := "rpc.Register: no service name for type " + s.typ.String()
        log.Print(s)
        return errors.New(s)
    }
    if !isExported(sname) && !useName {
        s := "rpc.Register: type " + sname + " is not exported"
        log.Print(s)
        return errors.New(s)
    }
    s.name = sname

    // Install the methods
    s.method = suitableMethods(s.typ, true) //判断是否符合rpc规范

    if len(s.method) == 0 {
        str := ""

        // To help the user, see if a pointer receiver would work.
        method := suitableMethods(reflect.PtrTo(s.typ), false)
        if len(method) != 0 {
            str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
        } else {
            str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
        }
        log.Print(str)
        return errors.New(str)
    }

    if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
        return errors.New("rpc: service already defined: " + sname)
    }
    return nil
}

通过反射获取接口类型和值,并通过suitableMethods函数判断注册的rpc是否符合规范,最后调用server.serviceMap.LoadOrStore(sname, s)将对应rpc存放于map中,供之后查找。

2.2 启动监听(listenTCP()

  注册监听端口。

2.3 协程处理TCP连接(rpc.Accept(l)

  Accept函数内对端口进行监听,有新来的连接,启动协程调用server.ServerConn方法进行处理:

// Accept accepts connections on the listener and serves requests
// for each incoming connection. Accept blocks until the listener
// returns a non-nil error. The caller typically invokes Accept in a
// go statement.
func (server *Server) Accept(lis net.Listener) {  
    for {
        conn, err := lis.Accept()
        if err != nil {
            log.Print("rpc.Serve: accept:", err.Error())
            return
        }
        go server.ServeConn(conn)
    }
}

  ServeConn接着调用ServeCodec,也就是走到了序列化/解序列化的地方:

// ServeConn runs the server on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
// The caller typically invokes ServeConn in a go statement.
// ServeConn uses the gob wire format (see package gob) on the
// connection. To use an alternate codec, use ServeCodec.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {  
    buf := bufio.NewWriter(conn)
    srv := &gobServerCodec{
        rwc:    conn,
        dec:    gob.NewDecoder(conn),
        enc:    gob.NewEncoder(buf),
        encBuf: buf,
    }
    server.ServeCodec(srv)
}

  ServeCodec代码,其主要为读取消息然后对request进行解序列化,然后调用相应的RPC方法,处理后发送序列化后的返回参数:

// ServeCodec is like ServeConn but uses the specified codec to
// decode requests and encode responses.
func (server *Server) ServeCodec(codec ServerCodec) {  
    sending := new(sync.Mutex)
    wg := new(sync.WaitGroup)
    for {
        service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec) //读取消息并解序列化,其主要包含两部分:1.调用readRequestHeader读取请求,并查看请求的service是否存在,存在则返回。2.对输入参数进行解序列化,以及对应service的返回参数的类型

        if err != nil {
            if debugLog && err != io.EOF {
                log.Println("rpc:", err)
            }
            if !keepReading {
                break
            }
            // send a response if we actually managed to read a header.
            if req != nil {
                server.sendResponse(sending, req, invalidRequest, codec, err.Error())
                server.freeRequest(req)
            }
            continue
        }
        wg.Add(1) //信号量控制,在下面call方法中会进行Done()
        go service.call(server, sending, wg, mtype, req, argv, replyv, codec)//调用对应的service处理,然后返回序列化后的返回值。
    }
    // We've seen that there are no more requests.
    // Wait for responses to be sent before closing codec.
    wg.Wait()//等待所有service.call完成
    codec.Close()
}

  readRequest不具体展开了,来看一下call函数:

func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {  
    if wg != nil {
        defer wg.Done() //信号量控制释放
    }
    mtype.Lock()
    mtype.numCalls++ //访问次数计数
    mtype.Unlock()
    function := mtype.method.Func
    // Invoke the method, providing a new value for the reply.
    returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv}) //调用对应的rpc
    // The return value for the method is an error.
    errInter := returnValues[0].Interface()
    errmsg := ""
    if errInter != nil {
        errmsg = errInter.(error).Error()
    }
    server.sendResponse(sending, req, replyv.Interface(), codec, errmsg) //返回加密后的返回值
    server.freeRequest(req) //释放request,此处下面有疑问
}

  别的流程挺清晰了,此处freeRequest用于释放Request,对应的,readRequest中调用getRequest()获取头部。先来看一下Request结构体:

// Request is a header written before every RPC call. It is used internally
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
type Request struct {  
    ServiceMethod string   // format: "Service.Method"
    Seq           uint64   // sequence number chosen by client
    next          *Request // for free list in Server
}

包括了rpc方法名,序列号,已经链表结构存储下一个结点。看起来像是链表结构存储Request,request请求来了,拿一个Request结点,请求处理完毕,释放掉。

func (server *Server) getRequest() *Request {  
    server.reqLock.Lock()
    req := server.freeReq
    if req == nil {
        req = new(Request)
    } else {
        server.freeReq = req.next
        *req = Request{}
    }
    server.reqLock.Unlock()
    return req
}

func (server *Server) freeRequest(req *Request) {  
    server.reqLock.Lock()
    req.next = server.freeReq
    server.freeReq = req
    server.reqLock.Unlock()
}

然而看以上2个代码实现,现在的问题是:如果链表内没有结点可拿,则new一个,结束后把结点插入到链表的头部。那么链表的长度表示“最大一次并发访问量”,比如最大一次并发接受了100个请求,则结束后这个链表长度为100个Request,那么这个意义在哪里?为啥要用链表存,直接请求来了new不行吗?反正也是链表中结点也是拿出来复用的。此处的确没看懂。
  同理,Response也是这么做的。

2.4 处理HTTP连接(rpc.HandleHTTP()

  以上是RPC over TCP的情况,go这个rpc库还提供了RPC over HTTP的接口。HandleHTTP调用链就不具体讲了,其主要将默认的DefaultRPCPath传递给http.Handle,当启动http server的时候,上面设置的RPC path将会生效,默认访问到该path。接下来是ServeHTTP处理方法:

// ServeHTTP implements an http.Handler that answers RPC requests.
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {  
    if req.Method != "CONNECT" {
        w.Header().Set("Content-Type", "text/plain; charset=utf-8")
        w.WriteHeader(http.StatusMethodNotAllowed)
        io.WriteString(w, "405 must CONNECT\n")
        return
    }
    conn, _, err := w.(http.Hijacker).Hijack()
    if err != nil {
        log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
        return
    }
    io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
    server.ServeConn(conn)
}

  函数内对连接进行Hijack,然后调用ServeConn处理连接(这里为RPC over HTTP情况,与RPC over TCP为一个处理函数,也就是说下层透明)。

2.5服务端小结

  net/rpc中默认生成了一个server供调用,当然你也可以自己new一个。

3.客户端代码分析

  客户端代码提供了2种方式:同步Call和异步Go,其中Call方法的内部还是调用了Go方法,只不过进行了一次channel阻塞。

// Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {  
    call := new(Call)
    call.ServiceMethod = serviceMethod
    call.Args = args
    call.Reply = reply
    if done == nil {
        done = make(chan *Call, 10) // buffered.
    } else {
        // If caller passes done != nil, it must arrange that
        // done has enough buffer for the number of simultaneous
        // RPCs that will be using that channel. If the channel
        // is totally unbuffered, it's best not to run at all.
        if cap(done) == 0 {
            log.Panic("rpc: done channel is unbuffered")
        }
    }
    call.Done = done
    client.send(call)
    return call
}

// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {  
    call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
    return call.Error
}

  Go里面是需要传入一个channel的且必须是带buffer的,如果没有,将会new一个,但是为什么这里channel buffer大小是10,我也不知道……send方法用于发送,考虑到一个client可能会调用多次send,所以有个加锁机制,但是为什么会有2个加锁?内层加锁是用于锁序列号,外层加锁锁整个发送过程?问题是既然2层嵌套,好像内层锁没什么用啊。

func (client *Client) send(call *Call) {  
    client.reqMutex.Lock()
    defer client.reqMutex.Unlock()

    // Register this call.
    client.mutex.Lock()
    if client.shutdown || client.closing {
        call.Error = ErrShutdown
        client.mutex.Unlock()
        call.done()
        return
    }
    seq := client.seq
    client.seq++
    client.pending[seq] = call
    client.mutex.Unlock()

    // Encode and send the request.
    client.request.Seq = seq
    client.request.ServiceMethod = call.ServiceMethod
    err := client.codec.WriteRequest(&client.request, call.Args)
    if err != nil {
        client.mutex.Lock()
        call = client.pending[seq]
        delete(client.pending, seq)
        client.mutex.Unlock()
        if call != nil {
            call.Error = err
            call.done()
        }
    }
}

  上面是发送过程,接收回复在Dial方法中调用了:

// Dial connects to an RPC server at the specified network address.
func Dial(network, address string) (*Client, error) {  
    conn, err := net.Dial(network, address)
    if err != nil {
        return nil, err
    }
    return NewClient(conn), nil
}

  NewClient调用:

// NewClient returns a new Client to handle requests to the
// set of services at the other end of the connection.
// It adds a buffer to the write side of the connection so
// the header and payload are sent as a unit.
func NewClient(conn io.ReadWriteCloser) *Client {  
    encBuf := bufio.NewWriter(conn)
    client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
    return NewClientWithCodec(client)
}

// NewClientWithCodec is like NewClient but uses the specified
// codec to encode requests and decode responses.
func NewClientWithCodec(codec ClientCodec) *Client {  
    client := &Client{
        codec:   codec,
        pending: make(map[uint64]*Call),
    }
    go client.input()
    return client
}

  同样也是先过解码器,调用input处理回复,失败或者处理完毕会调用done方法,这样读channel端就不会一直阻塞住。

func (client *Client) input() {  
    var err error
    var response Response
    for err == nil {
        response = Response{}
        err = client.codec.ReadResponseHeader(&response)
        if err != nil {
            break
        }
        seq := response.Seq
        client.mutex.Lock()
        call := client.pending[seq]
        delete(client.pending, seq)
        client.mutex.Unlock()

        switch {
        case call == nil:
            // We've got no pending call. That usually means that
            // WriteRequest partially failed, and call was already
            // removed; response is a server telling us about an
            // error reading request body. We should still attempt
            // to read error body, but there's no one to give it to.
            err = client.codec.ReadResponseBody(nil)
            if err != nil {
                err = errors.New("reading error body: " + err.Error())
            }
        case response.Error != "":
            // We've got an error response. Give this to the request;
            // any subsequent requests will get the ReadResponseBody
            // error if there is one.
            call.Error = ServerError(response.Error)
            err = client.codec.ReadResponseBody(nil)
            if err != nil {
                err = errors.New("reading error body: " + err.Error())
            }
            call.done()
        default:
            err = client.codec.ReadResponseBody(call.Reply)
            if err != nil {
                call.Error = errors.New("reading body " + err.Error())
            }
            call.done()
        }
    }
    // Terminate pending calls.
    client.reqMutex.Lock()
    client.mutex.Lock()
    client.shutdown = true
    closing := client.closing
    if err == io.EOF {
        if closing {
            err = ErrShutdown
        } else {
            err = io.ErrUnexpectedEOF
        }
    }
    for _, call := range client.pending {
        call.Error = err
        call.done()
    }
    client.mutex.Unlock()
    client.reqMutex.Unlock()
    if debugLog && err != io.EOF && !closing {
        log.Println("rpc: client protocol error:", err)
    }
}

func (call *Call) done() {  
    select {
    case call.Done <- call:
        // ok
    default:
        // We don't want to block here. It is the caller's responsibility to make
        // sure the channel has enough buffer space. See comment in Go().
        if debugLog {
            log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
        }
    }
}

总结

  net/rpc库默认采用gob进行序列化,当然这个可以更改为protobuf, json等。据说net/rcp的性能很不错。

说明:

转载请注明链接: http://vinllen.com/golang-net-rpcyuan-ma-fen-xi/

参考:

http://colobu.com/2016/09/18/go-net-rpc-guide/
https://www.gitbook.com/book/smallnest/go-rpc-programming-guide/details


About the author

vinllen chen

Beijing, China

格物致知


Discussions

comments powered by Disqus