From 8e8c0b9974b79ca1f1a8bb573648f40b3ae12e86 Mon Sep 17 00:00:00 2001 From: Akkadius Date: Mon, 27 Jan 2025 01:15:27 -0600 Subject: [PATCH] Test --- common/net/daybreak_connection.cpp | 119 ++++++++++++++++------------- common/net/daybreak_connection.h | 43 ++++++++++- 2 files changed, 105 insertions(+), 57 deletions(-) diff --git a/common/net/daybreak_connection.cpp b/common/net/daybreak_connection.cpp index 504216223..8d990b76c 100644 --- a/common/net/daybreak_connection.cpp +++ b/common/net/daybreak_connection.cpp @@ -1281,99 +1281,108 @@ void EQ::Net::DaybreakConnection::SendKeepAlive() InternalSend(p); } -void EQ::Net::DaybreakConnection::InternalSend(Packet &p) -{ +void EQ::Net::DaybreakConnection::InternalSend(Packet& p) { if (m_owner->m_options.outgoing_data_rate > 0.0) { auto new_budget = m_outgoing_budget - (p.Length() / 1024.0); if (new_budget <= 0.0) { m_stats.dropped_datarate_packets++; return; } - else { - m_outgoing_budget = new_budget; - } + m_outgoing_budget = new_budget; } m_last_send = Clock::now(); auto send_func = [](uv_udp_send_t* req, int status) { - delete[](char*)req->data; - delete req; + if (status < 0) { + std::cerr << "uv_udp_send failed: " << uv_strerror(status) << std::endl; + } + + auto buffer = static_cast(req->data); + auto* manager = static_cast(req->handle->data); + + // Verify buffer validity before release + std::cout << "[SEND FUNC] Releasing buffer: " << static_cast(buffer) << std::endl; + manager->send_buffer_pool.Release(buffer); + + delete req; // Properly clean up send_req }; - if (PacketCanBeEncoded(p)) { + uv_udp_send_t* send_req = new uv_udp_send_t; + memset(send_req, 0, sizeof(*send_req)); + sockaddr_in send_addr; + uv_ip4_addr(m_endpoint.c_str(), m_port, &send_addr); + + uv_buf_t send_buffers[1]; + char* data = nullptr; + + if (PacketCanBeEncoded(p)) { m_stats.bytes_before_encode += p.Length(); + // Encode and compress the packet DynamicPacket out; out.PutPacket(0, p); for (int i = 0; i < 2; ++i) { switch (m_encode_passes[i]) { - case EncodeCompression: - if (out.GetInt8(0) == 0) - Compress(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size()); - else - Compress(out, 1, out.Length() - 1); - break; - case EncodeXOR: - if (out.GetInt8(0) == 0) - Encode(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size()); - else - Encode(out, 1, out.Length() - 1); - break; - default: - break; + case EncodeCompression: + if (out.GetInt8(0) == 0) { + std::cout << "Before compression: Length = " << out.Length() << std::endl; + Compress(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size()); + std::cout << "After compression: Length = " << out.Length() << std::endl; + } + else { + Compress(out, 1, out.Length() - 1); + } + break; + case EncodeXOR: + if (out.GetInt8(0) == 0) { + Encode(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size()); + } + else { + Encode(out, 1, out.Length() - 1); + } + break; + default: + break; } } AppendCRC(out); - uv_udp_send_t *send_req = new uv_udp_send_t; - memset(send_req, 0, sizeof(*send_req)); - sockaddr_in send_addr; - uv_ip4_addr(m_endpoint.c_str(), m_port, &send_addr); - uv_buf_t send_buffers[1]; - - char *data = new char[out.Length()]; + // Allocate buffer from the pool + data = m_owner->send_buffer_pool.Allocate(out.Length()); memcpy(data, out.Data(), out.Length()); send_buffers[0] = uv_buf_init(data, out.Length()); - send_req->data = send_buffers[0].base; + send_req->data = data; m_stats.sent_bytes += out.Length(); m_stats.sent_packets++; - if (m_owner->m_options.simulated_out_packet_loss && m_owner->m_options.simulated_out_packet_loss >= m_owner->m_rand.Int(0, 100)) { - delete[](char*)send_req->data; - delete send_req; - return; - } + } else { + // Allocate buffer for raw packet + data = m_owner->send_buffer_pool.Allocate(p.Length()); + memcpy(data, p.Data(), p.Length()); + send_buffers[0] = uv_buf_init(data, p.Length()); + send_req->data = data; - uv_udp_send(send_req, &m_owner->m_socket, send_buffers, 1, (sockaddr*)&send_addr, send_func); - return; + m_stats.sent_bytes += p.Length(); + m_stats.sent_packets++; } - m_stats.bytes_before_encode += p.Length(); - - uv_udp_send_t *send_req = new uv_udp_send_t; - sockaddr_in send_addr; - uv_ip4_addr(m_endpoint.c_str(), m_port, &send_addr); - uv_buf_t send_buffers[1]; - - char *data = new char[p.Length()]; - memcpy(data, p.Data(), p.Length()); - send_buffers[0] = uv_buf_init(data, p.Length()); - send_req->data = send_buffers[0].base; - - m_stats.sent_bytes += p.Length(); - m_stats.sent_packets++; - - if (m_owner->m_options.simulated_out_packet_loss && m_owner->m_options.simulated_out_packet_loss >= m_owner->m_rand.Int(0, 100)) { - delete[](char*)send_req->data; + if (m_owner->m_options.simulated_out_packet_loss && + m_owner->m_options.simulated_out_packet_loss >= m_owner->m_rand.Int(0, 100)) { + m_owner->send_buffer_pool.Release(data); delete send_req; return; } - uv_udp_send(send_req, &m_owner->m_socket, send_buffers, 1, (sockaddr*)&send_addr, send_func); + int rc = uv_udp_send(send_req, &m_owner->m_socket, send_buffers, 1, (sockaddr*)&send_addr, send_func); + if (rc < 0) { + std::cerr << "uv_udp_send failed: " << uv_strerror(rc) << std::endl; + m_owner->send_buffer_pool.Release(data); + delete send_req; + } } void EQ::Net::DaybreakConnection::InternalQueuePacket(Packet &p, int stream_id, bool reliable) diff --git a/common/net/daybreak_connection.h b/common/net/daybreak_connection.h index 00ba117cd..d097c6241 100644 --- a/common/net/daybreak_connection.h +++ b/common/net/daybreak_connection.h @@ -11,7 +11,7 @@ #include #include -class BufferPool { +class UdpReceiveBufferPool { public: // Allocate a buffer from the pool or create a new one uv_buf_t AllocateBuffer(size_t size) { @@ -30,8 +30,46 @@ public: private: std::vector> pool; // Pool of reusable buffers + std::vector send_req_pool; // Pool of reusable uv_udp_send_t objects }; +class BufferPool { +public: + char* Allocate(size_t size) { + std::lock_guard lock(mutex_); + if (!pool_.empty()) { + char* buffer = pool_.back(); + pool_.pop_back(); + allocated_.insert(buffer); + std::cout << "[ALLOCATE] Reusing buffer: " << static_cast(buffer) << std::endl; + return buffer; + } + char* buffer = new char[size]; + allocated_.insert(buffer); + std::cout << "[ALLOCATE] New buffer: " << static_cast(buffer) << std::endl; + return buffer; + } + + void Release(char* buffer) { + std::lock_guard lock(mutex_); + if (allocated_.find(buffer) == allocated_.end()) { + std::cerr << "[ERROR] Attempt to release unallocated or already released buffer: " + << static_cast(buffer) << std::endl; + return; + } + allocated_.erase(buffer); + pool_.push_back(buffer); + std::cout << "[RELEASE] Buffer released: " << static_cast(buffer) << std::endl; + } + +private: + std::vector pool_; + std::unordered_set allocated_; + std::mutex mutex_; +}; + + + namespace EQ { namespace Net @@ -366,7 +404,8 @@ namespace EQ manager->buffer_pool.ReleaseBuffer(buf->base); } - BufferPool buffer_pool; + UdpReceiveBufferPool buffer_pool; + BufferPool send_buffer_pool; EQ::Random m_rand; uv_timer_t m_timer;