mirror of
https://github.com/EQEmu/Server.git
synced 2026-01-10 17:33:51 +00:00
Added proper support for arena in nats_manager, misc cleanup
This commit is contained in:
parent
c9a79af79b
commit
40ef387496
@ -1,6 +1,9 @@
|
||||
syntax = "proto3";
|
||||
package eqproto;
|
||||
|
||||
option cc_enable_arenas = true;
|
||||
option optimize_for = SPEED;
|
||||
|
||||
message ChannelMessage {
|
||||
int32 chan_num = 1;
|
||||
int32 language = 2;
|
||||
@ -16,8 +19,7 @@ message ChannelMessage {
|
||||
bool is_emote = 12;
|
||||
//0 not queued, 1 queued, 2 queue full, 3 offline
|
||||
int32 queued = 13;
|
||||
//You can specify a zone id if you want a message exclusively to one zone
|
||||
int32 zone_id = 14;
|
||||
string result = 15;
|
||||
}
|
||||
|
||||
message CommandMessage {
|
||||
|
||||
@ -47,7 +47,7 @@ func asyncChannelMessageSubscriber(nc *nats.Conn) {
|
||||
// and poll for messages syncronously
|
||||
func syncChannelMessageSubscriber(nc *nats.Conn) {
|
||||
|
||||
sub, err := nc.SubscribeSync("world.channel_message")
|
||||
sub, err := nc.SubscribeSync("world.channel_message.out")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
@ -72,7 +72,7 @@ func testBroadcastMessage(nc *nats.Conn, msg string) {
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err = nc.Publish("world.channel_message", d); err != nil {
|
||||
if err = nc.Publish("world.channel_message.in", d); err != nil {
|
||||
log.Println("Failed to publish:", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
@ -5,11 +5,12 @@
|
||||
#include "clientlist.h"
|
||||
#include "worlddb.h"
|
||||
|
||||
#include <google/protobuf/arena.h>
|
||||
#include "../common/seperator.h"
|
||||
#include "../common/eqemu_logsys.h"
|
||||
#ifndef PROTO_H
|
||||
#define PROTO_H
|
||||
#include "../common/proto/message.pb.h"
|
||||
#include "../common/message->pb.h"
|
||||
#endif
|
||||
#include "../common/servertalk.h"
|
||||
#include "../common/string_util.h"
|
||||
@ -19,6 +20,8 @@ extern LoginServerList loginserverlist;
|
||||
extern ClientList client_list;
|
||||
const WorldConfig *worldConfig;
|
||||
|
||||
google::protobuf::Arena the_arena;
|
||||
|
||||
NatsManager::NatsManager()
|
||||
{
|
||||
//new timers, object initialization
|
||||
@ -69,6 +72,15 @@ bool NatsManager::connect() {
|
||||
|
||||
Log(Logs::General, Logs::NATS, "connected to %s", connection.c_str());
|
||||
nats_timer.Disable();
|
||||
|
||||
s = natsConnection_SubscribeSync(&channelMessageSub, conn, "world.channel_message->in");
|
||||
if (s != NATS_OK)
|
||||
Log(Logs::General, Logs::NATS, "world.channel_message->in: failed to subscribe: %s", nats_GetLastError(&s));
|
||||
|
||||
s = natsConnection_SubscribeSync(&commandMessageSub, conn, "world.command_message->in");
|
||||
if (s != NATS_OK)
|
||||
Log(Logs::General, Logs::NATS, "world.command_message->in: failed to subscribe: %s", nats_GetLastError(&s));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -79,174 +91,229 @@ void NatsManager::Process()
|
||||
if (!connect())
|
||||
return;
|
||||
s = NATS_OK;
|
||||
for (int count = 0; (s == NATS_OK) && count < 5; count++)
|
||||
for (int count = 0; (s == NATS_OK) && count < 5; count++)
|
||||
{
|
||||
s = natsSubscription_NextMsg(&msg, channelMessageSub, 1);
|
||||
if (s != NATS_OK) break;
|
||||
Log(Logs::General, Logs::NATS, "Got Broadcast Message '%s'", natsMsg_GetData(msg));
|
||||
eqproto::ChannelMessage message;
|
||||
if (!message.ParseFromString(natsMsg_GetData(msg))) {
|
||||
Log(Logs::General, Logs::NATS, "Failed to marshal");
|
||||
if (s != NATS_OK)
|
||||
break;
|
||||
|
||||
eqproto::ChannelMessage* message = google::protobuf::Arena::CreateMessage<eqproto::ChannelMessage>(&the_arena);
|
||||
if (!message->ParseFromString(natsMsg_GetData(msg))) {
|
||||
Log(Logs::General, Logs::NATS, "world.channel_message->in: failed to parse");
|
||||
natsMsg_Destroy(msg);
|
||||
continue;
|
||||
}
|
||||
ChannelMessageEvent(&message);
|
||||
|
||||
GetChannelMessage(message, natsMsg_GetReply(msg));
|
||||
}
|
||||
|
||||
s = NATS_OK;
|
||||
for (int count = 0; (s == NATS_OK) && count < 5; count++)
|
||||
{
|
||||
s = natsSubscription_NextMsg(&msg, commandMessageSub, 1);
|
||||
if (s != NATS_OK) break;
|
||||
Log(Logs::General, Logs::NATS, "Got Command Message '%s'", natsMsg_GetData(msg));
|
||||
eqproto::CommandMessage message;
|
||||
|
||||
|
||||
if (!message.ParseFromString(natsMsg_GetData(msg))) {
|
||||
Log(Logs::General, Logs::NATS, "Failed to marshal");
|
||||
if (s != NATS_OK)
|
||||
break;
|
||||
|
||||
eqproto::CommandMessage* message = google::protobuf::Arena::CreateMessage<eqproto::CommandMessage>(&the_arena);
|
||||
if (!message->ParseFromString(natsMsg_GetData(msg))) {
|
||||
Log(Logs::General, Logs::NATS, "world.command_message->in: failed to parse");
|
||||
natsMsg_Destroy(msg);
|
||||
continue;
|
||||
}
|
||||
CommandMessageEvent(&message, natsMsg_GetReply(msg));
|
||||
GetCommandMessage(message, natsMsg_GetReply(msg));
|
||||
}
|
||||
}
|
||||
|
||||
void NatsManager::OnChannelMessage(ServerChannelMessage_Struct* msg) {
|
||||
if (!connect()) return;
|
||||
if (!connect())
|
||||
return;
|
||||
|
||||
eqproto::ChannelMessage message;
|
||||
eqproto::ChannelMessage* message = google::protobuf::Arena::CreateMessage<eqproto::ChannelMessage>(&the_arena);
|
||||
|
||||
message.set_fromadmin(msg->fromadmin);
|
||||
message.set_deliverto(msg->deliverto);
|
||||
message.set_guilddbid(msg->guilddbid);
|
||||
message.set_noreply(msg->noreply);
|
||||
message.set_queued(msg->queued);
|
||||
message.set_chan_num(msg->chan_num);
|
||||
message.set_message(msg->message);
|
||||
message.set_to(msg->to);
|
||||
message.set_language(msg->language);
|
||||
message.set_from(msg->from);
|
||||
SendChannelMessage(&message);
|
||||
message->set_fromadmin(msg->fromadmin);
|
||||
message->set_deliverto(msg->deliverto);
|
||||
message->set_guilddbid(msg->guilddbid);
|
||||
message->set_noreply(msg->noreply);
|
||||
message->set_queued(msg->queued);
|
||||
message->set_chan_num(msg->chan_num);
|
||||
message->set_message(msg->message);
|
||||
message->set_to(msg->to);
|
||||
message->set_language(msg->language);
|
||||
message->set_from(msg->from);
|
||||
SendChannelMessage(message);
|
||||
return;
|
||||
}
|
||||
|
||||
void NatsManager::OnEmoteMessage(ServerEmoteMessage_Struct* msg) {
|
||||
if (!connect()) return;
|
||||
|
||||
eqproto::ChannelMessage message;
|
||||
message.set_guilddbid(msg->guilddbid);
|
||||
message.set_minstatus(msg->minstatus);
|
||||
message.set_type(msg->type);
|
||||
message.set_message(msg->message);
|
||||
message.set_to(msg->to);
|
||||
message.set_is_emote(true);
|
||||
SendChannelMessage(&message);
|
||||
if (!connect())
|
||||
return;
|
||||
eqproto::ChannelMessage* message = google::protobuf::Arena::CreateMessage<eqproto::ChannelMessage>(&the_arena);
|
||||
message->set_guilddbid(msg->guilddbid);
|
||||
message->set_minstatus(msg->minstatus);
|
||||
message->set_type(msg->type);
|
||||
message->set_message(msg->message);
|
||||
message->set_to(msg->to);
|
||||
message->set_is_emote(true);
|
||||
SendChannelMessage(message);
|
||||
return;
|
||||
}
|
||||
|
||||
void NatsManager::SendAdminMessage(std::string adminMessage) {
|
||||
if (!connect()) return;
|
||||
// SendAdminMessage will send an admin message to NATS
|
||||
void NatsManager::SendAdminMessage(std::string adminMessage, const char* reply) {
|
||||
if (!connect())
|
||||
return;
|
||||
eqproto::ChannelMessage* message = google::protobuf::Arena::CreateMessage<eqproto::ChannelMessage>(&the_arena);
|
||||
|
||||
eqproto::ChannelMessage message;
|
||||
message.set_message(adminMessage.c_str());
|
||||
message->set_message(adminMessage.c_str());
|
||||
std::string pubMessage;
|
||||
if (!message.SerializeToString(&pubMessage)) {
|
||||
Log(Logs::General, Logs::NATS, "Failed to serialize message to string");
|
||||
if (!message->SerializeToString(&pubMessage)) {
|
||||
Log(Logs::General, Logs::NATS, "global.admin_message->out: failed to serialize message to string");
|
||||
return;
|
||||
}
|
||||
s = natsConnection_PublishString(conn, "world.admin_message", pubMessage.c_str());
|
||||
|
||||
if (reply && strlen(reply) > 0)
|
||||
s = natsConnection_Publish(conn, reply, (const void*)pubMessage.c_str(), pubMessage.length());
|
||||
else
|
||||
s = natsConnection_Publish(conn, "global.admin_message->out", (const void*)pubMessage.c_str(), pubMessage.length());
|
||||
|
||||
if (s != NATS_OK) {
|
||||
Log(Logs::General, Logs::NATS, "Failed to publish to world.admin_message");
|
||||
Log(Logs::General, Logs::NATS, "global.admin_message->out failed: %s", nats_GetLastError(&s));
|
||||
return;
|
||||
}
|
||||
Log(Logs::General, Logs::NATS, "world.admin_message: %s", adminMessage.c_str());
|
||||
Log(Logs::General, Logs::NATS, "global.admin_message->out: %s", adminMessage.c_str());
|
||||
}
|
||||
|
||||
//Send (publish) message to NATS
|
||||
void NatsManager::SendChannelMessage(eqproto::ChannelMessage* message) {
|
||||
if (!connect()) return;
|
||||
// SendChannelMessage will send a channel message to NATS
|
||||
void NatsManager::SendChannelMessage(eqproto::ChannelMessage* message, const char* reply) {
|
||||
if (!connect())
|
||||
return;
|
||||
|
||||
std::string pubMessage;
|
||||
if (!message->SerializeToString(&pubMessage)) {
|
||||
Log(Logs::General, Logs::NATS, "Failed to serialize message to string");
|
||||
Log(Logs::General, Logs::NATS, "world.channel_message->out: failed to serialize message to string");
|
||||
return;
|
||||
}
|
||||
s = natsConnection_PublishString(conn, "world.channel_message", pubMessage.c_str());
|
||||
|
||||
if (reply && strlen(reply) > 0)
|
||||
s = natsConnection_Publish(conn, reply, (const void*)pubMessage.c_str(), pubMessage.length());
|
||||
else
|
||||
s = natsConnection_Publish(conn, "world.channel_message->out", (const void*)pubMessage.c_str(), pubMessage.length());
|
||||
|
||||
if (s != NATS_OK) {
|
||||
Log(Logs::General, Logs::NATS, "Failed to send world.command_message");
|
||||
Log(Logs::General, Logs::NATS, "world.channel_message->out failed: %s");
|
||||
return;
|
||||
}
|
||||
Log(Logs::General, Logs::NATS, "world.channel_message->out: %s", message->message().c_str());
|
||||
}
|
||||
|
||||
void NatsManager::CommandMessageEvent(eqproto::CommandMessage* message, const char* reply) {
|
||||
if (!connect()) return;
|
||||
// SendCommandMessage will send a channel message to NATS
|
||||
void NatsManager::SendCommandMessage(eqproto::CommandMessage* message, const char* reply) {
|
||||
if (!connect())
|
||||
return;
|
||||
|
||||
if (message->result().length() <= 1)
|
||||
message->set_result("Failed to parse command.");
|
||||
|
||||
|
||||
std::string pubMessage;
|
||||
//Log(Logs::General, Logs::NATS, "Command: %s", message->command().c_str());
|
||||
// message->params()
|
||||
|
||||
|
||||
if (message->command().compare("who") == 0) {
|
||||
if (!message->SerializeToString(&pubMessage)) {
|
||||
Log(Logs::General, Logs::NATS, "world.command_message->out: failed to serialize message to string");
|
||||
return;
|
||||
}
|
||||
|
||||
if (reply && strlen(reply) > 0)
|
||||
s = natsConnection_Publish(conn, reply, (const void*)pubMessage.c_str(), pubMessage.length());
|
||||
else
|
||||
s = natsConnection_Publish(conn, "world.command_message->out", (const void*)pubMessage.c_str(), pubMessage.length());
|
||||
|
||||
if (s != NATS_OK) {
|
||||
Log(Logs::General, Logs::NATS, "world.command_message->out failed: %s");
|
||||
return;
|
||||
}
|
||||
Log(Logs::General, Logs::NATS, "world.command_message->out: %s", message->command().c_str());
|
||||
}
|
||||
|
||||
// GetCommandMessage is used to process a command message
|
||||
void NatsManager::GetCommandMessage(eqproto::CommandMessage* message, const char* reply) {
|
||||
if (!connect())
|
||||
return;
|
||||
|
||||
std::string pubMessage;
|
||||
|
||||
Log(Logs::General, Logs::NATS, "world.command_message->in: %s", message->command().c_str());
|
||||
|
||||
if (message->command().compare("who") == 0) {
|
||||
message->set_result(client_list.GetWhoAll());
|
||||
SendCommandMessage(message, reply);
|
||||
return;
|
||||
}
|
||||
|
||||
if (message->command().compare("unlock") == 0) {
|
||||
WorldConfig::UnlockWorld();
|
||||
if (loginserverlist.Connected()) loginserverlist.SendStatus();
|
||||
if (loginserverlist.Connected())
|
||||
loginserverlist.SendStatus();
|
||||
message->set_result("Server is now unlocked.");
|
||||
SendCommandMessage(message, reply);
|
||||
return;
|
||||
}
|
||||
|
||||
if (message->command().compare("lock") == 0) {
|
||||
WorldConfig::LockWorld();
|
||||
if (loginserverlist.Connected()) loginserverlist.SendStatus();
|
||||
if (loginserverlist.Connected())
|
||||
loginserverlist.SendStatus();
|
||||
message->set_result("Server is now locked.");
|
||||
SendCommandMessage(message, reply);
|
||||
return;
|
||||
}
|
||||
|
||||
if(message->command().compare("worldshutdown") == 0) {
|
||||
uint32 time=0;
|
||||
uint32 interval=0;
|
||||
|
||||
if(message->params_size() < 1) {
|
||||
message->set_result("worldshutdown - Shuts down the server and all zones.\n \
|
||||
Usage: worldshutdown now - Shuts down the server and all zones immediately.\n \
|
||||
Usage: worldshutdown disable - Stops the server from a previously scheduled shut down.\n \
|
||||
Usage: worldshutdown [timer] [interval] - Shuts down the server and all zones after [timer] seconds and sends warning every [interval] seconds\n");
|
||||
} else if(message->params_size() == 2 && ((time=atoi(message->params(0).c_str()))>0) && ((interval=atoi(message->params(1).c_str()))>0)) {
|
||||
if(message->params_size() == 2 && ((time=atoi(message->params(0).c_str()))>0) && ((interval=atoi(message->params(1).c_str()))>0)) {
|
||||
message->set_result(StringFormat("Sending shutdown packet now, World will shutdown in: %i minutes with an interval of: %i seconds", (time / 60), interval));
|
||||
zoneserver_list.WorldShutDown(time, interval);
|
||||
SendCommandMessage(message, reply);
|
||||
return;
|
||||
}
|
||||
else if(strcasecmp(message->params(0).c_str(), "now") == 0){
|
||||
message->set_result("Sending shutdown packet now");
|
||||
zoneserver_list.WorldShutDown(0, 0);
|
||||
SendCommandMessage(message, reply);
|
||||
return;
|
||||
}
|
||||
else if(strcasecmp(message->params(0).c_str(), "disable") == 0){
|
||||
message->set_result("Shutdown prevented, next time I may not be so forgiving...");
|
||||
zoneserver_list.SendEmoteMessage(0, 0, 0, 15, "<SYSTEMWIDE MESSAGE>:SYSTEM MSG:World shutdown aborted.");
|
||||
zoneserver_list.shutdowntimer->Disable();
|
||||
zoneserver_list.reminder->Disable();
|
||||
SendCommandMessage(message, reply);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (message->result().length() <= 1) {
|
||||
message->set_result("Failed to parse command.");
|
||||
}
|
||||
|
||||
if (!message->SerializeToString(&pubMessage)) {
|
||||
Log(Logs::General, Logs::NATS, "Failed to serialize command message to string");
|
||||
message->set_result("worldshutdown - Shuts down the server and all zones.\n \
|
||||
Usage: worldshutdown now - Shuts down the server and all zones immediately.\n \
|
||||
Usage: worldshutdown disable - Stops the server from a previously scheduled shut down.\n \
|
||||
Usage: worldshutdown [timer] [interval] - Shuts down the server and all zones after [timer] seconds and sends warning every [interval] seconds\n");
|
||||
SendCommandMessage(message, reply);
|
||||
return;
|
||||
}
|
||||
|
||||
s = natsConnection_PublishString(conn, reply, pubMessage.c_str());
|
||||
if (s != NATS_OK) {
|
||||
Log(Logs::General, Logs::NATS, "Failed to send CommandMessageEvent");
|
||||
return;
|
||||
}
|
||||
message->set_result("unknown command sent");
|
||||
SendCommandMessage(message, reply);
|
||||
return;
|
||||
}
|
||||
|
||||
//Send a message to all zone servers.
|
||||
void NatsManager::ChannelMessageEvent(eqproto::ChannelMessage* message) {
|
||||
if (!connect()) return;
|
||||
if (message->zone_id() > 0) return; //do'nt process non-zero messages
|
||||
Log(Logs::General, Logs::NATS, "Broadcasting Message");
|
||||
// GetChannelMessage is when a 3rd party app sends a channel message via NATS.
|
||||
void NatsManager::GetChannelMessage(eqproto::ChannelMessage* message, const char* reply) {
|
||||
if (!connect())
|
||||
return;
|
||||
|
||||
|
||||
Log(Logs::General, Logs::NATS, "world.channel_message->in: %s", message->message().c_str());
|
||||
if (message->is_emote()) { //emote message
|
||||
zoneserver_list.SendEmoteMessage(message->to().c_str(), message->guilddbid(), message->minstatus(), message->type(), message->message().c_str());
|
||||
message->set_result("Sent message");
|
||||
SendChannelMessage(message, reply);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -256,20 +323,18 @@ void NatsManager::ChannelMessageEvent(eqproto::ChannelMessage* message) {
|
||||
strcpy(&tmpname[1], message->from().c_str());
|
||||
//TODO: add To support on tells
|
||||
int channel = message->chan_num();
|
||||
if (channel < 1) channel = 5; //default to ooc
|
||||
if (channel < 1)
|
||||
channel = 5; //default to ooc
|
||||
zoneserver_list.SendChannelMessage(tmpname, 0, channel, message->language(), message->message().c_str());
|
||||
}
|
||||
|
||||
void NatsManager::Save()
|
||||
{
|
||||
message->set_result("Sent message");
|
||||
SendChannelMessage(message, reply);
|
||||
return;
|
||||
}
|
||||
|
||||
void NatsManager::Load()
|
||||
{
|
||||
if (!connect()) return;
|
||||
if (!connect())
|
||||
return;
|
||||
|
||||
s = natsConnection_SubscribeSync(&channelMessageSub, conn, "world.channel_message");
|
||||
s = natsConnection_SubscribeSync(&commandMessageSub, conn, "world.command_message");
|
||||
return;
|
||||
}
|
||||
@ -22,10 +22,11 @@ public:
|
||||
void Process();
|
||||
void OnChannelMessage(ServerChannelMessage_Struct * msg);
|
||||
void OnEmoteMessage(ServerEmoteMessage_Struct * msg);
|
||||
void SendAdminMessage(std::string adminMessage);
|
||||
void ChannelMessageEvent(eqproto::ChannelMessage* message);
|
||||
void CommandMessageEvent(eqproto::CommandMessage* message, const char* reply);
|
||||
void SendChannelMessage(eqproto::ChannelMessage* message);
|
||||
void SendAdminMessage(std::string adminMessage, const char* reply = nullptr);
|
||||
void GetChannelMessage(eqproto::ChannelMessage* message, const char* reply = nullptr);
|
||||
void SendChannelMessage(eqproto::ChannelMessage* message, const char* reply = nullptr);
|
||||
void GetCommandMessage(eqproto::CommandMessage* message, const char* reply = nullptr);
|
||||
void SendCommandMessage(eqproto::CommandMessage* message, const char* reply = nullptr);
|
||||
void Save();
|
||||
void Load();
|
||||
protected:
|
||||
|
||||
@ -411,12 +411,13 @@ void ZoneServer::HandleMessage(uint16 opcode, const EQ::Net::Packet &p) {
|
||||
if (pack->size < sizeof(ServerChannelMessage_Struct))
|
||||
break;
|
||||
ServerChannelMessage_Struct* scm = (ServerChannelMessage_Struct*)pack->pBuffer;
|
||||
|
||||
nats.OnChannelMessage(scm);
|
||||
if (scm->chan_num == 20)
|
||||
{
|
||||
UCSLink.SendMessage(scm->from, scm->message);
|
||||
break;
|
||||
}
|
||||
nats.OnChannelMessage(scm);
|
||||
if (scm->chan_num == 7 || scm->chan_num == 14) {
|
||||
if (scm->deliverto[0] == '*') {
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@ -16,7 +16,7 @@ public:
|
||||
~NatsManager();
|
||||
void Process();
|
||||
void Unregister();
|
||||
void ZoneSubscribe(const char * zonename);
|
||||
void ZoneSubscribe(const char * zonename, uint32 instance);
|
||||
void Load();
|
||||
void OnChannelMessageEvent(uint32 entity_id, ChannelMessage_Struct * cm);
|
||||
void OnEntityEvent(const EmuOpcode op, uint32 entity_id, uint32 target_id);
|
||||
@ -37,7 +37,8 @@ protected:
|
||||
natsConnection *conn = NULL;
|
||||
natsStatus s;
|
||||
natsOptions *opts = NULL;
|
||||
std::string subscribedZonename;
|
||||
std::string subscribedZoneName;
|
||||
uint32 subscribedZoneInstance;
|
||||
natsSubscription *zoneChannelMessageSub = NULL;
|
||||
natsSubscription *zoneCommandMessageSub = NULL;
|
||||
natsSubscription *zoneEntityEventSubscribeAllSub = NULL;
|
||||
|
||||
@ -515,7 +515,7 @@ int main(int argc, char** argv) {
|
||||
entity_list.BeaconProcess();
|
||||
entity_list.EncounterProcess();
|
||||
if (zone->IsLoaded()) {
|
||||
nats.ZoneSubscribe(zone->GetShortName());
|
||||
nats.ZoneSubscribe(zone->GetShortName(), zone->GetInstanceID());
|
||||
nats.Process();
|
||||
}
|
||||
if (zone) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user