diff --git a/common/net/daybreak_connection.cpp b/common/net/daybreak_connection.cpp index 0ae462141..980dd3cf4 100644 --- a/common/net/daybreak_connection.cpp +++ b/common/net/daybreak_connection.cpp @@ -42,11 +42,19 @@ void EQ::Net::DaybreakConnectionManager::Attach(uv_loop_t *loop) auto update_rate = (uint64_t)(1000.0 / m_options.tic_rate_hertz); + uv_timer_start(&m_timer, [](uv_timer_t *handle) { DaybreakConnectionManager *c = (DaybreakConnectionManager*)handle->data; c->UpdateDataBudget(); c->Process(); - c->ProcessResend(); + + auto now = Clock::now(); + + if (std::chrono::duration_cast(now - c->m_last_resend_check).count() > 500) { + c->ProcessResend(); + c->m_last_resend_check = Clock::now(); + return; + } }, update_rate, update_rate); uv_udp_init(loop, &m_socket); @@ -60,31 +68,35 @@ void EQ::Net::DaybreakConnectionManager::Attach(uv_loop_t *loop) static std::atomic buffer_index(0); // Atomic to track the current buffer index // Select the next static buffer in a round-robin manner - - rc = uv_udp_recv_start(&m_socket, - [](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - int current_buffer_index = buffer_index.fetch_add(1) % NUM_STATIC_BUFFERS; -// std::cout << "recv current_buffer_index: " << current_buffer_index << " current_thread_id: " << std::this_thread::get_id() << std::endl; - char* buffer = static_buffers[current_buffer_index]; + rc = uv_udp_recv_start( + &m_socket, + [](uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { + int current_buffer_index = buffer_index.fetch_add(1) % NUM_STATIC_BUFFERS; + char *buffer = static_buffers[current_buffer_index]; // clear the buffer memset(buffer, 0, 512); buf->base = buffer; - buf->len = suggested_size; + buf->len = suggested_size; }, - [](uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned flags) { - DaybreakConnectionManager *c = (DaybreakConnectionManager*)handle->data; - if (nread < 0 || addr == nullptr) { - return; + []( + uv_udp_t *handle, + ssize_t nread, + const uv_buf_t *buf, + const struct sockaddr *addr, + unsigned flags + ) { + DaybreakConnectionManager *c = (DaybreakConnectionManager *) handle->data; + if (nread < 0 || addr == nullptr) { + return; + } + + char endpoint[16]; + uv_ip4_name((const sockaddr_in *) addr, endpoint, 16); + auto port = ntohs(((const sockaddr_in *) addr)->sin_port); + c->ProcessPacket(endpoint, port, buf->base, nread); } - -// std::cout << "recv nread: " << nread << " current_thread_id: " << std::this_thread::get_id() << std::endl; - - char endpoint[16]; - uv_ip4_name((const sockaddr_in*)addr, endpoint, 16); - auto port = ntohs(((const sockaddr_in*)addr)->sin_port); - c->ProcessPacket(endpoint, port, buf->base, nread); - }); + ); m_attached = loop; } @@ -201,9 +213,7 @@ void EQ::Net::DaybreakConnectionManager::ProcessResend() auto iter = m_connections.begin(); while (iter != m_connections.end()) { auto &connection = iter->second; - auto status = connection->m_status; - - switch (status) + switch (connection->m_status) { case StatusConnected: case StatusDisconnecting: @@ -292,11 +302,13 @@ void EQ::Net::DaybreakConnectionManager::SendDisconnect(const std::string &addr, memcpy(data, out.Data(), out.Length()); send_buffers[0] = uv_buf_init(data, out.Length()); send_req->data = send_buffers[0].base; - int ret = uv_udp_send(send_req, &m_socket, send_buffers, 1, (sockaddr*)&send_addr, - [](uv_udp_send_t* req, int status) { - delete[](char*)req->data; - delete req; - }); + int ret = uv_udp_send( + send_req, &m_socket, send_buffers, 1, (sockaddr *) &send_addr, + [](uv_udp_send_t *req, int status) { + delete[](char *) req->data; + delete req; + } + ); } //new connection made as server @@ -538,7 +550,6 @@ void EQ::Net::DaybreakConnection::AddToQueue(int stream, uint16_t seq, const Pac auto iter = s->packet_queue.find(seq); if (s->packet_queue.bucket_count() < seq) { -// std::cout << "Resizing packet queue to " << seq << std::endl; s->packet_queue.reserve(seq); } @@ -608,14 +619,14 @@ void EQ::Net::DaybreakConnection::ProcessDecodedPacket(const Packet &p) (*(current + 4) << 16) | (*(current + 5) << 8) | (*(current + 6)) - ); + ); current += 7; } else { subpacket_length = (uint32_t)( (*(current + 1) << 8) | (*(current + 2)) - ); + ); current += 3; } } @@ -1116,63 +1127,56 @@ void EQ::Net::DaybreakConnection::ProcessResend(int stream) return; } - auto resends = 0; - auto now = Clock::now(); - auto s = &m_streams[stream]; - for (auto &entry : s->sent_packets) { - auto time_since_last_send = std::chrono::duration_cast(now - entry.second.last_sent); - if (entry.second.times_resent == 0) { - if ((size_t)time_since_last_send.count() > entry.second.resend_delay) { - auto &p = entry.second.packet; - if (p.Length() >= DaybreakHeader::size()) { - if (p.GetInt8(0) == 0 && p.GetInt8(1) >= OP_Fragment && p.GetInt8(1) <= OP_Fragment4) { - m_stats.resent_fragments++; - } - else { - m_stats.resent_full++; - } - } - else { - m_stats.resent_full++; - } - m_stats.resent_packets++; + if (m_streams[stream].sent_packets.empty()) { + return; + } - InternalBufferedSend(p); - entry.second.last_sent = now; - entry.second.times_resent++; - entry.second.resend_delay = EQ::Clamp(entry.second.resend_delay * 2, m_owner->m_options.resend_delay_min, m_owner->m_options.resend_delay_max); - resends++; - } - } - else { - auto time_since_first_sent = std::chrono::duration_cast(now - entry.second.first_sent); - if (time_since_first_sent.count() >= m_owner->m_options.resend_timeout) { + std::cout << "Processing resend for stream " << stream << std::endl; + + auto now = Clock::now(); // Current time + auto s = &m_streams[stream]; + + // Get a reference resend delay (assume first packet represents the typical case) + size_t reference_resend_delay = 0; + if (!s->sent_packets.empty()) { + reference_resend_delay = s->sent_packets.begin()->second.resend_delay; + } + + std::cout << "Resending packet count " << s->sent_packets.size() << " reference_resend_delay " << reference_resend_delay << std::endl; + + for (auto& entry : s->sent_packets) { + auto& packet_info = entry.second; + + // Handle timeout only if the packet has been resent + if (packet_info.times_resent > 0) { + auto time_since_first_sent = std::chrono::duration_cast(now - packet_info.first_sent).count(); + if (time_since_first_sent >= m_owner->m_options.resend_timeout) { Close(); return; } - - if ((size_t)time_since_last_send.count() > entry.second.resend_delay) { - auto &p = entry.second.packet; - if (p.Length() >= DaybreakHeader::size()) { - if (p.GetInt8(0) == 0 && p.GetInt8(1) >= OP_Fragment && p.GetInt8(1) <= OP_Fragment4) { - m_stats.resent_fragments++; - } - else { - m_stats.resent_full++; - } - } - else { - m_stats.resent_full++; - } - m_stats.resent_packets++; - - InternalBufferedSend(p); - entry.second.last_sent = now; - entry.second.times_resent++; - entry.second.resend_delay = EQ::Clamp(entry.second.resend_delay * 2, m_owner->m_options.resend_delay_min, m_owner->m_options.resend_delay_max); - resends++; - } } + + // Increment resend statistics + auto& p = packet_info.packet; + if (p.Length() >= DaybreakHeader::size()) { + if (p.GetInt8(0) == 0 && p.GetInt8(1) >= OP_Fragment && p.GetInt8(1) <= OP_Fragment4) { + m_stats.resent_fragments++; + } else { + m_stats.resent_full++; + } + } else { + m_stats.resent_full++; + } + m_stats.resent_packets++; + +// std::cout << "Resending packet " << p.Length() << " on stream " << stream << " times_resent " +// << packet_info.times_resent << " resend_delay " << packet_info.resend_delay << std::endl; + + // Resend the packet + InternalBufferedSend(p); + packet_info.last_sent = now; + packet_info.times_resent++; + packet_info.resend_delay = EQ::Clamp(packet_info.resend_delay * 2, m_owner->m_options.resend_delay_min, m_owner->m_options.resend_delay_max); } } @@ -1350,22 +1354,26 @@ void EQ::Net::DaybreakConnection::InternalSend(Packet &p) DynamicPacket out; out.PutPacket(0, p); - for (int i = 0; i < 2; ++i) { - switch (m_encode_passes[i]) { - case EncodeCompression: - if (out.GetInt8(0) == 0) - Compress(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size()); - else - Compress(out, 1, out.Length() - 1); - break; - case EncodeXOR: - if (out.GetInt8(0) == 0) - Encode(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size()); - else - Encode(out, 1, out.Length() - 1); - break; - default: - break; + for (auto &m_encode_passe: m_encode_passes) { + switch (m_encode_passe) { + case EncodeCompression: + if (out.GetInt8(0) == 0) { + Compress(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size()); + } + else { + Compress(out, 1, out.Length() - 1); + } + break; + case EncodeXOR: + if (out.GetInt8(0) == 0) { + Encode(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size()); + } + else { + Encode(out, 1, out.Length() - 1); + } + break; + default: + break; } } diff --git a/common/net/daybreak_connection.h b/common/net/daybreak_connection.h index 2caa63b49..e62baabfc 100644 --- a/common/net/daybreak_connection.h +++ b/common/net/daybreak_connection.h @@ -10,6 +10,7 @@ #include #include #include +#include namespace EQ { @@ -318,6 +319,8 @@ namespace EQ void Attach(uv_loop_t *loop); void Detach(); + Timestamp m_last_resend_check; + EQ::Random m_rand; uv_timer_t m_timer; uv_udp_t m_socket;