Merge pull request #353 from EQEmu/eqstream_ptr

Change EQStream raw pointers to be std::shared_ptr
This commit is contained in:
Alex
2015-01-30 13:41:49 -08:00
22 changed files with 115 additions and 112 deletions
+8 -2
View File
@@ -206,8 +206,14 @@ class EQStream : public EQStreamInterface {
void init(bool resetSession=true);
public:
EQStream() { init(); remote_ip = 0; remote_port = 0; State=UNESTABLISHED; StreamType=UnknownStream; compressed=true; encoded=false; app_opcode_size=2; bytes_sent=0; bytes_recv=0; create_time=Timer::GetTimeSeconds(); sessionAttempts = 0; streamactive=false; }
EQStream(sockaddr_in addr) { init(); remote_ip=addr.sin_addr.s_addr; remote_port=addr.sin_port; State=UNESTABLISHED; StreamType=UnknownStream; compressed=true; encoded=false; app_opcode_size=2; bytes_sent=0; bytes_recv=0; create_time=Timer::GetTimeSeconds(); }
EQStream() { init(); remote_ip = 0; remote_port = 0; State = UNESTABLISHED;
StreamType = UnknownStream; compressed = true; encoded = false; app_opcode_size = 2;
bytes_sent = 0; bytes_recv = 0; create_time = Timer::GetTimeSeconds(); sessionAttempts = 0;
streamactive = false; }
EQStream(sockaddr_in addr) { init(); remote_ip = addr.sin_addr.s_addr;
remote_port = addr.sin_port; State = UNESTABLISHED; StreamType = UnknownStream;
compressed = true; encoded = false; app_opcode_size = 2; bytes_sent = 0; bytes_recv = 0;
create_time = Timer::GetTimeSeconds(); }
virtual ~EQStream() { RemoveData(); SetState(CLOSED); }
void SetMaxLen(uint32 length) { MaxLen=length; }
+33 -38
View File
@@ -116,12 +116,12 @@ struct sockaddr_in address;
return true;
}
EQStream *EQStreamFactory::Pop()
std::shared_ptr<EQStream> EQStreamFactory::Pop()
{
EQStream *s=nullptr;
std::shared_ptr<EQStream> s = nullptr;
MNewStreams.lock();
if (NewStreams.size()) {
s=NewStreams.front();
s = NewStreams.front();
NewStreams.pop();
s->PutInUse();
}
@@ -130,7 +130,7 @@ EQStream *s=nullptr;
return s;
}
void EQStreamFactory::Push(EQStream *s)
void EQStreamFactory::Push(std::shared_ptr<EQStream> s)
{
MNewStreams.lock();
NewStreams.push(s);
@@ -139,17 +139,16 @@ void EQStreamFactory::Push(EQStream *s)
void EQStreamFactory::ReaderLoop()
{
fd_set readset;
std::map<std::pair<uint32, uint16>,EQStream *>::iterator stream_itr;
int num;
int length;
unsigned char buffer[2048];
sockaddr_in from;
int socklen=sizeof(sockaddr_in);
timeval sleep_time;
//time_t now;
fd_set readset;
std::map<std::pair<uint32, uint16>, std::shared_ptr<EQStream>>::iterator stream_itr;
int num;
int length;
unsigned char buffer[2048];
sockaddr_in from;
int socklen = sizeof(sockaddr_in);
timeval sleep_time;
ReaderRunning = true;
ReaderRunning=true;
while(sock!=-1) {
MReaderRunning.lock();
if (!ReaderRunning)
@@ -180,10 +179,10 @@ timeval sleep_time;
// What do we wanna do?
} else {
MStreams.lock();
stream_itr=Streams.find(std::make_pair(from.sin_addr.s_addr, from.sin_port));
stream_itr = Streams.find(std::make_pair(from.sin_addr.s_addr, from.sin_port));
if (stream_itr == Streams.end()) {
if (buffer[1]==OP_SessionRequest) {
EQStream *s = new EQStream(from);
std::shared_ptr<EQStream> s = std::make_shared<EQStream>(from);
s->SetStreamType(StreamType);
Streams[std::make_pair(from.sin_addr.s_addr, from.sin_port)]=s;
WriterWork.Signal();
@@ -194,13 +193,13 @@ timeval sleep_time;
}
MStreams.unlock();
} else {
EQStream *curstream = stream_itr->second;
std::shared_ptr<EQStream> curstream = stream_itr->second;
//dont bother processing incoming packets for closed connections
if(curstream->CheckClosed())
curstream = nullptr;
else
curstream->PutInUse();
MStreams.unlock(); //the in use flag prevents the stream from being deleted while we are using it.
//the in use flag prevents the stream from being deleted while we are using it.
if(curstream) {
curstream->AddBytesRecv(length);
@@ -208,6 +207,7 @@ timeval sleep_time;
curstream->SetLastPacketTime(Timer::GetCurrentTime());
curstream->ReleaseFromUse();
}
MStreams.unlock();
}
}
}
@@ -220,10 +220,10 @@ void EQStreamFactory::CheckTimeout()
MStreams.lock();
unsigned long now=Timer::GetCurrentTime();
std::map<std::pair<uint32, uint16>,EQStream *>::iterator stream_itr;
std::map<std::pair<uint32, uint16>, std::shared_ptr<EQStream>>::iterator stream_itr;
for(stream_itr=Streams.begin();stream_itr!=Streams.end();) {
EQStream *s = stream_itr->second;
for(stream_itr = Streams.begin(); stream_itr != Streams.end();) {
std::shared_ptr<EQStream> s = stream_itr->second;
s->CheckTimeout(now, stream_timeout);
@@ -235,10 +235,9 @@ void EQStreamFactory::CheckTimeout()
//give it a little time for everybody to finish with it
} else {
//everybody is done, we can delete it now
std::map<std::pair<uint32, uint16>,EQStream *>::iterator temp=stream_itr;
std::map<std::pair<uint32, uint16>, std::shared_ptr<EQStream>>::iterator temp = stream_itr;
++stream_itr;
//let whoever has the stream outside delete it
delete temp->second;
temp->second = nullptr;
Streams.erase(temp);
continue;
}
@@ -251,21 +250,17 @@ void EQStreamFactory::CheckTimeout()
void EQStreamFactory::WriterLoop()
{
std::map<std::pair<uint32, uint16>,EQStream *>::iterator stream_itr;
bool havework=true;
std::vector<EQStream *> wants_write;
std::vector<EQStream *>::iterator cur,end;
bool decay=false;
uint32 stream_count;
Timer DecayTimer(20);
WriterRunning=true;
std::map<std::pair<uint32, uint16>, std::shared_ptr<EQStream>>::iterator stream_itr;
bool havework=true;
std::vector<std::shared_ptr<EQStream>> wants_write;
std::vector<std::shared_ptr<EQStream>>::iterator cur, end;
bool decay = false;
uint32 stream_count;
Timer DecayTimer(20);
WriterRunning = true;
DecayTimer.Enable();
while(sock!=-1) {
//if (!havework) {
//WriterWork.Wait();
//}
MWriterRunning.lock();
if (!WriterRunning)
break;
@@ -309,7 +304,7 @@ Timer DecayTimer(20);
Sleep(10);
MStreams.lock();
stream_count=Streams.size();
stream_count = Streams.size();
MStreams.unlock();
if (!stream_count) {
WriterWork.Wait();
+5 -4
View File
@@ -2,6 +2,7 @@
#define _EQSTREAMFACTORY_H
#include <memory>
#include <queue>
#include <map>
@@ -26,10 +27,10 @@ class EQStreamFactory : private Timeoutable {
EQStreamType StreamType;
std::queue<EQStream *> NewStreams;
std::queue<std::shared_ptr<EQStream>> NewStreams;
Mutex MNewStreams;
std::map<std::pair<uint32, uint16>,EQStream *> Streams;
std::map<std::pair<uint32, uint16>, std::shared_ptr<EQStream>> Streams;
Mutex MStreams;
virtual void CheckTimeout();
@@ -42,8 +43,8 @@ class EQStreamFactory : private Timeoutable {
EQStreamFactory(EQStreamType type, uint32 timeout = 135000) : Timeoutable(5000), stream_timeout(timeout) { ReaderRunning=false; WriterRunning=false; StreamType=type; sock=-1; }
EQStreamFactory(EQStreamType type, int port, uint32 timeout = 135000);
EQStream *Pop();
void Push(EQStream *s);
std::shared_ptr<EQStream> Pop();
void Push(std::shared_ptr<EQStream> s);
bool Open();
bool Open(unsigned long port) { Port=port; return Open(); }
+25 -29
View File
@@ -9,13 +9,12 @@ EQStreamIdentifier::~EQStreamIdentifier() {
m_identified.front()->ReleaseFromUse();
m_identified.pop();
}
std::vector<Record *>::iterator cur, end;
std::vector<Record>::iterator cur, end;
cur = m_streams.begin();
end = m_streams.end();
for(; cur != end; ++cur) {
Record *r = *cur;
r->stream->ReleaseFromUse();
delete r;
Record &r = *cur;
r.stream->ReleaseFromUse();
}
std::vector<Patch *>::iterator curp, endp;
curp = m_patches.begin();
@@ -35,35 +34,34 @@ void EQStreamIdentifier::RegisterPatch(const EQStream::Signature &sig, const cha
}
void EQStreamIdentifier::Process() {
std::vector<Record *>::iterator cur;
std::vector<Record>::iterator cur;
std::vector<Patch *>::iterator curp, endp;
//foreach pending stream.
cur = m_streams.begin();
while(cur != m_streams.end()) {
Record *r = *cur;
Record &r = *cur;
//first see if this stream has expired
if(r->expire.Check(false)) {
if(r.expire.Check(false)) {
//this stream has failed to match any pattern in our timeframe.
Log.Out(Logs::General, Logs::Netcode, "[IDENTIFY] Unable to identify stream from %s:%d before timeout.", long2ip(r->stream->GetRemoteIP()).c_str(), ntohs(r->stream->GetRemotePort()));
r->stream->ReleaseFromUse();
delete r;
Log.Out(Logs::General, Logs::Netcode, "[IDENTIFY] Unable to identify stream from %s:%d before timeout.", long2ip(r.stream->GetRemoteIP()).c_str(), ntohs(r.stream->GetRemotePort()));
r.stream->ReleaseFromUse();
cur = m_streams.erase(cur);
continue;
}
//then make sure the stream is still active
//if stream hasn't finished initializing then continue;
if(r->stream->GetState() == UNESTABLISHED)
if(r.stream->GetState() == UNESTABLISHED)
{
++cur;
continue;
}
if(r->stream->GetState() != ESTABLISHED) {
if(r.stream->GetState() != ESTABLISHED) {
//the stream closed before it was identified.
Log.Out(Logs::General, Logs::Netcode, "[IDENTIFY] Unable to identify stream from %s:%d before it closed.", long2ip(r->stream->GetRemoteIP()).c_str(), ntohs(r->stream->GetRemotePort()));
switch(r->stream->GetState())
Log.Out(Logs::General, Logs::Netcode, "[IDENTIFY] Unable to identify stream from %s:%d before it closed.", long2ip(r.stream->GetRemoteIP()).c_str(), ntohs(r.stream->GetRemotePort()));
switch(r.stream->GetState())
{
case ESTABLISHED:
Log.Out(Logs::General, Logs::Netcode, "[IDENTIFY] Stream state was Established");
@@ -81,8 +79,7 @@ void EQStreamIdentifier::Process() {
Log.Out(Logs::General, Logs::Netcode, "[IDENTIFY] Stream state was Unestablished or unknown");
break;
}
r->stream->ReleaseFromUse();
delete r;
r.stream->ReleaseFromUse();
cur = m_streams.erase(cur);
continue;
}
@@ -99,23 +96,23 @@ void EQStreamIdentifier::Process() {
Patch *p = *curp;
//ask the stream to see if it matches the supplied signature
EQStream::MatchState res = r->stream->CheckSignature(&p->signature);
EQStream::MatchState res = r.stream->CheckSignature(&p->signature);
switch(res) {
case EQStream::MatchNotReady:
//the stream has not received enough packets to compare with this signature
// Log.LogDebugType(Logs::General, Logs::Netcode, "[IDENT_TRACE] %s:%d: Tried patch %s, but stream is not ready for it.", long2ip(r->stream->GetRemoteIP()).c_str(), ntohs(r->stream->GetRemotePort()), p->name.c_str());
// Log.LogDebugType(Logs::General, Logs::Netcode, "[IDENT_TRACE] %s:%d: Tried patch %s, but stream is not ready for it.", long2ip(r.stream->GetRemoteIP()).c_str(), ntohs(r.stream->GetRemotePort()), p->name.c_str());
all_ready = false;
break;
case EQStream::MatchSuccessful: {
//yay, a match.
Log.Out(Logs::General, Logs::Netcode, "[IDENTIFY] Identified stream %s:%d with signature %s", long2ip(r->stream->GetRemoteIP()).c_str(), ntohs(r->stream->GetRemotePort()), p->name.c_str());
Log.Out(Logs::General, Logs::Netcode, "[IDENTIFY] Identified stream %s:%d with signature %s", long2ip(r.stream->GetRemoteIP()).c_str(), ntohs(r.stream->GetRemotePort()), p->name.c_str());
// before we assign the eqstream to an interface, let the stream recognize it is in use and the session should not be reset any further
r->stream->SetActive(true);
r.stream->SetActive(true);
//might want to do something less-specific here... some day..
EQStreamInterface *s = new EQStreamProxy(r->stream, p->structs, p->opcodes);
EQStreamInterface *s = new EQStreamProxy(r.stream, p->structs, p->opcodes);
m_identified.push(s);
found_one = true;
@@ -123,7 +120,7 @@ void EQStreamIdentifier::Process() {
}
case EQStream::MatchFailed:
//do nothing...
Log.Out(Logs::General, Logs::Netcode, "[IDENT_TRACE] %s:%d: Tried patch %s, and it did not match.", long2ip(r->stream->GetRemoteIP()).c_str(), ntohs(r->stream->GetRemotePort()), p->name.c_str());
Log.Out(Logs::General, Logs::Netcode, "[IDENT_TRACE] %s:%d: Tried patch %s, and it did not match.", long2ip(r.stream->GetRemoteIP()).c_str(), ntohs(r.stream->GetRemotePort()), p->name.c_str());
break;
}
}
@@ -131,14 +128,13 @@ void EQStreamIdentifier::Process() {
//if we checked all patches and did not find a match.
if(all_ready && !found_one) {
//the stream cannot be identified.
Log.Out(Logs::General, Logs::Netcode, "[IDENTIFY] Unable to identify stream from %s:%d, no match found.", long2ip(r->stream->GetRemoteIP()).c_str(), ntohs(r->stream->GetRemotePort()));
r->stream->ReleaseFromUse();
Log.Out(Logs::General, Logs::Netcode, "[IDENTIFY] Unable to identify stream from %s:%d, no match found.", long2ip(r.stream->GetRemoteIP()).c_str(), ntohs(r.stream->GetRemotePort()));
r.stream->ReleaseFromUse();
}
//if we found a match, or were not able to identify it
if(found_one || all_ready) {
//cannot print ip/port here. r->stream is invalid.
delete r;
//cannot print ip/port here. r.stream is invalid.
cur = m_streams.erase(cur);
} else {
++cur;
@@ -146,8 +142,8 @@ void EQStreamIdentifier::Process() {
} //end foreach stream
}
void EQStreamIdentifier::AddStream(EQStream *&eqs) {
m_streams.push_back(new Record(eqs));
void EQStreamIdentifier::AddStream(std::shared_ptr<EQStream> &eqs) {
m_streams.push_back(Record(eqs));
eqs = nullptr;
}
@@ -159,7 +155,7 @@ EQStreamInterface *EQStreamIdentifier::PopIdentified() {
return(res);
}
EQStreamIdentifier::Record::Record(EQStream *s)
EQStreamIdentifier::Record::Record(std::shared_ptr<EQStream> s)
: stream(s),
expire(STREAM_IDENT_WAIT_MS)
{
+5 -4
View File
@@ -5,6 +5,7 @@
#include "timer.h"
#include <vector>
#include <queue>
#include <memory>
#define STREAM_IDENT_WAIT_MS 10000
@@ -21,7 +22,7 @@ public:
//main processing interface
void Process();
void AddStream(EQStream *& eqs);
void AddStream(std::shared_ptr<EQStream> &eqs);
EQStreamInterface *PopIdentified();
protected:
@@ -39,11 +40,11 @@ protected:
//pending streams..
class Record {
public:
Record(EQStream *s);
EQStream *stream; //we own this
Record(std::shared_ptr<EQStream> s);
std::shared_ptr<EQStream> stream; //we own this
Timer expire;
};
std::vector<Record *> m_streams; //we own these objects, and the streams contained in them.
std::vector<Record> m_streams; //we own these objects, and the streams contained in them.
std::queue<EQStreamInterface *> m_identified; //we own these objects
};
+1 -8
View File
@@ -5,7 +5,7 @@
#include "struct_strategy.h"
EQStreamProxy::EQStreamProxy(EQStream *&stream, const StructStrategy *structs, OpcodeManager **opcodes)
EQStreamProxy::EQStreamProxy(std::shared_ptr<EQStream> &stream, const StructStrategy *structs, OpcodeManager **opcodes)
: m_stream(stream),
m_structs(structs),
m_opcodes(opcodes)
@@ -15,7 +15,6 @@ EQStreamProxy::EQStreamProxy(EQStream *&stream, const StructStrategy *structs, O
}
EQStreamProxy::~EQStreamProxy() {
//delete m_stream; //released by the stream factory.
}
std::string EQStreamProxy::Describe() const {
@@ -85,12 +84,6 @@ const uint32 EQStreamProxy::GetBytesRecvPerSecond() const
void EQStreamProxy::ReleaseFromUse() {
m_stream->ReleaseFromUse();
//this is so ugly, but I cant think of a better way to deal with
//it right now...
if(!m_stream->IsInUse()) {
delete this;
}
}
void EQStreamProxy::RemoveData() {
+4 -3
View File
@@ -4,8 +4,9 @@
#include "types.h"
#include "eq_stream_intf.h"
#include "eq_stream.h"
#include <memory>
class EQStream;
class StructStrategy;
class OpcodeManager;
class EQApplicationPacket;
@@ -13,7 +14,7 @@ class EQApplicationPacket;
class EQStreamProxy : public EQStreamInterface {
public:
//takes ownership of the stream.
EQStreamProxy(EQStream *&stream, const StructStrategy *structs, OpcodeManager **opcodes);
EQStreamProxy(std::shared_ptr<EQStream> &stream, const StructStrategy *structs, OpcodeManager **opcodes);
virtual ~EQStreamProxy();
//EQStreamInterface:
@@ -35,7 +36,7 @@ public:
virtual const uint32 GetBytesRecvPerSecond() const;
protected:
EQStream *const m_stream; //we own this stream object.
std::shared_ptr<EQStream> const m_stream; //we own this stream object.
const StructStrategy *const m_structs; //we do not own this object.
//this is a pointer to a pointer to make it less likely that a packet will
//reference an invalid opcode manager when they are being reloaded.
+1 -1
View File
@@ -1,6 +1,6 @@
#define E(x) static void Encode_##x(EQApplicationPacket **p, EQStream *dest, bool ack_req);
#define E(x) static void Encode_##x(EQApplicationPacket **p, std::shared_ptr<EQStream> dest, bool ack_req);
#define D(x) static void Decode_##x(EQApplicationPacket *p);
+1 -1
View File
@@ -1,5 +1,5 @@
#define ENCODE(x) void Strategy::Encode_##x(EQApplicationPacket **p, EQStream *dest, bool ack_req)
#define ENCODE(x) void Strategy::Encode_##x(EQApplicationPacket **p, std::shared_ptr<EQStream> dest, bool ack_req)
#define DECODE(x) void Strategy::Decode_##x(EQApplicationPacket *__packet)
#define StructDist(in, f1, f2) (uint32(&in->f2)-uint32(&in->f1))
+4 -3
View File
@@ -5,6 +5,7 @@
#include "eq_stream.h"
#include <map>
#include <memory>
//note: all encoders and decoders must be valid functions.
@@ -17,7 +18,7 @@ StructStrategy::StructStrategy() {
}
}
void StructStrategy::Encode(EQApplicationPacket **p, EQStream *dest, bool ack_req) const {
void StructStrategy::Encode(EQApplicationPacket **p, std::shared_ptr<EQStream> dest, bool ack_req) const {
if((*p)->GetOpcodeBypass() != 0) {
PassEncoder(p, dest, ack_req);
return;
@@ -35,7 +36,7 @@ void StructStrategy::Decode(EQApplicationPacket *p) const {
}
void StructStrategy::ErrorEncoder(EQApplicationPacket **in_p, EQStream *dest, bool ack_req) {
void StructStrategy::ErrorEncoder(EQApplicationPacket **in_p, std::shared_ptr<EQStream> dest, bool ack_req) {
EQApplicationPacket *p = *in_p;
*in_p = nullptr;
@@ -49,7 +50,7 @@ void StructStrategy::ErrorDecoder(EQApplicationPacket *p) {
p->SetOpcode(OP_Unknown);
}
void StructStrategy::PassEncoder(EQApplicationPacket **p, EQStream *dest, bool ack_req) {
void StructStrategy::PassEncoder(EQApplicationPacket **p, std::shared_ptr<EQStream> dest, bool ack_req) {
dest->FastQueuePacket(p, ack_req);
}
+5 -4
View File
@@ -7,11 +7,12 @@ class EQStream;
#include "clientversions.h"
#include <string>
#include <memory>
class StructStrategy {
public:
//the encoder takes ownership of the supplied packet, and may enqueue multiple resulting packets into the stream
typedef void (*Encoder)(EQApplicationPacket **p, EQStream *dest, bool ack_req);
typedef void(*Encoder)(EQApplicationPacket **p, std::shared_ptr<EQStream> dest, bool ack_req);
//the decoder may only edit the supplied packet, producing a single packet for eqemu to consume.
typedef void (*Decoder)(EQApplicationPacket *p);
@@ -19,7 +20,7 @@ public:
virtual ~StructStrategy() {}
//this method takes an eqemu struct, and enqueues the produced structs into the stream.
void Encode(EQApplicationPacket **p, EQStream *dest, bool ack_req) const;
void Encode(EQApplicationPacket **p, std::shared_ptr<EQStream> dest, bool ack_req) const;
//this method takes an EQ wire struct, and converts it into an eqemu struct
void Decode(EQApplicationPacket *p) const;
@@ -29,10 +30,10 @@ public:
protected:
//some common coders:
//Print an error saying unknown struct/opcode and drop it
static void ErrorEncoder(EQApplicationPacket **p, EQStream *dest, bool ack_req);
static void ErrorEncoder(EQApplicationPacket **p, std::shared_ptr<EQStream> dest, bool ack_req);
static void ErrorDecoder(EQApplicationPacket *p);
//pass the packet through without modification (emu == EQ) (default)
static void PassEncoder(EQApplicationPacket **p, EQStream *dest, bool ack_req);
static void PassEncoder(EQApplicationPacket **p, std::shared_ptr<EQStream> dest, bool ack_req);
static void PassDecoder(EQApplicationPacket *p);
Encoder encoders[_maxEmuOpcode];