58 return connect(
m_socket, addr, addr_len);
61int Sock::Bind(
const sockaddr *addr, socklen_t addr_len)
const {
62 return bind(
m_socket, addr, addr_len);
69std::unique_ptr<Sock>
Sock::Accept(sockaddr *addr, socklen_t *addr_len)
const {
76 std::unique_ptr<Sock> sock;
78 const auto socket = accept(
m_socket, addr, addr_len);
81 sock = std::make_unique<Sock>(socket);
82 }
catch (
const std::exception &) {
95 socklen_t *opt_len)
const {
96 return getsockopt(
m_socket, level, opt_name,
static_cast<char *
>(opt_val),
101 socklen_t opt_len)
const {
102 return setsockopt(
m_socket, level, opt_name,
103 static_cast<const char *
>(opt_val), opt_len);
129#if defined(USE_POLL) || defined(WIN32)
137 Event *occurred)
const {
141 std::shared_ptr<const Sock> shared{
this, [](
const Sock *) {}};
145 if (!
WaitMany(timeout, events_per_sock)) {
149 if (occurred !=
nullptr) {
150 *occurred = events_per_sock.begin()->second.occurred;
159 std::vector<pollfd> pfds;
160 for (
const auto &[sock, events] : events_per_sock) {
162 auto &pfd = pfds.back();
163 pfd.fd = sock->m_socket;
164 if (events.requested &
RECV) {
165 pfd.events |= POLLIN;
167 if (events.requested &
SEND) {
168 pfd.events |= POLLOUT;
177 assert(pfds.size() == events_per_sock.size());
179 for (
auto &[sock, events] : events_per_sock) {
180 assert(sock->m_socket ==
static_cast<SOCKET>(pfds[i].fd));
182 if (pfds[i].revents & POLLIN) {
183 events.occurred |=
RECV;
185 if (pfds[i].revents & POLLOUT) {
186 events.occurred |=
SEND;
188 if (pfds[i].revents & (POLLERR | POLLHUP)) {
189 events.occurred |=
ERR;
204 for (
const auto &[sock, events] : events_per_sock) {
205 if (!sock->IsSelectable()) {
208 const auto &s = sock->m_socket;
209 if (events.requested &
RECV) {
212 if (events.requested &
SEND) {
216 socket_max = std::max(socket_max, s);
225 for (
auto &[sock, events] : events_per_sock) {
226 const auto &s = sock->m_socket;
228 if (FD_ISSET(s, &recv)) {
229 events.occurred |=
RECV;
231 if (FD_ISSET(s, &
send)) {
232 events.occurred |=
SEND;
234 if (FD_ISSET(s, &err)) {
235 events.occurred |=
ERR;
248 FD_ZERO(&fdsetError);
251 return select(
m_socket + 1, &fdsetRecv,
nullptr, &fdsetError, timeout);
255 std::chrono::milliseconds timeout,
257 const auto deadline = GetTime<std::chrono::milliseconds>() + timeout;
265 sent +=
static_cast<size_t>(ret);
266 if (sent == data.size()) {
272 throw std::runtime_error(
277 const auto now = GetTime<std::chrono::milliseconds>();
279 if (now >= deadline) {
280 throw std::runtime_error(
281 strprintf(
"Send timeout (sent only %u of %u bytes before that)",
287 "Send interrupted (sent only %u of %u bytes before that)", sent,
293 const auto wait_time = std::min(
300 std::chrono::milliseconds timeout,
302 size_t max_data)
const {
303 const auto deadline = GetTime<std::chrono::milliseconds>() + timeout;
305 bool terminator_found{
false};
317 if (data.size() >= max_data) {
318 throw std::runtime_error(
319 strprintf(
"Received too many bytes without a terminator (%u)",
325 const ssize_t peek_ret{
326 Recv(buf, std::min(
sizeof(buf), max_data - data.size()), MSG_PEEK)};
332 throw std::runtime_error(
338 throw std::runtime_error(
339 "Connection unexpectedly closed by peer");
341 auto end = buf + peek_ret;
342 auto terminator_pos = std::find(buf, end, terminator);
343 terminator_found = terminator_pos != end;
345 const size_t try_len{terminator_found
346 ? terminator_pos - buf + 1
347 :
static_cast<size_t>(peek_ret)};
349 const ssize_t read_ret{
Recv(buf, try_len, 0)};
351 if (read_ret < 0 ||
static_cast<size_t>(read_ret) != try_len) {
352 throw std::runtime_error(
353 strprintf(
"recv() returned %u bytes on attempt to read "
354 "%u bytes but previous "
355 "peek claimed %u bytes are available",
356 read_ret, try_len, peek_ret));
360 const size_t append_len{terminator_found ? try_len - 1
363 data.append(buf, buf + append_len);
365 if (terminator_found) {
370 const auto now = GetTime<std::chrono::milliseconds>();
372 if (now >= deadline) {
373 throw std::runtime_error(
374 strprintf(
"Receive timeout (received %u bytes without "
375 "terminator before that)",
380 throw std::runtime_error(
381 strprintf(
"Receive interrupted (received %u bytes without "
382 "terminator before that)",
388 const auto wait_time = std::min(
396 errmsg =
"not connected";
401 switch (
Recv(&c,
sizeof(c), MSG_PEEK)) {
440 return Win32ErrorString(err);
A helper class for interruptible sleeps.
RAII helper class that manages a socket and closes it automatically when it goes out of scope.
virtual std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const
accept(2) wrapper.
virtual ssize_t Send(const void *data, size_t len, int flags) const
send(2) wrapper.
static constexpr Event SEND
If passed to Wait(), then it will wait for readiness to send to the socket.
int WaitReadableOrException(timeval *timeout) const
Wait until the socket is readable or has an exceptional condition, or until the timeout expires.
SOCKET m_socket
Contained socket.
Sock & operator=(const Sock &)=delete
Copy assignment operator, disabled because closing the same socket twice is undesirable.
virtual void SendComplete(const std::string &data, std::chrono::milliseconds timeout, CThreadInterrupt &interrupt) const
Send the given data, retrying on transient errors.
virtual int Bind(const sockaddr *addr, socklen_t addr_len) const
bind(2) wrapper.
virtual bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const
Wait for readiness for input (recv) or output (send).
virtual ~Sock()
Destructor, close the socket or do nothing if empty.
virtual int GetSockName(sockaddr *name, socklen_t *name_len) const
getsockname(2) wrapper.
void Close()
Close m_socket if it is not INVALID_SOCKET.
virtual bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const
Same as Wait(), but wait on many sockets within the same timeout.
static constexpr Event ERR
Ignored if passed to Wait(), but could be set in the occurred events if an exceptional condition (such as an error or hang-up) has occurred on the socket.
virtual bool IsConnected(std::string &errmsg) const
Check if still connected.
virtual int SetSockOpt(int level, int opt_name, const void *opt_val, socklen_t opt_len) const
setsockopt(2) wrapper.
static constexpr Event RECV
If passed to Wait(), then it will wait for readiness to read from the socket.
virtual int GetSockOpt(int level, int opt_name, void *opt_val, socklen_t *opt_len) const
getsockopt(2) wrapper.
virtual int Connect(const sockaddr *addr, socklen_t addr_len) const
connect(2) wrapper.
virtual ssize_t Recv(void *buf, size_t len, int flags) const
recv(2) wrapper.
virtual std::string RecvUntilTerminator(uint8_t terminator, std::chrono::milliseconds timeout, CThreadInterrupt &interrupt, size_t max_data) const
Read from socket until a terminator character is encountered.
virtual int Listen(int backlog) const
listen(2) wrapper.
virtual bool SetNonBlocking() const
Set the non-blocking option on the socket.
std::unordered_map< std::shared_ptr< const Sock >, Events, HashSharedPtrSock, EqualSharedPtrSock > EventsPerSock
On which socket to wait for what events in WaitMany().
virtual bool IsSelectable() const
Check if the underlying socket can be used for select(2) (or the Wait() method).
bool operator==(SOCKET s) const
Check if the internal socket is equal to s.
#define WSAGetLastError()
static bool IOErrorIsPermanent(int err)
std::string NetworkErrorString(int err)
Return readable error string for a network error code.
static constexpr auto MAX_WAIT_FOR_IO
Maximum time to wait for I/O readiness.
Auxiliary requested/occurred events to wait for in WaitMany().
std::string SysErrorString(int err)
Return system error string from errno value.
struct timeval MillisToTimeval(int64_t nTimeout)
Convert milliseconds to a struct timeval, e.g. for use with select().
constexpr int64_t count_milliseconds(std::chrono::milliseconds t)