Shared task service

This commit is contained in:
KimLS 2019-06-09 18:39:32 -07:00
parent c1484a698c
commit b3a3d9bec5
23 changed files with 338 additions and 54 deletions

View File

@ -359,6 +359,7 @@ IF(EQEMU_BUILD_SERVER)
ADD_SUBDIRECTORY(ucs)
ADD_SUBDIRECTORY(queryserv)
ADD_SUBDIRECTORY(eqlaunch)
ADD_SUBDIRECTORY(services/tasks)
ENDIF(EQEMU_BUILD_SERVER)
IF(EQEMU_BUILD_LOGIN)
ADD_SUBDIRECTORY(loginserver)

View File

@ -64,6 +64,7 @@ SET(common_sources
say_link.cpp
serialize_buffer.cpp
serverinfo.cpp
service.cpp
shareddb.cpp
skills.cpp
spdat.cpp
@ -197,6 +198,7 @@ SET(common_headers
serialize_buffer.h
serverinfo.h
servertalk.h
service.h
shareddb.h
skills.h
spdat.h

View File

@ -19,23 +19,35 @@ EQ::Net::ServertalkClient::~ServertalkClient()
{
}
void EQ::Net::ServertalkClient::Send(uint16_t opcode, EQ::Net::Packet &p)
void EQ::Net::ServertalkClient::Send(uint16_t opcode, const EQ::Net::Packet &p)
{
EQ::Net::DynamicPacket out;
#ifdef ENABLE_SECURITY
if (m_encrypted) {
auto idx = 0;
if (p.Length() == 0) {
p.PutUInt8(0, 0);
DynamicPacket p;
p.PutInt8(0, 0);
out.PutUInt32(0, p.Length() + crypto_secretbox_MACBYTES);
out.PutUInt16(4, opcode);
std::unique_ptr<unsigned char[]> cipher(new unsigned char[p.Length() + crypto_secretbox_MACBYTES]);
crypto_box_easy_afternm(&cipher[0], (unsigned char*)p.Data(), p.Length(), m_nonce_ours, m_shared_key);
(*(uint64_t*)&m_nonce_ours[0])++;
out.PutData(6, &cipher[0], p.Length() + crypto_secretbox_MACBYTES);
}
else {
out.PutUInt32(0, p.Length() + crypto_secretbox_MACBYTES);
out.PutUInt16(4, opcode);
out.PutUInt32(0, p.Length() + crypto_secretbox_MACBYTES);
out.PutUInt16(4, opcode);
std::unique_ptr<unsigned char[]> cipher(new unsigned char[p.Length() + crypto_secretbox_MACBYTES]);
std::unique_ptr<unsigned char[]> cipher(new unsigned char[p.Length() + crypto_secretbox_MACBYTES]);
crypto_box_easy_afternm(&cipher[0], (unsigned char*)p.Data(), p.Length(), m_nonce_ours, m_shared_key);
(*(uint64_t*)&m_nonce_ours[0])++;
out.PutData(6, &cipher[0], p.Length() + crypto_secretbox_MACBYTES);
crypto_box_easy_afternm(&cipher[0], (unsigned char*)p.Data(), p.Length(), m_nonce_ours, m_shared_key);
(*(uint64_t*)&m_nonce_ours[0])++;
out.PutData(6, &cipher[0], p.Length() + crypto_secretbox_MACBYTES);
}
}
else {
out.PutUInt32(0, p.Length());

View File

@ -18,7 +18,7 @@ namespace EQ
ServertalkClient(const std::string &addr, int port, bool ipv6, const std::string &identifier, const std::string &credentials);
~ServertalkClient();
void Send(uint16_t opcode, EQ::Net::Packet &p);
void Send(uint16_t opcode, const EQ::Net::Packet &p);
void SendPacket(ServerPacket *p);
void OnConnect(std::function<void(ServertalkClient*)> cb) { m_on_connect_cb = cb; }
void OnMessage(uint16_t opcode, std::function<void(uint16_t, EQ::Net::Packet&)> cb);

View File

@ -17,7 +17,7 @@ EQ::Net::ServertalkLegacyClient::~ServertalkLegacyClient()
{
}
void EQ::Net::ServertalkLegacyClient::Send(uint16_t opcode, EQ::Net::Packet &p)
void EQ::Net::ServertalkLegacyClient::Send(uint16_t opcode, const EQ::Net::Packet &p)
{
if (!m_connection)
return;

View File

@ -15,7 +15,7 @@ namespace EQ
ServertalkLegacyClient(const std::string &addr, int port, bool ipv6);
~ServertalkLegacyClient();
void Send(uint16_t opcode, EQ::Net::Packet &p);
void Send(uint16_t opcode, const EQ::Net::Packet &p);
void SendPacket(ServerPacket *p);
void OnConnect(std::function<void(ServertalkLegacyClient*)> cb) { m_on_connect_cb = cb; }
void OnMessage(uint16_t opcode, std::function<void(uint16_t, EQ::Net::Packet&)> cb);

View File

@ -19,22 +19,32 @@ EQ::Net::ServertalkServerConnection::~ServertalkServerConnection()
{
}
void EQ::Net::ServertalkServerConnection::Send(uint16_t opcode, EQ::Net::Packet & p)
void EQ::Net::ServertalkServerConnection::Send(uint16_t opcode, const EQ::Net::Packet & p)
{
EQ::Net::DynamicPacket out;
#ifdef ENABLE_SECURITY
if (m_encrypted) {
if (p.Length() == 0) {
DynamicPacket p;
p.PutUInt8(0, 0);
out.PutUInt32(0, p.Length() + crypto_secretbox_MACBYTES);
out.PutUInt16(4, opcode);
std::unique_ptr<unsigned char[]> cipher(new unsigned char[p.Length() + crypto_secretbox_MACBYTES]);
crypto_box_easy_afternm(&cipher[0], (unsigned char*)p.Data(), p.Length(), m_nonce_ours, m_shared_key);
(*(uint64_t*)&m_nonce_ours[0])++;
out.PutData(6, &cipher[0], p.Length() + crypto_secretbox_MACBYTES);
}
else {
out.PutUInt32(0, p.Length() + crypto_secretbox_MACBYTES);
out.PutUInt16(4, opcode);
out.PutUInt32(0, p.Length() + crypto_secretbox_MACBYTES);
out.PutUInt16(4, opcode);
std::unique_ptr<unsigned char[]> cipher(new unsigned char[p.Length() + crypto_secretbox_MACBYTES]);
crypto_box_easy_afternm(&cipher[0], (unsigned char*)p.Data(), p.Length(), m_nonce_ours, m_shared_key);
(*(uint64_t*)&m_nonce_ours[0])++;
out.PutData(6, &cipher[0], p.Length() + crypto_secretbox_MACBYTES);
std::unique_ptr<unsigned char[]> cipher(new unsigned char[p.Length() + crypto_secretbox_MACBYTES]);
crypto_box_easy_afternm(&cipher[0], (unsigned char*)p.Data(), p.Length(), m_nonce_ours, m_shared_key);
(*(uint64_t*)&m_nonce_ours[0])++;
out.PutData(6, &cipher[0], p.Length() + crypto_secretbox_MACBYTES);
}
}
else {
out.PutUInt32(0, p.Length());

View File

@ -19,7 +19,7 @@ namespace EQ
ServertalkServerConnection(std::shared_ptr<EQ::Net::TCPConnection> c, ServertalkServer *parent, bool encrypted, bool allow_downgrade);
~ServertalkServerConnection();
void Send(uint16_t opcode, EQ::Net::Packet &p);
void Send(uint16_t opcode, const EQ::Net::Packet &p);
void SendPacket(ServerPacket *p);
void OnMessage(uint16_t opcode, std::function<void(uint16_t, EQ::Net::Packet&)> cb);
void OnMessage(std::function<void(uint16_t, EQ::Net::Packet&)> cb);

View File

@ -1350,19 +1350,6 @@ struct ServerSharedTaskMember_Struct { // used for various things we just need t
#define TASKJOINOOZ_LEVEL 3
#define TASKJOINOOZ_TIMER 4
struct RouteToHeader
{
char filter[64];
char type[128];
template <class Archive>
void serialize(Archive & archive)
{
archive(filter,
type);
}
};
#pragma pack()
#endif

65
common/service.cpp Normal file
View File

@ -0,0 +1,65 @@
#include "service.h"
#include "event/event_loop.h"
#include "event/timer.h"
#include <thread>
#include <chrono>
struct EQ::Service::Impl
{
bool running;
std::string identifier;
size_t heartbeat_duration_ms;
size_t sleep_duration_ms;
std::unique_ptr<EQ::WorldConnection> world_connection;
std::unique_ptr<EQ::Timer> heartbeat_timer;
std::chrono::steady_clock::time_point last_time;
};
EQ::Service::Service(const std::string &identifier, size_t heartbeat_duration_ms, size_t sleep_duration_ms)
{
_impl.reset(new Impl());
_impl->running = false;
_impl->identifier = identifier;
_impl->heartbeat_duration_ms = heartbeat_duration_ms;
_impl->sleep_duration_ms = sleep_duration_ms;
}
EQ::Service::~Service()
{
}
void EQ::Service::Run()
{
OnStart();
_impl->world_connection.reset(new EQ::WorldConnection(_impl->identifier));
_impl->world_connection->SetOnRoutedMessageHandler([this](const std::string& filter, const std::string& identifier, int type, const EQ::Net::Packet& p) {
OnRoutedMessage(identifier, type, p);
});
_impl->last_time = std::chrono::steady_clock::now();
_impl->heartbeat_timer.reset(new EQ::Timer(_impl->heartbeat_duration_ms, true, [this](EQ::Timer *t) {
auto now = std::chrono::steady_clock::now();
auto time_since = std::chrono::duration_cast<std::chrono::duration<double>>(now - _impl->last_time);
OnHeartbeat(time_since.count());
}));
_impl->running = true;
auto &loop = EQ::EventLoop::Get();
auto sleep_duration = _impl->sleep_duration_ms;
while (_impl->running) {
loop.Process();
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_duration));
}
_impl->heartbeat_timer.release();
_impl->world_connection.release();
OnStop();
}
void EQ::Service::RouteMessage(const std::string &filter, int type, const EQ::Net::Packet &p)
{
_impl->world_connection->RouteMessage(filter, type, p);
}

43
common/service.h Normal file
View File

@ -0,0 +1,43 @@
#pragma once
#include <string>
#include <memory>
#include "world_connection.h"
#include "net/packet.h"
#include "eqemu_logsys.h"
#include "platform.h"
#include "crash.h"
#define EQRegisterService(type) EQEmuLogSys LogSys; \
int main(int argc, char **argv) { \
LogSys.LoadLogSettingsDefaults(); \
set_exception_handler(); \
type srv; \
srv.Run(); \
return 0; \
} \
namespace EQ
{
class Service
{
public:
Service(const std::string &identifier, size_t heartbeat_duration_ms, size_t sleep_duration);
virtual ~Service();
void Run();
protected:
virtual void OnStart() = 0;
virtual void OnStop() = 0;
virtual void OnHeartbeat(double time_since_last) = 0;
virtual void OnRoutedMessage(const std::string& identifier, int type, const EQ::Net::Packet& p) = 0;
void RouteMessage(const std::string &filter, int type, const EQ::Net::Packet& p);
private:
struct Impl;
std::unique_ptr<Impl> _impl;
};
}

View File

@ -36,19 +36,24 @@ bool EQ::WorldConnection::Connected() const {
return m_connection->Connected();
}
void EQ::WorldConnection::RouteMessage(const std::string &filter, const EQ::Net::Packet &p)
void EQ::WorldConnection::RouteMessage(const std::string &filter, int type, const EQ::Net::Packet &p)
{
if (!m_connection->Connected()) {
return;
}
RouteToHeader header;
strn0cpy(header.filter, filter.data(), 64);
strn0cpy(header.type, m_connection->GetIdentifier().data(), 128);
auto identifier = m_connection->GetIdentifier();
EQ::Net::DynamicPacket out;
out.PutSerialize(0, header);
out.PutUInt32(out.Length(), static_cast<uint32_t>(filter.length()));
out.PutString(out.Length(), filter);
out.PutUInt32(out.Length(), static_cast<uint32_t>(identifier.length()));
out.PutString(out.Length(), identifier);
out.PutInt32(out.Length(), type);
out.PutInt32(out.Length(), static_cast<uint32_t>(p.Length()));
out.PutPacket(out.Length(), p);
m_connection->Send(ServerOP_RouteTo, out);
}
void EQ::WorldConnection::_HandleMessage(uint16 opcode, const EQ::Net::Packet &p)
@ -61,9 +66,17 @@ void EQ::WorldConnection::_HandleMessage(uint16 opcode, const EQ::Net::Packet &p
void EQ::WorldConnection::_HandleRoutedMessage(uint16 opcode, const EQ::Net::Packet &p)
{
if (m_on_routed_message) {
auto header = p.GetSerialize<RouteToHeader>(0);
auto np = EQ::Net::StaticPacket((int8_t*)p.Data() + sizeof(RouteToHeader), p.Length() - sizeof(RouteToHeader));
m_on_routed_message(header.filter, header.type, np);
auto idx = 0;
auto filter_length = p.GetInt32(idx); idx += sizeof(int32_t);
auto filter = p.GetString(idx, filter_length); idx += filter_length;
auto identifier_length = p.GetInt32(idx); idx += sizeof(int32_t);
auto identifier = p.GetString(idx, identifier_length); idx += identifier_length;
auto type = p.GetInt32(idx); idx += sizeof(int32_t);
auto packet_length = p.GetInt32(idx); idx += sizeof(int32_t);
auto packet = EQ::Net::StaticPacket(
(void*)((const uint8_t*)p.Data() + idx),
static_cast<size_t>(packet_length));
m_on_routed_message(filter, identifier, type, packet);
}
}

View File

@ -11,7 +11,7 @@ namespace EQ
public:
typedef std::function<void()> OnConnectedHandler;
typedef std::function<void(uint16, const EQ::Net::Packet&)> OnMessageHandler;
typedef std::function<void(const std::string&, const std::string&, EQ::Net::Packet&)> OnRoutedMessageHandler;
typedef std::function<void(const std::string&, const std::string&, int, const EQ::Net::Packet&)> OnRoutedMessageHandler;
WorldConnection(const std::string &type);
virtual ~WorldConnection();
@ -33,7 +33,7 @@ namespace EQ
m_on_routed_message = handler;
}
void RouteMessage(const std::string &filter, const EQ::Net::Packet& p);
void RouteMessage(const std::string &filter, int type, const EQ::Net::Packet& p);
protected:
OnConnectedHandler m_on_connected;

View File

@ -0,0 +1,17 @@
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
SET(service_sources
tasks_service.cpp
)
SET(service_headers
tasks_service.h
)
ADD_EXECUTABLE(tasks_service ${service_sources} ${service_headers})
INSTALL(TARGETS tasks_service RUNTIME DESTINATION ${CMAKE_INSTALL_PREFIX}/bin)
TARGET_LINK_LIBRARIES(tasks_service ${SERVER_LIBS})
SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/bin)

View File

@ -0,0 +1,31 @@
#include "tasks_service.h"
#include "../../common/eqemu_logsys.h"
EQ::TasksService::TasksService()
: EQ::Service("Tasks", 100, 1)
{
}
EQ::TasksService::~TasksService() {
}
void EQ::TasksService::OnStart() {
}
void EQ::TasksService::OnStop() {
}
void EQ::TasksService::OnHeartbeat(double time_since_last) {
}
void EQ::TasksService::OnRoutedMessage(const std::string& identifier, int type, const EQ::Net::Packet& p)
{
LogF(Logs::General, Logs::World_Server, "Routed message of type {0} with length {1}", type, p.Length());
}
EQRegisterService(EQ::TasksService);

View File

@ -0,0 +1,19 @@
#pragma once
#include "../../common/service.h"
namespace EQ
{
class TasksService : public EQ::Service
{
public:
TasksService();
virtual ~TasksService();
protected:
virtual void OnStart();
virtual void OnStop();
virtual void OnHeartbeat(double time_since_last);
virtual void OnRoutedMessage(const std::string& identifier, int type, const EQ::Net::Packet& p);
};
}

View File

@ -15,6 +15,7 @@ SET(world_sources
login_server_list.cpp
net.cpp
queryserv.cpp
router.cpp
shared_tasks.cpp
ucs.cpp
web_interface.cpp
@ -43,6 +44,7 @@ SET(world_headers
login_server_list.h
net.h
queryserv.h
router.h
shared_tasks.h
sof_char_create_data.h
ucs.h

View File

@ -84,6 +84,7 @@ union semun {
#include "web_interface.h"
#include "console.h"
#include "shared_tasks.h"
#include "router.h"
#include "../common/net/servertalk_server.h"
#include "../zone/data_bucket.h"
@ -419,6 +420,7 @@ int main(int argc, char** argv) {
std::unique_ptr<EQ::Net::ServertalkServer> server_connection;
server_connection.reset(new EQ::Net::ServertalkServer());
Router router;
EQ::Net::ServertalkServerOptions server_opts;
server_opts.port = Config->WorldTCPPort;
server_opts.ipv6 = false;
@ -502,14 +504,18 @@ int main(int argc, char** argv) {
web_interface.RemoveConnection(connection);
});
server_connection->OnConnectionIdentified([](std::shared_ptr<EQ::Net::ServertalkServerConnection> connection) {
server_connection->OnConnectionIdentified([&router](std::shared_ptr<EQ::Net::ServertalkServerConnection> connection) {
LogF(Logs::General, Logs::World_Server, "New connection from {0} with identifier {1}",
connection->GetUUID(), connection->GetIdentifier());
router.AddConnection(connection);
});
server_connection->OnConnectionRemoved([](std::shared_ptr<EQ::Net::ServertalkServerConnection> connection) {
server_connection->OnConnectionRemoved([&router](std::shared_ptr<EQ::Net::ServertalkServerConnection> connection) {
LogF(Logs::General, Logs::World_Server, "Removed connection from {0} with identifier {1}",
connection->GetUUID(), connection->GetIdentifier());
router.RemoveConnection(connection);
});
EQ::Net::EQStreamManagerOptions opts(9000, false, false);

45
world/router.cpp Normal file
View File

@ -0,0 +1,45 @@
#include "router.h"
Router::Router()
{
}
Router::~Router()
{
}
void Router::AddConnection(std::shared_ptr<EQ::Net::ServertalkServerConnection> connection)
{
m_connections.push_back(connection);
connection->OnMessage(ServerOP_RouteTo, std::bind(&Router::OnRouterMessage, this, std::placeholders::_1, std::placeholders::_2));
}
void Router::RemoveConnection(std::shared_ptr<EQ::Net::ServertalkServerConnection> connection)
{
auto iter = m_connections.begin();
while (iter != m_connections.end()) {
if ((*iter) == connection) {
m_connections.erase(iter);
return;
}
iter++;
}
}
void Router::OnRouterMessage(uint16 opcode, const EQ::Net::Packet &p)
{
auto idx = 0;
auto filter_length = p.GetInt32(idx); idx += sizeof(int32_t);
auto filter = p.GetString(idx, filter_length); idx += filter_length;
printf("Recv router msg of size %i\n", p.Length());
for (auto &connection : m_connections) {
auto identifier = connection->GetIdentifier();
auto pos = identifier.find(filter);
if (pos == 0) {
connection->Send(opcode, p);
}
}
}

19
world/router.h Normal file
View File

@ -0,0 +1,19 @@
#pragma once
#include "../common/net/servertalk_server_connection.h"
#include <memory>
#include <list>
class Router
{
public:
Router();
~Router();
void AddConnection(std::shared_ptr<EQ::Net::ServertalkServerConnection> connection);
void RemoveConnection(std::shared_ptr<EQ::Net::ServertalkServerConnection> connection);
private:
std::list<std::shared_ptr<EQ::Net::ServertalkServerConnection>> m_connections;
void OnRouterMessage(uint16 opcode, const EQ::Net::Packet &p);
};

View File

@ -2831,14 +2831,18 @@ void command_spawn(Client *c, const Seperator *sep)
void command_test(Client *c, const Seperator *sep)
{
c->Message(15, "Triggering test command");
EQ::Net::DynamicPacket p;
p.PutCString(0, "TestPacket");
worldserver.RouteMessage("Tasks", 1234, p);
if (sep->arg[1]) {
c->SetPrimaryWeaponOrnamentation(atoi(sep->arg[1]));
}
if (sep->arg[2]) {
c->SetSecondaryWeaponOrnamentation(atoi(sep->arg[2]));
}
//c->Message(15, "Triggering test command");
//
//if (sep->arg[1]) {
// c->SetPrimaryWeaponOrnamentation(atoi(sep->arg[1]));
//}
//if (sep->arg[2]) {
// c->SetSecondaryWeaponOrnamentation(atoi(sep->arg[2]));
//}
}
void command_texture(Client *c, const Seperator *sep)

View File

@ -118,6 +118,13 @@ uint16 WorldServer::GetPort() const
return 0;
}
void WorldServer::RouteMessage(const std::string &filter, int type, const EQ::Net::Packet &p)
{
if (m_connection) {
m_connection->RouteMessage(filter, type, p);
}
}
void WorldServer::SetZoneData(uint32 iZoneID, uint32 iInstanceID) {
auto pack = new ServerPacket(ServerOP_SetZone, sizeof(SetZone_Struct));
SetZone_Struct* szs = (SetZone_Struct*)pack->pBuffer;

View File

@ -35,6 +35,7 @@ public:
void SendPacket(ServerPacket* pack);
std::string GetIP() const;
uint16 GetPort() const;
void RouteMessage(const std::string &filter, int type, const EQ::Net::Packet& p);
void HandleMessage(uint16 opcode, const EQ::Net::Packet &p);