Resend changes

This commit is contained in:
Akkadius 2025-01-28 00:13:43 -06:00
parent 6b548576da
commit d69225d1eb
2 changed files with 110 additions and 99 deletions

View File

@ -42,11 +42,19 @@ void EQ::Net::DaybreakConnectionManager::Attach(uv_loop_t *loop)
auto update_rate = (uint64_t)(1000.0 / m_options.tic_rate_hertz);
uv_timer_start(&m_timer, [](uv_timer_t *handle) {
DaybreakConnectionManager *c = (DaybreakConnectionManager*)handle->data;
c->UpdateDataBudget();
c->Process();
c->ProcessResend();
auto now = Clock::now();
if (std::chrono::duration_cast<std::chrono::milliseconds>(now - c->m_last_resend_check).count() > 500) {
c->ProcessResend();
c->m_last_resend_check = Clock::now();
return;
}
}, update_rate, update_rate);
uv_udp_init(loop, &m_socket);
@ -60,31 +68,35 @@ void EQ::Net::DaybreakConnectionManager::Attach(uv_loop_t *loop)
static std::atomic<int> buffer_index(0); // Atomic to track the current buffer index
// Select the next static buffer in a round-robin manner
rc = uv_udp_recv_start(&m_socket,
[](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
int current_buffer_index = buffer_index.fetch_add(1) % NUM_STATIC_BUFFERS;
// std::cout << "recv current_buffer_index: " << current_buffer_index << " current_thread_id: " << std::this_thread::get_id() << std::endl;
char* buffer = static_buffers[current_buffer_index];
rc = uv_udp_recv_start(
&m_socket,
[](uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
int current_buffer_index = buffer_index.fetch_add(1) % NUM_STATIC_BUFFERS;
char *buffer = static_buffers[current_buffer_index];
// clear the buffer
memset(buffer, 0, 512);
buf->base = buffer;
buf->len = suggested_size;
buf->len = suggested_size;
},
[](uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned flags) {
DaybreakConnectionManager *c = (DaybreakConnectionManager*)handle->data;
if (nread < 0 || addr == nullptr) {
return;
[](
uv_udp_t *handle,
ssize_t nread,
const uv_buf_t *buf,
const struct sockaddr *addr,
unsigned flags
) {
DaybreakConnectionManager *c = (DaybreakConnectionManager *) handle->data;
if (nread < 0 || addr == nullptr) {
return;
}
char endpoint[16];
uv_ip4_name((const sockaddr_in *) addr, endpoint, 16);
auto port = ntohs(((const sockaddr_in *) addr)->sin_port);
c->ProcessPacket(endpoint, port, buf->base, nread);
}
// std::cout << "recv nread: " << nread << " current_thread_id: " << std::this_thread::get_id() << std::endl;
char endpoint[16];
uv_ip4_name((const sockaddr_in*)addr, endpoint, 16);
auto port = ntohs(((const sockaddr_in*)addr)->sin_port);
c->ProcessPacket(endpoint, port, buf->base, nread);
});
);
m_attached = loop;
}
@ -201,9 +213,7 @@ void EQ::Net::DaybreakConnectionManager::ProcessResend()
auto iter = m_connections.begin();
while (iter != m_connections.end()) {
auto &connection = iter->second;
auto status = connection->m_status;
switch (status)
switch (connection->m_status)
{
case StatusConnected:
case StatusDisconnecting:
@ -292,11 +302,13 @@ void EQ::Net::DaybreakConnectionManager::SendDisconnect(const std::string &addr,
memcpy(data, out.Data(), out.Length());
send_buffers[0] = uv_buf_init(data, out.Length());
send_req->data = send_buffers[0].base;
int ret = uv_udp_send(send_req, &m_socket, send_buffers, 1, (sockaddr*)&send_addr,
[](uv_udp_send_t* req, int status) {
delete[](char*)req->data;
delete req;
});
int ret = uv_udp_send(
send_req, &m_socket, send_buffers, 1, (sockaddr *) &send_addr,
[](uv_udp_send_t *req, int status) {
delete[](char *) req->data;
delete req;
}
);
}
//new connection made as server
@ -538,7 +550,6 @@ void EQ::Net::DaybreakConnection::AddToQueue(int stream, uint16_t seq, const Pac
auto iter = s->packet_queue.find(seq);
if (s->packet_queue.bucket_count() < seq) {
// std::cout << "Resizing packet queue to " << seq << std::endl;
s->packet_queue.reserve(seq);
}
@ -608,14 +619,14 @@ void EQ::Net::DaybreakConnection::ProcessDecodedPacket(const Packet &p)
(*(current + 4) << 16) |
(*(current + 5) << 8) |
(*(current + 6))
);
);
current += 7;
}
else {
subpacket_length = (uint32_t)(
(*(current + 1) << 8) |
(*(current + 2))
);
);
current += 3;
}
}
@ -1116,63 +1127,56 @@ void EQ::Net::DaybreakConnection::ProcessResend(int stream)
return;
}
auto resends = 0;
auto now = Clock::now();
auto s = &m_streams[stream];
for (auto &entry : s->sent_packets) {
auto time_since_last_send = std::chrono::duration_cast<std::chrono::milliseconds>(now - entry.second.last_sent);
if (entry.second.times_resent == 0) {
if ((size_t)time_since_last_send.count() > entry.second.resend_delay) {
auto &p = entry.second.packet;
if (p.Length() >= DaybreakHeader::size()) {
if (p.GetInt8(0) == 0 && p.GetInt8(1) >= OP_Fragment && p.GetInt8(1) <= OP_Fragment4) {
m_stats.resent_fragments++;
}
else {
m_stats.resent_full++;
}
}
else {
m_stats.resent_full++;
}
m_stats.resent_packets++;
if (m_streams[stream].sent_packets.empty()) {
return;
}
InternalBufferedSend(p);
entry.second.last_sent = now;
entry.second.times_resent++;
entry.second.resend_delay = EQ::Clamp(entry.second.resend_delay * 2, m_owner->m_options.resend_delay_min, m_owner->m_options.resend_delay_max);
resends++;
}
}
else {
auto time_since_first_sent = std::chrono::duration_cast<std::chrono::milliseconds>(now - entry.second.first_sent);
if (time_since_first_sent.count() >= m_owner->m_options.resend_timeout) {
std::cout << "Processing resend for stream " << stream << std::endl;
auto now = Clock::now(); // Current time
auto s = &m_streams[stream];
// Get a reference resend delay (assume first packet represents the typical case)
size_t reference_resend_delay = 0;
if (!s->sent_packets.empty()) {
reference_resend_delay = s->sent_packets.begin()->second.resend_delay;
}
std::cout << "Resending packet count " << s->sent_packets.size() << " reference_resend_delay " << reference_resend_delay << std::endl;
for (auto& entry : s->sent_packets) {
auto& packet_info = entry.second;
// Handle timeout only if the packet has been resent
if (packet_info.times_resent > 0) {
auto time_since_first_sent = std::chrono::duration_cast<std::chrono::milliseconds>(now - packet_info.first_sent).count();
if (time_since_first_sent >= m_owner->m_options.resend_timeout) {
Close();
return;
}
if ((size_t)time_since_last_send.count() > entry.second.resend_delay) {
auto &p = entry.second.packet;
if (p.Length() >= DaybreakHeader::size()) {
if (p.GetInt8(0) == 0 && p.GetInt8(1) >= OP_Fragment && p.GetInt8(1) <= OP_Fragment4) {
m_stats.resent_fragments++;
}
else {
m_stats.resent_full++;
}
}
else {
m_stats.resent_full++;
}
m_stats.resent_packets++;
InternalBufferedSend(p);
entry.second.last_sent = now;
entry.second.times_resent++;
entry.second.resend_delay = EQ::Clamp(entry.second.resend_delay * 2, m_owner->m_options.resend_delay_min, m_owner->m_options.resend_delay_max);
resends++;
}
}
// Increment resend statistics
auto& p = packet_info.packet;
if (p.Length() >= DaybreakHeader::size()) {
if (p.GetInt8(0) == 0 && p.GetInt8(1) >= OP_Fragment && p.GetInt8(1) <= OP_Fragment4) {
m_stats.resent_fragments++;
} else {
m_stats.resent_full++;
}
} else {
m_stats.resent_full++;
}
m_stats.resent_packets++;
// std::cout << "Resending packet " << p.Length() << " on stream " << stream << " times_resent "
// << packet_info.times_resent << " resend_delay " << packet_info.resend_delay << std::endl;
// Resend the packet
InternalBufferedSend(p);
packet_info.last_sent = now;
packet_info.times_resent++;
packet_info.resend_delay = EQ::Clamp(packet_info.resend_delay * 2, m_owner->m_options.resend_delay_min, m_owner->m_options.resend_delay_max);
}
}
@ -1350,22 +1354,26 @@ void EQ::Net::DaybreakConnection::InternalSend(Packet &p)
DynamicPacket out;
out.PutPacket(0, p);
for (int i = 0; i < 2; ++i) {
switch (m_encode_passes[i]) {
case EncodeCompression:
if (out.GetInt8(0) == 0)
Compress(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size());
else
Compress(out, 1, out.Length() - 1);
break;
case EncodeXOR:
if (out.GetInt8(0) == 0)
Encode(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size());
else
Encode(out, 1, out.Length() - 1);
break;
default:
break;
for (auto &m_encode_passe: m_encode_passes) {
switch (m_encode_passe) {
case EncodeCompression:
if (out.GetInt8(0) == 0) {
Compress(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size());
}
else {
Compress(out, 1, out.Length() - 1);
}
break;
case EncodeXOR:
if (out.GetInt8(0) == 0) {
Encode(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size());
}
else {
Encode(out, 1, out.Length() - 1);
}
break;
default:
break;
}
}

View File

@ -10,6 +10,7 @@
#include <map>
#include <queue>
#include <list>
#include <set>
namespace EQ
{
@ -318,6 +319,8 @@ namespace EQ
void Attach(uv_loop_t *loop);
void Detach();
Timestamp m_last_resend_check;
EQ::Random m_rand;
uv_timer_t m_timer;
uv_udp_t m_socket;