mirror of
https://github.com/EQEmu/Server.git
synced 2026-05-31 13:16:39 +00:00
Bug fixes and implementation cleanup
This commit is contained in:
@@ -65,9 +65,7 @@ void EQ::Net::EQStream::QueuePacket(const EQApplicationPacket *p, bool ack_req)
|
||||
opcode = p->GetOpcodeBypass();
|
||||
}
|
||||
else {
|
||||
if (m_owner->GetOptions().track_opcode_stats) {
|
||||
m_packet_sent_count[p->GetOpcode()]++; //Wont bother with bypass tracking of these since those are rare for testing anyway
|
||||
}
|
||||
m_packet_sent_count[p->GetOpcode()]++; //Wont bother with bypass tracking of these since those are rare for testing anyway
|
||||
opcode = (*m_opcode_manager)->EmuToEQ(p->GetOpcode());
|
||||
}
|
||||
|
||||
@@ -117,9 +115,7 @@ EQApplicationPacket *EQ::Net::EQStream::PopPacket() {
|
||||
}
|
||||
|
||||
EmuOpcode emu_op = (*m_opcode_manager)->EQToEmu(opcode);
|
||||
if (m_owner->GetOptions().track_opcode_stats) {
|
||||
m_packet_recv_count[emu_op]++;
|
||||
}
|
||||
m_packet_recv_count[emu_op]++;
|
||||
|
||||
EQApplicationPacket *ret = new EQApplicationPacket(emu_op, (unsigned char*)p->Data() + m_owner->GetOptions().opcode_size, p->Length() - m_owner->GetOptions().opcode_size);
|
||||
ret->SetProtocolOpcode(opcode);
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
#include "../event/timer.h"
|
||||
#include "../string_util.h"
|
||||
#include "../opcodemgr.h"
|
||||
#include "../eqemu_logsys.h"
|
||||
#include "../eqemu_logsys_fmt.h"
|
||||
#include "daybreak_connection.h"
|
||||
#include <thread>
|
||||
#include <concurrentqueue.h>
|
||||
@@ -56,7 +58,7 @@ EQ::Net::ConcurrentEQStreamManager::~ConcurrentEQStreamManager()
|
||||
|
||||
while (_impl->foreground_queue.try_dequeue(eqs_msg)) {
|
||||
if (eqs_msg.type == PacketRecv) {
|
||||
ConcurrentEQStreamPacketRecvMessage *eqs_msg_in = (ConcurrentEQStreamPacketRecvMessage*)&eqs_msg;
|
||||
ceqs_packet_recv_msg_t *eqs_msg_in = (ceqs_packet_recv_msg_t*)&eqs_msg;
|
||||
|
||||
delete eqs_msg_in->packet;
|
||||
}
|
||||
@@ -92,8 +94,8 @@ void EQ::Net::ConcurrentEQStreamManager::_BackgroundThread() {
|
||||
|
||||
ceqs_msg_t eqs_msg;
|
||||
while (_impl->background_queue.try_dequeue(eqs_msg)) {
|
||||
if (eqs_msg.type == PacketRecv) {
|
||||
ConcurrentEQStreamPacketRecvMessage *eqs_msg_in = (ConcurrentEQStreamPacketRecvMessage*)&eqs_msg;
|
||||
if (eqs_msg.type == QueuePacket) {
|
||||
ceqs_queue_packet_msg_t *eqs_msg_in = (ceqs_queue_packet_msg_t*)&eqs_msg;
|
||||
delete eqs_msg_in->packet;
|
||||
}
|
||||
}
|
||||
@@ -118,22 +120,21 @@ void EQ::Net::ConcurrentEQStreamManager::_BackgroundUpdateStatsTimer(EQ::Timer *
|
||||
|
||||
for (auto &c : _impl->connections) {
|
||||
auto &connection = c.second;
|
||||
auto msg = (ceqs_update_daybreak_stats_msg_t*)&msgs[i];
|
||||
auto msg = (ceqs_update_stats_msg_t*)&msgs[i];
|
||||
|
||||
msg->type = ceqs_msg_type::UpdateDaybreakStats;
|
||||
msg->type = ceqs_msg_type::UpdateStats;
|
||||
msg->stream_id = connection->GetId();
|
||||
msg->stats = connection->GetStats();
|
||||
i++;
|
||||
|
||||
printf("Sending stats to client %u\n", connection->GetId());
|
||||
if (i >= 16) {
|
||||
_impl->background_queue.enqueue_bulk(msgs, 16);
|
||||
_impl->foreground_queue.enqueue_bulk(msgs, 16);
|
||||
i = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (i > 0) {
|
||||
_impl->background_queue.enqueue_bulk(msgs, i);
|
||||
_impl->foreground_queue.enqueue_bulk(msgs, i);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -143,8 +144,7 @@ void EQ::Net::ConcurrentEQStreamManager::_ProcessBackgroundMessage(const ceqs_ms
|
||||
switch (msg.type) {
|
||||
case QueuePacket:
|
||||
{
|
||||
ConcurrentEQStreamQueuePacketMessage *msg_in = (ConcurrentEQStreamQueuePacketMessage*)&msg;
|
||||
printf("(background) Packet Queue for %u with %u bytes with ack: %s\n", msg_in->stream_id, msg_in->packet->Length(), msg_in->ack_req ? "true" : "false");
|
||||
ceqs_queue_packet_msg_t *msg_in = (ceqs_queue_packet_msg_t*)&msg;
|
||||
|
||||
auto iter = _impl->connections.find(msg_in->stream_id);
|
||||
if (iter != _impl->connections.end()) {
|
||||
@@ -161,7 +161,7 @@ void EQ::Net::ConcurrentEQStreamManager::_ProcessBackgroundMessage(const ceqs_ms
|
||||
}
|
||||
case CloseConnection:
|
||||
{
|
||||
ConcurrentEQStreamCloseConnectionMessage *msg_in = (ConcurrentEQStreamCloseConnectionMessage*)&msg;
|
||||
ceqs_close_connection_msg_t *msg_in = (ceqs_close_connection_msg_t*)&msg;
|
||||
auto iter = _impl->connections.find(msg_in->stream_id);
|
||||
if (iter != _impl->connections.end()) {
|
||||
iter->second->Close();
|
||||
@@ -170,7 +170,7 @@ void EQ::Net::ConcurrentEQStreamManager::_ProcessBackgroundMessage(const ceqs_ms
|
||||
}
|
||||
case ResetStats:
|
||||
{
|
||||
ConcurrentEQStreamResetStatsMessage *msg_in = (ConcurrentEQStreamResetStatsMessage*)&msg;
|
||||
ceqs_reset_stats_msg_t *msg_in = (ceqs_reset_stats_msg_t*)&msg;
|
||||
auto iter = _impl->connections.find(msg_in->stream_id);
|
||||
if (iter != _impl->connections.end()) {
|
||||
iter->second->ResetStats();
|
||||
@@ -178,7 +178,7 @@ void EQ::Net::ConcurrentEQStreamManager::_ProcessBackgroundMessage(const ceqs_ms
|
||||
break;
|
||||
}
|
||||
default:
|
||||
printf("(background) New message with unhandled type %u\n", (int)msg.type);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -200,8 +200,7 @@ void EQ::Net::ConcurrentEQStreamManager::_ProcessForegroundMessage(const ceqs_ms
|
||||
switch (msg.type) {
|
||||
case NewConnection:
|
||||
{
|
||||
ConcurrentEQStreamNewConnectionMessage *msg_in = (ConcurrentEQStreamNewConnectionMessage*)&msg;
|
||||
printf("(foreground) New connection from %s:%u with id: %u\n", msg_in->endpoint, msg_in->remote_port, msg_in->stream_id);
|
||||
ceqs_new_connection_msg_t *msg_in = (ceqs_new_connection_msg_t*)&msg;
|
||||
|
||||
std::shared_ptr<ConcurrentEQStream> stream(new ConcurrentEQStream(this,
|
||||
msg_in->stream_id,
|
||||
@@ -217,9 +216,7 @@ void EQ::Net::ConcurrentEQStreamManager::_ProcessForegroundMessage(const ceqs_ms
|
||||
}
|
||||
case ConnectionStateChange:
|
||||
{
|
||||
ConcurrentEQStreamConnectionStateChangeMessage *msg_in = (ConcurrentEQStreamConnectionStateChangeMessage*)&msg;
|
||||
printf("(foreground) Connection State Change for %u, was %u now is %u\n", msg_in->stream_id, msg_in->from, msg_in->to);
|
||||
|
||||
ceqs_connection_state_change_msg_t *msg_in = (ceqs_connection_state_change_msg_t*)&msg;
|
||||
|
||||
auto iter = _impl->streams.find(msg_in->stream_id);
|
||||
if (iter != _impl->streams.end()) {
|
||||
@@ -233,8 +230,7 @@ void EQ::Net::ConcurrentEQStreamManager::_ProcessForegroundMessage(const ceqs_ms
|
||||
}
|
||||
case PacketRecv:
|
||||
{
|
||||
ConcurrentEQStreamPacketRecvMessage *msg_in = (ConcurrentEQStreamPacketRecvMessage*)&msg;
|
||||
printf("(foreground) Packet Recv for %u with %u bytes\n", msg_in->stream_id, msg_in->packet->Length());
|
||||
ceqs_packet_recv_msg_t *msg_in = (ceqs_packet_recv_msg_t*)&msg;
|
||||
std::unique_ptr<EQ::Net::Packet> p(msg_in->packet);
|
||||
|
||||
auto iter = _impl->streams.find(msg_in->stream_id);
|
||||
@@ -243,9 +239,9 @@ void EQ::Net::ConcurrentEQStreamManager::_ProcessForegroundMessage(const ceqs_ms
|
||||
}
|
||||
break;
|
||||
}
|
||||
case UpdateDaybreakStats:
|
||||
case UpdateStats:
|
||||
{
|
||||
ceqs_update_daybreak_stats_msg_t *msg_in = (ceqs_update_daybreak_stats_msg_t*)&msg;
|
||||
ceqs_update_stats_msg_t *msg_in = (ceqs_update_stats_msg_t*)&msg;
|
||||
auto iter = _impl->streams.find(msg_in->stream_id);
|
||||
if (iter != _impl->streams.end()) {
|
||||
iter->second->_UpdateStats(msg_in->stats);
|
||||
@@ -283,7 +279,7 @@ void EQ::Net::ConcurrentEQStreamManager::OnConnectionStateChange(std::function<v
|
||||
void EQ::Net::ConcurrentEQStreamManager::DaybreakNewConnection(std::shared_ptr<DaybreakConnection> connection)
|
||||
{
|
||||
_impl->connections.insert(std::make_pair(connection->GetId(), connection));
|
||||
ConcurrentEQStreamNewConnectionMessage msg;
|
||||
ceqs_new_connection_msg_t msg;
|
||||
msg.type = ceqs_msg_type::NewConnection;
|
||||
msg.stream_id = connection->GetId();
|
||||
msg.remote_port = connection->RemotePort();
|
||||
@@ -293,7 +289,6 @@ void EQ::Net::ConcurrentEQStreamManager::DaybreakNewConnection(std::shared_ptr<D
|
||||
|
||||
//Make sure the foreground gets this message
|
||||
_PushToForegroundQueue((ceqs_msg_t*)&msg);
|
||||
printf("(background) New connection from %s:%u with id: %u\n", connection->RemoteEndpoint().c_str(), connection->RemotePort(), connection->GetId());
|
||||
}
|
||||
|
||||
//Called by background
|
||||
@@ -306,7 +301,7 @@ void EQ::Net::ConcurrentEQStreamManager::DaybreakConnectionStateChange(std::shar
|
||||
}
|
||||
}
|
||||
|
||||
ConcurrentEQStreamConnectionStateChangeMessage msg;
|
||||
ceqs_connection_state_change_msg_t msg;
|
||||
msg.type = ceqs_msg_type::ConnectionStateChange;
|
||||
msg.stream_id = connection->GetId();
|
||||
msg.from = (int)from;
|
||||
@@ -314,13 +309,12 @@ void EQ::Net::ConcurrentEQStreamManager::DaybreakConnectionStateChange(std::shar
|
||||
|
||||
//Make sure the foreground gets this message
|
||||
_PushToForegroundQueue((ceqs_msg_t*)&msg);
|
||||
printf("(background) Connection State Change for %u, was %u now is %u\n", connection->GetId(), (int)from, (int)to);
|
||||
}
|
||||
|
||||
//Called by background
|
||||
void EQ::Net::ConcurrentEQStreamManager::DaybreakPacketRecv(std::shared_ptr<DaybreakConnection> connection, const Packet &p)
|
||||
{
|
||||
ConcurrentEQStreamPacketRecvMessage msg;
|
||||
ceqs_packet_recv_msg_t msg;
|
||||
msg.type = ceqs_msg_type::PacketRecv;
|
||||
msg.stream_id = connection->GetId();
|
||||
msg.packet = new DynamicPacket();
|
||||
@@ -328,7 +322,6 @@ void EQ::Net::ConcurrentEQStreamManager::DaybreakPacketRecv(std::shared_ptr<Dayb
|
||||
|
||||
//Make sure the foreground gets this message
|
||||
_PushToForegroundQueue((ceqs_msg_t*)&msg);
|
||||
printf("(background) Packet Recv for %u with %u bytes\n", connection->GetId(), p.Length());
|
||||
}
|
||||
|
||||
struct EQ::Net::ConcurrentEQStream::Impl
|
||||
@@ -342,6 +335,8 @@ struct EQ::Net::ConcurrentEQStream::Impl
|
||||
std::deque<std::unique_ptr<EQ::Net::Packet>> packet_queue;
|
||||
OpcodeManager **opcode_manager;
|
||||
DaybreakConnectionStats stats;
|
||||
std::unordered_map<EmuOpcode, int> packet_recv_count;
|
||||
std::unordered_map<EmuOpcode, int> packet_sent_count;
|
||||
};
|
||||
|
||||
//Called by foreground
|
||||
@@ -376,9 +371,7 @@ void EQ::Net::ConcurrentEQStream::QueuePacket(const EQApplicationPacket *p, bool
|
||||
opcode = p->GetOpcodeBypass();
|
||||
}
|
||||
else {
|
||||
if (options.track_opcode_stats) {
|
||||
//m_packet_sent_count[p->GetOpcode()]++; //Wont bother with bypass tracking of these since those are rare for testing anyway
|
||||
}
|
||||
_impl->packet_sent_count[p->GetOpcode()]++;
|
||||
opcode = (*_impl->opcode_manager)->EmuToEQ(p->GetOpcode());
|
||||
}
|
||||
|
||||
@@ -394,7 +387,7 @@ void EQ::Net::ConcurrentEQStream::QueuePacket(const EQApplicationPacket *p, bool
|
||||
break;
|
||||
}
|
||||
|
||||
ConcurrentEQStreamQueuePacketMessage msg;
|
||||
ceqs_queue_packet_msg_t msg;
|
||||
msg.type = ceqs_msg_type::QueuePacket;
|
||||
msg.stream_id = _impl->id;
|
||||
msg.packet = out;
|
||||
@@ -402,16 +395,15 @@ void EQ::Net::ConcurrentEQStream::QueuePacket(const EQApplicationPacket *p, bool
|
||||
|
||||
//Make sure the background gets this message
|
||||
_impl->parent->_PushToBackgroundQueue((ceqs_msg_t*)&msg);
|
||||
|
||||
printf("(foreground) Packet Queue for %u with %u bytes with ack: %s\n", _impl->id, out->Length(), ack_req ? "true" : "false");
|
||||
}
|
||||
}
|
||||
|
||||
//Called by foreground
|
||||
void EQ::Net::ConcurrentEQStream::FastQueuePacket(EQApplicationPacket **p, bool ack_req)
|
||||
{
|
||||
std::unique_ptr<EQApplicationPacket> app(*p);
|
||||
QueuePacket(app.get(), ack_req);
|
||||
QueuePacket(*p, ack_req);
|
||||
delete *p;
|
||||
*p = nullptr;
|
||||
}
|
||||
|
||||
//Called by foreground
|
||||
@@ -440,9 +432,7 @@ EQApplicationPacket *EQ::Net::ConcurrentEQStream::PopPacket()
|
||||
}
|
||||
|
||||
EmuOpcode emu_op = (*_impl->opcode_manager)->EQToEmu(opcode);
|
||||
if (options.track_opcode_stats) {
|
||||
//m_packet_recv_count[emu_op]++;
|
||||
}
|
||||
_impl->packet_recv_count[emu_op]++;
|
||||
|
||||
EQApplicationPacket *ret = new EQApplicationPacket(emu_op, (unsigned char*)p->Data() + options.opcode_size, p->Length() - options.opcode_size);
|
||||
ret->SetProtocolOpcode(opcode);
|
||||
@@ -460,7 +450,7 @@ void EQ::Net::ConcurrentEQStream::Close()
|
||||
return;
|
||||
}
|
||||
|
||||
ConcurrentEQStreamCloseConnectionMessage msg;
|
||||
ceqs_close_connection_msg_t msg;
|
||||
msg.type = CloseConnection;
|
||||
msg.stream_id = _impl->id;
|
||||
|
||||
@@ -554,24 +544,24 @@ EQStreamInterface::MatchState EQ::Net::ConcurrentEQStream::CheckSignature(const
|
||||
|
||||
if (opcode == sig->first_eq_opcode) {
|
||||
if (length == sig->first_length) {
|
||||
// LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode matched {2:#x} and length matched {3}",
|
||||
// m_connection->RemoteEndpoint(), m_connection->RemotePort(), sig->first_eq_opcode, length);
|
||||
LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode matched {2:#x} and length matched {3}",
|
||||
GetRemoteAddr(), GetRemotePort(), sig->first_eq_opcode, length);
|
||||
return MatchSuccessful;
|
||||
}
|
||||
else if (length == 0) {
|
||||
// LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode matched {2:#x} and length is ignored.",
|
||||
// m_connection->RemoteEndpoint(), m_connection->RemotePort(), sig->first_eq_opcode);
|
||||
LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode matched {2:#x} and length is ignored.",
|
||||
GetRemoteAddr(), GetRemotePort(), sig->first_eq_opcode);
|
||||
return MatchSuccessful;
|
||||
}
|
||||
else {
|
||||
// LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode matched {2:#x} but length {3} did not match expected {4}",
|
||||
// m_connection->RemoteEndpoint(), m_connection->RemotePort(), sig->first_eq_opcode, length, sig->first_length);
|
||||
LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode matched {2:#x} but length {3} did not match expected {4}",
|
||||
GetRemoteAddr(), GetRemotePort(), sig->first_eq_opcode, length, sig->first_length);
|
||||
return MatchFailed;
|
||||
}
|
||||
}
|
||||
else {
|
||||
//LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode {1:#x} did not match expected {2:#x}",
|
||||
// m_connection->RemoteEndpoint(), m_connection->RemotePort(), opcode, sig->first_eq_opcode);
|
||||
LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode {1:#x} did not match expected {2:#x}",
|
||||
GetRemoteAddr(), GetRemotePort(), opcode, sig->first_eq_opcode);
|
||||
return MatchFailed;
|
||||
}
|
||||
}
|
||||
@@ -605,6 +595,20 @@ EQStreamInterface::Stats EQ::Net::ConcurrentEQStream::GetStats() const
|
||||
{
|
||||
EQStreamInterface::Stats ret;
|
||||
ret.DaybreakStats = _impl->stats;
|
||||
|
||||
for (int i = 0; i < _maxEmuOpcode; ++i) {
|
||||
ret.RecvCount[i] = 0;
|
||||
ret.SentCount[i] = 0;
|
||||
}
|
||||
|
||||
for (auto &s : _impl->packet_sent_count) {
|
||||
ret.SentCount[s.first] = s.second;
|
||||
}
|
||||
|
||||
for (auto &r : _impl->packet_recv_count) {
|
||||
ret.RecvCount[r.first] = r.second;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@@ -615,7 +619,7 @@ void EQ::Net::ConcurrentEQStream::ResetStats()
|
||||
return;
|
||||
}
|
||||
|
||||
ConcurrentEQStreamResetStatsMessage msg;
|
||||
ceqs_reset_stats_msg_t msg;
|
||||
msg.type = ceqs_msg_type::ResetStats;
|
||||
msg.stream_id = _impl->id;
|
||||
|
||||
|
||||
@@ -0,0 +1,89 @@
|
||||
#pragma once
|
||||
|
||||
#define EQSM_PAD_LEN 252
|
||||
|
||||
namespace EQ
|
||||
{
|
||||
namespace Net
|
||||
{
|
||||
class DynamicPacket;
|
||||
enum ceqs_msg_type : uint32_t
|
||||
{
|
||||
//Sent by background
|
||||
NewConnection,
|
||||
ConnectionStateChange,
|
||||
PacketRecv,
|
||||
UpdateStats,
|
||||
//Sent by foreground
|
||||
QueuePacket,
|
||||
TerminateBackground,
|
||||
CloseConnection,
|
||||
ResetStats
|
||||
};
|
||||
|
||||
typedef struct
|
||||
{
|
||||
ceqs_msg_type type;
|
||||
char padding[EQSM_PAD_LEN];
|
||||
} ceqs_msg_t;
|
||||
|
||||
//Sent by background
|
||||
typedef struct
|
||||
{
|
||||
ceqs_msg_type type;
|
||||
uint64_t stream_id;
|
||||
int remote_port;
|
||||
int state;
|
||||
char endpoint[64];
|
||||
} ceqs_new_connection_msg_t;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
ceqs_msg_type type;
|
||||
uint64_t stream_id;
|
||||
int from;
|
||||
int to;
|
||||
} ceqs_connection_state_change_msg_t;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
ceqs_msg_type type;
|
||||
uint64_t stream_id;
|
||||
EQ::Net::DynamicPacket *packet;
|
||||
} ceqs_packet_recv_msg_t;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
ceqs_msg_type type;
|
||||
uint64_t stream_id;
|
||||
DaybreakConnectionStats stats;
|
||||
} ceqs_update_stats_msg_t;
|
||||
|
||||
//Sent by foreground
|
||||
typedef struct
|
||||
{
|
||||
ceqs_msg_type type;
|
||||
uint64_t stream_id;
|
||||
EQ::Net::DynamicPacket *packet;
|
||||
bool ack_req;
|
||||
} ceqs_queue_packet_msg_t;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
ceqs_msg_type type;
|
||||
uint64_t stream_id;
|
||||
} ceqs_close_connection_msg_t;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
ceqs_msg_type type;
|
||||
uint64_t stream_id;
|
||||
} ceqs_reset_stats_msg_t;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
ceqs_msg_type type;
|
||||
} ceqs_terminate_msg_t;
|
||||
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user