libs/corosio/src/corosio/src/detail/epoll/sockets.hpp

91.7% Lines (11/12) 85.7% Functions (6/7) -% Branches (0/0)
libs/corosio/src/corosio/src/detail/epoll/sockets.hpp
Line Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_EPOLL_SOCKETS_HPP
11 #define BOOST_COROSIO_DETAIL_EPOLL_SOCKETS_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/tcp_socket.hpp>
19 #include <boost/capy/ex/executor_ref.hpp>
20 #include <boost/capy/ex/execution_context.hpp>
21 #include "src/detail/intrusive.hpp"
22 #include "src/detail/socket_service.hpp"
23
24 #include "src/detail/epoll/op.hpp"
25 #include "src/detail/epoll/scheduler.hpp"
26
27 #include <coroutine>
28 #include <memory>
29 #include <mutex>
30 #include <unordered_map>
31
32 /*
33 epoll Socket Implementation
34 ===========================
35
36 Each I/O operation follows the same pattern:
37 1. Try the syscall immediately (non-blocking socket)
38 2. If it succeeds or fails with a real error, post to completion queue
39 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
40
41 This "try first" approach avoids unnecessary epoll round-trips for
42 operations that can complete immediately (common for small reads/writes
43 on fast local connections).
44
45 One-Shot Registration
46 ---------------------
47 We use one-shot epoll registration: each operation registers, waits for
48 one event, then unregisters. This simplifies the state machine since we
49 don't need to track whether an fd is currently registered or handle
50 re-arming. The tradeoff is slightly more epoll_ctl calls, but the
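51 simplicity is worth it. NOTE(review): the desc_state_ member of epoll_socket_impl is documented as state for persistent epoll registration, which contradicts this section; confirm which description is current.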
52
53 Cancellation
54 ------------
55 See op.hpp for the completion/cancellation race handling via the
56 `registered` atomic. cancel() must complete pending operations (post
57 them with cancelled flag) so coroutines waiting on them can resume.
58 close_socket() calls cancel() first to ensure this.
59
60 Impl Lifetime with shared_ptr
61 -----------------------------
62 Socket impls use enable_shared_from_this. The service owns impls via
63 shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
64 removal. When a user calls close(), we call cancel() which posts pending
65 ops to the scheduler.
66
67 CRITICAL: The posted ops must keep the impl alive until they complete.
68 Otherwise the scheduler would process a freed op (use-after-free). The
69 cancel() method captures shared_from_this() into op.impl_ptr before
70 posting. When the op completes, impl_ptr is cleared, allowing the impl
71 to be destroyed if no other references exist.
72
73 Service Ownership
74 -----------------
75 epoll_socket_service owns all socket impls. destroy_impl() removes the
76 shared_ptr from the map, but the impl may survive if ops still hold
77 impl_ptr refs. shutdown() closes all sockets and clears the map; any
78 in-flight ops will complete and release their refs.
79 */
80
81 namespace boost::corosio::detail {
82
83 class epoll_socket_service;
84 class epoll_socket_impl;
85
86 /// Socket implementation for epoll backend: holds one descriptor (fd_, defaulting to -1), its cached endpoints, and one op object each for connect, read and write.
87 class epoll_socket_impl
88 : public tcp_socket::socket_impl
89 , public std::enable_shared_from_this<epoll_socket_impl> // lets ops pin the impl via shared_from_this() (see "Impl Lifetime" in the file comment)
90 , public intrusive_list<epoll_socket_impl>::node // node type of intrusive_list<epoll_socket_impl>, the type of epoll_socket_state::socket_list_
91 {
92 friend class epoll_socket_service;
93
94 public:
95 explicit epoll_socket_impl(epoll_socket_service& svc) noexcept;
96 ~epoll_socket_impl();
97
98 void release() override;
99
100 std::coroutine_handle<> connect(
101 std::coroutine_handle<>,
102 capy::executor_ref,
103 endpoint,
104 std::stop_token,
105 std::error_code*) override;
106
107 std::coroutine_handle<> read_some(
108 std::coroutine_handle<>,
109 capy::executor_ref,
110 io_buffer_param,
111 std::stop_token,
112 std::error_code*,
113 std::size_t*) override;
114
115 std::coroutine_handle<> write_some(
116 std::coroutine_handle<>,
117 capy::executor_ref,
118 io_buffer_param,
119 std::stop_token,
120 std::error_code*,
121 std::size_t*) override;
122
123 std::error_code shutdown(tcp_socket::shutdown_type what) noexcept override;
124
125 native_handle_type native_handle() const noexcept override { return fd_; } // returns the stored descriptor as-is (-1 by default)
126
127 // Socket options: each setter returns an error_code; each getter takes an error_code out-parameter and returns the value
128 std::error_code set_no_delay(bool value) noexcept override;
129 bool no_delay(std::error_code& ec) const noexcept override;
130
131 std::error_code set_keep_alive(bool value) noexcept override;
132 bool keep_alive(std::error_code& ec) const noexcept override;
133
134 std::error_code set_receive_buffer_size(int size) noexcept override;
135 int receive_buffer_size(std::error_code& ec) const noexcept override;
136
137 std::error_code set_send_buffer_size(int size) noexcept override;
138 int send_buffer_size(std::error_code& ec) const noexcept override;
139
140 std::error_code set_linger(bool enabled, int timeout) noexcept override;
141 tcp_socket::linger_options linger(std::error_code& ec) const noexcept override;
142
143 16 endpoint local_endpoint() const noexcept override { return local_endpoint_; } // returns the cached copy set by set_endpoints()
144 16 endpoint remote_endpoint() const noexcept override { return remote_endpoint_; } // returns the cached copy set by set_endpoints()
145 bool is_open() const noexcept { return fd_ >= 0; } // "open" means a non-negative descriptor is stored
146 void cancel() noexcept override; // per the file comment: posts pending ops with the cancelled flag so waiting coroutines can resume
147 void cancel_single_op(epoll_op& op) noexcept;
148 void close_socket() noexcept; // per the file comment: calls cancel() first
149 4642 void set_socket(int fd) noexcept { fd_ = fd; } // only stores the descriptor; performs no other action
150 9284 void set_endpoints(endpoint local, endpoint remote) noexcept // overwrites both cached endpoints
151 {
152 9284 local_endpoint_ = local;
153 9284 remote_endpoint_ = remote;
154 9284 }
155
156 epoll_connect_op conn_; // connect operation object (one per impl)
157 epoll_read_op rd_; // read operation object (one per impl)
158 epoll_write_op wr_; // write operation object (one per impl)
159
160 /// Per-descriptor state for persistent epoll registration
161 descriptor_state desc_state_;
162
163 private:
164 epoll_socket_service& svc_; // owning service, bound at construction
165 int fd_ = -1; // -1 is the default; is_open() treats any negative value as closed
166 endpoint local_endpoint_;
167 endpoint remote_endpoint_;
168
169 void register_op( // defined out of line; the roles of desc_slot, ready_flag and cancel_flag are not visible in this header
170 epoll_op& op,
171 epoll_op*& desc_slot,
172 bool& ready_flag,
173 bool& cancel_flag) noexcept;
174
175 friend struct epoll_op;
176 friend struct epoll_connect_op;
177 };
178
179 /** State for epoll socket service: a scheduler reference, a mutex, and the containers tracking socket impls. All members are public. */
180 class epoll_socket_state
181 {
182 public:
183 203 explicit epoll_socket_state(epoll_scheduler& sched) noexcept // binds the scheduler reference; the other members are default-constructed
184 203 : sched_(sched)
185 {
186 203 }
187
188 epoll_scheduler& sched_; // returned by epoll_socket_service::scheduler()
189 std::mutex mutex_; // NOTE(review): presumably guards socket_list_ and socket_ptrs_; no locking is visible in this header, so confirm in the .cpp
190 intrusive_list<epoll_socket_impl> socket_list_;
191 std::unordered_map<epoll_socket_impl*, std::shared_ptr<epoll_socket_impl>> socket_ptrs_; // owning shared_ptrs keyed by raw impl pointer (see "Impl Lifetime" in the file comment)
192 };
193
194 /** epoll socket service implementation.
195
196 Inherits from socket_service to enable runtime polymorphism.
197 Uses key_type = socket_service for service lookup. Non-copyable; its state lives in an epoll_socket_state owned through state_.
198 */
199 class epoll_socket_service : public socket_service
200 {
201 public:
202 explicit epoll_socket_service(capy::execution_context& ctx);
203 ~epoll_socket_service();
204
205 epoll_socket_service(epoll_socket_service const&) = delete;
206 epoll_socket_service& operator=(epoll_socket_service const&) = delete;
207
208 void shutdown() override; // per the file comment: closes all sockets and clears the map
209
210 tcp_socket::socket_impl& create_impl() override;
211 void destroy_impl(tcp_socket::socket_impl& impl) override; // per the file comment: removes the shared_ptr from the map; the impl may survive while ops still hold impl_ptr
212 std::error_code open_socket(tcp_socket::socket_impl& impl) override;
213
214 336536 epoll_scheduler& scheduler() const noexcept { return state_->sched_; } // scheduler reference stored in the state object
215 void post(epoll_op* op);
216 void work_started() noexcept;
217 void work_finished() noexcept;
218
219 private:
220 std::unique_ptr<epoll_socket_state> state_; // sole owner of the service state
221 };
222
223 } // namespace boost::corosio::detail
224
225 #endif // BOOST_COROSIO_HAS_EPOLL
226
227 #endif // BOOST_COROSIO_DETAIL_EPOLL_SOCKETS_HPP
228