libs/corosio/src/corosio/src/detail/epoll/sockets.cpp

80.3% Lines (346/431) 94.4% Functions (34/36) 63.8% Branches (146/229)
libs/corosio/src/corosio/src/detail/epoll/sockets.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/sockets.hpp"
15 #include "src/detail/endpoint_convert.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/dispatch_coro.hpp"
18
19 #include <boost/corosio/detail/except.hpp>
20 #include <boost/capy/buffers.hpp>
21
22 #include <utility>
23
24 #include <errno.h>
25 #include <netinet/in.h>
26 #include <netinet/tcp.h>
27 #include <sys/epoll.h>
28 #include <sys/socket.h>
29 #include <unistd.h>
30
31 namespace boost::corosio::detail {
32
33 // Register an op with the reactor, handling cached edge events.
34 // Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
35 void
36 4843 epoll_socket_impl::
37 register_op(
38 epoll_op& op,
39 epoll_op*& desc_slot,
40 bool& ready_flag,
41 bool& cancel_flag) noexcept
42 {
43 4843 svc_.work_started();
44
45 4843 std::lock_guard lock(desc_state_.mutex);
46 4843 bool io_done = false;
47
2/2
✓ Branch 0 taken 141 times.
✓ Branch 1 taken 4702 times.
4843 if (ready_flag)
48 {
49 141 ready_flag = false;
50 141 op.perform_io();
51
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 141 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
141 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
52
1/2
✓ Branch 0 taken 141 times.
✗ Branch 1 not taken.
141 if (!io_done)
53 141 op.errn = 0;
54 }
55
56
2/2
✓ Branch 0 taken 94 times.
✓ Branch 1 taken 4749 times.
4843 if (cancel_flag)
57 {
58 94 cancel_flag = false;
59 94 op.cancelled.store(true, std::memory_order_relaxed);
60 }
61
62
5/6
✓ Branch 0 taken 4843 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 94 times.
✓ Branch 4 taken 4749 times.
✓ Branch 5 taken 94 times.
✓ Branch 6 taken 4749 times.
4843 if (io_done || op.cancelled.load(std::memory_order_acquire))
63 {
64 94 svc_.post(&op);
65 94 svc_.work_finished();
66 }
67 else
68 {
69 4749 desc_slot = &op;
70 }
71 4843 }
72
73 void
74 104 epoll_op::canceller::
75 operator()() const noexcept
76 {
77 104 op->cancel();
78 104 }
79
80 void
81 epoll_connect_op::
82 cancel() noexcept
83 {
84 if (socket_impl_)
85 socket_impl_->cancel_single_op(*this);
86 else
87 request_cancel();
88 }
89
90 void
91 98 epoll_read_op::
92 cancel() noexcept
93 {
94
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (socket_impl_)
95 98 socket_impl_->cancel_single_op(*this);
96 else
97 request_cancel();
98 98 }
99
100 void
101 epoll_write_op::
102 cancel() noexcept
103 {
104 if (socket_impl_)
105 socket_impl_->cancel_single_op(*this);
106 else
107 request_cancel();
108 }
109
110 void
111 78434 epoll_op::
112 operator()()
113 {
114 78434 stop_cb.reset();
115
116 78434 socket_impl_->svc_.scheduler().reset_inline_budget();
117
118
2/2
✓ Branch 1 taken 204 times.
✓ Branch 2 taken 78230 times.
78434 if (cancelled.load(std::memory_order_acquire))
119 204 *ec_out = capy::error::canceled;
120
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 78230 times.
78230 else if (errn != 0)
121 *ec_out = make_err(errn);
122
4/6
✓ Branch 1 taken 39153 times.
✓ Branch 2 taken 39077 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 39153 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 78230 times.
78230 else if (is_read_operation() && bytes_transferred == 0)
123 *ec_out = capy::error::eof;
124 else
125 78230 *ec_out = {};
126
127 78434 *bytes_out = bytes_transferred;
128
129 // Move to stack before resuming coroutine. The coroutine might close
130 // the socket, releasing the last wrapper ref. If impl_ptr were the
131 // last ref and we destroyed it while still in operator(), we'd have
132 // use-after-free. Moving to local ensures destruction happens at
133 // function exit, after all member accesses are complete.
134 78434 capy::executor_ref saved_ex( std::move( ex ) );
135 78434 std::coroutine_handle<> saved_h( std::move( h ) );
136 78434 auto prevent_premature_destruction = std::move(impl_ptr);
137
2/2
✓ Branch 1 taken 78434 times.
✓ Branch 4 taken 78434 times.
78434 dispatch_coro(saved_ex, saved_h).resume();
138 78434 }
139
140 void
141 4643 epoll_connect_op::
142 operator()()
143 {
144 4643 stop_cb.reset();
145
146 4643 socket_impl_->svc_.scheduler().reset_inline_budget();
147
148
3/4
✓ Branch 0 taken 4642 times.
✓ Branch 1 taken 1 time.
✓ Branch 3 taken 4642 times.
✗ Branch 4 not taken.
4643 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
149
150 // Cache endpoints on successful connect
151
3/4
✓ Branch 0 taken 4642 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 4642 times.
✗ Branch 3 not taken.
4643 if (success && socket_impl_)
152 {
153 // Query local endpoint via getsockname (may fail, but remote is always known)
154 4642 endpoint local_ep;
155 4642 sockaddr_in local_addr{};
156 4642 socklen_t local_len = sizeof(local_addr);
157
1/2
✓ Branch 1 taken 4642 times.
✗ Branch 2 not taken.
4642 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
158 4642 local_ep = from_sockaddr_in(local_addr);
159 // Always cache remote endpoint; local may be default if getsockname failed
160 4642 static_cast<epoll_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
161 }
162
163
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4643 times.
4643 if (cancelled.load(std::memory_order_acquire))
164 *ec_out = capy::error::canceled;
165
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 4642 times.
4643 else if (errn != 0)
166 1 *ec_out = make_err(errn);
167 else
168 4642 *ec_out = {};
169
170 // Move to stack before resuming. See epoll_op::operator()() for rationale.
171 4643 capy::executor_ref saved_ex( std::move( ex ) );
172 4643 std::coroutine_handle<> saved_h( std::move( h ) );
173 4643 auto prevent_premature_destruction = std::move(impl_ptr);
174
2/2
✓ Branch 1 taken 4643 times.
✓ Branch 4 taken 4643 times.
4643 dispatch_coro(saved_ex, saved_h).resume();
175 4643 }
176
177 9296 epoll_socket_impl::
178 9296 epoll_socket_impl(epoll_socket_service& svc) noexcept
179 9296 : svc_(svc)
180 {
181 9296 }
182
183 9296 epoll_socket_impl::
184 ~epoll_socket_impl() = default;
185
186 void
187 9296 epoll_socket_impl::
188 release()
189 {
190 9296 close_socket();
191 9296 svc_.destroy_impl(*this);
192 9296 }
193
194 std::coroutine_handle<>
195 4643 epoll_socket_impl::
196 connect(
197 std::coroutine_handle<> h,
198 capy::executor_ref ex,
199 endpoint ep,
200 std::stop_token token,
201 std::error_code* ec)
202 {
203 4643 auto& op = conn_;
204
205 4643 sockaddr_in addr = detail::to_sockaddr_in(ep);
206
1/1
✓ Branch 1 taken 4643 times.
4643 int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
207
208
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4643 times.
4643 if (result == 0)
209 {
210 sockaddr_in local_addr{};
211 socklen_t local_len = sizeof(local_addr);
212 if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
213 local_endpoint_ = detail::from_sockaddr_in(local_addr);
214 remote_endpoint_ = ep;
215 }
216
217
2/4
✓ Branch 0 taken 4643 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 4643 times.
4643 if (result == 0 || errno != EINPROGRESS)
218 {
219 int err = (result < 0) ? errno : 0;
220 if (svc_.scheduler().try_consume_inline_budget())
221 {
222 *ec = err ? make_err(err) : std::error_code{};
223 return ex.dispatch(h);
224 }
225 op.reset();
226 op.h = h;
227 op.ex = ex;
228 op.ec_out = ec;
229 op.fd = fd_;
230 op.target_endpoint = ep;
231 op.start(token, this);
232 op.impl_ptr = shared_from_this();
233 op.complete(err, 0);
234 svc_.post(&op);
235 return std::noop_coroutine();
236 }
237
238 // EINPROGRESS — register with reactor
239 4643 op.reset();
240 4643 op.h = h;
241 4643 op.ex = ex;
242 4643 op.ec_out = ec;
243 4643 op.fd = fd_;
244 4643 op.target_endpoint = ep;
245 4643 op.start(token, this);
246
1/1
✓ Branch 1 taken 4643 times.
4643 op.impl_ptr = shared_from_this();
247
248 4643 register_op(op, desc_state_.connect_op, desc_state_.write_ready,
249 4643 desc_state_.connect_cancel_pending);
250 4643 return std::noop_coroutine();
251 }
252
253 std::coroutine_handle<>
254 117634 epoll_socket_impl::
255 read_some(
256 std::coroutine_handle<> h,
257 capy::executor_ref ex,
258 io_buffer_param param,
259 std::stop_token token,
260 std::error_code* ec,
261 std::size_t* bytes_out)
262 {
263 117634 auto& op = rd_;
264 117634 op.reset();
265
266 117634 capy::mutable_buffer bufs[epoll_read_op::max_buffers];
267 117634 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
268
269
6/8
✓ Branch 0 taken 117633 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 117633 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 117633 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 117633 times.
117634 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
270 {
271 1 op.empty_buffer_read = true;
272 1 op.h = h;
273 1 op.ex = ex;
274 1 op.ec_out = ec;
275 1 op.bytes_out = bytes_out;
276 1 op.start(token, this);
277
1/1
✓ Branch 1 taken 1 time.
1 op.impl_ptr = shared_from_this();
278 1 op.complete(0, 0);
279
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
280 1 return std::noop_coroutine();
281 }
282
283
2/2
✓ Branch 0 taken 117633 times.
✓ Branch 1 taken 117633 times.
235266 for (int i = 0; i < op.iovec_count; ++i)
284 {
285 117633 op.iovecs[i].iov_base = bufs[i].data();
286 117633 op.iovecs[i].iov_len = bufs[i].size();
287 }
288
289 // Speculative read
290 ssize_t n;
291 do {
292
1/1
✓ Branch 1 taken 117633 times.
117633 n = ::readv(fd_, op.iovecs, op.iovec_count);
293
3/4
✓ Branch 0 taken 200 times.
✓ Branch 1 taken 117433 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 200 times.
117633 } while (n < 0 && errno == EINTR);
294
295
3/6
✓ Branch 0 taken 200 times.
✓ Branch 1 taken 117433 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 200 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
117633 if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
296 {
297
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 117433 times.
117433 int err = (n < 0) ? errno : 0;
298 117433 auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
299
300
2/2
✓ Branch 2 taken 78280 times.
✓ Branch 3 taken 39153 times.
117433 if (svc_.scheduler().try_consume_inline_budget())
301 {
302
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 78280 times.
78280 if (err)
303 *ec = make_err(err);
304
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 78275 times.
78280 else if (n == 0)
305 5 *ec = capy::error::eof;
306 else
307 78275 *ec = {};
308 78280 *bytes_out = bytes;
309
1/1
✓ Branch 1 taken 78280 times.
78280 return ex.dispatch(h);
310 }
311 39153 op.h = h;
312 39153 op.ex = ex;
313 39153 op.ec_out = ec;
314 39153 op.bytes_out = bytes_out;
315 39153 op.start(token, this);
316
1/1
✓ Branch 1 taken 39153 times.
39153 op.impl_ptr = shared_from_this();
317 39153 op.complete(err, bytes);
318
1/1
✓ Branch 1 taken 39153 times.
39153 svc_.post(&op);
319 39153 return std::noop_coroutine();
320 }
321
322 // EAGAIN — register with reactor
323 200 op.h = h;
324 200 op.ex = ex;
325 200 op.ec_out = ec;
326 200 op.bytes_out = bytes_out;
327 200 op.fd = fd_;
328 200 op.start(token, this);
329
1/1
✓ Branch 1 taken 200 times.
200 op.impl_ptr = shared_from_this();
330
331 200 register_op(op, desc_state_.read_op, desc_state_.read_ready,
332 200 desc_state_.read_cancel_pending);
333 200 return std::noop_coroutine();
334 }
335
336 std::coroutine_handle<>
337 117435 epoll_socket_impl::
338 write_some(
339 std::coroutine_handle<> h,
340 capy::executor_ref ex,
341 io_buffer_param param,
342 std::stop_token token,
343 std::error_code* ec,
344 std::size_t* bytes_out)
345 {
346 117435 auto& op = wr_;
347 117435 op.reset();
348
349 117435 capy::mutable_buffer bufs[epoll_write_op::max_buffers];
350 117435 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
351
352
6/8
✓ Branch 0 taken 117434 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 117434 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 117434 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 117434 times.
117435 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
353 {
354 1 op.h = h;
355 1 op.ex = ex;
356 1 op.ec_out = ec;
357 1 op.bytes_out = bytes_out;
358 1 op.start(token, this);
359
1/1
✓ Branch 1 taken 1 time.
1 op.impl_ptr = shared_from_this();
360 1 op.complete(0, 0);
361
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
362 1 return std::noop_coroutine();
363 }
364
365
2/2
✓ Branch 0 taken 117434 times.
✓ Branch 1 taken 117434 times.
234868 for (int i = 0; i < op.iovec_count; ++i)
366 {
367 117434 op.iovecs[i].iov_base = bufs[i].data();
368 117434 op.iovecs[i].iov_len = bufs[i].size();
369 }
370
371 // Speculative write
372 117434 msghdr msg{};
373 117434 msg.msg_iov = op.iovecs;
374 117434 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
375
376 ssize_t n;
377 do {
378
1/1
✓ Branch 1 taken 117434 times.
117434 n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
379
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 117433 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 time.
117434 } while (n < 0 && errno == EINTR);
380
381
4/6
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 117433 times.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 time.
✗ Branch 5 not taken.
117434 if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
382 {
383
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 117433 times.
117434 int err = (n < 0) ? errno : 0;
384 117434 auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
385
386
2/2
✓ Branch 2 taken 78355 times.
✓ Branch 3 taken 39079 times.
117434 if (svc_.scheduler().try_consume_inline_budget())
387 {
388
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 78354 times.
78355 *ec = err ? make_err(err) : std::error_code{};
389 78355 *bytes_out = bytes;
390
1/1
✓ Branch 1 taken 78355 times.
78355 return ex.dispatch(h);
391 }
392 39079 op.h = h;
393 39079 op.ex = ex;
394 39079 op.ec_out = ec;
395 39079 op.bytes_out = bytes_out;
396 39079 op.start(token, this);
397
1/1
✓ Branch 1 taken 39079 times.
39079 op.impl_ptr = shared_from_this();
398 39079 op.complete(err, bytes);
399
1/1
✓ Branch 1 taken 39079 times.
39079 svc_.post(&op);
400 39079 return std::noop_coroutine();
401 }
402
403 // EAGAIN — register with reactor
404 op.h = h;
405 op.ex = ex;
406 op.ec_out = ec;
407 op.bytes_out = bytes_out;
408 op.fd = fd_;
409 op.start(token, this);
410 op.impl_ptr = shared_from_this();
411
412 register_op(op, desc_state_.write_op, desc_state_.write_ready,
413 desc_state_.write_cancel_pending);
414 return std::noop_coroutine();
415 }
416
417 std::error_code
418 3 epoll_socket_impl::
419 shutdown(tcp_socket::shutdown_type what) noexcept
420 {
421 int how;
422
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
3 switch (what)
423 {
424 1 case tcp_socket::shutdown_receive: how = SHUT_RD; break;
425 1 case tcp_socket::shutdown_send: how = SHUT_WR; break;
426 1 case tcp_socket::shutdown_both: how = SHUT_RDWR; break;
427 default:
428 return make_err(EINVAL);
429 }
430
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::shutdown(fd_, how) != 0)
431 return make_err(errno);
432 3 return {};
433 }
434
435 std::error_code
436 5 epoll_socket_impl::
437 set_no_delay(bool value) noexcept
438 {
439
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 time.
5 int flag = value ? 1 : 0;
440
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
441 return make_err(errno);
442 5 return {};
443 }
444
445 bool
446 5 epoll_socket_impl::
447 no_delay(std::error_code& ec) const noexcept
448 {
449 5 int flag = 0;
450 5 socklen_t len = sizeof(flag);
451
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
452 {
453 ec = make_err(errno);
454 return false;
455 }
456 5 ec = {};
457 5 return flag != 0;
458 }
459
460 std::error_code
461 4 epoll_socket_impl::
462 set_keep_alive(bool value) noexcept
463 {
464
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 time.
4 int flag = value ? 1 : 0;
465
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
466 return make_err(errno);
467 4 return {};
468 }
469
470 bool
471 4 epoll_socket_impl::
472 keep_alive(std::error_code& ec) const noexcept
473 {
474 4 int flag = 0;
475 4 socklen_t len = sizeof(flag);
476
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
477 {
478 ec = make_err(errno);
479 return false;
480 }
481 4 ec = {};
482 4 return flag != 0;
483 }
484
485 std::error_code
486 1 epoll_socket_impl::
487 set_receive_buffer_size(int size) noexcept
488 {
489
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
490 return make_err(errno);
491 1 return {};
492 }
493
494 int
495 3 epoll_socket_impl::
496 receive_buffer_size(std::error_code& ec) const noexcept
497 {
498 3 int size = 0;
499 3 socklen_t len = sizeof(size);
500
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
501 {
502 ec = make_err(errno);
503 return 0;
504 }
505 3 ec = {};
506 3 return size;
507 }
508
509 std::error_code
510 1 epoll_socket_impl::
511 set_send_buffer_size(int size) noexcept
512 {
513
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
514 return make_err(errno);
515 1 return {};
516 }
517
518 int
519 3 epoll_socket_impl::
520 send_buffer_size(std::error_code& ec) const noexcept
521 {
522 3 int size = 0;
523 3 socklen_t len = sizeof(size);
524
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
525 {
526 ec = make_err(errno);
527 return 0;
528 }
529 3 ec = {};
530 3 return size;
531 }
532
533 std::error_code
534 8 epoll_socket_impl::
535 set_linger(bool enabled, int timeout) noexcept
536 {
537
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 7 times.
8 if (timeout < 0)
538 1 return make_err(EINVAL);
539 struct ::linger lg;
540
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 1 time.
7 lg.l_onoff = enabled ? 1 : 0;
541 7 lg.l_linger = timeout;
542
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 7 times.
7 if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
543 return make_err(errno);
544 7 return {};
545 }
546
547 tcp_socket::linger_options
548 3 epoll_socket_impl::
549 linger(std::error_code& ec) const noexcept
550 {
551 3 struct ::linger lg{};
552 3 socklen_t len = sizeof(lg);
553
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
554 {
555 ec = make_err(errno);
556 return {};
557 }
558 3 ec = {};
559 3 return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
560 }
561
562 void
563 14135 epoll_socket_impl::
564 cancel() noexcept
565 {
566 14135 std::shared_ptr<epoll_socket_impl> self;
567 try {
568
1/1
✓ Branch 1 taken 14135 times.
14135 self = shared_from_this();
569 } catch (const std::bad_weak_ptr&) {
570 return;
571 }
572
573 14135 conn_.request_cancel();
574 14135 rd_.request_cancel();
575 14135 wr_.request_cancel();
576
577 14135 epoll_op* conn_claimed = nullptr;
578 14135 epoll_op* rd_claimed = nullptr;
579 14135 epoll_op* wr_claimed = nullptr;
580 {
581 14135 std::lock_guard lock(desc_state_.mutex);
582
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 14135 times.
14135 if (desc_state_.connect_op == &conn_)
583 conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
584 else
585 14135 desc_state_.connect_cancel_pending = true;
586
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 14131 times.
14135 if (desc_state_.read_op == &rd_)
587 4 rd_claimed = std::exchange(desc_state_.read_op, nullptr);
588 else
589 14131 desc_state_.read_cancel_pending = true;
590
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 14135 times.
14135 if (desc_state_.write_op == &wr_)
591 wr_claimed = std::exchange(desc_state_.write_op, nullptr);
592 else
593 14135 desc_state_.write_cancel_pending = true;
594 14135 }
595
596
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 14135 times.
14135 if (conn_claimed)
597 {
598 conn_.impl_ptr = self;
599 svc_.post(&conn_);
600 svc_.work_finished();
601 }
602
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 14131 times.
14135 if (rd_claimed)
603 {
604 4 rd_.impl_ptr = self;
605 4 svc_.post(&rd_);
606 4 svc_.work_finished();
607 }
608
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 14135 times.
14135 if (wr_claimed)
609 {
610 wr_.impl_ptr = self;
611 svc_.post(&wr_);
612 svc_.work_finished();
613 }
614 14135 }
615
616 void
617 98 epoll_socket_impl::
618 cancel_single_op(epoll_op& op) noexcept
619 {
620 98 op.request_cancel();
621
622 98 epoll_op** desc_op_ptr = nullptr;
623
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 98 times.
98 if (&op == &conn_) desc_op_ptr = &desc_state_.connect_op;
624
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 else if (&op == &rd_) desc_op_ptr = &desc_state_.read_op;
625 else if (&op == &wr_) desc_op_ptr = &desc_state_.write_op;
626
627
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (desc_op_ptr)
628 {
629 98 epoll_op* claimed = nullptr;
630 {
631 98 std::lock_guard lock(desc_state_.mutex);
632
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (*desc_op_ptr == &op)
633 98 claimed = std::exchange(*desc_op_ptr, nullptr);
634 else if (&op == &conn_)
635 desc_state_.connect_cancel_pending = true;
636 else if (&op == &rd_)
637 desc_state_.read_cancel_pending = true;
638 else if (&op == &wr_)
639 desc_state_.write_cancel_pending = true;
640 98 }
641
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (claimed)
642 {
643 try {
644
1/1
✓ Branch 1 taken 98 times.
98 op.impl_ptr = shared_from_this();
645 } catch (const std::bad_weak_ptr&) {}
646 98 svc_.post(&op);
647 98 svc_.work_finished();
648 }
649 }
650 98 }
651
652 void
653 13950 epoll_socket_impl::
654 close_socket() noexcept
655 {
656 13950 cancel();
657
658 // Keep impl alive if descriptor_state is queued in the scheduler.
659 // Without this, destroy_impl() drops the last shared_ptr while
660 // the queued descriptor_state node would become dangling.
661
2/2
✓ Branch 1 taken 7 times.
✓ Branch 2 taken 13943 times.
13950 if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
662 {
663 try {
664
1/1
✓ Branch 1 taken 7 times.
7 desc_state_.impl_ref_ = shared_from_this();
665 } catch (std::bad_weak_ptr const&) {}
666 }
667
668
2/2
✓ Branch 0 taken 9296 times.
✓ Branch 1 taken 4654 times.
13950 if (fd_ >= 0)
669 {
670
1/2
✓ Branch 0 taken 9296 times.
✗ Branch 1 not taken.
9296 if (desc_state_.registered_events != 0)
671 9296 svc_.scheduler().deregister_descriptor(fd_);
672 9296 ::close(fd_);
673 9296 fd_ = -1;
674 }
675
676 13950 desc_state_.fd = -1;
677 {
678 13950 std::lock_guard lock(desc_state_.mutex);
679 13950 desc_state_.read_op = nullptr;
680 13950 desc_state_.write_op = nullptr;
681 13950 desc_state_.connect_op = nullptr;
682 13950 desc_state_.read_ready = false;
683 13950 desc_state_.write_ready = false;
684 13950 desc_state_.read_cancel_pending = false;
685 13950 desc_state_.write_cancel_pending = false;
686 13950 desc_state_.connect_cancel_pending = false;
687 13950 }
688 13950 desc_state_.registered_events = 0;
689
690 13950 local_endpoint_ = endpoint{};
691 13950 remote_endpoint_ = endpoint{};
692 13950 }
693
694 203 epoll_socket_service::
695 203 epoll_socket_service(capy::execution_context& ctx)
696
2/2
✓ Branch 2 taken 203 times.
✓ Branch 5 taken 203 times.
203 : state_(std::make_unique<epoll_socket_state>(ctx.use_service<epoll_scheduler>()))
697 {
698 203 }
699
700 406 epoll_socket_service::
701 203 ~epoll_socket_service()
702 {
703 406 }
704
705 void
706 203 epoll_socket_service::
707 shutdown()
708 {
709
1/1
✓ Branch 2 taken 203 times.
203 std::lock_guard lock(state_->mutex_);
710
711
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 203 times.
203 while (auto* impl = state_->socket_list_.pop_front())
712 impl->close_socket();
713
714 // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
715 // drains completed_ops_, calling destroy() on each queued op. If we
716 // released our shared_ptrs now, an epoll_op::destroy() could free the
717 // last ref to an impl whose embedded descriptor_state is still linked
718 // in the queue — use-after-free on the next pop(). Letting ~state_
719 // release the ptrs (during service destruction, after scheduler
720 // shutdown) keeps every impl alive until all ops have been drained.
721 203 }
722
723 tcp_socket::socket_impl&
724 9296 epoll_socket_service::
725 create_impl()
726 {
727
1/1
✓ Branch 1 taken 9296 times.
9296 auto impl = std::make_shared<epoll_socket_impl>(*this);
728 9296 auto* raw = impl.get();
729
730 {
731
1/1
✓ Branch 2 taken 9296 times.
9296 std::lock_guard lock(state_->mutex_);
732 9296 state_->socket_list_.push_back(raw);
733
1/1
✓ Branch 3 taken 9296 times.
9296 state_->socket_ptrs_.emplace(raw, std::move(impl));
734 9296 }
735
736 9296 return *raw;
737 9296 }
738
739 void
740 9296 epoll_socket_service::
741 destroy_impl(tcp_socket::socket_impl& impl)
742 {
743 9296 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
744
1/1
✓ Branch 2 taken 9296 times.
9296 std::lock_guard lock(state_->mutex_);
745 9296 state_->socket_list_.remove(epoll_impl);
746
1/1
✓ Branch 2 taken 9296 times.
9296 state_->socket_ptrs_.erase(epoll_impl);
747 9296 }
748
749 std::error_code
750 4654 epoll_socket_service::
751 open_socket(tcp_socket::socket_impl& impl)
752 {
753 4654 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
754 4654 epoll_impl->close_socket();
755
756 4654 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
757
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4654 times.
4654 if (fd < 0)
758 return make_err(errno);
759
760 4654 epoll_impl->fd_ = fd;
761
762 // Register fd with epoll (edge-triggered mode)
763 4654 epoll_impl->desc_state_.fd = fd;
764 {
765
1/1
✓ Branch 1 taken 4654 times.
4654 std::lock_guard lock(epoll_impl->desc_state_.mutex);
766 4654 epoll_impl->desc_state_.read_op = nullptr;
767 4654 epoll_impl->desc_state_.write_op = nullptr;
768 4654 epoll_impl->desc_state_.connect_op = nullptr;
769 4654 }
770 4654 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
771
772 4654 return {};
773 }
774
775 void
776 78430 epoll_socket_service::
777 post(epoll_op* op)
778 {
779 78430 state_->sched_.post(op);
780 78430 }
781
782 void
783 4843 epoll_socket_service::
784 work_started() noexcept
785 {
786 4843 state_->sched_.work_started();
787 4843 }
788
789 void
790 196 epoll_socket_service::
791 work_finished() noexcept
792 {
793 196 state_->sched_.work_finished();
794 196 }
795
796 } // namespace boost::corosio::detail
797
798 #endif // BOOST_COROSIO_HAS_EPOLL
799