From bad631df591c9e12884a82e4986173bddd5afe57 Mon Sep 17 00:00:00 2001 From: Chris Miles Date: Fri, 24 Feb 2023 18:01:59 -0600 Subject: [PATCH] [Player Events] Add QS processing, mutex tweaks (#2984) * [Player Events] Add QS processing, mutex tweaks * Update ucs.cpp * Move the size process check out of the server to server networking thread --- common/dbcore.cpp | 8 ++++---- common/events/player_event_logs.cpp | 8 ++------ queryserv/queryserv.cpp | 9 +++++++++ queryserv/worldserver.cpp | 13 +++++++++++++ world/main.cpp | 16 +++++----------- 5 files changed, 33 insertions(+), 21 deletions(-) diff --git a/common/dbcore.cpp b/common/dbcore.cpp index 64ab2f005..1a4fbef7a 100644 --- a/common/dbcore.cpp +++ b/common/dbcore.cpp @@ -70,19 +70,17 @@ DBcore::~DBcore() // Sends the MySQL server a keepalive void DBcore::ping() { - if (!m_query_lock.try_lock()) { + if (!MDatabase.trylock()) { // well, if's it's locked, someone's using it. If someone's using it, it doesnt need a keepalive return; } mysql_ping(&mysql); - m_query_lock.unlock(); + MDatabase.unlock(); } MySQLRequestResult DBcore::QueryDatabase(std::string query, bool retryOnFailureOnce) { - m_query_lock.lock(); auto r = QueryDatabase(query.c_str(), query.length(), retryOnFailureOnce); - m_query_lock.unlock(); return r; } @@ -98,6 +96,8 @@ MySQLRequestResult DBcore::QueryDatabase(const char *query, uint32 querylen, boo BenchTimer timer; timer.reset(); + LockMutex lock(&MDatabase); + // Reconnect if we are not connected before hand. if (pStatus != Connected) { Open(); diff --git a/common/events/player_event_logs.cpp b/common/events/player_event_logs.cpp index 8e9872962..8f5cee5a7 100644 --- a/common/events/player_event_logs.cpp +++ b/common/events/player_event_logs.cpp @@ -140,10 +140,6 @@ void PlayerEventLogs::AddToQueue(const PlayerEventLogsRepository::PlayerEventLog m_batch_queue_lock.lock(); m_record_batch_queue.emplace_back(log); m_batch_queue_lock.unlock(); - - if (m_record_batch_queue.size() >= RuleI(Logging, BatchPlayerEventProcessChunkSize)) { - ProcessBatchQueue(); - } } // fills common event data in the SendEvent function @@ -607,10 +603,10 @@ std::string PlayerEventLogs::GetDiscordPayloadFromEvent(const PlayerEvent::Playe return payload; } -// general process function, used in world or UCS depending on rule Logging:PlayerEventsQSProcess +// general process function, used in world or QS depending on rule Logging:PlayerEventsQSProcess void PlayerEventLogs::Process() { - if (m_process_batch_events_timer.Check()) { + if (m_process_batch_events_timer.Check() || m_record_batch_queue.size() >= RuleI(Logging, BatchPlayerEventProcessChunkSize)) { ProcessBatchQueue(); } diff --git a/queryserv/queryserv.cpp b/queryserv/queryserv.cpp index aabfb5f1d..a504b8e77 100644 --- a/queryserv/queryserv.cpp +++ b/queryserv/queryserv.cpp @@ -33,6 +33,7 @@ #include "worldserver.h" #include "../common/path_manager.h" #include "../common/zone_store.h" +#include "../common/events/player_event_logs.h" #include #include #include @@ -47,6 +48,7 @@ WorldServer *worldserver = 0; EQEmuLogSys LogSys; PathManager path; ZoneStore zone_store; +PlayerEventLogs player_event_logs; void CatchSignal(int sig_num) { @@ -106,6 +108,9 @@ int main() /* Load Looking For Guild Manager */ lfguildmanager.LoadDatabase(); + Timer player_event_process_timer(1000); + player_event_logs.SetDatabase(&database)->Init(); + auto loop_fn = [&](EQ::Timer* t) { Timer::SetCurrentTime(); @@ -117,6 +122,10 @@ int main() if (LFGuildExpireTimer.Check()) { lfguildmanager.ExpireEntries(); } + + if (player_event_process_timer.Check()) { + player_event_logs.Process(); + } }; EQ::Timer process_timer(loop_fn); diff --git a/queryserv/worldserver.cpp b/queryserv/worldserver.cpp index 8019197e6..f46baf983 100644 --- a/queryserv/worldserver.cpp +++ b/queryserv/worldserver.cpp @@ -29,6 +29,8 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA #include "lfguild.h" #include "queryservconfig.h" #include "worldserver.h" +#include "../common/events/player_events.h" +#include "../common/events/player_event_logs.h" #include #include #include @@ -89,6 +91,17 @@ void WorldServer::HandleMessage(uint16 opcode, const EQ::Net::Packet &p) case 0: { break; } + case ServerOP_PlayerEvent: { + auto n = PlayerEvent::PlayerEventContainer{}; + auto s = (ServerSendPlayerEvent_Struct *) p.Data(); + EQ::Util::MemoryStreamReader ss(s->cereal_data, s->cereal_size); + cereal::BinaryInputArchive archive(ss); + archive(n); + + player_event_logs.AddToQueue(n.player_event_log); + + break; + } case ServerOP_KeepAlive: { break; } diff --git a/world/main.cpp b/world/main.cpp index 6e46d70ad..3aa175899 100644 --- a/world/main.cpp +++ b/world/main.cpp @@ -131,12 +131,6 @@ inline void UpdateWindowTitle(std::string new_title) #endif } -void PlayerEventQueueListener() { - while (RunLoops) { - player_event_logs.Process(); - Sleep(1000); - } -} /** * World process entrypoint @@ -381,13 +375,9 @@ int main(int argc, char **argv) } ); + Timer player_event_process_timer(1000); player_event_logs.SetDatabase(&database)->Init(); - if (!RuleB(Logging, PlayerEventsQSProcess)) { - LogInfo("[PlayerEventQueueListener] Booting queue processor"); - std::thread(PlayerEventQueueListener).detach(); - } - auto loop_fn = [&](EQ::Timer* t) { Timer::SetCurrentTime(); @@ -435,6 +425,10 @@ int main(int argc, char **argv) client_list.Process(); + if (player_event_process_timer.Check()) { + player_event_logs.Process(); + } + if (PurgeInstanceTimer.Check()) { database.PurgeExpiredInstances(); database.PurgeAllDeletedDataBuckets();