Bitcoin ABC 0.30.5
P2P Digital Currency
zmqnotificationinterface.cpp
Go to the documentation of this file.
1// Copyright (c) 2015-2018 The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
7#include <zmq/zmqutil.h>
8
9#include <zmq.h>
10
11#include <common/args.h>
12#include <logging.h>
13#include <primitives/block.h>
14
16
18 Shutdown();
19}
20
21std::list<const CZMQAbstractNotifier *>
23 std::list<const CZMQAbstractNotifier *> result;
24 for (const auto &n : notifiers) {
25 result.push_back(n.get());
26 }
27 return result;
28}
29
30std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create(
31 std::function<bool(CBlock &, const CBlockIndex &)> get_block_by_index) {
32 std::map<std::string, CZMQNotifierFactory> factories;
33 factories["pubhashblock"] =
34 CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
35 factories["pubhashtx"] =
36 CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
37 factories["pubrawblock"] =
38 [&get_block_by_index]() -> std::unique_ptr<CZMQAbstractNotifier> {
39 return std::make_unique<CZMQPublishRawBlockNotifier>(
40 get_block_by_index);
41 };
42 factories["pubrawtx"] =
43 CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
44 factories["pubsequence"] =
45 CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
46
47 std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
48 for (const auto &entry : factories) {
49 std::string arg("-zmq" + entry.first);
50 const auto &factory = entry.second;
51 for (const std::string &address : gArgs.GetArgs(arg)) {
52 std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
53 notifier->SetType(entry.first);
54 notifier->SetAddress(address);
55 notifier->SetOutboundMessageHighWaterMark(
56 static_cast<int>(gArgs.GetIntArg(
58 notifiers.push_back(std::move(notifier));
59 }
60 }
61
62 if (!notifiers.empty()) {
63 std::unique_ptr<CZMQNotificationInterface> notificationInterface(
65 notificationInterface->notifiers = std::move(notifiers);
66
67 if (notificationInterface->Initialize()) {
68 return notificationInterface;
69 }
70 }
71
72 return nullptr;
73}
74
75// Called at startup to conditionally set up ZMQ socket(s)
77 int major = 0, minor = 0, patch = 0;
78 zmq_version(&major, &minor, &patch);
79 LogPrint(BCLog::ZMQ, "zmq: version %d.%d.%d\n", major, minor, patch);
80
81 LogPrint(BCLog::ZMQ, "zmq: Initialize notification interface\n");
83
84 pcontext = zmq_ctx_new();
85
86 if (!pcontext) {
87 zmqError("Unable to initialize context");
88 return false;
89 }
90
91 for (auto &notifier : notifiers) {
92 if (notifier->Initialize(pcontext)) {
93 LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n",
94 notifier->GetType(), notifier->GetAddress());
95 } else {
96 LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n",
97 notifier->GetType(), notifier->GetAddress());
98 return false;
99 }
100 }
101
102 return true;
103}
104
105// Called during shutdown sequence
107 LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n");
108 if (pcontext) {
109 for (auto &notifier : notifiers) {
110 LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n",
111 notifier->GetType(), notifier->GetAddress());
112 notifier->Shutdown();
113 }
114 zmq_ctx_term(pcontext);
115
116 pcontext = nullptr;
117 }
118}
119
120namespace {
121
122template <typename Function>
123void TryForEachAndRemoveFailed(
124 std::list<std::unique_ptr<CZMQAbstractNotifier>> &notifiers,
125 const Function &func) {
126 for (auto i = notifiers.begin(); i != notifiers.end();) {
127 CZMQAbstractNotifier *notifier = i->get();
128 if (func(notifier)) {
129 ++i;
130 } else {
131 notifier->Shutdown();
132 i = notifiers.erase(i);
133 }
134 }
135}
136
137} // anonymous namespace
138
140 const CBlockIndex *pindexFork,
141 bool fInitialDownload) {
142 // In IBD or blocks were disconnected without any new ones
143 if (fInitialDownload || pindexNew == pindexFork) {
144 return;
145 }
146
147 TryForEachAndRemoveFailed(notifiers,
148 [pindexNew](CZMQAbstractNotifier *notifier) {
149 return notifier->NotifyBlock(pindexNew);
150 });
151}
152
154 const CTransactionRef &ptx, std::shared_ptr<const std::vector<Coin>>,
155 uint64_t mempool_sequence) {
156 const CTransaction &tx = *ptx;
157
158 TryForEachAndRemoveFailed(
159 notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier *notifier) {
160 return notifier->NotifyTransaction(tx) &&
161 notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
162 });
163}
164
166 const CTransactionRef &ptx, MemPoolRemovalReason reason,
167 uint64_t mempool_sequence) {
168 // Called for all non-block inclusion reasons
169 const CTransaction &tx = *ptx;
170
171 TryForEachAndRemoveFailed(
172 notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier *notifier) {
173 return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
174 });
175}
176
178 const std::shared_ptr<const CBlock> &pblock,
179 const CBlockIndex *pindexConnected) {
180 for (const CTransactionRef &ptx : pblock->vtx) {
181 const CTransaction &tx = *ptx;
182 TryForEachAndRemoveFailed(notifiers,
183 [&tx](CZMQAbstractNotifier *notifier) {
184 return notifier->NotifyTransaction(tx);
185 });
186 }
187
188 // Next we notify BlockConnect listeners for *all* blocks
189 TryForEachAndRemoveFailed(
190 notifiers, [pindexConnected](CZMQAbstractNotifier *notifier) {
191 return notifier->NotifyBlockConnect(pindexConnected);
192 });
193}
194
196 const std::shared_ptr<const CBlock> &pblock,
197 const CBlockIndex *pindexDisconnected) {
198 for (const CTransactionRef &ptx : pblock->vtx) {
199 const CTransaction &tx = *ptx;
200 TryForEachAndRemoveFailed(notifiers,
201 [&tx](CZMQAbstractNotifier *notifier) {
202 return notifier->NotifyTransaction(tx);
203 });
204 }
205
206 // Next we notify BlockDisconnect listeners for *all* blocks
207 TryForEachAndRemoveFailed(
208 notifiers, [pindexDisconnected](CZMQAbstractNotifier *notifier) {
209 return notifier->NotifyBlockDisconnect(pindexDisconnected);
210 });
211}
212
213std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;
ArgsManager gArgs
Definition: args.cpp:38
std::vector< std::string > GetArgs(const std::string &strArg) const
Return a vector of strings of the given argument.
Definition: args.cpp:371
int64_t GetIntArg(const std::string &strArg, int64_t nDefault) const
Return integer argument or default value.
Definition: args.cpp:526
Definition: block.h:60
The block chain is a tree shaped structure starting with the genesis block at the root,...
Definition: blockindex.h:25
virtual void Shutdown()=0
virtual bool NotifyBlockConnect(const CBlockIndex *pindex)
static const int DEFAULT_ZMQ_SNDHWM
virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
virtual bool NotifyBlock(const CBlockIndex *pindex)
virtual bool NotifyTransaction(const CTransaction &transaction)
virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex)
virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
void TransactionAddedToMempool(const CTransactionRef &tx, std::shared_ptr< const std::vector< Coin > >, uint64_t mempool_sequence) override
Notifies listeners of a transaction having been added to mempool.
static std::unique_ptr< CZMQNotificationInterface > Create(std::function< bool(CBlock &, const CBlockIndex &)> get_block_by_index)
void BlockConnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexConnected) override
Notifies listeners of a block being connected.
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override
Notifies listeners when the block chain tip advances.
std::list< std::unique_ptr< CZMQAbstractNotifier > > notifiers
void BlockDisconnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexDisconnected) override
Notifies listeners of a block being disconnected.
void TransactionRemovedFromMempool(const CTransactionRef &tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
Notifies listeners of a transaction leaving mempool.
std::list< const CZMQAbstractNotifier * > GetActiveNotifiers() const
#define LogPrint(category,...)
Definition: logging.h:211
@ ZMQ
Definition: logging.h:45
std::shared_ptr< const CTransaction > CTransactionRef
Definition: transaction.h:315
MemPoolRemovalReason
Reason why a transaction was removed from the mempool; this is passed to the notification signal.
Definition: txmempool.h:151
assert(!tx.IsCoinBase())
std::unique_ptr< CZMQNotificationInterface > g_zmq_notification_interface
void zmqError(const char *str)
Definition: zmqutil.cpp:11