mirror of
https://github.com/EQEmu/Server.git
synced 2025-12-14 19:51:29 +00:00
Many tweaks to stream memory allocation, including but not limited to streams now are shared_ptrs.
This commit is contained in:
parent
d037bc9dcc
commit
7dbe6a7426
@ -206,8 +206,14 @@ class EQStream : public EQStreamInterface {
|
|||||||
|
|
||||||
void init(bool resetSession=true);
|
void init(bool resetSession=true);
|
||||||
public:
|
public:
|
||||||
EQStream() { init(); remote_ip = 0; remote_port = 0; State=UNESTABLISHED; StreamType=UnknownStream; compressed=true; encoded=false; app_opcode_size=2; bytes_sent=0; bytes_recv=0; create_time=Timer::GetTimeSeconds(); sessionAttempts = 0; streamactive=false; }
|
EQStream() { init(); remote_ip = 0; remote_port = 0; State = UNESTABLISHED;
|
||||||
EQStream(sockaddr_in addr) { init(); remote_ip=addr.sin_addr.s_addr; remote_port=addr.sin_port; State=UNESTABLISHED; StreamType=UnknownStream; compressed=true; encoded=false; app_opcode_size=2; bytes_sent=0; bytes_recv=0; create_time=Timer::GetTimeSeconds(); }
|
StreamType = UnknownStream; compressed = true; encoded = false; app_opcode_size = 2;
|
||||||
|
bytes_sent = 0; bytes_recv = 0; create_time = Timer::GetTimeSeconds(); sessionAttempts = 0;
|
||||||
|
streamactive = false; }
|
||||||
|
EQStream(sockaddr_in addr) { init(); remote_ip = addr.sin_addr.s_addr;
|
||||||
|
remote_port = addr.sin_port; State = UNESTABLISHED; StreamType = UnknownStream;
|
||||||
|
compressed = true; encoded = false; app_opcode_size = 2; bytes_sent = 0; bytes_recv = 0;
|
||||||
|
create_time = Timer::GetTimeSeconds(); }
|
||||||
virtual ~EQStream() { RemoveData(); SetState(CLOSED); }
|
virtual ~EQStream() { RemoveData(); SetState(CLOSED); }
|
||||||
void SetMaxLen(uint32 length) { MaxLen=length; }
|
void SetMaxLen(uint32 length) { MaxLen=length; }
|
||||||
|
|
||||||
|
|||||||
@ -116,9 +116,9 @@ struct sockaddr_in address;
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
EQStream *EQStreamFactory::Pop()
|
std::shared_ptr<EQStream> EQStreamFactory::Pop()
|
||||||
{
|
{
|
||||||
EQStream *s=nullptr;
|
std::shared_ptr<EQStream> s = nullptr;
|
||||||
MNewStreams.lock();
|
MNewStreams.lock();
|
||||||
if (NewStreams.size()) {
|
if (NewStreams.size()) {
|
||||||
s = NewStreams.front();
|
s = NewStreams.front();
|
||||||
@ -130,7 +130,7 @@ EQStream *s=nullptr;
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
void EQStreamFactory::Push(EQStream *s)
|
void EQStreamFactory::Push(std::shared_ptr<EQStream> s)
|
||||||
{
|
{
|
||||||
MNewStreams.lock();
|
MNewStreams.lock();
|
||||||
NewStreams.push(s);
|
NewStreams.push(s);
|
||||||
@ -140,16 +140,15 @@ void EQStreamFactory::Push(EQStream *s)
|
|||||||
void EQStreamFactory::ReaderLoop()
|
void EQStreamFactory::ReaderLoop()
|
||||||
{
|
{
|
||||||
fd_set readset;
|
fd_set readset;
|
||||||
std::map<std::pair<uint32, uint16>,EQStream *>::iterator stream_itr;
|
std::map<std::pair<uint32, uint16>, std::shared_ptr<EQStream>>::iterator stream_itr;
|
||||||
int num;
|
int num;
|
||||||
int length;
|
int length;
|
||||||
unsigned char buffer[2048];
|
unsigned char buffer[2048];
|
||||||
sockaddr_in from;
|
sockaddr_in from;
|
||||||
int socklen = sizeof(sockaddr_in);
|
int socklen = sizeof(sockaddr_in);
|
||||||
timeval sleep_time;
|
timeval sleep_time;
|
||||||
//time_t now;
|
|
||||||
|
|
||||||
ReaderRunning = true;
|
ReaderRunning = true;
|
||||||
|
|
||||||
while(sock!=-1) {
|
while(sock!=-1) {
|
||||||
MReaderRunning.lock();
|
MReaderRunning.lock();
|
||||||
if (!ReaderRunning)
|
if (!ReaderRunning)
|
||||||
@ -183,7 +182,7 @@ timeval sleep_time;
|
|||||||
stream_itr = Streams.find(std::make_pair(from.sin_addr.s_addr, from.sin_port));
|
stream_itr = Streams.find(std::make_pair(from.sin_addr.s_addr, from.sin_port));
|
||||||
if (stream_itr == Streams.end()) {
|
if (stream_itr == Streams.end()) {
|
||||||
if (buffer[1]==OP_SessionRequest) {
|
if (buffer[1]==OP_SessionRequest) {
|
||||||
EQStream *s = new EQStream(from);
|
std::shared_ptr<EQStream> s = std::make_shared<EQStream>(from);
|
||||||
s->SetStreamType(StreamType);
|
s->SetStreamType(StreamType);
|
||||||
Streams[std::make_pair(from.sin_addr.s_addr, from.sin_port)]=s;
|
Streams[std::make_pair(from.sin_addr.s_addr, from.sin_port)]=s;
|
||||||
WriterWork.Signal();
|
WriterWork.Signal();
|
||||||
@ -194,13 +193,13 @@ timeval sleep_time;
|
|||||||
}
|
}
|
||||||
MStreams.unlock();
|
MStreams.unlock();
|
||||||
} else {
|
} else {
|
||||||
EQStream *curstream = stream_itr->second;
|
std::shared_ptr<EQStream> curstream = stream_itr->second;
|
||||||
//dont bother processing incoming packets for closed connections
|
//dont bother processing incoming packets for closed connections
|
||||||
if(curstream->CheckClosed())
|
if(curstream->CheckClosed())
|
||||||
curstream = nullptr;
|
curstream = nullptr;
|
||||||
else
|
else
|
||||||
curstream->PutInUse();
|
curstream->PutInUse();
|
||||||
MStreams.unlock(); //the in use flag prevents the stream from being deleted while we are using it.
|
//the in use flag prevents the stream from being deleted while we are using it.
|
||||||
|
|
||||||
if(curstream) {
|
if(curstream) {
|
||||||
curstream->AddBytesRecv(length);
|
curstream->AddBytesRecv(length);
|
||||||
@ -208,6 +207,7 @@ timeval sleep_time;
|
|||||||
curstream->SetLastPacketTime(Timer::GetCurrentTime());
|
curstream->SetLastPacketTime(Timer::GetCurrentTime());
|
||||||
curstream->ReleaseFromUse();
|
curstream->ReleaseFromUse();
|
||||||
}
|
}
|
||||||
|
MStreams.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -220,10 +220,10 @@ void EQStreamFactory::CheckTimeout()
|
|||||||
MStreams.lock();
|
MStreams.lock();
|
||||||
|
|
||||||
unsigned long now=Timer::GetCurrentTime();
|
unsigned long now=Timer::GetCurrentTime();
|
||||||
std::map<std::pair<uint32, uint16>,EQStream *>::iterator stream_itr;
|
std::map<std::pair<uint32, uint16>, std::shared_ptr<EQStream>>::iterator stream_itr;
|
||||||
|
|
||||||
for(stream_itr = Streams.begin(); stream_itr != Streams.end();) {
|
for(stream_itr = Streams.begin(); stream_itr != Streams.end();) {
|
||||||
EQStream *s = stream_itr->second;
|
std::shared_ptr<EQStream> s = stream_itr->second;
|
||||||
|
|
||||||
s->CheckTimeout(now, stream_timeout);
|
s->CheckTimeout(now, stream_timeout);
|
||||||
|
|
||||||
@ -235,10 +235,9 @@ void EQStreamFactory::CheckTimeout()
|
|||||||
//give it a little time for everybody to finish with it
|
//give it a little time for everybody to finish with it
|
||||||
} else {
|
} else {
|
||||||
//everybody is done, we can delete it now
|
//everybody is done, we can delete it now
|
||||||
std::map<std::pair<uint32, uint16>,EQStream *>::iterator temp=stream_itr;
|
std::map<std::pair<uint32, uint16>, std::shared_ptr<EQStream>>::iterator temp = stream_itr;
|
||||||
++stream_itr;
|
++stream_itr;
|
||||||
//let whoever has the stream outside delete it
|
temp->second = nullptr;
|
||||||
delete temp->second;
|
|
||||||
Streams.erase(temp);
|
Streams.erase(temp);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -251,21 +250,17 @@ void EQStreamFactory::CheckTimeout()
|
|||||||
|
|
||||||
void EQStreamFactory::WriterLoop()
|
void EQStreamFactory::WriterLoop()
|
||||||
{
|
{
|
||||||
std::map<std::pair<uint32, uint16>,EQStream *>::iterator stream_itr;
|
std::map<std::pair<uint32, uint16>, std::shared_ptr<EQStream>>::iterator stream_itr;
|
||||||
bool havework=true;
|
bool havework=true;
|
||||||
std::vector<EQStream *> wants_write;
|
std::vector<std::shared_ptr<EQStream>> wants_write;
|
||||||
std::vector<EQStream *>::iterator cur,end;
|
std::vector<std::shared_ptr<EQStream>>::iterator cur, end;
|
||||||
bool decay = false;
|
bool decay = false;
|
||||||
uint32 stream_count;
|
uint32 stream_count;
|
||||||
|
|
||||||
Timer DecayTimer(20);
|
Timer DecayTimer(20);
|
||||||
|
|
||||||
WriterRunning = true;
|
WriterRunning = true;
|
||||||
DecayTimer.Enable();
|
DecayTimer.Enable();
|
||||||
|
|
||||||
while(sock!=-1) {
|
while(sock!=-1) {
|
||||||
//if (!havework) {
|
|
||||||
//WriterWork.Wait();
|
|
||||||
//}
|
|
||||||
MWriterRunning.lock();
|
MWriterRunning.lock();
|
||||||
if (!WriterRunning)
|
if (!WriterRunning)
|
||||||
break;
|
break;
|
||||||
|
|||||||
@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
#define _EQSTREAMFACTORY_H
|
#define _EQSTREAMFACTORY_H
|
||||||
|
|
||||||
|
#include <memory>
|
||||||
#include <queue>
|
#include <queue>
|
||||||
#include <map>
|
#include <map>
|
||||||
|
|
||||||
@ -26,10 +27,10 @@ class EQStreamFactory : private Timeoutable {
|
|||||||
|
|
||||||
EQStreamType StreamType;
|
EQStreamType StreamType;
|
||||||
|
|
||||||
std::queue<EQStream *> NewStreams;
|
std::queue<std::shared_ptr<EQStream>> NewStreams;
|
||||||
Mutex MNewStreams;
|
Mutex MNewStreams;
|
||||||
|
|
||||||
std::map<std::pair<uint32, uint16>,EQStream *> Streams;
|
std::map<std::pair<uint32, uint16>, std::shared_ptr<EQStream>> Streams;
|
||||||
Mutex MStreams;
|
Mutex MStreams;
|
||||||
|
|
||||||
virtual void CheckTimeout();
|
virtual void CheckTimeout();
|
||||||
@ -42,8 +43,8 @@ class EQStreamFactory : private Timeoutable {
|
|||||||
EQStreamFactory(EQStreamType type, uint32 timeout = 135000) : Timeoutable(5000), stream_timeout(timeout) { ReaderRunning=false; WriterRunning=false; StreamType=type; sock=-1; }
|
EQStreamFactory(EQStreamType type, uint32 timeout = 135000) : Timeoutable(5000), stream_timeout(timeout) { ReaderRunning=false; WriterRunning=false; StreamType=type; sock=-1; }
|
||||||
EQStreamFactory(EQStreamType type, int port, uint32 timeout = 135000);
|
EQStreamFactory(EQStreamType type, int port, uint32 timeout = 135000);
|
||||||
|
|
||||||
EQStream *Pop();
|
std::shared_ptr<EQStream> Pop();
|
||||||
void Push(EQStream *s);
|
void Push(std::shared_ptr<EQStream> s);
|
||||||
|
|
||||||
bool Open();
|
bool Open();
|
||||||
bool Open(unsigned long port) { Port=port; return Open(); }
|
bool Open(unsigned long port) { Port=port; return Open(); }
|
||||||
|
|||||||
@ -9,13 +9,12 @@ EQStreamIdentifier::~EQStreamIdentifier() {
|
|||||||
m_identified.front()->ReleaseFromUse();
|
m_identified.front()->ReleaseFromUse();
|
||||||
m_identified.pop();
|
m_identified.pop();
|
||||||
}
|
}
|
||||||
std::vector<Record *>::iterator cur, end;
|
std::vector<Record>::iterator cur, end;
|
||||||
cur = m_streams.begin();
|
cur = m_streams.begin();
|
||||||
end = m_streams.end();
|
end = m_streams.end();
|
||||||
for(; cur != end; ++cur) {
|
for(; cur != end; ++cur) {
|
||||||
Record *r = *cur;
|
Record &r = *cur;
|
||||||
r->stream->ReleaseFromUse();
|
r.stream->ReleaseFromUse();
|
||||||
delete r;
|
|
||||||
}
|
}
|
||||||
std::vector<Patch *>::iterator curp, endp;
|
std::vector<Patch *>::iterator curp, endp;
|
||||||
curp = m_patches.begin();
|
curp = m_patches.begin();
|
||||||
@ -35,35 +34,34 @@ void EQStreamIdentifier::RegisterPatch(const EQStream::Signature &sig, const cha
|
|||||||
}
|
}
|
||||||
|
|
||||||
void EQStreamIdentifier::Process() {
|
void EQStreamIdentifier::Process() {
|
||||||
std::vector<Record *>::iterator cur;
|
std::vector<Record>::iterator cur;
|
||||||
std::vector<Patch *>::iterator curp, endp;
|
std::vector<Patch *>::iterator curp, endp;
|
||||||
|
|
||||||
//foreach pending stream.
|
//foreach pending stream.
|
||||||
cur = m_streams.begin();
|
cur = m_streams.begin();
|
||||||
while(cur != m_streams.end()) {
|
while(cur != m_streams.end()) {
|
||||||
Record *r = *cur;
|
Record &r = *cur;
|
||||||
|
|
||||||
//first see if this stream has expired
|
//first see if this stream has expired
|
||||||
if(r->expire.Check(false)) {
|
if(r.expire.Check(false)) {
|
||||||
//this stream has failed to match any pattern in our timeframe.
|
//this stream has failed to match any pattern in our timeframe.
|
||||||
Log.Out(Logs::General, Logs::Netcode, "[IDENTIFY] Unable to identify stream from %s:%d before timeout.", long2ip(r->stream->GetRemoteIP()).c_str(), ntohs(r->stream->GetRemotePort()));
|
Log.Out(Logs::General, Logs::Netcode, "[IDENTIFY] Unable to identify stream from %s:%d before timeout.", long2ip(r.stream->GetRemoteIP()).c_str(), ntohs(r.stream->GetRemotePort()));
|
||||||
r->stream->ReleaseFromUse();
|
r.stream->ReleaseFromUse();
|
||||||
delete r;
|
|
||||||
cur = m_streams.erase(cur);
|
cur = m_streams.erase(cur);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
//then make sure the stream is still active
|
//then make sure the stream is still active
|
||||||
//if stream hasn't finished initializing then continue;
|
//if stream hasn't finished initializing then continue;
|
||||||
if(r->stream->GetState() == UNESTABLISHED)
|
if(r.stream->GetState() == UNESTABLISHED)
|
||||||
{
|
{
|
||||||
++cur;
|
++cur;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if(r->stream->GetState() != ESTABLISHED) {
|
if(r.stream->GetState() != ESTABLISHED) {
|
||||||
//the stream closed before it was identified.
|
//the stream closed before it was identified.
|
||||||
Log.Out(Logs::General, Logs::Netcode, "[IDENTIFY] Unable to identify stream from %s:%d before it closed.", long2ip(r->stream->GetRemoteIP()).c_str(), ntohs(r->stream->GetRemotePort()));
|
Log.Out(Logs::General, Logs::Netcode, "[IDENTIFY] Unable to identify stream from %s:%d before it closed.", long2ip(r.stream->GetRemoteIP()).c_str(), ntohs(r.stream->GetRemotePort()));
|
||||||
switch(r->stream->GetState())
|
switch(r.stream->GetState())
|
||||||
{
|
{
|
||||||
case ESTABLISHED:
|
case ESTABLISHED:
|
||||||
Log.Out(Logs::General, Logs::Netcode, "[IDENTIFY] Stream state was Established");
|
Log.Out(Logs::General, Logs::Netcode, "[IDENTIFY] Stream state was Established");
|
||||||
@ -81,8 +79,7 @@ void EQStreamIdentifier::Process() {
|
|||||||
Log.Out(Logs::General, Logs::Netcode, "[IDENTIFY] Stream state was Unestablished or unknown");
|
Log.Out(Logs::General, Logs::Netcode, "[IDENTIFY] Stream state was Unestablished or unknown");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
r->stream->ReleaseFromUse();
|
r.stream->ReleaseFromUse();
|
||||||
delete r;
|
|
||||||
cur = m_streams.erase(cur);
|
cur = m_streams.erase(cur);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -99,23 +96,23 @@ void EQStreamIdentifier::Process() {
|
|||||||
Patch *p = *curp;
|
Patch *p = *curp;
|
||||||
|
|
||||||
//ask the stream to see if it matches the supplied signature
|
//ask the stream to see if it matches the supplied signature
|
||||||
EQStream::MatchState res = r->stream->CheckSignature(&p->signature);
|
EQStream::MatchState res = r.stream->CheckSignature(&p->signature);
|
||||||
switch(res) {
|
switch(res) {
|
||||||
case EQStream::MatchNotReady:
|
case EQStream::MatchNotReady:
|
||||||
//the stream has not received enough packets to compare with this signature
|
//the stream has not received enough packets to compare with this signature
|
||||||
// Log.LogDebugType(Logs::General, Logs::Netcode, "[IDENT_TRACE] %s:%d: Tried patch %s, but stream is not ready for it.", long2ip(r->stream->GetRemoteIP()).c_str(), ntohs(r->stream->GetRemotePort()), p->name.c_str());
|
// Log.LogDebugType(Logs::General, Logs::Netcode, "[IDENT_TRACE] %s:%d: Tried patch %s, but stream is not ready for it.", long2ip(r.stream->GetRemoteIP()).c_str(), ntohs(r.stream->GetRemotePort()), p->name.c_str());
|
||||||
all_ready = false;
|
all_ready = false;
|
||||||
break;
|
break;
|
||||||
case EQStream::MatchSuccessful: {
|
case EQStream::MatchSuccessful: {
|
||||||
//yay, a match.
|
//yay, a match.
|
||||||
|
|
||||||
Log.Out(Logs::General, Logs::Netcode, "[IDENTIFY] Identified stream %s:%d with signature %s", long2ip(r->stream->GetRemoteIP()).c_str(), ntohs(r->stream->GetRemotePort()), p->name.c_str());
|
Log.Out(Logs::General, Logs::Netcode, "[IDENTIFY] Identified stream %s:%d with signature %s", long2ip(r.stream->GetRemoteIP()).c_str(), ntohs(r.stream->GetRemotePort()), p->name.c_str());
|
||||||
|
|
||||||
// before we assign the eqstream to an interface, let the stream recognize it is in use and the session should not be reset any further
|
// before we assign the eqstream to an interface, let the stream recognize it is in use and the session should not be reset any further
|
||||||
r->stream->SetActive(true);
|
r.stream->SetActive(true);
|
||||||
|
|
||||||
//might want to do something less-specific here... some day..
|
//might want to do something less-specific here... some day..
|
||||||
EQStreamInterface *s = new EQStreamProxy(r->stream, p->structs, p->opcodes);
|
EQStreamInterface *s = new EQStreamProxy(r.stream, p->structs, p->opcodes);
|
||||||
m_identified.push(s);
|
m_identified.push(s);
|
||||||
|
|
||||||
found_one = true;
|
found_one = true;
|
||||||
@ -123,7 +120,7 @@ void EQStreamIdentifier::Process() {
|
|||||||
}
|
}
|
||||||
case EQStream::MatchFailed:
|
case EQStream::MatchFailed:
|
||||||
//do nothing...
|
//do nothing...
|
||||||
Log.Out(Logs::General, Logs::Netcode, "[IDENT_TRACE] %s:%d: Tried patch %s, and it did not match.", long2ip(r->stream->GetRemoteIP()).c_str(), ntohs(r->stream->GetRemotePort()), p->name.c_str());
|
Log.Out(Logs::General, Logs::Netcode, "[IDENT_TRACE] %s:%d: Tried patch %s, and it did not match.", long2ip(r.stream->GetRemoteIP()).c_str(), ntohs(r.stream->GetRemotePort()), p->name.c_str());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -131,14 +128,13 @@ void EQStreamIdentifier::Process() {
|
|||||||
//if we checked all patches and did not find a match.
|
//if we checked all patches and did not find a match.
|
||||||
if(all_ready && !found_one) {
|
if(all_ready && !found_one) {
|
||||||
//the stream cannot be identified.
|
//the stream cannot be identified.
|
||||||
Log.Out(Logs::General, Logs::Netcode, "[IDENTIFY] Unable to identify stream from %s:%d, no match found.", long2ip(r->stream->GetRemoteIP()).c_str(), ntohs(r->stream->GetRemotePort()));
|
Log.Out(Logs::General, Logs::Netcode, "[IDENTIFY] Unable to identify stream from %s:%d, no match found.", long2ip(r.stream->GetRemoteIP()).c_str(), ntohs(r.stream->GetRemotePort()));
|
||||||
r->stream->ReleaseFromUse();
|
r.stream->ReleaseFromUse();
|
||||||
}
|
}
|
||||||
|
|
||||||
//if we found a match, or were not able to identify it
|
//if we found a match, or were not able to identify it
|
||||||
if(found_one || all_ready) {
|
if(found_one || all_ready) {
|
||||||
//cannot print ip/port here. r->stream is invalid.
|
//cannot print ip/port here. r.stream is invalid.
|
||||||
delete r;
|
|
||||||
cur = m_streams.erase(cur);
|
cur = m_streams.erase(cur);
|
||||||
} else {
|
} else {
|
||||||
++cur;
|
++cur;
|
||||||
@ -146,8 +142,8 @@ void EQStreamIdentifier::Process() {
|
|||||||
} //end foreach stream
|
} //end foreach stream
|
||||||
}
|
}
|
||||||
|
|
||||||
void EQStreamIdentifier::AddStream(EQStream *&eqs) {
|
void EQStreamIdentifier::AddStream(std::shared_ptr<EQStream> &eqs) {
|
||||||
m_streams.push_back(new Record(eqs));
|
m_streams.push_back(Record(eqs));
|
||||||
eqs = nullptr;
|
eqs = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -159,7 +155,7 @@ EQStreamInterface *EQStreamIdentifier::PopIdentified() {
|
|||||||
return(res);
|
return(res);
|
||||||
}
|
}
|
||||||
|
|
||||||
EQStreamIdentifier::Record::Record(EQStream *s)
|
EQStreamIdentifier::Record::Record(std::shared_ptr<EQStream> s)
|
||||||
: stream(s),
|
: stream(s),
|
||||||
expire(STREAM_IDENT_WAIT_MS)
|
expire(STREAM_IDENT_WAIT_MS)
|
||||||
{
|
{
|
||||||
|
|||||||
@ -5,6 +5,7 @@
|
|||||||
#include "timer.h"
|
#include "timer.h"
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <queue>
|
#include <queue>
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
#define STREAM_IDENT_WAIT_MS 10000
|
#define STREAM_IDENT_WAIT_MS 10000
|
||||||
|
|
||||||
@ -21,7 +22,7 @@ public:
|
|||||||
|
|
||||||
//main processing interface
|
//main processing interface
|
||||||
void Process();
|
void Process();
|
||||||
void AddStream(EQStream *& eqs);
|
void AddStream(std::shared_ptr<EQStream> &eqs);
|
||||||
EQStreamInterface *PopIdentified();
|
EQStreamInterface *PopIdentified();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
@ -39,11 +40,11 @@ protected:
|
|||||||
//pending streams..
|
//pending streams..
|
||||||
class Record {
|
class Record {
|
||||||
public:
|
public:
|
||||||
Record(EQStream *s);
|
Record(std::shared_ptr<EQStream> s);
|
||||||
EQStream *stream; //we own this
|
std::shared_ptr<EQStream> stream; //we own this
|
||||||
Timer expire;
|
Timer expire;
|
||||||
};
|
};
|
||||||
std::vector<Record *> m_streams; //we own these objects, and the streams contained in them.
|
std::vector<Record> m_streams; //we own these objects, and the streams contained in them.
|
||||||
std::queue<EQStreamInterface *> m_identified; //we own these objects
|
std::queue<EQStreamInterface *> m_identified; //we own these objects
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -5,7 +5,7 @@
|
|||||||
#include "struct_strategy.h"
|
#include "struct_strategy.h"
|
||||||
|
|
||||||
|
|
||||||
EQStreamProxy::EQStreamProxy(EQStream *&stream, const StructStrategy *structs, OpcodeManager **opcodes)
|
EQStreamProxy::EQStreamProxy(std::shared_ptr<EQStream> &stream, const StructStrategy *structs, OpcodeManager **opcodes)
|
||||||
: m_stream(stream),
|
: m_stream(stream),
|
||||||
m_structs(structs),
|
m_structs(structs),
|
||||||
m_opcodes(opcodes)
|
m_opcodes(opcodes)
|
||||||
@ -15,7 +15,6 @@ EQStreamProxy::EQStreamProxy(EQStream *&stream, const StructStrategy *structs, O
|
|||||||
}
|
}
|
||||||
|
|
||||||
EQStreamProxy::~EQStreamProxy() {
|
EQStreamProxy::~EQStreamProxy() {
|
||||||
//delete m_stream; //released by the stream factory.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string EQStreamProxy::Describe() const {
|
std::string EQStreamProxy::Describe() const {
|
||||||
@ -85,12 +84,6 @@ const uint32 EQStreamProxy::GetBytesRecvPerSecond() const
|
|||||||
|
|
||||||
void EQStreamProxy::ReleaseFromUse() {
|
void EQStreamProxy::ReleaseFromUse() {
|
||||||
m_stream->ReleaseFromUse();
|
m_stream->ReleaseFromUse();
|
||||||
|
|
||||||
//this is so ugly, but I cant think of a better way to deal with
|
|
||||||
//it right now...
|
|
||||||
if(!m_stream->IsInUse()) {
|
|
||||||
delete this;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void EQStreamProxy::RemoveData() {
|
void EQStreamProxy::RemoveData() {
|
||||||
|
|||||||
@ -4,8 +4,9 @@
|
|||||||
|
|
||||||
#include "types.h"
|
#include "types.h"
|
||||||
#include "eq_stream_intf.h"
|
#include "eq_stream_intf.h"
|
||||||
|
#include "eq_stream.h"
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
class EQStream;
|
|
||||||
class StructStrategy;
|
class StructStrategy;
|
||||||
class OpcodeManager;
|
class OpcodeManager;
|
||||||
class EQApplicationPacket;
|
class EQApplicationPacket;
|
||||||
@ -13,7 +14,7 @@ class EQApplicationPacket;
|
|||||||
class EQStreamProxy : public EQStreamInterface {
|
class EQStreamProxy : public EQStreamInterface {
|
||||||
public:
|
public:
|
||||||
//takes ownership of the stream.
|
//takes ownership of the stream.
|
||||||
EQStreamProxy(EQStream *&stream, const StructStrategy *structs, OpcodeManager **opcodes);
|
EQStreamProxy(std::shared_ptr<EQStream> &stream, const StructStrategy *structs, OpcodeManager **opcodes);
|
||||||
virtual ~EQStreamProxy();
|
virtual ~EQStreamProxy();
|
||||||
|
|
||||||
//EQStreamInterface:
|
//EQStreamInterface:
|
||||||
@ -35,7 +36,7 @@ public:
|
|||||||
virtual const uint32 GetBytesRecvPerSecond() const;
|
virtual const uint32 GetBytesRecvPerSecond() const;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
EQStream *const m_stream; //we own this stream object.
|
std::shared_ptr<EQStream> const m_stream; //we own this stream object.
|
||||||
const StructStrategy *const m_structs; //we do not own this object.
|
const StructStrategy *const m_structs; //we do not own this object.
|
||||||
//this is a pointer to a pointer to make it less likely that a packet will
|
//this is a pointer to a pointer to make it less likely that a packet will
|
||||||
//reference an invalid opcode manager when they are being reloaded.
|
//reference an invalid opcode manager when they are being reloaded.
|
||||||
|
|||||||
@ -1,6 +1,6 @@
|
|||||||
|
|
||||||
|
|
||||||
#define E(x) static void Encode_##x(EQApplicationPacket **p, EQStream *dest, bool ack_req);
|
#define E(x) static void Encode_##x(EQApplicationPacket **p, std::shared_ptr<EQStream> dest, bool ack_req);
|
||||||
#define D(x) static void Decode_##x(EQApplicationPacket *p);
|
#define D(x) static void Decode_##x(EQApplicationPacket *p);
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
|
|
||||||
#define ENCODE(x) void Strategy::Encode_##x(EQApplicationPacket **p, EQStream *dest, bool ack_req)
|
#define ENCODE(x) void Strategy::Encode_##x(EQApplicationPacket **p, std::shared_ptr<EQStream> dest, bool ack_req)
|
||||||
#define DECODE(x) void Strategy::Decode_##x(EQApplicationPacket *__packet)
|
#define DECODE(x) void Strategy::Decode_##x(EQApplicationPacket *__packet)
|
||||||
|
|
||||||
#define StructDist(in, f1, f2) (uint32(&in->f2)-uint32(&in->f1))
|
#define StructDist(in, f1, f2) (uint32(&in->f2)-uint32(&in->f1))
|
||||||
|
|||||||
@ -5,6 +5,7 @@
|
|||||||
|
|
||||||
#include "eq_stream.h"
|
#include "eq_stream.h"
|
||||||
#include <map>
|
#include <map>
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
|
|
||||||
//note: all encoders and decoders must be valid functions.
|
//note: all encoders and decoders must be valid functions.
|
||||||
@ -17,7 +18,7 @@ StructStrategy::StructStrategy() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void StructStrategy::Encode(EQApplicationPacket **p, EQStream *dest, bool ack_req) const {
|
void StructStrategy::Encode(EQApplicationPacket **p, std::shared_ptr<EQStream> dest, bool ack_req) const {
|
||||||
if((*p)->GetOpcodeBypass() != 0) {
|
if((*p)->GetOpcodeBypass() != 0) {
|
||||||
PassEncoder(p, dest, ack_req);
|
PassEncoder(p, dest, ack_req);
|
||||||
return;
|
return;
|
||||||
@ -35,7 +36,7 @@ void StructStrategy::Decode(EQApplicationPacket *p) const {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void StructStrategy::ErrorEncoder(EQApplicationPacket **in_p, EQStream *dest, bool ack_req) {
|
void StructStrategy::ErrorEncoder(EQApplicationPacket **in_p, std::shared_ptr<EQStream> dest, bool ack_req) {
|
||||||
EQApplicationPacket *p = *in_p;
|
EQApplicationPacket *p = *in_p;
|
||||||
*in_p = nullptr;
|
*in_p = nullptr;
|
||||||
|
|
||||||
@ -49,7 +50,7 @@ void StructStrategy::ErrorDecoder(EQApplicationPacket *p) {
|
|||||||
p->SetOpcode(OP_Unknown);
|
p->SetOpcode(OP_Unknown);
|
||||||
}
|
}
|
||||||
|
|
||||||
void StructStrategy::PassEncoder(EQApplicationPacket **p, EQStream *dest, bool ack_req) {
|
void StructStrategy::PassEncoder(EQApplicationPacket **p, std::shared_ptr<EQStream> dest, bool ack_req) {
|
||||||
dest->FastQueuePacket(p, ack_req);
|
dest->FastQueuePacket(p, ack_req);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -7,11 +7,12 @@ class EQStream;
|
|||||||
#include "clientversions.h"
|
#include "clientversions.h"
|
||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
class StructStrategy {
|
class StructStrategy {
|
||||||
public:
|
public:
|
||||||
//the encoder takes ownership of the supplied packet, and may enqueue multiple resulting packets into the stream
|
//the encoder takes ownership of the supplied packet, and may enqueue multiple resulting packets into the stream
|
||||||
typedef void (*Encoder)(EQApplicationPacket **p, EQStream *dest, bool ack_req);
|
typedef void(*Encoder)(EQApplicationPacket **p, std::shared_ptr<EQStream> dest, bool ack_req);
|
||||||
//the decoder may only edit the supplied packet, producing a single packet for eqemu to consume.
|
//the decoder may only edit the supplied packet, producing a single packet for eqemu to consume.
|
||||||
typedef void (*Decoder)(EQApplicationPacket *p);
|
typedef void (*Decoder)(EQApplicationPacket *p);
|
||||||
|
|
||||||
@ -19,7 +20,7 @@ public:
|
|||||||
virtual ~StructStrategy() {}
|
virtual ~StructStrategy() {}
|
||||||
|
|
||||||
//this method takes an eqemu struct, and enqueues the produced structs into the stream.
|
//this method takes an eqemu struct, and enqueues the produced structs into the stream.
|
||||||
void Encode(EQApplicationPacket **p, EQStream *dest, bool ack_req) const;
|
void Encode(EQApplicationPacket **p, std::shared_ptr<EQStream> dest, bool ack_req) const;
|
||||||
//this method takes an EQ wire struct, and converts it into an eqemu struct
|
//this method takes an EQ wire struct, and converts it into an eqemu struct
|
||||||
void Decode(EQApplicationPacket *p) const;
|
void Decode(EQApplicationPacket *p) const;
|
||||||
|
|
||||||
@ -29,10 +30,10 @@ public:
|
|||||||
protected:
|
protected:
|
||||||
//some common coders:
|
//some common coders:
|
||||||
//Print an error saying unknown struct/opcode and drop it
|
//Print an error saying unknown struct/opcode and drop it
|
||||||
static void ErrorEncoder(EQApplicationPacket **p, EQStream *dest, bool ack_req);
|
static void ErrorEncoder(EQApplicationPacket **p, std::shared_ptr<EQStream> dest, bool ack_req);
|
||||||
static void ErrorDecoder(EQApplicationPacket *p);
|
static void ErrorDecoder(EQApplicationPacket *p);
|
||||||
//pass the packet through without modification (emu == EQ) (default)
|
//pass the packet through without modification (emu == EQ) (default)
|
||||||
static void PassEncoder(EQApplicationPacket **p, EQStream *dest, bool ack_req);
|
static void PassEncoder(EQApplicationPacket **p, std::shared_ptr<EQStream> dest, bool ack_req);
|
||||||
static void PassDecoder(EQApplicationPacket *p);
|
static void PassDecoder(EQApplicationPacket *p);
|
||||||
|
|
||||||
Encoder encoders[_maxEmuOpcode];
|
Encoder encoders[_maxEmuOpcode];
|
||||||
|
|||||||
@ -24,7 +24,7 @@
|
|||||||
extern ErrorLog *server_log;
|
extern ErrorLog *server_log;
|
||||||
extern LoginServer server;
|
extern LoginServer server;
|
||||||
|
|
||||||
Client::Client(EQStream *c, LSClientVersion v)
|
Client::Client(std::shared_ptr<EQStream> c, LSClientVersion v)
|
||||||
{
|
{
|
||||||
connection = c;
|
connection = c;
|
||||||
version = v;
|
version = v;
|
||||||
|
|||||||
@ -59,7 +59,7 @@ public:
|
|||||||
/**
|
/**
|
||||||
* Constructor, sets our connection to c and version to v
|
* Constructor, sets our connection to c and version to v
|
||||||
*/
|
*/
|
||||||
Client(EQStream *c, LSClientVersion v);
|
Client(std::shared_ptr<EQStream> c, LSClientVersion v);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Destructor.
|
* Destructor.
|
||||||
@ -129,11 +129,11 @@ public:
|
|||||||
/**
|
/**
|
||||||
* Gets the connection for this client.
|
* Gets the connection for this client.
|
||||||
*/
|
*/
|
||||||
EQStream *GetConnection() { return connection; }
|
std::shared_ptr<EQStream> GetConnection() { return connection; }
|
||||||
|
|
||||||
EQEmu::Random random;
|
EQEmu::Random random;
|
||||||
private:
|
private:
|
||||||
EQStream *connection;
|
std::shared_ptr<EQStream> connection;
|
||||||
LSClientVersion version;
|
LSClientVersion version;
|
||||||
LSClientStatus status;
|
LSClientStatus status;
|
||||||
|
|
||||||
|
|||||||
@ -94,7 +94,7 @@ ClientManager::~ClientManager()
|
|||||||
void ClientManager::Process()
|
void ClientManager::Process()
|
||||||
{
|
{
|
||||||
ProcessDisconnect();
|
ProcessDisconnect();
|
||||||
EQStream *cur = titanium_stream->Pop();
|
std::shared_ptr<EQStream> cur = titanium_stream->Pop();
|
||||||
while(cur)
|
while(cur)
|
||||||
{
|
{
|
||||||
struct in_addr in;
|
struct in_addr in;
|
||||||
@ -141,7 +141,7 @@ void ClientManager::ProcessDisconnect()
|
|||||||
list<Client*>::iterator iter = clients.begin();
|
list<Client*>::iterator iter = clients.begin();
|
||||||
while(iter != clients.end())
|
while(iter != clients.end())
|
||||||
{
|
{
|
||||||
EQStream *c = (*iter)->GetConnection();
|
std::shared_ptr<EQStream> c = (*iter)->GetConnection();
|
||||||
if(c->CheckClosed())
|
if(c->CheckClosed())
|
||||||
{
|
{
|
||||||
server_log->Log(log_network, "Client disconnected from the server, removing client.");
|
server_log->Log(log_network, "Client disconnected from the server, removing client.");
|
||||||
|
|||||||
@ -485,7 +485,7 @@ Clientlist::Clientlist(int ChatPort) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Client::Client(EQStream *eqs) {
|
Client::Client(std::shared_ptr<EQStream> eqs) {
|
||||||
|
|
||||||
ClientStream = eqs;
|
ClientStream = eqs;
|
||||||
|
|
||||||
@ -574,7 +574,7 @@ void Clientlist::CheckForStaleConnections(Client *c) {
|
|||||||
|
|
||||||
void Clientlist::Process() {
|
void Clientlist::Process() {
|
||||||
|
|
||||||
EQStream *eqs;
|
std::shared_ptr<EQStream> eqs;
|
||||||
|
|
||||||
while((eqs = chatsf->Pop())) {
|
while((eqs = chatsf->Pop())) {
|
||||||
|
|
||||||
|
|||||||
@ -84,10 +84,10 @@ struct CharacterEntry {
|
|||||||
class Client {
|
class Client {
|
||||||
|
|
||||||
public:
|
public:
|
||||||
Client(EQStream* eqs);
|
Client(std::shared_ptr<EQStream> eqs);
|
||||||
~Client();
|
~Client();
|
||||||
|
|
||||||
EQStream *ClientStream;
|
std::shared_ptr<EQStream> ClientStream;
|
||||||
void AddCharacter(int CharID, const char *CharacterName, int Level);
|
void AddCharacter(int CharID, const char *CharacterName, int Level);
|
||||||
void ClearCharacters() { Characters.clear(); }
|
void ClearCharacters() { Characters.clear(); }
|
||||||
void SendMailBoxes();
|
void SendMailBoxes();
|
||||||
|
|||||||
@ -102,6 +102,7 @@ Client::~Client() {
|
|||||||
//let the stream factory know were done with this stream
|
//let the stream factory know were done with this stream
|
||||||
eqs->Close();
|
eqs->Close();
|
||||||
eqs->ReleaseFromUse();
|
eqs->ReleaseFromUse();
|
||||||
|
safe_delete(eqs);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Client::SendLogServer()
|
void Client::SendLogServer()
|
||||||
|
|||||||
@ -109,7 +109,7 @@ private:
|
|||||||
bool HandleDeleteCharacterPacket(const EQApplicationPacket *app);
|
bool HandleDeleteCharacterPacket(const EQApplicationPacket *app);
|
||||||
bool HandleZoneChangePacket(const EQApplicationPacket *app);
|
bool HandleZoneChangePacket(const EQApplicationPacket *app);
|
||||||
|
|
||||||
EQStreamInterface* const eqs;
|
EQStreamInterface* eqs;
|
||||||
};
|
};
|
||||||
|
|
||||||
bool CheckCharCreateInfoSoF(CharCreate_Struct *cc);
|
bool CheckCharCreateInfoSoF(CharCreate_Struct *cc);
|
||||||
|
|||||||
@ -394,7 +394,7 @@ int main(int argc, char** argv) {
|
|||||||
Timer InterserverTimer(INTERSERVER_TIMER); // does MySQL pings and auto-reconnect
|
Timer InterserverTimer(INTERSERVER_TIMER); // does MySQL pings and auto-reconnect
|
||||||
InterserverTimer.Trigger();
|
InterserverTimer.Trigger();
|
||||||
uint8 ReconnectCounter = 100;
|
uint8 ReconnectCounter = 100;
|
||||||
EQStream* eqs;
|
std::shared_ptr<EQStream> eqs;
|
||||||
EmuTCPConnection* tcpc;
|
EmuTCPConnection* tcpc;
|
||||||
EQStreamInterface *eqsi;
|
EQStreamInterface *eqsi;
|
||||||
|
|
||||||
@ -412,6 +412,8 @@ int main(int argc, char** argv) {
|
|||||||
stream_identifier.AddStream(eqs); //takes the stream
|
stream_identifier.AddStream(eqs); //takes the stream
|
||||||
}
|
}
|
||||||
|
|
||||||
|
eqs = nullptr;
|
||||||
|
|
||||||
//give the stream identifier a chance to do its work....
|
//give the stream identifier a chance to do its work....
|
||||||
stream_identifier.Process();
|
stream_identifier.Process();
|
||||||
|
|
||||||
|
|||||||
@ -402,6 +402,7 @@ Client::~Client() {
|
|||||||
//let the stream factory know were done with this stream
|
//let the stream factory know were done with this stream
|
||||||
eqs->Close();
|
eqs->Close();
|
||||||
eqs->ReleaseFromUse();
|
eqs->ReleaseFromUse();
|
||||||
|
safe_delete(eqs);
|
||||||
|
|
||||||
UninitializeBuffSlots();
|
UninitializeBuffSlots();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -64,6 +64,7 @@ struct Item_Struct;
|
|||||||
#include <float.h>
|
#include <float.h>
|
||||||
#include <set>
|
#include <set>
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
|
|
||||||
#define CLIENT_TIMEOUT 90000
|
#define CLIENT_TIMEOUT 90000
|
||||||
@ -201,7 +202,7 @@ struct ClientReward
|
|||||||
|
|
||||||
class ClientFactory {
|
class ClientFactory {
|
||||||
public:
|
public:
|
||||||
Client *MakeClient(EQStream* ieqs);
|
Client *MakeClient(std::shared_ptr<EQStream> ieqs);
|
||||||
};
|
};
|
||||||
|
|
||||||
class Client : public Mob
|
class Client : public Mob
|
||||||
|
|||||||
@ -332,7 +332,7 @@ int main(int argc, char** argv) {
|
|||||||
Timer quest_timers(100);
|
Timer quest_timers(100);
|
||||||
UpdateWindowTitle();
|
UpdateWindowTitle();
|
||||||
bool worldwasconnected = worldserver.Connected();
|
bool worldwasconnected = worldserver.Connected();
|
||||||
EQStream* eqss;
|
std::shared_ptr<EQStream> eqss;
|
||||||
EQStreamInterface *eqsi;
|
EQStreamInterface *eqsi;
|
||||||
uint8 IDLEZONEUPDATE = 200;
|
uint8 IDLEZONEUPDATE = 200;
|
||||||
uint8 ZONEUPDATE = 10;
|
uint8 ZONEUPDATE = 10;
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user