//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include <boost/corosio/detail/config.hpp>
#include <boost/capy/ex/execution_context.hpp>

#include <boost/corosio/native/native_scheduler.hpp>
#include <boost/corosio/detail/scheduler_op.hpp>

#include <boost/corosio/native/detail/epoll/epoll_op.hpp>
#include <boost/corosio/detail/timer_service.hpp>
#include <boost/corosio/detail/make_err.hpp>
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>

#include <boost/corosio/detail/except.hpp>
#include <boost/corosio/detail/thread_local_ptr.hpp>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <coroutine> // for std::coroutine_handle used in post()
#include <cstddef>
#include <cstdint>
#include <limits>
#include <mutex>
#include <utility>

#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <unistd.h>

49  
namespace boost::corosio::detail {
49  
namespace boost::corosio::detail {
50  

50  

51  
struct epoll_op;
51  
struct epoll_op;
52  
struct descriptor_state;
52  
struct descriptor_state;
53  
namespace epoll {
53  
namespace epoll {
54  
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context;
54  
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context;
55  
} // namespace epoll
55  
} // namespace epoll
56  

56  

57  
/** Linux scheduler using epoll for I/O multiplexing.
57  
/** Linux scheduler using epoll for I/O multiplexing.
58  

58  

59  
    This scheduler implements the scheduler interface using Linux epoll
59  
    This scheduler implements the scheduler interface using Linux epoll
60  
    for efficient I/O event notification. It uses a single reactor model
60  
    for efficient I/O event notification. It uses a single reactor model
61  
    where one thread runs epoll_wait while other threads
61  
    where one thread runs epoll_wait while other threads
62  
    wait on a condition variable for handler work. This design provides:
62  
    wait on a condition variable for handler work. This design provides:
63  

63  

64  
    - Handler parallelism: N posted handlers can execute on N threads
64  
    - Handler parallelism: N posted handlers can execute on N threads
65  
    - No thundering herd: condition_variable wakes exactly one thread
65  
    - No thundering herd: condition_variable wakes exactly one thread
66  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
66  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
67  

67  

68  
    When threads call run(), they first try to execute queued handlers.
68  
    When threads call run(), they first try to execute queued handlers.
69  
    If the queue is empty and no reactor is running, one thread becomes
69  
    If the queue is empty and no reactor is running, one thread becomes
70  
    the reactor and runs epoll_wait. Other threads wait on a condition
70  
    the reactor and runs epoll_wait. Other threads wait on a condition
71  
    variable until handlers are available.
71  
    variable until handlers are available.
72  

72  

73  
    @par Thread Safety
73  
    @par Thread Safety
74  
    All public member functions are thread-safe.
74  
    All public member functions are thread-safe.
75  
*/
75  
*/
76  
class BOOST_COROSIO_DECL epoll_scheduler final
76  
class BOOST_COROSIO_DECL epoll_scheduler final
77  
    : public native_scheduler
77  
    : public native_scheduler
78  
    , public capy::execution_context::service
78  
    , public capy::execution_context::service
79  
{
79  
{
80  
public:
80  
public:
81  
    using key_type = scheduler;
81  
    using key_type = scheduler;
82  

82  

83  
    /** Construct the scheduler.
83  
    /** Construct the scheduler.
84  

84  

85  
        Creates an epoll instance, eventfd for reactor interruption,
85  
        Creates an epoll instance, eventfd for reactor interruption,
86  
        and timerfd for kernel-managed timer expiry.
86  
        and timerfd for kernel-managed timer expiry.
87  

87  

88  
        @param ctx Reference to the owning execution_context.
88  
        @param ctx Reference to the owning execution_context.
89  
        @param concurrency_hint Hint for expected thread count (unused).
89  
        @param concurrency_hint Hint for expected thread count (unused).
90  
    */
90  
    */
91  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
91  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
92  

92  

93  
    /// Destroy the scheduler.
93  
    /// Destroy the scheduler.
94  
    ~epoll_scheduler() override;
94  
    ~epoll_scheduler() override;
95  

95  

96  
    epoll_scheduler(epoll_scheduler const&)            = delete;
96  
    epoll_scheduler(epoll_scheduler const&)            = delete;
97  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
97  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
98  

98  

99  
    void shutdown() override;
99  
    void shutdown() override;
100  
    void post(std::coroutine_handle<> h) const override;
100  
    void post(std::coroutine_handle<> h) const override;
101  
    void post(scheduler_op* h) const override;
101  
    void post(scheduler_op* h) const override;
102  
    bool running_in_this_thread() const noexcept override;
102  
    bool running_in_this_thread() const noexcept override;
103  
    void stop() override;
103  
    void stop() override;
104  
    bool stopped() const noexcept override;
104  
    bool stopped() const noexcept override;
105  
    void restart() override;
105  
    void restart() override;
106  
    std::size_t run() override;
106  
    std::size_t run() override;
107  
    std::size_t run_one() override;
107  
    std::size_t run_one() override;
108  
    std::size_t wait_one(long usec) override;
108  
    std::size_t wait_one(long usec) override;
109  
    std::size_t poll() override;
109  
    std::size_t poll() override;
110  
    std::size_t poll_one() override;
110  
    std::size_t poll_one() override;
111  

111  

112  
    /** Return the epoll file descriptor.
112  
    /** Return the epoll file descriptor.
113  

113  

114  
        Used by socket services to register file descriptors
114  
        Used by socket services to register file descriptors
115  
        for I/O event notification.
115  
        for I/O event notification.
116  

116  

117  
        @return The epoll file descriptor.
117  
        @return The epoll file descriptor.
118  
    */
118  
    */
119  
    int epoll_fd() const noexcept
119  
    int epoll_fd() const noexcept
120  
    {
120  
    {
121  
        return epoll_fd_;
121  
        return epoll_fd_;
122  
    }
122  
    }
123  

123  

124  
    /** Reset the thread's inline completion budget.
124  
    /** Reset the thread's inline completion budget.
125  

125  

126  
        Called at the start of each posted completion handler to
126  
        Called at the start of each posted completion handler to
127  
        grant a fresh budget for speculative inline completions.
127  
        grant a fresh budget for speculative inline completions.
128  
    */
128  
    */
129  
    void reset_inline_budget() const noexcept;
129  
    void reset_inline_budget() const noexcept;
130  

130  

131  
    /** Consume one unit of inline budget if available.
131  
    /** Consume one unit of inline budget if available.
132  

132  

133  
        @return True if budget was available and consumed.
133  
        @return True if budget was available and consumed.
134  
    */
134  
    */
135  
    bool try_consume_inline_budget() const noexcept;
135  
    bool try_consume_inline_budget() const noexcept;
136  

136  

137  
    /** Register a descriptor for persistent monitoring.
137  
    /** Register a descriptor for persistent monitoring.
138  

138  

139  
        The fd is registered once and stays registered until explicitly
139  
        The fd is registered once and stays registered until explicitly
140  
        deregistered. Events are dispatched via descriptor_state which
140  
        deregistered. Events are dispatched via descriptor_state which
141  
        tracks pending read/write/connect operations.
141  
        tracks pending read/write/connect operations.
142  

142  

143  
        @param fd The file descriptor to register.
143  
        @param fd The file descriptor to register.
144  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
144  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
145  
    */
145  
    */
146  
    void register_descriptor(int fd, descriptor_state* desc) const;
146  
    void register_descriptor(int fd, descriptor_state* desc) const;
147  

147  

148  
    /** Deregister a persistently registered descriptor.
148  
    /** Deregister a persistently registered descriptor.
149  

149  

150  
        @param fd The file descriptor to deregister.
150  
        @param fd The file descriptor to deregister.
151  
    */
151  
    */
152  
    void deregister_descriptor(int fd) const;
152  
    void deregister_descriptor(int fd) const;
153  

153  

154  
    void work_started() noexcept override;
154  
    void work_started() noexcept override;
155  
    void work_finished() noexcept override;
155  
    void work_finished() noexcept override;
156  

156  

157  
    /** Offset a forthcoming work_finished from work_cleanup.
157  
    /** Offset a forthcoming work_finished from work_cleanup.
158  

158  

159  
        Called by descriptor_state when all I/O returned EAGAIN and no
159  
        Called by descriptor_state when all I/O returned EAGAIN and no
160  
        handler will be executed. Must be called from a scheduler thread.
160  
        handler will be executed. Must be called from a scheduler thread.
161  
    */
161  
    */
162  
    void compensating_work_started() const noexcept;
162  
    void compensating_work_started() const noexcept;
163  

163  

164  
    /** Drain work from thread context's private queue to global queue.
164  
    /** Drain work from thread context's private queue to global queue.
165  

165  

166  
        Called by thread_context_guard destructor when a thread exits run().
166  
        Called by thread_context_guard destructor when a thread exits run().
167  
        Transfers pending work to the global queue under mutex protection.
167  
        Transfers pending work to the global queue under mutex protection.
168  

168  

169  
        @param queue The private queue to drain.
169  
        @param queue The private queue to drain.
170  
        @param count Item count for wakeup decisions (wakes other threads if positive).
170  
        @param count Item count for wakeup decisions (wakes other threads if positive).
171  
    */
171  
    */
172  
    void drain_thread_queue(op_queue& queue, long count) const;
172  
    void drain_thread_queue(op_queue& queue, long count) const;
173  

173  

174  
    /** Post completed operations for deferred invocation.
174  
    /** Post completed operations for deferred invocation.
175  

175  

176  
        If called from a thread running this scheduler, operations go to
176  
        If called from a thread running this scheduler, operations go to
177  
        the thread's private queue (fast path). Otherwise, operations are
177  
        the thread's private queue (fast path). Otherwise, operations are
178  
        added to the global queue under mutex and a waiter is signaled.
178  
        added to the global queue under mutex and a waiter is signaled.
179  

179  

180  
        @par Preconditions
180  
        @par Preconditions
181  
        work_started() must have been called for each operation.
181  
        work_started() must have been called for each operation.
182  

182  

183  
        @param ops Queue of operations to post.
183  
        @param ops Queue of operations to post.
184  
    */
184  
    */
185  
    void post_deferred_completions(op_queue& ops) const;
185  
    void post_deferred_completions(op_queue& ops) const;
186  

186  

187  
private:
187  
private:
188  
    struct work_cleanup
188  
    struct work_cleanup
189  
    {
189  
    {
190  
        epoll_scheduler* scheduler;
190  
        epoll_scheduler* scheduler;
191  
        std::unique_lock<std::mutex>* lock;
191  
        std::unique_lock<std::mutex>* lock;
192  
        epoll::scheduler_context* ctx;
192  
        epoll::scheduler_context* ctx;
193  
        ~work_cleanup();
193  
        ~work_cleanup();
194  
    };
194  
    };
195  

195  

196  
    struct task_cleanup
196  
    struct task_cleanup
197  
    {
197  
    {
198  
        epoll_scheduler const* scheduler;
198  
        epoll_scheduler const* scheduler;
199  
        std::unique_lock<std::mutex>* lock;
199  
        std::unique_lock<std::mutex>* lock;
200  
        epoll::scheduler_context* ctx;
200  
        epoll::scheduler_context* ctx;
201  
        ~task_cleanup();
201  
        ~task_cleanup();
202  
    };
202  
    };
203  

203  

204  
    std::size_t do_one(
204  
    std::size_t do_one(
205  
        std::unique_lock<std::mutex>& lock,
205  
        std::unique_lock<std::mutex>& lock,
206  
        long timeout_us,
206  
        long timeout_us,
207  
        epoll::scheduler_context* ctx);
207  
        epoll::scheduler_context* ctx);
208  
    void
208  
    void
209  
    run_task(std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx);
209  
    run_task(std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx);
210  
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
210  
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
211  
    void interrupt_reactor() const;
211  
    void interrupt_reactor() const;
212  
    void update_timerfd() const;
212  
    void update_timerfd() const;
213  

213  

214  
    /** Set the signaled state and wake all waiting threads.
214  
    /** Set the signaled state and wake all waiting threads.
215  

215  

216  
        @par Preconditions
216  
        @par Preconditions
217  
        Mutex must be held.
217  
        Mutex must be held.
218  

218  

219  
        @param lock The held mutex lock.
219  
        @param lock The held mutex lock.
220  
    */
220  
    */
221  
    void signal_all(std::unique_lock<std::mutex>& lock) const;
221  
    void signal_all(std::unique_lock<std::mutex>& lock) const;
222  

222  

223  
    /** Set the signaled state and wake one waiter if any exist.
223  
    /** Set the signaled state and wake one waiter if any exist.
224  

224  

225  
        Only unlocks and signals if at least one thread is waiting.
225  
        Only unlocks and signals if at least one thread is waiting.
226  
        Use this when the caller needs to perform a fallback action
226  
        Use this when the caller needs to perform a fallback action
227  
        (such as interrupting the reactor) when no waiters exist.
227  
        (such as interrupting the reactor) when no waiters exist.
228  

228  

229  
        @par Preconditions
229  
        @par Preconditions
230  
        Mutex must be held.
230  
        Mutex must be held.
231  

231  

232  
        @param lock The held mutex lock.
232  
        @param lock The held mutex lock.
233  

233  

234  
        @return `true` if unlocked and signaled, `false` if lock still held.
234  
        @return `true` if unlocked and signaled, `false` if lock still held.
235  
    */
235  
    */
236  
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
236  
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
237  

237  

238  
    /** Set the signaled state, unlock, and wake one waiter if any exist.
238  
    /** Set the signaled state, unlock, and wake one waiter if any exist.
239  

239  

240  
        Always unlocks the mutex. Use this when the caller will release
240  
        Always unlocks the mutex. Use this when the caller will release
241  
        the lock regardless of whether a waiter exists.
241  
        the lock regardless of whether a waiter exists.
242  

242  

243  
        @par Preconditions
243  
        @par Preconditions
244  
        Mutex must be held.
244  
        Mutex must be held.
245  

245  

246  
        @param lock The held mutex lock.
246  
        @param lock The held mutex lock.
247  

247  

248  
        @return `true` if a waiter was signaled, `false` otherwise.
248  
        @return `true` if a waiter was signaled, `false` otherwise.
249  
    */
249  
    */
250  
    bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
250  
    bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
251  

251  

252  
    /** Clear the signaled state before waiting.
252  
    /** Clear the signaled state before waiting.
253  

253  

254  
        @par Preconditions
254  
        @par Preconditions
255  
        Mutex must be held.
255  
        Mutex must be held.
256  
    */
256  
    */
257  
    void clear_signal() const;
257  
    void clear_signal() const;
258  

258  

259  
    /** Block until the signaled state is set.
259  
    /** Block until the signaled state is set.
260  

260  

261  
        Returns immediately if already signaled (fast-path). Otherwise
261  
        Returns immediately if already signaled (fast-path). Otherwise
262  
        increments the waiter count, waits on the condition variable,
262  
        increments the waiter count, waits on the condition variable,
263  
        and decrements the waiter count upon waking.
263  
        and decrements the waiter count upon waking.
264  

264  

265  
        @par Preconditions
265  
        @par Preconditions
266  
        Mutex must be held.
266  
        Mutex must be held.
267  

267  

268  
        @param lock The held mutex lock.
268  
        @param lock The held mutex lock.
269  
    */
269  
    */
270  
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
270  
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
271  

271  

272  
    /** Block until signaled or timeout expires.
272  
    /** Block until signaled or timeout expires.
273  

273  

274  
        @par Preconditions
274  
        @par Preconditions
275  
        Mutex must be held.
275  
        Mutex must be held.
276  

276  

277  
        @param lock The held mutex lock.
277  
        @param lock The held mutex lock.
278  
        @param timeout_us Maximum time to wait in microseconds.
278  
        @param timeout_us Maximum time to wait in microseconds.
279  
    */
279  
    */
280  
    void wait_for_signal_for(
280  
    void wait_for_signal_for(
281  
        std::unique_lock<std::mutex>& lock, long timeout_us) const;
281  
        std::unique_lock<std::mutex>& lock, long timeout_us) const;
282  

282  

283  
    int epoll_fd_;
283  
    int epoll_fd_;
284  
    int event_fd_; // for interrupting reactor
284  
    int event_fd_; // for interrupting reactor
285  
    int timer_fd_; // timerfd for kernel-managed timer expiry
285  
    int timer_fd_; // timerfd for kernel-managed timer expiry
286  
    mutable std::mutex mutex_;
286  
    mutable std::mutex mutex_;
287  
    mutable std::condition_variable cond_;
287  
    mutable std::condition_variable cond_;
288  
    mutable op_queue completed_ops_;
288  
    mutable op_queue completed_ops_;
289  
    mutable std::atomic<long> outstanding_work_;
289  
    mutable std::atomic<long> outstanding_work_;
290  
    bool stopped_;
290  
    bool stopped_;
 
291 +
    bool shutdown_;
291  

292  

292  
    // True while a thread is blocked in epoll_wait. Used by
293  
    // True while a thread is blocked in epoll_wait. Used by
293  
    // wake_one_thread_and_unlock and work_finished to know when
294  
    // wake_one_thread_and_unlock and work_finished to know when
294  
    // an eventfd interrupt is needed instead of a condvar signal.
295  
    // an eventfd interrupt is needed instead of a condvar signal.
295  
    mutable std::atomic<bool> task_running_{false};
296  
    mutable std::atomic<bool> task_running_{false};
296  

297  

297  
    // True when the reactor has been told to do a non-blocking poll
298  
    // True when the reactor has been told to do a non-blocking poll
298  
    // (more handlers queued or poll mode). Prevents redundant eventfd
299  
    // (more handlers queued or poll mode). Prevents redundant eventfd
299  
    // writes and controls the epoll_wait timeout.
300  
    // writes and controls the epoll_wait timeout.
300  
    mutable bool task_interrupted_ = false;
301  
    mutable bool task_interrupted_ = false;
301  

302  

302  
    // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2)
303  
    // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2)
303  
    mutable std::size_t state_ = 0;
304  
    mutable std::size_t state_ = 0;
304  

305  

305  
    // Edge-triggered eventfd state
306  
    // Edge-triggered eventfd state
306  
    mutable std::atomic<bool> eventfd_armed_{false};
307  
    mutable std::atomic<bool> eventfd_armed_{false};
307  

308  

308  
    // Set when the earliest timer changes; flushed before epoll_wait
309  
    // Set when the earliest timer changes; flushed before epoll_wait
309  
    // blocks. Avoids timerfd_settime syscalls for timers that are
310  
    // blocks. Avoids timerfd_settime syscalls for timers that are
310  
    // scheduled then cancelled without being waited on.
311  
    // scheduled then cancelled without being waited on.
311  
    mutable std::atomic<bool> timerfd_stale_{false};
312  
    mutable std::atomic<bool> timerfd_stale_{false};
312  

313  

313  
    // Sentinel operation for interleaving reactor runs with handler execution.
314  
    // Sentinel operation for interleaving reactor runs with handler execution.
314  
    // Ensures the reactor runs periodically even when handlers are continuously
315  
    // Ensures the reactor runs periodically even when handlers are continuously
315  
    // posted, preventing starvation of I/O events, timers, and signals.
316  
    // posted, preventing starvation of I/O events, timers, and signals.
316  
    struct task_op final : scheduler_op
317  
    struct task_op final : scheduler_op
317  
    {
318  
    {
318  
        void operator()() override {}
319  
        void operator()() override {}
319  
        void destroy() override {}
320  
        void destroy() override {}
320  
    };
321  
    };
321  
    task_op task_op_;
322  
    task_op task_op_;
322  
};
323  
};

//--------------------------------------------------------------------------
//
// Implementation
//
//--------------------------------------------------------------------------

/*
    epoll Scheduler - Single Reactor Model
    ======================================

    This scheduler uses a thread coordination strategy to provide handler
    parallelism and avoid the thundering herd problem.
    Instead of all threads blocking on epoll_wait(), one thread becomes the
    "reactor" while others wait on a condition variable for handler work.

    Thread Model
    ------------
    - ONE thread runs epoll_wait() at a time (the reactor thread)
    - OTHER threads wait on cond_ (condition variable) for handlers
    - When work is posted, exactly one waiting thread wakes via notify_one()
    - This matches Windows IOCP semantics where N posted items wake N threads

    Event Loop Structure (do_one)
    -----------------------------
    1. Lock mutex, try to pop handler from queue
    2. If got handler: execute it (unlocked), return
    3. If queue empty and no reactor running: become reactor
       - Run epoll_wait (unlocked), queue I/O completions, loop back
    4. If queue empty and reactor running: wait on condvar for work

    The task_running_ flag ensures only one thread owns epoll_wait().
    After the reactor queues I/O completions, it loops back to try getting
    a handler, giving priority to handler execution over more I/O polling.

    Signaling State (state_)
    ------------------------
    The state_ variable encodes two pieces of information:
    - Bit 0: signaled flag (1 = signaled, persists until cleared)
    - Upper bits: waiter count (each waiter adds 2 before blocking)

    This allows efficient coordination:
    - Signalers only call notify when waiters exist (state_ > 1)
    - Waiters check if already signaled before blocking (fast-path)

    Wake Coordination (wake_one_thread_and_unlock)
    ----------------------------------------------
    When posting work:
    - If waiters exist (state_ > 1): signal and notify_one()
    - Else if reactor running: interrupt via eventfd write
    - Else: no-op (thread will find work when it checks queue)

    This avoids waking threads unnecessarily. With cascading wakes,
    each handler execution wakes at most one additional thread if
    more work exists in the queue.

    Work Counting
    -------------
    outstanding_work_ tracks pending operations. When it hits zero, run()
    returns. Each operation increments on start, decrements on completion.

    Timer Integration
    -----------------
    Timers are handled by timer_service. The reactor adjusts epoll_wait
    timeout to wake for the nearest timer expiry. When a new timer is
    scheduled earlier than current, timer_service calls interrupt_reactor()
    to re-evaluate the timeout.
*/
391  

392  

392  
namespace epoll {
393  
namespace epoll {
393  

394  

394  
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
395  
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
395  
{
396  
{
396  
    epoll_scheduler const* key;
397  
    epoll_scheduler const* key;
397  
    scheduler_context* next;
398  
    scheduler_context* next;
398  
    op_queue private_queue;
399  
    op_queue private_queue;
399  
    long private_outstanding_work;
400  
    long private_outstanding_work;
400  
    int inline_budget;
401  
    int inline_budget;
401  
    int inline_budget_max;
402  
    int inline_budget_max;
402  
    bool unassisted;
403  
    bool unassisted;
403  

404  

404  
    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
405  
    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
405  
        : key(k)
406  
        : key(k)
406  
        , next(n)
407  
        , next(n)
407  
        , private_outstanding_work(0)
408  
        , private_outstanding_work(0)
408  
        , inline_budget(0)
409  
        , inline_budget(0)
409  
        , inline_budget_max(2)
410  
        , inline_budget_max(2)
410  
        , unassisted(false)
411  
        , unassisted(false)
411  
    {
412  
    {
412  
    }
413  
    }
413  
};
414  
};
414  

415  

415  
inline thread_local_ptr<scheduler_context> context_stack;
416  
inline thread_local_ptr<scheduler_context> context_stack;
416  

417  

417  
struct thread_context_guard
418  
struct thread_context_guard
418  
{
419  
{
419  
    scheduler_context frame_;
420  
    scheduler_context frame_;
420  

421  

421  
    explicit thread_context_guard(epoll_scheduler const* ctx) noexcept
422  
    explicit thread_context_guard(epoll_scheduler const* ctx) noexcept
422  
        : frame_(ctx, context_stack.get())
423  
        : frame_(ctx, context_stack.get())
423  
    {
424  
    {
424  
        context_stack.set(&frame_);
425  
        context_stack.set(&frame_);
425  
    }
426  
    }
426  

427  

427  
    ~thread_context_guard() noexcept
428  
    ~thread_context_guard() noexcept
428  
    {
429  
    {
429  
        if (!frame_.private_queue.empty())
430  
        if (!frame_.private_queue.empty())
430  
            frame_.key->drain_thread_queue(
431  
            frame_.key->drain_thread_queue(
431  
                frame_.private_queue, frame_.private_outstanding_work);
432  
                frame_.private_queue, frame_.private_outstanding_work);
432  
        context_stack.set(frame_.next);
433  
        context_stack.set(frame_.next);
433  
    }
434  
    }
434  
};
435  
};
435  

436  

436  
inline scheduler_context*
437  
inline scheduler_context*
437  
find_context(epoll_scheduler const* self) noexcept
438  
find_context(epoll_scheduler const* self) noexcept
438  
{
439  
{
439  
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
440  
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
440  
        if (c->key == self)
441  
        if (c->key == self)
441  
            return c;
442  
            return c;
442  
    return nullptr;
443  
    return nullptr;
443  
}
444  
}
444  

445  

445  
} // namespace epoll
446  
} // namespace epoll
446  

447  

447  
inline void
448  
inline void
448  
epoll_scheduler::reset_inline_budget() const noexcept
449  
epoll_scheduler::reset_inline_budget() const noexcept
449  
{
450  
{
450  
    if (auto* ctx = epoll::find_context(this))
451  
    if (auto* ctx = epoll::find_context(this))
451  
    {
452  
    {
452  
        // Cap when no other thread absorbed queued work. A moderate
453  
        // Cap when no other thread absorbed queued work. A moderate
453  
        // cap (4) amortizes scheduling for small buffers while avoiding
454  
        // cap (4) amortizes scheduling for small buffers while avoiding
454  
        // bursty I/O that fills socket buffers and stalls large transfers.
455  
        // bursty I/O that fills socket buffers and stalls large transfers.
455  
        if (ctx->unassisted)
456  
        if (ctx->unassisted)
456  
        {
457  
        {
457  
            ctx->inline_budget_max = 4;
458  
            ctx->inline_budget_max = 4;
458  
            ctx->inline_budget     = 4;
459  
            ctx->inline_budget     = 4;
459  
            return;
460  
            return;
460  
        }
461  
        }
461  
        // Ramp up when previous cycle fully consumed budget.
462  
        // Ramp up when previous cycle fully consumed budget.
462  
        // Reset on partial consumption (EAGAIN hit or peer got scheduled).
463  
        // Reset on partial consumption (EAGAIN hit or peer got scheduled).
463  
        if (ctx->inline_budget == 0)
464  
        if (ctx->inline_budget == 0)
464  
            ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
465  
            ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
465  
        else if (ctx->inline_budget < ctx->inline_budget_max)
466  
        else if (ctx->inline_budget < ctx->inline_budget_max)
466  
            ctx->inline_budget_max = 2;
467  
            ctx->inline_budget_max = 2;
467  
        ctx->inline_budget = ctx->inline_budget_max;
468  
        ctx->inline_budget = ctx->inline_budget_max;
468  
    }
469  
    }
469  
}
470  
}
470  

471  

471  
inline bool
472  
inline bool
472  
epoll_scheduler::try_consume_inline_budget() const noexcept
473  
epoll_scheduler::try_consume_inline_budget() const noexcept
473  
{
474  
{
474  
    if (auto* ctx = epoll::find_context(this))
475  
    if (auto* ctx = epoll::find_context(this))
475  
    {
476  
    {
476  
        if (ctx->inline_budget > 0)
477  
        if (ctx->inline_budget > 0)
477  
        {
478  
        {
478  
            --ctx->inline_budget;
479  
            --ctx->inline_budget;
479  
            return true;
480  
            return true;
480  
        }
481  
        }
481  
    }
482  
    }
482  
    return false;
483  
    return false;
483  
}
484  
}
484  

485  

485  
inline void
486  
inline void
486  
descriptor_state::operator()()
487  
descriptor_state::operator()()
487  
{
488  
{
488  
    is_enqueued_.store(false, std::memory_order_relaxed);
489  
    is_enqueued_.store(false, std::memory_order_relaxed);
489  

490  

490  
    // Take ownership of impl ref set by close_socket() to prevent
491  
    // Take ownership of impl ref set by close_socket() to prevent
491  
    // the owning impl from being freed while we're executing
492  
    // the owning impl from being freed while we're executing
492  
    auto prevent_impl_destruction = std::move(impl_ref_);
493  
    auto prevent_impl_destruction = std::move(impl_ref_);
493  

494  

494  
    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
495  
    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
495  
    if (ev == 0)
496  
    if (ev == 0)
496  
    {
497  
    {
497  
        scheduler_->compensating_work_started();
498  
        scheduler_->compensating_work_started();
498  
        return;
499  
        return;
499  
    }
500  
    }
500  

501  

501  
    op_queue local_ops;
502  
    op_queue local_ops;
502  

503  

503  
    int err = 0;
504  
    int err = 0;
504  
    if (ev & EPOLLERR)
505  
    if (ev & EPOLLERR)
505  
    {
506  
    {
506  
        socklen_t len = sizeof(err);
507  
        socklen_t len = sizeof(err);
507  
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
508  
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
508  
            err = errno;
509  
            err = errno;
509  
        if (err == 0)
510  
        if (err == 0)
510  
            err = EIO;
511  
            err = EIO;
511  
    }
512  
    }
512  

513  

513  
    {
514  
    {
514  
        std::lock_guard lock(mutex);
515  
        std::lock_guard lock(mutex);
515  
        if (ev & EPOLLIN)
516  
        if (ev & EPOLLIN)
516  
        {
517  
        {
517  
            if (read_op)
518  
            if (read_op)
518  
            {
519  
            {
519  
                auto* rd = read_op;
520  
                auto* rd = read_op;
520  
                if (err)
521  
                if (err)
521  
                    rd->complete(err, 0);
522  
                    rd->complete(err, 0);
522  
                else
523  
                else
523  
                    rd->perform_io();
524  
                    rd->perform_io();
524  

525  

525  
                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
526  
                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
526  
                {
527  
                {
527  
                    rd->errn = 0;
528  
                    rd->errn = 0;
528  
                }
529  
                }
529  
                else
530  
                else
530  
                {
531  
                {
531  
                    read_op = nullptr;
532  
                    read_op = nullptr;
532  
                    local_ops.push(rd);
533  
                    local_ops.push(rd);
533  
                }
534  
                }
534  
            }
535  
            }
535  
            else
536  
            else
536  
            {
537  
            {
537  
                read_ready = true;
538  
                read_ready = true;
538  
            }
539  
            }
539  
        }
540  
        }
540  
        if (ev & EPOLLOUT)
541  
        if (ev & EPOLLOUT)
541  
        {
542  
        {
542  
            bool had_write_op = (connect_op || write_op);
543  
            bool had_write_op = (connect_op || write_op);
543  
            if (connect_op)
544  
            if (connect_op)
544  
            {
545  
            {
545  
                auto* cn = connect_op;
546  
                auto* cn = connect_op;
546  
                if (err)
547  
                if (err)
547  
                    cn->complete(err, 0);
548  
                    cn->complete(err, 0);
548  
                else
549  
                else
549  
                    cn->perform_io();
550  
                    cn->perform_io();
550  
                connect_op = nullptr;
551  
                connect_op = nullptr;
551  
                local_ops.push(cn);
552  
                local_ops.push(cn);
552  
            }
553  
            }
553  
            if (write_op)
554  
            if (write_op)
554  
            {
555  
            {
555  
                auto* wr = write_op;
556  
                auto* wr = write_op;
556  
                if (err)
557  
                if (err)
557  
                    wr->complete(err, 0);
558  
                    wr->complete(err, 0);
558  
                else
559  
                else
559  
                    wr->perform_io();
560  
                    wr->perform_io();
560  

561  

561  
                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
562  
                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
562  
                {
563  
                {
563  
                    wr->errn = 0;
564  
                    wr->errn = 0;
564  
                }
565  
                }
565  
                else
566  
                else
566  
                {
567  
                {
567  
                    write_op = nullptr;
568  
                    write_op = nullptr;
568  
                    local_ops.push(wr);
569  
                    local_ops.push(wr);
569  
                }
570  
                }
570  
            }
571  
            }
571  
            if (!had_write_op)
572  
            if (!had_write_op)
572  
                write_ready = true;
573  
                write_ready = true;
573  
        }
574  
        }
574  
        if (err)
575  
        if (err)
575  
        {
576  
        {
576  
            if (read_op)
577  
            if (read_op)
577  
            {
578  
            {
578  
                read_op->complete(err, 0);
579  
                read_op->complete(err, 0);
579  
                local_ops.push(std::exchange(read_op, nullptr));
580  
                local_ops.push(std::exchange(read_op, nullptr));
580  
            }
581  
            }
581  
            if (write_op)
582  
            if (write_op)
582  
            {
583  
            {
583  
                write_op->complete(err, 0);
584  
                write_op->complete(err, 0);
584  
                local_ops.push(std::exchange(write_op, nullptr));
585  
                local_ops.push(std::exchange(write_op, nullptr));
585  
            }
586  
            }
586  
            if (connect_op)
587  
            if (connect_op)
587  
            {
588  
            {
588  
                connect_op->complete(err, 0);
589  
                connect_op->complete(err, 0);
589  
                local_ops.push(std::exchange(connect_op, nullptr));
590  
                local_ops.push(std::exchange(connect_op, nullptr));
590  
            }
591  
            }
591  
        }
592  
        }
592  
    }
593  
    }
593  

594  

594  
    // Execute first handler inline — the scheduler's work_cleanup
595  
    // Execute first handler inline — the scheduler's work_cleanup
595  
    // accounts for this as the "consumed" work item
596  
    // accounts for this as the "consumed" work item
596  
    scheduler_op* first = local_ops.pop();
597  
    scheduler_op* first = local_ops.pop();
597  
    if (first)
598  
    if (first)
598  
    {
599  
    {
599  
        scheduler_->post_deferred_completions(local_ops);
600  
        scheduler_->post_deferred_completions(local_ops);
600  
        (*first)();
601  
        (*first)();
601  
    }
602  
    }
602  
    else
603  
    else
603  
    {
604  
    {
604  
        scheduler_->compensating_work_started();
605  
        scheduler_->compensating_work_started();
605  
    }
606  
    }
606  
}
607  
}
607  

608  

608  
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
609  
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
609  
    : epoll_fd_(-1)
610  
    : epoll_fd_(-1)
610  
    , event_fd_(-1)
611  
    , event_fd_(-1)
611  
    , timer_fd_(-1)
612  
    , timer_fd_(-1)
612  
    , outstanding_work_(0)
613  
    , outstanding_work_(0)
613  
    , stopped_(false)
614  
    , stopped_(false)
 
615 +
    , shutdown_(false)
614  
    , task_running_{false}
616  
    , task_running_{false}
615  
    , task_interrupted_(false)
617  
    , task_interrupted_(false)
616  
    , state_(0)
618  
    , state_(0)
617  
{
619  
{
618  
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
620  
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
619  
    if (epoll_fd_ < 0)
621  
    if (epoll_fd_ < 0)
620  
        detail::throw_system_error(make_err(errno), "epoll_create1");
622  
        detail::throw_system_error(make_err(errno), "epoll_create1");
621  

623  

622  
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
624  
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
623  
    if (event_fd_ < 0)
625  
    if (event_fd_ < 0)
624  
    {
626  
    {
625  
        int errn = errno;
627  
        int errn = errno;
626  
        ::close(epoll_fd_);
628  
        ::close(epoll_fd_);
627  
        detail::throw_system_error(make_err(errn), "eventfd");
629  
        detail::throw_system_error(make_err(errn), "eventfd");
628  
    }
630  
    }
629  

631  

630  
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
632  
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
631  
    if (timer_fd_ < 0)
633  
    if (timer_fd_ < 0)
632  
    {
634  
    {
633  
        int errn = errno;
635  
        int errn = errno;
634  
        ::close(event_fd_);
636  
        ::close(event_fd_);
635  
        ::close(epoll_fd_);
637  
        ::close(epoll_fd_);
636  
        detail::throw_system_error(make_err(errn), "timerfd_create");
638  
        detail::throw_system_error(make_err(errn), "timerfd_create");
637  
    }
639  
    }
638  

640  

639  
    epoll_event ev{};
641  
    epoll_event ev{};
640  
    ev.events   = EPOLLIN | EPOLLET;
642  
    ev.events   = EPOLLIN | EPOLLET;
641  
    ev.data.ptr = nullptr;
643  
    ev.data.ptr = nullptr;
642  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
644  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
643  
    {
645  
    {
644  
        int errn = errno;
646  
        int errn = errno;
645  
        ::close(timer_fd_);
647  
        ::close(timer_fd_);
646  
        ::close(event_fd_);
648  
        ::close(event_fd_);
647  
        ::close(epoll_fd_);
649  
        ::close(epoll_fd_);
648  
        detail::throw_system_error(make_err(errn), "epoll_ctl");
650  
        detail::throw_system_error(make_err(errn), "epoll_ctl");
649  
    }
651  
    }
650  

652  

651  
    epoll_event timer_ev{};
653  
    epoll_event timer_ev{};
652  
    timer_ev.events   = EPOLLIN | EPOLLERR;
654  
    timer_ev.events   = EPOLLIN | EPOLLERR;
653  
    timer_ev.data.ptr = &timer_fd_;
655  
    timer_ev.data.ptr = &timer_fd_;
654  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
656  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
655  
    {
657  
    {
656  
        int errn = errno;
658  
        int errn = errno;
657  
        ::close(timer_fd_);
659  
        ::close(timer_fd_);
658  
        ::close(event_fd_);
660  
        ::close(event_fd_);
659  
        ::close(epoll_fd_);
661  
        ::close(epoll_fd_);
660  
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
662  
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
661  
    }
663  
    }
662  

664  

663  
    timer_svc_ = &get_timer_service(ctx, *this);
665  
    timer_svc_ = &get_timer_service(ctx, *this);
664  
    timer_svc_->set_on_earliest_changed(
666  
    timer_svc_->set_on_earliest_changed(
665  
        timer_service::callback(this, [](void* p) {
667  
        timer_service::callback(this, [](void* p) {
666  
            auto* self = static_cast<epoll_scheduler*>(p);
668  
            auto* self = static_cast<epoll_scheduler*>(p);
667  
            self->timerfd_stale_.store(true, std::memory_order_release);
669  
            self->timerfd_stale_.store(true, std::memory_order_release);
668  
            if (self->task_running_.load(std::memory_order_acquire))
670  
            if (self->task_running_.load(std::memory_order_acquire))
669  
                self->interrupt_reactor();
671  
                self->interrupt_reactor();
670  
        }));
672  
        }));
671  

673  

672  
    // Initialize resolver service
674  
    // Initialize resolver service
673  
    get_resolver_service(ctx, *this);
675  
    get_resolver_service(ctx, *this);
674  

676  

675  
    // Initialize signal service
677  
    // Initialize signal service
676  
    get_signal_service(ctx, *this);
678  
    get_signal_service(ctx, *this);
677  

679  

678  
    // Push task sentinel to interleave reactor runs with handler execution
680  
    // Push task sentinel to interleave reactor runs with handler execution
679  
    completed_ops_.push(&task_op_);
681  
    completed_ops_.push(&task_op_);
680  
}
682  
}
681  

683  

682  
/// Close the reactor's file descriptors. A value of -1 means the
/// constructor failed before that fd was created, so it is skipped.
inline epoll_scheduler::~epoll_scheduler()
{
    if (timer_fd_ >= 0)
        ::close(timer_fd_);
    if (event_fd_ >= 0)
        ::close(event_fd_);
    if (epoll_fd_ >= 0)
        ::close(epoll_fd_);
}
691  

693  

692  
inline void
694  
inline void
693  
epoll_scheduler::shutdown()
695  
epoll_scheduler::shutdown()
694  
{
696  
{
695  
    {
697  
    {
696  
        std::unique_lock lock(mutex_);
698  
        std::unique_lock lock(mutex_);
 
699 +
        shutdown_ = true;
697  

700  

698  
        while (auto* h = completed_ops_.pop())
701  
        while (auto* h = completed_ops_.pop())
699  
        {
702  
        {
700  
            if (h == &task_op_)
703  
            if (h == &task_op_)
701  
                continue;
704  
                continue;
702  
            lock.unlock();
705  
            lock.unlock();
703  
            h->destroy();
706  
            h->destroy();
704  
            lock.lock();
707  
            lock.lock();
705  
        }
708  
        }
706  

709  

707  
        signal_all(lock);
710  
        signal_all(lock);
708  
    }
711  
    }
709  

712  

 
713 +
    outstanding_work_.store(0, std::memory_order_release);
 
714 +

710  
    if (event_fd_ >= 0)
715  
    if (event_fd_ >= 0)
711  
        interrupt_reactor();
716  
        interrupt_reactor();
712  
}
717  
}
713  

718  

714  
inline void
719  
inline void
715  
epoll_scheduler::post(std::coroutine_handle<> h) const
720  
epoll_scheduler::post(std::coroutine_handle<> h) const
716  
{
721  
{
717  
    struct post_handler final : scheduler_op
722  
    struct post_handler final : scheduler_op
718  
    {
723  
    {
719  
        std::coroutine_handle<> h_;
724  
        std::coroutine_handle<> h_;
720  

725  

721  
        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
726  
        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
722  

727  

723  
        ~post_handler() override = default;
728  
        ~post_handler() override = default;
724  

729  

725  
        void operator()() override
730  
        void operator()() override
726  
        {
731  
        {
727  
            auto h = h_;
732  
            auto h = h_;
728  
            delete this;
733  
            delete this;
729  
            h.resume();
734  
            h.resume();
730  
        }
735  
        }
731  

736  

732  
        void destroy() override
737  
        void destroy() override
733 -
            auto h = h_;
 
734  
        {
738  
        {
735 -
            h.destroy();
 
736  
            delete this;
739  
            delete this;
737  
        }
740  
        }
738  
    };
741  
    };
739  

742  

740  
    auto ph = std::make_unique<post_handler>(h);
743  
    auto ph = std::make_unique<post_handler>(h);
741  

744  

742  
    // Fast path: same thread posts to private queue
745  
    // Fast path: same thread posts to private queue
743  
    // Only count locally; work_cleanup batches to global counter
746  
    // Only count locally; work_cleanup batches to global counter
744  
    if (auto* ctx = epoll::find_context(this))
747  
    if (auto* ctx = epoll::find_context(this))
745  
    {
748  
    {
746  
        ++ctx->private_outstanding_work;
749  
        ++ctx->private_outstanding_work;
747  
        ctx->private_queue.push(ph.release());
750  
        ctx->private_queue.push(ph.release());
748  
        return;
751  
        return;
749  
    }
752  
    }
750  

753  

751  
    // Slow path: cross-thread post requires mutex
754  
    // Slow path: cross-thread post requires mutex
752  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
755  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
753  

756  

754  
    std::unique_lock lock(mutex_);
757  
    std::unique_lock lock(mutex_);
755  
    completed_ops_.push(ph.release());
758  
    completed_ops_.push(ph.release());
756  
    wake_one_thread_and_unlock(lock);
759  
    wake_one_thread_and_unlock(lock);
757  
}
760  
}
758  

761  

759  
inline void
762  
inline void
760  
epoll_scheduler::post(scheduler_op* h) const
763  
epoll_scheduler::post(scheduler_op* h) const
761  
{
764  
{
762  
    // Fast path: same thread posts to private queue
765  
    // Fast path: same thread posts to private queue
763  
    // Only count locally; work_cleanup batches to global counter
766  
    // Only count locally; work_cleanup batches to global counter
764  
    if (auto* ctx = epoll::find_context(this))
767  
    if (auto* ctx = epoll::find_context(this))
765  
    {
768  
    {
766  
        ++ctx->private_outstanding_work;
769  
        ++ctx->private_outstanding_work;
767  
        ctx->private_queue.push(h);
770  
        ctx->private_queue.push(h);
768  
        return;
771  
        return;
769  
    }
772  
    }
770  

773  

771  
    // Slow path: cross-thread post requires mutex
774  
    // Slow path: cross-thread post requires mutex
772  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
775  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
773  

776  

774  
    std::unique_lock lock(mutex_);
777  
    std::unique_lock lock(mutex_);
775  
    completed_ops_.push(h);
778  
    completed_ops_.push(h);
776  
    wake_one_thread_and_unlock(lock);
779  
    wake_one_thread_and_unlock(lock);
777  
}
780  
}
778  

781  

779  
inline bool
782  
inline bool
780  
epoll_scheduler::running_in_this_thread() const noexcept
783  
epoll_scheduler::running_in_this_thread() const noexcept
781  
{
784  
{
782  
    for (auto* c = epoll::context_stack.get(); c != nullptr; c = c->next)
785  
    for (auto* c = epoll::context_stack.get(); c != nullptr; c = c->next)
783  
        if (c->key == this)
786  
        if (c->key == this)
784  
            return true;
787  
            return true;
785  
    return false;
788  
    return false;
786  
}
789  
}
787  

790  

788  
inline void
791  
inline void
789  
epoll_scheduler::stop()
792  
epoll_scheduler::stop()
790  
{
793  
{
791  
    std::unique_lock lock(mutex_);
794  
    std::unique_lock lock(mutex_);
792  
    if (!stopped_)
795  
    if (!stopped_)
793  
    {
796  
    {
794  
        stopped_ = true;
797  
        stopped_ = true;
795  
        signal_all(lock);
798  
        signal_all(lock);
796  
        interrupt_reactor();
799  
        interrupt_reactor();
797  
    }
800  
    }
798  
}
801  
}
799  

802  

800  
inline bool
803  
inline bool
801  
epoll_scheduler::stopped() const noexcept
804  
epoll_scheduler::stopped() const noexcept
802  
{
805  
{
803  
    std::unique_lock lock(mutex_);
806  
    std::unique_lock lock(mutex_);
804  
    return stopped_;
807  
    return stopped_;
805  
}
808  
}
806  

809  

807  
inline void
810  
inline void
808  
epoll_scheduler::restart()
811  
epoll_scheduler::restart()
809  
{
812  
{
810  
    std::unique_lock lock(mutex_);
813  
    std::unique_lock lock(mutex_);
811  
    stopped_ = false;
814  
    stopped_ = false;
812  
}
815  
}
813  

816  

814  
inline std::size_t
817  
inline std::size_t
815  
epoll_scheduler::run()
818  
epoll_scheduler::run()
816  
{
819  
{
817  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
820  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
818  
    {
821  
    {
819  
        stop();
822  
        stop();
820  
        return 0;
823  
        return 0;
821  
    }
824  
    }
822  

825  

823  
    epoll::thread_context_guard ctx(this);
826  
    epoll::thread_context_guard ctx(this);
824  
    std::unique_lock lock(mutex_);
827  
    std::unique_lock lock(mutex_);
825  

828  

826  
    std::size_t n = 0;
829  
    std::size_t n = 0;
827  
    for (;;)
830  
    for (;;)
828  
    {
831  
    {
829  
        if (!do_one(lock, -1, &ctx.frame_))
832  
        if (!do_one(lock, -1, &ctx.frame_))
830  
            break;
833  
            break;
831  
        if (n != (std::numeric_limits<std::size_t>::max)())
834  
        if (n != (std::numeric_limits<std::size_t>::max)())
832  
            ++n;
835  
            ++n;
833  
        if (!lock.owns_lock())
836  
        if (!lock.owns_lock())
834  
            lock.lock();
837  
            lock.lock();
835  
    }
838  
    }
836  
    return n;
839  
    return n;
837  
}
840  
}
838  

841  

839  
inline std::size_t
842  
inline std::size_t
840  
epoll_scheduler::run_one()
843  
epoll_scheduler::run_one()
841  
{
844  
{
842  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
845  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
843  
    {
846  
    {
844  
        stop();
847  
        stop();
845  
        return 0;
848  
        return 0;
846  
    }
849  
    }
847  

850  

848  
    epoll::thread_context_guard ctx(this);
851  
    epoll::thread_context_guard ctx(this);
849  
    std::unique_lock lock(mutex_);
852  
    std::unique_lock lock(mutex_);
850  
    return do_one(lock, -1, &ctx.frame_);
853  
    return do_one(lock, -1, &ctx.frame_);
851  
}
854  
}
852  

855  

853  
inline std::size_t
856  
inline std::size_t
854  
epoll_scheduler::wait_one(long usec)
857  
epoll_scheduler::wait_one(long usec)
855  
{
858  
{
856  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
859  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
857  
    {
860  
    {
858  
        stop();
861  
        stop();
859  
        return 0;
862  
        return 0;
860  
    }
863  
    }
861  

864  

862  
    epoll::thread_context_guard ctx(this);
865  
    epoll::thread_context_guard ctx(this);
863  
    std::unique_lock lock(mutex_);
866  
    std::unique_lock lock(mutex_);
864  
    return do_one(lock, usec, &ctx.frame_);
867  
    return do_one(lock, usec, &ctx.frame_);
865  
}
868  
}
866  

869  

867  
inline std::size_t
870  
inline std::size_t
868  
epoll_scheduler::poll()
871  
epoll_scheduler::poll()
869  
{
872  
{
870  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
873  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
871  
    {
874  
    {
872  
        stop();
875  
        stop();
873  
        return 0;
876  
        return 0;
874  
    }
877  
    }
875  

878  

876  
    epoll::thread_context_guard ctx(this);
879  
    epoll::thread_context_guard ctx(this);
877  
    std::unique_lock lock(mutex_);
880  
    std::unique_lock lock(mutex_);
878  

881  

879  
    std::size_t n = 0;
882  
    std::size_t n = 0;
880  
    for (;;)
883  
    for (;;)
881  
    {
884  
    {
882  
        if (!do_one(lock, 0, &ctx.frame_))
885  
        if (!do_one(lock, 0, &ctx.frame_))
883  
            break;
886  
            break;
884  
        if (n != (std::numeric_limits<std::size_t>::max)())
887  
        if (n != (std::numeric_limits<std::size_t>::max)())
885  
            ++n;
888  
            ++n;
886  
        if (!lock.owns_lock())
889  
        if (!lock.owns_lock())
887  
            lock.lock();
890  
            lock.lock();
888  
    }
891  
    }
889  
    return n;
892  
    return n;
890  
}
893  
}
891  

894  

892  
inline std::size_t
895  
inline std::size_t
893  
epoll_scheduler::poll_one()
896  
epoll_scheduler::poll_one()
894  
{
897  
{
895  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
898  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
896  
    {
899  
    {
897  
        stop();
900  
        stop();
898  
        return 0;
901  
        return 0;
899  
    }
902  
    }
900  

903  

901  
    epoll::thread_context_guard ctx(this);
904  
    epoll::thread_context_guard ctx(this);
902  
    std::unique_lock lock(mutex_);
905  
    std::unique_lock lock(mutex_);
903  
    return do_one(lock, 0, &ctx.frame_);
906  
    return do_one(lock, 0, &ctx.frame_);
904  
}
907  
}
905  

908  

906  
/// Add a descriptor to the epoll set.
///
/// Registers once with edge-triggered interest in both directions so
/// the descriptor never needs re-arming via epoll_ctl afterwards.
///
/// @param fd    Native descriptor to register.
/// @param desc  Per-descriptor state; stored in epoll's user-data slot.
/// @throws system_error if EPOLL_CTL_ADD fails.
inline void
epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
{
    epoll_event ev{};
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
    ev.data.ptr = desc;

    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");

    desc->registered_events = ev.events;
    desc->fd                = fd;
    desc->scheduler_        = this;

    // Reset cached readiness under the descriptor's own lock; with
    // EPOLLET the kernel reports transitions only, so start pessimistic.
    std::lock_guard lock(desc->mutex);
    desc->read_ready  = false;
    desc->write_ready = false;
}
924  

927  

925  
/// Remove a descriptor from the epoll set.
///
/// Best-effort: the epoll_ctl return value is deliberately ignored
/// (the fd may already have been closed, which deregisters it anyway).
inline void
epoll_scheduler::deregister_descriptor(int fd) const
{
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
}
930  

933  

931  
/// Record one unit of outstanding work.
///
/// Relaxed ordering suffices for the increment: the counter only gates
/// shutdown (see work_finished()), it does not publish data.
inline void
epoll_scheduler::work_started() noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
936  

939  

937  
inline void
940  
inline void
938  
epoll_scheduler::work_finished() noexcept
941  
epoll_scheduler::work_finished() noexcept
939  
{
942  
{
940  
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
943  
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
941  
        stop();
944  
        stop();
942  
}
945  
}
943  

946  

944  
inline void
947  
inline void
945  
epoll_scheduler::compensating_work_started() const noexcept
948  
epoll_scheduler::compensating_work_started() const noexcept
946  
{
949  
{
947  
    auto* ctx = epoll::find_context(this);
950  
    auto* ctx = epoll::find_context(this);
948  
    if (ctx)
951  
    if (ctx)
949  
        ++ctx->private_outstanding_work;
952  
        ++ctx->private_outstanding_work;
950  
}
953  
}
951  

954  

952  
/// Move a thread-local queue of completed ops into the global queue.
///
/// @param queue  Ops to publish; emptied by the splice.
/// @param count  Number of ops in `queue`; a wakeup is only issued
///               when at least one op was actually moved.
inline void
epoll_scheduler::drain_thread_queue(op_queue& queue, long count) const
{
    // Note: outstanding_work_ was already incremented when posting
    std::unique_lock lock(mutex_);
    completed_ops_.splice(queue);
    // Wake at most one worker to pick the ops up; this also unlocks.
    if (count > 0)
        maybe_unlock_and_signal_one(lock);
}
961  

964  

962  
inline void
965  
inline void
963  
epoll_scheduler::post_deferred_completions(op_queue& ops) const
966  
epoll_scheduler::post_deferred_completions(op_queue& ops) const
964  
{
967  
{
965  
    if (ops.empty())
968  
    if (ops.empty())
966  
        return;
969  
        return;
967  

970  

968  
    // Fast path: if on scheduler thread, use private queue
971  
    // Fast path: if on scheduler thread, use private queue
969  
    if (auto* ctx = epoll::find_context(this))
972  
    if (auto* ctx = epoll::find_context(this))
970  
    {
973  
    {
971  
        ctx->private_queue.splice(ops);
974  
        ctx->private_queue.splice(ops);
972  
        return;
975  
        return;
973  
    }
976  
    }
974  

977  

975  
    // Slow path: add to global queue and wake a thread
978  
    // Slow path: add to global queue and wake a thread
976  
    std::unique_lock lock(mutex_);
979  
    std::unique_lock lock(mutex_);
977  
    completed_ops_.splice(ops);
980  
    completed_ops_.splice(ops);
978  
    wake_one_thread_and_unlock(lock);
981  
    wake_one_thread_and_unlock(lock);
979  
}
982  
}
980  

983  

981  
/// Wake the reactor thread out of epoll_wait via the eventfd.
///
/// The armed flag collapses concurrent interrupts into a single write;
/// the reactor clears it after draining the eventfd in run_task().
inline void
epoll_scheduler::interrupt_reactor() const
{
    // Only write if not already armed to avoid redundant writes
    bool expected = false;
    // release on success orders prior queue updates before the wakeup;
    // relaxed on failure: someone else already armed it.
    if (eventfd_armed_.compare_exchange_strong(
            expected, true, std::memory_order_release,
            std::memory_order_relaxed))
    {
        std::uint64_t val       = 1;
        // Write failure is ignored: worst case the reactor wakes later.
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
    }
}
994  

997  

995  
inline void
998  
inline void
996  
epoll_scheduler::signal_all(std::unique_lock<std::mutex>&) const
999  
epoll_scheduler::signal_all(std::unique_lock<std::mutex>&) const
997  
{
1000  
{
998  
    state_ |= 1;
1001  
    state_ |= 1;
999  
    cond_.notify_all();
1002  
    cond_.notify_all();
1000  
}
1003  
}
1001  

1004  

1002  
/// Set the signal bit and, if any thread is waiting, unlock and notify
/// exactly one of them.
///
/// state_ encoding: bit 0 = "signalled" latch; each parked waiter adds
/// 2 (see wait_for_signal), so state_ > 1 means at least one waiter.
///
/// @return true if the lock was released and a waiter notified;
///         false if no waiter existed (lock still held).
inline bool
epoll_scheduler::maybe_unlock_and_signal_one(
    std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    if (state_ > 1)
    {
        // Unlock before notify so the woken thread can take the mutex
        // immediately instead of blocking on it.
        lock.unlock();
        cond_.notify_one();
        return true;
    }
    return false;
}
1015  

1018  

1016  
/// Set the signal bit, always release the lock, and notify one waiter
/// if any thread was parked (state_ > 1 — each waiter adds 2).
///
/// @return true if a waiter was notified.
inline bool
epoll_scheduler::unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    // Snapshot the waiter indication while still holding the mutex.
    bool have_waiters = state_ > 1;
    lock.unlock();
    if (have_waiters)
        cond_.notify_one();
    return have_waiters;
}
1026  

1029  

1027  
inline void
1030  
inline void
1028  
epoll_scheduler::clear_signal() const
1031  
epoll_scheduler::clear_signal() const
1029  
{
1032  
{
1030  
    state_ &= ~std::size_t(1);
1033  
    state_ &= ~std::size_t(1);
1031  
}
1034  
}
1032  

1035  

1033  
/// Block the calling thread until the signal bit is set.
///
/// The loop guards against spurious wakeups. The +2/-2 bookkeeping
/// publishes this thread as a waiter so signalers (state_ > 1 checks)
/// know a notify_one() will be observed.
inline void
epoll_scheduler::wait_for_signal(std::unique_lock<std::mutex>& lock) const
{
    while ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait(lock);
        state_ -= 2;
    }
}
1043  

