[Performance] Network Ring Buffers (#4857)

* [Performance] Network Ring Buffers

* Cursor versus linear scan (wtf GPT)
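The pools introduced below hand out fixed-size request+buffer slots from a ring instead of heap-allocating per send. A minimal sketch of the "cursor versus linear scan" point (illustrative names, not from this diff): a linear scan always probes from slot 0, so concurrent senders contend on the same low slots, while a shared atomic cursor rotates the starting probe around the ring — the same m_head.fetch_add pattern SendBufferPool and WriteReqPool use below.

// Sketch only: two ways to claim a free slot in a fixed ring of send buffers.
#include <atomic>
#include <cstddef>

constexpr size_t kSlots = 64;       // hypothetical pool size
std::atomic_bool locks[kSlots];     // true = slot in use
std::atomic<size_t> head{0};        // shared cursor

// Linear scan: every caller probes from index 0, so callers pile up on
// whichever low-numbered slots were freed most recently.
long acquire_linear() {
	for (size_t i = 0; i < kSlots; ++i) {
		bool expected = false;
		if (locks[i].compare_exchange_strong(expected, true)) {
			return (long)i;
		}
	}
	return -1; // pool exhausted
}

// Cursor: fetch_add rotates the starting index, so a lightly loaded pool
// usually succeeds on the first CAS and concurrent callers rarely collide.
long acquire_cursor() {
	for (size_t i = 0; i < kSlots; ++i) {
		size_t index = head.fetch_add(1, std::memory_order_relaxed) % kSlots;
		bool expected = false;
		if (locks[index].compare_exchange_strong(expected, true)) {
			return (long)index;
		}
	}
	return -1;
}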
Chris Miles 2025-04-10 02:02:25 -05:00 committed by GitHub
parent e983d07228
commit 43a5bff84a
12 changed files with 493 additions and 173 deletions

View File

@@ -671,6 +671,7 @@ SET(common_headers
 	net/console_server_connection.h
 	net/crc32.h
 	net/daybreak_connection.h
+	net/daybreak_pooling.h
 	net/daybreak_structs.h
 	net/dns.h
 	net/endian.h
@@ -682,6 +683,7 @@ SET(common_headers
 	net/servertalk_server.h
 	net/servertalk_server_connection.h
 	net/tcp_connection.h
+	net/tcp_connection_pooling.h
 	net/tcp_server.h
 	net/websocket_server.h
 	net/websocket_server_connection.h
@@ -742,6 +744,7 @@ SOURCE_GROUP(Net FILES
 	net/crc32.h
 	net/daybreak_connection.cpp
 	net/daybreak_connection.h
+	net/daybreak_pooling.h
 	net/daybreak_structs.h
 	net/dns.h
 	net/endian.h
@@ -762,6 +765,7 @@ SOURCE_GROUP(Net FILES
 	net/servertalk_server_connection.h
 	net/tcp_connection.cpp
 	net/tcp_connection.h
+	net/tcp_connection_pooling.h
 	net/tcp_server.cpp
 	net/tcp_server.h
 	net/websocket_server.cpp

View File

@@ -74,7 +74,7 @@ namespace Logs {
 		Spawns,
 		Spells,
 		Status, // deprecated
-		TCPConnection,
+		TCPConnection, // deprecated
 		Tasks,
 		Tradeskills,
 		Trading,
@@ -150,6 +150,8 @@ namespace Logs {
 		BotSpellTypeChecks,
 		NpcHandin,
 		ZoneState,
+		NetClient,
+		NetTCP,
 		MaxCategoryID /* Don't Remove this */
 	};
@@ -183,7 +185,7 @@ namespace Logs {
 		"Spawns",
 		"Spells",
 		"Status (Deprecated)",
-		"TCP Connection",
+		"TCP Connection (Deprecated)",
 		"Tasks",
 		"Tradeskills",
 		"Trading",
@@ -258,7 +260,9 @@ namespace Logs {
 		"Bot Spell Checks",
 		"Bot Spell Type Checks",
 		"NpcHandin",
-		"ZoneState"
+		"ZoneState",
+		"Net Server <-> Client",
+		"Net TCP"
 	};
 }

View File

@@ -261,26 +261,6 @@
 		OutF(LogSys, Logs::Detail, Logs::Spells, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\
 	} while (0)
-#define LogStatus(message, ...) do {\
-	if (LogSys.IsLogEnabled(Logs::General, Logs::Status))\
-		OutF(LogSys, Logs::General, Logs::Status, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\
-} while (0)
-#define LogStatusDetail(message, ...) do {\
-	if (LogSys.IsLogEnabled(Logs::Detail, Logs::Status))\
-		OutF(LogSys, Logs::Detail, Logs::Status, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\
-} while (0)
-#define LogTCPConnection(message, ...) do {\
-	if (LogSys.IsLogEnabled(Logs::General, Logs::TCPConnection))\
-		OutF(LogSys, Logs::General, Logs::TCPConnection, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\
-} while (0)
-#define LogTCPConnectionDetail(message, ...) do {\
-	if (LogSys.IsLogEnabled(Logs::Detail, Logs::TCPConnection))\
-		OutF(LogSys, Logs::Detail, Logs::TCPConnection, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\
-} while (0)
 #define LogTasks(message, ...) do {\
 	if (LogSys.IsLogEnabled(Logs::General, Logs::Tasks))\
 		OutF(LogSys, Logs::General, Logs::Tasks, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\
@@ -924,6 +904,26 @@
 		OutF(LogSys, Logs::Detail, Logs::ZoneState, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\
 	} while (0)
+#define LogNetClient(message, ...) do {\
+	if (LogSys.IsLogEnabled(Logs::General, Logs::NetClient))\
+		OutF(LogSys, Logs::General, Logs::NetClient, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\
+} while (0)
+#define LogNetClientDetail(message, ...) do {\
+	if (LogSys.IsLogEnabled(Logs::Detail, Logs::NetClient))\
+		OutF(LogSys, Logs::Detail, Logs::NetClient, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\
+} while (0)
+#define LogNetTCP(message, ...) do {\
+	if (LogSys.IsLogEnabled(Logs::General, Logs::NetTCP))\
+		OutF(LogSys, Logs::General, Logs::NetTCP, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\
+} while (0)
+#define LogNetTCPDetail(message, ...) do {\
+	if (LogSys.IsLogEnabled(Logs::Detail, Logs::NetTCP))\
+		OutF(LogSys, Logs::Detail, Logs::NetTCP, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\
+} while (0)
 #define Log(debug_level, log_category, message, ...) do {\
 	if (LogSys.IsLogEnabled(debug_level, log_category))\
 		LogSys.Out(debug_level, log_category, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\

View File

@@ -1,12 +1,16 @@
 #include "daybreak_connection.h"
 #include "../event/event_loop.h"
-#include "../event/task.h"
 #include "../data_verification.h"
 #include "crc32.h"
-#include "../eqemu_logsys.h"
 #include <zlib.h>
 #include <fmt/format.h>
-#include <sstream>
+
+// observed client receive window is 300 packets, 140KB
+constexpr size_t MAX_CLIENT_RECV_PACKETS_PER_WINDOW = 300;
+constexpr size_t MAX_CLIENT_RECV_BYTES_PER_WINDOW = 140 * 1024;
+
+// buffer pools
+SendBufferPool send_buffer_pool;

 EQ::Net::DaybreakConnectionManager::DaybreakConnectionManager()
 {
@@ -53,16 +57,22 @@ void EQ::Net::DaybreakConnectionManager::Attach(uv_loop_t *loop)
 	uv_ip4_addr("0.0.0.0", m_options.port, &recv_addr);
 	int rc = uv_udp_bind(&m_socket, (const struct sockaddr *)&recv_addr, UV_UDP_REUSEADDR);
-	rc = uv_udp_recv_start(&m_socket,
-		[](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
-			buf->base = new char[suggested_size];
-			memset(buf->base, 0, suggested_size);
-			buf->len = suggested_size;
-		},
+	rc = uv_udp_recv_start(
+		&m_socket,
+		[](uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
+			if (suggested_size > 65536) {
+				buf->base = new char[suggested_size];
+				buf->len = suggested_size;
+				return;
+			}
+
+			static thread_local char temp_buf[65536];
+			buf->base = temp_buf;
+			buf->len = 65536;
+		},
 		[](uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned flags) {
 			DaybreakConnectionManager *c = (DaybreakConnectionManager*)handle->data;
 			if (nread < 0 || addr == nullptr) {
-				delete[] buf->base;
 				return;
 			}
@@ -70,7 +80,10 @@ void EQ::Net::DaybreakConnectionManager::Attach(uv_loop_t *loop)
 			uv_ip4_name((const sockaddr_in*)addr, endpoint, 16);
 			auto port = ntohs(((const sockaddr_in*)addr)->sin_port);
 			c->ProcessPacket(endpoint, port, buf->base, nread);
-			delete[] buf->base;
+
+			if (buf->len > 65536) {
+				delete[] buf->base;
+			}
 		});

 	m_attached = loop;
@@ -310,7 +323,7 @@ EQ::Net::DaybreakConnection::DaybreakConnection(DaybreakConnectionManager *owner
 	m_last_session_stats = Clock::now();
 	m_outgoing_budget = owner->m_options.outgoing_data_rate;
-	LogNetcode("New session [{}] with encode key [{}]", m_connect_code, HostToNetwork(m_encode_key));
+	LogNetClient("New session [{}] with encode key [{}]", m_connect_code, HostToNetwork(m_encode_key));
 }

 //new connection made as client
@@ -634,7 +647,7 @@ void EQ::Net::DaybreakConnection::ProcessDecodedPacket(const Packet &p)
 				p.PutSerialize(0, reply);
 				InternalSend(p);
-				LogNetcode("[OP_SessionRequest] Session [{}] started with encode key [{}]", m_connect_code, HostToNetwork(m_encode_key));
+				LogNetClient("[OP_SessionRequest] Session [{}] started with encode key [{}]", m_connect_code, HostToNetwork(m_encode_key));
 			}
 			break;
@@ -653,7 +666,7 @@ void EQ::Net::DaybreakConnection::ProcessDecodedPacket(const Packet &p)
 				m_max_packet_size = reply.max_packet_size;
 				ChangeStatus(StatusConnected);
-				LogNetcode(
+				LogNetClient(
 					"[OP_SessionResponse] Session [{}] refresh with encode key [{}]",
 					m_connect_code,
 					HostToNetwork(m_encode_key)
@@ -782,7 +795,7 @@ void EQ::Net::DaybreakConnection::ProcessDecodedPacket(const Packet &p)
 				SendDisconnect();
 			}
-			LogNetcode(
+			LogNetClient(
 				"[OP_SessionDisconnect] Session [{}] disconnect with encode key [{}]",
 				m_connect_code,
 				HostToNetwork(m_encode_key)
@@ -852,7 +865,7 @@ bool EQ::Net::DaybreakConnection::ValidateCRC(Packet &p)
 	}
 	if (p.Length() < (size_t)m_crc_bytes) {
-		LogNetcode("Session [{}] ignored packet (crc bytes invalid on session)", m_connect_code);
+		LogNetClient("Session [{}] ignored packet (crc bytes invalid on session)", m_connect_code);
 		return false;
 	}
@@ -1043,7 +1056,7 @@ void EQ::Net::DaybreakConnection::Decompress(Packet &p, size_t offset, size_t le
 		return;
 	}
-	static uint8_t new_buffer[4096];
+	static thread_local uint8_t new_buffer[4096];
 	uint8_t *buffer = (uint8_t*)p.Data() + offset;
 	uint32_t new_length = 0;
@@ -1064,7 +1077,7 @@ void EQ::Net::DaybreakConnection::Compress(Packet &p, size_t offset, size_t le
 void EQ::Net::DaybreakConnection::Compress(Packet &p, size_t offset, size_t length)
 {
-	uint8_t new_buffer[2048] = { 0 };
+	static thread_local uint8_t new_buffer[2048] = { 0 };
 	uint8_t *buffer = (uint8_t*)p.Data() + offset;
 	uint32_t new_length = 0;
 	bool send_uncompressed = true;
@@ -1091,10 +1104,6 @@ void EQ::Net::DaybreakConnection::ProcessResend()
 	}
 }
-// observed client receive window is 300 packets, 140KB
-constexpr size_t MAX_CLIENT_RECV_PACKETS_PER_WINDOW = 300;
-constexpr size_t MAX_CLIENT_RECV_BYTES_PER_WINDOW = 140 * 1024;
-
 void EQ::Net::DaybreakConnection::ProcessResend(int stream)
 {
 	if (m_status == DbProtocolStatus::StatusDisconnected) {
@@ -1125,9 +1134,10 @@ void EQ::Net::DaybreakConnection::ProcessResend(int stream)
 	// make sure that the first_packet in the list first_sent time is within the resend_delay and now
 	// if it is not, then we need to resend all packets in the list
 	if (time_since_first_sent <= first_packet.resend_delay && !m_acked_since_last_resend) {
-		LogNetcodeDetail(
-			"Not resending packets for stream [{}] time since first sent [{}] resend delay [{}] m_acked_since_last_resend [{}]",
+		LogNetClient(
+			"Not resending packets for stream [{}] packets [{}] time_first_sent [{}] resend_delay [{}] m_acked_since_last_resend [{}]",
 			stream,
+			s->sent_packets.size(),
 			time_since_first_sent,
 			first_packet.resend_delay,
 			m_acked_since_last_resend
@@ -1142,7 +1152,7 @@ void EQ::Net::DaybreakConnection::ProcessResend(int stream)
 		total_size += e.second.packet.Length();
 	}
-	LogNetcodeDetail(
+	LogNetClient(
 		"Resending packets for stream [{}] packet count [{}] total packet size [{}] m_acked_since_last_resend [{}]",
 		stream,
 		s->sent_packets.size(),
@@ -1154,7 +1164,7 @@ void EQ::Net::DaybreakConnection::ProcessResend(int stream)
 	for (auto &e: s->sent_packets) {
 		if (m_resend_packets_sent >= MAX_CLIENT_RECV_PACKETS_PER_WINDOW ||
 			m_resend_bytes_sent >= MAX_CLIENT_RECV_BYTES_PER_WINDOW) {
-			LogNetcodeDetail(
+			LogNetClient(
 				"Stopping resend because we hit thresholds m_resend_packets_sent [{}] max [{}] m_resend_bytes_sent [{}] max [{}]",
 				m_resend_packets_sent,
 				MAX_CLIENT_RECV_PACKETS_PER_WINDOW,
@@ -1333,99 +1343,97 @@ void EQ::Net::DaybreakConnection::SendKeepAlive()
 	InternalSend(p);
 }

-void EQ::Net::DaybreakConnection::InternalSend(Packet &p)
-{
+void EQ::Net::DaybreakConnection::InternalSend(Packet &p) {
 	if (m_owner->m_options.outgoing_data_rate > 0.0) {
 		auto new_budget = m_outgoing_budget - (p.Length() / 1024.0);
 		if (new_budget <= 0.0) {
 			m_stats.dropped_datarate_packets++;
 			return;
-		}
-		else {
+		} else {
 			m_outgoing_budget = new_budget;
 		}
 	}

 	m_last_send = Clock::now();

-	auto send_func = [](uv_udp_send_t* req, int status) {
-		delete[](char*)req->data;
-		delete req;
-	};
+	auto pooled_opt = send_buffer_pool.acquire();
+	if (!pooled_opt) {
+		m_stats.dropped_datarate_packets++;
+		return;
+	}
+
+	auto [send_req, data, ctx] = *pooled_opt;
+	ctx->pool = &send_buffer_pool; // set pool pointer
+
+	sockaddr_in send_addr{};
+	uv_ip4_addr(m_endpoint.c_str(), m_port, &send_addr);
+	uv_buf_t send_buffers[1];

 	if (PacketCanBeEncoded(p)) {
 		m_stats.bytes_before_encode += p.Length();
 		DynamicPacket out;
 		out.PutPacket(0, p);

-		for (int i = 0; i < 2; ++i) {
-			switch (m_encode_passes[i]) {
-			case EncodeCompression:
-				if (out.GetInt8(0) == 0)
-					Compress(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size());
-				else
-					Compress(out, 1, out.Length() - 1);
-				break;
-			case EncodeXOR:
-				if (out.GetInt8(0) == 0)
-					Encode(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size());
-				else
-					Encode(out, 1, out.Length() - 1);
-				break;
-			default:
-				break;
+		for (auto &m_encode_passe: m_encode_passes) {
+			switch (m_encode_passe) {
+				case EncodeCompression:
+					if (out.GetInt8(0) == 0) {
+						Compress(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size());
+					} else {
+						Compress(out, 1, out.Length() - 1);
+					}
+					break;
+				case EncodeXOR:
+					if (out.GetInt8(0) == 0) {
+						Encode(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size());
+					} else {
+						Encode(out, 1, out.Length() - 1);
+					}
+					break;
+				default:
+					break;
 			}
 		}

 		AppendCRC(out);

-		uv_udp_send_t *send_req = new uv_udp_send_t;
-		memset(send_req, 0, sizeof(*send_req));
-		sockaddr_in send_addr;
-		uv_ip4_addr(m_endpoint.c_str(), m_port, &send_addr);
-		uv_buf_t send_buffers[1];
-
-		char *data = new char[out.Length()];
 		memcpy(data, out.Data(), out.Length());
 		send_buffers[0] = uv_buf_init(data, out.Length());
-		send_req->data = send_buffers[0].base;
-
-		m_stats.sent_bytes += out.Length();
-		m_stats.sent_packets++;
-		if (m_owner->m_options.simulated_out_packet_loss && m_owner->m_options.simulated_out_packet_loss >= m_owner->m_rand.Int(0, 100)) {
-			delete[](char*)send_req->data;
-			delete send_req;
-			return;
-		}
-
-		uv_udp_send(send_req, &m_owner->m_socket, send_buffers, 1, (sockaddr*)&send_addr, send_func);
-		return;
+	} else {
+		memcpy(data, p.Data(), p.Length());
+		send_buffers[0] = uv_buf_init(data, p.Length());
 	}

-	m_stats.bytes_before_encode += p.Length();
-
-	uv_udp_send_t *send_req = new uv_udp_send_t;
-	sockaddr_in send_addr;
-	uv_ip4_addr(m_endpoint.c_str(), m_port, &send_addr);
-	uv_buf_t send_buffers[1];
-
-	char *data = new char[p.Length()];
-	memcpy(data, p.Data(), p.Length());
-	send_buffers[0] = uv_buf_init(data, p.Length());
-	send_req->data = send_buffers[0].base;
-
 	m_stats.sent_bytes += p.Length();
 	m_stats.sent_packets++;
-	if (m_owner->m_options.simulated_out_packet_loss && m_owner->m_options.simulated_out_packet_loss >= m_owner->m_rand.Int(0, 100)) {
-		delete[](char*)send_req->data;
-		delete send_req;
+
+	if (m_owner->m_options.simulated_out_packet_loss &&
+		m_owner->m_options.simulated_out_packet_loss >= m_owner->m_rand.Int(0, 100)) {
+		send_buffer_pool.release(ctx);
 		return;
 	}

-	uv_udp_send(send_req, &m_owner->m_socket, send_buffers, 1, (sockaddr*)&send_addr, send_func);
+	int send_result = uv_udp_send(
+		send_req, &m_owner->m_socket, send_buffers, 1, (sockaddr *)&send_addr,
+		[](uv_udp_send_t *req, int status) {
+			auto *ctx = reinterpret_cast<EmbeddedContext *>(req->data);
+			if (!ctx) {
+				std::cerr << "Error: send_req->data is null in callback!" << std::endl;
+				return;
+			}
+
+			if (status < 0) {
+				std::cerr << "uv_udp_send failed: " << uv_strerror(status) << std::endl;
+			}
+
+			ctx->pool->release(ctx);
+		}
+	);
+
+	if (send_result < 0) {
+		std::cerr << "uv_udp_send() failed: " << uv_strerror(send_result) << std::endl;
+		send_buffer_pool.release(ctx);
+	}
 }
void EQ::Net::DaybreakConnection::InternalQueuePacket(Packet &p, int stream_id, bool reliable) void EQ::Net::DaybreakConnection::InternalQueuePacket(Packet &p, int stream_id, bool reliable)

View File

@@ -3,6 +3,7 @@
 #include "../random.h"
 #include "packet.h"
 #include "daybreak_structs.h"
+#include "daybreak_pooling.h"
 #include <uv.h>
 #include <chrono>
 #include <functional>

View File

@@ -0,0 +1,123 @@
#pragma once
#include <optional>
#include <atomic>
#include <memory>
#include <array>
#include <vector>
#include <mutex>
#include <iostream>
#include "../eqemu_logsys.h"
#include <uv.h>
constexpr size_t UDP_BUFFER_SIZE = 512;
struct EmbeddedContext {
size_t pool_index;
class SendBufferPool* pool;
};
class SendBufferPool {
public:
explicit SendBufferPool(size_t initial_capacity = 64)
: m_capacity(initial_capacity), m_head(0)
{
LogNetClient("[SendBufferPool] Initializing with capacity [{}]", (int)m_capacity);
m_pool.reserve(m_capacity);
m_locks = std::make_unique<std::atomic_bool[]>(m_capacity);
for (size_t i = 0; i < m_capacity; ++i) {
auto* req = new PooledUdpSend();
req->context.pool_index = i;
req->context.pool = this;
req->uv_req.data = &req->context;
m_pool.emplace_back(std::unique_ptr<PooledUdpSend>(req));
m_locks[i].store(false, std::memory_order_relaxed);
}
}
std::optional<std::tuple<uv_udp_send_t*, char*, EmbeddedContext*>> acquire() {
size_t cap = m_capacity.load(std::memory_order_acquire);
for (size_t i = 0; i < cap; ++i) {
size_t index = m_head.fetch_add(1, std::memory_order_relaxed) % cap;
bool expected = false;
if (m_locks[index].compare_exchange_strong(expected, true)) {
auto* req = m_pool[index].get();
LogNetClientDetail("[SendBufferPool] Acquired [{}]", index);
return std::make_tuple(&req->uv_req, req->buffer.data(), &req->context);
}
}
LogNetClient("[SendBufferPool] Growing from [{}] to [{}]", cap, cap * 2);
grow();
return acquireAfterGrowth();
}
void release(EmbeddedContext* ctx) {
if (!ctx || ctx->pool != this || ctx->pool_index >= m_capacity.load(std::memory_order_acquire)) {
LogNetClient("[SendBufferPool] Invalid context release [{}]", ctx ? ctx->pool_index : -1);
return;
}
m_locks[ctx->pool_index].store(false, std::memory_order_release);
LogNetClientDetail("[SendBufferPool] Released [{}]", ctx->pool_index);
}
private:
struct PooledUdpSend {
uv_udp_send_t uv_req;
std::array<char, UDP_BUFFER_SIZE> buffer;
EmbeddedContext context;
};
std::vector<std::unique_ptr<PooledUdpSend>> m_pool;
std::unique_ptr<std::atomic_bool[]> m_locks;
std::atomic<size_t> m_capacity;
std::atomic<size_t> m_head;
std::mutex m_grow_mutex;
void grow() {
std::lock_guard<std::mutex> lock(m_grow_mutex);
size_t old_cap = m_capacity.load(std::memory_order_acquire);
size_t new_cap = old_cap * 2;
m_pool.reserve(new_cap);
for (size_t i = old_cap; i < new_cap; ++i) {
auto* req = new PooledUdpSend();
req->context.pool_index = i;
req->context.pool = this;
req->uv_req.data = &req->context;
m_pool.emplace_back(std::unique_ptr<PooledUdpSend>(req));
}
auto new_locks = std::make_unique<std::atomic_bool[]>(new_cap);
for (size_t i = 0; i < old_cap; ++i) {
new_locks[i].store(m_locks[i].load(std::memory_order_acquire));
}
for (size_t i = old_cap; i < new_cap; ++i) {
new_locks[i].store(false, std::memory_order_relaxed);
}
m_locks = std::move(new_locks);
m_capacity.store(new_cap, std::memory_order_release);
LogNetClient("[SendBufferPool] Grew to [{}] from [{}]", new_cap, old_cap);
}
std::optional<std::tuple<uv_udp_send_t*, char*, EmbeddedContext*>> acquireAfterGrowth() {
size_t cap = m_capacity.load(std::memory_order_acquire);
for (size_t i = 0; i < cap; ++i) {
size_t index = m_head.fetch_add(1, std::memory_order_relaxed) % cap;
bool expected = false;
if (m_locks[index].compare_exchange_strong(expected, true)) {
auto* req = m_pool[index].get();
LogNetClient("[SendBufferPool] Acquired after grow [{}]", index);
return std::make_tuple(&req->uv_req, req->buffer.data(), &req->context);
}
}
return std::nullopt;
}
};
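For context, a minimal (hypothetical) caller mirroring the InternalSend diff above: the slot is claimed before the send, the libuv completion callback recovers the EmbeddedContext through req->data, and the slot is returned on both the completion and the immediate-failure paths. 'sock', 'dest', 'payload', and 'len' are stand-ins, not names from this commit; this is a sketch, assuming the SendBufferPool above.

// Sketch only; needs <cstring> for memcpy, and payload must fit UDP_BUFFER_SIZE.
void send_pooled(uv_udp_t &sock, const sockaddr_in &dest, const char *payload, size_t len)
{
	auto pooled = send_buffer_pool.acquire();
	if (!pooled) {
		return; // ring exhausted and growth failed
	}

	auto [req, data, ctx] = *pooled;
	ctx->pool = &send_buffer_pool;
	memcpy(data, payload, len);
	uv_buf_t buf = uv_buf_init(data, (unsigned int)len);

	int rc = uv_udp_send(req, &sock, &buf, 1, (const sockaddr *)&dest,
		[](uv_udp_send_t *r, int /*status*/) {
			auto *c = reinterpret_cast<EmbeddedContext *>(r->data);
			c->pool->release(c); // slot returns to the ring on completion
		});

	if (rc < 0) {
		send_buffer_pool.release(ctx); // libuv never took ownership
	}
}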

View File

@@ -171,3 +171,4 @@ namespace EQ
 		};
 	}
 }
+

View File

@@ -62,15 +62,15 @@ void EQ::Net::ServertalkClient::Connect()
 	m_connecting = true;
 	EQ::Net::TCPConnection::Connect(m_addr, m_port, false, [this](std::shared_ptr<EQ::Net::TCPConnection> connection) {
 		if (connection == nullptr) {
-			LogF(Logs::General, Logs::TCPConnection, "Error connecting to {0}:{1}, attempting to reconnect...", m_addr, m_port);
+			LogNetTCP("Error connecting to {0}:{1}, attempting to reconnect...", m_addr, m_port);
 			m_connecting = false;
 			return;
 		}

-		LogF(Logs::General, Logs::TCPConnection, "Connected to {0}:{1}", m_addr, m_port);
+		LogNetTCP("Connected to {0}:{1}", m_addr, m_port);
 		m_connection = connection;

 		m_connection->OnDisconnect([this](EQ::Net::TCPConnection *c) {
-			LogF(Logs::General, Logs::TCPConnection, "Connection lost to {0}:{1}, attempting to reconnect...", m_addr, m_port);
+			LogNetTCP("Connection lost to {0}:{1}, attempting to reconnect...", m_addr, m_port);
 			m_connection.reset();
 		});

View File

@@ -58,15 +58,15 @@ void EQ::Net::ServertalkLegacyClient::Connect()
 	m_connecting = true;
 	EQ::Net::TCPConnection::Connect(m_addr, m_port, false, [this](std::shared_ptr<EQ::Net::TCPConnection> connection) {
 		if (connection == nullptr) {
-			LogF(Logs::General, Logs::TCPConnection, "Error connecting to {0}:{1}, attempting to reconnect...", m_addr, m_port);
+			LogNetTCP("Error connecting to {0}:{1}, attempting to reconnect...", m_addr, m_port);
 			m_connecting = false;
 			return;
 		}

-		LogF(Logs::General, Logs::TCPConnection, "Connected to {0}:{1}", m_addr, m_port);
+		LogNetTCP("Connected to {0}:{1}", m_addr, m_port);
 		m_connection = connection;

 		m_connection->OnDisconnect([this](EQ::Net::TCPConnection *c) {
-			LogF(Logs::General, Logs::TCPConnection, "Connection lost to {0}:{1}, attempting to reconnect...", m_addr, m_port);
+			LogNetTCP("Connection lost to {0}:{1}, attempting to reconnect...", m_addr, m_port);
 			m_connection.reset();
 		});

View File

@@ -1,5 +1,8 @@
 #include "tcp_connection.h"
 #include "../event/event_loop.h"
+#include <iostream>
+
+WriteReqPool tcp_write_pool;

 void on_close_handle(uv_handle_t* handle) {
 	delete (uv_tcp_t *)handle;
@@ -64,36 +67,37 @@ void EQ::Net::TCPConnection::Connect(const std::string &addr, int port, bool ipv
 	});
 }

-void EQ::Net::TCPConnection::Start() {
-	uv_read_start((uv_stream_t*)m_socket, [](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
-		buf->base = new char[suggested_size];
-		buf->len = suggested_size;
-	}, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
-		TCPConnection *connection = (TCPConnection*)stream->data;
-		if (nread > 0) {
-			connection->Read(buf->base, nread);
-
-			if (buf->base) {
-				delete[] buf->base;
-			}
-		}
-		else if (nread == UV_EOF) {
-			connection->Disconnect();
-
-			if (buf->base) {
-				delete[] buf->base;
-			}
-		}
-		else if (nread < 0) {
-			connection->Disconnect();
-
-			if (buf->base) {
-				delete[] buf->base;
-			}
-		}
-	});
+void EQ::Net::TCPConnection::Start()
+{
+	uv_read_start(
+		(uv_stream_t *) m_socket, [](uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
+			if (suggested_size > 65536) {
+				buf->base = new char[suggested_size];
+				buf->len = suggested_size;
+				return;
+			}
+
+			static thread_local char temp_buf[65536];
+			buf->base = temp_buf;
+			buf->len = 65536;
+		}, [](uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
+			auto *connection = (TCPConnection *) stream->data;
+			if (nread > 0) {
+				connection->Read(buf->base, nread);
+			}
+			else if (nread == UV_EOF) {
+				connection->Disconnect();
+			}
+			else if (nread < 0) {
+				connection->Disconnect();
+			}

+			if (buf->len > 65536) {
+				delete [] buf->base;
+			}
+		}
+	);
 }
void EQ::Net::TCPConnection::OnRead(std::function<void(TCPConnection*, const unsigned char*, size_t)> cb) void EQ::Net::TCPConnection::OnRead(std::function<void(TCPConnection*, const unsigned char*, size_t)> cb)
@@ -130,43 +134,92 @@ void EQ::Net::TCPConnection::Read(const char *data, size_t count)
 	}
 }

-void EQ::Net::TCPConnection::Write(const char *data, size_t count)
-{
-	if (!m_socket) {
-		return;
-	}
-
-	struct WriteBaton
-	{
-		TCPConnection *connection;
-		char *buffer;
-	};
-
-	WriteBaton *baton = new WriteBaton;
-	baton->connection = this;
-	baton->buffer = new char[count];
-
-	uv_write_t *write_req = new uv_write_t;
-	memset(write_req, 0, sizeof(uv_write_t));
-	write_req->data = baton;
-
-	uv_buf_t send_buffers[1];
-	memcpy(baton->buffer, data, count);
-	send_buffers[0] = uv_buf_init(baton->buffer, count);
-
-	uv_write(write_req, (uv_stream_t*)m_socket, send_buffers, 1, [](uv_write_t* req, int status) {
-		WriteBaton *baton = (WriteBaton*)req->data;
-		delete[] baton->buffer;
-		delete req;
-
-		if (status < 0) {
-			baton->connection->Disconnect();
-		}
-
-		delete baton;
-	});
+void EQ::Net::TCPConnection::Write(const char* data, size_t count) {
+	if (!m_socket || !data || count == 0) {
+		std::cerr << "TCPConnection::Write - Invalid socket or data\n";
+		return;
+	}
+
+	if (count <= TCP_BUFFER_SIZE) {
+		// Fast path: use pooled request with embedded buffer
+		auto req_opt = tcp_write_pool.acquire();
+		if (!req_opt) {
+			std::cerr << "TCPConnection::Write - Out of write requests\n";
+			return;
+		}
+
+		TCPWriteReq* write_req = *req_opt;
+
+		// Fill buffer and set context
+		memcpy(write_req->buffer.data(), data, count);
+		write_req->connection = this;
+		write_req->magic = 0xC0FFEE;
+
+		uv_buf_t buf = uv_buf_init(write_req->buffer.data(), static_cast<unsigned int>(count));
+
+		int result = uv_write(
+			&write_req->req,
+			reinterpret_cast<uv_stream_t*>(m_socket),
+			&buf,
+			1,
+			[](uv_write_t* req, int status) {
+				auto* full_req = reinterpret_cast<TCPWriteReq*>(req);
+				if (full_req->magic != 0xC0FFEE) {
+					std::cerr << "uv_write callback - invalid magic, skipping release\n";
+					return;
+				}
+
+				tcp_write_pool.release(full_req);
+
+				if (status < 0 && full_req->connection) {
+					std::cerr << "uv_write failed: " << uv_strerror(status) << std::endl;
+					full_req->connection->Disconnect();
+				}
+			}
+		);
+
+		if (result < 0) {
+			std::cerr << "uv_write() failed immediately: " << uv_strerror(result) << std::endl;
+			tcp_write_pool.release(write_req);
+		}
+	} else {
+		// Slow path: allocate heap buffer for large write
+		LogNetTCP("[TCPConnection] Large write of [{}] bytes, using heap buffer", count);
+
+		char* heap_buffer = new char[count];
+		memcpy(heap_buffer, data, count);
+
+		uv_write_t* write_req = new uv_write_t;
+		write_req->data = heap_buffer;
+
+		uv_buf_t buf = uv_buf_init(heap_buffer, static_cast<unsigned int>(count));
+
+		int result = uv_write(
+			write_req,
+			reinterpret_cast<uv_stream_t*>(m_socket),
+			&buf,
+			1,
+			[](uv_write_t* req, int status) {
+				char* data = static_cast<char*>(req->data);
+				delete[] data;
+				delete req;
+
+				if (status < 0) {
+					std::cerr << "uv_write (large) failed: " << uv_strerror(status) << std::endl;
+				}
+			}
+		);
+
+		if (result < 0) {
+			std::cerr << "uv_write() (large) failed immediately: " << uv_strerror(result) << std::endl;
+			delete[] heap_buffer;
+			delete write_req;
+		}
+	}
 }

 std::string EQ::Net::TCPConnection::LocalIP() const
 {
 	sockaddr_storage addr;

View File

@@ -1,5 +1,6 @@
 #pragma once
+#include "tcp_connection_pooling.h"
 #include <functional>
 #include <string>
 #include <memory>

View File

@@ -0,0 +1,125 @@
#pragma once
#include "../eqemu_logsys.h"
#include <vector>
#include <array>
#include <atomic>
#include <memory>
#include <optional>
#include <mutex>
#include <uv.h>
#include <iostream>
namespace EQ { namespace Net { class TCPConnection; } }
constexpr size_t TCP_BUFFER_SIZE = 8192;
struct TCPWriteReq {
uv_write_t req{};
std::array<char, TCP_BUFFER_SIZE> buffer{};
size_t buffer_index{};
EQ::Net::TCPConnection* connection{};
uint32_t magic = 0xC0FFEE;
};
class WriteReqPool {
public:
explicit WriteReqPool(size_t initial_capacity = 512)
: m_capacity(initial_capacity), m_head(0) {
initialize_pool(m_capacity);
}
std::optional<TCPWriteReq*> acquire() {
size_t cap = m_capacity.load(std::memory_order_acquire);
for (size_t i = 0; i < cap; ++i) {
size_t index = m_head.fetch_add(1, std::memory_order_relaxed) % cap;
bool expected = false;
if (m_locks[index].compare_exchange_strong(expected, true, std::memory_order_acquire)) {
LogNetTCPDetail("[WriteReqPool] Acquired buffer index [{}]", index);
return m_reqs[index].get();
}
}
LogNetTCP("[WriteReqPool] Growing from [{}] to [{}]", cap, cap * 2);
grow();
return acquireAfterGrow();
}
void release(TCPWriteReq* req) {
if (!req) return;
const size_t index = req->buffer_index;
const size_t cap = m_capacity.load(std::memory_order_acquire);
if (index >= cap || m_reqs[index].get() != req) {
std::cerr << "WriteReqPool::release - Invalid or stale pointer (index=" << index << ")\n";
return;
}
m_locks[index].store(false, std::memory_order_release);
LogNetTCPDetail("[WriteReqPool] Released buffer index [{}]", index);
}
private:
std::vector<std::unique_ptr<TCPWriteReq>> m_reqs;
std::unique_ptr<std::atomic_bool[]> m_locks;
std::atomic<size_t> m_capacity;
std::atomic<size_t> m_head;
std::mutex m_grow_mutex;
void initialize_pool(size_t count) {
m_reqs.reserve(count);
m_locks = std::make_unique<std::atomic_bool[]>(count);
for (size_t i = 0; i < count; ++i) {
auto req = std::make_unique<TCPWriteReq>();
req->buffer_index = i;
req->req.data = req.get(); // optional: for use in libuv callbacks
m_locks[i].store(false, std::memory_order_relaxed);
m_reqs.emplace_back(std::move(req));
}
m_capacity.store(count, std::memory_order_release);
}
void grow() {
std::lock_guard<std::mutex> lock(m_grow_mutex);
const size_t old_cap = m_capacity.load(std::memory_order_acquire);
const size_t new_cap = old_cap * 2;
m_reqs.reserve(new_cap);
for (size_t i = old_cap; i < new_cap; ++i) {
auto req = std::make_unique<TCPWriteReq>();
req->buffer_index = i;
req->req.data = req.get(); // optional
m_reqs.emplace_back(std::move(req));
}
auto new_locks = std::make_unique<std::atomic_bool[]>(new_cap);
for (size_t i = 0; i < old_cap; ++i) {
new_locks[i].store(m_locks[i].load(std::memory_order_acquire));
}
for (size_t i = old_cap; i < new_cap; ++i) {
new_locks[i].store(false, std::memory_order_relaxed);
}
m_locks = std::move(new_locks);
m_capacity.store(new_cap, std::memory_order_release);
}
std::optional<TCPWriteReq*> acquireAfterGrow() {
const size_t cap = m_capacity.load(std::memory_order_acquire);
for (size_t i = 0; i < cap; ++i) {
bool expected = false;
if (m_locks[i].compare_exchange_strong(expected, true, std::memory_order_acquire)) {
LogNetTCP("[WriteReqPool] Acquired buffer index [{}] after grow", i);
return m_reqs[i].get();
}
}
return std::nullopt;
}
};
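One design note on TCPWriteReq: because uv_write_t is its first member, the wrapper and the embedded request share an address, which is what lets the Write() callback above cast the uv_write_t* straight back to the pooled object (with the magic field guarding against stray requests). A standalone sketch of the idiom, with hypothetical names:

// Sketch only: recovering a wrapper from its first member, libuv intrusive style.
#include <uv.h>
#include <cstdint>

struct Wrapper {
	uv_write_t req;             // must stay the first member
	uint32_t   magic = 0xC0FFEE;
};

void on_write(uv_write_t *req, int /*status*/) {
	// Legal for a standard-layout type: &w->req == &w when req is first.
	auto *w = reinterpret_cast<Wrapper *>(req);
	if (w->magic != 0xC0FFEE) {
		return; // not one of ours, skip release
	}
	// ... return w to its pool here ...
}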