Nats initial commit

This commit is contained in:
Xackery
2018-02-10 23:42:38 -08:00
parent 0b97db9fd2
commit c69b9a95b7
42 changed files with 27943 additions and 1082 deletions
+2
View File
@@ -13,6 +13,7 @@ SET(world_sources
lfplist.cpp
login_server.cpp
login_server_list.cpp
nats_manager.cpp
net.cpp
queryserv.cpp
ucs.cpp
@@ -40,6 +41,7 @@ SET(world_headers
lfplist.h
login_server.h
login_server_list.h
nats_manager.h
net.h
queryserv.h
sof_char_create_data.h
+4
View File
@@ -46,6 +46,7 @@
#include "clientlist.h"
#include "wguild_mgr.h"
#include "sof_char_create_data.h"
#include "nats_manager.h"
#include <iostream>
#include <iomanip>
@@ -84,6 +85,7 @@ extern ClientList client_list;
extern EQEmu::Random emu_random;
extern uint32 numclients;
extern volatile bool RunLoops;
extern NatsManager nats;
Client::Client(EQStreamInterface* ieqs)
: autobootup_timeout(RuleI(World, ZoneAutobootTimeoutMS)),
@@ -799,6 +801,7 @@ bool Client::HandleEnterWorldPacket(const EQApplicationPacket *app) {
}
else {
Log(Logs::Detail, Logs::World_Server, "'%s' is trying to go home before they're able...", char_name);
nats.SendAdminMessage(StringFormat("Hacker: %s [%s]: MQGoHome: player tried to go home before they were able.", GetAccountName(), char_name));
database.SetHackerFlag(GetAccountName(), char_name, "MQGoHome: player tried to go home before they were able.");
eqs->Close();
return true;
@@ -823,6 +826,7 @@ bool Client::HandleEnterWorldPacket(const EQApplicationPacket *app) {
}
else {
Log(Logs::Detail, Logs::World_Server, "'%s' is trying to go to tutorial but are not allowed...", char_name);
nats.SendAdminMessage(StringFormat("Hacker %s [%s]: MQTutorial: player tried to enter the tutorial without having tutorial enabled for this character.", GetAccountName(), char_name));
database.SetHackerFlag(GetAccountName(), char_name, "MQTutorial: player tried to enter the tutorial without having tutorial enabled for this character.");
eqs->Close();
return true;
+55
View File
@@ -1467,4 +1467,59 @@ void ClientList::OnTick(EQ::Timer *t)
}
web_interface.SendEvent(out);
}
std::string ClientList::GetWhoAll() {
std::string reply = "";
LinkedListIterator<ClientListEntry*> iterator(clientlist);
ClientListEntry* cle = 0;
uint32 x = 0;
char* output = 0;
uint32 outsize = 0, outlen = 0;
reply.append("Players on server:\n");
iterator.Reset();
while (iterator.MoreElements()) {
cle = iterator.GetData();
const char* tmpZone = database.GetZoneName(cle->zone());
if (cle->Online() < CLE_Status_Zoning ||
x > 20) {
iterator.Advance();
continue;
}
if (cle->Admin() >= 250) reply.append("* GM-Impossible * ");
else if (cle->Admin() >= 200) reply.append("* GM-Mgmt * ");
else if (cle->Admin() >= 180) reply.append("* GM-Coder * ");
else if (cle->Admin() >= 170) reply.append("* GM-Areas * ");
else if (cle->Admin() >= 160) reply.append("* QuestMaster * ");
else if (cle->Admin() >= 150) reply.append("* GM-Lead Admin * ");
else if (cle->Admin() >= 100) reply.append("* GM-Admin * ");
else if (cle->Admin() >= 95) reply.append("* GM-Staff * ");
else if (cle->Admin() >= 90) reply.append("* EQ Support * ");
else if (cle->Admin() >= 85) reply.append("* GM-Tester * ");
else if (cle->Admin() >= 81) reply.append("* Senior Guide * ");
else if (cle->Admin() >= 80) reply.append("* QuestTroupe * ");
else if (cle->Admin() >= 50) reply.append("* Guide * ");
//else if (cle->Admin() >= 20) reply.append("* Apprentice Guide * ");
//else if (cle->Admin() >= 10) reply.append("* Steward * ");
if (cle->Anon() == 2) reply.append("[RolePlay");
else if (cle->Anon() == 1) reply.append("[ANON");
else reply.append("[");
reply.append(StringFormat(" %i %s ] %s", cle->level(), GetClassIDName(cle->class_(), cle->level()), cle->name()));
reply.append(StringFormat(" %s zone: %s", GetRaceIDName(cle->race()), database.GetZoneName(cle->zone())));
if (guild_mgr.GuildExists(cle->GuildID())) reply.append(StringFormat(" <%s>", guild_mgr.GetGuildName(cle->GuildID())));
if (cle->LFG()) reply.append(" LFG");
reply.append("\n");
x++;
iterator.Advance();
}
if (x >= 20) reply.append("First 20 shown, ");
reply.append(StringFormat("%u total players online.", x));
return reply;
}
+1
View File
@@ -69,6 +69,7 @@ public:
int GetClientCount();
void GetClients(const char *zone_name, std::vector<ClientListEntry *> &into);
std::string GetWhoAll();
private:
void OnTick(EQ::Timer *t);
inline uint32 GetNextCLEID() { return NextCLEID++; }
+268
View File
@@ -0,0 +1,268 @@
#include "nats_manager.h"
#include "nats.h"
#include "zonelist.h"
#include "login_server_list.h"
#include "clientlist.h"
#include "worlddb.h"
#include "../common/seperator.h"
#include "../common/eqemu_logsys.h"
#ifndef PROTO_H
#define PROTO_H
#include "../common/proto/message.pb.h"
#endif
#include "../common/servertalk.h"
#include "../common/string_util.h"
extern ZSList zoneserver_list;
extern LoginServerList loginserverlist;
extern ClientList client_list;
const WorldConfig *worldConfig;
NatsManager::NatsManager()
{
//new timers, object initialization
worldConfig = WorldConfig::get();
}
NatsManager::~NatsManager()
{
// Destroy all our objects to avoid report of memory leak
natsStatistics_Destroy(stats);
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);
// To silence reports of memory still in used with valgrind
nats_Close();
}
bool NatsManager::connect() {
auto ncs = natsConnection_Status(conn);
if (ncs == CONNECTED) return true;
if (nats_timer.Enabled() && !nats_timer.Check()) return false;
natsOptions *opts = NULL;
natsOptions_Create(&opts);
natsOptions_SetMaxReconnect(opts, 0);
natsOptions_SetReconnectWait(opts, 0);
natsOptions_SetAllowReconnect(opts, false);
//The timeout is going to cause a 100ms delay on all connected clients every X seconds (20s)
//since this blocks the connection. It can be set lower or higher delay,
//but since NATS is a second priority I wanted server impact minimum.
natsOptions_SetTimeout(opts, 100);
std::string connection = StringFormat("nats://%s:%d", worldConfig->NATSHost.c_str(), worldConfig->NATSPort);
if (worldConfig->NATSHost.length() == 0) connection = "nats://localhost:4222";
natsOptions_SetURL(opts, connection.c_str());
s = natsConnection_Connect(&conn, opts);
natsOptions_Destroy(opts);
if (s != NATS_OK) {
Log(Logs::General, Logs::NATS, "failed to connect to %s: %s, retrying in 20s", connection.c_str(), nats_GetLastError(&s));
conn = NULL;
nats_timer.Enable();
nats_timer.SetTimer(20000);
return false;
}
Log(Logs::General, Logs::NATS, "connected to %s", connection.c_str());
nats_timer.Disable();
return true;
}
void NatsManager::Process()
{
natsMsg *msg = NULL;
if (!connect()) return;
s = NATS_OK;
for (int count = 0; (s == NATS_OK) && count < 5; count++)
{
s = natsSubscription_NextMsg(&msg, channelMessageSub, 1);
if (s != NATS_OK) break;
Log(Logs::General, Logs::NATS, "Got Broadcast Message '%s'", natsMsg_GetData(msg));
eqproto::ChannelMessage message;
if (!message.ParseFromString(natsMsg_GetData(msg))) {
Log(Logs::General, Logs::NATS, "Failed to marshal");
natsMsg_Destroy(msg);
continue;
}
ChannelMessageEvent(&message);
}
s = NATS_OK;
for (int count = 0; (s == NATS_OK) && count < 5; count++)
{
s = natsSubscription_NextMsg(&msg, commandMessageSub, 1);
if (s != NATS_OK) break;
Log(Logs::General, Logs::NATS, "Got Command Message '%s'", natsMsg_GetData(msg));
eqproto::CommandMessage message;
if (!message.ParseFromString(natsMsg_GetData(msg))) {
Log(Logs::General, Logs::NATS, "Failed to marshal");
natsMsg_Destroy(msg);
continue;
}
CommandMessageEvent(&message, natsMsg_GetReply(msg));
}
}
void NatsManager::OnChannelMessage(ServerChannelMessage_Struct* msg) {
if (!connect()) return;
eqproto::ChannelMessage message;
message.set_fromadmin(msg->fromadmin);
message.set_deliverto(msg->deliverto);
message.set_guilddbid(msg->guilddbid);
message.set_noreply(msg->noreply);
message.set_queued(msg->queued);
message.set_chan_num(msg->chan_num);
message.set_message(msg->message);
message.set_to(msg->to);
message.set_language(msg->language);
message.set_from(msg->from);
SendChannelMessage(&message);
return;
}
void NatsManager::OnEmoteMessage(ServerEmoteMessage_Struct* msg) {
if (!connect()) return;
eqproto::ChannelMessage message;
message.set_guilddbid(msg->guilddbid);
message.set_minstatus(msg->minstatus);
message.set_type(msg->type);
message.set_message(msg->message);
message.set_to(msg->to);
message.set_is_emote(true);
SendChannelMessage(&message);
return;
}
void NatsManager::SendAdminMessage(std::string adminMessage) {
if (!connect()) return;
eqproto::ChannelMessage message;
message.set_message(adminMessage.c_str());
std::string pubMessage;
if (!message.SerializeToString(&pubMessage)) {
Log(Logs::General, Logs::NATS, "Failed to serialize message to string");
return;
}
s = natsConnection_PublishString(conn, "AdminMessage", pubMessage.c_str());
if (s != NATS_OK) {
Log(Logs::General, Logs::NATS, "Failed to SendAdminMessage");
}
Log(Logs::General, Logs::NATS, "AdminMessage: %s", adminMessage.c_str());
}
//Send (publish) message to NATS
void NatsManager::SendChannelMessage(eqproto::ChannelMessage* message) {
if (!connect()) return;
std::string pubMessage;
if (!message->SerializeToString(&pubMessage)) {
Log(Logs::General, Logs::NATS, "Failed to serialize message to string");
return;
}
s = natsConnection_PublishString(conn, "ChannelMessage", pubMessage.c_str());
if (s != NATS_OK) {
Log(Logs::General, Logs::NATS, "Failed to send ChannelMessageEvent");
}
}
void NatsManager::CommandMessageEvent(eqproto::CommandMessage* message, const char* reply) {
if (!connect()) return;
std::string pubMessage;
//Log(Logs::General, Logs::NATS, "Command: %s", message->command().c_str());
// message->params()
if (message->command().compare("who") == 0) {
message->set_result(client_list.GetWhoAll());
}
if (message->command().compare("unlock") == 0) {
WorldConfig::UnlockWorld();
if (loginserverlist.Connected()) loginserverlist.SendStatus();
message->set_result("Server is now unlocked.");
}
if (message->command().compare("lock") == 0) {
WorldConfig::LockWorld();
if (loginserverlist.Connected()) loginserverlist.SendStatus();
message->set_result("Server is now locked.");
}
if(message->command().compare("worldshutdown") == 0) {
uint32 time=0;
uint32 interval=0;
if(message->params_size() < 1) {
message->set_result("worldshutdown - Shuts down the server and all zones.\n \
Usage: worldshutdown now - Shuts down the server and all zones immediately.\n \
Usage: worldshutdown disable - Stops the server from a previously scheduled shut down.\n \
Usage: worldshutdown [timer] [interval] - Shuts down the server and all zones after [timer] seconds and sends warning every [interval] seconds\n");
} else if(message->params_size() == 2 && ((time=atoi(message->params(0).c_str()))>0) && ((interval=atoi(message->params(1).c_str()))>0)) {
message->set_result(StringFormat("Sending shutdown packet now, World will shutdown in: %i minutes with an interval of: %i seconds", (time / 60), interval));
zoneserver_list.WorldShutDown(time, interval);
}
else if(strcasecmp(message->params(0).c_str(), "now") == 0){
message->set_result("Sending shutdown packet now");
zoneserver_list.WorldShutDown(0, 0);
}
else if(strcasecmp(message->params(0).c_str(), "disable") == 0){
message->set_result("Shutdown prevented, next time I may not be so forgiving...");
zoneserver_list.SendEmoteMessage(0, 0, 0, 15, "<SYSTEMWIDE MESSAGE>:SYSTEM MSG:World shutdown aborted.");
zoneserver_list.shutdowntimer->Disable();
zoneserver_list.reminder->Disable();
}
}
if (message->result().length() <= 1) {
message->set_result("Failed to parse command.");
}
if (!message->SerializeToString(&pubMessage)) {
Log(Logs::General, Logs::NATS, "Failed to serialize command message to string");
return;
}
s = natsConnection_PublishString(conn, reply, pubMessage.c_str());
if (s != NATS_OK) {
Log(Logs::General, Logs::NATS, "Failed to send CommandMessageEvent");
return;
}
}
//Send a message to all zone servers.
void NatsManager::ChannelMessageEvent(eqproto::ChannelMessage* message) {
if (!connect()) return;
if (message->zone_id() > 0) return; //do'nt process non-zero messages
Log(Logs::General, Logs::NATS, "Broadcasting Message");
if (message->is_emote()) { //emote message
zoneserver_list.SendEmoteMessage(message->to().c_str(), message->guilddbid(), message->minstatus(), message->type(), message->message().c_str());
return;
}
//normal broadcast
char tmpname[64];
tmpname[0] = '*';
strcpy(&tmpname[1], message->from().c_str());
//TODO: add To support on tells
int channel = message->chan_num();
if (channel < 1) channel = 5; //default to ooc
zoneserver_list.SendChannelMessage(tmpname, 0, channel, message->language(), message->message().c_str());
}
void NatsManager::Save()
{
return;
}
void NatsManager::Load()
{
if (!connect()) return;
s = natsConnection_SubscribeSync(&channelMessageSub, conn, "ChannelMessageWorld");
s = natsConnection_SubscribeSync(&commandMessageSub, conn, "CommandMessageWorld");
return;
}
+43
View File
@@ -0,0 +1,43 @@
#ifndef NATS_H
#define NATS_H
#include "nats.h"
#include "world_config.h"
#include "../common/global_define.h"
#include "../common/types.h"
#include "../common/timer.h"
#ifndef PROTO_H
#define PROTO_H
#include "../common/proto/message.pb.h"
#endif
#include "../common/servertalk.h"
class NatsManager
{
public:
NatsManager();
~NatsManager();
void Process();
void OnChannelMessage(ServerChannelMessage_Struct * msg);
void OnEmoteMessage(ServerEmoteMessage_Struct * msg);
void SendAdminMessage(std::string adminMessage);
void ChannelMessageEvent(eqproto::ChannelMessage* message);
void CommandMessageEvent(eqproto::CommandMessage* message, const char* reply);
void SendChannelMessage(eqproto::ChannelMessage* message);
void Save();
void Load();
protected:
bool connect();
Timer nats_timer;
natsConnection *conn = NULL;
natsStatus s;
natsStatistics *stats = NULL;
natsOptions *opts = NULL;
natsSubscription *channelMessageSub = NULL;
natsSubscription *commandMessageSub = NULL;
natsSubscription *adminMessageSub = NULL;
};
#endif
+5 -1
View File
@@ -83,6 +83,7 @@ union semun {
#include "queryserv.h"
#include "web_interface.h"
#include "console.h"
#include "nats_manager.h"
#include "../common/net/servertalk_server.h"
@@ -102,6 +103,7 @@ bool holdzones = false;
const WorldConfig *Config;
EQEmuLogSys LogSys;
WebInterfaceList web_interface;
NatsManager nats;
void CatchSignal(int sig_num);
void CheckForServerScript(bool force_download = false);
@@ -386,6 +388,7 @@ int main(int argc, char** argv) {
adventure_manager.Load();
adventure_manager.LoadLeaderboardInfo();
nats.Load();
Log(Logs::General, Logs::World_Server, "Purging expired instances");
database.PurgeExpiredInstances();
@@ -412,7 +415,7 @@ int main(int argc, char** argv) {
server_opts.credentials = Config->SharedKey;
server_connection->Listen(server_opts);
Log(Logs::General, Logs::World_Server, "Server (TCP) listener started.");
nats.SendAdminMessage("World server booted up.");
server_connection->OnConnectionIdentified("Zone", [&console](std::shared_ptr<EQ::Net::ServertalkServerConnection> connection) {
LogF(Logs::General, Logs::World_Server, "New Zone Server connection from {2} at {0}:{1}",
connection->Handle()->RemoteIP(), connection->Handle()->RemotePort(), connection->GetUUID());
@@ -559,6 +562,7 @@ int main(int argc, char** argv) {
launcher_list.Process();
LFPGroupList.Process();
adventure_manager.Process();
nats.Process();
if (InterserverTimer.Check()) {
InterserverTimer.Start();
+4
View File
@@ -35,6 +35,7 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#include "adventure_manager.h"
#include "ucs.h"
#include "queryserv.h"
#include "nats_manager.h"
extern ClientList client_list;
extern GroupLFPList LFPGroupList;
@@ -44,6 +45,7 @@ extern volatile bool RunLoops;
extern AdventureManager adventure_manager;
extern UCSConnection UCSLink;
extern QueryServConnection QSLink;
extern NatsManager nats;
void CatchSignal(int sig_num);
ZoneServer::ZoneServer(std::shared_ptr<EQ::Net::ServertalkServerConnection> connection, EQ::Net::ConsoleServer *console)
@@ -413,6 +415,7 @@ void ZoneServer::HandleMessage(uint16 opcode, const EQ::Net::Packet &p) {
UCSLink.SendMessage(scm->from, scm->message);
break;
}
nats.OnChannelMessage(scm);
if (scm->chan_num == 7 || scm->chan_num == 14) {
if (scm->deliverto[0] == '*') {
@@ -505,6 +508,7 @@ void ZoneServer::HandleMessage(uint16 opcode, const EQ::Net::Packet &p) {
}
case ServerOP_EmoteMessage: {
ServerEmoteMessage_Struct* sem = (ServerEmoteMessage_Struct*)pack->pBuffer;
nats.OnEmoteMessage(sem);
zoneserver_list.SendEmoteMessageRaw(sem->to, sem->guilddbid, sem->minstatus, sem->type, sem->message);
break;
}