1046  

1044  
/// Block until the signal bit is set, for at most timeout_us
/// microseconds.
///
/// Deliberately a single wait rather than a loop: a spurious wakeup
/// just returns to the caller early, which is acceptable for a bounded
/// wait. The +2/-2 registers this thread as a waiter (see state_
/// encoding in wait_for_signal).
inline void
epoll_scheduler::wait_for_signal_for(
    std::unique_lock<std::mutex>& lock, long timeout_us) const
{
    if ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= 2;
    }
}
1055  

1058  

1056  
/// Wake one thread to process new work, then release the lock.
///
/// Preference order: (1) notify a thread parked on the condvar;
/// (2) if none and the reactor is blocked in epoll_wait and not yet
/// interrupted, interrupt it via the eventfd; (3) otherwise just
/// unlock — a running thread will pick the work up.
inline void
epoll_scheduler::wake_one_thread_and_unlock(
    std::unique_lock<std::mutex>& lock) const
{
    // Returns true (already unlocked) if a condvar waiter was woken.
    if (maybe_unlock_and_signal_one(lock))
        return;

    // task_interrupted_ is read/written under the mutex held here.
    if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
    {
        task_interrupted_ = true;
        lock.unlock();
        // eventfd write happens outside the lock.
        interrupt_reactor();
    }
    else
    {
        lock.unlock();
    }
}
1074  

