Bitcoin ABC 0.30.5
P2P Digital Currency
scheduler.cpp
Go to the documentation of this file.
1// Copyright (c) 2015-2016 The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#include <scheduler.h>
6
7#include <sync.h>
8
9#include <cassert>
10#include <chrono>
11#include <functional>
12#include <utility>
13
15
17 assert(nThreadsServicingQueue == 0);
18 if (stopWhenEmpty) {
19 assert(taskQueue.empty());
20 }
21}
22
25 ++nThreadsServicingQueue;
26
27 // newTaskMutex is locked throughout this loop EXCEPT when the thread is
28 // waiting or when the user's function is called.
29 while (!shouldStop()) {
30 try {
31 while (!shouldStop() && taskQueue.empty()) {
32 // Wait until there is something to do.
33 newTaskScheduled.wait(lock);
34 }
35
36 // Wait until either there is a new task, or until
37 // the time of the first item on the queue:
38
39 while (!shouldStop() && !taskQueue.empty()) {
40 std::chrono::steady_clock::time_point timeToWaitFor =
41 taskQueue.begin()->first;
42 if (newTaskScheduled.wait_until(lock, timeToWaitFor) ==
43 std::cv_status::timeout) {
44 // Exit loop after timeout, it means we reached the time of
45 // the event
46 break;
47 }
48 }
49
50 // If there are multiple threads, the queue can empty while we're
51 // waiting (another thread may service the task we were waiting on).
52 if (shouldStop() || taskQueue.empty()) {
53 continue;
54 }
55
56 Function f = taskQueue.begin()->second;
57 taskQueue.erase(taskQueue.begin());
58
59 {
60 // Unlock before calling f, so it can reschedule itself or
61 // another task without deadlocking:
62 REVERSE_LOCK(lock);
63 f();
64 }
65 } catch (...) {
66 --nThreadsServicingQueue;
67 throw;
68 }
69 }
70 --nThreadsServicingQueue;
71 newTaskScheduled.notify_one();
72}
73
75 std::chrono::steady_clock::time_point t) {
76 {
78 taskQueue.insert(std::make_pair(t, f));
79 }
80 newTaskScheduled.notify_one();
81}
82
83void CScheduler::MockForward(std::chrono::seconds delta_seconds) {
84 assert(delta_seconds.count() > 0 && delta_seconds < std::chrono::hours{1});
85
86 {
88
89 // use temp_queue to maintain updated schedule
90 std::multimap<std::chrono::steady_clock::time_point, Function>
91 temp_queue;
92
93 for (const auto &element : taskQueue) {
94 temp_queue.emplace_hint(temp_queue.cend(),
95 element.first - delta_seconds,
96 element.second);
97 }
98
99 // point taskQueue to temp_queue
100 taskQueue = std::move(temp_queue);
101 }
102
103 // notify that the taskQueue needs to be processed
104 newTaskScheduled.notify_one();
105}
106
108 std::chrono::milliseconds delta) {
109 if (p()) {
110 s.scheduleFromNow([=, &s] { Repeat(s, p, delta); }, delta);
111 }
112}
113
115 std::chrono::milliseconds delta) {
116 scheduleFromNow([this, p, delta] { Repeat(*this, p, delta); }, delta);
117}
118
119size_t
120CScheduler::getQueueInfo(std::chrono::steady_clock::time_point &first,
121 std::chrono::steady_clock::time_point &last) const {
123 size_t result = taskQueue.size();
124 if (!taskQueue.empty()) {
125 first = taskQueue.begin()->first;
126 last = taskQueue.rbegin()->first;
127 }
128 return result;
129}
130
133 return nThreadsServicingQueue;
134}
135
137 {
139 // Try to avoid scheduling too many copies here, but if we
140 // accidentally have two ProcessQueue's scheduled at once its
141 // not a big deal.
142 if (m_are_callbacks_running) {
143 return;
144 }
145 if (m_callbacks_pending.empty()) {
146 return;
147 }
148 }
149 m_scheduler.schedule([this] { this->ProcessQueue(); },
150 std::chrono::steady_clock::now());
151}
152
154 std::function<void()> callback;
155 {
157 if (m_are_callbacks_running) {
158 return;
159 }
160 if (m_callbacks_pending.empty()) {
161 return;
162 }
163 m_are_callbacks_running = true;
164
165 callback = std::move(m_callbacks_pending.front());
166 m_callbacks_pending.pop_front();
167 }
168
169 // RAII the setting of fCallbacksRunning and calling
170 // MaybeScheduleProcessQueue to ensure both happen safely even if callback()
171 // throws.
172 struct RAIICallbacksRunning {
174 explicit RAIICallbacksRunning(SingleThreadedSchedulerClient *_instance)
175 : instance(_instance) {}
176 ~RAIICallbacksRunning() {
177 {
178 LOCK(instance->m_callbacks_mutex);
179 instance->m_are_callbacks_running = false;
180 }
181 instance->MaybeScheduleProcessQueue();
182 }
183 } raiicallbacksrunning(this);
184
185 callback();
186}
187
189 std::function<void()> func) {
190 {
192 m_callbacks_pending.emplace_back(std::move(func));
193 }
195}
196
199 bool should_continue = true;
200 while (should_continue) {
201 ProcessQueue();
203 should_continue = !m_callbacks_pending.empty();
204 }
205}
206
209 return m_callbacks_pending.size();
210}
Simple class for background tasks that should be run periodically or once "after a while".
Definition: scheduler.h:41
void MockForward(std::chrono::seconds delta_seconds) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Mock the scheduler to fast forward in time.
Definition: scheduler.cpp:83
void serviceQueue() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Services the queue 'forever'.
Definition: scheduler.cpp:23
std::function< bool()> Predicate
Definition: scheduler.h:49
void scheduleEvery(Predicate p, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Repeat p until it return false.
Definition: scheduler.cpp:114
size_t getQueueInfo(std::chrono::steady_clock::time_point &first, std::chrono::steady_clock::time_point &last) const EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Returns number of tasks waiting to be serviced, and first and last task times.
Definition: scheduler.cpp:120
std::function< void()> Function
Definition: scheduler.h:48
bool AreThreadsServicingQueue() const EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Returns true if there are threads actively running in serviceQueue()
Definition: scheduler.cpp:131
bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex)
Definition: scheduler.h:128
std::condition_variable newTaskScheduled
Definition: scheduler.h:122
Mutex newTaskMutex
Definition: scheduler.h:121
void scheduleFromNow(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Call f once after the delta has passed.
Definition: scheduler.h:56
void schedule(Function f, std::chrono::steady_clock::time_point t) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Call func at/after time t.
Definition: scheduler.cpp:74
Class used by CScheduler clients which may schedule multiple jobs which are required to be run serial...
Definition: scheduler.h:143
void AddToProcessQueue(std::function< void()> func) EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
Add a callback to be executed.
Definition: scheduler.cpp:188
void ProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
Definition: scheduler.cpp:153
size_t CallbacksPending() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
Definition: scheduler.cpp:207
void EmptyQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
Processes all remaining queue members on the calling thread, blocking until queue is empty.
Definition: scheduler.cpp:197
void MaybeScheduleProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
Definition: scheduler.cpp:136
static void Repeat(CScheduler &s, CScheduler::Predicate p, std::chrono::milliseconds delta)
Definition: scheduler.cpp:107
#define WAIT_LOCK(cs, name)
Definition: sync.h:317
#define LOCK(cs)
Definition: sync.h:306
#define REVERSE_LOCK(g)
Definition: sync.h:265
assert(!tx.IsCoinBase())