From c09a48d58c7b048b0d582ee6a1d65ed584e4f46b Mon Sep 17 00:00:00 2001 From: KimLS Date: Thu, 4 Apr 2019 17:49:37 -0700 Subject: [PATCH] Implement remain functionality and cleaning up some code --- common/CMakeLists.txt | 2 + common/eq_stream_intf.h | 8 + common/net/daybreak_connection.cpp | 19 +- common/net/daybreak_connection.h | 8 +- common/net/eqstream.h | 8 +- common/net/eqstream_concurrent.cpp | 623 ++++++++++++++++++++++++++++- common/net/eqstream_concurrent.h | 24 +- loginserver/client_manager.cpp | 4 +- ucs/clientlist.cpp | 2 +- world/net.cpp | 6 +- zone/net.cpp | 2 +- 11 files changed, 681 insertions(+), 25 deletions(-) diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 3f9f7b66d..88f978f1e 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -224,6 +224,7 @@ SET(common_headers net/endian.h net/eqstream.h net/eqstream_concurrent.h + net/eqstream_concurrent_message.h net/packet.h net/servertalk_client_connection.h net/servertalk_legacy_client_connection.h @@ -298,6 +299,7 @@ SOURCE_GROUP(Net FILES net/eqstream.h net/eqstream_concurrent.cpp net/eqstream_concurrent.h + net/eqstream_concurrent_message.h net/packet.cpp net/packet.h net/servertalk_client_connection.cpp diff --git a/common/eq_stream_intf.h b/common/eq_stream_intf.h index 17a025a18..4933e98f6 100644 --- a/common/eq_stream_intf.h +++ b/common/eq_stream_intf.h @@ -7,6 +7,7 @@ #include "emu_versions.h" #include "eq_packet.h" #include "net/daybreak_connection.h" +#include "event/event_loop.h" typedef enum { ESTABLISHED, @@ -23,6 +24,7 @@ struct EQStreamManagerInterfaceOptions { EQStreamManagerInterfaceOptions() { opcode_size = 2; + loop = &EQ::EventLoop::GetDefault(); } EQStreamManagerInterfaceOptions(int port, bool encoded, bool compressed) { @@ -41,13 +43,16 @@ struct EQStreamManagerInterfaceOptions } daybreak_options.port = port; + loop = &EQ::EventLoop::GetDefault(); } int opcode_size; bool track_opcode_stats; EQ::Net::DaybreakConnectionManagerOptions daybreak_options; + EQ::EventLoop *loop; }; +class EQStreamInterface; class EQStreamManagerInterface { public: @@ -56,6 +61,9 @@ public: const EQStreamManagerInterfaceOptions& GetOptions() const { return m_options; } EQStreamManagerInterfaceOptions& MutateOptions() { return m_options; } + + virtual void OnNewConnection(std::function)> func) = 0; + virtual void OnConnectionStateChange(std::function, EQ::Net::DbProtocolStatus, EQ::Net::DbProtocolStatus)> func) = 0; protected: EQStreamManagerInterfaceOptions m_options; }; diff --git a/common/net/daybreak_connection.cpp b/common/net/daybreak_connection.cpp index 15b61ee51..dc2b491c2 100644 --- a/common/net/daybreak_connection.cpp +++ b/common/net/daybreak_connection.cpp @@ -10,6 +10,7 @@ EQ::Net::DaybreakConnectionManager::DaybreakConnectionManager() { m_attached = nullptr; + m_next_id = 1; memset(&m_timer, 0, sizeof(uv_timer_t)); memset(&m_socket, 0, sizeof(uv_udp_t)); @@ -20,6 +21,7 @@ EQ::Net::DaybreakConnectionManager::DaybreakConnectionManager(const DaybreakConn { m_attached = nullptr; m_options = opts; + m_next_id = 1; memset(&m_timer, 0, sizeof(uv_timer_t)); memset(&m_socket, 0, sizeof(uv_udp_t)); @@ -94,7 +96,7 @@ void EQ::Net::DaybreakConnectionManager::Connect(const std::string &addr, int po { //todo dns resolution - auto connection = std::shared_ptr(new DaybreakConnection(this, addr, port)); + auto connection = std::shared_ptr(new DaybreakConnection(this, GetNextId(), addr, port)); connection->m_self = connection; if (m_on_new_connection) { @@ -232,7 +234,7 @@ void EQ::Net::DaybreakConnectionManager::ProcessPacket(const std::string &endpoi StaticPacket p((void*)data, size); auto request = p.GetSerialize(0); - connection = std::shared_ptr(new DaybreakConnection(this, request, endpoint, port)); + connection = std::shared_ptr(new DaybreakConnection(this, GetNextId(), request, endpoint, port)); connection->m_self = connection; if (m_on_new_connection) { @@ -290,10 +292,18 @@ void EQ::Net::DaybreakConnectionManager::SendDisconnect(const std::string &addr, }); } +uint64_t EQ::Net::DaybreakConnectionManager::GetNextId() +{ + auto id = m_next_id; + m_next_id++; + return id; +} + //new connection made as server -EQ::Net::DaybreakConnection::DaybreakConnection(DaybreakConnectionManager *owner, const DaybreakConnect &connect, const std::string &endpoint, int port) +EQ::Net::DaybreakConnection::DaybreakConnection(DaybreakConnectionManager *owner, uint64_t id, const DaybreakConnect &connect, const std::string &endpoint, int port) { m_owner = owner; + m_id = id; m_last_send = Clock::now(); m_last_recv = Clock::now(); m_status = StatusConnected; @@ -316,9 +326,10 @@ EQ::Net::DaybreakConnection::DaybreakConnection(DaybreakConnectionManager *owner } //new connection made as client -EQ::Net::DaybreakConnection::DaybreakConnection(DaybreakConnectionManager *owner, const std::string &endpoint, int port) +EQ::Net::DaybreakConnection::DaybreakConnection(DaybreakConnectionManager *owner, uint64_t id, const std::string &endpoint, int port) { m_owner = owner; + m_id = id; m_last_send = Clock::now(); m_last_recv = Clock::now(); m_status = StatusConnecting; diff --git a/common/net/daybreak_connection.h b/common/net/daybreak_connection.h index eac442844..cf8aa409d 100644 --- a/common/net/daybreak_connection.h +++ b/common/net/daybreak_connection.h @@ -119,8 +119,8 @@ namespace EQ class DaybreakConnection { public: - DaybreakConnection(DaybreakConnectionManager *owner, const DaybreakConnect &connect, const std::string &endpoint, int port); - DaybreakConnection(DaybreakConnectionManager *owner, const std::string &endpoint, int port); + DaybreakConnection(DaybreakConnectionManager *owner, uint64_t id, const DaybreakConnect &connect, const std::string &endpoint, int port); + DaybreakConnection(DaybreakConnectionManager *owner, uint64_t id, const std::string &endpoint, int port); ~DaybreakConnection(); const std::string& RemoteEndpoint() const { return m_endpoint; } @@ -139,6 +139,7 @@ namespace EQ const DaybreakEncodeType* GetEncodePasses() const { return m_encode_passes; } const DaybreakConnectionManager* GetManager() const { return m_owner; } DaybreakConnectionManager* GetManager() { return m_owner; } + uint64_t GetId() const { return m_id; } private: DaybreakConnectionManager *m_owner; std::string m_endpoint; @@ -161,6 +162,7 @@ namespace EQ size_t m_rolling_ping; Timestamp m_close_time; double m_outgoing_budget; + uint64_t m_id; struct DaybreakSentPacket { @@ -306,6 +308,7 @@ namespace EQ uv_udp_t m_socket; uv_loop_t *m_attached; DaybreakConnectionManagerOptions m_options; + uint64_t m_next_id; std::function)> m_on_new_connection; std::function, DbProtocolStatus, DbProtocolStatus)> m_on_connection_state_change; std::function, const Packet&)> m_on_packet_recv; @@ -315,6 +318,7 @@ namespace EQ void ProcessPacket(const std::string &endpoint, int port, const char *data, size_t size); std::shared_ptr FindConnectionByEndpoint(std::string addr, int port); void SendDisconnect(const std::string &addr, int port); + uint64_t GetNextId(); friend class DaybreakConnection; }; diff --git a/common/net/eqstream.h b/common/net/eqstream.h index 1d253d4f6..2bf7beb3a 100644 --- a/common/net/eqstream.h +++ b/common/net/eqstream.h @@ -19,12 +19,12 @@ namespace EQ EQStreamManager(const EQStreamManagerInterfaceOptions &options); ~EQStreamManager(); - void OnNewConnection(std::function)> func) { m_on_new_connection = func; } - void OnConnectionStateChange(std::function, DbProtocolStatus, DbProtocolStatus)> func) { m_on_connection_state_change = func; } + virtual void OnNewConnection(std::function)> func) { m_on_new_connection = func; } + virtual void OnConnectionStateChange(std::function, EQ::Net::DbProtocolStatus, EQ::Net::DbProtocolStatus)> func) { m_on_connection_state_change = func; } private: DaybreakConnectionManager m_daybreak; - std::function)> m_on_new_connection; - std::function, DbProtocolStatus, DbProtocolStatus)> m_on_connection_state_change; + std::function)> m_on_new_connection; + std::function, DbProtocolStatus, DbProtocolStatus)> m_on_connection_state_change; std::map, std::shared_ptr> m_streams; void DaybreakNewConnection(std::shared_ptr connection); diff --git a/common/net/eqstream_concurrent.cpp b/common/net/eqstream_concurrent.cpp index 119593755..a75cb7301 100644 --- a/common/net/eqstream_concurrent.cpp +++ b/common/net/eqstream_concurrent.cpp @@ -1,36 +1,653 @@ #include "eqstream_concurrent.h" +#include "eqstream_concurrent_message.h" +#include "../event/event_loop.h" +#include "../event/timer.h" +#include "../string_util.h" +#include "../opcodemgr.h" +#include "daybreak_connection.h" +#include +#include +#include +#include +#include +#include struct EQ::Net::ConcurrentEQStreamManager::Impl { - + std::thread background; + bool background_running; + moodycamel::ConcurrentQueue foreground_queue; + moodycamel::ConcurrentQueue background_queue; + std::unordered_map> connections; + std::unique_ptr foreground_loop_timer; + std::unique_ptr background_loop_timer; + std::unique_ptr background_update_stats_timer; + std::unordered_map> streams; + std::function)> on_new_connection; + std::function, EQ::Net::DbProtocolStatus, EQ::Net::DbProtocolStatus)> on_connection_state_change; }; EQ::Net::ConcurrentEQStreamManager::ConcurrentEQStreamManager(const EQStreamManagerInterfaceOptions &options) : EQStreamManagerInterface(options) { _impl.reset(new Impl()); + _impl->background = std::thread(std::bind(&ConcurrentEQStreamManager::_BackgroundThread, this)); + _impl->foreground_loop_timer.reset(new EQ::Timer(options.loop, 16, true, + std::bind(&ConcurrentEQStreamManager::_ForegroundTimer, this, std::placeholders::_1))); } EQ::Net::ConcurrentEQStreamManager::~ConcurrentEQStreamManager() { + for (auto &s : _impl->streams) { + s.second->_Invalidate(); + } + + _impl->foreground_loop_timer.release(); + + //Tell the background to shutdown and wait for it to actually do so + ceqs_terminate_msg_t msg; + msg.type = TerminateBackground; + + _PushToBackgroundQueue((ceqs_msg_t*)&msg); + _impl->background.join(); + + //Go through our incoming messages to make sure we clean up any packets in that need to be freed + ceqs_msg_t eqs_msg; + + while (_impl->foreground_queue.try_dequeue(eqs_msg)) { + if (eqs_msg.type == PacketRecv) { + ConcurrentEQStreamPacketRecvMessage *eqs_msg_in = (ConcurrentEQStreamPacketRecvMessage*)&eqs_msg; + + delete eqs_msg_in->packet; + } + } } -void EQ::Net::ConcurrentEQStreamManager::OnNewConnection(std::function)> func) +void EQ::Net::ConcurrentEQStreamManager::_BackgroundThread() { + _impl->background_running = true; + EQ::EventLoop loop; + auto &eqs_opts = GetOptions(); + auto opts = eqs_opts.daybreak_options; + opts.loop = &loop; + + std::unique_ptr dbcm(new DaybreakConnectionManager(opts)); + dbcm->OnNewConnection(std::bind(&ConcurrentEQStreamManager::DaybreakNewConnection, this, std::placeholders::_1)); + dbcm->OnConnectionStateChange(std::bind(&ConcurrentEQStreamManager::DaybreakConnectionStateChange, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + dbcm->OnPacketRecv(std::bind(&ConcurrentEQStreamManager::DaybreakPacketRecv, this, std::placeholders::_1, std::placeholders::_2)); + + _impl->background_loop_timer.reset(new EQ::Timer(&loop, 16, true, + std::bind(&ConcurrentEQStreamManager::_BackgroundTimer, this, std::placeholders::_1))); + + _impl->background_update_stats_timer.reset(new EQ::Timer(&loop, 500, true, + std::bind(&ConcurrentEQStreamManager::_BackgroundUpdateStatsTimer, this, std::placeholders::_1))); + + while (true == _impl->background_running) { + loop.Process(); + std::this_thread::sleep_for(std::chrono::microseconds(1)); + } + + _impl->background_loop_timer.release(); + _impl->background_update_stats_timer.release(); + dbcm.release(); + + ceqs_msg_t eqs_msg; + while (_impl->background_queue.try_dequeue(eqs_msg)) { + if (eqs_msg.type == PacketRecv) { + ConcurrentEQStreamPacketRecvMessage *eqs_msg_in = (ConcurrentEQStreamPacketRecvMessage*)&eqs_msg; + delete eqs_msg_in->packet; + } + } +} + +//Called by background +void EQ::Net::ConcurrentEQStreamManager::_BackgroundTimer(EQ::Timer * t) { + ceqs_msg_t msg_queue[16]; + size_t count = 0; + while ((count = _impl->background_queue.try_dequeue_bulk(msg_queue, 16)) != 0) { + for (size_t i = 0; i < count; ++i) { + _ProcessBackgroundMessage(msg_queue[i]); + } + } } -void EQ::Net::ConcurrentEQStreamManager::OnConnectionStateChange(std::function, DbProtocolStatus, DbProtocolStatus)> func) +void EQ::Net::ConcurrentEQStreamManager::_BackgroundUpdateStatsTimer(EQ::Timer *t) { + ceqs_msg_t msgs[16]; + int i = 0; + + for (auto &c : _impl->connections) { + auto &connection = c.second; + auto msg = (ceqs_update_daybreak_stats_msg_t*)&msgs[i]; + + msg->type = ceqs_msg_type::UpdateDaybreakStats; + msg->stream_id = connection->GetId(); + msg->stats = connection->GetStats(); + i++; + + printf("Sending stats to client %u\n", connection->GetId()); + if (i >= 16) { + _impl->background_queue.enqueue_bulk(msgs, 16); + i = 0; + } + } + + if (i > 0) { + _impl->background_queue.enqueue_bulk(msgs, i); + } } +//Called by background +void EQ::Net::ConcurrentEQStreamManager::_ProcessBackgroundMessage(const ceqs_msg_t &msg) +{ + switch (msg.type) { + case QueuePacket: + { + ConcurrentEQStreamQueuePacketMessage *msg_in = (ConcurrentEQStreamQueuePacketMessage*)&msg; + printf("(background) Packet Queue for %u with %u bytes with ack: %s\n", msg_in->stream_id, msg_in->packet->Length(), msg_in->ack_req ? "true" : "false"); + + auto iter = _impl->connections.find(msg_in->stream_id); + if (iter != _impl->connections.end()) { + iter->second->QueuePacket(*msg_in->packet, 0, msg_in->ack_req); + } + + delete msg_in->packet; + break; + } + case TerminateBackground: + { + _impl->background_running = false; + break; + } + case CloseConnection: + { + ConcurrentEQStreamCloseConnectionMessage *msg_in = (ConcurrentEQStreamCloseConnectionMessage*)&msg; + auto iter = _impl->connections.find(msg_in->stream_id); + if (iter != _impl->connections.end()) { + iter->second->Close(); + } + break; + } + case ResetStats: + { + ConcurrentEQStreamResetStatsMessage *msg_in = (ConcurrentEQStreamResetStatsMessage*)&msg; + auto iter = _impl->connections.find(msg_in->stream_id); + if (iter != _impl->connections.end()) { + iter->second->ResetStats(); + } + break; + } + default: + printf("(background) New message with unhandled type %u\n", (int)msg.type); + } +} + +//Called by foreground +void EQ::Net::ConcurrentEQStreamManager::_ForegroundTimer(EQ::Timer *t) +{ + ceqs_msg_t msg_queue[16]; + size_t count = 0; + while ((count = _impl->foreground_queue.try_dequeue_bulk(msg_queue, 16)) != 0) { + for (size_t i = 0; i < count; ++i) { + _ProcessForegroundMessage(msg_queue[i]); + } + } +} + +//Called by foreground +void EQ::Net::ConcurrentEQStreamManager::_ProcessForegroundMessage(const ceqs_msg_t &msg) +{ + switch (msg.type) { + case NewConnection: + { + ConcurrentEQStreamNewConnectionMessage *msg_in = (ConcurrentEQStreamNewConnectionMessage*)&msg; + printf("(foreground) New connection from %s:%u with id: %u\n", msg_in->endpoint, msg_in->remote_port, msg_in->stream_id); + + std::shared_ptr stream(new ConcurrentEQStream(this, + msg_in->stream_id, + msg_in->endpoint, + msg_in->remote_port, + (DbProtocolStatus)msg_in->state)); + + _impl->streams.insert(std::make_pair(msg_in->stream_id, stream)); + if (_impl->on_new_connection) { + _impl->on_new_connection(stream); + } + break; + } + case ConnectionStateChange: + { + ConcurrentEQStreamConnectionStateChangeMessage *msg_in = (ConcurrentEQStreamConnectionStateChangeMessage*)&msg; + printf("(foreground) Connection State Change for %u, was %u now is %u\n", msg_in->stream_id, msg_in->from, msg_in->to); + + + auto iter = _impl->streams.find(msg_in->stream_id); + if (iter != _impl->streams.end()) { + iter->second->_SetState((DbProtocolStatus)msg_in->to); + + if ((DbProtocolStatus)msg_in->to == DbProtocolStatus::StatusDisconnected || (DbProtocolStatus)msg_in->to == DbProtocolStatus::StatusDisconnecting) { + _impl->streams.erase(iter); + } + } + break; + } + case PacketRecv: + { + ConcurrentEQStreamPacketRecvMessage *msg_in = (ConcurrentEQStreamPacketRecvMessage*)&msg; + printf("(foreground) Packet Recv for %u with %u bytes\n", msg_in->stream_id, msg_in->packet->Length()); + std::unique_ptr p(msg_in->packet); + + auto iter = _impl->streams.find(msg_in->stream_id); + if (iter != _impl->streams.end()) { + iter->second->_RecvPacket(std::move(p)); + } + break; + } + case UpdateDaybreakStats: + { + ceqs_update_daybreak_stats_msg_t *msg_in = (ceqs_update_daybreak_stats_msg_t*)&msg; + auto iter = _impl->streams.find(msg_in->stream_id); + if (iter != _impl->streams.end()) { + iter->second->_UpdateStats(msg_in->stats); + } + break; + } + default: + break; + } +} + +void EQ::Net::ConcurrentEQStreamManager::_PushToBackgroundQueue(ceqs_msg_t *msg) +{ + _impl->background_queue.enqueue(*msg); +} + +void EQ::Net::ConcurrentEQStreamManager::_PushToForegroundQueue(ceqs_msg_t *msg) +{ + _impl->foreground_queue.enqueue(*msg); +} + +//Called by foreground +void EQ::Net::ConcurrentEQStreamManager::OnNewConnection(std::function)> func) +{ + _impl->on_new_connection = func; +} + +//Called by foreground +void EQ::Net::ConcurrentEQStreamManager::OnConnectionStateChange(std::function, DbProtocolStatus, DbProtocolStatus)> func) +{ + _impl->on_connection_state_change = func; +} + +//Called by background void EQ::Net::ConcurrentEQStreamManager::DaybreakNewConnection(std::shared_ptr connection) { + _impl->connections.insert(std::make_pair(connection->GetId(), connection)); + ConcurrentEQStreamNewConnectionMessage msg; + msg.type = ceqs_msg_type::NewConnection; + msg.stream_id = connection->GetId(); + msg.remote_port = connection->RemotePort(); + msg.state = connection->GetStatus(); + strcpy(msg.endpoint, connection->RemoteEndpoint().c_str()); + msg.endpoint[connection->RemoteEndpoint().length()] = 0; + + //Make sure the foreground gets this message + _PushToForegroundQueue((ceqs_msg_t*)&msg); + printf("(background) New connection from %s:%u with id: %u\n", connection->RemoteEndpoint().c_str(), connection->RemotePort(), connection->GetId()); } +//Called by background void EQ::Net::ConcurrentEQStreamManager::DaybreakConnectionStateChange(std::shared_ptr connection, DbProtocolStatus from, DbProtocolStatus to) { + if (to == DbProtocolStatus::StatusDisconnecting || to == DbProtocolStatus::StatusDisconnected) { + auto iter = _impl->connections.find(connection->GetId()); + if (iter != _impl->connections.end()) { + _impl->connections.erase(iter); + } + } + + ConcurrentEQStreamConnectionStateChangeMessage msg; + msg.type = ceqs_msg_type::ConnectionStateChange; + msg.stream_id = connection->GetId(); + msg.from = (int)from; + msg.to = (int)to; + + //Make sure the foreground gets this message + _PushToForegroundQueue((ceqs_msg_t*)&msg); + printf("(background) Connection State Change for %u, was %u now is %u\n", connection->GetId(), (int)from, (int)to); } +//Called by background void EQ::Net::ConcurrentEQStreamManager::DaybreakPacketRecv(std::shared_ptr connection, const Packet &p) { + ConcurrentEQStreamPacketRecvMessage msg; + msg.type = ceqs_msg_type::PacketRecv; + msg.stream_id = connection->GetId(); + msg.packet = new DynamicPacket(); + msg.packet->PutPacket(0, p); + + //Make sure the foreground gets this message + _PushToForegroundQueue((ceqs_msg_t*)&msg); + printf("(background) Packet Recv for %u with %u bytes\n", connection->GetId(), p.Length()); +} + +struct EQ::Net::ConcurrentEQStream::Impl +{ + ConcurrentEQStreamManager *parent; + uint64_t id; + std::string remote_endpoint; + int remote_port; + uint32_t remote_ip; + DbProtocolStatus state; + std::deque> packet_queue; + OpcodeManager **opcode_manager; + DaybreakConnectionStats stats; +}; + +//Called by foreground +EQ::Net::ConcurrentEQStream::ConcurrentEQStream(ConcurrentEQStreamManager *parent, uint64_t id, const std::string &remote_endpoint, int remote_port, DbProtocolStatus state) +{ + _impl.reset(new Impl()); + _impl->parent = parent; + _impl->id = id; + _impl->remote_endpoint = remote_endpoint; + _impl->remote_port = remote_port; + _impl->remote_ip = inet_addr(remote_endpoint.c_str()); + _impl->state = state; + _impl->opcode_manager = nullptr; +} + +//Called by foreground +EQ::Net::ConcurrentEQStream::~ConcurrentEQStream() +{ +} + +//Called by foreground +void EQ::Net::ConcurrentEQStream::QueuePacket(const EQApplicationPacket *p, bool ack_req) +{ + if (!_impl->parent) { + return; + } + + if (_impl->opcode_manager && *_impl->opcode_manager) { + auto &options = _impl->parent->GetOptions(); + uint16 opcode = 0; + if (p->GetOpcodeBypass() != 0) { + opcode = p->GetOpcodeBypass(); + } + else { + if (options.track_opcode_stats) { + //m_packet_sent_count[p->GetOpcode()]++; //Wont bother with bypass tracking of these since those are rare for testing anyway + } + opcode = (*_impl->opcode_manager)->EmuToEQ(p->GetOpcode()); + } + + EQ::Net::DynamicPacket *out = new EQ::Net::DynamicPacket(); + switch (options.opcode_size) { + case 1: + out->PutUInt8(0, opcode); + out->PutData(1, p->pBuffer, p->size); + break; + case 2: + out->PutUInt16(0, opcode); + out->PutData(2, p->pBuffer, p->size); + break; + } + + ConcurrentEQStreamQueuePacketMessage msg; + msg.type = ceqs_msg_type::QueuePacket; + msg.stream_id = _impl->id; + msg.packet = out; + msg.ack_req = ack_req; + + //Make sure the background gets this message + _impl->parent->_PushToBackgroundQueue((ceqs_msg_t*)&msg); + + printf("(foreground) Packet Queue for %u with %u bytes with ack: %s\n", _impl->id, out->Length(), ack_req ? "true" : "false"); + } +} + +//Called by foreground +void EQ::Net::ConcurrentEQStream::FastQueuePacket(EQApplicationPacket **p, bool ack_req) +{ + std::unique_ptr app(*p); + QueuePacket(app.get(), ack_req); +} + +//Called by foreground +EQApplicationPacket *EQ::Net::ConcurrentEQStream::PopPacket() +{ + if (!_impl->parent) { + return nullptr; + } + + if (_impl->packet_queue.empty()) { + return nullptr; + } + + if (_impl->opcode_manager != nullptr && *_impl->opcode_manager != nullptr) { + auto &options = _impl->parent->GetOptions(); + auto &p = _impl->packet_queue.front(); + + uint16 opcode = 0; + switch (options.opcode_size) { + case 1: + opcode = p->GetUInt8(0); + break; + case 2: + opcode = p->GetUInt16(0); + break; + } + + EmuOpcode emu_op = (*_impl->opcode_manager)->EQToEmu(opcode); + if (options.track_opcode_stats) { + //m_packet_recv_count[emu_op]++; + } + + EQApplicationPacket *ret = new EQApplicationPacket(emu_op, (unsigned char*)p->Data() + options.opcode_size, p->Length() - options.opcode_size); + ret->SetProtocolOpcode(opcode); + _impl->packet_queue.pop_front(); + return ret; + } + + return nullptr; +} + +//Called by foreground +void EQ::Net::ConcurrentEQStream::Close() +{ + if (!_impl->parent) { + return; + } + + ConcurrentEQStreamCloseConnectionMessage msg; + msg.type = CloseConnection; + msg.stream_id = _impl->id; + + _impl->parent->_PushToBackgroundQueue((ceqs_msg_t*)&msg); +} + +//Called by foreground +void EQ::Net::ConcurrentEQStream::ReleaseFromUse() +{ +} + +//Called by foreground +void EQ::Net::ConcurrentEQStream::RemoveData() +{ +} + +//Called by foreground +std::string EQ::Net::ConcurrentEQStream::GetRemoteAddr() const +{ + return _impl->remote_endpoint; +} + +//Called by foreground +uint32 EQ::Net::ConcurrentEQStream::GetRemoteIP() const +{ + return _impl->remote_ip; +} + +//Called by foreground +uint16 EQ::Net::ConcurrentEQStream::GetRemotePort() const +{ + return _impl->remote_port; +} + +//Called by foreground +bool EQ::Net::ConcurrentEQStream::CheckState(EQStreamState state) +{ + return GetState() == state; +} + +//Called by foreground +std::string EQ::Net::ConcurrentEQStream::Describe() const +{ + return "Concurrent EQStream"; +} + +//Called by foreground +void EQ::Net::ConcurrentEQStream::SetActive(bool val) +{ +} + +//Called by foreground +EQStreamInterface::MatchState EQ::Net::ConcurrentEQStream::CheckSignature(const Signature *sig) +{ + if (!_impl->parent) { + return MatchFailed; + } + + if (!_impl->packet_queue.empty()) { + auto& options = _impl->parent->GetOptions(); + auto p = _impl->packet_queue.front().get(); + uint16 opcode = 0; + size_t length = p->Length() - options.opcode_size; + switch (options.opcode_size) { + case 1: + opcode = p->GetUInt8(0); + break; + case 2: + opcode = p->GetUInt16(0); + break; + } + + if (sig->ignore_eq_opcode != 0 && opcode == sig->ignore_eq_opcode) { + if (_impl->packet_queue.size() > 1) { + p = _impl->packet_queue[1].get(); + opcode = 0; + length = p->Length() - options.opcode_size; + switch (options.opcode_size) { + case 1: + opcode = p->GetUInt8(0); + break; + case 2: + opcode = p->GetUInt16(0); + break; + } + } + else { + return MatchNotReady; + } + } + + if (opcode == sig->first_eq_opcode) { + if (length == sig->first_length) { + // LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode matched {2:#x} and length matched {3}", + // m_connection->RemoteEndpoint(), m_connection->RemotePort(), sig->first_eq_opcode, length); + return MatchSuccessful; + } + else if (length == 0) { + // LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode matched {2:#x} and length is ignored.", + // m_connection->RemoteEndpoint(), m_connection->RemotePort(), sig->first_eq_opcode); + return MatchSuccessful; + } + else { + // LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode matched {2:#x} but length {3} did not match expected {4}", + // m_connection->RemoteEndpoint(), m_connection->RemotePort(), sig->first_eq_opcode, length, sig->first_length); + return MatchFailed; + } + } + else { + //LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode {1:#x} did not match expected {2:#x}", + // m_connection->RemoteEndpoint(), m_connection->RemotePort(), opcode, sig->first_eq_opcode); + return MatchFailed; + } + } + + return MatchNotReady; +} + +//Called by foreground +EQStreamState EQ::Net::ConcurrentEQStream::GetState() +{ + switch (_impl->state) { + case StatusConnecting: + return UNESTABLISHED; + case StatusConnected: + return ESTABLISHED; + case StatusDisconnecting: + return DISCONNECTING; + default: + return CLOSED; + } +} + +//Called by foreground +void EQ::Net::ConcurrentEQStream::SetOpcodeManager(OpcodeManager **opm) +{ + _impl->opcode_manager = opm; +} + +//Called by foreground +EQStreamInterface::Stats EQ::Net::ConcurrentEQStream::GetStats() const +{ + EQStreamInterface::Stats ret; + ret.DaybreakStats = _impl->stats; + return ret; +} + +//Called by foreground +void EQ::Net::ConcurrentEQStream::ResetStats() +{ + if (!_impl->parent) { + return; + } + + ConcurrentEQStreamResetStatsMessage msg; + msg.type = ceqs_msg_type::ResetStats; + msg.stream_id = _impl->id; + + _impl->parent->_PushToBackgroundQueue((ceqs_msg_t*)&msg); +} + +//Called by foreground +EQStreamManagerInterface *EQ::Net::ConcurrentEQStream::GetManager() const +{ + return _impl->parent; +} + +//Called by foreground +void EQ::Net::ConcurrentEQStream::_SetState(DbProtocolStatus state) +{ + _impl->state = state; +} + +//Called by foreground +void EQ::Net::ConcurrentEQStream::_RecvPacket(std::unique_ptr p) +{ + _impl->packet_queue.push_back(std::move(p)); +} + +//Called by foreground +void EQ::Net::ConcurrentEQStream::_UpdateStats(const DaybreakConnectionStats &stats) +{ + _impl->stats = stats; +} + +//Called by foreground +void EQ::Net::ConcurrentEQStream::_Invalidate() +{ + _impl->parent = nullptr; } diff --git a/common/net/eqstream_concurrent.h b/common/net/eqstream_concurrent.h index 51b6dae2a..bbec7c19f 100644 --- a/common/net/eqstream_concurrent.h +++ b/common/net/eqstream_concurrent.h @@ -1,10 +1,12 @@ #pragma once #include "../eq_stream_intf.h" +#include "eqstream_concurrent_message.h" #include namespace EQ { + class Timer; namespace Net { class ConcurrentEQStream; @@ -14,22 +16,30 @@ namespace EQ ConcurrentEQStreamManager(const EQStreamManagerInterfaceOptions &options); ~ConcurrentEQStreamManager(); - void OnNewConnection(std::function)> func); - void OnConnectionStateChange(std::function, DbProtocolStatus, DbProtocolStatus)> func); + virtual void OnNewConnection(std::function)> func); + virtual void OnConnectionStateChange(std::function, EQ::Net::DbProtocolStatus, EQ::Net::DbProtocolStatus)> func); + + void _PushToBackgroundQueue(ceqs_msg_t* msg); + void _PushToForegroundQueue(ceqs_msg_t* msg); private: struct Impl; std::unique_ptr _impl; + void _BackgroundThread(); + void _BackgroundTimer(EQ::Timer *t); + void _BackgroundUpdateStatsTimer(EQ::Timer *t); + void _ProcessBackgroundMessage(const ceqs_msg_t &msg); + void _ForegroundTimer(EQ::Timer *t); + void _ProcessForegroundMessage(const ceqs_msg_t &msg); void DaybreakNewConnection(std::shared_ptr connection); void DaybreakConnectionStateChange(std::shared_ptr connection, DbProtocolStatus from, DbProtocolStatus to); void DaybreakPacketRecv(std::shared_ptr connection, const Packet &p); - friend class EQStream; }; class ConcurrentEQStream : public EQStreamInterface { public: - ConcurrentEQStream(EQStreamManagerInterface *parent, uint64_t id); + ConcurrentEQStream(ConcurrentEQStreamManager *parent, uint64_t id, const std::string &remote_endpoint, int remote_port, DbProtocolStatus state); ~ConcurrentEQStream(); virtual void QueuePacket(const EQApplicationPacket *p, bool ack_req = true); @@ -50,11 +60,15 @@ namespace EQ virtual Stats GetStats() const; virtual void ResetStats(); virtual EQStreamManagerInterface* GetManager() const; + + void _SetState(DbProtocolStatus state); + void _RecvPacket(std::unique_ptr p); + void _UpdateStats(const DaybreakConnectionStats &stats); + void _Invalidate(); private: struct Impl; std::unique_ptr _impl; - friend class ConcurrentEQStreamManager; }; } } diff --git a/loginserver/client_manager.cpp b/loginserver/client_manager.cpp index 4117430a7..9d6974d05 100644 --- a/loginserver/client_manager.cpp +++ b/loginserver/client_manager.cpp @@ -37,7 +37,7 @@ ClientManager::ClientManager() run_server = false; } - titanium_stream->OnNewConnection([this](std::shared_ptr stream) { + titanium_stream->OnNewConnection([this](std::shared_ptr stream) { LogF(Logs::General, Logs::Login_Server, "New Titanium client connection from {0}:{1}", stream->GetRemoteIP(), stream->GetRemotePort()); stream->SetOpcodeManager(&titanium_ops); Client *c = new Client(stream, cv_titanium); @@ -55,7 +55,7 @@ ClientManager::ClientManager() run_server = false; } - sod_stream->OnNewConnection([this](std::shared_ptr stream) { + sod_stream->OnNewConnection([this](std::shared_ptr stream) { LogF(Logs::General, Logs::Login_Server, "New SoD client connection from {0}:{1}", stream->GetRemoteIP(), stream->GetRemotePort()); stream->SetOpcodeManager(&sod_ops); Client *c = new Client(stream, cv_sod); diff --git a/ucs/clientlist.cpp b/ucs/clientlist.cpp index 03579f550..4df8cb17d 100644 --- a/ucs/clientlist.cpp +++ b/ucs/clientlist.cpp @@ -482,7 +482,7 @@ Clientlist::Clientlist(int ChatPort) { if (!ChatOpMgr->LoadOpcodes("mail_opcodes.conf")) exit(1); - chatsf->OnNewConnection([this](std::shared_ptr stream) { + chatsf->OnNewConnection([this](std::shared_ptr stream) { LogF(Logs::General, Logs::Login_Server, "New Client UDP connection from {0}:{1}", stream->GetRemoteIP(), stream->GetRemotePort()); stream->SetOpcodeManager(&ChatOpMgr); diff --git a/world/net.cpp b/world/net.cpp index 88ad4e6e1..989a7c2e8 100644 --- a/world/net.cpp +++ b/world/net.cpp @@ -33,7 +33,7 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA #include "../common/version.h" #include "../common/eqtime.h" #include "../common/event/event_loop.h" -#include "../common/net/eqstream.h" +#include "../common/net/eqstream_concurrent.h" #include "../common/opcodemgr.h" #include "../common/guilds.h" #include "../common/eq_stream_ident.h" @@ -505,7 +505,7 @@ int main(int argc, char** argv) { opts.daybreak_options.resend_delay_max = RuleI(Network, ResendDelayMaxMS); opts.daybreak_options.outgoing_data_rate = RuleR(Network, ClientDataRate); - EQ::Net::EQStreamManager eqsm(opts); + EQ::Net::ConcurrentEQStreamManager eqsm(opts); //register all the patches we have avaliable with the stream identifier. EQStreamIdentifier stream_identifier; @@ -520,7 +520,7 @@ int main(int argc, char** argv) { std::shared_ptr eqs; EQStreamInterface *eqsi; - eqsm.OnNewConnection([&stream_identifier](std::shared_ptr stream) { + eqsm.OnNewConnection([&stream_identifier](std::shared_ptr stream) { stream_identifier.AddStream(stream); LogF(Logs::Detail, Logs::World_Server, "New connection from IP {0}:{1}", stream->GetRemoteIP(), ntohs(stream->GetRemotePort())); }); diff --git a/zone/net.cpp b/zone/net.cpp index 7c06c8fc7..cfd93990c 100644 --- a/zone/net.cpp +++ b/zone/net.cpp @@ -510,7 +510,7 @@ int main(int argc, char** argv) { eqsm.reset(new EQ::Net::EQStreamManager(opts)); eqsf_open = true; - eqsm->OnNewConnection([&stream_identifier](std::shared_ptr stream) { + eqsm->OnNewConnection([&stream_identifier](std::shared_ptr stream) { stream_identifier.AddStream(stream); LogF(Logs::Detail, Logs::World_Server, "New connection from IP {0}:{1}", stream->GetRemoteIP(), ntohs(stream->GetRemotePort())); });