libs/corosio/src/corosio/src/detail/select/sockets.cpp

73.4% Lines (273/372) 94.1% Functions (32/34) 57.4% Branches (113/197)
libs/corosio/src/corosio/src/detail/select/sockets.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_SELECT
13
14 #include "src/detail/select/sockets.hpp"
15 #include "src/detail/endpoint_convert.hpp"
16 #include "src/detail/dispatch_coro.hpp"
17 #include "src/detail/make_err.hpp"
18
19 #include <boost/capy/buffers.hpp>
20
21 #include <errno.h>
22 #include <fcntl.h>
23 #include <netinet/in.h>
24 #include <netinet/tcp.h>
25 #include <sys/socket.h>
26 #include <unistd.h>
27
28 namespace boost::corosio::detail {
29
30 void
31 97 select_op::canceller::
32 operator()() const noexcept
33 {
34 97 op->cancel();
35 97 }
36
37 void
38 select_connect_op::
39 cancel() noexcept
40 {
41 if (socket_impl_)
42 socket_impl_->cancel_single_op(*this);
43 else
44 request_cancel();
45 }
46
47 void
48 97 select_read_op::
49 cancel() noexcept
50 {
51
1/2
✓ Branch 0 taken 97 times.
✗ Branch 1 not taken.
97 if (socket_impl_)
52 97 socket_impl_->cancel_single_op(*this);
53 else
54 request_cancel();
55 97 }
56
57 void
58 select_write_op::
59 cancel() noexcept
60 {
61 if (socket_impl_)
62 socket_impl_->cancel_single_op(*this);
63 else
64 request_cancel();
65 }
66
67 void
68 3403 select_connect_op::
69 operator()()
70 {
71 3403 stop_cb.reset();
72
73
3/4
✓ Branch 0 taken 3401 times.
✓ Branch 1 taken 2 times.
✓ Branch 3 taken 3401 times.
✗ Branch 4 not taken.
3403 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
74
75 // Cache endpoints on successful connect
76
3/4
✓ Branch 0 taken 3401 times.
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 3401 times.
✗ Branch 3 not taken.
3403 if (success && socket_impl_)
77 {
78 // Query local endpoint via getsockname (may fail, but remote is always known)
79 3401 endpoint local_ep;
80 3401 sockaddr_in local_addr{};
81 3401 socklen_t local_len = sizeof(local_addr);
82
1/2
✓ Branch 1 taken 3401 times.
✗ Branch 2 not taken.
3401 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
83 3401 local_ep = from_sockaddr_in(local_addr);
84 // Always cache remote endpoint; local may be default if getsockname failed
85 3401 static_cast<select_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
86 }
87
88
1/2
✓ Branch 0 taken 3403 times.
✗ Branch 1 not taken.
3403 if (ec_out)
89 {
90
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3403 times.
3403 if (cancelled.load(std::memory_order_acquire))
91 *ec_out = capy::error::canceled;
92
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 3401 times.
3403 else if (errn != 0)
93 2 *ec_out = make_err(errn);
94 else
95 3401 *ec_out = {};
96 }
97
98
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3403 times.
3403 if (bytes_out)
99 *bytes_out = bytes_transferred;
100
101 // Move to stack before destroying the frame
102 3403 capy::executor_ref saved_ex( std::move( ex ) );
103 3403 std::coroutine_handle<> saved_h( std::move( h ) );
104 3403 impl_ptr.reset();
105
2/2
✓ Branch 1 taken 3403 times.
✓ Branch 4 taken 3403 times.
3403 dispatch_coro(saved_ex, saved_h).resume();
106 3403 }
107
108 6815 select_socket_impl::
109 6815 select_socket_impl(select_socket_service& svc) noexcept
110 6815 : svc_(svc)
111 {
112 6815 }
113
114 void
115 6815 select_socket_impl::
116 release()
117 {
118 6815 close_socket();
119 6815 svc_.destroy_impl(*this);
120 6815 }
121
122 std::coroutine_handle<>
123 3403 select_socket_impl::
124 connect(
125 std::coroutine_handle<> h,
126 capy::executor_ref ex,
127 endpoint ep,
128 std::stop_token token,
129 std::error_code* ec)
130 {
131 3403 auto& op = conn_;
132 3403 op.reset();
133 3403 op.h = h;
134 3403 op.ex = ex;
135 3403 op.ec_out = ec;
136 3403 op.fd = fd_;
137 3403 op.target_endpoint = ep; // Store target for endpoint caching
138 3403 op.start(token, this);
139
140 3403 sockaddr_in addr = detail::to_sockaddr_in(ep);
141
1/1
✓ Branch 1 taken 3403 times.
3403 int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
142
143
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3403 times.
3403 if (result == 0)
144 {
145 // Sync success - cache endpoints immediately
146 sockaddr_in local_addr{};
147 socklen_t local_len = sizeof(local_addr);
148 if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
149 local_endpoint_ = detail::from_sockaddr_in(local_addr);
150 remote_endpoint_ = ep;
151
152 op.complete(0, 0);
153 op.impl_ptr = shared_from_this();
154 svc_.post(&op);
155 // completion is always posted to scheduler queue, never inline.
156 return std::noop_coroutine();
157 }
158
159
1/2
✓ Branch 0 taken 3403 times.
✗ Branch 1 not taken.
3403 if (errno == EINPROGRESS)
160 {
161 3403 svc_.work_started();
162
1/1
✓ Branch 1 taken 3403 times.
3403 op.impl_ptr = shared_from_this();
163
164 // Set registering BEFORE register_fd to close the race window where
165 // reactor sees an event before we set registered. The reactor treats
166 // registering the same as registered when claiming the op.
167 3403 op.registered.store(select_registration_state::registering, std::memory_order_release);
168
1/1
✓ Branch 2 taken 3403 times.
3403 svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
169
170 // Transition to registered. If this fails, reactor or cancel already
171 // claimed the op (state is now unregistered), so we're done. However,
172 // we must still deregister the fd because cancel's deregister_fd may
173 // have run before our register_fd, leaving the fd orphaned.
174 3403 auto expected = select_registration_state::registering;
175
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3403 times.
3403 if (!op.registered.compare_exchange_strong(
176 expected, select_registration_state::registered, std::memory_order_acq_rel))
177 {
178 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
179 // completion is always posted to scheduler queue, never inline.
180 return std::noop_coroutine();
181 }
182
183 // If cancelled was set before we registered, handle it now.
184
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3403 times.
3403 if (op.cancelled.load(std::memory_order_acquire))
185 {
186 auto prev = op.registered.exchange(
187 select_registration_state::unregistered, std::memory_order_acq_rel);
188 if (prev != select_registration_state::unregistered)
189 {
190 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
191 op.impl_ptr = shared_from_this();
192 svc_.post(&op);
193 svc_.work_finished();
194 }
195 }
196 // completion is always posted to scheduler queue, never inline.
197 3403 return std::noop_coroutine();
198 }
199
200 op.complete(errno, 0);
201 op.impl_ptr = shared_from_this();
202 svc_.post(&op);
203 // completion is always posted to scheduler queue, never inline.
204 return std::noop_coroutine();
205 }
206
207 std::coroutine_handle<>
208 87358 select_socket_impl::
209 read_some(
210 std::coroutine_handle<> h,
211 capy::executor_ref ex,
212 io_buffer_param param,
213 std::stop_token token,
214 std::error_code* ec,
215 std::size_t* bytes_out)
216 {
217 87358 auto& op = rd_;
218 87358 op.reset();
219 87358 op.h = h;
220 87358 op.ex = ex;
221 87358 op.ec_out = ec;
222 87358 op.bytes_out = bytes_out;
223 87358 op.fd = fd_;
224 87358 op.start(token, this);
225
226 87358 capy::mutable_buffer bufs[select_read_op::max_buffers];
227 87358 op.iovec_count = static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));
228
229
6/8
✓ Branch 0 taken 87357 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 87357 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 87357 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 87357 times.
87358 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
230 {
231 1 op.empty_buffer_read = true;
232 1 op.complete(0, 0);
233
1/1
✓ Branch 1 taken 1 time.
1 op.impl_ptr = shared_from_this();
234
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
235 1 return std::noop_coroutine();
236 }
237
238
2/2
✓ Branch 0 taken 87357 times.
✓ Branch 1 taken 87357 times.
174714 for (int i = 0; i < op.iovec_count; ++i)
239 {
240 87357 op.iovecs[i].iov_base = bufs[i].data();
241 87357 op.iovecs[i].iov_len = bufs[i].size();
242 }
243
244
1/1
✓ Branch 1 taken 87357 times.
87357 ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
245
246
2/2
✓ Branch 0 taken 87075 times.
✓ Branch 1 taken 282 times.
87357 if (n > 0)
247 {
248 87075 op.complete(0, static_cast<std::size_t>(n));
249
1/1
✓ Branch 1 taken 87075 times.
87075 op.impl_ptr = shared_from_this();
250
1/1
✓ Branch 1 taken 87075 times.
87075 svc_.post(&op);
251 87075 return std::noop_coroutine();
252 }
253
254
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 277 times.
282 if (n == 0)
255 {
256 5 op.complete(0, 0);
257
1/1
✓ Branch 1 taken 5 times.
5 op.impl_ptr = shared_from_this();
258
1/1
✓ Branch 1 taken 5 times.
5 svc_.post(&op);
259 5 return std::noop_coroutine();
260 }
261
262
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 277 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
277 if (errno == EAGAIN || errno == EWOULDBLOCK)
263 {
264 277 svc_.work_started();
265
1/1
✓ Branch 1 taken 277 times.
277 op.impl_ptr = shared_from_this();
266
267 // Set registering BEFORE register_fd to close the race window where
268 // reactor sees an event before we set registered.
269 277 op.registered.store(select_registration_state::registering, std::memory_order_release);
270
1/1
✓ Branch 2 taken 277 times.
277 svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
271
272 // Transition to registered. If this fails, reactor or cancel already
273 // claimed the op (state is now unregistered), so we're done. However,
274 // we must still deregister the fd because cancel's deregister_fd may
275 // have run before our register_fd, leaving the fd orphaned.
276 277 auto expected = select_registration_state::registering;
277
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 277 times.
277 if (!op.registered.compare_exchange_strong(
278 expected, select_registration_state::registered, std::memory_order_acq_rel))
279 {
280 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
281 return std::noop_coroutine();
282 }
283
284 // If cancelled was set before we registered, handle it now.
285
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 277 times.
277 if (op.cancelled.load(std::memory_order_acquire))
286 {
287 auto prev = op.registered.exchange(
288 select_registration_state::unregistered, std::memory_order_acq_rel);
289 if (prev != select_registration_state::unregistered)
290 {
291 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
292 op.impl_ptr = shared_from_this();
293 svc_.post(&op);
294 svc_.work_finished();
295 }
296 }
297 277 return std::noop_coroutine();
298 }
299
300 op.complete(errno, 0);
301 op.impl_ptr = shared_from_this();
302 svc_.post(&op);
303 return std::noop_coroutine();
304 }
305
306 std::coroutine_handle<>
307 87198 select_socket_impl::
308 write_some(
309 std::coroutine_handle<> h,
310 capy::executor_ref ex,
311 io_buffer_param param,
312 std::stop_token token,
313 std::error_code* ec,
314 std::size_t* bytes_out)
315 {
316 87198 auto& op = wr_;
317 87198 op.reset();
318 87198 op.h = h;
319 87198 op.ex = ex;
320 87198 op.ec_out = ec;
321 87198 op.bytes_out = bytes_out;
322 87198 op.fd = fd_;
323 87198 op.start(token, this);
324
325 87198 capy::mutable_buffer bufs[select_write_op::max_buffers];
326 87198 op.iovec_count = static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));
327
328
6/8
✓ Branch 0 taken 87197 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 87197 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 87197 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 87197 times.
87198 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
329 {
330 1 op.complete(0, 0);
331
1/1
✓ Branch 1 taken 1 time.
1 op.impl_ptr = shared_from_this();
332
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
333 1 return std::noop_coroutine();
334 }
335
336
2/2
✓ Branch 0 taken 87197 times.
✓ Branch 1 taken 87197 times.
174394 for (int i = 0; i < op.iovec_count; ++i)
337 {
338 87197 op.iovecs[i].iov_base = bufs[i].data();
339 87197 op.iovecs[i].iov_len = bufs[i].size();
340 }
341
342 87197 msghdr msg{};
343 87197 msg.msg_iov = op.iovecs;
344 87197 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
345
346
1/1
✓ Branch 1 taken 87197 times.
87197 ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
347
348
2/2
✓ Branch 0 taken 87196 times.
✓ Branch 1 taken 1 time.
87197 if (n > 0)
349 {
350 87196 op.complete(0, static_cast<std::size_t>(n));
351
1/1
✓ Branch 1 taken 87196 times.
87196 op.impl_ptr = shared_from_this();
352
1/1
✓ Branch 1 taken 87196 times.
87196 svc_.post(&op);
353 87196 return std::noop_coroutine();
354 }
355
356
2/4
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 time.
1 if (errno == EAGAIN || errno == EWOULDBLOCK)
357 {
358 svc_.work_started();
359 op.impl_ptr = shared_from_this();
360
361 // Set registering BEFORE register_fd to close the race window where
362 // reactor sees an event before we set registered.
363 op.registered.store(select_registration_state::registering, std::memory_order_release);
364 svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
365
366 // Transition to registered. If this fails, reactor or cancel already
367 // claimed the op (state is now unregistered), so we're done. However,
368 // we must still deregister the fd because cancel's deregister_fd may
369 // have run before our register_fd, leaving the fd orphaned.
370 auto expected = select_registration_state::registering;
371 if (!op.registered.compare_exchange_strong(
372 expected, select_registration_state::registered, std::memory_order_acq_rel))
373 {
374 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
375 return std::noop_coroutine();
376 }
377
378 // If cancelled was set before we registered, handle it now.
379 if (op.cancelled.load(std::memory_order_acquire))
380 {
381 auto prev = op.registered.exchange(
382 select_registration_state::unregistered, std::memory_order_acq_rel);
383 if (prev != select_registration_state::unregistered)
384 {
385 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
386 op.impl_ptr = shared_from_this();
387 svc_.post(&op);
388 svc_.work_finished();
389 }
390 }
391 return std::noop_coroutine();
392 }
393
394
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 op.complete(errno ? errno : EIO, 0);
395
1/1
✓ Branch 1 taken 1 time.
1 op.impl_ptr = shared_from_this();
396
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
397 1 return std::noop_coroutine();
398 }
399
400 std::error_code
401 3 select_socket_impl::
402 shutdown(tcp_socket::shutdown_type what) noexcept
403 {
404 int how;
405
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
3 switch (what)
406 {
407 1 case tcp_socket::shutdown_receive: how = SHUT_RD; break;
408 1 case tcp_socket::shutdown_send: how = SHUT_WR; break;
409 1 case tcp_socket::shutdown_both: how = SHUT_RDWR; break;
410 default:
411 return make_err(EINVAL);
412 }
413
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::shutdown(fd_, how) != 0)
414 return make_err(errno);
415 3 return {};
416 }
417
418 std::error_code
419 5 select_socket_impl::
420 set_no_delay(bool value) noexcept
421 {
422
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 time.
5 int flag = value ? 1 : 0;
423
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
424 return make_err(errno);
425 5 return {};
426 }
427
428 bool
429 5 select_socket_impl::
430 no_delay(std::error_code& ec) const noexcept
431 {
432 5 int flag = 0;
433 5 socklen_t len = sizeof(flag);
434
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
435 {
436 ec = make_err(errno);
437 return false;
438 }
439 5 ec = {};
440 5 return flag != 0;
441 }
442
443 std::error_code
444 4 select_socket_impl::
445 set_keep_alive(bool value) noexcept
446 {
447
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 time.
4 int flag = value ? 1 : 0;
448
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
449 return make_err(errno);
450 4 return {};
451 }
452
453 bool
454 4 select_socket_impl::
455 keep_alive(std::error_code& ec) const noexcept
456 {
457 4 int flag = 0;
458 4 socklen_t len = sizeof(flag);
459
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
460 {
461 ec = make_err(errno);
462 return false;
463 }
464 4 ec = {};
465 4 return flag != 0;
466 }
467
468 std::error_code
469 1 select_socket_impl::
470 set_receive_buffer_size(int size) noexcept
471 {
472
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
473 return make_err(errno);
474 1 return {};
475 }
476
477 int
478 3 select_socket_impl::
479 receive_buffer_size(std::error_code& ec) const noexcept
480 {
481 3 int size = 0;
482 3 socklen_t len = sizeof(size);
483
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
484 {
485 ec = make_err(errno);
486 return 0;
487 }
488 3 ec = {};
489 3 return size;
490 }
491
492 std::error_code
493 1 select_socket_impl::
494 set_send_buffer_size(int size) noexcept
495 {
496
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
497 return make_err(errno);
498 1 return {};
499 }
500
501 int
502 3 select_socket_impl::
503 send_buffer_size(std::error_code& ec) const noexcept
504 {
505 3 int size = 0;
506 3 socklen_t len = sizeof(size);
507
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
508 {
509 ec = make_err(errno);
510 return 0;
511 }
512 3 ec = {};
513 3 return size;
514 }
515
516 std::error_code
517 4 select_socket_impl::
518 set_linger(bool enabled, int timeout) noexcept
519 {
520
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 3 times.
4 if (timeout < 0)
521 1 return make_err(EINVAL);
522 struct ::linger lg;
523
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 time.
3 lg.l_onoff = enabled ? 1 : 0;
524 3 lg.l_linger = timeout;
525
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
526 return make_err(errno);
527 3 return {};
528 }
529
530 tcp_socket::linger_options
531 3 select_socket_impl::
532 linger(std::error_code& ec) const noexcept
533 {
534 3 struct ::linger lg{};
535 3 socklen_t len = sizeof(lg);
536
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
537 {
538 ec = make_err(errno);
539 return {};
540 }
541 3 ec = {};
542 3 return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
543 }
544
545 void
546 10402 select_socket_impl::
547 cancel() noexcept
548 {
549 10402 std::shared_ptr<select_socket_impl> self;
550 try {
551
1/1
✓ Branch 1 taken 10402 times.
10402 self = shared_from_this();
552 } catch (const std::bad_weak_ptr&) {
553 return;
554 }
555
556 31206 auto cancel_op = [this, &self](select_op& op, int events) {
557 31206 auto prev = op.registered.exchange(
558 select_registration_state::unregistered, std::memory_order_acq_rel);
559 31206 op.request_cancel();
560
2/2
✓ Branch 0 taken 91 times.
✓ Branch 1 taken 31115 times.
31206 if (prev != select_registration_state::unregistered)
561 {
562 91 svc_.scheduler().deregister_fd(fd_, events);
563 91 op.impl_ptr = self;
564 91 svc_.post(&op);
565 91 svc_.work_finished();
566 }
567 41608 };
568
569 10402 cancel_op(conn_, select_scheduler::event_write);
570 10402 cancel_op(rd_, select_scheduler::event_read);
571 10402 cancel_op(wr_, select_scheduler::event_write);
572 10402 }
573
574 void
575 97 select_socket_impl::
576 cancel_single_op(select_op& op) noexcept
577 {
578 // Called from stop_token callback to cancel a specific pending operation.
579 97 auto prev = op.registered.exchange(
580 select_registration_state::unregistered, std::memory_order_acq_rel);
581 97 op.request_cancel();
582
583
2/2
✓ Branch 0 taken 65 times.
✓ Branch 1 taken 32 times.
97 if (prev != select_registration_state::unregistered)
584 {
585 // Determine which event type to deregister
586 65 int events = 0;
587
2/4
✓ Branch 0 taken 65 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 65 times.
65 if (&op == &conn_ || &op == &wr_)
588 events = select_scheduler::event_write;
589
1/2
✓ Branch 0 taken 65 times.
✗ Branch 1 not taken.
65 else if (&op == &rd_)
590 65 events = select_scheduler::event_read;
591
592 65 svc_.scheduler().deregister_fd(fd_, events);
593
594 // Keep impl alive until op completes
595 try {
596
1/1
✓ Branch 1 taken 65 times.
65 op.impl_ptr = shared_from_this();
597 } catch (const std::bad_weak_ptr&) {
598 // Impl is being destroyed, op will be orphaned but that's ok
599 }
600
601 65 svc_.post(&op);
602 65 svc_.work_finished();
603 }
604 97 }
605
606 void
607 10229 select_socket_impl::
608 close_socket() noexcept
609 {
610 10229 cancel();
611
612
2/2
✓ Branch 0 taken 6815 times.
✓ Branch 1 taken 3414 times.
10229 if (fd_ >= 0)
613 {
614 // Unconditionally remove from registered_fds_ to handle edge cases
615 // where the fd might be registered but cancel() didn't clean it up
616 // due to race conditions.
617 6815 svc_.scheduler().deregister_fd(fd_,
618 select_scheduler::event_read | select_scheduler::event_write);
619 6815 ::close(fd_);
620 6815 fd_ = -1;
621 }
622
623 // Clear cached endpoints
624 10229 local_endpoint_ = endpoint{};
625 10229 remote_endpoint_ = endpoint{};
626 10229 }
627
628 133 select_socket_service::
629 133 select_socket_service(capy::execution_context& ctx)
630
2/2
✓ Branch 2 taken 133 times.
✓ Branch 5 taken 133 times.
133 : state_(std::make_unique<select_socket_state>(ctx.use_service<select_scheduler>()))
631 {
632 133 }
633
634 266 select_socket_service::
635 133 ~select_socket_service()
636 {
637 266 }
638
639 void
640 133 select_socket_service::
641 shutdown()
642 {
643
1/1
✓ Branch 2 taken 133 times.
133 std::lock_guard lock(state_->mutex_);
644
645
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 133 times.
133 while (auto* impl = state_->socket_list_.pop_front())
646 impl->close_socket();
647
648 // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
649 // drains completed_ops_, calling destroy() on each queued op. Letting
650 // ~state_ release the ptrs (during service destruction, after scheduler
651 // shutdown) keeps every impl alive until all ops have been drained.
652 133 }
653
654 tcp_socket::socket_impl&
655 6815 select_socket_service::
656 create_impl()
657 {
658
1/1
✓ Branch 1 taken 6815 times.
6815 auto impl = std::make_shared<select_socket_impl>(*this);
659 6815 auto* raw = impl.get();
660
661 {
662
1/1
✓ Branch 2 taken 6815 times.
6815 std::lock_guard lock(state_->mutex_);
663 6815 state_->socket_list_.push_back(raw);
664
1/1
✓ Branch 3 taken 6815 times.
6815 state_->socket_ptrs_.emplace(raw, std::move(impl));
665 6815 }
666
667 6815 return *raw;
668 6815 }
669
670 void
671 6815 select_socket_service::
672 destroy_impl(tcp_socket::socket_impl& impl)
673 {
674 6815 auto* select_impl = static_cast<select_socket_impl*>(&impl);
675
1/1
✓ Branch 2 taken 6815 times.
6815 std::lock_guard lock(state_->mutex_);
676 6815 state_->socket_list_.remove(select_impl);
677
1/1
✓ Branch 2 taken 6815 times.
6815 state_->socket_ptrs_.erase(select_impl);
678 6815 }
679
680 std::error_code
681 3414 select_socket_service::
682 open_socket(tcp_socket::socket_impl& impl)
683 {
684 3414 auto* select_impl = static_cast<select_socket_impl*>(&impl);
685 3414 select_impl->close_socket();
686
687 3414 int fd = ::socket(AF_INET, SOCK_STREAM, 0);
688
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3414 times.
3414 if (fd < 0)
689 return make_err(errno);
690
691 // Set non-blocking and close-on-exec
692 3414 int flags = ::fcntl(fd, F_GETFL, 0);
693
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3414 times.
3414 if (flags == -1)
694 {
695 int errn = errno;
696 ::close(fd);
697 return make_err(errn);
698 }
699
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3414 times.
3414 if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
700 {
701 int errn = errno;
702 ::close(fd);
703 return make_err(errn);
704 }
705
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3414 times.
3414 if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
706 {
707 int errn = errno;
708 ::close(fd);
709 return make_err(errn);
710 }
711
712 // Check fd is within select() limits
713
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3414 times.
3414 if (fd >= FD_SETSIZE)
714 {
715 ::close(fd);
716 return make_err(EMFILE); // Too many open files
717 }
718
719 3414 select_impl->fd_ = fd;
720 3414 return {};
721 }
722
723 void
724 174435 select_socket_service::
725 post(select_op* op)
726 {
727 174435 state_->sched_.post(op);
728 174435 }
729
730 void
731 3680 select_socket_service::
732 work_started() noexcept
733 {
734 3680 state_->sched_.work_started();
735 3680 }
736
737 void
738 156 select_socket_service::
739 work_finished() noexcept
740 {
741 156 state_->sched_.work_finished();
742 156 }
743
744 } // namespace boost::corosio::detail
745
746 #endif // BOOST_COROSIO_HAS_SELECT
747