From 21a39db1c65144aa717f4316ef8e9e9b809c600f Mon Sep 17 00:00:00 2001 From: KimLS Date: Fri, 29 Mar 2019 18:39:17 -0700 Subject: [PATCH] Make event loops able to be run in another thread --- common/event/event_loop.h | 20 ++++++++++---------- common/event/task.h | 9 ++++++++- common/event/timer.h | 22 ++++++++++++++++++++-- common/net/daybreak_connection.cpp | 9 +++++++-- common/net/daybreak_connection.h | 3 +++ common/net/dns.h | 8 ++++++-- common/net/tcp_connection.cpp | 10 +++++++--- common/net/tcp_connection.h | 2 ++ common/net/tcp_server.cpp | 11 +++++++++-- common/net/tcp_server.h | 5 ++++- eqlaunch/eqlaunch.cpp | 2 +- loginserver/main.cpp | 2 +- queryserv/queryserv.cpp | 2 +- ucs/ucs.cpp | 2 +- world/net.cpp | 2 +- zone/net.cpp | 2 +- 16 files changed, 82 insertions(+), 29 deletions(-) diff --git a/common/event/event_loop.h b/common/event/event_loop.h index 268f78c4d..e6efd580d 100644 --- a/common/event/event_loop.h +++ b/common/event/event_loop.h @@ -7,15 +7,20 @@ namespace EQ { class EventLoop { - public: - static EventLoop &Get() { - static thread_local EventLoop inst; - return inst; + public: + EventLoop() { + memset(&m_loop, 0, sizeof(uv_loop_t)); + uv_loop_init(&m_loop); } ~EventLoop() { uv_loop_close(&m_loop); } + + static EventLoop &GetDefault() { + static EventLoop inst; + return inst; + } void Process() { uv_run(&m_loop, UV_RUN_NOWAIT); @@ -23,12 +28,7 @@ namespace EQ uv_loop_t* Handle() { return &m_loop; } - private: - EventLoop() { - memset(&m_loop, 0, sizeof(uv_loop_t)); - uv_loop_init(&m_loop); - } - + private: EventLoop(const EventLoop&); EventLoop& operator=(const EventLoop&); diff --git a/common/event/task.h b/common/event/task.h index ae19700a2..9a0188d15 100644 --- a/common/event/task.h +++ b/common/event/task.h @@ -24,7 +24,13 @@ namespace EQ { std::exception error; }; + Task(EQ::EventLoop *loop, TaskFn fn) { + m_loop = loop; + m_fn = fn; + } + Task(TaskFn fn) { + m_loop = &EQ::EventLoop::GetDefault(); m_fn = fn; } @@ -60,7 +66,7 @@ namespace EQ { m_work->data = baton; - uv_queue_work(EventLoop::Get().Handle(), m_work, [](uv_work_t* req) { + uv_queue_work(m_loop->Handle(), m_work, [](uv_work_t* req) { TaskBaton *baton = (TaskBaton*)req->data; baton->fn([baton](const EQEmu::Any& result) { @@ -92,6 +98,7 @@ namespace EQ { } private: + EQ::EventLoop *m_loop; TaskFn m_fn; ResolveFn m_then; RejectFn m_catch; diff --git a/common/event/timer.h b/common/event/timer.h index e71459630..f1d023b99 100644 --- a/common/event/timer.h +++ b/common/event/timer.h @@ -6,14 +6,31 @@ namespace EQ { class Timer { public: - Timer(std::function cb) + Timer(EQ::EventLoop *loop, std::function cb) { + m_loop = loop; m_timer = nullptr; m_cb = cb; } + Timer(std::function cb) + { + m_loop = &EQ::EventLoop::GetDefault(); + m_timer = nullptr; + m_cb = cb; + } + + Timer(EQ::EventLoop *loop, uint64_t duration_ms, bool repeats, std::function cb) + { + m_loop = loop; + m_timer = nullptr; + m_cb = cb; + Start(duration_ms, repeats); + } + Timer(uint64_t duration_ms, bool repeats, std::function cb) { + m_loop = &EQ::EventLoop::GetDefault(); m_timer = nullptr; m_cb = cb; Start(duration_ms, repeats); @@ -25,7 +42,7 @@ namespace EQ { } void Start(uint64_t duration_ms, bool repeats) { - auto loop = EventLoop::Get().Handle(); + auto loop = m_loop->Handle(); if (!m_timer) { m_timer = new uv_timer_t; memset(m_timer, 0, sizeof(uv_timer_t)); @@ -61,6 +78,7 @@ namespace EQ { m_cb(this); } + EQ::EventLoop *m_loop; uv_timer_t *m_timer; std::function m_cb; }; diff --git a/common/net/daybreak_connection.cpp b/common/net/daybreak_connection.cpp index 375841211..15b61ee51 100644 --- a/common/net/daybreak_connection.cpp +++ b/common/net/daybreak_connection.cpp @@ -13,7 +13,7 @@ EQ::Net::DaybreakConnectionManager::DaybreakConnectionManager() memset(&m_timer, 0, sizeof(uv_timer_t)); memset(&m_socket, 0, sizeof(uv_udp_t)); - Attach(EQ::EventLoop::Get().Handle()); + Attach(EventLoop::GetDefault().Handle()); } EQ::Net::DaybreakConnectionManager::DaybreakConnectionManager(const DaybreakConnectionManagerOptions &opts) @@ -23,7 +23,12 @@ EQ::Net::DaybreakConnectionManager::DaybreakConnectionManager(const DaybreakConn memset(&m_timer, 0, sizeof(uv_timer_t)); memset(&m_socket, 0, sizeof(uv_udp_t)); - Attach(EQ::EventLoop::Get().Handle()); + if (opts.loop == nullptr) { + Attach(EventLoop::GetDefault().Handle()); + } + else { + Attach(opts.loop->Handle()); + } } EQ::Net::DaybreakConnectionManager::~DaybreakConnectionManager() diff --git a/common/net/daybreak_connection.h b/common/net/daybreak_connection.h index ab3c759c8..eac442844 100644 --- a/common/net/daybreak_connection.h +++ b/common/net/daybreak_connection.h @@ -13,6 +13,7 @@ namespace EQ { + class EventLoop; namespace Net { enum DaybreakProtocolOpcode @@ -252,6 +253,7 @@ namespace EQ resend_timeout = 90000; connection_close_time = 2000; outgoing_data_rate = 0.0; + loop = nullptr; } size_t max_packet_size; @@ -275,6 +277,7 @@ namespace EQ DaybreakEncodeType encode_passes[2]; int port; double outgoing_data_rate; + EQ::EventLoop *loop; }; class DaybreakConnectionManager diff --git a/common/net/dns.h b/common/net/dns.h index d43f41566..dbdd80531 100644 --- a/common/net/dns.h +++ b/common/net/dns.h @@ -8,7 +8,7 @@ namespace EQ { namespace Net { - static void DNSLookup(const std::string &addr, int port, bool ipv6, std::function cb) { + static void DNSLookup(EQ::EventLoop *eloop, const std::string &addr, int port, bool ipv6, std::function cb) { struct DNSBaton { std::function cb; @@ -21,7 +21,7 @@ namespace EQ hints.ai_socktype = SOCK_STREAM; hints.ai_protocol = IPPROTO_TCP; - auto loop = EQ::EventLoop::Get().Handle(); + auto loop = eloop->Handle(); uv_getaddrinfo_t *resolver = new uv_getaddrinfo_t(); memset(resolver, 0, sizeof(uv_getaddrinfo_t)); auto port_str = std::to_string(port); @@ -57,5 +57,9 @@ namespace EQ cb(addr); }, addr.c_str(), port_str.c_str(), &hints); } + + static void DNSLookup(const std::string &addr, int port, bool ipv6, std::function cb) { + DNSLookup(&EQ::EventLoop::GetDefault(), addr, port, ipv6, cb); + } } } diff --git a/common/net/tcp_connection.cpp b/common/net/tcp_connection.cpp index 03f8466de..8b468f43d 100644 --- a/common/net/tcp_connection.cpp +++ b/common/net/tcp_connection.cpp @@ -15,7 +15,7 @@ EQ::Net::TCPConnection::~TCPConnection() { Disconnect(); } -void EQ::Net::TCPConnection::Connect(const std::string &addr, int port, bool ipv6, std::function)> cb) +void EQ::Net::TCPConnection::Connect(EQ::EventLoop *loop, const std::string & addr, int port, bool ipv6, std::function)> cb) { struct EQTCPConnectBaton { @@ -23,10 +23,9 @@ void EQ::Net::TCPConnection::Connect(const std::string &addr, int port, bool ipv std::function)> cb; }; - auto loop = EQ::EventLoop::Get().Handle(); uv_tcp_t *socket = new uv_tcp_t; memset(socket, 0, sizeof(uv_tcp_t)); - uv_tcp_init(loop, socket); + uv_tcp_init(loop->Handle(), socket); sockaddr_storage iaddr; if (ipv6) { @@ -64,6 +63,11 @@ void EQ::Net::TCPConnection::Connect(const std::string &addr, int port, bool ipv }); } +void EQ::Net::TCPConnection::Connect(const std::string &addr, int port, bool ipv6, std::function)> cb) +{ + Connect(&EventLoop::GetDefault(), addr, port, ipv6, cb); +} + void EQ::Net::TCPConnection::Start() { uv_read_start((uv_stream_t*)m_socket, [](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { buf->base = new char[suggested_size]; diff --git a/common/net/tcp_connection.h b/common/net/tcp_connection.h index c69671da8..4e914589c 100644 --- a/common/net/tcp_connection.h +++ b/common/net/tcp_connection.h @@ -7,6 +7,7 @@ namespace EQ { + class EventLoop; namespace Net { class TCPConnection @@ -15,6 +16,7 @@ namespace EQ TCPConnection(uv_tcp_t *socket); ~TCPConnection(); + static void Connect(EQ::EventLoop *loop, const std::string &addr, int port, bool ipv6, std::function)> cb); static void Connect(const std::string &addr, int port, bool ipv6, std::function)> cb); void Start(); diff --git a/common/net/tcp_server.cpp b/common/net/tcp_server.cpp index ed68d54dc..89478d4dc 100644 --- a/common/net/tcp_server.cpp +++ b/common/net/tcp_server.cpp @@ -7,6 +7,13 @@ void on_close_tcp_server_handle(uv_handle_t* handle) { EQ::Net::TCPServer::TCPServer() { + m_loop = &EventLoop::GetDefault(); + m_socket = nullptr; +} + +EQ::Net::TCPServer::TCPServer(EQ::EventLoop *loop) +{ + m_loop = loop; m_socket = nullptr; } @@ -32,7 +39,7 @@ void EQ::Net::TCPServer::Listen(const std::string &addr, int port, bool ipv6, st m_on_new_connection = cb; - auto loop = EQ::EventLoop::Get().Handle(); + auto loop = m_loop->Handle(); m_socket = new uv_tcp_t; memset(m_socket, 0, sizeof(uv_tcp_t)); uv_tcp_init(loop, m_socket); @@ -53,7 +60,7 @@ void EQ::Net::TCPServer::Listen(const std::string &addr, int port, bool ipv6, st return; } - auto loop = EQ::EventLoop::Get().Handle(); + auto loop = server->loop; uv_tcp_t *client = new uv_tcp_t; memset(client, 0, sizeof(uv_tcp_t)); uv_tcp_init(loop, client); diff --git a/common/net/tcp_server.h b/common/net/tcp_server.h index 6f8280d3c..006bab487 100644 --- a/common/net/tcp_server.h +++ b/common/net/tcp_server.h @@ -1,6 +1,7 @@ #pragma once #include "tcp_connection.h" +#include "../event/event_loop.h" namespace EQ { @@ -10,6 +11,7 @@ namespace EQ { public: TCPServer(); + TCPServer(EQ::EventLoop *loop); ~TCPServer(); void Listen(int port, bool ipv6, std::function)> cb); @@ -19,7 +21,8 @@ namespace EQ private: std::function)> m_on_new_connection; + EQ::EventLoop *m_loop; uv_tcp_t *m_socket; }; } -} \ No newline at end of file +} diff --git a/eqlaunch/eqlaunch.cpp b/eqlaunch/eqlaunch.cpp index 4cb7f7b9c..74376803e 100644 --- a/eqlaunch/eqlaunch.cpp +++ b/eqlaunch/eqlaunch.cpp @@ -136,7 +136,7 @@ int main(int argc, char *argv[]) { zones.erase(rem); } - EQ::EventLoop::Get().Process(); + EQ::EventLoop::GetDefault().Process(); Sleep(5); } diff --git a/loginserver/main.cpp b/loginserver/main.cpp index a77ec395f..56bce73aa 100644 --- a/loginserver/main.cpp +++ b/loginserver/main.cpp @@ -179,7 +179,7 @@ int main() while (run_server) { Timer::SetCurrentTime(); server.client_manager->Process(); - EQ::EventLoop::Get().Process(); + EQ::EventLoop::GetDefault().Process(); Sleep(5); } diff --git a/queryserv/queryserv.cpp b/queryserv/queryserv.cpp index cef6e40e3..1cdbe891f 100644 --- a/queryserv/queryserv.cpp +++ b/queryserv/queryserv.cpp @@ -99,7 +99,7 @@ int main() { if(LFGuildExpireTimer.Check()) lfguildmanager.ExpireEntries(); - EQ::EventLoop::Get().Process(); + EQ::EventLoop::GetDefault().Process(); Sleep(5); } LogSys.CloseFileLogs(); diff --git a/ucs/ucs.cpp b/ucs/ucs.cpp index bd36c47e6..69787deec 100644 --- a/ucs/ucs.cpp +++ b/ucs/ucs.cpp @@ -153,7 +153,7 @@ int main() { if(ChannelListProcessTimer.Check()) ChannelList->Process(); - EQ::EventLoop::Get().Process(); + EQ::EventLoop::GetDefault().Process(); Sleep(5); } diff --git a/world/net.cpp b/world/net.cpp index ce5900d30..88ad4e6e1 100644 --- a/world/net.cpp +++ b/world/net.cpp @@ -587,7 +587,7 @@ int main(int argc, char** argv) { UpdateWindowTitle(window_title); } - EQ::EventLoop::Get().Process(); + EQ::EventLoop::GetDefault().Process(); Sleep(5); } Log(Logs::General, Logs::World_Server, "World main loop completed."); diff --git a/zone/net.cpp b/zone/net.cpp index 0b1183136..7c06c8fc7 100644 --- a/zone/net.cpp +++ b/zone/net.cpp @@ -588,7 +588,7 @@ int main(int argc, char** argv) { while (RunLoops) { bool previous_loaded = is_zone_loaded && numclients > 0; - EQ::EventLoop::Get().Process(); + EQ::EventLoop::GetDefault().Process(); bool current_loaded = is_zone_loaded && numclients > 0; if (previous_loaded && !current_loaded) {