1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_SELECT
15  
#if BOOST_COROSIO_HAS_SELECT
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  

19  

20  
#include <boost/corosio/native/native_scheduler.hpp>
20  
#include <boost/corosio/native/native_scheduler.hpp>
21  
#include <boost/corosio/detail/scheduler_op.hpp>
21  
#include <boost/corosio/detail/scheduler_op.hpp>
22  

22  

23  
#include <boost/corosio/native/detail/select/select_op.hpp>
23  
#include <boost/corosio/native/detail/select/select_op.hpp>
24  
#include <boost/corosio/detail/timer_service.hpp>
24  
#include <boost/corosio/detail/timer_service.hpp>
25  
#include <boost/corosio/detail/make_err.hpp>
25  
#include <boost/corosio/detail/make_err.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28  

28  

29  
#include <boost/corosio/detail/except.hpp>
29  
#include <boost/corosio/detail/except.hpp>
30  
#include <boost/corosio/detail/thread_local_ptr.hpp>
30  
#include <boost/corosio/detail/thread_local_ptr.hpp>
31  

31  

32  
#include <sys/select.h>
32  
#include <sys/select.h>
33  
#include <sys/socket.h>
33  
#include <sys/socket.h>
34  
#include <unistd.h>
34  
#include <unistd.h>
35  
#include <errno.h>
35  
#include <errno.h>
36  
#include <fcntl.h>
36  
#include <fcntl.h>
37  

37  

38  
#include <algorithm>
38  
#include <algorithm>
39  
#include <atomic>
39  
#include <atomic>
40  
#include <chrono>
40  
#include <chrono>
41  
#include <condition_variable>
41  
#include <condition_variable>
42  
#include <cstddef>
42  
#include <cstddef>
43  
#include <limits>
43  
#include <limits>
44  
#include <mutex>
44  
#include <mutex>
45  
#include <unordered_map>
45  
#include <unordered_map>
46  

46  

