This commit is contained in:
Akkadius 2025-01-27 01:15:27 -06:00
parent df628cd228
commit 8e8c0b9974
2 changed files with 105 additions and 57 deletions

View File

@ -1281,99 +1281,108 @@ void EQ::Net::DaybreakConnection::SendKeepAlive()
InternalSend(p);
}
void EQ::Net::DaybreakConnection::InternalSend(Packet &p)
{
void EQ::Net::DaybreakConnection::InternalSend(Packet& p) {
if (m_owner->m_options.outgoing_data_rate > 0.0) {
auto new_budget = m_outgoing_budget - (p.Length() / 1024.0);
if (new_budget <= 0.0) {
m_stats.dropped_datarate_packets++;
return;
}
else {
m_outgoing_budget = new_budget;
}
m_outgoing_budget = new_budget;
}
m_last_send = Clock::now();
auto send_func = [](uv_udp_send_t* req, int status) {
delete[](char*)req->data;
delete req;
if (status < 0) {
std::cerr << "uv_udp_send failed: " << uv_strerror(status) << std::endl;
}
auto buffer = static_cast<char*>(req->data);
auto* manager = static_cast<DaybreakConnectionManager*>(req->handle->data);
// Verify buffer validity before release
std::cout << "[SEND FUNC] Releasing buffer: " << static_cast<void*>(buffer) << std::endl;
manager->send_buffer_pool.Release(buffer);
delete req; // Properly clean up send_req
};
if (PacketCanBeEncoded(p)) {
uv_udp_send_t* send_req = new uv_udp_send_t;
memset(send_req, 0, sizeof(*send_req));
sockaddr_in send_addr;
uv_ip4_addr(m_endpoint.c_str(), m_port, &send_addr);
uv_buf_t send_buffers[1];
char* data = nullptr;
if (PacketCanBeEncoded(p)) {
m_stats.bytes_before_encode += p.Length();
// Encode and compress the packet
DynamicPacket out;
out.PutPacket(0, p);
for (int i = 0; i < 2; ++i) {
switch (m_encode_passes[i]) {
case EncodeCompression:
if (out.GetInt8(0) == 0)
Compress(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size());
else
Compress(out, 1, out.Length() - 1);
break;
case EncodeXOR:
if (out.GetInt8(0) == 0)
Encode(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size());
else
Encode(out, 1, out.Length() - 1);
break;
default:
break;
case EncodeCompression:
if (out.GetInt8(0) == 0) {
std::cout << "Before compression: Length = " << out.Length() << std::endl;
Compress(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size());
std::cout << "After compression: Length = " << out.Length() << std::endl;
}
else {
Compress(out, 1, out.Length() - 1);
}
break;
case EncodeXOR:
if (out.GetInt8(0) == 0) {
Encode(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size());
}
else {
Encode(out, 1, out.Length() - 1);
}
break;
default:
break;
}
}
AppendCRC(out);
uv_udp_send_t *send_req = new uv_udp_send_t;
memset(send_req, 0, sizeof(*send_req));
sockaddr_in send_addr;
uv_ip4_addr(m_endpoint.c_str(), m_port, &send_addr);
uv_buf_t send_buffers[1];
char *data = new char[out.Length()];
// Allocate buffer from the pool
data = m_owner->send_buffer_pool.Allocate(out.Length());
memcpy(data, out.Data(), out.Length());
send_buffers[0] = uv_buf_init(data, out.Length());
send_req->data = send_buffers[0].base;
send_req->data = data;
m_stats.sent_bytes += out.Length();
m_stats.sent_packets++;
if (m_owner->m_options.simulated_out_packet_loss && m_owner->m_options.simulated_out_packet_loss >= m_owner->m_rand.Int(0, 100)) {
delete[](char*)send_req->data;
delete send_req;
return;
}
} else {
// Allocate buffer for raw packet
data = m_owner->send_buffer_pool.Allocate(p.Length());
memcpy(data, p.Data(), p.Length());
send_buffers[0] = uv_buf_init(data, p.Length());
send_req->data = data;
uv_udp_send(send_req, &m_owner->m_socket, send_buffers, 1, (sockaddr*)&send_addr, send_func);
return;
m_stats.sent_bytes += p.Length();
m_stats.sent_packets++;
}
m_stats.bytes_before_encode += p.Length();
uv_udp_send_t *send_req = new uv_udp_send_t;
sockaddr_in send_addr;
uv_ip4_addr(m_endpoint.c_str(), m_port, &send_addr);
uv_buf_t send_buffers[1];
char *data = new char[p.Length()];
memcpy(data, p.Data(), p.Length());
send_buffers[0] = uv_buf_init(data, p.Length());
send_req->data = send_buffers[0].base;
m_stats.sent_bytes += p.Length();
m_stats.sent_packets++;
if (m_owner->m_options.simulated_out_packet_loss && m_owner->m_options.simulated_out_packet_loss >= m_owner->m_rand.Int(0, 100)) {
delete[](char*)send_req->data;
if (m_owner->m_options.simulated_out_packet_loss &&
m_owner->m_options.simulated_out_packet_loss >= m_owner->m_rand.Int(0, 100)) {
m_owner->send_buffer_pool.Release(data);
delete send_req;
return;
}
uv_udp_send(send_req, &m_owner->m_socket, send_buffers, 1, (sockaddr*)&send_addr, send_func);
int rc = uv_udp_send(send_req, &m_owner->m_socket, send_buffers, 1, (sockaddr*)&send_addr, send_func);
if (rc < 0) {
std::cerr << "uv_udp_send failed: " << uv_strerror(rc) << std::endl;
m_owner->send_buffer_pool.Release(data);
delete send_req;
}
}
void EQ::Net::DaybreakConnection::InternalQueuePacket(Packet &p, int stream_id, bool reliable)

View File

@ -11,7 +11,7 @@
#include <queue>
#include <list>
class BufferPool {
class UdpReceiveBufferPool {
public:
// Allocate a buffer from the pool or create a new one
uv_buf_t AllocateBuffer(size_t size) {
@ -30,8 +30,46 @@ public:
private:
std::vector<std::unique_ptr<char[]>> pool; // Pool of reusable buffers
std::vector<uv_udp_send_t*> send_req_pool; // Pool of reusable uv_udp_send_t objects
};
class BufferPool {
public:
char* Allocate(size_t size) {
std::lock_guard<std::mutex> lock(mutex_);
if (!pool_.empty()) {
char* buffer = pool_.back();
pool_.pop_back();
allocated_.insert(buffer);
std::cout << "[ALLOCATE] Reusing buffer: " << static_cast<void*>(buffer) << std::endl;
return buffer;
}
char* buffer = new char[size];
allocated_.insert(buffer);
std::cout << "[ALLOCATE] New buffer: " << static_cast<void*>(buffer) << std::endl;
return buffer;
}
void Release(char* buffer) {
std::lock_guard<std::mutex> lock(mutex_);
if (allocated_.find(buffer) == allocated_.end()) {
std::cerr << "[ERROR] Attempt to release unallocated or already released buffer: "
<< static_cast<void*>(buffer) << std::endl;
return;
}
allocated_.erase(buffer);
pool_.push_back(buffer);
std::cout << "[RELEASE] Buffer released: " << static_cast<void*>(buffer) << std::endl;
}
private:
std::vector<char*> pool_;
std::unordered_set<char*> allocated_;
std::mutex mutex_;
};
namespace EQ
{
namespace Net
@ -366,7 +404,8 @@ namespace EQ
manager->buffer_pool.ReleaseBuffer(buf->base);
}
BufferPool buffer_pool;
UdpReceiveBufferPool buffer_pool;
BufferPool send_buffer_pool;
EQ::Random m_rand;
uv_timer_t m_timer;