1077  

1075  
/// Scope guard run after executing one handler in do_one().
///
/// Reconciles the thread-private work counter with the global one.
/// The handler consumed exactly one unit of work, so `produced` units
/// posted during its execution net out to `produced - 1` adjustments;
/// `produced == 1` cancels exactly and needs no atomic op at all.
/// Finally, any privately queued ops are published under the mutex
/// (the lock is reacquired here and intentionally left held for the
/// caller's loop).
inline epoll_scheduler::work_cleanup::~work_cleanup()
{
    if (ctx)
    {
        long produced = ctx->private_outstanding_work;
        if (produced > 1)
            scheduler->outstanding_work_.fetch_add(
                produced - 1, std::memory_order_relaxed);
        else if (produced < 1)
            scheduler->work_finished();
        ctx->private_outstanding_work = 0;

        if (!ctx->private_queue.empty())
        {
            lock->lock();
            scheduler->completed_ops_.splice(ctx->private_queue);
        }
    }
    else
    {
        // No thread context: account the consumed unit directly.
        scheduler->work_finished();
    }
}
1098  

1101  

1099  
/// Scope guard run after the reactor returns in run_task().
///
/// Unlike work_cleanup, the reactor itself consumes no work unit, so
/// the private counter is flushed to the global counter verbatim. Any
/// privately queued ops are then published under the mutex (reacquired
/// if needed and left held for the caller).
inline epoll_scheduler::task_cleanup::~task_cleanup()
{
    if (!ctx)
        return;

    if (ctx->private_outstanding_work > 0)
    {
        scheduler->outstanding_work_.fetch_add(
            ctx->private_outstanding_work, std::memory_order_relaxed);
        ctx->private_outstanding_work = 0;
    }

    if (!ctx->private_queue.empty())
    {
        // run_task may exit with the lock already held (it relocks at
        // the end) or not (if an exception unwound earlier).
        if (!lock->owns_lock())
            lock->lock();
        scheduler->completed_ops_.splice(ctx->private_queue);
    }
}
1118  

