Streams work on all of the servers now

This commit is contained in:
KimLS
2016-09-29 22:21:39 -07:00
parent a76149c8e3
commit 7a3147a3b3
25 changed files with 128 additions and 181 deletions
+57 -31
View File
@@ -46,7 +46,7 @@ void EQ::Net::DaybreakConnectionManager::Attach(uv_loop_t *loop)
uv_timer_start(&m_resend_timer, [](uv_timer_t *handle) {
DaybreakConnectionManager *c = (DaybreakConnectionManager*)handle->data;
c->ProcessResend();
}, 10, 10);
}, 5, 5);
uv_udp_init(loop, &m_socket);
m_socket.data = this;
@@ -174,6 +174,10 @@ void EQ::Net::DaybreakConnectionManager::ProcessResend()
void EQ::Net::DaybreakConnectionManager::ProcessPacket(const std::string &endpoint, int port, const char *data, size_t size)
{
if (m_options.simulated_in_packet_loss && m_options.simulated_in_packet_loss >= m_rand.Int(0, 100)) {
return;
}
if (size < DaybreakHeader::size()) {
Log.OutF(Logs::Detail, Logs::Netcode, "Packet of size {0} which is less than {1}", size, DaybreakHeader::size());
return;
@@ -200,7 +204,7 @@ void EQ::Net::DaybreakConnectionManager::ProcessPacket(const std::string &endpoi
connection->ProcessPacket(p);
}
else if(data[1] != OP_OutOfSession) {
SendSessionLost(endpoint, port);
SendDisconnect(endpoint, port);
}
}
}
@@ -220,11 +224,12 @@ std::shared_ptr<EQ::Net::DaybreakConnection> EQ::Net::DaybreakConnectionManager:
return nullptr;
}
void EQ::Net::DaybreakConnectionManager::SendSessionLost(const std::string &addr, int port)
void EQ::Net::DaybreakConnectionManager::SendDisconnect(const std::string &addr, int port)
{
DaybreakHeader header;
DaybreakDisconnect header;
header.zero = 0;
header.opcode = OP_OutOfSession;
header.opcode = OP_SessionDisconnect;
header.connect_code = 0;
WritablePacket out;
out.PutSerialize(0, header);
@@ -336,28 +341,28 @@ void EQ::Net::DaybreakConnection::ResetStats()
void EQ::Net::DaybreakConnection::Process()
{
m_resend_delay = (size_t)(m_rolling_ping * 1.33);
try {
m_resend_delay = (size_t)(m_stats.last_stat_ping * m_owner->m_options.resend_delay_factor) + m_owner->m_options.resend_delay_ms;
if (m_resend_delay > 1000) {
m_resend_delay = 1000;
}
if (m_resend_delay < 15) {
m_resend_delay = 15;
auto now = Clock::now();
auto time_since_hold = (size_t)std::chrono::duration_cast<std::chrono::milliseconds>(now - m_hold_time).count();
if (time_since_hold >= m_owner->m_options.hold_length_ms) {
FlushBuffer();
}
ProcessQueue();
auto time_since_stats = (size_t)std::chrono::duration_cast<std::chrono::milliseconds>(now - m_last_stats).count();
if (m_owner->m_options.stats_delay_ms > 0 && time_since_stats >= m_owner->m_options.stats_delay_ms) {
SendStatSync();
m_last_stats = now;
}
}
if (m_resend_delay > 1000) {
m_resend_delay = 1000;
}
auto now = Clock::now();
auto time_since_hold = (size_t)std::chrono::duration_cast<std::chrono::milliseconds>(now - m_hold_time).count();
if (time_since_hold >= m_owner->m_options.hold_length_ms) {
FlushBuffer();
}
ProcessQueue();
auto time_since_stats = (size_t)std::chrono::duration_cast<std::chrono::milliseconds>(now - m_last_stats).count();
if (m_owner->m_options.stats_delay_ms > 0 && time_since_stats >= m_owner->m_options.stats_delay_ms) {
SendStatSync();
m_last_stats = now;
catch (std::exception ex) {
Log.OutF(Logs::Detail, Logs::Netcode, "Error processing connection: {0}", ex.what());
}
}
@@ -954,10 +959,19 @@ void EQ::Net::DaybreakConnection::ProcessResend(int stream)
auto s = &m_streams[stream];
for (auto &entry : s->sent_packets) {
auto time_since_last_send = std::chrono::duration_cast<std::chrono::milliseconds>(now - entry.second.last_sent);
if ((size_t)time_since_last_send.count() > m_resend_delay) {
InternalBufferedSend(entry.second.packet);
entry.second.last_sent = now;
entry.second.times_resent++;
if(entry.second.times_resent == 0) {
if ((size_t)time_since_last_send.count() > m_resend_delay) {
InternalBufferedSend(entry.second.packet);
entry.second.last_sent = now;
entry.second.times_resent++;
}
}
else {
if ((size_t)time_since_last_send.count() > std::min(m_resend_delay / (entry.second.times_resent + 1), 5ULL)) {
InternalBufferedSend(entry.second.packet);
entry.second.last_sent = now;
entry.second.times_resent++;
}
}
}
}
@@ -1042,6 +1056,7 @@ void EQ::Net::DaybreakConnection::SendStatSync()
request.packets_sent = m_stats.sent_packets + 1;
request.packets_recv = m_stats.recv_packets;
m_last_session_stats = Clock::now();
//InternalSend()
}
void EQ::Net::DaybreakConnection::InternalBufferedSend(Packet &p)
@@ -1130,9 +1145,14 @@ void EQ::Net::DaybreakConnection::InternalSend(Packet &p)
send_buffers[0].base = (char*)out.Data();
send_buffers[0].len = out.Length();
uv_udp_send(send_req, &m_owner->m_socket, send_buffers, 1, (sockaddr*)&send_addr, send_func);
m_stats.sent_bytes += out.Length();
m_stats.sent_packets++;
if (m_owner->m_options.simulated_out_packet_loss && m_owner->m_options.simulated_out_packet_loss >= m_owner->m_rand.Int(0, 100)) {
delete send_req;
return;
}
uv_udp_send(send_req, &m_owner->m_socket, send_buffers, 1, (sockaddr*)&send_addr, send_func);
return;
}
@@ -1144,9 +1164,15 @@ void EQ::Net::DaybreakConnection::InternalSend(Packet &p)
send_buffers[0].base = (char*)p.Data();
send_buffers[0].len = p.Length();
uv_udp_send(send_req, &m_owner->m_socket, send_buffers, 1, (sockaddr*)&send_addr, send_func);
m_stats.sent_bytes += p.Length();
m_stats.sent_packets++;
if (m_owner->m_options.simulated_out_packet_loss && m_owner->m_options.simulated_out_packet_loss >= m_owner->m_rand.Int(0, 100)) {
delete send_req;
return;
}
uv_udp_send(send_req, &m_owner->m_socket, send_buffers, 1, (sockaddr*)&send_addr, send_func);
}
void EQ::Net::DaybreakConnection::InternalQueuePacket(Packet &p, int stream_id, bool reliable)