Bitcoin ABC 0.30.5
P2P Digital Currency
rcu.cpp
Go to the documentation of this file.
1// Copyright (c) 2018-2019 The Bitcoin developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#include <rcu.h>
6
7#include <sync.h>
8
9#include <algorithm>
10#include <chrono>
11#include <condition_variable>
12
13std::atomic<uint64_t> RCUInfos::revision{0};
14thread_local RCUInfos RCUInfos::infos{};
15
19static constexpr int RCU_ACTIVE_LOOP_COUNT = 10;
20
104static std::atomic<RCUInfos *> threadInfos{nullptr};
106
107RCUInfos::RCUInfos() : state(0), next(nullptr) {
108 RCUInfos *head = threadInfos.load();
109 do {
110 next.store(head);
111 } while (!threadInfos.compare_exchange_weak(head, this));
112
113 // Release the lock.
114 readFree();
115}
116
122 runCleanups();
123 while (cleanups.size() > 0) {
124 synchronize();
125 }
126
127 while (true) {
129
130 std::atomic<RCUInfos *> *ptr;
131
132 {
133 RCULock lock(this);
134 ptr = &threadInfos;
135 while (true) {
136 RCUInfos *current = ptr->load();
137 if (current == this) {
138 break;
139 }
140
141 assert(current != nullptr);
142 ptr = &current->next;
143 }
144 }
145
153 RCUInfos *current = this;
154 if (!ptr->compare_exchange_strong(current, next.load())) {
155 continue;
156 }
157
164 synchronize();
165 break;
166 }
167}
168
170 uint64_t syncRev = ++revision;
171
172 // Loop a few time lock free.
173 for (int i = 0; i < RCU_ACTIVE_LOOP_COUNT; i++) {
174 runCleanups();
175 if (cleanups.empty() && hasSyncedTo(syncRev)) {
176 return;
177 }
178 }
179
180 // It seems like we have some contention. Let's try to not starve the
181 // system. Let's make sure threads that land here proceed one by one.
182 // XXX: The best option long term is most likely to use a futex on one of
183 // the thread causing synchronization delay so this thread can be waked up
184 // at an appropriate time.
185 static std::condition_variable cond;
186 static Mutex cs;
187 WAIT_LOCK(cs, lock);
188
189 do {
190 runCleanups();
191 cond.notify_one();
192 } while (!cond.wait_for(lock, std::chrono::microseconds(1), [&] {
193 return cleanups.empty() && hasSyncedTo(syncRev);
194 }));
195}
196
199
200public:
201 explicit RCUCleanupGuard(RCUInfos *infosIn) : infos(infosIn) {
202 infos->isCleaningUp = true;
203 }
204
206};
207
209 if (isCleaningUp || cleanups.empty()) {
210 // We don't want to run cleanups within cleanups.
211 return;
212 }
213
214 RCUCleanupGuard guard(this);
215
216 // By the time we run a set of cleanups, we may have more cleanups
217 // available so we loop until there is nothing available for cleanup.
218 while (!cleanups.empty()) {
219 auto it = cleanups.begin();
220 uint64_t syncedTo = hasSyncedTo(it->first);
221 if (it->first > syncedTo) {
222 // We have nothing more ready to be cleaned up.
223 return;
224 }
225
226 while (it != cleanups.end() && it->first <= syncedTo) {
227 // Run the cleanup and remove it from the map.
228 auto fun = std::move(it->second);
229 cleanups.erase(it++);
230 fun();
231 }
232 }
233}
234
235uint64_t RCUInfos::hasSyncedTo(uint64_t cutoff) {
236 uint64_t syncedTo = revision.load();
237
238 // Go over the list and check all threads are past the synchronization
239 // point.
240 RCULock lock(this);
241 RCUInfos *current = threadInfos.load();
242 while (current != nullptr) {
243 syncedTo = std::min(syncedTo, current->state.load());
244 if (syncedTo < cutoff) {
245 return 0;
246 }
247
248 current = current->next.load();
249 }
250
251 return syncedTo;
252}
RCUCleanupGuard(RCUInfos *infosIn)
Definition: rcu.cpp:201
Definition: rcu.h:20
uint64_t hasSyncedTo(uint64_t cutoff=UNLOCKED)
Definition: rcu.cpp:235
std::atomic< RCUInfos * > next
Definition: rcu.h:22
void runCleanups()
Definition: rcu.cpp:208
~RCUInfos()
Definition: rcu.cpp:117
static thread_local RCUInfos infos
Definition: rcu.h:58
std::map< uint64_t, std::function< void()> > cleanups
Definition: rcu.h:27
std::atomic< uint64_t > state
Definition: rcu.h:21
void readFree()
Definition: rcu.h:40
static std::atomic< uint64_t > revision
Definition: rcu.h:57
void synchronize()
Definition: rcu.cpp:169
bool isCleaningUp
Definition: rcu.h:24
RCUInfos()
Definition: rcu.cpp:107
Definition: rcu.h:61
static void pool cs
static std::atomic< RCUInfos * > threadInfos
We maintain a linked list of all the RCUInfos for each active thread.
Definition: rcu.cpp:104
static RecursiveMutex csThreadInfosDelete
Definition: rcu.cpp:105
static constexpr int RCU_ACTIVE_LOOP_COUNT
How many time a busy loop runs before yelding.
Definition: rcu.cpp:19
#define WAIT_LOCK(cs, name)
Definition: sync.h:317
#define LOCK(cs)
Definition: sync.h:306
assert(!tx.IsCoinBase())