[Player Events] Add QS processing, mutex tweaks (#2984)

* [Player Events] Add QS processing, mutex tweaks

* Update ucs.cpp

* Move the size process check out of the server to server networking thread
This commit is contained in:
Chris Miles 2023-02-24 18:01:59 -06:00 committed by GitHub
parent e8f1aa253a
commit bad631df59
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 33 additions and 21 deletions

View File

@ -70,19 +70,17 @@ DBcore::~DBcore()
// Sends the MySQL server a keepalive
void DBcore::ping()
{
if (!m_query_lock.try_lock()) {
if (!MDatabase.trylock()) {
// well, if's it's locked, someone's using it. If someone's using it, it doesnt need a keepalive
return;
}
mysql_ping(&mysql);
m_query_lock.unlock();
MDatabase.unlock();
}
MySQLRequestResult DBcore::QueryDatabase(std::string query, bool retryOnFailureOnce)
{
m_query_lock.lock();
auto r = QueryDatabase(query.c_str(), query.length(), retryOnFailureOnce);
m_query_lock.unlock();
return r;
}
@ -98,6 +96,8 @@ MySQLRequestResult DBcore::QueryDatabase(const char *query, uint32 querylen, boo
BenchTimer timer;
timer.reset();
LockMutex lock(&MDatabase);
// Reconnect if we are not connected before hand.
if (pStatus != Connected) {
Open();

View File

@ -140,10 +140,6 @@ void PlayerEventLogs::AddToQueue(const PlayerEventLogsRepository::PlayerEventLog
m_batch_queue_lock.lock();
m_record_batch_queue.emplace_back(log);
m_batch_queue_lock.unlock();
if (m_record_batch_queue.size() >= RuleI(Logging, BatchPlayerEventProcessChunkSize)) {
ProcessBatchQueue();
}
}
// fills common event data in the SendEvent function
@ -607,10 +603,10 @@ std::string PlayerEventLogs::GetDiscordPayloadFromEvent(const PlayerEvent::Playe
return payload;
}
// general process function, used in world or UCS depending on rule Logging:PlayerEventsQSProcess
// general process function, used in world or QS depending on rule Logging:PlayerEventsQSProcess
void PlayerEventLogs::Process()
{
if (m_process_batch_events_timer.Check()) {
if (m_process_batch_events_timer.Check() || m_record_batch_queue.size() >= RuleI(Logging, BatchPlayerEventProcessChunkSize)) {
ProcessBatchQueue();
}

View File

@ -33,6 +33,7 @@
#include "worldserver.h"
#include "../common/path_manager.h"
#include "../common/zone_store.h"
#include "../common/events/player_event_logs.h"
#include <list>
#include <signal.h>
#include <thread>
@ -47,6 +48,7 @@ WorldServer *worldserver = 0;
EQEmuLogSys LogSys;
PathManager path;
ZoneStore zone_store;
PlayerEventLogs player_event_logs;
void CatchSignal(int sig_num)
{
@ -106,6 +108,9 @@ int main()
/* Load Looking For Guild Manager */
lfguildmanager.LoadDatabase();
Timer player_event_process_timer(1000);
player_event_logs.SetDatabase(&database)->Init();
auto loop_fn = [&](EQ::Timer* t) {
Timer::SetCurrentTime();
@ -117,6 +122,10 @@ int main()
if (LFGuildExpireTimer.Check()) {
lfguildmanager.ExpireEntries();
}
if (player_event_process_timer.Check()) {
player_event_logs.Process();
}
};
EQ::Timer process_timer(loop_fn);

View File

@ -29,6 +29,8 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#include "lfguild.h"
#include "queryservconfig.h"
#include "worldserver.h"
#include "../common/events/player_events.h"
#include "../common/events/player_event_logs.h"
#include <iomanip>
#include <iostream>
#include <stdarg.h>
@ -89,6 +91,17 @@ void WorldServer::HandleMessage(uint16 opcode, const EQ::Net::Packet &p)
case 0: {
break;
}
case ServerOP_PlayerEvent: {
auto n = PlayerEvent::PlayerEventContainer{};
auto s = (ServerSendPlayerEvent_Struct *) p.Data();
EQ::Util::MemoryStreamReader ss(s->cereal_data, s->cereal_size);
cereal::BinaryInputArchive archive(ss);
archive(n);
player_event_logs.AddToQueue(n.player_event_log);
break;
}
case ServerOP_KeepAlive: {
break;
}

View File

@ -131,12 +131,6 @@ inline void UpdateWindowTitle(std::string new_title)
#endif
}
void PlayerEventQueueListener() {
while (RunLoops) {
player_event_logs.Process();
Sleep(1000);
}
}
/**
* World process entrypoint
@ -381,13 +375,9 @@ int main(int argc, char **argv)
}
);
Timer player_event_process_timer(1000);
player_event_logs.SetDatabase(&database)->Init();
if (!RuleB(Logging, PlayerEventsQSProcess)) {
LogInfo("[PlayerEventQueueListener] Booting queue processor");
std::thread(PlayerEventQueueListener).detach();
}
auto loop_fn = [&](EQ::Timer* t) {
Timer::SetCurrentTime();
@ -435,6 +425,10 @@ int main(int argc, char **argv)
client_list.Process();
if (player_event_process_timer.Check()) {
player_event_logs.Process();
}
if (PurgeInstanceTimer.Check()) {
database.PurgeExpiredInstances();
database.PurgeAllDeletedDataBuckets();