src/corosio/src/detail/epoll/scheduler.cpp

82.5% Lines (419/508) 93.8% Functions (45/48) 69.6% Branches (215/309)
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/scheduler.hpp"
15 #include "src/detail/epoll/op.hpp"
16 #include "src/detail/timer_service.hpp"
17 #include "src/detail/make_err.hpp"
18 #include "src/detail/posix/resolver_service.hpp"
19 #include "src/detail/posix/signals.hpp"
20
21 #include <boost/corosio/detail/except.hpp>
22 #include <boost/corosio/detail/thread_local_ptr.hpp>
23
24 #include <chrono>
25 #include <limits>
26 #include <utility>
27
28 #include <errno.h>
29 #include <fcntl.h>
30 #include <sys/epoll.h>
31 #include <sys/eventfd.h>
32 #include <sys/socket.h>
33 #include <sys/timerfd.h>
34 #include <unistd.h>
35
36 /*
37 epoll Scheduler - Single Reactor Model
38 ======================================
39
40 This scheduler uses a single-reactor thread coordination strategy that
41 provides handler parallelism while avoiding the thundering herd problem.
42 Instead of all threads blocking on epoll_wait(), one thread becomes the
43 "reactor" while others wait on a condition variable for handler work.
44
45 Thread Model
46 ------------
47 - ONE thread runs epoll_wait() at a time (the reactor thread)
48 - OTHER threads wait on cond_ (condition variable) for handlers
49 - When work is posted, exactly one waiting thread wakes via notify_one()
50 - This matches Windows IOCP semantics where N posted items wake N threads
51
52 Event Loop Structure (do_one)
53 -----------------------------
54 1. Lock mutex, try to pop handler from queue
55 2. If got handler: execute it (unlocked), return
56 3. If queue empty and no reactor running: become reactor
57 - Run epoll_wait (unlocked), queue I/O completions, loop back
58 4. If queue empty and reactor running: wait on condvar for work
59
60 The task_running_ flag ensures only one thread owns epoll_wait().
61 After the reactor queues I/O completions, it loops back to try getting
62 a handler, giving priority to handler execution over more I/O polling.
63
64 Signaling State (state_)
65 ------------------------
66 The state_ variable encodes two pieces of information:
67 - Bit 0: signaled flag (1 = signaled, persists until cleared)
68 - Upper bits: waiter count (each waiter adds 2 before blocking)
69
70 This allows efficient coordination:
71 - Signalers only call notify when waiters exist (state_ > 1)
72 - Waiters check if already signaled before blocking (fast-path)
73
74 Wake Coordination (wake_one_thread_and_unlock)
75 ----------------------------------------------
76 When posting work:
77 - If waiters exist (state_ > 1): signal and notify_one()
78 - Else if reactor running: interrupt via eventfd write
79 - Else: no-op (thread will find work when it checks queue)
80
81 This avoids waking threads unnecessarily. With cascading wakes,
82 each handler execution wakes at most one additional thread if
83 more work exists in the queue.
84
85 Work Counting
86 -------------
87 outstanding_work_ tracks pending operations. Each operation increments it on
88 start and decrements it on completion; when the count reaches zero, run() returns.
89
90 Timer Integration
91 -----------------
92 Timers are handled by timer_service. The nearest expiry is programmed into a
93 timerfd registered with epoll (see update_timerfd), so the reactor wakes when
94 the earliest timer fires. When a new timer becomes the earliest, timer_service
95 marks the timerfd stale and calls interrupt_reactor() so it gets reprogrammed.
96 */
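/*
Illustrative sketch (not part of the build): a minimal model of the
signaled-bit / waiter-count protocol described under "Signaling State" and
"Wake Coordination" above, assuming a single mutex/condvar pair. The names
here are hypothetical; the real logic lives in wait_for_signal(),
maybe_unlock_and_signal_one() and wake_one_thread_and_unlock() below.

    std::mutex m;
    std::condition_variable cond;
    std::size_t state = 0;          // bit 0: signaled, upper bits: 2 * waiters

    void wait_for_work(std::unique_lock<std::mutex>& lock)
    {
        while ((state & 1) == 0)    // fast path: already signaled, skip the wait
        {
            state += 2;             // register as a waiter before blocking
            cond.wait(lock);
            state -= 2;
        }
    }

    void signal_one(std::unique_lock<std::mutex>& lock)
    {
        state |= 1;                 // sticky signaled bit, cleared by clear_signal()
        if (state > 1)              // notify only when a waiter is registered
        {
            lock.unlock();
            cond.notify_one();
        }
        // else: no waiter is blocked; a running thread (or the reactor, woken
        // by an eventfd write) picks the work up when it next checks the queue.
    }
*/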
97
98 namespace boost::corosio::detail {
99
100 struct scheduler_context
101 {
102 epoll_scheduler const* key;
103 scheduler_context* next;
104 op_queue private_queue;
105 long private_outstanding_work;
106 int inline_budget;
107 int inline_budget_max;
108 bool unassisted;
109
110 203 scheduler_context(epoll_scheduler const* k, scheduler_context* n)
111 203 : key(k)
112 203 , next(n)
113 203 , private_outstanding_work(0)
114 203 , inline_budget(0)
115 203 , inline_budget_max(2)
116 203 , unassisted(false)
117 {
118 203 }
119 };
120
121 namespace {
122
123 corosio::detail::thread_local_ptr<scheduler_context> context_stack;
124
125 struct thread_context_guard
126 {
127 scheduler_context frame_;
128
129 203 explicit thread_context_guard(
130 epoll_scheduler const* ctx) noexcept
131 203 : frame_(ctx, context_stack.get())
132 {
133 203 context_stack.set(&frame_);
134 203 }
135
136 203 ~thread_context_guard() noexcept
137 {
138
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 203 times.
203 if (!frame_.private_queue.empty())
139 frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
140 203 context_stack.set(frame_.next);
141 203 }
142 };
143
144 scheduler_context*
145 650972 find_context(epoll_scheduler const* self) noexcept
146 {
147
2/2
✓ Branch 1 taken 649293 times.
✓ Branch 2 taken 1679 times.
650972 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
148
1/2
✓ Branch 0 taken 649293 times.
✗ Branch 1 not taken.
649293 if (c->key == self)
149 649293 return c;
150 1679 return nullptr;
151 }
152
153 } // namespace
154
155 void
156 91464 epoll_scheduler::
157 reset_inline_budget() const noexcept
158 {
159
1/2
✓ Branch 1 taken 91464 times.
✗ Branch 2 not taken.
91464 if (auto* ctx = find_context(this))
160 {
161 // Cap when no other thread absorbed queued work. A moderate
162 // cap (4) amortizes scheduling for small buffers while avoiding
163 // bursty I/O that fills socket buffers and stalls large transfers.
164
1/2
✓ Branch 0 taken 91464 times.
✗ Branch 1 not taken.
91464 if (ctx->unassisted)
165 {
166 91464 ctx->inline_budget_max = 4;
167 91464 ctx->inline_budget = 4;
168 91464 return;
169 }
170 // Ramp up when previous cycle fully consumed budget.
171 // Reset on partial consumption (EAGAIN hit or peer got scheduled).
172 if (ctx->inline_budget == 0)
173 ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
174 else if (ctx->inline_budget < ctx->inline_budget_max)
175 ctx->inline_budget_max = 2;
176 ctx->inline_budget = ctx->inline_budget_max;
177 }
178 }
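// Example progression (illustrative): an assisted thread that exhausts its
// budget every cycle ramps inline_budget_max 2 -> 4 -> 8 -> 16 and stays
// capped at 16; a cycle that consumes the budget only partially drops the
// cap back to 2. An unassisted thread always runs with a fixed budget of 4.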
179
180 bool
181 408313 epoll_scheduler::
182 try_consume_inline_budget() const noexcept
183 {
184
1/2
✓ Branch 1 taken 408313 times.
✗ Branch 2 not taken.
408313 if (auto* ctx = find_context(this))
185 {
186
2/2
✓ Branch 0 taken 326726 times.
✓ Branch 1 taken 81587 times.
408313 if (ctx->inline_budget > 0)
187 {
188 326726 --ctx->inline_budget;
189 326726 return true;
190 }
191 }
192 81587 return false;
193 }
194
195 void
196 67086 descriptor_state::
197 operator()()
198 {
199 67086 is_enqueued_.store(false, std::memory_order_relaxed);
200
201 // Take ownership of impl ref set by close_socket() to prevent
202 // the owning impl from being freed while we're executing
203 67086 auto prevent_impl_destruction = std::move(impl_ref_);
204
205 67086 std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
206
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 67086 times.
67086 if (ev == 0)
207 {
208 scheduler_->compensating_work_started();
209 return;
210 }
211
212 67086 op_queue local_ops;
213
214 67086 int err = 0;
215
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 67085 times.
67086 if (ev & EPOLLERR)
216 {
217 1 socklen_t len = sizeof(err);
218
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
219 err = errno;
220
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if (err == 0)
221 1 err = EIO;
222 }
223
224 {
225
1/1
✓ Branch 1 taken 67086 times.
67086 std::lock_guard lock(mutex);
226
2/2
✓ Branch 0 taken 21311 times.
✓ Branch 1 taken 45775 times.
67086 if (ev & EPOLLIN)
227 {
228
2/2
✓ Branch 0 taken 4834 times.
✓ Branch 1 taken 16477 times.
21311 if (read_op)
229 {
230 4834 auto* rd = read_op;
231
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4834 times.
4834 if (err)
232 rd->complete(err, 0);
233 else
234 4834 rd->perform_io();
235
236
2/4
✓ Branch 0 taken 4834 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 4834 times.
4834 if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
237 {
238 rd->errn = 0;
239 }
240 else
241 {
242 4834 read_op = nullptr;
243 4834 local_ops.push(rd);
244 }
245 }
246 else
247 {
248 16477 read_ready = true;
249 }
250 }
251
2/2
✓ Branch 0 taken 62256 times.
✓ Branch 1 taken 4830 times.
67086 if (ev & EPOLLOUT)
252 {
253
3/4
✓ Branch 0 taken 57423 times.
✓ Branch 1 taken 4833 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 57423 times.
62256 bool had_write_op = (connect_op || write_op);
254
2/2
✓ Branch 0 taken 4833 times.
✓ Branch 1 taken 57423 times.
62256 if (connect_op)
255 {
256 4833 auto* cn = connect_op;
257
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4833 times.
4833 if (err)
258 cn->complete(err, 0);
259 else
260 4833 cn->perform_io();
261 4833 connect_op = nullptr;
262 4833 local_ops.push(cn);
263 }
264
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 62256 times.
62256 if (write_op)
265 {
266 auto* wr = write_op;
267 if (err)
268 wr->complete(err, 0);
269 else
270 wr->perform_io();
271
272 if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
273 {
274 wr->errn = 0;
275 }
276 else
277 {
278 write_op = nullptr;
279 local_ops.push(wr);
280 }
281 }
282
2/2
✓ Branch 0 taken 57423 times.
✓ Branch 1 taken 4833 times.
62256 if (!had_write_op)
283 57423 write_ready = true;
284 }
285
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 67085 times.
67086 if (err)
286 {
287
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (read_op)
288 {
289 read_op->complete(err, 0);
290 local_ops.push(std::exchange(read_op, nullptr));
291 }
292
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (write_op)
293 {
294 write_op->complete(err, 0);
295 local_ops.push(std::exchange(write_op, nullptr));
296 }
297
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (connect_op)
298 {
299 connect_op->complete(err, 0);
300 local_ops.push(std::exchange(connect_op, nullptr));
301 }
302 }
303 67086 }
304
305 // Execute first handler inline — the scheduler's work_cleanup
306 // accounts for this as the "consumed" work item
307 67086 scheduler_op* first = local_ops.pop();
308
2/2
✓ Branch 0 taken 9667 times.
✓ Branch 1 taken 57419 times.
67086 if (first)
309 {
310
1/1
✓ Branch 1 taken 9667 times.
9667 scheduler_->post_deferred_completions(local_ops);
311
1/1
✓ Branch 1 taken 9667 times.
9667 (*first)();
312 }
313 else
314 {
315 57419 scheduler_->compensating_work_started();
316 }
317 67086 }
318
319 203 epoll_scheduler::
320 epoll_scheduler(
321 capy::execution_context& ctx,
322 203 int)
323 203 : epoll_fd_(-1)
324 203 , event_fd_(-1)
325 203 , timer_fd_(-1)
326 203 , outstanding_work_(0)
327 203 , stopped_(false)
328 203 , shutdown_(false)
329 203 , task_running_{false}
330 203 , task_interrupted_(false)
331 406 , state_(0)
332 {
333 203 epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
334
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 203 times.
203 if (epoll_fd_ < 0)
335 detail::throw_system_error(make_err(errno), "epoll_create1");
336
337 203 event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
338
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 203 times.
203 if (event_fd_ < 0)
339 {
340 int errn = errno;
341 ::close(epoll_fd_);
342 detail::throw_system_error(make_err(errn), "eventfd");
343 }
344
345 203 timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
346
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 203 times.
203 if (timer_fd_ < 0)
347 {
348 int errn = errno;
349 ::close(event_fd_);
350 ::close(epoll_fd_);
351 detail::throw_system_error(make_err(errn), "timerfd_create");
352 }
353
354 203 epoll_event ev{};
355 203 ev.events = EPOLLIN | EPOLLET;
356 203 ev.data.ptr = nullptr;
357
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 203 times.
203 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
358 {
359 int errn = errno;
360 ::close(timer_fd_);
361 ::close(event_fd_);
362 ::close(epoll_fd_);
363 detail::throw_system_error(make_err(errn), "epoll_ctl");
364 }
365
366 203 epoll_event timer_ev{};
367 203 timer_ev.events = EPOLLIN | EPOLLERR;
368 203 timer_ev.data.ptr = &timer_fd_;
369
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 203 times.
203 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
370 {
371 int errn = errno;
372 ::close(timer_fd_);
373 ::close(event_fd_);
374 ::close(epoll_fd_);
375 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
376 }
377
378
1/1
✓ Branch 1 taken 203 times.
203 timer_svc_ = &get_timer_service(ctx, *this);
379
1/1
✓ Branch 3 taken 203 times.
203 timer_svc_->set_on_earliest_changed(
380 timer_service::callback(
381 this,
382 [](void* p) {
383 5045 auto* self = static_cast<epoll_scheduler*>(p);
384 5045 self->timerfd_stale_.store(true, std::memory_order_release);
385
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5045 times.
5045 if (self->task_running_.load(std::memory_order_acquire))
386 self->interrupt_reactor();
387 5045 }));
388
389 // Initialize resolver service
390
1/1
✓ Branch 1 taken 203 times.
203 get_resolver_service(ctx, *this);
391
392 // Initialize signal service
393
1/1
✓ Branch 1 taken 203 times.
203 get_signal_service(ctx, *this);
394
395 // Push task sentinel to interleave reactor runs with handler execution
396 203 completed_ops_.push(&task_op_);
397 203 }
398
399 406 epoll_scheduler::
400 203 ~epoll_scheduler()
401 {
402
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (timer_fd_ >= 0)
403 203 ::close(timer_fd_);
404
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (event_fd_ >= 0)
405 203 ::close(event_fd_);
406
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (epoll_fd_ >= 0)
407 203 ::close(epoll_fd_);
408 406 }
409
410 void
411 203 epoll_scheduler::
412 shutdown()
413 {
414 {
415
1/1
✓ Branch 1 taken 203 times.
203 std::unique_lock lock(mutex_);
416 203 shutdown_ = true;
417
418
2/2
✓ Branch 1 taken 203 times.
✓ Branch 2 taken 203 times.
406 while (auto* h = completed_ops_.pop())
419 {
420
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (h == &task_op_)
421 203 continue;
422 lock.unlock();
423 h->destroy();
424 lock.lock();
425 203 }
426
427 203 signal_all(lock);
428 203 }
429
430 203 outstanding_work_.store(0, std::memory_order_release);
431
432
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (event_fd_ >= 0)
433 203 interrupt_reactor();
434 203 }
435
436 void
437 6882 epoll_scheduler::
438 post(std::coroutine_handle<> h) const
439 {
440 struct post_handler final
441 : scheduler_op
442 {
443 std::coroutine_handle<> h_;
444
445 explicit
446 6882 post_handler(std::coroutine_handle<> h)
447 6882 : h_(h)
448 {
449 6882 }
450
451 13764 ~post_handler() = default;
452
453 6882 void operator()() override
454 {
455 6882 auto h = h_;
456
1/2
✓ Branch 0 taken 6882 times.
✗ Branch 1 not taken.
6882 delete this;
457
1/1
✓ Branch 1 taken 6882 times.
6882 h.resume();
458 6882 }
459
460 void destroy() override
461 {
462 delete this;
463 }
464 };
465
466
1/1
✓ Branch 1 taken 6882 times.
6882 auto ph = std::make_unique<post_handler>(h);
467
468 // Fast path: same thread posts to private queue
469 // Only count locally; work_cleanup batches to global counter
470
2/2
✓ Branch 1 taken 5229 times.
✓ Branch 2 taken 1653 times.
6882 if (auto* ctx = find_context(this))
471 {
472 5229 ++ctx->private_outstanding_work;
473 5229 ctx->private_queue.push(ph.release());
474 5229 return;
475 }
476
477 // Slow path: cross-thread post requires mutex
478 1653 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
479
480
1/1
✓ Branch 1 taken 1653 times.
1653 std::unique_lock lock(mutex_);
481 1653 completed_ops_.push(ph.release());
482
1/1
✓ Branch 1 taken 1653 times.
1653 wake_one_thread_and_unlock(lock);
483 6882 }
484
485 void
486 86894 epoll_scheduler::
487 post(scheduler_op* h) const
488 {
489 // Fast path: same thread posts to private queue
490 // Only count locally; work_cleanup batches to global counter
491
2/2
✓ Branch 1 taken 86868 times.
✓ Branch 2 taken 26 times.
86894 if (auto* ctx = find_context(this))
492 {
493 86868 ++ctx->private_outstanding_work;
494 86868 ctx->private_queue.push(h);
495 86868 return;
496 }
497
498 // Slow path: cross-thread post requires mutex
499 26 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
500
501
1/1
✓ Branch 1 taken 26 times.
26 std::unique_lock lock(mutex_);
502 26 completed_ops_.push(h);
503
1/1
✓ Branch 1 taken 26 times.
26 wake_one_thread_and_unlock(lock);
504 26 }
505
506 void
507 5772 epoll_scheduler::
508 on_work_started() noexcept
509 {
510 5772 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
511 5772 }
512
513 void
514 5740 epoll_scheduler::
515 on_work_finished() noexcept
516 {
517
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5740 times.
11480 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
518 stop();
519 5740 }
520
521 bool
522 327425 epoll_scheduler::
523 running_in_this_thread() const noexcept
524 {
525
2/2
✓ Branch 1 taken 327185 times.
✓ Branch 2 taken 240 times.
327425 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
526
1/2
✓ Branch 0 taken 327185 times.
✗ Branch 1 not taken.
327185 if (c->key == this)
527 327185 return true;
528 240 return false;
529 }
530
531 void
532 23 epoll_scheduler::
533 stop()
534 {
535
1/1
✓ Branch 1 taken 23 times.
23 std::unique_lock lock(mutex_);
536
2/2
✓ Branch 0 taken 17 times.
✓ Branch 1 taken 6 times.
23 if (!stopped_)
537 {
538 17 stopped_ = true;
539 17 signal_all(lock);
540
1/1
✓ Branch 1 taken 17 times.
17 interrupt_reactor();
541 }
542 23 }
543
544 bool
545 18 epoll_scheduler::
546 stopped() const noexcept
547 {
548 18 std::unique_lock lock(mutex_);
549 36 return stopped_;
550 18 }
551
552 void
553 49 epoll_scheduler::
554 restart()
555 {
556
1/1
✓ Branch 1 taken 49 times.
49 std::unique_lock lock(mutex_);
557 49 stopped_ = false;
558 49 }
559
560 std::size_t
561 183 epoll_scheduler::
562 run()
563 {
564
2/2
✓ Branch 1 taken 12 times.
✓ Branch 2 taken 171 times.
366 if (outstanding_work_.load(std::memory_order_acquire) == 0)
565 {
566
1/1
✓ Branch 1 taken 12 times.
12 stop();
567 12 return 0;
568 }
569
570 171 thread_context_guard ctx(this);
571
1/1
✓ Branch 1 taken 171 times.
171 std::unique_lock lock(mutex_);
572
573 171 std::size_t n = 0;
574 for (;;)
575 {
576
3/3
✓ Branch 1 taken 161000 times.
✓ Branch 3 taken 171 times.
✓ Branch 4 taken 160829 times.
161000 if (!do_one(lock, -1, &ctx.frame_))
577 171 break;
578
1/2
✓ Branch 1 taken 160829 times.
✗ Branch 2 not taken.
160829 if (n != (std::numeric_limits<std::size_t>::max)())
579 160829 ++n;
580
2/2
✓ Branch 1 taken 73794 times.
✓ Branch 2 taken 87035 times.
160829 if (!lock.owns_lock())
581
1/1
✓ Branch 1 taken 73794 times.
73794 lock.lock();
582 }
583 171 return n;
584 171 }
585
586 std::size_t
587 2 epoll_scheduler::
588 run_one()
589 {
590
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
591 {
592 stop();
593 return 0;
594 }
595
596 2 thread_context_guard ctx(this);
597
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
598
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, -1, &ctx.frame_);
599 2 }
600
601 std::size_t
602 34 epoll_scheduler::
603 wait_one(long usec)
604 {
605
2/2
✓ Branch 1 taken 7 times.
✓ Branch 2 taken 27 times.
68 if (outstanding_work_.load(std::memory_order_acquire) == 0)
606 {
607
1/1
✓ Branch 1 taken 7 times.
7 stop();
608 7 return 0;
609 }
610
611 27 thread_context_guard ctx(this);
612
1/1
✓ Branch 1 taken 27 times.
27 std::unique_lock lock(mutex_);
613
1/1
✓ Branch 1 taken 27 times.
27 return do_one(lock, usec, &ctx.frame_);
614 27 }
615
616 std::size_t
617 2 epoll_scheduler::
618 poll()
619 {
620
2/2
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
621 {
622
1/1
✓ Branch 1 taken 1 time.
1 stop();
623 1 return 0;
624 }
625
626 1 thread_context_guard ctx(this);
627
1/1
✓ Branch 1 taken 1 time.
1 std::unique_lock lock(mutex_);
628
629 1 std::size_t n = 0;
630 for (;;)
631 {
632
3/3
✓ Branch 1 taken 3 times.
✓ Branch 3 taken 1 time.
✓ Branch 4 taken 2 times.
3 if (!do_one(lock, 0, &ctx.frame_))
633 1 break;
634
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (n != (std::numeric_limits<std::size_t>::max)())
635 2 ++n;
636
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (!lock.owns_lock())
637
1/1
✓ Branch 1 taken 2 times.
2 lock.lock();
638 }
639 1 return n;
640 1 }
641
642 std::size_t
643 4 epoll_scheduler::
644 poll_one()
645 {
646
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
8 if (outstanding_work_.load(std::memory_order_acquire) == 0)
647 {
648
1/1
✓ Branch 1 taken 2 times.
2 stop();
649 2 return 0;
650 }
651
652 2 thread_context_guard ctx(this);
653
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
654
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, 0, &ctx.frame_);
655 2 }
656
657 void
658 9738 epoll_scheduler::
659 register_descriptor(int fd, descriptor_state* desc) const
660 {
661 9738 epoll_event ev{};
662 9738 ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
663 9738 ev.data.ptr = desc;
664
665
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9738 times.
9738 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
666 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
667
668 9738 desc->registered_events = ev.events;
669 9738 desc->fd = fd;
670 9738 desc->scheduler_ = this;
671
672
1/1
✓ Branch 1 taken 9738 times.
9738 std::lock_guard lock(desc->mutex);
673 9738 desc->read_ready = false;
674 9738 desc->write_ready = false;
675 9738 }
676
677 void
678 9738 epoll_scheduler::
679 deregister_descriptor(int fd) const
680 {
681 9738 ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
682 9738 }
683
684 void
685 9875 epoll_scheduler::
686 work_started() const noexcept
687 {
688 9875 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
689 9875 }
690
691 void
692 16637 epoll_scheduler::
693 work_finished() const noexcept
694 {
695
2/2
✓ Branch 0 taken 158 times.
✓ Branch 1 taken 16479 times.
33274 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
696 {
697 // Last work item completed - wake all threads so they can exit.
698 // signal_all() wakes threads waiting on the condvar.
699 // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
700 // Both are needed because they target different blocking mechanisms.
701 158 std::unique_lock lock(mutex_);
702 158 signal_all(lock);
703
5/6
✓ Branch 1 taken 8 times.
✓ Branch 2 taken 150 times.
✓ Branch 3 taken 8 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 8 times.
✓ Branch 6 taken 150 times.
158 if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
704 {
705 8 task_interrupted_ = true;
706 8 lock.unlock();
707 8 interrupt_reactor();
708 }
709 158 }
710 16637 }
711
712 void
713 57419 epoll_scheduler::
714 compensating_work_started() const noexcept
715 {
716 57419 auto* ctx = find_context(this);
717
1/2
✓ Branch 0 taken 57419 times.
✗ Branch 1 not taken.
57419 if (ctx)
718 57419 ++ctx->private_outstanding_work;
719 57419 }
720
721 void
722 epoll_scheduler::
723 drain_thread_queue(op_queue& queue, long count) const
724 {
725 // Note: outstanding_work_ was already incremented when posting
726 std::unique_lock lock(mutex_);
727 completed_ops_.splice(queue);
728 if (count > 0)
729 maybe_unlock_and_signal_one(lock);
730 }
731
732 void
733 9667 epoll_scheduler::
734 post_deferred_completions(op_queue& ops) const
735 {
736
1/2
✓ Branch 1 taken 9667 times.
✗ Branch 2 not taken.
9667 if (ops.empty())
737 9667 return;
738
739 // Fast path: if on scheduler thread, use private queue
740 if (auto* ctx = find_context(this))
741 {
742 ctx->private_queue.splice(ops);
743 return;
744 }
745
746 // Slow path: add to global queue and wake a thread
747 std::unique_lock lock(mutex_);
748 completed_ops_.splice(ops);
749 wake_one_thread_and_unlock(lock);
750 }
751
752 void
753 254 epoll_scheduler::
754 interrupt_reactor() const
755 {
756 // Only write if not already armed to avoid redundant writes
757 254 bool expected = false;
758
2/2
✓ Branch 1 taken 243 times.
✓ Branch 2 taken 11 times.
254 if (eventfd_armed_.compare_exchange_strong(expected, true,
759 std::memory_order_release, std::memory_order_relaxed))
760 {
761 243 std::uint64_t val = 1;
762
1/1
✓ Branch 1 taken 243 times.
243 [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
763 }
764 254 }
765
766 void
767 378 epoll_scheduler::
768 signal_all(std::unique_lock<std::mutex>&) const
769 {
770 378 state_ |= 1;
771 378 cond_.notify_all();
772 378 }
773
774 bool
775 1679 epoll_scheduler::
776 maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
777 {
778 1679 state_ |= 1;
779
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1679 times.
1679 if (state_ > 1)
780 {
781 lock.unlock();
782 cond_.notify_one();
783 return true;
784 }
785 1679 return false;
786 }
787
788 bool
789 205219 epoll_scheduler::
790 unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
791 {
792 205219 state_ |= 1;
793 205219 bool have_waiters = state_ > 1;
794 205219 lock.unlock();
795
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 205219 times.
205219 if (have_waiters)
796 cond_.notify_one();
797 205219 return have_waiters;
798 }
799
800 void
801 1 epoll_scheduler::
802 clear_signal() const
803 {
804 1 state_ &= ~std::size_t(1);
805 1 }
806
807 void
808 1 epoll_scheduler::
809 wait_for_signal(std::unique_lock<std::mutex>& lock) const
810 {
811
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
2 while ((state_ & 1) == 0)
812 {
813 1 state_ += 2;
814 1 cond_.wait(lock);
815 1 state_ -= 2;
816 }
817 1 }
818
819 void
820 epoll_scheduler::
821 wait_for_signal_for(
822 std::unique_lock<std::mutex>& lock,
823 long timeout_us) const
824 {
825 if ((state_ & 1) == 0)
826 {
827 state_ += 2;
828 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
829 state_ -= 2;
830 }
831 }
832
833 void
834 1679 epoll_scheduler::
835 wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
836 {
837
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1679 times.
1679 if (maybe_unlock_and_signal_one(lock))
838 return;
839
840
5/6
✓ Branch 1 taken 26 times.
✓ Branch 2 taken 1653 times.
✓ Branch 3 taken 26 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 26 times.
✓ Branch 6 taken 1653 times.
1679 if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
841 {
842 26 task_interrupted_ = true;
843 26 lock.unlock();
844 26 interrupt_reactor();
845 }
846 else
847 {
848 1653 lock.unlock();
849 }
850 }
851
852 /** RAII guard for handler execution work accounting.
853
854 Handler consumes 1 work item, may produce N new items via fast-path posts.
855 Net change = N - 1:
856 - If N > 1: add (N-1) to global (more work produced than consumed)
857 - If N == 1: net zero, do nothing
858 - If N < 1: call work_finished() (work consumed, may trigger stop)
859
860 Also drains the private queue to the global queue for other threads to process; a worked example follows the struct.
861 */
862 struct work_cleanup
863 {
864 epoll_scheduler const* scheduler;
865 std::unique_lock<std::mutex>* lock;
866 scheduler_context* ctx;
867
868 160862 ~work_cleanup()
869 {
870
1/2
✓ Branch 0 taken 160862 times.
✗ Branch 1 not taken.
160862 if (ctx)
871 {
872 160862 long produced = ctx->private_outstanding_work;
873
2/2
✓ Branch 0 taken 7 times.
✓ Branch 1 taken 160855 times.
160862 if (produced > 1)
874 7 scheduler->outstanding_work_.fetch_add(produced - 1, std::memory_order_relaxed);
875
2/2
✓ Branch 0 taken 16397 times.
✓ Branch 1 taken 144458 times.
160855 else if (produced < 1)
876 16397 scheduler->work_finished();
877 // produced == 1: net zero, handler consumed what it produced
878 160862 ctx->private_outstanding_work = 0;
879
880
2/2
✓ Branch 1 taken 87046 times.
✓ Branch 2 taken 73816 times.
160862 if (!ctx->private_queue.empty())
881 {
882 87046 lock->lock();
883 87046 scheduler->completed_ops_.splice(ctx->private_queue);
884 }
885 }
886 else
887 {
888 // No thread context - slow-path op was already counted globally
889 scheduler->work_finished();
890 }
891 160862 }
892 };
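// Worked example (illustrative) of the accounting above: a handler that
// fast-path posts three continuations leaves produced == 3, so 2 is added to
// outstanding_work_ (three produced minus the one consumed by running the
// handler). A handler that posts exactly one continuation is net zero and
// touches no counter; a handler that posts nothing calls work_finished(),
// which may stop the scheduler when the global count reaches zero.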
893
894 /** RAII guard for reactor work accounting.
895
896 Reactor only produces work via timer/signal callbacks posting handlers.
897 Unlike handler execution which consumes 1, the reactor consumes nothing.
898 All produced work must be flushed to global counter.
899 */
900 struct task_cleanup
901 {
902 epoll_scheduler const* scheduler;
903 std::unique_lock<std::mutex>* lock;
904 scheduler_context* ctx;
905
906 54263 ~task_cleanup()
907 54263 {
908
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 54263 times.
54263 if (!ctx)
909 return;
910
911
2/2
✓ Branch 0 taken 5039 times.
✓ Branch 1 taken 49224 times.
54263 if (ctx->private_outstanding_work > 0)
912 {
913 5039 scheduler->outstanding_work_.fetch_add(
914 5039 ctx->private_outstanding_work, std::memory_order_relaxed);
915 5039 ctx->private_outstanding_work = 0;
916 }
917
918
2/2
✓ Branch 1 taken 5039 times.
✓ Branch 2 taken 49224 times.
54263 if (!ctx->private_queue.empty())
919 {
920
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5039 times.
5039 if (!lock->owns_lock())
921 lock->lock();
922 5039 scheduler->completed_ops_.splice(ctx->private_queue);
923 }
924 54263 }
925 };
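// In contrast to work_cleanup, the reactor consumed no work item of its own,
// so everything it produced is flushed unchanged (illustrative example: five
// timer handlers posted during one epoll pass add five to outstanding_work_).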
926
927 void
928 10072 epoll_scheduler::
929 update_timerfd() const
930 {
931 10072 auto nearest = timer_svc_->nearest_expiry();
932
933 10072 itimerspec ts{};
934 10072 int flags = 0;
935
936
3/3
✓ Branch 2 taken 10072 times.
✓ Branch 4 taken 10028 times.
✓ Branch 5 taken 44 times.
10072 if (nearest == timer_service::time_point::max())
937 {
938 // No timers - disarm by setting to 0 (relative)
939 }
940 else
941 {
942 10028 auto now = std::chrono::steady_clock::now();
943
3/3
✓ Branch 1 taken 10028 times.
✓ Branch 4 taken 100 times.
✓ Branch 5 taken 9928 times.
10028 if (nearest <= now)
944 {
945 // Use 1ns instead of 0 - zero disarms the timerfd
946 100 ts.it_value.tv_nsec = 1;
947 }
948 else
949 {
950 9928 auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
951
1/1
✓ Branch 1 taken 9928 times.
19856 nearest - now).count();
952 9928 ts.it_value.tv_sec = nsec / 1000000000;
953 9928 ts.it_value.tv_nsec = nsec % 1000000000;
954 // Ensure non-zero to avoid disarming if duration rounds to 0
955
3/4
✓ Branch 0 taken 9917 times.
✓ Branch 1 taken 11 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 9917 times.
9928 if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
956 ts.it_value.tv_nsec = 1;
957 }
958 }
959
960
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 10072 times.
10072 if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
961 detail::throw_system_error(make_err(errno), "timerfd_settime");
962 10072 }
963
964 void
965 54263 epoll_scheduler::
966 run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
967 {
968
2/2
✓ Branch 0 taken 44357 times.
✓ Branch 1 taken 9906 times.
54263 int timeout_ms = task_interrupted_ ? 0 : -1;
969
970
2/2
✓ Branch 1 taken 9906 times.
✓ Branch 2 taken 44357 times.
54263 if (lock.owns_lock())
971
1/1
✓ Branch 1 taken 9906 times.
9906 lock.unlock();
972
973 54263 task_cleanup on_exit{this, &lock, ctx};
974
975 // Flush deferred timerfd programming before blocking
976
2/2
✓ Branch 1 taken 5033 times.
✓ Branch 2 taken 49230 times.
54263 if (timerfd_stale_.exchange(false, std::memory_order_acquire))
977
1/1
✓ Branch 1 taken 5033 times.
5033 update_timerfd();
978
979 // Event loop runs without mutex held
980 epoll_event events[128];
981
1/1
✓ Branch 1 taken 54263 times.
54263 int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
982
983
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 54263 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
54263 if (nfds < 0 && errno != EINTR)
984 detail::throw_system_error(make_err(errno), "epoll_wait");
985
986 54263 bool check_timers = false;
987 54263 op_queue local_ops;
988
989 // Process events without holding the mutex
990
2/2
✓ Branch 0 taken 72165 times.
✓ Branch 1 taken 54263 times.
126428 for (int i = 0; i < nfds; ++i)
991 {
992
2/2
✓ Branch 0 taken 40 times.
✓ Branch 1 taken 72125 times.
72165 if (events[i].data.ptr == nullptr)
993 {
994 std::uint64_t val;
995
1/1
✓ Branch 1 taken 40 times.
40 [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
996 40 eventfd_armed_.store(false, std::memory_order_relaxed);
997 40 continue;
998 40 }
999
1000
2/2
✓ Branch 0 taken 5039 times.
✓ Branch 1 taken 67086 times.
72125 if (events[i].data.ptr == &timer_fd_)
1001 {
1002 std::uint64_t expirations;
1003
1/1
✓ Branch 1 taken 5039 times.
5039 [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
1004 5039 check_timers = true;
1005 5039 continue;
1006 5039 }
1007
1008 // Deferred I/O: just set ready events and enqueue descriptor
1009 // No per-descriptor mutex locking in reactor hot path!
1010 67086 auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
1011 67086 desc->add_ready_events(events[i].events);
1012
1013 // Only enqueue if not already enqueued
1014 67086 bool expected = false;
1015
1/2
✓ Branch 1 taken 67086 times.
✗ Branch 2 not taken.
67086 if (desc->is_enqueued_.compare_exchange_strong(expected, true,
1016 std::memory_order_release, std::memory_order_relaxed))
1017 {
1018 67086 local_ops.push(desc);
1019 }
1020 }
1021
1022 // Process timers only when timerfd fires
1023
2/2
✓ Branch 0 taken 5039 times.
✓ Branch 1 taken 49224 times.
54263 if (check_timers)
1024 {
1025
1/1
✓ Branch 1 taken 5039 times.
5039 timer_svc_->process_expired();
1026
1/1
✓ Branch 1 taken 5039 times.
5039 update_timerfd();
1027 }
1028
1029
1/1
✓ Branch 1 taken 54263 times.
54263 lock.lock();
1030
1031
2/2
✓ Branch 1 taken 43897 times.
✓ Branch 2 taken 10366 times.
54263 if (!local_ops.empty())
1032 43897 completed_ops_.splice(local_ops);
1033 54263 }
1034
1035 std::size_t
1036 161034 epoll_scheduler::
1037 do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
1038 {
1039 for (;;)
1040 {
1041
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 215294 times.
215298 if (stopped_)
1042 4 return 0;
1043
1044 215294 scheduler_op* op = completed_ops_.pop();
1045
1046 // Handle reactor sentinel - time to poll for I/O
1047
2/2
✓ Branch 0 taken 54414 times.
✓ Branch 1 taken 160880 times.
215294 if (op == &task_op_)
1048 {
1049 54414 bool more_handlers = !completed_ops_.empty();
1050
1051 // Nothing to run the reactor for: no pending work to wait on,
1052 // or caller requested a non-blocking poll
1053
4/4
✓ Branch 0 taken 10057 times.
✓ Branch 1 taken 44357 times.
✓ Branch 2 taken 151 times.
✓ Branch 3 taken 54263 times.
64471 if (!more_handlers &&
1054
3/4
✓ Branch 1 taken 9906 times.
✓ Branch 2 taken 151 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 9906 times.
20114 (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1055 timeout_us == 0))
1056 {
1057 151 completed_ops_.push(&task_op_);
1058 151 return 0;
1059 }
1060
1061
3/4
✓ Branch 0 taken 9906 times.
✓ Branch 1 taken 44357 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 9906 times.
54263 task_interrupted_ = more_handlers || timeout_us == 0;
1062 54263 task_running_.store(true, std::memory_order_release);
1063
1064
2/2
✓ Branch 0 taken 44357 times.
✓ Branch 1 taken 9906 times.
54263 if (more_handlers)
1065 44357 unlock_and_signal_one(lock);
1066
1067 54263 run_task(lock, ctx);
1068
1069 54263 task_running_.store(false, std::memory_order_relaxed);
1070 54263 completed_ops_.push(&task_op_);
1071 54263 continue;
1072 54263 }
1073
1074 // Handle operation
1075
2/2
✓ Branch 0 taken 160862 times.
✓ Branch 1 taken 18 times.
160880 if (op != nullptr)
1076 {
1077 160862 bool more = !completed_ops_.empty();
1078
1079
1/2
✓ Branch 0 taken 160862 times.
✗ Branch 1 not taken.
160862 if (more)
1080
1/1
✓ Branch 1 taken 160862 times.
160862 ctx->unassisted = !unlock_and_signal_one(lock);
1081 else
1082 {
1083 ctx->unassisted = false;
1084 lock.unlock();
1085 }
1086
1087 160862 work_cleanup on_exit{this, &lock, ctx};
1088
1089
1/1
✓ Branch 1 taken 160862 times.
160862 (*op)();
1090 160862 return 1;
1091 160862 }
1092
1093 // No pending work to wait on, or caller requested non-blocking poll
1094
5/6
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 17 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 time.
✓ Branch 5 taken 17 times.
✓ Branch 6 taken 1 time.
36 if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1095 timeout_us == 0)
1096 17 return 0;
1097
1098 1 clear_signal();
1099
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if (timeout_us < 0)
1100 1 wait_for_signal(lock);
1101 else
1102 wait_for_signal_for(lock, timeout_us);
1103 54264 }
1104 }
1105
1106 } // namespace boost::corosio::detail
1107
1108 #endif
1109