1121  

1119  
/// Re-program the timerfd to fire at the nearest timer expiry.
///
/// flags == 0 means the itimerspec is interpreted as a relative
/// duration. A zeroed it_value disarms the timerfd, which is exactly
/// what we want when no timers exist — and exactly what we must avoid
/// when one is due, hence the 1ns floors below.
///
/// @throws system_error if timerfd_settime fails.
inline void
epoll_scheduler::update_timerfd() const
{
    auto nearest = timer_svc_->nearest_expiry();

    itimerspec ts{};
    int flags = 0;

    if (nearest == timer_service::time_point::max())
    {
        // No timers - disarm by setting to 0 (relative)
    }
    else
    {
        auto now = std::chrono::steady_clock::now();
        if (nearest <= now)
        {
            // Already expired: fire as soon as possible.
            // Use 1ns instead of 0 - zero disarms the timerfd
            ts.it_value.tv_nsec = 1;
        }
        else
        {
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
                            nearest - now)
                            .count();
            ts.it_value.tv_sec  = nsec / 1000000000;
            ts.it_value.tv_nsec = nsec % 1000000000;
            // Ensure non-zero to avoid disarming if duration rounds to 0
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
                ts.it_value.tv_nsec = 1;
        }
    }

    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
        detail::throw_system_error(make_err(errno), "timerfd_settime");
}
1155  

