Basic work on subscriptions

This commit is contained in:
KimLS 2017-01-13 21:52:08 -08:00
parent f24770489e
commit 1cafd6831d
12 changed files with 196 additions and 42 deletions

View File

@ -88,6 +88,7 @@
#define ServerOP_DepopPlayerCorpse 0x0065
#define ServerOP_RequestTellQueue 0x0066 // client asks for it's tell queues
#define ServerOP_ChangeSharedMem 0x0067
#define ServerOP_WebInterfaceEvent 0x0068
#define ServerOP_RaidAdd 0x0100 //in use
#define ServerOP_RaidRemove 0x0101 //in use

View File

@ -23,8 +23,8 @@ const uuid = require('node-uuid');
const jwt = require('jsonwebtoken');
var mysql = require('mysql').createPool(settings.db);
var wsi = new websocket_iterface.wsi(server, key);
var api = new servertalk.api();
var wsi = new websocket_iterface.wsi(server, key, api);
api.Init(settings.servertalk.addr, settings.servertalk.port, false, settings.servertalk.key);
app.use(bodyParser.json());

View File

@ -7,6 +7,7 @@ class ServertalkAPI
this.client = new servertalk.client();
this.client.Init(addr, port, ipv6, 'WebInterface', credentials);
this.pending_calls = {};
this.subscriptions = {};
var self = this;
this.client.on('connecting', function() {
@ -25,28 +26,51 @@ class ServertalkAPI
});
this.client.on('message', function(opcode, packet) {
var response = Buffer.from(packet).toString('utf8');
try {
var res = JSON.parse(response);
if(res.id) {
if(self.pending_calls.hasOwnProperty(res.id)) {
var entry = self.pending_calls[res.id];
if(res.error) {
var reject = entry[1];
reject(res.error);
} else {
var resolve = entry[0];
resolve(res.response);
if(opcode == 47) {
var response = Buffer.from(packet).toString('utf8');
try {
var res = JSON.parse(response);
if(res.id) {
if(self.pending_calls.hasOwnProperty(res.id)) {
var entry = self.pending_calls[res.id];
if(res.error) {
var reject = entry[1];
reject(res.error);
} else {
var resolve = entry[0];
resolve(res.response);
}
delete self.pending_calls[res.id];
}
delete self.pending_calls[res.id];
}
} catch(ex) {
console.log('Error processing response from server:\n', ex);
}
} else if(opcode == 104) {
var message = Buffer.from(packet).toString('utf8');
try {
var msg = JSON.parse(message);
if(msg.event) {
if(self.subscriptions.hasOwnProperty(msg.event)) {
var subs = self.subscriptions[msg.event];
for(var idx in subs) {
try {
var sub = subs[idx];
sub.emit('subscriptionMessage', msg);
} catch(ex) {
console.log('Error dispatching subscription message', ex);
}
}
}
}
} catch(ex) {
console.log('Error processing response from server:\n', ex);
}
} catch(ex) {
console.log('Error processing response from server:\n', ex);
}
});
}
@ -83,6 +107,34 @@ class ServertalkAPI
var c = { method: method, params: args };
client.Send(47, Buffer.from(JSON.stringify(c)));
}
Subscribe(event_id, who) {
this.Unsubscribe(event_id, who);
var subs = this.subscriptions[event_id];
if(subs) {
console.log('Subscribe', who.uuid, 'to', event_id);
subs[who.uuid] = who;
} else {
console.log('Subscribe', who.uuid, 'to', event_id);
this.subscriptions[event_id] = { };
this.subscriptions[event_id][who.uuid] = who;
}
}
Unsubscribe(event_id, who) {
var subs = this.subscriptions[event_id];
if(subs) {
console.log('Unsubscribe', who.uuid, 'from', event_id);
delete subs[who.uuid];
}
}
UnsubscribeAll(who) {
for(var sub_idx in this.subscriptions) {
this.Unsubscribe(sub_idx, who);
}
}
}
module.exports = {

View File

@ -214,8 +214,8 @@ class ServertalkClient extends EventEmitter
ProcessMessage(p) {
try {
var length = this.m_buffer.readUInt32LE(0);
var opcode = this.m_buffer.readUInt16LE(4);
var length = p.readUInt32LE(0);
var opcode = p.readUInt16LE(4);
if(length > 0) {
var data = p.slice(6);

View File

@ -2,7 +2,7 @@ const WebSocket = require('ws');
const ws = new WebSocket('ws://localhost:9080');
ws.on('open', function open() {
ws.send(JSON.stringify({authorization: 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6IktTcHJpdGUxIiwiZXhwIjoxNDg0NzIzNDQxLCJpYXQiOjE0ODQxMTg2NDF9.Lmwm572yMWIu1DUrfer8JVvl1DGEkdnMsMFp5WDzp_A', id: '1', method: 'EQW::IsLocked', params: []}));
ws.send(JSON.stringify({authorization: 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6IktTcHJpdGUxIiwiZXhwIjoxNDg0NzIzNDQxLCJpYXQiOjE0ODQxMTg2NDF9.Lmwm572yMWIu1DUrfer8JVvl1DGEkdnMsMFp5WDzp_A', id: '1', method: 'EQW::Subscribe::WorldZoneUpdate', params: []}));
});
ws.on('message', function(data, flags) {

View File

@ -1,20 +1,10 @@
function Register(name, wsi, api) {
wsi.Register(name,
function(request) {
api.Call(name, request.params)
.then(function(value) {
wsi.Send(request, value);
})
.catch(function(reason) {
wsi.SendError(request, reason);
});
}, true);
}
const common = require('./wi_common.js');
var RegisterEQW = function(wsi, api) {
Register('EQW::IsLocked', wsi, api);
Register('EQW::Lock', wsi, api);
Register('EQW::Unlock', wsi, api);
common.Register('EQW::IsLocked', wsi, api);
common.Register('EQW::Lock', wsi, api);
common.Register('EQW::Unlock', wsi, api);
common.RegisterSubscription('EQW', 'WorldZoneUpdate', wsi, api);
};
module.exports = {

27
wi/ws/wi_common.js Normal file
View File

@ -0,0 +1,27 @@
function Register(name, wsi, api) {
wsi.Register(name,
function(request) {
api.Call(name, request.params)
.then(function(value) {
wsi.Send(request, value);
})
.catch(function(reason) {
wsi.SendError(request, reason);
});
}, true);
}
function RegisterSubscription(namespace, event, wsi, api) {
wsi.Register(namespace + '::Subscribe::' + event, function(request) {
api.Subscribe(event, request.ws);
});
wsi.Register(namespace + '::Unsubscribe::' + event, function(request) {
api.Unsubscribe(event, request.ws);
});
}
module.exports = {
'Register': Register,
'RegisterSubscription': RegisterSubscription
}

View File

@ -1,18 +1,21 @@
const WebSocketServer = require('ws').Server;
const jwt = require('jsonwebtoken');
const uuid = require('node-uuid');
class WebSocketInterface
{
constructor(server, key) {
constructor(server, key, api) {
this.wss = new WebSocketServer({ server: server });
this.methods = {};
var self = this;
this.wss.on('connection', function connection(ws) {
self.ws = ws;
ws.uuid = uuid.v4();
ws.on('message', function incoming(message) {
try {
var request = JSON.parse(message);
request.ws = ws;
if(request.method) {
var method = self.methods[request.method];
@ -51,6 +54,14 @@ class WebSocketInterface
self.SendError(null, 'No method supplied');
}
});
ws.on('close', function() {
api.UnsubscribeAll(ws);
});
ws.on('subscriptionMessage', function(msg) {
self.SendRaw(msg);
});
});
}
@ -92,6 +103,14 @@ class WebSocketInterface
console.log(ex);
}
}
SendRaw(obj) {
try {
this.ws.send(JSON.stringify(obj));
} catch(ex) {
console.log(ex);
}
}
}
module.exports = {

View File

@ -105,6 +105,21 @@ void WebInterface::SendError(const std::string &message, const std::string &id)
Send(error);
}
void WebInterface::SendEvent(const Json::Value &value)
{
try {
std::stringstream ss;
ss << value;
EQ::Net::DynamicPacket p;
p.PutString(0, ss.str());
m_connection->Send(ServerOP_WebInterfaceEvent, p);
}
catch (std::exception) {
//Log error
}
}
void WebInterface::AddCall(const std::string &method, WebInterfaceCall call)
{
m_calls.insert(std::make_pair(method, call));
@ -149,6 +164,12 @@ void WebInterfaceList::SendResponse(const std::string &uuid, std::string &id, co
}
}
void WebInterfaceList::SendEvent(const Json::Value &value) {
for (auto &i : m_interfaces) {
i.second->SendEvent(value);
}
}
void WebInterfaceList::SendError(const std::string &uuid, const std::string &message) {
auto iter = m_interfaces.find(uuid);
if (iter != m_interfaces.end()) {

View File

@ -19,6 +19,7 @@ public:
void SendResponse(const std::string &id, const Json::Value &response);
void SendError(const std::string &message);
void SendError(const std::string &message, const std::string &id);
void SendEvent(const Json::Value &value);
void AddCall(const std::string &method, WebInterfaceCall call);
private:
void OnCall(uint16 opcode, EQ::Net::Packet &p);
@ -37,6 +38,7 @@ public:
void AddConnection(std::shared_ptr<EQ::Net::ServertalkServerConnection> connection);
void RemoveConnection(std::shared_ptr<EQ::Net::ServertalkServerConnection> connection);
void SendResponse(const std::string &uuid, std::string &id, const Json::Value &response);
void SendEvent(const Json::Value &value);
void SendError(const std::string &uuid, const std::string &message);
void SendError(const std::string &uuid, const std::string &message, const std::string &id);

View File

@ -24,10 +24,13 @@
#include "../common/servertalk.h"
#include "../common/string_util.h"
#include "../common/random.h"
#include "../common/json/json.h"
#include "web_interface.h"
extern uint32 numzones;
extern bool holdzones;
extern EQEmu::Random emu_random;
extern WebInterfaceList web_interface;
void CatchSignal(int sig_num);
ZSList::ZSList()
@ -36,6 +39,8 @@ ZSList::ZSList()
CurGroupID = 1;
LastAllocatedPort=0;
memset(pLockedZones, 0, sizeof(pLockedZones));
m_tick.reset(new EQ::Timer(1000, true, std::bind(&ZSList::OnTick, this, std::placeholders::_1)));
}
ZSList::~ZSList() {
@ -690,3 +695,39 @@ void ZSList::WorldShutDown(uint32 time, uint32 interval)
CatchSignal(2);
}
}
void ZSList::OnTick(EQ::Timer *t)
{
Json::Value out;
out["event"] = "WorldZoneUpdate";
out["data"] = Json::Value();
for (auto &zone : list)
{
Json::Value outzone;
outzone["CAddress"] = zone->GetCAddress();
outzone["CLocalAddress"] = zone->GetCLocalAddress();
outzone["CompileTime"] = zone->GetCompileTime();
outzone["CPort"] = zone->GetCPort();
outzone["ID"] = zone->GetID();
outzone["InstanceID"] = zone->GetInstanceID();
outzone["IP"] = zone->GetIP();
outzone["LaunchedName"] = zone->GetLaunchedName();
outzone["LaunchName"] = zone->GetLaunchName();
outzone["Port"] = zone->GetPort();
outzone["PrevZoneID"] = zone->GetPrevZoneID();
outzone["UUID"] = zone->GetUUID();
outzone["ZoneID"] = zone->GetZoneID();
outzone["ZoneLongName"] = zone->GetZoneLongName();
outzone["ZoneName"] = zone->GetZoneName();
outzone["ZoneOSProcessID"] = zone->GetZoneOSProcessID();
outzone["NumPlayers"] = zone->NumPlayers();
outzone["BootingUp"] = zone->IsBootingUp();
outzone["StaticZone"] = zone->IsStaticZone();
out["data"].append(outzone);
}
web_interface.SendEvent(out);
}

View File

@ -4,7 +4,7 @@
#include "../common/types.h"
#include "../common/eqtime.h"
#include "../common/timer.h"
#include "../common/linked_list.h"
#include "../common/event/timer.h"
#include <vector>
#include <memory>
@ -60,14 +60,15 @@ public:
void GetZoneIDList(std::vector<uint32> &zones);
void WorldShutDown(uint32 time, uint32 interval);
protected:
private:
void OnTick(EQ::Timer *t);
uint32 NextID;
std::list<std::unique_ptr<ZoneServer>> list;
uint16 pLockedZones[MaxLockedZones];
uint32 CurGroupID;
uint16 LastAllocatedPort;
std::unique_ptr<EQ::Timer> m_tick;
};
#endif /*ZONELIST_H_*/