More work on subscription events, and relaying through world.

This commit is contained in:
KimLS
2014-08-01 18:29:41 -07:00
parent 002f5e3bcc
commit 40b555a55b
18 changed files with 657 additions and 72 deletions
+4
View File
@@ -97,6 +97,8 @@ SET(zone_sources
QuestParserCollection.cpp
raids.cpp
RaycastMesh.cpp
remote_call.cpp
remote_call_subscribe.cpp
spawn2.cpp
spawn2.h
spawngroup.cpp
@@ -189,6 +191,8 @@ SET(zone_headers
raid.h
raids.h
RaycastMesh.h
remote_call.h
remote_call_subscribe.h
skills.h
spawn2.cpp
spawn2.h
-2
View File
@@ -40,12 +40,10 @@ void ClientLogs::subscribe(EQEMuLog::LogIDs id, Client *c) {
end = entries[id].end();
for(; cur != end; ++cur) {
if(*cur == c) {
printf("%s was allready subscribed to %d\n", c->GetName(), id);
return;
}
}
printf("%s has been subscribed to %d\n", c->GetName(), id);
entries[id].push_back(c);
}
+32 -15
View File
@@ -18,10 +18,12 @@
#include "../common/debug.h"
#include "masterentity.h"
#include "../common/spdat.h"
#include "../common/StringUtil.h"
#include "StringIDs.h"
#include "worldserver.h"
#include "QuestParserCollection.h"
#include "../common/StringUtil.h"
#include "remote_call.h"
#include "remote_call_subscribe.h"
#include <sstream>
#include <math.h>
@@ -1213,19 +1215,34 @@ void Mob::MakeSpawnUpdateNoDelta(PlayerPositionUpdateServer_Struct *spu){
spu->padding0014 =0x7f;
spu->padding0018 =0x5df27;
///* Testing */
//if (IsNPC() && WS_Client_Connected.size() != 0){
// std::string str = MakeJSON("ResponseType:PositionUpdate,entity:" + std::to_string(GetID()) + ",name:" + GetName() + ",x:" + std::to_string(x_pos) + ",y:" + std::to_string(y_pos) + ",z:" + std::to_string(z_pos) + ",h:" + std::to_string(heading));
// char * writable = new char[str.size() + 1];
// std::copy(str.begin(), str.end(), writable);
// ServerPacket* pack = new ServerPacket(ServerOP_WIWorldResponse, sizeof(WI_Client_Response_Struct)+str.length() + 1);
// WI_Client_Response_Struct* WICR = (WI_Client_Response_Struct*)pack->pBuffer;
// strn0cpy(WICR->Client_UUID, WS_Client_Connected.c_str(), 64);
// strn0cpy(WICR->JSON_Data, str.c_str(), str.length() + 1);
// if (worldserver.Connected()) { worldserver.SendPacket(pack); }
// safe_delete(pack);
// delete[] writable;
//}
if(IsNPC()) {
//zone_id
//instance_id
//entity_id
//x
//y
//z
//h
const auto &conns = RemoteCallSubscriptionHandler::Instance()->GetSubscribers("NPC::MakeSpawnUpdateNoDelta");
if(conns.size() > 0) {
std::string method = "NPC::MakeSpawnUpdateNoDelta";
std::vector<std::string> params;
params.push_back(std::to_string((long)zone->GetZoneID()));
params.push_back(std::to_string((long)zone->GetInstanceID()));
params.push_back(std::to_string((long)GetID()));
params.push_back(GetName());
params.push_back(std::to_string((double)x_pos));
params.push_back(std::to_string((double)y_pos));
params.push_back(std::to_string((double)z_pos));
params.push_back(std::to_string((double)heading));
auto &iter = conns.begin();
while(iter != conns.end()) {
RemoteCall((*iter), method, params);
++iter;
}
}
}
}
// this is for SendPosUpdate()
@@ -1245,7 +1262,7 @@ void Mob::MakeSpawnUpdate(PlayerPositionUpdateServer_Struct* spu) {
if(this->IsClient())
spu->animation = animation;
else
spu->animation = pRunAnimSpeed;//animation;
spu->animation = pRunAnimSpeed;
spu->delta_heading = NewFloatToEQ13(static_cast<float>(delta_heading));
}
+8 -1
View File
@@ -53,12 +53,13 @@
#include "titles.h"
#include "guild_mgr.h"
#include "tasks.h"
#include "QuestParserCollection.h"
#include "embparser.h"
#include "lua_parser.h"
#include "client_logs.h"
#include "questmgr.h"
#include "remote_call.h"
#include "remote_call_subscribe.h"
#include <iostream>
#include <string>
@@ -111,6 +112,7 @@ extern void MapOpcodes();
int main(int argc, char** argv) {
RegisterExecutablePlatform(ExePlatformZone);
set_exception_handler();
register_remote_call_handlers();
const char *zone_name;
@@ -310,6 +312,7 @@ int main(int argc, char** argv) {
}
Timer InterserverTimer(INTERSERVER_TIMER); // does MySQL pings and auto-reconnect
Timer RemoteCallProcessTimer(5000);
#ifdef EQPROFILE
#ifdef PROFILE_DUMP_TIME
Timer profile_dump_timer(PROFILE_DUMP_TIME*1000);
@@ -403,6 +406,10 @@ int main(int argc, char** argv) {
worldwasconnected = false;
}
if(RemoteCallProcessTimer.Check()) {
RemoteCallSubscriptionHandler::Instance()->Process();
}
if (ZoneLoaded && zoneupdate_timer.Check()) {
{
if(net.group_timer.Enabled() && net.group_timer.Check())
+112
View File
@@ -0,0 +1,112 @@
#include "../common/debug.h"
#include "../common/logsys.h"
#include "../common/logtypes.h"
#include "../common/md5.h"
#include "../common/EmuTCPConnection.h"
#include "../common/packet_functions.h"
#include "../common/packet_dump.h"
#include "../common/servertalk.h"
#include "remote_call.h"
#include "remote_call_subscribe.h"
#include "worldserver.h"
std::map<std::string, RemoteCallHandler> remote_call_methods;
extern WorldServer worldserver;
void RemoteCallResponse(const std::string &connection_id, const std::string &request_id, const std::map<std::string, std::string> &res, const std::string &error) {
uint32 sz = (uint32)(connection_id.size() + request_id.size() + error.size() + 3 + 16);
auto iter = res.begin();
while(iter != res.end()) {
sz += (uint32)iter->first.size() + (uint32)iter->second.size() + 10;
++iter;
}
ServerPacket *pack = new ServerPacket(ServerOP_WIRemoteCallResponse, sz);
pack->WriteUInt32((uint32)request_id.size());
pack->WriteString(request_id.c_str());
pack->WriteUInt32((uint32)connection_id.size());
pack->WriteString(connection_id.c_str());
pack->WriteUInt32((uint32)error.size());
pack->WriteString(error.c_str());
pack->WriteUInt32((uint32)res.size());
iter = res.begin();
while(iter != res.end()) {
pack->WriteUInt32((uint32)iter->first.size());
pack->WriteString(iter->first.c_str());
pack->WriteUInt32((uint32)iter->second.size());
pack->WriteString(iter->second.c_str());
++iter;
}
worldserver.SendPacket(pack);
safe_delete(pack);
}
void RemoteCall(const std::string &connection_id, const std::string &method, const std::vector<std::string> &params) {
uint32 sz = (uint32)(connection_id.size() + method.size() + 14);
auto iter = params.begin();
while(iter != params.end()) {
sz += (uint32)iter->size() + 5;
++iter;
}
ServerPacket *pack = new ServerPacket(ServerOP_WIRemoteCallToClient, sz);
pack->WriteUInt32((uint32)connection_id.size());
pack->WriteString(connection_id.c_str());
pack->WriteUInt32((uint32)method.size());
pack->WriteString(method.c_str());
pack->WriteUInt32((uint32)params.size());
iter = params.begin();
while(iter != params.end()) {
pack->WriteUInt32((uint32)iter->size());
pack->WriteString(iter->c_str());
++iter;
}
worldserver.SendPacket(pack);
safe_delete(pack);
}
void register_remote_call_handlers() {
remote_call_methods["Zone::Subscribe"] = handle_rc_subscribe;
remote_call_methods["Zone::Unsubscribe"] = handle_rc_unsubscribe;
}
void handle_rc_subscribe(const std::string &method, const std::string &connection_id, const std::string &request_id, const std::vector<std::string> &params) {
std::string error;
std::map<std::string, std::string> res;
if(params.size() != 1) {
error = "Missing event to subscribe to";
RemoteCallResponse(connection_id, request_id, res, error);
return;
}
if(RemoteCallSubscriptionHandler::Instance()->Subscribe(connection_id, params[0])) {
res["status"] = "subscribed";
} else {
res["status"] = "failed to subscribe";
}
RemoteCallResponse(connection_id, request_id, res, error);
}
void handle_rc_unsubscribe(const std::string &method, const std::string &connection_id, const std::string &request_id, const std::vector<std::string> &params) {
std::string error;
std::map<std::string, std::string> res;
if(params.size() != 1) {
error = "Missing event to unsubscribe from";
RemoteCallResponse(connection_id, request_id, res, error);
return;
}
if(RemoteCallSubscriptionHandler::Instance()->Subscribe(connection_id, params[0])) {
res["status"] = "unsubscribed";
}
else {
res["status"] = "failed to unsubscribe";
}
RemoteCallResponse(connection_id, request_id, res, error);
}
+36
View File
@@ -0,0 +1,36 @@
/* EQEMu: Everquest Server Emulator
Copyright (C) 2001-2014 EQEMu Development Team (http://eqemulator.net)
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY except by those people which sell it, which
are required to give you total support for your newly bought product;
without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef ZONE_REMOTE_CALL_H
#define ZONE_REMOTE_CALL_H
#include <map>
#include <string>
#include <vector>
typedef void(*RemoteCallHandler)(const std::string&, const std::string&, const std::string&, const std::vector<std::string>&);
void RemoteCallResponse(const std::string &connection_id, const std::string &request_id, const std::map<std::string, std::string> &res, const std::string &error);
void RemoteCall(const std::string &connection_id, const std::string &method, const std::vector<std::string> &params);
void register_remote_call_handlers();
void handle_rc_subscribe(const std::string &method, const std::string &connection_id, const std::string &request_id, const std::vector<std::string> &params);
void handle_rc_unsubscribe(const std::string &method, const std::string &connection_id, const std::string &request_id, const std::vector<std::string> &params);
#endif
+159
View File
@@ -0,0 +1,159 @@
#include "../common/debug.h"
#include "../common/logsys.h"
#include "../common/logtypes.h"
#include "../common/md5.h"
#include "../common/EmuTCPConnection.h"
#include "../common/packet_functions.h"
#include "../common/packet_dump.h"
#include "../common/servertalk.h"
#include "remote_call_subscribe.h"
#include "worldserver.h"
#include "zone.h"
extern WorldServer worldserver;
extern Zone* zone;
RemoteCallSubscriptionHandler* RemoteCallSubscriptionHandler::_instance = nullptr;
RemoteCallSubscriptionHandler::RemoteCallSubscriptionHandler() {
}
RemoteCallSubscriptionHandler::~RemoteCallSubscriptionHandler() {
}
RemoteCallSubscriptionHandler *RemoteCallSubscriptionHandler::Instance()
{
if(!_instance) {
_instance = new RemoteCallSubscriptionHandler();
}
return _instance;
}
bool RemoteCallSubscriptionHandler::Subscribe(std::string connection_id, std::string event_name) {
if(registered_events.count(event_name) == 0) {
std::vector<std::string> r;
r.push_back(connection_id);
registered_events[event_name] = r;
if(connection_ids.count(connection_id) == 0) {
connection_ids[connection_id] = 1;
} else {
int count = connection_ids[connection_id];
connection_ids[connection_id] = count + 1;
}
return true;
} else {
std::vector<std::string>& r = registered_events[event_name];
size_t sz = r.size();
for(size_t i = 0; i < sz; ++i) {
if(connection_id.compare(r[i]) == 0) {
return false;
}
}
r.push_back(connection_id);
registered_events[event_name] = r;
if(connection_ids.count(connection_id) == 0) {
connection_ids[connection_id] = 1;
}
else {
int count = connection_ids[connection_id];
connection_ids[connection_id] = count + 1;
}
return true;
}
}
bool RemoteCallSubscriptionHandler::Unsubscribe(std::string connection_id, std::string event_name) {
if(registered_events.count(event_name) == 0) {
return false;
}
std::vector<std::string>& r = registered_events[event_name];
auto iter = r.begin();
while(iter != r.end()) {
if(iter->compare(connection_id) == 0) {
r.erase(iter);
registered_events[event_name] = r;
int count = connection_ids[connection_id];
connection_ids[connection_id] = count - 1;
return true;
}
}
return false;
}
void RemoteCallSubscriptionHandler::Process() {
//create a check for all these connection ids packet
uint32 sz = 12;
auto iter = connection_ids.begin();
while(iter != connection_ids.end()) {
sz += iter->first.size();
sz += 5;
++iter;
}
ServerPacket *pack = new ServerPacket(ServerOP_WIClientSession, sz);
pack->WriteUInt32((uint32)zone->GetZoneID());
pack->WriteUInt32((uint32)zone->GetInstanceID());
pack->WriteUInt32((uint32)connection_ids.size());
iter = connection_ids.begin();
while(iter != connection_ids.end()) {
pack->WriteUInt32((uint32)iter->first.size());
pack->WriteString(iter->first.c_str());
++iter;
}
worldserver.SendPacket(pack);
safe_delete(pack);
}
const std::vector<std::string> &RemoteCallSubscriptionHandler::GetSubscribers(std::string event_name) {
if(registered_events.count(event_name) == 0) {
return std::vector<std::string>();
}
std::vector<std::string> &r = registered_events[event_name];
return r;
}
void RemoteCallSubscriptionHandler::ClearConnection(std::string connection_id) {
if(connection_ids.count(connection_id) != 0) {
connection_ids.erase(connection_id);
}
auto iter = registered_events.begin();
while(iter != registered_events.end()) {
auto &conns = iter->second;
auto conn_iter = conns.begin();
while(conn_iter != conns.end()) {
if(conn_iter->compare(connection_id) == 0) {
conns.erase(conn_iter);
registered_events[iter->first] = conns;
printf("Removing connection: %s from event %s\n", connection_id.c_str(), iter->first.c_str());
break;
}
++conn_iter;
}
++iter;
}
}
void RemoteCallSubscriptionHandler::ClearAllConnections() {
//
//std::map<std::string, std::vector<std::string>> registered_events;
//std::map<std::string, int> connection_ids;
registered_events.clear();
connection_ids.clear();
}
+51
View File
@@ -0,0 +1,51 @@
/* EQEMu: Everquest Server Emulator
Copyright (C) 2001-2014 EQEMu Development Team (http://eqemulator.net)
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY except by those people which sell it, which
are required to give you total support for your newly bought product;
without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef ZONE_REMOTE_CALL_SUBSCRIBE_H
#define ZONE_REMOTE_CALL_SUBSCRIBE_H
#include <map>
#include <string>
#include <vector>
class RemoteCallSubscriptionHandler
{
public:
~RemoteCallSubscriptionHandler();
static RemoteCallSubscriptionHandler *Instance();
bool Subscribe(std::string connection_id, std::string event_name);
bool Unsubscribe(std::string connection_id, std::string event_name);
const std::vector<std::string> &GetSubscribers(std::string event_name);
void Process();
void ClearConnection(std::string connection_id);
void ClearAllConnections();
private:
RemoteCallSubscriptionHandler();
RemoteCallSubscriptionHandler(RemoteCallSubscriptionHandler const&);
RemoteCallSubscriptionHandler& operator=(RemoteCallSubscriptionHandler const&);
static RemoteCallSubscriptionHandler *_instance;
std::map<std::string, std::vector<std::string>> registered_events;
std::map<std::string, int> connection_ids;
};
#endif
+46 -4
View File
@@ -51,7 +51,8 @@
#include "../common/rulesys.h"
#include "titles.h"
#include "QGlobals.h"
#include "remote_call.h"
#include "remote_call_subscribe.h"
extern EntityList entity_list;
extern Zone* zone;
@@ -62,6 +63,7 @@ extern NetConnection net;
extern PetitionList petition_list;
extern uint32 numclients;
extern volatile bool RunLoops;
extern std::map<std::string, RemoteCallHandler> remote_call_methods;
WorldServer::WorldServer()
: WorldConnection(EmuTCPConnection::packetModeZone)
@@ -1812,10 +1814,50 @@ void WorldServer::Process() {
}
break;
}
case ServerOP_WIRemoteCall:
printf("Recv remote call from WI but atm doing anything with it is not yet implemented (BUT SOON)\n");
DumpPacket(pack);
case ServerOP_WIRemoteCall: {
char *id = nullptr;
char *session_id = nullptr;
char *method = nullptr;
id = new char[pack->ReadUInt32() + 1];
pack->ReadString(id);
session_id = new char[pack->ReadUInt32() + 1];
pack->ReadString(session_id);
method = new char[pack->ReadUInt32() + 1];
pack->ReadString(method);
uint32 param_count = pack->ReadUInt32();
std::vector<std::string> params;
for(uint32 i = 0; i < param_count; ++i) {
char *p = new char[pack->ReadUInt32() + 1];
pack->ReadString(p);
params.push_back(p);
safe_delete_array(p);
}
if(remote_call_methods.count(method) != 0) {
auto f = remote_call_methods[method];
f(method, session_id, id, params);
}
safe_delete_array(id);
safe_delete_array(session_id);
safe_delete_array(method);
break;
}
case ServerOP_WIClientSessionResponse: {
uint32 count = pack->ReadUInt32();
for(uint32 i = 0; i < count; ++i) {
char *p = new char[pack->ReadUInt32() + 1];
pack->ReadString(p);
RemoteCallSubscriptionHandler::Instance()->ClearConnection(p);
safe_delete_array(p);
}
break;
}
default: {
std::cout << " Unknown ZSopcode:" << (int)pack->opcode;
std::cout << " size:" << pack->size << std::endl;
+3
View File
@@ -58,6 +58,8 @@
#include "../common/rulesys.h"
#include "guild_mgr.h"
#include "QuestParserCollection.h"
#include "remote_call.h"
#include "remote_call_subscribe.h"
#ifdef _WINDOWS
#define snprintf _snprintf
@@ -790,6 +792,7 @@ void Zone::Shutdown(bool quite)
LogFile->write(EQEMuLog::Normal, "Zone shutdown: going to sleep");
ZoneLoaded = false;
RemoteCallSubscriptionHandler::Instance()->ClearAllConnections();
zone->ResetAuth();
safe_delete(zone);
dbasync->CommitWrites();