17 assert(nThreadsServicingQueue == 0);
25 ++nThreadsServicingQueue;
40 std::chrono::steady_clock::time_point timeToWaitFor =
41 taskQueue.begin()->first;
43 std::cv_status::timeout) {
56 Function f = taskQueue.begin()->second;
57 taskQueue.erase(taskQueue.begin());
66 --nThreadsServicingQueue;
70 --nThreadsServicingQueue;
75 std::chrono::steady_clock::time_point t) {
78 taskQueue.insert(std::make_pair(t, f));
84 assert(delta_seconds.count() > 0 && delta_seconds < std::chrono::hours{1});
90 std::multimap<std::chrono::steady_clock::time_point, Function>
93 for (
const auto &element : taskQueue) {
94 temp_queue.emplace_hint(temp_queue.cend(),
95 element.first - delta_seconds,
100 taskQueue = std::move(temp_queue);
108 std::chrono::milliseconds delta) {
115 std::chrono::milliseconds delta) {
121 std::chrono::steady_clock::time_point &last)
const {
123 size_t result = taskQueue.size();
124 if (!taskQueue.empty()) {
125 first = taskQueue.begin()->first;
126 last = taskQueue.rbegin()->first;
133 return nThreadsServicingQueue;
142 if (m_are_callbacks_running) {
145 if (m_callbacks_pending.empty()) {
150 std::chrono::steady_clock::now());
154 std::function<void()> callback;
157 if (m_are_callbacks_running) {
160 if (m_callbacks_pending.empty()) {
163 m_are_callbacks_running =
true;
165 callback = std::move(m_callbacks_pending.front());
166 m_callbacks_pending.pop_front();
172 struct RAIICallbacksRunning {
175 : instance(_instance) {}
176 ~RAIICallbacksRunning() {
179 instance->m_are_callbacks_running =
false;
183 } raiicallbacksrunning(
this);
189 std::function<
void()> func) {
192 m_callbacks_pending.emplace_back(std::move(func));
199 bool should_continue =
true;
200 while (should_continue) {
203 should_continue = !m_callbacks_pending.empty();
209 return m_callbacks_pending.size();
Simple class for background tasks that should be run periodically or once "after a while".
void MockForward(std::chrono::seconds delta_seconds) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Mock the scheduler to fast forward in time.
void serviceQueue() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Services the queue 'forever'.
std::function< bool()> Predicate
void scheduleEvery(Predicate p, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Repeat p until it returns false.
size_t getQueueInfo(std::chrono::steady_clock::time_point &first, std::chrono::steady_clock::time_point &last) const EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Returns number of tasks waiting to be serviced, and first and last task times.
std::function< void()> Function
bool AreThreadsServicingQueue() const EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Returns true if there are threads actively running in serviceQueue().
bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex)
std::condition_variable newTaskScheduled
void scheduleFromNow(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Call f once after the delta has passed.
void schedule(Function f, std::chrono::steady_clock::time_point t) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Call f at/after time t.
Class used by CScheduler clients which may schedule multiple jobs which are required to be run serial...
void AddToProcessQueue(std::function< void()> func) EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
Add a callback to be executed.
void ProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
size_t CallbacksPending() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
void EmptyQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
Processes all remaining queue members on the calling thread, blocking until queue is empty.
void MaybeScheduleProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
static void Repeat(CScheduler &s, CScheduler::Predicate p, std::chrono::milliseconds delta)
#define WAIT_LOCK(cs, name)