mirror of
https://github.com/EQEmu/Server.git
synced 2025-12-13 10:31:29 +00:00
[Network] UDP Receive buffer pool
This commit is contained in:
parent
c966f26ac1
commit
df628cd228
@ -53,25 +53,7 @@ void EQ::Net::DaybreakConnectionManager::Attach(uv_loop_t *loop)
|
|||||||
uv_ip4_addr("0.0.0.0", m_options.port, &recv_addr);
|
uv_ip4_addr("0.0.0.0", m_options.port, &recv_addr);
|
||||||
int rc = uv_udp_bind(&m_socket, (const struct sockaddr *)&recv_addr, UV_UDP_REUSEADDR);
|
int rc = uv_udp_bind(&m_socket, (const struct sockaddr *)&recv_addr, UV_UDP_REUSEADDR);
|
||||||
|
|
||||||
rc = uv_udp_recv_start(&m_socket,
|
rc = uv_udp_recv_start(&m_socket, AllocCallback, RecvCallback);
|
||||||
[](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
|
||||||
buf->base = new char[suggested_size];
|
|
||||||
memset(buf->base, 0, suggested_size);
|
|
||||||
buf->len = suggested_size;
|
|
||||||
},
|
|
||||||
[](uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned flags) {
|
|
||||||
DaybreakConnectionManager *c = (DaybreakConnectionManager*)handle->data;
|
|
||||||
if (nread < 0 || addr == nullptr) {
|
|
||||||
delete[] buf->base;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
char endpoint[16];
|
|
||||||
uv_ip4_name((const sockaddr_in*)addr, endpoint, 16);
|
|
||||||
auto port = ntohs(((const sockaddr_in*)addr)->sin_port);
|
|
||||||
c->ProcessPacket(endpoint, port, buf->base, nread);
|
|
||||||
delete[] buf->base;
|
|
||||||
});
|
|
||||||
|
|
||||||
m_attached = loop;
|
m_attached = loop;
|
||||||
}
|
}
|
||||||
@ -523,6 +505,12 @@ void EQ::Net::DaybreakConnection::AddToQueue(int stream, uint16_t seq, const Pac
|
|||||||
{
|
{
|
||||||
auto s = &m_streams[stream];
|
auto s = &m_streams[stream];
|
||||||
auto iter = s->packet_queue.find(seq);
|
auto iter = s->packet_queue.find(seq);
|
||||||
|
|
||||||
|
if (s->packet_queue.bucket_count() < seq) {
|
||||||
|
std::cout << "Resizing packet queue to " << seq << std::endl;
|
||||||
|
s->packet_queue.reserve(seq);
|
||||||
|
}
|
||||||
|
|
||||||
if (iter == s->packet_queue.end()) {
|
if (iter == s->packet_queue.end()) {
|
||||||
DynamicPacket *out = new DynamicPacket();
|
DynamicPacket *out = new DynamicPacket();
|
||||||
out->PutPacket(0, p);
|
out->PutPacket(0, p);
|
||||||
@ -1207,7 +1195,7 @@ void EQ::Net::DaybreakConnection::UpdateDataBudget(double budget_add)
|
|||||||
|
|
||||||
void EQ::Net::DaybreakConnection::SendAck(int stream_id, uint16_t seq)
|
void EQ::Net::DaybreakConnection::SendAck(int stream_id, uint16_t seq)
|
||||||
{
|
{
|
||||||
DaybreakReliableHeader ack;
|
static DaybreakReliableHeader ack = {};
|
||||||
ack.zero = 0;
|
ack.zero = 0;
|
||||||
ack.opcode = OP_Ack + stream_id;
|
ack.opcode = OP_Ack + stream_id;
|
||||||
ack.sequence = HostToNetwork(seq);
|
ack.sequence = HostToNetwork(seq);
|
||||||
|
|||||||
@ -11,6 +11,27 @@
|
|||||||
#include <queue>
|
#include <queue>
|
||||||
#include <list>
|
#include <list>
|
||||||
|
|
||||||
|
class BufferPool {
|
||||||
|
public:
|
||||||
|
// Allocate a buffer from the pool or create a new one
|
||||||
|
uv_buf_t AllocateBuffer(size_t size) {
|
||||||
|
if (!pool.empty()) {
|
||||||
|
auto buf = std::move(pool.back());
|
||||||
|
pool.pop_back();
|
||||||
|
return uv_buf_init(buf.release(), size);
|
||||||
|
}
|
||||||
|
return uv_buf_init(new char[size], size);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return a buffer to the pool for reuse
|
||||||
|
void ReleaseBuffer(char* buffer) {
|
||||||
|
pool.emplace_back(buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::vector<std::unique_ptr<char[]>> pool; // Pool of reusable buffers
|
||||||
|
};
|
||||||
|
|
||||||
namespace EQ
|
namespace EQ
|
||||||
{
|
{
|
||||||
namespace Net
|
namespace Net
|
||||||
@ -201,7 +222,7 @@ namespace EQ
|
|||||||
|
|
||||||
uint16_t sequence_in;
|
uint16_t sequence_in;
|
||||||
uint16_t sequence_out;
|
uint16_t sequence_out;
|
||||||
std::map<uint16_t, Packet*> packet_queue;
|
std::unordered_map<uint16_t, std::unique_ptr<Packet>> packet_queue;
|
||||||
|
|
||||||
DynamicPacket fragment_packet;
|
DynamicPacket fragment_packet;
|
||||||
uint32_t fragment_current_bytes;
|
uint32_t fragment_current_bytes;
|
||||||
@ -318,6 +339,35 @@ namespace EQ
|
|||||||
void Attach(uv_loop_t *loop);
|
void Attach(uv_loop_t *loop);
|
||||||
void Detach();
|
void Detach();
|
||||||
|
|
||||||
|
static void AllocCallback(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||||
|
auto* manager = static_cast<DaybreakConnectionManager*>(handle->data);
|
||||||
|
*buf = manager->buffer_pool.AllocateBuffer(suggested_size); // Use buffer pool
|
||||||
|
}
|
||||||
|
|
||||||
|
static void RecvCallback(uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned flags) {
|
||||||
|
auto* manager = static_cast<DaybreakConnectionManager*>(handle->data);
|
||||||
|
|
||||||
|
if (nread < 0 || addr == nullptr) {
|
||||||
|
manager->buffer_pool.ReleaseBuffer(buf->base); // Return buffer to pool
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get endpoint details
|
||||||
|
char endpoint[16];
|
||||||
|
uv_ip4_name((const sockaddr_in*)addr, endpoint, sizeof(endpoint));
|
||||||
|
auto port = ntohs(((const sockaddr_in*)addr)->sin_port);
|
||||||
|
|
||||||
|
std::cout << "Received " << nread << " bytes from " << endpoint << ":" << port << std::endl;
|
||||||
|
|
||||||
|
// Process the packet
|
||||||
|
manager->ProcessPacket(endpoint, port, buf->base, nread);
|
||||||
|
|
||||||
|
// Release buffer back to the pool
|
||||||
|
manager->buffer_pool.ReleaseBuffer(buf->base);
|
||||||
|
}
|
||||||
|
|
||||||
|
BufferPool buffer_pool;
|
||||||
|
|
||||||
EQ::Random m_rand;
|
EQ::Random m_rand;
|
||||||
uv_timer_t m_timer;
|
uv_timer_t m_timer;
|
||||||
uv_udp_t m_socket;
|
uv_udp_t m_socket;
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user