libs/corosio/src/corosio/src/detail/timer_service.cpp

87.2% Lines (321/368) 93.3% Functions (42/45) 73.3% Branches (126/172)
libs/corosio/src/corosio/src/detail/timer_service.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include "src/detail/timer_service.hpp"
11 #include "src/detail/scheduler_impl.hpp"
12
13 #include <boost/corosio/basic_io_context.hpp>
14 #include <boost/corosio/detail/thread_local_ptr.hpp>
15 #include "src/detail/scheduler_op.hpp"
16 #include "src/detail/intrusive.hpp"
17 #include <boost/capy/error.hpp>
18 #include <boost/capy/ex/executor_ref.hpp>
19 #include <system_error>
20
21 #include <atomic>
22 #include <coroutine>
23 #include <limits>
24 #include <mutex>
25 #include <optional>
26 #include <stop_token>
27 #include <vector>
28
29 /*
30 Timer Service
31 =============
32
33 The public timer class holds an opaque timer_impl* and forwards
34 all operations through extern free functions defined at the bottom
35 of this file.
36
37 Data Structures
38 ---------------
39 waiter_node holds per-waiter state: coroutine handle, executor,
40 error output, stop_token, embedded completion_op. Each concurrent
41 co_await t.wait() allocates one waiter_node.
42
43 timer_impl holds per-timer state: expiry, heap index, and an
44 intrusive_list of waiter_nodes. Multiple coroutines can wait on
45 the same timer simultaneously.
46
47 timer_service_impl owns a min-heap of active timers, a free list
48 of recycled impls, and a free list of recycled waiter_nodes. The
49 heap is ordered by expiry time; the scheduler queries
50 nearest_expiry() to set the epoll/timerfd timeout.
51
52 Optimization Strategy
53 ---------------------
54 The common timer lifecycle is: construct, set expiry, cancel or
55 wait, destroy. Several optimizations target this path:
56
57 1. Deferred heap insertion — expires_after() stores the expiry
58 but does not insert into the heap. Insertion happens in
59 wait(). If the timer is cancelled or destroyed before wait(),
60 the heap is never touched and no mutex is taken. This also
61 enables the already-expired fast path: when wait() sees
62 expiry <= now before inserting, it posts the coroutine
63 handle to the executor and returns noop_coroutine — no
64 heap, no mutex, no epoll. This is only possible because
65 the coroutine API guarantees wait() always follows
66 expires_after(); callback APIs cannot assume this call
67 order.
68
69 2. Thread-local impl cache — A single-slot per-thread cache of
70 timer_impl avoids the mutex on create/destroy for the common
71 create-then-destroy-on-same-thread pattern. On pop, if the
72 cached impl's svc_ doesn't match the current service, the
73 stale impl is deleted eagerly rather than reused.
74
75 3. Embedded completion_op — Each waiter_node embeds a
76 scheduler_op subclass, eliminating heap allocation per
77 fire/cancel. Its destroy() is a no-op since the waiter_node
78 owns the lifetime.
79
80 4. Cached nearest expiry — An atomic<int64_t> mirrors the heap
81 root's time, updated under the lock. nearest_expiry() and
82 empty() read the atomic without locking.
83
84 5. might_have_pending_waits_ flag — Set on wait(), cleared on
85 cancel. Lets cancel_timer() return without locking when no
86 wait was ever issued.
87
88 6. Thread-local waiter cache — Single-slot per-thread cache of
89 waiter_node avoids the free-list mutex for the common
90 wait-then-complete-on-same-thread pattern.
91
92 With all fast paths hit (idle timer, same thread), the
93 schedule/cancel cycle takes zero mutex locks.
94
95 Concurrency
96 -----------
97 stop_token callbacks can fire from any thread. The impl_
98 pointer on waiter_node is used as a "still in list" marker:
99 set to nullptr under the mutex when a waiter is removed by
100 cancel_timer() or process_expired(). cancel_waiter() checks
101 this under the mutex to avoid double-removal races.
102
103 Multiple io_contexts in the same program are safe. The
104 service pointer is obtained directly from the scheduler,
105 and TL-cached impls are validated by comparing svc_ against
106 the current service pointer. Waiter nodes have no service
107 affinity and can safely migrate between contexts.
108 */
109
110 namespace boost::corosio::detail {
111
112 class timer_service_impl;
113 struct timer_impl;
114 struct waiter_node;
115
116 void timer_service_invalidate_cache() noexcept;
117
118 struct waiter_node
119 : intrusive_list<waiter_node>::node
120 {
121 // Embedded completion op — avoids heap allocation per fire/cancel
122 struct completion_op final : scheduler_op
123 {
124 waiter_node* waiter_ = nullptr;
125
126 static void do_complete(
127 void* owner,
128 scheduler_op* base,
129 std::uint32_t,
130 std::uint32_t);
131
132 142 completion_op() noexcept
133 142 : scheduler_op(&do_complete)
134 {
135 142 }
136
137 void operator()() override;
138 // No-op — lifetime owned by waiter_node, not the scheduler queue
139 void destroy() override {}
140 };
141
142 // Per-waiter stop_token cancellation
143 struct canceller
144 {
145 waiter_node* waiter_;
146 void operator()() const;
147 };
148
149 // nullptr once removed from timer's waiter list (concurrency marker)
150 timer_impl* impl_ = nullptr;
151 timer_service_impl* svc_ = nullptr;
152 std::coroutine_handle<> h_;
153 capy::executor_ref d_;
154 std::error_code* ec_out_ = nullptr;
155 std::stop_token token_;
156 std::optional<std::stop_callback<canceller>> stop_cb_;
157 completion_op op_;
158 std::error_code ec_value_;
159 waiter_node* next_free_ = nullptr;
160
161 142 waiter_node() noexcept
162 142 {
163 142 op_.waiter_ = this;
164 142 }
165 };
166
167 struct timer_impl
168 : timer::timer_impl
169 {
170 using clock_type = std::chrono::steady_clock;
171 using time_point = clock_type::time_point;
172 using duration = clock_type::duration;
173
174 timer_service_impl* svc_ = nullptr;
175 time_point expiry_;
176 std::size_t heap_index_ = (std::numeric_limits<std::size_t>::max)();
177 // Lets cancel_timer() skip the lock when no wait() was ever issued
178 bool might_have_pending_waits_ = false;
179 intrusive_list<waiter_node> waiters_;
180
181 // Free list linkage (reused when impl is on free_list)
182 timer_impl* next_free_ = nullptr;
183
184 explicit timer_impl(timer_service_impl& svc) noexcept;
185
186
187 void release() override;
188
189 std::coroutine_handle<> wait(
190 std::coroutine_handle<>,
191 capy::executor_ref,
192 std::stop_token,
193 std::error_code*) override;
194 };
195
196 timer_impl* try_pop_tl_cache(timer_service_impl*) noexcept;
197 bool try_push_tl_cache(timer_impl*) noexcept;
198 waiter_node* try_pop_waiter_tl_cache() noexcept;
199 bool try_push_waiter_tl_cache(waiter_node*) noexcept;
200
201 class timer_service_impl : public timer_service
202 {
203 public:
204 using clock_type = std::chrono::steady_clock;
205 using time_point = clock_type::time_point;
206 using key_type = timer_service;
207
208 private:
209 struct heap_entry
210 {
211 time_point time_;
212 timer_impl* timer_;
213 };
214
215 scheduler* sched_ = nullptr;
216 mutable std::mutex mutex_;
217 std::vector<heap_entry> heap_;
218 timer_impl* free_list_ = nullptr;
219 waiter_node* waiter_free_list_ = nullptr;
220 callback on_earliest_changed_;
221 // Avoids mutex in nearest_expiry() and empty()
222 mutable std::atomic<std::int64_t> cached_nearest_ns_{
223 (std::numeric_limits<std::int64_t>::max)()};
224
225 public:
226 336 timer_service_impl(capy::execution_context&, scheduler& sched)
227 336 : timer_service()
228 336 , sched_(&sched)
229 {
230 336 }
231
232 17000 scheduler& get_scheduler() noexcept { return *sched_; }
233
234 672 ~timer_service_impl() = default;
235
236 timer_service_impl(timer_service_impl const&) = delete;
237 timer_service_impl& operator=(timer_service_impl const&) = delete;
238
239 336 void set_on_earliest_changed(callback cb) override
240 {
241 336 on_earliest_changed_ = cb;
242 336 }
243
244 336 void shutdown() override
245 {
246 336 timer_service_invalidate_cache();
247
248 // Cancel waiting timers still in the heap
249
1/2
✗ Branch 5 not taken.
✓ Branch 6 taken 336 times.
336 for (auto& entry : heap_)
250 {
251 auto* impl = entry.timer_;
252 while (auto* w = impl->waiters_.pop_front())
253 {
254 w->stop_cb_.reset();
255 w->h_.destroy();
256 sched_->on_work_finished();
257 delete w;
258 }
259 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
260 delete impl;
261 }
262 336 heap_.clear();
263 336 cached_nearest_ns_.store(
264 (std::numeric_limits<std::int64_t>::max)(),
265 std::memory_order_release);
266
267 // Delete free-listed impls
268
2/2
✓ Branch 0 taken 48 times.
✓ Branch 1 taken 336 times.
384 while (free_list_)
269 {
270 48 auto* next = free_list_->next_free_;
271
1/2
✓ Branch 0 taken 48 times.
✗ Branch 1 not taken.
48 delete free_list_;
272 48 free_list_ = next;
273 }
274
275 // Delete free-listed waiters
276
2/2
✓ Branch 0 taken 58 times.
✓ Branch 1 taken 336 times.
394 while (waiter_free_list_)
277 {
278 58 auto* next = waiter_free_list_->next_free_;
279
1/2
✓ Branch 0 taken 58 times.
✗ Branch 1 not taken.
58 delete waiter_free_list_;
280 58 waiter_free_list_ = next;
281 }
282 336 }
283
284 8780 timer::timer_impl* create_impl() override
285 {
286 8780 timer_impl* impl = try_pop_tl_cache(this);
287
2/2
✓ Branch 0 taken 8607 times.
✓ Branch 1 taken 173 times.
8780 if (impl)
288 {
289 8607 impl->svc_ = this;
290 8607 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
291 8607 impl->might_have_pending_waits_ = false;
292 8607 return impl;
293 }
294
295
1/1
✓ Branch 1 taken 173 times.
173 std::lock_guard lock(mutex_);
296
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 173 times.
173 if (free_list_)
297 {
298 impl = free_list_;
299 free_list_ = impl->next_free_;
300 impl->next_free_ = nullptr;
301 impl->svc_ = this;
302 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
303 impl->might_have_pending_waits_ = false;
304 }
305 else
306 {
307
1/1
✓ Branch 1 taken 173 times.
173 impl = new timer_impl(*this);
308 }
309 173 return impl;
310 173 }
311
312 8780 void destroy_impl(timer_impl& impl)
313 {
314
1/1
✓ Branch 1 taken 8780 times.
8780 cancel_timer(impl);
315
316
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 8780 times.
8780 if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
317 {
318 std::lock_guard lock(mutex_);
319 remove_timer_impl(impl);
320 refresh_cached_nearest();
321 }
322
323
2/2
✓ Branch 1 taken 8732 times.
✓ Branch 2 taken 48 times.
8780 if (try_push_tl_cache(&impl))
324 8732 return;
325
326
1/1
✓ Branch 1 taken 48 times.
48 std::lock_guard lock(mutex_);
327 48 impl.next_free_ = free_list_;
328 48 free_list_ = &impl;
329 48 }
330
331 8500 waiter_node* create_waiter()
332 {
333
2/2
✓ Branch 1 taken 8358 times.
✓ Branch 2 taken 142 times.
8500 if (auto* w = try_pop_waiter_tl_cache())
334 8358 return w;
335
336
1/1
✓ Branch 1 taken 142 times.
142 std::lock_guard lock(mutex_);
337
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 142 times.
142 if (waiter_free_list_)
338 {
339 auto* w = waiter_free_list_;
340 waiter_free_list_ = w->next_free_;
341 w->next_free_ = nullptr;
342 return w;
343 }
344
345
1/1
✓ Branch 1 taken 142 times.
142 return new waiter_node();
346 142 }
347
348 8500 void destroy_waiter(waiter_node* w)
349 {
350
2/2
✓ Branch 1 taken 8442 times.
✓ Branch 2 taken 58 times.
8500 if (try_push_waiter_tl_cache(w))
351 8442 return;
352
353
1/1
✓ Branch 1 taken 58 times.
58 std::lock_guard lock(mutex_);
354 58 w->next_free_ = waiter_free_list_;
355 58 waiter_free_list_ = w;
356 58 }
357
358 // Heap insertion deferred to wait() — avoids lock when timer is idle
359 8787 std::size_t update_timer(timer_impl& impl, time_point new_time)
360 {
361 bool in_heap =
362 8787 (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
363
5/6
✓ Branch 0 taken 8781 times.
✓ Branch 1 taken 6 times.
✓ Branch 3 taken 8781 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 8781 times.
✓ Branch 6 taken 6 times.
8787 if (!in_heap && impl.waiters_.empty())
364 8781 return 0;
365
366 6 bool notify = false;
367 6 intrusive_list<waiter_node> canceled;
368
369 {
370
1/1
✓ Branch 1 taken 6 times.
6 std::lock_guard lock(mutex_);
371
372
2/2
✓ Branch 1 taken 10 times.
✓ Branch 2 taken 6 times.
16 while (auto* w = impl.waiters_.pop_front())
373 {
374 10 w->impl_ = nullptr;
375 10 canceled.push_back(w);
376 10 }
377
378
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6 if (impl.heap_index_ < heap_.size())
379 {
380 6 time_point old_time = heap_[impl.heap_index_].time_;
381 6 heap_[impl.heap_index_].time_ = new_time;
382
383
2/3
✓ Branch 1 taken 6 times.
✓ Branch 4 taken 6 times.
✗ Branch 5 not taken.
6 if (new_time < old_time)
384
1/1
✓ Branch 1 taken 6 times.
6 up_heap(impl.heap_index_);
385 else
386 down_heap(impl.heap_index_);
387
388 6 notify = (impl.heap_index_ == 0);
389 }
390
391 6 refresh_cached_nearest();
392 6 }
393
394 6 std::size_t count = 0;
395
2/2
✓ Branch 1 taken 10 times.
✓ Branch 2 taken 6 times.
16 while (auto* w = canceled.pop_front())
396 {
397 10 w->ec_value_ = make_error_code(capy::error::canceled);
398
1/1
✓ Branch 1 taken 10 times.
10 sched_->post(&w->op_);
399 10 ++count;
400 10 }
401
402
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (notify)
403
1/1
✓ Branch 1 taken 6 times.
6 on_earliest_changed_();
404
405 6 return count;
406 }
407
408 // Inserts timer into heap if needed and pushes waiter, all under
409 // one lock to prevent races with cancel_waiter/process_expired
410 8500 void insert_waiter(timer_impl& impl, waiter_node* w)
411 {
412 8500 bool notify = false;
413 {
414
1/1
✓ Branch 1 taken 8500 times.
8500 std::lock_guard lock(mutex_);
415
2/2
✓ Branch 1 taken 8478 times.
✓ Branch 2 taken 22 times.
8500 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
416 {
417 8478 impl.heap_index_ = heap_.size();
418
1/1
✓ Branch 1 taken 8478 times.
8478 heap_.push_back({impl.expiry_, &impl});
419
1/1
✓ Branch 2 taken 8478 times.
8478 up_heap(heap_.size() - 1);
420 8478 notify = (impl.heap_index_ == 0);
421 8478 refresh_cached_nearest();
422 }
423 8500 impl.waiters_.push_back(w);
424 8500 }
425
2/2
✓ Branch 0 taken 8464 times.
✓ Branch 1 taken 36 times.
8500 if (notify)
426 8464 on_earliest_changed_();
427 8500 }
428
429 8800 std::size_t cancel_timer(timer_impl& impl)
430 {
431
2/2
✓ Branch 0 taken 8784 times.
✓ Branch 1 taken 16 times.
8800 if (!impl.might_have_pending_waits_)
432 8784 return 0;
433
434 // Not in heap and no waiters — just clear the flag
435 16 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)()
436
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 16 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 16 times.
16 && impl.waiters_.empty())
437 {
438 impl.might_have_pending_waits_ = false;
439 return 0;
440 }
441
442 16 intrusive_list<waiter_node> canceled;
443
444 {
445
1/1
✓ Branch 1 taken 16 times.
16 std::lock_guard lock(mutex_);
446
1/1
✓ Branch 1 taken 16 times.
16 remove_timer_impl(impl);
447
2/2
✓ Branch 1 taken 20 times.
✓ Branch 2 taken 16 times.
36 while (auto* w = impl.waiters_.pop_front())
448 {
449 20 w->impl_ = nullptr;
450 20 canceled.push_back(w);
451 20 }
452 16 refresh_cached_nearest();
453 16 }
454
455 16 impl.might_have_pending_waits_ = false;
456
457 16 std::size_t count = 0;
458
2/2
✓ Branch 1 taken 20 times.
✓ Branch 2 taken 16 times.
36 while (auto* w = canceled.pop_front())
459 {
460 20 w->ec_value_ = make_error_code(capy::error::canceled);
461
1/1
✓ Branch 1 taken 20 times.
20 sched_->post(&w->op_);
462 20 ++count;
463 20 }
464
465 16 return count;
466 }
467
468 // Cancel a single waiter (called from stop_token callback, any thread)
469 4 void cancel_waiter(waiter_node* w)
470 {
471 {
472
1/1
✓ Branch 1 taken 4 times.
4 std::lock_guard lock(mutex_);
473 // Already removed by cancel_timer or process_expired
474
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
4 if (!w->impl_)
475 return;
476 4 auto* impl = w->impl_;
477 4 w->impl_ = nullptr;
478 4 impl->waiters_.remove(w);
479
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
4 if (impl->waiters_.empty())
480 {
481
1/1
✓ Branch 1 taken 2 times.
2 remove_timer_impl(*impl);
482 2 impl->might_have_pending_waits_ = false;
483 }
484 4 refresh_cached_nearest();
485 4 }
486
487 4 w->ec_value_ = make_error_code(capy::error::canceled);
488 4 sched_->post(&w->op_);
489 }
490
491 // Cancel front waiter only (FIFO), return 0 or 1
492 4 std::size_t cancel_one_waiter(timer_impl& impl)
493 {
494
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2 times.
4 if (!impl.might_have_pending_waits_)
495 2 return 0;
496
497 2 waiter_node* w = nullptr;
498
499 {
500
1/1
✓ Branch 1 taken 2 times.
2 std::lock_guard lock(mutex_);
501 2 w = impl.waiters_.pop_front();
502
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (!w)
503 return 0;
504 2 w->impl_ = nullptr;
505
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 if (impl.waiters_.empty())
506 {
507 remove_timer_impl(impl);
508 impl.might_have_pending_waits_ = false;
509 }
510 2 refresh_cached_nearest();
511 2 }
512
513 2 w->ec_value_ = make_error_code(capy::error::canceled);
514 2 sched_->post(&w->op_);
515 2 return 1;
516 }
517
518 bool empty() const noexcept override
519 {
520 return cached_nearest_ns_.load(std::memory_order_acquire)
521 == (std::numeric_limits<std::int64_t>::max)();
522 }
523
524 20192 time_point nearest_expiry() const noexcept override
525 {
526 20192 auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
527 20192 return time_point(time_point::duration(ns));
528 }
529
530 100907 std::size_t process_expired() override
531 {
532 100907 intrusive_list<waiter_node> expired;
533
534 {
535
1/1
✓ Branch 1 taken 100907 times.
100907 std::lock_guard lock(mutex_);
536 100907 auto now = clock_type::now();
537
538
7/7
✓ Branch 1 taken 109001 times.
✓ Branch 2 taken 366 times.
✓ Branch 5 taken 109001 times.
✓ Branch 8 taken 8460 times.
✓ Branch 9 taken 100541 times.
✓ Branch 10 taken 8460 times.
✓ Branch 11 taken 100907 times.
109367 while (!heap_.empty() && heap_[0].time_ <= now)
539 {
540 8460 timer_impl* t = heap_[0].timer_;
541
1/1
✓ Branch 1 taken 8460 times.
8460 remove_timer_impl(*t);
542
2/2
✓ Branch 1 taken 8464 times.
✓ Branch 2 taken 8460 times.
16924 while (auto* w = t->waiters_.pop_front())
543 {
544 8464 w->impl_ = nullptr;
545 8464 w->ec_value_ = {};
546 8464 expired.push_back(w);
547 8464 }
548 8460 t->might_have_pending_waits_ = false;
549 }
550
551 100907 refresh_cached_nearest();
552 100907 }
553
554 100907 std::size_t count = 0;
555
2/2
✓ Branch 1 taken 8464 times.
✓ Branch 2 taken 100907 times.
109371 while (auto* w = expired.pop_front())
556 {
557
1/1
✓ Branch 1 taken 8464 times.
8464 sched_->post(&w->op_);
558 8464 ++count;
559 8464 }
560
561 100907 return count;
562 }
563
564 private:
565 109413 void refresh_cached_nearest() noexcept
566 {
567 109413 auto ns = heap_.empty()
568
2/2
✓ Branch 0 taken 382 times.
✓ Branch 1 taken 109031 times.
109413 ? (std::numeric_limits<std::int64_t>::max)()
569 109031 : heap_[0].time_.time_since_epoch().count();
570 109413 cached_nearest_ns_.store(ns, std::memory_order_release);
571 109413 }
572
573 8478 void remove_timer_impl(timer_impl& impl)
574 {
575 8478 std::size_t index = impl.heap_index_;
576
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 8478 times.
8478 if (index >= heap_.size())
577 return; // Not in heap
578
579
2/2
✓ Branch 1 taken 102 times.
✓ Branch 2 taken 8376 times.
8478 if (index == heap_.size() - 1)
580 {
581 // Last element, just pop
582 102 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
583 102 heap_.pop_back();
584 }
585 else
586 {
587 // Swap with last and reheapify
588 8376 swap_heap(index, heap_.size() - 1);
589 8376 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
590 8376 heap_.pop_back();
591
592
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 8376 times.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
✓ Branch 9 taken 8376 times.
8376 if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
593 up_heap(index);
594 else
595 8376 down_heap(index);
596 }
597 }
598
599 8484 void up_heap(std::size_t index)
600 {
601
2/2
✓ Branch 0 taken 8378 times.
✓ Branch 1 taken 8470 times.
16848 while (index > 0)
602 {
603 8378 std::size_t parent = (index - 1) / 2;
604
2/2
✓ Branch 4 taken 14 times.
✓ Branch 5 taken 8364 times.
8378 if (!(heap_[index].time_ < heap_[parent].time_))
605 14 break;
606 8364 swap_heap(index, parent);
607 8364 index = parent;
608 }
609 8484 }
610
611 8376 void down_heap(std::size_t index)
612 {
613 8376 std::size_t child = index * 2 + 1;
614
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 8372 times.
8376 while (child < heap_.size())
615 {
616 4 std::size_t min_child = (child + 1 == heap_.size() ||
617 heap_[child].time_ < heap_[child + 1].time_)
618
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
4 ? child : child + 1;
619
620
1/2
✓ Branch 4 taken 4 times.
✗ Branch 5 not taken.
4 if (heap_[index].time_ < heap_[min_child].time_)
621 4 break;
622
623 swap_heap(index, min_child);
624 index = min_child;
625 child = index * 2 + 1;
626 }
627 8376 }
628
629 16740 void swap_heap(std::size_t i1, std::size_t i2)
630 {
631 16740 heap_entry tmp = heap_[i1];
632 16740 heap_[i1] = heap_[i2];
633 16740 heap_[i2] = tmp;
634 16740 heap_[i1].timer_->heap_index_ = i1;
635 16740 heap_[i2].timer_->heap_index_ = i2;
636 16740 }
637 };
638
639 173 timer_impl::
640 173 timer_impl(timer_service_impl& svc) noexcept
641 173 : svc_(&svc)
642 {
643 173 }
644
645 void
646 4 waiter_node::canceller::
647 operator()() const
648 {
649 4 waiter_->svc_->cancel_waiter(waiter_);
650 4 }
651
652 void
653 waiter_node::completion_op::
654 do_complete(
655 void* owner,
656 scheduler_op* base,
657 std::uint32_t,
658 std::uint32_t)
659 {
660 if (!owner)
661 return;
662 static_cast<completion_op*>(base)->operator()();
663 }
664
665 void
666 8500 waiter_node::completion_op::
667 operator()()
668 {
669 8500 auto* w = waiter_;
670 8500 w->stop_cb_.reset();
671
1/2
✓ Branch 0 taken 8500 times.
✗ Branch 1 not taken.
8500 if (w->ec_out_)
672 8500 *w->ec_out_ = w->ec_value_;
673
674 8500 auto h = w->h_;
675 8500 auto d = w->d_;
676 8500 auto* svc = w->svc_;
677 8500 auto& sched = svc->get_scheduler();
678
679
1/1
✓ Branch 1 taken 8500 times.
8500 svc->destroy_waiter(w);
680
681
1/1
✓ Branch 1 taken 8500 times.
8500 d.post(h);
682 8500 sched.on_work_finished();
683 8500 }
684
685 void
686 8780 timer_impl::
687 release()
688 {
689 8780 svc_->destroy_impl(*this);
690 8780 }
691
692 std::coroutine_handle<>
693 8769 timer_impl::
694 wait(
695 std::coroutine_handle<> h,
696 capy::executor_ref d,
697 std::stop_token token,
698 std::error_code* ec)
699 {
700 // Already-expired fast path — no waiter_node, no mutex.
701 // Post instead of dispatch so the coroutine yields to the
702 // scheduler, allowing other queued work to run.
703
2/2
✓ Branch 1 taken 8747 times.
✓ Branch 2 taken 22 times.
8769 if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
704 {
705
3/3
✓ Branch 2 taken 8747 times.
✓ Branch 5 taken 269 times.
✓ Branch 6 taken 8478 times.
8747 if (expiry_ <= clock_type::now())
706 {
707
1/2
✓ Branch 0 taken 269 times.
✗ Branch 1 not taken.
269 if (ec)
708 269 *ec = {};
709 269 d.post(h);
710 269 return std::noop_coroutine();
711 }
712 }
713
714 8500 auto* w = svc_->create_waiter();
715 8500 w->impl_ = this;
716 8500 w->svc_ = svc_;
717 8500 w->h_ = h;
718 8500 w->d_ = std::move(d);
719 8500 w->token_ = std::move(token);
720 8500 w->ec_out_ = ec;
721
722 8500 svc_->insert_waiter(*this, w);
723 8500 might_have_pending_waits_ = true;
724 8500 svc_->get_scheduler().on_work_started();
725
726
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 8496 times.
8500 if (w->token_.stop_possible())
727 4 w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
728
729 8500 return std::noop_coroutine();
730 }
731
732 // Extern free functions called from timer.cpp
733 //
734 // Two thread-local caches avoid hot-path mutex acquisitions:
735 //
736 // 1. Impl cache — single-slot, validated by comparing svc_ on the
737 // impl against the current service pointer.
738 //
739 // 2. Waiter cache — single-slot, no service affinity.
740 //
741 // The service pointer is obtained from the scheduler_impl's
742 // timer_svc_ member, avoiding find_service() on the hot path.
743 // All caches are cleared by timer_service_invalidate_cache()
744 // during shutdown.
745
746 thread_local_ptr<timer_impl> tl_cached_impl;
747 thread_local_ptr<waiter_node> tl_cached_waiter;
748
749 timer_impl*
750 8780 try_pop_tl_cache(timer_service_impl* svc) noexcept
751 {
752 8780 auto* impl = tl_cached_impl.get();
753
2/2
✓ Branch 0 taken 8607 times.
✓ Branch 1 taken 173 times.
8780 if (impl)
754 {
755 8607 tl_cached_impl.set(nullptr);
756
1/2
✓ Branch 0 taken 8607 times.
✗ Branch 1 not taken.
8607 if (impl->svc_ == svc)
757 8607 return impl;
758 // Stale impl from a destroyed service
759 delete impl;
760 }
761 173 return nullptr;
762 }
763
764 bool
765 8780 try_push_tl_cache(timer_impl* impl) noexcept
766 {
767
2/2
✓ Branch 1 taken 8732 times.
✓ Branch 2 taken 48 times.
8780 if (!tl_cached_impl.get())
768 {
769 8732 tl_cached_impl.set(impl);
770 8732 return true;
771 }
772 48 return false;
773 }
774
775 waiter_node*
776 8500 try_pop_waiter_tl_cache() noexcept
777 {
778 8500 auto* w = tl_cached_waiter.get();
779
2/2
✓ Branch 0 taken 8358 times.
✓ Branch 1 taken 142 times.
8500 if (w)
780 {
781 8358 tl_cached_waiter.set(nullptr);
782 8358 return w;
783 }
784 142 return nullptr;
785 }
786
787 bool
788 8500 try_push_waiter_tl_cache(waiter_node* w) noexcept
789 {
790
2/2
✓ Branch 1 taken 8442 times.
✓ Branch 2 taken 58 times.
8500 if (!tl_cached_waiter.get())
791 {
792 8442 tl_cached_waiter.set(w);
793 8442 return true;
794 }
795 58 return false;
796 }
797
798 void
799 336 timer_service_invalidate_cache() noexcept
800 {
801
2/2
✓ Branch 1 taken 125 times.
✓ Branch 2 taken 211 times.
336 delete tl_cached_impl.get();
802 336 tl_cached_impl.set(nullptr);
803
804
2/2
✓ Branch 1 taken 84 times.
✓ Branch 2 taken 252 times.
336 delete tl_cached_waiter.get();
805 336 tl_cached_waiter.set(nullptr);
806 336 }
807
808 struct timer_service_access
809 {
810 8780 static scheduler_impl& get_scheduler(basic_io_context& ctx) noexcept
811 {
812 8780 return static_cast<scheduler_impl&>(*ctx.sched_);
813 }
814 };
815
816 timer::timer_impl*
817 8780 timer_service_create(capy::execution_context& ctx)
818 {
819
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 8780 times.
8780 if (!ctx.target<basic_io_context>())
820 detail::throw_logic_error();
821 8780 auto& ioctx = static_cast<basic_io_context&>(ctx);
822 auto* svc = static_cast<timer_service_impl*>(
823 8780 timer_service_access::get_scheduler(ioctx).timer_svc_);
824
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8780 times.
8780 if (!svc)
825 detail::throw_logic_error();
826 8780 return svc->create_impl();
827 }
828
829 void
830 8780 timer_service_destroy(timer::timer_impl& base) noexcept
831 {
832 8780 static_cast<timer_impl&>(base).release();
833 8780 }
834
835 timer::time_point
836 34 timer_service_expiry(timer::timer_impl& base) noexcept
837 {
838 34 return static_cast<timer_impl&>(base).expiry_;
839 }
840
841 std::size_t
842 18 timer_service_expires_at(timer::timer_impl& base, timer::time_point t)
843 {
844 18 auto& impl = static_cast<timer_impl&>(base);
845 18 impl.expiry_ = t;
846 18 return impl.svc_->update_timer(impl, t);
847 }
848
849 std::size_t
850 8769 timer_service_expires_after(timer::timer_impl& base, timer::duration d)
851 {
852 8769 auto& impl = static_cast<timer_impl&>(base);
853
1/1
✓ Branch 2 taken 8769 times.
8769 impl.expiry_ = timer::clock_type::now() + d;
854 8769 return impl.svc_->update_timer(impl, impl.expiry_);
855 }
856
857 std::size_t
858 20 timer_service_cancel(timer::timer_impl& base) noexcept
859 {
860 20 auto& impl = static_cast<timer_impl&>(base);
861 20 return impl.svc_->cancel_timer(impl);
862 }
863
864 std::size_t
865 4 timer_service_cancel_one(timer::timer_impl& base) noexcept
866 {
867 4 auto& impl = static_cast<timer_impl&>(base);
868 4 return impl.svc_->cancel_one_waiter(impl);
869 }
870
871 timer_service&
872 336 get_timer_service(capy::execution_context& ctx, scheduler& sched)
873 {
874 336 return ctx.make_service<timer_service_impl>(sched);
875 }
876
877 } // namespace boost::corosio::detail
878