diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index b953815e6..6c09127fc 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -72,7 +72,6 @@ SET(common_sources unix.cpp xml_parser.cpp platform.cpp - event/event_loop.cpp json/jsoncpp.cpp net/console_server.cpp net/console_server_connection.cpp @@ -207,8 +206,8 @@ SET(common_headers version.h xml_parser.h zone_numbers.h - event/background_task.h event/event_loop.h + event/task.h event/timer.h json/json.h json/json-forwards.h @@ -265,10 +264,9 @@ SET(common_headers ) SOURCE_GROUP(Event FILES - event/background_task.h - event/event_loop.cpp event/event_loop.h event/timer.h + event/task.h ) SOURCE_GROUP(Json FILES diff --git a/common/event/background_task.h b/common/event/background_task.h deleted file mode 100644 index 41fe4accb..000000000 --- a/common/event/background_task.h +++ /dev/null @@ -1,42 +0,0 @@ -#pragma once -#include -#include "../any.h" -#include "event_loop.h" - -namespace EQ { - class BackgroundTask - { - public: - typedef std::function BackgroundTaskFunction; - struct BackgroundTaskBaton - { - BackgroundTaskFunction fn; - BackgroundTaskFunction on_finish; - EQEmu::Any data; - }; - - BackgroundTask(BackgroundTaskFunction fn, BackgroundTaskFunction on_finish, EQEmu::Any data) { - uv_work_t *m_work = new uv_work_t; - memset(m_work, 0, sizeof(uv_work_t)); - BackgroundTaskBaton *baton = new BackgroundTaskBaton(); - baton->fn = fn; - baton->on_finish = on_finish; - baton->data = data; - - m_work->data = baton; - uv_queue_work(EventLoop::Get().Handle(), m_work, [](uv_work_t* req) { - BackgroundTaskBaton *baton = (BackgroundTaskBaton*)req->data; - baton->fn(baton->data); - }, [](uv_work_t* req, int status) { - BackgroundTaskBaton *baton = (BackgroundTaskBaton*)req->data; - baton->on_finish(baton->data); - delete baton; - delete req; - }); - } - - ~BackgroundTask() { - - } - }; -} diff --git a/common/event/event_loop.cpp b/common/event/event_loop.cpp deleted file mode 100644 index e69de29bb..000000000 diff --git a/common/event/task.h b/common/event/task.h new file mode 100644 index 000000000..ae19700a2 --- /dev/null +++ b/common/event/task.h @@ -0,0 +1,100 @@ +#pragma once +#include +#include +#include "event_loop.h" +#include "../any.h" + +namespace EQ { + class Task + { + public: + typedef std::function ResolveFn; + typedef std::function RejectFn; + typedef std::function FinallyFn; + typedef std::function TaskFn; + struct TaskBaton + { + TaskFn fn; + ResolveFn on_then; + RejectFn on_catch; + FinallyFn on_finally; + bool has_result; + EQEmu::Any result; + bool has_error; + std::exception error; + }; + + Task(TaskFn fn) { + m_fn = fn; + } + + ~Task() { + + } + + Task& Then(ResolveFn fn) { + m_then = fn; + return *this; + } + + Task& Catch(RejectFn fn) { + m_catch = fn; + return *this; + } + + Task& Finally(FinallyFn fn) { + m_finally = fn; + return *this; + } + + void Run() { + uv_work_t *m_work = new uv_work_t; + memset(m_work, 0, sizeof(uv_work_t)); + TaskBaton *baton = new TaskBaton(); + baton->fn = m_fn; + baton->on_then = m_then; + baton->on_catch = m_catch; + baton->on_finally = m_finally; + baton->has_result = false; + baton->has_error = false; + + m_work->data = baton; + + uv_queue_work(EventLoop::Get().Handle(), m_work, [](uv_work_t* req) { + TaskBaton *baton = (TaskBaton*)req->data; + + baton->fn([baton](const EQEmu::Any& result) { + baton->has_error = false; + baton->has_result = true; + baton->result = result; + }, [baton](const std::exception &err) { + baton->has_error = true; + baton->has_result = false; + baton->error = err; + }); + }, [](uv_work_t* req, int status) { + TaskBaton *baton = (TaskBaton*)req->data; + + if (baton->has_error && baton->on_catch) { + baton->on_catch(baton->error); + } + else if (baton->has_result && baton->on_then) { + baton->on_then(baton->result); + } + + if (baton->on_finally) { + baton->on_finally(); + } + + delete baton; + delete req; + }); + } + + private: + TaskFn m_fn; + ResolveFn m_then; + RejectFn m_catch; + FinallyFn m_finally; + }; +} diff --git a/common/net/daybreak_connection.cpp b/common/net/daybreak_connection.cpp index 13827581c..652f31f2a 100644 --- a/common/net/daybreak_connection.cpp +++ b/common/net/daybreak_connection.cpp @@ -1,5 +1,6 @@ #include "daybreak_connection.h" #include "../event/event_loop.h" +#include "../event/task.h" #include "../eqemu_logsys.h" #include "../data_verification.h" #include "crc32.h" @@ -1045,10 +1046,6 @@ void EQ::Net::DaybreakConnection::ProcessResend(int stream) resends++; } } - - if (resends >= m_owner->m_options.resends_per_connection_cycle) { - return; - } } } @@ -1185,80 +1182,95 @@ void EQ::Net::DaybreakConnection::SendKeepAlive() void EQ::Net::DaybreakConnection::InternalSend(Packet &p) { m_last_send = Clock::now(); - auto send_func = [](uv_udp_send_t* req, int status) { delete[](char*)req->data; delete req; }; - + if (PacketCanBeEncoded(p)) { - DynamicPacket out; - out.PutPacket(0, p); - - for (int i = 0; i < 2; ++i) { - switch (m_encode_passes[i]) { - case EncodeCompression: - if(out.GetInt8(0) == 0) - Compress(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size()); - else - Compress(out, 1, out.Length() - 1); - break; - case EncodeXOR: - if (out.GetInt8(0) == 0) - Encode(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size()); - else - Encode(out, 1, out.Length() - 1); - break; - default: - break; + DynamicPacket *out = new DynamicPacket(); + out->PutPacket(0, p); + + auto sp = shared_from_this(); + EQ::Task([sp, out](EQ::Task::ResolveFn resolve, EQ::Task::RejectFn reject) { + + auto encode_passes = sp->GetEncodePasses(); + for (int i = 0; i < 2; ++i) { + switch (encode_passes[i]) { + case EncodeCompression: + if(out->GetInt8(0) == 0) + sp->Compress(*out, DaybreakHeader::size(), out->Length() - DaybreakHeader::size()); + else + sp->Compress(*out, 1, out->Length() - 1); + break; + case EncodeXOR: + if (out->GetInt8(0) == 0) + sp->Encode(*out, DaybreakHeader::size(), out->Length() - DaybreakHeader::size()); + else + sp->Encode(*out, 1, out->Length() - 1); + break; + default: + break; + } } - } - - AppendCRC(out); - + + sp->AppendCRC(*out); + resolve(out); + }) + .Then([sp, out, send_func](const EQEmu::Any &result) { + uv_udp_send_t *send_req = new uv_udp_send_t; + memset(send_req, 0, sizeof(*send_req)); + sockaddr_in send_addr; + uv_ip4_addr(sp->RemoteEndpoint().c_str(), sp->RemotePort(), &send_addr); + uv_buf_t send_buffers[1]; + + char *data = new char[out->Length()]; + memcpy(data, out->Data(), out->Length()); + send_buffers[0] = uv_buf_init(data, out->Length()); + send_req->data = send_buffers[0].base; + + auto &stats = sp->GetStats(); + + stats.sent_bytes += out->Length(); + stats.sent_packets++; + + auto owner = sp->GetManager(); + + if (owner->m_options.simulated_out_packet_loss && owner->m_options.simulated_out_packet_loss >= owner->m_rand.Int(0, 100)) { + delete[](char*)send_req->data; + delete send_req; + return; + } + + uv_udp_send(send_req, &owner->m_socket, send_buffers, 1, (sockaddr*)&send_addr, send_func); + }) + .Finally([out]() { + delete out; + }) + .Run(); + } + else { uv_udp_send_t *send_req = new uv_udp_send_t; - memset(send_req, 0, sizeof(*send_req)); sockaddr_in send_addr; uv_ip4_addr(m_endpoint.c_str(), m_port, &send_addr); uv_buf_t send_buffers[1]; - - char *data = new char[out.Length()]; - memcpy(data, out.Data(), out.Length()); - send_buffers[0] = uv_buf_init(data, out.Length()); + + char *data = new char[p.Length()]; + memcpy(data, p.Data(), p.Length()); + send_buffers[0] = uv_buf_init(data, p.Length()); send_req->data = send_buffers[0].base; - - m_stats.sent_bytes += out.Length(); + + m_stats.sent_bytes += p.Length(); m_stats.sent_packets++; + if (m_owner->m_options.simulated_out_packet_loss && m_owner->m_options.simulated_out_packet_loss >= m_owner->m_rand.Int(0, 100)) { delete[](char*)send_req->data; delete send_req; return; } - + uv_udp_send(send_req, &m_owner->m_socket, send_buffers, 1, (sockaddr*)&send_addr, send_func); - return; } - - uv_udp_send_t *send_req = new uv_udp_send_t; - sockaddr_in send_addr; - uv_ip4_addr(m_endpoint.c_str(), m_port, &send_addr); - uv_buf_t send_buffers[1]; - - char *data = new char[p.Length()]; - memcpy(data, p.Data(), p.Length()); - send_buffers[0] = uv_buf_init(data, p.Length()); - send_req->data = send_buffers[0].base; - - m_stats.sent_bytes += p.Length(); - m_stats.sent_packets++; - - if (m_owner->m_options.simulated_out_packet_loss && m_owner->m_options.simulated_out_packet_loss >= m_owner->m_rand.Int(0, 100)) { - delete[](char*)send_req->data; - delete send_req; - return; - } - - uv_udp_send(send_req, &m_owner->m_socket, send_buffers, 1, (sockaddr*)&send_addr, send_func); } void EQ::Net::DaybreakConnection::InternalQueuePacket(Packet &p, int stream_id, bool reliable) diff --git a/common/net/daybreak_connection.h b/common/net/daybreak_connection.h index f7ea49e22..7fd0a4f10 100644 --- a/common/net/daybreak_connection.h +++ b/common/net/daybreak_connection.h @@ -95,7 +95,7 @@ namespace EQ class DaybreakConnectionManager; class DaybreakConnection; - class DaybreakConnection + class DaybreakConnection : public std::enable_shared_from_this { public: DaybreakConnection(DaybreakConnectionManager *owner, const DaybreakConnect &connect, const std::string &endpoint, int port); @@ -109,10 +109,16 @@ namespace EQ void QueuePacket(Packet &p); void QueuePacket(Packet &p, int stream); void QueuePacket(Packet &p, int stream, bool reliable); + const DaybreakConnectionStats& GetStats() const { return m_stats; } + DaybreakConnectionStats &GetStats() { return m_stats; } void ResetStats(); size_t GetRollingPing() const { return m_rolling_ping; } - DbProtocolStatus GetStatus() { return m_status; } + DbProtocolStatus GetStatus() const { return m_status; } + + const DaybreakEncodeType* GetEncodePasses() const { return m_encode_passes; } + const DaybreakConnectionManager* GetManager() const { return m_owner; } + DaybreakConnectionManager* GetManager() { return m_owner; } private: DaybreakConnectionManager *m_owner; std::string m_endpoint; @@ -209,7 +215,6 @@ namespace EQ resend_delay_factor = 1.25; resend_delay_min = 150; resend_delay_max = 5000; - resends_per_connection_cycle = 10; connect_delay_ms = 500; stale_connection_ms = 90000; connect_stale_ms = 5000; @@ -234,7 +239,6 @@ namespace EQ size_t resend_delay_ms; size_t resend_delay_min; size_t resend_delay_max; - int resends_per_connection_cycle; size_t connect_delay_ms; size_t connect_stale_ms; size_t stale_connection_ms; diff --git a/ucs/clientlist.cpp b/ucs/clientlist.cpp index 3701d4dc2..a399485e4 100644 --- a/ucs/clientlist.cpp +++ b/ucs/clientlist.cpp @@ -473,7 +473,6 @@ Clientlist::Clientlist(int ChatPort) { chat_opts.daybreak_options.resend_delay_factor = RuleR(Network, ResendDelayFactor); chat_opts.daybreak_options.resend_delay_min = RuleI(Network, ResendDelayMinMS); chat_opts.daybreak_options.resend_delay_max = RuleI(Network, ResendDelayMaxMS); - chat_opts.daybreak_options.resends_per_connection_cycle = RuleI(Network, ResendsPerCycle); chatsf = new EQ::Net::EQStreamManager(chat_opts); diff --git a/world/net.cpp b/world/net.cpp index 65be8c3d0..cc6112997 100644 --- a/world/net.cpp +++ b/world/net.cpp @@ -499,7 +499,6 @@ int main(int argc, char** argv) { opts.daybreak_options.resend_delay_factor = RuleR(Network, ResendDelayFactor); opts.daybreak_options.resend_delay_min = RuleI(Network, ResendDelayMinMS); opts.daybreak_options.resend_delay_max = RuleI(Network, ResendDelayMaxMS); - opts.daybreak_options.resends_per_connection_cycle = RuleI(Network, ResendsPerCycle); EQ::Net::EQStreamManager eqsm(opts); diff --git a/zone/net.cpp b/zone/net.cpp index c13fc5e84..3ddbc14df 100644 --- a/zone/net.cpp +++ b/zone/net.cpp @@ -464,7 +464,6 @@ int main(int argc, char** argv) { opts.daybreak_options.resend_delay_factor = RuleR(Network, ResendDelayFactor); opts.daybreak_options.resend_delay_min = RuleI(Network, ResendDelayMinMS); opts.daybreak_options.resend_delay_max = RuleI(Network, ResendDelayMaxMS); - opts.daybreak_options.resends_per_connection_cycle = RuleI(Network, ResendsPerCycle); eqsm.reset(new EQ::Net::EQStreamManager(opts)); eqsf_open = true; diff --git a/zone/pathing.cpp b/zone/pathing.cpp index 74ae4c435..79b7900d2 100644 --- a/zone/pathing.cpp +++ b/zone/pathing.cpp @@ -1,5 +1,5 @@ #include "../common/global_define.h" -#include "../common/event/background_task.h" +#include "../common/event/task.h" #include "client.h" #include "zone.h" @@ -40,53 +40,56 @@ void CullPoints(std::vector &points) { } void Client::SendPathPacket(const std::vector &points) { - EQ::BackgroundTask task([](EQEmu::Any &data) { - auto &points = EQEmu::any_cast&>(data); + EQEmu::Any data(points); + EQ::Task([=](EQ::Task::ResolveFn resolve, EQ::Task::RejectFn reject) { + auto points = EQEmu::any_cast>(data); CullPoints(points); - }, [this](EQEmu::Any &data) { - auto &points = EQEmu::any_cast&>(data); - + resolve(points); + }) + .Then([this](const EQEmu::Any &result) { + auto points = EQEmu::any_cast>(result); if (points.size() < 2) { if (Admin() > 10) { Message(MT_System, "Too few points"); } - + EQApplicationPacket outapp(OP_FindPersonReply, 0); QueuePacket(&outapp); return; } - + if (points.size() > 36) { if (Admin() > 10) { Message(MT_System, "Too many points %u", points.size()); } - + EQApplicationPacket outapp(OP_FindPersonReply, 0); QueuePacket(&outapp); return; } - + if (Admin() > 10) { Message(MT_System, "Total points %u", points.size()); } - + int len = sizeof(FindPersonResult_Struct) + (points.size() + 1) * sizeof(FindPerson_Point); auto outapp = new EQApplicationPacket(OP_FindPersonReply, len); FindPersonResult_Struct* fpr = (FindPersonResult_Struct*)outapp->pBuffer; - + std::vector::iterator cur, end; cur = points.begin(); end = points.end(); unsigned int r; for (r = 0; cur != end; ++cur, r++) { fpr->path[r] = *cur; - + } //put the last element into the destination field --cur; fpr->path[r] = *cur; fpr->dest = *cur; - + FastQueuePacket(&outapp); - }, points); + }) + .Run(); }