Redid background task interface, used it to async send client packets

This commit is contained in:
KimLS 2019-01-07 19:53:23 -08:00
parent 6c1e3ae3d6
commit e3d5200310
10 changed files with 199 additions and 127 deletions

View File

@ -72,7 +72,6 @@ SET(common_sources
unix.cpp unix.cpp
xml_parser.cpp xml_parser.cpp
platform.cpp platform.cpp
event/event_loop.cpp
json/jsoncpp.cpp json/jsoncpp.cpp
net/console_server.cpp net/console_server.cpp
net/console_server_connection.cpp net/console_server_connection.cpp
@ -207,8 +206,8 @@ SET(common_headers
version.h version.h
xml_parser.h xml_parser.h
zone_numbers.h zone_numbers.h
event/background_task.h
event/event_loop.h event/event_loop.h
event/task.h
event/timer.h event/timer.h
json/json.h json/json.h
json/json-forwards.h json/json-forwards.h
@ -265,10 +264,9 @@ SET(common_headers
) )
SOURCE_GROUP(Event FILES SOURCE_GROUP(Event FILES
event/background_task.h
event/event_loop.cpp
event/event_loop.h event/event_loop.h
event/timer.h event/timer.h
event/task.h
) )
SOURCE_GROUP(Json FILES SOURCE_GROUP(Json FILES

View File

@ -1,42 +0,0 @@
#pragma once
#include <functional>
#include "../any.h"
#include "event_loop.h"
namespace EQ {
class BackgroundTask
{
public:
typedef std::function<void(EQEmu::Any&)> BackgroundTaskFunction;
struct BackgroundTaskBaton
{
BackgroundTaskFunction fn;
BackgroundTaskFunction on_finish;
EQEmu::Any data;
};
BackgroundTask(BackgroundTaskFunction fn, BackgroundTaskFunction on_finish, EQEmu::Any data) {
uv_work_t *m_work = new uv_work_t;
memset(m_work, 0, sizeof(uv_work_t));
BackgroundTaskBaton *baton = new BackgroundTaskBaton();
baton->fn = fn;
baton->on_finish = on_finish;
baton->data = data;
m_work->data = baton;
uv_queue_work(EventLoop::Get().Handle(), m_work, [](uv_work_t* req) {
BackgroundTaskBaton *baton = (BackgroundTaskBaton*)req->data;
baton->fn(baton->data);
}, [](uv_work_t* req, int status) {
BackgroundTaskBaton *baton = (BackgroundTaskBaton*)req->data;
baton->on_finish(baton->data);
delete baton;
delete req;
});
}
~BackgroundTask() {
}
};
}

100
common/event/task.h Normal file
View File

@ -0,0 +1,100 @@
#pragma once
#include <functional>
#include <exception>
#include "event_loop.h"
#include "../any.h"
namespace EQ {
class Task
{
public:
typedef std::function<void(const EQEmu::Any&)> ResolveFn;
typedef std::function<void(const std::exception&)> RejectFn;
typedef std::function<void()> FinallyFn;
typedef std::function<void(ResolveFn, RejectFn)> TaskFn;
struct TaskBaton
{
TaskFn fn;
ResolveFn on_then;
RejectFn on_catch;
FinallyFn on_finally;
bool has_result;
EQEmu::Any result;
bool has_error;
std::exception error;
};
Task(TaskFn fn) {
m_fn = fn;
}
~Task() {
}
Task& Then(ResolveFn fn) {
m_then = fn;
return *this;
}
Task& Catch(RejectFn fn) {
m_catch = fn;
return *this;
}
Task& Finally(FinallyFn fn) {
m_finally = fn;
return *this;
}
void Run() {
uv_work_t *m_work = new uv_work_t;
memset(m_work, 0, sizeof(uv_work_t));
TaskBaton *baton = new TaskBaton();
baton->fn = m_fn;
baton->on_then = m_then;
baton->on_catch = m_catch;
baton->on_finally = m_finally;
baton->has_result = false;
baton->has_error = false;
m_work->data = baton;
uv_queue_work(EventLoop::Get().Handle(), m_work, [](uv_work_t* req) {
TaskBaton *baton = (TaskBaton*)req->data;
baton->fn([baton](const EQEmu::Any& result) {
baton->has_error = false;
baton->has_result = true;
baton->result = result;
}, [baton](const std::exception &err) {
baton->has_error = true;
baton->has_result = false;
baton->error = err;
});
}, [](uv_work_t* req, int status) {
TaskBaton *baton = (TaskBaton*)req->data;
if (baton->has_error && baton->on_catch) {
baton->on_catch(baton->error);
}
else if (baton->has_result && baton->on_then) {
baton->on_then(baton->result);
}
if (baton->on_finally) {
baton->on_finally();
}
delete baton;
delete req;
});
}
private:
TaskFn m_fn;
ResolveFn m_then;
RejectFn m_catch;
FinallyFn m_finally;
};
}

View File

@ -1,5 +1,6 @@
#include "daybreak_connection.h" #include "daybreak_connection.h"
#include "../event/event_loop.h" #include "../event/event_loop.h"
#include "../event/task.h"
#include "../eqemu_logsys.h" #include "../eqemu_logsys.h"
#include "../data_verification.h" #include "../data_verification.h"
#include "crc32.h" #include "crc32.h"
@ -1045,10 +1046,6 @@ void EQ::Net::DaybreakConnection::ProcessResend(int stream)
resends++; resends++;
} }
} }
if (resends >= m_owner->m_options.resends_per_connection_cycle) {
return;
}
} }
} }
@ -1185,60 +1182,74 @@ void EQ::Net::DaybreakConnection::SendKeepAlive()
void EQ::Net::DaybreakConnection::InternalSend(Packet &p) void EQ::Net::DaybreakConnection::InternalSend(Packet &p)
{ {
m_last_send = Clock::now(); m_last_send = Clock::now();
auto send_func = [](uv_udp_send_t* req, int status) { auto send_func = [](uv_udp_send_t* req, int status) {
delete[](char*)req->data; delete[](char*)req->data;
delete req; delete req;
}; };
if (PacketCanBeEncoded(p)) { if (PacketCanBeEncoded(p)) {
DynamicPacket out; DynamicPacket *out = new DynamicPacket();
out.PutPacket(0, p); out->PutPacket(0, p);
auto sp = shared_from_this();
EQ::Task([sp, out](EQ::Task::ResolveFn resolve, EQ::Task::RejectFn reject) {
auto encode_passes = sp->GetEncodePasses();
for (int i = 0; i < 2; ++i) { for (int i = 0; i < 2; ++i) {
switch (m_encode_passes[i]) { switch (encode_passes[i]) {
case EncodeCompression: case EncodeCompression:
if(out.GetInt8(0) == 0) if(out->GetInt8(0) == 0)
Compress(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size()); sp->Compress(*out, DaybreakHeader::size(), out->Length() - DaybreakHeader::size());
else else
Compress(out, 1, out.Length() - 1); sp->Compress(*out, 1, out->Length() - 1);
break; break;
case EncodeXOR: case EncodeXOR:
if (out.GetInt8(0) == 0) if (out->GetInt8(0) == 0)
Encode(out, DaybreakHeader::size(), out.Length() - DaybreakHeader::size()); sp->Encode(*out, DaybreakHeader::size(), out->Length() - DaybreakHeader::size());
else else
Encode(out, 1, out.Length() - 1); sp->Encode(*out, 1, out->Length() - 1);
break; break;
default: default:
break; break;
} }
} }
AppendCRC(out); sp->AppendCRC(*out);
resolve(out);
})
.Then([sp, out, send_func](const EQEmu::Any &result) {
uv_udp_send_t *send_req = new uv_udp_send_t; uv_udp_send_t *send_req = new uv_udp_send_t;
memset(send_req, 0, sizeof(*send_req)); memset(send_req, 0, sizeof(*send_req));
sockaddr_in send_addr; sockaddr_in send_addr;
uv_ip4_addr(m_endpoint.c_str(), m_port, &send_addr); uv_ip4_addr(sp->RemoteEndpoint().c_str(), sp->RemotePort(), &send_addr);
uv_buf_t send_buffers[1]; uv_buf_t send_buffers[1];
char *data = new char[out.Length()]; char *data = new char[out->Length()];
memcpy(data, out.Data(), out.Length()); memcpy(data, out->Data(), out->Length());
send_buffers[0] = uv_buf_init(data, out.Length()); send_buffers[0] = uv_buf_init(data, out->Length());
send_req->data = send_buffers[0].base; send_req->data = send_buffers[0].base;
m_stats.sent_bytes += out.Length(); auto &stats = sp->GetStats();
m_stats.sent_packets++;
if (m_owner->m_options.simulated_out_packet_loss && m_owner->m_options.simulated_out_packet_loss >= m_owner->m_rand.Int(0, 100)) { stats.sent_bytes += out->Length();
stats.sent_packets++;
auto owner = sp->GetManager();
if (owner->m_options.simulated_out_packet_loss && owner->m_options.simulated_out_packet_loss >= owner->m_rand.Int(0, 100)) {
delete[](char*)send_req->data; delete[](char*)send_req->data;
delete send_req; delete send_req;
return; return;
} }
uv_udp_send(send_req, &m_owner->m_socket, send_buffers, 1, (sockaddr*)&send_addr, send_func); uv_udp_send(send_req, &owner->m_socket, send_buffers, 1, (sockaddr*)&send_addr, send_func);
return; })
.Finally([out]() {
delete out;
})
.Run();
} }
else {
uv_udp_send_t *send_req = new uv_udp_send_t; uv_udp_send_t *send_req = new uv_udp_send_t;
sockaddr_in send_addr; sockaddr_in send_addr;
uv_ip4_addr(m_endpoint.c_str(), m_port, &send_addr); uv_ip4_addr(m_endpoint.c_str(), m_port, &send_addr);
@ -1260,6 +1271,7 @@ void EQ::Net::DaybreakConnection::InternalSend(Packet &p)
uv_udp_send(send_req, &m_owner->m_socket, send_buffers, 1, (sockaddr*)&send_addr, send_func); uv_udp_send(send_req, &m_owner->m_socket, send_buffers, 1, (sockaddr*)&send_addr, send_func);
} }
}
void EQ::Net::DaybreakConnection::InternalQueuePacket(Packet &p, int stream_id, bool reliable) void EQ::Net::DaybreakConnection::InternalQueuePacket(Packet &p, int stream_id, bool reliable)
{ {

View File

@ -95,7 +95,7 @@ namespace EQ
class DaybreakConnectionManager; class DaybreakConnectionManager;
class DaybreakConnection; class DaybreakConnection;
class DaybreakConnection class DaybreakConnection : public std::enable_shared_from_this<DaybreakConnection>
{ {
public: public:
DaybreakConnection(DaybreakConnectionManager *owner, const DaybreakConnect &connect, const std::string &endpoint, int port); DaybreakConnection(DaybreakConnectionManager *owner, const DaybreakConnect &connect, const std::string &endpoint, int port);
@ -109,10 +109,16 @@ namespace EQ
void QueuePacket(Packet &p); void QueuePacket(Packet &p);
void QueuePacket(Packet &p, int stream); void QueuePacket(Packet &p, int stream);
void QueuePacket(Packet &p, int stream, bool reliable); void QueuePacket(Packet &p, int stream, bool reliable);
const DaybreakConnectionStats& GetStats() const { return m_stats; } const DaybreakConnectionStats& GetStats() const { return m_stats; }
DaybreakConnectionStats &GetStats() { return m_stats; }
void ResetStats(); void ResetStats();
size_t GetRollingPing() const { return m_rolling_ping; } size_t GetRollingPing() const { return m_rolling_ping; }
DbProtocolStatus GetStatus() { return m_status; } DbProtocolStatus GetStatus() const { return m_status; }
const DaybreakEncodeType* GetEncodePasses() const { return m_encode_passes; }
const DaybreakConnectionManager* GetManager() const { return m_owner; }
DaybreakConnectionManager* GetManager() { return m_owner; }
private: private:
DaybreakConnectionManager *m_owner; DaybreakConnectionManager *m_owner;
std::string m_endpoint; std::string m_endpoint;
@ -209,7 +215,6 @@ namespace EQ
resend_delay_factor = 1.25; resend_delay_factor = 1.25;
resend_delay_min = 150; resend_delay_min = 150;
resend_delay_max = 5000; resend_delay_max = 5000;
resends_per_connection_cycle = 10;
connect_delay_ms = 500; connect_delay_ms = 500;
stale_connection_ms = 90000; stale_connection_ms = 90000;
connect_stale_ms = 5000; connect_stale_ms = 5000;
@ -234,7 +239,6 @@ namespace EQ
size_t resend_delay_ms; size_t resend_delay_ms;
size_t resend_delay_min; size_t resend_delay_min;
size_t resend_delay_max; size_t resend_delay_max;
int resends_per_connection_cycle;
size_t connect_delay_ms; size_t connect_delay_ms;
size_t connect_stale_ms; size_t connect_stale_ms;
size_t stale_connection_ms; size_t stale_connection_ms;

View File

@ -473,7 +473,6 @@ Clientlist::Clientlist(int ChatPort) {
chat_opts.daybreak_options.resend_delay_factor = RuleR(Network, ResendDelayFactor); chat_opts.daybreak_options.resend_delay_factor = RuleR(Network, ResendDelayFactor);
chat_opts.daybreak_options.resend_delay_min = RuleI(Network, ResendDelayMinMS); chat_opts.daybreak_options.resend_delay_min = RuleI(Network, ResendDelayMinMS);
chat_opts.daybreak_options.resend_delay_max = RuleI(Network, ResendDelayMaxMS); chat_opts.daybreak_options.resend_delay_max = RuleI(Network, ResendDelayMaxMS);
chat_opts.daybreak_options.resends_per_connection_cycle = RuleI(Network, ResendsPerCycle);
chatsf = new EQ::Net::EQStreamManager(chat_opts); chatsf = new EQ::Net::EQStreamManager(chat_opts);

View File

@ -499,7 +499,6 @@ int main(int argc, char** argv) {
opts.daybreak_options.resend_delay_factor = RuleR(Network, ResendDelayFactor); opts.daybreak_options.resend_delay_factor = RuleR(Network, ResendDelayFactor);
opts.daybreak_options.resend_delay_min = RuleI(Network, ResendDelayMinMS); opts.daybreak_options.resend_delay_min = RuleI(Network, ResendDelayMinMS);
opts.daybreak_options.resend_delay_max = RuleI(Network, ResendDelayMaxMS); opts.daybreak_options.resend_delay_max = RuleI(Network, ResendDelayMaxMS);
opts.daybreak_options.resends_per_connection_cycle = RuleI(Network, ResendsPerCycle);
EQ::Net::EQStreamManager eqsm(opts); EQ::Net::EQStreamManager eqsm(opts);

View File

@ -464,7 +464,6 @@ int main(int argc, char** argv) {
opts.daybreak_options.resend_delay_factor = RuleR(Network, ResendDelayFactor); opts.daybreak_options.resend_delay_factor = RuleR(Network, ResendDelayFactor);
opts.daybreak_options.resend_delay_min = RuleI(Network, ResendDelayMinMS); opts.daybreak_options.resend_delay_min = RuleI(Network, ResendDelayMinMS);
opts.daybreak_options.resend_delay_max = RuleI(Network, ResendDelayMaxMS); opts.daybreak_options.resend_delay_max = RuleI(Network, ResendDelayMaxMS);
opts.daybreak_options.resends_per_connection_cycle = RuleI(Network, ResendsPerCycle);
eqsm.reset(new EQ::Net::EQStreamManager(opts)); eqsm.reset(new EQ::Net::EQStreamManager(opts));
eqsf_open = true; eqsf_open = true;

View File

@ -1,5 +1,5 @@
#include "../common/global_define.h" #include "../common/global_define.h"
#include "../common/event/background_task.h" #include "../common/event/task.h"
#include "client.h" #include "client.h"
#include "zone.h" #include "zone.h"
@ -40,12 +40,14 @@ void CullPoints(std::vector<FindPerson_Point> &points) {
} }
void Client::SendPathPacket(const std::vector<FindPerson_Point> &points) { void Client::SendPathPacket(const std::vector<FindPerson_Point> &points) {
EQ::BackgroundTask task([](EQEmu::Any &data) { EQEmu::Any data(points);
auto &points = EQEmu::any_cast<std::vector<FindPerson_Point>&>(data); EQ::Task([=](EQ::Task::ResolveFn resolve, EQ::Task::RejectFn reject) {
auto points = EQEmu::any_cast<std::vector<FindPerson_Point>>(data);
CullPoints(points); CullPoints(points);
}, [this](EQEmu::Any &data) { resolve(points);
auto &points = EQEmu::any_cast<std::vector<FindPerson_Point>&>(data); })
.Then([this](const EQEmu::Any &result) {
auto points = EQEmu::any_cast<std::vector<FindPerson_Point>>(result);
if (points.size() < 2) { if (points.size() < 2) {
if (Admin() > 10) { if (Admin() > 10) {
Message(MT_System, "Too few points"); Message(MT_System, "Too few points");
@ -88,5 +90,6 @@ void Client::SendPathPacket(const std::vector<FindPerson_Point> &points) {
fpr->dest = *cur; fpr->dest = *cur;
FastQueuePacket(&outapp); FastQueuePacket(&outapp);
}, points); })
.Run();
} }