网站开发科技公司,如何做原创漫画网站,黄冈做网站价格,wordpress调用指定文章idUDT主要通过在数据收发的过程中进行精细的控制来实现对于网络带宽更加有效的利用#xff0c;并使网络中数据传输的速率尽可能快。 如我们前面在分析数据发送的控制中看到的#xff0c;对于正常的顺序packet发送#xff0c;发送控制主要在于两个方面#xff0c;一是发送窗口… UDT主要通过在数据收发的过程中进行精细的控制来实现对于网络带宽更加有效的利用并使网络中数据传输的速率尽可能快。 如我们前面在分析数据发送的控制中看到的对于正常的顺序packet发送发送控制主要在于两个方面一是发送窗口的大小也就是某个时刻已经发送但未得到相应的packet的最大个数这一点主要由拥塞窗口大小m_dCongestionWindow和滑动窗口大小m_iFlowWindowSize来描述发送窗口大小为两者中较小的那一个二是控制两个数据包发送的时间间隔也就是包的发送速率这一点则主要用数据包发送时间间隔m_ullInterval来描述。所有的发送控制机制主要通过影响这几个变量来控制发送过程。 发送窗口大小对于数据packet发送过程的影响比较直接在CUDT::packData(CPacket packet, uint64_t ts)中会检查最后被ACK的数据packet的SeqNo m_iSndLastAck到最近发送的数据packet的SeqNo m_iSndCurrSeqNo的offset若offset大于窗口大小就不再发送数据。 发送送率的控制则略微复杂一点。在CUDT::packData(CPacket packet, uint64_t ts)中会计算下一个数据packet发送的理想的时间点记录在m_ullTargetTime中用于track及调整后续数据packet的发送时间并会将该时间值返回给调用者CSndUList::pop()CSndUList::pop()则会在将CUDT重新插入发送列表时更新CUDT的CSNode m_pSNode的时间戳字段并根据新的时间戳来讲CUDT放在CSndUList的CUDT堆的适当位置上。 这里就来更细致地看一下UDT中发送窗口大小及发送速率的调整。 滑动窗口大小m_iFlowWindowSize 对于m_iFlowWindowSize搜遍UDT的整个code可以看到主要在这样的几个地方会去更新它 1. 数据接收端和数据发送端在建立连接的过程中会通过HandShake消息协商确认该值。 CUDT::connect(const CPacket response) m_iFlowWindowSize m_ConnRes.m_iFlightFlagSize; 和CUDT::connect(const sockaddr* peer, CHandShake* hs) // exchange info for maximum flow window sizem_iFlowWindowSize hs-m_iFlightFlagSize; CUDT::connect(const sockaddr* serv_addr)中连接握手请求m_ConnReq的m_iFlightFlagSize m_ConnReq.m_iFlightFlagSize (m_iRcvBufSize m_iFlightFlagSize) ? m_iRcvBufSize : m_iFlightFlagSize; 在CUDT::CUDT()中会初始化m_iFlightFlagSize和m_iRcvBufSize m_iFlightFlagSize 25600;m_iSndBufSize 8192;m_iRcvBufSize 8192; //Rcv buffer MUST NOT be bigger than Flight Flag size m_iFlightFlagSize和m_iRcvBufSize这两个选项还可以通过UDT::setsockopt()进行设置(src/core.cpp) void CUDT::setOpt(UDTOpt optName, const void* optval, int) {if (m_bBroken || m_bClosing)throw CUDTException(2, 1, 0);CGuard cg(m_ConnectionLock);CGuard sendguard(m_SendLock);CGuard recvguard(m_RecvLock);switch (optName) {case UDT_FC:if (m_bConnecting || m_bConnected)throw CUDTException(5, 2, 0);if (*(int*) optval 1)throw CUDTException(5, 3);// Mimimum recv flight flag size is 32 packetsif (*(int*) optval 32)m_iFlightFlagSize *(int*) optval;elsem_iFlightFlagSize 32;break;case UDT_RCVBUF:if (m_bOpened)throw CUDTException(5, 1, 0);if (*(int*) optval 0)throw CUDTException(5, 3, 0);// Mimimum recv buffer size is 32 packetsif (*(int*) optval (m_iMSS - 28) * 32)m_iRcvBufSize *(int*) optval / (m_iMSS - 28);elsem_iRcvBufSize 32;// recv buffer MUST not be greater than FC sizeif (m_iRcvBufSize m_iFlightFlagSize)m_iRcvBufSize m_iFlightFlagSize;break; 2. 数据接收端发送的“light” ACK消息减小m_iFlowWindowSize。 如CUDT::processCtrl(CPacket ctrlpkt)中这样的一段code // process a lite ACKif (4 ctrlpkt.getLength()) {ack *(int32_t *) ctrlpkt.m_pcData;if (CSeqNo::seqcmp(ack, m_iSndLastAck) 0) {m_iFlowWindowSize - CSeqNo::seqoff(m_iSndLastAck, ack);m_iSndLastAck ack;}break;} 如我们前面在 UDT数据收发的可靠性保障 的ACK部分看到的那样“light” ACK消息通常是由于在某一小段时间内突然到达了大量的数据packet才会发送的这通常表明发送端发送数据过快过多了因而“light” ACK消息会减小数据发送端的滑动窗口大小m_iFlowWindowSize。 3. 数据接收端发送的常规ACK消息直接设置发送端的滑动窗口大小m_iFlowWindowSize。 如CUDT::processCtrl(CPacket ctrlpkt)中这样的一段code if (CSeqNo::seqcmp(ack, m_iSndLastAck) 0) {// Update Flow Window Size, must update before and together with m_iSndLastAckm_iFlowWindowSize *((int32_t *) ctrlpkt.m_pcData 3);m_iSndLastAck ack;} 而数据接收端则主要根据它自己的接收缓冲区的可用大小来设置如CUDT::sendCtrl()中的这段code data[3] m_pRcvBuffer-getAvailBufSize();// a minimum flow window of 2 is used, even if buffer is full, to break potential deadlockif (data[3] 2)data[3] 2;if (currt 更新m_iFlowWindowSize的地方基本上就是这3个。 m_iFlowWindowSize除了会在CUDT::packData()中被用来决定发送窗口的大小之外它的初始值还决定着发送丢失列表的大小如CUDT::connect()中 // after introducing lite ACK, the sndlosslist may not be cleared in time, so it requires twice space.m_pSndLossList new CSndLossList(m_iFlowWindowSize * 2);m_pRcvLossList new CRcvLossList(m_iFlightFlagSize); 及拥塞控制器最大的拥塞窗口的大小如CUDT::connect()中 m_pCC-setMaxCWndSize(m_iFlowWindowSize); UDT中与滑动窗口大小相关的内容基本上就是这些。 数据包发送时间间隔m_ullInterval 对于m_ullInterval在UDT中则有如下的的几个地方会去更新它 1. 数据接收端和数据发送端在建立连接过程中的CUDT::connect(const CPacket response)和CUDT::connect(const sockaddr* peer, CHandShake* hs)中会根据拥塞控制器的m_dPktSndPeriod的初始值计算该值 m_ullInterval (uint64_t) (m_pCC-m_dPktSndPeriod * m_ullCPUFrequency); UDT默认的拥塞控制器是将m_dPktSndPeriod初始化为1.0的(src/ccc.cpp) CCC::CCC(): m_iSYNInterval(CUDT::m_iSYNInterval),m_dPktSndPeriod(1.0),
......void CUDTCC::init() {
......m_dCWndSize 16;m_dPktSndPeriod 1;
} 2. 将拥塞控制器中的计算成果同步进CUDT中时会更新m_ullInterval。可以看下CUDT::CCUpdate() void CUDT::CCUpdate() {m_ullInterval (uint64_t) (m_pCC-m_dPktSndPeriod * m_ullCPUFrequency);m_dCongestionWindow m_pCC-m_dCWndSize;if (m_llMaxBW 0)return;const double minSP 1000000.0 / (double(m_llMaxBW) / m_iMSS) * m_ullCPUFrequency;if (m_ullInterval minSP)m_ullInterval minSP;
} 这里同样是根据拥塞控制器的m_dPktSndPeriod值计算该值计算方法也与前一种相同。但m_ullInterval的实际取值会受限与最大数据传输率m_llMaxBW的值。最大数据传输率m_llMaxBW默认为无效值-1CUDT::CUDT()中 m_llMaxBW -1; 但可以通过UDT::setsockopt()进行设置(src/core.cpp) case UDT_MAXBW:m_llMaxBW *(int64_t*) optval;break; 3. 数据接收端反馈的DelayWarning控制消息增加m_ullInterval。在CUDT::processCtrl()中可以看到对于DelayWarning消息的处理 case 4: //100 - Delay Warning// One way packet delay is increasing, so decrease the sending ratem_ullInterval (uint64_t) ceil(m_ullInterval * 1.125);m_iLastDecSeq m_iSndCurrSeqNo;break; CUDT::sendCtrl()中发送DelayWarning的过程 case 4: //100 - Congestion Warningctrlpkt.pack(pkttype);ctrlpkt.m_iID m_PeerID;m_pSndQueue-sendto(m_pPeerAddr, ctrlpkt);CTimer::rdtsc(m_ullLastWarningTime);break; 这个过程看上去蛮直接的。但在4.0版本的UDT中DelayWarning/CongestiongWarning消息实际上不再被用到了。在CUDT::processCtrl()中处理ACK2的case中可以看到被注释掉的这行code //if increasing delay detected...// sendCtrl(4); 只能缅怀曾经在发挥作用的 DelayWarning /CongestiongWarning消息 了。 如此看来则数据包发送时间间隔m_ullInterval似乎总是由拥塞控制器的m_dPktSndPeriod在决定了。 拥塞控制器 UDT使用拥塞控制器来追踪网络数据传输过程中发生的事件如接收到了ACK发生了超时等并根据这些事件产生的时机及其它的一些基本设置进行计算用计算结果来控制ACK消息发送的频率发送窗口的大小及数据包的发送频率等。 拥塞控制器虚拟工厂CCCVirtualFactory UDT的拥塞控制器机制中不直接创建拥塞控制器而是通过拥塞控制器工厂来创建。可以看一下拥塞控制器工厂接口CCCVirtualFactory及默认的拥塞控制器工厂实现CCCFactory的定义 class CCCVirtualFactory {public:virtual ~CCCVirtualFactory() {}virtual CCC* create() 0;virtual CCCVirtualFactory* clone() 0;
};templateclass T
class CCCFactory : public CCCVirtualFactory {public:virtual ~CCCFactory() {}virtual CCC* create() {return new T;}virtual CCCVirtualFactory* clone() {return new CCCFactoryT ;}
}; CCCFactory主要会被用于创建默认拥塞控制器CUDTCC(src/core.cpp) CUDT::CUDT() {
......m_pCCFactory new CCCFactoryCUDTCC; 要实现自定义的拥塞控制策略也是需要实现自己的拥塞控制器虚拟工厂接口CCCVirtualFactory令自定义的拥塞控制器虚拟工厂创建自定义的拥塞控制器并将自定义的拥塞控制器虚拟工厂设置给UDT。在CUDT::setOpt()中可见 case UDT_CC:if (m_bConnecting || m_bConnected)throw CUDTException(5, 1, 0);if (NULL ! m_pCCFactory)delete m_pCCFactory;m_pCCFactory ((CCCVirtualFactory *) optval)-clone();break; 顺便提一下在CUDT::getOpt()中获取UDT_CC选项的值则是直接获取的拥塞控制器而不是拥塞控制器虚拟工厂 void CUDT::getOpt(UDTOpt optName, void* optval, int optlen) {CGuard cg(m_ConnectionLock);
。。。。。。case UDT_CC:if (!m_bOpened)throw CUDTException(5, 5, 0);*(CCC**) optval m_pCC;optlen sizeof(CCC*);break; UDT中拥塞控制器虚拟工厂的内容基本上就是这些了。 拥塞控制器CCC 这里再来看一下拥塞控制CCC的定义(src/ccc.h) class UDT_API CCC {friend class CUDT;public:CCC();virtual ~CCC();private:CCC(const CCC);CCC operator(const CCC) {return *this;}public:// Functionality:// Callback function to be called (only) at the start of a UDT connection.// note that this is different from CCC(), which is always called.// Parameters:// None.// Returned value:// None.virtual void init() {}// Functionality:// Callback function to be called when a UDT connection is closed.// Parameters:// None.// Returned value:// None.virtual void close() {}// Functionality:// Callback function to be called when an ACK packet is received.// Parameters:// 0) [in] ackno: the data sequence number acknowledged by this ACK.// Returned value:// None.virtual void onACK(int32_t) {}// Functionality:// Callback function to be called when a loss report is received.// Parameters:// 0) [in] losslist: list of sequence number of packets, in the format describled in packet.cpp.// 1) [in] size: length of the loss list.// Returned value:// None.virtual void onLoss(const int32_t*, int) {}// Functionality:// Callback function to be called when a timeout event occurs.// Parameters:// None.// Returned value:// None.virtual void onTimeout() {}// Functionality:// Callback function to be called when a data is sent.// Parameters:// 0) [in] seqno: the data sequence number.// 1) [in] size: the payload size.// Returned value:// None.virtual void onPktSent(const CPacket*) {}// Functionality:// Callback function to be called when a data is received.// Parameters:// 0) [in] seqno: the data sequence number.// 1) [in] size: the payload size.// Returned value:// None.virtual void onPktReceived(const CPacket*) {}// Functionality:// Callback function to Process a user defined packet.// Parameters:// 0) [in] pkt: the user defined packet.// Returned value:// None.virtual void processCustomMsg(const CPacket*) {}protected:// Functionality:// Set periodical acknowldging and the ACK period.// Parameters:// 0) [in] msINT: the period to send an ACK.// Returned value:// None.void setACKTimer(int msINT);// Functionality:// Set packet-based acknowldging and the number of packets to send an ACK.// Parameters:// 0) [in] pktINT: the number of packets to send an ACK.// Returned value:// None.void setACKInterval(int pktINT);// Functionality:// Set RTO value.// Parameters:// 0) [in] msRTO: RTO in macroseconds.// Returned value:// None.void setRTO(int usRTO);// Functionality:// Send a user defined control packet.// Parameters:// 0) [in] pkt: user defined packet.// Returned value:// None.void sendCustomMsg(CPacket pkt) const;// Functionality:// retrieve performance information.// Parameters:// None.// Returned value:// Pointer to a performance info structure.const CPerfMon* getPerfInfo();// Functionality:// Set user defined parameters.// Parameters:// 0) [in] param: the paramters in one buffer.// 1) [in] size: the size of the buffer.// Returned value:// None.void setUserParam(const char* param, int size);private:void setMSS(int mss);void setMaxCWndSize(int cwnd);void setBandwidth(int bw);void setSndCurrSeqNo(int32_t seqno);void setRcvRate(int rcvrate);void setRTT(int rtt);protected:const int32_t m_iSYNInterval; // UDT constant parameter, SYNdouble m_dPktSndPeriod; // Packet sending period, in microsecondsdouble m_dCWndSize; // Congestion window size, in packetsint m_iBandwidth; // estimated bandwidth, packets per seconddouble m_dMaxCWndSize; // maximum cwnd size, in packetsint m_iMSS; // Maximum Packet Size, including all packet headersint32_t m_iSndCurrSeqNo; // current maximum seq no sent outint m_iRcvRate; // packet arrive rate at receiver side, packets per secondint m_iRTT; // current estimated RTT, microsecondchar* m_pcParam; // user defined parameterint m_iPSize; // size of m_pcParamprivate:UDTSOCKET m_UDT; // The UDT entity that this congestion control algorithm is bound toint m_iACKPeriod; // Periodical timer to send an ACK, in millisecondsint m_iACKInterval; // How many packets to send one ACK, in packetsbool m_bUserDefinedRTO; // if the RTO value is defined by usersint m_iRTO; // RTO value, microsecondsCPerfMon m_PerfInfo; // protocol statistics information
}; 然后是CCC的实现(src/ccc.cpp) CCC::CCC(): m_iSYNInterval(CUDT::m_iSYNInterval),m_dPktSndPeriod(1.0),m_dCWndSize(16.0),m_iBandwidth(),m_dMaxCWndSize(),m_iMSS(),m_iSndCurrSeqNo(),m_iRcvRate(),m_iRTT(),m_pcParam(NULL),m_iPSize(0),m_UDT(),m_iACKPeriod(0),m_iACKInterval(0),m_bUserDefinedRTO(false),m_iRTO(-1),m_PerfInfo() {
}CCC::~CCC() {delete[] m_pcParam;
}void CCC::setACKTimer(int msINT) {m_iACKPeriod msINT m_iSYNInterval ? m_iSYNInterval : msINT;
}void CCC::setACKInterval(int pktINT) {m_iACKInterval pktINT;
}void CCC::setRTO(int usRTO) {m_bUserDefinedRTO true;m_iRTO usRTO;
}void CCC::sendCustomMsg(CPacket pkt) const {CUDT* u CUDT::getUDTHandle(m_UDT);if (NULL ! u) {pkt.m_iID u-m_PeerID;u-m_pSndQueue-sendto(u-m_pPeerAddr, pkt);}
}const CPerfMon* CCC::getPerfInfo() {try {CUDT* u CUDT::getUDTHandle(m_UDT);if (NULL ! u)u-sample(m_PerfInfo, false);} catch (...) {return NULL;}return m_PerfInfo;
}void CCC::setMSS(int mss) {m_iMSS mss;
}void CCC::setBandwidth(int bw) {m_iBandwidth bw;
}void CCC::setSndCurrSeqNo(int32_t seqno) {m_iSndCurrSeqNo seqno;
}void CCC::setRcvRate(int rcvrate) {m_iRcvRate rcvrate;
}void CCC::setMaxCWndSize(int cwnd) {m_dMaxCWndSize cwnd;
}void CCC::setRTT(int rtt) {m_iRTT rtt;
}void CCC::setUserParam(const char* param, int size) {delete[] m_pcParam;m_pcParam new char[size];memcpy(m_pcParam, param, size);m_iPSize size;
} 然后是UDT中默认的CCC CUDTCC定义 class CUDTCC : public CCC {public:CUDTCC();public:virtual void init();virtual void onACK(int32_t);virtual void onLoss(const int32_t*, int);virtual void onTimeout();private:int m_iRCInterval; // UDT Rate control intervaluint64_t m_LastRCTime; // last rate increase timebool m_bSlowStart; // if in slow start phaseint32_t m_iLastAck; // last ACKed seq nobool m_bLoss; // if loss happened since last rate increaseint32_t m_iLastDecSeq; // max pkt seq no sent out when last decrease happeneddouble m_dLastDecPeriod; // value of pktsndperiod when last decrease happenedint m_iNAKCount; // NAK counterint m_iDecRandom; // random threshold on decrease by number of loss eventsint m_iAvgNAKNum; // average number of NAKs per congestionint m_iDecCount; // number of decreases in a congestion epoch
}; 然后是UDT中默认的CCC CUDTCC的实现 CUDTCC::CUDTCC(): m_iRCInterval(),m_LastRCTime(),m_bSlowStart(),m_iLastAck(),m_bLoss(),m_iLastDecSeq(),m_dLastDecPeriod(),m_iNAKCount(),m_iDecRandom(),m_iAvgNAKNum(),m_iDecCount() {
}void CUDTCC::init() {m_iRCInterval m_iSYNInterval;m_LastRCTime CTimer::getTime();setACKTimer(m_iRCInterval);m_bSlowStart true;m_iLastAck m_iSndCurrSeqNo;m_bLoss false;m_iLastDecSeq CSeqNo::decseq(m_iLastAck);m_dLastDecPeriod 1;m_iAvgNAKNum 0;m_iNAKCount 0;m_iDecRandom 1;m_dCWndSize 16;m_dPktSndPeriod 1;
}void CUDTCC::onACK(int32_t ack) {int64_t B 0;double inc 0;// Note: 1/24/2012// The minimum increase parameter is increased from 1.0 / m_iMSS to 0.01// because the original was too small and caused sending rate to stay at low level// for long time.const double min_inc 0.01;uint64_t currtime CTimer::getTime();if (currtime - m_LastRCTime (uint64_t) m_iRCInterval)return;m_LastRCTime currtime;if (m_bSlowStart) {m_dCWndSize CSeqNo::seqlen(m_iLastAck, ack);m_iLastAck ack;if (m_dCWndSize m_dMaxCWndSize) {m_bSlowStart false;if (m_iRcvRate 0)m_dPktSndPeriod 1000000.0 / m_iRcvRate;elsem_dPktSndPeriod (m_iRTT m_iRCInterval) / m_dCWndSize;}} elsem_dCWndSize m_iRcvRate / 1000000.0 * (m_iRTT m_iRCInterval) 16;// During Slow Start, no rate increaseif (m_bSlowStart)return;if (m_bLoss) {m_bLoss false;return;}B (int64_t) (m_iBandwidth - 1000000.0 / m_dPktSndPeriod);if ((m_dPktSndPeriod m_dLastDecPeriod) ((m_iBandwidth / 9) B))B m_iBandwidth / 9;if (B 0)inc min_inc;else {// inc max(10 ^ ceil(log10( B * MSS * 8 ) * Beta / MSS, 1/MSS)// Beta 1.5 * 10^(-6)inc pow(10.0, ceil(log10(B * m_iMSS * 8.0))) * 0.0000015 / m_iMSS;if (inc min_inc)inc min_inc;}m_dPktSndPeriod (m_dPktSndPeriod * m_iRCInterval) / (m_dPktSndPeriod * inc m_iRCInterval);
}void CUDTCC::onLoss(const int32_t* losslist, int) {//Slow Start stopped, if it hasnt yetif (m_bSlowStart) {m_bSlowStart false;if (m_iRcvRate 0) {// Set the sending rate to the receiving rate.m_dPktSndPeriod 1000000.0 / m_iRcvRate;return;}// If no receiving rate is observed, we have to compute the sending// rate according to the current window size, and decrease it// using the method below.m_dPktSndPeriod m_dCWndSize / (m_iRTT m_iRCInterval);}m_bLoss true;if (CSeqNo::seqcmp(losslist[0] 0x7FFFFFFF, m_iLastDecSeq) 0) {m_dLastDecPeriod m_dPktSndPeriod;m_dPktSndPeriod ceil(m_dPktSndPeriod * 1.125);m_iAvgNAKNum (int) ceil(m_iAvgNAKNum * 0.875 m_iNAKCount * 0.125);m_iNAKCount 1;m_iDecCount 1;m_iLastDecSeq m_iSndCurrSeqNo;// remove global synchronization using randomizationsrand(m_iLastDecSeq);m_iDecRandom (int) ceil(m_iAvgNAKNum * (double(rand()) / RAND_MAX));if (m_iDecRandom 1)m_iDecRandom 1;} else if ((m_iDecCount 5) (0 (m_iNAKCount % m_iDecRandom))) {// 0.875^5 0.51, rate should not be decreased by more than half within a congestion periodm_dPktSndPeriod ceil(m_dPktSndPeriod * 1.125);m_iLastDecSeq m_iSndCurrSeqNo;}
}void CUDTCC::onTimeout() {if (m_bSlowStart) {m_bSlowStart false;if (m_iRcvRate 0)m_dPktSndPeriod 1000000.0 / m_iRcvRate;elsem_dPktSndPeriod m_dCWndSize / (m_iRTT m_iRCInterval);} else {/*m_dLastDecPeriod m_dPktSndPeriod;m_dPktSndPeriod ceil(m_dPktSndPeriod * 2);m_iLastDecSeq m_iLastAck;*/}
} 具体的算法这里就不再仔细厘清了。 拥塞控制器就像是一个加工长一样接收一些事件经过自己的处理输出一些数据来控制数据的收发过程。拥塞控制器输出的数据主要有 1. 包发送周期m_dPktSndPeriod用于控制数据包的发送周期。 2. 拥塞控制窗口大小m_dCWndSize用于控制发送窗口的大小。 3. ACK发送周期m_iACKPeriod和ACK发送间隔m_iACKInterval用于控制ACK包发送的频率。 4. m_bUserDefinedRTO和m_iRTO用于控制超时时间。 如我们前面看到的m_dPktSndPeriod和m_dCWndSize会在CUDT::CCUpdate()中同步给CUDT。 Done。 转载于:https://my.oschina.net/wolfcs/blog/510893