47  
namespace boost::corosio::detail {
47  
namespace boost::corosio::detail {
48  

48  

49  
struct select_op;
49  
struct select_op;
50  

50  

51  
/** POSIX scheduler using select() for I/O multiplexing.
51  
/** POSIX scheduler using select() for I/O multiplexing.
52  

52  

53  
    This scheduler implements the scheduler interface using the POSIX select()
53  
    This scheduler implements the scheduler interface using the POSIX select()
54  
    call for I/O event notification. It uses a single reactor model
54  
    call for I/O event notification. It uses a single reactor model
55  
    where one thread runs select() while other threads wait on a condition
55  
    where one thread runs select() while other threads wait on a condition
56  
    variable for handler work. This design provides:
56  
    variable for handler work. This design provides:
57  

57  

58  
    - Handler parallelism: N posted handlers can execute on N threads
58  
    - Handler parallelism: N posted handlers can execute on N threads
59  
    - No thundering herd: condition_variable wakes exactly one thread
59  
    - No thundering herd: condition_variable wakes exactly one thread
60  
    - Portability: Works on all POSIX systems
60  
    - Portability: Works on all POSIX systems
61  

61  

62  
    The design mirrors epoll_scheduler for behavioral consistency:
62  
    The design mirrors epoll_scheduler for behavioral consistency:
63  
    - Same single-reactor thread coordination model
63  
    - Same single-reactor thread coordination model
64  
    - Same work counting semantics
64  
    - Same work counting semantics
65  
    - Same timer integration pattern
65  
    - Same timer integration pattern
66  

66  

67  
    Known Limitations:
67  
    Known Limitations:
68  
    - FD_SETSIZE (~1024) limits maximum concurrent connections
68  
    - FD_SETSIZE (~1024) limits maximum concurrent connections
69  
    - O(n) scanning: rebuilds fd_sets each iteration
69  
    - O(n) scanning: rebuilds fd_sets each iteration
70  
    - Level-triggered only (no edge-triggered mode)
70  
    - Level-triggered only (no edge-triggered mode)
71  

71  

72  
    @par Thread Safety
72  
    @par Thread Safety
73  
    All public member functions are thread-safe.
73  
    All public member functions are thread-safe.
74  
*/
74  
*/
75  
class BOOST_COROSIO_DECL select_scheduler final
75  
class BOOST_COROSIO_DECL select_scheduler final
76  
    : public native_scheduler
76  
    : public native_scheduler
77  
    , public capy::execution_context::service
77  
    , public capy::execution_context::service
78  
{
78  
{
79  
public:
79  
public:
80  
    using key_type = scheduler;
80  
    using key_type = scheduler;
81  

81  

82  
    /** Construct the scheduler.
82  
    /** Construct the scheduler.
83  

83  

84  
        Creates a self-pipe for reactor interruption.
84  
        Creates a self-pipe for reactor interruption.
85  

85  

86  
        @param ctx Reference to the owning execution_context.
86  
        @param ctx Reference to the owning execution_context.
87  
        @param concurrency_hint Hint for expected thread count (unused).
87  
        @param concurrency_hint Hint for expected thread count (unused).
88  
    */
88  
    */
89  
    select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
89  
    select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
90  

90  

91  
    ~select_scheduler() override;
91  
    ~select_scheduler() override;
92  

92  

93  
    select_scheduler(select_scheduler const&)            = delete;
93  
    select_scheduler(select_scheduler const&)            = delete;
94  
    select_scheduler& operator=(select_scheduler const&) = delete;
94  
    select_scheduler& operator=(select_scheduler const&) = delete;
95  

95  

96  
    void shutdown() override;
96  
    void shutdown() override;
97  
    void post(std::coroutine_handle<> h) const override;
97  
    void post(std::coroutine_handle<> h) const override;
98  
    void post(scheduler_op* h) const override;
98  
    void post(scheduler_op* h) const override;
99  
    bool running_in_this_thread() const noexcept override;
99  
    bool running_in_this_thread() const noexcept override;
100  
    void stop() override;
100  
    void stop() override;
101  
    bool stopped() const noexcept override;
101  
    bool stopped() const noexcept override;
102  
    void restart() override;
102  
    void restart() override;
103  
    std::size_t run() override;
103  
    std::size_t run() override;
104  
    std::size_t run_one() override;
104  
    std::size_t run_one() override;
105  
    std::size_t wait_one(long usec) override;
105  
    std::size_t wait_one(long usec) override;
106  
    std::size_t poll() override;
106  
    std::size_t poll() override;
107  
    std::size_t poll_one() override;
107  
    std::size_t poll_one() override;
108  

108  

109  
    /** Return the maximum file descriptor value supported.
109  
    /** Return the maximum file descriptor value supported.
110  

110  

111  
        Returns FD_SETSIZE - 1, the maximum fd value that can be
111  
        Returns FD_SETSIZE - 1, the maximum fd value that can be
112  
        monitored by select(). Operations with fd >= FD_SETSIZE
112  
        monitored by select(). Operations with fd >= FD_SETSIZE
113  
        will fail with EINVAL.
113  
        will fail with EINVAL.
114  

114  

115  
        @return The maximum supported file descriptor value.
115  
        @return The maximum supported file descriptor value.
116  
    */
116  
    */
117  
    static constexpr int max_fd() noexcept
117  
    static constexpr int max_fd() noexcept
118  
    {
118  
    {
119  
        return FD_SETSIZE - 1;
119  
        return FD_SETSIZE - 1;
120  
    }
120  
    }
121  

121  

122  
    /** Register a file descriptor for monitoring.
122  
    /** Register a file descriptor for monitoring.
123  

123  

124  
        @param fd The file descriptor to register.
124  
        @param fd The file descriptor to register.
125  
        @param op The operation associated with this fd.
125  
        @param op The operation associated with this fd.
126  
        @param events Event mask: 1 = read, 2 = write, 3 = both.
126  
        @param events Event mask: 1 = read, 2 = write, 3 = both.
127  
    */
127  
    */
128  
    void register_fd(int fd, select_op* op, int events) const;
128  
    void register_fd(int fd, select_op* op, int events) const;
129  

129  

130  
    /** Unregister a file descriptor from monitoring.
130  
    /** Unregister a file descriptor from monitoring.
131  

131  

132  
        @param fd The file descriptor to unregister.
132  
        @param fd The file descriptor to unregister.
133  
        @param events Event mask to remove: 1 = read, 2 = write, 3 = both.
133  
        @param events Event mask to remove: 1 = read, 2 = write, 3 = both.
134  
    */
134  
    */
135  
    void deregister_fd(int fd, int events) const;
135  
    void deregister_fd(int fd, int events) const;
136  

136  

137  
    void work_started() noexcept override;
137  
    void work_started() noexcept override;
138  
    void work_finished() noexcept override;
138  
    void work_finished() noexcept override;
139  

139  

140  
    // Event flags for register_fd/deregister_fd
140  
    // Event flags for register_fd/deregister_fd
141  
    static constexpr int event_read  = 1;
141  
    static constexpr int event_read  = 1;
142  
    static constexpr int event_write = 2;
142  
    static constexpr int event_write = 2;
143  

143  

144  
private:
144  
private:
145  
    std::size_t do_one(long timeout_us);
145  
    std::size_t do_one(long timeout_us);
146  
    void run_reactor(std::unique_lock<std::mutex>& lock);
146  
    void run_reactor(std::unique_lock<std::mutex>& lock);
147  
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
147  
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
148  
    void interrupt_reactor() const;
148  
    void interrupt_reactor() const;
149  
    long calculate_timeout(long requested_timeout_us) const;
149  
    long calculate_timeout(long requested_timeout_us) const;
150  

150  

151  
    // Self-pipe for interrupting select()
151  
    // Self-pipe for interrupting select()
152  
    int pipe_fds_[2]; // [0]=read, [1]=write
152  
    int pipe_fds_[2]; // [0]=read, [1]=write
153  

153  

154  
    mutable std::mutex mutex_;
154  
    mutable std::mutex mutex_;
155  
    mutable std::condition_variable wakeup_event_;
155  
    mutable std::condition_variable wakeup_event_;
156  
    mutable op_queue completed_ops_;
156  
    mutable op_queue completed_ops_;
157  
    mutable std::atomic<long> outstanding_work_;
157  
    mutable std::atomic<long> outstanding_work_;
158  
    std::atomic<bool> stopped_;
158  
    std::atomic<bool> stopped_;
 
159 +
    bool shutdown_;
159  

160  

160  
    // Per-fd state for tracking registered operations
161  
    // Per-fd state for tracking registered operations
161  
    struct fd_state
162  
    struct fd_state
162  
    {
163  
    {
163  
        select_op* read_op  = nullptr;
164  
        select_op* read_op  = nullptr;
164  
        select_op* write_op = nullptr;
165  
        select_op* write_op = nullptr;
165  
    };
166  
    };
166  
    mutable std::unordered_map<int, fd_state> registered_fds_;
167  
    mutable std::unordered_map<int, fd_state> registered_fds_;
167  
    mutable int max_fd_ = -1;
168  
    mutable int max_fd_ = -1;
168  

169  

169  
    // Single reactor thread coordination
170  
    // Single reactor thread coordination
170  
    mutable bool reactor_running_     = false;
171  
    mutable bool reactor_running_     = false;
171  
    mutable bool reactor_interrupted_ = false;
172  
    mutable bool reactor_interrupted_ = false;
172  
    mutable int idle_thread_count_    = 0;
173  
    mutable int idle_thread_count_    = 0;
173  

174  

174  
    // Sentinel operation for interleaving reactor runs with handler execution.
175  
    // Sentinel operation for interleaving reactor runs with handler execution.
175  
    // Ensures the reactor runs periodically even when handlers are continuously
176  
    // Ensures the reactor runs periodically even when handlers are continuously
176  
    // posted, preventing timer starvation.
177  
    // posted, preventing timer starvation.
177  
    struct task_op final : scheduler_op
178  
    struct task_op final : scheduler_op
178  
    {
179  
    {
179  
        void operator()() override {}
180  
        void operator()() override {}
180  
        void destroy() override {}
181  
        void destroy() override {}
181  
    };
182  
    };
182  
    task_op task_op_;
183  
    task_op task_op_;
183  
};
184  
};
184  

185  

185  
/*
186  
/*
186  
    select Scheduler - Single Reactor Model
187  
    select Scheduler - Single Reactor Model
187  
    =======================================
188  
    =======================================
188  

189  

189  
    This scheduler mirrors the epoll_scheduler design but uses select() instead
190  
    This scheduler mirrors the epoll_scheduler design but uses select() instead
190  
    of epoll for I/O multiplexing. The thread coordination strategy is identical:
191  
    of epoll for I/O multiplexing. The thread coordination strategy is identical:
191  
    one thread becomes the "reactor" while others wait on a condition variable.
192  
    one thread becomes the "reactor" while others wait on a condition variable.
192  

193  

193  
    Thread Model
194  
    Thread Model
194  
    ------------
195  
    ------------
195  
    - ONE thread runs select() at a time (the reactor thread)
196  
    - ONE thread runs select() at a time (the reactor thread)
196  
    - OTHER threads wait on wakeup_event_ (condition variable) for handlers
197  
    - OTHER threads wait on wakeup_event_ (condition variable) for handlers
197  
    - When work is posted, exactly one waiting thread wakes via notify_one()
198  
    - When work is posted, exactly one waiting thread wakes via notify_one()
198  

199  

199  
    Key Differences from epoll
200  
    Key Differences from epoll
200  
    --------------------------
201  
    --------------------------
201  
    - Uses self-pipe instead of eventfd for interruption (more portable)
202  
    - Uses self-pipe instead of eventfd for interruption (more portable)
202  
    - fd_set rebuilding each iteration (O(n) vs O(1) for epoll)
203  
    - fd_set rebuilding each iteration (O(n) vs O(1) for epoll)
203  
    - FD_SETSIZE limit (~1024 fds on most systems)
204  
    - FD_SETSIZE limit (~1024 fds on most systems)
204  
    - Level-triggered only (no edge-triggered mode)
205  
    - Level-triggered only (no edge-triggered mode)
205  

206  

206  
    Self-Pipe Pattern
207  
    Self-Pipe Pattern
207  
    -----------------
208  
    -----------------
208  
    To interrupt a blocking select() call (e.g., when work is posted or a timer
209  
    To interrupt a blocking select() call (e.g., when work is posted or a timer
209  
    expires), we write a byte to pipe_fds_[1]. The read end pipe_fds_[0] is
210  
    expires), we write a byte to pipe_fds_[1]. The read end pipe_fds_[0] is
210  
    always in the read_fds set, so select() returns immediately. We drain the
211  
    always in the read_fds set, so select() returns immediately. We drain the
211  
    pipe to clear the readable state.
212  
    pipe to clear the readable state.
212  

213  

213  
    fd-to-op Mapping
214  
    fd-to-op Mapping
214  
    ----------------
215  
    ----------------
215  
    We use an unordered_map<int, fd_state> to track which operations are
216  
    We use an unordered_map<int, fd_state> to track which operations are
216  
    registered for each fd. This allows O(1) lookup when select() returns
217  
    registered for each fd. This allows O(1) lookup when select() returns
217  
    ready fds. Each fd can have at most one read op and one write op registered.
218  
    ready fds. Each fd can have at most one read op and one write op registered.
218  
*/
219  
*/
219  

220  

220  
namespace select {
221  
namespace select {
221  

222  

222  
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
223  
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
223  
{
224  
{
224  
    select_scheduler const* key;
225  
    select_scheduler const* key;
225  
    scheduler_context* next;
226  
    scheduler_context* next;
226  
};
227  
};
227  

228  

228  
inline thread_local_ptr<scheduler_context> context_stack;
229  
inline thread_local_ptr<scheduler_context> context_stack;
229  

230  

230  
struct thread_context_guard
231  
struct thread_context_guard
231  
{
232  
{
232  
    scheduler_context frame_;
233  
    scheduler_context frame_;
233  

234  

234  
    explicit thread_context_guard(select_scheduler const* ctx) noexcept
235  
    explicit thread_context_guard(select_scheduler const* ctx) noexcept
235  
        : frame_{ctx, context_stack.get()}
236  
        : frame_{ctx, context_stack.get()}
236  
    {
237  
    {
237  
        context_stack.set(&frame_);
238  
        context_stack.set(&frame_);
238  
    }
239  
    }
239  

240  

240  
    ~thread_context_guard() noexcept
241  
    ~thread_context_guard() noexcept
241  
    {
242  
    {
242  
        context_stack.set(frame_.next);
243  
        context_stack.set(frame_.next);
243  
    }
244  
    }
244  
};
245  
};
245  

246  

246  
struct work_guard
247  
struct work_guard
247  
{
248  
{
248  
    select_scheduler* self;
249  
    select_scheduler* self;
249  
    ~work_guard()
250  
    ~work_guard()
250  
    {
251  
    {
251  
        self->work_finished();
252  
        self->work_finished();
252  
    }
253  
    }
253  
};
254  
};
254  

255  

255  
} // namespace select
256  
} // namespace select
256  

257  

257  
inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
258  
inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
258  
    : pipe_fds_{-1, -1}
259  
    : pipe_fds_{-1, -1}
259  
    , outstanding_work_(0)
260  
    , outstanding_work_(0)
260  
    , stopped_(false)
261  
    , stopped_(false)
 
262 +
    , shutdown_(false)
261  
    , max_fd_(-1)
263  
    , max_fd_(-1)
262  
    , reactor_running_(false)
264  
    , reactor_running_(false)
263  
    , reactor_interrupted_(false)
265  
    , reactor_interrupted_(false)
264  
    , idle_thread_count_(0)
266  
    , idle_thread_count_(0)
265  
{
267  
{
266  
    // Create self-pipe for interrupting select()
268  
    // Create self-pipe for interrupting select()
267  
    if (::pipe(pipe_fds_) < 0)
269  
    if (::pipe(pipe_fds_) < 0)
268  
        detail::throw_system_error(make_err(errno), "pipe");
270  
        detail::throw_system_error(make_err(errno), "pipe");
269  

271  

270  
    // Set both ends to non-blocking and close-on-exec
272  
    // Set both ends to non-blocking and close-on-exec
271  
    for (int i = 0; i < 2; ++i)
273  
    for (int i = 0; i < 2; ++i)
272  
    {
274  
    {
273  
        int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
275  
        int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
274  
        if (flags == -1)
276  
        if (flags == -1)
275  
        {
277  
        {
276  
            int errn = errno;
278  
            int errn = errno;
277  
            ::close(pipe_fds_[0]);
279  
            ::close(pipe_fds_[0]);
278  
            ::close(pipe_fds_[1]);
280  
            ::close(pipe_fds_[1]);
279  
            detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
281  
            detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
280  
        }
282  
        }
281  
        if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
283  
        if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
282  
        {
284  
        {
283  
            int errn = errno;
285  
            int errn = errno;
284  
            ::close(pipe_fds_[0]);
286  
            ::close(pipe_fds_[0]);
285  
            ::close(pipe_fds_[1]);
287  
            ::close(pipe_fds_[1]);
286  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
288  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
287  
        }
289  
        }
288  
        if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
290  
        if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
289  
        {
291  
        {
290  
            int errn = errno;
292  
            int errn = errno;
291  
            ::close(pipe_fds_[0]);
293  
            ::close(pipe_fds_[0]);
292  
            ::close(pipe_fds_[1]);
294  
            ::close(pipe_fds_[1]);
293  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
295  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
294  
        }
296  
        }
295  
    }
297  
    }
296  

298  

297  
    timer_svc_ = &get_timer_service(ctx, *this);
299  
    timer_svc_ = &get_timer_service(ctx, *this);
298  
    timer_svc_->set_on_earliest_changed(
300  
    timer_svc_->set_on_earliest_changed(
299  
        timer_service::callback(this, [](void* p) {
301  
        timer_service::callback(this, [](void* p) {
300  
            static_cast<select_scheduler*>(p)->interrupt_reactor();
302  
            static_cast<select_scheduler*>(p)->interrupt_reactor();
301  
        }));
303  
        }));
302  

304  

303  
    // Initialize resolver service
305  
    // Initialize resolver service
304  
    get_resolver_service(ctx, *this);
306  
    get_resolver_service(ctx, *this);
305  

307  

306  
    // Initialize signal service
308  
    // Initialize signal service
307  
    get_signal_service(ctx, *this);
309  
    get_signal_service(ctx, *this);
308  

310  

309  
    // Push task sentinel to interleave reactor runs with handler execution
311  
    // Push task sentinel to interleave reactor runs with handler execution
310  
    completed_ops_.push(&task_op_);
312  
    completed_ops_.push(&task_op_);
311  
}
313  
}
312  

314  

313  
inline select_scheduler::~select_scheduler()
{
    // Release both ends of the self-pipe if they were opened.
    for (int fd : pipe_fds_)
        if (fd >= 0)
            ::close(fd);
}
320  

322  

321  
inline void
323  
inline void
322  
select_scheduler::shutdown()
324  
select_scheduler::shutdown()
323  
{
325  
{
324  
    {
326  
    {
325  
        std::unique_lock lock(mutex_);
327  
        std::unique_lock lock(mutex_);
 
328 +
        shutdown_ = true;
326  

329  

327  
        while (auto* h = completed_ops_.pop())
330  
        while (auto* h = completed_ops_.pop())
328  
        {
331  
        {
329  
            if (h == &task_op_)
332  
            if (h == &task_op_)
330  
                continue;
333  
                continue;
331  
            lock.unlock();
334  
            lock.unlock();
332  
            h->destroy();
335  
            h->destroy();
333  
            lock.lock();
336  
            lock.lock();
334  
        }
337  
        }
335  
    }
338  
    }
336  

339  

 
340 +
    outstanding_work_.store(0, std::memory_order_release);
 
341 +

337  
    if (pipe_fds_[1] >= 0)
342  
    if (pipe_fds_[1] >= 0)
338  
        interrupt_reactor();
343  
        interrupt_reactor();
339  

344  

340  
    wakeup_event_.notify_all();
345  
    wakeup_event_.notify_all();
341  
}
346  
}
342  

347  

343  
inline void
348  
inline void
344  
select_scheduler::post(std::coroutine_handle<> h) const
349  
select_scheduler::post(std::coroutine_handle<> h) const
345  
{
350  
{
346  
    struct post_handler final : scheduler_op
351  
    struct post_handler final : scheduler_op
347  
    {
352  
    {
348  
        std::coroutine_handle<> h_;
353  
        std::coroutine_handle<> h_;
349  

354  

350  
        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
355  
        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
351  

356  

352  
        ~post_handler() override = default;
357  
        ~post_handler() override = default;
353  

358  

354  
        void operator()() override
359  
        void operator()() override
355  
        {
360  
        {
356  
            auto h = h_;
361  
            auto h = h_;
357  
            delete this;
362  
            delete this;
358  
            h.resume();
363  
            h.resume();
359  
        }
364  
        }
360  

365  

361  
        void destroy() override
366  
        void destroy() override
362 -
            auto h = h_;
 
363  
        {
367  
        {
364 -
            h.destroy();
 
365  
            delete this;
368  
            delete this;
366  
        }
369  
        }
367  
    };
370  
    };
368  

371  

369  
    auto ph = std::make_unique<post_handler>(h);
372  
    auto ph = std::make_unique<post_handler>(h);
370  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
373  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
371  

374  

372  
    std::unique_lock lock(mutex_);
375  
    std::unique_lock lock(mutex_);
373  
    completed_ops_.push(ph.release());
376  
    completed_ops_.push(ph.release());
374  
    wake_one_thread_and_unlock(lock);
377  
    wake_one_thread_and_unlock(lock);
375  
}
378  
}
376  

379  

377  
inline void
380  
inline void
378  
select_scheduler::post(scheduler_op* h) const
381  
select_scheduler::post(scheduler_op* h) const
379  
{
382  
{
380  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
383  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
381  

384  

382  
    std::unique_lock lock(mutex_);
385  
    std::unique_lock lock(mutex_);
383  
    completed_ops_.push(h);
386  
    completed_ops_.push(h);
384  
    wake_one_thread_and_unlock(lock);
387  
    wake_one_thread_and_unlock(lock);
385  
}
388  
}
386  

389  

387  
inline bool
390  
inline bool
388  
select_scheduler::running_in_this_thread() const noexcept
391  
select_scheduler::running_in_this_thread() const noexcept
389  
{
392  
{
390  
    for (auto* c = select::context_stack.get(); c != nullptr; c = c->next)
393  
    for (auto* c = select::context_stack.get(); c != nullptr; c = c->next)
391  
        if (c->key == this)
394  
        if (c->key == this)
392  
            return true;
395  
            return true;
393  
    return false;
396  
    return false;
394  
}
397  
}
395  

398  

396  
inline void
399  
inline void
397  
select_scheduler::stop()
400  
select_scheduler::stop()
398  
{
401  
{
399  
    bool expected = false;
402  
    bool expected = false;
400  
    if (stopped_.compare_exchange_strong(
403  
    if (stopped_.compare_exchange_strong(
401  
            expected, true, std::memory_order_release,
404  
            expected, true, std::memory_order_release,
402  
            std::memory_order_relaxed))
405  
            std::memory_order_relaxed))
403  
    {
406  
    {
404  
        // Wake all threads so they notice stopped_ and exit
407  
        // Wake all threads so they notice stopped_ and exit
405  
        {
408  
        {
406  
            std::lock_guard lock(mutex_);
409  
            std::lock_guard lock(mutex_);
407  
            wakeup_event_.notify_all();
410  
            wakeup_event_.notify_all();
408  
        }
411  
        }
409  
        interrupt_reactor();
412  
        interrupt_reactor();
410  
    }
413  
    }
411  
}
414  
}
412  

415  

413  
inline bool
416  
inline bool
414  
select_scheduler::stopped() const noexcept
417  
select_scheduler::stopped() const noexcept
415  
{
418  
{
416  
    return stopped_.load(std::memory_order_acquire);
419  
    return stopped_.load(std::memory_order_acquire);
417  
}
420  
}
418  

421  

419  
inline void
422  
inline void
420  
select_scheduler::restart()
423  
select_scheduler::restart()
421  
{
424  
{
422  
    stopped_.store(false, std::memory_order_release);
425  
    stopped_.store(false, std::memory_order_release);
423  
}
426  
}
424  

427  

425  
inline std::size_t
428  
inline std::size_t
426  
select_scheduler::run()
429  
select_scheduler::run()
427  
{
430  
{
428  
    if (stopped_.load(std::memory_order_acquire))
431  
    if (stopped_.load(std::memory_order_acquire))
429  
        return 0;
432  
        return 0;
430  

433  

431  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
434  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
432  
    {
435  
    {
433  
        stop();
436  
        stop();
434  
        return 0;
437  
        return 0;
435  
    }
438  
    }
436  

439  

437  
    select::thread_context_guard ctx(this);
440  
    select::thread_context_guard ctx(this);
438  

441  

439  
    std::size_t n = 0;
442  
    std::size_t n = 0;
440  
    while (do_one(-1))
443  
    while (do_one(-1))
441  
        if (n != (std::numeric_limits<std::size_t>::max)())
444  
        if (n != (std::numeric_limits<std::size_t>::max)())
442  
            ++n;
445  
            ++n;
443  
    return n;
446  
    return n;
444  
}
447  
}
445  

448  

446  
inline std::size_t
449  
inline std::size_t
447  
select_scheduler::run_one()
450  
select_scheduler::run_one()
448  
{
451  
{
449  
    if (stopped_.load(std::memory_order_acquire))
452  
    if (stopped_.load(std::memory_order_acquire))
450  
        return 0;
453  
        return 0;
451  

454  

452  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
455  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
453  
    {
456  
    {
454  
        stop();
457  
        stop();
455  
        return 0;
458  
        return 0;
456  
    }
459  
    }
457  

460  

458  
    select::thread_context_guard ctx(this);
461  
    select::thread_context_guard ctx(this);
459  
    return do_one(-1);
462  
    return do_one(-1);
460  
}
463  
}
461  

464  

462  
inline std::size_t
465  
inline std::size_t
463  
select_scheduler::wait_one(long usec)
466  
select_scheduler::wait_one(long usec)
464  
{
467  
{
465  
    if (stopped_.load(std::memory_order_acquire))
468  
    if (stopped_.load(std::memory_order_acquire))
466  
        return 0;
469  
        return 0;
467  

470  

468  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
471  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
469  
    {
472  
    {
470  
        stop();
473  
        stop();
471  
        return 0;
474  
        return 0;
472  
    }
475  
    }
473  

476  

474  
    select::thread_context_guard ctx(this);
477  
    select::thread_context_guard ctx(this);
475  
    return do_one(usec);
478  
    return do_one(usec);
476  
}
479  
}
477  

480  

478  
inline std::size_t
481  
inline std::size_t
479  
select_scheduler::poll()
482  
select_scheduler::poll()
480  
{
483  
{
481  
    if (stopped_.load(std::memory_order_acquire))
484  
    if (stopped_.load(std::memory_order_acquire))
482  
        return 0;
485  
        return 0;
483  

486  

484  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
487  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
485  
    {
488  
    {
486  
        stop();
489  
        stop();
487  
        return 0;
490  
        return 0;
488  
    }
491  
    }
489  

492  

490  
    select::thread_context_guard ctx(this);
493  
    select::thread_context_guard ctx(this);
491  

494  

492  
    std::size_t n = 0;
495  
    std::size_t n = 0;
493  
    while (do_one(0))
496  
    while (do_one(0))
494  
        if (n != (std::numeric_limits<std::size_t>::max)())
497  
        if (n != (std::numeric_limits<std::size_t>::max)())
495  
            ++n;
498  
            ++n;
496  
    return n;
499  
    return n;
497  
}
500  
}
498  

501  

499  
inline std::size_t
502  
inline std::size_t
500  
select_scheduler::poll_one()
503  
select_scheduler::poll_one()
501  
{
504  
{
502  
    if (stopped_.load(std::memory_order_acquire))
505  
    if (stopped_.load(std::memory_order_acquire))
503  
        return 0;
506  
        return 0;
504  

507  

505  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
508  
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
506  
    {
509  
    {
507  
        stop();
510  
        stop();
508  
        return 0;
511  
        return 0;
509  
    }
512  
    }
510  

513  

511  
    select::thread_context_guard ctx(this);
514  
    select::thread_context_guard ctx(this);
512  
    return do_one(0);
515  
    return do_one(0);
513  
}
516  
}
514  

517  

515  
inline void
518  
inline void
516  
select_scheduler::register_fd(int fd, select_op* op, int events) const
519  
select_scheduler::register_fd(int fd, select_op* op, int events) const
517  
{
520  
{
518  
    // Validate fd is within select() limits
521  
    // Validate fd is within select() limits
519  
    if (fd < 0 || fd >= FD_SETSIZE)
522  
    if (fd < 0 || fd >= FD_SETSIZE)
520  
        detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
523  
        detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
521  

524  

522  
    {
525  
    {
523  
        std::lock_guard lock(mutex_);
526  
        std::lock_guard lock(mutex_);
524  

527  

525  
        auto& state = registered_fds_[fd];
528  
        auto& state = registered_fds_[fd];
526  
        if (events & event_read)
529  
        if (events & event_read)
527  
            state.read_op = op;
530  
            state.read_op = op;
528  
        if (events & event_write)
531  
        if (events & event_write)
529  
            state.write_op = op;
532  
            state.write_op = op;
530  

533  

531  
        if (fd > max_fd_)
534  
        if (fd > max_fd_)
532  
            max_fd_ = fd;
535  
            max_fd_ = fd;
533  
    }
536  
    }
534  

537  

535  
    // Wake the reactor so a thread blocked in select() rebuilds its fd_sets
538  
    // Wake the reactor so a thread blocked in select() rebuilds its fd_sets
536  
    // with the newly registered fd.
539  
    // with the newly registered fd.
537  
    interrupt_reactor();
540  
    interrupt_reactor();
538  
}
541  
}
539  

542  

540  
inline void
543  
inline void
541  
select_scheduler::deregister_fd(int fd, int events) const
544  
select_scheduler::deregister_fd(int fd, int events) const
542  
{
545  
{
543  
    std::lock_guard lock(mutex_);
546  
    std::lock_guard lock(mutex_);
544  

547  

545  
    auto it = registered_fds_.find(fd);
548  
    auto it = registered_fds_.find(fd);
546  
    if (it == registered_fds_.end())
549  
    if (it == registered_fds_.end())
547  
        return;
550  
        return;
548  

551  

549  
    if (events & event_read)
552  
    if (events & event_read)
550  
        it->second.read_op = nullptr;
553  
        it->second.read_op = nullptr;
551  
    if (events & event_write)
554  
    if (events & event_write)
552  
        it->second.write_op = nullptr;
555  
        it->second.write_op = nullptr;
553  

556  

554  
    // Remove entry if both are null
557  
    // Remove entry if both are null
555  
    if (!it->second.read_op && !it->second.write_op)
558  
    if (!it->second.read_op && !it->second.write_op)
556  
    {
559  
    {
557  
        registered_fds_.erase(it);
560  
        registered_fds_.erase(it);
558  

561  

559  
        // Recalculate max_fd_ if needed
562  
        // Recalculate max_fd_ if needed
560  
        if (fd == max_fd_)
563  
        if (fd == max_fd_)
561  
        {
564  
        {
562  
            max_fd_ = pipe_fds_[0]; // At minimum, the pipe read end
565  
            max_fd_ = pipe_fds_[0]; // At minimum, the pipe read end
563  
            for (auto& [registered_fd, state] : registered_fds_)
566  
            for (auto& [registered_fd, state] : registered_fds_)
564  
            {
567  
            {
565  
                if (registered_fd > max_fd_)
568  
                if (registered_fd > max_fd_)
566  
                    max_fd_ = registered_fd;
569  
                    max_fd_ = registered_fd;
567  
            }
570  
            }
568  
        }
571  
        }
569  
    }
572  
    }
570  
}
573  
}
571  

574  

572  
inline void
575  
inline void
573  
select_scheduler::work_started() noexcept
576  
select_scheduler::work_started() noexcept
574  
{
577  
{
575  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
578  
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
576  
}
579  
}
577  

580  

578  
inline void
581  
inline void
579  
select_scheduler::work_finished() noexcept
582  
select_scheduler::work_finished() noexcept
580  
{
583  
{
581  
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
584  
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
582  
        stop();
585  
        stop();
583  
}
586  
}
584  

587  

585  
inline void
588  
inline void
586  
select_scheduler::interrupt_reactor() const
589  
select_scheduler::interrupt_reactor() const
587  
{
590  
{
588  
    char byte               = 1;
591  
    char byte               = 1;
589  
    [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
592  
    [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
590  
}
593  
}
591  

594  

592  
inline void
595  
inline void
593  
select_scheduler::wake_one_thread_and_unlock(
596  
select_scheduler::wake_one_thread_and_unlock(
594  
    std::unique_lock<std::mutex>& lock) const
597  
    std::unique_lock<std::mutex>& lock) const
595  
{
598  
{
596  
    if (idle_thread_count_ > 0)
599  
    if (idle_thread_count_ > 0)
597  
    {
600  
    {
598  
        // Idle worker exists - wake it via condvar
601  
        // Idle worker exists - wake it via condvar
599  
        wakeup_event_.notify_one();
602  
        wakeup_event_.notify_one();
600  
        lock.unlock();
603  
        lock.unlock();
601  
    }
604  
    }
602  
    else if (reactor_running_ && !reactor_interrupted_)
605  
    else if (reactor_running_ && !reactor_interrupted_)
603  
    {
606  
    {
604  
        // No idle workers but reactor is running - interrupt it
607  
        // No idle workers but reactor is running - interrupt it
605  
        reactor_interrupted_ = true;
608  
        reactor_interrupted_ = true;
606  
        lock.unlock();
609  
        lock.unlock();
607  
        interrupt_reactor();
610  
        interrupt_reactor();
608  
    }
611  
    }
609  
    else
612  
    else
610  
    {
613  
    {
611  
        // No one to wake
614  
        // No one to wake
612  
        lock.unlock();
615  
        lock.unlock();
613  
    }
616  
    }
614  
}
617  
}
615  

618  

616  
inline long
619  
inline long
617  
select_scheduler::calculate_timeout(long requested_timeout_us) const
620  
select_scheduler::calculate_timeout(long requested_timeout_us) const
618  
{
621  
{
619  
    if (requested_timeout_us == 0)
622  
    if (requested_timeout_us == 0)
620  
        return 0;
623  
        return 0;
621  

624  

622  
    auto nearest = timer_svc_->nearest_expiry();
625  
    auto nearest = timer_svc_->nearest_expiry();
623  
    if (nearest == timer_service::time_point::max())
626  
    if (nearest == timer_service::time_point::max())
624  
        return requested_timeout_us;
627  
        return requested_timeout_us;
625  

628  

626  
    auto now = std::chrono::steady_clock::now();
629  
    auto now = std::chrono::steady_clock::now();
627  
    if (nearest <= now)
630  
    if (nearest <= now)
628  
        return 0;
631  
        return 0;
629  

632  

630  
    auto timer_timeout_us =
633  
    auto timer_timeout_us =
631  
        std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
634  
        std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
632  
            .count();
635  
            .count();
633  

636  

634  
    // Clamp to [0, LONG_MAX] to prevent truncation on 32-bit long platforms
637  
    // Clamp to [0, LONG_MAX] to prevent truncation on 32-bit long platforms
635  
    constexpr auto long_max =
638  
    constexpr auto long_max =
636  
        static_cast<long long>((std::numeric_limits<long>::max)());
639  
        static_cast<long long>((std::numeric_limits<long>::max)());
637  
    auto capped_timer_us =
640  
    auto capped_timer_us =
638  
        (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
641  
        (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
639  
                              static_cast<long long>(0)),
642  
                              static_cast<long long>(0)),
640  
                   long_max);
643  
                   long_max);
641  

644  

642  
    if (requested_timeout_us < 0)
645  
    if (requested_timeout_us < 0)
643  
        return static_cast<long>(capped_timer_us);
646  
        return static_cast<long>(capped_timer_us);
644  

647  

645  
    // requested_timeout_us is already long, so min() result fits in long
648  
    // requested_timeout_us is already long, so min() result fits in long
646  
    return static_cast<long>(
649  
    return static_cast<long>(
647  
        (std::min)(static_cast<long long>(requested_timeout_us),
650  
        (std::min)(static_cast<long long>(requested_timeout_us),
648  
                   capped_timer_us));
651  
                   capped_timer_us));
649  
}
652  
}
650  

653  

651  
inline void
654  
inline void
652  
select_scheduler::run_reactor(std::unique_lock<std::mutex>& lock)
655  
select_scheduler::run_reactor(std::unique_lock<std::mutex>& lock)
653  
{
656  
{
654  
    // Calculate timeout considering timers, use 0 if interrupted
657  
    // Calculate timeout considering timers, use 0 if interrupted
655  
    long effective_timeout_us =
658  
    long effective_timeout_us =
656  
        reactor_interrupted_ ? 0 : calculate_timeout(-1);
659  
        reactor_interrupted_ ? 0 : calculate_timeout(-1);
657  

660  

658  
    // Build fd_sets from registered_fds_
661  
    // Build fd_sets from registered_fds_
659  
    fd_set read_fds, write_fds, except_fds;
662  
    fd_set read_fds, write_fds, except_fds;
660  
    FD_ZERO(&read_fds);
663  
    FD_ZERO(&read_fds);
661  
    FD_ZERO(&write_fds);
664  
    FD_ZERO(&write_fds);
662  
    FD_ZERO(&except_fds);
665  
    FD_ZERO(&except_fds);
663  

666  

664  
    // Always include the interrupt pipe
667  
    // Always include the interrupt pipe
665  
    FD_SET(pipe_fds_[0], &read_fds);
668  
    FD_SET(pipe_fds_[0], &read_fds);
666  
    int nfds = pipe_fds_[0];
669  
    int nfds = pipe_fds_[0];
667  

670  

668  
    // Add registered fds
671  
    // Add registered fds
669  
    for (auto& [fd, state] : registered_fds_)
672  
    for (auto& [fd, state] : registered_fds_)
670  
    {
673  
    {
671  
        if (state.read_op)
674  
        if (state.read_op)
672  
            FD_SET(fd, &read_fds);
675  
            FD_SET(fd, &read_fds);
673  
        if (state.write_op)
676  
        if (state.write_op)
674  
        {
677  
        {
675  
            FD_SET(fd, &write_fds);
678  
            FD_SET(fd, &write_fds);
676  
            // Also monitor for errors on connect operations
679  
            // Also monitor for errors on connect operations
677  
            FD_SET(fd, &except_fds);
680  
            FD_SET(fd, &except_fds);
678  
        }
681  
        }
679  
        if (fd > nfds)
682  
        if (fd > nfds)
680  
            nfds = fd;
683  
            nfds = fd;
681  
    }
684  
    }
682  

685  

683  
    // Convert timeout to timeval
686  
    // Convert timeout to timeval
684  
    struct timeval tv;
687  
    struct timeval tv;
685  
    struct timeval* tv_ptr = nullptr;
688  
    struct timeval* tv_ptr = nullptr;
686  
    if (effective_timeout_us >= 0)
689  
    if (effective_timeout_us >= 0)
687  
    {
690  
    {
688  
        tv.tv_sec  = effective_timeout_us / 1000000;
691  
        tv.tv_sec  = effective_timeout_us / 1000000;
689  
        tv.tv_usec = effective_timeout_us % 1000000;
692  
        tv.tv_usec = effective_timeout_us % 1000000;
690  
        tv_ptr     = &tv;
693  
        tv_ptr     = &tv;
691  
    }
694  
    }
692  

695  

693  
    lock.unlock();
696  
    lock.unlock();
694  

697  

695  
    int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
698  
    int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
696  
    int saved_errno = errno;
699  
    int saved_errno = errno;
697  

700  

698  
    // Process timers outside the lock
701  
    // Process timers outside the lock
699  
    timer_svc_->process_expired();
702  
    timer_svc_->process_expired();
700  

703  

701  
    if (ready < 0 && saved_errno != EINTR)
704  
    if (ready < 0 && saved_errno != EINTR)
702  
        detail::throw_system_error(make_err(saved_errno), "select");
705  
        detail::throw_system_error(make_err(saved_errno), "select");
703  

706  

704  
    // Re-acquire lock before modifying completed_ops_
707  
    // Re-acquire lock before modifying completed_ops_
705  
    lock.lock();
708  
    lock.lock();
706  

709  

707  
    // Drain the interrupt pipe if readable
710  
    // Drain the interrupt pipe if readable
708  
    if (ready > 0 && FD_ISSET(pipe_fds_[0], &read_fds))
711  
    if (ready > 0 && FD_ISSET(pipe_fds_[0], &read_fds))
709  
    {
712  
    {
710  
        char buf[256];
713  
        char buf[256];
711  
        while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
714  
        while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
712  
        {
715  
        {
713  
        }
716  
        }
714  
    }
717  
    }
715  

718  

716  
    // Process I/O completions
719  
    // Process I/O completions
717  
    int completions_queued = 0;
720  
    int completions_queued = 0;
718  
    if (ready > 0)
721  
    if (ready > 0)
719  
    {
722  
    {
720  
        // Iterate over registered fds (copy keys to avoid iterator invalidation)
723  
        // Iterate over registered fds (copy keys to avoid iterator invalidation)
721  
        std::vector<int> fds_to_check;
724  
        std::vector<int> fds_to_check;
722  
        fds_to_check.reserve(registered_fds_.size());
725  
        fds_to_check.reserve(registered_fds_.size());
723  
        for (auto& [fd, state] : registered_fds_)
726  
        for (auto& [fd, state] : registered_fds_)
724  
            fds_to_check.push_back(fd);
727  
            fds_to_check.push_back(fd);
725  

728  

726  
        for (int fd : fds_to_check)
729  
        for (int fd : fds_to_check)
727  
        {
730  
        {
728  
            auto it = registered_fds_.find(fd);
731  
            auto it = registered_fds_.find(fd);
729  
            if (it == registered_fds_.end())
732  
            if (it == registered_fds_.end())
730  
                continue;
733  
                continue;
731  

734  

732  
            auto& state = it->second;
735  
            auto& state = it->second;
733  

736  

734  
            // Check for errors (especially for connect operations)
737  
            // Check for errors (especially for connect operations)
735  
            bool has_error = FD_ISSET(fd, &except_fds);
738  
            bool has_error = FD_ISSET(fd, &except_fds);
736  

739  

737  
            // Process read readiness
740  
            // Process read readiness
738  
            if (state.read_op && (FD_ISSET(fd, &read_fds) || has_error))
741  
            if (state.read_op && (FD_ISSET(fd, &read_fds) || has_error))
739  
            {
742  
            {
740  
                auto* op = state.read_op;
743  
                auto* op = state.read_op;
741  
                // Claim the op by exchanging to unregistered. Both registering and
744  
                // Claim the op by exchanging to unregistered. Both registering and
742  
                // registered states mean the op is ours to complete.
745  
                // registered states mean the op is ours to complete.
743  
                auto prev = op->registered.exchange(
746  
                auto prev = op->registered.exchange(
744  
                    select_registration_state::unregistered,
747  
                    select_registration_state::unregistered,
745  
                    std::memory_order_acq_rel);
748  
                    std::memory_order_acq_rel);
746  
                if (prev != select_registration_state::unregistered)
749  
                if (prev != select_registration_state::unregistered)
747  
                {
750  
                {
748  
                    state.read_op = nullptr;
751  
                    state.read_op = nullptr;
749  

752  

750  
                    if (has_error)
753  
                    if (has_error)
751  
                    {
754  
                    {
752  
                        int errn      = 0;
755  
                        int errn      = 0;
753  
                        socklen_t len = sizeof(errn);
756  
                        socklen_t len = sizeof(errn);
754  
                        if (::getsockopt(
757  
                        if (::getsockopt(
755  
                                fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
758  
                                fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
756  
                            errn = errno;
759  
                            errn = errno;
757  
                        if (errn == 0)
760  
                        if (errn == 0)
758  
                            errn = EIO;
761  
                            errn = EIO;
759  
                        op->complete(errn, 0);
762  
                        op->complete(errn, 0);
760  
                    }
763  
                    }
761  
                    else
764  
                    else
762  
                    {
765  
                    {
763  
                        op->perform_io();
766  
                        op->perform_io();
764  
                    }
767  
                    }
765  

768  

766  
                    completed_ops_.push(op);
769  
                    completed_ops_.push(op);
767  
                    ++completions_queued;
770  
                    ++completions_queued;
768  
                }
771  
                }
769  
            }
772  
            }
770  

773  

771  
            // Process write readiness
774  
            // Process write readiness
772  
            if (state.write_op && (FD_ISSET(fd, &write_fds) || has_error))
775  
            if (state.write_op && (FD_ISSET(fd, &write_fds) || has_error))
773  
            {
776  
            {
774  
                auto* op = state.write_op;
777  
                auto* op = state.write_op;
775  
                // Claim the op by exchanging to unregistered. Both registering and
778  
                // Claim the op by exchanging to unregistered. Both registering and
776  
                // registered states mean the op is ours to complete.
779  
                // registered states mean the op is ours to complete.
777  
                auto prev = op->registered.exchange(
780  
                auto prev = op->registered.exchange(
778  
                    select_registration_state::unregistered,
781  
                    select_registration_state::unregistered,
779  
                    std::memory_order_acq_rel);
782  
                    std::memory_order_acq_rel);
780  
                if (prev != select_registration_state::unregistered)
783  
                if (prev != select_registration_state::unregistered)
781  
                {
784  
                {
782  
                    state.write_op = nullptr;
785  
                    state.write_op = nullptr;
783  

786  

784  
                    if (has_error)
787  
                    if (has_error)
785  
                    {
788  
                    {
786  
                        int errn      = 0;
789  
                        int errn      = 0;
787  
                        socklen_t len = sizeof(errn);
790  
                        socklen_t len = sizeof(errn);
788  
                        if (::getsockopt(
791  
                        if (::getsockopt(
789  
                                fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
792  
                                fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
790  
                            errn = errno;
793  
                            errn = errno;
791  
                        if (errn == 0)
794  
                        if (errn == 0)
792  
                            errn = EIO;
795  
                            errn = EIO;
793  
                        op->complete(errn, 0);
796  
                        op->complete(errn, 0);
794  
                    }
797  
                    }
795  
                    else
798  
                    else
796  
                    {
799  
                    {
797  
                        op->perform_io();
800  
                        op->perform_io();
798  
                    }
801  
                    }
799  

802  

800  
                    completed_ops_.push(op);
803  
                    completed_ops_.push(op);
801  
                    ++completions_queued;
804  
                    ++completions_queued;
802  
                }
805  
                }
803  
            }
806  
            }
804  

807  

805  
            // Clean up empty entries
808  
            // Clean up empty entries
806  
            if (!state.read_op && !state.write_op)
809  
            if (!state.read_op && !state.write_op)
807  
                registered_fds_.erase(it);
810  
                registered_fds_.erase(it);
808  
        }
811  
        }
809  
    }
812  
    }
810  

813  

811  
    if (completions_queued > 0)
814  
    if (completions_queued > 0)
812  
    {
815  
    {
813  
        if (completions_queued == 1)
816  
        if (completions_queued == 1)
814  
            wakeup_event_.notify_one();
817  
            wakeup_event_.notify_one();
815  
        else
818  
        else
816  
            wakeup_event_.notify_all();
819  
            wakeup_event_.notify_all();
817  
    }
820  
    }
818  
}
821  
}
819  

822  

820  
inline std::size_t
823  
inline std::size_t
821  
select_scheduler::do_one(long timeout_us)
824  
select_scheduler::do_one(long timeout_us)
822  
{
825  
{
823  
    std::unique_lock lock(mutex_);
826  
    std::unique_lock lock(mutex_);
824  

827  

825  
    for (;;)
828  
    for (;;)
826  
    {
829  
    {
827  
        if (stopped_.load(std::memory_order_acquire))
830  
        if (stopped_.load(std::memory_order_acquire))
828  
            return 0;
831  
            return 0;
829  

832  

830  
        scheduler_op* op = completed_ops_.pop();
833  
        scheduler_op* op = completed_ops_.pop();
831  

834  

832  
        if (op == &task_op_)
835  
        if (op == &task_op_)
833  
        {
836  
        {
834  
            bool more_handlers = !completed_ops_.empty();
837  
            bool more_handlers = !completed_ops_.empty();
835  

838  

836  
            if (!more_handlers)
839  
            if (!more_handlers)
837  
            {
840  
            {
838  
                if (outstanding_work_.load(std::memory_order_acquire) == 0)
841  
                if (outstanding_work_.load(std::memory_order_acquire) == 0)
839  
                {
842  
                {
840  
                    completed_ops_.push(&task_op_);
843  
                    completed_ops_.push(&task_op_);
841  
                    return 0;
844  
                    return 0;
842  
                }
845  
                }
843  
                if (timeout_us == 0)
846  
                if (timeout_us == 0)
844  
                {
847  
                {
845  
                    completed_ops_.push(&task_op_);
848  
                    completed_ops_.push(&task_op_);
846  
                    return 0;
849  
                    return 0;
847  
                }
850  
                }
848  
            }
851  
            }
849  

852  

850  
            reactor_interrupted_ = more_handlers || timeout_us == 0;
853  
            reactor_interrupted_ = more_handlers || timeout_us == 0;
851  
            reactor_running_     = true;
854  
            reactor_running_     = true;
852  

855  

853  
            if (more_handlers && idle_thread_count_ > 0)
856  
            if (more_handlers && idle_thread_count_ > 0)
854  
                wakeup_event_.notify_one();
857  
                wakeup_event_.notify_one();
855  

858  

856  
            run_reactor(lock);
859  
            run_reactor(lock);
857  

860  

858  
            reactor_running_ = false;
861  
            reactor_running_ = false;
859  
            completed_ops_.push(&task_op_);
862  
            completed_ops_.push(&task_op_);
860  
            continue;
863  
            continue;
861  
        }
864  
        }
862  

865  

863  
        if (op != nullptr)
866  
        if (op != nullptr)
864  
        {
867  
        {
865  
            lock.unlock();
868  
            lock.unlock();
866  
            select::work_guard g{this};
869  
            select::work_guard g{this};
867  
            (*op)();
870  
            (*op)();
868  
            return 1;
871  
            return 1;
869  
        }
872  
        }
870  

873  

871  
        if (outstanding_work_.load(std::memory_order_acquire) == 0)
874  
        if (outstanding_work_.load(std::memory_order_acquire) == 0)
872  
            return 0;
875  
            return 0;
873  

876  

874  
        if (timeout_us == 0)
877  
        if (timeout_us == 0)
875  
            return 0;
878  
            return 0;
876  

879  

877  
        ++idle_thread_count_;
880  
        ++idle_thread_count_;
878  
        if (timeout_us < 0)
881  
        if (timeout_us < 0)
879  
            wakeup_event_.wait(lock);
882  
            wakeup_event_.wait(lock);
880  
        else
883  
        else
881  
            wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
884  
            wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
882  
        --idle_thread_count_;
885  
        --idle_thread_count_;
883  
    }
886  
    }
884  
}
887  
}

} // namespace boost::corosio::detail

#endif // BOOST_COROSIO_HAS_SELECT

#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP