From 1cafd6831de292659ed0f80c4fd22a3d13b4c25b Mon Sep 17 00:00:00 2001 From: KimLS Date: Fri, 13 Jan 2017 21:52:08 -0800 Subject: [PATCH] Basic work on subscriptions --- common/servertalk.h | 1 + wi/index.js | 2 +- wi/network/servertalk_api.js | 90 ++++++++++++++++++++++++++------- wi/network/servertalk_client.js | 4 +- wi/test.js | 2 +- wi/ws/eqw.js | 20 ++------ wi/ws/wi_common.js | 27 ++++++++++ wi/ws/ws_interface.js | 21 +++++++- world/web_interface.cpp | 21 ++++++++ world/web_interface.h | 2 + world/zonelist.cpp | 41 +++++++++++++++ world/zonelist.h | 7 +-- 12 files changed, 196 insertions(+), 42 deletions(-) create mode 100644 wi/ws/wi_common.js diff --git a/common/servertalk.h b/common/servertalk.h index f92a0a982..a79622099 100644 --- a/common/servertalk.h +++ b/common/servertalk.h @@ -88,6 +88,7 @@ #define ServerOP_DepopPlayerCorpse 0x0065 #define ServerOP_RequestTellQueue 0x0066 // client asks for it's tell queues #define ServerOP_ChangeSharedMem 0x0067 +#define ServerOP_WebInterfaceEvent 0x0068 #define ServerOP_RaidAdd 0x0100 //in use #define ServerOP_RaidRemove 0x0101 //in use diff --git a/wi/index.js b/wi/index.js index 308d34d17..ffeaa9c5a 100644 --- a/wi/index.js +++ b/wi/index.js @@ -23,8 +23,8 @@ const uuid = require('node-uuid'); const jwt = require('jsonwebtoken'); var mysql = require('mysql').createPool(settings.db); -var wsi = new websocket_iterface.wsi(server, key); var api = new servertalk.api(); +var wsi = new websocket_iterface.wsi(server, key, api); api.Init(settings.servertalk.addr, settings.servertalk.port, false, settings.servertalk.key); app.use(bodyParser.json()); diff --git a/wi/network/servertalk_api.js b/wi/network/servertalk_api.js index 4e0587c1a..cf677d6f2 100644 --- a/wi/network/servertalk_api.js +++ b/wi/network/servertalk_api.js @@ -7,6 +7,7 @@ class ServertalkAPI this.client = new servertalk.client(); this.client.Init(addr, port, ipv6, 'WebInterface', credentials); this.pending_calls = {}; + this.subscriptions = {}; var self = this; this.client.on('connecting', function() { @@ -25,28 +26,51 @@ class ServertalkAPI }); this.client.on('message', function(opcode, packet) { - var response = Buffer.from(packet).toString('utf8'); - try { - var res = JSON.parse(response); - - if(res.id) { - if(self.pending_calls.hasOwnProperty(res.id)) { - var entry = self.pending_calls[res.id]; - - if(res.error) { - var reject = entry[1]; - reject(res.error); - } else { - var resolve = entry[0]; - resolve(res.response); + if(opcode == 47) { + var response = Buffer.from(packet).toString('utf8'); + try { + var res = JSON.parse(response); + + if(res.id) { + if(self.pending_calls.hasOwnProperty(res.id)) { + var entry = self.pending_calls[res.id]; + + if(res.error) { + var reject = entry[1]; + reject(res.error); + } else { + var resolve = entry[0]; + resolve(res.response); + } + + delete self.pending_calls[res.id]; } - - delete self.pending_calls[res.id]; } + } catch(ex) { + console.log('Error processing response from server:\n', ex); + } + } else if(opcode == 104) { + var message = Buffer.from(packet).toString('utf8'); + try { + var msg = JSON.parse(message); + + if(msg.event) { + if(self.subscriptions.hasOwnProperty(msg.event)) { + var subs = self.subscriptions[msg.event]; + + for(var idx in subs) { + try { + var sub = subs[idx]; + sub.emit('subscriptionMessage', msg); + } catch(ex) { + console.log('Error dispatching subscription message', ex); + } + } + } + } + } catch(ex) { + console.log('Error processing response from server:\n', ex); } - - } catch(ex) { - console.log('Error processing response from server:\n', ex); } }); } @@ -83,6 +107,34 @@ class ServertalkAPI var c = { method: method, params: args }; client.Send(47, Buffer.from(JSON.stringify(c))); } + + Subscribe(event_id, who) { + this.Unsubscribe(event_id, who); + + var subs = this.subscriptions[event_id]; + if(subs) { + console.log('Subscribe', who.uuid, 'to', event_id); + subs[who.uuid] = who; + } else { + console.log('Subscribe', who.uuid, 'to', event_id); + this.subscriptions[event_id] = { }; + this.subscriptions[event_id][who.uuid] = who; + } + } + + Unsubscribe(event_id, who) { + var subs = this.subscriptions[event_id]; + if(subs) { + console.log('Unsubscribe', who.uuid, 'from', event_id); + delete subs[who.uuid]; + } + } + + UnsubscribeAll(who) { + for(var sub_idx in this.subscriptions) { + this.Unsubscribe(sub_idx, who); + } + } } module.exports = { diff --git a/wi/network/servertalk_client.js b/wi/network/servertalk_client.js index 7ccf41303..4197a1aa4 100644 --- a/wi/network/servertalk_client.js +++ b/wi/network/servertalk_client.js @@ -214,8 +214,8 @@ class ServertalkClient extends EventEmitter ProcessMessage(p) { try { - var length = this.m_buffer.readUInt32LE(0); - var opcode = this.m_buffer.readUInt16LE(4); + var length = p.readUInt32LE(0); + var opcode = p.readUInt16LE(4); if(length > 0) { var data = p.slice(6); diff --git a/wi/test.js b/wi/test.js index 24ddb4423..2637ddb37 100644 --- a/wi/test.js +++ b/wi/test.js @@ -2,7 +2,7 @@ const WebSocket = require('ws'); const ws = new WebSocket('ws://localhost:9080'); ws.on('open', function open() { - ws.send(JSON.stringify({authorization: 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6IktTcHJpdGUxIiwiZXhwIjoxNDg0NzIzNDQxLCJpYXQiOjE0ODQxMTg2NDF9.Lmwm572yMWIu1DUrfer8JVvl1DGEkdnMsMFp5WDzp_A', id: '1', method: 'EQW::IsLocked', params: []})); + ws.send(JSON.stringify({authorization: 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6IktTcHJpdGUxIiwiZXhwIjoxNDg0NzIzNDQxLCJpYXQiOjE0ODQxMTg2NDF9.Lmwm572yMWIu1DUrfer8JVvl1DGEkdnMsMFp5WDzp_A', id: '1', method: 'EQW::Subscribe::WorldZoneUpdate', params: []})); }); ws.on('message', function(data, flags) { diff --git a/wi/ws/eqw.js b/wi/ws/eqw.js index af119aab5..15fc5a62c 100644 --- a/wi/ws/eqw.js +++ b/wi/ws/eqw.js @@ -1,20 +1,10 @@ -function Register(name, wsi, api) { - wsi.Register(name, - function(request) { - api.Call(name, request.params) - .then(function(value) { - wsi.Send(request, value); - }) - .catch(function(reason) { - wsi.SendError(request, reason); - }); - }, true); -} +const common = require('./wi_common.js'); var RegisterEQW = function(wsi, api) { - Register('EQW::IsLocked', wsi, api); - Register('EQW::Lock', wsi, api); - Register('EQW::Unlock', wsi, api); + common.Register('EQW::IsLocked', wsi, api); + common.Register('EQW::Lock', wsi, api); + common.Register('EQW::Unlock', wsi, api); + common.RegisterSubscription('EQW', 'WorldZoneUpdate', wsi, api); }; module.exports = { diff --git a/wi/ws/wi_common.js b/wi/ws/wi_common.js new file mode 100644 index 000000000..e9eb6577f --- /dev/null +++ b/wi/ws/wi_common.js @@ -0,0 +1,27 @@ +function Register(name, wsi, api) { + wsi.Register(name, + function(request) { + api.Call(name, request.params) + .then(function(value) { + wsi.Send(request, value); + }) + .catch(function(reason) { + wsi.SendError(request, reason); + }); + }, true); +} + +function RegisterSubscription(namespace, event, wsi, api) { + wsi.Register(namespace + '::Subscribe::' + event, function(request) { + api.Subscribe(event, request.ws); + }); + + wsi.Register(namespace + '::Unsubscribe::' + event, function(request) { + api.Unsubscribe(event, request.ws); + }); +} + +module.exports = { + 'Register': Register, + 'RegisterSubscription': RegisterSubscription +} \ No newline at end of file diff --git a/wi/ws/ws_interface.js b/wi/ws/ws_interface.js index 1e6a06202..ea60ca2fd 100644 --- a/wi/ws/ws_interface.js +++ b/wi/ws/ws_interface.js @@ -1,18 +1,21 @@ const WebSocketServer = require('ws').Server; const jwt = require('jsonwebtoken'); +const uuid = require('node-uuid'); class WebSocketInterface { - constructor(server, key) { + constructor(server, key, api) { this.wss = new WebSocketServer({ server: server }); this.methods = {}; var self = this; this.wss.on('connection', function connection(ws) { self.ws = ws; + ws.uuid = uuid.v4(); ws.on('message', function incoming(message) { try { var request = JSON.parse(message); + request.ws = ws; if(request.method) { var method = self.methods[request.method]; @@ -51,6 +54,14 @@ class WebSocketInterface self.SendError(null, 'No method supplied'); } }); + + ws.on('close', function() { + api.UnsubscribeAll(ws); + }); + + ws.on('subscriptionMessage', function(msg) { + self.SendRaw(msg); + }); }); } @@ -92,6 +103,14 @@ class WebSocketInterface console.log(ex); } } + + SendRaw(obj) { + try { + this.ws.send(JSON.stringify(obj)); + } catch(ex) { + console.log(ex); + } + } } module.exports = { diff --git a/world/web_interface.cpp b/world/web_interface.cpp index 250f9d3b8..e2d14e62d 100644 --- a/world/web_interface.cpp +++ b/world/web_interface.cpp @@ -105,6 +105,21 @@ void WebInterface::SendError(const std::string &message, const std::string &id) Send(error); } +void WebInterface::SendEvent(const Json::Value &value) +{ + try { + std::stringstream ss; + ss << value; + + EQ::Net::DynamicPacket p; + p.PutString(0, ss.str()); + m_connection->Send(ServerOP_WebInterfaceEvent, p); + } + catch (std::exception) { + //Log error + } +} + void WebInterface::AddCall(const std::string &method, WebInterfaceCall call) { m_calls.insert(std::make_pair(method, call)); @@ -149,6 +164,12 @@ void WebInterfaceList::SendResponse(const std::string &uuid, std::string &id, co } } +void WebInterfaceList::SendEvent(const Json::Value &value) { + for (auto &i : m_interfaces) { + i.second->SendEvent(value); + } +} + void WebInterfaceList::SendError(const std::string &uuid, const std::string &message) { auto iter = m_interfaces.find(uuid); if (iter != m_interfaces.end()) { diff --git a/world/web_interface.h b/world/web_interface.h index 76ce325e2..fc90265d9 100644 --- a/world/web_interface.h +++ b/world/web_interface.h @@ -19,6 +19,7 @@ public: void SendResponse(const std::string &id, const Json::Value &response); void SendError(const std::string &message); void SendError(const std::string &message, const std::string &id); + void SendEvent(const Json::Value &value); void AddCall(const std::string &method, WebInterfaceCall call); private: void OnCall(uint16 opcode, EQ::Net::Packet &p); @@ -37,6 +38,7 @@ public: void AddConnection(std::shared_ptr connection); void RemoveConnection(std::shared_ptr connection); void SendResponse(const std::string &uuid, std::string &id, const Json::Value &response); + void SendEvent(const Json::Value &value); void SendError(const std::string &uuid, const std::string &message); void SendError(const std::string &uuid, const std::string &message, const std::string &id); diff --git a/world/zonelist.cpp b/world/zonelist.cpp index 3c9460fce..1dc6a5636 100644 --- a/world/zonelist.cpp +++ b/world/zonelist.cpp @@ -24,10 +24,13 @@ #include "../common/servertalk.h" #include "../common/string_util.h" #include "../common/random.h" +#include "../common/json/json.h" +#include "web_interface.h" extern uint32 numzones; extern bool holdzones; extern EQEmu::Random emu_random; +extern WebInterfaceList web_interface; void CatchSignal(int sig_num); ZSList::ZSList() @@ -36,6 +39,8 @@ ZSList::ZSList() CurGroupID = 1; LastAllocatedPort=0; memset(pLockedZones, 0, sizeof(pLockedZones)); + + m_tick.reset(new EQ::Timer(1000, true, std::bind(&ZSList::OnTick, this, std::placeholders::_1))); } ZSList::~ZSList() { @@ -690,3 +695,39 @@ void ZSList::WorldShutDown(uint32 time, uint32 interval) CatchSignal(2); } } + +void ZSList::OnTick(EQ::Timer *t) +{ + Json::Value out; + out["event"] = "WorldZoneUpdate"; + out["data"] = Json::Value(); + + for (auto &zone : list) + { + Json::Value outzone; + + outzone["CAddress"] = zone->GetCAddress(); + outzone["CLocalAddress"] = zone->GetCLocalAddress(); + outzone["CompileTime"] = zone->GetCompileTime(); + outzone["CPort"] = zone->GetCPort(); + outzone["ID"] = zone->GetID(); + outzone["InstanceID"] = zone->GetInstanceID(); + outzone["IP"] = zone->GetIP(); + outzone["LaunchedName"] = zone->GetLaunchedName(); + outzone["LaunchName"] = zone->GetLaunchName(); + outzone["Port"] = zone->GetPort(); + outzone["PrevZoneID"] = zone->GetPrevZoneID(); + outzone["UUID"] = zone->GetUUID(); + outzone["ZoneID"] = zone->GetZoneID(); + outzone["ZoneLongName"] = zone->GetZoneLongName(); + outzone["ZoneName"] = zone->GetZoneName(); + outzone["ZoneOSProcessID"] = zone->GetZoneOSProcessID(); + outzone["NumPlayers"] = zone->NumPlayers(); + outzone["BootingUp"] = zone->IsBootingUp(); + outzone["StaticZone"] = zone->IsStaticZone(); + + out["data"].append(outzone); + } + + web_interface.SendEvent(out); +} diff --git a/world/zonelist.h b/world/zonelist.h index 45974cdad..e152e5046 100644 --- a/world/zonelist.h +++ b/world/zonelist.h @@ -4,7 +4,7 @@ #include "../common/types.h" #include "../common/eqtime.h" #include "../common/timer.h" -#include "../common/linked_list.h" +#include "../common/event/timer.h" #include #include @@ -60,14 +60,15 @@ public: void GetZoneIDList(std::vector &zones); void WorldShutDown(uint32 time, uint32 interval); -protected: +private: + void OnTick(EQ::Timer *t); uint32 NextID; std::list> list; uint16 pLockedZones[MaxLockedZones]; uint32 CurGroupID; uint16 LastAllocatedPort; - + std::unique_ptr m_tick; }; #endif /*ZONELIST_H_*/