Ring buffer send poc

This commit is contained in:
Akkadius 2025-02-01 15:47:48 -06:00
parent d505f0b114
commit 683c36c3fc

View File

@ -8,6 +8,110 @@
#include <fmt/format.h>
#include <sstream>
#include <cstring> // For memcpy
#include <thread>
#define INITIAL_RING_BUFFER_CAPACITY 8192 // Initial number of buffers
#define BUFFER_SIZE 512 // Fixed buffer size
class StaticRingBuffer {
private:
char (*m_data)[BUFFER_SIZE]; // Pointer to dynamically allocated buffer array
bool* m_in_use; // Tracks if a buffer is currently allocated
size_t m_capacity; // Current capacity of the ring buffer
volatile size_t m_write_index = 0; // Next available buffer
volatile size_t m_read_index = 0; // Next buffer to be freed
std::atomic<size_t> m_used_space = 0; // Track used buffers
public:
StaticRingBuffer()
: m_capacity(INITIAL_RING_BUFFER_CAPACITY) {
m_data = new char[m_capacity][BUFFER_SIZE]; // Allocate initial buffer pool
m_in_use = new bool[m_capacity]{false}; // Track buffer usage (initialized to false)
}
~StaticRingBuffer() {
delete[] m_data; // Free memory
delete[] m_in_use; // Free tracking array
}
// Acquire a fixed-size buffer (doubles capacity if full)
char* Acquire() {
for (size_t i = 0; i < m_capacity; ++i) {
size_t index = (m_write_index + i) % m_capacity;
if (!m_in_use[index]) {
m_in_use[index] = true;
m_write_index = (index + 1) % m_capacity;
std::cout << "Acquired buffer " << index << std::endl;
return m_data[index];
}
}
DoubleCapacity();
return Acquire(); // Retry after resizing
}
// Release a buffer back to the pool
void Release(char* buffer) {
size_t index = (buffer - m_data[0]) / BUFFER_SIZE; // Divide by BUFFER_SIZE, not m_capacity
// Prevent invalid releases (e.g., if the buffer isn't from m_data[])
if (index >= m_capacity || m_data[index] != buffer) {
// std::cerr << "Invalid Release: Buffer not in m_data or incorrect index!\n";
return;
}
// std::cout << "Released buffer " << index << std::endl;
m_in_use[index] = false; // Mark buffer as available
}
private:
// Doubles the ring buffer capacity while preserving existing data
// TODO: Add a lock to prevent concurrent issues during resizing
void DoubleCapacity() {
size_t new_capacity = m_capacity * 2;
char (*new_data)[BUFFER_SIZE] = new char[new_capacity][BUFFER_SIZE];
bool* new_in_use = new bool[new_capacity]{false};
size_t current_size = (m_write_index >= m_read_index)
? (m_write_index - m_read_index)
: (m_capacity - m_read_index + m_write_index);
std::cout << "[INFO] Resizing buffer from " << m_capacity << " to " << new_capacity << std::endl;
// Copy data in FIFO order
for (size_t i = 0; i < current_size; ++i) {
size_t old_index = (m_read_index + i) % m_capacity;
memcpy(new_data[i], m_data[old_index], BUFFER_SIZE);
new_in_use[i] = m_in_use[old_index];
}
// Store old buffer reference
char (*old_data_ref)[BUFFER_SIZE] = m_data;
bool* old_in_use_ref = m_in_use;
size_t old_capacity = m_capacity;
// Assign new memory
m_data = new_data;
m_in_use = new_in_use;
m_capacity = new_capacity;
m_read_index = 0;
m_write_index = current_size;
// Delay deletion of old data to avoid access issues
std::thread([old_data_ref, old_in_use_ref, old_capacity]() {
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // Ensure old packets are sent
delete[] old_data_ref;
delete[] old_in_use_ref;
std::cout << "[INFO] Old buffer safely deallocated (Capacity: " << old_capacity << ")\n";
}).detach();
}
};
EQ::Net::DaybreakConnectionManager::DaybreakConnectionManager()
{
m_attached = nullptr;
@ -1112,10 +1216,7 @@ void EQ::Net::DaybreakConnection::ProcessResend(int stream)
auto s = &m_streams[stream];
// Get a reference resend delay (assume first packet represents the typical case)
size_t reference_resend_delay = 0;
if (!s->sent_packets.empty()) {
reference_resend_delay = s->sent_packets.begin()->second.resend_delay;
// Check if the first packet has timed out
auto &first_packet = s->sent_packets.begin()->second;
auto time_since_first_sent = std::chrono::duration_cast<std::chrono::milliseconds>(now - first_packet.first_sent).count();
@ -1336,6 +1437,8 @@ void EQ::Net::DaybreakConnection::SendKeepAlive()
InternalSend(p);
}
StaticRingBuffer m_ring_buffer;
void EQ::Net::DaybreakConnection::InternalSend(Packet &p)
{
if (m_owner->m_options.outgoing_data_rate > 0.0) {
@ -1352,8 +1455,10 @@ void EQ::Net::DaybreakConnection::InternalSend(Packet &p)
m_last_send = Clock::now();
auto send_func = [](uv_udp_send_t* req, int status) {
delete[](char*)req->data;
delete req;
if (req->data) {
m_ring_buffer.Release((char*)req->data); // Return buffer to pool
}
delete req; // Return send request to pool
};
if (PacketCanBeEncoded(p)) {
@ -1390,7 +1495,7 @@ void EQ::Net::DaybreakConnection::InternalSend(Packet &p)
uv_ip4_addr(m_endpoint.c_str(), m_port, &send_addr);
uv_buf_t send_buffers[1];
char *data = new char[out.Length()];
char* data = m_ring_buffer.Acquire();
memcpy(data, out.Data(), out.Length());
send_buffers[0] = uv_buf_init(data, out.Length());
send_req->data = send_buffers[0].base;
@ -1398,7 +1503,6 @@ void EQ::Net::DaybreakConnection::InternalSend(Packet &p)
m_stats.sent_bytes += out.Length();
m_stats.sent_packets++;
if (m_owner->m_options.simulated_out_packet_loss && m_owner->m_options.simulated_out_packet_loss >= m_owner->m_rand.Int(0, 100)) {
delete[](char*)send_req->data;
delete send_req;
return;
}
@ -1414,7 +1518,7 @@ void EQ::Net::DaybreakConnection::InternalSend(Packet &p)
uv_ip4_addr(m_endpoint.c_str(), m_port, &send_addr);
uv_buf_t send_buffers[1];
char *data = new char[p.Length()];
char* data = m_ring_buffer.Acquire();
memcpy(data, p.Data(), p.Length());
send_buffers[0] = uv_buf_init(data, p.Length());
send_req->data = send_buffers[0].base;
@ -1423,7 +1527,6 @@ void EQ::Net::DaybreakConnection::InternalSend(Packet &p)
m_stats.sent_packets++;
if (m_owner->m_options.simulated_out_packet_loss && m_owner->m_options.simulated_out_packet_loss >= m_owner->m_rand.Int(0, 100)) {
delete[](char*)send_req->data;
delete send_req;
return;
}