libs/corosio/src/corosio/src/detail/select/scheduler.hpp

0.0% Lines (0/2) 0.0% Functions (0/2) -% Branches (0/0)
libs/corosio/src/corosio/src/detail/select/scheduler.hpp
Line Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_SELECT_SCHEDULER_HPP
11 #define BOOST_COROSIO_DETAIL_SELECT_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19
20 #include "src/detail/scheduler_impl.hpp"
21 #include "src/detail/scheduler_op.hpp"
22
23 #include <sys/select.h>
24
25 #include <atomic>
26 #include <condition_variable>
27 #include <cstddef>
28 #include <mutex>
29 #include <unordered_map>
30
31 namespace boost::corosio::detail {
32
33 struct select_op;
34
35 /** POSIX scheduler using select() for I/O multiplexing.
36
37 This scheduler implements the scheduler interface using the POSIX select()
38 call for I/O event notification. It uses a single reactor model
39 where one thread runs select() while other threads wait on a condition
40 variable for handler work. This design provides:
41
42 - Handler parallelism: N posted handlers can execute on N threads
43 - No thundering herd: condition_variable wakes exactly one thread
44 - Portability: Works on all POSIX systems
45
46 The design mirrors epoll_scheduler for behavioral consistency:
47 - Same single-reactor thread coordination model
48 - Same work counting semantics
49 - Same timer integration pattern
50
51 Known Limitations:
52 - FD_SETSIZE (~1024) limits maximum concurrent connections
53 - O(n) scanning: rebuilds fd_sets each iteration
54 - Level-triggered only (no edge-triggered mode)
55
56 @par Thread Safety
57 All public member functions are thread-safe.
58 */
59 class select_scheduler
60 : public scheduler_impl
61 , public capy::execution_context::service
62 {
63 public:
64 using key_type = scheduler;
65
66 /** Construct the scheduler.
67
68 Creates a self-pipe for reactor interruption.
69
70 @param ctx Reference to the owning execution_context.
71 @param concurrency_hint Hint for expected thread count (unused).
72 */
73 select_scheduler(
74 capy::execution_context& ctx,
75 int concurrency_hint = -1);
76
77 ~select_scheduler();
78
79 select_scheduler(select_scheduler const&) = delete;
80 select_scheduler& operator=(select_scheduler const&) = delete;
81
82 void shutdown() override;
83 void post(std::coroutine_handle<> h) const override;
84 void post(scheduler_op* h) const override;
85 void on_work_started() noexcept override;
86 void on_work_finished() noexcept override;
87 bool running_in_this_thread() const noexcept override;
88 void stop() override;
89 bool stopped() const noexcept override;
90 void restart() override;
91 std::size_t run() override;
92 std::size_t run_one() override;
93 std::size_t wait_one(long usec) override;
94 std::size_t poll() override;
95 std::size_t poll_one() override;
96
97 /** Return the maximum file descriptor value supported.
98
99 Returns FD_SETSIZE - 1, the maximum fd value that can be
100 monitored by select(). Operations with fd >= FD_SETSIZE
101 will fail with EINVAL.
102
103 @return The maximum supported file descriptor value.
104 */
105 static constexpr int max_fd() noexcept { return FD_SETSIZE - 1; }
106
107 /** Register a file descriptor for monitoring.
108
109 @param fd The file descriptor to register.
110 @param op The operation associated with this fd.
111 @param events Event mask: 1 = read, 2 = write, 3 = both.
112 */
113 void register_fd(int fd, select_op* op, int events) const;
114
115 /** Unregister a file descriptor from monitoring.
116
117 @param fd The file descriptor to unregister.
118 @param events Event mask to remove: 1 = read, 2 = write, 3 = both.
119 */
120 void deregister_fd(int fd, int events) const;
121
122 /** For use by I/O operations to track pending work. */
123 void work_started() const noexcept override;
124
125 /** For use by I/O operations to track completed work. */
126 void work_finished() const noexcept override;
127
128 // Event flags for register_fd/deregister_fd
129 static constexpr int event_read = 1;
130 static constexpr int event_write = 2;
131
132 private:
133 std::size_t do_one(long timeout_us);
134 void run_reactor(std::unique_lock<std::mutex>& lock);
135 void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
136 void interrupt_reactor() const;
137 long calculate_timeout(long requested_timeout_us) const;
138
139 // Self-pipe for interrupting select()
140 int pipe_fds_[2]; // [0]=read, [1]=write
141
142 mutable std::mutex mutex_;
143 mutable std::condition_variable wakeup_event_;
144 mutable op_queue completed_ops_;
145 mutable std::atomic<long> outstanding_work_;
146 std::atomic<bool> stopped_;
147 bool shutdown_;
148
149 // Per-fd state for tracking registered operations
150 struct fd_state
151 {
152 select_op* read_op = nullptr;
153 select_op* write_op = nullptr;
154 };
155 mutable std::unordered_map<int, fd_state> registered_fds_;
156 mutable int max_fd_ = -1;
157
158 // Single reactor thread coordination
159 mutable bool reactor_running_ = false;
160 mutable bool reactor_interrupted_ = false;
161 mutable int idle_thread_count_ = 0;
162
163 // Sentinel operation for interleaving reactor runs with handler execution.
164 // Ensures the reactor runs periodically even when handlers are continuously
165 // posted, preventing timer starvation.
166 struct task_op final : scheduler_op
167 {
168 void operator()() override {}
169 void destroy() override {}
170 };
171 task_op task_op_;
172 };
173
174 } // namespace boost::corosio::detail
175
176 #endif // BOOST_COROSIO_HAS_SELECT
177
178 #endif // BOOST_COROSIO_DETAIL_SELECT_SCHEDULER_HPP
179