diff --git a/common/net/daybreak_connection.cpp b/common/net/daybreak_connection.cpp index 0fefc9982..504216223 100644 --- a/common/net/daybreak_connection.cpp +++ b/common/net/daybreak_connection.cpp @@ -53,25 +53,7 @@ void EQ::Net::DaybreakConnectionManager::Attach(uv_loop_t *loop) uv_ip4_addr("0.0.0.0", m_options.port, &recv_addr); int rc = uv_udp_bind(&m_socket, (const struct sockaddr *)&recv_addr, UV_UDP_REUSEADDR); - rc = uv_udp_recv_start(&m_socket, - [](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - buf->base = new char[suggested_size]; - memset(buf->base, 0, suggested_size); - buf->len = suggested_size; - }, - [](uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned flags) { - DaybreakConnectionManager *c = (DaybreakConnectionManager*)handle->data; - if (nread < 0 || addr == nullptr) { - delete[] buf->base; - return; - } - - char endpoint[16]; - uv_ip4_name((const sockaddr_in*)addr, endpoint, 16); - auto port = ntohs(((const sockaddr_in*)addr)->sin_port); - c->ProcessPacket(endpoint, port, buf->base, nread); - delete[] buf->base; - }); + rc = uv_udp_recv_start(&m_socket, AllocCallback, RecvCallback); m_attached = loop; } @@ -523,6 +505,12 @@ void EQ::Net::DaybreakConnection::AddToQueue(int stream, uint16_t seq, const Pac { auto s = &m_streams[stream]; auto iter = s->packet_queue.find(seq); + + if (s->packet_queue.bucket_count() < seq) { + std::cout << "Resizing packet queue to " << seq << std::endl; + s->packet_queue.reserve(seq); + } + if (iter == s->packet_queue.end()) { DynamicPacket *out = new DynamicPacket(); out->PutPacket(0, p); @@ -1207,7 +1195,7 @@ void EQ::Net::DaybreakConnection::UpdateDataBudget(double budget_add) void EQ::Net::DaybreakConnection::SendAck(int stream_id, uint16_t seq) { - DaybreakReliableHeader ack; + static DaybreakReliableHeader ack = {}; ack.zero = 0; ack.opcode = OP_Ack + stream_id; ack.sequence = HostToNetwork(seq); diff --git a/common/net/daybreak_connection.h b/common/net/daybreak_connection.h index c2c714813..00ba117cd 100644 --- a/common/net/daybreak_connection.h +++ b/common/net/daybreak_connection.h @@ -11,6 +11,27 @@ #include #include +class BufferPool { +public: + // Allocate a buffer from the pool or create a new one + uv_buf_t AllocateBuffer(size_t size) { + if (!pool.empty()) { + auto buf = std::move(pool.back()); + pool.pop_back(); + return uv_buf_init(buf.release(), size); + } + return uv_buf_init(new char[size], size); + } + + // Return a buffer to the pool for reuse + void ReleaseBuffer(char* buffer) { + pool.emplace_back(buffer); + } + +private: + std::vector> pool; // Pool of reusable buffers +}; + namespace EQ { namespace Net @@ -201,7 +222,7 @@ namespace EQ uint16_t sequence_in; uint16_t sequence_out; - std::map packet_queue; + std::unordered_map> packet_queue; DynamicPacket fragment_packet; uint32_t fragment_current_bytes; @@ -318,6 +339,35 @@ namespace EQ void Attach(uv_loop_t *loop); void Detach(); + static void AllocCallback(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + auto* manager = static_cast(handle->data); + *buf = manager->buffer_pool.AllocateBuffer(suggested_size); // Use buffer pool + } + + static void RecvCallback(uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned flags) { + auto* manager = static_cast(handle->data); + + if (nread < 0 || addr == nullptr) { + manager->buffer_pool.ReleaseBuffer(buf->base); // Return buffer to pool + return; + } + + // Get endpoint details + char endpoint[16]; + uv_ip4_name((const sockaddr_in*)addr, endpoint, sizeof(endpoint)); + auto port = ntohs(((const sockaddr_in*)addr)->sin_port); + + std::cout << "Received " << nread << " bytes from " << endpoint << ":" << port << std::endl; + + // Process the packet + manager->ProcessPacket(endpoint, port, buf->base, nread); + + // Release buffer back to the pool + manager->buffer_pool.ReleaseBuffer(buf->base); + } + + BufferPool buffer_pool; + EQ::Random m_rand; uv_timer_t m_timer; uv_udp_t m_socket;