libs/corosio/src/corosio/src/detail/epoll/scheduler.hpp

//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#ifndef BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
#define BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include <boost/corosio/detail/config.hpp>
#include <boost/capy/ex/execution_context.hpp>

#include "src/detail/scheduler_impl.hpp"
#include "src/detail/scheduler_op.hpp"

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <mutex>

namespace boost::corosio::detail {

struct epoll_op;
struct descriptor_state;
struct scheduler_context;

/** Linux scheduler using epoll for I/O multiplexing.

    This scheduler implements the scheduler interface using Linux epoll
    for efficient I/O event notification. It uses a single reactor model
    where one thread runs epoll_wait while other threads wait on a
    condition variable for handler work. This design provides:

    - Handler parallelism: N posted handlers can execute on N threads
    - No thundering herd: condition_variable wakes exactly one thread
    - IOCP parity: behavior matches Windows I/O completion port semantics

    When threads call run(), they first try to execute queued handlers.
    If the queue is empty and no reactor is running, one thread becomes
    the reactor and runs epoll_wait. Other threads wait on a condition
    variable until handlers are available.

    @par Thread Safety
    All public member functions are thread-safe.
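
    @par Example
    A minimal sketch of driving the scheduler from several threads.
    How the owning context is constructed and how work arrives are
    assumptions for illustration; only run(), post(), and stop() come
    from this header.
    @code
    capy::execution_context ctx;
    epoll_scheduler sched(ctx);

    std::vector<std::thread> pool;
    for (int i = 0; i < 4; ++i)
        pool.emplace_back([&]{ sched.run(); }); // one thread becomes the reactor

    sched.post(h); // h: a hypothetical std::coroutine_handle<>

    sched.stop();
    for (auto& t : pool)
        t.join();
    @endcode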
*/
class epoll_scheduler
    : public scheduler_impl
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;

    /** Construct the scheduler.

        Creates an epoll instance, an eventfd for reactor interruption,
        and a timerfd for kernel-managed timer expiry.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    epoll_scheduler(
        capy::execution_context& ctx,
        int concurrency_hint = -1);

    /// Destroy the scheduler.
    ~epoll_scheduler();

    epoll_scheduler(epoll_scheduler const&) = delete;
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;

    void shutdown() override;
    void post(std::coroutine_handle<> h) const override;
    void post(scheduler_op* h) const override;
    void on_work_started() noexcept override;
    void on_work_finished() noexcept override;
    bool running_in_this_thread() const noexcept override;
    void stop() override;
    bool stopped() const noexcept override;
    void restart() override;
    std::size_t run() override;
    std::size_t run_one() override;
    std::size_t wait_one(long usec) override;
    std::size_t poll() override;
    std::size_t poll_one() override;

    /** Return the epoll file descriptor.

        Used by socket services to register file descriptors
        for I/O event notification.

        @return The epoll file descriptor.
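
        @par Example
        A sketch of direct registration against the returned fd
        (the event flags and the desc pointer are illustrative):
        @code
        ::epoll_event ev{};
        ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
        ev.data.ptr = desc; // hypothetical descriptor_state*
        ::epoll_ctl(sched.epoll_fd(), EPOLL_CTL_ADD, fd, &ev);
        @endcode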
    */
    int epoll_fd() const noexcept { return epoll_fd_; }

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
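
        @par Example
        The intended call pattern, as a sketch (the completion source
        and the op invocation are hypothetical):
        @code
        sched.reset_inline_budget(); // fresh budget for this handler
        while (scheduler_op* op = next_ready_completion())
        {
            if (sched.try_consume_inline_budget())
                (*op)();         // run inline while budget remains
            else
                sched.post(op);  // budget exhausted: defer to the queue
        }
        @endcode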
    */
    bool try_consume_inline_budget() const noexcept;

    /** Register a descriptor for persistent monitoring.

        The fd is registered once and stays registered until explicitly
        deregistered. Events are dispatched via descriptor_state, which
        tracks pending read/write/connect operations.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
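
        @par Example
        Caller-side sketch of the registration lifecycle (the socket
        creation and descriptor_state ownership are illustrative):
        @code
        int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
        auto* desc = service.allocate_descriptor(); // hypothetical owner
        sched.register_descriptor(fd, desc);  // persists until deregistered
        // ... initiate reads/writes; events arrive via *desc ...
        sched.deregister_descriptor(fd);
        ::close(fd);
        @endcode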
    */
    void register_descriptor(int fd, descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

    /** For use by I/O operations to track pending work. */
    void work_started() const noexcept override;

    /** For use by I/O operations to track completed work. */
    void work_finished() const noexcept override;

    /** Offset a forthcoming work_finished from work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and no
        handler will be executed. Must be called from a scheduler thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from a thread context's private queue to the global queue.

        Called by the thread_context_guard destructor when a thread exits
        run(). Transfers pending work to the global queue under mutex
        protection.

        @param queue The private queue to drain.
        @param count Item count for wakeup decisions (wakes other threads if positive).
    */
    void drain_thread_queue(op_queue& queue, long count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations go to
        the thread's private queue (fast path). Otherwise, operations are
        added to the global queue under a mutex and a waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
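
        @par Example
        A sketch of satisfying the precondition (the op_queue filling
        interface shown is an assumption):
        @code
        op_queue ops;
        for (scheduler_op* op : completed) // hypothetical completed set
        {
            sched.work_started();  // one call per posted operation
            ops.push(op);          // assumed op_queue interface
        }
        sched.post_deferred_completions(ops);
        @endcode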
    */
    void post_deferred_completions(op_queue& ops) const;

private:
    friend struct work_cleanup;
    friend struct task_cleanup;

    std::size_t do_one(
        std::unique_lock<std::mutex>& lock,
        long timeout_us,
        scheduler_context* ctx);
    void run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx);
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
    void interrupt_reactor() const;
    void update_timerfd() const;

    /** Set the signaled state and wake all waiting threads.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
    */
    void signal_all(std::unique_lock<std::mutex>& lock) const;

    /** Set the signaled state and wake one waiter if any exist.

        Only unlocks and signals if at least one thread is waiting.
        Use this when the caller needs to perform a fallback action
        (such as interrupting the reactor) when no waiters exist.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.

        @return `true` if unlocked and signaled, `false` if the lock is still held.
    */
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;

    /** Set the signaled state, unlock, and wake one waiter if any exist.

        Always unlocks the mutex. Use this when the caller will release
        the lock regardless of whether a waiter exists.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
    */
    void unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;

    /** Clear the signaled state before waiting.

        @par Preconditions
        Mutex must be held.
    */
    void clear_signal() const;

    /** Block until the signaled state is set.

        Returns immediately if already signaled (fast path). Otherwise
        increments the waiter count, waits on the condition variable,
        and decrements the waiter count upon waking.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
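
        @par Example
        An illustrative sketch of the described protocol in terms of
        the state_ encoding (not the actual implementation):
        @code
        if ((state_ & 1) == 0)  // not yet signaled
        {
            state_ += 2;        // register as a waiter (upper bits)
            cond_.wait(lock, [this]{ return (state_ & 1) != 0; });
            state_ -= 2;        // deregister upon waking
        }
        @endcode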
    */
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;

    /** Block until signaled or the timeout expires.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
        @param timeout_us Maximum time to wait in microseconds.
    */
    void wait_for_signal_for(
        std::unique_lock<std::mutex>& lock,
        long timeout_us) const;

    int epoll_fd_;
    int event_fd_; // eventfd for interrupting the reactor
    int timer_fd_; // timerfd for kernel-managed timer expiry
    int max_inline_budget_ = 2;
    mutable std::mutex mutex_;
    mutable std::condition_variable cond_;
    mutable op_queue completed_ops_;
    mutable std::atomic<long> outstanding_work_;
    bool stopped_;
    bool shutdown_;

    // True while a thread is blocked in epoll_wait. Used by
    // wake_one_thread_and_unlock and work_finished to know when
    // an eventfd interrupt is needed instead of a condvar signal.
    mutable std::atomic<bool> task_running_{false};

    // True when the reactor has been told to do a non-blocking poll
    // (more handlers queued, or poll mode). Prevents redundant eventfd
    // writes and controls the epoll_wait timeout.
    mutable bool task_interrupted_ = false;

    // Signaling state: bit 0 = signaled, upper bits = waiter count
    // (each waiter adds 2).
    mutable std::size_t state_ = 0;
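    // e.g. 0 = idle; 1 = signaled, no waiters; 4 = two waiters;
    // 5 = two waiters and signaled.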

    // Edge-triggered eventfd state
    mutable std::atomic<bool> eventfd_armed_{false};

    // Set when the earliest timer changes; flushed before epoll_wait
    // blocks. Avoids timerfd_settime syscalls for timers that are
    // scheduled then cancelled without being waited on.
    mutable std::atomic<bool> timerfd_stale_{false};

    // Sentinel operation for interleaving reactor runs with handler execution.
    // Ensures the reactor runs periodically even when handlers are continuously
    // posted, preventing starvation of I/O events, timers, and signals.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;
};

} // namespace boost::corosio::detail

#endif // BOOST_COROSIO_HAS_EPOLL

#endif // BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP