Bitcoin ABC 0.30.7
P2P Digital Currency
zmqpublishnotifier.cpp
Go to the documentation of this file.
1// Copyright (c) 2015-2016 The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
6
7#include <chain.h>
8#include <chainparams.h>
9#include <common/system.h>
10#include <config.h>
11#include <logging.h>
12#include <node/blockstorage.h>
14#include <primitives/txid.h>
15#include <rpc/server.h>
16#include <streams.h>
17#include <zmq/zmqutil.h>
18
19#include <zmq.h>
20
21#include <cstdarg>
22#include <cstddef>
23#include <map>
24#include <string>
25#include <utility>
26
27static std::multimap<std::string, CZMQAbstractPublishNotifier *>
29
30static const char *MSG_HASHBLOCK = "hashblock";
31static const char *MSG_HASHTX = "hashtx";
32static const char *MSG_RAWBLOCK = "rawblock";
33static const char *MSG_RAWTX = "rawtx";
34static const char *MSG_SEQUENCE = "sequence";
35
36// Internal function to send multipart message
37static int zmq_send_multipart(void *sock, const void *data, size_t size, ...) {
38 va_list args;
39 va_start(args, size);
40
41 while (1) {
42 zmq_msg_t msg;
43
44 int rc = zmq_msg_init_size(&msg, size);
45 if (rc != 0) {
46 zmqError("Unable to initialize ZMQ msg");
47 va_end(args);
48 return -1;
49 }
50
51 void *buf = zmq_msg_data(&msg);
52 memcpy(buf, data, size);
53
54 data = va_arg(args, const void *);
55
56 rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
57 if (rc == -1) {
58 zmqError("Unable to send ZMQ msg");
59 zmq_msg_close(&msg);
60 va_end(args);
61 return -1;
62 }
63
64 zmq_msg_close(&msg);
65
66 if (!data) {
67 break;
68 }
69
70 size = va_arg(args, size_t);
71 }
72 va_end(args);
73 return 0;
74}
75
78
79 // check if address is being used by other publish notifier
80 std::multimap<std::string, CZMQAbstractPublishNotifier *>::iterator i =
82
83 if (i == mapPublishNotifiers.end()) {
84 psocket = zmq_socket(pcontext, ZMQ_PUB);
85 if (!psocket) {
86 zmqError("Failed to create socket");
87 return false;
88 }
89
91 "zmq: Outbound message high water mark for %s at %s is %d\n",
93
94 int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM,
97 if (rc != 0) {
98 zmqError("Failed to set outbound message high water mark");
99 zmq_close(psocket);
100 return false;
101 }
102
103 const int so_keepalive_option{1};
104 rc = zmq_setsockopt(psocket, ZMQ_TCP_KEEPALIVE, &so_keepalive_option,
105 sizeof(so_keepalive_option));
106 if (rc != 0) {
107 zmqError("Failed to set SO_KEEPALIVE");
108 zmq_close(psocket);
109 return false;
110 }
111
112 rc = zmq_bind(psocket, address.c_str());
113 if (rc != 0) {
114 zmqError("Failed to bind address");
115 zmq_close(psocket);
116 return false;
117 }
118
119 // register this notifier for the address, so it can be reused for other
120 // publish notifier
121 mapPublishNotifiers.insert(std::make_pair(address, this));
122 return true;
123 } else {
124 LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address);
126 "zmq: Outbound message high water mark for %s at %s is %d\n",
128
129 psocket = i->second->psocket;
130 mapPublishNotifiers.insert(std::make_pair(address, this));
131
132 return true;
133 }
134}
135
137 // Early return if Initialize was not called
138 if (!psocket) {
139 return;
140 }
141
142 int count = mapPublishNotifiers.count(address);
143
144 // remove this notifier from the list of publishers using this address
145 typedef std::multimap<std::string, CZMQAbstractPublishNotifier *>::iterator
146 iterator;
147 std::pair<iterator, iterator> iterpair =
148 mapPublishNotifiers.equal_range(address);
149
150 for (iterator it = iterpair.first; it != iterpair.second; ++it) {
151 if (it->second == this) {
152 mapPublishNotifiers.erase(it);
153 break;
154 }
155 }
156
157 if (count == 1) {
158 LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
159 int linger = 0;
160 zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
161 zmq_close(psocket);
162 }
163
164 psocket = nullptr;
165}
166
168 const void *data,
169 size_t size) {
171
172 /* send three parts, command & data & a LE 4byte sequence number */
173 uint8_t msgseq[sizeof(uint32_t)];
174 WriteLE32(msgseq, nSequence);
175 int rc = zmq_send_multipart(psocket, command, strlen(command), data, size,
176 msgseq, (size_t)sizeof(uint32_t), nullptr);
177 if (rc == -1) {
178 return false;
179 }
180
181 /* increment memory only sequence number after sending */
182 nSequence++;
183
184 return true;
185}
186
188 BlockHash hash = pindex->GetBlockHash();
189 LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(),
190 this->address);
191 uint8_t data[32];
192 for (unsigned int i = 0; i < 32; i++) {
193 data[31 - i] = hash.begin()[i];
194 }
195 return SendZmqMessage(MSG_HASHBLOCK, data, 32);
196}
197
199 const CTransaction &transaction) {
200 TxId txid = transaction.GetId();
201 LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", txid.GetHex(),
202 this->address);
203 uint8_t data[32];
204 for (unsigned int i = 0; i < 32; i++) {
205 data[31 - i] = txid.begin()[i];
206 }
207 return SendZmqMessage(MSG_HASHTX, data, 32);
208}
209
211 LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s to %s\n",
212 pindex->GetBlockHash().GetHex(), this->address);
213
215 CBlock block;
216 if (!m_get_block_by_index(block, *pindex)) {
217 zmqError("Can't read block from disk");
218 return false;
219 }
220
221 ss << block;
222
223 return SendZmqMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
224}
225
227 const CTransaction &transaction) {
228 TxId txid = transaction.GetId();
229 LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s to %s\n", txid.GetHex(),
230 this->address);
232 ss << transaction;
233 return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
234}
235
236// TODO: Dedup this code to take label char, log string
238 const CBlockIndex *pindex) {
239 BlockHash hash = pindex->GetBlockHash();
240 LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n",
241 hash.GetHex(), this->address);
242 char data[sizeof(BlockHash) + 1];
243 for (unsigned int i = 0; i < sizeof(BlockHash); i++) {
244 data[sizeof(BlockHash) - 1 - i] = hash.begin()[i];
245 }
246 // Block (C)onnect
247 data[sizeof(data) - 1] = 'C';
248 return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
249}
250
252 const CBlockIndex *pindex) {
253 BlockHash hash = pindex->GetBlockHash();
254 LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n",
255 hash.GetHex(), this->address);
256 char data[sizeof(BlockHash) + 1];
257 for (unsigned int i = 0; i < sizeof(BlockHash); i++) {
258 data[sizeof(BlockHash) - 1 - i] = hash.begin()[i];
259 }
260 // Block (D)isconnect
261 data[sizeof(data) - 1] = 'D';
262 return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
263}
264
266 const CTransaction &transaction, uint64_t mempool_sequence) {
267 TxId txid = transaction.GetId();
268 LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n",
269 txid.GetHex(), this->address);
270 uint8_t data[sizeof(TxId) + sizeof(mempool_sequence) + 1];
271 for (unsigned int i = 0; i < sizeof(TxId); i++) {
272 data[sizeof(TxId) - 1 - i] = txid.begin()[i];
273 }
274 // Mempool (A)cceptance
275 data[sizeof(TxId)] = 'A';
276 WriteLE64(data + sizeof(TxId) + 1, mempool_sequence);
277 return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
278}
279
281 const CTransaction &transaction, uint64_t mempool_sequence) {
282 TxId txid = transaction.GetId();
283 LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n",
284 txid.GetHex(), this->address);
285 uint8_t data[sizeof(TxId) + sizeof(mempool_sequence) + 1];
286 for (unsigned int i = 0; i < sizeof(TxId); i++) {
287 data[sizeof(TxId) - 1 - i] = txid.begin()[i];
288 }
289 // Mempool (R)emoval
290 data[sizeof(TxId)] = 'R';
291 WriteLE64(data + sizeof(TxId) + 1, mempool_sequence);
292 return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data));
293}
Definition: block.h:60
The block chain is a tree shaped structure starting with the genesis block at the root,...
Definition: blockindex.h:25
BlockHash GetBlockHash() const
Definition: blockindex.h:146
Double ended buffer combining vector and stream-like interfaces.
Definition: streams.h:177
const_iterator begin() const
Definition: streams.h:219
size_type size() const
Definition: streams.h:223
bool SendZmqMessage(const char *command, const void *data, size_t size)
uint32_t nSequence
upcounting per message sequence number
bool Initialize(void *pcontext) override
bool NotifyBlock(const CBlockIndex *pindex) override
bool NotifyTransaction(const CTransaction &transaction) override
bool NotifyBlock(const CBlockIndex *pindex) override
const std::function< bool(CBlock &, const CBlockIndex &)> m_get_block_by_index
bool NotifyTransaction(const CTransaction &transaction) override
bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) override
bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) override
bool NotifyBlockConnect(const CBlockIndex *pindex) override
bool NotifyBlockDisconnect(const CBlockIndex *pindex) override
uint8_t * begin()
Definition: uint256.h:85
std::string GetHex() const
Definition: uint256.cpp:16
static void WriteLE32(uint8_t *ptr, uint32_t x)
Definition: common.h:40
static void WriteLE64(uint8_t *ptr, uint64_t x)
Definition: common.h:45
#define LogPrint(category,...)
Definition: logging.h:238
@ ZMQ
Definition: logging.h:45
@ SER_NETWORK
Definition: serialize.h:152
int RPCSerializationFlags()
Retrieves any serialization flags requested in command line argument.
Definition: server.cpp:679
A BlockHash is a unqiue identifier for a block.
Definition: blockhash.h:13
A TxId is the identifier of a transaction.
Definition: txid.h:14
static int count
Definition: tests.c:31
assert(!tx.IsCoinBase())
static const int PROTOCOL_VERSION
network protocol versioning
Definition: version.h:11
static const char * MSG_HASHBLOCK
static const char * MSG_SEQUENCE
static std::multimap< std::string, CZMQAbstractPublishNotifier * > mapPublishNotifiers
static const char * MSG_RAWBLOCK
static const char * MSG_RAWTX
static int zmq_send_multipart(void *sock, const void *data, size_t size,...)
static const char * MSG_HASHTX
void zmqError(const char *str)
Definition: zmqutil.cpp:11