增加清晰度的网站,网站刚建好怎么做能让百度收录,photoshop电脑版,微信登录网址文章目录 RPC1.定义2.概念3.优缺点4.RPC结构5.RPC消息协议5.1 消息边界5.2 内容5.3 压缩 6.RPC的实现6.1 divide_protocol.py6.2 server.py6.3 client.py RPC
1.定义
远程过程调用(remote procedure call)
2.概念
广义:所有通过网络进行通讯,的调用统称为RPC调用
狭义:不采… 文章目录 RPC1.定义2.概念3.优缺点4.RPC结构5.RPC消息协议5.1 消息边界5.2 内容5.3 压缩 6.RPC的实现6.1 divide_protocol.py6.2 server.py6.3 client.py RPC
1.定义
远程过程调用(remote procedure call)
2.概念
广义:所有通过网络进行通讯,的调用统称为RPC调用
狭义:不采用http协议的方式,采用自定义格式的二进制方式
3.优缺点
优点 效率高发起rpc调用的一方,可以忽略RPC的具体实现,如同编写本地函数调用 缺点 通用性不高
4.RPC结构
client(caller):调用者client stub(bundle args/unbundle ret vals):客户端存根client network serviceserver network serviceserver stub(bundle ret vals/unbundle args)
5.RPC消息协议
5.1 消息边界
分隔符(\r\n)长度声明法(例如HTTP中 Content-Length)
5.2 内容
二进制文本内容
5.3 压缩
压缩处理是一把双刃剑,减少数据量减轻带宽压力同时,额外增加了压缩和解压缩的时间
6.RPC的实现
6.1 divide_protocol.py
import struct
from io import BytesIOclass InvalidOperation(Exception):...class DivideProtocol(object):float divide(1:int num1, 2:int num21)def _read_all(self, size):读取指定长度的字节:param size: 长度:return: 读取出的二进制数据if isinstance(self.conn, BytesIO):# BytesIO类型用于演示buff bhave 0while have size:chunk self.conn.read(size - have)have len(chunk)buff chunkreturn buffelse:# socket类型buff bhave 0while have size:chunk self.conn.recv(size - have)have len(chunk)buff chunk# 客户端关闭了连接if len(chunk) 0:raise EOFError()return buffdef args_encode(self, num1, num21):对调用参数进行编码:param num1: int:param num2: int:return: 编码后的二进制数据# 处理参数num1, 4字节整型buff struct.pack(!B, 1)buff struct.pack(!i, num1)# 处理参数num2, 4字节整型如为默认值1则不再放到消息中if num2 ! 1:buff struct.pack(!B, 2)buff struct.pack(!i, num2)# 处理消息总长度4字节无符号整型length len(buff)# 处理方法名字符串类型name divide# 字符串长度4字节无符号整型msg struct.pack(!I, len(name))msg name.encode()msg struct.pack(!I, length) buffreturn msgdef args_decode(self, connection):获取调用参数并进行解码:param connection: 传输工具对象如socket对象或者BytesIO对象从中可以读取消息数据:return: 解码后的参数字典# 保存到当前对象中供_read_all方式使用self.conn connectionparam_name_map {1: num1,2: num2}param_len_map {1: 4,2: 4}# 用于保存解码后的参数字典args dict()# 读取消息总长度4字无节符号整数buff self._read_all(4)length struct.unpack(!I, buff)[0]# 记录已读取的长度have 0# 读取第一个参数4字节整型buff self._read_all(1)have 1param_seq struct.unpack(!B, buff)[0]param_len param_len_map[param_seq]buff self._read_all(param_len)have param_lenargs[param_name_map[param_seq]] struct.unpack(!i, buff)[0]if have length:return args# 读取第二个参数4字节整型buff self._read_all(1)have 1param_seq struct.unpack(!B, buff)[0]param_len param_len_map[param_seq]buff self._read_all(param_len)have param_lenargs[param_name_map[param_seq]] struct.unpack(!i, buff)[0]return argsdef result_encode(self, result):对调用的结果进行编码:param result: float 或 InvalidOperation对象:return: 编码后的二进制数据if isinstance(result, float):# 没有异常正常执行# 处理结果类型1字节无符号整数buff struct.pack(!B, 1)# 处理结果值, 4字节floatbuff struct.pack(!f, result)else:# 发生了InvalidOperation异常# 处理结果类型1字节无符号整数buff struct.pack(!B, 2)# 处理异常结果值, 字符串# 处理字符串长度, 4字节无符号整数buff struct.pack(!I, len(result.message))# 处理字符串内容buff result.message.encode()return buffdef result_decode(self, connection):对调用结果进行解码:param connection: 传输工具对象如socket对象或者BytesIO对象从中可以读取消息数据:return: 结果数据self.conn connection# 取出结果类型, 1字节无符号整数buff self._read_all(1)result_type struct.unpack(!B, buff)[0]if result_type 1:# float的结果值 4字节floatbuff self._read_all(4)result struct.unpack(!f, buff)[0]return resultelse:# InvalidOperation对象# 取出字符串长度, 4字节无符号整数buff self._read_all(4)str_len struct.unpack(!I, buff)[0]buff self._read_all(str_len)message buff.decode()return InvalidOperation(message)class MethodProtocol(object):def __init__(self, connection):self.conn connectiondef _read_all(self, size):读取指定长度的字节:param size: 长度:return: 读取出的二进制数据if isinstance(self.conn, BytesIO):# BytesIO类型用于演示buff bhave 0while have size:chunk self.conn.read(size - have)have len(chunk)buff chunkreturn buffelse:# socket类型buff bhave 0while have size:print(have%d size%d % (have, size))chunk self.conn.recv(size - have)have len(chunk)buff chunkif len(chunk) 0:raise EOFError()return buffdef get_method_name(self):# 获取方法名# 读取字符串长度4字节无符号整型buff self._read_all(4)str_len struct.unpack(!I, buff)[0]# 读取字符串buff self._read_all(str_len)name buff.decode()return name
6.2 server.py
import socket
import threadingfrom customize_rpc.divide_protocol import DivideProtocol, MethodProtocol, InvalidOperationclass Handlers:staticmethoddef divide(num1, num21):除法:param num1::param num2::return:if num2 0:raise InvalidOperation()val num1 / num2return valclass ServerStub(object):def __init__(self, connection, handlers):服务器存根:param connection: 与客户端的socket连接:param handlers: 存放被调用的方法self._process_map {divide: self._process_divide,}self.conn connectionself.method_proto MethodProtocol(self.conn)self.handlers handlersdef process(self):被服务器调用的入口服务器收到请求后调用该方法# 获取解析调用请求的方法名name self.method_proto.get_method_name()# 调用对应的处理方法self._process_map[name]()def _process_divide(self):执行divide本地调用并将结果返回给客户端# 接收调用参数proto DivideProtocol()args proto.args_decode(self.conn)# 进行本地divide调用try:result self.handlers.divide(**args)except InvalidOperation as e:result e# 构造返回值消息并返回result proto.result_encode(result)self.conn.sendall(result)class Server(object):def __init__(self, host, port, handlers):self.sock socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)self.host hostself.port portself.sock.bind((host, port))self.handlers handlersdef serve(self):开始服务self.sock.listen(128)print(开始监听)while True:conn, addr self.sock.accept()print(建立链接%s % str(addr))stub ServerStub(conn, self.handlers)try:while True:stub.process()except EOFError:print(客户端关闭连接)# 关闭服务端连接conn.close()class ThreadServer(object):def __init__(self, host, port, handlers):self.sock socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)self.host hostself.port portself.sock.bind((host, port))self.handlers handlersdef serve(self):开始服务self.sock.listen(128)print(开始监听)while True:conn, addr self.sock.accept()print(建立链接%s % str(addr))t threading.Thread(targetself.handle, args(conn,))t.start()def handle(self, client):stub ServerStub(client, self.handlers)try:while True:stub.process()except EOFError:print(客户端关闭连接)client.close()if __name__ __main__:server Server(127.0.0.1, 8000, Handlers)server.serve() 6.3 client.py
import time
import socketfrom customize_rpc.divide_protocol import DivideProtocol, InvalidOperationclass Channel(object):连接通道def __init__(self, host, port):self.host hostself.port portdef get_connection(self):获取一个tcp连接sock socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.connect((self.host, self.port))return sockclass ClientStub(object):客户端存根def __init__(self, channel: Channel):self.channel channelself.conn self.channel.get_connection()def divide(self, num1, num21):# 构造proto DivideProtocol()args proto.args_encode(num1, num2)self.conn.sendall(args)result proto.result_decode(self.conn)if isinstance(result, InvalidOperation):raise resultelse:return resultif __name__ __main__:channel Channel(127.0.0.1, 8000)stub ClientStub(channel)for i in range(5):try:val stub.divide(i * 100, 10)except InvalidOperation as e:print(e.message)else:print(val)time.sleep(1)