mirror of
https://github.com/EQEmu/Server.git
synced 2026-05-03 05:52:27 +00:00
Implement remain functionality and cleaning up some code
This commit is contained in:
parent
5e7316edb9
commit
c09a48d58c
@ -224,6 +224,7 @@ SET(common_headers
|
|||||||
net/endian.h
|
net/endian.h
|
||||||
net/eqstream.h
|
net/eqstream.h
|
||||||
net/eqstream_concurrent.h
|
net/eqstream_concurrent.h
|
||||||
|
net/eqstream_concurrent_message.h
|
||||||
net/packet.h
|
net/packet.h
|
||||||
net/servertalk_client_connection.h
|
net/servertalk_client_connection.h
|
||||||
net/servertalk_legacy_client_connection.h
|
net/servertalk_legacy_client_connection.h
|
||||||
@ -298,6 +299,7 @@ SOURCE_GROUP(Net FILES
|
|||||||
net/eqstream.h
|
net/eqstream.h
|
||||||
net/eqstream_concurrent.cpp
|
net/eqstream_concurrent.cpp
|
||||||
net/eqstream_concurrent.h
|
net/eqstream_concurrent.h
|
||||||
|
net/eqstream_concurrent_message.h
|
||||||
net/packet.cpp
|
net/packet.cpp
|
||||||
net/packet.h
|
net/packet.h
|
||||||
net/servertalk_client_connection.cpp
|
net/servertalk_client_connection.cpp
|
||||||
|
|||||||
@ -7,6 +7,7 @@
|
|||||||
#include "emu_versions.h"
|
#include "emu_versions.h"
|
||||||
#include "eq_packet.h"
|
#include "eq_packet.h"
|
||||||
#include "net/daybreak_connection.h"
|
#include "net/daybreak_connection.h"
|
||||||
|
#include "event/event_loop.h"
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
ESTABLISHED,
|
ESTABLISHED,
|
||||||
@ -23,6 +24,7 @@ struct EQStreamManagerInterfaceOptions
|
|||||||
{
|
{
|
||||||
EQStreamManagerInterfaceOptions() {
|
EQStreamManagerInterfaceOptions() {
|
||||||
opcode_size = 2;
|
opcode_size = 2;
|
||||||
|
loop = &EQ::EventLoop::GetDefault();
|
||||||
}
|
}
|
||||||
|
|
||||||
EQStreamManagerInterfaceOptions(int port, bool encoded, bool compressed) {
|
EQStreamManagerInterfaceOptions(int port, bool encoded, bool compressed) {
|
||||||
@ -41,13 +43,16 @@ struct EQStreamManagerInterfaceOptions
|
|||||||
}
|
}
|
||||||
|
|
||||||
daybreak_options.port = port;
|
daybreak_options.port = port;
|
||||||
|
loop = &EQ::EventLoop::GetDefault();
|
||||||
}
|
}
|
||||||
|
|
||||||
int opcode_size;
|
int opcode_size;
|
||||||
bool track_opcode_stats;
|
bool track_opcode_stats;
|
||||||
EQ::Net::DaybreakConnectionManagerOptions daybreak_options;
|
EQ::Net::DaybreakConnectionManagerOptions daybreak_options;
|
||||||
|
EQ::EventLoop *loop;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class EQStreamInterface;
|
||||||
class EQStreamManagerInterface
|
class EQStreamManagerInterface
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -56,6 +61,9 @@ public:
|
|||||||
|
|
||||||
const EQStreamManagerInterfaceOptions& GetOptions() const { return m_options; }
|
const EQStreamManagerInterfaceOptions& GetOptions() const { return m_options; }
|
||||||
EQStreamManagerInterfaceOptions& MutateOptions() { return m_options; }
|
EQStreamManagerInterfaceOptions& MutateOptions() { return m_options; }
|
||||||
|
|
||||||
|
virtual void OnNewConnection(std::function<void(std::shared_ptr<EQStreamInterface>)> func) = 0;
|
||||||
|
virtual void OnConnectionStateChange(std::function<void(std::shared_ptr<EQStreamInterface>, EQ::Net::DbProtocolStatus, EQ::Net::DbProtocolStatus)> func) = 0;
|
||||||
protected:
|
protected:
|
||||||
EQStreamManagerInterfaceOptions m_options;
|
EQStreamManagerInterfaceOptions m_options;
|
||||||
};
|
};
|
||||||
|
|||||||
@ -10,6 +10,7 @@
|
|||||||
EQ::Net::DaybreakConnectionManager::DaybreakConnectionManager()
|
EQ::Net::DaybreakConnectionManager::DaybreakConnectionManager()
|
||||||
{
|
{
|
||||||
m_attached = nullptr;
|
m_attached = nullptr;
|
||||||
|
m_next_id = 1;
|
||||||
memset(&m_timer, 0, sizeof(uv_timer_t));
|
memset(&m_timer, 0, sizeof(uv_timer_t));
|
||||||
memset(&m_socket, 0, sizeof(uv_udp_t));
|
memset(&m_socket, 0, sizeof(uv_udp_t));
|
||||||
|
|
||||||
@ -20,6 +21,7 @@ EQ::Net::DaybreakConnectionManager::DaybreakConnectionManager(const DaybreakConn
|
|||||||
{
|
{
|
||||||
m_attached = nullptr;
|
m_attached = nullptr;
|
||||||
m_options = opts;
|
m_options = opts;
|
||||||
|
m_next_id = 1;
|
||||||
memset(&m_timer, 0, sizeof(uv_timer_t));
|
memset(&m_timer, 0, sizeof(uv_timer_t));
|
||||||
memset(&m_socket, 0, sizeof(uv_udp_t));
|
memset(&m_socket, 0, sizeof(uv_udp_t));
|
||||||
|
|
||||||
@ -94,7 +96,7 @@ void EQ::Net::DaybreakConnectionManager::Connect(const std::string &addr, int po
|
|||||||
{
|
{
|
||||||
//todo dns resolution
|
//todo dns resolution
|
||||||
|
|
||||||
auto connection = std::shared_ptr<DaybreakConnection>(new DaybreakConnection(this, addr, port));
|
auto connection = std::shared_ptr<DaybreakConnection>(new DaybreakConnection(this, GetNextId(), addr, port));
|
||||||
connection->m_self = connection;
|
connection->m_self = connection;
|
||||||
|
|
||||||
if (m_on_new_connection) {
|
if (m_on_new_connection) {
|
||||||
@ -232,7 +234,7 @@ void EQ::Net::DaybreakConnectionManager::ProcessPacket(const std::string &endpoi
|
|||||||
StaticPacket p((void*)data, size);
|
StaticPacket p((void*)data, size);
|
||||||
auto request = p.GetSerialize<DaybreakConnect>(0);
|
auto request = p.GetSerialize<DaybreakConnect>(0);
|
||||||
|
|
||||||
connection = std::shared_ptr<DaybreakConnection>(new DaybreakConnection(this, request, endpoint, port));
|
connection = std::shared_ptr<DaybreakConnection>(new DaybreakConnection(this, GetNextId(), request, endpoint, port));
|
||||||
connection->m_self = connection;
|
connection->m_self = connection;
|
||||||
|
|
||||||
if (m_on_new_connection) {
|
if (m_on_new_connection) {
|
||||||
@ -290,10 +292,18 @@ void EQ::Net::DaybreakConnectionManager::SendDisconnect(const std::string &addr,
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint64_t EQ::Net::DaybreakConnectionManager::GetNextId()
|
||||||
|
{
|
||||||
|
auto id = m_next_id;
|
||||||
|
m_next_id++;
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
//new connection made as server
|
//new connection made as server
|
||||||
EQ::Net::DaybreakConnection::DaybreakConnection(DaybreakConnectionManager *owner, const DaybreakConnect &connect, const std::string &endpoint, int port)
|
EQ::Net::DaybreakConnection::DaybreakConnection(DaybreakConnectionManager *owner, uint64_t id, const DaybreakConnect &connect, const std::string &endpoint, int port)
|
||||||
{
|
{
|
||||||
m_owner = owner;
|
m_owner = owner;
|
||||||
|
m_id = id;
|
||||||
m_last_send = Clock::now();
|
m_last_send = Clock::now();
|
||||||
m_last_recv = Clock::now();
|
m_last_recv = Clock::now();
|
||||||
m_status = StatusConnected;
|
m_status = StatusConnected;
|
||||||
@ -316,9 +326,10 @@ EQ::Net::DaybreakConnection::DaybreakConnection(DaybreakConnectionManager *owner
|
|||||||
}
|
}
|
||||||
|
|
||||||
//new connection made as client
|
//new connection made as client
|
||||||
EQ::Net::DaybreakConnection::DaybreakConnection(DaybreakConnectionManager *owner, const std::string &endpoint, int port)
|
EQ::Net::DaybreakConnection::DaybreakConnection(DaybreakConnectionManager *owner, uint64_t id, const std::string &endpoint, int port)
|
||||||
{
|
{
|
||||||
m_owner = owner;
|
m_owner = owner;
|
||||||
|
m_id = id;
|
||||||
m_last_send = Clock::now();
|
m_last_send = Clock::now();
|
||||||
m_last_recv = Clock::now();
|
m_last_recv = Clock::now();
|
||||||
m_status = StatusConnecting;
|
m_status = StatusConnecting;
|
||||||
|
|||||||
@ -119,8 +119,8 @@ namespace EQ
|
|||||||
class DaybreakConnection
|
class DaybreakConnection
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
DaybreakConnection(DaybreakConnectionManager *owner, const DaybreakConnect &connect, const std::string &endpoint, int port);
|
DaybreakConnection(DaybreakConnectionManager *owner, uint64_t id, const DaybreakConnect &connect, const std::string &endpoint, int port);
|
||||||
DaybreakConnection(DaybreakConnectionManager *owner, const std::string &endpoint, int port);
|
DaybreakConnection(DaybreakConnectionManager *owner, uint64_t id, const std::string &endpoint, int port);
|
||||||
~DaybreakConnection();
|
~DaybreakConnection();
|
||||||
|
|
||||||
const std::string& RemoteEndpoint() const { return m_endpoint; }
|
const std::string& RemoteEndpoint() const { return m_endpoint; }
|
||||||
@ -139,6 +139,7 @@ namespace EQ
|
|||||||
const DaybreakEncodeType* GetEncodePasses() const { return m_encode_passes; }
|
const DaybreakEncodeType* GetEncodePasses() const { return m_encode_passes; }
|
||||||
const DaybreakConnectionManager* GetManager() const { return m_owner; }
|
const DaybreakConnectionManager* GetManager() const { return m_owner; }
|
||||||
DaybreakConnectionManager* GetManager() { return m_owner; }
|
DaybreakConnectionManager* GetManager() { return m_owner; }
|
||||||
|
uint64_t GetId() const { return m_id; }
|
||||||
private:
|
private:
|
||||||
DaybreakConnectionManager *m_owner;
|
DaybreakConnectionManager *m_owner;
|
||||||
std::string m_endpoint;
|
std::string m_endpoint;
|
||||||
@ -161,6 +162,7 @@ namespace EQ
|
|||||||
size_t m_rolling_ping;
|
size_t m_rolling_ping;
|
||||||
Timestamp m_close_time;
|
Timestamp m_close_time;
|
||||||
double m_outgoing_budget;
|
double m_outgoing_budget;
|
||||||
|
uint64_t m_id;
|
||||||
|
|
||||||
struct DaybreakSentPacket
|
struct DaybreakSentPacket
|
||||||
{
|
{
|
||||||
@ -306,6 +308,7 @@ namespace EQ
|
|||||||
uv_udp_t m_socket;
|
uv_udp_t m_socket;
|
||||||
uv_loop_t *m_attached;
|
uv_loop_t *m_attached;
|
||||||
DaybreakConnectionManagerOptions m_options;
|
DaybreakConnectionManagerOptions m_options;
|
||||||
|
uint64_t m_next_id;
|
||||||
std::function<void(std::shared_ptr<DaybreakConnection>)> m_on_new_connection;
|
std::function<void(std::shared_ptr<DaybreakConnection>)> m_on_new_connection;
|
||||||
std::function<void(std::shared_ptr<DaybreakConnection>, DbProtocolStatus, DbProtocolStatus)> m_on_connection_state_change;
|
std::function<void(std::shared_ptr<DaybreakConnection>, DbProtocolStatus, DbProtocolStatus)> m_on_connection_state_change;
|
||||||
std::function<void(std::shared_ptr<DaybreakConnection>, const Packet&)> m_on_packet_recv;
|
std::function<void(std::shared_ptr<DaybreakConnection>, const Packet&)> m_on_packet_recv;
|
||||||
@ -315,6 +318,7 @@ namespace EQ
|
|||||||
void ProcessPacket(const std::string &endpoint, int port, const char *data, size_t size);
|
void ProcessPacket(const std::string &endpoint, int port, const char *data, size_t size);
|
||||||
std::shared_ptr<DaybreakConnection> FindConnectionByEndpoint(std::string addr, int port);
|
std::shared_ptr<DaybreakConnection> FindConnectionByEndpoint(std::string addr, int port);
|
||||||
void SendDisconnect(const std::string &addr, int port);
|
void SendDisconnect(const std::string &addr, int port);
|
||||||
|
uint64_t GetNextId();
|
||||||
|
|
||||||
friend class DaybreakConnection;
|
friend class DaybreakConnection;
|
||||||
};
|
};
|
||||||
|
|||||||
@ -19,12 +19,12 @@ namespace EQ
|
|||||||
EQStreamManager(const EQStreamManagerInterfaceOptions &options);
|
EQStreamManager(const EQStreamManagerInterfaceOptions &options);
|
||||||
~EQStreamManager();
|
~EQStreamManager();
|
||||||
|
|
||||||
void OnNewConnection(std::function<void(std::shared_ptr<EQStream>)> func) { m_on_new_connection = func; }
|
virtual void OnNewConnection(std::function<void(std::shared_ptr<EQStreamInterface>)> func) { m_on_new_connection = func; }
|
||||||
void OnConnectionStateChange(std::function<void(std::shared_ptr<EQStream>, DbProtocolStatus, DbProtocolStatus)> func) { m_on_connection_state_change = func; }
|
virtual void OnConnectionStateChange(std::function<void(std::shared_ptr<EQStreamInterface>, EQ::Net::DbProtocolStatus, EQ::Net::DbProtocolStatus)> func) { m_on_connection_state_change = func; }
|
||||||
private:
|
private:
|
||||||
DaybreakConnectionManager m_daybreak;
|
DaybreakConnectionManager m_daybreak;
|
||||||
std::function<void(std::shared_ptr<EQStream>)> m_on_new_connection;
|
std::function<void(std::shared_ptr<EQStreamInterface>)> m_on_new_connection;
|
||||||
std::function<void(std::shared_ptr<EQStream>, DbProtocolStatus, DbProtocolStatus)> m_on_connection_state_change;
|
std::function<void(std::shared_ptr<EQStreamInterface>, DbProtocolStatus, DbProtocolStatus)> m_on_connection_state_change;
|
||||||
std::map<std::shared_ptr<DaybreakConnection>, std::shared_ptr<EQStream>> m_streams;
|
std::map<std::shared_ptr<DaybreakConnection>, std::shared_ptr<EQStream>> m_streams;
|
||||||
|
|
||||||
void DaybreakNewConnection(std::shared_ptr<DaybreakConnection> connection);
|
void DaybreakNewConnection(std::shared_ptr<DaybreakConnection> connection);
|
||||||
|
|||||||
@ -1,36 +1,653 @@
|
|||||||
#include "eqstream_concurrent.h"
|
#include "eqstream_concurrent.h"
|
||||||
|
#include "eqstream_concurrent_message.h"
|
||||||
|
#include "../event/event_loop.h"
|
||||||
|
#include "../event/timer.h"
|
||||||
|
#include "../string_util.h"
|
||||||
|
#include "../opcodemgr.h"
|
||||||
|
#include "daybreak_connection.h"
|
||||||
|
#include <thread>
|
||||||
|
#include <concurrentqueue.h>
|
||||||
|
#include <unordered_map>
|
||||||
|
#include <queue>
|
||||||
|
#include <deque>
|
||||||
|
#include <list>
|
||||||
|
|
||||||
struct EQ::Net::ConcurrentEQStreamManager::Impl
|
struct EQ::Net::ConcurrentEQStreamManager::Impl
|
||||||
{
|
{
|
||||||
|
std::thread background;
|
||||||
|
bool background_running;
|
||||||
|
moodycamel::ConcurrentQueue<ceqs_msg_t> foreground_queue;
|
||||||
|
moodycamel::ConcurrentQueue<ceqs_msg_t> background_queue;
|
||||||
|
std::unordered_map<uint64_t, std::shared_ptr<DaybreakConnection>> connections;
|
||||||
|
std::unique_ptr<EQ::Timer> foreground_loop_timer;
|
||||||
|
std::unique_ptr<EQ::Timer> background_loop_timer;
|
||||||
|
std::unique_ptr<EQ::Timer> background_update_stats_timer;
|
||||||
|
std::unordered_map<uint64_t, std::shared_ptr<ConcurrentEQStream>> streams;
|
||||||
|
std::function<void(std::shared_ptr<EQStreamInterface>)> on_new_connection;
|
||||||
|
std::function<void(std::shared_ptr<EQStreamInterface>, EQ::Net::DbProtocolStatus, EQ::Net::DbProtocolStatus)> on_connection_state_change;
|
||||||
};
|
};
|
||||||
|
|
||||||
EQ::Net::ConcurrentEQStreamManager::ConcurrentEQStreamManager(const EQStreamManagerInterfaceOptions &options)
|
EQ::Net::ConcurrentEQStreamManager::ConcurrentEQStreamManager(const EQStreamManagerInterfaceOptions &options)
|
||||||
: EQStreamManagerInterface(options)
|
: EQStreamManagerInterface(options)
|
||||||
{
|
{
|
||||||
_impl.reset(new Impl());
|
_impl.reset(new Impl());
|
||||||
|
_impl->background = std::thread(std::bind(&ConcurrentEQStreamManager::_BackgroundThread, this));
|
||||||
|
_impl->foreground_loop_timer.reset(new EQ::Timer(options.loop, 16, true,
|
||||||
|
std::bind(&ConcurrentEQStreamManager::_ForegroundTimer, this, std::placeholders::_1)));
|
||||||
}
|
}
|
||||||
|
|
||||||
EQ::Net::ConcurrentEQStreamManager::~ConcurrentEQStreamManager()
|
EQ::Net::ConcurrentEQStreamManager::~ConcurrentEQStreamManager()
|
||||||
{
|
{
|
||||||
|
for (auto &s : _impl->streams) {
|
||||||
|
s.second->_Invalidate();
|
||||||
|
}
|
||||||
|
|
||||||
|
_impl->foreground_loop_timer.release();
|
||||||
|
|
||||||
|
//Tell the background to shutdown and wait for it to actually do so
|
||||||
|
ceqs_terminate_msg_t msg;
|
||||||
|
msg.type = TerminateBackground;
|
||||||
|
|
||||||
|
_PushToBackgroundQueue((ceqs_msg_t*)&msg);
|
||||||
|
_impl->background.join();
|
||||||
|
|
||||||
|
//Go through our incoming messages to make sure we clean up any packets in that need to be freed
|
||||||
|
ceqs_msg_t eqs_msg;
|
||||||
|
|
||||||
|
while (_impl->foreground_queue.try_dequeue(eqs_msg)) {
|
||||||
|
if (eqs_msg.type == PacketRecv) {
|
||||||
|
ConcurrentEQStreamPacketRecvMessage *eqs_msg_in = (ConcurrentEQStreamPacketRecvMessage*)&eqs_msg;
|
||||||
|
|
||||||
|
delete eqs_msg_in->packet;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void EQ::Net::ConcurrentEQStreamManager::OnNewConnection(std::function<void(std::shared_ptr<ConcurrentEQStream>)> func)
|
void EQ::Net::ConcurrentEQStreamManager::_BackgroundThread() {
|
||||||
|
_impl->background_running = true;
|
||||||
|
EQ::EventLoop loop;
|
||||||
|
auto &eqs_opts = GetOptions();
|
||||||
|
auto opts = eqs_opts.daybreak_options;
|
||||||
|
opts.loop = &loop;
|
||||||
|
|
||||||
|
std::unique_ptr<DaybreakConnectionManager> dbcm(new DaybreakConnectionManager(opts));
|
||||||
|
dbcm->OnNewConnection(std::bind(&ConcurrentEQStreamManager::DaybreakNewConnection, this, std::placeholders::_1));
|
||||||
|
dbcm->OnConnectionStateChange(std::bind(&ConcurrentEQStreamManager::DaybreakConnectionStateChange, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
|
||||||
|
dbcm->OnPacketRecv(std::bind(&ConcurrentEQStreamManager::DaybreakPacketRecv, this, std::placeholders::_1, std::placeholders::_2));
|
||||||
|
|
||||||
|
_impl->background_loop_timer.reset(new EQ::Timer(&loop, 16, true,
|
||||||
|
std::bind(&ConcurrentEQStreamManager::_BackgroundTimer, this, std::placeholders::_1)));
|
||||||
|
|
||||||
|
_impl->background_update_stats_timer.reset(new EQ::Timer(&loop, 500, true,
|
||||||
|
std::bind(&ConcurrentEQStreamManager::_BackgroundUpdateStatsTimer, this, std::placeholders::_1)));
|
||||||
|
|
||||||
|
while (true == _impl->background_running) {
|
||||||
|
loop.Process();
|
||||||
|
std::this_thread::sleep_for(std::chrono::microseconds(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
_impl->background_loop_timer.release();
|
||||||
|
_impl->background_update_stats_timer.release();
|
||||||
|
dbcm.release();
|
||||||
|
|
||||||
|
ceqs_msg_t eqs_msg;
|
||||||
|
while (_impl->background_queue.try_dequeue(eqs_msg)) {
|
||||||
|
if (eqs_msg.type == PacketRecv) {
|
||||||
|
ConcurrentEQStreamPacketRecvMessage *eqs_msg_in = (ConcurrentEQStreamPacketRecvMessage*)&eqs_msg;
|
||||||
|
delete eqs_msg_in->packet;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by background
|
||||||
|
void EQ::Net::ConcurrentEQStreamManager::_BackgroundTimer(EQ::Timer * t)
|
||||||
{
|
{
|
||||||
|
ceqs_msg_t msg_queue[16];
|
||||||
|
size_t count = 0;
|
||||||
|
while ((count = _impl->background_queue.try_dequeue_bulk(msg_queue, 16)) != 0) {
|
||||||
|
for (size_t i = 0; i < count; ++i) {
|
||||||
|
_ProcessBackgroundMessage(msg_queue[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void EQ::Net::ConcurrentEQStreamManager::OnConnectionStateChange(std::function<void(std::shared_ptr<ConcurrentEQStream>, DbProtocolStatus, DbProtocolStatus)> func)
|
void EQ::Net::ConcurrentEQStreamManager::_BackgroundUpdateStatsTimer(EQ::Timer *t)
|
||||||
{
|
{
|
||||||
|
ceqs_msg_t msgs[16];
|
||||||
|
int i = 0;
|
||||||
|
|
||||||
|
for (auto &c : _impl->connections) {
|
||||||
|
auto &connection = c.second;
|
||||||
|
auto msg = (ceqs_update_daybreak_stats_msg_t*)&msgs[i];
|
||||||
|
|
||||||
|
msg->type = ceqs_msg_type::UpdateDaybreakStats;
|
||||||
|
msg->stream_id = connection->GetId();
|
||||||
|
msg->stats = connection->GetStats();
|
||||||
|
i++;
|
||||||
|
|
||||||
|
printf("Sending stats to client %u\n", connection->GetId());
|
||||||
|
if (i >= 16) {
|
||||||
|
_impl->background_queue.enqueue_bulk(msgs, 16);
|
||||||
|
i = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (i > 0) {
|
||||||
|
_impl->background_queue.enqueue_bulk(msgs, i);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Called by background
|
||||||
|
void EQ::Net::ConcurrentEQStreamManager::_ProcessBackgroundMessage(const ceqs_msg_t &msg)
|
||||||
|
{
|
||||||
|
switch (msg.type) {
|
||||||
|
case QueuePacket:
|
||||||
|
{
|
||||||
|
ConcurrentEQStreamQueuePacketMessage *msg_in = (ConcurrentEQStreamQueuePacketMessage*)&msg;
|
||||||
|
printf("(background) Packet Queue for %u with %u bytes with ack: %s\n", msg_in->stream_id, msg_in->packet->Length(), msg_in->ack_req ? "true" : "false");
|
||||||
|
|
||||||
|
auto iter = _impl->connections.find(msg_in->stream_id);
|
||||||
|
if (iter != _impl->connections.end()) {
|
||||||
|
iter->second->QueuePacket(*msg_in->packet, 0, msg_in->ack_req);
|
||||||
|
}
|
||||||
|
|
||||||
|
delete msg_in->packet;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TerminateBackground:
|
||||||
|
{
|
||||||
|
_impl->background_running = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CloseConnection:
|
||||||
|
{
|
||||||
|
ConcurrentEQStreamCloseConnectionMessage *msg_in = (ConcurrentEQStreamCloseConnectionMessage*)&msg;
|
||||||
|
auto iter = _impl->connections.find(msg_in->stream_id);
|
||||||
|
if (iter != _impl->connections.end()) {
|
||||||
|
iter->second->Close();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case ResetStats:
|
||||||
|
{
|
||||||
|
ConcurrentEQStreamResetStatsMessage *msg_in = (ConcurrentEQStreamResetStatsMessage*)&msg;
|
||||||
|
auto iter = _impl->connections.find(msg_in->stream_id);
|
||||||
|
if (iter != _impl->connections.end()) {
|
||||||
|
iter->second->ResetStats();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
printf("(background) New message with unhandled type %u\n", (int)msg.type);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
void EQ::Net::ConcurrentEQStreamManager::_ForegroundTimer(EQ::Timer *t)
|
||||||
|
{
|
||||||
|
ceqs_msg_t msg_queue[16];
|
||||||
|
size_t count = 0;
|
||||||
|
while ((count = _impl->foreground_queue.try_dequeue_bulk(msg_queue, 16)) != 0) {
|
||||||
|
for (size_t i = 0; i < count; ++i) {
|
||||||
|
_ProcessForegroundMessage(msg_queue[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
void EQ::Net::ConcurrentEQStreamManager::_ProcessForegroundMessage(const ceqs_msg_t &msg)
|
||||||
|
{
|
||||||
|
switch (msg.type) {
|
||||||
|
case NewConnection:
|
||||||
|
{
|
||||||
|
ConcurrentEQStreamNewConnectionMessage *msg_in = (ConcurrentEQStreamNewConnectionMessage*)&msg;
|
||||||
|
printf("(foreground) New connection from %s:%u with id: %u\n", msg_in->endpoint, msg_in->remote_port, msg_in->stream_id);
|
||||||
|
|
||||||
|
std::shared_ptr<ConcurrentEQStream> stream(new ConcurrentEQStream(this,
|
||||||
|
msg_in->stream_id,
|
||||||
|
msg_in->endpoint,
|
||||||
|
msg_in->remote_port,
|
||||||
|
(DbProtocolStatus)msg_in->state));
|
||||||
|
|
||||||
|
_impl->streams.insert(std::make_pair(msg_in->stream_id, stream));
|
||||||
|
if (_impl->on_new_connection) {
|
||||||
|
_impl->on_new_connection(stream);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case ConnectionStateChange:
|
||||||
|
{
|
||||||
|
ConcurrentEQStreamConnectionStateChangeMessage *msg_in = (ConcurrentEQStreamConnectionStateChangeMessage*)&msg;
|
||||||
|
printf("(foreground) Connection State Change for %u, was %u now is %u\n", msg_in->stream_id, msg_in->from, msg_in->to);
|
||||||
|
|
||||||
|
|
||||||
|
auto iter = _impl->streams.find(msg_in->stream_id);
|
||||||
|
if (iter != _impl->streams.end()) {
|
||||||
|
iter->second->_SetState((DbProtocolStatus)msg_in->to);
|
||||||
|
|
||||||
|
if ((DbProtocolStatus)msg_in->to == DbProtocolStatus::StatusDisconnected || (DbProtocolStatus)msg_in->to == DbProtocolStatus::StatusDisconnecting) {
|
||||||
|
_impl->streams.erase(iter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case PacketRecv:
|
||||||
|
{
|
||||||
|
ConcurrentEQStreamPacketRecvMessage *msg_in = (ConcurrentEQStreamPacketRecvMessage*)&msg;
|
||||||
|
printf("(foreground) Packet Recv for %u with %u bytes\n", msg_in->stream_id, msg_in->packet->Length());
|
||||||
|
std::unique_ptr<EQ::Net::Packet> p(msg_in->packet);
|
||||||
|
|
||||||
|
auto iter = _impl->streams.find(msg_in->stream_id);
|
||||||
|
if (iter != _impl->streams.end()) {
|
||||||
|
iter->second->_RecvPacket(std::move(p));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case UpdateDaybreakStats:
|
||||||
|
{
|
||||||
|
ceqs_update_daybreak_stats_msg_t *msg_in = (ceqs_update_daybreak_stats_msg_t*)&msg;
|
||||||
|
auto iter = _impl->streams.find(msg_in->stream_id);
|
||||||
|
if (iter != _impl->streams.end()) {
|
||||||
|
iter->second->_UpdateStats(msg_in->stats);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void EQ::Net::ConcurrentEQStreamManager::_PushToBackgroundQueue(ceqs_msg_t *msg)
|
||||||
|
{
|
||||||
|
_impl->background_queue.enqueue(*msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
void EQ::Net::ConcurrentEQStreamManager::_PushToForegroundQueue(ceqs_msg_t *msg)
|
||||||
|
{
|
||||||
|
_impl->foreground_queue.enqueue(*msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
void EQ::Net::ConcurrentEQStreamManager::OnNewConnection(std::function<void(std::shared_ptr<EQStreamInterface>)> func)
|
||||||
|
{
|
||||||
|
_impl->on_new_connection = func;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
void EQ::Net::ConcurrentEQStreamManager::OnConnectionStateChange(std::function<void(std::shared_ptr<EQStreamInterface>, DbProtocolStatus, DbProtocolStatus)> func)
|
||||||
|
{
|
||||||
|
_impl->on_connection_state_change = func;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by background
|
||||||
void EQ::Net::ConcurrentEQStreamManager::DaybreakNewConnection(std::shared_ptr<DaybreakConnection> connection)
|
void EQ::Net::ConcurrentEQStreamManager::DaybreakNewConnection(std::shared_ptr<DaybreakConnection> connection)
|
||||||
{
|
{
|
||||||
|
_impl->connections.insert(std::make_pair(connection->GetId(), connection));
|
||||||
|
ConcurrentEQStreamNewConnectionMessage msg;
|
||||||
|
msg.type = ceqs_msg_type::NewConnection;
|
||||||
|
msg.stream_id = connection->GetId();
|
||||||
|
msg.remote_port = connection->RemotePort();
|
||||||
|
msg.state = connection->GetStatus();
|
||||||
|
strcpy(msg.endpoint, connection->RemoteEndpoint().c_str());
|
||||||
|
msg.endpoint[connection->RemoteEndpoint().length()] = 0;
|
||||||
|
|
||||||
|
//Make sure the foreground gets this message
|
||||||
|
_PushToForegroundQueue((ceqs_msg_t*)&msg);
|
||||||
|
printf("(background) New connection from %s:%u with id: %u\n", connection->RemoteEndpoint().c_str(), connection->RemotePort(), connection->GetId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Called by background
|
||||||
void EQ::Net::ConcurrentEQStreamManager::DaybreakConnectionStateChange(std::shared_ptr<DaybreakConnection> connection, DbProtocolStatus from, DbProtocolStatus to)
|
void EQ::Net::ConcurrentEQStreamManager::DaybreakConnectionStateChange(std::shared_ptr<DaybreakConnection> connection, DbProtocolStatus from, DbProtocolStatus to)
|
||||||
{
|
{
|
||||||
|
if (to == DbProtocolStatus::StatusDisconnecting || to == DbProtocolStatus::StatusDisconnected) {
|
||||||
|
auto iter = _impl->connections.find(connection->GetId());
|
||||||
|
if (iter != _impl->connections.end()) {
|
||||||
|
_impl->connections.erase(iter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ConcurrentEQStreamConnectionStateChangeMessage msg;
|
||||||
|
msg.type = ceqs_msg_type::ConnectionStateChange;
|
||||||
|
msg.stream_id = connection->GetId();
|
||||||
|
msg.from = (int)from;
|
||||||
|
msg.to = (int)to;
|
||||||
|
|
||||||
|
//Make sure the foreground gets this message
|
||||||
|
_PushToForegroundQueue((ceqs_msg_t*)&msg);
|
||||||
|
printf("(background) Connection State Change for %u, was %u now is %u\n", connection->GetId(), (int)from, (int)to);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Called by background
|
||||||
void EQ::Net::ConcurrentEQStreamManager::DaybreakPacketRecv(std::shared_ptr<DaybreakConnection> connection, const Packet &p)
|
void EQ::Net::ConcurrentEQStreamManager::DaybreakPacketRecv(std::shared_ptr<DaybreakConnection> connection, const Packet &p)
|
||||||
{
|
{
|
||||||
|
ConcurrentEQStreamPacketRecvMessage msg;
|
||||||
|
msg.type = ceqs_msg_type::PacketRecv;
|
||||||
|
msg.stream_id = connection->GetId();
|
||||||
|
msg.packet = new DynamicPacket();
|
||||||
|
msg.packet->PutPacket(0, p);
|
||||||
|
|
||||||
|
//Make sure the foreground gets this message
|
||||||
|
_PushToForegroundQueue((ceqs_msg_t*)&msg);
|
||||||
|
printf("(background) Packet Recv for %u with %u bytes\n", connection->GetId(), p.Length());
|
||||||
|
}
|
||||||
|
|
||||||
|
struct EQ::Net::ConcurrentEQStream::Impl
|
||||||
|
{
|
||||||
|
ConcurrentEQStreamManager *parent;
|
||||||
|
uint64_t id;
|
||||||
|
std::string remote_endpoint;
|
||||||
|
int remote_port;
|
||||||
|
uint32_t remote_ip;
|
||||||
|
DbProtocolStatus state;
|
||||||
|
std::deque<std::unique_ptr<EQ::Net::Packet>> packet_queue;
|
||||||
|
OpcodeManager **opcode_manager;
|
||||||
|
DaybreakConnectionStats stats;
|
||||||
|
};
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
EQ::Net::ConcurrentEQStream::ConcurrentEQStream(ConcurrentEQStreamManager *parent, uint64_t id, const std::string &remote_endpoint, int remote_port, DbProtocolStatus state)
|
||||||
|
{
|
||||||
|
_impl.reset(new Impl());
|
||||||
|
_impl->parent = parent;
|
||||||
|
_impl->id = id;
|
||||||
|
_impl->remote_endpoint = remote_endpoint;
|
||||||
|
_impl->remote_port = remote_port;
|
||||||
|
_impl->remote_ip = inet_addr(remote_endpoint.c_str());
|
||||||
|
_impl->state = state;
|
||||||
|
_impl->opcode_manager = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
EQ::Net::ConcurrentEQStream::~ConcurrentEQStream()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
void EQ::Net::ConcurrentEQStream::QueuePacket(const EQApplicationPacket *p, bool ack_req)
|
||||||
|
{
|
||||||
|
if (!_impl->parent) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_impl->opcode_manager && *_impl->opcode_manager) {
|
||||||
|
auto &options = _impl->parent->GetOptions();
|
||||||
|
uint16 opcode = 0;
|
||||||
|
if (p->GetOpcodeBypass() != 0) {
|
||||||
|
opcode = p->GetOpcodeBypass();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
if (options.track_opcode_stats) {
|
||||||
|
//m_packet_sent_count[p->GetOpcode()]++; //Wont bother with bypass tracking of these since those are rare for testing anyway
|
||||||
|
}
|
||||||
|
opcode = (*_impl->opcode_manager)->EmuToEQ(p->GetOpcode());
|
||||||
|
}
|
||||||
|
|
||||||
|
EQ::Net::DynamicPacket *out = new EQ::Net::DynamicPacket();
|
||||||
|
switch (options.opcode_size) {
|
||||||
|
case 1:
|
||||||
|
out->PutUInt8(0, opcode);
|
||||||
|
out->PutData(1, p->pBuffer, p->size);
|
||||||
|
break;
|
||||||
|
case 2:
|
||||||
|
out->PutUInt16(0, opcode);
|
||||||
|
out->PutData(2, p->pBuffer, p->size);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
ConcurrentEQStreamQueuePacketMessage msg;
|
||||||
|
msg.type = ceqs_msg_type::QueuePacket;
|
||||||
|
msg.stream_id = _impl->id;
|
||||||
|
msg.packet = out;
|
||||||
|
msg.ack_req = ack_req;
|
||||||
|
|
||||||
|
//Make sure the background gets this message
|
||||||
|
_impl->parent->_PushToBackgroundQueue((ceqs_msg_t*)&msg);
|
||||||
|
|
||||||
|
printf("(foreground) Packet Queue for %u with %u bytes with ack: %s\n", _impl->id, out->Length(), ack_req ? "true" : "false");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
void EQ::Net::ConcurrentEQStream::FastQueuePacket(EQApplicationPacket **p, bool ack_req)
|
||||||
|
{
|
||||||
|
std::unique_ptr<EQApplicationPacket> app(*p);
|
||||||
|
QueuePacket(app.get(), ack_req);
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
EQApplicationPacket *EQ::Net::ConcurrentEQStream::PopPacket()
|
||||||
|
{
|
||||||
|
if (!_impl->parent) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_impl->packet_queue.empty()) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_impl->opcode_manager != nullptr && *_impl->opcode_manager != nullptr) {
|
||||||
|
auto &options = _impl->parent->GetOptions();
|
||||||
|
auto &p = _impl->packet_queue.front();
|
||||||
|
|
||||||
|
uint16 opcode = 0;
|
||||||
|
switch (options.opcode_size) {
|
||||||
|
case 1:
|
||||||
|
opcode = p->GetUInt8(0);
|
||||||
|
break;
|
||||||
|
case 2:
|
||||||
|
opcode = p->GetUInt16(0);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
EmuOpcode emu_op = (*_impl->opcode_manager)->EQToEmu(opcode);
|
||||||
|
if (options.track_opcode_stats) {
|
||||||
|
//m_packet_recv_count[emu_op]++;
|
||||||
|
}
|
||||||
|
|
||||||
|
EQApplicationPacket *ret = new EQApplicationPacket(emu_op, (unsigned char*)p->Data() + options.opcode_size, p->Length() - options.opcode_size);
|
||||||
|
ret->SetProtocolOpcode(opcode);
|
||||||
|
_impl->packet_queue.pop_front();
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
void EQ::Net::ConcurrentEQStream::Close()
|
||||||
|
{
|
||||||
|
if (!_impl->parent) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ConcurrentEQStreamCloseConnectionMessage msg;
|
||||||
|
msg.type = CloseConnection;
|
||||||
|
msg.stream_id = _impl->id;
|
||||||
|
|
||||||
|
_impl->parent->_PushToBackgroundQueue((ceqs_msg_t*)&msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
void EQ::Net::ConcurrentEQStream::ReleaseFromUse()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
void EQ::Net::ConcurrentEQStream::RemoveData()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
std::string EQ::Net::ConcurrentEQStream::GetRemoteAddr() const
|
||||||
|
{
|
||||||
|
return _impl->remote_endpoint;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
uint32 EQ::Net::ConcurrentEQStream::GetRemoteIP() const
|
||||||
|
{
|
||||||
|
return _impl->remote_ip;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
uint16 EQ::Net::ConcurrentEQStream::GetRemotePort() const
|
||||||
|
{
|
||||||
|
return _impl->remote_port;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
bool EQ::Net::ConcurrentEQStream::CheckState(EQStreamState state)
|
||||||
|
{
|
||||||
|
return GetState() == state;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
std::string EQ::Net::ConcurrentEQStream::Describe() const
|
||||||
|
{
|
||||||
|
return "Concurrent EQStream";
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
void EQ::Net::ConcurrentEQStream::SetActive(bool val)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
EQStreamInterface::MatchState EQ::Net::ConcurrentEQStream::CheckSignature(const Signature *sig)
|
||||||
|
{
|
||||||
|
if (!_impl->parent) {
|
||||||
|
return MatchFailed;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!_impl->packet_queue.empty()) {
|
||||||
|
auto& options = _impl->parent->GetOptions();
|
||||||
|
auto p = _impl->packet_queue.front().get();
|
||||||
|
uint16 opcode = 0;
|
||||||
|
size_t length = p->Length() - options.opcode_size;
|
||||||
|
switch (options.opcode_size) {
|
||||||
|
case 1:
|
||||||
|
opcode = p->GetUInt8(0);
|
||||||
|
break;
|
||||||
|
case 2:
|
||||||
|
opcode = p->GetUInt16(0);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sig->ignore_eq_opcode != 0 && opcode == sig->ignore_eq_opcode) {
|
||||||
|
if (_impl->packet_queue.size() > 1) {
|
||||||
|
p = _impl->packet_queue[1].get();
|
||||||
|
opcode = 0;
|
||||||
|
length = p->Length() - options.opcode_size;
|
||||||
|
switch (options.opcode_size) {
|
||||||
|
case 1:
|
||||||
|
opcode = p->GetUInt8(0);
|
||||||
|
break;
|
||||||
|
case 2:
|
||||||
|
opcode = p->GetUInt16(0);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return MatchNotReady;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (opcode == sig->first_eq_opcode) {
|
||||||
|
if (length == sig->first_length) {
|
||||||
|
// LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode matched {2:#x} and length matched {3}",
|
||||||
|
// m_connection->RemoteEndpoint(), m_connection->RemotePort(), sig->first_eq_opcode, length);
|
||||||
|
return MatchSuccessful;
|
||||||
|
}
|
||||||
|
else if (length == 0) {
|
||||||
|
// LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode matched {2:#x} and length is ignored.",
|
||||||
|
// m_connection->RemoteEndpoint(), m_connection->RemotePort(), sig->first_eq_opcode);
|
||||||
|
return MatchSuccessful;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode matched {2:#x} but length {3} did not match expected {4}",
|
||||||
|
// m_connection->RemoteEndpoint(), m_connection->RemotePort(), sig->first_eq_opcode, length, sig->first_length);
|
||||||
|
return MatchFailed;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
//LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode {1:#x} did not match expected {2:#x}",
|
||||||
|
// m_connection->RemoteEndpoint(), m_connection->RemotePort(), opcode, sig->first_eq_opcode);
|
||||||
|
return MatchFailed;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return MatchNotReady;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
EQStreamState EQ::Net::ConcurrentEQStream::GetState()
|
||||||
|
{
|
||||||
|
switch (_impl->state) {
|
||||||
|
case StatusConnecting:
|
||||||
|
return UNESTABLISHED;
|
||||||
|
case StatusConnected:
|
||||||
|
return ESTABLISHED;
|
||||||
|
case StatusDisconnecting:
|
||||||
|
return DISCONNECTING;
|
||||||
|
default:
|
||||||
|
return CLOSED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
void EQ::Net::ConcurrentEQStream::SetOpcodeManager(OpcodeManager **opm)
|
||||||
|
{
|
||||||
|
_impl->opcode_manager = opm;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
EQStreamInterface::Stats EQ::Net::ConcurrentEQStream::GetStats() const
|
||||||
|
{
|
||||||
|
EQStreamInterface::Stats ret;
|
||||||
|
ret.DaybreakStats = _impl->stats;
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
void EQ::Net::ConcurrentEQStream::ResetStats()
|
||||||
|
{
|
||||||
|
if (!_impl->parent) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ConcurrentEQStreamResetStatsMessage msg;
|
||||||
|
msg.type = ceqs_msg_type::ResetStats;
|
||||||
|
msg.stream_id = _impl->id;
|
||||||
|
|
||||||
|
_impl->parent->_PushToBackgroundQueue((ceqs_msg_t*)&msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
EQStreamManagerInterface *EQ::Net::ConcurrentEQStream::GetManager() const
|
||||||
|
{
|
||||||
|
return _impl->parent;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
void EQ::Net::ConcurrentEQStream::_SetState(DbProtocolStatus state)
|
||||||
|
{
|
||||||
|
_impl->state = state;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
void EQ::Net::ConcurrentEQStream::_RecvPacket(std::unique_ptr<EQ::Net::Packet> p)
|
||||||
|
{
|
||||||
|
_impl->packet_queue.push_back(std::move(p));
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
void EQ::Net::ConcurrentEQStream::_UpdateStats(const DaybreakConnectionStats &stats)
|
||||||
|
{
|
||||||
|
_impl->stats = stats;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Called by foreground
|
||||||
|
void EQ::Net::ConcurrentEQStream::_Invalidate()
|
||||||
|
{
|
||||||
|
_impl->parent = nullptr;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,10 +1,12 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "../eq_stream_intf.h"
|
#include "../eq_stream_intf.h"
|
||||||
|
#include "eqstream_concurrent_message.h"
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
|
||||||
namespace EQ
|
namespace EQ
|
||||||
{
|
{
|
||||||
|
class Timer;
|
||||||
namespace Net
|
namespace Net
|
||||||
{
|
{
|
||||||
class ConcurrentEQStream;
|
class ConcurrentEQStream;
|
||||||
@ -14,22 +16,30 @@ namespace EQ
|
|||||||
ConcurrentEQStreamManager(const EQStreamManagerInterfaceOptions &options);
|
ConcurrentEQStreamManager(const EQStreamManagerInterfaceOptions &options);
|
||||||
~ConcurrentEQStreamManager();
|
~ConcurrentEQStreamManager();
|
||||||
|
|
||||||
void OnNewConnection(std::function<void(std::shared_ptr<ConcurrentEQStream>)> func);
|
virtual void OnNewConnection(std::function<void(std::shared_ptr<EQStreamInterface>)> func);
|
||||||
void OnConnectionStateChange(std::function<void(std::shared_ptr<ConcurrentEQStream>, DbProtocolStatus, DbProtocolStatus)> func);
|
virtual void OnConnectionStateChange(std::function<void(std::shared_ptr<EQStreamInterface>, EQ::Net::DbProtocolStatus, EQ::Net::DbProtocolStatus)> func);
|
||||||
|
|
||||||
|
void _PushToBackgroundQueue(ceqs_msg_t* msg);
|
||||||
|
void _PushToForegroundQueue(ceqs_msg_t* msg);
|
||||||
private:
|
private:
|
||||||
struct Impl;
|
struct Impl;
|
||||||
std::unique_ptr<Impl> _impl;
|
std::unique_ptr<Impl> _impl;
|
||||||
|
void _BackgroundThread();
|
||||||
|
void _BackgroundTimer(EQ::Timer *t);
|
||||||
|
void _BackgroundUpdateStatsTimer(EQ::Timer *t);
|
||||||
|
void _ProcessBackgroundMessage(const ceqs_msg_t &msg);
|
||||||
|
void _ForegroundTimer(EQ::Timer *t);
|
||||||
|
void _ProcessForegroundMessage(const ceqs_msg_t &msg);
|
||||||
|
|
||||||
void DaybreakNewConnection(std::shared_ptr<DaybreakConnection> connection);
|
void DaybreakNewConnection(std::shared_ptr<DaybreakConnection> connection);
|
||||||
void DaybreakConnectionStateChange(std::shared_ptr<DaybreakConnection> connection, DbProtocolStatus from, DbProtocolStatus to);
|
void DaybreakConnectionStateChange(std::shared_ptr<DaybreakConnection> connection, DbProtocolStatus from, DbProtocolStatus to);
|
||||||
void DaybreakPacketRecv(std::shared_ptr<DaybreakConnection> connection, const Packet &p);
|
void DaybreakPacketRecv(std::shared_ptr<DaybreakConnection> connection, const Packet &p);
|
||||||
friend class EQStream;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
class ConcurrentEQStream : public EQStreamInterface
|
class ConcurrentEQStream : public EQStreamInterface
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ConcurrentEQStream(EQStreamManagerInterface *parent, uint64_t id);
|
ConcurrentEQStream(ConcurrentEQStreamManager *parent, uint64_t id, const std::string &remote_endpoint, int remote_port, DbProtocolStatus state);
|
||||||
~ConcurrentEQStream();
|
~ConcurrentEQStream();
|
||||||
|
|
||||||
virtual void QueuePacket(const EQApplicationPacket *p, bool ack_req = true);
|
virtual void QueuePacket(const EQApplicationPacket *p, bool ack_req = true);
|
||||||
@ -50,11 +60,15 @@ namespace EQ
|
|||||||
virtual Stats GetStats() const;
|
virtual Stats GetStats() const;
|
||||||
virtual void ResetStats();
|
virtual void ResetStats();
|
||||||
virtual EQStreamManagerInterface* GetManager() const;
|
virtual EQStreamManagerInterface* GetManager() const;
|
||||||
|
|
||||||
|
void _SetState(DbProtocolStatus state);
|
||||||
|
void _RecvPacket(std::unique_ptr<EQ::Net::Packet> p);
|
||||||
|
void _UpdateStats(const DaybreakConnectionStats &stats);
|
||||||
|
void _Invalidate();
|
||||||
private:
|
private:
|
||||||
struct Impl;
|
struct Impl;
|
||||||
|
|
||||||
std::unique_ptr<Impl> _impl;
|
std::unique_ptr<Impl> _impl;
|
||||||
friend class ConcurrentEQStreamManager;
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -37,7 +37,7 @@ ClientManager::ClientManager()
|
|||||||
run_server = false;
|
run_server = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
titanium_stream->OnNewConnection([this](std::shared_ptr<EQ::Net::EQStream> stream) {
|
titanium_stream->OnNewConnection([this](std::shared_ptr<EQStreamInterface> stream) {
|
||||||
LogF(Logs::General, Logs::Login_Server, "New Titanium client connection from {0}:{1}", stream->GetRemoteIP(), stream->GetRemotePort());
|
LogF(Logs::General, Logs::Login_Server, "New Titanium client connection from {0}:{1}", stream->GetRemoteIP(), stream->GetRemotePort());
|
||||||
stream->SetOpcodeManager(&titanium_ops);
|
stream->SetOpcodeManager(&titanium_ops);
|
||||||
Client *c = new Client(stream, cv_titanium);
|
Client *c = new Client(stream, cv_titanium);
|
||||||
@ -55,7 +55,7 @@ ClientManager::ClientManager()
|
|||||||
run_server = false;
|
run_server = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
sod_stream->OnNewConnection([this](std::shared_ptr<EQ::Net::EQStream> stream) {
|
sod_stream->OnNewConnection([this](std::shared_ptr<EQStreamInterface> stream) {
|
||||||
LogF(Logs::General, Logs::Login_Server, "New SoD client connection from {0}:{1}", stream->GetRemoteIP(), stream->GetRemotePort());
|
LogF(Logs::General, Logs::Login_Server, "New SoD client connection from {0}:{1}", stream->GetRemoteIP(), stream->GetRemotePort());
|
||||||
stream->SetOpcodeManager(&sod_ops);
|
stream->SetOpcodeManager(&sod_ops);
|
||||||
Client *c = new Client(stream, cv_sod);
|
Client *c = new Client(stream, cv_sod);
|
||||||
|
|||||||
@ -482,7 +482,7 @@ Clientlist::Clientlist(int ChatPort) {
|
|||||||
if (!ChatOpMgr->LoadOpcodes("mail_opcodes.conf"))
|
if (!ChatOpMgr->LoadOpcodes("mail_opcodes.conf"))
|
||||||
exit(1);
|
exit(1);
|
||||||
|
|
||||||
chatsf->OnNewConnection([this](std::shared_ptr<EQ::Net::EQStream> stream) {
|
chatsf->OnNewConnection([this](std::shared_ptr<EQStreamInterface> stream) {
|
||||||
LogF(Logs::General, Logs::Login_Server, "New Client UDP connection from {0}:{1}", stream->GetRemoteIP(), stream->GetRemotePort());
|
LogF(Logs::General, Logs::Login_Server, "New Client UDP connection from {0}:{1}", stream->GetRemoteIP(), stream->GetRemotePort());
|
||||||
stream->SetOpcodeManager(&ChatOpMgr);
|
stream->SetOpcodeManager(&ChatOpMgr);
|
||||||
|
|
||||||
|
|||||||
@ -33,7 +33,7 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|||||||
#include "../common/version.h"
|
#include "../common/version.h"
|
||||||
#include "../common/eqtime.h"
|
#include "../common/eqtime.h"
|
||||||
#include "../common/event/event_loop.h"
|
#include "../common/event/event_loop.h"
|
||||||
#include "../common/net/eqstream.h"
|
#include "../common/net/eqstream_concurrent.h"
|
||||||
#include "../common/opcodemgr.h"
|
#include "../common/opcodemgr.h"
|
||||||
#include "../common/guilds.h"
|
#include "../common/guilds.h"
|
||||||
#include "../common/eq_stream_ident.h"
|
#include "../common/eq_stream_ident.h"
|
||||||
@ -505,7 +505,7 @@ int main(int argc, char** argv) {
|
|||||||
opts.daybreak_options.resend_delay_max = RuleI(Network, ResendDelayMaxMS);
|
opts.daybreak_options.resend_delay_max = RuleI(Network, ResendDelayMaxMS);
|
||||||
opts.daybreak_options.outgoing_data_rate = RuleR(Network, ClientDataRate);
|
opts.daybreak_options.outgoing_data_rate = RuleR(Network, ClientDataRate);
|
||||||
|
|
||||||
EQ::Net::EQStreamManager eqsm(opts);
|
EQ::Net::ConcurrentEQStreamManager eqsm(opts);
|
||||||
|
|
||||||
//register all the patches we have avaliable with the stream identifier.
|
//register all the patches we have avaliable with the stream identifier.
|
||||||
EQStreamIdentifier stream_identifier;
|
EQStreamIdentifier stream_identifier;
|
||||||
@ -520,7 +520,7 @@ int main(int argc, char** argv) {
|
|||||||
std::shared_ptr<EQStreamInterface> eqs;
|
std::shared_ptr<EQStreamInterface> eqs;
|
||||||
EQStreamInterface *eqsi;
|
EQStreamInterface *eqsi;
|
||||||
|
|
||||||
eqsm.OnNewConnection([&stream_identifier](std::shared_ptr<EQ::Net::EQStream> stream) {
|
eqsm.OnNewConnection([&stream_identifier](std::shared_ptr<EQStreamInterface> stream) {
|
||||||
stream_identifier.AddStream(stream);
|
stream_identifier.AddStream(stream);
|
||||||
LogF(Logs::Detail, Logs::World_Server, "New connection from IP {0}:{1}", stream->GetRemoteIP(), ntohs(stream->GetRemotePort()));
|
LogF(Logs::Detail, Logs::World_Server, "New connection from IP {0}:{1}", stream->GetRemoteIP(), ntohs(stream->GetRemotePort()));
|
||||||
});
|
});
|
||||||
|
|||||||
@ -510,7 +510,7 @@ int main(int argc, char** argv) {
|
|||||||
eqsm.reset(new EQ::Net::EQStreamManager(opts));
|
eqsm.reset(new EQ::Net::EQStreamManager(opts));
|
||||||
eqsf_open = true;
|
eqsf_open = true;
|
||||||
|
|
||||||
eqsm->OnNewConnection([&stream_identifier](std::shared_ptr<EQ::Net::EQStream> stream) {
|
eqsm->OnNewConnection([&stream_identifier](std::shared_ptr<EQStreamInterface> stream) {
|
||||||
stream_identifier.AddStream(stream);
|
stream_identifier.AddStream(stream);
|
||||||
LogF(Logs::Detail, Logs::World_Server, "New connection from IP {0}:{1}", stream->GetRemoteIP(), ntohs(stream->GetRemotePort()));
|
LogF(Logs::Detail, Logs::World_Server, "New connection from IP {0}:{1}", stream->GetRemoteIP(), ntohs(stream->GetRemotePort()));
|
||||||
});
|
});
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user