Task scheduler test

This commit is contained in:
KimLS 2019-08-04 20:12:22 -07:00
parent f5cfec529e
commit a65d8836af
4 changed files with 166 additions and 0 deletions

View File

@ -213,6 +213,7 @@ SET(common_headers
zone_numbers.h
event/event_loop.h
event/task.h
event/task_scheduler.h
event/timer.h
json/json.h
json/json-forwards.h

View File

@ -0,0 +1,114 @@
#pragma once
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <queue>
#include <future>
namespace EQ
{
namespace Event
{
class TaskScheduler
{
public:
static const int DefaultThreadCount = 4;
TaskScheduler() : _running(false)
{
Start(DefaultThreadCount);
}
TaskScheduler(size_t threads) : _running(false)
{
Start(threads);
}
~TaskScheduler() {
Stop();
}
void Start(size_t threads) {
if (true == _running) {
return;
}
_running = true;
for (size_t i = 0; i < threads; ++i) {
_threads.push_back(std::thread(std::bind(&TaskScheduler::ProcessWork, this)));
}
}
void Stop() {
if (false == _running) {
return;
}
{
std::unique_lock<std::mutex> lock(_lock);
_running = false;
}
_cv.notify_all();
for (auto &t : _threads) {
t.join();
}
}
template<typename Fn, typename... Args>
auto Enqueue(Fn&& fn, Args&&... args) -> std::future<typename std::result_of<Fn(Args...)>::type> {
using return_type = typename std::result_of<Fn(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<Fn>(fn), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(_lock);
if (false == _running) {
throw std::runtime_error("Enqueue on stopped scheduler.");
}
_tasks.emplace([task]() { (*task)(); });
}
_cv.notify_one();
return res;
}
private:
void ProcessWork() {
for (;;) {
std::function<void()> work;
{
std::unique_lock<std::mutex> lock(_lock);
_cv.wait(lock, [this] { return !_running || !_tasks.empty(); });
if (false == _running) {
return;
}
work = std::move(_tasks.front());
_tasks.pop();
}
work();
}
}
bool _running = true;
std::vector<std::thread> _threads;
std::mutex _lock;
std::condition_variable _cv;
std::queue<std::function<void()>> _tasks;
};
}
}

View File

@ -56,6 +56,7 @@
#include "../common/eqemu_logsys.h"
#include "../common/profanity_manager.h"
#include "../common/net/eqstream.h"
#include "../common/event/task_scheduler.h"
#include "data_bucket.h"
#include "command.h"
@ -78,6 +79,7 @@ extern TaskManager *taskmanager;
extern FastMath g_Math;
void CatchSignal(int sig_num);
EQ::Event::TaskScheduler task_scheduler;
int commandcount; // how many commands we have
@ -395,6 +397,7 @@ int command_init(void)
command_add("tempname", "[newname] - Temporarily renames your target. Leave name blank to restore the original name.", 100, command_tempname) ||
command_add("petname", "[newname] - Temporarily renames your pet. Leave name blank to restore the original name.", 100, command_petname) ||
command_add("test", "Test command", 200, command_test) ||
command_add("test_login", "Test login command", 200, command_test_login) ||
command_add("texture", "[texture] [helmtexture] - Change your or your target's appearance, use 255 to show equipment", 10, command_texture) ||
command_add("time", "[HH] [MM] - Set EQ time", 90, command_time) ||
command_add("timers", "- Display persistent timers for target", 200, command_timers) ||
@ -12424,6 +12427,53 @@ void command_network(Client *c, const Seperator *sep)
}
}
void command_test_login(Client *c, const Seperator *sep) {
auto res = task_scheduler.Enqueue([]() -> bool {
bool running = true;
bool ret = false;
EQ::Net::DaybreakConnectionManager mgr;
std::shared_ptr<EQ::Net::DaybreakConnection> c;
mgr.OnNewConnection([&](std::shared_ptr<EQ::Net::DaybreakConnection> connection) {
c = connection;
});
mgr.OnConnectionStateChange([&](std::shared_ptr<EQ::Net::DaybreakConnection> conn, EQ::Net::DbProtocolStatus from, EQ::Net::DbProtocolStatus to) {
if (EQ::Net::StatusConnected == to) {
EQ::Net::DynamicPacket p;
p.PutUInt16(0, 1); //OP_SessionReady
p.PutUInt32(2, 2);
c->QueuePacket(p);
}
else if (EQ::Net::StatusDisconnected == to) {
running = false;
}
});
mgr.OnPacketRecv([&](std::shared_ptr<EQ::Net::DaybreakConnection> conn, const EQ::Net::Packet & p) {
auto opcode = p.GetUInt16(0);
switch (opcode) {
case 0x0017: //OP_ChatMessage
//Would do the actual sending and parsing here if eqcrypto wasn't in loginserver.
ret = true;
running = false;
break;
}
});
mgr.Connect("127.0.0.1", 5999);
auto &loop = EQ::EventLoop::Get();
while (true == running) {
loop.Process();
}
return ret;
});
c->Message(0, "Result was %s", res.get() ? "true" : "false");
}
// All new code added to command.cpp should be BEFORE this comment line. Do no append code to this file below the BOTS code block.
#ifdef BOTS
#include "bot_command.h"

View File

@ -304,6 +304,7 @@ void command_petname(Client *c, const Seperator *sep);
void command_test(Client *c, const Seperator *sep);
void command_testspawn(Client *c, const Seperator *sep);
void command_testspawnkill(Client *c, const Seperator *sep);
void command_test_login(Client *c, const Seperator *sep);
void command_texture(Client *c, const Seperator *sep);
void command_time(Client *c, const Seperator *sep);
void command_timers(Client *c, const Seperator *sep);