libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp

80.9% Lines (402/497) 89.6% Functions (43/48) 68.6% Branches (208/303)
libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/scheduler.hpp"
15 #include "src/detail/epoll/op.hpp"
16 #include "src/detail/timer_service.hpp"
17 #include "src/detail/make_err.hpp"
18 #include "src/detail/posix/resolver_service.hpp"
19 #include "src/detail/posix/signals.hpp"
20
21 #include <boost/corosio/detail/except.hpp>
22 #include <boost/corosio/detail/thread_local_ptr.hpp>
23
24 #include <chrono>
25 #include <limits>
26 #include <utility>
27
28 #include <errno.h>
29 #include <fcntl.h>
30 #include <sys/epoll.h>
31 #include <sys/eventfd.h>
32 #include <sys/socket.h>
33 #include <sys/timerfd.h>
34 #include <unistd.h>
35
36 /*
37 epoll Scheduler - Single Reactor Model
38 ======================================
39
40 This scheduler uses a thread coordination strategy to provide handler
41 parallelism and avoid the thundering herd problem.
42 Instead of all threads blocking on epoll_wait(), one thread becomes the
43 "reactor" while others wait on a condition variable for handler work.
44
45 Thread Model
46 ------------
47 - ONE thread runs epoll_wait() at a time (the reactor thread)
48 - OTHER threads wait on cond_ (condition variable) for handlers
49 - When work is posted, exactly one waiting thread wakes via notify_one()
50 - This matches Windows IOCP semantics where N posted items wake N threads
51
52 Event Loop Structure (do_one)
53 -----------------------------
54 1. Lock mutex, try to pop handler from queue
55 2. If got handler: execute it (unlocked), return
56 3. If queue empty and no reactor running: become reactor
57 - Run epoll_wait (unlocked), queue I/O completions, loop back
58 4. If queue empty and reactor running: wait on condvar for work
59
60 A single task_op_ sentinel in the queue ensures only one thread owns epoll_wait(); task_running_ tells posting threads whether to interrupt it.
61 After the reactor queues I/O completions, it loops back to try getting
62 a handler, giving priority to handler execution over more I/O polling.
63
64 Signaling State (state_)
65 ------------------------
66 The state_ variable encodes two pieces of information:
67 - Bit 0: signaled flag (1 = signaled, persists until cleared)
68 - Upper bits: waiter count (each waiter adds 2 before blocking)
69
70 This allows efficient coordination:
71 - Signalers only call notify when waiters exist (state_ > 1)
72 - Waiters check if already signaled before blocking (fast-path)
73
74 Wake Coordination (wake_one_thread_and_unlock)
75 ----------------------------------------------
76 When posting work:
77 - If waiters exist (state_ > 1): signal and notify_one()
78 - Else if reactor running: interrupt via eventfd write
79 - Else: no-op (thread will find work when it checks queue)
80
81 This avoids waking threads unnecessarily. With cascading wakes,
82 each handler execution wakes at most one additional thread if
83 more work exists in the queue.
84
85 Work Counting
86 -------------
87 outstanding_work_ tracks pending operations. When it hits zero, run()
88 returns. Each operation increments on start, decrements on completion.
89
90 Timer Integration
91 -----------------
92 Timers are handled by timer_service. The reactor arms a timerfd (registered
93 with epoll) for the nearest timer expiry. When the earliest expiry changes,
94 timer_service invokes a callback that marks the timerfd stale and, if the
95 reactor is running, calls interrupt_reactor() so the timerfd is re-armed.
96 */
97
98 namespace boost::corosio::detail {
99
100 struct scheduler_context
101 {
102 epoll_scheduler const* key;
103 scheduler_context* next;
104 op_queue private_queue;
105 long private_outstanding_work;
106 int inline_budget;
107
108 187 scheduler_context(epoll_scheduler const* k, scheduler_context* n)
109 187 : key(k)
110 187 , next(n)
111 187 , private_outstanding_work(0)
112 187 , inline_budget(0)
113 {
114 187 }
115 };
116
117 namespace {
118
119 corosio::detail::thread_local_ptr<scheduler_context> context_stack;
120
121 struct thread_context_guard
122 {
123 scheduler_context frame_;
124
125 187 explicit thread_context_guard(
126 epoll_scheduler const* ctx) noexcept
127 187 : frame_(ctx, context_stack.get())
128 {
129 187 context_stack.set(&frame_);
130 187 }
131
132 187 ~thread_context_guard() noexcept
133 {
134
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 187 times.
187 if (!frame_.private_queue.empty())
135 frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
136 187 context_stack.set(frame_.next);
137 187 }
138 };
139
140 scheduler_context*
141 466687 find_context(epoll_scheduler const* self) noexcept
142 {
143
2/2
✓ Branch 1 taken 465008 times.
✓ Branch 2 taken 1679 times.
466687 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
144
1/2
✓ Branch 0 taken 465008 times.
✗ Branch 1 not taken.
465008 if (c->key == self)
145 465008 return c;
146 1679 return nullptr;
147 }
148
149 } // namespace
150
151 void
152 87728 epoll_scheduler::
153 reset_inline_budget() const noexcept
154 {
155
1/2
✓ Branch 1 taken 87728 times.
✗ Branch 2 not taken.
87728 if (auto* ctx = find_context(this))
156 87728 ctx->inline_budget = max_inline_budget_;
157 87728 }
158
159 bool
160 234869 epoll_scheduler::
161 try_consume_inline_budget() const noexcept
162 {
163
1/2
✓ Branch 1 taken 234869 times.
✗ Branch 2 not taken.
234869 if (auto* ctx = find_context(this))
164 {
165
2/2
✓ Branch 0 taken 156635 times.
✓ Branch 1 taken 78234 times.
234869 if (ctx->inline_budget > 0)
166 {
167 156635 --ctx->inline_budget;
168 156635 return true;
169 }
170 }
171 78234 return false;
172 }
173
174 void
175 63342 descriptor_state::
176 operator()()
177 {
178 63342 is_enqueued_.store(false, std::memory_order_relaxed);
179
180 // Take ownership of impl ref set by close_socket() to prevent
181 // the owning impl from being freed while we're executing
182 63342 auto prevent_impl_destruction = std::move(impl_ref_);
183
184 63342 std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
185
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 63342 times.
63342 if (ev == 0)
186 {
187 scheduler_->compensating_work_started();
188 return;
189 }
190
191 63342 op_queue local_ops;
192
193 63342 int err = 0;
194
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 63341 times.
63342 if (ev & EPOLLERR)
195 {
196 1 socklen_t len = sizeof(err);
197
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
198 err = errno;
199
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if (err == 0)
200 1 err = EIO;
201 }
202
203 {
204
1/1
✓ Branch 1 taken 63342 times.
63342 std::lock_guard lock(mutex);
205
2/2
✓ Branch 0 taken 19476 times.
✓ Branch 1 taken 43866 times.
63342 if (ev & EPOLLIN)
206 {
207
2/2
✓ Branch 0 taken 4644 times.
✓ Branch 1 taken 14832 times.
19476 if (read_op)
208 {
209 4644 auto* rd = read_op;
210
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4644 times.
4644 if (err)
211 rd->complete(err, 0);
212 else
213 4644 rd->perform_io();
214
215
2/4
✓ Branch 0 taken 4644 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 4644 times.
4644 if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
216 {
217 rd->errn = 0;
218 }
219 else
220 {
221 4644 read_op = nullptr;
222 4644 local_ops.push(rd);
223 }
224 }
225 else
226 {
227 14832 read_ready = true;
228 }
229 }
230
2/2
✓ Branch 0 taken 58702 times.
✓ Branch 1 taken 4640 times.
63342 if (ev & EPOLLOUT)
231 {
232
3/4
✓ Branch 0 taken 54059 times.
✓ Branch 1 taken 4643 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 54059 times.
58702 bool had_write_op = (connect_op || write_op);
233
2/2
✓ Branch 0 taken 4643 times.
✓ Branch 1 taken 54059 times.
58702 if (connect_op)
234 {
235 4643 auto* cn = connect_op;
236
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4643 times.
4643 if (err)
237 cn->complete(err, 0);
238 else
239 4643 cn->perform_io();
240 4643 connect_op = nullptr;
241 4643 local_ops.push(cn);
242 }
243
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 58702 times.
58702 if (write_op)
244 {
245 auto* wr = write_op;
246 if (err)
247 wr->complete(err, 0);
248 else
249 wr->perform_io();
250
251 if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
252 {
253 wr->errn = 0;
254 }
255 else
256 {
257 write_op = nullptr;
258 local_ops.push(wr);
259 }
260 }
261
2/2
✓ Branch 0 taken 54059 times.
✓ Branch 1 taken 4643 times.
58702 if (!had_write_op)
262 54059 write_ready = true;
263 }
264
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 63341 times.
63342 if (err)
265 {
266
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (read_op)
267 {
268 read_op->complete(err, 0);
269 local_ops.push(std::exchange(read_op, nullptr));
270 }
271
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (write_op)
272 {
273 write_op->complete(err, 0);
274 local_ops.push(std::exchange(write_op, nullptr));
275 }
276
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (connect_op)
277 {
278 connect_op->complete(err, 0);
279 local_ops.push(std::exchange(connect_op, nullptr));
280 }
281 }
282 63342 }
283
284 // Execute first handler inline — the scheduler's work_cleanup
285 // accounts for this as the "consumed" work item
286 63342 scheduler_op* first = local_ops.pop();
287
2/2
✓ Branch 0 taken 9287 times.
✓ Branch 1 taken 54055 times.
63342 if (first)
288 {
289
1/1
✓ Branch 1 taken 9287 times.
9287 scheduler_->post_deferred_completions(local_ops);
290
1/1
✓ Branch 1 taken 9287 times.
9287 (*first)();
291 }
292 else
293 {
294 54055 scheduler_->compensating_work_started();
295 }
296 63342 }
297
298 203 epoll_scheduler::
299 epoll_scheduler(
300 capy::execution_context& ctx,
301 203 int)
302 203 : epoll_fd_(-1)
303 203 , event_fd_(-1)
304 203 , timer_fd_(-1)
305 203 , outstanding_work_(0)
306 203 , stopped_(false)
307 203 , shutdown_(false)
308 203 , task_running_{false}
309 203 , task_interrupted_(false)
310 406 , state_(0)
311 {
312 203 epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
313
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 203 times.
203 if (epoll_fd_ < 0)
314 detail::throw_system_error(make_err(errno), "epoll_create1");
315
316 203 event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
317
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 203 times.
203 if (event_fd_ < 0)
318 {
319 int errn = errno;
320 ::close(epoll_fd_);
321 detail::throw_system_error(make_err(errn), "eventfd");
322 }
323
324 203 timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
325
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 203 times.
203 if (timer_fd_ < 0)
326 {
327 int errn = errno;
328 ::close(event_fd_);
329 ::close(epoll_fd_);
330 detail::throw_system_error(make_err(errn), "timerfd_create");
331 }
332
333 203 epoll_event ev{};
334 203 ev.events = EPOLLIN | EPOLLET;
335 203 ev.data.ptr = nullptr;
336
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 203 times.
203 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
337 {
338 int errn = errno;
339 ::close(timer_fd_);
340 ::close(event_fd_);
341 ::close(epoll_fd_);
342 detail::throw_system_error(make_err(errn), "epoll_ctl");
343 }
344
345 203 epoll_event timer_ev{};
346 203 timer_ev.events = EPOLLIN | EPOLLERR;
347 203 timer_ev.data.ptr = &timer_fd_;
348
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 203 times.
203 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
349 {
350 int errn = errno;
351 ::close(timer_fd_);
352 ::close(event_fd_);
353 ::close(epoll_fd_);
354 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
355 }
356
357
1/1
✓ Branch 1 taken 203 times.
203 timer_svc_ = &get_timer_service(ctx, *this);
358
1/1
✓ Branch 3 taken 203 times.
203 timer_svc_->set_on_earliest_changed(
359 timer_service::callback(
360 this,
361 [](void* p) {
362 4854 auto* self = static_cast<epoll_scheduler*>(p);
363 4854 self->timerfd_stale_.store(true, std::memory_order_release);
364
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4854 times.
4854 if (self->task_running_.load(std::memory_order_acquire))
365 self->interrupt_reactor();
366 4854 }));
367
368 // Initialize resolver service
369
1/1
✓ Branch 1 taken 203 times.
203 get_resolver_service(ctx, *this);
370
371 // Initialize signal service
372
1/1
✓ Branch 1 taken 203 times.
203 get_signal_service(ctx, *this);
373
374 // Push task sentinel to interleave reactor runs with handler execution
375 203 completed_ops_.push(&task_op_);
376 203 }
377
378 406 epoll_scheduler::
379 203 ~epoll_scheduler()
380 {
381
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (timer_fd_ >= 0)
382 203 ::close(timer_fd_);
383
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (event_fd_ >= 0)
384 203 ::close(event_fd_);
385
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (epoll_fd_ >= 0)
386 203 ::close(epoll_fd_);
387 406 }
388
389 void
390 203 epoll_scheduler::
391 shutdown()
392 {
393 {
394
1/1
✓ Branch 1 taken 203 times.
203 std::unique_lock lock(mutex_);
395 203 shutdown_ = true;
396
397
2/2
✓ Branch 1 taken 203 times.
✓ Branch 2 taken 203 times.
406 while (auto* h = completed_ops_.pop())
398 {
399
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (h == &task_op_)
400 203 continue;
401 lock.unlock();
402 h->destroy();
403 lock.lock();
404 203 }
405
406 203 signal_all(lock);
407 203 }
408
409 203 outstanding_work_.store(0, std::memory_order_release);
410
411
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (event_fd_ >= 0)
412 203 interrupt_reactor();
413 203 }
414
415 void
416 6687 epoll_scheduler::
417 post(std::coroutine_handle<> h) const
418 {
419 struct post_handler final
420 : scheduler_op
421 {
422 std::coroutine_handle<> h_;
423
424 explicit
425 6687 post_handler(std::coroutine_handle<> h)
426 6687 : h_(h)
427 {
428 6687 }
429
430 13374 ~post_handler() = default;
431
432 6687 void operator()() override
433 {
434 6687 auto h = h_;
435
1/2
✓ Branch 0 taken 6687 times.
✗ Branch 1 not taken.
6687 delete this;
436
1/1
✓ Branch 1 taken 6687 times.
6687 h.resume();
437 6687 }
438
439 void destroy() override
440 {
441 delete this;
442 }
443 };
444
445
1/1
✓ Branch 1 taken 6687 times.
6687 auto ph = std::make_unique<post_handler>(h);
446
447 // Fast path: same thread posts to private queue
448 // Only count locally; work_cleanup batches to global counter
449
2/2
✓ Branch 1 taken 5034 times.
✓ Branch 2 taken 1653 times.
6687 if (auto* ctx = find_context(this))
450 {
451 5034 ++ctx->private_outstanding_work;
452 5034 ctx->private_queue.push(ph.release());
453 5034 return;
454 }
455
456 // Slow path: cross-thread post requires mutex
457 1653 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
458
459
1/1
✓ Branch 1 taken 1653 times.
1653 std::unique_lock lock(mutex_);
460 1653 completed_ops_.push(ph.release());
461
1/1
✓ Branch 1 taken 1653 times.
1653 wake_one_thread_and_unlock(lock);
462 6687 }
463
464 void
465 83348 epoll_scheduler::
466 post(scheduler_op* h) const
467 {
468 // Fast path: same thread posts to private queue
469 // Only count locally; work_cleanup batches to global counter
470
2/2
✓ Branch 1 taken 83322 times.
✓ Branch 2 taken 26 times.
83348 if (auto* ctx = find_context(this))
471 {
472 83322 ++ctx->private_outstanding_work;
473 83322 ctx->private_queue.push(h);
474 83322 return;
475 }
476
477 // Slow path: cross-thread post requires mutex
478 26 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
479
480
1/1
✓ Branch 1 taken 26 times.
26 std::unique_lock lock(mutex_);
481 26 completed_ops_.push(h);
482
1/1
✓ Branch 1 taken 26 times.
26 wake_one_thread_and_unlock(lock);
483 26 }
484
485 void
486 5576 epoll_scheduler::
487 on_work_started() noexcept
488 {
489 5576 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
490 5576 }
491
492 void
493 5544 epoll_scheduler::
494 on_work_finished() noexcept
495 {
496
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5544 times.
11088 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
497 stop();
498 5544 }
499
500 bool
501 157328 epoll_scheduler::
502 running_in_this_thread() const noexcept
503 {
504
2/2
✓ Branch 1 taken 157088 times.
✓ Branch 2 taken 240 times.
157328 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
505
1/2
✓ Branch 0 taken 157088 times.
✗ Branch 1 not taken.
157088 if (c->key == this)
506 157088 return true;
507 240 return false;
508 }
509
510 void
511 39 epoll_scheduler::
512 stop()
513 {
514
1/1
✓ Branch 1 taken 39 times.
39 std::unique_lock lock(mutex_);
515
2/2
✓ Branch 0 taken 21 times.
✓ Branch 1 taken 18 times.
39 if (!stopped_)
516 {
517 21 stopped_ = true;
518 21 signal_all(lock);
519
1/1
✓ Branch 1 taken 21 times.
21 interrupt_reactor();
520 }
521 39 }
522
523 bool
524 18 epoll_scheduler::
525 stopped() const noexcept
526 {
527 18 std::unique_lock lock(mutex_);
528 36 return stopped_;
529 18 }
530
531 void
532 49 epoll_scheduler::
533 restart()
534 {
535
1/1
✓ Branch 1 taken 49 times.
49 std::unique_lock lock(mutex_);
536 49 stopped_ = false;
537 49 }
538
539 std::size_t
540 183 epoll_scheduler::
541 run()
542 {
543
2/2
✓ Branch 1 taken 28 times.
✓ Branch 2 taken 155 times.
366 if (outstanding_work_.load(std::memory_order_acquire) == 0)
544 {
545
1/1
✓ Branch 1 taken 28 times.
28 stop();
546 28 return 0;
547 }
548
549 155 thread_context_guard ctx(this);
550
1/1
✓ Branch 1 taken 155 times.
155 std::unique_lock lock(mutex_);
551
552 155 std::size_t n = 0;
553 for (;;)
554 {
555
3/3
✓ Branch 1 taken 153499 times.
✓ Branch 3 taken 155 times.
✓ Branch 4 taken 153344 times.
153499 if (!do_one(lock, -1, &ctx.frame_))
556 155 break;
557
1/2
✓ Branch 1 taken 153344 times.
✗ Branch 2 not taken.
153344 if (n != (std::numeric_limits<std::size_t>::max)())
558 153344 ++n;
559
2/2
✓ Branch 1 taken 69948 times.
✓ Branch 2 taken 83396 times.
153344 if (!lock.owns_lock())
560
1/1
✓ Branch 1 taken 69948 times.
69948 lock.lock();
561 }
562 155 return n;
563 155 }
564
565 std::size_t
566 2 epoll_scheduler::
567 run_one()
568 {
569
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
570 {
571 stop();
572 return 0;
573 }
574
575 2 thread_context_guard ctx(this);
576
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
577
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, -1, &ctx.frame_);
578 2 }
579
580 std::size_t
581 34 epoll_scheduler::
582 wait_one(long usec)
583 {
584
2/2
✓ Branch 1 taken 7 times.
✓ Branch 2 taken 27 times.
68 if (outstanding_work_.load(std::memory_order_acquire) == 0)
585 {
586
1/1
✓ Branch 1 taken 7 times.
7 stop();
587 7 return 0;
588 }
589
590 27 thread_context_guard ctx(this);
591
1/1
✓ Branch 1 taken 27 times.
27 std::unique_lock lock(mutex_);
592
1/1
✓ Branch 1 taken 27 times.
27 return do_one(lock, usec, &ctx.frame_);
593 27 }
594
595 std::size_t
596 2 epoll_scheduler::
597 poll()
598 {
599
2/2
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
600 {
601
1/1
✓ Branch 1 taken 1 time.
1 stop();
602 1 return 0;
603 }
604
605 1 thread_context_guard ctx(this);
606
1/1
✓ Branch 1 taken 1 time.
1 std::unique_lock lock(mutex_);
607
608 1 std::size_t n = 0;
609 for (;;)
610 {
611
3/3
✓ Branch 1 taken 3 times.
✓ Branch 3 taken 1 time.
✓ Branch 4 taken 2 times.
3 if (!do_one(lock, 0, &ctx.frame_))
612 1 break;
613
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (n != (std::numeric_limits<std::size_t>::max)())
614 2 ++n;
615
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (!lock.owns_lock())
616
1/1
✓ Branch 1 taken 2 times.
2 lock.lock();
617 }
618 1 return n;
619 1 }
620
621 std::size_t
622 4 epoll_scheduler::
623 poll_one()
624 {
625
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
8 if (outstanding_work_.load(std::memory_order_acquire) == 0)
626 {
627
1/1
✓ Branch 1 taken 2 times.
2 stop();
628 2 return 0;
629 }
630
631 2 thread_context_guard ctx(this);
632
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
633
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, 0, &ctx.frame_);
634 2 }
635
636 void
637 9358 epoll_scheduler::
638 register_descriptor(int fd, descriptor_state* desc) const
639 {
640 9358 epoll_event ev{};
641 9358 ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
642 9358 ev.data.ptr = desc;
643
644
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9358 times.
9358 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
645 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
646
647 9358 desc->registered_events = ev.events;
648 9358 desc->fd = fd;
649 9358 desc->scheduler_ = this;
650
651
1/1
✓ Branch 1 taken 9358 times.
9358 std::lock_guard lock(desc->mutex);
652 9358 desc->read_ready = false;
653 9358 desc->write_ready = false;
654 9358 }
655
656 void
657 9358 epoll_scheduler::
658 deregister_descriptor(int fd) const
659 {
660 9358 ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
661 9358 }
662
663 void
664 9492 epoll_scheduler::
665 work_started() const noexcept
666 {
667 9492 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
668 9492 }
669
670 void
671 16152 epoll_scheduler::
672 work_finished() const noexcept
673 {
674
2/2
✓ Branch 0 taken 158 times.
✓ Branch 1 taken 15994 times.
32304 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
675 {
676 // Last work item completed - wake all threads so they can exit.
677 // signal_all() wakes threads waiting on the condvar.
678 // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
679 // Both are needed because they target different blocking mechanisms.
680 158 std::unique_lock lock(mutex_);
681 158 signal_all(lock);
682
5/6
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 157 times.
✓ Branch 3 taken 1 time.
✗ Branch 4 not taken.
✓ Branch 5 taken 1 time.
✓ Branch 6 taken 157 times.
158 if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
683 {
684 1 task_interrupted_ = true;
685 1 lock.unlock();
686 1 interrupt_reactor();
687 }
688 158 }
689 16152 }
690
691 void
692 54055 epoll_scheduler::
693 compensating_work_started() const noexcept
694 {
695 54055 auto* ctx = find_context(this);
696
1/2
✓ Branch 0 taken 54055 times.
✗ Branch 1 not taken.
54055 if (ctx)
697 54055 ++ctx->private_outstanding_work;
698 54055 }
699
700 void
701 epoll_scheduler::
702 drain_thread_queue(op_queue& queue, long count) const
703 {
704 // Note: outstanding_work_ was already incremented when posting
705 std::unique_lock lock(mutex_);
706 completed_ops_.splice(queue);
707 if (count > 0)
708 maybe_unlock_and_signal_one(lock);
709 }
710
711 void
712 9287 epoll_scheduler::
713 post_deferred_completions(op_queue& ops) const
714 {
715
1/2
✓ Branch 1 taken 9287 times.
✗ Branch 2 not taken.
9287 if (ops.empty())
716 9287 return;
717
718 // Fast path: if on scheduler thread, use private queue
719 if (auto* ctx = find_context(this))
720 {
721 ctx->private_queue.splice(ops);
722 return;
723 }
724
725 // Slow path: add to global queue and wake a thread
726 std::unique_lock lock(mutex_);
727 completed_ops_.splice(ops);
728 wake_one_thread_and_unlock(lock);
729 }
730
731 void
732 251 epoll_scheduler::
733 interrupt_reactor() const
734 {
735 // Only write if not already armed to avoid redundant writes
736 251 bool expected = false;
737
2/2
✓ Branch 1 taken 236 times.
✓ Branch 2 taken 15 times.
251 if (eventfd_armed_.compare_exchange_strong(expected, true,
738 std::memory_order_release, std::memory_order_relaxed))
739 {
740 236 std::uint64_t val = 1;
741
1/1
✓ Branch 1 taken 236 times.
236 [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
742 }
743 251 }
744
745 void
746 382 epoll_scheduler::
747 signal_all(std::unique_lock<std::mutex>&) const
748 {
749 382 state_ |= 1;
750 382 cond_.notify_all();
751 382 }
752
753 bool
754 1679 epoll_scheduler::
755 maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
756 {
757 1679 state_ |= 1;
758
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1679 times.
1679 if (state_ > 1)
759 {
760 lock.unlock();
761 cond_.notify_one();
762 return true;
763 }
764 1679 return false;
765 }
766
767 void
768 194235 epoll_scheduler::
769 unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
770 {
771 194235 state_ |= 1;
772 194235 bool have_waiters = state_ > 1;
773 194235 lock.unlock();
774
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 194235 times.
194235 if (have_waiters)
775 cond_.notify_one();
776 194235 }
777
778 void
779 epoll_scheduler::
780 clear_signal() const
781 {
782 state_ &= ~std::size_t(1);
783 }
784
785 void
786 epoll_scheduler::
787 wait_for_signal(std::unique_lock<std::mutex>& lock) const
788 {
789 while ((state_ & 1) == 0)
790 {
791 state_ += 2;
792 cond_.wait(lock);
793 state_ -= 2;
794 }
795 }
796
797 void
798 epoll_scheduler::
799 wait_for_signal_for(
800 std::unique_lock<std::mutex>& lock,
801 long timeout_us) const
802 {
803 if ((state_ & 1) == 0)
804 {
805 state_ += 2;
806 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
807 state_ -= 2;
808 }
809 }
810
811 void
812 1679 epoll_scheduler::
813 wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
814 {
815
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1679 times.
1679 if (maybe_unlock_and_signal_one(lock))
816 return;
817
818
5/6
✓ Branch 1 taken 26 times.
✓ Branch 2 taken 1653 times.
✓ Branch 3 taken 26 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 26 times.
✓ Branch 6 taken 1653 times.
1679 if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
819 {
820 26 task_interrupted_ = true;
821 26 lock.unlock();
822 26 interrupt_reactor();
823 }
824 else
825 {
826 1653 lock.unlock();
827 }
828 }
829
830 /** RAII guard for handler execution work accounting.
831
832 Handler consumes 1 work item, may produce N new items via fast-path posts.
833 Net change = N - 1:
834 - If N > 1: add (N-1) to global (more work produced than consumed)
835 - If N == 1: net zero, do nothing
836 - If N < 1: call work_finished() (work consumed, may trigger stop)
837
838 Also drains private queue to global for other threads to process.
839 */
840 struct work_cleanup
841 {
842 epoll_scheduler const* scheduler;
843 std::unique_lock<std::mutex>* lock;
844 scheduler_context* ctx;
845
846 153377 ~work_cleanup()
847 {
848
1/2
✓ Branch 0 taken 153377 times.
✗ Branch 1 not taken.
153377 if (ctx)
849 {
850 153377 long produced = ctx->private_outstanding_work;
851
2/2
✓ Branch 0 taken 95 times.
✓ Branch 1 taken 153282 times.
153377 if (produced > 1)
852 95 scheduler->outstanding_work_.fetch_add(produced - 1, std::memory_order_relaxed);
853
2/2
✓ Branch 0 taken 15915 times.
✓ Branch 1 taken 137367 times.
153282 else if (produced < 1)
854 15915 scheduler->work_finished();
855 // produced == 1: net zero, handler consumed what it produced
856 153377 ctx->private_outstanding_work = 0;
857
858
2/2
✓ Branch 1 taken 83407 times.
✓ Branch 2 taken 69970 times.
153377 if (!ctx->private_queue.empty())
859 {
860 83407 lock->lock();
861 83407 scheduler->completed_ops_.splice(ctx->private_queue);
862 }
863 }
864 else
865 {
866 // No thread context - slow-path op was already counted globally
867 scheduler->work_finished();
868 }
869 153377 }
870 };
871
872 /** RAII guard for reactor work accounting.
873
874 Reactor only produces work via timer/signal callbacks posting handlers.
875 Unlike handler execution which consumes 1, the reactor consumes nothing.
876 All produced work must be flushed to global counter.
877 */
878 struct task_cleanup
879 {
880 epoll_scheduler const* scheduler;
881 std::unique_lock<std::mutex>* lock;
882 scheduler_context* ctx;
883
884 50377 ~task_cleanup()
885 50377 {
886
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 50377 times.
50377 if (!ctx)
887 return;
888
889
2/2
✓ Branch 0 taken 4849 times.
✓ Branch 1 taken 45528 times.
50377 if (ctx->private_outstanding_work > 0)
890 {
891 4849 scheduler->outstanding_work_.fetch_add(
892 4849 ctx->private_outstanding_work, std::memory_order_relaxed);
893 4849 ctx->private_outstanding_work = 0;
894 }
895
896
2/2
✓ Branch 1 taken 4849 times.
✓ Branch 2 taken 45528 times.
50377 if (!ctx->private_queue.empty())
897 {
898
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4849 times.
4849 if (!lock->owns_lock())
899 lock->lock();
900 4849 scheduler->completed_ops_.splice(ctx->private_queue);
901 }
902 50377 }
903 };
904
905 void
906 9691 epoll_scheduler::
907 update_timerfd() const
908 {
909 9691 auto nearest = timer_svc_->nearest_expiry();
910
911 9691 itimerspec ts{};
912 9691 int flags = 0;
913
914
3/3
✓ Branch 2 taken 9691 times.
✓ Branch 4 taken 9647 times.
✓ Branch 5 taken 44 times.
9691 if (nearest == timer_service::time_point::max())
915 {
916 // No timers - disarm by setting to 0 (relative)
917 }
918 else
919 {
920 9647 auto now = std::chrono::steady_clock::now();
921
3/3
✓ Branch 1 taken 9647 times.
✓ Branch 4 taken 94 times.
✓ Branch 5 taken 9553 times.
9647 if (nearest <= now)
922 {
923 // Use 1ns instead of 0 - zero disarms the timerfd
924 94 ts.it_value.tv_nsec = 1;
925 }
926 else
927 {
928 9553 auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
929
1/1
✓ Branch 1 taken 9553 times.
19106 nearest - now).count();
930 9553 ts.it_value.tv_sec = nsec / 1000000000;
931 9553 ts.it_value.tv_nsec = nsec % 1000000000;
932 // Ensure non-zero to avoid disarming if duration rounds to 0
933
3/4
✓ Branch 0 taken 9542 times.
✓ Branch 1 taken 11 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 9542 times.
9553 if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
934 ts.it_value.tv_nsec = 1;
935 }
936 }
937
938
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9691 times.
9691 if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
939 detail::throw_system_error(make_err(errno), "timerfd_settime");
940 9691 }
941
942 void
943 50377 epoll_scheduler::
944 run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
945 {
946
2/2
✓ Branch 0 taken 40858 times.
✓ Branch 1 taken 9519 times.
50377 int timeout_ms = task_interrupted_ ? 0 : -1;
947
948
2/2
✓ Branch 1 taken 9519 times.
✓ Branch 2 taken 40858 times.
50377 if (lock.owns_lock())
949
1/1
✓ Branch 1 taken 9519 times.
9519 lock.unlock();
950
951 50377 task_cleanup on_exit{this, &lock, ctx};
952
953 // Flush deferred timerfd programming before blocking
954
2/2
✓ Branch 1 taken 4842 times.
✓ Branch 2 taken 45535 times.
50377 if (timerfd_stale_.exchange(false, std::memory_order_acquire))
955
1/1
✓ Branch 1 taken 4842 times.
4842 update_timerfd();
956
957 // Event loop runs without mutex held
958 epoll_event events[128];
959
1/1
✓ Branch 1 taken 50377 times.
50377 int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
960
961
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 50377 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
50377 if (nfds < 0 && errno != EINTR)
962 detail::throw_system_error(make_err(errno), "epoll_wait");
963
964 50377 bool check_timers = false;
965 50377 op_queue local_ops;
966
967 // Process events without holding the mutex
968
2/2
✓ Branch 0 taken 68224 times.
✓ Branch 1 taken 50377 times.
118601 for (int i = 0; i < nfds; ++i)
969 {
970
2/2
✓ Branch 0 taken 33 times.
✓ Branch 1 taken 68191 times.
68224 if (events[i].data.ptr == nullptr)
971 {
972 std::uint64_t val;
973
1/1
✓ Branch 1 taken 33 times.
33 [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
974 33 eventfd_armed_.store(false, std::memory_order_relaxed);
975 33 continue;
976 33 }
977
978
2/2
✓ Branch 0 taken 4849 times.
✓ Branch 1 taken 63342 times.
68191 if (events[i].data.ptr == &timer_fd_)
979 {
980 std::uint64_t expirations;
981
1/1
✓ Branch 1 taken 4849 times.
4849 [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
982 4849 check_timers = true;
983 4849 continue;
984 4849 }
985
986 // Deferred I/O: just set ready events and enqueue descriptor
987 // No per-descriptor mutex locking in reactor hot path!
988 63342 auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
989 63342 desc->add_ready_events(events[i].events);
990
991 // Only enqueue if not already enqueued
992 63342 bool expected = false;
993
1/2
✓ Branch 1 taken 63342 times.
✗ Branch 2 not taken.
63342 if (desc->is_enqueued_.compare_exchange_strong(expected, true,
994 std::memory_order_release, std::memory_order_relaxed))
995 {
996 63342 local_ops.push(desc);
997 }
998 }
999
1000 // Process timers only when timerfd fires
1001
2/2
✓ Branch 0 taken 4849 times.
✓ Branch 1 taken 45528 times.
50377 if (check_timers)
1002 {
1003
1/1
✓ Branch 1 taken 4849 times.
4849 timer_svc_->process_expired();
1004
1/1
✓ Branch 1 taken 4849 times.
4849 update_timerfd();
1005 }
1006
1007
1/1
✓ Branch 1 taken 50377 times.
50377 lock.lock();
1008
1009
2/2
✓ Branch 1 taken 40403 times.
✓ Branch 2 taken 9974 times.
50377 if (!local_ops.empty())
1010 40403 completed_ops_.splice(local_ops);
1011 50377 }
1012
1013 std::size_t
1014 153533 epoll_scheduler::
1015 do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
1016 {
1017 for (;;)
1018 {
1019
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 203908 times.
203910 if (stopped_)
1020 2 return 0;
1021
1022 203908 scheduler_op* op = completed_ops_.pop();
1023
1024 // Handle reactor sentinel - time to poll for I/O
1025
2/2
✓ Branch 0 taken 50529 times.
✓ Branch 1 taken 153379 times.
203908 if (op == &task_op_)
1026 {
1027 50529 bool more_handlers = !completed_ops_.empty();
1028
1029 // Nothing to run the reactor for: no pending work to wait on,
1030 // or caller requested a non-blocking poll
1031
4/4
✓ Branch 0 taken 9671 times.
✓ Branch 1 taken 40858 times.
✓ Branch 2 taken 152 times.
✓ Branch 3 taken 50377 times.
60200 if (!more_handlers &&
1032
3/4
✓ Branch 1 taken 9519 times.
✓ Branch 2 taken 152 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 9519 times.
19342 (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1033 timeout_us == 0))
1034 {
1035 152 completed_ops_.push(&task_op_);
1036 152 return 0;
1037 }
1038
1039
3/4
✓ Branch 0 taken 9519 times.
✓ Branch 1 taken 40858 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 9519 times.
50377 task_interrupted_ = more_handlers || timeout_us == 0;
1040 50377 task_running_.store(true, std::memory_order_release);
1041
1042
2/2
✓ Branch 0 taken 40858 times.
✓ Branch 1 taken 9519 times.
50377 if (more_handlers)
1043 40858 unlock_and_signal_one(lock);
1044
1045 50377 run_task(lock, ctx);
1046
1047 50377 task_running_.store(false, std::memory_order_relaxed);
1048 50377 completed_ops_.push(&task_op_);
1049 50377 continue;
1050 50377 }
1051
1052 // Handle operation
1053
2/2
✓ Branch 0 taken 153377 times.
✓ Branch 1 taken 2 times.
153379 if (op != nullptr)
1054 {
1055
1/2
✓ Branch 1 taken 153377 times.
✗ Branch 2 not taken.
153377 if (!completed_ops_.empty())
1056
1/1
✓ Branch 1 taken 153377 times.
153377 unlock_and_signal_one(lock);
1057 else
1058 lock.unlock();
1059
1060 153377 work_cleanup on_exit{this, &lock, ctx};
1061
1062
1/1
✓ Branch 1 taken 153377 times.
153377 (*op)();
1063 153377 return 1;
1064 153377 }
1065
1066 // No pending work to wait on, or caller requested non-blocking poll
1067
2/6
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1068 timeout_us == 0)
1069 2 return 0;
1070
1071 clear_signal();
1072 if (timeout_us < 0)
1073 wait_for_signal(lock);
1074 else
1075 wait_for_signal_for(lock, timeout_us);
1076 50377 }
1077 }
1078
1079 } // namespace boost::corosio::detail
1080
1081 #endif
1082