From fb23d961c17ed6760677848742f8191c89a15ed9 Mon Sep 17 00:00:00 2001 From: ngdeao Date: Mon, 28 Mar 2016 19:11:24 -0600 Subject: [PATCH 1/3] Changed where queued packets are sent while zoning. Moved where zoneinpacket_timer is started to assist in not dropping needed packets. Added better netcode support for handling out of order acks, to preclude excessive resending of same packets. Changed how timeout checks are performing on individual packets, for re-sends, so they do not happen more often than the client can respond. Improved how the data rate limit for throttling packets for compressed stream, so the size reduction in packets are accounted for better. --- common/eq_packet.h | 3 +- common/eq_stream.cpp | 183 +++++++++++++++++++--------------------- common/eq_stream.h | 3 +- zone/bot.cpp | 1 + zone/client.cpp | 3 +- zone/client_packet.cpp | 6 +- zone/client_process.cpp | 2 +- zone/entity.cpp | 6 +- zone/mob.cpp | 1 + zone/mob.h | 3 + 10 files changed, 105 insertions(+), 106 deletions(-) diff --git a/common/eq_packet.h b/common/eq_packet.h index ed90d132a..94c8a1e10 100644 --- a/common/eq_packet.h +++ b/common/eq_packet.h @@ -62,7 +62,7 @@ class EQProtocolPacket : public BasePacket { friend class EQStream; friend class EQStreamPair; public: - EQProtocolPacket(uint16 op, const unsigned char *buf, uint32 len) : BasePacket(buf,len), opcode(op) { acked = false; } + EQProtocolPacket(uint16 op, const unsigned char *buf, uint32 len) : BasePacket(buf, len), opcode(op) { acked = false; sent_time = 0; } // EQProtocolPacket(const unsigned char *buf, uint32 len); bool combine(const EQProtocolPacket *rhs); uint32 serialize (unsigned char *dest) const; @@ -70,6 +70,7 @@ public: EQRawApplicationPacket *MakeAppPacket() const; bool acked; + uint32 sent_time; virtual void build_raw_header_dump(char *buffer, uint16 seq=0xffff) const; virtual void build_header_dump(char *buffer) const; diff --git a/common/eq_stream.cpp b/common/eq_stream.cpp index 3438f533c..d132fb872 100644 --- a/common/eq_stream.cpp +++ b/common/eq_stream.cpp @@ -75,7 +75,7 @@ void EQStream::init(bool resetSession) { sent_packet_count = 0; received_packet_count = 0; SequencedBase = 0; - NextSequencedSend = 0; + AverageDelta = 500; if(GetExecutablePlatform() == ExePlatformWorld || GetExecutablePlatform() == ExePlatformZone) { retransmittimer = Timer::GetCurrentTime(); @@ -86,10 +86,6 @@ void EQStream::init(bool resetSession) { if(uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { Log.Out(Logs::Detail, Logs::Netcode, _L "init Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, SequencedBase, SequencedQueue.size(), NextOutSeq); } - - if(NextSequencedSend > SequencedQueue.size()) { - Log.Out(Logs::Detail, Logs::Netcode, _L "init Next Send Sequence is beyond the end of the queue NSS %d > SQ %d" __L, NextSequencedSend, SequencedQueue.size()); - } } EQRawApplicationPacket *EQStream::MakeApplicationPacket(EQProtocolPacket *p) @@ -420,36 +416,33 @@ void EQStream::ProcessPacket(EQProtocolPacket *p) Log.Out(Logs::Detail, Logs::Netcode, _L "Pre-OOA Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, SequencedBase, SequencedQueue.size(), NextOutSeq); } - if(NextSequencedSend > SequencedQueue.size()) { - Log.Out(Logs::Detail, Logs::Netcode, _L "Pre-OOA Next Send Sequence is beyond the end of the queue NSS %d > SQ %d" __L, NextSequencedSend, SequencedQueue.size()); - } //if the packet they got out of order is between our last acked packet and the last sent packet, then its valid. if (CompareSequence(SequencedBase,seq) != SeqPast && CompareSequence(NextOutSeq,seq) == SeqPast) { Log.Out(Logs::Detail, Logs::Netcode, _L "Received OP_OutOfOrderAck for sequence %d, starting retransmit at the start of our unacked buffer (seq %d, was %d)." __L, - seq, SequencedBase, SequencedBase+NextSequencedSend); + seq, SequencedBase, SequencedBase+SequencedQueue.size()); - bool retransmit_acked_packets = false; - if(GetExecutablePlatform() == ExePlatformWorld || GetExecutablePlatform() == ExePlatformZone) { - retransmit_acked_packets = RETRANSMIT_ACKED_PACKETS; - } - - if(!retransmit_acked_packets) { - uint16 sqsize = SequencedQueue.size(); - uint16 index = seq - SequencedBase; - Log.Out(Logs::Detail, Logs::Netcode, _L "OP_OutOfOrderAck marking packet acked in queue (queue index = %d, queue size = %d)." __L, index, sqsize); - if (index < sqsize) { - std::deque::iterator sitr; - sitr = SequencedQueue.begin(); - sitr += index; - (*sitr)->acked = true; + uint16 sqsize = SequencedQueue.size(); + uint16 index = seq - SequencedBase; + Log.Out(Logs::Detail, Logs::Netcode, _L "OP_OutOfOrderAck marking packet acked in queue (queue index = %d, queue size = %d)." __L, index, sqsize); + if (index < sqsize) { + std::deque::iterator sitr; + sitr = SequencedQueue.begin(); + sitr += index; + (*sitr)->acked = true; + // flag packets for a resend + uint16 count = 0; + uint32 timeout = AverageDelta * 2 + 100; + for (sitr = SequencedQueue.begin(); sitr != SequencedQueue.end() && count < index; ++sitr, ++count) { + if (!(*sitr)->acked && (*sitr)->sent_time > 0 && (((*sitr)->sent_time + timeout) < Timer::GetCurrentTime())) { + (*sitr)->sent_time = 0; + Log.Out(Logs::Detail, Logs::Netcode, _L "OP_OutOfOrderAck Flagging packet %d for retransmission" __L, SequencedBase + count); + } } } if(RETRANSMIT_TIMEOUT_MULT) { retransmittimer = Timer::GetCurrentTime(); } - - NextSequencedSend = 0; } else { Log.Out(Logs::Detail, Logs::Netcode, _L "Received OP_OutOfOrderAck for out-of-window %d. Window (%d->%d)." __L, seq, SequencedBase, NextOutSeq); } @@ -458,9 +451,6 @@ void EQStream::ProcessPacket(EQProtocolPacket *p) Log.Out(Logs::Detail, Logs::Netcode, _L "Post-OOA Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, SequencedBase, SequencedQueue.size(), NextOutSeq); } - if(NextSequencedSend > SequencedQueue.size()) { - Log.Out(Logs::Detail, Logs::Netcode, _L "Post-OOA Next Send Sequence is beyond the end of the queue NSS %d > SQ %d" __L, NextSequencedSend, SequencedQueue.size()); - } MOutboundQueue.unlock(); #endif } @@ -489,6 +479,7 @@ void EQStream::ProcessPacket(EQProtocolPacket *p) } else { retransmittimeout = ntohl(ClientStats->average_delta) * 2 * RETRANSMIT_TIMEOUT_MULT; } + retransmittimeout += 300; if(retransmittimeout > RETRANSMIT_TIMEOUT_MAX) retransmittimeout = RETRANSMIT_TIMEOUT_MAX; Log.Out(Logs::Detail, Logs::Netcode, _L "Retransmit timeout recalculated to %dms" __L, retransmittimeout); @@ -631,9 +622,7 @@ void EQStream::SequencedPush(EQProtocolPacket *p) if(uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { Log.Out(Logs::Detail, Logs::Netcode, _L "Pre-Push Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, SequencedBase, SequencedQueue.size(), NextOutSeq); } -if(NextSequencedSend > SequencedQueue.size()) { - Log.Out(Logs::Detail, Logs::Netcode, _L "Pre-Push Next Send Sequence is beyond the end of the queue NSS %d > SQ %d" __L, NextSequencedSend, SequencedQueue.size()); -} + Log.Out(Logs::Detail, Logs::Netcode, _L "Pushing sequenced packet %d of length %d. Base Seq is %d." __L, NextOutSeq, p->size, SequencedBase); *(uint16 *)(p->pBuffer)=htons(NextOutSeq); @@ -643,9 +632,7 @@ if(NextSequencedSend > SequencedQueue.size()) { if(uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { Log.Out(Logs::Detail, Logs::Netcode, _L "Push Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, SequencedBase, SequencedQueue.size(), NextOutSeq); } -if(NextSequencedSend > SequencedQueue.size()) { - Log.Out(Logs::Detail, Logs::Netcode, _L "Push Next Send Sequence is beyond the end of the queue NSS %d > SQ %d" __L, NextSequencedSend, SequencedQueue.size()); -} + MOutboundQueue.unlock(); #endif } @@ -703,21 +690,15 @@ void EQStream::Write(int eq_fd) // Place to hold the base packet t combine into EQProtocolPacket *p=nullptr; - if(GetExecutablePlatform() == ExePlatformWorld || GetExecutablePlatform() == ExePlatformZone) { - // if we have a timeout defined and we have not received an ack recently enough, retransmit from beginning of queue - if (RETRANSMIT_TIMEOUT_MULT && !SequencedQueue.empty() && NextSequencedSend && - (GetState()==ESTABLISHED) && ((retransmittimer+retransmittimeout) < Timer::GetCurrentTime())) { - Log.Out(Logs::Detail, Logs::Netcode, _L "Timeout since last ack received, starting retransmit at the start of our unacked " - "buffer (seq %d, was %d)." __L, SequencedBase, SequencedBase+NextSequencedSend); - NextSequencedSend = 0; - retransmittimer = Timer::GetCurrentTime(); // don't want to endlessly retransmit the first packet - } - } - // Find the next sequenced packet to send from the "queue" sitr = SequencedQueue.begin(); - if (sitr!=SequencedQueue.end()) - sitr += NextSequencedSend; + + uint16 count = 0; + // get to start of packets + while (sitr != SequencedQueue.end() && (*sitr)->sent_time > 0) { + sitr++; + count++; + } // Loop until both are empty or MaxSends is reached while(!SeqEmpty || !NonSeqEmpty) { @@ -731,7 +712,7 @@ void EQStream::Write(int eq_fd) Log.Out(Logs::Detail, Logs::Netcode, _L "Starting combined packet with non-seq packet of len %d" __L, p->size); NonSequencedQueue.pop(); } else if (!p->combine(NonSequencedQueue.front())) { - // Tryint to combine this packet with the base didn't work (too big maybe) + // Trying to combine this packet with the base didn't work (too big maybe) // So just send the base packet (we'll try this packet again later) Log.Out(Logs::Detail, Logs::Netcode, _L "Combined packet full at len %d, next non-seq packet is len %d" __L, p->size, (NonSequencedQueue.front())->size); ReadyToSend.push(p); @@ -754,15 +735,10 @@ void EQStream::Write(int eq_fd) NonSeqEmpty=true; } - if (sitr!=SequencedQueue.end()) { - if(uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { - Log.Out(Logs::Detail, Logs::Netcode, _L "Pre-Send Seq NSS=%d Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, NextSequencedSend, SequencedBase, SequencedQueue.size(), NextOutSeq); - } - if(NextSequencedSend > SequencedQueue.size()) { - Log.Out(Logs::Detail, Logs::Netcode, _L "Pre-Send Next Send Sequence is beyond the end of the queue NSS %d > SQ %d" __L, NextSequencedSend, SequencedQueue.size()); - } - uint16 seq_send = SequencedBase + NextSequencedSend; //just for logging... + if (sitr!=SequencedQueue.end()) { + + uint16 seq_send = SequencedBase + count; //just for logging... if(SequencedQueue.empty()) { Log.Out(Logs::Detail, Logs::Netcode, _L "Tried to write a packet with an empty queue (%d is past next out %d)" __L, seq_send, NextOutSeq); @@ -771,26 +747,32 @@ void EQStream::Write(int eq_fd) } if(GetExecutablePlatform() == ExePlatformWorld || GetExecutablePlatform() == ExePlatformZone) { - if (!RETRANSMIT_ACKED_PACKETS && (*sitr)->acked) { + if ((*sitr)->acked || (*sitr)->sent_time != 0) { + ++sitr; + ++count; + if (p) { + Log.Out(Logs::Detail, Logs::Netcode, _L "Final combined packet not full, len %d" __L, p->size); + ReadyToSend.push(p); + BytesWritten += p->size; + p = nullptr; + } Log.Out(Logs::Detail, Logs::Netcode, _L "Not retransmitting seq packet %d because already marked as acked" __L, seq_send); - sitr++; - NextSequencedSend++; } else if (!p) { // If we don't have a packet to try to combine into, use this one as the base // Copy it first as it will still live until it is acked p=(*sitr)->Copy(); Log.Out(Logs::Detail, Logs::Netcode, _L "Starting combined packet with seq packet %d of len %d" __L, seq_send, p->size); + (*sitr)->sent_time = Timer::GetCurrentTime(); ++sitr; - NextSequencedSend++; + ++count; } else if (!p->combine(*sitr)) { // Trying to combine this packet with the base didn't work (too big maybe) // So just send the base packet (we'll try this packet again later) - Log.Out(Logs::Detail, Logs::Netcode, _L "Combined packet full at len %d, next seq packet %d is len %d" __L, p->size, seq_send, (*sitr)->size); + Log.Out(Logs::Detail, Logs::Netcode, _L "Combined packet full at len %d, next seq packet %d is len %d" __L, p->size, seq_send + 1, (*sitr)->size); ReadyToSend.push(p); BytesWritten+=p->size; p=nullptr; - - if (BytesWritten > threshold) { + if ((*sitr)->opcode != OP_Fragment && BytesWritten > threshold) { // Sent enough this round, lets stop to be fair Log.Out(Logs::Detail, Logs::Netcode, _L "Exceeded write threshold in seq (%d > %d)" __L, BytesWritten, threshold); break; @@ -798,17 +780,28 @@ void EQStream::Write(int eq_fd) } else { // Combine worked Log.Out(Logs::Detail, Logs::Netcode, _L "Combined seq packet %d of len %d, yeilding %d combined." __L, seq_send, (*sitr)->size, p->size); + (*sitr)->sent_time = Timer::GetCurrentTime(); ++sitr; - NextSequencedSend++; + ++count; } } else { - if (!p) { + if ((*sitr)->sent_time != 0) { + ++sitr; + ++count; + if (p) { + Log.Out(Logs::Detail, Logs::Netcode, _L "Final combined packet not full, len %d" __L, p->size); + ReadyToSend.push(p); + BytesWritten += p->size; + p = nullptr; + } + } else if (!p) { // If we don't have a packet to try to combine into, use this one as the base // Copy it first as it will still live until it is acked p=(*sitr)->Copy(); + (*sitr)->sent_time = Timer::GetCurrentTime(); Log.Out(Logs::Detail, Logs::Netcode, _L "Starting combined packet with seq packet %d of len %d" __L, seq_send, p->size); ++sitr; - NextSequencedSend++; + ++count; } else if (!p->combine(*sitr)) { // Trying to combine this packet with the base didn't work (too big maybe) // So just send the base packet (we'll try this packet again later) @@ -824,18 +817,16 @@ void EQStream::Write(int eq_fd) } } else { // Combine worked - Log.Out(Logs::Detail, Logs::Netcode, _L "Combined seq packet %d of len %d, yeilding %d combined." __L, seq_send, (*sitr)->size, p->size); + Log.Out(Logs::Detail, Logs::Netcode, _L "Combined seq packet %d of len %d, yielding %d combined." __L, seq_send, (*sitr)->size, p->size); + (*sitr)->sent_time = Timer::GetCurrentTime(); ++sitr; - NextSequencedSend++; + ++count; } } if(uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { Log.Out(Logs::Detail, Logs::Netcode, _L "Post send Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, SequencedBase, SequencedQueue.size(), NextOutSeq); } - if(NextSequencedSend > SequencedQueue.size()) { - Log.Out(Logs::Detail, Logs::Netcode, _L "Post send Next Send Sequence is beyond the end of the queue NSS %d > SQ %d" __L, NextSequencedSend, SequencedQueue.size()); - } } else { // No more sequenced packets SeqEmpty=true; @@ -894,9 +885,11 @@ sockaddr_in address; length=p->serialize(buffer); if (p->opcode!=OP_SessionRequest && p->opcode!=OP_SessionResponse) { if (compressed) { + BytesWritten -= p->size; uint32 newlen=EQProtocolPacket::Compress(buffer,length, _tempBuffer, 2048); memcpy(buffer,_tempBuffer,newlen); length=newlen; + BytesWritten += newlen; } if (encoded) { EQProtocolPacket::ChatEncode(buffer,length,Key); @@ -1158,13 +1151,6 @@ void EQStream::AckPackets(uint16 seq) std::deque::iterator itr, tmp; MOutboundQueue.lock(); -//do a bit of sanity checking. -if(uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { - Log.Out(Logs::Detail, Logs::Netcode, _L "Pre-Ack Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, SequencedBase, SequencedQueue.size(), NextOutSeq); -} -if(NextSequencedSend > SequencedQueue.size()) { - Log.Out(Logs::Detail, Logs::Netcode, _L "Pre-Ack Next Send Sequence is beyond the end of the queue NSS %d > SQ %d" __L, NextSequencedSend, SequencedQueue.size()); -} SeqOrder ord = CompareSequence(SequencedBase, seq); if(ord == SeqInOrder) { @@ -1180,28 +1166,21 @@ if(NextSequencedSend > SequencedQueue.size()) { //this is a good ack, we get to ack some blocks. seq++; //we stop at the block right after their ack, counting on the wrap of both numbers. while(SequencedBase != seq) { -if(SequencedQueue.empty()) { -Log.Out(Logs::Detail, Logs::Netcode, _L "OUT OF PACKETS acked packet with sequence %lu. Next send is %d before this." __L, (unsigned long)SequencedBase, NextSequencedSend); - SequencedBase = NextOutSeq; - NextSequencedSend = 0; - break; -} - Log.Out(Logs::Detail, Logs::Netcode, _L "Removing acked packet with sequence %lu. Next send is %d before this." __L, (unsigned long)SequencedBase, NextSequencedSend); + if(SequencedQueue.empty()) { + Log.Out(Logs::Detail, Logs::Netcode, _L "OUT OF PACKETS acked packet with sequence %lu. Next send is %d before this." __L, (unsigned long)SequencedBase, SequencedQueue.size()); + SequencedBase = NextOutSeq; + break; + } + Log.Out(Logs::Detail, Logs::Netcode, _L "Removing acked packet with sequence %lu." __L, (unsigned long)SequencedBase); //clean out the acked packet delete SequencedQueue.front(); SequencedQueue.pop_front(); - //adjust our "next" pointer - if(NextSequencedSend > 0) - NextSequencedSend--; //advance the base sequence number to the seq of the block after the one we just got rid of. SequencedBase++; } -if(uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { - Log.Out(Logs::Detail, Logs::Netcode, _L "Post-Ack on %d Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, seq, SequencedBase, SequencedQueue.size(), NextOutSeq); -} -if(NextSequencedSend > SequencedQueue.size()) { - Log.Out(Logs::Detail, Logs::Netcode, _L "Post-Ack Next Send Sequence is beyond the end of the queue NSS %d > SQ %d" __L, NextSequencedSend, SequencedQueue.size()); -} + if(uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { + Log.Out(Logs::Detail, Logs::Netcode, _L "Post-Ack on %d Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, seq, SequencedBase, SequencedQueue.size(), NextOutSeq); + } } MOutboundQueue.unlock(); @@ -1379,6 +1358,16 @@ void EQStream::Decay() if (BytesWritten<0) BytesWritten=0; } + // check for any timed out acks + if ((GetExecutablePlatform() == ExePlatformWorld || GetExecutablePlatform() == ExePlatformZone) && RETRANSMIT_TIMEOUT_MULT && retransmittimeout) { + int count = 0; + for (std::deque::iterator sitr = SequencedQueue.begin(); sitr != SequencedQueue.end(); sitr++, count++) { + if (!(*sitr)->acked && (*sitr)->sent_time > 0 && ((*sitr)->sent_time + retransmittimeout) < Timer::GetCurrentTime()) { + (*sitr)->sent_time = 0; + Log.Out(Logs::Detail, Logs::Netcode, _L "Timeout exceeded for seq %d. Flagging packet for retransmission" __L, SequencedBase + count); + } + } + } } void EQStream::AdjustRates(uint32 average_delta) @@ -1386,18 +1375,24 @@ void EQStream::AdjustRates(uint32 average_delta) if(GetExecutablePlatform() == ExePlatformWorld || GetExecutablePlatform() == ExePlatformZone) { if (average_delta && (average_delta <= AVERAGE_DELTA_MAX)) { MRate.lock(); + AverageDelta = average_delta; RateThreshold=RATEBASE/average_delta; DecayRate=DECAYBASE/average_delta; + if (BytesWritten > RateThreshold) + BytesWritten = RateThreshold + DecayRate; Log.Out(Logs::Detail, Logs::Netcode, _L "Adjusting data rate to thresh %d, decay %d based on avg delta %d" __L, RateThreshold, DecayRate, average_delta); MRate.unlock(); } else { Log.Out(Logs::Detail, Logs::Netcode, _L "Not adjusting data rate because avg delta over max (%d > %d)" __L, average_delta, AVERAGE_DELTA_MAX); + AverageDelta = AVERAGE_DELTA_MAX; } } else { if (average_delta) { MRate.lock(); + AverageDelta = average_delta; + BytesWritten = 0; RateThreshold=RATEBASE/average_delta; DecayRate=DECAYBASE/average_delta; Log.Out(Logs::Detail, Logs::Netcode, _L "Adjusting data rate to thresh %d, decay %d based on avg delta %d" __L, diff --git a/common/eq_stream.h b/common/eq_stream.h index 0bdfeab53..d5e78c085 100644 --- a/common/eq_stream.h +++ b/common/eq_stream.h @@ -153,7 +153,6 @@ class EQStream : public EQStreamInterface { std::deque SequencedQueue; uint16 NextOutSeq; uint16 SequencedBase; //the sequence number of SequencedQueue[0] - long NextSequencedSend; //index into SequencedQueue Mutex MOutboundQueue; //a buffer we use for compression/decompression @@ -174,7 +173,7 @@ class EQStream : public EQStreamInterface { Mutex MRate; int32 RateThreshold; int32 DecayRate; - + uint32 AverageDelta; OpcodeManager **OpMgr; diff --git a/zone/bot.cpp b/zone/bot.cpp index 6cb27f8e7..c2213436b 100644 --- a/zone/bot.cpp +++ b/zone/bot.cpp @@ -8816,6 +8816,7 @@ Bot* EntityList::GetBotByBotName(std::string botName) { void EntityList::AddBot(Bot *newBot, bool SendSpawnPacket, bool dontqueue) { if(newBot) { newBot->SetID(GetFreeID()); + newBot->SetSpawned(); if(SendSpawnPacket) { if(dontqueue) { EQApplicationPacket* outapp = new EQApplicationPacket(); diff --git a/zone/client.cpp b/zone/client.cpp index 837306d99..896a794c2 100644 --- a/zone/client.cpp +++ b/zone/client.cpp @@ -123,7 +123,7 @@ Client::Client(EQStreamInterface* ieqs) camp_timer(29000), process_timer(100), stamina_timer(40000), - zoneinpacket_timer(3000), + zoneinpacket_timer(1000), linkdead_timer(RuleI(Zone,ClientLinkdeadMS)), dead_timer(2000), global_channel_timer(1000), @@ -439,6 +439,7 @@ void Client::SendZoneInPackets() outapp->priority = 6; if (!GetHideMe()) entity_list.QueueClients(this, outapp, true); safe_delete(outapp); + SetSpawned(); if (GetPVP()) //force a PVP update until we fix the spawn struct SendAppearancePacket(AT_PVP, GetPVP(), true, false); diff --git a/zone/client_packet.cpp b/zone/client_packet.cpp index fb7216b86..0ec86ed0d 100644 --- a/zone/client_packet.cpp +++ b/zone/client_packet.cpp @@ -495,7 +495,7 @@ void Client::CompleteConnect() { UpdateWho(); client_state = CLIENT_CONNECTED; - + SendAllPackets(); hpupdate_timer.Start(); position_timer.Start(); autosave_timer.Start(); @@ -750,8 +750,6 @@ void Client::CompleteConnect() entity_list.SendTraders(this); - zoneinpacket_timer.Start(); - if (GetPet()){ GetPet()->SendPetBuffsToClient(); } @@ -1729,7 +1727,7 @@ void Client::Handle_Connect_OP_ZoneEntry(const EQApplicationPacket *app) SetAttackTimer(); conn_state = ZoneInfoSent; - + zoneinpacket_timer.Start(); return; } diff --git a/zone/client_process.cpp b/zone/client_process.cpp index 3154a9ca9..6c51e6196 100644 --- a/zone/client_process.cpp +++ b/zone/client_process.cpp @@ -66,7 +66,7 @@ bool Client::Process() { if(Connected() || IsLD()) { // try to send all packets that weren't sent before - if(!IsLD() && zoneinpacket_timer.Check()) + if (!IsLD() && zoneinpacket_timer.Check()) { SendAllPackets(); } diff --git a/zone/entity.cpp b/zone/entity.cpp index a79d01609..00da8a7dc 100644 --- a/zone/entity.cpp +++ b/zone/entity.cpp @@ -647,7 +647,7 @@ void EntityList::AddNPC(NPC *npc, bool SendSpawnPacket, bool dontqueue) uint16 emoteid = npc->GetEmoteID(); if (emoteid != 0) npc->DoNPCEmote(ONSPAWN, emoteid); - + npc->SetSpawned(); if (SendSpawnPacket) { if (dontqueue) { // aka, SEND IT NOW BITCH! EQApplicationPacket *app = new EQApplicationPacket; @@ -686,7 +686,7 @@ void EntityList::AddMerc(Merc *merc, bool SendSpawnPacket, bool dontqueue) if (merc) { merc->SetID(GetFreeID()); - + merc->SetSpawned(); if (SendSpawnPacket) { if (dontqueue) { @@ -1231,7 +1231,7 @@ void EntityList::SendZoneSpawnsBulk(Client *client) int32 race=-1; for (auto it = mob_list.begin(); it != mob_list.end(); ++it) { spawn = it->second; - if (spawn && spawn->InZone()) { + if (spawn && spawn->GetID() > 0 && spawn->Spawned()) { if (spawn->IsClient() && (spawn->CastToClient()->GMHideMe(client) || spawn->CastToClient()->IsHoveringForRespawn())) continue; diff --git a/zone/mob.cpp b/zone/mob.cpp index c91ae404f..fdfe602ce 100644 --- a/zone/mob.cpp +++ b/zone/mob.cpp @@ -253,6 +253,7 @@ Mob::Mob(const char* in_name, invulnerable = false; IsFullHP = (cur_hp == max_hp); qglobal=0; + spawned = false; InitializeBuffSlots(); diff --git a/zone/mob.h b/zone/mob.h index 8cb899d0f..8bf1f1dda 100644 --- a/zone/mob.h +++ b/zone/mob.h @@ -488,6 +488,8 @@ public: void MakeSpawnUpdateNoDelta(PlayerPositionUpdateServer_Struct* spu); void MakeSpawnUpdate(PlayerPositionUpdateServer_Struct* spu); void SendPosition(); + void SetSpawned() { spawned = true; }; + bool Spawned() { return spawned; }; void SetFlyMode(uint8 flymode); inline void Teleport(glm::vec3 NewPosition) { m_Position.x = NewPosition.x; m_Position.y = NewPosition.y; m_Position.z = NewPosition.z; }; @@ -1137,6 +1139,7 @@ protected: bool held; bool nocast; bool focused; + bool spawned; void CalcSpellBonuses(StatBonuses* newbon); virtual void CalcBonuses(); void TrySkillProc(Mob *on, uint16 skill, uint16 ReuseTime, bool Success = false, uint16 hand = 0, bool IsDefensive = false); // hand = MainCharm? From 1551e5d908e2e3d07413fca78a51bc7771a4f37e Mon Sep 17 00:00:00 2001 From: "Michael Cook (mackal)" Date: Fri, 8 Apr 2016 14:11:02 -0400 Subject: [PATCH 2/3] Add mutex to EQStream::Decay to prevent threading issues --- common/eq_stream.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/common/eq_stream.cpp b/common/eq_stream.cpp index d132fb872..24d96562f 100644 --- a/common/eq_stream.cpp +++ b/common/eq_stream.cpp @@ -1361,12 +1361,14 @@ void EQStream::Decay() // check for any timed out acks if ((GetExecutablePlatform() == ExePlatformWorld || GetExecutablePlatform() == ExePlatformZone) && RETRANSMIT_TIMEOUT_MULT && retransmittimeout) { int count = 0; + MOutboundQueue.lock(); for (std::deque::iterator sitr = SequencedQueue.begin(); sitr != SequencedQueue.end(); sitr++, count++) { if (!(*sitr)->acked && (*sitr)->sent_time > 0 && ((*sitr)->sent_time + retransmittimeout) < Timer::GetCurrentTime()) { (*sitr)->sent_time = 0; Log.Out(Logs::Detail, Logs::Netcode, _L "Timeout exceeded for seq %d. Flagging packet for retransmission" __L, SequencedBase + count); } } + MOutboundQueue.unlock(); } } From 761c2be72278bb4840987c09c217240985693c8a Mon Sep 17 00:00:00 2001 From: "Michael Cook (mackal)" Date: Fri, 8 Apr 2016 14:14:09 -0400 Subject: [PATCH 3/3] Style changes (auto, post-inc to pre-inc) --- common/eq_stream.cpp | 39 +++++++++++++++++------------------- common/eq_stream_factory.cpp | 27 +++++++++++++------------ 2 files changed, 32 insertions(+), 34 deletions(-) diff --git a/common/eq_stream.cpp b/common/eq_stream.cpp index 24d96562f..4b231dc9a 100644 --- a/common/eq_stream.cpp +++ b/common/eq_stream.cpp @@ -425,14 +425,11 @@ void EQStream::ProcessPacket(EQProtocolPacket *p) uint16 index = seq - SequencedBase; Log.Out(Logs::Detail, Logs::Netcode, _L "OP_OutOfOrderAck marking packet acked in queue (queue index = %d, queue size = %d)." __L, index, sqsize); if (index < sqsize) { - std::deque::iterator sitr; - sitr = SequencedQueue.begin(); - sitr += index; - (*sitr)->acked = true; + SequencedQueue[index]->acked = true; // flag packets for a resend uint16 count = 0; uint32 timeout = AverageDelta * 2 + 100; - for (sitr = SequencedQueue.begin(); sitr != SequencedQueue.end() && count < index; ++sitr, ++count) { + for (auto sitr = SequencedQueue.begin(); sitr != SequencedQueue.end() && count < index; ++sitr, ++count) { if (!(*sitr)->acked && (*sitr)->sent_time > 0 && (((*sitr)->sent_time + timeout) < Timer::GetCurrentTime())) { (*sitr)->sent_time = 0; Log.Out(Logs::Detail, Logs::Netcode, _L "OP_OutOfOrderAck Flagging packet %d for retransmission" __L, SequencedBase + count); @@ -619,19 +616,21 @@ void EQStream::SequencedPush(EQProtocolPacket *p) delete p; #else MOutboundQueue.lock(); -if(uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { - Log.Out(Logs::Detail, Logs::Netcode, _L "Pre-Push Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, SequencedBase, SequencedQueue.size(), NextOutSeq); -} + if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { + Log.Out(Logs::Detail, Logs::Netcode, _L "Pre-Push Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, + SequencedBase, SequencedQueue.size(), NextOutSeq); + } - - Log.Out(Logs::Detail, Logs::Netcode, _L "Pushing sequenced packet %d of length %d. Base Seq is %d." __L, NextOutSeq, p->size, SequencedBase); - *(uint16 *)(p->pBuffer)=htons(NextOutSeq); + Log.Out(Logs::Detail, Logs::Netcode, _L "Pushing sequenced packet %d of length %d. Base Seq is %d." __L, + NextOutSeq, p->size, SequencedBase); + *(uint16 *)(p->pBuffer) = htons(NextOutSeq); SequencedQueue.push_back(p); NextOutSeq++; -if(uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { - Log.Out(Logs::Detail, Logs::Netcode, _L "Push Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, SequencedBase, SequencedQueue.size(), NextOutSeq); -} + if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { + Log.Out(Logs::Detail, Logs::Netcode, _L "Push Invalid Sequenced queue: BS %d + SQ %d != NOS %d" __L, + SequencedBase, SequencedQueue.size(), NextOutSeq); + } MOutboundQueue.unlock(); #endif @@ -696,8 +695,8 @@ void EQStream::Write(int eq_fd) uint16 count = 0; // get to start of packets while (sitr != SequencedQueue.end() && (*sitr)->sent_time > 0) { - sitr++; - count++; + ++sitr; + ++count; } // Loop until both are empty or MaxSends is reached @@ -735,11 +734,9 @@ void EQStream::Write(int eq_fd) NonSeqEmpty=true; } - - if (sitr!=SequencedQueue.end()) { - + if (sitr != SequencedQueue.end()) { uint16 seq_send = SequencedBase + count; //just for logging... - + if(SequencedQueue.empty()) { Log.Out(Logs::Detail, Logs::Netcode, _L "Tried to write a packet with an empty queue (%d is past next out %d)" __L, seq_send, NextOutSeq); SeqEmpty=true; @@ -1362,7 +1359,7 @@ void EQStream::Decay() if ((GetExecutablePlatform() == ExePlatformWorld || GetExecutablePlatform() == ExePlatformZone) && RETRANSMIT_TIMEOUT_MULT && retransmittimeout) { int count = 0; MOutboundQueue.lock(); - for (std::deque::iterator sitr = SequencedQueue.begin(); sitr != SequencedQueue.end(); sitr++, count++) { + for (auto sitr = SequencedQueue.begin(); sitr != SequencedQueue.end(); ++sitr, count++) { if (!(*sitr)->acked && (*sitr)->sent_time > 0 && ((*sitr)->sent_time + retransmittimeout) < Timer::GetCurrentTime()) { (*sitr)->sent_time = 0; Log.Out(Logs::Detail, Logs::Netcode, _L "Timeout exceeded for seq %d. Flagging packet for retransmission" __L, SequencedBase + count); diff --git a/common/eq_stream_factory.cpp b/common/eq_stream_factory.cpp index f48b0f723..ca5596410 100644 --- a/common/eq_stream_factory.cpp +++ b/common/eq_stream_factory.cpp @@ -250,8 +250,7 @@ void EQStreamFactory::CheckTimeout() void EQStreamFactory::WriterLoop() { - std::map, std::shared_ptr>::iterator stream_itr; - bool havework=true; + bool havework = true; std::vector> wants_write; std::vector>::iterator cur, end; bool decay = false; @@ -260,7 +259,7 @@ void EQStreamFactory::WriterLoop() WriterRunning = true; DecayTimer.Enable(); - while(sock!=-1) { + while (sock != -1) { MWriterRunning.lock(); if (!WriterRunning) break; @@ -269,34 +268,36 @@ void EQStreamFactory::WriterLoop() havework = false; wants_write.clear(); - decay=DecayTimer.Check(); + decay = DecayTimer.Check(); - //copy streams into a seperate list so we dont have to keep - //MStreams locked while we are writting + // copy streams into a seperate list so we dont have to keep + // MStreams locked while we are writting MStreams.lock(); - for(stream_itr=Streams.begin();stream_itr!=Streams.end();++stream_itr) { + for (auto stream_itr = Streams.begin(); stream_itr != Streams.end(); ++stream_itr) { // If it's time to decay the bytes sent, then let's do it before we try to write if (decay) stream_itr->second->Decay(); - //bullshit checking, to see if this is really happening, GDB seems to think so... - if(stream_itr->second == nullptr) { - fprintf(stderr, "ERROR: nullptr Stream encountered in EQStreamFactory::WriterLoop for: %i:%i", stream_itr->first.first, stream_itr->first.second); + // bullshit checking, to see if this is really happening, GDB seems to think so... + if (stream_itr->second == nullptr) { + fprintf(stderr, + "ERROR: nullptr Stream encountered in EQStreamFactory::WriterLoop for: %i:%i", + stream_itr->first.first, stream_itr->first.second); continue; } if (stream_itr->second->HasOutgoingData()) { - havework=true; + havework = true; stream_itr->second->PutInUse(); wants_write.push_back(stream_itr->second); } } MStreams.unlock(); - //do the actual writes + // do the actual writes cur = wants_write.begin(); end = wants_write.end(); - for(; cur != end; ++cur) { + for (; cur != end; ++cur) { (*cur)->Write(sock); (*cur)->ReleaseFromUse(); }