Bitcoin ABC 0.30.5
P2P Digital Currency
checkqueue.h
Go to the documentation of this file.
1// Copyright (c) 2012-2018 The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#ifndef BITCOIN_CHECKQUEUE_H
6#define BITCOIN_CHECKQUEUE_H
7
8#include <sync.h>
9#include <tinyformat.h>
10#include <util/threadnames.h>
11
12#include <algorithm>
13#include <iterator>
14#include <vector>
15
16template <typename T> class CCheckQueueControl;
17
28template <typename T> class CCheckQueue {
29private:
32
34 std::condition_variable m_worker_cv;
35
37 std::condition_variable m_master_cv;
38
41 std::vector<T> queue GUARDED_BY(m_mutex);
42
44 int nIdle GUARDED_BY(m_mutex){0};
45
47 int nTotal GUARDED_BY(m_mutex){0};
48
50 bool fAllOk GUARDED_BY(m_mutex){true};
51
57 unsigned int nTodo GUARDED_BY(m_mutex){0};
58
60 const unsigned int nBatchSize;
61
62 std::vector<std::thread> m_worker_threads;
63 bool m_request_stop GUARDED_BY(m_mutex){false};
64
66 bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) {
67 std::condition_variable &cond = fMaster ? m_master_cv : m_worker_cv;
68 std::vector<T> vChecks;
69 vChecks.reserve(nBatchSize);
70 unsigned int nNow = 0;
71 bool fOk = true;
72 do {
73 {
74 WAIT_LOCK(m_mutex, lock);
75 // first do the clean-up of the previous loop run (allowing us
76 // to do it in the same critsect)
77 if (nNow) {
78 fAllOk &= fOk;
79 nTodo -= nNow;
80 if (nTodo == 0 && !fMaster) {
81 // We processed the last element; inform the master it
82 // can exit and return the result
83 m_master_cv.notify_one();
84 }
85 } else {
86 // first iteration
87 nTotal++;
88 }
89 // logically, the do loop starts here
90 while (queue.empty() && !m_request_stop) {
91 if (fMaster && nTodo == 0) {
92 nTotal--;
93 bool fRet = fAllOk;
94 // reset the status for new work later
95 fAllOk = true;
96 // return the current status
97 return fRet;
98 }
99 nIdle++;
100 cond.wait(lock); // wait
101 nIdle--;
102 }
103 if (m_request_stop) {
104 return false;
105 }
106
107 // Decide how many work units to process now.
108 // * Do not try to do everything at once, but aim for
109 // increasingly smaller batches so all workers finish
110 // approximately simultaneously.
111 // * Try to account for idle jobs which will instantly start
112 // helping.
113 // * Don't do batches smaller than 1 (duh), or larger than
114 // nBatchSize.
115 nNow = std::max(
116 1U, std::min(nBatchSize, (unsigned int)queue.size() /
117 (nTotal + nIdle + 1)));
118 auto start_it = queue.end() - nNow;
119 vChecks.assign(std::make_move_iterator(start_it),
120 std::make_move_iterator(queue.end()));
121 queue.erase(start_it, queue.end());
122 // Check whether we need to do work at all
123 fOk = fAllOk;
124 }
125 // execute work
126 for (T &check : vChecks) {
127 if (fOk) {
128 fOk = check();
129 }
130 }
131 vChecks.clear();
132 } while (true);
133 }
134
135public:
138
140 explicit CCheckQueue(unsigned int nBatchSizeIn)
141 : nBatchSize(nBatchSizeIn) {}
142
144 void StartWorkerThreads(const int threads_num)
146 {
147 LOCK(m_mutex);
148 nIdle = 0;
149 nTotal = 0;
150 fAllOk = true;
151 }
152 assert(m_worker_threads.empty());
153 for (int n = 0; n < threads_num; ++n) {
154 m_worker_threads.emplace_back([this, n]() {
155 util::ThreadRename(strprintf("scriptch.%i", n));
156 Loop(false /* worker thread */);
157 });
158 }
159 }
160
164 return Loop(true /* master thread */);
165 }
166
168 void Add(std::vector<T> &&vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) {
169 LOCK(m_mutex);
170 queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()),
171 std::make_move_iterator(vChecks.end()));
172 nTodo += vChecks.size();
173 if (vChecks.size() == 1) {
174 m_worker_cv.notify_one();
175 } else if (vChecks.size() > 1) {
176 m_worker_cv.notify_all();
177 }
178 }
179
182 WITH_LOCK(m_mutex, m_request_stop = true);
183 m_worker_cv.notify_all();
184 for (std::thread &t : m_worker_threads) {
185 t.join();
186 }
187 m_worker_threads.clear();
188 WITH_LOCK(m_mutex, m_request_stop = false);
189 }
190
192};
193
198template <typename T> class CCheckQueueControl {
199private:
201 bool fDone;
202
203public:
207 explicit CCheckQueueControl(CCheckQueue<T> *const pqueueIn)
208 : pqueue(pqueueIn), fDone(false) {
209 // passed queue is supposed to be unused, or nullptr
210 if (pqueue != nullptr) {
211 ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
212 }
213 }
214
215 bool Wait() {
216 if (pqueue == nullptr) {
217 return true;
218 }
219 bool fRet = pqueue->Wait();
220 fDone = true;
221 return fRet;
222 }
223
224 void Add(std::vector<T> &&vChecks) {
225 if (pqueue != nullptr) {
226 pqueue->Add(std::move(vChecks));
227 }
228 }
229
231 if (!fDone) {
232 Wait();
233 }
234 if (pqueue != nullptr) {
235 LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
236 }
237 }
238};
239
240#endif // BITCOIN_CHECKQUEUE_H
RAII-style controller object for a CCheckQueue that guarantees the passed queue is finished before continuing.
Definition: checkqueue.h:198
CCheckQueueControl & operator=(const CCheckQueueControl &)=delete
CCheckQueue< T > *const pqueue
Definition: checkqueue.h:200
CCheckQueueControl(CCheckQueue< T > *const pqueueIn)
Definition: checkqueue.h:207
CCheckQueueControl()=delete
void Add(std::vector< T > &&vChecks)
Definition: checkqueue.h:224
CCheckQueueControl(const CCheckQueueControl &)=delete
Queue for verifications that have to be performed.
Definition: checkqueue.h:28
const unsigned int nBatchSize
The maximum number of elements to be processed in one batch.
Definition: checkqueue.h:60
std::condition_variable m_master_cv
Master thread blocks on this when out of work.
Definition: checkqueue.h:37
bool fAllOk GUARDED_BY(m_mutex)
The temporary evaluation result.
Definition: checkqueue.h:50
bool m_request_stop GUARDED_BY(m_mutex)
Definition: checkqueue.h:63
bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Internal function that does bulk of the verification work.
Definition: checkqueue.h:66
int nTotal GUARDED_BY(m_mutex)
The total number of workers (including the master).
Definition: checkqueue.h:47
std::vector< std::thread > m_worker_threads
Definition: checkqueue.h:62
Mutex m_control_mutex
Mutex to ensure only one concurrent CCheckQueueControl.
Definition: checkqueue.h:137
std::condition_variable m_worker_cv
Worker threads block on this when out of work.
Definition: checkqueue.h:34
Mutex m_mutex
Mutex to protect the inner state.
Definition: checkqueue.h:31
bool Wait() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Wait until execution finishes, and return whether all evaluations were successful.
Definition: checkqueue.h:163
void StartWorkerThreads(const int threads_num) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Create a pool of new worker threads.
Definition: checkqueue.h:144
CCheckQueue(unsigned int nBatchSizeIn)
Create a new check queue.
Definition: checkqueue.h:140
void Add(std::vector< T > &&vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Add a batch of checks to the queue.
Definition: checkqueue.h:168
int nIdle GUARDED_BY(m_mutex)
The number of workers (including the master) that are idle.
Definition: checkqueue.h:44
unsigned int nTodo GUARDED_BY(m_mutex)
Number of verifications that haven't completed yet.
Definition: checkqueue.h:57
void StopWorkerThreads() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop all of the worker threads.
Definition: checkqueue.h:181
std::vector< T > queue GUARDED_BY(m_mutex)
The queue of elements to be processed.
void ThreadRename(std::string &&)
Rename a thread both in terms of an internal (in-memory) name as well as its system thread name.
Definition: threadnames.cpp:48
#define WAIT_LOCK(cs, name)
Definition: sync.h:317
#define ENTER_CRITICAL_SECTION(cs)
Definition: sync.h:320
#define LEAVE_CRITICAL_SECTION(cs)
Definition: sync.h:326
#define LOCK(cs)
Definition: sync.h:306
#define WITH_LOCK(cs, code)
Run code while locking a mutex.
Definition: sync.h:357
#define EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: threadsafety.h:56
#define strprintf
Format arguments and return the string or write to given std::ostream (see tinyformat::format doc for details).
Definition: tinyformat.h:1202
assert(!tx.IsCoinBase())