diff --git a/common/net/daybreak_connection.cpp b/common/net/daybreak_connection.cpp index 19b5ee5d1..82a7e2d12 100644 --- a/common/net/daybreak_connection.cpp +++ b/common/net/daybreak_connection.cpp @@ -277,7 +277,7 @@ EQ::Net::DaybreakConnection::DaybreakConnection(DaybreakConnectionManager *owner m_encode_passes[1] = owner->m_options.encode_passes[1]; m_hold_time = Clock::now(); m_buffered_packets_length = 0; - m_rolling_ping = 900; + m_rolling_ping = 500; m_resend_delay = (m_rolling_ping * m_owner->m_options.resend_delay_factor) + m_owner->m_options.resend_delay_ms; m_combined.reset(new char[512]); m_combined[0] = 0; @@ -300,7 +300,7 @@ EQ::Net::DaybreakConnection::DaybreakConnection(DaybreakConnectionManager *owner m_crc_bytes = 0; m_hold_time = Clock::now(); m_buffered_packets_length = 0; - m_rolling_ping = 900; + m_rolling_ping = 500; m_resend_delay = (m_rolling_ping * m_owner->m_options.resend_delay_factor) + m_owner->m_options.resend_delay_ms; m_combined.reset(new char[512]); m_combined[0] = 0; @@ -365,7 +365,7 @@ void EQ::Net::DaybreakConnection::Process() FlushBuffer(); } - ProcessQueue(); + ProcessInboundQueue(); } catch (std::exception ex) { LogF(Logs::Detail, Logs::Netcode, "Error processing connection: {0}", ex.what()); @@ -440,7 +440,7 @@ void EQ::Net::DaybreakConnection::ProcessPacket(Packet &p) } } -void EQ::Net::DaybreakConnection::ProcessQueue() +void EQ::Net::DaybreakConnection::ProcessInboundQueue() { for (int i = 0; i < 4; ++i) { auto stream = &m_streams[i]; @@ -459,6 +459,31 @@ void EQ::Net::DaybreakConnection::ProcessQueue() } } +void EQ::Net::DaybreakConnection::ProcessOutboundQueue() +{ + for (int i = 0; i < 4; ++i) { + auto stream = &m_streams[i]; + + if (stream->outstanding_bytes == 0) { + continue; + } + + while (!stream->buffered_packets.empty()) { + auto &buff = stream->buffered_packets.front(); + + if (stream->outstanding_bytes + buff.sent.packet.Length() >= m_owner->m_options.max_outstanding_bytes || + stream->outstanding_packets.size() + 1 >= m_owner->m_options.max_outstanding_packets) { + break; + } + + stream->outstanding_bytes += buff.sent.packet.Length(); + stream->outstanding_packets.insert(std::make_pair(buff.seq, buff.sent)); + InternalSend(buff.sent.packet); + stream->buffered_packets.pop_front(); + } + } +} + void EQ::Net::DaybreakConnection::RemoveFromQueue(int stream, uint16_t seq) { auto s = &m_streams[stream]; @@ -1019,14 +1044,14 @@ void EQ::Net::DaybreakConnection::ProcessResend(int stream) auto now = Clock::now(); auto s = &m_streams[stream]; - for (auto &entry : s->sent_packets) { + for (auto &entry : s->outstanding_packets) { auto time_since_last_send = std::chrono::duration_cast(now - entry.second.last_sent); if (entry.second.times_resent == 0) { if ((size_t)time_since_last_send.count() > m_resend_delay) { InternalBufferedSend(entry.second.packet); entry.second.last_sent = now; entry.second.times_resent++; - m_rolling_ping += 300; + m_rolling_ping += 100; } } else { @@ -1040,7 +1065,7 @@ void EQ::Net::DaybreakConnection::ProcessResend(int stream) InternalBufferedSend(entry.second.packet); entry.second.last_sent = now; entry.second.times_resent++; - m_rolling_ping += 300; + m_rolling_ping += 100; } } } @@ -1051,8 +1076,8 @@ void EQ::Net::DaybreakConnection::Ack(int stream, uint16_t seq) auto now = Clock::now(); auto s = &m_streams[stream]; - auto iter = s->sent_packets.begin(); - while (iter != s->sent_packets.end()) { + auto iter = s->outstanding_packets.begin(); + while (iter != s->outstanding_packets.end()) { auto order = CompareSequence(seq, iter->first); if (order != SequenceFuture) { @@ -1063,7 +1088,9 @@ void EQ::Net::DaybreakConnection::Ack(int stream, uint16_t seq) m_stats.last_ping = round_time; m_rolling_ping = (m_rolling_ping * 2 + round_time) / 3; - iter = s->sent_packets.erase(iter); + s->outstanding_bytes -= iter->second.packet.Length(); + iter = s->outstanding_packets.erase(iter); + ProcessOutboundQueue(); } else { ++iter; @@ -1075,8 +1102,8 @@ void EQ::Net::DaybreakConnection::OutOfOrderAck(int stream, uint16_t seq) { auto now = Clock::now(); auto s = &m_streams[stream]; - auto iter = s->sent_packets.find(seq); - if (iter != s->sent_packets.end()) { + auto iter = s->outstanding_packets.find(seq); + if (iter != s->outstanding_packets.end()) { uint64_t round_time = (uint64_t)std::chrono::duration_cast(now - iter->second.last_sent).count(); m_stats.max_ping = std::max(m_stats.max_ping, round_time); @@ -1084,10 +1111,31 @@ void EQ::Net::DaybreakConnection::OutOfOrderAck(int stream, uint16_t seq) m_stats.last_ping = round_time; m_rolling_ping = (m_rolling_ping * 2 + round_time) / 3; - s->sent_packets.erase(iter); + s->outstanding_bytes -= iter->second.packet.Length(); + s->outstanding_packets.erase(iter); + ProcessOutboundQueue(); } } +void EQ::Net::DaybreakConnection::BufferPacket(int stream, uint16_t seq, DaybreakSentPacket &sent) +{ + auto s = &m_streams[stream]; + //If we can send the packet then send it + //else buffer it to be sent when we can send it + if (s->outstanding_bytes + sent.packet.Length() >= m_owner->m_options.max_outstanding_bytes || s->outstanding_packets.size() + 1 >= m_owner->m_options.max_outstanding_packets) { + //Would go over one of the limits, buffer this packet. + DaybreakBufferedPacket bp; + bp.sent = std::move(sent); + bp.seq = seq; + s->buffered_packets.push_back(bp); + return; + } + + s->outstanding_bytes += sent.packet.Length(); + s->outstanding_packets.insert(std::make_pair(seq, sent)); + InternalSend(sent.packet); +} + void EQ::Net::DaybreakConnection::SendAck(int stream_id, uint16_t seq) { DaybreakReliableHeader ack; @@ -1293,11 +1341,9 @@ void EQ::Net::DaybreakConnection::InternalQueuePacket(Packet &p, int stream_id, sent.last_sent = Clock::now(); sent.first_sent = Clock::now(); sent.times_resent = 0; - stream->sent_packets.insert(std::make_pair(stream->sequence_out, sent)); + BufferPacket(stream_id, stream->sequence_out, sent); stream->sequence_out++; - InternalBufferedSend(first_packet); - while (used < length) { auto left = length - used; DynamicPacket packet; @@ -1321,10 +1367,8 @@ void EQ::Net::DaybreakConnection::InternalQueuePacket(Packet &p, int stream_id, sent.last_sent = Clock::now(); sent.first_sent = Clock::now(); sent.times_resent = 0; - stream->sent_packets.insert(std::make_pair(stream->sequence_out, sent)); + BufferPacket(stream_id, stream->sequence_out, sent); stream->sequence_out++; - - InternalBufferedSend(packet); } } else { @@ -1341,10 +1385,8 @@ void EQ::Net::DaybreakConnection::InternalQueuePacket(Packet &p, int stream_id, sent.last_sent = Clock::now(); sent.first_sent = Clock::now(); sent.times_resent = 0; - stream->sent_packets.insert(std::make_pair(stream->sequence_out, sent)); + BufferPacket(stream_id, stream->sequence_out, sent); stream->sequence_out++; - - InternalBufferedSend(packet); } } diff --git a/common/net/daybreak_connection.h b/common/net/daybreak_connection.h index 2d99153e9..2ec7f200f 100644 --- a/common/net/daybreak_connection.h +++ b/common/net/daybreak_connection.h @@ -9,7 +9,7 @@ #include #include #include -#include +#include #include namespace EQ @@ -145,6 +145,12 @@ namespace EQ size_t times_resent; }; + struct DaybreakBufferedPacket + { + uint16_t seq; + DaybreakSentPacket sent; + }; + struct DaybreakStream { DaybreakStream() { @@ -152,17 +158,20 @@ namespace EQ sequence_out = 0; fragment_current_bytes = 0; fragment_total_bytes = 0; + outstanding_bytes = 0; } uint16_t sequence_in; uint16_t sequence_out; std::unordered_map packet_queue; + std::deque buffered_packets; DynamicPacket fragment_packet; uint32_t fragment_current_bytes; uint32_t fragment_total_bytes; - std::unordered_map sent_packets; + std::unordered_map outstanding_packets; + size_t outstanding_bytes; }; DaybreakStream m_streams[4]; @@ -170,7 +179,8 @@ namespace EQ void Process(); void ProcessPacket(Packet &p); - void ProcessQueue(); + void ProcessInboundQueue(); + void ProcessOutboundQueue(); void RemoveFromQueue(int stream, uint16_t seq); void AddToQueue(int stream, uint16_t seq, const Packet &p); void ProcessDecodedPacket(const Packet &p); @@ -186,6 +196,7 @@ namespace EQ void ProcessResend(int stream); void Ack(int stream, uint16_t seq); void OutOfOrderAck(int stream, uint16_t seq); + void BufferPacket(int stream, uint16_t seq, DaybreakSentPacket &sent); void SendConnect(); void SendKeepAlive(); @@ -206,10 +217,10 @@ namespace EQ DaybreakConnectionManagerOptions() { max_connection_count = 0; keepalive_delay_ms = 9000; - resend_delay_ms = 300; + resend_delay_ms = 150; resend_delay_factor = 1.5; - resend_delay_min = 350; - resend_delay_max = 8000; + resend_delay_min = 150; + resend_delay_max = 4000; connect_delay_ms = 500; stale_connection_ms = 90000; connect_stale_ms = 5000; @@ -225,6 +236,8 @@ namespace EQ tic_rate_hertz = 60.0; resend_timeout = 90000; connection_close_time = 2000; + max_outstanding_packets = 300; + max_outstanding_bytes = 200 * 512; } size_t max_packet_size; @@ -247,6 +260,8 @@ namespace EQ size_t connection_close_time; DaybreakEncodeType encode_passes[2]; int port; + size_t max_outstanding_packets; + size_t max_outstanding_bytes; }; class DaybreakConnectionManager