Bitcoin ABC  0.28.12
P2P Digital Currency
scheduler.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2016 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <scheduler.h>
6 
7 #include <sync.h>
8 
9 #include <cassert>
10 #include <chrono>
11 #include <functional>
12 #include <utility>
13 
15 
17  assert(nThreadsServicingQueue == 0);
18  if (stopWhenEmpty) {
19  assert(taskQueue.empty());
20  }
21 }
22 
24  WAIT_LOCK(newTaskMutex, lock);
25  ++nThreadsServicingQueue;
26 
27  // newTaskMutex is locked throughout this loop EXCEPT when the thread is
28  // waiting or when the user's function is called.
29  while (!shouldStop()) {
30  try {
31  while (!shouldStop() && taskQueue.empty()) {
32  // Wait until there is something to do.
33  newTaskScheduled.wait(lock);
34  }
35 
36  // Wait until either there is a new task, or until
37  // the time of the first item on the queue:
38 
39  while (!shouldStop() && !taskQueue.empty()) {
40  std::chrono::steady_clock::time_point timeToWaitFor =
41  taskQueue.begin()->first;
42  if (newTaskScheduled.wait_until(lock, timeToWaitFor) ==
43  std::cv_status::timeout) {
44  // Exit loop after timeout, it means we reached the time of
45  // the event
46  break;
47  }
48  }
49 
50  // If there are multiple threads, the queue can empty while we're
51  // waiting (another thread may service the task we were waiting on).
52  if (shouldStop() || taskQueue.empty()) {
53  continue;
54  }
55 
56  Function f = taskQueue.begin()->second;
57  taskQueue.erase(taskQueue.begin());
58 
59  {
60  // Unlock before calling f, so it can reschedule itself or
61  // another task without deadlocking:
62  REVERSE_LOCK(lock);
63  f();
64  }
65  } catch (...) {
66  --nThreadsServicingQueue;
67  throw;
68  }
69  }
70  --nThreadsServicingQueue;
71  newTaskScheduled.notify_one();
72 }
73 
75  std::chrono::steady_clock::time_point t) {
76  {
78  taskQueue.insert(std::make_pair(t, f));
79  }
80  newTaskScheduled.notify_one();
81 }
82 
83 void CScheduler::MockForward(std::chrono::seconds delta_seconds) {
84  assert(delta_seconds.count() > 0 && delta_seconds < std::chrono::hours{1});
85 
86  {
88 
89  // use temp_queue to maintain updated schedule
90  std::multimap<std::chrono::steady_clock::time_point, Function>
91  temp_queue;
92 
93  for (const auto &element : taskQueue) {
94  temp_queue.emplace_hint(temp_queue.cend(),
95  element.first - delta_seconds,
96  element.second);
97  }
98 
99  // point taskQueue to temp_queue
100  taskQueue = std::move(temp_queue);
101  }
102 
103  // notify that the taskQueue needs to be processed
104  newTaskScheduled.notify_one();
105 }
106 
108  std::chrono::milliseconds delta) {
109  if (p()) {
110  s.scheduleFromNow([=, &s] { Repeat(s, p, delta); }, delta);
111  }
112 }
113 
115  std::chrono::milliseconds delta) {
116  scheduleFromNow([this, p, delta] { Repeat(*this, p, delta); }, delta);
117 }
118 
119 size_t
120 CScheduler::getQueueInfo(std::chrono::steady_clock::time_point &first,
121  std::chrono::steady_clock::time_point &last) const {
123  size_t result = taskQueue.size();
124  if (!taskQueue.empty()) {
125  first = taskQueue.begin()->first;
126  last = taskQueue.rbegin()->first;
127  }
128  return result;
129 }
130 
133  return nThreadsServicingQueue;
134 }
135 
137  {
139  // Try to avoid scheduling too many copies here, but if we
140  // accidentally have two ProcessQueue's scheduled at once its
141  // not a big deal.
142  if (m_are_callbacks_running) {
143  return;
144  }
145  if (m_callbacks_pending.empty()) {
146  return;
147  }
148  }
149  m_scheduler.schedule([this] { this->ProcessQueue(); },
150  std::chrono::steady_clock::now());
151 }
152 
154  std::function<void()> callback;
155  {
157  if (m_are_callbacks_running) {
158  return;
159  }
160  if (m_callbacks_pending.empty()) {
161  return;
162  }
163  m_are_callbacks_running = true;
164 
165  callback = std::move(m_callbacks_pending.front());
166  m_callbacks_pending.pop_front();
167  }
168 
169  // RAII the setting of fCallbacksRunning and calling
170  // MaybeScheduleProcessQueue to ensure both happen safely even if callback()
171  // throws.
172  struct RAIICallbacksRunning {
174  explicit RAIICallbacksRunning(SingleThreadedSchedulerClient *_instance)
175  : instance(_instance) {}
176  ~RAIICallbacksRunning() {
177  {
178  LOCK(instance->m_callbacks_mutex);
179  instance->m_are_callbacks_running = false;
180  }
181  instance->MaybeScheduleProcessQueue();
182  }
183  } raiicallbacksrunning(this);
184 
185  callback();
186 }
187 
189  std::function<void()> func) {
190  {
192  m_callbacks_pending.emplace_back(std::move(func));
193  }
195 }
196 
199  bool should_continue = true;
200  while (should_continue) {
201  ProcessQueue();
203  should_continue = !m_callbacks_pending.empty();
204  }
205 }
206 
209  return m_callbacks_pending.size();
210 }
Simple class for background tasks that should be run periodically or once "after a while".
Definition: scheduler.h:41
void MockForward(std::chrono::seconds delta_seconds) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Mock the scheduler to fast forward in time.
Definition: scheduler.cpp:83
void serviceQueue() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Services the queue 'forever'.
Definition: scheduler.cpp:23
std::function< bool()> Predicate
Definition: scheduler.h:49
void scheduleEvery(Predicate p, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Repeat p until it return false.
Definition: scheduler.cpp:114
size_t getQueueInfo(std::chrono::steady_clock::time_point &first, std::chrono::steady_clock::time_point &last) const EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Returns number of tasks waiting to be serviced, and first and last task times.
Definition: scheduler.cpp:120
std::function< void()> Function
Definition: scheduler.h:48
bool AreThreadsServicingQueue() const EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Returns true if there are threads actively running in serviceQueue()
Definition: scheduler.cpp:131
bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex)
Definition: scheduler.h:128
std::condition_variable newTaskScheduled
Definition: scheduler.h:122
Mutex newTaskMutex
Definition: scheduler.h:121
void scheduleFromNow(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Call f once after the delta has passed.
Definition: scheduler.h:56
void schedule(Function f, std::chrono::steady_clock::time_point t) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Call func at/after time t.
Definition: scheduler.cpp:74
Class used by CScheduler clients which may schedule multiple jobs which are required to be run serial...
Definition: scheduler.h:143
void AddToProcessQueue(std::function< void()> func) EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
Add a callback to be executed.
Definition: scheduler.cpp:188
void ProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
Definition: scheduler.cpp:153
size_t CallbacksPending() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
Definition: scheduler.cpp:207
void EmptyQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
Processes all remaining queue members on the calling thread, blocking until queue is empty.
Definition: scheduler.cpp:197
void MaybeScheduleProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
Definition: scheduler.cpp:136
static void Repeat(CScheduler &s, CScheduler::Predicate p, std::chrono::milliseconds delta)
Definition: scheduler.cpp:107
#define WAIT_LOCK(cs, name)
Definition: sync.h:317
#define LOCK(cs)
Definition: sync.h:306
#define REVERSE_LOCK(g)
Definition: sync.h:265
assert(!tx.IsCoinBase())