diff --git a/CMakeLists.txt b/CMakeLists.txt index 1d3adcc25..16a2bfa20 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -297,6 +297,11 @@ IF(EQEMU_BUILD_LUA) ENDIF(EQEMU_SANITIZE_LUA_LIBS) ENDIF(EQEMU_BUILD_LUA) +IF(EQEMU_BUILD_SOCKET_SERVER) + INCLUDE_DIRECTORIES("${CMAKE_CURRENT_SOURCE_DIR}/dependencies/websocketpp") + ADD_DEFINITIONS(-D_WEBSOCKETPP_CPP11_STL_) +ENDIF(EQEMU_BUILD_SOCKET_SERVER) + INCLUDE_DIRECTORIES("${ZLIB_INCLUDE_DIRS}" "${MySQL_INCLUDE_DIR}" "${CMAKE_CURRENT_SOURCE_DIR}/common/glm/glm") IF(EQEMU_BUILD_LUA) @@ -312,7 +317,9 @@ IF(EQEMU_BUILD_SERVER) ADD_SUBDIRECTORY(zone) ADD_SUBDIRECTORY(ucs) ADD_SUBDIRECTORY(queryserv) +IF(EQEMU_BUILD_SOCKET_SERVER) ADD_SUBDIRECTORY(socket_server) +ENDIF(EQEMU_BUILD_SOCKET_SERVER) ADD_SUBDIRECTORY(eqlaunch) ENDIF(EQEMU_BUILD_SERVER) IF(EQEMU_BUILD_LOGIN) diff --git a/socket_server/socket_server.cpp b/socket_server/socket_server.cpp index eed4751d1..83af27d4c 100644 --- a/socket_server/socket_server.cpp +++ b/socket_server/socket_server.cpp @@ -44,6 +44,131 @@ void CatchSignal(int sig_num) { worldserver->Disconnect(); } +/* Web Sockets Start */ + +enum action_type { + SUBSCRIBE, + UNSUBSCRIBE, + MESSAGE +}; + +struct action { + action(action_type t, connection_hdl h) : type(t), hdl(h) {} + action(action_type t, connection_hdl h, server::message_ptr m) + : type(t), hdl(h), msg(m) {} + + action_type type; + websocketpp::connection_hdl hdl; + server::message_ptr msg; +}; + +class broadcast_server { +public: + broadcast_server() { + // Initialize Asio Transport + m_server.init_asio(); + + // Register handler callbacks + m_server.set_open_handler(bind(&broadcast_server::on_open, this, ::_1)); + m_server.set_close_handler(bind(&broadcast_server::on_close, this, ::_1)); + m_server.set_message_handler(bind(&broadcast_server::on_message, this, ::_1, ::_2)); + } + + void run(uint16_t port) { + // listen on specified port + m_server.listen(port); + + // Start the server accept loop + m_server.start_accept(); + + // Start the ASIO io_service run loop + try { + m_server.run(); + } + catch (const std::exception & e) { + std::cout << e.what() << std::endl; + } + catch (websocketpp::lib::error_code e) { + std::cout << e.message() << std::endl; + } + catch (...) { + std::cout << "other exception" << std::endl; + } + } + + void on_open(connection_hdl hdl) { + unique_lock lock(m_action_lock); + //std::cout << "on_open" << std::endl; + m_actions.push(action(SUBSCRIBE, hdl)); + lock.unlock(); + m_action_cond.notify_one(); + } + + void on_close(connection_hdl hdl) { + unique_lock lock(m_action_lock); + //std::cout << "on_close" << std::endl; + m_actions.push(action(UNSUBSCRIBE, hdl)); + lock.unlock(); + m_action_cond.notify_one(); + } + + void on_message(connection_hdl hdl, server::message_ptr msg) { + // queue message up for sending by processing thread + unique_lock lock(m_action_lock); + msg->set_payload("Test Message"); + // std::cout << "on_message" << std::endl; + m_actions.push(action(MESSAGE, hdl, msg)); + lock.unlock(); + m_action_cond.notify_one(); + } + + void process_messages() { + while (1) { + unique_lock lock(m_action_lock); + + while (m_actions.empty()) { + m_action_cond.wait(lock); + } + + action a = m_actions.front(); + m_actions.pop(); + + lock.unlock(); + + if (a.type == SUBSCRIBE) { + unique_lock con_lock(m_connection_lock); + m_connections.insert(a.hdl); + } + else if (a.type == UNSUBSCRIBE) { + unique_lock con_lock(m_connection_lock); + m_connections.erase(a.hdl); + } + else if (a.type == MESSAGE) { + unique_lock con_lock(m_connection_lock); + + con_list::iterator it; + for (it = m_connections.begin(); it != m_connections.end(); ++it) { + m_server.send(*it, a.msg); + } + } + else { + // undefined. + } + } + } +private: + typedef std::set> con_list; + + server m_server; + con_list m_connections; + std::queue m_actions; + + mutex m_action_lock; + mutex m_connection_lock; + condition_variable m_action_cond; +}; + +/* Web Sockets Shit */ int main() { RegisterExecutablePlatform(ExePlatformSocket_Server); set_exception_handler();