Relay link improvements, considering changing it.

This commit is contained in:
KimLS 2016-10-14 19:48:49 -07:00
parent 4ba0aa8e7f
commit 44b9c99781
10 changed files with 314 additions and 149 deletions

View File

@ -283,7 +283,6 @@ bool EmuTCPConnection::LineOutQueuePush(char* line) {
}
#endif
if(line[0] == '*') {
printf("LineOutQueuePush %s\n", line);
if (strcmp(line, "**PACKETMODE**") == 0) {
MSendQueue.lock();
safe_delete_array(sendbuf);

View File

@ -15,6 +15,12 @@ namespace EQ
bool ipv6;
};
addrinfo hints;
memset(&hints, 0, sizeof(addrinfo));
hints.ai_family = PF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
auto loop = EQ::EventLoop::Get().Handle();
uv_getaddrinfo_t *resolver = new uv_getaddrinfo_t();
memset(resolver, 0, sizeof(uv_getaddrinfo_t));
@ -47,8 +53,9 @@ namespace EQ
delete baton;
delete req;
uv_freeaddrinfo(res);
cb(addr);
}, addr.c_str(), port_str.c_str(), nullptr);
}, addr.c_str(), port_str.c_str(), &hints);
}
}
}

View File

@ -283,6 +283,10 @@ std::string EQ::Net::Packet::ToString() const
std::string EQ::Net::Packet::ToString(size_t line_length) const
{
if (Length() == 0) {
return fmt::format("{:0>5x} |", 0);
}
std::string ret;
size_t lines = Length() / line_length;
size_t i;

View File

@ -1,24 +1,52 @@
#include "relay_link.h"
#include "dns.h"
#include "../eqemu_logsys.h"
#include <algorithm>
#include "../md5.h"
#include "../servertalk.h"
EQ::Net::RelayLink::RelayLink(const std::string &addr, int port, const std::string &identifier)
EQ::Net::RelayLink::RelayLink(const std::string &addr, int port, const std::string &identifier, const std::string &password)
: m_timer(std::unique_ptr<EQ::Timer>(new EQ::Timer(250, true, std::bind(&EQ::Net::RelayLink::Connect, this))))
{
m_established = false;
m_connecting = false;
m_port = port;
m_identifier = identifier;
m_password = password;
DNSLookup(addr, port, false, [this](const std::string &address) {
m_addr = address;
});
m_opcode_dispatch.insert(std::make_pair(ServerOP_ZAAuthFailed, std::bind(&RelayLink::OnAuthFailed, this, std::placeholders::_1)));
}
EQ::Net::RelayLink::~RelayLink()
{
}
void EQ::Net::RelayLink::OnMessageType(uint16 opcode, std::function<void(const EQ::Net::Packet&p)> cb)
{
if (opcode != ServerOP_ZAAuthFailed) {
m_opcode_dispatch.insert(std::make_pair(opcode, cb));
}
}
void EQ::Net::RelayLink::SendPacket(uint16 opcode, const EQ::Net::Packet &p)
{
EQ::Net::WritablePacket packet;
packet.PutUInt32(0, p.Length() + 7);
packet.PutInt8(4, 0);
packet.PutUInt16(5, opcode);
if(p.Length() > 0)
packet.PutPacket(7, p);
if (m_connection) {
m_connection->Write((const char*)packet.Data(), packet.Length());
}
else {
m_packet_queue.push(packet);
}
}
void EQ::Net::RelayLink::Connect()
{
if (m_addr.length() == 0 || m_port == 0 || m_connection || m_connecting) {
@ -56,10 +84,17 @@ void EQ::Net::RelayLink::ProcessData(EQ::Net::TCPConnection *c, const unsigned c
Log.OutF(Logs::General, Logs::Debug, "Process data:\n{0}", p.ToString());
if (m_established) {
//process raw packet
ProcessPacket(p);
}
else {
auto msg = fmt::format("**PACKETMODE{0}**", m_identifier);
std::string msg;
if (m_identifier.compare("LOGIN") == 0) {
msg = fmt::format("**PACKETMODE**\r");
}
else {
msg = fmt::format("**PACKETMODE{0}**\r", m_identifier);
}
std::string cmp_msg;
if (p.GetInt8(0) == '*') {
cmp_msg = p.GetString(0, msg.length());
@ -74,6 +109,7 @@ void EQ::Net::RelayLink::ProcessData(EQ::Net::TCPConnection *c, const unsigned c
if (cmp_msg.compare(msg) == 0) {
m_established = true;
Log.OutF(Logs::General, Logs::Debug, "Established connection of type {0}", m_identifier);
SendPassword();
}
}
}
@ -82,9 +118,79 @@ void EQ::Net::RelayLink::ProcessData(EQ::Net::TCPConnection *c, const unsigned c
}
}
void EQ::Net::RelayLink::ProcessPacket(const EQ::Net::Packet &p)
{
char *buffer = (char*)p.Data();
m_data_buffer.insert(m_data_buffer.begin() + m_data_buffer.size(), buffer, buffer + p.Length());
ProcessBuffer();
}
void EQ::Net::RelayLink::ProcessBuffer()
{
size_t size = 7;
size_t base = 0;
size_t used = m_data_buffer.size();
while ((used - base) >= size) {
uint32 packet_size = *(uint32*)&m_data_buffer[base];
uint8 packet_flags = *(uint8*)&m_data_buffer[base + 4];
uint16 packet_opcode = *(uint16*)&m_data_buffer[base + 5];
if ((used - base) >= packet_size) {
EQ::Net::ReadOnlyPacket p(&m_data_buffer[base], packet_size);
if (m_opcode_dispatch.count(packet_opcode) > 0) {
auto &cb = m_opcode_dispatch[(int)packet_opcode];
cb(p);
}
else {
Log.OutF(Logs::General, Logs::Debug, "Unhandled packet of type {0:x}", packet_opcode);
}
base += packet_size;
}
else {
EQ::Net::WritablePacket p;
if (m_opcode_dispatch.count(packet_opcode) > 0) {
auto &cb = m_opcode_dispatch[(int)packet_opcode];
cb(p);
}
else {
Log.OutF(Logs::General, Logs::Debug, "Unhandled packet of type {0:x}", packet_opcode);
}
}
}
if (used == base) {
m_data_buffer.clear();
}
else {
m_data_buffer.erase(m_data_buffer.begin(), m_data_buffer.begin() + base);
}
}
void EQ::Net::RelayLink::ProcessQueue()
{
if (!m_connection)
return;
while (!m_packet_queue.empty()) {
auto &p = m_packet_queue.front();
m_connection->Write((const char*)p.Data(), p.Length());
m_packet_queue.pop();
}
}
void EQ::Net::RelayLink::SendIdentifier()
{
auto msg = fmt::format("**PACKETMODE{0}**\r", m_identifier);
std::string msg;
if (m_identifier.compare("LOGIN") == 0) {
msg = fmt::format("**PACKETMODE**\r");
}
else {
msg = fmt::format("**PACKETMODE{0}**\r", m_identifier);
}
EQ::Net::WritablePacket packet;
packet.PutData(0, (void*)msg.c_str(), msg.length());
SendInternal(packet);
@ -98,3 +204,23 @@ void EQ::Net::RelayLink::SendInternal(const EQ::Net::Packet &p)
m_connection->Write((const char*)p.Data(), p.Length());
}
void EQ::Net::RelayLink::SendPassword()
{
if (m_password.length() > 0) {
char hash[16] = { 0 };
MD5::Generate((const uchar*)m_password.c_str(), m_password.length(), (uchar*)&hash[0]);
EQ::Net::WritablePacket p;
p.PutData(0, &hash[0], 16);
SendPacket(ServerOP_ZAAuth, p);
}
}
void EQ::Net::RelayLink::OnAuthFailed(const EQ::Net::Packet &p)
{
if (m_connection) {
Log.OutF(Logs::General, Logs::Debug, "Authorization failed for server type {0}", m_identifier);
m_connection->Disconnect();
}
}

View File

@ -2,8 +2,11 @@
#include "tcp_server.h"
#include "packet.h"
#include "../types.h"
#include "../event/timer.h"
#include "../event/event_loop.h"
#include <map>
#include <queue>
namespace EQ
{
@ -11,22 +14,36 @@ namespace EQ
class RelayLink
{
public:
RelayLink(const std::string &addr, int port, const std::string &identifier);
RelayLink(const std::string &addr, int port, const std::string &identifier, const std::string &password);
~RelayLink();
void OnMessageType(uint16 opcode, std::function<void(const EQ::Net::Packet &p)> cb);
void SendPacket(uint16 opcode, const EQ::Net::Packet &p);
bool Connected() const { return m_connection != nullptr; }
std::string GetIP() const { return m_addr; }
uint16 GetPort() const { return m_port; }
private:
void Connect();
void ProcessData(EQ::Net::TCPConnection *c, const unsigned char *data, size_t length);
void ProcessPacket(const EQ::Net::Packet &p);
void ProcessBuffer();
void ProcessQueue();
void SendIdentifier();
void SendInternal(const EQ::Net::Packet &p);
void SendPassword();
void OnAuthFailed(const EQ::Net::Packet &p);
std::unique_ptr<EQ::Timer> m_timer;
std::string m_addr;
std::string m_identifier;
std::string m_password;
int m_port;
std::shared_ptr<EQ::Net::TCPConnection> m_connection;
bool m_established;
bool m_connecting;
std::vector<char> m_data_buffer;
std::map<uint16, std::function<void(const EQ::Net::Packet &p)>> m_opcode_dispatch;
std::queue<EQ::Net::WritablePacket> m_packet_queue;
};
}
}

View File

@ -21,16 +21,11 @@ INSTALL(TARGETS queryserv RUNTIME DESTINATION ${CMAKE_INSTALL_PREFIX}/bin)
ADD_DEFINITIONS(-DQSERV)
TARGET_LINK_LIBRARIES(queryserv common debug ${MySQL_LIBRARY_DEBUG} optimized ${MySQL_LIBRARY_RELEASE} ${ZLIB_LIBRARY})
TARGET_LINK_LIBRARIES(queryserv common debug ${MySQL_LIBRARY_DEBUG} optimized ${MySQL_LIBRARY_RELEASE} ${ZLIB_LIBRARY} libuv fmt)
IF(MSVC)
SET_TARGET_PROPERTIES(queryserv PROPERTIES LINK_FLAGS_RELEASE "/OPT:REF /OPT:ICF")
TARGET_LINK_LIBRARIES(queryserv "Ws2_32.lib")
ENDIF(MSVC)
IF(MINGW)
TARGET_LINK_LIBRARIES(queryserv "WS2_32")
ENDIF(MINGW)
IF(WIN32)
TARGET_LINK_LIBRARIES(queryserv "ws2_32" "psapi" "iphlpapi" "userenv")
ENDIF(WIN32)
IF(UNIX)
TARGET_LINK_LIBRARIES(queryserv "${CMAKE_DL_LIBS}")

View File

@ -24,10 +24,13 @@
#include "../common/servertalk.h"
#include "../common/platform.h"
#include "../common/crash.h"
#include "../common/event/event_loop.h"
#include "../common/net/relay_link.h"
#include "../common/timer.h"
#include "database.h"
#include "queryservconfig.h"
#include "worldserver.h"
#include "lfguild.h"
#include "worldserver.h"
#include <list>
#include <signal.h>
@ -42,8 +45,6 @@ EQEmuLogSys Log;
void CatchSignal(int sig_num) {
RunLoops = false;
if(worldserver)
worldserver->Disconnect();
}
int main() {
@ -51,17 +52,6 @@ int main() {
Log.LoadLogSettingsDefaults();
set_exception_handler();
Timer LFGuildExpireTimer(60000);
Timer InterserverTimer(INTERSERVER_TIMER); // does auto-reconnect
/* Load XML from eqemu_config.xml
<qsdatabase>
<host>127.0.0.1</host>
<port>3306</port>
<username>user</username>
<password>password</password>
<db>dbname</db>
</qsdatabase>
*/
Log.Out(Logs::General, Logs::QS_Server, "Starting EQEmu QueryServ.");
if (!queryservconfig::LoadConfig()) {
@ -110,11 +100,7 @@ int main() {
if(LFGuildExpireTimer.Check())
lfguildmanager.ExpireEntries();
if (InterserverTimer.Check()) {
if (worldserver->TryReconnect() && (!worldserver->Connected()))
worldserver->AsyncConnect();
}
worldserver->Process();
EQ::EventLoop::Get().Process();
Sleep(1);
}
Log.CloseFileLogs();

View File

@ -42,125 +42,152 @@ extern Database database;
extern LFGuildManager lfguildmanager;
WorldServer::WorldServer()
: WorldConnection(EmuTCPConnection::packetModeQueryServ, Config->SharedKey.c_str())
{
pTryReconnect = true;
}
WorldServer::~WorldServer()
{
}
void WorldServer::OnConnected()
void WorldServer::Connect()
{
Log.Out(Logs::Detail, Logs::QS_Server, "Connected to World.");
WorldConnection::OnConnected();
m_link.reset(new EQ::Net::RelayLink(Config->WorldIP, Config->WorldTCPPort, "QS", Config->SharedKey));
m_link->OnMessageType(ServerOP_Speech, std::bind(&WorldServer::HandleMessage, this, ServerOP_Speech, std::placeholders::_1));
m_link->OnMessageType(ServerOP_QSPlayerLogTrades, std::bind(&WorldServer::HandleMessage, this, ServerOP_QSPlayerLogTrades, std::placeholders::_1));
m_link->OnMessageType(ServerOP_QSPlayerLogHandins, std::bind(&WorldServer::HandleMessage, this, ServerOP_QSPlayerLogHandins, std::placeholders::_1));
m_link->OnMessageType(ServerOP_QSPlayerLogNPCKills, std::bind(&WorldServer::HandleMessage, this, ServerOP_QSPlayerLogNPCKills, std::placeholders::_1));
m_link->OnMessageType(ServerOP_QSPlayerLogDeletes, std::bind(&WorldServer::HandleMessage, this, ServerOP_QSPlayerLogDeletes, std::placeholders::_1));
m_link->OnMessageType(ServerOP_QSPlayerLogMoves, std::bind(&WorldServer::HandleMessage, this, ServerOP_QSPlayerLogMoves, std::placeholders::_1));
m_link->OnMessageType(ServerOP_QSPlayerLogMerchantTransactions, std::bind(&WorldServer::HandleMessage, this, ServerOP_QSPlayerLogMerchantTransactions, std::placeholders::_1));
m_link->OnMessageType(ServerOP_QueryServGeneric, std::bind(&WorldServer::HandleMessage, this, ServerOP_QueryServGeneric, std::placeholders::_1));
m_link->OnMessageType(ServerOP_QSSendQuery, std::bind(&WorldServer::HandleMessage, this, ServerOP_QSSendQuery, std::placeholders::_1));
}
void WorldServer::Process()
bool WorldServer::SendPacket(ServerPacket *pack)
{
WorldConnection::Process();
if (!Connected())
return;
EQ::Net::ReadOnlyPacket p(pack->pBuffer, pack->size);
m_link->SendPacket(pack->opcode, p);
return true;
}
ServerPacket *pack = 0;
while((pack = tcpc.PopPacket()))
{
Log.Out(Logs::Detail, Logs::QS_Server, "Received Opcode: %4X", pack->opcode);
switch(pack->opcode) {
case 0: {
break;
}
case ServerOP_KeepAlive: {
break;
}
case ServerOP_Speech: {
Server_Speech_Struct *SSS = (Server_Speech_Struct*)pack->pBuffer;
std::string tmp1 = SSS->from;
std::string tmp2 = SSS->to;
database.AddSpeech(tmp1.c_str(), tmp2.c_str(), SSS->message, SSS->minstatus, SSS->guilddbid, SSS->type);
break;
}
case ServerOP_QSPlayerLogTrades: {
QSPlayerLogTrade_Struct *QS = (QSPlayerLogTrade_Struct*)pack->pBuffer;
database.LogPlayerTrade(QS, QS->_detail_count);
break;
}
case ServerOP_QSPlayerLogHandins: {
QSPlayerLogHandin_Struct *QS = (QSPlayerLogHandin_Struct*)pack->pBuffer;
database.LogPlayerHandin(QS, QS->_detail_count);
break;
}
case ServerOP_QSPlayerLogNPCKills: {
QSPlayerLogNPCKill_Struct *QS = (QSPlayerLogNPCKill_Struct*)pack->pBuffer;
uint32 Members = pack->size - sizeof(QSPlayerLogNPCKill_Struct);
if (Members > 0) Members = Members / sizeof(QSPlayerLogNPCKillsPlayers_Struct);
database.LogPlayerNPCKill(QS, Members);
break;
}
case ServerOP_QSPlayerLogDeletes: {
QSPlayerLogDelete_Struct *QS = (QSPlayerLogDelete_Struct*)pack->pBuffer;
uint32 Items = QS->char_count;
database.LogPlayerDelete(QS, Items);
break;
}
case ServerOP_QSPlayerLogMoves: {
QSPlayerLogMove_Struct *QS = (QSPlayerLogMove_Struct*)pack->pBuffer;
uint32 Items = QS->char_count;
database.LogPlayerMove(QS, Items);
break;
}
case ServerOP_QSPlayerLogMerchantTransactions: {
QSMerchantLogTransaction_Struct *QS = (QSMerchantLogTransaction_Struct*)pack->pBuffer;
uint32 Items = QS->char_count + QS->merchant_count;
database.LogMerchantTransaction(QS, Items);
break;
}
case ServerOP_QueryServGeneric: {
/*
The purpose of ServerOP_QueryServerGeneric is so that we don't have to add code to world just to relay packets
each time we add functionality to queryserv.
A ServerOP_QueryServGeneric packet has the following format:
uint32 SourceZoneID
uint32 SourceInstanceID
char OriginatingCharacterName[0]
- Null terminated name of the character this packet came from. This could be just
- an empty string if it has no meaning in the context of a particular packet.
uint32 Type
The 'Type' field is a 'sub-opcode'. A value of 0 is used for the LFGuild packets. The next feature to be added
to queryserv would use 1, etc.
Obviously, any fields in the packet following the 'Type' will be unique to the particular type of packet. The
'Generic' in the name of this ServerOP code relates to the four header fields.
*/
std::string WorldServer::GetIP() const
{
return m_link->GetIP();
}
char From[64];
pack->SetReadPosition(8);
pack->ReadString(From);
uint32 Type = pack->ReadUInt32();
uint16 WorldServer::GetPort() const
{
return m_link->GetPort();
}
switch(Type) {
case QSG_LFGuild:{
lfguildmanager.HandlePacket(pack);
break;
}
default:
Log.Out(Logs::Detail, Logs::QS_Server, "Received unhandled ServerOP_QueryServGeneric", Type);
break;
}
break;
}
case ServerOP_QSSendQuery: {
/* Process all packets here */
database.GeneralQueryReceive(pack);
break;
}
bool WorldServer::Connected() const
{
return m_link->Connected();
}
void WorldServer::HandleMessage(uint16 opcode, const EQ::Net::Packet &p)
{
Log.OutF(Logs::General, Logs::Debug, "Received Opcode: {0}\n{1}", opcode, p.ToString());
switch(opcode) {
case 0: {
break;
}
}
safe_delete(pack);
return;
}
case ServerOP_KeepAlive: {
break;
}
case ServerOP_Speech: {
Server_Speech_Struct *SSS = (Server_Speech_Struct*)p.Data();
std::string tmp1 = SSS->from;
std::string tmp2 = SSS->to;
database.AddSpeech(tmp1.c_str(), tmp2.c_str(), SSS->message, SSS->minstatus, SSS->guilddbid, SSS->type);
break;
}
case ServerOP_QSPlayerLogTrades: {
QSPlayerLogTrade_Struct *QS = (QSPlayerLogTrade_Struct*)p.Data();
database.LogPlayerTrade(QS, QS->_detail_count);
break;
}
case ServerOP_QSPlayerLogHandins: {
QSPlayerLogHandin_Struct *QS = (QSPlayerLogHandin_Struct*)p.Data();
database.LogPlayerHandin(QS, QS->_detail_count);
break;
}
case ServerOP_QSPlayerLogNPCKills: {
QSPlayerLogNPCKill_Struct *QS = (QSPlayerLogNPCKill_Struct*)p.Data();
uint32 Members = p.Length() - sizeof(QSPlayerLogNPCKill_Struct);
if (Members > 0) Members = Members / sizeof(QSPlayerLogNPCKillsPlayers_Struct);
database.LogPlayerNPCKill(QS, Members);
break;
}
case ServerOP_QSPlayerLogDeletes: {
QSPlayerLogDelete_Struct *QS = (QSPlayerLogDelete_Struct*)p.Data();
uint32 Items = QS->char_count;
database.LogPlayerDelete(QS, Items);
break;
}
case ServerOP_QSPlayerLogMoves: {
QSPlayerLogMove_Struct *QS = (QSPlayerLogMove_Struct*)p.Data();
uint32 Items = QS->char_count;
database.LogPlayerMove(QS, Items);
break;
}
case ServerOP_QSPlayerLogMerchantTransactions: {
QSMerchantLogTransaction_Struct *QS = (QSMerchantLogTransaction_Struct*)p.Data();
uint32 Items = QS->char_count + QS->merchant_count;
database.LogMerchantTransaction(QS, Items);
break;
}
case ServerOP_QueryServGeneric: {
/*
The purpose of ServerOP_QueryServerGeneric is so that we don't have to add code to world just to relay packets
each time we add functionality to queryserv.
A ServerOP_QueryServGeneric packet has the following format:
uint32 SourceZoneID
uint32 SourceInstanceID
char OriginatingCharacterName[0]
- Null terminated name of the character this packet came from. This could be just
- an empty string if it has no meaning in the context of a particular packet.
uint32 Type
The 'Type' field is a 'sub-opcode'. A value of 0 is used for the LFGuild packets. The next feature to be added
to queryserv would use 1, etc.
Obviously, any fields in the packet following the 'Type' will be unique to the particular type of packet. The
'Generic' in the name of this ServerOP code relates to the four header fields.
*/
auto from = p.GetCString(8);
uint32 Type = p.GetUInt32(8 + from.length() + 1);
switch(Type) {
case QSG_LFGuild:{
ServerPacket pack;
pack.pBuffer = (uchar*)p.Data();
pack.opcode = opcode;
pack.size = p.Length();
lfguildmanager.HandlePacket(&pack);
pack.pBuffer = nullptr;
break;
}
default:
Log.Out(Logs::Detail, Logs::QS_Server, "Received unhandled ServerOP_QueryServGeneric", Type);
break;
}
break;
}
case ServerOP_QSSendQuery: {
/* Process all packets here */
ServerPacket pack;
pack.pBuffer = (uchar*)p.Data();
pack.opcode = opcode;
pack.size = p.Length();
database.GeneralQueryReceive(&pack);
pack.pBuffer = nullptr;
break;
}
}
}

View File

@ -18,18 +18,24 @@
#ifndef WORLDSERVER_H
#define WORLDSERVER_H
#include "../common/worldconn.h"
#include "../common/eq_packet_structs.h"
#include "../common/net/relay_link.h"
class WorldServer : public WorldConnection
class WorldServer
{
public:
WorldServer();
virtual ~WorldServer();
virtual void Process();
~WorldServer();
void Connect();
bool SendPacket(ServerPacket* pack);
std::string GetIP() const;
uint16 GetPort() const;
bool Connected() const;
void HandleMessage(uint16 opcode, const EQ::Net::Packet &p);
private:
virtual void OnConnected();
std::unique_ptr<EQ::Net::RelayLink> m_link;
};
#endif

View File

@ -436,8 +436,6 @@ int main(int argc, char** argv) {
bool eqsf_open = false;
std::unique_ptr<EQ::Net::EQStreamManager> eqsm;
EQ::Net::RelayLink world_link("127.0.0.1", Config->WorldTCPPort, "QS");
EQ::Timer process_timer(33, true, [&eqsf_open, &eqsm, &stream_identifier, &eqsi, &worldwasconnected,
&zoneupdate_timer, &IDLEZONEUPDATE, &ZONEUPDATE, &quest_timers, &InterserverTimer](EQ::Timer* t) {
//Advance the timer to our current point in time