专门做海报的网站,域名查询,网站添加多个关键词,wordpress外网打开慢远程过程调用#xff08;RPC#xff09;协议详解 什么是RPC协议RPC的基本原理RPC的关键组件RPC的优缺点Protobuf函数绑定CallEncodeRecvDecodeSocket.Send和Recv项目地址 什么是RPC协议
远程过程调用#xff08;Remote Procedure Call#xff0c;简称RPC#xff09;是一种… 远程过程调用RPC协议详解 什么是RPC协议RPC的基本原理RPC的关键组件RPC的优缺点Protobuf函数绑定CallEncodeRecvDecodeSocket.Send和Recv项目地址 什么是RPC协议
远程过程调用Remote Procedure Call简称RPC是一种网络通信协议允许程序在不同的地址空间通常在不同的物理计算机上中调用彼此的方法好像它们是在本地执行的一样。RPC隐藏了底层的网络通信细节使开发人员能够像调用本地函数一样简单地调用远程服务。
RPC的基本原理
RPC的工作原理基于客户端-服务器模型主要包括以下步骤
1.客户端调用客户端程序发起对某个远程过程的调用请求。 2.请求打包调用参数被打包成消息发送到服务器。 3.服务器解包和执行服务器接收到消息解包获取调用参数执行相应的远程过程。 4.结果打包和返回执行结果被打包成消息发送回客户端。 5.客户端接收结果客户端解包消息获取调用结果
RPC的关键组件
1.客户端代理负责将本地调用请求转换为远程调用请求打包参数并通过网络发送给服务器。 2.服务器代理负责接收客户端的请求解包参数调用相应的服务方法并将结果打包返回给客户端。 3.通信协议定义客户端和服务器之间如何通信常见的协议有HTTP、TCP等。 4.编解码器负责参数和结果的序列化和反序列化常见的格式有JSON、XML、Protobuf等。
RPC的优缺点
优点 1.简化远程调用使得远程调用像本地调用一样简单开发人员无需关心底层的网络通信细节。 2.语言无关大多数RPC框架支持多种编程语言方便不同语言的系统互操作。
缺点 1.调试困难由于涉及网络通信调试远程调用的问题比本地调用更加复杂。 2.可靠性要求高需要处理网络延迟、丢包、超时等问题增加了系统的复杂性。 3.耦合性客户端和服务器需要共同遵循同一套接口定义一旦接口发生变化可能需要同时更新多个系统。
Protobuf
protoc.exe 生成C#文件
Gen.bat
echo offrem 设置路径变量
set PROTOC_PATHprotoc.exe
set PROTO_DIRProtos
set OUTPUT_DIRProtocolCodesrem 创建日志头
echo .......................proto2C#.......................
echo.rem 检查目录是否存在
if not exist %PROTO_DIR% (echo Error: Protocols directory does not exist.echo Please create the Protocols directory and place your .proto files in it.echo.pauseexit /b
)rem 创建输出目录
if not exist %OUTPUT_DIR% mkdir %OUTPUT_DIR%rem 批量处理 .proto 文件
for %%f in (%PROTO_DIR%\*.proto) do (echo %%f complete%PROTOC_PATH% --proto_path%PROTO_DIR% --csharp_out%OUTPUT_DIR% %%f
)echo code generation complete. Press any key to close.
pause nul
函数绑定
1.使用反射自动获取所有RPC函数, 对其进行Hash绑定
函数的定义 RPCMsgHandles.cs
public sealed class RPCMsgHandles
{private static void ReqMove(int unitId, Move move){}private static void RecvAttack(int skillid, Attack attack, ItemList itemList){LogHelper.Log($Recv: skillid {attack.Id}, targetId {attack.TargetId}, itemList.Count {itemList.Items.Count});}private static void RecvDelete(int msg){LogHelper.Log($Recv: state {msg});}private static void RecvReflectMove(Move move){LogHelper.Log($move reflect sync: x:{move.X}, y:{move.Y}, speed:{move.Speed}, dir:{move.Dir});}
}使用反射进行函数绑定 RPCMoudle.cs
public sealed class RPCMoudle
{private static Dictionaryint, IRPC _msg new Dictionaryint, IRPC();public static void Init(){System.Type type typeof(RPCMsgHandles);MethodInfo[] methods type.GetMethods(BindingFlags.Static | BindingFlags.NonPublic);foreach (MethodInfo methodInfo in methods){RPC method new RPC(methodInfo);int index 0;ParameterInfo[] infos methodInfo.GetParameters();foreach (var info in infos){if (typeof(IMessage).IsAssignableFrom(info.ParameterType)){IMessage message Activator.CreateInstance(info.ParameterType) as IMessage;method.AddParamType(DateType.Message);method.AddParam(index, message);}else{DateType dateType GetDateType(info.ParameterType);method.AddParamType(dateType);}index;}int hash Globals.Hash(methodInfo.Name);if (_msg.ContainsKey(hash))throw new Exception(AddParamType rpc _method hash conflict: methodInfo.Name);_msg.Add(hash, method);}}
}2.使用泛型手动进行RPC函数绑定
泛型类进行函数绑定 RPCMoudle.cs
public static void RegisterT(string methodName, ActionT action) where T : class, IMessage, new()
{int id Globals.Hash(methodName);RPCStaticT method new RPCStaticT();method.Register(action, new T());if (_msg.ContainsKey(id)){LogHelper.LogError($repeat id, id {id});}_msg[id] method;
}public static void Unregister(string methodName)
{int id Globals.Hash(methodName);if (_msg.ContainsKey(id)){_msg.Remove(id);}else{LogHelper.LogError($no find method, id {id});}
}Call
Call的实现, encode数据到byte[]第一个参数必须为远程函数名字, 用于将函数名字的hashid写入数据头中, 这样远程服务器在解析数据的时候会先解析4字节的数据头表示函数的hashid
Call中的Send函数 是Socket发送协议, Send函数中会在数据头中写入数据的长度, 在接收方根据数据的长度接收完整数据 防止粘包
Call函数有多个方法重载, 根据业务需求使用 1.public static void Call(string methodName, IMessage message) 类型安全, 类型固定 2.public static void Call(string id, params object[] args) 类型不安全, 可以传入任何参数, 使用更加方便快捷
具体调用例子
Move move new Move();
move.X 10;
move.Y 20;
move.Speed 100;
move.Dir 20;这里使用的是object[] args 类型不安全, 也会有装箱拆箱的开销, 使用这用方式需要前后端统一类型
使用起来简单方便, 业务逻辑开发上使用较为方便
比如请求领取奖励 RPCMoudle.Call(ReqAward, 传入表奖励id);
比如请求保存勾选 RPCMoudle.Call(Save, true);
RPCMoudle.Call(ReqMove, 10016, move);这样是类型安全的, 也不会存在装箱拆箱的开销
更加高效, 战斗场景较为适合
RPCMoudle.Call(ReqMove, move);Call的实现, 将数据进行Encode转换成二进制
public static void Call(string methodName, IMessage message)
{if (message null) return;try{int id Globals.Hash(methodName);int offset 0;BuffMessage msg GameFrame.message.GetBuffMessage();BitConverter.TryWriteBytes(msg.bytes.AsSpan(offset), id);offset sizeof(int);BitConverterHelper.WriteString(msg.bytes, ref offset, GameFrame.myRole.Id);BitConverterHelper.WriteMessage(msg.bytes, ref offset, message);msg.length offset;Main.Instance.Send(msg);}catch(Exception ex){LogHelper.LogError(ex.ToString());}
}public static void Call(string id, params object[] args)
{try{Profiler.BeginSample(rpc call);int hash Globals.Hash(id);BuffMessage msg Encode(hash, args);Main.Instance.Send(msg);Profiler.EndSample();}catch(Exception ex){LogHelper.LogError(ex.ToString());}
}Encode
Encode函数的实现
private static BuffMessage Encode(int id, params object[] args)
{int offset 0;BuffMessage msg GameFrame.message.GetBuffMessage();BitConverter.TryWriteBytes(msg.bytes.AsSpan(offset), id);offset sizeof(int);BitConverterHelper.WriteString(msg.bytes, ref offset, GameFrame.myRole.Id);foreach (object arg in args){try{System.Type type arg.GetType();switch (arg){case IMessage:BitConverterHelper.WriteMessage(msg.bytes, ref offset, (IMessage)arg);break;case Int16:BitConverterHelper.WriteInt16(msg.bytes, ref offset, (Int16)arg);break;case Int32:BitConverterHelper.WriteInt32(msg.bytes, ref offset, (Int32)arg);break;case Int64:BitConverterHelper.WriteInt64(msg.bytes, ref offset, (Int64)arg);break;case UInt16:BitConverterHelper.WriteUInt16(msg.bytes, ref offset, (UInt16)arg);break;case UInt32:BitConverterHelper.WriteUInt32(msg.bytes, ref offset, (UInt32)arg);break;case UInt64:BitConverterHelper.WriteUInt64(msg.bytes, ref offset, (UInt64)arg);break;case bool:BitConverterHelper.WriteBool(msg.bytes, ref offset, (bool)arg);break;case Byte:BitConverterHelper.WriteByte(msg.bytes, ref offset, (byte)arg);break;case SByte:BitConverterHelper.WriteByte(msg.bytes, ref offset, (byte)arg);break;case Char:BitConverterHelper.WriteChar(msg.bytes, ref offset, (Char)arg);break;case Single:BitConverterHelper.WriteSingle(msg.bytes, ref offset, (Single)arg);break;case Double:BitConverterHelper.WriteDouble(msg.bytes, ref offset, (Double)arg);break;case string:BitConverterHelper.WriteString(msg.bytes, ref offset, (string)arg);break;}}catch(Exception ex){LogHelper.LogError($id: {id}, ex.ToString());msg.Dispose();return msg;}}msg.length offset;return msg;
}0GC的TryWriteBytes方案
namespace Game
{public static class BitConverterHelper{private static readonly int BUFFER_SIZE 1024 * 1024;private static readonly byte[] buffer new byte[BUFFER_SIZE];private static CodedOutputStream _stream;private static Stopwatch _watch;public static void Init(){CreateStream();_watch new Stopwatch();_watch.Start();}private static void CreateStream(){if (_stream ! null)_stream.Dispose();if (_watch ! null){_watch.Stop();LogHelper.LogWarning($create stream interval time: {_watch.ElapsedMilliseconds / 1000.0f} s);_watch.Restart();}_stream new CodedOutputStream(buffer);}private static Spanbyte ToByteArray(IMessage message){if (message null)return new byte[0];int length message.CalculateSize();if (length 0)return new byte[0];if (length BUFFER_SIZE){throw new Exception($overflow: message length {BUFFER_SIZE});}if (_stream.Position length BUFFER_SIZE)CreateStream();int position (int)_stream.Position;message.WriteTo(_stream);return buffer.AsSpan(position, length);}public static void WriteInt16(byte[] buffer, ref int offset, Int16 arg){Check(buffer, offset 1);buffer[offset] (byte)DateType.Int16;Check(buffer, offset sizeof(Int16));BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);offset sizeof(Int16);}public static void WriteInt32(byte[] buffer, ref int offset, Int32 arg){Check(buffer, offset 1);buffer[offset] (byte)DateType.Int32;Check(buffer, offset sizeof(Int32));BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);offset sizeof(Int32);}public static void WriteInt64(byte[] buffer, ref int offset, Int64 arg){Check(buffer, offset 1);buffer[offset] (byte)DateType.Int64;Check(buffer, offset sizeof(Int64));BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);offset sizeof(Int64);}public static void WriteUInt16(byte[] buffer, ref int offset, UInt16 arg){Check(buffer, offset 1);buffer[offset] (byte)DateType.UInt16;Check(buffer, offset sizeof(UInt16));BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);offset sizeof(UInt16);}public static void WriteUInt32(byte[] buffer, ref int offset, UInt32 arg){Check(buffer, offset 1);buffer[offset] (byte)DateType.UInt32;Check(buffer, offset sizeof(UInt32));BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);offset sizeof(UInt32);}public static void WriteUInt64(byte[] buffer, ref int offset, UInt64 arg){Check(buffer, offset 1);buffer[offset] (byte)DateType.UInt64;Check(buffer, offset sizeof(UInt64));BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);offset sizeof(UInt64);}public static void WriteBool(byte[] buffer, ref int offset, bool arg){Check(buffer, offset 1);buffer[offset] (byte)DateType.Boolean;Check(buffer, offset sizeof(bool));BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);offset sizeof(bool);}public static void WriteByte(byte[] buffer, ref int offset, byte arg){Check(buffer, offset 1);buffer[offset] (byte)DateType.Byte;Check(buffer, offset 1);buffer[offset] arg;}public static void WriteChar(byte[] buffer, ref int offset, Char arg){Check(buffer, offset 1);buffer[offset] (byte)DateType.Char;Check(buffer, offset sizeof(Char));BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);offset sizeof(Char);}public static void WriteSingle(byte[] buffer, ref int offset, Single arg){Check(buffer, offset 1);buffer[offset] (byte)DateType.Single;Check(buffer, offset sizeof(Single));BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);offset sizeof(Single);}public static void WriteDouble(byte[] buffer, ref int offset, Double arg){Check(buffer, offset 1);buffer[offset] (byte)DateType.Double;Check(buffer, offset sizeof(Double));BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);offset sizeof(Double);}public static void WriteString(byte[] buffer, ref int offset, string arg){Check(buffer, offset 1);buffer[offset] (byte)DateType.String;byte[] bytes Encoding.UTF8.GetBytes(arg);Check(buffer, offset bytes.Length);BitConverter.TryWriteBytes(buffer.AsSpan(offset), bytes.Length);offset sizeof(int);Spanbyte target new Spanbyte(buffer, offset, buffer.Length - offset);bytes.CopyTo(target);offset bytes.Length;}public static void WriteMessage(byte[] buffer, ref int offset, IMessage arg){IMessage message arg;Spanbyte bytes ToByteArray(message);Check(buffer, offset 1);buffer[offset] (byte)DateType.Message;Check(buffer, offset bytes.Length);BitConverter.TryWriteBytes(buffer.AsSpan(offset), bytes.Length);offset sizeof(int);Spanbyte target new Spanbyte(buffer, offset, bytes.Length);bytes.CopyTo(target);offset bytes.Length;}private static void Check(byte[] buffer, int offset){if (offset buffer.Length)throw new Exception($date length: {offset} {Globals.DATA_SZIE}, Invalid data!!);}public static void Dispose(){_stream?.Dispose();_stream null;}}
}Recv
Decode 数据解析调用本地方法 public static void OnRPC(BuffMessage msg)
{if(msg null){LogHelper.LogError(socket recv error, msg null);return;}Decode(msg.bytes);
}Decode
0GC的Decode方案
private static void Decode(byte[] buffer)
{if (buffer null || buffer.Length sizeof(int)){LogHelper.LogError(Invalid buffer received);return;}int protoId BitConverter.ToInt32(buffer, 0);if (!_msg.TryGetValue(protoId, out IRPC method)){LogHelper.LogError($Method not found for protoId: {protoId});return;}BuffMessage buffMessage GameFrame.message.GetBuffMessage();try{Array.Copy(buffer, sizeof(int), buffMessage.bytes, 0, buffer.Length - sizeof(int));method.Decode(buffMessage.bytes);}catch (Exception ex){LogHelper.LogError($Error invoking method for protoId {protoId}: {ex.Message});}finally{GameFrame.message.PutBuffMessage(buffMessage);}
}namespace Game
{public interface IRPC : IDisposable{public void Decode(byte[] buffer);}public abstract class RPCBase : IRPC{protected byte[] buffer;public abstract void Decode(byte[] buffer);protected ReadOnlySpanbyte ReadData(DateType type, ref int offset){ReadOnlySpanbyte data null;int length GetLength(type);if (length 0){data new ReadOnlySpanbyte(buffer, offset, length);offset length;}return data;}protected bool ToBoolean(ref int offset){ReadOnlySpanbyte data ReadData(DateType.Boolean, ref offset);return BitConverter.ToBoolean(data);}protected Byte ToByte(ref int offset){ReadOnlySpanbyte data ReadData(DateType.Char, ref offset);return data[0];}protected char ToChar(ref int offset){ReadOnlySpanbyte data ReadData(DateType.Char, ref offset);return BitConverter.ToChar(data);}protected Int16 ToInt16(ref int offset){ReadOnlySpanbyte data ReadData(DateType.Int16, ref offset);return BitConverter.ToInt16(data);}protected UInt16 ToUInt16(ref int offset){ReadOnlySpanbyte data ReadData(DateType.UInt16, ref offset);return BitConverter.ToUInt16(data);}protected Int32 ToInt32(ref int offset){ReadOnlySpanbyte data ReadData(DateType.Int32, ref offset);return BitConverter.ToInt32(data);}protected UInt32 ToUInt32(ref int offset){ReadOnlySpanbyte data ReadData(DateType.UInt32, ref offset);return BitConverter.ToUInt32(data);}protected Int64 ToInt64(ref int offset){ReadOnlySpanbyte data ReadData(DateType.Int64, ref offset);return BitConverter.ToInt64(data);}protected UInt64 ToUInt64(ref int offset){ReadOnlySpanbyte data ReadData(DateType.UInt64, ref offset);return BitConverter.ToUInt64(data);}protected Single ToSingle(ref int offset){ReadOnlySpanbyte data ReadData(DateType.Single, ref offset);return BitConverter.ToSingle(data);}protected Double ToDouble(ref int offset){ReadOnlySpanbyte data ReadData(DateType.Double, ref offset);return BitConverter.ToDouble(data);}protected string ToString(ref int offset){ReadOnlySpanbyte data ReadData(DateType.String, ref offset);return DecodeString(ref offset);}protected IMessage ToMessage(ref int offset, IMessage message){ReadOnlySpanbyte data ReadData(DateType.Message, ref offset);return DecodeMessage(ref offset, message);}private IMessage DecodeMessage(ref int offset, IMessage message){int length BitConverter.ToInt32(buffer, offset);offset sizeof(int);ReadOnlySpanbyte messageData new ReadOnlySpanbyte(buffer, offset, length);offset length;return message.Descriptor.Parser.ParseFrom(messageData)!;}private string DecodeString(ref int offset){int length BitConverter.ToInt32(buffer, offset);offset sizeof(int);ReadOnlySpanbyte messageData new ReadOnlySpanbyte(buffer, offset, length);offset length;return Encoding.UTF8.GetString(messageData);}private static int GetLength(DateType type){switch (type){case DateType.Boolean:return sizeof(bool);case DateType.Char:return sizeof(char);case DateType.SByte:case DateType.Byte:return sizeof(byte);case DateType.Int16:return sizeof(Int16);case DateType.UInt16:return sizeof(UInt16);case DateType.Int32:return sizeof(Int32);case DateType.UInt32:return sizeof(UInt32);case DateType.Int64:return sizeof(Int64);case DateType.UInt64:return sizeof(UInt64);case DateType.Single:return sizeof(Single);case DateType.Double:return sizeof(double);}return -1;}public virtual void Dispose(){buffer null;}}
}Decode数据到对象列表, 然后Invoke
namespace Game
{public class RPC : RPCBase{private MethodInfo _method;private ListDateType _types;private Listobject _params;private Dictionaryint, IMessage _param;private int _paramIndex;public RPC(MethodInfo method){this._method method;_types new ListDateType();_params new Listobject();_param new Dictionaryint, IMessage();}public void AddParamType(DateType type){_types?.Add(type!);}public void AddParam(int index, IMessage message){_param[index] message;}public override void Decode(byte[] buffer){base.buffer buffer;_paramIndex 0;int offset 0;_params.Clear();foreach (DateType type in _types){DateType dateType (DateType)buffer[offset];if (dateType ! type){LogHelper.LogError($dateType bo equals, recv: {Enum.GetName(typeof(DateType), type)} ! local: {Enum.GetName(typeof(DateType), dateType)});}object obj ToObject(dateType, ref offset);_params.Add(obj!);_paramIndex;}_method?.Invoke(null, _params.ToArray());}private object ToObject(DateType type, ref int offset){switch (type){case DateType.Message:IMessage message null;if (!_param!.TryGetValue(_paramIndex, out message)){LogHelper.LogError(no find message);return null;}return ToMessage(ref offset, message);case DateType.Boolean:return ToBoolean(ref offset);case DateType.Char:return ToChar(ref offset);case DateType.SByte:case DateType.Byte:return ToByte(ref offset);case DateType.Int16:return ToInt16(ref offset);case DateType.UInt16:return ToUInt16(ref offset);case DateType.Int32:return ToInt32(ref offset);case DateType.UInt32:return ToUInt32(ref offset);case DateType.Int64:return ToInt64(ref offset);case DateType.UInt64:return ToUInt64(ref offset);case DateType.Single:return ToSingle(ref offset);case DateType.Double:return ToDouble(ref offset);case DateType.String:return ToString(ref offset);default:LogHelper.LogError(no find dateType: type);break;}return null;}public override void Dispose(){base.Dispose();_method null;_types null;}}
}泛型Decode然后Invokde
namespace Game
{public class RPCStaticT : RPCBase{private ActionT _action;private IMessage _message;public RPCStatic(){}public virtual void Register(ActionT action, IMessage message){this._message message;this._action action;}public override void Decode(byte[] buffer){base.buffer buffer;int offset 0;DateType dateType (DateType)buffer[offset];try{if (dateType DateType.Message){IMessage arg ToMessage(ref offset, _message);_action?.Invoke((T)arg);}else{LogHelper.LogError($invoke error, type ! DateType.Message, type {dateType});}}catch (Exception ex){LogHelper.LogError(ex.ToString());}}public override void Dispose(){}}
}Socket.Send和Recv
使用UniTask实现的多线程异步收发消息处理了超时重发和异常处理接收消息时的粘包处理
namespace Game
{public enum SocketState{None 0,Connected 1,Disconnected 2,Connecting 3,ConnectFailed 4,Close 5,Dispose 6,}public class Tcp{private ConcurrentQueueBuffMessage _sendMsgs;private ConcurrentQueueBuffMessage _receiveMsgs;private TcpClient _tcpClient;private SocketState _socketState;private byte[] _recvBuff;private int _recvOffset;private int _delay 10;private CancellationTokenSource _recvCancelToken;private CancellationTokenSource _sendCancelToken;public SocketState State { get { return _socketState; } }public string IP { get; set; }public int Port { get; set; }public NetworkStream Stream{get { return _tcpClient.GetStream(); }}public Tcp(){_sendMsgs new ConcurrentQueueBuffMessage();_receiveMsgs new ConcurrentQueueBuffMessage();_recvBuff new byte[Globals.BUFFER_SIZE];}private void InitTcpClient(){_tcpClient new TcpClient();_recvCancelToken new CancellationTokenSource();_sendCancelToken new CancellationTokenSource();}public void Update(){Profiler.BeginSample(on tcp rpc);if (_receiveMsgs.TryDequeue(out BuffMessage msg)){RPCMoudle.OnRPC(msg);GameFrame.message.PutBuffMessage(msg);}Profiler.EndSample();}public void Connect(string ip, int port){IP ip;Port port;Connect();}public async void Connect(){try{Close();InitTcpClient();SetSocketState(SocketState.Connecting);await _tcpClient.ConnectAsync(IP, Port);OnConnect();}catch (Exception ex){LogHelper.LogError(ex.ToString());}}private void OnConnect(){try{if (_tcpClient.Connected){LogHelper.Log(connected...);SetSocketState(SocketState.Connected);StartAsyncTasks();}else{SetSocketState(SocketState.ConnectFailed);}}catch (Exception ex){LogHelper.LogError(连接或通信发生错误{0} ex.Message);SetSocketState(SocketState.ConnectFailed);}}private void StartAsyncTasks(){UniTask send UniTask.Create(SendThread);UniTask recv UniTask.Create(RecvThread);}private async UniTask SendThread(){await UniTask.SwitchToThreadPool();while (_socketState SocketState.Connected){while (true){if (!_sendMsgs.TryDequeue(out BuffMessage msg))break;var timeoutToken new CancellationTokenSource();timeoutToken.CancelAfterSlim(TimeSpan.FromMilliseconds(msg.TimeoutMillisecond));var linkedCts CancellationTokenSource.CreateLinkedTokenSource(_sendCancelToken.Token, timeoutToken.Token);try{if(_sendCancelToken.IsCancellationRequested)break;await Stream.WriteAsync(msg.bytes, 0, msg.length, linkedCts.Token);LogHelper.Log($发送完成: {msg.length} byte);GameFrame.message.PutBuffMessage(msg);}catch (OperationCanceledException ex){if (timeoutToken.IsCancellationRequested){_sendMsgs.Enqueue(msg);LogHelper.LogWarning(消息发送超时, 添加到队列末尾, 等待发送...);await UniTask.Delay(10);continue;}LogHelper.LogWarning(发送操作被终止... ex.Message);break;}catch (IOException ex) when (ex.InnerException is SocketException socketEx socketEx.SocketErrorCode SocketError.ConnectionAborted){LogHelper.Log(发送操作被终止...);break;}catch (Exception ex){LogHelper.LogError(发送错误: ex.Message);break;}}await UniTask.Delay(_delay);}}private async UniTask RecvThread(){await UniTask.SwitchToThreadPool();while (_socketState SocketState.Connected){try{if (_recvCancelToken.IsCancellationRequested) break;int length await Stream.ReadAsync(_recvBuff, _recvOffset, _recvBuff.Length - _recvOffset, _recvCancelToken.Token);if (length 0){LogHelper.Log(connect failed...);break;}_recvOffset length;int offset 0;while (true){if (_recvOffset - offset sizeof(int))// 没有足够的数据读取下一个消息的长度break;int dataLength BitConverter.ToInt32(_recvBuff, offset);if (_recvOffset - offset dataLength sizeof(int))// 没有足够的数据读取完整的消息break;// 读取完整消息BuffMessage msg GameFrame.message.GetBuffMessage();Buffer.BlockCopy(_recvBuff, offset sizeof(int), msg.bytes, 0, dataLength);_receiveMsgs.Enqueue(msg);// 移动偏移量到下一个消息offset sizeof(int) dataLength;}// 将未处理的数据移到缓冲区开头if (_recvOffset - offset 0)Buffer.BlockCopy(_recvBuff, offset, _recvBuff, 0, _recvOffset - offset);_recvOffset - offset;}catch(OperationCanceledException ex){LogHelper.Log(读取操作被终止: ex.Message);break;}catch (IOException ex) when (ex.InnerException is SocketException socketEx socketEx.SocketErrorCode SocketError.OperationAborted){LogHelper.Log(读取操作被终止...);break;}catch (Exception ex){LogHelper.LogError(读取错误: ex.ToString());break;}await UniTask.Delay(_delay);}}private void SetSocketState(SocketState state){_socketState state;}public void Send(BuffMessage message){if (message.length 0){int headLength sizeof(int);Buffer.BlockCopy(message.bytes, 0, message.bytes, headLength, message.length);BitConverter.TryWriteBytes(message.bytes.AsSpan(0), message.length);message.length headLength;_sendMsgs.Enqueue(message);}else{GameFrame.message.PutBuffMessage(message);}}public void Close(){if (_tcpClient null)return;try{if (_tcpClient.Connected){SetSocketState(SocketState.Close);_recvCancelToken.Dispose();_sendCancelToken.Dispose();_tcpClient.Close();}}catch (Exception ex){LogHelper.LogError(ex.ToString());}}public void Dispose(){Close();if (_tcpClient ! null){_tcpClient.Dispose();_tcpClient null;}if (_sendMsgs ! null){_sendMsgs.Clear();_sendMsgs null;}if (_receiveMsgs ! null){_receiveMsgs.Clear();_receiveMsgs null;}SetSocketState(SocketState.Dispose);}}
}项目地址
SimpleRPC