Merge fix

This commit is contained in:
KimLS
2019-06-22 18:59:34 -07:00
2221 changed files with 3234 additions and 461643 deletions
+1
View File
@@ -4,6 +4,7 @@
#include "../eqemu_logsys.h"
#include "../servertalk.h"
#include "../rulesys.h"
#include <fmt/format.h>
EQ::Net::ConsoleServerConnection::ConsoleServerConnection(ConsoleServer *parent, std::shared_ptr<TCPConnection> connection)
{
+118 -10
View File
@@ -1,10 +1,10 @@
#include "daybreak_connection.h"
#include "../event/event_loop.h"
#include "../event/task.h"
#include "../eqemu_logsys.h"
#include "../data_verification.h"
#include "crc32.h"
#include <zlib.h>
#include <fmt/format.h>
#include <sstream>
EQ::Net::DaybreakConnectionManager::DaybreakConnectionManager()
@@ -41,6 +41,7 @@ void EQ::Net::DaybreakConnectionManager::Attach(uv_loop_t *loop)
uv_timer_start(&m_timer, [](uv_timer_t *handle) {
DaybreakConnectionManager *c = (DaybreakConnectionManager*)handle->data;
c->UpdateDataBudget();
c->Process();
c->ProcessResend();
}, update_rate, update_rate);
@@ -162,6 +163,25 @@ void EQ::Net::DaybreakConnectionManager::Process()
}
}
void EQ::Net::DaybreakConnectionManager::UpdateDataBudget()
{
auto outgoing_data_rate = m_options.outgoing_data_rate;
if (outgoing_data_rate <= 0.0) {
return;
}
auto update_rate = (uint64_t)(1000.0 / m_options.tic_rate_hertz);
auto budget_add = update_rate * outgoing_data_rate / 1000.0;
auto iter = m_connections.begin();
while (iter != m_connections.end()) {
auto &connection = iter->second;
connection->UpdateDataBudget(budget_add);
iter++;
}
}
void EQ::Net::DaybreakConnectionManager::ProcessResend()
{
auto iter = m_connections.begin();
@@ -190,7 +210,9 @@ void EQ::Net::DaybreakConnectionManager::ProcessPacket(const std::string &endpoi
}
if (size < DaybreakHeader::size()) {
LogF(Logs::Detail, Logs::Netcode, "Packet of size {0} which is less than {1}", size, DaybreakHeader::size());
if (m_on_error_message) {
m_on_error_message(fmt::format("Packet of size {0} which is less than {1}", size, DaybreakHeader::size()));
}
return;
}
@@ -220,7 +242,9 @@ void EQ::Net::DaybreakConnectionManager::ProcessPacket(const std::string &endpoi
}
}
catch (std::exception &ex) {
LogF(Logs::Detail, Logs::Netcode, "Error processing packet: {0}", ex.what());
if (m_on_error_message) {
m_on_error_message(fmt::format("Error processing packet: {0}", ex.what()));
}
}
}
@@ -283,6 +307,7 @@ EQ::Net::DaybreakConnection::DaybreakConnection(DaybreakConnectionManager *owner
m_combined[0] = 0;
m_combined[1] = OP_Combined;
m_last_session_stats = Clock::now();
m_outgoing_budget = owner->m_options.outgoing_data_rate;
}
//new connection made as client
@@ -305,6 +330,7 @@ EQ::Net::DaybreakConnection::DaybreakConnection(DaybreakConnectionManager *owner
m_combined[0] = 0;
m_combined[1] = OP_Combined;
m_last_session_stats = Clock::now();
m_outgoing_budget = owner->m_options.outgoing_data_rate;
}
EQ::Net::DaybreakConnection::~DaybreakConnection()
@@ -342,14 +368,24 @@ void EQ::Net::DaybreakConnection::QueuePacket(Packet &p, int stream, bool reliab
packet.PutUInt8(0, 0);
packet.PutPacket(1, p);
InternalQueuePacket(packet, stream, reliable);
return;
}
InternalQueuePacket(p, stream, reliable);
}
EQ::Net::DaybreakConnectionStats EQ::Net::DaybreakConnection::GetStats()
{
EQ::Net::DaybreakConnectionStats ret = m_stats;
ret.datarate_remaining = m_outgoing_budget;
ret.avg_ping = m_rolling_ping;
return ret;
}
void EQ::Net::DaybreakConnection::ResetStats()
{
m_stats = DaybreakConnectionStats();
m_stats.Reset();
}
void EQ::Net::DaybreakConnection::Process()
@@ -364,7 +400,9 @@ void EQ::Net::DaybreakConnection::Process()
ProcessQueue();
}
catch (std::exception ex) {
LogF(Logs::Detail, Logs::Netcode, "Error processing connection: {0}", ex.what());
if (m_owner->m_on_error_message) {
m_owner->m_on_error_message(fmt::format("Error processing connection: {0}", ex.what()));
}
}
}
@@ -380,12 +418,17 @@ void EQ::Net::DaybreakConnection::ProcessPacket(Packet &p)
auto opcode = p.GetInt8(1);
if (p.GetInt8(0) == 0 && (opcode == OP_KeepAlive || opcode == OP_OutboundPing)) {
m_stats.bytes_after_decode += p.Length();
return;
}
if (PacketCanBeEncoded(p)) {
if (!ValidateCRC(p)) {
LogF(Logs::Detail, Logs::Netcode, "Tossed packet that failed CRC of type {0:#x}", p.Length() >= 2 ? p.GetInt8(1) : 0);
if (m_owner->m_on_error_message) {
m_owner->m_on_error_message(fmt::format("Tossed packet that failed CRC of type {0:#x}", p.Length() >= 2 ? p.GetInt8(1) : 0));
}
m_stats.bytes_after_decode += p.Length();
return;
}
@@ -414,6 +457,7 @@ void EQ::Net::DaybreakConnection::ProcessPacket(Packet &p)
}
}
m_stats.bytes_after_decode += temp.Length();
ProcessDecodedPacket(StaticPacket(temp.Data(), temp.Length()));
}
else {
@@ -432,10 +476,12 @@ void EQ::Net::DaybreakConnection::ProcessPacket(Packet &p)
}
}
m_stats.bytes_after_decode += temp.Length();
ProcessDecodedPacket(StaticPacket(temp.Data(), temp.Length()));
}
}
else {
m_stats.bytes_after_decode += p.Length();
ProcessDecodedPacket(p);
}
}
@@ -738,6 +784,10 @@ void EQ::Net::DaybreakConnection::ProcessDecodedPacket(const Packet &p)
case OP_SessionStatRequest:
{
auto request = p.GetSerialize<DaybreakSessionStatRequest>(0);
m_stats.sync_remote_sent_packets = EQ::Net::NetworkToHost(request.packets_sent);
m_stats.sync_remote_recv_packets = EQ::Net::NetworkToHost(request.packets_recv);
m_stats.sync_sent_packets = m_stats.sent_packets;
m_stats.sync_recv_packets = m_stats.recv_packets;
DaybreakSessionStatResponse response;
response.zero = 0;
@@ -753,10 +803,18 @@ void EQ::Net::DaybreakConnection::ProcessDecodedPacket(const Packet &p)
InternalSend(out);
break;
}
case OP_SessionStatResponse:
case OP_SessionStatResponse: {
auto response = p.GetSerialize<DaybreakSessionStatResponse>(0);
m_stats.sync_remote_sent_packets = EQ::Net::NetworkToHost(response.server_sent);
m_stats.sync_remote_recv_packets = EQ::Net::NetworkToHost(response.server_recv);
m_stats.sync_sent_packets = m_stats.sent_packets;
m_stats.sync_recv_packets = m_stats.recv_packets;
break;
}
default:
LogF(Logs::Detail, Logs::Netcode, "Unhandled opcode {0:#x}", p.GetInt8(1));
if (m_owner->m_on_error_message) {
m_owner->m_on_error_message(fmt::format("Unhandled opcode {0:#x}", p.GetInt8(1)));
}
break;
}
}
@@ -1024,7 +1082,21 @@ void EQ::Net::DaybreakConnection::ProcessResend(int stream)
auto time_since_last_send = std::chrono::duration_cast<std::chrono::milliseconds>(now - entry.second.last_sent);
if (entry.second.times_resent == 0) {
if ((size_t)time_since_last_send.count() > entry.second.resend_delay) {
InternalBufferedSend(entry.second.packet);
auto &p = entry.second.packet;
if (p.Length() >= DaybreakHeader::size()) {
if (p.GetInt8(0) == 0 && p.GetInt8(1) >= OP_Fragment && p.GetInt8(1) <= OP_Fragment4) {
m_stats.resent_fragments++;
}
else {
m_stats.resent_full++;
}
}
else {
m_stats.resent_full++;
}
m_stats.resent_packets++;
InternalBufferedSend(p);
entry.second.last_sent = now;
entry.second.times_resent++;
entry.second.resend_delay = EQEmu::Clamp(entry.second.resend_delay * 2, m_owner->m_options.resend_delay_min, m_owner->m_options.resend_delay_max);
@@ -1039,7 +1111,21 @@ void EQ::Net::DaybreakConnection::ProcessResend(int stream)
}
if ((size_t)time_since_last_send.count() > entry.second.resend_delay) {
InternalBufferedSend(entry.second.packet);
auto &p = entry.second.packet;
if (p.Length() >= DaybreakHeader::size()) {
if (p.GetInt8(0) == 0 && p.GetInt8(1) >= OP_Fragment && p.GetInt8(1) <= OP_Fragment4) {
m_stats.resent_fragments++;
}
else {
m_stats.resent_full++;
}
}
else {
m_stats.resent_full++;
}
m_stats.resent_packets++;
InternalBufferedSend(p);
entry.second.last_sent = now;
entry.second.times_resent++;
entry.second.resend_delay = EQEmu::Clamp(entry.second.resend_delay * 2, m_owner->m_options.resend_delay_min, m_owner->m_options.resend_delay_max);
@@ -1091,6 +1177,12 @@ void EQ::Net::DaybreakConnection::OutOfOrderAck(int stream, uint16_t seq)
}
}
void EQ::Net::DaybreakConnection::UpdateDataBudget(double budget_add)
{
auto outgoing_data_rate = m_owner->m_options.outgoing_data_rate;
m_outgoing_budget = EQEmu::ClampUpper(m_outgoing_budget + budget_add, outgoing_data_rate);
}
void EQ::Net::DaybreakConnection::SendAck(int stream_id, uint16_t seq)
{
DaybreakReliableHeader ack;
@@ -1181,6 +1273,17 @@ void EQ::Net::DaybreakConnection::SendKeepAlive()
void EQ::Net::DaybreakConnection::InternalSend(Packet &p)
{
if (m_owner->m_options.outgoing_data_rate > 0.0) {
auto new_budget = m_outgoing_budget - (p.Length() / 1024.0);
if (new_budget <= 0.0) {
m_stats.dropped_datarate_packets++;
return;
}
else {
m_outgoing_budget = new_budget;
}
}
m_last_send = Clock::now();
auto send_func = [](uv_udp_send_t* req, int status) {
@@ -1189,6 +1292,9 @@ void EQ::Net::DaybreakConnection::InternalSend(Packet &p)
};
if (PacketCanBeEncoded(p)) {
m_stats.bytes_before_encode += p.Length();
DynamicPacket out;
out.PutPacket(0, p);
@@ -1236,6 +1342,8 @@ void EQ::Net::DaybreakConnection::InternalSend(Packet &p)
return;
}
m_stats.bytes_before_encode += p.Length();
uv_udp_send_t *send_req = new uv_udp_send_t;
sockaddr_in send_addr;
uv_ip4_addr(m_endpoint.c_str(), m_port, &send_addr);
+49 -3
View File
@@ -78,19 +78,59 @@ namespace EQ
sent_bytes = 0;
recv_packets = 0;
sent_packets = 0;
sync_recv_packets = 0;
sync_sent_packets = 0;
sync_remote_recv_packets = 0;
sync_remote_sent_packets = 0;
min_ping = 0xFFFFFFFFFFFFFFFFUL;
max_ping = 0;
avg_ping = 0;
created = Clock::now();
dropped_datarate_packets = 0;
resent_packets = 0;
resent_fragments = 0;
resent_full = 0;
datarate_remaining = 0.0;
bytes_after_decode = 0;
bytes_before_encode = 0;
}
void Reset() {
recv_bytes = 0;
sent_bytes = 0;
min_ping = 0xFFFFFFFFFFFFFFFFUL;
max_ping = 0;
avg_ping = 0;
created = Clock::now();
dropped_datarate_packets = 0;
resent_packets = 0;
resent_fragments = 0;
resent_full = 0;
datarate_remaining = 0.0;
bytes_after_decode = 0;
bytes_before_encode = 0;
}
uint64_t recv_bytes;
uint64_t sent_bytes;
uint64_t recv_packets;
uint64_t sent_packets;
uint64_t sync_recv_packets;
uint64_t sync_sent_packets;
uint64_t sync_remote_recv_packets;
uint64_t sync_remote_sent_packets;
uint64_t min_ping;
uint64_t max_ping;
uint64_t avg_ping;
uint64_t last_ping;
Timestamp created;
uint64_t dropped_datarate_packets; //packets dropped due to datarate limit, couldn't think of a great name
uint64_t resent_packets;
uint64_t resent_fragments;
uint64_t resent_full;
double datarate_remaining;
uint64_t bytes_after_decode;
uint64_t bytes_before_encode;
};
class DaybreakConnectionManager;
@@ -110,8 +150,7 @@ namespace EQ
void QueuePacket(Packet &p, int stream);
void QueuePacket(Packet &p, int stream, bool reliable);
const DaybreakConnectionStats& GetStats() const { return m_stats; }
DaybreakConnectionStats &GetStats() { return m_stats; }
DaybreakConnectionStats GetStats();
void ResetStats();
size_t GetRollingPing() const { return m_rolling_ping; }
DbProtocolStatus GetStatus() const { return m_status; }
@@ -140,6 +179,7 @@ namespace EQ
Timestamp m_last_session_stats;
size_t m_rolling_ping;
Timestamp m_close_time;
double m_outgoing_budget;
struct DaybreakSentPacket
{
@@ -191,6 +231,7 @@ namespace EQ
void ProcessResend(int stream);
void Ack(int stream, uint16_t seq);
void OutOfOrderAck(int stream, uint16_t seq);
void UpdateDataBudget(double budget_add);
void SendConnect();
void SendKeepAlive();
@@ -223,13 +264,14 @@ namespace EQ
encode_passes[0] = DaybreakEncodeType::EncodeNone;
encode_passes[1] = DaybreakEncodeType::EncodeNone;
port = 0;
hold_size = 448;
hold_size = 512;
hold_length_ms = 50;
simulated_in_packet_loss = 0;
simulated_out_packet_loss = 0;
tic_rate_hertz = 60.0;
resend_timeout = 90000;
connection_close_time = 2000;
outgoing_data_rate = 0.0;
}
size_t max_packet_size;
@@ -252,6 +294,7 @@ namespace EQ
size_t connection_close_time;
DaybreakEncodeType encode_passes[2];
int port;
double outgoing_data_rate;
};
class DaybreakConnectionManager
@@ -263,10 +306,12 @@ namespace EQ
void Connect(const std::string &addr, int port);
void Process();
void UpdateDataBudget();
void ProcessResend();
void OnNewConnection(std::function<void(std::shared_ptr<DaybreakConnection>)> func) { m_on_new_connection = func; }
void OnConnectionStateChange(std::function<void(std::shared_ptr<DaybreakConnection>, DbProtocolStatus, DbProtocolStatus)> func) { m_on_connection_state_change = func; }
void OnPacketRecv(std::function<void(std::shared_ptr<DaybreakConnection>, const Packet &)> func) { m_on_packet_recv = func; }
void OnErrorMessage(std::function<void(const std::string&)> func) { m_on_error_message = func; }
DaybreakConnectionManagerOptions& GetOptions() { return m_options; }
private:
@@ -281,6 +326,7 @@ namespace EQ
std::function<void(std::shared_ptr<DaybreakConnection>)> m_on_new_connection;
std::function<void(std::shared_ptr<DaybreakConnection>, DbProtocolStatus, DbProtocolStatus)> m_on_connection_state_change;
std::function<void(std::shared_ptr<DaybreakConnection>, const Packet&)> m_on_packet_recv;
std::function<void(const std::string&)> m_on_error_message;
std::map<std::pair<std::string, int>, std::shared_ptr<DaybreakConnection>> m_connections;
void ProcessPacket(const std::string &endpoint, int port, const char *data, size_t size);
+58 -18
View File
@@ -1,10 +1,9 @@
#include "eqstream.h"
#include "../eqemu_logsys.h"
#include "../eqemu_logsys_fmt.h"
EQ::Net::EQStreamManager::EQStreamManager(EQStreamManagerOptions &options) : m_daybreak(options.daybreak_options)
EQ::Net::EQStreamManager::EQStreamManager(const EQStreamManagerInterfaceOptions &options) : EQStreamManagerInterface(options), m_daybreak(options.daybreak_options)
{
m_options = options;
m_daybreak.OnNewConnection(std::bind(&EQStreamManager::DaybreakNewConnection, this, std::placeholders::_1));
m_daybreak.OnConnectionStateChange(std::bind(&EQStreamManager::DaybreakConnectionStateChange, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
m_daybreak.OnPacketRecv(std::bind(&EQStreamManager::DaybreakPacketRecv, this, std::placeholders::_1, std::placeholders::_2));
@@ -14,6 +13,13 @@ EQ::Net::EQStreamManager::~EQStreamManager()
{
}
void EQ::Net::EQStreamManager::SetOptions(const EQStreamManagerInterfaceOptions &options)
{
m_options = options;
auto &opts = m_daybreak.GetOptions();
opts = options.daybreak_options;
}
void EQ::Net::EQStreamManager::DaybreakNewConnection(std::shared_ptr<DaybreakConnection> connection)
{
std::shared_ptr<EQStream> stream(new EQStream(this, connection));
@@ -48,7 +54,7 @@ void EQ::Net::EQStreamManager::DaybreakPacketRecv(std::shared_ptr<DaybreakConnec
}
}
EQ::Net::EQStream::EQStream(EQStreamManager *owner, std::shared_ptr<DaybreakConnection> connection)
EQ::Net::EQStream::EQStream(EQStreamManagerInterface *owner, std::shared_ptr<DaybreakConnection> connection)
{
m_owner = owner;
m_connection = connection;
@@ -66,11 +72,12 @@ void EQ::Net::EQStream::QueuePacket(const EQApplicationPacket *p, bool ack_req)
opcode = p->GetOpcodeBypass();
}
else {
m_packet_sent_count[static_cast<int>(p->GetOpcode())]++; //Wont bother with bypass tracking of these since those are rare for testing anyway
opcode = (*m_opcode_manager)->EmuToEQ(p->GetOpcode());
}
EQ::Net::DynamicPacket out;
switch (m_owner->m_options.opcode_size) {
switch (m_owner->GetOptions().opcode_size) {
case 1:
out.PutUInt8(0, opcode);
out.PutData(1, p->pBuffer, p->size);
@@ -105,7 +112,7 @@ EQApplicationPacket *EQ::Net::EQStream::PopPacket() {
auto &p = m_packet_queue.front();
uint16 opcode = 0;
switch (m_owner->m_options.opcode_size) {
switch (m_owner->GetOptions().opcode_size) {
case 1:
opcode = p->GetUInt8(0);
break;
@@ -115,7 +122,9 @@ EQApplicationPacket *EQ::Net::EQStream::PopPacket() {
}
EmuOpcode emu_op = (*m_opcode_manager)->EQToEmu(opcode);
EQApplicationPacket *ret = new EQApplicationPacket(emu_op, (unsigned char*)p->Data() + m_owner->m_options.opcode_size, p->Length() - m_owner->m_options.opcode_size);
m_packet_recv_count[static_cast<int>(emu_op)]++;
EQApplicationPacket *ret = new EQApplicationPacket(emu_op, (unsigned char*)p->Data() + m_owner->GetOptions().opcode_size, p->Length() - m_owner->GetOptions().opcode_size);
ret->SetProtocolOpcode(opcode);
m_packet_queue.pop_front();
return ret;
@@ -130,11 +139,11 @@ void EQ::Net::EQStream::Close() {
std::string EQ::Net::EQStream::GetRemoteAddr() const
{
return RemoteEndpoint();
return m_connection->RemoteEndpoint();
}
uint32 EQ::Net::EQStream::GetRemoteIP() const {
return inet_addr(RemoteEndpoint().c_str());
return inet_addr(m_connection->RemoteEndpoint().c_str());
}
bool EQ::Net::EQStream::CheckState(EQStreamState state) {
@@ -145,8 +154,8 @@ EQStreamInterface::MatchState EQ::Net::EQStream::CheckSignature(const Signature
if (!m_packet_queue.empty()) {
auto p = m_packet_queue.front().get();
uint16 opcode = 0;
size_t length = p->Length() - m_owner->m_options.opcode_size;
switch (m_owner->m_options.opcode_size) {
size_t length = p->Length() - m_owner->GetOptions().opcode_size;
switch (m_owner->GetOptions().opcode_size) {
case 1:
opcode = p->GetUInt8(0);
break;
@@ -159,8 +168,8 @@ EQStreamInterface::MatchState EQ::Net::EQStream::CheckSignature(const Signature
if (m_packet_queue.size() > 1) {
p = m_packet_queue[1].get();
opcode = 0;
length = p->Length() - m_owner->m_options.opcode_size;
switch (m_owner->m_options.opcode_size) {
length = p->Length() - m_owner->GetOptions().opcode_size;
switch (m_owner->GetOptions().opcode_size) {
case 1:
opcode = p->GetUInt8(0);
break;
@@ -177,23 +186,23 @@ EQStreamInterface::MatchState EQ::Net::EQStream::CheckSignature(const Signature
if (opcode == sig->first_eq_opcode) {
if (length == sig->first_length) {
LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode matched {2:#x} and length matched {3}",
RemoteEndpoint(), m_connection->RemotePort(), sig->first_eq_opcode, length);
m_connection->RemoteEndpoint(), m_connection->RemotePort(), sig->first_eq_opcode, length);
return MatchSuccessful;
}
else if (length == 0) {
LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode matched {2:#x} and length is ignored.",
RemoteEndpoint(), m_connection->RemotePort(), sig->first_eq_opcode);
m_connection->RemoteEndpoint(), m_connection->RemotePort(), sig->first_eq_opcode);
return MatchSuccessful;
}
else {
LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode matched {2:#x} but length {3} did not match expected {4}",
RemoteEndpoint(), m_connection->RemotePort(), sig->first_eq_opcode, length, sig->first_length);
m_connection->RemoteEndpoint(), m_connection->RemotePort(), sig->first_eq_opcode, length, sig->first_length);
return MatchFailed;
}
}
else {
LogF(Logs::General, Logs::Netcode, "[IDENT_TRACE] {0}:{1}: First opcode {1:#x} did not match expected {2:#x}",
RemoteEndpoint(), m_connection->RemotePort(), opcode, sig->first_eq_opcode);
m_connection->RemoteEndpoint(), m_connection->RemotePort(), opcode, sig->first_eq_opcode);
return MatchFailed;
}
}
@@ -213,4 +222,35 @@ EQStreamState EQ::Net::EQStream::GetState() {
default:
return CLOSED;
}
}
}
EQ::Net::EQStream::Stats EQ::Net::EQStream::GetStats() const
{
Stats ret;
ret.DaybreakStats = m_connection->GetStats();
for (int i = 0; i < _maxEmuOpcode; ++i) {
ret.RecvCount[i] = 0;
ret.SentCount[i] = 0;
}
for (auto &s : m_packet_sent_count) {
ret.SentCount[s.first] = s.second;
}
for (auto &r : m_packet_recv_count) {
ret.RecvCount[r.first] = r.second;
}
return ret;
}
void EQ::Net::EQStream::ResetStats()
{
m_connection->ResetStats();
}
EQStreamManagerInterface *EQ::Net::EQStream::GetManager() const
{
return m_owner;
}
+11 -40
View File
@@ -6,49 +6,23 @@
#include "daybreak_connection.h"
#include <vector>
#include <deque>
#include <unordered_map>
namespace EQ
{
namespace Net
{
struct EQStreamManagerOptions
{
EQStreamManagerOptions() {
opcode_size = 2;
}
EQStreamManagerOptions(int port, bool encoded, bool compressed) {
opcode_size = 2;
//World seems to support both compression and xor zone supports one or the others.
//Enforce one or the other in the convienence construct
//Login I had trouble getting to recognize compression at all
//but that might be because it was still a bit buggy when i was testing that.
if (compressed) {
daybreak_options.encode_passes[0] = EncodeCompression;
}
else if (encoded) {
daybreak_options.encode_passes[0] = EncodeXOR;
}
daybreak_options.port = port;
}
int opcode_size;
DaybreakConnectionManagerOptions daybreak_options;
};
class EQStream;
class EQStreamManager
class EQStreamManager : public EQStreamManagerInterface
{
public:
EQStreamManager(EQStreamManagerOptions &options);
EQStreamManager(const EQStreamManagerInterfaceOptions &options);
~EQStreamManager();
virtual void SetOptions(const EQStreamManagerInterfaceOptions& options);
void OnNewConnection(std::function<void(std::shared_ptr<EQStream>)> func) { m_on_new_connection = func; }
void OnConnectionStateChange(std::function<void(std::shared_ptr<EQStream>, DbProtocolStatus, DbProtocolStatus)> func) { m_on_connection_state_change = func; }
private:
EQStreamManagerOptions m_options;
DaybreakConnectionManager m_daybreak;
std::function<void(std::shared_ptr<EQStream>)> m_on_new_connection;
std::function<void(std::shared_ptr<EQStream>, DbProtocolStatus, DbProtocolStatus)> m_on_connection_state_change;
@@ -63,7 +37,7 @@ namespace EQ
class EQStream : public EQStreamInterface
{
public:
EQStream(EQStreamManager *parent, std::shared_ptr<DaybreakConnection> connection);
EQStream(EQStreamManagerInterface *parent, std::shared_ptr<DaybreakConnection> connection);
~EQStream();
virtual void QueuePacket(const EQApplicationPacket *p, bool ack_req = true);
@@ -84,19 +58,16 @@ namespace EQ
m_opcode_manager = opm;
}
virtual std::shared_ptr<EQ::Net::DaybreakConnection> GetRawConnection() {
return m_connection;
}
const std::string& RemoteEndpoint() const { return m_connection->RemoteEndpoint(); }
const DaybreakConnectionStats& GetStats() const { return m_connection->GetStats(); }
void ResetStats() { m_connection->ResetStats(); }
size_t GetRollingPing() const { return m_connection->GetRollingPing(); }
virtual Stats GetStats() const;
virtual void ResetStats();
virtual EQStreamManagerInterface* GetManager() const;
private:
EQStreamManager *m_owner;
EQStreamManagerInterface *m_owner;
std::shared_ptr<DaybreakConnection> m_connection;
OpcodeManager **m_opcode_manager;
std::deque<std::unique_ptr<EQ::Net::Packet>> m_packet_queue;
std::unordered_map<int, int> m_packet_recv_count;
std::unordered_map<int, int> m_packet_sent_count;
friend class EQStreamManager;
};
}
+1
View File
@@ -1,6 +1,7 @@
#include "packet.h"
#include <fmt/format.h>
#include <cctype>
#include <fmt/format.h>
bool EQ::Net::StaticPacket::Resize(size_t new_size)
{
@@ -1,6 +1,7 @@
#include "servertalk_client_connection.h"
#include "dns.h"
#include "../eqemu_logsys.h"
#include "../eqemu_logsys_fmt.h"
EQ::Net::ServertalkClient::ServertalkClient(const std::string &addr, int port, bool ipv6, const std::string &identifier, const std::string &credentials)
: m_timer(std::unique_ptr<EQ::Timer>(new EQ::Timer(100, true, std::bind(&EQ::Net::ServertalkClient::Connect, this))))
@@ -1,6 +1,7 @@
#include "servertalk_legacy_client_connection.h"
#include "dns.h"
#include "../eqemu_logsys.h"
#include "../eqemu_logsys_fmt.h"
EQ::Net::ServertalkLegacyClient::ServertalkLegacyClient(const std::string &addr, int port, bool ipv6)
: m_timer(std::unique_ptr<EQ::Timer>(new EQ::Timer(100, true, std::bind(&EQ::Net::ServertalkLegacyClient::Connect, this))))
@@ -1,6 +1,7 @@
#include "servertalk_server_connection.h"
#include "servertalk_server.h"
#include "../eqemu_logsys.h"
#include "../eqemu_logsys_fmt.h"
#include "../util/uuid.h"
EQ::Net::ServertalkServerConnection::ServertalkServerConnection(std::shared_ptr<EQ::Net::TCPConnection> c, EQ::Net::ServertalkServer *parent, bool encrypted, bool allow_downgrade)
+259
View File
@@ -0,0 +1,259 @@
#include "websocket_server.h"
#include "../event/event_loop.h"
#include "../event/timer.h"
#include <fmt/format.h>
#include <map>
#include <unordered_set>
#include <array>
struct MethodHandlerEntry
{
MethodHandlerEntry() {
status = 0;
}
MethodHandlerEntry(EQ::Net::WebsocketServer::MethodHandler h, int s) {
handler = h;
status = s;
}
EQ::Net::WebsocketServer::MethodHandler handler;
int status;
};
struct EQ::Net::WebsocketServer::Impl
{
std::unique_ptr<TCPServer> server;
std::unique_ptr<EQ::Timer> ping_timer;
std::map<std::shared_ptr<websocket_connection>, std::unique_ptr<WebsocketServerConnection>> connections;
std::map<std::string, MethodHandlerEntry> methods;
websocket_server ws_server;
LoginHandler login_handler;
std::array<std::unordered_set<WebsocketServerConnection*>, SubscriptionEventMax> subscriptions;
};
EQ::Net::WebsocketServer::WebsocketServer(const std::string &addr, int port)
{
_impl.reset(new Impl());
_impl->server.reset(new EQ::Net::TCPServer());
_impl->server->Listen(addr, port, false, [this](std::shared_ptr<EQ::Net::TCPConnection> connection) {
auto wsc = _impl->ws_server.get_connection();
WebsocketServerConnection *c = new WebsocketServerConnection(this, connection, wsc);
_impl->connections.insert(std::make_pair(wsc, std::unique_ptr<WebsocketServerConnection>(c)));
});
_impl->ws_server.set_write_handler(
[this](websocketpp::connection_hdl hdl, char const *data, size_t size) -> websocketpp::lib::error_code {
auto c = _impl->ws_server.get_con_from_hdl(hdl);
auto iter = _impl->connections.find(c);
if (iter != _impl->connections.end()) {
iter->second->GetTCPConnection()->Write(data, size);
}
return websocketpp::lib::error_code();
});
_impl->ping_timer.reset(new EQ::Timer(5000, true, [this](EQ::Timer *t) {
auto iter = _impl->connections.begin();
while (iter != _impl->connections.end()) {
try {
auto &connection = iter->second;
connection->GetWebsocketConnection()->ping("keepalive");
}
catch (std::exception) {
iter->second->GetTCPConnection()->Disconnect();
}
iter++;
}
}));
_impl->methods.insert(std::make_pair("login", MethodHandlerEntry(std::bind(&WebsocketServer::Login, this, std::placeholders::_1, std::placeholders::_2), 0)));
_impl->methods.insert(std::make_pair("subscribe", MethodHandlerEntry(std::bind(&WebsocketServer::Subscribe, this, std::placeholders::_1, std::placeholders::_2), 0)));
_impl->methods.insert(std::make_pair("unsubscribe", MethodHandlerEntry(std::bind(&WebsocketServer::Unsubscribe, this, std::placeholders::_1, std::placeholders::_2), 0)));
_impl->login_handler = [](const WebsocketServerConnection* connection, const std::string& user, const std::string& pass) {
WebsocketLoginStatus ret;
ret.account_name = "admin";
if (connection->RemoteIP() == "127.0.0.1" || connection->RemoteIP() == "::") {
ret.logged_in = true;
return ret;
}
ret.logged_in = false;
return ret;
};
_impl->ws_server.clear_access_channels(websocketpp::log::alevel::all);
}
EQ::Net::WebsocketServer::~WebsocketServer()
{
}
void EQ::Net::WebsocketServer::ReleaseConnection(WebsocketServerConnection *connection)
{
UnsubscribeAll(connection);
_impl->connections.erase(connection->GetWebsocketConnection());
}
Json::Value EQ::Net::WebsocketServer::HandleRequest(WebsocketServerConnection *connection, const std::string &method, const Json::Value &params)
{
Json::Value err;
if (method != "login") {
if (!connection->IsAuthorized()) {
throw WebsocketException("Not logged in");
}
}
auto iter = _impl->methods.find(method);
if (iter != _impl->methods.end()) {
auto &s = iter->second;
if (s.status > connection->GetStatus()) {
throw WebsocketException("Status too low");
}
return s.handler(connection, params);
}
throw WebsocketException("Unknown Method");
}
void EQ::Net::WebsocketServer::SetMethodHandler(const std::string &method, MethodHandler handler, int required_status)
{
//Reserved method names
if (method == "subscribe" ||
method == "unsubscribe" ||
method == "login") {
return;
}
_impl->methods[method] = MethodHandlerEntry(handler, required_status);
}
void EQ::Net::WebsocketServer::SetLoginHandler(LoginHandler handler)
{
_impl->login_handler = handler;
}
void EQ::Net::WebsocketServer::DispatchEvent(WebsocketSubscriptionEvent evt, Json::Value data, int required_status)
{
try {
Json::Value event_obj;
event_obj["type"] = "event";
event_obj["event"] = (int)evt;
event_obj["data"] = data;
std::stringstream payload;
payload << event_obj;
for (auto &iter : _impl->connections) {
auto &c = iter.second;
if (c->GetStatus() >= required_status && IsSubscribed(c.get(), evt)) {
c->GetWebsocketConnection()->send(payload.str());
}
}
}
catch (std::exception) {
}
}
Json::Value EQ::Net::WebsocketServer::Login(WebsocketServerConnection *connection, const Json::Value &params)
{
Json::Value ret;
try {
Json::Value ret;
auto user = params[0].asString();
auto pass = params[1].asString();
auto r = _impl->login_handler(connection, user, pass);
if (r.logged_in) {
connection->SetAuthorized(true, r.account_name, r.account_id, 255);
ret["status"] = "Ok";
}
else {
connection->SetAuthorized(false, "", 0, 0);
ret["status"] = "Not Authorized";
}
return ret;
}
catch (std::exception) {
throw WebsocketException("Unable to process login request");
}
}
Json::Value EQ::Net::WebsocketServer::Subscribe(WebsocketServerConnection *connection, const Json::Value &params)
{
Json::Value ret;
try {
auto evt = params[0].asInt();
if (evt < 0 || evt >= SubscriptionEventMax) {
throw WebsocketException("Not a valid subscription");
}
DoSubscribe(connection, (WebsocketSubscriptionEvent)evt);
ret["status"] = "Ok";
return ret;
}
catch (WebsocketException &ex) {
throw ex;
}
catch (std::exception) {
throw WebsocketException("Unable to process unsubscribe request");
}
}
Json::Value EQ::Net::WebsocketServer::Unsubscribe(WebsocketServerConnection *connection, const Json::Value &params)
{
Json::Value ret;
try {
auto evt = params[0].asInt();
if (evt < 0 || evt >= SubscriptionEventMax) {
throw WebsocketException("Not a valid subscription");
}
DoUnsubscribe(connection, (WebsocketSubscriptionEvent)evt);
ret["status"] = "Ok";
return ret;
}
catch (WebsocketException &ex) {
throw ex;
}
catch (std::exception) {
throw WebsocketException("Unable to process unsubscribe request");
}
}
void EQ::Net::WebsocketServer::DoSubscribe(WebsocketServerConnection *connection, WebsocketSubscriptionEvent sub) {
auto &s = _impl->subscriptions[sub];
auto iter = s.find(connection);
if (iter == s.end()) {
s.insert(connection);
}
}
void EQ::Net::WebsocketServer::DoUnsubscribe(WebsocketServerConnection *connection, WebsocketSubscriptionEvent sub) {
auto &s = _impl->subscriptions[sub];
s.erase(connection);
}
bool EQ::Net::WebsocketServer::IsSubscribed(WebsocketServerConnection *connection, WebsocketSubscriptionEvent sub) {
auto &s = _impl->subscriptions[sub];
return s.count(connection) == 1;
}
void EQ::Net::WebsocketServer::UnsubscribeAll(WebsocketServerConnection *connection) {
for (auto i = 0; i < SubscriptionEventMax; ++i) {
DoUnsubscribe(connection, (WebsocketSubscriptionEvent)i);
}
}
+74
View File
@@ -0,0 +1,74 @@
#pragma once
#include "websocket_server_connection.h"
#include "../json/json.h"
#include <memory>
#include <functional>
#include <exception>
namespace EQ
{
namespace Net
{
enum WebsocketSubscriptionEvent : int
{
SubscriptionEventNone,
SubscriptionEventLog,
SubscriptionEventMax
};
struct WebsocketLoginStatus
{
bool logged_in;
std::string account_name;
uint32 account_id;
int status;
};
class WebsocketException : public std::exception
{
public:
WebsocketException(const std::string &msg)
: _msg(msg.empty() ? "Unknown Error" : msg) { }
~WebsocketException() throw() {}
virtual char const *what() const throw() {
return _msg.c_str();
}
private:
const std::string _msg;
};
class WebsocketServer
{
public:
typedef std::function<Json::Value(WebsocketServerConnection*, const Json::Value&)> MethodHandler;
typedef std::function<WebsocketLoginStatus(WebsocketServerConnection*, const std::string&, const std::string&)> LoginHandler;
WebsocketServer(const std::string &addr, int port);
~WebsocketServer();
void SetMethodHandler(const std::string& method, MethodHandler handler, int required_status);
void SetLoginHandler(LoginHandler handler);
void DispatchEvent(WebsocketSubscriptionEvent evt, Json::Value data = Json::Value(), int required_status = 0);
private:
void ReleaseConnection(WebsocketServerConnection *connection);
Json::Value HandleRequest(WebsocketServerConnection *connection, const std::string& method, const Json::Value &params);
Json::Value Login(WebsocketServerConnection *connection, const Json::Value &params);
Json::Value Subscribe(WebsocketServerConnection *connection, const Json::Value &params);
Json::Value Unsubscribe(WebsocketServerConnection *connection, const Json::Value &params);
void DoSubscribe(WebsocketServerConnection *connection, WebsocketSubscriptionEvent sub);
void DoUnsubscribe(WebsocketServerConnection *connection, WebsocketSubscriptionEvent sub);
bool IsSubscribed(WebsocketServerConnection *connection, WebsocketSubscriptionEvent sub);
void UnsubscribeAll(WebsocketServerConnection *connection);
struct Impl;
std::unique_ptr<Impl> _impl;
friend class WebsocketServerConnection;
};
}
}
+153
View File
@@ -0,0 +1,153 @@
#include "websocket_server_connection.h"
#include "websocket_server.h"
#include "../timer.h"
#include "../util/uuid.h"
#include <sstream>
#include <fmt/format.h>
struct EQ::Net::WebsocketServerConnection::Impl {
WebsocketServer *parent;
std::shared_ptr<TCPConnection> connection;
std::shared_ptr<websocket_connection> ws_connection;
std::string id;
bool authorized;
std::string account_name;
uint32 account_id;
int status;
};
EQ::Net::WebsocketServerConnection::WebsocketServerConnection(WebsocketServer *parent,
std::shared_ptr<TCPConnection> connection,
std::shared_ptr<websocket_connection> ws_connection)
{
_impl.reset(new Impl());
_impl->parent = parent;
_impl->connection = connection;
_impl->id = EQ::Util::UUID::Generate().ToString();
_impl->authorized = false;
_impl->account_id = 0;
_impl->status = 0;
_impl->ws_connection = ws_connection;
_impl->ws_connection->set_message_handler(std::bind(&WebsocketServerConnection::OnMessage, this, std::placeholders::_1, std::placeholders::_2));
_impl->ws_connection->start();
connection->OnDisconnect([this](EQ::Net::TCPConnection *connection) {
_impl->parent->ReleaseConnection(this);
});
connection->OnRead([this](EQ::Net::TCPConnection *c, const unsigned char *buffer, size_t buffer_size) {
_impl->ws_connection->read_all((const char*)buffer, buffer_size);
});
connection->Start();
}
EQ::Net::WebsocketServerConnection::~WebsocketServerConnection()
{
}
std::string EQ::Net::WebsocketServerConnection::GetID() const
{
return _impl->id;
}
bool EQ::Net::WebsocketServerConnection::IsAuthorized() const
{
return _impl->authorized;
}
std::string EQ::Net::WebsocketServerConnection::GetAccountName() const
{
return _impl->account_name;
}
uint32 EQ::Net::WebsocketServerConnection::GetAccountID() const
{
return _impl->account_id;
}
int EQ::Net::WebsocketServerConnection::GetStatus() const
{
return _impl->status;
}
std::string EQ::Net::WebsocketServerConnection::RemoteIP() const
{
return _impl->connection->RemoteIP();
}
int EQ::Net::WebsocketServerConnection::RemotePort() const
{
return _impl->connection->RemotePort();
}
std::shared_ptr<EQ::Net::websocket_connection> EQ::Net::WebsocketServerConnection::GetWebsocketConnection()
{
return _impl->ws_connection;
}
std::shared_ptr<EQ::Net::TCPConnection> EQ::Net::WebsocketServerConnection::GetTCPConnection()
{
return _impl->connection;
}
void EQ::Net::WebsocketServerConnection::OnMessage(websocketpp::connection_hdl hdl, websocket_message_ptr msg)
{
BenchTimer timer;
timer.reset();
if (msg->get_opcode() == websocketpp::frame::opcode::text) {
try {
auto &payload = msg->get_payload();
std::stringstream ss(payload);
Json::Value root;
ss >> root;
auto method = root["method"].asString();
auto params = root["params"];
std::string id = "";
auto idNode = root["id"];
if (!idNode.isNull() && idNode.isString()) {
id = idNode.asString();
}
Json::Value response;
response["type"] = "method";
response["data"] = _impl->parent->HandleRequest(this, method, params);
response["method"] = method;
if(id != "") {
response["id"] = id;
}
SendResponse(response, timer.elapsed());
}
catch (std::exception &ex) {
Json::Value error;
error["type"] = "method";
error["error"] = fmt::format("{0}", ex.what());
SendResponse(error, timer.elapsed());
}
}
}
void EQ::Net::WebsocketServerConnection::SendResponse(const Json::Value &response, double time_elapsed)
{
Json::Value root = response;
root["execution_time"] = std::to_string(time_elapsed);
std::stringstream payload;
payload << root;
_impl->ws_connection->send(payload.str());
}
void EQ::Net::WebsocketServerConnection::SetAuthorized(bool v, const std::string account_name, uint32 account_id, int status)
{
_impl->authorized = v;
_impl->account_name = account_name;
_impl->account_id = account_id;
_impl->status = status;
}
+46
View File
@@ -0,0 +1,46 @@
#pragma once
#include "tcp_server.h"
#include "../types.h"
#include "../json/json-forwards.h"
#include <websocketpp/config/core.hpp>
#include <websocketpp/server.hpp>
namespace EQ
{
namespace Net
{
typedef websocketpp::server<websocketpp::config::core> websocket_server;
typedef websocketpp::connection<websocketpp::config::core> websocket_connection;
typedef websocket_server::message_ptr websocket_message_ptr;
class WebsocketServer;
class WebsocketServerConnection
{
public:
WebsocketServerConnection(WebsocketServer *parent,
std::shared_ptr<TCPConnection> connection,
std::shared_ptr<websocket_connection> ws_connection);
~WebsocketServerConnection();
std::string GetID() const;
bool IsAuthorized() const;
std::string GetAccountName() const;
uint32 GetAccountID() const;
int GetStatus() const;
std::string RemoteIP() const;
int RemotePort() const;
private:
std::shared_ptr<websocket_connection> GetWebsocketConnection();
std::shared_ptr<TCPConnection> GetTCPConnection();
void OnMessage(websocketpp::connection_hdl hdl, websocket_message_ptr msg);
void SendResponse(const Json::Value &response, double time_elapsed);
void SetAuthorized(bool v, const std::string account_name, uint32 account_id, int status);
struct Impl;
std::unique_ptr<Impl> _impl;
friend class WebsocketServer;
};
}
}