Subscription changes

This commit is contained in:
KimLS 2019-05-17 00:09:38 -07:00
parent e277e55718
commit c737504645
5 changed files with 64 additions and 53 deletions

View File

@ -3,7 +3,8 @@
#include "../event/timer.h" #include "../event/timer.h"
#include <fmt/format.h> #include <fmt/format.h>
#include <map> #include <map>
#include <list> #include <unordered_set>
#include <array>
struct MethodHandlerEntry struct MethodHandlerEntry
{ {
@ -24,6 +25,7 @@ struct EQ::Net::WebsocketServer::Impl
std::map<std::string, MethodHandlerEntry> methods; std::map<std::string, MethodHandlerEntry> methods;
websocket_server websocket_server; websocket_server websocket_server;
LoginHandler login_handler; LoginHandler login_handler;
std::array<std::unordered_set<WebsocketServerConnection*>, SubscriptionEventMax> subscriptions;
}; };
EQ::Net::WebsocketServer::WebsocketServer(const std::string &addr, int port) EQ::Net::WebsocketServer::WebsocketServer(const std::string &addr, int port)
@ -88,7 +90,7 @@ EQ::Net::WebsocketServer::~WebsocketServer()
void EQ::Net::WebsocketServer::ReleaseConnection(WebsocketServerConnection *connection) void EQ::Net::WebsocketServer::ReleaseConnection(WebsocketServerConnection *connection)
{ {
//Clear any subscriptions here UnsubscribeAll(connection);
_impl->connections.erase(connection->GetWebsocketConnection()); _impl->connections.erase(connection->GetWebsocketConnection());
} }
@ -132,12 +134,12 @@ void EQ::Net::WebsocketServer::SetLoginHandler(LoginHandler handler)
_impl->login_handler = handler; _impl->login_handler = handler;
} }
void EQ::Net::WebsocketServer::DispatchEvent(const std::string &evt, Json::Value data, int required_status) void EQ::Net::WebsocketServer::DispatchEvent(WebsocketSubscriptionEvent evt, Json::Value data, int required_status)
{ {
try { try {
Json::Value event_obj; Json::Value event_obj;
event_obj["type"] = "event"; event_obj["type"] = "event";
event_obj["event"] = evt; event_obj["event"] = (int)evt;
event_obj["data"] = data; event_obj["data"] = data;
std::stringstream payload; std::stringstream payload;
@ -146,9 +148,7 @@ void EQ::Net::WebsocketServer::DispatchEvent(const std::string &evt, Json::Value
for (auto &iter : _impl->connections) { for (auto &iter : _impl->connections) {
auto &c = iter.second; auto &c = iter.second;
//Might be better to get rid of subscriptions and just send everything and if (c->GetStatus() >= required_status && IsSubscribed(c.get(), evt)) {
//let the client sort out what they want idk
if (c->GetStatus() >= required_status && c->IsSubscribed(evt)) {
c->GetWebsocketConnection()->send(payload.str()); c->GetWebsocketConnection()->send(payload.str());
} }
} }
@ -190,13 +190,20 @@ Json::Value EQ::Net::WebsocketServer::Subscribe(WebsocketServerConnection *conne
Json::Value ret; Json::Value ret;
try { try {
auto evt = params[0].asString(); auto evt = params[0].asInt();
connection->Subscribe(evt); if (evt < 0 || evt >= SubscriptionEventMax) {
throw WebsocketException("Not a valid subscription");
}
DoSubscribe(connection, (WebsocketSubscriptionEvent)evt);
ret["status"] = "Ok"; ret["status"] = "Ok";
return ret; return ret;
} }
catch (WebsocketException &ex) {
throw ex;
}
catch (std::exception) { catch (std::exception) {
throw WebsocketException("Unable to process subscribe request"); throw WebsocketException("Unable to process unsubscribe request");
} }
} }
@ -205,12 +212,44 @@ Json::Value EQ::Net::WebsocketServer::Unsubscribe(WebsocketServerConnection *con
Json::Value ret; Json::Value ret;
try { try {
auto evt = params[0].asString(); auto evt = params[0].asInt();
connection->Unsubscribe(evt); if (evt < 0 || evt >= SubscriptionEventMax) {
throw WebsocketException("Not a valid subscription");
}
DoUnsubscribe(connection, (WebsocketSubscriptionEvent)evt);
ret["status"] = "Ok"; ret["status"] = "Ok";
return ret; return ret;
} }
catch (WebsocketException &ex) {
throw ex;
}
catch (std::exception) { catch (std::exception) {
throw WebsocketException("Unable to process unsubscribe request"); throw WebsocketException("Unable to process unsubscribe request");
} }
} }
void EQ::Net::WebsocketServer::DoSubscribe(WebsocketServerConnection *connection, WebsocketSubscriptionEvent sub) {
auto &s = _impl->subscriptions[sub];
auto iter = s.find(connection);
if (iter == s.end()) {
s.insert(connection);
}
}
void EQ::Net::WebsocketServer::DoUnsubscribe(WebsocketServerConnection *connection, WebsocketSubscriptionEvent sub) {
auto &s = _impl->subscriptions[sub];
s.erase(connection);
}
bool EQ::Net::WebsocketServer::IsSubscribed(WebsocketServerConnection *connection, WebsocketSubscriptionEvent sub) {
auto &s = _impl->subscriptions[sub];
return s.count(connection) == 1;
}
void EQ::Net::WebsocketServer::UnsubscribeAll(WebsocketServerConnection *connection) {
for (auto i = 0; i < SubscriptionEventMax; ++i) {
DoUnsubscribe(connection, (WebsocketSubscriptionEvent)i);
}
}

View File

@ -11,6 +11,13 @@ namespace EQ
{ {
namespace Net namespace Net
{ {
enum WebsocketSubscriptionEvent : int
{
SubscriptionEventNone,
SubscriptionEventLog,
SubscriptionEventMax
};
struct WebsocketLoginStatus struct WebsocketLoginStatus
{ {
bool logged_in; bool logged_in;
@ -45,7 +52,7 @@ namespace EQ
void SetMethodHandler(const std::string& method, MethodHandler handler, int required_status); void SetMethodHandler(const std::string& method, MethodHandler handler, int required_status);
void SetLoginHandler(LoginHandler handler); void SetLoginHandler(LoginHandler handler);
void DispatchEvent(const std::string& evt, Json::Value data = Json::Value(), int required_status = 0); void DispatchEvent(WebsocketSubscriptionEvent evt, Json::Value data = Json::Value(), int required_status = 0);
private: private:
void ReleaseConnection(WebsocketServerConnection *connection); void ReleaseConnection(WebsocketServerConnection *connection);
Json::Value HandleRequest(WebsocketServerConnection *connection, const std::string& method, const Json::Value &params); Json::Value HandleRequest(WebsocketServerConnection *connection, const std::string& method, const Json::Value &params);
@ -53,6 +60,10 @@ namespace EQ
Json::Value Login(WebsocketServerConnection *connection, const Json::Value &params); Json::Value Login(WebsocketServerConnection *connection, const Json::Value &params);
Json::Value Subscribe(WebsocketServerConnection *connection, const Json::Value &params); Json::Value Subscribe(WebsocketServerConnection *connection, const Json::Value &params);
Json::Value Unsubscribe(WebsocketServerConnection *connection, const Json::Value &params); Json::Value Unsubscribe(WebsocketServerConnection *connection, const Json::Value &params);
void DoSubscribe(WebsocketServerConnection *connection, WebsocketSubscriptionEvent sub);
void DoUnsubscribe(WebsocketServerConnection *connection, WebsocketSubscriptionEvent sub);
bool IsSubscribed(WebsocketServerConnection *connection, WebsocketSubscriptionEvent sub);
void UnsubscribeAll(WebsocketServerConnection *connection);
struct Impl; struct Impl;
std::unique_ptr<Impl> _impl; std::unique_ptr<Impl> _impl;

View File

@ -3,7 +3,6 @@
#include "../timer.h" #include "../timer.h"
#include "../util/uuid.h" #include "../util/uuid.h"
#include <sstream> #include <sstream>
#include <unordered_set>
#include <fmt/format.h> #include <fmt/format.h>
struct EQ::Net::WebsocketServerConnection::Impl { struct EQ::Net::WebsocketServerConnection::Impl {
@ -15,7 +14,6 @@ struct EQ::Net::WebsocketServerConnection::Impl {
std::string account_name; std::string account_name;
uint32 account_id; uint32 account_id;
int status; int status;
std::unordered_set<std::string> subscribed;
}; };
EQ::Net::WebsocketServerConnection::WebsocketServerConnection(WebsocketServer *parent, EQ::Net::WebsocketServerConnection::WebsocketServerConnection(WebsocketServer *parent,
@ -153,37 +151,3 @@ void EQ::Net::WebsocketServerConnection::SetAuthorized(bool v, const std::string
_impl->account_id = account_id; _impl->account_id = account_id;
_impl->status = status; _impl->status = status;
} }
void EQ::Net::WebsocketServerConnection::Subscribe(const std::string &evt)
{
if (evt == "") {
return;
}
auto iter = _impl->subscribed.find(evt);
if (iter == _impl->subscribed.end()) {
_impl->subscribed.insert(evt);
}
}
void EQ::Net::WebsocketServerConnection::Unsubscribe(const std::string &evt)
{
if (evt == "") {
return;
}
auto iter = _impl->subscribed.find(evt);
if (iter != _impl->subscribed.end()) {
_impl->subscribed.erase(iter);
}
}
bool EQ::Net::WebsocketServerConnection::IsSubscribed(const std::string &evt) const
{
auto iter = _impl->subscribed.find(evt);
if (iter != _impl->subscribed.end()) {
return true;
}
return false;
}

View File

@ -36,9 +36,6 @@ namespace EQ
void OnMessage(websocketpp::connection_hdl hdl, websocket_message_ptr msg); void OnMessage(websocketpp::connection_hdl hdl, websocket_message_ptr msg);
void SendResponse(const Json::Value &response, double time_elapsed); void SendResponse(const Json::Value &response, double time_elapsed);
void SetAuthorized(bool v, const std::string account_name, uint32 account_id, int status); void SetAuthorized(bool v, const std::string account_name, uint32 account_id, int status);
void Subscribe(const std::string &evt);
void Unsubscribe(const std::string &evt);
bool IsSubscribed(const std::string &evt) const;
struct Impl; struct Impl;
std::unique_ptr<Impl> _impl; std::unique_ptr<Impl> _impl;

View File

@ -807,7 +807,7 @@ void RegisterApiLogEvent(std::unique_ptr<EQ::Net::WebsocketServer> &server)
data["debug_level"] = debug_level; data["debug_level"] = debug_level;
data["log_category"] = log_category; data["log_category"] = log_category;
data["msg"] = msg; data["msg"] = msg;
server->DispatchEvent("log", data, 50); server->DispatchEvent(EQ::Net::SubscriptionEventLog, data, 50);
}); });
} }