diff --git a/common/net/daybreak_connection.cpp b/common/net/daybreak_connection.cpp index 19b5ee5d1..501cf30c0 100644 --- a/common/net/daybreak_connection.cpp +++ b/common/net/daybreak_connection.cpp @@ -365,7 +365,8 @@ void EQ::Net::DaybreakConnection::Process() FlushBuffer(); } - ProcessQueue(); + ProcessInboundQueue(); + ProcessOutboundQueue(); } catch (std::exception ex) { LogF(Logs::Detail, Logs::Netcode, "Error processing connection: {0}", ex.what()); @@ -440,7 +441,7 @@ void EQ::Net::DaybreakConnection::ProcessPacket(Packet &p) } } -void EQ::Net::DaybreakConnection::ProcessQueue() +void EQ::Net::DaybreakConnection::ProcessInboundQueue() { for (int i = 0; i < 4; ++i) { auto stream = &m_streams[i]; @@ -459,6 +460,31 @@ void EQ::Net::DaybreakConnection::ProcessQueue() } } +void EQ::Net::DaybreakConnection::ProcessOutboundQueue() +{ + for (int i = 0; i < 4; ++i) { + auto stream = &m_streams[i]; + + if (stream->outstanding_bytes == 0) { + continue; + } + + while (!stream->buffered_packets.empty()) { + auto &buff = stream->buffered_packets.front(); + + if (stream->outstanding_bytes + buff.sent.packet.Length() >= m_owner->m_options.max_outstanding_bytes || + stream->outstanding_packets.size() + 1 >= m_owner->m_options.max_outstanding_packets) { + break; + } + + stream->outstanding_bytes += buff.sent.packet.Length(); + stream->outstanding_packets.insert(std::make_pair(buff.seq, buff.sent)); + InternalSend(buff.sent.packet); + stream->buffered_packets.pop_front(); + } + } +} + void EQ::Net::DaybreakConnection::RemoveFromQueue(int stream, uint16_t seq) { auto s = &m_streams[stream]; @@ -1019,7 +1045,7 @@ void EQ::Net::DaybreakConnection::ProcessResend(int stream) auto now = Clock::now(); auto s = &m_streams[stream]; - for (auto &entry : s->sent_packets) { + for (auto &entry : s->outstanding_packets) { auto time_since_last_send = std::chrono::duration_cast(now - entry.second.last_sent); if (entry.second.times_resent == 0) { if ((size_t)time_since_last_send.count() > m_resend_delay) { @@ -1051,8 +1077,8 @@ void EQ::Net::DaybreakConnection::Ack(int stream, uint16_t seq) auto now = Clock::now(); auto s = &m_streams[stream]; - auto iter = s->sent_packets.begin(); - while (iter != s->sent_packets.end()) { + auto iter = s->outstanding_packets.begin(); + while (iter != s->outstanding_packets.end()) { auto order = CompareSequence(seq, iter->first); if (order != SequenceFuture) { @@ -1063,7 +1089,8 @@ void EQ::Net::DaybreakConnection::Ack(int stream, uint16_t seq) m_stats.last_ping = round_time; m_rolling_ping = (m_rolling_ping * 2 + round_time) / 3; - iter = s->sent_packets.erase(iter); + s->outstanding_bytes -= iter->second.packet.Length(); + iter = s->outstanding_packets.erase(iter); } else { ++iter; @@ -1075,8 +1102,8 @@ void EQ::Net::DaybreakConnection::OutOfOrderAck(int stream, uint16_t seq) { auto now = Clock::now(); auto s = &m_streams[stream]; - auto iter = s->sent_packets.find(seq); - if (iter != s->sent_packets.end()) { + auto iter = s->outstanding_packets.find(seq); + if (iter != s->outstanding_packets.end()) { uint64_t round_time = (uint64_t)std::chrono::duration_cast(now - iter->second.last_sent).count(); m_stats.max_ping = std::max(m_stats.max_ping, round_time); @@ -1084,10 +1111,30 @@ void EQ::Net::DaybreakConnection::OutOfOrderAck(int stream, uint16_t seq) m_stats.last_ping = round_time; m_rolling_ping = (m_rolling_ping * 2 + round_time) / 3; - s->sent_packets.erase(iter); + s->outstanding_bytes -= iter->second.packet.Length(); + s->outstanding_packets.erase(iter); } } +void EQ::Net::DaybreakConnection::BufferPacket(int stream, uint16_t seq, DaybreakSentPacket &sent) +{ + auto s = &m_streams[stream]; + //If we can send the packet then send it + //else buffer it to be sent when we can send it + if (s->outstanding_bytes + sent.packet.Length() >= m_owner->m_options.max_outstanding_bytes || s->outstanding_packets.size() + 1 >= m_owner->m_options.max_outstanding_packets) { + //Would go over one of the limits, buffer this packet. + DaybreakBufferedPacket bp; + bp.sent = std::move(sent); + bp.seq = seq; + s->buffered_packets.push_back(bp); + return; + } + + s->outstanding_bytes += sent.packet.Length(); + s->outstanding_packets.insert(std::make_pair(seq, sent)); + InternalSend(sent.packet); +} + void EQ::Net::DaybreakConnection::SendAck(int stream_id, uint16_t seq) { DaybreakReliableHeader ack; @@ -1293,11 +1340,9 @@ void EQ::Net::DaybreakConnection::InternalQueuePacket(Packet &p, int stream_id, sent.last_sent = Clock::now(); sent.first_sent = Clock::now(); sent.times_resent = 0; - stream->sent_packets.insert(std::make_pair(stream->sequence_out, sent)); + BufferPacket(stream_id, stream->sequence_out, sent); stream->sequence_out++; - InternalBufferedSend(first_packet); - while (used < length) { auto left = length - used; DynamicPacket packet; @@ -1321,10 +1366,8 @@ void EQ::Net::DaybreakConnection::InternalQueuePacket(Packet &p, int stream_id, sent.last_sent = Clock::now(); sent.first_sent = Clock::now(); sent.times_resent = 0; - stream->sent_packets.insert(std::make_pair(stream->sequence_out, sent)); + BufferPacket(stream_id, stream->sequence_out, sent); stream->sequence_out++; - - InternalBufferedSend(packet); } } else { @@ -1341,10 +1384,8 @@ void EQ::Net::DaybreakConnection::InternalQueuePacket(Packet &p, int stream_id, sent.last_sent = Clock::now(); sent.first_sent = Clock::now(); sent.times_resent = 0; - stream->sent_packets.insert(std::make_pair(stream->sequence_out, sent)); + BufferPacket(stream_id, stream->sequence_out, sent); stream->sequence_out++; - - InternalBufferedSend(packet); } } diff --git a/common/net/daybreak_connection.h b/common/net/daybreak_connection.h index 2d99153e9..2c318793f 100644 --- a/common/net/daybreak_connection.h +++ b/common/net/daybreak_connection.h @@ -9,7 +9,7 @@ #include #include #include -#include +#include #include namespace EQ @@ -145,6 +145,12 @@ namespace EQ size_t times_resent; }; + struct DaybreakBufferedPacket + { + uint16_t seq; + DaybreakSentPacket sent; + }; + struct DaybreakStream { DaybreakStream() { @@ -152,17 +158,20 @@ namespace EQ sequence_out = 0; fragment_current_bytes = 0; fragment_total_bytes = 0; + outstanding_bytes = 0; } uint16_t sequence_in; uint16_t sequence_out; std::unordered_map packet_queue; + std::deque buffered_packets; DynamicPacket fragment_packet; uint32_t fragment_current_bytes; uint32_t fragment_total_bytes; - std::unordered_map sent_packets; + std::unordered_map outstanding_packets; + size_t outstanding_bytes; }; DaybreakStream m_streams[4]; @@ -170,7 +179,8 @@ namespace EQ void Process(); void ProcessPacket(Packet &p); - void ProcessQueue(); + void ProcessInboundQueue(); + void ProcessOutboundQueue(); void RemoveFromQueue(int stream, uint16_t seq); void AddToQueue(int stream, uint16_t seq, const Packet &p); void ProcessDecodedPacket(const Packet &p); @@ -186,6 +196,7 @@ namespace EQ void ProcessResend(int stream); void Ack(int stream, uint16_t seq); void OutOfOrderAck(int stream, uint16_t seq); + void BufferPacket(int stream, uint16_t seq, DaybreakSentPacket &sent); void SendConnect(); void SendKeepAlive(); @@ -225,6 +236,8 @@ namespace EQ tic_rate_hertz = 60.0; resend_timeout = 90000; connection_close_time = 2000; + max_outstanding_packets = 200; + max_outstanding_bytes = 200 * 512; } size_t max_packet_size; @@ -247,6 +260,8 @@ namespace EQ size_t connection_close_time; DaybreakEncodeType encode_passes[2]; int port; + size_t max_outstanding_packets; + size_t max_outstanding_bytes; }; class DaybreakConnectionManager