Changed where queued packets are sent while zoning. Moved where zoneinpacket_timer is started to assist in not dropping needed packets.

Added better netcode support for handling out of order acks, to preclude excessive resending of same packets.

Changed how timeout checks are performing on individual packets, for re-sends, so they do not happen more often than the client can respond.

Improved how the data rate limit for throttling packets for compressed stream, so the size reduction in packets are accounted for better.
This commit is contained in:
ngdeao
2016-03-28 19:11:24 -06:00
parent 6c5d686b22
commit fb23d961c1
10 changed files with 105 additions and 106 deletions
+2 -1
View File
@@ -62,7 +62,7 @@ class EQProtocolPacket : public BasePacket {
friend class EQStream; friend class EQStream;
friend class EQStreamPair; friend class EQStreamPair;
public: public:
EQProtocolPacket(uint16 op, const unsigned char *buf, uint32 len) : BasePacket(buf,len), opcode(op) { acked = false; } EQProtocolPacket(uint16 op, const unsigned char *buf, uint32 len) : BasePacket(buf, len), opcode(op) { acked = false; sent_time = 0; }
// EQProtocolPacket(const unsigned char *buf, uint32 len); // EQProtocolPacket(const unsigned char *buf, uint32 len);
bool combine(const EQProtocolPacket *rhs); bool combine(const EQProtocolPacket *rhs);
uint32 serialize (unsigned char *dest) const; uint32 serialize (unsigned char *dest) const;
@@ -70,6 +70,7 @@ public:
EQRawApplicationPacket *MakeAppPacket() const; EQRawApplicationPacket *MakeAppPacket() const;
bool acked; bool acked;
uint32 sent_time;
virtual void build_raw_header_dump(char *buffer, uint16 seq=0xffff) const; virtual void build_raw_header_dump(char *buffer, uint16 seq=0xffff) const;
virtual void build_header_dump(char *buffer) const; virtual void build_header_dump(char *buffer) const;
+89 -94
View File
@@ -75,7 +75,7 @@ void EQStream::init(bool resetSession) {
sent_packet_count = 0; sent_packet_count = 0;
received_packet_count = 0; received_packet_count = 0;
SequencedBase = 0; SequencedBase = 0;
NextSequencedSend = 0; AverageDelta = 500;
if(GetExecutablePlatform() == ExePlatformWorld || GetExecutablePlatform() == ExePlatformZone) { if(GetExecutablePlatform() == ExePlatformWorld || GetExecutablePlatform() == ExePlatformZone) {
retransmittimer = Timer::GetCurrentTime(); retransmittimer = Timer::GetCurrentTime();
@@ -86,10 +86,6 @@ void EQStream::init(bool resetSession) {
if(uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { if(uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) {
Log.Out(Logs::Detail, Logs::Netcode, _L "init Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, SequencedBase, SequencedQueue.size(), NextOutSeq); Log.Out(Logs::Detail, Logs::Netcode, _L "init Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, SequencedBase, SequencedQueue.size(), NextOutSeq);
} }
if(NextSequencedSend > SequencedQueue.size()) {
Log.Out(Logs::Detail, Logs::Netcode, _L "init Next Send Sequence is beyond the end of the queue NSS %d > SQ %d" __L, NextSequencedSend, SequencedQueue.size());
}
} }
EQRawApplicationPacket *EQStream::MakeApplicationPacket(EQProtocolPacket *p) EQRawApplicationPacket *EQStream::MakeApplicationPacket(EQProtocolPacket *p)
@@ -420,36 +416,33 @@ void EQStream::ProcessPacket(EQProtocolPacket *p)
Log.Out(Logs::Detail, Logs::Netcode, _L "Pre-OOA Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, SequencedBase, SequencedQueue.size(), NextOutSeq); Log.Out(Logs::Detail, Logs::Netcode, _L "Pre-OOA Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, SequencedBase, SequencedQueue.size(), NextOutSeq);
} }
if(NextSequencedSend > SequencedQueue.size()) {
Log.Out(Logs::Detail, Logs::Netcode, _L "Pre-OOA Next Send Sequence is beyond the end of the queue NSS %d > SQ %d" __L, NextSequencedSend, SequencedQueue.size());
}
//if the packet they got out of order is between our last acked packet and the last sent packet, then its valid. //if the packet they got out of order is between our last acked packet and the last sent packet, then its valid.
if (CompareSequence(SequencedBase,seq) != SeqPast && CompareSequence(NextOutSeq,seq) == SeqPast) { if (CompareSequence(SequencedBase,seq) != SeqPast && CompareSequence(NextOutSeq,seq) == SeqPast) {
Log.Out(Logs::Detail, Logs::Netcode, _L "Received OP_OutOfOrderAck for sequence %d, starting retransmit at the start of our unacked buffer (seq %d, was %d)." __L, Log.Out(Logs::Detail, Logs::Netcode, _L "Received OP_OutOfOrderAck for sequence %d, starting retransmit at the start of our unacked buffer (seq %d, was %d)." __L,
seq, SequencedBase, SequencedBase+NextSequencedSend); seq, SequencedBase, SequencedBase+SequencedQueue.size());
bool retransmit_acked_packets = false; uint16 sqsize = SequencedQueue.size();
if(GetExecutablePlatform() == ExePlatformWorld || GetExecutablePlatform() == ExePlatformZone) { uint16 index = seq - SequencedBase;
retransmit_acked_packets = RETRANSMIT_ACKED_PACKETS; Log.Out(Logs::Detail, Logs::Netcode, _L "OP_OutOfOrderAck marking packet acked in queue (queue index = %d, queue size = %d)." __L, index, sqsize);
} if (index < sqsize) {
std::deque<EQProtocolPacket *>::iterator sitr;
if(!retransmit_acked_packets) { sitr = SequencedQueue.begin();
uint16 sqsize = SequencedQueue.size(); sitr += index;
uint16 index = seq - SequencedBase; (*sitr)->acked = true;
Log.Out(Logs::Detail, Logs::Netcode, _L "OP_OutOfOrderAck marking packet acked in queue (queue index = %d, queue size = %d)." __L, index, sqsize); // flag packets for a resend
if (index < sqsize) { uint16 count = 0;
std::deque<EQProtocolPacket *>::iterator sitr; uint32 timeout = AverageDelta * 2 + 100;
sitr = SequencedQueue.begin(); for (sitr = SequencedQueue.begin(); sitr != SequencedQueue.end() && count < index; ++sitr, ++count) {
sitr += index; if (!(*sitr)->acked && (*sitr)->sent_time > 0 && (((*sitr)->sent_time + timeout) < Timer::GetCurrentTime())) {
(*sitr)->acked = true; (*sitr)->sent_time = 0;
Log.Out(Logs::Detail, Logs::Netcode, _L "OP_OutOfOrderAck Flagging packet %d for retransmission" __L, SequencedBase + count);
}
} }
} }
if(RETRANSMIT_TIMEOUT_MULT) { if(RETRANSMIT_TIMEOUT_MULT) {
retransmittimer = Timer::GetCurrentTime(); retransmittimer = Timer::GetCurrentTime();
} }
NextSequencedSend = 0;
} else { } else {
Log.Out(Logs::Detail, Logs::Netcode, _L "Received OP_OutOfOrderAck for out-of-window %d. Window (%d->%d)." __L, seq, SequencedBase, NextOutSeq); Log.Out(Logs::Detail, Logs::Netcode, _L "Received OP_OutOfOrderAck for out-of-window %d. Window (%d->%d)." __L, seq, SequencedBase, NextOutSeq);
} }
@@ -458,9 +451,6 @@ void EQStream::ProcessPacket(EQProtocolPacket *p)
Log.Out(Logs::Detail, Logs::Netcode, _L "Post-OOA Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, SequencedBase, SequencedQueue.size(), NextOutSeq); Log.Out(Logs::Detail, Logs::Netcode, _L "Post-OOA Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, SequencedBase, SequencedQueue.size(), NextOutSeq);
} }
if(NextSequencedSend > SequencedQueue.size()) {
Log.Out(Logs::Detail, Logs::Netcode, _L "Post-OOA Next Send Sequence is beyond the end of the queue NSS %d > SQ %d" __L, NextSequencedSend, SequencedQueue.size());
}
MOutboundQueue.unlock(); MOutboundQueue.unlock();
#endif #endif
} }
@@ -489,6 +479,7 @@ void EQStream::ProcessPacket(EQProtocolPacket *p)
} else { } else {
retransmittimeout = ntohl(ClientStats->average_delta) * 2 * RETRANSMIT_TIMEOUT_MULT; retransmittimeout = ntohl(ClientStats->average_delta) * 2 * RETRANSMIT_TIMEOUT_MULT;
} }
retransmittimeout += 300;
if(retransmittimeout > RETRANSMIT_TIMEOUT_MAX) if(retransmittimeout > RETRANSMIT_TIMEOUT_MAX)
retransmittimeout = RETRANSMIT_TIMEOUT_MAX; retransmittimeout = RETRANSMIT_TIMEOUT_MAX;
Log.Out(Logs::Detail, Logs::Netcode, _L "Retransmit timeout recalculated to %dms" __L, retransmittimeout); Log.Out(Logs::Detail, Logs::Netcode, _L "Retransmit timeout recalculated to %dms" __L, retransmittimeout);
@@ -631,9 +622,7 @@ void EQStream::SequencedPush(EQProtocolPacket *p)
if(uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { if(uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) {
Log.Out(Logs::Detail, Logs::Netcode, _L "Pre-Push Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, SequencedBase, SequencedQueue.size(), NextOutSeq); Log.Out(Logs::Detail, Logs::Netcode, _L "Pre-Push Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, SequencedBase, SequencedQueue.size(), NextOutSeq);
} }
if(NextSequencedSend > SequencedQueue.size()) {
Log.Out(Logs::Detail, Logs::Netcode, _L "Pre-Push Next Send Sequence is beyond the end of the queue NSS %d > SQ %d" __L, NextSequencedSend, SequencedQueue.size());
}
Log.Out(Logs::Detail, Logs::Netcode, _L "Pushing sequenced packet %d of length %d. Base Seq is %d." __L, NextOutSeq, p->size, SequencedBase); Log.Out(Logs::Detail, Logs::Netcode, _L "Pushing sequenced packet %d of length %d. Base Seq is %d." __L, NextOutSeq, p->size, SequencedBase);
*(uint16 *)(p->pBuffer)=htons(NextOutSeq); *(uint16 *)(p->pBuffer)=htons(NextOutSeq);
@@ -643,9 +632,7 @@ if(NextSequencedSend > SequencedQueue.size()) {
if(uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { if(uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) {
Log.Out(Logs::Detail, Logs::Netcode, _L "Push Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, SequencedBase, SequencedQueue.size(), NextOutSeq); Log.Out(Logs::Detail, Logs::Netcode, _L "Push Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, SequencedBase, SequencedQueue.size(), NextOutSeq);
} }
if(NextSequencedSend > SequencedQueue.size()) {
Log.Out(Logs::Detail, Logs::Netcode, _L "Push Next Send Sequence is beyond the end of the queue NSS %d > SQ %d" __L, NextSequencedSend, SequencedQueue.size());
}
MOutboundQueue.unlock(); MOutboundQueue.unlock();
#endif #endif
} }
@@ -703,21 +690,15 @@ void EQStream::Write(int eq_fd)
// Place to hold the base packet t combine into // Place to hold the base packet t combine into
EQProtocolPacket *p=nullptr; EQProtocolPacket *p=nullptr;
if(GetExecutablePlatform() == ExePlatformWorld || GetExecutablePlatform() == ExePlatformZone) {
// if we have a timeout defined and we have not received an ack recently enough, retransmit from beginning of queue
if (RETRANSMIT_TIMEOUT_MULT && !SequencedQueue.empty() && NextSequencedSend &&
(GetState()==ESTABLISHED) && ((retransmittimer+retransmittimeout) < Timer::GetCurrentTime())) {
Log.Out(Logs::Detail, Logs::Netcode, _L "Timeout since last ack received, starting retransmit at the start of our unacked "
"buffer (seq %d, was %d)." __L, SequencedBase, SequencedBase+NextSequencedSend);
NextSequencedSend = 0;
retransmittimer = Timer::GetCurrentTime(); // don't want to endlessly retransmit the first packet
}
}
// Find the next sequenced packet to send from the "queue" // Find the next sequenced packet to send from the "queue"
sitr = SequencedQueue.begin(); sitr = SequencedQueue.begin();
if (sitr!=SequencedQueue.end())
sitr += NextSequencedSend; uint16 count = 0;
// get to start of packets
while (sitr != SequencedQueue.end() && (*sitr)->sent_time > 0) {
sitr++;
count++;
}
// Loop until both are empty or MaxSends is reached // Loop until both are empty or MaxSends is reached
while(!SeqEmpty || !NonSeqEmpty) { while(!SeqEmpty || !NonSeqEmpty) {
@@ -731,7 +712,7 @@ void EQStream::Write(int eq_fd)
Log.Out(Logs::Detail, Logs::Netcode, _L "Starting combined packet with non-seq packet of len %d" __L, p->size); Log.Out(Logs::Detail, Logs::Netcode, _L "Starting combined packet with non-seq packet of len %d" __L, p->size);
NonSequencedQueue.pop(); NonSequencedQueue.pop();
} else if (!p->combine(NonSequencedQueue.front())) { } else if (!p->combine(NonSequencedQueue.front())) {
// Tryint to combine this packet with the base didn't work (too big maybe) // Trying to combine this packet with the base didn't work (too big maybe)
// So just send the base packet (we'll try this packet again later) // So just send the base packet (we'll try this packet again later)
Log.Out(Logs::Detail, Logs::Netcode, _L "Combined packet full at len %d, next non-seq packet is len %d" __L, p->size, (NonSequencedQueue.front())->size); Log.Out(Logs::Detail, Logs::Netcode, _L "Combined packet full at len %d, next non-seq packet is len %d" __L, p->size, (NonSequencedQueue.front())->size);
ReadyToSend.push(p); ReadyToSend.push(p);
@@ -754,15 +735,10 @@ void EQStream::Write(int eq_fd)
NonSeqEmpty=true; NonSeqEmpty=true;
} }
if (sitr!=SequencedQueue.end()) {
if(uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) {
Log.Out(Logs::Detail, Logs::Netcode, _L "Pre-Send Seq NSS=%d Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, NextSequencedSend, SequencedBase, SequencedQueue.size(), NextOutSeq);
}
if(NextSequencedSend > SequencedQueue.size()) { if (sitr!=SequencedQueue.end()) {
Log.Out(Logs::Detail, Logs::Netcode, _L "Pre-Send Next Send Sequence is beyond the end of the queue NSS %d > SQ %d" __L, NextSequencedSend, SequencedQueue.size());
} uint16 seq_send = SequencedBase + count; //just for logging...
uint16 seq_send = SequencedBase + NextSequencedSend; //just for logging...
if(SequencedQueue.empty()) { if(SequencedQueue.empty()) {
Log.Out(Logs::Detail, Logs::Netcode, _L "Tried to write a packet with an empty queue (%d is past next out %d)" __L, seq_send, NextOutSeq); Log.Out(Logs::Detail, Logs::Netcode, _L "Tried to write a packet with an empty queue (%d is past next out %d)" __L, seq_send, NextOutSeq);
@@ -771,26 +747,32 @@ void EQStream::Write(int eq_fd)
} }
if(GetExecutablePlatform() == ExePlatformWorld || GetExecutablePlatform() == ExePlatformZone) { if(GetExecutablePlatform() == ExePlatformWorld || GetExecutablePlatform() == ExePlatformZone) {
if (!RETRANSMIT_ACKED_PACKETS && (*sitr)->acked) { if ((*sitr)->acked || (*sitr)->sent_time != 0) {
++sitr;
++count;
if (p) {
Log.Out(Logs::Detail, Logs::Netcode, _L "Final combined packet not full, len %d" __L, p->size);
ReadyToSend.push(p);
BytesWritten += p->size;
p = nullptr;
}
Log.Out(Logs::Detail, Logs::Netcode, _L "Not retransmitting seq packet %d because already marked as acked" __L, seq_send); Log.Out(Logs::Detail, Logs::Netcode, _L "Not retransmitting seq packet %d because already marked as acked" __L, seq_send);
sitr++;
NextSequencedSend++;
} else if (!p) { } else if (!p) {
// If we don't have a packet to try to combine into, use this one as the base // If we don't have a packet to try to combine into, use this one as the base
// Copy it first as it will still live until it is acked // Copy it first as it will still live until it is acked
p=(*sitr)->Copy(); p=(*sitr)->Copy();
Log.Out(Logs::Detail, Logs::Netcode, _L "Starting combined packet with seq packet %d of len %d" __L, seq_send, p->size); Log.Out(Logs::Detail, Logs::Netcode, _L "Starting combined packet with seq packet %d of len %d" __L, seq_send, p->size);
(*sitr)->sent_time = Timer::GetCurrentTime();
++sitr; ++sitr;
NextSequencedSend++; ++count;
} else if (!p->combine(*sitr)) { } else if (!p->combine(*sitr)) {
// Trying to combine this packet with the base didn't work (too big maybe) // Trying to combine this packet with the base didn't work (too big maybe)
// So just send the base packet (we'll try this packet again later) // So just send the base packet (we'll try this packet again later)
Log.Out(Logs::Detail, Logs::Netcode, _L "Combined packet full at len %d, next seq packet %d is len %d" __L, p->size, seq_send, (*sitr)->size); Log.Out(Logs::Detail, Logs::Netcode, _L "Combined packet full at len %d, next seq packet %d is len %d" __L, p->size, seq_send + 1, (*sitr)->size);
ReadyToSend.push(p); ReadyToSend.push(p);
BytesWritten+=p->size; BytesWritten+=p->size;
p=nullptr; p=nullptr;
if ((*sitr)->opcode != OP_Fragment && BytesWritten > threshold) {
if (BytesWritten > threshold) {
// Sent enough this round, lets stop to be fair // Sent enough this round, lets stop to be fair
Log.Out(Logs::Detail, Logs::Netcode, _L "Exceeded write threshold in seq (%d > %d)" __L, BytesWritten, threshold); Log.Out(Logs::Detail, Logs::Netcode, _L "Exceeded write threshold in seq (%d > %d)" __L, BytesWritten, threshold);
break; break;
@@ -798,17 +780,28 @@ void EQStream::Write(int eq_fd)
} else { } else {
// Combine worked // Combine worked
Log.Out(Logs::Detail, Logs::Netcode, _L "Combined seq packet %d of len %d, yeilding %d combined." __L, seq_send, (*sitr)->size, p->size); Log.Out(Logs::Detail, Logs::Netcode, _L "Combined seq packet %d of len %d, yeilding %d combined." __L, seq_send, (*sitr)->size, p->size);
(*sitr)->sent_time = Timer::GetCurrentTime();
++sitr; ++sitr;
NextSequencedSend++; ++count;
} }
} else { } else {
if (!p) { if ((*sitr)->sent_time != 0) {
++sitr;
++count;
if (p) {
Log.Out(Logs::Detail, Logs::Netcode, _L "Final combined packet not full, len %d" __L, p->size);
ReadyToSend.push(p);
BytesWritten += p->size;
p = nullptr;
}
} else if (!p) {
// If we don't have a packet to try to combine into, use this one as the base // If we don't have a packet to try to combine into, use this one as the base
// Copy it first as it will still live until it is acked // Copy it first as it will still live until it is acked
p=(*sitr)->Copy(); p=(*sitr)->Copy();
(*sitr)->sent_time = Timer::GetCurrentTime();
Log.Out(Logs::Detail, Logs::Netcode, _L "Starting combined packet with seq packet %d of len %d" __L, seq_send, p->size); Log.Out(Logs::Detail, Logs::Netcode, _L "Starting combined packet with seq packet %d of len %d" __L, seq_send, p->size);
++sitr; ++sitr;
NextSequencedSend++; ++count;
} else if (!p->combine(*sitr)) { } else if (!p->combine(*sitr)) {
// Trying to combine this packet with the base didn't work (too big maybe) // Trying to combine this packet with the base didn't work (too big maybe)
// So just send the base packet (we'll try this packet again later) // So just send the base packet (we'll try this packet again later)
@@ -824,18 +817,16 @@ void EQStream::Write(int eq_fd)
} }
} else { } else {
// Combine worked // Combine worked
Log.Out(Logs::Detail, Logs::Netcode, _L "Combined seq packet %d of len %d, yeilding %d combined." __L, seq_send, (*sitr)->size, p->size); Log.Out(Logs::Detail, Logs::Netcode, _L "Combined seq packet %d of len %d, yielding %d combined." __L, seq_send, (*sitr)->size, p->size);
(*sitr)->sent_time = Timer::GetCurrentTime();
++sitr; ++sitr;
NextSequencedSend++; ++count;
} }
} }
if(uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { if(uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) {
Log.Out(Logs::Detail, Logs::Netcode, _L "Post send Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, SequencedBase, SequencedQueue.size(), NextOutSeq); Log.Out(Logs::Detail, Logs::Netcode, _L "Post send Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, SequencedBase, SequencedQueue.size(), NextOutSeq);
} }
if(NextSequencedSend > SequencedQueue.size()) {
Log.Out(Logs::Detail, Logs::Netcode, _L "Post send Next Send Sequence is beyond the end of the queue NSS %d > SQ %d" __L, NextSequencedSend, SequencedQueue.size());
}
} else { } else {
// No more sequenced packets // No more sequenced packets
SeqEmpty=true; SeqEmpty=true;
@@ -894,9 +885,11 @@ sockaddr_in address;
length=p->serialize(buffer); length=p->serialize(buffer);
if (p->opcode!=OP_SessionRequest && p->opcode!=OP_SessionResponse) { if (p->opcode!=OP_SessionRequest && p->opcode!=OP_SessionResponse) {
if (compressed) { if (compressed) {
BytesWritten -= p->size;
uint32 newlen=EQProtocolPacket::Compress(buffer,length, _tempBuffer, 2048); uint32 newlen=EQProtocolPacket::Compress(buffer,length, _tempBuffer, 2048);
memcpy(buffer,_tempBuffer,newlen); memcpy(buffer,_tempBuffer,newlen);
length=newlen; length=newlen;
BytesWritten += newlen;
} }
if (encoded) { if (encoded) {
EQProtocolPacket::ChatEncode(buffer,length,Key); EQProtocolPacket::ChatEncode(buffer,length,Key);
@@ -1158,13 +1151,6 @@ void EQStream::AckPackets(uint16 seq)
std::deque<EQProtocolPacket *>::iterator itr, tmp; std::deque<EQProtocolPacket *>::iterator itr, tmp;
MOutboundQueue.lock(); MOutboundQueue.lock();
//do a bit of sanity checking.
if(uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) {
Log.Out(Logs::Detail, Logs::Netcode, _L "Pre-Ack Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, SequencedBase, SequencedQueue.size(), NextOutSeq);
}
if(NextSequencedSend > SequencedQueue.size()) {
Log.Out(Logs::Detail, Logs::Netcode, _L "Pre-Ack Next Send Sequence is beyond the end of the queue NSS %d > SQ %d" __L, NextSequencedSend, SequencedQueue.size());
}
SeqOrder ord = CompareSequence(SequencedBase, seq); SeqOrder ord = CompareSequence(SequencedBase, seq);
if(ord == SeqInOrder) { if(ord == SeqInOrder) {
@@ -1180,28 +1166,21 @@ if(NextSequencedSend > SequencedQueue.size()) {
//this is a good ack, we get to ack some blocks. //this is a good ack, we get to ack some blocks.
seq++; //we stop at the block right after their ack, counting on the wrap of both numbers. seq++; //we stop at the block right after their ack, counting on the wrap of both numbers.
while(SequencedBase != seq) { while(SequencedBase != seq) {
if(SequencedQueue.empty()) { if(SequencedQueue.empty()) {
Log.Out(Logs::Detail, Logs::Netcode, _L "OUT OF PACKETS acked packet with sequence %lu. Next send is %d before this." __L, (unsigned long)SequencedBase, NextSequencedSend); Log.Out(Logs::Detail, Logs::Netcode, _L "OUT OF PACKETS acked packet with sequence %lu. Next send is %d before this." __L, (unsigned long)SequencedBase, SequencedQueue.size());
SequencedBase = NextOutSeq; SequencedBase = NextOutSeq;
NextSequencedSend = 0; break;
break; }
} Log.Out(Logs::Detail, Logs::Netcode, _L "Removing acked packet with sequence %lu." __L, (unsigned long)SequencedBase);
Log.Out(Logs::Detail, Logs::Netcode, _L "Removing acked packet with sequence %lu. Next send is %d before this." __L, (unsigned long)SequencedBase, NextSequencedSend);
//clean out the acked packet //clean out the acked packet
delete SequencedQueue.front(); delete SequencedQueue.front();
SequencedQueue.pop_front(); SequencedQueue.pop_front();
//adjust our "next" pointer
if(NextSequencedSend > 0)
NextSequencedSend--;
//advance the base sequence number to the seq of the block after the one we just got rid of. //advance the base sequence number to the seq of the block after the one we just got rid of.
SequencedBase++; SequencedBase++;
} }
if(uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { if(uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) {
Log.Out(Logs::Detail, Logs::Netcode, _L "Post-Ack on %d Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, seq, SequencedBase, SequencedQueue.size(), NextOutSeq); Log.Out(Logs::Detail, Logs::Netcode, _L "Post-Ack on %d Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, seq, SequencedBase, SequencedQueue.size(), NextOutSeq);
} }
if(NextSequencedSend > SequencedQueue.size()) {
Log.Out(Logs::Detail, Logs::Netcode, _L "Post-Ack Next Send Sequence is beyond the end of the queue NSS %d > SQ %d" __L, NextSequencedSend, SequencedQueue.size());
}
} }
MOutboundQueue.unlock(); MOutboundQueue.unlock();
@@ -1379,6 +1358,16 @@ void EQStream::Decay()
if (BytesWritten<0) if (BytesWritten<0)
BytesWritten=0; BytesWritten=0;
} }
// check for any timed out acks
if ((GetExecutablePlatform() == ExePlatformWorld || GetExecutablePlatform() == ExePlatformZone) && RETRANSMIT_TIMEOUT_MULT && retransmittimeout) {
int count = 0;
for (std::deque<EQProtocolPacket *>::iterator sitr = SequencedQueue.begin(); sitr != SequencedQueue.end(); sitr++, count++) {
if (!(*sitr)->acked && (*sitr)->sent_time > 0 && ((*sitr)->sent_time + retransmittimeout) < Timer::GetCurrentTime()) {
(*sitr)->sent_time = 0;
Log.Out(Logs::Detail, Logs::Netcode, _L "Timeout exceeded for seq %d. Flagging packet for retransmission" __L, SequencedBase + count);
}
}
}
} }
void EQStream::AdjustRates(uint32 average_delta) void EQStream::AdjustRates(uint32 average_delta)
@@ -1386,18 +1375,24 @@ void EQStream::AdjustRates(uint32 average_delta)
if(GetExecutablePlatform() == ExePlatformWorld || GetExecutablePlatform() == ExePlatformZone) { if(GetExecutablePlatform() == ExePlatformWorld || GetExecutablePlatform() == ExePlatformZone) {
if (average_delta && (average_delta <= AVERAGE_DELTA_MAX)) { if (average_delta && (average_delta <= AVERAGE_DELTA_MAX)) {
MRate.lock(); MRate.lock();
AverageDelta = average_delta;
RateThreshold=RATEBASE/average_delta; RateThreshold=RATEBASE/average_delta;
DecayRate=DECAYBASE/average_delta; DecayRate=DECAYBASE/average_delta;
if (BytesWritten > RateThreshold)
BytesWritten = RateThreshold + DecayRate;
Log.Out(Logs::Detail, Logs::Netcode, _L "Adjusting data rate to thresh %d, decay %d based on avg delta %d" __L, Log.Out(Logs::Detail, Logs::Netcode, _L "Adjusting data rate to thresh %d, decay %d based on avg delta %d" __L,
RateThreshold, DecayRate, average_delta); RateThreshold, DecayRate, average_delta);
MRate.unlock(); MRate.unlock();
} else { } else {
Log.Out(Logs::Detail, Logs::Netcode, _L "Not adjusting data rate because avg delta over max (%d > %d)" __L, Log.Out(Logs::Detail, Logs::Netcode, _L "Not adjusting data rate because avg delta over max (%d > %d)" __L,
average_delta, AVERAGE_DELTA_MAX); average_delta, AVERAGE_DELTA_MAX);
AverageDelta = AVERAGE_DELTA_MAX;
} }
} else { } else {
if (average_delta) { if (average_delta) {
MRate.lock(); MRate.lock();
AverageDelta = average_delta;
BytesWritten = 0;
RateThreshold=RATEBASE/average_delta; RateThreshold=RATEBASE/average_delta;
DecayRate=DECAYBASE/average_delta; DecayRate=DECAYBASE/average_delta;
Log.Out(Logs::Detail, Logs::Netcode, _L "Adjusting data rate to thresh %d, decay %d based on avg delta %d" __L, Log.Out(Logs::Detail, Logs::Netcode, _L "Adjusting data rate to thresh %d, decay %d based on avg delta %d" __L,
+1 -2
View File
@@ -153,7 +153,6 @@ class EQStream : public EQStreamInterface {
std::deque<EQProtocolPacket *> SequencedQueue; std::deque<EQProtocolPacket *> SequencedQueue;
uint16 NextOutSeq; uint16 NextOutSeq;
uint16 SequencedBase; //the sequence number of SequencedQueue[0] uint16 SequencedBase; //the sequence number of SequencedQueue[0]
long NextSequencedSend; //index into SequencedQueue
Mutex MOutboundQueue; Mutex MOutboundQueue;
//a buffer we use for compression/decompression //a buffer we use for compression/decompression
@@ -174,7 +173,7 @@ class EQStream : public EQStreamInterface {
Mutex MRate; Mutex MRate;
int32 RateThreshold; int32 RateThreshold;
int32 DecayRate; int32 DecayRate;
uint32 AverageDelta;
OpcodeManager **OpMgr; OpcodeManager **OpMgr;
+1
View File
@@ -8816,6 +8816,7 @@ Bot* EntityList::GetBotByBotName(std::string botName) {
void EntityList::AddBot(Bot *newBot, bool SendSpawnPacket, bool dontqueue) { void EntityList::AddBot(Bot *newBot, bool SendSpawnPacket, bool dontqueue) {
if(newBot) { if(newBot) {
newBot->SetID(GetFreeID()); newBot->SetID(GetFreeID());
newBot->SetSpawned();
if(SendSpawnPacket) { if(SendSpawnPacket) {
if(dontqueue) { if(dontqueue) {
EQApplicationPacket* outapp = new EQApplicationPacket(); EQApplicationPacket* outapp = new EQApplicationPacket();
+2 -1
View File
@@ -123,7 +123,7 @@ Client::Client(EQStreamInterface* ieqs)
camp_timer(29000), camp_timer(29000),
process_timer(100), process_timer(100),
stamina_timer(40000), stamina_timer(40000),
zoneinpacket_timer(3000), zoneinpacket_timer(1000),
linkdead_timer(RuleI(Zone,ClientLinkdeadMS)), linkdead_timer(RuleI(Zone,ClientLinkdeadMS)),
dead_timer(2000), dead_timer(2000),
global_channel_timer(1000), global_channel_timer(1000),
@@ -439,6 +439,7 @@ void Client::SendZoneInPackets()
outapp->priority = 6; outapp->priority = 6;
if (!GetHideMe()) entity_list.QueueClients(this, outapp, true); if (!GetHideMe()) entity_list.QueueClients(this, outapp, true);
safe_delete(outapp); safe_delete(outapp);
SetSpawned();
if (GetPVP()) //force a PVP update until we fix the spawn struct if (GetPVP()) //force a PVP update until we fix the spawn struct
SendAppearancePacket(AT_PVP, GetPVP(), true, false); SendAppearancePacket(AT_PVP, GetPVP(), true, false);
+2 -4
View File
@@ -495,7 +495,7 @@ void Client::CompleteConnect()
{ {
UpdateWho(); UpdateWho();
client_state = CLIENT_CONNECTED; client_state = CLIENT_CONNECTED;
SendAllPackets();
hpupdate_timer.Start(); hpupdate_timer.Start();
position_timer.Start(); position_timer.Start();
autosave_timer.Start(); autosave_timer.Start();
@@ -750,8 +750,6 @@ void Client::CompleteConnect()
entity_list.SendTraders(this); entity_list.SendTraders(this);
zoneinpacket_timer.Start();
if (GetPet()){ if (GetPet()){
GetPet()->SendPetBuffsToClient(); GetPet()->SendPetBuffsToClient();
} }
@@ -1729,7 +1727,7 @@ void Client::Handle_Connect_OP_ZoneEntry(const EQApplicationPacket *app)
SetAttackTimer(); SetAttackTimer();
conn_state = ZoneInfoSent; conn_state = ZoneInfoSent;
zoneinpacket_timer.Start();
return; return;
} }
+1 -1
View File
@@ -66,7 +66,7 @@ bool Client::Process() {
if(Connected() || IsLD()) if(Connected() || IsLD())
{ {
// try to send all packets that weren't sent before // try to send all packets that weren't sent before
if(!IsLD() && zoneinpacket_timer.Check()) if (!IsLD() && zoneinpacket_timer.Check())
{ {
SendAllPackets(); SendAllPackets();
} }
+3 -3
View File
@@ -647,7 +647,7 @@ void EntityList::AddNPC(NPC *npc, bool SendSpawnPacket, bool dontqueue)
uint16 emoteid = npc->GetEmoteID(); uint16 emoteid = npc->GetEmoteID();
if (emoteid != 0) if (emoteid != 0)
npc->DoNPCEmote(ONSPAWN, emoteid); npc->DoNPCEmote(ONSPAWN, emoteid);
npc->SetSpawned();
if (SendSpawnPacket) { if (SendSpawnPacket) {
if (dontqueue) { // aka, SEND IT NOW BITCH! if (dontqueue) { // aka, SEND IT NOW BITCH!
EQApplicationPacket *app = new EQApplicationPacket; EQApplicationPacket *app = new EQApplicationPacket;
@@ -686,7 +686,7 @@ void EntityList::AddMerc(Merc *merc, bool SendSpawnPacket, bool dontqueue)
if (merc) if (merc)
{ {
merc->SetID(GetFreeID()); merc->SetID(GetFreeID());
merc->SetSpawned();
if (SendSpawnPacket) if (SendSpawnPacket)
{ {
if (dontqueue) { if (dontqueue) {
@@ -1231,7 +1231,7 @@ void EntityList::SendZoneSpawnsBulk(Client *client)
int32 race=-1; int32 race=-1;
for (auto it = mob_list.begin(); it != mob_list.end(); ++it) { for (auto it = mob_list.begin(); it != mob_list.end(); ++it) {
spawn = it->second; spawn = it->second;
if (spawn && spawn->InZone()) { if (spawn && spawn->GetID() > 0 && spawn->Spawned()) {
if (spawn->IsClient() && (spawn->CastToClient()->GMHideMe(client) || if (spawn->IsClient() && (spawn->CastToClient()->GMHideMe(client) ||
spawn->CastToClient()->IsHoveringForRespawn())) spawn->CastToClient()->IsHoveringForRespawn()))
continue; continue;
+1
View File
@@ -253,6 +253,7 @@ Mob::Mob(const char* in_name,
invulnerable = false; invulnerable = false;
IsFullHP = (cur_hp == max_hp); IsFullHP = (cur_hp == max_hp);
qglobal=0; qglobal=0;
spawned = false;
InitializeBuffSlots(); InitializeBuffSlots();
+3
View File
@@ -488,6 +488,8 @@ public:
void MakeSpawnUpdateNoDelta(PlayerPositionUpdateServer_Struct* spu); void MakeSpawnUpdateNoDelta(PlayerPositionUpdateServer_Struct* spu);
void MakeSpawnUpdate(PlayerPositionUpdateServer_Struct* spu); void MakeSpawnUpdate(PlayerPositionUpdateServer_Struct* spu);
void SendPosition(); void SendPosition();
void SetSpawned() { spawned = true; };
bool Spawned() { return spawned; };
void SetFlyMode(uint8 flymode); void SetFlyMode(uint8 flymode);
inline void Teleport(glm::vec3 NewPosition) { m_Position.x = NewPosition.x; m_Position.y = NewPosition.y; inline void Teleport(glm::vec3 NewPosition) { m_Position.x = NewPosition.x; m_Position.y = NewPosition.y;
m_Position.z = NewPosition.z; }; m_Position.z = NewPosition.z; };
@@ -1137,6 +1139,7 @@ protected:
bool held; bool held;
bool nocast; bool nocast;
bool focused; bool focused;
bool spawned;
void CalcSpellBonuses(StatBonuses* newbon); void CalcSpellBonuses(StatBonuses* newbon);
virtual void CalcBonuses(); virtual void CalcBonuses();
void TrySkillProc(Mob *on, uint16 skill, uint16 ReuseTime, bool Success = false, uint16 hand = 0, bool IsDefensive = false); // hand = MainCharm? void TrySkillProc(Mob *on, uint16 skill, uint16 ReuseTime, bool Success = false, uint16 hand = 0, bool IsDefensive = false); // hand = MainCharm?