Added packet buffering for dbg connections to avoid connections sending a ton of packets at once

This commit is contained in:
KimLS 2017-06-22 22:26:12 -07:00
parent d0e612b5ff
commit 1cabb091e7
2 changed files with 77 additions and 21 deletions

View File

@ -365,7 +365,8 @@ void EQ::Net::DaybreakConnection::Process()
FlushBuffer(); FlushBuffer();
} }
ProcessQueue(); ProcessInboundQueue();
ProcessOutboundQueue();
} }
catch (std::exception ex) { catch (std::exception ex) {
LogF(Logs::Detail, Logs::Netcode, "Error processing connection: {0}", ex.what()); LogF(Logs::Detail, Logs::Netcode, "Error processing connection: {0}", ex.what());
@ -440,7 +441,7 @@ void EQ::Net::DaybreakConnection::ProcessPacket(Packet &p)
} }
} }
void EQ::Net::DaybreakConnection::ProcessQueue() void EQ::Net::DaybreakConnection::ProcessInboundQueue()
{ {
for (int i = 0; i < 4; ++i) { for (int i = 0; i < 4; ++i) {
auto stream = &m_streams[i]; auto stream = &m_streams[i];
@ -459,6 +460,31 @@ void EQ::Net::DaybreakConnection::ProcessQueue()
} }
} }
void EQ::Net::DaybreakConnection::ProcessOutboundQueue()
{
for (int i = 0; i < 4; ++i) {
auto stream = &m_streams[i];
if (stream->outstanding_bytes == 0) {
continue;
}
while (!stream->buffered_packets.empty()) {
auto &buff = stream->buffered_packets.front();
if (stream->outstanding_bytes + buff.sent.packet.Length() >= m_owner->m_options.max_outstanding_bytes ||
stream->outstanding_packets.size() + 1 >= m_owner->m_options.max_outstanding_packets) {
break;
}
stream->outstanding_bytes += buff.sent.packet.Length();
stream->outstanding_packets.insert(std::make_pair(buff.seq, buff.sent));
InternalSend(buff.sent.packet);
stream->buffered_packets.pop_front();
}
}
}
void EQ::Net::DaybreakConnection::RemoveFromQueue(int stream, uint16_t seq) void EQ::Net::DaybreakConnection::RemoveFromQueue(int stream, uint16_t seq)
{ {
auto s = &m_streams[stream]; auto s = &m_streams[stream];
@ -1019,7 +1045,7 @@ void EQ::Net::DaybreakConnection::ProcessResend(int stream)
auto now = Clock::now(); auto now = Clock::now();
auto s = &m_streams[stream]; auto s = &m_streams[stream];
for (auto &entry : s->sent_packets) { for (auto &entry : s->outstanding_packets) {
auto time_since_last_send = std::chrono::duration_cast<std::chrono::milliseconds>(now - entry.second.last_sent); auto time_since_last_send = std::chrono::duration_cast<std::chrono::milliseconds>(now - entry.second.last_sent);
if (entry.second.times_resent == 0) { if (entry.second.times_resent == 0) {
if ((size_t)time_since_last_send.count() > m_resend_delay) { if ((size_t)time_since_last_send.count() > m_resend_delay) {
@ -1051,8 +1077,8 @@ void EQ::Net::DaybreakConnection::Ack(int stream, uint16_t seq)
auto now = Clock::now(); auto now = Clock::now();
auto s = &m_streams[stream]; auto s = &m_streams[stream];
auto iter = s->sent_packets.begin(); auto iter = s->outstanding_packets.begin();
while (iter != s->sent_packets.end()) { while (iter != s->outstanding_packets.end()) {
auto order = CompareSequence(seq, iter->first); auto order = CompareSequence(seq, iter->first);
if (order != SequenceFuture) { if (order != SequenceFuture) {
@ -1063,7 +1089,8 @@ void EQ::Net::DaybreakConnection::Ack(int stream, uint16_t seq)
m_stats.last_ping = round_time; m_stats.last_ping = round_time;
m_rolling_ping = (m_rolling_ping * 2 + round_time) / 3; m_rolling_ping = (m_rolling_ping * 2 + round_time) / 3;
iter = s->sent_packets.erase(iter); s->outstanding_bytes -= iter->second.packet.Length();
iter = s->outstanding_packets.erase(iter);
} }
else { else {
++iter; ++iter;
@ -1075,8 +1102,8 @@ void EQ::Net::DaybreakConnection::OutOfOrderAck(int stream, uint16_t seq)
{ {
auto now = Clock::now(); auto now = Clock::now();
auto s = &m_streams[stream]; auto s = &m_streams[stream];
auto iter = s->sent_packets.find(seq); auto iter = s->outstanding_packets.find(seq);
if (iter != s->sent_packets.end()) { if (iter != s->outstanding_packets.end()) {
uint64_t round_time = (uint64_t)std::chrono::duration_cast<std::chrono::milliseconds>(now - iter->second.last_sent).count(); uint64_t round_time = (uint64_t)std::chrono::duration_cast<std::chrono::milliseconds>(now - iter->second.last_sent).count();
m_stats.max_ping = std::max(m_stats.max_ping, round_time); m_stats.max_ping = std::max(m_stats.max_ping, round_time);
@ -1084,10 +1111,30 @@ void EQ::Net::DaybreakConnection::OutOfOrderAck(int stream, uint16_t seq)
m_stats.last_ping = round_time; m_stats.last_ping = round_time;
m_rolling_ping = (m_rolling_ping * 2 + round_time) / 3; m_rolling_ping = (m_rolling_ping * 2 + round_time) / 3;
s->sent_packets.erase(iter); s->outstanding_bytes -= iter->second.packet.Length();
s->outstanding_packets.erase(iter);
} }
} }
void EQ::Net::DaybreakConnection::BufferPacket(int stream, uint16_t seq, DaybreakSentPacket &sent)
{
auto s = &m_streams[stream];
//If we can send the packet then send it
//else buffer it to be sent when we can send it
if (s->outstanding_bytes + sent.packet.Length() >= m_owner->m_options.max_outstanding_bytes || s->outstanding_packets.size() + 1 >= m_owner->m_options.max_outstanding_packets) {
//Would go over one of the limits, buffer this packet.
DaybreakBufferedPacket bp;
bp.sent = std::move(sent);
bp.seq = seq;
s->buffered_packets.push_back(bp);
return;
}
s->outstanding_bytes += sent.packet.Length();
s->outstanding_packets.insert(std::make_pair(seq, sent));
InternalSend(sent.packet);
}
void EQ::Net::DaybreakConnection::SendAck(int stream_id, uint16_t seq) void EQ::Net::DaybreakConnection::SendAck(int stream_id, uint16_t seq)
{ {
DaybreakReliableHeader ack; DaybreakReliableHeader ack;
@ -1293,11 +1340,9 @@ void EQ::Net::DaybreakConnection::InternalQueuePacket(Packet &p, int stream_id,
sent.last_sent = Clock::now(); sent.last_sent = Clock::now();
sent.first_sent = Clock::now(); sent.first_sent = Clock::now();
sent.times_resent = 0; sent.times_resent = 0;
stream->sent_packets.insert(std::make_pair(stream->sequence_out, sent)); BufferPacket(stream_id, stream->sequence_out, sent);
stream->sequence_out++; stream->sequence_out++;
InternalBufferedSend(first_packet);
while (used < length) { while (used < length) {
auto left = length - used; auto left = length - used;
DynamicPacket packet; DynamicPacket packet;
@ -1321,10 +1366,8 @@ void EQ::Net::DaybreakConnection::InternalQueuePacket(Packet &p, int stream_id,
sent.last_sent = Clock::now(); sent.last_sent = Clock::now();
sent.first_sent = Clock::now(); sent.first_sent = Clock::now();
sent.times_resent = 0; sent.times_resent = 0;
stream->sent_packets.insert(std::make_pair(stream->sequence_out, sent)); BufferPacket(stream_id, stream->sequence_out, sent);
stream->sequence_out++; stream->sequence_out++;
InternalBufferedSend(packet);
} }
} }
else { else {
@ -1341,10 +1384,8 @@ void EQ::Net::DaybreakConnection::InternalQueuePacket(Packet &p, int stream_id,
sent.last_sent = Clock::now(); sent.last_sent = Clock::now();
sent.first_sent = Clock::now(); sent.first_sent = Clock::now();
sent.times_resent = 0; sent.times_resent = 0;
stream->sent_packets.insert(std::make_pair(stream->sequence_out, sent)); BufferPacket(stream_id, stream->sequence_out, sent);
stream->sequence_out++; stream->sequence_out++;
InternalBufferedSend(packet);
} }
} }

View File

@ -9,7 +9,7 @@
#include <memory> #include <memory>
#include <map> #include <map>
#include <unordered_map> #include <unordered_map>
#include <queue> #include <deque>
#include <list> #include <list>
namespace EQ namespace EQ
@ -145,6 +145,12 @@ namespace EQ
size_t times_resent; size_t times_resent;
}; };
struct DaybreakBufferedPacket
{
uint16_t seq;
DaybreakSentPacket sent;
};
struct DaybreakStream struct DaybreakStream
{ {
DaybreakStream() { DaybreakStream() {
@ -152,17 +158,20 @@ namespace EQ
sequence_out = 0; sequence_out = 0;
fragment_current_bytes = 0; fragment_current_bytes = 0;
fragment_total_bytes = 0; fragment_total_bytes = 0;
outstanding_bytes = 0;
} }
uint16_t sequence_in; uint16_t sequence_in;
uint16_t sequence_out; uint16_t sequence_out;
std::unordered_map<uint16_t, Packet*> packet_queue; std::unordered_map<uint16_t, Packet*> packet_queue;
std::deque<DaybreakBufferedPacket> buffered_packets;
DynamicPacket fragment_packet; DynamicPacket fragment_packet;
uint32_t fragment_current_bytes; uint32_t fragment_current_bytes;
uint32_t fragment_total_bytes; uint32_t fragment_total_bytes;
std::unordered_map<uint16_t, DaybreakSentPacket> sent_packets; std::unordered_map<uint16_t, DaybreakSentPacket> outstanding_packets;
size_t outstanding_bytes;
}; };
DaybreakStream m_streams[4]; DaybreakStream m_streams[4];
@ -170,7 +179,8 @@ namespace EQ
void Process(); void Process();
void ProcessPacket(Packet &p); void ProcessPacket(Packet &p);
void ProcessQueue(); void ProcessInboundQueue();
void ProcessOutboundQueue();
void RemoveFromQueue(int stream, uint16_t seq); void RemoveFromQueue(int stream, uint16_t seq);
void AddToQueue(int stream, uint16_t seq, const Packet &p); void AddToQueue(int stream, uint16_t seq, const Packet &p);
void ProcessDecodedPacket(const Packet &p); void ProcessDecodedPacket(const Packet &p);
@ -186,6 +196,7 @@ namespace EQ
void ProcessResend(int stream); void ProcessResend(int stream);
void Ack(int stream, uint16_t seq); void Ack(int stream, uint16_t seq);
void OutOfOrderAck(int stream, uint16_t seq); void OutOfOrderAck(int stream, uint16_t seq);
void BufferPacket(int stream, uint16_t seq, DaybreakSentPacket &sent);
void SendConnect(); void SendConnect();
void SendKeepAlive(); void SendKeepAlive();
@ -225,6 +236,8 @@ namespace EQ
tic_rate_hertz = 60.0; tic_rate_hertz = 60.0;
resend_timeout = 90000; resend_timeout = 90000;
connection_close_time = 2000; connection_close_time = 2000;
max_outstanding_packets = 200;
max_outstanding_bytes = 200 * 512;
} }
size_t max_packet_size; size_t max_packet_size;
@ -247,6 +260,8 @@ namespace EQ
size_t connection_close_time; size_t connection_close_time;
DaybreakEncodeType encode_passes[2]; DaybreakEncodeType encode_passes[2];
int port; int port;
size_t max_outstanding_packets;
size_t max_outstanding_bytes;
}; };
class DaybreakConnectionManager class DaybreakConnectionManager