Compare commits

...

10 Commits

32 changed files with 1062 additions and 104 deletions
+3
View File
@@ -16,3 +16,6 @@
[submodule "submodules/recastnavigation"]
path = submodules/recastnavigation
url = https://github.com/recastnavigation/recastnavigation.git
[submodule "submodules/concurrentqueue"]
path = submodules/concurrentqueue
url = https://github.com/cameron314/concurrentqueue.git
+1
View File
@@ -349,6 +349,7 @@ INCLUDE_DIRECTORIES(SYSTEM "${CMAKE_CURRENT_SOURCE_DIR}/submodules/recastnavigat
INCLUDE_DIRECTORIES(SYSTEM "${CMAKE_CURRENT_SOURCE_DIR}/submodules/recastnavigation/DetourCrowd/Include")
INCLUDE_DIRECTORIES(SYSTEM "${CMAKE_CURRENT_SOURCE_DIR}/submodules/recastnavigation/DetourTileCache/Include")
INCLUDE_DIRECTORIES(SYSTEM "${CMAKE_CURRENT_SOURCE_DIR}/submodules/recastnavigation/Recast/Include")
INCLUDE_DIRECTORIES(SYSTEM "${CMAKE_CURRENT_SOURCE_DIR}/submodules/concurrentqueue")
IF(EQEMU_BUILD_SERVER OR EQEMU_BUILD_LOGIN OR EQEMU_BUILD_TESTS OR EQEMU_BUILD_HC)
ADD_SUBDIRECTORY(common)
+6
View File
@@ -79,6 +79,7 @@ SET(common_sources
net/crc32.cpp
net/daybreak_connection.cpp
net/eqstream.cpp
net/eqstream_concurrent.cpp
net/packet.cpp
net/servertalk_client_connection.cpp
net/servertalk_legacy_client_connection.cpp
@@ -222,6 +223,8 @@ SET(common_headers
net/dns.h
net/endian.h
net/eqstream.h
net/eqstream_concurrent.h
net/eqstream_concurrent_message.h
net/packet.h
net/servertalk_client_connection.h
net/servertalk_legacy_client_connection.h
@@ -294,6 +297,9 @@ SOURCE_GROUP(Net FILES
net/eqmq.h
net/eqstream.cpp
net/eqstream.h
net/eqstream_concurrent.cpp
net/eqstream_concurrent.h
net/eqstream_concurrent_message.h
net/packet.cpp
net/packet.h
net/servertalk_client_connection.cpp
+15 -2
View File
@@ -7,6 +7,7 @@
#include "emu_versions.h"
#include "eq_packet.h"
#include "net/daybreak_connection.h"
#include "event/event_loop.h"
typedef enum {
ESTABLISHED,
@@ -23,11 +24,11 @@ struct EQStreamManagerInterfaceOptions
{
EQStreamManagerInterfaceOptions() {
opcode_size = 2;
loop = &EQ::EventLoop::GetDefault();
}
EQStreamManagerInterfaceOptions(int port, bool encoded, bool compressed) {
opcode_size = 2;
track_opcode_stats = false;
//World seems to support both compression and xor zone supports one or the others.
//Enforce one or the other in the convienence construct
@@ -41,13 +42,21 @@ struct EQStreamManagerInterfaceOptions
}
daybreak_options.port = port;
loop = &EQ::EventLoop::GetDefault();
}
int opcode_size;
bool track_opcode_stats;
EQ::Net::DaybreakConnectionManagerOptions daybreak_options;
EQ::EventLoop *loop;
};
enum EQStreamPriority : int32_t {
High,
Normal,
Low
};
class EQStreamInterface;
class EQStreamManagerInterface
{
public:
@@ -56,6 +65,10 @@ public:
const EQStreamManagerInterfaceOptions& GetOptions() const { return m_options; }
EQStreamManagerInterfaceOptions& MutateOptions() { return m_options; }
virtual void OnNewConnection(std::function<void(std::shared_ptr<EQStreamInterface>)> func) = 0;
virtual void OnConnectionStateChange(std::function<void(std::shared_ptr<EQStreamInterface>, EQ::Net::DbProtocolStatus, EQ::Net::DbProtocolStatus)> func) = 0;
virtual void SetPriority(EQStreamPriority priority) = 0;
protected:
EQStreamManagerInterfaceOptions m_options;
};
+10 -10
View File
@@ -7,15 +7,20 @@ namespace EQ
{
class EventLoop
{
public:
static EventLoop &Get() {
static thread_local EventLoop inst;
return inst;
public:
EventLoop() {
memset(&m_loop, 0, sizeof(uv_loop_t));
uv_loop_init(&m_loop);
}
~EventLoop() {
uv_loop_close(&m_loop);
}
static EventLoop &GetDefault() {
static EventLoop inst;
return inst;
}
void Process() {
uv_run(&m_loop, UV_RUN_NOWAIT);
@@ -23,12 +28,7 @@ namespace EQ
uv_loop_t* Handle() { return &m_loop; }
private:
EventLoop() {
memset(&m_loop, 0, sizeof(uv_loop_t));
uv_loop_init(&m_loop);
}
private:
EventLoop(const EventLoop&);
EventLoop& operator=(const EventLoop&);
+8 -1
View File
@@ -24,7 +24,13 @@ namespace EQ {
std::exception error;
};
Task(EQ::EventLoop *loop, TaskFn fn) {
m_loop = loop;
m_fn = fn;
}
Task(TaskFn fn) {
m_loop = &EQ::EventLoop::GetDefault();
m_fn = fn;
}
@@ -60,7 +66,7 @@ namespace EQ {
m_work->data = baton;
uv_queue_work(EventLoop::Get().Handle(), m_work, [](uv_work_t* req) {
uv_queue_work(m_loop->Handle(), m_work, [](uv_work_t* req) {
TaskBaton *baton = (TaskBaton*)req->data;
baton->fn([baton](const EQEmu::Any& result) {
@@ -92,6 +98,7 @@ namespace EQ {
}
private:
EQ::EventLoop *m_loop;
TaskFn m_fn;
ResolveFn m_then;
RejectFn m_catch;
+20 -2
View File
@@ -6,14 +6,31 @@ namespace EQ {
class Timer
{
public:
Timer(std::function<void(Timer *)> cb)
Timer(EQ::EventLoop *loop, std::function<void(Timer *)> cb)
{
m_loop = loop;
m_timer = nullptr;
m_cb = cb;
}
Timer(std::function<void(Timer *)> cb)
{
m_loop = &EQ::EventLoop::GetDefault();
m_timer = nullptr;
m_cb = cb;
}
Timer(EQ::EventLoop *loop, uint64_t duration_ms, bool repeats, std::function<void(Timer *)> cb)
{
m_loop = loop;
m_timer = nullptr;
m_cb = cb;
Start(duration_ms, repeats);
}
Timer(uint64_t duration_ms, bool repeats, std::function<void(Timer *)> cb)
{
m_loop = &EQ::EventLoop::GetDefault();
m_timer = nullptr;
m_cb = cb;
Start(duration_ms, repeats);
@@ -25,7 +42,7 @@ namespace EQ {
}
void Start(uint64_t duration_ms, bool repeats) {
auto loop = EventLoop::Get().Handle();
auto loop = m_loop->Handle();
if (!m_timer) {
m_timer = new uv_timer_t;
memset(m_timer, 0, sizeof(uv_timer_t));
@@ -61,6 +78,7 @@ namespace EQ {
m_cb(this);
}
EQ::EventLoop *m_loop;
uv_timer_t *m_timer;
std::function<void(Timer*)> m_cb;
};
+30 -6
View File
@@ -10,20 +10,27 @@
EQ::Net::DaybreakConnectionManager::DaybreakConnectionManager()
{
m_attached = nullptr;
m_next_id = 1;
memset(&m_timer, 0, sizeof(uv_timer_t));
memset(&m_socket, 0, sizeof(uv_udp_t));
Attach(EQ::EventLoop::Get().Handle());
Attach(EventLoop::GetDefault().Handle());
}
EQ::Net::DaybreakConnectionManager::DaybreakConnectionManager(const DaybreakConnectionManagerOptions &opts)
{
m_attached = nullptr;
m_options = opts;
m_next_id = 1;
memset(&m_timer, 0, sizeof(uv_timer_t));
memset(&m_socket, 0, sizeof(uv_udp_t));
Attach(EQ::EventLoop::Get().Handle());
if (opts.loop == nullptr) {
Attach(EventLoop::GetDefault().Handle());
}
else {
Attach(opts.loop->Handle());
}
}
EQ::Net::DaybreakConnectionManager::~DaybreakConnectionManager()
@@ -89,7 +96,7 @@ void EQ::Net::DaybreakConnectionManager::Connect(const std::string &addr, int po
{
//todo dns resolution
auto connection = std::shared_ptr<DaybreakConnection>(new DaybreakConnection(this, addr, port));
auto connection = std::shared_ptr<DaybreakConnection>(new DaybreakConnection(this, GetNextId(), addr, port));
connection->m_self = connection;
if (m_on_new_connection) {
@@ -227,7 +234,7 @@ void EQ::Net::DaybreakConnectionManager::ProcessPacket(const std::string &endpoi
StaticPacket p((void*)data, size);
auto request = p.GetSerialize<DaybreakConnect>(0);
connection = std::shared_ptr<DaybreakConnection>(new DaybreakConnection(this, request, endpoint, port));
connection = std::shared_ptr<DaybreakConnection>(new DaybreakConnection(this, GetNextId(), request, endpoint, port));
connection->m_self = connection;
if (m_on_new_connection) {
@@ -285,10 +292,18 @@ void EQ::Net::DaybreakConnectionManager::SendDisconnect(const std::string &addr,
});
}
uint64_t EQ::Net::DaybreakConnectionManager::GetNextId()
{
auto id = m_next_id;
m_next_id++;
return id;
}
//new connection made as server
EQ::Net::DaybreakConnection::DaybreakConnection(DaybreakConnectionManager *owner, const DaybreakConnect &connect, const std::string &endpoint, int port)
EQ::Net::DaybreakConnection::DaybreakConnection(DaybreakConnectionManager *owner, uint64_t id, const DaybreakConnect &connect, const std::string &endpoint, int port)
{
m_owner = owner;
m_id = id;
m_last_send = Clock::now();
m_last_recv = Clock::now();
m_status = StatusConnected;
@@ -311,9 +326,10 @@ EQ::Net::DaybreakConnection::DaybreakConnection(DaybreakConnectionManager *owner
}
//new connection made as client
EQ::Net::DaybreakConnection::DaybreakConnection(DaybreakConnectionManager *owner, const std::string &endpoint, int port)
EQ::Net::DaybreakConnection::DaybreakConnection(DaybreakConnectionManager *owner, uint64_t id, const std::string &endpoint, int port)
{
m_owner = owner;
m_id = id;
m_last_send = Clock::now();
m_last_recv = Clock::now();
m_status = StatusConnecting;
@@ -1089,6 +1105,10 @@ void EQ::Net::DaybreakConnection::ProcessResend(int stream)
}
m_stats.resent_packets++;
m_stats.resent_time_min = std::min(m_stats.resent_time_min, (uint64_t)time_since_last_send.count());
m_stats.resent_time_max = std::max(m_stats.resent_time_max, (uint64_t)time_since_last_send.count());
m_stats.resent_time_average = (m_stats.resent_time_average / 2) + (time_since_last_send.count() / 2);
InternalBufferedSend(p);
entry.second.last_sent = now;
entry.second.times_resent++;
@@ -1118,6 +1138,10 @@ void EQ::Net::DaybreakConnection::ProcessResend(int stream)
}
m_stats.resent_packets++;
m_stats.resent_time_min = std::min(m_stats.resent_time_min, (uint64_t)time_since_last_send.count());
m_stats.resent_time_max = std::max(m_stats.resent_time_max, (uint64_t)time_since_last_send.count());
m_stats.resent_time_average = (m_stats.resent_time_average / 2) + (time_since_last_send.count() / 2);
InternalBufferedSend(p);
entry.second.last_sent = now;
entry.second.times_resent++;
+15 -2
View File
@@ -13,6 +13,7 @@
namespace EQ
{
class EventLoop;
namespace Net
{
enum DaybreakProtocolOpcode
@@ -90,6 +91,9 @@ namespace EQ
resent_packets = 0;
resent_fragments = 0;
resent_full = 0;
resent_time_min = 0;
resent_time_max = 0;
resent_time_average = 0;
datarate_remaining = 0.0;
}
@@ -110,6 +114,9 @@ namespace EQ
uint64_t resent_packets;
uint64_t resent_fragments;
uint64_t resent_full;
uint64_t resent_time_min;
uint64_t resent_time_max;
uint64_t resent_time_average;
double datarate_remaining;
};
@@ -118,8 +125,8 @@ namespace EQ
class DaybreakConnection
{
public:
DaybreakConnection(DaybreakConnectionManager *owner, const DaybreakConnect &connect, const std::string &endpoint, int port);
DaybreakConnection(DaybreakConnectionManager *owner, const std::string &endpoint, int port);
DaybreakConnection(DaybreakConnectionManager *owner, uint64_t id, const DaybreakConnect &connect, const std::string &endpoint, int port);
DaybreakConnection(DaybreakConnectionManager *owner, uint64_t id, const std::string &endpoint, int port);
~DaybreakConnection();
const std::string& RemoteEndpoint() const { return m_endpoint; }
@@ -138,6 +145,7 @@ namespace EQ
const DaybreakEncodeType* GetEncodePasses() const { return m_encode_passes; }
const DaybreakConnectionManager* GetManager() const { return m_owner; }
DaybreakConnectionManager* GetManager() { return m_owner; }
uint64_t GetId() const { return m_id; }
private:
DaybreakConnectionManager *m_owner;
std::string m_endpoint;
@@ -160,6 +168,7 @@ namespace EQ
size_t m_rolling_ping;
Timestamp m_close_time;
double m_outgoing_budget;
uint64_t m_id;
struct DaybreakSentPacket
{
@@ -252,6 +261,7 @@ namespace EQ
resend_timeout = 90000;
connection_close_time = 2000;
outgoing_data_rate = 0.0;
loop = nullptr;
}
size_t max_packet_size;
@@ -275,6 +285,7 @@ namespace EQ
DaybreakEncodeType encode_passes[2];
int port;
double outgoing_data_rate;
EQ::EventLoop *loop;
};
class DaybreakConnectionManager
@@ -303,6 +314,7 @@ namespace EQ
uv_udp_t m_socket;
uv_loop_t *m_attached;
DaybreakConnectionManagerOptions m_options;
uint64_t m_next_id;
std::function<void(std::shared_ptr<DaybreakConnection>)> m_on_new_connection;
std::function<void(std::shared_ptr<DaybreakConnection>, DbProtocolStatus, DbProtocolStatus)> m_on_connection_state_change;
std::function<void(std::shared_ptr<DaybreakConnection>, const Packet&)> m_on_packet_recv;
@@ -312,6 +324,7 @@ namespace EQ
void ProcessPacket(const std::string &endpoint, int port, const char *data, size_t size);
std::shared_ptr<DaybreakConnection> FindConnectionByEndpoint(std::string addr, int port);
void SendDisconnect(const std::string &addr, int port);
uint64_t GetNextId();
friend class DaybreakConnection;
};
+6 -2
View File
@@ -8,7 +8,7 @@ namespace EQ
{
namespace Net
{
static void DNSLookup(const std::string &addr, int port, bool ipv6, std::function<void(const std::string&)> cb) {
static void DNSLookup(EQ::EventLoop *eloop, const std::string &addr, int port, bool ipv6, std::function<void(const std::string&)> cb) {
struct DNSBaton
{
std::function<void(const std::string&)> cb;
@@ -21,7 +21,7 @@ namespace EQ
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
auto loop = EQ::EventLoop::Get().Handle();
auto loop = eloop->Handle();
uv_getaddrinfo_t *resolver = new uv_getaddrinfo_t();
memset(resolver, 0, sizeof(uv_getaddrinfo_t));
auto port_str = std::to_string(port);
@@ -57,5 +57,9 @@ namespace EQ
cb(addr);
}, addr.c_str(), port_str.c_str(), &hints);
}
static void DNSLookup(const std::string &addr, int port, bool ipv6, std::function<void(const std::string&)> cb) {
DNSLookup(&EQ::EventLoop::GetDefault(), addr, port, ipv6, cb);
}
}
}
+2 -6
View File
@@ -65,9 +65,7 @@ void EQ::Net::EQStream::QueuePacket(const EQApplicationPacket *p, bool ack_req)
opcode = p->GetOpcodeBypass();
}
else {
if (m_owner->GetOptions().track_opcode_stats) {
m_packet_sent_count[p->GetOpcode()]++; //Wont bother with bypass tracking of these since those are rare for testing anyway
}
m_packet_sent_count[p->GetOpcode()]++; //Wont bother with bypass tracking of these since those are rare for testing anyway
opcode = (*m_opcode_manager)->EmuToEQ(p->GetOpcode());
}
@@ -117,9 +115,7 @@ EQApplicationPacket *EQ::Net::EQStream::PopPacket() {
}
EmuOpcode emu_op = (*m_opcode_manager)->EQToEmu(opcode);
if (m_owner->GetOptions().track_opcode_stats) {
m_packet_recv_count[emu_op]++;
}
m_packet_recv_count[emu_op]++;
EQApplicationPacket *ret = new EQApplicationPacket(emu_op, (unsigned char*)p->Data() + m_owner->GetOptions().opcode_size, p->Length() - m_owner->GetOptions().opcode_size);
ret->SetProtocolOpcode(opcode);
+5 -4
View File
@@ -19,12 +19,13 @@ namespace EQ
EQStreamManager(const EQStreamManagerInterfaceOptions &options);
~EQStreamManager();
void OnNewConnection(std::function<void(std::shared_ptr<EQStream>)> func) { m_on_new_connection = func; }
void OnConnectionStateChange(std::function<void(std::shared_ptr<EQStream>, DbProtocolStatus, DbProtocolStatus)> func) { m_on_connection_state_change = func; }
virtual void OnNewConnection(std::function<void(std::shared_ptr<EQStreamInterface>)> func) { m_on_new_connection = func; }
virtual void OnConnectionStateChange(std::function<void(std::shared_ptr<EQStreamInterface>, EQ::Net::DbProtocolStatus, EQ::Net::DbProtocolStatus)> func) { m_on_connection_state_change = func; }
virtual void SetPriority(EQStreamPriority priority) { }
private:
DaybreakConnectionManager m_daybreak;
std::function<void(std::shared_ptr<EQStream>)> m_on_new_connection;
std::function<void(std::shared_ptr<EQStream>, DbProtocolStatus, DbProtocolStatus)> m_on_connection_state_change;
std::function<void(std::shared_ptr<EQStreamInterface>)> m_on_new_connection;
std::function<void(std::shared_ptr<EQStreamInterface>, DbProtocolStatus, DbProtocolStatus)> m_on_connection_state_change;
std::map<std::shared_ptr<DaybreakConnection>, std::shared_ptr<EQStream>> m_streams;
void DaybreakNewConnection(std::shared_ptr<DaybreakConnection> connection);
+685
View File
@@ -0,0 +1,685 @@
#include "eqstream_concurrent.h"
#include "eqstream_concurrent_message.h"
#include "../event/event_loop.h"
#include "../event/timer.h"
#include "../string_util.h"
#include "../opcodemgr.h"
#include "../eqemu_logsys.h"
#include "../eqemu_logsys_fmt.h"
#include "daybreak_connection.h"
#include <thread>
#include <concurrentqueue.h>
#include <unordered_map>
#include <queue>
#include <deque>
#include <list>
struct EQ::Net::ConcurrentEQStreamManager::Impl
{
std::thread background;
bool background_running;
moodycamel::ConcurrentQueue<ceqs_msg_t> foreground_queue;
moodycamel::ConcurrentQueue<ceqs_msg_t> background_queue;
std::unordered_map<uint64_t, std::shared_ptr<DaybreakConnection>> connections;
std::unique_ptr<EQ::Timer> foreground_loop_timer;
std::unique_ptr<EQ::Timer> background_loop_timer;
std::unique_ptr<EQ::Timer> background_update_stats_timer;
std::unordered_map<uint64_t, std::shared_ptr<ConcurrentEQStream>> streams;
std::function<void(std::shared_ptr<EQStreamInterface>)> on_new_connection;
std::function<void(std::shared_ptr<EQStreamInterface>, EQ::Net::DbProtocolStatus, EQ::Net::DbProtocolStatus)> on_connection_state_change;
EQStreamPriority priority;
};
EQ::Net::ConcurrentEQStreamManager::ConcurrentEQStreamManager(const EQStreamManagerInterfaceOptions &options)
: EQStreamManagerInterface(options)
{
_impl.reset(new Impl());
_impl->background = std::thread(std::bind(&ConcurrentEQStreamManager::_BackgroundThread, this));
_impl->foreground_loop_timer.reset(new EQ::Timer(options.loop, 16, true,
std::bind(&ConcurrentEQStreamManager::_ForegroundTimer, this, std::placeholders::_1)));
_impl->priority = EQStreamPriority::High;
}
EQ::Net::ConcurrentEQStreamManager::~ConcurrentEQStreamManager()
{
for (auto &s : _impl->streams) {
s.second->_Invalidate();
}
_impl->foreground_loop_timer.release();
//Tell the background to shutdown and wait for it to actually do so
ceqs_terminate_msg_t msg;
msg.type = ceqs_msg_type::TerminateBackground;
_PushToBackgroundQueue((ceqs_msg_t*)&msg);
_impl->background.join();
//Go through our incoming messages to make sure we clean up any packets in that need to be freed
ceqs_msg_t eqs_msg;
while (_impl->foreground_queue.try_dequeue(eqs_msg)) {
if (eqs_msg.type == ceqs_msg_type::PacketRecv) {
ceqs_packet_recv_msg_t *eqs_msg_in = (ceqs_packet_recv_msg_t*)&eqs_msg;
delete eqs_msg_in->packet;
}
}
}
void EQ::Net::ConcurrentEQStreamManager::_BackgroundThread() {
_impl->background_running = true;
EQ::EventLoop loop;
auto &eqs_opts = GetOptions();
auto opts = eqs_opts.daybreak_options;
opts.loop = &loop;
std::unique_ptr<DaybreakConnectionManager> dbcm(new DaybreakConnectionManager(opts));
dbcm->OnNewConnection(std::bind(&ConcurrentEQStreamManager::DaybreakNewConnection, this, std::placeholders::_1));
dbcm->OnConnectionStateChange(std::bind(&ConcurrentEQStreamManager::DaybreakConnectionStateChange, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
dbcm->OnPacketRecv(std::bind(&ConcurrentEQStreamManager::DaybreakPacketRecv, this, std::placeholders::_1, std::placeholders::_2));
_impl->background_loop_timer.reset(new EQ::Timer(&loop, 16, true,
std::bind(&ConcurrentEQStreamManager::_BackgroundTimer, this, std::placeholders::_1)));
_impl->background_update_stats_timer.reset(new EQ::Timer(&loop, 500, true,
std::bind(&ConcurrentEQStreamManager::_BackgroundUpdateStatsTimer, this, std::placeholders::_1)));
while (true == _impl->background_running) {
loop.Process();
switch (_impl->priority) {
case EQStreamPriority::Low:
Sleep(10);
break;
case EQStreamPriority::Normal:
Sleep(5);
break;
case EQStreamPriority::High:
Sleep(1);
break;
}
}
_impl->background_loop_timer.release();
_impl->background_update_stats_timer.release();
dbcm.release();
ceqs_msg_t eqs_msg;
while (_impl->background_queue.try_dequeue(eqs_msg)) {
if (eqs_msg.type == ceqs_msg_type::QueuePacket) {
ceqs_queue_packet_msg_t *eqs_msg_in = (ceqs_queue_packet_msg_t*)&eqs_msg;
delete eqs_msg_in->packet;
}
}
}
//Called by background
void EQ::Net::ConcurrentEQStreamManager::_BackgroundTimer(EQ::Timer * t)
{
ceqs_msg_t msg_queue[16];
size_t count = 0;
while ((count = _impl->background_queue.try_dequeue_bulk(msg_queue, 16)) != 0) {
for (size_t i = 0; i < count; ++i) {
_ProcessBackgroundMessage(msg_queue[i]);
}
}
}
void EQ::Net::ConcurrentEQStreamManager::_BackgroundUpdateStatsTimer(EQ::Timer *t)
{
ceqs_msg_t msgs[16];
int i = 0;
for (auto &c : _impl->connections) {
auto &connection = c.second;
auto msg = (ceqs_update_stats_msg_t*)&msgs[i];
msg->type = ceqs_msg_type::UpdateStats;
msg->stream_id = connection->GetId();
msg->stats = connection->GetStats();
i++;
if (i >= 16) {
_impl->foreground_queue.enqueue_bulk(msgs, 16);
i = 0;
}
}
if (i > 0) {
_impl->foreground_queue.enqueue_bulk(msgs, i);
}
}
//Called by background
void EQ::Net::ConcurrentEQStreamManager::_ProcessBackgroundMessage(const ceqs_msg_t &msg)
{
switch (msg.type) {
case ceqs_msg_type::QueuePacket:
{
ceqs_queue_packet_msg_t *msg_in = (ceqs_queue_packet_msg_t*)&msg;
auto iter = _impl->connections.find(msg_in->stream_id);
if (iter != _impl->connections.end()) {
iter->second->QueuePacket(*msg_in->packet, 0, msg_in->ack_req);
}
delete msg_in->packet;
break;
}
case ceqs_msg_type::TerminateBackground:
{
_impl->background_running = false;
break;
}
case ceqs_msg_type::CloseConnection:
{
ceqs_close_connection_msg_t *msg_in = (ceqs_close_connection_msg_t*)&msg;
auto iter = _impl->connections.find(msg_in->stream_id);
if (iter != _impl->connections.end()) {
iter->second->Close();
}
break;
}
case ceqs_msg_type::ResetStats:
{
ceqs_reset_stats_msg_t *msg_in = (ceqs_reset_stats_msg_t*)&msg;
auto iter = _impl->connections.find(msg_in->stream_id);
if (iter != _impl->connections.end()) {
iter->second->ResetStats();
}
break;
}
case ceqs_msg_type::SetPriority:
{
ceqs_set_priority_msg_t *msg_in = (ceqs_set_priority_msg_t*)&msg;
_impl->priority = msg_in->priority;
break;
}
default:
break;
}
}
//Called by foreground
void EQ::Net::ConcurrentEQStreamManager::_ForegroundTimer(EQ::Timer *t)
{
ceqs_msg_t msg_queue[16];
size_t count = 0;
while ((count = _impl->foreground_queue.try_dequeue_bulk(msg_queue, 16)) != 0) {
for (size_t i = 0; i < count; ++i) {
_ProcessForegroundMessage(msg_queue[i]);
}
}
}
//Called by foreground
void EQ::Net::ConcurrentEQStreamManager::_ProcessForegroundMessage(const ceqs_msg_t &msg)
{
switch (msg.type) {
case ceqs_msg_type::NewConnection:
{
ceqs_new_connection_msg_t *msg_in = (ceqs_new_connection_msg_t*)&msg;
std::shared_ptr<ConcurrentEQStream> stream(new ConcurrentEQStream(this,
msg_in->stream_id,
msg_in->endpoint,
msg_in->remote_port,
(DbProtocolStatus)msg_in->state));
_impl->streams.insert(std::make_pair(msg_in->stream_id, stream));
if (_impl->on_new_connection) {
_impl->on_new_connection(stream);
}
break;
}
case ceqs_msg_type::ConnectionStateChange:
{
ceqs_connection_state_change_msg_t *msg_in = (ceqs_connection_state_change_msg_t*)&msg;
auto iter = _impl->streams.find(msg_in->stream_id);
if (iter != _impl->streams.end()) {
iter->second->_SetState((DbProtocolStatus)msg_in->to);
if ((DbProtocolStatus)msg_in->to == DbProtocolStatus::StatusDisconnected || (DbProtocolStatus)msg_in->to == DbProtocolStatus::StatusDisconnecting) {
_impl->streams.erase(iter);
}
}
break;
}
case ceqs_msg_type::PacketRecv:
{
ceqs_packet_recv_msg_t *msg_in = (ceqs_packet_recv_msg_t*)&msg;
std::unique_ptr<EQ::Net::Packet> p(msg_in->packet);
auto iter = _impl->streams.find(msg_in->stream_id);
if (iter != _impl->streams.end()) {
iter->second->_RecvPacket(std::move(p));
}
break;
}
case ceqs_msg_type::UpdateStats:
{
ceqs_update_stats_msg_t *msg_in = (ceqs_update_stats_msg_t*)&msg;
auto iter = _impl->streams.find(msg_in->stream_id);
if (iter != _impl->streams.end()) {
iter->second->_UpdateStats(msg_in->stats);
}
break;
}
default:
break;
}
}
void EQ::Net::ConcurrentEQStreamManager::_PushToBackgroundQueue(ceqs_msg_t *msg)
{
_impl->background_queue.enqueue(*msg);
}
void EQ::Net::ConcurrentEQStreamManager::_PushToForegroundQueue(ceqs_msg_t *msg)
{
_impl->foreground_queue.enqueue(*msg);
}
//Called by foreground
void EQ::Net::ConcurrentEQStreamManager::OnNewConnection(std::function<void(std::shared_ptr<EQStreamInterface>)> func)
{
_impl->on_new_connection = func;
}
//Called by foreground
void EQ::Net::ConcurrentEQStreamManager::OnConnectionStateChange(std::function<void(std::shared_ptr<EQStreamInterface>, DbProtocolStatus, DbProtocolStatus)> func)
{
_impl->on_connection_state_change = func;
}
//Called by foreground
void EQ::Net::ConcurrentEQStreamManager::SetPriority(EQStreamPriority priority)
{
ceqs_set_priority_msg_t msg;
msg.type = ceqs_msg_type::SetPriority;
msg.priority = priority;
_PushToBackgroundQueue((ceqs_msg_t*)&msg);
}
//Called by background
void EQ::Net::ConcurrentEQStreamManager::DaybreakNewConnection(std::shared_ptr<DaybreakConnection> connection)
{
_impl->connections.insert(std::make_pair(connection->GetId(), connection));
ceqs_new_connection_msg_t msg;
msg.type = ceqs_msg_type::NewConnection;
msg.stream_id = connection->GetId();
msg.remote_port = connection->RemotePort();
msg.state = connection->GetStatus();
strcpy(msg.endpoint, connection->RemoteEndpoint().c_str());
msg.endpoint[connection->RemoteEndpoint().length()] = 0;
//Make sure the foreground gets this message
_PushToForegroundQueue((ceqs_msg_t*)&msg);
}
//Called by background
void EQ::Net::ConcurrentEQStreamManager::DaybreakConnectionStateChange(std::shared_ptr<DaybreakConnection> connection, DbProtocolStatus from, DbProtocolStatus to)
{
if (to == DbProtocolStatus::StatusDisconnecting || to == DbProtocolStatus::StatusDisconnected) {
auto iter = _impl->connections.find(connection->GetId());
if (iter != _impl->connections.end()) {
_impl->connections.erase(iter);
}
}
ceqs_connection_state_change_msg_t msg;
msg.type = ceqs_msg_type::ConnectionStateChange;
msg.stream_id = connection->GetId();
msg.from = (int)from;
msg.to = (int)to;
//Make sure the foreground gets this message
_PushToForegroundQueue((ceqs_msg_t*)&msg);
}
//Called by background
void EQ::Net::ConcurrentEQStreamManager::DaybreakPacketRecv(std::shared_ptr<DaybreakConnection> connection, const Packet &p)
{
ceqs_packet_recv_msg_t msg;
msg.type = ceqs_msg_type::PacketRecv;
msg.stream_id = connection->GetId();
msg.packet = new DynamicPacket();
msg.packet->PutPacket(0, p);
//Make sure the foreground gets this message
_PushToForegroundQueue((ceqs_msg_t*)&msg);
}
struct EQ::Net::ConcurrentEQStream::Impl
{
ConcurrentEQStreamManager *parent;
uint64_t id;
std::string remote_endpoint;
int remote_port;
uint32_t remote_ip;
DbProtocolStatus state;
std::deque<std::unique_ptr<EQ::Net::Packet>> packet_queue;
OpcodeManager **opcode_manager;
DaybreakConnectionStats stats;
std::unordered_map<EmuOpcode, int> packet_recv_count;
std::unordered_map<EmuOpcode, int> packet_sent_count;
};
//Called by foreground
EQ::Net::ConcurrentEQStream::ConcurrentEQStream(ConcurrentEQStreamManager *parent, uint64_t id, const std::string &remote_endpoint, int remote_port, DbProtocolStatus state)
{
_impl.reset(new Impl());
_impl->parent = parent;
_impl->id = id;
_impl->remote_endpoint = remote_endpoint;
_impl->remote_port = remote_port;
_impl->remote_ip = inet_addr(remote_endpoint.c_str());
_impl->state = state;
_impl->opcode_manager = nullptr;
}
//Called by foreground
EQ::Net::ConcurrentEQStream::~ConcurrentEQStream()
{
}
//Called by foreground
void EQ::Net::ConcurrentEQStream::QueuePacket(const EQApplicationPacket *p, bool ack_req)
{
if (!_impl->parent) {
return;
}
if (_impl->opcode_manager && *_impl->opcode_manager) {
auto &options = _impl->parent->GetOptions();
uint16 opcode = 0;
if (p->GetOpcodeBypass() != 0) {
opcode = p->GetOpcodeBypass();
}
else {
_impl->packet_sent_count[p->GetOpcode()]++;
opcode = (*_impl->opcode_manager)->EmuToEQ(p->GetOpcode());
}
EQ::Net::DynamicPacket *out = new EQ::Net::DynamicPacket();
switch (options.opcode_size) {
case 1:
out->PutUInt8(0, opcode);
out->PutData(1, p->pBuffer, p->size);
break;
case 2:
out->PutUInt16(0, opcode);
out->PutData(2, p->pBuffer, p->size);
break;
}
ceqs_queue_packet_msg_t msg;
msg.type = ceqs_msg_type::QueuePacket;
msg.stream_id = _impl->id;
msg.packet = out;
msg.ack_req = ack_req;
//Make sure the background gets this message
_impl->parent->_PushToBackgroundQueue((ceqs_msg_t*)&msg);
}
}
//Called by foreground
void EQ::Net::ConcurrentEQStream::FastQueuePacket(EQApplicationPacket **p, bool ack_req)
{
QueuePacket(*p, ack_req);
delete *p;
*p = nullptr;
}
//Called by foreground
EQApplicationPacket *EQ::Net::ConcurrentEQStream::PopPacket()
{
if (!_impl->parent) {
return nullptr;
}
if (_impl->packet_queue.empty()) {
return nullptr;
}
if (_impl->opcode_manager != nullptr && *_impl->opcode_manager != nullptr) {
auto &options = _impl->parent->GetOptions();
auto &p = _impl->packet_queue.front();
uint16 opcode = 0;
switch (options.opcode_size) {
case 1:
opcode = p->GetUInt8(0);
break;
case 2:
opcode = p->GetUInt16(0);
break;
}
EmuOpcode emu_op = (*_impl->opcode_manager)->EQToEmu(opcode);
_impl->packet_recv_count[emu_op]++;
EQApplicationPacket *ret = new EQApplicationPacket(emu_op, (unsigned char*)p->Data() + options.opcode_size, p->Length() - options.opcode_size);
ret->SetProtocolOpcode(opcode);
_impl->packet_queue.pop_front();
return ret;
}
return nullptr;
}
//Called by foreground
void EQ::Net::ConcurrentEQStream::Close()
{
if (!_impl->parent) {
return;
}
ceqs_close_connection_msg_t msg;
msg.type = ceqs_msg_type::CloseConnection;
msg.stream_id = _impl->id;
_impl->parent->_PushToBackgroundQueue((ceqs_msg_t*)&msg);
}
//Called by foreground
void EQ::Net::ConcurrentEQStream::ReleaseFromUse()
{
}
//Called by foreground
void EQ::Net::ConcurrentEQStream::RemoveData()
{
}
//Called by foreground
std::string EQ::Net::ConcurrentEQStream::GetRemoteAddr() const
{
return _impl->remote_endpoint;
}
//Called by foreground
uint32 EQ::Net::ConcurrentEQStream::GetRemoteIP() const
{
return _impl->remote_ip;
}
//Called by foreground
uint16 EQ::Net::ConcurrentEQStream::GetRemotePort() const
{
return _impl->remote_port;
}
//Called by foreground
bool EQ::Net::ConcurrentEQStream::CheckState(EQStreamState state)
{
return GetState() == state;
}
//Called by foreground
std::string EQ::Net::ConcurrentEQStream::Describe() const
{
return "Concurrent EQStream";
}
//Called by foreground
void EQ::Net::ConcurrentEQStream::SetActive(bool val)
{
}
//Called by foreground
EQStreamInterface::MatchState EQ::Net::ConcurrentEQStream::CheckSignature(const Signature *sig)
{
if (!_impl->parent) {
return MatchFailed;
}
if (!_impl->packet_queue.empty()) {
auto& options = _impl->parent->GetOptions();
auto p = _impl->packet_queue.front().get();
uint16 opcode = 0;
size_t length = p->Length() - options.opcode_size;
switch (options.opcode_size) {
case 1:
opcode = p->GetUInt8(0);
break;
case 2:
opcode = p->GetUInt16(0);
break;
}
if (sig->ignore_eq_opcode != 0 && opcode == sig->ignore_eq_opcode) {
if (_impl->packet_queue.size() > 1) {
p = _impl->packet_queue[1].get();
opcode = 0;
length = p->Length() - options.opcode_size;
switch (options.opcode_size) {
case 1:
opcode = p->GetUInt8(0);
break;
case 2:
opcode = p->GetUInt16(0);
break;
}
}
else {
return MatchNotReady;
}
}
if (opcode == sig->first_eq_opcode) {
if (length == sig->first_length) {
LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode matched {2:#x} and length matched {3}",
GetRemoteAddr(), GetRemotePort(), sig->first_eq_opcode, length);
return MatchSuccessful;
}
else if (length == 0) {
LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode matched {2:#x} and length is ignored.",
GetRemoteAddr(), GetRemotePort(), sig->first_eq_opcode);
return MatchSuccessful;
}
else {
LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode matched {2:#x} but length {3} did not match expected {4}",
GetRemoteAddr(), GetRemotePort(), sig->first_eq_opcode, length, sig->first_length);
return MatchFailed;
}
}
else {
LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode {1:#x} did not match expected {2:#x}",
GetRemoteAddr(), GetRemotePort(), opcode, sig->first_eq_opcode);
return MatchFailed;
}
}
return MatchNotReady;
}
//Called by foreground
EQStreamState EQ::Net::ConcurrentEQStream::GetState()
{
switch (_impl->state) {
case StatusConnecting:
return UNESTABLISHED;
case StatusConnected:
return ESTABLISHED;
case StatusDisconnecting:
return DISCONNECTING;
default:
return CLOSED;
}
}
//Called by foreground
void EQ::Net::ConcurrentEQStream::SetOpcodeManager(OpcodeManager **opm)
{
_impl->opcode_manager = opm;
}
//Called by foreground
EQStreamInterface::Stats EQ::Net::ConcurrentEQStream::GetStats() const
{
EQStreamInterface::Stats ret;
ret.DaybreakStats = _impl->stats;
for (int i = 0; i < _maxEmuOpcode; ++i) {
ret.RecvCount[i] = 0;
ret.SentCount[i] = 0;
}
for (auto &s : _impl->packet_sent_count) {
ret.SentCount[s.first] = s.second;
}
for (auto &r : _impl->packet_recv_count) {
ret.RecvCount[r.first] = r.second;
}
return ret;
}
//Called by foreground
void EQ::Net::ConcurrentEQStream::ResetStats()
{
if (!_impl->parent) {
return;
}
ceqs_reset_stats_msg_t msg;
msg.type = ceqs_msg_type::ResetStats;
msg.stream_id = _impl->id;
_impl->parent->_PushToBackgroundQueue((ceqs_msg_t*)&msg);
}
//Called by foreground
EQStreamManagerInterface *EQ::Net::ConcurrentEQStream::GetManager() const
{
return _impl->parent;
}
//Called by foreground
void EQ::Net::ConcurrentEQStream::_SetState(DbProtocolStatus state)
{
_impl->state = state;
}
//Called by foreground
void EQ::Net::ConcurrentEQStream::_RecvPacket(std::unique_ptr<EQ::Net::Packet> p)
{
_impl->packet_queue.push_back(std::move(p));
}
//Called by foreground
void EQ::Net::ConcurrentEQStream::_UpdateStats(const DaybreakConnectionStats &stats)
{
_impl->stats = stats;
}
//Called by foreground
void EQ::Net::ConcurrentEQStream::_Invalidate()
{
_impl->parent = nullptr;
}
+75
View File
@@ -0,0 +1,75 @@
#pragma once
#include "../eq_stream_intf.h"
#include "eqstream_concurrent_message.h"
#include <memory>
namespace EQ
{
class Timer;
namespace Net
{
class ConcurrentEQStream;
class ConcurrentEQStreamManager : public EQStreamManagerInterface
{
public:
ConcurrentEQStreamManager(const EQStreamManagerInterfaceOptions &options);
~ConcurrentEQStreamManager();
virtual void OnNewConnection(std::function<void(std::shared_ptr<EQStreamInterface>)> func);
virtual void OnConnectionStateChange(std::function<void(std::shared_ptr<EQStreamInterface>, EQ::Net::DbProtocolStatus, EQ::Net::DbProtocolStatus)> func);
virtual void SetPriority(EQStreamPriority priority);
void _PushToBackgroundQueue(ceqs_msg_t* msg);
void _PushToForegroundQueue(ceqs_msg_t* msg);
private:
struct Impl;
std::unique_ptr<Impl> _impl;
void _BackgroundThread();
void _BackgroundTimer(EQ::Timer *t);
void _BackgroundUpdateStatsTimer(EQ::Timer *t);
void _ProcessBackgroundMessage(const ceqs_msg_t &msg);
void _ForegroundTimer(EQ::Timer *t);
void _ProcessForegroundMessage(const ceqs_msg_t &msg);
void DaybreakNewConnection(std::shared_ptr<DaybreakConnection> connection);
void DaybreakConnectionStateChange(std::shared_ptr<DaybreakConnection> connection, DbProtocolStatus from, DbProtocolStatus to);
void DaybreakPacketRecv(std::shared_ptr<DaybreakConnection> connection, const Packet &p);
};
class ConcurrentEQStream : public EQStreamInterface
{
public:
ConcurrentEQStream(ConcurrentEQStreamManager *parent, uint64_t id, const std::string &remote_endpoint, int remote_port, DbProtocolStatus state);
~ConcurrentEQStream();
virtual void QueuePacket(const EQApplicationPacket *p, bool ack_req = true);
virtual void FastQueuePacket(EQApplicationPacket **p, bool ack_req = true);
virtual EQApplicationPacket *PopPacket();
virtual void Close();
virtual void ReleaseFromUse();
virtual void RemoveData();
virtual std::string GetRemoteAddr() const;
virtual uint32 GetRemoteIP() const;
virtual uint16 GetRemotePort() const;
virtual bool CheckState(EQStreamState state);
virtual std::string Describe() const;
virtual void SetActive(bool val);
virtual MatchState CheckSignature(const Signature *sig);
virtual EQStreamState GetState();
virtual void SetOpcodeManager(OpcodeManager **opm);
virtual Stats GetStats() const;
virtual void ResetStats();
virtual EQStreamManagerInterface* GetManager() const;
void _SetState(DbProtocolStatus state);
void _RecvPacket(std::unique_ptr<EQ::Net::Packet> p);
void _UpdateStats(const DaybreakConnectionStats &stats);
void _Invalidate();
private:
struct Impl;
std::unique_ptr<Impl> _impl;
};
}
}
+95
View File
@@ -0,0 +1,95 @@
#pragma once
#define EQSM_PAD_LEN 252
namespace EQ
{
namespace Net
{
class DynamicPacket;
enum ceqs_msg_type : uint32_t
{
//Sent by background
NewConnection,
ConnectionStateChange,
PacketRecv,
UpdateStats,
//Sent by foreground
QueuePacket,
TerminateBackground,
CloseConnection,
ResetStats,
SetPriority
};
typedef struct
{
ceqs_msg_type type;
char padding[EQSM_PAD_LEN];
} ceqs_msg_t;
//Sent by background
typedef struct
{
ceqs_msg_type type;
uint64_t stream_id;
int remote_port;
int state;
char endpoint[64];
} ceqs_new_connection_msg_t;
typedef struct
{
ceqs_msg_type type;
uint64_t stream_id;
int from;
int to;
} ceqs_connection_state_change_msg_t;
typedef struct
{
ceqs_msg_type type;
uint64_t stream_id;
EQ::Net::DynamicPacket *packet;
} ceqs_packet_recv_msg_t;
typedef struct
{
ceqs_msg_type type;
uint64_t stream_id;
DaybreakConnectionStats stats;
} ceqs_update_stats_msg_t;
//Sent by foreground
typedef struct
{
ceqs_msg_type type;
uint64_t stream_id;
EQ::Net::DynamicPacket *packet;
bool ack_req;
} ceqs_queue_packet_msg_t;
typedef struct
{
ceqs_msg_type type;
uint64_t stream_id;
} ceqs_close_connection_msg_t;
typedef struct
{
ceqs_msg_type type;
uint64_t stream_id;
} ceqs_reset_stats_msg_t;
typedef struct
{
ceqs_msg_type type;
} ceqs_terminate_msg_t;
typedef struct
{
ceqs_msg_type type;
EQStreamPriority priority;
} ceqs_set_priority_msg_t;
}
}
+7 -3
View File
@@ -15,7 +15,7 @@ EQ::Net::TCPConnection::~TCPConnection() {
Disconnect();
}
void EQ::Net::TCPConnection::Connect(const std::string &addr, int port, bool ipv6, std::function<void(std::shared_ptr<TCPConnection>)> cb)
void EQ::Net::TCPConnection::Connect(EQ::EventLoop *loop, const std::string & addr, int port, bool ipv6, std::function<void(std::shared_ptr<TCPConnection>)> cb)
{
struct EQTCPConnectBaton
{
@@ -23,10 +23,9 @@ void EQ::Net::TCPConnection::Connect(const std::string &addr, int port, bool ipv
std::function<void(std::shared_ptr<EQ::Net::TCPConnection>)> cb;
};
auto loop = EQ::EventLoop::Get().Handle();
uv_tcp_t *socket = new uv_tcp_t;
memset(socket, 0, sizeof(uv_tcp_t));
uv_tcp_init(loop, socket);
uv_tcp_init(loop->Handle(), socket);
sockaddr_storage iaddr;
if (ipv6) {
@@ -64,6 +63,11 @@ void EQ::Net::TCPConnection::Connect(const std::string &addr, int port, bool ipv
});
}
void EQ::Net::TCPConnection::Connect(const std::string &addr, int port, bool ipv6, std::function<void(std::shared_ptr<TCPConnection>)> cb)
{
Connect(&EventLoop::GetDefault(), addr, port, ipv6, cb);
}
void EQ::Net::TCPConnection::Start() {
uv_read_start((uv_stream_t*)m_socket, [](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
buf->base = new char[suggested_size];
+2
View File
@@ -7,6 +7,7 @@
namespace EQ
{
class EventLoop;
namespace Net
{
class TCPConnection
@@ -15,6 +16,7 @@ namespace EQ
TCPConnection(uv_tcp_t *socket);
~TCPConnection();
static void Connect(EQ::EventLoop *loop, const std::string &addr, int port, bool ipv6, std::function<void(std::shared_ptr<TCPConnection>)> cb);
static void Connect(const std::string &addr, int port, bool ipv6, std::function<void(std::shared_ptr<TCPConnection>)> cb);
void Start();
+9 -2
View File
@@ -7,6 +7,13 @@ void on_close_tcp_server_handle(uv_handle_t* handle) {
EQ::Net::TCPServer::TCPServer()
{
m_loop = &EventLoop::GetDefault();
m_socket = nullptr;
}
EQ::Net::TCPServer::TCPServer(EQ::EventLoop *loop)
{
m_loop = loop;
m_socket = nullptr;
}
@@ -32,7 +39,7 @@ void EQ::Net::TCPServer::Listen(const std::string &addr, int port, bool ipv6, st
m_on_new_connection = cb;
auto loop = EQ::EventLoop::Get().Handle();
auto loop = m_loop->Handle();
m_socket = new uv_tcp_t;
memset(m_socket, 0, sizeof(uv_tcp_t));
uv_tcp_init(loop, m_socket);
@@ -53,7 +60,7 @@ void EQ::Net::TCPServer::Listen(const std::string &addr, int port, bool ipv6, st
return;
}
auto loop = EQ::EventLoop::Get().Handle();
auto loop = server->loop;
uv_tcp_t *client = new uv_tcp_t;
memset(client, 0, sizeof(uv_tcp_t));
uv_tcp_init(loop, client);
+4 -1
View File
@@ -1,6 +1,7 @@
#pragma once
#include "tcp_connection.h"
#include "../event/event_loop.h"
namespace EQ
{
@@ -10,6 +11,7 @@ namespace EQ
{
public:
TCPServer();
TCPServer(EQ::EventLoop *loop);
~TCPServer();
void Listen(int port, bool ipv6, std::function<void(std::shared_ptr<TCPConnection>)> cb);
@@ -19,7 +21,8 @@ namespace EQ
private:
std::function<void(std::shared_ptr<TCPConnection>)> m_on_new_connection;
EQ::EventLoop *m_loop;
uv_tcp_t *m_socket;
};
}
}
}
+1 -1
View File
@@ -136,7 +136,7 @@ int main(int argc, char *argv[]) {
zones.erase(rem);
}
EQ::EventLoop::Get().Process();
EQ::EventLoop::GetDefault().Process();
Sleep(5);
}
+6 -6
View File
@@ -28,7 +28,7 @@ ClientManager::ClientManager()
{
int titanium_port = atoi(server.config->GetVariable("Titanium", "port").c_str());
EQStreamManagerInterfaceOptions titanium_opts(titanium_port, false, false);
titanium_stream = new EQ::Net::EQStreamManager(titanium_opts);
titanium_stream = new EQ::Net::ConcurrentEQStreamManager(titanium_opts);
titanium_ops = new RegularOpcodeManager;
if (!titanium_ops->LoadOpcodes(server.config->GetVariable("Titanium", "opcodes").c_str()))
{
@@ -37,8 +37,8 @@ ClientManager::ClientManager()
run_server = false;
}
titanium_stream->OnNewConnection([this](std::shared_ptr<EQ::Net::EQStream> stream) {
LogF(Logs::General, Logs::Login_Server, "New Titanium client connection from {0}:{1}", stream->GetRemoteIP(), stream->GetRemotePort());
titanium_stream->OnNewConnection([this](std::shared_ptr<EQStreamInterface> stream) {
LogF(Logs::General, Logs::Login_Server, "New Titanium client connection from {0}:{1}", stream->GetRemoteAddr(), stream->GetRemotePort());
stream->SetOpcodeManager(&titanium_ops);
Client *c = new Client(stream, cv_titanium);
clients.push_back(c);
@@ -46,7 +46,7 @@ ClientManager::ClientManager()
int sod_port = atoi(server.config->GetVariable("SoD", "port").c_str());
EQStreamManagerInterfaceOptions sod_opts(sod_port, false, false);
sod_stream = new EQ::Net::EQStreamManager(sod_opts);
sod_stream = new EQ::Net::ConcurrentEQStreamManager(sod_opts);
sod_ops = new RegularOpcodeManager;
if (!sod_ops->LoadOpcodes(server.config->GetVariable("SoD", "opcodes").c_str()))
{
@@ -55,8 +55,8 @@ ClientManager::ClientManager()
run_server = false;
}
sod_stream->OnNewConnection([this](std::shared_ptr<EQ::Net::EQStream> stream) {
LogF(Logs::General, Logs::Login_Server, "New SoD client connection from {0}:{1}", stream->GetRemoteIP(), stream->GetRemotePort());
sod_stream->OnNewConnection([this](std::shared_ptr<EQStreamInterface> stream) {
LogF(Logs::General, Logs::Login_Server, "New SoD client connection from {0}:{1}", stream->GetRemoteAddr(), stream->GetRemotePort());
stream->SetOpcodeManager(&sod_ops);
Client *c = new Client(stream, cv_sod);
clients.push_back(c);
+3 -3
View File
@@ -20,7 +20,7 @@
#include "../common/global_define.h"
#include "../common/opcodemgr.h"
#include "../common/net/eqstream.h"
#include "../common/net/eqstream_concurrent.h"
#include "client.h"
#include <list>
@@ -63,9 +63,9 @@ private:
std::list<Client*> clients;
OpcodeManager *titanium_ops;
EQ::Net::EQStreamManager *titanium_stream;
EQ::Net::ConcurrentEQStreamManager *titanium_stream;
OpcodeManager *sod_ops;
EQ::Net::EQStreamManager *sod_stream;
EQ::Net::ConcurrentEQStreamManager *sod_stream;
};
#endif
+1 -1
View File
@@ -179,7 +179,7 @@ int main()
while (run_server) {
Timer::SetCurrentTime();
server.client_manager->Process();
EQ::EventLoop::Get().Process();
EQ::EventLoop::GetDefault().Process();
Sleep(5);
}
+1 -1
View File
@@ -99,7 +99,7 @@ int main() {
if(LFGuildExpireTimer.Check())
lfguildmanager.ExpireEntries();
EQ::EventLoop::Get().Process();
EQ::EventLoop::GetDefault().Process();
Sleep(5);
}
LogSys.CloseFileLogs();
+1 -1
View File
@@ -482,7 +482,7 @@ Clientlist::Clientlist(int ChatPort) {
if (!ChatOpMgr->LoadOpcodes("mail_opcodes.conf"))
exit(1);
chatsf->OnNewConnection([this](std::shared_ptr<EQ::Net::EQStream> stream) {
chatsf->OnNewConnection([this](std::shared_ptr<EQStreamInterface> stream) {
LogF(Logs::General, Logs::Login_Server, "New Client UDP connection from {0}:{1}", stream->GetRemoteIP(), stream->GetRemotePort());
stream->SetOpcodeManager(&ChatOpMgr);
+1 -1
View File
@@ -153,7 +153,7 @@ int main() {
if(ChannelListProcessTimer.Check())
ChannelList->Process();
EQ::EventLoop::Get().Process();
EQ::EventLoop::GetDefault().Process();
Sleep(5);
}
+4 -4
View File
@@ -33,7 +33,7 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#include "../common/version.h"
#include "../common/eqtime.h"
#include "../common/event/event_loop.h"
#include "../common/net/eqstream.h"
#include "../common/net/eqstream_concurrent.h"
#include "../common/opcodemgr.h"
#include "../common/guilds.h"
#include "../common/eq_stream_ident.h"
@@ -505,7 +505,7 @@ int main(int argc, char** argv) {
opts.daybreak_options.resend_delay_max = RuleI(Network, ResendDelayMaxMS);
opts.daybreak_options.outgoing_data_rate = RuleR(Network, ClientDataRate);
EQ::Net::EQStreamManager eqsm(opts);
EQ::Net::ConcurrentEQStreamManager eqsm(opts);
//register all the patches we have avaliable with the stream identifier.
EQStreamIdentifier stream_identifier;
@@ -520,7 +520,7 @@ int main(int argc, char** argv) {
std::shared_ptr<EQStreamInterface> eqs;
EQStreamInterface *eqsi;
eqsm.OnNewConnection([&stream_identifier](std::shared_ptr<EQ::Net::EQStream> stream) {
eqsm.OnNewConnection([&stream_identifier](std::shared_ptr<EQStreamInterface> stream) {
stream_identifier.AddStream(stream);
LogF(Logs::Detail, Logs::World_Server, "New connection from IP {0}:{1}", stream->GetRemoteIP(), ntohs(stream->GetRemotePort()));
});
@@ -587,7 +587,7 @@ int main(int argc, char** argv) {
UpdateWindowTitle(window_title);
}
EQ::EventLoop::Get().Process();
EQ::EventLoop::GetDefault().Process();
Sleep(5);
}
Log(Logs::General, Logs::World_Server, "World main loop completed.");
+16 -18
View File
@@ -9522,32 +9522,30 @@ void command_netstats(Client *c, const Seperator *sep)
c->Message(0, "Resent Packets: %u (%.2f/sec)", stats.resent_packets, stats.resent_packets / sec_since_stats_reset);
c->Message(0, "Resent Fragments: %u (%.2f/sec)", stats.resent_fragments, stats.resent_fragments / sec_since_stats_reset);
c->Message(0, "Resent Non-Fragments: %u (%.2f/sec)", stats.resent_full, stats.resent_full / sec_since_stats_reset);
c->Message(0, "Resent Times: %ums (min) %ums (max) %ums (avg)", stats.resent_time_min, stats.resent_time_max, stats.resent_time_average);
c->Message(0, "Dropped Datarate Packets: %u (%.2f/sec)", stats.dropped_datarate_packets, stats.dropped_datarate_packets / sec_since_stats_reset);
if (opts.daybreak_options.outgoing_data_rate > 0.0) {
c->Message(0, "Outgoing Link Saturation %.2f%% (%.2fkb/sec)", 100.0 * (1.0 - ((opts.daybreak_options.outgoing_data_rate - stats.datarate_remaining) / opts.daybreak_options.outgoing_data_rate)), opts.daybreak_options.outgoing_data_rate);
}
if (opts.track_opcode_stats) {
c->Message(0, "--------------------------------------------------------------------");
c->Message(0, "Sent Packet Types");
for (auto i = 0; i < _maxEmuOpcode; ++i) {
auto cnt = eqs_stats.SentCount[i];
if (cnt > 0) {
c->Message(0, "%s: %u (%.2f / sec)", OpcodeNames[i], cnt, cnt / sec_since_stats_reset);
}
}
c->Message(0, "--------------------------------------------------------------------");
c->Message(0, "Recv Packet Types");
for (auto i = 0; i < _maxEmuOpcode; ++i) {
auto cnt = eqs_stats.RecvCount[i];
if (cnt > 0) {
c->Message(0, "%s: %u (%.2f / sec)", OpcodeNames[i], cnt, cnt / sec_since_stats_reset);
}
c->Message(0, "--------------------------------------------------------------------");
c->Message(0, "Sent Packet Types");
for (auto i = 0; i < _maxEmuOpcode; ++i) {
auto cnt = eqs_stats.SentCount[i];
if (cnt > 0) {
c->Message(0, "%s: %u (%.2f / sec)", OpcodeNames[i], cnt, cnt / sec_since_stats_reset);
}
}
c->Message(0, "--------------------------------------------------------------------");
c->Message(0, "Recv Packet Types");
for (auto i = 0; i < _maxEmuOpcode; ++i) {
auto cnt = eqs_stats.RecvCount[i];
if (cnt > 0) {
c->Message(0, "%s: %u (%.2f / sec)", OpcodeNames[i], cnt, cnt / sec_since_stats_reset);
}
}
c->Message(0, "--------------------------------------------------------------------");
}
}
+21 -21
View File
@@ -689,32 +689,32 @@ void callGetPacketStatistics(Json::Value &response)
row["resent_packets"] = stats.resent_packets;
row["resent_fragments"] = stats.resent_fragments;
row["resent_non_fragments"] = stats.resent_full;
row["resent_time_min"] = stats.resent_time_min;
row["resent_time_max"] = stats.resent_time_max;
row["resent_time_average"] = stats.resent_time_average;
row["dropped_datarate_packets"] = stats.dropped_datarate_packets;
if (opts.track_opcode_stats) {
Json::Value sent_packet_types;
Json::Value sent_packet_types;
for (auto i = 0; i < _maxEmuOpcode; ++i) {
auto count = eqs_stats.SentCount[i];
if (count > 0) {
sent_packet_types[OpcodeNames[i]] = count;
}
for (auto i = 0; i < _maxEmuOpcode; ++i) {
auto count = eqs_stats.SentCount[i];
if (count > 0) {
sent_packet_types[OpcodeNames[i]] = count;
}
Json::Value receive_packet_types;
for (auto i = 0; i < _maxEmuOpcode; ++i) {
auto count = eqs_stats.RecvCount[i];
if (count > 0) {
receive_packet_types[OpcodeNames[i]] = count;
}
}
row["sent_packet_types"] = sent_packet_types;
row["receive_packet_types"] = receive_packet_types;
}
Json::Value receive_packet_types;
for (auto i = 0; i < _maxEmuOpcode; ++i) {
auto count = eqs_stats.RecvCount[i];
if (count > 0) {
receive_packet_types[OpcodeNames[i]] = count;
}
}
row["sent_packet_types"] = sent_packet_types;
row["receive_packet_types"] = receive_packet_types;
response.append(row);
}
}
@@ -776,4 +776,4 @@ void EQEmuApiZoneDataService::get(Json::Value &response, const std::vector<std::
if (method == "get_zone_attributes") {
callGetZoneAttributes(response);
}
}
}
+1 -1
View File
@@ -267,7 +267,7 @@ public:
virtual void WearChange(uint8 material_slot, uint16 texture, uint32 color, uint32 hero_forge_model = 0);
void ChangeSize(float in_size, bool bNoRestriction = false);
void DoAnim(const int animnum, int type=0, bool ackreq = true, eqFilterType filter = FilterNone);
void DoAnim(const int animnum, int type=0, bool ackreq = false, eqFilterType filter = FilterNone);
void ProjectileAnimation(Mob* to, int item_id, bool IsArrow = false, float speed = 0, float angle = 0, float tilt = 0, float arc = 0, const char *IDFile = nullptr, EQEmu::skills::SkillType skillInUse = EQEmu::skills::SkillArchery);
void SendAppearanceEffect(uint32 parm1, uint32 parm2, uint32 parm3, uint32 parm4, uint32 parm5, Client *specific_target=nullptr);
void SendLevelAppearance();
+7 -5
View File
@@ -44,6 +44,7 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#include "../common/eqemu_logsys.h"
#include "../common/eqemu_logsys_fmt.h"
#include "../common/net/console_server.h"
#include "../common/net/eqstream_concurrent.h"
#include "zone_config.h"
#include "masterentity.h"
@@ -461,7 +462,7 @@ int main(int argc, char** argv) {
UpdateWindowTitle();
std::shared_ptr<EQStreamInterface> eqss;
EQStreamInterface *eqsi;
std::unique_ptr<EQ::Net::EQStreamManager> eqsm;
std::unique_ptr<EQ::Net::ConcurrentEQStreamManager> eqsm;
std::chrono::time_point<std::chrono::system_clock> frame_prev = std::chrono::system_clock::now();
std::unique_ptr<EQ::Net::ConsoleServer> console;
@@ -506,11 +507,10 @@ int main(int argc, char** argv) {
opts.daybreak_options.resend_delay_min = RuleI(Network, ResendDelayMinMS);
opts.daybreak_options.resend_delay_max = RuleI(Network, ResendDelayMaxMS);
opts.daybreak_options.outgoing_data_rate = RuleR(Network, ClientDataRate);
opts.track_opcode_stats = RuleB(Network, TrackOpcodeStats);
eqsm.reset(new EQ::Net::EQStreamManager(opts));
eqsm.reset(new EQ::Net::ConcurrentEQStreamManager(opts));
eqsf_open = true;
eqsm->OnNewConnection([&stream_identifier](std::shared_ptr<EQ::Net::EQStream> stream) {
eqsm->OnNewConnection([&stream_identifier](std::shared_ptr<EQStreamInterface> stream) {
stream_identifier.AddStream(stream);
LogF(Logs::Detail, Logs::World_Server, "New connection from IP {0}:{1}", stream->GetRemoteIP(), ntohs(stream->GetRemotePort()));
});
@@ -588,16 +588,18 @@ int main(int argc, char** argv) {
while (RunLoops) {
bool previous_loaded = is_zone_loaded && numclients > 0;
EQ::EventLoop::Get().Process();
EQ::EventLoop::GetDefault().Process();
bool current_loaded = is_zone_loaded && numclients > 0;
if (previous_loaded && !current_loaded) {
process_timer.Stop();
process_timer.Start(1000, true);
eqsm->SetPriority(EQStreamPriority::Low);
}
else if (!previous_loaded && current_loaded) {
process_timer.Stop();
process_timer.Start(32, true);
eqsm->SetPriority(EQStreamPriority::High);
}
if (current_loaded) {