正品查询网站怎么做,哪些网站可以找到做跨境电商的公司,东莞seo排名外包,国内电商企业有哪些今天这篇是接上上篇RPC原理之后这篇是讲如何使用go本身自带的标准库RPC。这篇篇幅会比较短。重点在于上一章对的补充。 文章目录 RPC包的概念使用RPC包服务器代码分析如何实现的#xff1f;总结Server还提供了两个注册服务的方法 客户端代码分析如何实现的#xff1f;如何异步…今天这篇是接上上篇RPC原理之后这篇是讲如何使用go本身自带的标准库RPC。这篇篇幅会比较短。重点在于上一章对的补充。 文章目录 RPC包的概念使用RPC包服务器代码分析如何实现的总结Server还提供了两个注册服务的方法 客户端代码分析如何实现的如何异步编程同步总结 codec序列化框架使用JSON协议的RPC RPC包的概念
回顾RPC原理
看完回顾后其实就可以继续需了解并使用go中所提供的包。
Go语言的 rpc 包提供对通过网络或其他i/o连接导出的对象方法的访问服务器注册一个对象并把它作为服务对外可见服务名称就是类型名称。
注册后对象的导出方法将支持远程访问。服务器可以注册不同类型的多个对象(服务) 但是不支持注册同一类型的多个对象。
Go官方提供了一个RPC库: net/rpc。
包rpc提供了通过网络访问一个对象的输出方法的能力。
服务器需要注册对象通过对象的类型名暴露这个服务。
注册后这个对象的输出方法就可以远程调用这个库封装了底层传输的细节包括序列化(默认GOB序列化器)。
对象的方法要能远程访问它们必须满足一定的条件否则这个对象的方法会被忽略
方法的类型是可输出的方法本身是可输出的方法必须由两个参数必须是输出类型或者是内建类型方法的第二个参数必须是指针类型 方法返回类型为error
所以一个输出方法的格式如下:
func (t *T) MethodName(argType T1, replyType *T2) error这里的T、T1、T2能够被encoding/gob序列化即使使用其它的序列化框架将来这个需求可能回被弱化。
第一个参数T1代表调用者(client)提供的参数第二个参数*T2代表要返回给调用者的计算结果方法的返回值如果不为空 那么它作为一个字符串返回给调用者(所以需要一个序列化框架)如果返回error则reply参数不会返回给调用者
使用RPC包
简单例子是一个非常简单的服务。 我们在这个里面就搞1和12就好:
在这个例子中定义了一个简单的RPC服务器和客户端使用的方法是一个
第一步需要定义传入参数和返回参数的数据结构
type Args struct {A, B int
}
type Quotient struct {Quo, Rem int
}第二步定义一个服务对象这个服务对象可以很简单。
比如类型是int或者是interface{},重要的是它输出的方法。
type Arith int第三步实现这个类型的两个方法 乘法和除法
func (t *Arith) Multiply(args *Args, reply *int) error {*reply args.A * args.Breturn nil
}
func (t *Arith) Divide(args *Args, quo *Quotient) error {if args.B 0 {return errors.New(divide by zero)}quo.Quo args.A / args.Bquo.Rem args.A % args.Breturn nil
}第四步实现RPC服务器: 基于tcp实现
生成了一个Arith对象并使用rpc.Register注册这个服务然后通过HTTP暴露出来
arith : new(Arith)
rpc.Register(arith)
rpc.HandleHTTP()
l, e : net.Listen(tcp, :9091)
if e ! nil {log.Fatal(listen error:, e)
}
go http.Serve(l, nil)
select{
}客户端可以看到服务Arith以及它的两个方法Arith.Multiply和Arith.Divide
第五步创建一个客户端建立客户端和服务器端的连接: 分为同步调用和异步调用(都是远程调用)
同步调用
client, err : rpc.DialHTTP(tcp, 127.0.0.1:9091)
if err ! nil {log.Fatal(dialing:, err)
}args : server.Args{7,8}
var reply int
err client.Call(Arith.Multiply, args, reply)
if err ! nil {log.Fatal(arith error:, err)
}
fmt.Printf(Arith: %d*%d%d, args.A, args.B, reply)
异步调用
client, err : rpc.DialHTTP(tcp, 127.0.0.1:9091)
if err ! nil {log.Fatal(dialing:, err)
}
quotient : new(Quotient)
divCall : client.Go(Arith.Divide, args, quotient, nil)
replyCall : -divCall.Done // will be equal to divCall
// check errors, print, etc.完整的例子
创建一个service.go 的文件用来保存结构体对象以及方法
package mainimport errorstype Args struct {A, B int
}type Quotient struct {Quo, Rem int
}type Arith intfunc (t *Arith) Multiply(args *Args, reply *int) error {*reply args.A * args.Breturn nil
}
func (t *Arith) Divide(args *Args, quo *Quotient) error {if args.B 0 {return errors.New(divide by zero)}quo.Quo args.A / args.Bquo.Rem args.A % args.Breturn nil
}创建一个RPC服务端server.go
package mainimport (lognetnet/httpnet/rpc
)func main() {arith : new(Arith)rpc.Register(arith)rpc.HandleHTTP()l, e : net.Listen(tcp, :9091)if e ! nil {log.Fatal(listen error:, e)}go http.Serve(l, nil)select {}
}创建一个客户端client.go
package mainimport (fmtlognet/rpc
)func main() {// 建立HTTP连接client, err : rpc.DialHTTP(tcp, 127.0.0.1:9091)if err ! nil {log.Fatal(dialing:, err)}// 同步调用args : Args{7, 8}var reply interr client.Call(Arith.Multiply, args, reply)if err ! nil {log.Fatal(arith error:, err)}fmt.Printf(Arith: %d*%d%d, args.A, args.B, reply)// 异步调用quotient : new(Quotient)divCall : client.Go(Arith.Divide, args, quotient, nil)replyCall : -divCall.Done // will be equal to divCall// check errors, print, etc.fmt.Println(replyCall.Error)fmt.Println(quotient)
}
打开终端
先启动服务器
go run server.go service.go在打开一个终端
最后启动一个客户端
go run client.go service.go结果为
服务器代码分析
Server的很多方法你可以直接调用这对于一个简单的Server的实现更方便但是你如果需要配置不同的Server
比如不同的监听地址或端口就需要自己生成Server:
var DefaultServer NewServer()Server有多种Socket监听的方式: func (server *Server) Accept(lis net.Listener)func (server *Server) HandleHTTP(rpcPath, debugPath string)func (server *Server) ServeCodec(codec ServerCodec)func (server *Server) ServeConn(conn io.ReadWriteCloser)func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request)func (server *Server) ServeRequest(codec ServerCodec) errorServeHTTP: 实现了处理 http请求的业务逻辑它首先处理http的CONNECT请求 接收后就Hijacker这个连接conn 然后调用ServeConn在这个连接上处理这个客户端的请求。 其实是实现了 http.Handler接口我们一般不直接调用这个方法。 Server.HandleHTTP设置rpc的上下文路径rpc.HandleHTTP使用默认的上下文路径DefaultRPCPathDefaultDebugPath 当你启动一个http server的时候 http.ListenAndServe面设置的上下文将用作RPC传输这个上下文的请求会教给ServeHTTP来处理以上是RPC over http的实现可以看出 net/rpc只是利用 http CONNECT建立连接这和普通的 RESTful api还是不一样的。源码
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {if req.Method ! CONNECT {w.Header().Set(Content-Type, text/plain; charsetutf-8)w.WriteHeader(http.StatusMethodNotAllowed)io.WriteString(w, 405 must CONNECT\n)return}conn, _, err : w.(http.Hijacker).Hijack()if err ! nil {log.Print(rpc hijacking , req.RemoteAddr, : , err.Error())return}io.WriteString(conn, HTTP/1.0 connected\n\n)server.ServeConn(conn)
}如何实现的
Accept用来处理一个监听器一直在监听客户端的连接一旦监听器接收了一个连接则还是交给ServeConn在另外一个goroutine中去处理(源码)
//Accept接受侦听器上的连接并提供请求
//每个传入连接。接受阻塞直到监听器
//返回非nil错误。对象中调用Accept
//go语句
func (server *Server) Accept(lis net.Listener) {for {conn, err : lis.Accept()if err ! nil {log.Print(rpc.Serve: accept:, err.Error())return}go server.ServeConn(conn)}
}协程进入ServeConn可以看出很重要的一个方法就是ServeConn
// ServeConn在单连接上运行服务器。
// ServeConn阻塞服务连接直到客户端挂起。
//调用者通常在go语句中调用ServeConn。
// ServeConn使用gob连接格式(参见包gob)
//连接。要使用备用编解码器请使用ServeCodec。
//有关并发访问的信息请参阅NewClient的注释。.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {buf : bufio.NewWriter(conn)srv : gobServerCodec{rwc: conn,dec: gob.NewDecoder(conn),enc: gob.NewEncoder(buf),encBuf: buf,}server.ServeCodec(srv)
}连接其实是交给一个ServerCodec去处理这里默认使用gobServerCodec去处理这是一个未输出默认的编解码器可以使用其它的编解码器。
// ServeCodec类似于ServeConn但使用指定的编解码器来
//解码请求和编码响应。
func (server *Server) ServeCodec(codec ServerCodec) {sending : new(sync.Mutex)wg : new(sync.WaitGroup)for {service, mtype, req, argv, replyv, keepReading, err : server.readRequest(codec)if err ! nil {if debugLog err ! io.EOF {log.Println(rpc:, err)}if !keepReading {break}// send a response if we actually managed to read a header.if req ! nil {server.sendResponse(sending, req, invalidRequest, codec, err.Error())server.freeRequest(req)}continue}wg.Add(1)go service.call(server, sending, wg, mtype, req, argv, replyv, codec)}//我们已经看到没有更多的请求。//在关闭编解码器之前等待响应。wg.Wait()codec.Close()
}它其实一直从连接中读取请求然后调用go service.call在另外的goroutine中处理服务调用。
总结 对象重用。 Request和Response都是可重用的通过Lock处理竞争。这在大并发的情况下很有效。 使用了大量的goroutine。如果使用一定数量的goroutine作为worker池去处理这个case可能还会有些性能的提升但是更复杂了。使用goroutine可以获得了非常好的性能。 业务处理是异步的服务的执行不会阻塞其它消息的读取。 一个codec实例必然和一个connnection相关因为它需要从connection中读取request和发送response。
go的rpc官方库的消息(request和response)的定义很简单 就是消息头(header)内容体(body)。 消息体是reply类型的序列化后的值。
type Request struct {ServiceMethod string // format: Service.MethodSeq uint64 // 客户端选择的序列号// 包含过滤或未导出的字段
}Server还提供了两个注册服务的方法
第二个方法为服务起一个别名否则服务名已它的类型命名 func (server *Server) Register(rcvr interface{}) errorfunc (server *Server) RegisterName(name string, rcvr interface{}) error它们俩底层调用register进行服务的注册(这里的源码太多就不放了)
func (server *Server) register(rcvr interface{}, name string, useName bool) error受限于Go语言的特点我们不可能在接到客户端的请求的时候根据反射动态的创建一个对象就是Java那样。
因此在Go语言中我们需要预先创建一个服务map这是在编译的时候完成的 说白了这里需要建立一个注册名与服务之间的映射关系
server.serviceMap make(map[string]*service)同时每个服务还有一个方法map: map[string]*methodType,通过suitableMethods建立映射:
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType这样rpc在读取请求header通过查找这两个map就可以得到要调用的服务及它的对应方法了。
func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {if wg ! nil {defer wg.Done()}mtype.Lock()mtype.numCallsmtype.Unlock()function : mtype.method.Func// 调用该方法为应答提供一个新值。returnValues : function.Call([]reflect.Value{s.rcvr, argv, replyv})// 该方法的返回值是一个错误。.errInter : returnValues[0].Interface()errmsg : if errInter ! nil {errmsg errInter.(error).Error()}server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)server.freeRequest(req)
}客户端代码分析
客户端要建立和服务器的连接 func Dial(network, address string) (*Client, error)func DialHTTP(network, address string) (*Client, error)func DialHTTPPath(network, address, path string) (*Client, error)func NewClient(conn io.ReadWriteCloser) *Clientfunc NewClientWithCodec(codec ClientCodec) *Client如何实现的
DialHTTP 和 DialHTTPPath是通过HTTP的方式和服务器建立连接他俩的区别之在于是否设置上下文路径:
// DialHTTPPath连接HTTP RPC服务器在指定的网络地址和路径上
func DialHTTPPath(network, address, path string) (*Client, error) {conn, err : net.Dial(network, address)if err ! nil {return nil, err}io.WriteString(conn, CONNECT path HTTP/1.0\n\n)// 在切换到RPC协议之前需要成功的HTTP响应resp, err : http.ReadResponse(bufio.NewReader(conn), http.Request{Method: CONNECT})if err nil resp.Status connected {return NewClient(conn), nil}if err nil {err errors.New(unexpected HTTP response: resp.Status)}conn.Close()return nil, net.OpError{Op: dial-http,Net: network address,Addr: nil,Err: err,}
}首先发送 CONNECT 请求如果连接成功则通过NewClient(conn)创建client。
Dial则通过TCP直接连接服务器
// Dial连接到指定网络地址的RPC服务器
func Dial(network, address string) (*Client, error) {conn, err : net.Dial(network, address)if err ! nil {return nil, err}return NewClient(conn), nil
}注意根据服务是over HTTP还是 over TCP选择合适的连接方式
NewClient则创建一个缺省codec为glob序列化库的客户端
// NewClient返回一个新的Client来处理到连接另一端的服务集合。
//在连接的写端添加一个缓冲区,报头和有效载荷作为一个单元发送。
//
//连接的读写部分是独立序列化的不需要联锁。然而每一半都可以访问并发所以conn的实现应该防止,并发读或并发写。
func NewClient(conn io.ReadWriteCloser) *Client {encBuf : bufio.NewWriter(conn)client : gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}return NewClientWithCodec(client)
}如果想用其它的序列化库你可以调用NewClientWithCodec方法 一般用来做RPC框架的
// NewClientWithCodec类似于NewClient但使用指定的编码请求和解码响应。
func NewClientWithCodec(codec ClientCodec) *Client {client : Client{codec: codec,pending: make(map[uint64]*Call),}go client.input()return client
}重要的是input方法它以一个死循环的方式不断地从连接中读取response,然后调用map中读取等待的Call.Done channel通知完成。(这个其实有点令牌扫描的作用消息队列中有说)
消息的结构和服务器一致都是HeaderBody的方式
客户端的调用有两个方法: Go 和 Call
Go方法是异步的它返回一个 Call指针对象 它的Done是一个channel如果服务返回Done就可以得到返回的对象(实际是Call对象包含Reply和error信息)。Call 方法是同步的它实际是调用Go实现的。
如何异步编程同步
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {call : -client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Donereturn call.Error
}从一个Channel中读取对象会被阻塞住直到有对象可以读取这种实现很简单也很方便。
总结
从源码中我们还可以学到锁Lock的一种实用方式也就是尽快的释放锁而不是defer mu.Unlock直到函数执行到最后才释放那样锁占用的时间太长了。 codec序列化框架
rpc框架默认使用gob序列化库很多情况下我们追求更好的效率的情况下或者追求更通用的序列化格式我们可能采用其它的序列化方式 比如protobuf, json, xml等。
市面上也有许多序列化框架。速度快而且非常好用。gRPC是互联网后台常用的RPC框架其内部是由protobuf协议完成通讯。这个后面再讲。
JDK Serializable、FST、Kryo、Protobuf、Thrift、Hession和AvroFury
Fury是最新的序列化框架号称比jdk 快170倍后面会讲的 支持多种语言
Go官方库实现了JSON-RPC 1.0。JSON-RPC是一个通过JSON格式进行消息传输的RPC规范因此可以进行跨语言的调用。
Go的net/rpc/jsonrpc库可以将JSON-RPC的请求转换成自己内部的格式:
func (c *serverCodec) ReadRequestHeader(r *rpc.Request) error {c.req.reset()if err : c.dec.Decode(c.req); err ! nil {return err}r.ServiceMethod c.req.Methodc.mutex.Lock()c.seqc.pending[c.seq] c.req.Idc.req.Id nilr.Seq c.seqc.mutex.Unlock()return nil
}使用JSON协议的RPC
rpc 包默认使用的是 gob 协议对传输数据进行序列化/反序列化比较有局限性。
将例子进行修改: 服务器端
package mainimport (lognetnet/rpcnet/rpc/jsonrpc
)func main() {arith : new(Arith)rpc.Register(arith)l, e : net.Listen(tcp, :9091)if e ! nil {log.Fatal(listen error:, e)}for {conn, _ : l.Accept()// 使用JSON协议rpc.ServeCodec(jsonrpc.NewServerCodec(conn))}
}客户端
package mainimport (fmtlognetnet/rpcnet/rpc/jsonrpc
)func main() {// 建立HTTP连接conn, err : net.Dial(tcp, 127.0.0.1:9091)if err ! nil {log.Fatal(dialing:, err)}client : rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))// 同步调用args : Args{7, 8}var reply interr client.Call(Arith.Multiply, args, reply)if err ! nil {log.Fatal(arith error:, err)}fmt.Printf(Arith: %d*%d%d, args.A, args.B, reply)// 异步调用quotient : new(Quotient)divCall : client.Go(Arith.Divide, args, quotient, nil)replyCall : -divCall.Done // will be equal to divCall// check errors, print, etc.fmt.Println(replyCall.Error)fmt.Println(quotient)
}如何使用与上面的例子一致。
社区中各式RPC框架grpc、thrift等就是为了让RPC调用更方便。