From 43a5bff84a004701aff0bee162991e5a88b74cc0 Mon Sep 17 00:00:00 2001 From: Chris Miles Date: Thu, 10 Apr 2025 02:02:25 -0500 Subject: [PATCH] [Performance] Network Ring Buffers (#4857) * [Performance] Network Ring Buffers * Cursor versus linear scan (wtf GPT) --- common/CMakeLists.txt | 4 + common/eqemu_logsys.h | 10 +- common/eqemu_logsys_log_aliases.h | 40 ++-- common/net/daybreak_connection.cpp | 182 +++++++++--------- common/net/daybreak_connection.h | 1 + common/net/daybreak_pooling.h | 123 ++++++++++++ common/net/daybreak_structs.h | 1 + common/net/servertalk_client_connection.cpp | 6 +- .../servertalk_legacy_client_connection.cpp | 8 +- common/net/tcp_connection.cpp | 163 ++++++++++------ common/net/tcp_connection.h | 3 +- common/net/tcp_connection_pooling.h | 125 ++++++++++++ 12 files changed, 493 insertions(+), 173 deletions(-) create mode 100644 common/net/daybreak_pooling.h create mode 100644 common/net/tcp_connection_pooling.h diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 8111c6fa3..c587a289a 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -671,6 +671,7 @@ SET(common_headers net/console_server_connection.h net/crc32.h net/daybreak_connection.h + net/daybreak_pooling.h net/daybreak_structs.h net/dns.h net/endian.h @@ -682,6 +683,7 @@ SET(common_headers net/servertalk_server.h net/servertalk_server_connection.h net/tcp_connection.h + net/tcp_connection_pooling.h net/tcp_server.h net/websocket_server.h net/websocket_server_connection.h @@ -742,6 +744,7 @@ SOURCE_GROUP(Net FILES net/crc32.h net/daybreak_connection.cpp net/daybreak_connection.h + net/daybreak_pooling.h net/daybreak_structs.h net/dns.h net/endian.h @@ -762,6 +765,7 @@ SOURCE_GROUP(Net FILES net/servertalk_server_connection.h net/tcp_connection.cpp net/tcp_connection.h + net/tcp_connection_pooling.h net/tcp_server.cpp net/tcp_server.h net/websocket_server.cpp diff --git a/common/eqemu_logsys.h b/common/eqemu_logsys.h index a82d19d72..d0897a0f7 100644 --- a/common/eqemu_logsys.h +++ b/common/eqemu_logsys.h @@ -74,7 +74,7 @@ namespace Logs { Spawns, Spells, Status, // deprecated - TCPConnection, + TCPConnection, // deprecated Tasks, Tradeskills, Trading, @@ -150,6 +150,8 @@ namespace Logs { BotSpellTypeChecks, NpcHandin, ZoneState, + NetClient, + NetTCP, MaxCategoryID /* Don't Remove this */ }; @@ -183,7 +185,7 @@ namespace Logs { "Spawns", "Spells", "Status (Deprecated)", - "TCP Connection", + "TCP Connection (Deprecated)", "Tasks", "Tradeskills", "Trading", @@ -258,7 +260,9 @@ namespace Logs { "Bot Spell Checks", "Bot Spell Type Checks", "NpcHandin", - "ZoneState" + "ZoneState", + "Net Server <-> Client", + "Net TCP" }; } diff --git a/common/eqemu_logsys_log_aliases.h b/common/eqemu_logsys_log_aliases.h index 1047f6c0d..55fa7fa20 100644 --- a/common/eqemu_logsys_log_aliases.h +++ b/common/eqemu_logsys_log_aliases.h @@ -261,26 +261,6 @@ OutF(LogSys, Logs::Detail, Logs::Spells, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\ } while (0) -#define LogStatus(message, ...) do {\ - if (LogSys.IsLogEnabled(Logs::General, Logs::Status))\ - OutF(LogSys, Logs::General, Logs::Status, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\ -} while (0) - -#define LogStatusDetail(message, ...) do {\ - if (LogSys.IsLogEnabled(Logs::Detail, Logs::Status))\ - OutF(LogSys, Logs::Detail, Logs::Status, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\ -} while (0) - -#define LogTCPConnection(message, ...) do {\ - if (LogSys.IsLogEnabled(Logs::General, Logs::TCPConnection))\ - OutF(LogSys, Logs::General, Logs::TCPConnection, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\ -} while (0) - -#define LogTCPConnectionDetail(message, ...) do {\ - if (LogSys.IsLogEnabled(Logs::Detail, Logs::TCPConnection))\ - OutF(LogSys, Logs::Detail, Logs::TCPConnection, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\ -} while (0) - #define LogTasks(message, ...) do {\ if (LogSys.IsLogEnabled(Logs::General, Logs::Tasks))\ OutF(LogSys, Logs::General, Logs::Tasks, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\ @@ -924,6 +904,26 @@ OutF(LogSys, Logs::Detail, Logs::ZoneState, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\ } while (0) +#define LogNetClient(message, ...) do {\ + if (LogSys.IsLogEnabled(Logs::General, Logs::NetClient))\ + OutF(LogSys, Logs::General, Logs::NetClient, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\ +} while (0) + +#define LogNetClientDetail(message, ...) do {\ + if (LogSys.IsLogEnabled(Logs::Detail, Logs::NetClient))\ + OutF(LogSys, Logs::Detail, Logs::NetClient, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\ +} while (0) + +#define LogNetTCP(message, ...) do {\ + if (LogSys.IsLogEnabled(Logs::General, Logs::NetTCP))\ + OutF(LogSys, Logs::General, Logs::NetTCP, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\ +} while (0) + +#define LogNetTCPDetail(message, ...) do {\ + if (LogSys.IsLogEnabled(Logs::Detail, Logs::NetTCP))\ + OutF(LogSys, Logs::Detail, Logs::NetTCP, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\ +} while (0) + #define Log(debug_level, log_category, message, ...) do {\ if (LogSys.IsLogEnabled(debug_level, log_category))\ LogSys.Out(debug_level, log_category, __FILE__, __func__, __LINE__, message, ##__VA_ARGS__);\ diff --git a/common/net/daybreak_connection.cpp b/common/net/daybreak_connection.cpp index f1495315e..519ecff82 100644 --- a/common/net/daybreak_connection.cpp +++ b/common/net/daybreak_connection.cpp @@ -1,12 +1,16 @@ #include "daybreak_connection.h" #include "../event/event_loop.h" -#include "../event/task.h" #include "../data_verification.h" #include "crc32.h" -#include "../eqemu_logsys.h" #include #include -#include + +// observed client receive window is 300 packets, 140KB +constexpr size_t MAX_CLIENT_RECV_PACKETS_PER_WINDOW = 300; +constexpr size_t MAX_CLIENT_RECV_BYTES_PER_WINDOW = 140 * 1024; + +// buffer pools +SendBufferPool send_buffer_pool; EQ::Net::DaybreakConnectionManager::DaybreakConnectionManager() { @@ -53,16 +57,22 @@ void EQ::Net::DaybreakConnectionManager::Attach(uv_loop_t *loop) uv_ip4_addr("0.0.0.0", m_options.port, &recv_addr); int rc = uv_udp_bind(&m_socket, (const struct sockaddr *)&recv_addr, UV_UDP_REUSEADDR); - rc = uv_udp_recv_start(&m_socket, - [](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - buf->base = new char[suggested_size]; - memset(buf->base, 0, suggested_size); - buf->len = suggested_size; - }, + rc = uv_udp_recv_start( + &m_socket, + [](uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { + if (suggested_size > 65536) { + buf->base = new char[suggested_size]; + buf->len = suggested_size; + return; + } + + static thread_local char temp_buf[65536]; + buf->base = temp_buf; + buf->len = 65536; + }, [](uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned flags) { DaybreakConnectionManager *c = (DaybreakConnectionManager*)handle->data; if (nread < 0 || addr == nullptr) { - delete[] buf->base; return; } @@ -70,7 +80,10 @@ void EQ::Net::DaybreakConnectionManager::Attach(uv_loop_t *loop) uv_ip4_name((const sockaddr_in*)addr, endpoint, 16); auto port = ntohs(((const sockaddr_in*)addr)->sin_port); c->ProcessPacket(endpoint, port, buf->base, nread); - delete[] buf->base; + + if (buf->len > 65536) { + delete[] buf->base; + } }); m_attached = loop; @@ -310,7 +323,7 @@ EQ::Net::DaybreakConnection::DaybreakConnection(DaybreakConnectionManager *owner m_last_session_stats = Clock::now(); m_outgoing_budget = owner->m_options.outgoing_data_rate; - LogNetcode("New session [{}] with encode key [{}]", m_connect_code, HostToNetwork(m_encode_key)); + LogNetClient("New session [{}] with encode key [{}]", m_connect_code, HostToNetwork(m_encode_key)); } //new connection made as client @@ -634,7 +647,7 @@ void EQ::Net::DaybreakConnection::ProcessDecodedPacket(const Packet &p) p.PutSerialize(0, reply); InternalSend(p); - LogNetcode("[OP_SessionRequest] Session [{}] started with encode key [{}]", m_connect_code, HostToNetwork(m_encode_key)); + LogNetClient("[OP_SessionRequest] Session [{}] started with encode key [{}]", m_connect_code, HostToNetwork(m_encode_key)); } break; @@ -653,7 +666,7 @@ void EQ::Net::DaybreakConnection::ProcessDecodedPacket(const Packet &p) m_max_packet_size = reply.max_packet_size; ChangeStatus(StatusConnected); - LogNetcode( + LogNetClient( "[OP_SessionResponse] Session [{}] refresh with encode key [{}]", m_connect_code, HostToNetwork(m_encode_key) @@ -782,7 +795,7 @@ void EQ::Net::DaybreakConnection::ProcessDecodedPacket(const Packet &p) SendDisconnect(); } - LogNetcode( + LogNetClient( "[OP_SessionDisconnect] Session [{}] disconnect with encode key [{}]", m_connect_code, HostToNetwork(m_encode_key) @@ -852,7 +865,7 @@ bool EQ::Net::DaybreakConnection::ValidateCRC(Packet &p) } if (p.Length() < (size_t)m_crc_bytes) { - LogNetcode("Session [{}] ignored packet (crc bytes invalid on session)", m_connect_code); + LogNetClient("Session [{}] ignored packet (crc bytes invalid on session)", m_connect_code); return false; } @@ -1043,7 +1056,7 @@ void EQ::Net::DaybreakConnection::Decompress(Packet &p, size_t offset, size_t le return; } - static uint8_t new_buffer[4096]; + static thread_local uint8_t new_buffer[4096]; uint8_t *buffer = (uint8_t*)p.Data() + offset; uint32_t new_length = 0; @@ -1064,7 +1077,7 @@ void EQ::Net::DaybreakConnection::Decompress(Packet &p, size_t offset, size_t le void EQ::Net::DaybreakConnection::Compress(Packet &p, size_t offset, size_t length) { - uint8_t new_buffer[2048] = { 0 }; + static thread_local uint8_t new_buffer[2048] = { 0 }; uint8_t *buffer = (uint8_t*)p.Data() + offset; uint32_t new_length = 0; bool send_uncompressed = true; @@ -1091,10 +1104,6 @@ void EQ::Net::DaybreakConnection::ProcessResend() } } -// observed client receive window is 300 packets, 140KB -constexpr size_t MAX_CLIENT_RECV_PACKETS_PER_WINDOW = 300; -constexpr size_t MAX_CLIENT_RECV_BYTES_PER_WINDOW = 140 * 1024; - void EQ::Net::DaybreakConnection::ProcessResend(int stream) { if (m_status == DbProtocolStatus::StatusDisconnected) { @@ -1125,9 +1134,10 @@ void EQ::Net::DaybreakConnection::ProcessResend(int stream) // make sure that the first_packet in the list first_sent time is within the resend_delay and now // if it is not, then we need to resend all packets in the list if (time_since_first_sent <= first_packet.resend_delay && !m_acked_since_last_resend) { - LogNetcodeDetail( - "Not resending packets for stream [{}] time since first sent [{}] resend delay [{}] m_acked_since_last_resend [{}]", + LogNetClient( + "Not resending packets for stream [{}] packets [{}] time_first_sent [{}] resend_delay [{}] m_acked_since_last_resend [{}]", stream, + s->sent_packets.size(), time_since_first_sent, first_packet.resend_delay, m_acked_since_last_resend @@ -1142,7 +1152,7 @@ void EQ::Net::DaybreakConnection::ProcessResend(int stream) total_size += e.second.packet.Length(); } - LogNetcodeDetail( + LogNetClient( "Resending packets for stream [{}] packet count [{}] total packet size [{}] m_acked_since_last_resend [{}]", stream, s->sent_packets.size(), @@ -1154,7 +1164,7 @@ void EQ::Net::DaybreakConnection::ProcessResend(int stream) for (auto &e: s->sent_packets) { if (m_resend_packets_sent >= MAX_CLIENT_RECV_PACKETS_PER_WINDOW || m_resend_bytes_sent >= MAX_CLIENT_RECV_BYTES_PER_WINDOW) { - LogNetcodeDetail( + LogNetClient( "Stopping resend because we hit thresholds m_resend_packets_sent [{}] max [{}] m_resend_bytes_sent [{}] max [{}]", m_resend_packets_sent, MAX_CLIENT_RECV_PACKETS_PER_WINDOW, @@ -1333,99 +1343,97 @@ void EQ::Net::DaybreakConnection::SendKeepAlive() InternalSend(p); } -void EQ::Net::DaybreakConnection::InternalSend(Packet &p) -{ +void EQ::Net::DaybreakConnection::InternalSend(Packet &p) { if (m_owner->m_options.outgoing_data_rate > 0.0) { auto new_budget = m_outgoing_budget - (p.Length() / 1024.0); if (new_budget <= 0.0) { m_stats.dropped_datarate_packets++; return; - } - else { + } else { m_outgoing_budget = new_budget; } } m_last_send = Clock::now(); - auto send_func = [](uv_udp_send_t* req, int status) { - delete[](char*)req->data; - delete req; - }; + auto pooled_opt = send_buffer_pool.acquire(); + if (!pooled_opt) { + m_stats.dropped_datarate_packets++; + return; + } + + auto [send_req, data, ctx] = *pooled_opt; + ctx->pool = &send_buffer_pool; // set pool pointer + + sockaddr_in send_addr{}; + uv_ip4_addr(m_endpoint.c_str(), m_port, &send_addr); + uv_buf_t send_buffers[1]; if (PacketCanBeEncoded(p)) { - m_stats.bytes_before_encode += p.Length(); DynamicPacket out; out.PutPacket(0, p); - for (int i = 0; i < 2; ++i) { - switch (m_encode_passes[i]) { - case EncodeCompression: - if (out.GetInt8(0) == 0) - Compress(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size()); - else - Compress(out, 1, out.Length() - 1); - break; - case EncodeXOR: - if (out.GetInt8(0) == 0) - Encode(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size()); - else - Encode(out, 1, out.Length() - 1); - break; - default: - break; + for (auto &m_encode_passe: m_encode_passes) { + switch (m_encode_passe) { + case EncodeCompression: + if (out.GetInt8(0) == 0) { + Compress(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size()); + } else { + Compress(out, 1, out.Length() - 1); + } + break; + case EncodeXOR: + if (out.GetInt8(0) == 0) { + Encode(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size()); + } else { + Encode(out, 1, out.Length() - 1); + } + break; + default: + break; } } AppendCRC(out); - - uv_udp_send_t *send_req = new uv_udp_send_t; - memset(send_req, 0, sizeof(*send_req)); - sockaddr_in send_addr; - uv_ip4_addr(m_endpoint.c_str(), m_port, &send_addr); - uv_buf_t send_buffers[1]; - - char *data = new char[out.Length()]; memcpy(data, out.Data(), out.Length()); send_buffers[0] = uv_buf_init(data, out.Length()); - send_req->data = send_buffers[0].base; - - m_stats.sent_bytes += out.Length(); - m_stats.sent_packets++; - if (m_owner->m_options.simulated_out_packet_loss && m_owner->m_options.simulated_out_packet_loss >= m_owner->m_rand.Int(0, 100)) { - delete[](char*)send_req->data; - delete send_req; - return; - } - - uv_udp_send(send_req, &m_owner->m_socket, send_buffers, 1, (sockaddr*)&send_addr, send_func); - return; + } else { + memcpy(data, p.Data(), p.Length()); + send_buffers[0] = uv_buf_init(data, p.Length()); } - m_stats.bytes_before_encode += p.Length(); - - uv_udp_send_t *send_req = new uv_udp_send_t; - sockaddr_in send_addr; - uv_ip4_addr(m_endpoint.c_str(), m_port, &send_addr); - uv_buf_t send_buffers[1]; - - char *data = new char[p.Length()]; - memcpy(data, p.Data(), p.Length()); - send_buffers[0] = uv_buf_init(data, p.Length()); - send_req->data = send_buffers[0].base; - m_stats.sent_bytes += p.Length(); m_stats.sent_packets++; - if (m_owner->m_options.simulated_out_packet_loss && m_owner->m_options.simulated_out_packet_loss >= m_owner->m_rand.Int(0, 100)) { - delete[](char*)send_req->data; - delete send_req; + if (m_owner->m_options.simulated_out_packet_loss && + m_owner->m_options.simulated_out_packet_loss >= m_owner->m_rand.Int(0, 100)) { + send_buffer_pool.release(ctx); return; } - uv_udp_send(send_req, &m_owner->m_socket, send_buffers, 1, (sockaddr*)&send_addr, send_func); + int send_result = uv_udp_send( + send_req, &m_owner->m_socket, send_buffers, 1, (sockaddr *)&send_addr, + [](uv_udp_send_t *req, int status) { + auto *ctx = reinterpret_cast(req->data); + if (!ctx) { + std::cerr << "Error: send_req->data is null in callback!" << std::endl; + return; + } + + if (status < 0) { + std::cerr << "uv_udp_send failed: " << uv_strerror(status) << std::endl; + } + + ctx->pool->release(ctx); + } + ); + + if (send_result < 0) { + std::cerr << "uv_udp_send() failed: " << uv_strerror(send_result) << std::endl; + send_buffer_pool.release(ctx); + } } void EQ::Net::DaybreakConnection::InternalQueuePacket(Packet &p, int stream_id, bool reliable) diff --git a/common/net/daybreak_connection.h b/common/net/daybreak_connection.h index e053f3b24..23537a3fe 100644 --- a/common/net/daybreak_connection.h +++ b/common/net/daybreak_connection.h @@ -3,6 +3,7 @@ #include "../random.h" #include "packet.h" #include "daybreak_structs.h" +#include "daybreak_pooling.h" #include #include #include diff --git a/common/net/daybreak_pooling.h b/common/net/daybreak_pooling.h new file mode 100644 index 000000000..e374ecdc7 --- /dev/null +++ b/common/net/daybreak_pooling.h @@ -0,0 +1,123 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include "../eqemu_logsys.h" +#include + +constexpr size_t UDP_BUFFER_SIZE = 512; + +struct EmbeddedContext { + size_t pool_index; + class SendBufferPool* pool; +}; + +class SendBufferPool { +public: + explicit SendBufferPool(size_t initial_capacity = 64) + : m_capacity(initial_capacity), m_head(0) + { + LogNetClient("[SendBufferPool] Initializing with capacity [{}]", (int)m_capacity); + + m_pool.reserve(m_capacity); + m_locks = std::make_unique(m_capacity); + + for (size_t i = 0; i < m_capacity; ++i) { + auto* req = new PooledUdpSend(); + req->context.pool_index = i; + req->context.pool = this; + req->uv_req.data = &req->context; + + m_pool.emplace_back(std::unique_ptr(req)); + m_locks[i].store(false, std::memory_order_relaxed); + } + } + + std::optional> acquire() { + size_t cap = m_capacity.load(std::memory_order_acquire); + for (size_t i = 0; i < cap; ++i) { + size_t index = m_head.fetch_add(1, std::memory_order_relaxed) % cap; + bool expected = false; + if (m_locks[index].compare_exchange_strong(expected, true)) { + auto* req = m_pool[index].get(); + LogNetClientDetail("[SendBufferPool] Acquired [{}]", index); + return std::make_tuple(&req->uv_req, req->buffer.data(), &req->context); + } + } + + LogNetClient("[SendBufferPool] Growing from [{}] to [{}]", cap, cap * 2); + grow(); + return acquireAfterGrowth(); + } + + void release(EmbeddedContext* ctx) { + if (!ctx || ctx->pool != this || ctx->pool_index >= m_capacity.load(std::memory_order_acquire)) { + LogNetClient("[SendBufferPool] Invalid context release [{}]", ctx ? ctx->pool_index : -1); + return; + } + m_locks[ctx->pool_index].store(false, std::memory_order_release); + LogNetClientDetail("[SendBufferPool] Released [{}]", ctx->pool_index); + } + +private: + struct PooledUdpSend { + uv_udp_send_t uv_req; + std::array buffer; + EmbeddedContext context; + }; + + std::vector> m_pool; + std::unique_ptr m_locks; + std::atomic m_capacity; + std::atomic m_head; + std::mutex m_grow_mutex; + + void grow() { + std::lock_guard lock(m_grow_mutex); + + size_t old_cap = m_capacity.load(std::memory_order_acquire); + size_t new_cap = old_cap * 2; + + m_pool.reserve(new_cap); + for (size_t i = old_cap; i < new_cap; ++i) { + auto* req = new PooledUdpSend(); + req->context.pool_index = i; + req->context.pool = this; + req->uv_req.data = &req->context; + + m_pool.emplace_back(std::unique_ptr(req)); + } + + auto new_locks = std::make_unique(new_cap); + for (size_t i = 0; i < old_cap; ++i) { + new_locks[i].store(m_locks[i].load(std::memory_order_acquire)); + } + for (size_t i = old_cap; i < new_cap; ++i) { + new_locks[i].store(false, std::memory_order_relaxed); + } + + m_locks = std::move(new_locks); + m_capacity.store(new_cap, std::memory_order_release); + + LogNetClient("[SendBufferPool] Grew to [{}] from [{}]", new_cap, old_cap); + } + + std::optional> acquireAfterGrowth() { + size_t cap = m_capacity.load(std::memory_order_acquire); + for (size_t i = 0; i < cap; ++i) { + size_t index = m_head.fetch_add(1, std::memory_order_relaxed) % cap; + bool expected = false; + if (m_locks[index].compare_exchange_strong(expected, true)) { + auto* req = m_pool[index].get(); + LogNetClient("[SendBufferPool] Acquired after grow [{}]", index); + return std::make_tuple(&req->uv_req, req->buffer.data(), &req->context); + } + } + return std::nullopt; + } +}; diff --git a/common/net/daybreak_structs.h b/common/net/daybreak_structs.h index b03ed18bb..3047897b2 100644 --- a/common/net/daybreak_structs.h +++ b/common/net/daybreak_structs.h @@ -171,3 +171,4 @@ namespace EQ }; } } + diff --git a/common/net/servertalk_client_connection.cpp b/common/net/servertalk_client_connection.cpp index d873d43ca..524c9d496 100644 --- a/common/net/servertalk_client_connection.cpp +++ b/common/net/servertalk_client_connection.cpp @@ -62,15 +62,15 @@ void EQ::Net::ServertalkClient::Connect() m_connecting = true; EQ::Net::TCPConnection::Connect(m_addr, m_port, false, [this](std::shared_ptr connection) { if (connection == nullptr) { - LogF(Logs::General, Logs::TCPConnection, "Error connecting to {0}:{1}, attempting to reconnect...", m_addr, m_port); + LogNetTCP("Error connecting to {0}:{1}, attempting to reconnect...", m_addr, m_port); m_connecting = false; return; } - LogF(Logs::General, Logs::TCPConnection, "Connected to {0}:{1}", m_addr, m_port); + LogNetTCP("Connected to {0}:{1}", m_addr, m_port); m_connection = connection; m_connection->OnDisconnect([this](EQ::Net::TCPConnection *c) { - LogF(Logs::General, Logs::TCPConnection, "Connection lost to {0}:{1}, attempting to reconnect...", m_addr, m_port); + LogNetTCP("Connection lost to {0}:{1}, attempting to reconnect...", m_addr, m_port); m_connection.reset(); }); diff --git a/common/net/servertalk_legacy_client_connection.cpp b/common/net/servertalk_legacy_client_connection.cpp index 8f2260dd6..deba4a464 100644 --- a/common/net/servertalk_legacy_client_connection.cpp +++ b/common/net/servertalk_legacy_client_connection.cpp @@ -58,15 +58,15 @@ void EQ::Net::ServertalkLegacyClient::Connect() m_connecting = true; EQ::Net::TCPConnection::Connect(m_addr, m_port, false, [this](std::shared_ptr connection) { if (connection == nullptr) { - LogF(Logs::General, Logs::TCPConnection, "Error connecting to {0}:{1}, attempting to reconnect...", m_addr, m_port); + LogNetTCP("Error connecting to {0}:{1}, attempting to reconnect...", m_addr, m_port); m_connecting = false; return; } - LogF(Logs::General, Logs::TCPConnection, "Connected to {0}:{1}", m_addr, m_port); + LogNetTCP("Connected to {0}:{1}", m_addr, m_port); m_connection = connection; m_connection->OnDisconnect([this](EQ::Net::TCPConnection *c) { - LogF(Logs::General, Logs::TCPConnection, "Connection lost to {0}:{1}, attempting to reconnect...", m_addr, m_port); + LogNetTCP("Connection lost to {0}:{1}, attempting to reconnect...", m_addr, m_port); m_connection.reset(); }); @@ -131,7 +131,7 @@ void EQ::Net::ServertalkLegacyClient::ProcessReadBuffer() } else { EQ::Net::StaticPacket p(&m_buffer[current + 4], length); - + auto cb = m_message_callbacks.find(opcode); if (cb != m_message_callbacks.end()) { cb->second(opcode, p); diff --git a/common/net/tcp_connection.cpp b/common/net/tcp_connection.cpp index ae1f14bad..f277e95e8 100644 --- a/common/net/tcp_connection.cpp +++ b/common/net/tcp_connection.cpp @@ -1,5 +1,8 @@ #include "tcp_connection.h" #include "../event/event_loop.h" +#include + +WriteReqPool tcp_write_pool; void on_close_handle(uv_handle_t* handle) { delete (uv_tcp_t *)handle; @@ -64,36 +67,37 @@ void EQ::Net::TCPConnection::Connect(const std::string &addr, int port, bool ipv }); } -void EQ::Net::TCPConnection::Start() { - uv_read_start((uv_stream_t*)m_socket, [](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - buf->base = new char[suggested_size]; - buf->len = suggested_size; - }, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { +void EQ::Net::TCPConnection::Start() +{ + uv_read_start( + (uv_stream_t *) m_socket, [](uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { + if (suggested_size > 65536) { + buf->base = new char[suggested_size]; + buf->len = suggested_size; + return; + } - TCPConnection *connection = (TCPConnection*)stream->data; + static thread_local char temp_buf[65536]; + buf->base = temp_buf; + buf->len = 65536; + }, [](uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { + auto *connection = (TCPConnection *) stream->data; - if (nread > 0) { - connection->Read(buf->base, nread); + if (nread > 0) { + connection->Read(buf->base, nread); + } + else if (nread == UV_EOF) { + connection->Disconnect(); + } + else if (nread < 0) { + connection->Disconnect(); + } - if (buf->base) { - delete[] buf->base; + if (buf->len > 65536) { + delete [] buf->base; } } - else if (nread == UV_EOF) { - connection->Disconnect(); - - if (buf->base) { - delete[] buf->base; - } - } - else if (nread < 0) { - connection->Disconnect(); - - if (buf->base) { - delete[] buf->base; - } - } - }); + ); } void EQ::Net::TCPConnection::OnRead(std::function cb) @@ -130,43 +134,92 @@ void EQ::Net::TCPConnection::Read(const char *data, size_t count) } } -void EQ::Net::TCPConnection::Write(const char *data, size_t count) -{ - if (!m_socket) { +void EQ::Net::TCPConnection::Write(const char* data, size_t count) { + if (!m_socket || !data || count == 0) { + std::cerr << "TCPConnection::Write - Invalid socket or data\n"; return; } - struct WriteBaton - { - TCPConnection *connection; - char *buffer; - }; - - WriteBaton *baton = new WriteBaton; - baton->connection = this; - baton->buffer = new char[count]; - - uv_write_t *write_req = new uv_write_t; - memset(write_req, 0, sizeof(uv_write_t)); - write_req->data = baton; - uv_buf_t send_buffers[1]; - - memcpy(baton->buffer, data, count); - send_buffers[0] = uv_buf_init(baton->buffer, count); - - uv_write(write_req, (uv_stream_t*)m_socket, send_buffers, 1, [](uv_write_t* req, int status) { - WriteBaton *baton = (WriteBaton*)req->data; - delete[] baton->buffer; - delete req; - - if (status < 0) { - baton->connection->Disconnect(); + if (count <= TCP_BUFFER_SIZE) { + // Fast path: use pooled request with embedded buffer + auto req_opt = tcp_write_pool.acquire(); + if (!req_opt) { + std::cerr << "TCPConnection::Write - Out of write requests\n"; + return; } - delete baton; - }); + TCPWriteReq* write_req = *req_opt; + + // Fill buffer and set context + memcpy(write_req->buffer.data(), data, count); + write_req->connection = this; + write_req->magic = 0xC0FFEE; + + uv_buf_t buf = uv_buf_init(write_req->buffer.data(), static_cast(count)); + + int result = uv_write( + &write_req->req, + reinterpret_cast(m_socket), + &buf, + 1, + [](uv_write_t* req, int status) { + auto* full_req = reinterpret_cast(req); + if (full_req->magic != 0xC0FFEE) { + std::cerr << "uv_write callback - invalid magic, skipping release\n"; + return; + } + + tcp_write_pool.release(full_req); + + if (status < 0 && full_req->connection) { + std::cerr << "uv_write failed: " << uv_strerror(status) << std::endl; + full_req->connection->Disconnect(); + } + } + ); + + if (result < 0) { + std::cerr << "uv_write() failed immediately: " << uv_strerror(result) << std::endl; + tcp_write_pool.release(write_req); + } + + } else { + // Slow path: allocate heap buffer for large write + LogNetTCP("[TCPConnection] Large write of [{}] bytes, using heap buffer", count); + + char* heap_buffer = new char[count]; + memcpy(heap_buffer, data, count); + + uv_write_t* write_req = new uv_write_t; + write_req->data = heap_buffer; + + uv_buf_t buf = uv_buf_init(heap_buffer, static_cast(count)); + + int result = uv_write( + write_req, + reinterpret_cast(m_socket), + &buf, + 1, + [](uv_write_t* req, int status) { + char* data = static_cast(req->data); + delete[] data; + delete req; + + if (status < 0) { + std::cerr << "uv_write (large) failed: " << uv_strerror(status) << std::endl; + } + } + ); + + if (result < 0) { + std::cerr << "uv_write() (large) failed immediately: " << uv_strerror(result) << std::endl; + delete[] heap_buffer; + delete write_req; + } + } } + std::string EQ::Net::TCPConnection::LocalIP() const { sockaddr_storage addr; diff --git a/common/net/tcp_connection.h b/common/net/tcp_connection.h index 6bc256e85..d64695dd2 100644 --- a/common/net/tcp_connection.h +++ b/common/net/tcp_connection.h @@ -1,5 +1,6 @@ #pragma once +#include "tcp_connection_pooling.h" #include #include #include @@ -16,7 +17,7 @@ namespace EQ ~TCPConnection(); static void Connect(const std::string &addr, int port, bool ipv6, std::function)> cb); - + void Start(); void OnRead(std::function cb); void OnDisconnect(std::function cb); diff --git a/common/net/tcp_connection_pooling.h b/common/net/tcp_connection_pooling.h new file mode 100644 index 000000000..44f0abe92 --- /dev/null +++ b/common/net/tcp_connection_pooling.h @@ -0,0 +1,125 @@ +#pragma once + +#include "../eqemu_logsys.h" +#include +#include +#include +#include +#include +#include +#include +#include + +namespace EQ { namespace Net { class TCPConnection; } } + +constexpr size_t TCP_BUFFER_SIZE = 8192; + +struct TCPWriteReq { + uv_write_t req{}; + std::array buffer{}; + size_t buffer_index{}; + EQ::Net::TCPConnection* connection{}; + uint32_t magic = 0xC0FFEE; +}; + +class WriteReqPool { +public: + explicit WriteReqPool(size_t initial_capacity = 512) + : m_capacity(initial_capacity), m_head(0) { + initialize_pool(m_capacity); + } + + std::optional acquire() { + size_t cap = m_capacity.load(std::memory_order_acquire); + + for (size_t i = 0; i < cap; ++i) { + size_t index = m_head.fetch_add(1, std::memory_order_relaxed) % cap; + + bool expected = false; + if (m_locks[index].compare_exchange_strong(expected, true, std::memory_order_acquire)) { + LogNetTCPDetail("[WriteReqPool] Acquired buffer index [{}]", index); + return m_reqs[index].get(); + } + } + + LogNetTCP("[WriteReqPool] Growing from [{}] to [{}]", cap, cap * 2); + grow(); + return acquireAfterGrow(); + } + + void release(TCPWriteReq* req) { + if (!req) return; + + const size_t index = req->buffer_index; + const size_t cap = m_capacity.load(std::memory_order_acquire); + + if (index >= cap || m_reqs[index].get() != req) { + std::cerr << "WriteReqPool::release - Invalid or stale pointer (index=" << index << ")\n"; + return; + } + + m_locks[index].store(false, std::memory_order_release); + LogNetTCPDetail("[WriteReqPool] Released buffer index [{}]", index); + } + +private: + std::vector> m_reqs; + std::unique_ptr m_locks; + std::atomic m_capacity; + std::atomic m_head; + std::mutex m_grow_mutex; + + void initialize_pool(size_t count) { + m_reqs.reserve(count); + m_locks = std::make_unique(count); + + for (size_t i = 0; i < count; ++i) { + auto req = std::make_unique(); + req->buffer_index = i; + req->req.data = req.get(); // optional: for use in libuv callbacks + m_locks[i].store(false, std::memory_order_relaxed); + m_reqs.emplace_back(std::move(req)); + } + + m_capacity.store(count, std::memory_order_release); + } + + void grow() { + std::lock_guard lock(m_grow_mutex); + + const size_t old_cap = m_capacity.load(std::memory_order_acquire); + const size_t new_cap = old_cap * 2; + + m_reqs.reserve(new_cap); + for (size_t i = old_cap; i < new_cap; ++i) { + auto req = std::make_unique(); + req->buffer_index = i; + req->req.data = req.get(); // optional + m_reqs.emplace_back(std::move(req)); + } + + auto new_locks = std::make_unique(new_cap); + for (size_t i = 0; i < old_cap; ++i) { + new_locks[i].store(m_locks[i].load(std::memory_order_acquire)); + } + for (size_t i = old_cap; i < new_cap; ++i) { + new_locks[i].store(false, std::memory_order_relaxed); + } + + m_locks = std::move(new_locks); + m_capacity.store(new_cap, std::memory_order_release); + } + + std::optional acquireAfterGrow() { + const size_t cap = m_capacity.load(std::memory_order_acquire); + + for (size_t i = 0; i < cap; ++i) { + bool expected = false; + if (m_locks[i].compare_exchange_strong(expected, true, std::memory_order_acquire)) { + LogNetTCP("[WriteReqPool] Acquired buffer index [{}] after grow", i); + return m_reqs[i].get(); + } + } + return std::nullopt; + } +};