Make event loops able to be run in another thread

This commit is contained in:
KimLS 2019-03-29 18:39:17 -07:00
parent 566f743a88
commit 21a39db1c6
16 changed files with 82 additions and 29 deletions

View File

@ -7,15 +7,20 @@ namespace EQ
{ {
class EventLoop class EventLoop
{ {
public: public:
static EventLoop &Get() { EventLoop() {
static thread_local EventLoop inst; memset(&m_loop, 0, sizeof(uv_loop_t));
return inst; uv_loop_init(&m_loop);
} }
~EventLoop() { ~EventLoop() {
uv_loop_close(&m_loop); uv_loop_close(&m_loop);
} }
static EventLoop &GetDefault() {
static EventLoop inst;
return inst;
}
void Process() { void Process() {
uv_run(&m_loop, UV_RUN_NOWAIT); uv_run(&m_loop, UV_RUN_NOWAIT);
@ -23,12 +28,7 @@ namespace EQ
uv_loop_t* Handle() { return &m_loop; } uv_loop_t* Handle() { return &m_loop; }
private: private:
EventLoop() {
memset(&m_loop, 0, sizeof(uv_loop_t));
uv_loop_init(&m_loop);
}
EventLoop(const EventLoop&); EventLoop(const EventLoop&);
EventLoop& operator=(const EventLoop&); EventLoop& operator=(const EventLoop&);

View File

@ -24,7 +24,13 @@ namespace EQ {
std::exception error; std::exception error;
}; };
Task(EQ::EventLoop *loop, TaskFn fn) {
m_loop = loop;
m_fn = fn;
}
Task(TaskFn fn) { Task(TaskFn fn) {
m_loop = &EQ::EventLoop::GetDefault();
m_fn = fn; m_fn = fn;
} }
@ -60,7 +66,7 @@ namespace EQ {
m_work->data = baton; m_work->data = baton;
uv_queue_work(EventLoop::Get().Handle(), m_work, [](uv_work_t* req) { uv_queue_work(m_loop->Handle(), m_work, [](uv_work_t* req) {
TaskBaton *baton = (TaskBaton*)req->data; TaskBaton *baton = (TaskBaton*)req->data;
baton->fn([baton](const EQEmu::Any& result) { baton->fn([baton](const EQEmu::Any& result) {
@ -92,6 +98,7 @@ namespace EQ {
} }
private: private:
EQ::EventLoop *m_loop;
TaskFn m_fn; TaskFn m_fn;
ResolveFn m_then; ResolveFn m_then;
RejectFn m_catch; RejectFn m_catch;

View File

@ -6,14 +6,31 @@ namespace EQ {
class Timer class Timer
{ {
public: public:
Timer(std::function<void(Timer *)> cb) Timer(EQ::EventLoop *loop, std::function<void(Timer *)> cb)
{ {
m_loop = loop;
m_timer = nullptr; m_timer = nullptr;
m_cb = cb; m_cb = cb;
} }
Timer(std::function<void(Timer *)> cb)
{
m_loop = &EQ::EventLoop::GetDefault();
m_timer = nullptr;
m_cb = cb;
}
Timer(EQ::EventLoop *loop, uint64_t duration_ms, bool repeats, std::function<void(Timer *)> cb)
{
m_loop = loop;
m_timer = nullptr;
m_cb = cb;
Start(duration_ms, repeats);
}
Timer(uint64_t duration_ms, bool repeats, std::function<void(Timer *)> cb) Timer(uint64_t duration_ms, bool repeats, std::function<void(Timer *)> cb)
{ {
m_loop = &EQ::EventLoop::GetDefault();
m_timer = nullptr; m_timer = nullptr;
m_cb = cb; m_cb = cb;
Start(duration_ms, repeats); Start(duration_ms, repeats);
@ -25,7 +42,7 @@ namespace EQ {
} }
void Start(uint64_t duration_ms, bool repeats) { void Start(uint64_t duration_ms, bool repeats) {
auto loop = EventLoop::Get().Handle(); auto loop = m_loop->Handle();
if (!m_timer) { if (!m_timer) {
m_timer = new uv_timer_t; m_timer = new uv_timer_t;
memset(m_timer, 0, sizeof(uv_timer_t)); memset(m_timer, 0, sizeof(uv_timer_t));
@ -61,6 +78,7 @@ namespace EQ {
m_cb(this); m_cb(this);
} }
EQ::EventLoop *m_loop;
uv_timer_t *m_timer; uv_timer_t *m_timer;
std::function<void(Timer*)> m_cb; std::function<void(Timer*)> m_cb;
}; };

View File

@ -13,7 +13,7 @@ EQ::Net::DaybreakConnectionManager::DaybreakConnectionManager()
memset(&m_timer, 0, sizeof(uv_timer_t)); memset(&m_timer, 0, sizeof(uv_timer_t));
memset(&m_socket, 0, sizeof(uv_udp_t)); memset(&m_socket, 0, sizeof(uv_udp_t));
Attach(EQ::EventLoop::Get().Handle()); Attach(EventLoop::GetDefault().Handle());
} }
EQ::Net::DaybreakConnectionManager::DaybreakConnectionManager(const DaybreakConnectionManagerOptions &opts) EQ::Net::DaybreakConnectionManager::DaybreakConnectionManager(const DaybreakConnectionManagerOptions &opts)
@ -23,7 +23,12 @@ EQ::Net::DaybreakConnectionManager::DaybreakConnectionManager(const DaybreakConn
memset(&m_timer, 0, sizeof(uv_timer_t)); memset(&m_timer, 0, sizeof(uv_timer_t));
memset(&m_socket, 0, sizeof(uv_udp_t)); memset(&m_socket, 0, sizeof(uv_udp_t));
Attach(EQ::EventLoop::Get().Handle()); if (opts.loop == nullptr) {
Attach(EventLoop::GetDefault().Handle());
}
else {
Attach(opts.loop->Handle());
}
} }
EQ::Net::DaybreakConnectionManager::~DaybreakConnectionManager() EQ::Net::DaybreakConnectionManager::~DaybreakConnectionManager()

View File

@ -13,6 +13,7 @@
namespace EQ namespace EQ
{ {
class EventLoop;
namespace Net namespace Net
{ {
enum DaybreakProtocolOpcode enum DaybreakProtocolOpcode
@ -252,6 +253,7 @@ namespace EQ
resend_timeout = 90000; resend_timeout = 90000;
connection_close_time = 2000; connection_close_time = 2000;
outgoing_data_rate = 0.0; outgoing_data_rate = 0.0;
loop = nullptr;
} }
size_t max_packet_size; size_t max_packet_size;
@ -275,6 +277,7 @@ namespace EQ
DaybreakEncodeType encode_passes[2]; DaybreakEncodeType encode_passes[2];
int port; int port;
double outgoing_data_rate; double outgoing_data_rate;
EQ::EventLoop *loop;
}; };
class DaybreakConnectionManager class DaybreakConnectionManager

View File

@ -8,7 +8,7 @@ namespace EQ
{ {
namespace Net namespace Net
{ {
static void DNSLookup(const std::string &addr, int port, bool ipv6, std::function<void(const std::string&)> cb) { static void DNSLookup(EQ::EventLoop *eloop, const std::string &addr, int port, bool ipv6, std::function<void(const std::string&)> cb) {
struct DNSBaton struct DNSBaton
{ {
std::function<void(const std::string&)> cb; std::function<void(const std::string&)> cb;
@ -21,7 +21,7 @@ namespace EQ
hints.ai_socktype = SOCK_STREAM; hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP; hints.ai_protocol = IPPROTO_TCP;
auto loop = EQ::EventLoop::Get().Handle(); auto loop = eloop->Handle();
uv_getaddrinfo_t *resolver = new uv_getaddrinfo_t(); uv_getaddrinfo_t *resolver = new uv_getaddrinfo_t();
memset(resolver, 0, sizeof(uv_getaddrinfo_t)); memset(resolver, 0, sizeof(uv_getaddrinfo_t));
auto port_str = std::to_string(port); auto port_str = std::to_string(port);
@ -57,5 +57,9 @@ namespace EQ
cb(addr); cb(addr);
}, addr.c_str(), port_str.c_str(), &hints); }, addr.c_str(), port_str.c_str(), &hints);
} }
static void DNSLookup(const std::string &addr, int port, bool ipv6, std::function<void(const std::string&)> cb) {
DNSLookup(&EQ::EventLoop::GetDefault(), addr, port, ipv6, cb);
}
} }
} }

View File

@ -15,7 +15,7 @@ EQ::Net::TCPConnection::~TCPConnection() {
Disconnect(); Disconnect();
} }
void EQ::Net::TCPConnection::Connect(const std::string &addr, int port, bool ipv6, std::function<void(std::shared_ptr<TCPConnection>)> cb) void EQ::Net::TCPConnection::Connect(EQ::EventLoop *loop, const std::string & addr, int port, bool ipv6, std::function<void(std::shared_ptr<TCPConnection>)> cb)
{ {
struct EQTCPConnectBaton struct EQTCPConnectBaton
{ {
@ -23,10 +23,9 @@ void EQ::Net::TCPConnection::Connect(const std::string &addr, int port, bool ipv
std::function<void(std::shared_ptr<EQ::Net::TCPConnection>)> cb; std::function<void(std::shared_ptr<EQ::Net::TCPConnection>)> cb;
}; };
auto loop = EQ::EventLoop::Get().Handle();
uv_tcp_t *socket = new uv_tcp_t; uv_tcp_t *socket = new uv_tcp_t;
memset(socket, 0, sizeof(uv_tcp_t)); memset(socket, 0, sizeof(uv_tcp_t));
uv_tcp_init(loop, socket); uv_tcp_init(loop->Handle(), socket);
sockaddr_storage iaddr; sockaddr_storage iaddr;
if (ipv6) { if (ipv6) {
@ -64,6 +63,11 @@ void EQ::Net::TCPConnection::Connect(const std::string &addr, int port, bool ipv
}); });
} }
void EQ::Net::TCPConnection::Connect(const std::string &addr, int port, bool ipv6, std::function<void(std::shared_ptr<TCPConnection>)> cb)
{
Connect(&EventLoop::GetDefault(), addr, port, ipv6, cb);
}
void EQ::Net::TCPConnection::Start() { void EQ::Net::TCPConnection::Start() {
uv_read_start((uv_stream_t*)m_socket, [](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { uv_read_start((uv_stream_t*)m_socket, [](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
buf->base = new char[suggested_size]; buf->base = new char[suggested_size];

View File

@ -7,6 +7,7 @@
namespace EQ namespace EQ
{ {
class EventLoop;
namespace Net namespace Net
{ {
class TCPConnection class TCPConnection
@ -15,6 +16,7 @@ namespace EQ
TCPConnection(uv_tcp_t *socket); TCPConnection(uv_tcp_t *socket);
~TCPConnection(); ~TCPConnection();
static void Connect(EQ::EventLoop *loop, const std::string &addr, int port, bool ipv6, std::function<void(std::shared_ptr<TCPConnection>)> cb);
static void Connect(const std::string &addr, int port, bool ipv6, std::function<void(std::shared_ptr<TCPConnection>)> cb); static void Connect(const std::string &addr, int port, bool ipv6, std::function<void(std::shared_ptr<TCPConnection>)> cb);
void Start(); void Start();

View File

@ -7,6 +7,13 @@ void on_close_tcp_server_handle(uv_handle_t* handle) {
EQ::Net::TCPServer::TCPServer() EQ::Net::TCPServer::TCPServer()
{ {
m_loop = &EventLoop::GetDefault();
m_socket = nullptr;
}
EQ::Net::TCPServer::TCPServer(EQ::EventLoop *loop)
{
m_loop = loop;
m_socket = nullptr; m_socket = nullptr;
} }
@ -32,7 +39,7 @@ void EQ::Net::TCPServer::Listen(const std::string &addr, int port, bool ipv6, st
m_on_new_connection = cb; m_on_new_connection = cb;
auto loop = EQ::EventLoop::Get().Handle(); auto loop = m_loop->Handle();
m_socket = new uv_tcp_t; m_socket = new uv_tcp_t;
memset(m_socket, 0, sizeof(uv_tcp_t)); memset(m_socket, 0, sizeof(uv_tcp_t));
uv_tcp_init(loop, m_socket); uv_tcp_init(loop, m_socket);
@ -53,7 +60,7 @@ void EQ::Net::TCPServer::Listen(const std::string &addr, int port, bool ipv6, st
return; return;
} }
auto loop = EQ::EventLoop::Get().Handle(); auto loop = server->loop;
uv_tcp_t *client = new uv_tcp_t; uv_tcp_t *client = new uv_tcp_t;
memset(client, 0, sizeof(uv_tcp_t)); memset(client, 0, sizeof(uv_tcp_t));
uv_tcp_init(loop, client); uv_tcp_init(loop, client);

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include "tcp_connection.h" #include "tcp_connection.h"
#include "../event/event_loop.h"
namespace EQ namespace EQ
{ {
@ -10,6 +11,7 @@ namespace EQ
{ {
public: public:
TCPServer(); TCPServer();
TCPServer(EQ::EventLoop *loop);
~TCPServer(); ~TCPServer();
void Listen(int port, bool ipv6, std::function<void(std::shared_ptr<TCPConnection>)> cb); void Listen(int port, bool ipv6, std::function<void(std::shared_ptr<TCPConnection>)> cb);
@ -19,7 +21,8 @@ namespace EQ
private: private:
std::function<void(std::shared_ptr<TCPConnection>)> m_on_new_connection; std::function<void(std::shared_ptr<TCPConnection>)> m_on_new_connection;
EQ::EventLoop *m_loop;
uv_tcp_t *m_socket; uv_tcp_t *m_socket;
}; };
} }
} }

View File

@ -136,7 +136,7 @@ int main(int argc, char *argv[]) {
zones.erase(rem); zones.erase(rem);
} }
EQ::EventLoop::Get().Process(); EQ::EventLoop::GetDefault().Process();
Sleep(5); Sleep(5);
} }

View File

@ -179,7 +179,7 @@ int main()
while (run_server) { while (run_server) {
Timer::SetCurrentTime(); Timer::SetCurrentTime();
server.client_manager->Process(); server.client_manager->Process();
EQ::EventLoop::Get().Process(); EQ::EventLoop::GetDefault().Process();
Sleep(5); Sleep(5);
} }

View File

@ -99,7 +99,7 @@ int main() {
if(LFGuildExpireTimer.Check()) if(LFGuildExpireTimer.Check())
lfguildmanager.ExpireEntries(); lfguildmanager.ExpireEntries();
EQ::EventLoop::Get().Process(); EQ::EventLoop::GetDefault().Process();
Sleep(5); Sleep(5);
} }
LogSys.CloseFileLogs(); LogSys.CloseFileLogs();

View File

@ -153,7 +153,7 @@ int main() {
if(ChannelListProcessTimer.Check()) if(ChannelListProcessTimer.Check())
ChannelList->Process(); ChannelList->Process();
EQ::EventLoop::Get().Process(); EQ::EventLoop::GetDefault().Process();
Sleep(5); Sleep(5);
} }

View File

@ -587,7 +587,7 @@ int main(int argc, char** argv) {
UpdateWindowTitle(window_title); UpdateWindowTitle(window_title);
} }
EQ::EventLoop::Get().Process(); EQ::EventLoop::GetDefault().Process();
Sleep(5); Sleep(5);
} }
Log(Logs::General, Logs::World_Server, "World main loop completed."); Log(Logs::General, Logs::World_Server, "World main loop completed.");

View File

@ -588,7 +588,7 @@ int main(int argc, char** argv) {
while (RunLoops) { while (RunLoops) {
bool previous_loaded = is_zone_loaded && numclients > 0; bool previous_loaded = is_zone_loaded && numclients > 0;
EQ::EventLoop::Get().Process(); EQ::EventLoop::GetDefault().Process();
bool current_loaded = is_zone_loaded && numclients > 0; bool current_loaded = is_zone_loaded && numclients > 0;
if (previous_loaded && !current_loaded) { if (previous_loaded && !current_loaded) {