1158  

1156  
/// One reactor iteration: poll epoll for events and convert them into
/// queued operations.
///
/// Called from do_one() when the task sentinel is popped. Runs the
/// actual epoll_wait with the scheduler mutex released; the mutex is
/// reacquired at the end (and task_cleanup relies on that).
///
/// @param lock  Scheduler mutex; may be held on entry, held on exit.
/// @param ctx   Calling thread's scheduler context frame.
inline void
epoll_scheduler::run_task(
    std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx)
{
    // If an interrupt was requested, poll without blocking; otherwise
    // block indefinitely (timer wakeups come via the timerfd).
    int timeout_ms = task_interrupted_ ? 0 : -1;

    if (lock.owns_lock())
        lock.unlock();

    // Flushes private work/ops back to the scheduler on scope exit,
    // even if epoll_wait or a handler below throws.
    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    // Event loop runs without mutex held
    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    // EINTR is benign: treat as an empty batch.
    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;

    // Process events without holding the mutex
    for (int i = 0; i < nfds; ++i)
    {
        // Null user-data marks the wakeup eventfd.
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            // Mutex released above; analyzer can't track unlock via ref
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            // Drained: allow the next interrupt_reactor() to write again.
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        // The timerfd is tagged with the address of timer_fd_ itself.
        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r =
                ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Deferred I/O: just set ready events and enqueue descriptor
        // No per-descriptor mutex locking in reactor hot path!
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        // Only enqueue if not already enqueued
        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(
                expected, true, std::memory_order_release,
                std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    // Process timers only when timerfd fires
    if (check_timers)
    {
        timer_svc_->process_expired();
        // Re-arm for the next nearest expiry.
        update_timerfd();
    }

    lock.lock();

    // Publish everything gathered this iteration in one splice.
    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
1231  

1234  

1232  
/// Core scheduler loop: execute at most one operation.
///
/// Pops from the global queue. Three outcomes per iteration:
///  - the task sentinel: run the reactor (run_task) and retry;
///  - a real op: execute it and return 1;
///  - empty queue: return 0 or wait, depending on timeout_us.
///
/// @param lock        Scheduler mutex, held on entry.
/// @param timeout_us  <0 block indefinitely, 0 non-blocking poll,
///                    >0 bounded wait in microseconds.
/// @param ctx         Calling thread's scheduler context frame.
/// @return number of operations executed (0 or 1).
inline std::size_t
epoll_scheduler::do_one(
    std::unique_lock<std::mutex>& lock,
    long timeout_us,
    epoll::scheduler_context* ctx)
{
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel - time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            // Nothing to run the reactor for: no pending work to wait on,
            // or caller requested a non-blocking poll
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                // Put the sentinel back for the next caller.
                completed_ops_.push(&task_op_);
                return 0;
            }

            // Don't block in epoll_wait while handlers are runnable.
            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            // Let another thread run the pending handlers concurrently.
            if (more_handlers)
                unlock_and_signal_one(lock);

            run_task(lock, ctx);

            task_running_.store(false, std::memory_order_relaxed);
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            // "unassisted" records whether another thread could be woken
            // to help with the remaining ops.
            if (more)
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            // Reconciles work counts / private queue even if op throws.
            work_cleanup on_exit{this, &lock, ctx};

            // Invoke the handler outside the mutex.
            (*op)();
            return 1;
        }

        // No pending work to wait on, or caller requested non-blocking poll
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        // Park until new work is signalled (bounded if timeout_us > 0).
        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
1304  

1307  

1305  
} // namespace boost::corosio::detail
1308  
} // namespace boost::corosio::detail
1306  

1309  

1307  
#endif // BOOST_COROSIO_HAS_EPOLL
1310  
#endif // BOOST_COROSIO_HAS_EPOLL
1308  

1311  

1309  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
1312  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP