74 return connect(
m_socket, addr, addr_len);
77std::unique_ptr<Sock>
Sock::Accept(sockaddr *addr, socklen_t *addr_len)
const {
84 std::unique_ptr<Sock> sock;
86 const auto socket = accept(
m_socket, addr, addr_len);
89 sock = std::make_unique<Sock>(socket);
90 }
catch (
const std::exception &) {
103 socklen_t *opt_len)
const {
104 return getsockopt(
m_socket, level, opt_name,
static_cast<char *
>(opt_val),
109 Event *occurred)
const {
113 std::shared_ptr<const Sock> shared{
this, [](
const Sock *) {}};
117 if (!
WaitMany(timeout, events_per_sock)) {
121 if (occurred !=
nullptr) {
122 *occurred = events_per_sock.begin()->second.occurred;
131 std::vector<pollfd> pfds;
132 for (
const auto &[sock, events] : events_per_sock) {
134 auto &pfd = pfds.back();
135 pfd.fd = sock->m_socket;
136 if (events.requested &
RECV) {
137 pfd.events |= POLLIN;
139 if (events.requested &
SEND) {
140 pfd.events |= POLLOUT;
149 assert(pfds.size() == events_per_sock.size());
151 for (
auto &[sock, events] : events_per_sock) {
152 assert(sock->m_socket ==
static_cast<SOCKET>(pfds[i].fd));
154 if (pfds[i].revents & POLLIN) {
155 events.occurred |=
RECV;
157 if (pfds[i].revents & POLLOUT) {
158 events.occurred |=
SEND;
160 if (pfds[i].revents & (POLLERR | POLLHUP)) {
161 events.occurred |=
ERR;
176 for (
const auto &[sock, events] : events_per_sock) {
177 const auto &s = sock->m_socket;
181 if (events.requested &
RECV) {
184 if (events.requested &
SEND) {
188 socket_max = std::max(socket_max, s);
197 for (
auto &[sock, events] : events_per_sock) {
198 const auto &s = sock->m_socket;
200 if (FD_ISSET(s, &recv)) {
201 events.occurred |=
RECV;
203 if (FD_ISSET(s, &
send)) {
204 events.occurred |=
SEND;
206 if (FD_ISSET(s, &err)) {
207 events.occurred |=
ERR;
216 std::chrono::milliseconds timeout,
218 const auto deadline = GetTime<std::chrono::milliseconds>() + timeout;
226 sent +=
static_cast<size_t>(ret);
227 if (sent == data.size()) {
233 throw std::runtime_error(
238 const auto now = GetTime<std::chrono::milliseconds>();
240 if (now >= deadline) {
241 throw std::runtime_error(
242 strprintf(
"Send timeout (sent only %u of %u bytes before that)",
248 "Send interrupted (sent only %u of %u bytes before that)", sent,
254 const auto wait_time = std::min(
261 std::chrono::milliseconds timeout,
263 size_t max_data)
const {
264 const auto deadline = GetTime<std::chrono::milliseconds>() + timeout;
266 bool terminator_found{
false};
278 if (data.size() >= max_data) {
279 throw std::runtime_error(
280 strprintf(
"Received too many bytes without a terminator (%u)",
286 const ssize_t peek_ret{
287 Recv(buf, std::min(
sizeof(buf), max_data - data.size()), MSG_PEEK)};
293 throw std::runtime_error(
299 throw std::runtime_error(
300 "Connection unexpectedly closed by peer");
302 auto end = buf + peek_ret;
303 auto terminator_pos = std::find(buf, end, terminator);
304 terminator_found = terminator_pos != end;
306 const size_t try_len{terminator_found
307 ? terminator_pos - buf + 1
308 :
static_cast<size_t>(peek_ret)};
310 const ssize_t read_ret{
Recv(buf, try_len, 0)};
312 if (read_ret < 0 ||
static_cast<size_t>(read_ret) != try_len) {
313 throw std::runtime_error(
314 strprintf(
"recv() returned %u bytes on attempt to read "
315 "%u bytes but previous "
316 "peek claimed %u bytes are available",
317 read_ret, try_len, peek_ret));
321 const size_t append_len{terminator_found ? try_len - 1
324 data.append(buf, buf + append_len);
326 if (terminator_found) {
331 const auto now = GetTime<std::chrono::milliseconds>();
333 if (now >= deadline) {
334 throw std::runtime_error(
335 strprintf(
"Receive timeout (received %u bytes without "
336 "terminator before that)",
341 throw std::runtime_error(
342 strprintf(
"Receive interrupted (received %u bytes without "
343 "terminator before that)",
349 const auto wait_time = std::min(
357 errmsg =
"not connected";
362 switch (
Recv(&c,
sizeof(c), MSG_PEEK)) {
383 if (FormatMessageW(FORMAT_MESSAGE_FROM_SYSTEM |
384 FORMAT_MESSAGE_IGNORE_INSERTS |
385 FORMAT_MESSAGE_MAX_WIDTH_MASK,
386 nullptr, err, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
387 buf, ARRAYSIZE(buf),
nullptr)) {
390 std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>,
wchar_t>()
394 return strprintf(
"Unknown error (%d)", err);
410 int ret = closesocket(hSocket);
412 int ret = close(hSocket);
415 LogPrintf(
"Socket close failed: %d. Error: %s\n", hSocket,
A helper class for interruptible sleeps.
RAII helper class that manages a socket.
virtual std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const
accept(2) wrapper.
virtual ssize_t Send(const void *data, size_t len, int flags) const
send(2) wrapper.
static constexpr Event SEND
If passed to Wait(), then it will wait for readiness to send to the socket.
SOCKET m_socket
Contained socket.
Sock & operator=(const Sock &)=delete
Copy assignment operator, disabled because closing the same socket twice is undesirable.
virtual void SendComplete(const std::string &data, std::chrono::milliseconds timeout, CThreadInterrupt &interrupt) const
Send the given data, retrying on transient errors.
virtual bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const
Wait for readiness for input (recv) or output (send).
virtual ~Sock()
Destructor, close the socket or do nothing if empty.
Sock()
Default constructor, creates an empty object that does nothing when destroyed.
virtual bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const
Same as Wait(), but wait on many sockets within the same timeout.
virtual SOCKET Release()
Get the value of the contained socket and drop ownership.
static constexpr Event ERR
Ignored if passed to Wait(), but could be set in the occurred events if an exceptional condition has occurred on the socket or if it has been disconnected.
virtual bool IsConnected(std::string &errmsg) const
Check if still connected.
static constexpr Event RECV
If passed to Wait(), then it will wait for readiness to read from the socket.
virtual SOCKET Get() const
Get the value of the contained socket.
virtual int GetSockOpt(int level, int opt_name, void *opt_val, socklen_t *opt_len) const
getsockopt(2) wrapper.
virtual int Connect(const sockaddr *addr, socklen_t addr_len) const
connect(2) wrapper.
virtual void Reset()
Close if non-empty.
virtual ssize_t Recv(void *buf, size_t len, int flags) const
recv(2) wrapper.
virtual std::string RecvUntilTerminator(uint8_t terminator, std::chrono::milliseconds timeout, CThreadInterrupt &interrupt, size_t max_data) const
Read from socket until a terminator character is encountered.
std::unordered_map< std::shared_ptr< const Sock >, Events, HashSharedPtrSock, EqualSharedPtrSock > EventsPerSock
On which socket to wait for what events in WaitMany().
#define WSAGetLastError()
static bool IsSelectableSocket(const SOCKET &s)
static bool IOErrorIsPermanent(int err)
std::string NetworkErrorString(int err)
Return readable error string for a network error code.
bool CloseSocket(SOCKET &hSocket)
Close socket and set hSocket to INVALID_SOCKET.
static constexpr auto MAX_WAIT_FOR_IO
Maximum time to wait for I/O readiness.
Auxiliary requested/occurred events to wait for in WaitMany().
std::string SysErrorString(int err)
Return system error string from errno value.
struct timeval MillisToTimeval(int64_t nTimeout)
Convert milliseconds to a struct timeval for e.g. select().
constexpr int64_t count_milliseconds(std::chrono::milliseconds t)