From 683c36c3fca20b32f577057d17727afd8affe2a8 Mon Sep 17 00:00:00 2001 From: Akkadius Date: Sat, 1 Feb 2025 15:47:48 -0600 Subject: [PATCH] Ring buffer send poc --- common/net/daybreak_connection.cpp | 121 ++++++++++++++++++++++++++--- 1 file changed, 112 insertions(+), 9 deletions(-) diff --git a/common/net/daybreak_connection.cpp b/common/net/daybreak_connection.cpp index 9cb5919fd..08f3f4258 100644 --- a/common/net/daybreak_connection.cpp +++ b/common/net/daybreak_connection.cpp @@ -8,6 +8,110 @@ #include #include +#include // For memcpy +#include + +#define INITIAL_RING_BUFFER_CAPACITY 8192 // Initial number of buffers +#define BUFFER_SIZE 512 // Fixed buffer size + +class StaticRingBuffer { +private: + char (*m_data)[BUFFER_SIZE]; // Pointer to dynamically allocated buffer array + bool* m_in_use; // Tracks if a buffer is currently allocated + size_t m_capacity; // Current capacity of the ring buffer + volatile size_t m_write_index = 0; // Next available buffer + volatile size_t m_read_index = 0; // Next buffer to be freed + std::atomic m_used_space = 0; // Track used buffers + + +public: + StaticRingBuffer() + : m_capacity(INITIAL_RING_BUFFER_CAPACITY) { + m_data = new char[m_capacity][BUFFER_SIZE]; // Allocate initial buffer pool + m_in_use = new bool[m_capacity]{false}; // Track buffer usage (initialized to false) + } + + ~StaticRingBuffer() { + delete[] m_data; // Free memory + delete[] m_in_use; // Free tracking array + } + + // Acquire a fixed-size buffer (doubles capacity if full) + char* Acquire() { + for (size_t i = 0; i < m_capacity; ++i) { + size_t index = (m_write_index + i) % m_capacity; + if (!m_in_use[index]) { + m_in_use[index] = true; + m_write_index = (index + 1) % m_capacity; + std::cout << "Acquired buffer " << index << std::endl; + return m_data[index]; + } + } + + DoubleCapacity(); + return Acquire(); // Retry after resizing + } + + // Release a buffer back to the pool + void Release(char* buffer) { + size_t index = (buffer - m_data[0]) / BUFFER_SIZE; // Divide by BUFFER_SIZE, not m_capacity + + // Prevent invalid releases (e.g., if the buffer isn't from m_data[]) + if (index >= m_capacity || m_data[index] != buffer) { +// std::cerr << "Invalid Release: Buffer not in m_data or incorrect index!\n"; + return; + } + +// std::cout << "Released buffer " << index << std::endl; + + m_in_use[index] = false; // Mark buffer as available + } + +private: + // Doubles the ring buffer capacity while preserving existing data + // TODO: Add a lock to prevent concurrent issues during resizing + void DoubleCapacity() { + size_t new_capacity = m_capacity * 2; + char (*new_data)[BUFFER_SIZE] = new char[new_capacity][BUFFER_SIZE]; + bool* new_in_use = new bool[new_capacity]{false}; + + size_t current_size = (m_write_index >= m_read_index) + ? (m_write_index - m_read_index) + : (m_capacity - m_read_index + m_write_index); + + std::cout << "[INFO] Resizing buffer from " << m_capacity << " to " << new_capacity << std::endl; + + // Copy data in FIFO order + for (size_t i = 0; i < current_size; ++i) { + size_t old_index = (m_read_index + i) % m_capacity; + memcpy(new_data[i], m_data[old_index], BUFFER_SIZE); + new_in_use[i] = m_in_use[old_index]; + } + + // Store old buffer reference + char (*old_data_ref)[BUFFER_SIZE] = m_data; + bool* old_in_use_ref = m_in_use; + size_t old_capacity = m_capacity; + + // Assign new memory + m_data = new_data; + m_in_use = new_in_use; + m_capacity = new_capacity; + + m_read_index = 0; + m_write_index = current_size; + + // Delay deletion of old data to avoid access issues + std::thread([old_data_ref, old_in_use_ref, old_capacity]() { + std::this_thread::sleep_for(std::chrono::milliseconds(500)); // Ensure old packets are sent + delete[] old_data_ref; + delete[] old_in_use_ref; + std::cout << "[INFO] Old buffer safely deallocated (Capacity: " << old_capacity << ")\n"; + }).detach(); + } +}; + + EQ::Net::DaybreakConnectionManager::DaybreakConnectionManager() { m_attached = nullptr; @@ -1112,10 +1216,7 @@ void EQ::Net::DaybreakConnection::ProcessResend(int stream) auto s = &m_streams[stream]; // Get a reference resend delay (assume first packet represents the typical case) - size_t reference_resend_delay = 0; if (!s->sent_packets.empty()) { - reference_resend_delay = s->sent_packets.begin()->second.resend_delay; - // Check if the first packet has timed out auto &first_packet = s->sent_packets.begin()->second; auto time_since_first_sent = std::chrono::duration_cast(now - first_packet.first_sent).count(); @@ -1336,6 +1437,8 @@ void EQ::Net::DaybreakConnection::SendKeepAlive() InternalSend(p); } +StaticRingBuffer m_ring_buffer; + void EQ::Net::DaybreakConnection::InternalSend(Packet &p) { if (m_owner->m_options.outgoing_data_rate > 0.0) { @@ -1352,8 +1455,10 @@ void EQ::Net::DaybreakConnection::InternalSend(Packet &p) m_last_send = Clock::now(); auto send_func = [](uv_udp_send_t* req, int status) { - delete[](char*)req->data; - delete req; + if (req->data) { + m_ring_buffer.Release((char*)req->data); // Return buffer to pool + } + delete req; // Return send request to pool }; if (PacketCanBeEncoded(p)) { @@ -1390,7 +1495,7 @@ void EQ::Net::DaybreakConnection::InternalSend(Packet &p) uv_ip4_addr(m_endpoint.c_str(), m_port, &send_addr); uv_buf_t send_buffers[1]; - char *data = new char[out.Length()]; + char* data = m_ring_buffer.Acquire(); memcpy(data, out.Data(), out.Length()); send_buffers[0] = uv_buf_init(data, out.Length()); send_req->data = send_buffers[0].base; @@ -1398,7 +1503,6 @@ void EQ::Net::DaybreakConnection::InternalSend(Packet &p) m_stats.sent_bytes += out.Length(); m_stats.sent_packets++; if (m_owner->m_options.simulated_out_packet_loss && m_owner->m_options.simulated_out_packet_loss >= m_owner->m_rand.Int(0, 100)) { - delete[](char*)send_req->data; delete send_req; return; } @@ -1414,7 +1518,7 @@ void EQ::Net::DaybreakConnection::InternalSend(Packet &p) uv_ip4_addr(m_endpoint.c_str(), m_port, &send_addr); uv_buf_t send_buffers[1]; - char *data = new char[p.Length()]; + char* data = m_ring_buffer.Acquire(); memcpy(data, p.Data(), p.Length()); send_buffers[0] = uv_buf_init(data, p.Length()); send_req->data = send_buffers[0].base; @@ -1423,7 +1527,6 @@ void EQ::Net::DaybreakConnection::InternalSend(Packet &p) m_stats.sent_packets++; if (m_owner->m_options.simulated_out_packet_loss && m_owner->m_options.simulated_out_packet_loss >= m_owner->m_rand.Int(0, 100)) { - delete[](char*)send_req->data; delete send_req; return; }