Foreword
C++20 has been out for a while now. One of its headline features is that it finally supports coroutines. (C++, the big brother of the industry, didn't get coroutines until C++20 — I'm speechless; how many people waited how many years, how much youth.) But after a thousand calls it has finally arrived, and the standard committee's caution shows in what C++20 coroutines turned out to be: "excellent performance", "flexible development", and an "entry barrier" high enough to scare people away.
Then again, C++ was born to carry the performance mission. It is not a language built merely for application-level work: if the application layer is all you need, Python, Java, Ruby, Lua and the like will do. C++ is a language you can build other languages with, so it chases performance and flexibility and sheds the pampering of a low entry barrier — C++ was never designed to be a language for everyone.
I had used Python's coroutines before, which are very approachable, so when C++20 arrived I wanted to try coroutines in C++. Once I got into it, the problem surfaced: C++20 ships only the coroutine infrastructure. The complete machinery of stackless coroutines is there, but none of it is wrapped into concrete application-level facilities in the standard library (STL). At that point most people can only stare: given how intricate the coroutine machinery is, saying you'll "use coroutines" without a standard library on top sounds like a joke. The consensus online is that C++20 coroutines are a half-finished product, and that really using C++ coroutines has to wait until C++23 completes the coroutine standard library.
In the usual spirit of "no zuo no die" — merely using libraries without understanding the underlying mechanism is not the C++ attitude — I spent half a month studying C++20 coroutines in depth and wrote a simple coroutine library, and along the way gained a solid understanding of the complex C++ coroutine machinery. Granted, asio and cppcoro already support C++20 coroutines, but I find them too large and complex; if you hope to learn C++ coroutines by reading their source code, forget it — without understanding how coroutines work you won't even be able to read the source. Some will say: with the source in hand you still can't read it? You're bluffing. I'm really not. C++ coroutine syntax will bend your intuitions. Let's take an example:
A func(int a) {
    co_await std::suspend_always{};
    co_yield a;
    co_return 0;
}

int main() {
    auto co = func(1);  // only creates a suspended coroutine; the body has not run
    co.hd.resume();     // 1st resume: runs up to the co_await, then suspends
    co.hd.resume();     // 2nd resume: runs co_yield a, the coroutine now holds 1
    co.hd.resume();     // 3rd resume: runs co_return 0, the coroutine now holds 0
}
Some will say: func is a coroutine function; running func in main returns 0, so co is an int variable with value 0.
By ordinary-code rules that reading would be correct. In the world of C++ coroutines it is not the situation at all.
What actually happens: func here is a coroutine generator (this concept is important). co is not a function return value; it is an instance of the coroutine management class A — a wrapper object holding the control handle of the concrete coroutine instance A has been associated with. To be clear: co is not the coroutine instance (the coroutine frame); it wraps the instance's control handle. Executing func(1) only "dynamically" creates a coroutine instance and hands its control handle back to the user. At that moment the coroutine is suspended — the {} body has not executed at all — so there is no return value yet. This is very twisted and hard to digest, and there is worse to come.
Only after the three co.hd.resume() calls has the coroutine fully executed; by then it has yielded a == 1 and returned b == 0.
Those values are kept in the coroutine instance (the coroutine frame) and are managed through the internal flow-control hooks of the management class A — A's promise_type defines all of the coroutine's control behavior.
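To make the snippet above concrete, here is a minimal sketch of what such an A could look like (everything except the hd member is illustrative, not taken from the library below):

#include <coroutine>

// Minimal coroutine management class: promise_type defines all control
// behavior; hd is the control handle the caller drives with resume().
struct A {
    struct promise_type {
        int value = 0; // holds the last co_yield / co_return value
        A get_return_object() { return { std::coroutine_handle<promise_type>::from_promise(*this) }; }
        std::suspend_always initial_suspend() { return {}; }             // start suspended, as described above
        std::suspend_always final_suspend() noexcept { return {}; }
        std::suspend_always yield_value(int v) { value = v; return {}; } // services co_yield
        void return_value(int v) { value = v; }                          // services co_return
        void unhandled_exception() { std::terminate(); }
    };
    std::coroutine_handle<promise_type> hd;
};

With this A, func(1) compiles as written, and after each resume() the yielded or returned value can be read back through co.hd.promise().value.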
A few points to summarize (important — do not mix these up):
1. The coroutine management class A is the type that defines the coroutine's control behavior. A itself is not the coroutine; only the form A func(int a, ...) { ... } is a complete coroutine definition. So A func1(){}, A func2(){}, A func3(){} can all bind to the same control class A, yet they are three different coroutine definitions whose control behavior merely happens to be A. The benefit: you can keep these three different coroutines, each with its own body, in a single std::vector<A>. For A to serve as a coroutine management class it must contain a struct promise_type{} definition and a control-handle member std::coroutine_handle<promise_type> hd;. Notably, A need not implement the await_xxx interface — it does not have to be an awaitable.
2. A code block containing the co_await / co_yield / co_return keywords is a coroutine body. When execution reaches one of these keywords, the coroutine is **"triggered to suspend"**: the execution right goes back to the caller, whose code had been blocked at the resume() call, so resume() returns and the caller continues.
3. In particular:
co_await can pair with an awaitable object to form more elaborate suspension behavior. Asynchronous IO is generally done as co_await io-awaitable: the coroutine suspends after issuing the asynchronous operation, and once the async IO completes a **"scheduler"** resumes it. That is where asynchrony pays off — IO complexity is converted into CPU complexity. Note that coroutines solve "asynchrony", not "parallelism"; for parallelism you must turn to multiple threads or processes. Coroutines push a single thread's CPU efficiency to the maximum instead of letting IO block it and waste the thread's compute. So here's a question: what if we combine coroutines with multithreading/multiprocessing? Congratulations — the world is yours.
co_yield performs a simple suspension: immediately give up the execution right and return to the caller; the coroutine remains resumable. Its asynchronous use cases are relatively few; it is mostly used for loop generators.
co_return performs the final simple suspension: immediately give up the execution right and return to the caller; the coroutine is no longer resumable afterwards. It is used for coroutine exit.
4. An awaitable type has the form struct B { await_ready(); await_suspend(); await_resume(); } — a class B implementing the three await_xxx interfaces is an awaitable type, and its instances are awaitable objects (see the sketch after this list). await_suspend() triggers suspension of the current coroutine after it executes (not before). And remember: it is not the awaitable object that suspends; it is the current coroutine that co_awaits the awaitable object. Don't confuse the two — that muddled concept cost me a long time right here.
5. The coroutine management class A and the awaitable type B have no direct relationship; they are two different things. The awaitable B controls the behavior at one suspension point — it is local. The coroutine control class A governs the coroutine's whole lifecycle — creation, running, suspension, destruction, exception capture — it is global. A coroutine corresponds to exactly one control class A, but may suspend many times internally, each suspension pairing with one awaitable object.
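And a matching sketch of an awaitable B (again with illustrative names; the completed flag stands in for whatever mechanism finishes the asynchronous work, and <coroutine> is assumed included as above):

struct B {
    bool completed = false; // set externally once the async work is done
    int  result    = 0;     // handed to the coroutine on resumption

    bool await_ready() const { return completed; } // already done? then don't suspend at all
    void await_suspend(std::coroutine_handle<> h) {
        // Runs right before the *current coroutine* suspends; a real library
        // records h (or this) here so a scheduler can resume the coroutine later.
    }
    int await_resume() { return result; }           // becomes the value of the co_await expression
};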
Library development
The focus of this article is hands-on library development. The three big concepts of the coroutine framework — the coroutine definition class with its promise_type{}, the awaitable, and the control handle std::coroutine_handle — are not introduced here; read up on them yourself.
But the runtime logic of coroutine scheduling deserves an introduction, to deepen understanding of the library work. Under multithreading this process is managed by the kernel and we rarely see it; with coroutines, if you are writing the library yourself, you must implement the coroutine scheduling algorithm and the event-loop pattern yourself.
Here is a homely analogy:
A family has five sons of differing abilities (worker coroutines) and one mom (the scheduler coroutine). There is a single computer (a single thread's time slice): at any moment mom can hand it to only one son to use (coroutine contention). One son gets the computer first and starts working (coroutine resumed); the others can only wait, unable to work (waiting state). After working a while, the son sends an outbound email (an awaitable object) but must wait for the reply before he can continue (waiting for IO completion). Since the others are still waiting for the computer and he cannot make progress anyway, he tactfully gives up the computer and hands it back to mom (coroutine suspends; the execution right returns to the caller), waiting for mom to give him the computer again next time. Once mom has the computer (the scheduling coroutine resumes), she checks whether any reply mail has arrived (the scheduling coroutine polling for completion events — the event loop, iocp/epoll). If one has, mom checks which son the reply is addressed to, calls that son over (coroutine dispatch) and hands him the computer (coroutine resumed). The son opens the reply and takes the result (await_resume() returning the async IO result) and continues working... and the cycle repeats. That is one complete coroutine scheduling flow.
To implement a coroutine library, you need a few things:
1. Awaitable types that implement the concrete asynchronous operations — like sending the email in the analogy: they define whether to hand the computer back, how to open the reply and fetch the result, and so on.
2. A coroutine control class A — a coroutine task (task). A's promise_type should record the task's relevant state, most importantly the pointer to the awaitable at the current suspension point (the awaitable can double as the medium through which the task and the scheduling coroutine exchange information). The awaitable's pointer is passed to the task's promise during await_suspend() and stored there; through it the scheduling coroutine hands the asynchronous result back to the waiting task when the operation completes.
3. As the summary and the analogy said — most importantly — a "coroutine scheduler". First, it has a main scheduling coroutine carrying the scheduling algorithms; its job is to watch for async IO completion events and grant execution rights to tasks. Second, it maintains a queue of task coroutines (the queue can be implemented many ways) recording the handles of all coroutine instances; this queue exists for the sake of scheduling.
Note: this is precisely why C++20 cannot be used out of the box — there are no ready-made libraries for these three concrete tools. In the name of flexibility, C++ expects users to implement these components themselves, which feels awful to those of us used to finished libraries and scares people off, leaving everyone shouting for the C++23 standard library. But C++23 cannot cover every need either; special requirements will still mean writing it yourself.
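Here is a sketch of how these three pieces could hand information to one another under the scheme just described (hypothetical names, not the library's real types — those follow further below):

struct IoAwaitable; // forward declaration

struct Task {
    struct promise_type;
    using handle = std::coroutine_handle<promise_type>;
    struct promise_type {
        IoAwaitable* pending = nullptr; // awaitable this task is currently suspended on
        handle before{}, behind{};      // intrusive links for the scheduler's task queue
        Task get_return_object() { return { handle::from_promise(*this) }; }
        std::suspend_always initial_suspend() { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void return_void() { }
        void unhandled_exception() { std::terminate(); }
    };
    handle hd;
};

struct IoAwaitable {
    bool completed = false; // the scheduler sets this when the completion event arrives
    int  result    = 0;     // the scheduler stores the IO result here

    bool await_ready() const { return completed; }
    void await_suspend(Task::handle h) {
        h.promise().pending = this; // record this awaitable in the task's promise
    }
    int await_resume() { return result; } // the resumed task picks up the IO result
};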
Implementation approach
The scheduler
1. The scheduling coroutine's event loop: this example uses the IOCP model on Windows; on Linux you can use epoll, the port is easy and the principle is the same.
2. The scheduling algorithm is simple equal-weight scheduling: tasks hanging in the coroutine queue are scheduled in turn, each task getting the same chance.
3. Marking completion events and resuming tasks are kept as separate steps, so that tasks suspended via a plain co_yield also get a chance to run again (a co_yield never triggers a completion event later).
4. The scheduler records the handles of the head task and the tail task of the coroutine queue, for the scheduling coroutine's use.
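Condensed into code, that loop might look like this on Windows (GetQueuedCompletionStatus is the real IOCP call; markCompleted and resumeNextReadyTask are hypothetical helpers standing in for points 3 and 4):

void markCompleted(OVERLAPPED* ov, DWORD bytes); // hypothetical: flag the awaitable behind ov
void resumeNextReadyTask();                      // hypothetical: round-robin over the task queue

void eventLoop(HANDLE iocp) {
    for (;;) {
        DWORD bytes = 0;
        ULONG_PTR key = 0;
        OVERLAPPED* ov = nullptr;
        // timeout 0: poll without blocking, so co_yield-suspended tasks still get turns
        while (GetQueuedCompletionStatus(iocp, &bytes, &key, &ov, 0)) {
            if (ov) markCompleted(ov, bytes); // step 1: only mark the awaitable completed
        }
        resumeNextReadyTask();                // step 2: equal-weight round-robin resume
    }
}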
The awaitables
1. Asynchronous read and write operations on files;
2. Asynchronous listen, accept, send, recv operations on TCP sockets;
3. Asynchronous sendto, recvfrom operations on UDP sockets;
4. Coroutine sleeping: sleepFor and sleepForEx implement millisecond- and microsecond-level sleeps for coroutine tasks, respectively;
5. Under the IOCP model all of the above APIs issue overlapped IO. When an API successfully starts an overlapped operation, the corresponding awaitable's pointer is recorded in the current coroutine (a variable in its promise_type). Once the completion event arrives, the scheduling coroutine sets the awaitable's completion flag to true. In its polling, the scheduling coroutine simply checks each task's saved awaitable pointer: if the completion flag is true, resume that coroutine; if false, skip it and keep polling (the event loop);
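Point 5 as code, continuing the Task/IoAwaitable sketch from earlier (illustrative only; the library's real version is CoTask::resume in the source below):

bool tryResume(Task& t) {
    IoAwaitable* pending = t.hd.promise().pending;
    if (!pending) {                       // first dispatch, or suspended via co_yield: always eligible
        t.hd.resume();
        return true;
    }
    if (pending->completed) {             // IO finished: clear the record and resume
        t.hd.promise().pending = nullptr;
        t.hd.resume();
        return true;
    }
    return false;                         // still waiting: skip this task and keep polling
}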
Task definition (the task coroutine)
1. The task coroutine's promise_type holds three pieces of state (points 2-4 below):
2. the pointer to the currently suspended awaitable (if the coroutine is not suspended on IO, or not suspended at all, this pointer should be null);
3. the pointer to the scheduler (Scheduler) this coroutine belongs to;
4. the handles of the previous task and the next task in the coroutine queue;
5. when a task's awaitable has its completion flag set to true, the scheduling coroutine links that task's before-task and behind-task together, moves the task's handle to the tail of the coroutine queue, and resumes the task — completing one dispatch-and-resume.
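A sketch of that queue surgery, reusing the Task type from the earlier sketch (whose promise carries the before/behind links of point 4; the library's real moveTaskToEnd appears in the source below):

void moveTaskToEndAndResume(Task::handle h, Task::handle& begin, Task::handle& end) {
    if (h == end) { h.resume(); return; }               // already the tail: nothing to splice
    auto& p = h.promise();
    if (h == begin) begin = p.behind;                   // unlink from the front
    if (p.before) p.before.promise().behind = p.behind; // unlink in the middle
    if (p.behind) p.behind.promise().before = p.before;
    p.before = end;                                     // re-link at the tail
    p.behind = nullptr;
    if (end) end.promise().behind = h;
    end = h;
    h.resume();                                         // hand over the execution right
}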
Starting the coroutine scheduling
1. Instantiate the scheduler CoScheduler;
2. Define task coroutines as lambda expressions and add them to the scheduler's coroutine queue;
3. Start the scheduler with the run method, which schedules and runs the coroutine tasks;
4. A task coroutine can itself dynamically spawn new task coroutines and add them to the scheduling queue.
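In code the startup sequence is just this (condensed from the test functions further below; sleepFor is the library's millisecond sleep awaitable):

CoScheduler sc;                  // 1. instantiate the scheduler (sets up IOCP and Winsock)
sc.gather([&]() -> CoTask {      // 2. define a task coroutine as a lambda and queue it
    co_await sleepFor(1000);     //    suspend this task for ~1 second
    sc.gather([&]() -> CoTask {  // 4. a task may dynamically spawn further tasks
        co_return;
    });
});
sc.run();                        // 3. run the scheduler until no tasks remain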
Let's look at the test results first; the full source code follows later.
Case 1: tcp server/client model test
Besides the scheduling coroutine, four tasks appear in the coroutine queue: a server listener task, a client generator task, a server task and a client task.
Main coro scheduler started ...
Main coro scheduler: Iocp loop started ... //0 scheduling coroutine runs
Iocp: New task arrived and first run, tid=26064
Tcp server coro started ... //1 listener task runs
Server listener accept wait ... --> suspends on the async accept
Iocp: New task arrived and first run, tid=26064 //0 scheduling coroutine runs (event loop stage)
Clients creater coro started. //2 client generator task runs
Clients creater make client task 1. --> dynamically creates a client task and queues it
Clients creater yield run privilege to coro scheduler. --> returns to the scheduling coroutine via co_yield
Iocp: New task arrived and first run, tid=26064 //0 scheduling coroutine runs
Iocp: New task arrived and first run, tid=26064 --> dispatches the newly arrived task
Client[1] connect wait ... //3 client task runs, suspends on the async connect
Iocp: IoFileAwaitable[TConnect] completed 0 bytes, tid=26064 //0 scheduling coroutine runs, detects the connect completion event
Clients creater fetch run privilege again. //2 client generator task runs
Clients creater yield run privilege to coro scheduler.
Client[1] send wait ...
Iocp: IoFileAwaitable[TAccept] completed 47 bytes, tid=26064 //0 scheduling coroutine runs, detects the accept completion event
Server listener accept wait ... //1 server listener task runs, suspends on the async accept
Iocp: IoFileAwaitable[TSend] completed 47 bytes, tid=26064 //0 scheduling coroutine runs
Clients creater fetch run privilege again. //2 client generator task runs
Clients creater yield run privilege to coro scheduler.
Iocp: New task arrived and first run, tid=26064 //0 scheduling coroutine runs
Server[1] send wait ... //4 server task runs
Iocp: IoFileAwaitable[TSend] completed 47 bytes, tid=26064 //0 scheduling coroutine runs, detects the send completion event
Client[1] recv wait ... //3 client task runs
Iocp: IoFileAwaitable[TRecv] completed 47 bytes, tid=26064 //0 scheduling coroutine runs, detects the recv completion event
Clients creater fetch run privilege again.
Clients creater yield run privilege to coro scheduler.
Server[1] recv wait ... //4 server task runs, suspends on the async recv
Client[1] recv server msg //3 client task runs
Hello client. this is server 1. 1st response. --> prints the message sent by the server
Client[1] send wait ... --> suspends on the async send
Iocp: IoFileAwaitable[TRecv] completed 47 bytes, tid=26064 //0 scheduling coroutine runs, detects the recv completion event
Iocp: IoFileAwaitable[TSend] completed 47 bytes, tid=26064 --> detects the send completion event
Clients creater fetch run privilege again.
Clients creater yield run privilege to coro scheduler.
Server[1] recv client msg //4 server task runs
Helle server, this is client 1: 2st request. --> prints the message sent by the client
Server[1] send wait ...
The interleaved asynchronous execution of multiple coroutine tasks works like this: whenever a coroutine hits a suspendable asynchronous operation (connect, accept, send, recv and so on), it hands the execution right back to the scheduler; when the completion event arrives, the scheduler hands the execution right back to the task. Execution keeps bouncing back and forth between the scheduler and the tasks, multiplexing the CPU across tasks.
Case 2: udp broadcast model test
Main coro scheduler started ... // as in case 1: the scheduler starts, then 3 servers and 3 clients are spawned
Main coro scheduler: Iocp loop started ...
Iocp: New task arrived and first run, tid=31188
Servers creater coro started.
Servers creater make server task 1.
Servers creater make server task 2.
Servers creater make server task 3.
Servers creater yield run privilege to coro scheduler.
Iocp: New task arrived and first run, tid=31188
Clients creater coro started.
Clients creater make broadcastor client task 1.
Clients creater make broadcastor client task 2.
Clients creater make broadcastor client task 3.
Clients creater yield run privilege to coro scheduler.
Iocp: New task arrived and first run, tid=31188
Iocp: New task arrived and first run, tid=31188
Udp server[1] coro started bind port 33000...
Udp server[1] recvfrom wait ... //server 1: async receive
Iocp: New task arrived and first run, tid=31188
Udp server[2] coro started bind port 33001...
Udp server[2] recvfrom wait ... //server 2: async receive
Iocp: New task arrived and first run, tid=31188
Udp server[3] coro started bind port 33002...
Udp server[3] recvfrom wait ... //server 3: async receive
Iocp: New task arrived and first run, tid=31188
Broadcastor[1] send wait ... //client 1: async send
Iocp: New task arrived and first run, tid=31188
Broadcastor[2] send wait ... //client 2: async send
Iocp: New task arrived and first run, tid=31188
Broadcastor[3] send wait ... //client 3: async send
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188 //scheduler: recvfrom completion event
Servers creater fetch run privilege again.
Servers creater yield run privilege to coro scheduler.
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188 //scheduler: sendto completion event
Clients creater fetch run privilege again.
Clients creater yield run privilege to coro scheduler.
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188
Udp server[2] recvfrom 1st broadcast 75 bytes data, msg //server 2 receives and prints the message
Helle server, this is broadcastor 1: 1st randon broadcast to port33001.
Udp server[2] recvfrom wait ... --> suspends on the async recvfrom
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188
Udp server[3] recvfrom 1st broadcast 75 bytes data, msg
Helle server, this is broadcastor 2: 1st randon broadcast to port33002.
Udp server[3] recvfrom wait ...
Broadcastor[1] sendto server msg
Helle server, this is broadcastor 1: 1st randon broadcast to port33001.
Broadcastor[1] send wait ...
Iocp: IoFileAwaitable[TSendto] completed 75 bytes, tid=31188
Broadcastor[2] sendto server msg
Helle server, this is broadcastor 2: 1st randon broadcast to port33002.
Broadcastor[2] send wait ...
Iocp: IoFileAwaitable[TRecvfrom] completed 75 bytes, tid=31188
Broadcastor[3] sendto server msg
Helle server, this is broadcastor 3: 1st randon broadcast to port33001.
Now a look at performance.
Same models as case 1 and case 2, but this time tcp runs 100 server/client pairs (200 tasks in total) and udp runs 20 broadcastor/receiver pairs (40 tasks) to test concurrency. The statistics look like this:
Tcp server coro started ...
Clients creater coro started.
Clients creater make client task 1.
...
Clients creater make client task 100.
Summary coro count 203: total handle 92752 times (spend time 3.06413s), 30902.3 times/per-second.
Summary coro count 203: total handle 185852 times (spend time 6.06633s), 31010.6 times/per-second.
Summary coro count 203: total handle 278601 times (spend time 9.06766s), 30902.6 times/per-second.
Summary coro count 203: total handle 371901 times (spend time 12.0696s), 31080.1 times/per-second.
Summary coro count 203: total handle 466752 times (spend time 15.0719s), 31592 times/per-second.
Counting one complete send and recv on both server and client — that is, 4 tcp transfers — as one effective communication (recorded as 1 "times"),
the results show that at about 200 coroutines a single thread completes roughly 30,000 effective communications per second (at four transfers each, about 120,000 socket operations per second). Self-directed and self-acted, yes, but the coroutine functionality works end to end and the performance is respectable.
Servers creater coro started.
Servers creater make server task 1.
...
Servers creater make server task 20.
Clients creater coro started.
Clients creater make broadcastor client task 1.
...
Clients creater make broadcastor client task 20.
Udp server[1] coro started bind port 33000...
...
Udp server[20] coro started bind port 33019...
Summary coro count 43: total handle 541730 times (spend time 3.02587s), 180571 times/per-second.
Summary coro count 43: total handle 1082377 times (spend time 6.02621s), 180196 times/per-second.
Summary coro count 43: total handle 1623102 times (spend time 9.02651s), 180223 times/per-second.
Summary coro count 43: total handle 2165716 times (spend time 12.0268s), 180853 times/per-second.
Summary coro count 43: total handle 2731919 times (spend time 15.0271s), 188714 times/per-second.
Since udp is a one-way, connectionless protocol, it is much faster than tcp. Counting one sendto plus one recvfrom as one effective communication, at about 40 coroutines a single thread achieves roughly 180,000 effective communications per second.
Finally
Once understood, C++ coroutines are not that hard, and any API that offers an asynchronous mode can be wrapped into a coroutine library — asynchronous mysql and redis operations, for example, can later follow the same recipe to quickly produce C++ coroutine libraries.
This library was developed simply to record my journey learning C++ coroutines; many features still need completing. It currently supports asynchronous file, socket and sleep operations on Windows; support for Linux's epoll model can be added later.
Code
Header file CLCoroutine.h — the functions void test_coroutine_tcp_server() and void test_coroutine_udp_random_broadcast() inside are the test code for case 1 and case 2.
#ifndef __CL_COROUTINE__
#define __CL_COROUTINE__

#if (defined(__cplusplus) && __cplusplus >= 202002L) || (defined(_HAS_CXX20) && _HAS_CXX20)
#ifndef CLUseCorotine
#define CLUseCorotine 1
#endif
#endif

#if (defined(CLUseCorotine) && CLUseCorotine)

#include <coroutine>
#include <thread>
#include <atomic>
#include "../_cl_common/CLCommon.h"

#ifdef _WIN32
#ifndef WIN32_LEAN_AND_MEAN // trim the size of the included Windows headers
#define WIN32_LEAN_AND_MEAN
#endif // !WIN32_LEAN_AND_MEAN
#include <Windows.h>
#include <Winsock2.h>
#include <WS2tcpip.h>
#include <MSWSock.h>
#pragma comment(lib, "ws2_32.lib")
#else
#endif

struct CoScheduler;

// Coroutine task unit
struct CoTask {
    using return_type = void;
    struct promise_type;
    using handle = std::coroutine_handle<promise_type>;
    struct promise_type {
        CoTask get_return_object() { return { handle::from_promise(*this) }; }
        void unhandled_exception() { std::terminate(); }
        std::suspend_always initial_suspend() { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void return_void() { }
        template<typename U>
        std::suspend_always yield_value(const U& val) {
            pAwaitableFile = nullptr; // a plain co_yield leaves no pending awaitable behind
            return {};
        }
        CoScheduler* sc = nullptr;
        handle before = nullptr, behind = nullptr; // intrusive links of the task queue
        void* pAwaitableFile = nullptr;            // awaitable this task is suspended on
    };
    bool resume();
    handle hd;
};

// Coroutine task scheduler: holds the main scheduling coroutine and the event
// loop, and maintains the queue of suspended coroutine tasks
struct CoScheduler {
    struct MainCoro {
        using return_type = void;
        struct promise_type;
        using handle = std::coroutine_handle<promise_type>;
        struct promise_type {
            MainCoro get_return_object() { return { handle::from_promise(*this) }; }
            void unhandled_exception() { std::terminate(); }
            std::suspend_always initial_suspend() { return {}; }
            std::suspend_never final_suspend() noexcept { return {}; }
            void return_void() { }
            CoScheduler* sc = nullptr;
        };
        constexpr bool await_ready() const { return false; }
        void await_suspend(std::coroutine_handle<>) { }
        auto await_resume() const { }
        handle hd;
    };

    CoScheduler()
        : m_curTid(std::this_thread::get_id())
        , m_hIocp(CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0))
    {
        WSADATA wsaData;
        WSAStartup(MAKEWORD(2, 2), &wsaData);
    }
    bool registe(HANDLE hFile) {
        if (!hFile || hFile == INVALID_HANDLE_VALUE || !m_hIocp
            || ::CreateIoCompletionPort(hFile, m_hIocp, 0, 0) != m_hIocp)
            return false;
        else
            return true;
    }
    bool registe(SOCKET sock) { return registe((HANDLE)sock); }
    // Create a task and leave it to be scheduled and run later
    template<typename F, typename ...Args>
    void gather(F&& func, Args&& ...args) {
        if (m_curTid != std::this_thread::get_id())
            throw std::logic_error("Scheduler thread is not match.");
        CoTask coro = func(std::forward<Args>(args)...);
        appendNewTaskToEnd(coro);
        ::PostQueuedCompletionStatus(m_hIocp, 0, (ULONG_PTR)coro.hd.address(), 0);
    }
    // Create a task and schedule it immediately
    template<typename F, typename ...Args>
    void createTask(F&& func, Args&& ...args) {
        if (m_curTid != std::this_thread::get_id())
            throw std::logic_error("Scheduler thread is not match.");
        CoTask coro = func(std::forward<Args>(args)...);
        appendNewTaskToEnd(coro);
        coro.resume();
    }
    size_t taskCount() const { return m_taskCount; }
    // Run the coroutine scheduling
    void run();

private:
    void appendNewTaskToEnd(CoTask& cur) {
        auto& cprm = cur.hd.promise();
        cprm.sc = this;
        if (m_end.hd) {
            cprm.before = m_end.hd;
            cprm.behind = nullptr;
            m_end.hd.promise().behind = cur.hd;
        }
        m_end.hd = cur.hd;
        ++m_taskCount;
        if (!m_begin.hd) {
            m_begin.hd = cur.hd;
            cprm.before = nullptr;
        }
    }
    void moveTaskToEnd(CoTask::handle h) {
        if (removeDoneTask())
            return;
        if (!h)
            return;
        auto& cprm = h.promise();
        if (h == m_begin.hd) {
            m_begin.hd = cprm.behind;
            if (m_begin.hd)
                m_begin.hd.promise().before = nullptr;
            if (m_end.hd)
                m_end.hd.promise().behind = h;
            cprm.behind = nullptr;
            cprm.before = m_end.hd;
            m_end.hd = h;
        }
        else if (h == m_end.hd) {
            // already the tail: nothing to do
        }
        else {
            cprm.behind.promise().before = cprm.before;
            cprm.before.promise().behind = cprm.behind;
            if (m_end.hd)
                m_end.hd.promise().behind = h;
            cprm.behind = nullptr;
            cprm.before = m_end.hd;
            m_end.hd = h;
        }
    }
    bool removeDoneTask() {
        bool ret = false;
        while (m_begin.hd && m_begin.hd.done()) {
            auto h = m_begin.hd;
            m_begin.hd = h.promise().behind;
            if (m_begin.hd)
                m_begin.hd.promise().before = nullptr;
            h.destroy();
            --m_taskCount;
            ret = true;
        }
        return ret;
    }
    HANDLE m_hIocp;
    const std::thread::id m_curTid;
    MainCoro m_main;
    CoTask m_begin, m_end;
    std::atomic<size_t> m_taskCount = 0;
};

// IO file operation type
enum IoFileType : int {
    TUnknown = 0,
    TRead,
    TWrite,
    TListen,
    TAccept,
    TConnect,
    TSend,
    TRecv,
    TSendto,
    TRecvfrom,
    TSleep,
};

// IO file scheduling priority
enum IoFilePriority : int {
    WaitingForPolling = 0, // wait for round-robin polling dispatch
    DispatchImmediately,   // dispatch immediately
};

// Base class of awaitable file objects supporting asynchronous suspension
template<typename Ret = int>
struct IoFileAwaitable : OVERLAPPED {
    operator HANDLE() const { return m_hFile; }
    operator SOCKET() const { return (SOCKET)m_hFile; }
    bool isRegisted() const { return m_isRegisted; }
    bool isCompleted() const { return m_isCompleted; }
    void setCompleted() { m_isCompleted = true; }
    void resetCompleted() {
        memset(this, 0, sizeof(OVERLAPPED));
        m_isCompleted = 0;
    }
    void setReturn(Ret ret) { m_ret = ret; }
    Ret getReturn() const { return m_ret; }
    IoFileType& type() { return m_fileType; }
    const char* typeName() const {
#define _TypeNameItem( tp ) case tp: return #tp;
        switch (m_fileType) {
        _TypeNameItem(TUnknown);
        _TypeNameItem(TRead);
        _TypeNameItem(TWrite);
        _TypeNameItem(TListen);
        _TypeNameItem(TAccept);
        _TypeNameItem(TConnect);
        _TypeNameItem(TSend);
        _TypeNameItem(TRecv);
        _TypeNameItem(TSendto);
        _TypeNameItem(TRecvfrom);
        _TypeNameItem(TSleep);
        default: return "TUnknown";
        }
    }
    void* getTransferredBytesCountBuffer() const { return m_transferredBytesCount; }
    void setTransferredBytesCountRecvBuffer(void* countBuf) { m_transferredBytesCount = countBuf; }
    bool close() {
        if (m_hFile) {
            return CloseHandle(detach());
        }
        return true;
    }
    HANDLE detach() {
        HANDLE ret = *this;
        m_hFile = 0;
        m_isRegisted = 0;
        return ret;
    }
    HANDLE attach(CoScheduler& sc, HANDLE s) {
        HANDLE ret = *this;
        m_hFile = s;
        m_isRegisted = sc.registe(m_hFile);
        return ret;
    }
    int getLastError() const { return m_lastError; }
    void setLastError(int err) { m_lastError = err; }
    CoTask::handle onwer() { return m_owner; }
    auto getPriority() const { return m_priority; }
    void setPriority(IoFilePriority priority) { m_priority = priority; }
    // awaitable methods
    bool await_ready() const { return isCompleted(); }
    void await_suspend(CoTask::handle h) {
        h.promise().pAwaitableFile = this; // record this awaitable in the owning task's promise
        m_owner = h;
    }
    Ret await_resume() {
        setTransferredBytesCountRecvBuffer(nullptr);
        return getReturn();
    }
protected:
    IoFileAwaitable()
        : m_hFile((HANDLE)0), m_isRegisted(false) {
        resetCompleted();
    }
    IoFileAwaitable(CoScheduler& sc, HANDLE hFile)
        : m_hFile(hFile), m_isRegisted(sc.registe(m_hFile)) {
        resetCompleted();
    }
    IoFileAwaitable(CoScheduler& sc, SOCKET sock)
        : m_hFile((HANDLE)sock), m_isRegisted(sc.registe(sock)) {
        resetCompleted();
    }
    HANDLE m_hFile;
    bool m_isRegisted;
    bool m_isCompleted;
    IoFileType m_fileType = IoFileType::TUnknown;
    void* m_transferredBytesCount = nullptr;
    int m_lastError = ERROR_SUCCESS;
    IoFilePriority m_priority = IoFilePriority::WaitingForPolling;
    CoTask::handle m_owner;
    Ret m_ret = 0;
};

// Base class of awaitable sockets supporting asynchronous suspension
template<typename Ret = int>
struct AsyncSocket : public IoFileAwaitable<Ret> {
    using base = IoFileAwaitable<Ret>;
    ~AsyncSocket() { close(); }
    sockaddr_in localAddress() const { return m_local; }
    sockaddr_in remoteAddress() const { return m_remote; }
    sockaddr_in* localAddress() { return &m_local; }
    sockaddr_in* remoteAddress() { return &m_remote; }
    int close() {
        int ret = 0;
        if (base::m_hFile) {
            if (base::m_hFile != (HANDLE)INVALID_SOCKET) {
                ret = closesocket(detach());
            }
            else {
                base::m_hFile = 0;
                base::m_isRegisted = 0;
            }
        }
        return ret;
    }
    SOCKET detach() { return (SOCKET)base::detach(); }
    SOCKET attach(CoScheduler& sc, SOCKET s) { return (SOCKET)base::attach(sc, (HANDLE)s); }
protected:
    AsyncSocket(CoScheduler& sc, SOCKET sock)
        : base(sc, sock)
    { }
    sockaddr_in m_local = { 0 };
    sockaddr_in m_remote = { 0 };
};

struct AsyncAcceptor;

// Awaitable server-side listener: a TCP listening socket waiting for incoming connections
struct AsyncListener : public AsyncSocket<bool> {
    AsyncListener(CoScheduler& sc, unsigned long addr, unsigned short port, int backlog = SOMAXCONN)
        : AsyncSocket(sc, WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED))
    {
        type() = IoFileType::TListen;
        m_local.sin_family = AF_INET;
        m_local.sin_addr.s_addr = addr;
        m_local.sin_port = htons(port);
        char opt = 1;
        if (setsockopt(*this, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt))) {
            throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener set reuse addr error.");
        }
        if (SOCKET_ERROR == ::bind(*this, (sockaddr*)&m_local, sizeof(m_local))
            || SOCKET_ERROR == ::listen(*this, backlog)) {
            throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener bind or listen error.");
        }
    }
    AsyncListener(CoScheduler& sc, const char* ip, unsigned short port, int backlog = SOMAXCONN)
        : AsyncSocket(sc, WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED))
    {
        type() = IoFileType::TListen;
        m_local.sin_family = AF_INET;
        m_local.sin_port = htons(port);
        InetPton(AF_INET, ip, &m_local.sin_addr);
        char opt = 1;
        if (setsockopt(*this, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt))) {
            throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener set reuse addr error.");
        }
        if (SOCKET_ERROR == ::bind(*this, (sockaddr*)&m_local, sizeof(m_local))
            || SOCKET_ERROR == ::listen(*this, backlog)) {
            throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener bind or listen error.");
        }
    }
    sockaddr_in listenAddress() const { return localAddress(); }
    // Return value: true on success, false on failure
    AsyncAcceptor& accept(AsyncAcceptor& sockAccept, void* lpAcceptBuffer, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted);
};

// Base class of awaitable TCP connections
struct AsyncTcp : public AsyncSocket<int> {
    // Return value: 0 on success, SOCKET_ERROR on failure
    AsyncTcp& send(const void* lpSendBuffer, unsigned long nNumberOfBytesSendBuffer, unsigned long* lpNumberOfBytesSend);
    // Return value: 0 on success, SOCKET_ERROR on failure
    AsyncTcp& recv(void* lpRecvBuffer, unsigned long nNumberOfBytesRecvBuffer, unsigned long* lpNumberOfBytesRecv);
protected:
    AsyncTcp(CoScheduler& sc)
        : AsyncSocket(sc, WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED))
    { }
};

// Awaitable server-side acceptor: the accepting-end TCP socket
struct AsyncAcceptor : public AsyncTcp {
    AsyncAcceptor(CoScheduler& sc)
        : AsyncTcp(sc)
    {
        type() = IoFileType::TAccept;
    }
    // Parse the incoming connection's address info into the internal address members
    void perseAddress(void* lpAcceptBuffer, unsigned long nNumberOfBytesAcceptBuffer) {
        if (lpAcceptBuffer == 0 || nNumberOfBytesAcceptBuffer == 0)
            throw std::logic_error("perseAddress parm is invalid.");
        static LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptSockAddrs = 0;
        if (!lpfnGetAcceptSockAddrs) {
            GUID GuidGetAcceptexSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
            unsigned long dwBytes = 0;
            if (SOCKET_ERROR == WSAIoctl(*this,
                SIO_GET_EXTENSION_FUNCTION_POINTER,
                &GuidGetAcceptexSockAddrs, sizeof(GuidGetAcceptexSockAddrs),
                &lpfnGetAcceptSockAddrs, sizeof(lpfnGetAcceptSockAddrs),
                &dwBytes, NULL, NULL)) {
                throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncListener GetAcceptexSockAddrs error.");
            }
        }
        int localLen = 0, remoteLen = 0;
        lpfnGetAcceptSockAddrs(lpAcceptBuffer,
            nNumberOfBytesAcceptBuffer - (sizeof(sockaddr_in) + 16) * 2,
            sizeof(sockaddr_in) + 16,
            sizeof(sockaddr_in) + 16,
            (LPSOCKADDR*)localAddress(), &localLen,
            (LPSOCKADDR*)remoteAddress(), &remoteLen);
    }
    // Return value: true on success, false on failure
    AsyncAcceptor& accept(AsyncListener& sockListener, void* lpAcceptBuffer, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted);
    int await_resume() {
        setPriority(IoFilePriority::WaitingForPolling);
        return AsyncTcp::await_resume();
    }
};

// Awaitable client-side connector: the initiating-end TCP socket
struct AsyncConnector : public AsyncTcp {
    AsyncConnector(CoScheduler& sc)
        : AsyncTcp(sc)
    {
        type() = IoFileType::TConnect;
    }
    AsyncConnector(CoScheduler& sc, const char* ip, unsigned short port)
        : AsyncTcp(sc)
    {
        type() = IoFileType::TConnect;
        setConnectRemoteAddress(ip, port);
        bindConnectLocalPort(0);
    }
    void setConnectRemoteAddress(const char* ip, unsigned short port) {
        memset(&m_remote, 0, sizeof(m_remote));
        m_remote.sin_family = AF_INET;
        m_remote.sin_port = htons(port);
        InetPton(AF_INET, ip, &m_remote.sin_addr);
    }
    int bindConnectLocalPort(unsigned short port = 0) {
        memset(&m_local, 0, sizeof(m_local));
        m_local.sin_family = AF_INET;
        m_local.sin_addr.s_addr = INADDR_ANY;
        m_local.sin_port = htons(port);
        return ::bind(*this, (const sockaddr*)&m_local, sizeof(m_local));
    }
    // Return value: true on success, false on failure
    AsyncConnector& connect(const sockaddr* name, int namelen, void* lpSendBuffer = nullptr, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr);
    // Return value: true on success, false on failure
    AsyncConnector& connect(const char* ip, unsigned short port, void* lpSendBuffer = nullptr, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr);
    // Return value: true on success, false on failure
    AsyncConnector& connect(void* lpSendBuffer = nullptr, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr);
};

// As the server side, the acceptor gets dispatch-immediately-on-completion
// priority to guarantee throughput
// Return value: true on success, false on failure
inline
AsyncAcceptor&
accept(AsyncListener& sockListener, AsyncAcceptor& sockAccept, void* lpAcceptBuffer, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted) {
    static LPFN_ACCEPTEX lpfnAcceptEx = 0;
    sockListener.type() = IoFileType::TListen;
    sockAccept.type() = IoFileType::TAccept;
    sockAccept.resetCompleted();
    sockAccept.setTransferredBytesCountRecvBuffer(lpNumberOfBytesAccepted);
    sockAccept.setPriority(IoFilePriority::DispatchImmediately); // set dispatch-immediately priority
    if (lpNumberOfBytesAccepted)
        *lpNumberOfBytesAccepted = 0;
    if (!lpfnAcceptEx) {
        GUID GuidAcceptEx = WSAID_ACCEPTEX; // GUID required to locate the AcceptEx function
        unsigned long dwBytes = 0;
        if (SOCKET_ERROR == WSAIoctl(sockListener,
            SIO_GET_EXTENSION_FUNCTION_POINTER,
            &GuidAcceptEx, sizeof(GuidAcceptEx),
            &lpfnAcceptEx, sizeof(lpfnAcceptEx),
            &dwBytes, NULL, NULL)) {
            lpfnAcceptEx = 0;
            throw std::system_error(WSAGetLastError(), std::system_category(), "Accept get AcceptEx function address error.");
        }
    }
    bool ret = lpfnAcceptEx(sockListener,
        sockAccept,
        (char*)lpAcceptBuffer,
        nNumberOfBytesAcceptBuffer - (sizeof(sockaddr_in) + 16) * 2,
        sizeof(sockaddr_in) + 16,
        sizeof(sockaddr_in) + 16,
        lpNumberOfBytesAccepted,
        &sockAccept);
    if (ret == false) {
        auto lr = WSAGetLastError();
        if (lr == WSA_IO_PENDING)
            ret = true;
    }
    if (ret) {
        sockAccept.setReturn(ret);
        return sockAccept;
    }
    sockAccept.setReturn(false);
    sockAccept.setCompleted();
    sockAccept.setPriority(IoFilePriority::WaitingForPolling);
    return sockAccept;
}

// Return value: true on success, false on failure
inline
AsyncConnector&
connect(AsyncConnector& sockCon, const sockaddr* name, int namelen, void* lpSendBuffer = nullptr, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr) {
    static LPFN_CONNECTEX lpfnConnectEx = 0;
    sockCon.type() = IoFileType::TConnect;
    sockCon.resetCompleted();
    if (lpdwBytesSent)
        *lpdwBytesSent = 0;
    if (!lpfnConnectEx) {
        GUID GuidConnectEx = WSAID_CONNECTEX; // GUID required to locate the ConnectEx function
        unsigned long dwBytes = 0;
        if (SOCKET_ERROR == WSAIoctl(sockCon,
            SIO_GET_EXTENSION_FUNCTION_POINTER,
            &GuidConnectEx, sizeof(GuidConnectEx),
            &lpfnConnectEx, sizeof(lpfnConnectEx),
            &dwBytes, NULL, NULL)) {
            lpfnConnectEx = 0;
            throw std::system_error(WSAGetLastError(), std::system_category(), "Connect get ConnectEx function address error.");
        }
    }
    sockCon.setTransferredBytesCountRecvBuffer(lpdwBytesSent);
    bool ret = lpfnConnectEx(sockCon,
        name, namelen,
        lpSendBuffer, dwSendDataLength,
        lpdwBytesSent,
        &sockCon);
    if (ret == false) {
        auto lr = WSAGetLastError();
        if (lr == WSA_IO_PENDING)
            ret = true;
    }
    if (ret) {
        sockCon.setReturn(ret);
        return sockCon;
    }
    sockCon.setReturn(false);
    sockCon.setCompleted();
    return sockCon;
}

// Return value: true on success, false on failure
inline
AsyncConnector&
connect(AsyncConnector& sockCon, const char* ip, unsigned short port, void* lpSendBuffer = nullptr, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr) {
    sockCon.setConnectRemoteAddress(ip, port);
    sockCon.bindConnectLocalPort(0);
    return connect(sockCon, (const sockaddr*)sockCon.remoteAddress(), sizeof(*sockCon.remoteAddress()),
        lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}

// Return value: true on success, false on failure
inline
AsyncConnector&
connect(AsyncConnector& sockCon, void* lpSendBuffer = nullptr, unsigned long dwSendDataLength = 0, unsigned long* lpdwBytesSent = nullptr) {
    return connect(sockCon, (const sockaddr*)sockCon.remoteAddress(), sizeof(*sockCon.remoteAddress()),
        lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}

// Return value: 0 on success, SOCKET_ERROR on failure
inline
AsyncTcp&
send(AsyncTcp& sock, const void* lpSendBuffer, unsigned long nNumberOfBytesSendBuffer, unsigned long* lpNumberOfBytesSend) {
    sock.type() = IoFileType::TSend;
    sock.resetCompleted();
    if (lpNumberOfBytesSend)
        *lpNumberOfBytesSend = 0;
    sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesSend);
    WSABUF wsaBuf{ nNumberOfBytesSendBuffer, (char*)lpSendBuffer };
    auto ret = WSASend(sock, &wsaBuf, 1, lpNumberOfBytesSend, 0, &sock, NULL);
    if (ret == SOCKET_ERROR) {
        auto lr = WSAGetLastError();
        if (lr == WSA_IO_PENDING)
            ret = 0;
        else
            sock.setCompleted();
    }
    sock.setReturn(ret);
    return sock;
}

// Return value: 0 on success, SOCKET_ERROR on failure
inline
AsyncTcp&
recv(AsyncTcp& sock, void* lpRecvBuffer, unsigned long nNumberOfBytesRecvBuffer, unsigned long* lpNumberOfBytesRecv) {
    sock.type() = IoFileType::TRecv;
    sock.resetCompleted();
    if (lpNumberOfBytesRecv)
        *lpNumberOfBytesRecv = 0;
    sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesRecv);
    WSABUF wsaBuf{ nNumberOfBytesRecvBuffer, (char*)lpRecvBuffer };
    unsigned long dwFlag = 0;
    auto ret = WSARecv(sock, &wsaBuf, 1, NULL, &dwFlag, &sock, NULL);
    if (ret == SOCKET_ERROR) {
        auto lr = WSAGetLastError();
        if (lr == WSA_IO_PENDING)
            ret = 0;
        else
            sock.setCompleted();
    }
    sock.setReturn(ret);
    return sock;
}

// Awaitable connectionless UDP socket
struct AsyncUdp : public AsyncSocket<int> {
    // Returns -1 on failure, 1 when set to broadcast (client) mode, 0 for receiver (server) mode
    int status() const { return m_isBroadCast; }
    int* remoteLen() { return &m_remoteLen; }
protected:
    // isBroadCast == true: sending end (udp client); uses sendTo, and the broadcast
    //   destination can be picked dynamically at sendTo time
    // isBroadCast == false: receiving end (udp server); uses recvFrom, and the bound
    //   broadcast-receive address must be given at construction
    AsyncUdp(CoScheduler& sc, bool isBroadCast = true, const char* ip = 0, unsigned short port = 0)
        : AsyncSocket(sc, WSASocketW(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED))
    {
        setBroadCast(isBroadCast, ip, port);
    }
    // Returns -1 on failure, 1 when set to broadcast (client) mode, 0 for receiver (server) mode
    int setBroadCast(bool isBroadCast, const char* ip, unsigned short port) {
        if (*this && *this != INVALID_SOCKET) {
            m_isBroadCast = isBroadCast;
            if (::setsockopt(*this, SOL_SOCKET, SO_BROADCAST, (char*)&m_isBroadCast, sizeof(m_isBroadCast)) == 0) {
                if (isBroadCast) {
                    setBindAddress(0, 0);
                    setBroadcastAddress(ip, port);
                }
                else {
                    setBindAddress(ip, port);
                }
                return m_isBroadCast;
            }
        }
        return m_isBroadCast = -1;
    }
    // Set the local listening address the receiver binds to
    bool setBindAddress(const char* ip, unsigned short port) {
        memset(&m_local, 0, sizeof(m_local));
        m_local.sin_family = AF_INET;
        m_local.sin_addr.S_un.S_addr = ip ? inet_addr(ip) : INADDR_ANY;
        m_local.sin_port = htons(port);
        char opt = 1;
        if (setsockopt(*this, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt))) {
            throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncUdp set reuse address error.");
        }
        if (::bind(*this, (const sockaddr*)&m_local, sizeof(sockaddr_in))) {
            throw std::system_error(WSAGetLastError(), std::system_category(), "AsyncUdp bind address error.");
        }
        return true;
    }
    // Set the remote target address broadcasts are sent to
    void setBroadcastAddress(const char* ip, unsigned short port) {
        memset(&m_remote, 0, sizeof(m_remote));
        m_remote.sin_family = AF_INET;
        m_remote.sin_addr.S_un.S_addr = ip ? inet_addr(ip) : INADDR_ANY;
        m_remote.sin_port = htons(port);
    }
    int m_remoteLen = 0;
    int m_isBroadCast = -1;
};

// Awaitable UDP broadcaster socket (sending end / client)
struct AsyncBroadcastor : public AsyncUdp {
    AsyncBroadcastor(CoScheduler& sc, const char* ip = 0, unsigned short port = 0)
        : AsyncUdp(sc, true, ip, port)
    {
        type() = IoFileType::TSendto;
    }
    // Client end: send to the broadcast address saved internally; fails if no address was set
    // Return value: 0 on success, SOCKET_ERROR on failure
    AsyncBroadcastor& sendTo(const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent);
    // Client end: send to a dynamically specified broadcast address
    // Return value: 0 on success, SOCKET_ERROR on failure
    AsyncBroadcastor& sendTo(const char* ip, unsigned short port, const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent);
    bool isValidBroadcastor() const { return status() == 1; }
    using AsyncUdp::setBroadcastAddress;
};

// Awaitable UDP receiver socket (receiving end / server)
struct AsyncReceiver : public AsyncUdp {
    AsyncReceiver(CoScheduler& sc, const char* ip, unsigned short port)
        : AsyncUdp(sc, false, ip, port)
    {
        type() = IoFileType::TRecvfrom;
    }
    // Server end: fetch broadcast data on the bound local address
    // Return value: 0 on success, SOCKET_ERROR on failure
    AsyncReceiver& recvFrom(void* lpRecvdBuffer, unsigned long nNumberOfBytesRecvdBuffer, unsigned long* lpNumberOfBytesRecvd);
    bool isValidReceiver() const { return status() == 0; }
    using AsyncUdp::setBindAddress;
};

// Return value: 0 on success, SOCKET_ERROR on failure
inline
AsyncBroadcastor&
sendTo(AsyncBroadcastor& sock, const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent) {
    sock.type() = IoFileType::TSendto;
    sock.resetCompleted();
    if (lpNumberOfBytesSent)
        *lpNumberOfBytesSent = 0;
    sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesSent);
    WSABUF wsaBuf{ nNumberOfBytesSentBuffer, (char*)lpSentBuffer };
    auto ret = WSASendTo(sock, &wsaBuf, 1, lpNumberOfBytesSent, 0,
        (const sockaddr*)sock.remoteAddress(), (int)sizeof(sockaddr_in), &sock, NULL);
    if (ret == SOCKET_ERROR) {
        auto lr = WSAGetLastError();
        if (lr == WSA_IO_PENDING)
            ret = 0;
        else
            sock.setCompleted();
    }
    sock.setReturn(ret);
    return sock;
}

// Return value: 0 on success, SOCKET_ERROR on failure
inline
AsyncBroadcastor&
sendTo(AsyncBroadcastor& sock, const char* ip, unsigned short port, const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent) {
    sock.setBroadcastAddress(ip, port);
    return ::sendTo(sock, lpSentBuffer, nNumberOfBytesSentBuffer, lpNumberOfBytesSent);
}

// Return value: 0 on success, SOCKET_ERROR on failure
inline
AsyncReceiver&
recvFrom(AsyncReceiver& sock, void* lpRecvdBuffer, unsigned long nNumberOfBytesRecvdBuffer, unsigned long* lpNumberOfBytesRecvd) {
    sock.type() = IoFileType::TRecvfrom;
    sock.resetCompleted();
    if (lpNumberOfBytesRecvd)
        *lpNumberOfBytesRecvd = 0;
    sock.setTransferredBytesCountRecvBuffer(lpNumberOfBytesRecvd);
    WSABUF wsaBuf{ nNumberOfBytesRecvdBuffer, (char*)lpRecvdBuffer };
    DWORD dwFlag = 0;
    *sock.remoteLen() = sizeof(sockaddr_in);
    auto ret = WSARecvFrom(sock, &wsaBuf, 1, NULL, &dwFlag,
        (sockaddr*)sock.remoteAddress(), sock.remoteLen(), &sock, NULL);
    if (ret == SOCKET_ERROR) {
        auto lr = WSAGetLastError();
        if (lr == WSA_IO_PENDING)
            ret = 0;
        else
            sock.setCompleted();
    }
    sock.setReturn(ret);
    return sock;
}

struct AsyncFile : public IoFileAwaitable<bool> {
    AsyncFile(CoScheduler& sc, const char* filename,
        unsigned long dwDesiredAccess,
        unsigned long dwShareMode,
        LPSECURITY_ATTRIBUTES lpSecurityAttributes,
        unsigned long dwCreationDisposition,
        unsigned long dwFlagsAndAttributes,
        HANDLE hTemplateFile)
        : IoFileAwaitable(sc, CreateFileA(filename, dwDesiredAccess, dwShareMode,
            lpSecurityAttributes, dwCreationDisposition, dwFlagsAndAttributes, hTemplateFile))
    { }
    AsyncFile(CoScheduler& sc, const wchar_t* filename,
        unsigned long dwDesiredAccess,
        unsigned long dwShareMode,
        LPSECURITY_ATTRIBUTES lpSecurityAttributes,
        unsigned long dwCreationDisposition,
        unsigned long dwFlagsAndAttributes,
        HANDLE hTemplateFile)
        : IoFileAwaitable(sc, CreateFileW(filename, dwDesiredAccess, dwShareMode,
            lpSecurityAttributes, dwCreationDisposition, dwFlagsAndAttributes, hTemplateFile))
    { }
    ~AsyncFile() { close(); }
    AsyncFile& read(void* lpBuffer, unsigned long nNumberOfBytesToRead, unsigned long* lpNumberOfBytesRead);
    AsyncFile& write(const void* lpBuffer, unsigned long nNumberOfBytesToWrite, unsigned long* lpNumberOfBytesWritten);
};

// Return value: true on success, false on failure
inline
AsyncFile&
read(AsyncFile& file, void* lpBuffer, unsigned long nNumberOfBytesToRead, unsigned long* lpNumberOfBytesRead) {
    file.type() = IoFileType::TRead;
    file.resetCompleted();
    if (lpNumberOfBytesRead)
        *lpNumberOfBytesRead = 0;
    file.setTransferredBytesCountRecvBuffer(lpNumberOfBytesRead);
    auto ret = ReadFile(file, lpBuffer, nNumberOfBytesToRead, lpNumberOfBytesRead, &file);
    if (ret == false) {
        auto lr = GetLastError();
        if (lr == ERROR_IO_PENDING)
            ret = true;
        else
            file.setCompleted();
    }
    file.setReturn(ret);
    return file;
}

// Return value: true on success, false on failure
inline
AsyncFile&
write(AsyncFile& file, const void* lpBuffer, unsigned long nNumberOfBytesToWrite, unsigned long* lpNumberOfBytesWritten) {
    file.type() = IoFileType::TWrite;
    file.resetCompleted();
    if (lpNumberOfBytesWritten)
        *lpNumberOfBytesWritten = 0;
    file.setTransferredBytesCountRecvBuffer(lpNumberOfBytesWritten);
    auto ret = WriteFile(file, lpBuffer, nNumberOfBytesToWrite, lpNumberOfBytesWritten, &file);
    if (ret == false) {
        auto lr = GetLastError();
        if (lr == ERROR_IO_PENDING)
            ret = true;
        else
            file.setCompleted();
    }
    file.setReturn(ret);
    return file;
}

struct AsyncSleepor : public IoFileAwaitable<long long> {
    AsyncSleepor(long long microOrMilliSeconds = 0, bool useMicroSeconds = false)
        : microOrMilliSeconds(microOrMilliSeconds), useMicroSeconds(useMicroSeconds)
    {
        type() = IoFileType::TSleep;
        start();
    }
    void start() { tp = std::chrono::steady_clock::now(); }
    auto getSpendMicroSeconds() const {
        constexpr auto div = std::nano::den / std::micro::den;
        std::chrono::nanoseconds delta(std::chrono::steady_clock::now() - tp);
        return delta.count() / div;
    }
    auto getSpendMilliSeconds() const {
        constexpr auto div = std::nano::den / std::milli::den;
        std::chrono::nanoseconds delta(std::chrono::steady_clock::now() - tp);
        return delta.count() / div;
    }
    bool isCompleted() {
        setReturn(useMicroSeconds ? getSpendMicroSeconds() : getSpendMilliSeconds());
        return (m_isCompleted = getReturn() >= microOrMilliSeconds);
    }
protected:
    long long microOrMilliSeconds;
    bool useMicroSeconds;
    std::chrono::steady_clock::time_point tp;
};

// Millisecond-level sleep; returns the milliseconds actually slept
inline
AsyncSleepor
sleepFor(long long milliSeconds) {
    return AsyncSleepor{ milliSeconds };
}

// Microsecond-level sleep; returns the microseconds actually slept
inline
AsyncSleepor
sleepForEx(long long microSeconds) {
    return AsyncSleepor{ microSeconds, true };
}

void test_coroutine_tcp_server(unsigned short serverPort = 33100, int totalClientCount = 100, bool dumpTestInfo = 0);
void test_coroutine_udp_random_broadcast(unsigned short broadCastPort = 33000, int totalClientBroadcastCount = 20, bool dumpTestInfo = 0);

#endif
#endif
Implementation file:
CLCoroutine.cpp
#include "CLCoroutine.h"

#if (defined(CLUseCorotine) && CLUseCorotine)

#include "../_cl_common/CLCommon.h"
#include "../_cl_string/CLString.h"
#include "../_cl_logger/CLLogger.h"

void CoScheduler::run() {
    auto coro = [this]() -> MainCoro {
        //logger.debug("\nMain coro scheduler started ...");
#ifdef _WIN32
        if (m_hIocp) {
            CLString err;
            DWORD dwMilliseconds = 0;
            //logger.debug("\nMain coro scheduler: Iocp loop started ...");
            while (1) {
                DWORD numberOfBytesTransferred = 0;
                ULONG_PTR completionKey = 0;
                OVERLAPPED* pOverlapped = 0;
                while (GetQueuedCompletionStatus(m_hIocp, &numberOfBytesTransferred, &completionKey, &pOverlapped, dwMilliseconds)) {
                    if (pOverlapped) { // an io completion event
                        auto pFile = (IoFileAwaitable<>*)pOverlapped;
                        pFile->setCompleted();
                        pFile->setLastError(ERROR_SUCCESS);
                        auto saveBuf = (DWORD*)pFile->getTransferredBytesCountBuffer();
                        if (saveBuf) *saveBuf = numberOfBytesTransferred;
                        // Use the awaitable's priority to decide between immediate
                        // dispatch and equal-weight round-robin dispatch
                        switch (pFile->getPriority()) {
                        case IoFilePriority::DispatchImmediately:
                            moveTaskToEnd(pFile->onwer()); // dispatch immediately
                            break;
                        default:
                            moveTaskToEnd(m_begin.hd); // round-robin dispatch
                            break;
                        }
                        m_end.resume();
                    }
                    else { // a new task arrived: dispatch it immediately
                        if (numberOfBytesTransferred == 0 && completionKey) {
                            auto h = CoTask::handle::from_address((void*)completionKey);
                            moveTaskToEnd(h);
                            h.resume();
                        }
                        else {
                            auto lr = GetLastError();
                            logger.warning("Iocp: get status in event loop: ", err.getLastErrorString(lr));
                            CLString().getLastErrorMessageBoxExceptSucceed(lr);
                        }
                    }
                }
                auto lr = GetLastError();
                if (lr == WSA_WAIT_TIMEOUT) {
                    moveTaskToEnd(m_begin.hd); // round-robin dispatch
                    m_end.resume(); // at this point no awaited io has completed, so those tasks
                                    // won't run — but coroutines that yielded via co_yield do
                }
                else if (pOverlapped) {
                    auto pFile = (IoFileAwaitable<>*)pOverlapped;
                    pFile->setCompleted();
                    pFile->setLastError(lr);
                    auto saveBuf = (DWORD*)pFile->getTransferredBytesCountBuffer();
                    if (saveBuf) *saveBuf = 0;
                    IoFileType fileType = pFile->type();
                    switch (fileType) {
                    case TUnknown: break;
                    case TRead:
                    case TWrite:
                    case TListen:
                    case TAccept:
                    case TConnect:
                        pFile->setReturn(false);
                        break;
                    case TSend:
                    case TRecv:
                    case TSendto:
                    case TRecvfrom:
                        pFile->setReturn(SOCKET_ERROR);
                        break;
                    case TSleep: break;
                    default: break;
                    }
                    switch (lr) {
                    case ERROR_NETNAME_DELETED: // 64: the specified network name is no longer available
                        break;
                    case ERROR_SEM_TIMEOUT: // 121: the semaphore timeout period has expired
                        break;
                    default:
                        logger.error("Iocp: get status out event loop: ", err.getLastErrorString(lr));
                        CLString().getLastErrorMessageBoxExceptSucceed(lr);
                        break;
                    }
                    // Use the awaitable's priority to decide between immediate
                    // dispatch and equal-weight round-robin dispatch
                    switch (pFile->getPriority()) {
                    case IoFilePriority::DispatchImmediately:
                        moveTaskToEnd(pFile->onwer()); // dispatch immediately
                        break;
                    default:
                        moveTaskToEnd(m_begin.hd); // round-robin dispatch
                        break;
                    }
                    m_end.resume();
                }
                else {
                    logger.error("Iocp: get status out event loop no completed: ", err.getLastErrorString(lr));
                    CLString().getLastErrorMessageBoxExceptSucceed(lr);
                }
                if (taskCount() == 0)
                    break;
            }
            CloseHandle(m_hIocp);
            m_hIocp = 0;
            //logger.debug("\nMain coro scheduler: Iocp loop has done ...");
        }
#endif
        //logger.debug("\nMain coro scheduler quit ...");
        co_return;
    };
    m_main = coro();
    m_main.hd.promise().sc = this;
    m_main.hd.resume();
    m_main.hd.destroy();
}

bool CoTask::resume() {
    if (!hd)
        return true;
    else if (hd.done()) {
        return false;
    }
    else {
        auto pFile = (IoFileAwaitable<>*)hd.promise().pAwaitableFile;
        if (!pFile) // first dispatch, or a coroutine suspended by co_yield
            hd.resume();
        else {
            if (pFile->type() == IoFileType::TSleep) { // sleep dispatch
                if (((AsyncSleepor*)pFile)->isCompleted()) {
                    hd.promise().pAwaitableFile = nullptr;
                    hd.resume();
                }
            }
            else if (pFile->isCompleted()) { // io-completion dispatch
                hd.promise().pAwaitableFile = nullptr;
                hd.resume();
            }
        }
        return true;
    }
}

#ifdef _WIN32
#else // Windows
#endif // Linux

AsyncAcceptor& AsyncListener::accept(AsyncAcceptor& sockAccept, void* lpAcceptBuffer, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted)
{
    return ::accept(*this, sockAccept, lpAcceptBuffer, nNumberOfBytesAcceptBuffer, lpNumberOfBytesAccepted);
}

AsyncAcceptor& AsyncAcceptor::accept(AsyncListener& sListen, void* lpAcceptBuffer, unsigned long nNumberOfBytesAcceptBuffer, unsigned long* lpNumberOfBytesAccepted)
{
    return ::accept(sListen, *this, lpAcceptBuffer, nNumberOfBytesAcceptBuffer, lpNumberOfBytesAccepted);
}

AsyncConnector& AsyncConnector::connect(const sockaddr* name, int namelen, void* lpSendBuffer, unsigned long dwSendDataLength, unsigned long* lpdwBytesSent)
{
    return ::connect(*this, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}

AsyncConnector& AsyncConnector::connect(const char* ip, unsigned short port, void* lpSendBuffer, unsigned long dwSendDataLength, unsigned long* lpdwBytesSent)
{
    return ::connect(*this, ip, port, lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}

AsyncConnector& AsyncConnector::connect(void* lpSendBuffer, unsigned long dwSendDataLength, unsigned long* lpdwBytesSent)
{
    return ::connect(*this, lpSendBuffer, dwSendDataLength, lpdwBytesSent);
}

AsyncTcp& AsyncTcp::send(const void* lpSendBuffer, unsigned long nNumberOfBytesSendBuffer, unsigned long* lpNumberOfBytesSend)
{
    return ::send(*this, lpSendBuffer, nNumberOfBytesSendBuffer, lpNumberOfBytesSend);
}

AsyncTcp& AsyncTcp::recv(void* lpRecvBuffer, unsigned long nNumberOfBytesRecvBuffer, unsigned long* lpNumberOfBytesRecv)
{
    return ::recv(*this, lpRecvBuffer, nNumberOfBytesRecvBuffer, lpNumberOfBytesRecv);
}

AsyncBroadcastor& AsyncBroadcastor::sendTo(const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent)
{
    return ::sendTo(*this, lpSentBuffer, nNumberOfBytesSentBuffer, lpNumberOfBytesSent);
}

AsyncBroadcastor& AsyncBroadcastor::sendTo(const char* ip, unsigned short port, const void* lpSentBuffer, unsigned long nNumberOfBytesSentBuffer, unsigned long* lpNumberOfBytesSent)
{
    return ::sendTo(*this, ip, port, lpSentBuffer, nNumberOfBytesSentBuffer, lpNumberOfBytesSent);
}

AsyncReceiver& AsyncReceiver::recvFrom(void* lpRecvdBuffer, unsigned long nNumberOfBytesRecvdBuffer, unsigned long* lpNumberOfBytesRecvd)
{
    return ::recvFrom(*this, lpRecvdBuffer, nNumberOfBytesRecvdBuffer, lpNumberOfBytesRecvd);
}

AsyncFile& AsyncFile::read(void* lpBuffer, unsigned long nNumberOfBytesToRead, unsigned long* lpNumberOfBytesRead)
{
    return ::read(*this, lpBuffer, nNumberOfBytesToRead, lpNumberOfBytesRead);
}

AsyncFile& AsyncFile::write(const void* lpBuffer, unsigned long nNumberOfBytesToWrite, unsigned long* lpNumberOfBytesWritten)
{
    return ::write(*this, lpBuffer, nNumberOfBytesToWrite, lpNumberOfBytesWritten);
}

#include <vector>

void test_coroutine_tcp_server(unsigned short serverPort, int totalClientCount, bool dumpTestInfo)
{
    logger.setLevel(dumpTestInfo ? logger.Debug : logger.Info);
    logger.openHeadInfoFlag(false);
    CoScheduler sc;
    int servRun = 0;
    int totals = 0;
    CLTick tk;
    // server listener task
    sc.gather([&]() -> CoTask {
        logger.info("\nTcp server coro started ...");
        AsyncListener listener(sc, ADDR_ANY, serverPort);
        // loop accept
        std::vector<char> acceptbuf(260);
        AsyncAcceptor* pAcceptor = 0;
        int servId = 0;
        while (true) {
            AsyncAcceptor& acceptor = pAcceptor ? *pAcceptor : *(pAcceptor = new AsyncAcceptor(sc));
            DWORD nValidAccept;
            logger.debug("\nServer listener accept wait ...");
            bool ret = co_await listener.accept(acceptor, acceptbuf.data(), acceptbuf.size(), &nValidAccept);
            if (ret) {
                // create server task
                acceptor.perseAddress(acceptbuf.data(), acceptbuf.size());
                ++servRun;
                // server task
                sc.gather([&](AsyncAcceptor* pAcceptor, int idx) -> CoTask {
                    AsyncAcceptor& acp = *pAcceptor;
                    std::vector<char> bufSend(260), bufRecv(260);
                    DWORD nbytesSend, nbytesRecv;
                    int total = 1;
                    while (1) {
                        std::sprintf(bufSend.data(), "\nHello client. this is server %d. %dst response.", idx, total);
                        logger.debug("\nServer[%d] send wait ...", idx);
                        int ret = co_await acp.send(bufSend.data(), std::strlen(bufSend.data()) + 1, &nbytesSend);
                        logger.debug("\nServer[%d] recv wait ...", idx);
                        ret = co_await acp.recv(bufRecv.data(), bufRecv.size(), &nbytesRecv);
                        if (nbytesRecv == 0)
                            break;
                        logger.debug("\nServer[%d] recv client msg %s", idx, bufRecv.data());
                        ++total;
                        ++totals;
                    }
                    logger.debug("\nServer[%d] recv client close msg", idx);
                    delete pAcceptor;
                    servRun--;
                }, pAcceptor, ++servId);
                pAcceptor = 0;
            }
        }
        logger.info("\nTcp server coro quit.%d", GetCurrentThreadId());
    });
    // client generator task
    sc.gather([&]() -> CoTask {
        logger.info("\nClients creater coro started.");
        int nClient = 0;
        for (int i = 0; 1; ) {
            if (nClient < totalClientCount) {
                ++i;
                logger.info("\nClients creater make client task %d.", i);
                ++nClient;
                // client task
                sc.gather([&](int idx) -> CoTask {
                    AsyncConnector con(sc);
                    logger.debug("\nClient[%d] connect wait ...", idx);
                    auto ret = co_await con.connect("127.0.0.1", serverPort);
                    if (!ret) {
                        logger.debug("\nClient[%d] connect server fail, %s", idx, CLString().getLastErrorString(GetLastError()));
                        co_return;
                    }
                    std::vector<char> bufSend(260), bufRecv(260);
                    DWORD nbytesSend, nbytesRecv;
                    int total = 1;
                    while (1) {
                        std::snprintf(bufSend.data(), bufSend.size(), "\nHelle server, this is client %d: %dst request.", idx, total);
                        logger.debug("\nClient[%d] send wait ...", idx);
                        auto ret = co_await con.send(bufSend.data(), std::strlen(bufSend.data()) + 1, &nbytesSend);
                        if (!(ret == SOCKET_ERROR || nbytesSend == 0)) {
                            logger.debug("\nClient[%d] recv wait ...", idx);
                            ret = co_await con.recv(bufRecv.data(), bufRecv.size(), &nbytesRecv);
                            if (ret == SOCKET_ERROR || nbytesRecv == 0)
                                break;
                            logger.debug("\nClient[%d] recv server msg %s", idx, bufRecv.data());
                        }
                        ++total;
                    }
                    logger.debug("\nClient[%d] get server close msg and shutdown.", idx);
                    nClient--;
                }, i);
            }
            else {
                logger.debug("\nClients creater yield run privilege to coro scheduler.");
                co_yield 1;
                logger.debug("\nClients creater fetch run privilege again.");
            }
        }
        logger.debug("\nClients creater coro quit.");
    });
    // statistics coroutine
    sc.gather([&]() -> CoTask {
        auto last = totals;
        auto lastTime = tk.getSpendTime();
        while (1) {
            co_await sleepFor(3000);
            auto time = tk.getSpendTime();
            logger.info("\nSummary coro count %u: total handle %d times (spend time %gs), %g times/per-second.",
                sc.taskCount(), totals, time, (totals - last) / (time - lastTime));
            last = totals;
            lastTime = time;
        }
    });
    sc.run();
}

void test_coroutine_udp_random_broadcast(unsigned short broadCastPort, int totalClientBroadcastCount, bool dumpTestInfo)
{
    logger.setLevel(dumpTestInfo ? logger.Debug : logger.Info);
    logger.openHeadInfoFlag(false);
    srand(time(0));
    CoScheduler sc;
    int servRun = 0;
    int totalsRecvd = 0;
    int totalsSendto = 0;
    CLTick tk;
    std::vector<unsigned short> portList(totalClientBroadcastCount);
    for (int i = 0; i < totalClientBroadcastCount; ++i)
        portList[i] = broadCastPort + i;
    // server generator task
    sc.gather([&]() -> CoTask {
        logger.info("\nServers creater coro started.");
        int nServer = 0;
        for (int i = 0; 1; ) {
            if (nServer < totalClientBroadcastCount) {
                ++i;
                logger.info("\nServers creater make server task %d.", i);
                ++nServer;
                // server task (broadcast receiver)
                sc.gather([&](int i) -> CoTask {
                    logger.info("\nUdp server[%d] coro started bind port %d...", i, portList[i - 1]);
                    AsyncReceiver serv(sc, "127.0.0.1", portList[i - 1]);
                    // recv
                    std::vector<char> recv(260);
                    int total = 1;
                    while (true) {
                        DWORD nbytesRecv;
                        logger.debug("\nUdp server[%d] recvfrom wait ...", i);
                        int ret = co_await serv.recvFrom(recv.data(), recv.size(), &nbytesRecv);
                        if (ret == SOCKET_ERROR || nbytesRecv == 0) {
                            CLString().getLastErrorMessageBoxExceptSucceed(WSAGetLastError());
                            break;
                        }
                        logger.debug("\nUdp server[%d] recvfrom %dst broadcast %u bytes data, msg %s", i, total, nbytesRecv, recv.data());
                        ++total;
                        ++totalsRecvd;
                    }
                    logger.info("\nUdp server[%d] coro quit.%d", i, GetCurrentThreadId());
                    nServer--;
                }, i);
            }
            else {
                logger.debug("\nServers creater yield run privilege to coro scheduler.");
                co_yield 1;
                logger.debug("\nServers creater fetch run privilege again.");
            }
        }
        logger.debug("\nServers creater coro quit.");
    });
    // client generator task
    sc.gather([&]() -> CoTask {
        logger.info("\nClients creater coro started.");
        int nClient = 0;
        for (int i = 0; 1; ) {
            if (nClient < totalClientBroadcastCount) {
                ++i;
                logger.info("\nClients creater make broadcastor client task %d.", i);
                ++nClient;
                // client task (broadcast sender)
                sc.gather([&](int idx) -> CoTask {
                    AsyncBroadcastor broadcast(sc);
                    std::vector<char> bufSent(260);
                    DWORD nbytesSent;
                    int total = 1;
                    while (1) {
                        auto randPort = portList[rand() % totalClientBroadcastCount];
                        std::snprintf(bufSent.data(), bufSent.size(), "\nHelle server, this is broadcastor %d: %dst randon broadcast to port%d.", idx, total, randPort);
                        logger.debug("\nBroadcastor[%d] send wait ...", idx);
                        auto ret = co_await broadcast.sendTo("127.0.0.1", randPort,
                            bufSent.data(), std::strlen(bufSent.data()) + 1, &nbytesSent);
                        if (ret == SOCKET_ERROR || nbytesSent == 0) {
                            break;
                        }
                        logger.debug("\nBroadcastor[%d] sendto server msg %s", idx, bufSent.data());
                        ++total;
                        ++totalsSendto;
                    }
                    logger.debug("\nBroadcastor[%d] send 0 bytes and shutdown.", idx);
                    nClient--;
                }, i);
            }
            else {
                logger.debug("\nClients creater yield run privilege to coro scheduler.");
                co_yield 1;
                logger.debug("\nClients creater fetch run privilege again.");
            }
        }
        logger.debug("\nClients creater coro quit.");
    });
    // statistics coroutine
    sc.gather([&]() -> CoTask {
        auto last = totalsRecvd + totalsSendto;
        auto lastTime = tk.getSpendTime();
        while (1) {
            co_await sleepFor(3000); // sleep this coroutine for 3000 ms
            auto time = tk.getSpendTime();
            logger.info("\nSummary coro count %u: total handle %d times (spend time %gs), %g times/per-second.",
                sc.taskCount(), totalsRecvd + totalsSendto, time, (totalsRecvd + totalsSendto - last) / (time - lastTime));
            last = totalsRecvd + totalsSendto;
            lastTime = time;
        }
    });
    sc.run();
}

#endif