Implement a basic websockets server

This commit is contained in:
KimLS
2019-05-16 00:12:21 -07:00
parent 5bfcef600f
commit ebca112769
16 changed files with 734 additions and 307 deletions
+216
View File
@@ -0,0 +1,216 @@
#include "websocket_server.h"
#include "../event/event_loop.h"
#include "../event/timer.h"
#include <fmt/format.h>
#include <map>
#include <list>
struct MethodHandlerEntry
{
MethodHandlerEntry(EQ::Net::WebsocketServer::MethodHandler h, int s) {
handler = h;
status = s;
}
EQ::Net::WebsocketServer::MethodHandler handler;
int status;
};
struct EQ::Net::WebsocketServer::Impl
{
std::unique_ptr<TCPServer> server;
std::unique_ptr<EQ::Timer> ping_timer;
std::map<std::shared_ptr<websocket_connection>, std::unique_ptr<WebsocketServerConnection>> connections;
std::map<std::string, MethodHandlerEntry> methods;
websocket_server websocket_server;
LoginHandler login_handler;
};
EQ::Net::WebsocketServer::WebsocketServer(const std::string &addr, int port)
{
_impl.reset(new Impl());
_impl->server.reset(new EQ::Net::TCPServer());
_impl->server->Listen(addr, port, false, [this](std::shared_ptr<EQ::Net::TCPConnection> connection) {
auto wsc = _impl->websocket_server.get_connection();
WebsocketServerConnection *c = new WebsocketServerConnection(this, connection, wsc);
_impl->connections.insert(std::make_pair(wsc, std::unique_ptr<WebsocketServerConnection>(c)));
});
_impl->websocket_server.set_write_handler(
[this](websocketpp::connection_hdl hdl, char const *data, size_t size) -> websocketpp::lib::error_code {
auto c = _impl->websocket_server.get_con_from_hdl(hdl);
auto iter = _impl->connections.find(c);
if (iter != _impl->connections.end()) {
iter->second->GetTCPConnection()->Write(data, size);
}
return websocketpp::lib::error_code();
});
_impl->ping_timer.reset(new EQ::Timer(5000, true, [this](EQ::Timer *t) {
auto iter = _impl->connections.begin();
while (iter != _impl->connections.end()) {
try {
auto &connection = iter->second;
connection->GetWebsocketConnection()->ping("keepalive");
}
catch (std::exception) {
iter->second->GetTCPConnection()->Disconnect();
}
iter++;
}
}));
_impl->methods.insert(std::make_pair("login", MethodHandlerEntry(std::bind(&WebsocketServer::Login, this, std::placeholders::_1, std::placeholders::_2), 0)));
_impl->methods.insert(std::make_pair("subscribe", MethodHandlerEntry(std::bind(&WebsocketServer::Subscribe, this, std::placeholders::_1, std::placeholders::_2), 0)));
_impl->methods.insert(std::make_pair("unsubscribe", MethodHandlerEntry(std::bind(&WebsocketServer::Unsubscribe, this, std::placeholders::_1, std::placeholders::_2), 0)));
_impl->login_handler = [](const WebsocketServerConnection* connection, const std::string& user, const std::string& pass) {
WebsocketLoginStatus ret;
ret.account_name = "admin";
if (connection->RemoteIP() == "127.0.0.1" || connection->RemoteIP() == "::") {
ret.logged_in = true;
return ret;
}
ret.logged_in = false;
return ret;
};
_impl->websocket_server.clear_access_channels(websocketpp::log::alevel::all);
}
EQ::Net::WebsocketServer::~WebsocketServer()
{
}
void EQ::Net::WebsocketServer::ReleaseConnection(WebsocketServerConnection *connection)
{
//Clear any subscriptions here
_impl->connections.erase(connection->GetWebsocketConnection());
}
Json::Value EQ::Net::WebsocketServer::HandleRequest(WebsocketServerConnection *connection, const std::string &method, const Json::Value &params)
{
Json::Value err;
if (method != "login") {
if (!connection->IsAuthorized()) {
throw WebsocketException("Not logged in");
}
}
auto iter = _impl->methods.find(method);
if (iter != _impl->methods.end()) {
auto &s = iter->second;
if (s.status > connection->GetStatus()) {
throw WebsocketException("Status too low");
}
return s.handler(connection, params);
}
throw WebsocketException("Unknown Method");
}
void EQ::Net::WebsocketServer::SetMethodHandler(const std::string &method, MethodHandler handler, int required_status)
{
//Reserved method names
if (method == "subscribe" ||
method == "unsubscribe" ||
method == "login") {
return;
}
_impl->methods.insert_or_assign(method, MethodHandlerEntry(handler, required_status));
}
void EQ::Net::WebsocketServer::SetLoginHandler(LoginHandler handler)
{
_impl->login_handler = handler;
}
void EQ::Net::WebsocketServer::DispatchEvent(const std::string &evt, Json::Value data, int required_status)
{
try {
Json::Value event_obj;
event_obj["type"] = "event";
event_obj["event"] = evt;
event_obj["data"] = data;
std::stringstream payload;
payload << event_obj;
for (auto &iter : _impl->connections) {
auto &c = iter.second;
//Might be better to get rid of subscriptions and just send everything and
//let the client sort out what they want idk
if (c->GetStatus() >= required_status && c->IsSubscribed(evt)) {
c->GetWebsocketConnection()->send(payload.str());
}
}
}
catch (std::exception) {
}
}
Json::Value EQ::Net::WebsocketServer::Login(WebsocketServerConnection *connection, const Json::Value &params)
{
Json::Value ret;
try {
Json::Value ret;
auto user = params[0].asString();
auto pass = params[1].asString();
auto r = _impl->login_handler(connection, user, pass);
if (r.logged_in) {
connection->SetAuthorized(true, r.account_name, r.account_id, 255);
ret["status"] = "Ok";
}
else {
connection->SetAuthorized(false, "", 0, 0);
ret["status"] = "Not Authorized";
}
return ret;
}
catch (std::exception) {
throw WebsocketException("Unable to process login request");
}
}
Json::Value EQ::Net::WebsocketServer::Subscribe(WebsocketServerConnection *connection, const Json::Value &params)
{
Json::Value ret;
try {
auto evt = params[0].asString();
connection->Subscribe(evt);
ret["status"] = "Ok";
return ret;
}
catch (std::exception) {
throw WebsocketException("Unable to process subscribe request");
}
}
Json::Value EQ::Net::WebsocketServer::Unsubscribe(WebsocketServerConnection *connection, const Json::Value &params)
{
Json::Value ret;
try {
auto evt = params[0].asString();
connection->Unsubscribe(evt);
ret["status"] = "Ok";
return ret;
}
catch (std::exception) {
throw WebsocketException("Unable to process unsubscribe request");
}
}
+63
View File
@@ -0,0 +1,63 @@
#pragma once
#include "websocket_server_connection.h"
#include "../json/json.h"
#include <memory>
#include <functional>
#include <exception>
namespace EQ
{
namespace Net
{
struct WebsocketLoginStatus
{
bool logged_in;
std::string account_name;
uint32 account_id;
int status;
};
class WebsocketException : public std::exception
{
public:
WebsocketException(const std::string &msg)
: _msg(msg.empty() ? "Unknown Error" : msg) { }
~WebsocketException() throw() {}
virtual char const *what() const throw() {
return _msg.c_str();
}
private:
const std::string _msg;
};
class WebsocketServer
{
public:
typedef std::function<Json::Value(WebsocketServerConnection*, const Json::Value&)> MethodHandler;
typedef std::function<WebsocketLoginStatus(WebsocketServerConnection*, const std::string&, const std::string&)> LoginHandler;
WebsocketServer(const std::string &addr, int port);
~WebsocketServer();
void SetMethodHandler(const std::string& method, MethodHandler handler, int required_status);
void SetLoginHandler(LoginHandler handler);
void DispatchEvent(const std::string& evt, Json::Value data = Json::Value(), int required_status = 0);
private:
void ReleaseConnection(WebsocketServerConnection *connection);
Json::Value HandleRequest(WebsocketServerConnection *connection, const std::string& method, const Json::Value &params);
Json::Value Login(WebsocketServerConnection *connection, const Json::Value &params);
Json::Value Subscribe(WebsocketServerConnection *connection, const Json::Value &params);
Json::Value Unsubscribe(WebsocketServerConnection *connection, const Json::Value &params);
struct Impl;
std::unique_ptr<Impl> _impl;
friend class WebsocketServerConnection;
};
}
}
+189
View File
@@ -0,0 +1,189 @@
#include "websocket_server_connection.h"
#include "websocket_server.h"
#include "../timer.h"
#include "../util/uuid.h"
#include <sstream>
#include <unordered_set>
#include <fmt/format.h>
struct EQ::Net::WebsocketServerConnection::Impl {
WebsocketServer *parent;
std::shared_ptr<TCPConnection> connection;
std::shared_ptr<websocket_connection> ws_connection;
std::string id;
bool authorized;
std::string account_name;
uint32 account_id;
int status;
std::unordered_set<std::string> subscribed;
};
EQ::Net::WebsocketServerConnection::WebsocketServerConnection(WebsocketServer *parent,
std::shared_ptr<TCPConnection> connection,
std::shared_ptr<websocket_connection> ws_connection)
{
_impl.reset(new Impl());
_impl->parent = parent;
_impl->connection = connection;
_impl->id = EQ::Util::UUID::Generate().ToString();
_impl->authorized = false;
_impl->account_id = 0;
_impl->status = 0;
_impl->ws_connection = ws_connection;
_impl->ws_connection->set_message_handler(std::bind(&WebsocketServerConnection::OnMessage, this, std::placeholders::_1, std::placeholders::_2));
_impl->ws_connection->start();
connection->OnDisconnect([this](EQ::Net::TCPConnection *connection) {
_impl->parent->ReleaseConnection(this);
});
connection->OnRead([this](EQ::Net::TCPConnection *c, const unsigned char *buffer, size_t buffer_size) {
_impl->ws_connection->read_all((const char*)buffer, buffer_size);
});
connection->Start();
}
EQ::Net::WebsocketServerConnection::~WebsocketServerConnection()
{
}
std::string EQ::Net::WebsocketServerConnection::GetID() const
{
return _impl->id;
}
bool EQ::Net::WebsocketServerConnection::IsAuthorized() const
{
return _impl->authorized;
}
std::string EQ::Net::WebsocketServerConnection::GetAccountName() const
{
return _impl->account_name;
}
uint32 EQ::Net::WebsocketServerConnection::GetAccountID() const
{
return _impl->account_id;
}
int EQ::Net::WebsocketServerConnection::GetStatus() const
{
return _impl->status;
}
std::string EQ::Net::WebsocketServerConnection::RemoteIP() const
{
return _impl->connection->RemoteIP();
}
int EQ::Net::WebsocketServerConnection::RemotePort() const
{
return _impl->connection->RemotePort();
}
std::shared_ptr<EQ::Net::websocket_connection> EQ::Net::WebsocketServerConnection::GetWebsocketConnection()
{
return _impl->ws_connection;
}
std::shared_ptr<EQ::Net::TCPConnection> EQ::Net::WebsocketServerConnection::GetTCPConnection()
{
return _impl->connection;
}
void EQ::Net::WebsocketServerConnection::OnMessage(websocketpp::connection_hdl hdl, websocket_message_ptr msg)
{
BenchTimer timer;
timer.reset();
if (msg->get_opcode() == websocketpp::frame::opcode::text) {
try {
auto &payload = msg->get_payload();
std::stringstream ss(payload);
Json::Value root;
ss >> root;
auto method = root["method"].asString();
auto params = root["params"];
std::string id = "";
auto idNode = root["id"];
if (!idNode.isNull() && idNode.isString()) {
id = idNode.asString();
}
Json::Value response;
response["type"] = "method";
response["data"] = _impl->parent->HandleRequest(this, method, params);
response["method"] = method;
if(id != "") {
response["id"] = id;
}
SendResponse(response, timer.elapsed());
}
catch (std::exception &ex) {
Json::Value error;
error["type"] = "method";
error["error"] = fmt::format("{0}", ex.what());
SendResponse(error, timer.elapsed());
}
}
}
void EQ::Net::WebsocketServerConnection::SendResponse(const Json::Value &response, double time_elapsed)
{
Json::Value root = response;
root["execution_time"] = std::to_string(time_elapsed);
std::stringstream payload;
payload << root;
_impl->ws_connection->send(payload.str());
}
void EQ::Net::WebsocketServerConnection::SetAuthorized(bool v, const std::string account_name, uint32 account_id, int status)
{
_impl->authorized = v;
_impl->account_name = account_name;
_impl->account_id = account_id;
_impl->status = status;
}
void EQ::Net::WebsocketServerConnection::Subscribe(const std::string &evt)
{
if (evt == "") {
return;
}
auto iter = _impl->subscribed.find(evt);
if (iter == _impl->subscribed.end()) {
_impl->subscribed.insert(evt);
}
}
void EQ::Net::WebsocketServerConnection::Unsubscribe(const std::string &evt)
{
if (evt == "") {
return;
}
auto iter = _impl->subscribed.find(evt);
if (iter != _impl->subscribed.end()) {
_impl->subscribed.erase(iter);
}
}
bool EQ::Net::WebsocketServerConnection::IsSubscribed(const std::string &evt) const
{
auto iter = _impl->subscribed.find(evt);
if (iter != _impl->subscribed.end()) {
return true;
}
return false;
}
+49
View File
@@ -0,0 +1,49 @@
#pragma once
#include "tcp_server.h"
#include "../types.h"
#include "../json/json-forwards.h"
#include <websocketpp/config/core.hpp>
#include <websocketpp/server.hpp>
namespace EQ
{
namespace Net
{
typedef websocketpp::server<websocketpp::config::core> websocket_server;
typedef websocketpp::connection<websocketpp::config::core> websocket_connection;
typedef websocket_server::message_ptr websocket_message_ptr;
class WebsocketServer;
class WebsocketServerConnection
{
public:
WebsocketServerConnection(WebsocketServer *parent,
std::shared_ptr<TCPConnection> connection,
std::shared_ptr<websocket_connection> ws_connection);
~WebsocketServerConnection();
std::string GetID() const;
bool IsAuthorized() const;
std::string GetAccountName() const;
uint32 GetAccountID() const;
int GetStatus() const;
std::string RemoteIP() const;
int RemotePort() const;
private:
std::shared_ptr<websocket_connection> GetWebsocketConnection();
std::shared_ptr<TCPConnection> GetTCPConnection();
void OnMessage(websocketpp::connection_hdl hdl, websocket_message_ptr msg);
void SendResponse(const Json::Value &response, double time_elapsed);
void SetAuthorized(bool v, const std::string account_name, uint32 account_id, int status);
void Subscribe(const std::string &evt);
void Unsubscribe(const std::string &evt);
bool IsSubscribed(const std::string &evt) const;
struct Impl;
std::unique_ptr<Impl> _impl;
friend class WebsocketServer;
};
}
}