diff --git a/common/eq_stream_intf.h b/common/eq_stream_intf.h index 6224a8cbd..3a7e85dfa 100644 --- a/common/eq_stream_intf.h +++ b/common/eq_stream_intf.h @@ -50,6 +50,12 @@ struct EQStreamManagerInterfaceOptions EQ::EventLoop *loop; }; +enum EQStreamPriority : int32_t { + High, + Normal, + Low +}; + class EQStreamInterface; class EQStreamManagerInterface { @@ -62,6 +68,7 @@ public: virtual void OnNewConnection(std::function)> func) = 0; virtual void OnConnectionStateChange(std::function, EQ::Net::DbProtocolStatus, EQ::Net::DbProtocolStatus)> func) = 0; + virtual void SetPriority(EQStreamPriority priority) = 0; protected: EQStreamManagerInterfaceOptions m_options; }; diff --git a/common/net/eqstream.h b/common/net/eqstream.h index 2bf7beb3a..7881c2ff3 100644 --- a/common/net/eqstream.h +++ b/common/net/eqstream.h @@ -21,6 +21,7 @@ namespace EQ virtual void OnNewConnection(std::function)> func) { m_on_new_connection = func; } virtual void OnConnectionStateChange(std::function, EQ::Net::DbProtocolStatus, EQ::Net::DbProtocolStatus)> func) { m_on_connection_state_change = func; } + virtual void SetPriority(EQStreamPriority priority) { } private: DaybreakConnectionManager m_daybreak; std::function)> m_on_new_connection; diff --git a/common/net/eqstream_concurrent.cpp b/common/net/eqstream_concurrent.cpp index 0212eceae..fcf7153fc 100644 --- a/common/net/eqstream_concurrent.cpp +++ b/common/net/eqstream_concurrent.cpp @@ -27,6 +27,7 @@ struct EQ::Net::ConcurrentEQStreamManager::Impl std::unordered_map> streams; std::function)> on_new_connection; std::function, EQ::Net::DbProtocolStatus, EQ::Net::DbProtocolStatus)> on_connection_state_change; + EQStreamPriority priority; }; EQ::Net::ConcurrentEQStreamManager::ConcurrentEQStreamManager(const EQStreamManagerInterfaceOptions &options) @@ -36,6 +37,7 @@ EQ::Net::ConcurrentEQStreamManager::ConcurrentEQStreamManager(const EQStreamMana _impl->background = std::thread(std::bind(&ConcurrentEQStreamManager::_BackgroundThread, this)); _impl->foreground_loop_timer.reset(new EQ::Timer(options.loop, 16, true, std::bind(&ConcurrentEQStreamManager::_ForegroundTimer, this, std::placeholders::_1))); + _impl->priority = EQStreamPriority::High; } EQ::Net::ConcurrentEQStreamManager::~ConcurrentEQStreamManager() @@ -48,7 +50,7 @@ EQ::Net::ConcurrentEQStreamManager::~ConcurrentEQStreamManager() //Tell the background to shutdown and wait for it to actually do so ceqs_terminate_msg_t msg; - msg.type = TerminateBackground; + msg.type = ceqs_msg_type::TerminateBackground; _PushToBackgroundQueue((ceqs_msg_t*)&msg); _impl->background.join(); @@ -57,7 +59,7 @@ EQ::Net::ConcurrentEQStreamManager::~ConcurrentEQStreamManager() ceqs_msg_t eqs_msg; while (_impl->foreground_queue.try_dequeue(eqs_msg)) { - if (eqs_msg.type == PacketRecv) { + if (eqs_msg.type == ceqs_msg_type::PacketRecv) { ceqs_packet_recv_msg_t *eqs_msg_in = (ceqs_packet_recv_msg_t*)&eqs_msg; delete eqs_msg_in->packet; @@ -85,7 +87,18 @@ void EQ::Net::ConcurrentEQStreamManager::_BackgroundThread() { while (true == _impl->background_running) { loop.Process(); - Sleep(1); + + switch (_impl->priority) { + case EQStreamPriority::Low: + Sleep(10); + break; + case EQStreamPriority::Normal: + Sleep(5); + break; + case EQStreamPriority::High: + Sleep(1); + break; + } } _impl->background_loop_timer.release(); @@ -94,7 +107,7 @@ void EQ::Net::ConcurrentEQStreamManager::_BackgroundThread() { ceqs_msg_t eqs_msg; while (_impl->background_queue.try_dequeue(eqs_msg)) { - if (eqs_msg.type == QueuePacket) { + if (eqs_msg.type == ceqs_msg_type::QueuePacket) { ceqs_queue_packet_msg_t *eqs_msg_in = (ceqs_queue_packet_msg_t*)&eqs_msg; delete eqs_msg_in->packet; } @@ -142,7 +155,7 @@ void EQ::Net::ConcurrentEQStreamManager::_BackgroundUpdateStatsTimer(EQ::Timer * void EQ::Net::ConcurrentEQStreamManager::_ProcessBackgroundMessage(const ceqs_msg_t &msg) { switch (msg.type) { - case QueuePacket: + case ceqs_msg_type::QueuePacket: { ceqs_queue_packet_msg_t *msg_in = (ceqs_queue_packet_msg_t*)&msg; @@ -154,12 +167,12 @@ void EQ::Net::ConcurrentEQStreamManager::_ProcessBackgroundMessage(const ceqs_ms delete msg_in->packet; break; } - case TerminateBackground: + case ceqs_msg_type::TerminateBackground: { _impl->background_running = false; break; } - case CloseConnection: + case ceqs_msg_type::CloseConnection: { ceqs_close_connection_msg_t *msg_in = (ceqs_close_connection_msg_t*)&msg; auto iter = _impl->connections.find(msg_in->stream_id); @@ -168,7 +181,7 @@ void EQ::Net::ConcurrentEQStreamManager::_ProcessBackgroundMessage(const ceqs_ms } break; } - case ResetStats: + case ceqs_msg_type::ResetStats: { ceqs_reset_stats_msg_t *msg_in = (ceqs_reset_stats_msg_t*)&msg; auto iter = _impl->connections.find(msg_in->stream_id); @@ -177,6 +190,12 @@ void EQ::Net::ConcurrentEQStreamManager::_ProcessBackgroundMessage(const ceqs_ms } break; } + case ceqs_msg_type::SetPriority: + { + ceqs_set_priority_msg_t *msg_in = (ceqs_set_priority_msg_t*)&msg; + _impl->priority = msg_in->priority; + break; + } default: break; } @@ -198,7 +217,7 @@ void EQ::Net::ConcurrentEQStreamManager::_ForegroundTimer(EQ::Timer *t) void EQ::Net::ConcurrentEQStreamManager::_ProcessForegroundMessage(const ceqs_msg_t &msg) { switch (msg.type) { - case NewConnection: + case ceqs_msg_type::NewConnection: { ceqs_new_connection_msg_t *msg_in = (ceqs_new_connection_msg_t*)&msg; @@ -214,7 +233,7 @@ void EQ::Net::ConcurrentEQStreamManager::_ProcessForegroundMessage(const ceqs_ms } break; } - case ConnectionStateChange: + case ceqs_msg_type::ConnectionStateChange: { ceqs_connection_state_change_msg_t *msg_in = (ceqs_connection_state_change_msg_t*)&msg; @@ -228,7 +247,7 @@ void EQ::Net::ConcurrentEQStreamManager::_ProcessForegroundMessage(const ceqs_ms } break; } - case PacketRecv: + case ceqs_msg_type::PacketRecv: { ceqs_packet_recv_msg_t *msg_in = (ceqs_packet_recv_msg_t*)&msg; std::unique_ptr p(msg_in->packet); @@ -239,7 +258,7 @@ void EQ::Net::ConcurrentEQStreamManager::_ProcessForegroundMessage(const ceqs_ms } break; } - case UpdateStats: + case ceqs_msg_type::UpdateStats: { ceqs_update_stats_msg_t *msg_in = (ceqs_update_stats_msg_t*)&msg; auto iter = _impl->streams.find(msg_in->stream_id); @@ -275,6 +294,15 @@ void EQ::Net::ConcurrentEQStreamManager::OnConnectionStateChange(std::functionon_connection_state_change = func; } +//Called by foreground +void EQ::Net::ConcurrentEQStreamManager::SetPriority(EQStreamPriority priority) +{ + ceqs_set_priority_msg_t msg; + msg.type = ceqs_msg_type::SetPriority; + msg.priority = priority; + _PushToBackgroundQueue((ceqs_msg_t*)&msg); +} + //Called by background void EQ::Net::ConcurrentEQStreamManager::DaybreakNewConnection(std::shared_ptr connection) { @@ -451,7 +479,7 @@ void EQ::Net::ConcurrentEQStream::Close() } ceqs_close_connection_msg_t msg; - msg.type = CloseConnection; + msg.type = ceqs_msg_type::CloseConnection; msg.stream_id = _impl->id; _impl->parent->_PushToBackgroundQueue((ceqs_msg_t*)&msg); diff --git a/common/net/eqstream_concurrent.h b/common/net/eqstream_concurrent.h index bbec7c19f..d2b424fe8 100644 --- a/common/net/eqstream_concurrent.h +++ b/common/net/eqstream_concurrent.h @@ -18,6 +18,7 @@ namespace EQ virtual void OnNewConnection(std::function)> func); virtual void OnConnectionStateChange(std::function, EQ::Net::DbProtocolStatus, EQ::Net::DbProtocolStatus)> func); + virtual void SetPriority(EQStreamPriority priority); void _PushToBackgroundQueue(ceqs_msg_t* msg); void _PushToForegroundQueue(ceqs_msg_t* msg); diff --git a/common/net/eqstream_concurrent_message.h b/common/net/eqstream_concurrent_message.h index c50cc2b46..efe5c291d 100644 --- a/common/net/eqstream_concurrent_message.h +++ b/common/net/eqstream_concurrent_message.h @@ -18,7 +18,8 @@ namespace EQ QueuePacket, TerminateBackground, CloseConnection, - ResetStats + ResetStats, + SetPriority }; typedef struct @@ -85,5 +86,10 @@ namespace EQ ceqs_msg_type type; } ceqs_terminate_msg_t; + typedef struct + { + ceqs_msg_type type; + EQStreamPriority priority; + } ceqs_set_priority_msg_t; } } diff --git a/zone/net.cpp b/zone/net.cpp index 85f32a371..dc28a7dbf 100644 --- a/zone/net.cpp +++ b/zone/net.cpp @@ -594,10 +594,12 @@ int main(int argc, char** argv) { if (previous_loaded && !current_loaded) { process_timer.Stop(); process_timer.Start(1000, true); + eqsm->SetPriority(EQStreamPriority::Low); } else if (!previous_loaded && current_loaded) { process_timer.Stop(); process_timer.Start(32, true); + eqsm->SetPriority(EQStreamPriority::High); } if (current_loaded) {