Bitcoin ABC 0.31.0
P2P Digital Currency
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
zmqnotificationinterface.cpp
Go to the documentation of this file.
1// Copyright (c) 2015-2018 The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
7#include <zmq/zmqutil.h>
8
9#include <zmq.h>
10
11#include <common/args.h>
12#include <kernel/chain.h>
13#include <logging.h>
14#include <primitives/block.h>
15
17
19 Shutdown();
20}
21
22std::list<const CZMQAbstractNotifier *>
24 std::list<const CZMQAbstractNotifier *> result;
25 for (const auto &n : notifiers) {
26 result.push_back(n.get());
27 }
28 return result;
29}
30
31std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create(
32 std::function<bool(CBlock &, const CBlockIndex &)> get_block_by_index) {
33 std::map<std::string, CZMQNotifierFactory> factories;
34 factories["pubhashblock"] =
35 CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
36 factories["pubhashtx"] =
37 CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
38 factories["pubrawblock"] =
39 [&get_block_by_index]() -> std::unique_ptr<CZMQAbstractNotifier> {
40 return std::make_unique<CZMQPublishRawBlockNotifier>(
41 get_block_by_index);
42 };
43 factories["pubrawtx"] =
44 CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
45 factories["pubsequence"] =
46 CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
47
48 std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
49 for (const auto &entry : factories) {
50 std::string arg("-zmq" + entry.first);
51 const auto &factory = entry.second;
52 for (const std::string &address : gArgs.GetArgs(arg)) {
53 std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
54 notifier->SetType(entry.first);
55 notifier->SetAddress(address);
56 notifier->SetOutboundMessageHighWaterMark(
57 static_cast<int>(gArgs.GetIntArg(
59 notifiers.push_back(std::move(notifier));
60 }
61 }
62
63 if (!notifiers.empty()) {
64 std::unique_ptr<CZMQNotificationInterface> notificationInterface(
66 notificationInterface->notifiers = std::move(notifiers);
67
68 if (notificationInterface->Initialize()) {
69 return notificationInterface;
70 }
71 }
72
73 return nullptr;
74}
75
76// Called at startup to conditionally set up ZMQ socket(s)
78 int major = 0, minor = 0, patch = 0;
79 zmq_version(&major, &minor, &patch);
80 LogPrint(BCLog::ZMQ, "zmq: version %d.%d.%d\n", major, minor, patch);
81
82 LogPrint(BCLog::ZMQ, "zmq: Initialize notification interface\n");
84
85 pcontext = zmq_ctx_new();
86
87 if (!pcontext) {
88 zmqError("Unable to initialize context");
89 return false;
90 }
91
92 for (auto &notifier : notifiers) {
93 if (notifier->Initialize(pcontext)) {
94 LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n",
95 notifier->GetType(), notifier->GetAddress());
96 } else {
97 LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n",
98 notifier->GetType(), notifier->GetAddress());
99 return false;
100 }
101 }
102
103 return true;
104}
105
106// Called during shutdown sequence
108 LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n");
109 if (pcontext) {
110 for (auto &notifier : notifiers) {
111 LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n",
112 notifier->GetType(), notifier->GetAddress());
113 notifier->Shutdown();
114 }
115 zmq_ctx_term(pcontext);
116
117 pcontext = nullptr;
118 }
119}
120
121namespace {
122
123template <typename Function>
124void TryForEachAndRemoveFailed(
125 std::list<std::unique_ptr<CZMQAbstractNotifier>> &notifiers,
126 const Function &func) {
127 for (auto i = notifiers.begin(); i != notifiers.end();) {
128 CZMQAbstractNotifier *notifier = i->get();
129 if (func(notifier)) {
130 ++i;
131 } else {
132 notifier->Shutdown();
133 i = notifiers.erase(i);
134 }
135 }
136}
137
138} // anonymous namespace
139
141 const CBlockIndex *pindexFork,
142 bool fInitialDownload) {
143 // In IBD or blocks were disconnected without any new ones
144 if (fInitialDownload || pindexNew == pindexFork) {
145 return;
146 }
147
148 TryForEachAndRemoveFailed(notifiers,
149 [pindexNew](CZMQAbstractNotifier *notifier) {
150 return notifier->NotifyBlock(pindexNew);
151 });
152}
153
155 const CTransactionRef &ptx, std::shared_ptr<const std::vector<Coin>>,
156 uint64_t mempool_sequence) {
157 const CTransaction &tx = *ptx;
158
159 TryForEachAndRemoveFailed(
160 notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier *notifier) {
161 return notifier->NotifyTransaction(tx) &&
162 notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
163 });
164}
165
167 const CTransactionRef &ptx, MemPoolRemovalReason reason,
168 uint64_t mempool_sequence) {
169 // Called for all non-block inclusion reasons
170 const CTransaction &tx = *ptx;
171
172 TryForEachAndRemoveFailed(
173 notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier *notifier) {
174 return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
175 });
176}
177
179 ChainstateRole role, const std::shared_ptr<const CBlock> &pblock,
180 const CBlockIndex *pindexConnected) {
181 if (role == ChainstateRole::BACKGROUND) {
182 return;
183 }
184 for (const CTransactionRef &ptx : pblock->vtx) {
185 const CTransaction &tx = *ptx;
186 TryForEachAndRemoveFailed(notifiers,
187 [&tx](CZMQAbstractNotifier *notifier) {
188 return notifier->NotifyTransaction(tx);
189 });
190 }
191
192 // Next we notify BlockConnect listeners for *all* blocks
193 TryForEachAndRemoveFailed(
194 notifiers, [pindexConnected](CZMQAbstractNotifier *notifier) {
195 return notifier->NotifyBlockConnect(pindexConnected);
196 });
197}
198
200 const std::shared_ptr<const CBlock> &pblock,
201 const CBlockIndex *pindexDisconnected) {
202 for (const CTransactionRef &ptx : pblock->vtx) {
203 const CTransaction &tx = *ptx;
204 TryForEachAndRemoveFailed(notifiers,
205 [&tx](CZMQAbstractNotifier *notifier) {
206 return notifier->NotifyTransaction(tx);
207 });
208 }
209
210 // Next we notify BlockDisconnect listeners for *all* blocks
211 TryForEachAndRemoveFailed(
212 notifiers, [pindexDisconnected](CZMQAbstractNotifier *notifier) {
213 return notifier->NotifyBlockDisconnect(pindexDisconnected);
214 });
215}
216
217std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;
ArgsManager gArgs
Definition: args.cpp:38
std::vector< std::string > GetArgs(const std::string &strArg) const
Return a vector of strings of the given argument.
Definition: args.cpp:371
int64_t GetIntArg(const std::string &strArg, int64_t nDefault) const
Return integer argument or default value.
Definition: args.cpp:526
Definition: block.h:60
The block chain is a tree shaped structure starting with the genesis block at the root,...
Definition: blockindex.h:25
virtual void Shutdown()=0
virtual bool NotifyBlockConnect(const CBlockIndex *pindex)
static const int DEFAULT_ZMQ_SNDHWM
virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
virtual bool NotifyBlock(const CBlockIndex *pindex)
virtual bool NotifyTransaction(const CTransaction &transaction)
virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex)
virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
void TransactionAddedToMempool(const CTransactionRef &tx, std::shared_ptr< const std::vector< Coin > >, uint64_t mempool_sequence) override
Notifies listeners of a transaction having been added to mempool.
static std::unique_ptr< CZMQNotificationInterface > Create(std::function< bool(CBlock &, const CBlockIndex &)> get_block_by_index)
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override
Notifies listeners when the block chain tip advances.
std::list< std::unique_ptr< CZMQAbstractNotifier > > notifiers
void BlockDisconnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexDisconnected) override
Notifies listeners of a block being disconnected.
void BlockConnected(ChainstateRole role, const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexConnected) override
Notifies listeners of a block being connected.
void TransactionRemovedFromMempool(const CTransactionRef &tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
Notifies listeners of a transaction leaving mempool.
std::list< const CZMQAbstractNotifier * > GetActiveNotifiers() const
ChainstateRole
This enum describes the various roles a specific Chainstate instance can take.
Definition: chain.h:14
#define LogPrint(category,...)
Definition: logging.h:238
@ ZMQ
Definition: logging.h:45
std::shared_ptr< const CTransaction > CTransactionRef
Definition: transaction.h:315
MemPoolRemovalReason
Reason why a transaction was removed from the mempool, this is passed to the notification signal.
Definition: txmempool.h:153
assert(!tx.IsCoinBase())
std::unique_ptr< CZMQNotificationInterface > g_zmq_notification_interface
void zmqError(const char *str)
Definition: zmqutil.cpp:11