src/corosio/src/detail/timer_service.cpp

86.5% Lines (313/362) 93.0% Functions (40/43) 71.0% Branches (125/176)
src/corosio/src/detail/timer_service.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include "src/detail/timer_service.hpp"
11 #include "src/detail/scheduler_impl.hpp"
12
13 #include <boost/corosio/basic_io_context.hpp>
14 #include <boost/corosio/detail/thread_local_ptr.hpp>
15 #include "src/detail/scheduler_op.hpp"
16 #include "src/detail/intrusive.hpp"
17 #include <boost/capy/error.hpp>
18 #include <boost/capy/ex/executor_ref.hpp>
19 #include <system_error>
20
21 #include <atomic>
22 #include <coroutine>
23 #include <limits>
24 #include <mutex>
25 #include <optional>
26 #include <stop_token>
27 #include <vector>
28
29 /*
30 Timer Service
31 =============
32
33 The public timer class holds an opaque timer_impl* and forwards
34 all operations through extern free functions defined at the bottom
35 of this file.
36
37 Data Structures
38 ---------------
39 waiter_node holds per-waiter state: coroutine handle, executor,
40 error output, stop_token, embedded completion_op. Each concurrent
41 co_await t.wait() allocates one waiter_node.
42
43 timer_impl holds per-timer state: expiry, heap index, and an
44 intrusive_list of waiter_nodes. Multiple coroutines can wait on
45 the same timer simultaneously.
46
47 timer_service_impl owns a min-heap of active timers, a free list
48 of recycled impls, and a free list of recycled waiter_nodes. The
49 heap is ordered by expiry time; the scheduler queries
50 nearest_expiry() to set the epoll/timerfd timeout.
51
52 Optimization Strategy
53 ---------------------
54 The common timer lifecycle is: construct, set expiry, cancel or
55 wait, destroy. Several optimizations target this path:
56
57 1. Deferred heap insertion — expires_after() stores the expiry
58 but does not insert into the heap. Insertion happens in
59 wait(). If the timer is cancelled or destroyed before wait(),
60 the heap is never touched and no mutex is taken. This also
61 enables the already-expired fast path: when wait() sees
62 expiry <= now before inserting, it posts the coroutine
63 handle to the executor and returns noop_coroutine — no
64 heap, no mutex, no epoll. This is only possible because
65 the coroutine API guarantees wait() always follows
66 expires_after(); callback APIs cannot assume this call
67 order.
68
69 2. Thread-local impl cache — A single-slot per-thread cache of
70 timer_impl avoids the mutex on create/destroy for the common
71 create-then-destroy-on-same-thread pattern. On pop, if the
72 cached impl's svc_ doesn't match the current service, the
73 stale impl is deleted eagerly rather than reused.
74
75 3. Embedded completion_op — Each waiter_node embeds a
76 scheduler_op subclass, eliminating heap allocation per
77 fire/cancel. Its destroy() is a no-op since the waiter_node
78 owns the lifetime.
79
80 4. Cached nearest expiry — An atomic<int64_t> mirrors the heap
81 root's time, updated under the lock. nearest_expiry() and
82 empty() read the atomic without locking.
83
84 5. might_have_pending_waits_ flag — Set on wait(), cleared on
85 cancel. Lets cancel_timer() return without locking when no
86 wait was ever issued.
87
88 6. Thread-local waiter cache — Single-slot per-thread cache of
89 waiter_node avoids the free-list mutex for the common
90 wait-then-complete-on-same-thread pattern.
91
92 With all fast paths hit (idle timer, same thread), the
93 schedule/cancel cycle takes zero mutex locks.
94
95 Concurrency
96 -----------
97 stop_token callbacks can fire from any thread. The impl_
98 pointer on waiter_node is used as a "still in list" marker:
99 set to nullptr under the mutex when a waiter is removed by
100 cancel_timer() or process_expired(). cancel_waiter() checks
101 this under the mutex to avoid double-removal races.
102
103 Multiple io_contexts in the same program are safe. The
104 service pointer is obtained directly from the scheduler,
105 and TL-cached impls are validated by comparing svc_ against
106 the current service pointer. Waiter nodes have no service
107 affinity and can safely migrate between contexts.
108 */
109
110 namespace boost::corosio::detail {
111
112 class timer_service_impl;
113 struct timer_impl;
114 struct waiter_node;
115
116 void timer_service_invalidate_cache() noexcept;
117
118 struct waiter_node
119 : intrusive_list<waiter_node>::node
120 {
121 // Embedded completion op — avoids heap allocation per fire/cancel
122 struct completion_op final : scheduler_op
123 {
124 waiter_node* waiter_ = nullptr;
125
126 static void do_complete(
127 void* owner,
128 scheduler_op* base,
129 std::uint32_t,
130 std::uint32_t);
131
132 142 completion_op() noexcept
133 142 : scheduler_op(&do_complete)
134 {
135 142 }
136
137 void operator()() override;
138 // No-op — lifetime owned by waiter_node, not the scheduler queue
139 void destroy() override {}
140 };
141
142 // Per-waiter stop_token cancellation
143 struct canceller
144 {
145 waiter_node* waiter_;
146 void operator()() const;
147 };
148
149 // nullptr once removed from timer's waiter list (concurrency marker)
150 timer_impl* impl_ = nullptr;
151 timer_service_impl* svc_ = nullptr;
152 std::coroutine_handle<> h_;
153 capy::executor_ref d_;
154 std::error_code* ec_out_ = nullptr;
155 std::stop_token token_;
156 std::optional<std::stop_callback<canceller>> stop_cb_;
157 completion_op op_;
158 std::error_code ec_value_;
159 waiter_node* next_free_ = nullptr;
160
161 142 waiter_node() noexcept
162 142 {
163 142 op_.waiter_ = this;
164 142 }
165 };
166
167 struct timer_impl
168 : timer::timer_impl
169 {
170 using clock_type = std::chrono::steady_clock;
171 using time_point = clock_type::time_point;
172 using duration = clock_type::duration;
173
174 timer_service_impl* svc_ = nullptr;
175 intrusive_list<waiter_node> waiters_;
176
177 // Free list linkage (reused when impl is on free_list)
178 timer_impl* next_free_ = nullptr;
179
180 explicit timer_impl(timer_service_impl& svc) noexcept;
181
182
183 void release() override;
184
185 std::coroutine_handle<> wait(
186 std::coroutine_handle<>,
187 capy::executor_ref,
188 std::stop_token,
189 std::error_code*) override;
190 };
191
192 timer_impl* try_pop_tl_cache(timer_service_impl*) noexcept;
193 bool try_push_tl_cache(timer_impl*) noexcept;
194 waiter_node* try_pop_waiter_tl_cache() noexcept;
195 bool try_push_waiter_tl_cache(waiter_node*) noexcept;
196
197 class timer_service_impl : public timer_service
198 {
199 public:
200 using clock_type = std::chrono::steady_clock;
201 using time_point = clock_type::time_point;
202 using key_type = timer_service;
203
204 private:
205 struct heap_entry
206 {
207 time_point time_;
208 timer_impl* timer_;
209 };
210
211 scheduler* sched_ = nullptr;
212 mutable std::mutex mutex_;
213 std::vector<heap_entry> heap_;
214 timer_impl* free_list_ = nullptr;
215 waiter_node* waiter_free_list_ = nullptr;
216 callback on_earliest_changed_;
217 // Avoids mutex in nearest_expiry() and empty()
218 mutable std::atomic<std::int64_t> cached_nearest_ns_{
219 (std::numeric_limits<std::int64_t>::max)()};
220
221 public:
222 336 timer_service_impl(capy::execution_context&, scheduler& sched)
223 336 : timer_service()
224 336 , sched_(&sched)
225 {
226 336 }
227
228 17694 scheduler& get_scheduler() noexcept { return *sched_; }
229
230 672 ~timer_service_impl() = default;
231
232 timer_service_impl(timer_service_impl const&) = delete;
233 timer_service_impl& operator=(timer_service_impl const&) = delete;
234
235 336 void set_on_earliest_changed(callback cb) override
236 {
237 336 on_earliest_changed_ = cb;
238 336 }
239
240 336 void shutdown() override
241 {
242 336 timer_service_invalidate_cache();
243
244 // Cancel waiting timers still in the heap
245
1/2
✗ Branch 5 not taken.
✓ Branch 6 taken 336 times.
336 for (auto& entry : heap_)
246 {
247 auto* impl = entry.timer_;
248 while (auto* w = impl->waiters_.pop_front())
249 {
250 w->stop_cb_.reset();
251 w->h_.destroy();
252 sched_->on_work_finished();
253 delete w;
254 }
255 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
256 delete impl;
257 }
258 336 heap_.clear();
259 336 cached_nearest_ns_.store(
260 (std::numeric_limits<std::int64_t>::max)(),
261 std::memory_order_release);
262
263 // Delete free-listed impls
264
2/2
✓ Branch 0 taken 48 times.
✓ Branch 1 taken 336 times.
384 while (free_list_)
265 {
266 48 auto* next = free_list_->next_free_;
267
1/2
✓ Branch 0 taken 48 times.
✗ Branch 1 not taken.
48 delete free_list_;
268 48 free_list_ = next;
269 }
270
271 // Delete free-listed waiters
272
2/2
✓ Branch 0 taken 58 times.
✓ Branch 1 taken 336 times.
394 while (waiter_free_list_)
273 {
274 58 auto* next = waiter_free_list_->next_free_;
275
1/2
✓ Branch 0 taken 58 times.
✗ Branch 1 not taken.
58 delete waiter_free_list_;
276 58 waiter_free_list_ = next;
277 }
278 336 }
279
280 9136 timer::timer_impl* create_impl() override
281 {
282 9136 timer_impl* impl = try_pop_tl_cache(this);
283
2/2
✓ Branch 0 taken 8963 times.
✓ Branch 1 taken 173 times.
9136 if (impl)
284 {
285 8963 impl->svc_ = this;
286 8963 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
287 8963 impl->might_have_pending_waits_ = false;
288 8963 return impl;
289 }
290
291
1/1
✓ Branch 1 taken 173 times.
173 std::lock_guard lock(mutex_);
292
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 173 times.
173 if (free_list_)
293 {
294 impl = free_list_;
295 free_list_ = impl->next_free_;
296 impl->next_free_ = nullptr;
297 impl->svc_ = this;
298 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
299 impl->might_have_pending_waits_ = false;
300 }
301 else
302 {
303
1/1
✓ Branch 1 taken 173 times.
173 impl = new timer_impl(*this);
304 }
305 173 return impl;
306 173 }
307
308 9136 void destroy_impl(timer_impl& impl)
309 {
310
1/1
✓ Branch 1 taken 9136 times.
9136 cancel_timer(impl);
311
312
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9136 times.
9136 if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
313 {
314 std::lock_guard lock(mutex_);
315 remove_timer_impl(impl);
316 refresh_cached_nearest();
317 }
318
319
2/2
✓ Branch 1 taken 9088 times.
✓ Branch 2 taken 48 times.
9136 if (try_push_tl_cache(&impl))
320 9088 return;
321
322
1/1
✓ Branch 1 taken 48 times.
48 std::lock_guard lock(mutex_);
323 48 impl.next_free_ = free_list_;
324 48 free_list_ = &impl;
325 48 }
326
327 8847 waiter_node* create_waiter()
328 {
329
2/2
✓ Branch 1 taken 8705 times.
✓ Branch 2 taken 142 times.
8847 if (auto* w = try_pop_waiter_tl_cache())
330 8705 return w;
331
332
1/1
✓ Branch 1 taken 142 times.
142 std::lock_guard lock(mutex_);
333
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 142 times.
142 if (waiter_free_list_)
334 {
335 auto* w = waiter_free_list_;
336 waiter_free_list_ = w->next_free_;
337 w->next_free_ = nullptr;
338 return w;
339 }
340
341
1/1
✓ Branch 1 taken 142 times.
142 return new waiter_node();
342 142 }
343
344 8847 void destroy_waiter(waiter_node* w)
345 {
346
2/2
✓ Branch 1 taken 8789 times.
✓ Branch 2 taken 58 times.
8847 if (try_push_waiter_tl_cache(w))
347 8789 return;
348
349
1/1
✓ Branch 1 taken 58 times.
58 std::lock_guard lock(mutex_);
350 58 w->next_free_ = waiter_free_list_;
351 58 waiter_free_list_ = w;
352 58 }
353
354 // Heap insertion deferred to wait() — avoids lock when timer is idle
355 6 std::size_t update_timer(timer_impl& impl, time_point new_time)
356 {
357 bool in_heap =
358 6 (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
359
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 6 times.
6 if (!in_heap && impl.waiters_.empty())
360 return 0;
361
362 6 bool notify = false;
363 6 intrusive_list<waiter_node> canceled;
364
365 {
366
1/1
✓ Branch 1 taken 6 times.
6 std::lock_guard lock(mutex_);
367
368
2/2
✓ Branch 1 taken 10 times.
✓ Branch 2 taken 6 times.
16 while (auto* w = impl.waiters_.pop_front())
369 {
370 10 w->impl_ = nullptr;
371 10 canceled.push_back(w);
372 10 }
373
374
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6 if (impl.heap_index_ < heap_.size())
375 {
376 6 time_point old_time = heap_[impl.heap_index_].time_;
377 6 heap_[impl.heap_index_].time_ = new_time;
378
379
2/3
✓ Branch 1 taken 6 times.
✓ Branch 4 taken 6 times.
✗ Branch 5 not taken.
6 if (new_time < old_time)
380
1/1
✓ Branch 1 taken 6 times.
6 up_heap(impl.heap_index_);
381 else
382 down_heap(impl.heap_index_);
383
384 6 notify = (impl.heap_index_ == 0);
385 }
386
387 6 refresh_cached_nearest();
388 6 }
389
390 6 std::size_t count = 0;
391
2/2
✓ Branch 1 taken 10 times.
✓ Branch 2 taken 6 times.
16 while (auto* w = canceled.pop_front())
392 {
393 10 w->ec_value_ = make_error_code(capy::error::canceled);
394
1/1
✓ Branch 1 taken 10 times.
10 sched_->post(&w->op_);
395 10 ++count;
396 10 }
397
398
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (notify)
399
1/1
✓ Branch 1 taken 6 times.
6 on_earliest_changed_();
400
401 6 return count;
402 }
403
404 // Inserts timer into heap if needed and pushes waiter, all under
405 // one lock to prevent races with cancel_waiter/process_expired
406 8847 void insert_waiter(timer_impl& impl, waiter_node* w)
407 {
408 8847 bool notify = false;
409 {
410
1/1
✓ Branch 1 taken 8847 times.
8847 std::lock_guard lock(mutex_);
411
2/2
✓ Branch 1 taken 8825 times.
✓ Branch 2 taken 22 times.
8847 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
412 {
413 8825 impl.heap_index_ = heap_.size();
414
1/1
✓ Branch 1 taken 8825 times.
8825 heap_.push_back({impl.expiry_, &impl});
415
1/1
✓ Branch 2 taken 8825 times.
8825 up_heap(heap_.size() - 1);
416 8825 notify = (impl.heap_index_ == 0);
417 8825 refresh_cached_nearest();
418 }
419 8847 impl.waiters_.push_back(w);
420 8847 }
421
2/2
✓ Branch 0 taken 8812 times.
✓ Branch 1 taken 35 times.
8847 if (notify)
422 8812 on_earliest_changed_();
423 8847 }
424
425 9144 std::size_t cancel_timer(timer_impl& impl)
426 {
427
2/2
✓ Branch 0 taken 9128 times.
✓ Branch 1 taken 16 times.
9144 if (!impl.might_have_pending_waits_)
428 9128 return 0;
429
430 // Not in heap and no waiters — just clear the flag
431 16 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)()
432
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 16 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 16 times.
16 && impl.waiters_.empty())
433 {
434 impl.might_have_pending_waits_ = false;
435 return 0;
436 }
437
438 16 intrusive_list<waiter_node> canceled;
439
440 {
441
1/1
✓ Branch 1 taken 16 times.
16 std::lock_guard lock(mutex_);
442
1/1
✓ Branch 1 taken 16 times.
16 remove_timer_impl(impl);
443
2/2
✓ Branch 1 taken 20 times.
✓ Branch 2 taken 16 times.
36 while (auto* w = impl.waiters_.pop_front())
444 {
445 20 w->impl_ = nullptr;
446 20 canceled.push_back(w);
447 20 }
448 16 refresh_cached_nearest();
449 16 }
450
451 16 impl.might_have_pending_waits_ = false;
452
453 16 std::size_t count = 0;
454
2/2
✓ Branch 1 taken 20 times.
✓ Branch 2 taken 16 times.
36 while (auto* w = canceled.pop_front())
455 {
456 20 w->ec_value_ = make_error_code(capy::error::canceled);
457
1/1
✓ Branch 1 taken 20 times.
20 sched_->post(&w->op_);
458 20 ++count;
459 20 }
460
461 16 return count;
462 }
463
464 // Cancel a single waiter (called from stop_token callback, any thread)
465 4 void cancel_waiter(waiter_node* w)
466 {
467 {
468
1/1
✓ Branch 1 taken 4 times.
4 std::lock_guard lock(mutex_);
469 // Already removed by cancel_timer or process_expired
470
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
4 if (!w->impl_)
471 return;
472 4 auto* impl = w->impl_;
473 4 w->impl_ = nullptr;
474 4 impl->waiters_.remove(w);
475
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
4 if (impl->waiters_.empty())
476 {
477
1/1
✓ Branch 1 taken 2 times.
2 remove_timer_impl(*impl);
478 2 impl->might_have_pending_waits_ = false;
479 }
480 4 refresh_cached_nearest();
481 4 }
482
483 4 w->ec_value_ = make_error_code(capy::error::canceled);
484 4 sched_->post(&w->op_);
485 }
486
487 // Cancel front waiter only (FIFO), return 0 or 1
488 2 std::size_t cancel_one_waiter(timer_impl& impl)
489 {
490
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (!impl.might_have_pending_waits_)
491 return 0;
492
493 2 waiter_node* w = nullptr;
494
495 {
496
1/1
✓ Branch 1 taken 2 times.
2 std::lock_guard lock(mutex_);
497 2 w = impl.waiters_.pop_front();
498
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (!w)
499 return 0;
500 2 w->impl_ = nullptr;
501
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 if (impl.waiters_.empty())
502 {
503 remove_timer_impl(impl);
504 impl.might_have_pending_waits_ = false;
505 }
506 2 refresh_cached_nearest();
507 2 }
508
509 2 w->ec_value_ = make_error_code(capy::error::canceled);
510 2 sched_->post(&w->op_);
511 2 return 1;
512 }
513
514 bool empty() const noexcept override
515 {
516 return cached_nearest_ns_.load(std::memory_order_acquire)
517 == (std::numeric_limits<std::int64_t>::max)();
518 }
519
520 21018 time_point nearest_expiry() const noexcept override
521 {
522 21018 auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
523 21018 return time_point(time_point::duration(ns));
524 }
525
526 142751 std::size_t process_expired() override
527 {
528 142751 intrusive_list<waiter_node> expired;
529
530 {
531
1/1
✓ Branch 1 taken 142751 times.
142751 std::lock_guard lock(mutex_);
532 142751 auto now = clock_type::now();
533
534
7/7
✓ Branch 1 taken 151191 times.
✓ Branch 2 taken 367 times.
✓ Branch 5 taken 151191 times.
✓ Branch 8 taken 8807 times.
✓ Branch 9 taken 142384 times.
✓ Branch 10 taken 8807 times.
✓ Branch 11 taken 142751 times.
151558 while (!heap_.empty() && heap_[0].time_ <= now)
535 {
536 8807 timer_impl* t = heap_[0].timer_;
537
1/1
✓ Branch 1 taken 8807 times.
8807 remove_timer_impl(*t);
538
2/2
✓ Branch 1 taken 8811 times.
✓ Branch 2 taken 8807 times.
17618 while (auto* w = t->waiters_.pop_front())
539 {
540 8811 w->impl_ = nullptr;
541 8811 w->ec_value_ = {};
542 8811 expired.push_back(w);
543 8811 }
544 8807 t->might_have_pending_waits_ = false;
545 }
546
547 142751 refresh_cached_nearest();
548 142751 }
549
550 142751 std::size_t count = 0;
551
2/2
✓ Branch 1 taken 8811 times.
✓ Branch 2 taken 142751 times.
151562 while (auto* w = expired.pop_front())
552 {
553
1/1
✓ Branch 1 taken 8811 times.
8811 sched_->post(&w->op_);
554 8811 ++count;
555 8811 }
556
557 142751 return count;
558 }
559
560 private:
561 151604 void refresh_cached_nearest() noexcept
562 {
563 151604 auto ns = heap_.empty()
564
2/2
✓ Branch 0 taken 383 times.
✓ Branch 1 taken 151221 times.
151604 ? (std::numeric_limits<std::int64_t>::max)()
565 151221 : heap_[0].time_.time_since_epoch().count();
566 151604 cached_nearest_ns_.store(ns, std::memory_order_release);
567 151604 }
568
569 8825 void remove_timer_impl(timer_impl& impl)
570 {
571 8825 std::size_t index = impl.heap_index_;
572
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 8825 times.
8825 if (index >= heap_.size())
573 return; // Not in heap
574
575
2/2
✓ Branch 1 taken 102 times.
✓ Branch 2 taken 8723 times.
8825 if (index == heap_.size() - 1)
576 {
577 // Last element, just pop
578 102 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
579 102 heap_.pop_back();
580 }
581 else
582 {
583 // Swap with last and reheapify
584 8723 swap_heap(index, heap_.size() - 1);
585 8723 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
586 8723 heap_.pop_back();
587
588
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 8723 times.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
✓ Branch 9 taken 8723 times.
8723 if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
589 up_heap(index);
590 else
591 8723 down_heap(index);
592 }
593 }
594
595 8831 void up_heap(std::size_t index)
596 {
597
2/2
✓ Branch 0 taken 8725 times.
✓ Branch 1 taken 8818 times.
17543 while (index > 0)
598 {
599 8725 std::size_t parent = (index - 1) / 2;
600
2/2
✓ Branch 4 taken 13 times.
✓ Branch 5 taken 8712 times.
8725 if (!(heap_[index].time_ < heap_[parent].time_))
601 13 break;
602 8712 swap_heap(index, parent);
603 8712 index = parent;
604 }
605 8831 }
606
607 8723 void down_heap(std::size_t index)
608 {
609 8723 std::size_t child = index * 2 + 1;
610
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 8719 times.
8723 while (child < heap_.size())
611 {
612 4 std::size_t min_child = (child + 1 == heap_.size() ||
613 heap_[child].time_ < heap_[child + 1].time_)
614
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
4 ? child : child + 1;
615
616
1/2
✓ Branch 4 taken 4 times.
✗ Branch 5 not taken.
4 if (heap_[index].time_ < heap_[min_child].time_)
617 4 break;
618
619 swap_heap(index, min_child);
620 index = min_child;
621 child = index * 2 + 1;
622 }
623 8723 }
624
625 17435 void swap_heap(std::size_t i1, std::size_t i2)
626 {
627 17435 heap_entry tmp = heap_[i1];
628 17435 heap_[i1] = heap_[i2];
629 17435 heap_[i2] = tmp;
630 17435 heap_[i1].timer_->heap_index_ = i1;
631 17435 heap_[i2].timer_->heap_index_ = i2;
632 17435 }
633 };
634
635 173 timer_impl::
636 173 timer_impl(timer_service_impl& svc) noexcept
637 173 : svc_(&svc)
638 {
639 173 }
640
641 void
642 4 waiter_node::canceller::
643 operator()() const
644 {
645 4 waiter_->svc_->cancel_waiter(waiter_);
646 4 }
647
648 void
649 waiter_node::completion_op::
650 do_complete(
651 void* owner,
652 scheduler_op* base,
653 std::uint32_t,
654 std::uint32_t)
655 {
656 if (!owner)
657 return;
658 static_cast<completion_op*>(base)->operator()();
659 }
660
661 void
662 8847 waiter_node::completion_op::
663 operator()()
664 {
665 8847 auto* w = waiter_;
666 8847 w->stop_cb_.reset();
667
1/2
✓ Branch 0 taken 8847 times.
✗ Branch 1 not taken.
8847 if (w->ec_out_)
668 8847 *w->ec_out_ = w->ec_value_;
669
670 8847 auto h = w->h_;
671 8847 auto d = w->d_;
672 8847 auto* svc = w->svc_;
673 8847 auto& sched = svc->get_scheduler();
674
675
1/1
✓ Branch 1 taken 8847 times.
8847 svc->destroy_waiter(w);
676
677
1/1
✓ Branch 1 taken 8847 times.
8847 d.post(h);
678 8847 sched.on_work_finished();
679 8847 }
680
681 void
682 9136 timer_impl::
683 release()
684 {
685 9136 svc_->destroy_impl(*this);
686 9136 }
687
688 std::coroutine_handle<>
689 8869 timer_impl::
690 wait(
691 std::coroutine_handle<> h,
692 capy::executor_ref d,
693 std::stop_token token,
694 std::error_code* ec)
695 {
696 // Already-expired fast path — no waiter_node, no mutex.
697 // Post instead of dispatch so the coroutine yields to the
698 // scheduler, allowing other queued work to run.
699
2/2
✓ Branch 1 taken 8847 times.
✓ Branch 2 taken 22 times.
8869 if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
700 {
701
4/5
✓ Branch 2 taken 8847 times.
✓ Branch 4 taken 8847 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 22 times.
✓ Branch 7 taken 8825 times.
17694 if (expiry_ == (time_point::min)() ||
702
3/3
✓ Branch 2 taken 8847 times.
✓ Branch 5 taken 22 times.
✓ Branch 6 taken 8825 times.
17694 expiry_ <= clock_type::now())
703 {
704
1/2
✓ Branch 0 taken 22 times.
✗ Branch 1 not taken.
22 if (ec)
705 22 *ec = {};
706 22 d.post(h);
707 22 return std::noop_coroutine();
708 }
709 }
710
711 8847 auto* w = svc_->create_waiter();
712 8847 w->impl_ = this;
713 8847 w->svc_ = svc_;
714 8847 w->h_ = h;
715 8847 w->d_ = std::move(d);
716 8847 w->token_ = std::move(token);
717 8847 w->ec_out_ = ec;
718
719 8847 svc_->insert_waiter(*this, w);
720 8847 might_have_pending_waits_ = true;
721 8847 svc_->get_scheduler().on_work_started();
722
723
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 8843 times.
8847 if (w->token_.stop_possible())
724 4 w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
725
726 8847 return std::noop_coroutine();
727 }
728
729 // Extern free functions called from timer.cpp
730 //
731 // Two thread-local caches avoid hot-path mutex acquisitions:
732 //
733 // 1. Impl cache — single-slot, validated by comparing svc_ on the
734 // impl against the current service pointer.
735 //
736 // 2. Waiter cache — single-slot, no service affinity.
737 //
738 // The service pointer is obtained from the scheduler_impl's
739 // timer_svc_ member, avoiding find_service() on the hot path.
740 // All caches are cleared by timer_service_invalidate_cache()
741 // during shutdown.
742
743 thread_local_ptr<timer_impl> tl_cached_impl;
744 thread_local_ptr<waiter_node> tl_cached_waiter;
745
746 timer_impl*
747 9136 try_pop_tl_cache(timer_service_impl* svc) noexcept
748 {
749 9136 auto* impl = tl_cached_impl.get();
750
2/2
✓ Branch 0 taken 8963 times.
✓ Branch 1 taken 173 times.
9136 if (impl)
751 {
752 8963 tl_cached_impl.set(nullptr);
753
1/2
✓ Branch 0 taken 8963 times.
✗ Branch 1 not taken.
8963 if (impl->svc_ == svc)
754 8963 return impl;
755 // Stale impl from a destroyed service
756 delete impl;
757 }
758 173 return nullptr;
759 }
760
761 bool
762 9136 try_push_tl_cache(timer_impl* impl) noexcept
763 {
764
2/2
✓ Branch 1 taken 9088 times.
✓ Branch 2 taken 48 times.
9136 if (!tl_cached_impl.get())
765 {
766 9088 tl_cached_impl.set(impl);
767 9088 return true;
768 }
769 48 return false;
770 }
771
772 waiter_node*
773 8847 try_pop_waiter_tl_cache() noexcept
774 {
775 8847 auto* w = tl_cached_waiter.get();
776
2/2
✓ Branch 0 taken 8705 times.
✓ Branch 1 taken 142 times.
8847 if (w)
777 {
778 8705 tl_cached_waiter.set(nullptr);
779 8705 return w;
780 }
781 142 return nullptr;
782 }
783
784 bool
785 8847 try_push_waiter_tl_cache(waiter_node* w) noexcept
786 {
787
2/2
✓ Branch 1 taken 8789 times.
✓ Branch 2 taken 58 times.
8847 if (!tl_cached_waiter.get())
788 {
789 8789 tl_cached_waiter.set(w);
790 8789 return true;
791 }
792 58 return false;
793 }
794
795 void
796 336 timer_service_invalidate_cache() noexcept
797 {
798
2/2
✓ Branch 1 taken 125 times.
✓ Branch 2 taken 211 times.
336 delete tl_cached_impl.get();
799 336 tl_cached_impl.set(nullptr);
800
801
2/2
✓ Branch 1 taken 84 times.
✓ Branch 2 taken 252 times.
336 delete tl_cached_waiter.get();
802 336 tl_cached_waiter.set(nullptr);
803 336 }
804
805 struct timer_service_access
806 {
807 9136 static scheduler_impl& get_scheduler(basic_io_context& ctx) noexcept
808 {
809 9136 return static_cast<scheduler_impl&>(*ctx.sched_);
810 }
811 };
812
813 timer::timer_impl*
814 9136 timer_service_create(capy::execution_context& ctx)
815 {
816
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9136 times.
9136 if (!ctx.target<basic_io_context>())
817 detail::throw_logic_error();
818 9136 auto& ioctx = static_cast<basic_io_context&>(ctx);
819 auto* svc = static_cast<timer_service_impl*>(
820 9136 timer_service_access::get_scheduler(ioctx).timer_svc_);
821
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9136 times.
9136 if (!svc)
822 detail::throw_logic_error();
823 9136 return svc->create_impl();
824 }
825
826 void
827 9136 timer_service_destroy(timer::timer_impl& base) noexcept
828 {
829 9136 static_cast<timer_impl&>(base).release();
830 9136 }
831
832 std::size_t
833 6 timer_service_update_expiry(timer::timer_impl& base)
834 {
835 6 auto& impl = static_cast<timer_impl&>(base);
836 6 return impl.svc_->update_timer(impl, impl.expiry_);
837 }
838
839 std::size_t
840 8 timer_service_cancel(timer::timer_impl& base) noexcept
841 {
842 8 auto& impl = static_cast<timer_impl&>(base);
843 8 return impl.svc_->cancel_timer(impl);
844 }
845
846 std::size_t
847 2 timer_service_cancel_one(timer::timer_impl& base) noexcept
848 {
849 2 auto& impl = static_cast<timer_impl&>(base);
850 2 return impl.svc_->cancel_one_waiter(impl);
851 }
852
853 timer_service&
854 336 get_timer_service(capy::execution_context& ctx, scheduler& sched)
855 {
856 336 return ctx.make_service<timer_service_impl>(sched);
857 }
858
859 } // namespace boost::corosio::detail
860