1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_EPOLL
15  
#if BOOST_COROSIO_HAS_EPOLL
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
20  

20  

21  
#include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
21  
#include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22  
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
22  
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
23  

23  

24  
#include <boost/corosio/detail/endpoint_convert.hpp>
24  
#include <boost/corosio/detail/endpoint_convert.hpp>
25  
#include <boost/corosio/detail/make_err.hpp>
25  
#include <boost/corosio/detail/make_err.hpp>
26  
#include <boost/corosio/detail/dispatch_coro.hpp>
26  
#include <boost/corosio/detail/dispatch_coro.hpp>
27  
#include <boost/corosio/detail/except.hpp>
27  
#include <boost/corosio/detail/except.hpp>
28  
#include <boost/capy/buffers.hpp>
28  
#include <boost/capy/buffers.hpp>
29  

29  

30  
#include <coroutine>
30  
#include <coroutine>
31  
#include <mutex>
31  
#include <mutex>
32  
#include <unordered_map>
32  
#include <unordered_map>
33  
#include <utility>
33  
#include <utility>
34  

34  

35  
#include <errno.h>
35  
#include <errno.h>
36  
#include <netinet/in.h>
36  
#include <netinet/in.h>
37  
#include <netinet/tcp.h>
37  
#include <netinet/tcp.h>
38  
#include <sys/epoll.h>
38  
#include <sys/epoll.h>
39  
#include <sys/socket.h>
39  
#include <sys/socket.h>
40  
#include <unistd.h>
40  
#include <unistd.h>
41  

41  

42  
/*
42  
/*
43  
    epoll Socket Implementation
43  
    epoll Socket Implementation
44  
    ===========================
44  
    ===========================
45  

45  

46  
    Each I/O operation follows the same pattern:
46  
    Each I/O operation follows the same pattern:
47  
      1. Try the syscall immediately (non-blocking socket)
47  
      1. Try the syscall immediately (non-blocking socket)
48  
      2. If it succeeds or fails with a real error, post to completion queue
48  
      2. If it succeeds or fails with a real error, post to completion queue
49  
      3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
49  
      3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
50  

50  

51  
    This "try first" approach avoids unnecessary epoll round-trips for
51  
    This "try first" approach avoids unnecessary epoll round-trips for
52  
    operations that can complete immediately (common for small reads/writes
52  
    operations that can complete immediately (common for small reads/writes
53  
    on fast local connections).
53  
    on fast local connections).
54  

54  

55  
    One-Shot Registration
55  
    One-Shot Registration
56  
    ---------------------
56  
    ---------------------
57  
    We use one-shot epoll registration: each operation registers, waits for
57  
    We use one-shot epoll registration: each operation registers, waits for
58  
    one event, then unregisters. This simplifies the state machine since we
58  
    one event, then unregisters. This simplifies the state machine since we
59  
    don't need to track whether an fd is currently registered or handle
59  
    don't need to track whether an fd is currently registered or handle
60  
    re-arming. The tradeoff is slightly more epoll_ctl calls, but the
60  
    re-arming. The tradeoff is slightly more epoll_ctl calls, but the
61  
    simplicity is worth it.
61  
    simplicity is worth it.
62  

62  

63  
    Cancellation
    ------------
    Each op carries a `cancelled` atomic flag; see op.hpp for the
    completion/cancellation race handling. register_op() also checks a
    per-kind cancel-pending flag (e.g. read_cancel_pending). That flag is
    presumably set when a cancellation arrives before the op is parked.
    If it is set, the op is marked cancelled and posted instead of parked.
    cancel() must complete pending operations (post them with the
    cancelled flag set) so coroutines waiting on them can resume.
    close_socket() calls cancel() first to ensure this.
69  

69  

70  
    Impl Lifetime with shared_ptr
70  
    Impl Lifetime with shared_ptr
71  
    -----------------------------
71  
    -----------------------------
72  
    Socket impls use enable_shared_from_this. The service owns impls via
72  
    Socket impls use enable_shared_from_this. The service owns impls via
73  
    shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
73  
    shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
74  
    removal. When a user calls close(), we call cancel() which posts pending
74  
    removal. When a user calls close(), we call cancel() which posts pending
75  
    ops to the scheduler.
75  
    ops to the scheduler.
76  

76  

77  
    CRITICAL: The posted ops must keep the impl alive until they complete.
    The ops are members of the impl (e.g. rd_, conn_), so otherwise the
    scheduler would process a freed op (use-after-free).
    Each operation captures shared_from_this() into op.impl_ptr when it is
    started, before it is posted or registered with the reactor. On
    completion, operator()() moves impl_ptr into a local before resuming
    the coroutine. The reference is therefore released only after
    resumption, which allows the impl to be destroyed if no other
    references exist.
82  

82  

83  
    Service Ownership
83  
    Service Ownership
84  
    -----------------
84  
    -----------------
85  
    epoll_socket_service owns all socket impls. destroy_impl() removes the
85  
    epoll_socket_service owns all socket impls. destroy_impl() removes the
86  
    shared_ptr from the map, but the impl may survive if ops still hold
86  
    shared_ptr from the map, but the impl may survive if ops still hold
87  
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
87  
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
88  
    in-flight ops will complete and release their refs.
88  
    in-flight ops will complete and release their refs.
89  
*/
89  
*/
90  

90  

91  
namespace boost::corosio::detail {
91  
namespace boost::corosio::detail {
92  

92  

93  
/** State for epoll socket service. */
93  
/** State for epoll socket service. */
94  
class epoll_socket_state
94  
class epoll_socket_state
95  
{
95  
{
96  
public:
96  
public:
97  
    explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
97  
    explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
98  
    {
98  
    {
99  
    }
99  
    }
100  

100  

101  
    epoll_scheduler& sched_;
101  
    epoll_scheduler& sched_;
102  
    std::mutex mutex_;
102  
    std::mutex mutex_;
103  
    intrusive_list<epoll_socket> socket_list_;
103  
    intrusive_list<epoll_socket> socket_list_;
104  
    std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
104  
    std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
105  
        socket_ptrs_;
105  
        socket_ptrs_;
106  
};
106  
};
107  

107  

108  
/** epoll socket service implementation.
108  
/** epoll socket service implementation.
109  

109  

110  
    Inherits from socket_service to enable runtime polymorphism.
110  
    Inherits from socket_service to enable runtime polymorphism.
111  
    Uses key_type = socket_service for service lookup.
111  
    Uses key_type = socket_service for service lookup.
112  
*/
112  
*/
113  
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
113  
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
114  
{
114  
{
115  
public:
115  
public:
116  
    explicit epoll_socket_service(capy::execution_context& ctx);
116  
    explicit epoll_socket_service(capy::execution_context& ctx);
117  
    ~epoll_socket_service() override;
117  
    ~epoll_socket_service() override;
118  

118  

119  
    epoll_socket_service(epoll_socket_service const&)            = delete;
119  
    epoll_socket_service(epoll_socket_service const&)            = delete;
120  
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;
120  
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;
121  

121  

122  
    void shutdown() override;
122  
    void shutdown() override;
123  

123  

124  
    io_object::implementation* construct() override;
124  
    io_object::implementation* construct() override;
125  
    void destroy(io_object::implementation*) override;
125  
    void destroy(io_object::implementation*) override;
126  
    void close(io_object::handle&) override;
126  
    void close(io_object::handle&) override;
127  
    std::error_code
127  
    std::error_code
128  
    open_socket(tcp_socket::implementation& impl,
128  
    open_socket(tcp_socket::implementation& impl,
129  
                int family, int type, int protocol) override;
129  
                int family, int type, int protocol) override;
130  

130  

131  
    epoll_scheduler& scheduler() const noexcept
131  
    epoll_scheduler& scheduler() const noexcept
132  
    {
132  
    {
133  
        return state_->sched_;
133  
        return state_->sched_;
134  
    }
134  
    }
135  
    void post(epoll_op* op);
135  
    void post(epoll_op* op);
136  
    void work_started() noexcept;
136  
    void work_started() noexcept;
137  
    void work_finished() noexcept;
137  
    void work_finished() noexcept;
138  

138  

139  
private:
139  
private:
140  
    std::unique_ptr<epoll_socket_state> state_;
140  
    std::unique_ptr<epoll_socket_state> state_;
141  
};
141  
};
142  

142  

143  
//--------------------------------------------------------------------------
143  
//--------------------------------------------------------------------------
144  
//
144  
//
145  
// Implementation
145  
// Implementation
146  
//
146  
//
147  
//--------------------------------------------------------------------------
147  
//--------------------------------------------------------------------------
148  

148  

149  
// Register an op with the reactor, handling cached edge events.
149  
// Register an op with the reactor, handling cached edge events.
150  
// Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
150  
// Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
151  
inline void
151  
inline void
152  
epoll_socket::register_op(
152  
epoll_socket::register_op(
153  
    epoll_op& op,
153  
    epoll_op& op,
154  
    epoll_op*& desc_slot,
154  
    epoll_op*& desc_slot,
155  
    bool& ready_flag,
155  
    bool& ready_flag,
156  
    bool& cancel_flag) noexcept
156  
    bool& cancel_flag) noexcept
157  
{
157  
{
158  
    svc_.work_started();
158  
    svc_.work_started();
159  

159  

160  
    std::lock_guard lock(desc_state_.mutex);
160  
    std::lock_guard lock(desc_state_.mutex);
161  
    bool io_done = false;
161  
    bool io_done = false;
162  
    if (ready_flag)
162  
    if (ready_flag)
163  
    {
163  
    {
164  
        ready_flag = false;
164  
        ready_flag = false;
165  
        op.perform_io();
165  
        op.perform_io();
166  
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
166  
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
167  
        if (!io_done)
167  
        if (!io_done)
168  
            op.errn = 0;
168  
            op.errn = 0;
169  
    }
169  
    }
170  

170  

171  
    if (cancel_flag)
171  
    if (cancel_flag)
172  
    {
172  
    {
173  
        cancel_flag = false;
173  
        cancel_flag = false;
174  
        op.cancelled.store(true, std::memory_order_relaxed);
174  
        op.cancelled.store(true, std::memory_order_relaxed);
175  
    }
175  
    }
176  

176  

177  
    if (io_done || op.cancelled.load(std::memory_order_acquire))
177  
    if (io_done || op.cancelled.load(std::memory_order_acquire))
178  
    {
178  
    {
179  
        svc_.post(&op);
179  
        svc_.post(&op);
180  
        svc_.work_finished();
180  
        svc_.work_finished();
181  
    }
181  
    }
182  
    else
182  
    else
183  
    {
183  
    {
184  
        desc_slot = &op;
184  
        desc_slot = &op;
185  
    }
185  
    }
186  
}
186  
}
187  

187  

188  
inline void
188  
inline void
189  
epoll_op::canceller::operator()() const noexcept
189  
epoll_op::canceller::operator()() const noexcept
190  
{
190  
{
191  
    op->cancel();
191  
    op->cancel();
192  
}
192  
}
193  

193  

194  
inline void
194  
inline void
195  
epoll_connect_op::cancel() noexcept
195  
epoll_connect_op::cancel() noexcept
196  
{
196  
{
197  
    if (socket_impl_)
197  
    if (socket_impl_)
198  
        socket_impl_->cancel_single_op(*this);
198  
        socket_impl_->cancel_single_op(*this);
199  
    else
199  
    else
200  
        request_cancel();
200  
        request_cancel();
201  
}
201  
}
202  

202  

203  
inline void
203  
inline void
204  
epoll_read_op::cancel() noexcept
204  
epoll_read_op::cancel() noexcept
205  
{
205  
{
206  
    if (socket_impl_)
206  
    if (socket_impl_)
207  
        socket_impl_->cancel_single_op(*this);
207  
        socket_impl_->cancel_single_op(*this);
208  
    else
208  
    else
209  
        request_cancel();
209  
        request_cancel();
210  
}
210  
}
211  

211  

212  
inline void
212  
inline void
213  
epoll_write_op::cancel() noexcept
213  
epoll_write_op::cancel() noexcept
214  
{
214  
{
215  
    if (socket_impl_)
215  
    if (socket_impl_)
216  
        socket_impl_->cancel_single_op(*this);
216  
        socket_impl_->cancel_single_op(*this);
217  
    else
217  
    else
218  
        request_cancel();
218  
        request_cancel();
219  
}
219  
}
220  

220  

221  
inline void
221  
inline void
222  
epoll_op::operator()()
222  
epoll_op::operator()()
223  
{
223  
{
224  
    stop_cb.reset();
224  
    stop_cb.reset();
225  

225  

226  
    socket_impl_->svc_.scheduler().reset_inline_budget();
226  
    socket_impl_->svc_.scheduler().reset_inline_budget();
227  

227  

228  
    if (cancelled.load(std::memory_order_acquire))
228  
    if (cancelled.load(std::memory_order_acquire))
229  
        *ec_out = capy::error::canceled;
229  
        *ec_out = capy::error::canceled;
230  
    else if (errn != 0)
230  
    else if (errn != 0)
231  
        *ec_out = make_err(errn);
231  
        *ec_out = make_err(errn);
232  
    else if (is_read_operation() && bytes_transferred == 0)
232  
    else if (is_read_operation() && bytes_transferred == 0)
233  
        *ec_out = capy::error::eof;
233  
        *ec_out = capy::error::eof;
234  
    else
234  
    else
235  
        *ec_out = {};
235  
        *ec_out = {};
236  

236  

237  
    *bytes_out = bytes_transferred;
237  
    *bytes_out = bytes_transferred;
238  

238  

239  
    // Move to stack before resuming coroutine. The coroutine might close
239  
    // Move to stack before resuming coroutine. The coroutine might close
240  
    // the socket, releasing the last wrapper ref. If impl_ptr were the
240  
    // the socket, releasing the last wrapper ref. If impl_ptr were the
241  
    // last ref and we destroyed it while still in operator(), we'd have
241  
    // last ref and we destroyed it while still in operator(), we'd have
242  
    // use-after-free. Moving to local ensures destruction happens at
242  
    // use-after-free. Moving to local ensures destruction happens at
243  
    // function exit, after all member accesses are complete.
243  
    // function exit, after all member accesses are complete.
244  
    capy::executor_ref saved_ex(ex);
244  
    capy::executor_ref saved_ex(ex);
245  
    std::coroutine_handle<> saved_h(h);
245  
    std::coroutine_handle<> saved_h(h);
246  
    auto prevent_premature_destruction = std::move(impl_ptr);
246  
    auto prevent_premature_destruction = std::move(impl_ptr);
247  
    dispatch_coro(saved_ex, saved_h).resume();
247  
    dispatch_coro(saved_ex, saved_h).resume();
248  
}
248  
}
249  

249  

250  
inline void
250  
inline void
251  
epoll_connect_op::operator()()
251  
epoll_connect_op::operator()()
252  
{
252  
{
253  
    stop_cb.reset();
253  
    stop_cb.reset();
254  

254  

255  
    socket_impl_->svc_.scheduler().reset_inline_budget();
255  
    socket_impl_->svc_.scheduler().reset_inline_budget();
256  

256  

257  
    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
257  
    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
258  

258  

259  
    // Cache endpoints on successful connect
259  
    // Cache endpoints on successful connect
260  
    if (success && socket_impl_)
260  
    if (success && socket_impl_)
261  
    {
261  
    {
262  
        endpoint local_ep;
262  
        endpoint local_ep;
263  
        sockaddr_storage local_storage{};
263  
        sockaddr_storage local_storage{};
264  
        socklen_t local_len = sizeof(local_storage);
264  
        socklen_t local_len = sizeof(local_storage);
265  
        if (::getsockname(
265  
        if (::getsockname(
266  
                fd, reinterpret_cast<sockaddr*>(&local_storage),
266  
                fd, reinterpret_cast<sockaddr*>(&local_storage),
267  
                &local_len) == 0)
267  
                &local_len) == 0)
268  
            local_ep = from_sockaddr(local_storage);
268  
            local_ep = from_sockaddr(local_storage);
269  
        static_cast<epoll_socket*>(socket_impl_)
269  
        static_cast<epoll_socket*>(socket_impl_)
270  
            ->set_endpoints(local_ep, target_endpoint);
270  
            ->set_endpoints(local_ep, target_endpoint);
271  
    }
271  
    }
272  

272  

273  
    if (cancelled.load(std::memory_order_acquire))
273  
    if (cancelled.load(std::memory_order_acquire))
274  
        *ec_out = capy::error::canceled;
274  
        *ec_out = capy::error::canceled;
275  
    else if (errn != 0)
275  
    else if (errn != 0)
276  
        *ec_out = make_err(errn);
276  
        *ec_out = make_err(errn);
277  
    else
277  
    else
278  
        *ec_out = {};
278  
        *ec_out = {};
279  

279  

280  
    // Move to stack before resuming. See epoll_op::operator()() for rationale.
280  
    // Move to stack before resuming. See epoll_op::operator()() for rationale.
281  
    capy::executor_ref saved_ex(ex);
281  
    capy::executor_ref saved_ex(ex);
282  
    std::coroutine_handle<> saved_h(h);
282  
    std::coroutine_handle<> saved_h(h);
283  
    auto prevent_premature_destruction = std::move(impl_ptr);
283  
    auto prevent_premature_destruction = std::move(impl_ptr);
284  
    dispatch_coro(saved_ex, saved_h).resume();
284  
    dispatch_coro(saved_ex, saved_h).resume();
285  
}
285  
}
286  

286  

287  
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
287  
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
288  
    : svc_(svc)
288  
    : svc_(svc)
289  
{
289  
{
290  
}
290  
}
291  

291  

292  
inline epoll_socket::~epoll_socket() = default;
292  
inline epoll_socket::~epoll_socket() = default;
293  

293  

294  
inline std::coroutine_handle<>
294  
inline std::coroutine_handle<>
295  
epoll_socket::connect(
295  
epoll_socket::connect(
296  
    std::coroutine_handle<> h,
296  
    std::coroutine_handle<> h,
297  
    capy::executor_ref ex,
297  
    capy::executor_ref ex,
298  
    endpoint ep,
298  
    endpoint ep,
299  
    std::stop_token token,
299  
    std::stop_token token,
300  
    std::error_code* ec)
300  
    std::error_code* ec)
301  
{
301  
{
302  
    auto& op = conn_;
302  
    auto& op = conn_;
303  

303  

304  
    sockaddr_storage storage{};
304  
    sockaddr_storage storage{};
305  
    socklen_t addrlen =
305  
    socklen_t addrlen =
306  
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
306  
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
307  
    int result =
307  
    int result =
308  
        ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
308  
        ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
309  

309  

310  
    if (result == 0)
310  
    if (result == 0)
311  
    {
311  
    {
312  
        sockaddr_storage local_storage{};
312  
        sockaddr_storage local_storage{};
313  
        socklen_t local_len = sizeof(local_storage);
313  
        socklen_t local_len = sizeof(local_storage);
314  
        if (::getsockname(
314  
        if (::getsockname(
315  
                fd_, reinterpret_cast<sockaddr*>(&local_storage),
315  
                fd_, reinterpret_cast<sockaddr*>(&local_storage),
316  
                &local_len) == 0)
316  
                &local_len) == 0)
317  
            local_endpoint_ = detail::from_sockaddr(local_storage);
317  
            local_endpoint_ = detail::from_sockaddr(local_storage);
318  
        remote_endpoint_ = ep;
318  
        remote_endpoint_ = ep;
319  
    }
319  
    }
320  

320  

321  
    if (result == 0 || errno != EINPROGRESS)
321  
    if (result == 0 || errno != EINPROGRESS)
322  
    {
322  
    {
323  
        int err = (result < 0) ? errno : 0;
323  
        int err = (result < 0) ? errno : 0;
324  
        if (svc_.scheduler().try_consume_inline_budget())
324  
        if (svc_.scheduler().try_consume_inline_budget())
325  
        {
325  
        {
326  
            *ec = err ? make_err(err) : std::error_code{};
326  
            *ec = err ? make_err(err) : std::error_code{};
327  
            return dispatch_coro(ex, h);
327  
            return dispatch_coro(ex, h);
328  
        }
328  
        }
329  
        op.reset();
329  
        op.reset();
330  
        op.h               = h;
330  
        op.h               = h;
331  
        op.ex              = ex;
331  
        op.ex              = ex;
332  
        op.ec_out          = ec;
332  
        op.ec_out          = ec;
333  
        op.fd              = fd_;
333  
        op.fd              = fd_;
334  
        op.target_endpoint = ep;
334  
        op.target_endpoint = ep;
335  
        op.start(token, this);
335  
        op.start(token, this);
336  
        op.impl_ptr = shared_from_this();
336  
        op.impl_ptr = shared_from_this();
337  
        op.complete(err, 0);
337  
        op.complete(err, 0);
338  
        svc_.post(&op);
338  
        svc_.post(&op);
339  
        return std::noop_coroutine();
339  
        return std::noop_coroutine();
340  
    }
340  
    }
341  

341  

342  
    // EINPROGRESS — register with reactor
342  
    // EINPROGRESS — register with reactor
343  
    op.reset();
343  
    op.reset();
344  
    op.h               = h;
344  
    op.h               = h;
345  
    op.ex              = ex;
345  
    op.ex              = ex;
346  
    op.ec_out          = ec;
346  
    op.ec_out          = ec;
347  
    op.fd              = fd_;
347  
    op.fd              = fd_;
348  
    op.target_endpoint = ep;
348  
    op.target_endpoint = ep;
349  
    op.start(token, this);
349  
    op.start(token, this);
350  
    op.impl_ptr = shared_from_this();
350  
    op.impl_ptr = shared_from_this();
351  

351  

352  
    register_op(
352  
    register_op(
353  
        op, desc_state_.connect_op, desc_state_.write_ready,
353  
        op, desc_state_.connect_op, desc_state_.write_ready,
354  
        desc_state_.connect_cancel_pending);
354  
        desc_state_.connect_cancel_pending);
355  
    return std::noop_coroutine();
355  
    return std::noop_coroutine();
356  
}
356  
}
357  

357  

358  
inline std::coroutine_handle<>
358  
inline std::coroutine_handle<>
359  
epoll_socket::read_some(
359  
epoll_socket::read_some(
360  
    std::coroutine_handle<> h,
360  
    std::coroutine_handle<> h,
361  
    capy::executor_ref ex,
361  
    capy::executor_ref ex,
362  
    io_buffer_param param,
362  
    io_buffer_param param,
363  
    std::stop_token token,
363  
    std::stop_token token,
364  
    std::error_code* ec,
364  
    std::error_code* ec,
365  
    std::size_t* bytes_out)
365  
    std::size_t* bytes_out)
366  
{
366  
{
367  
    auto& op = rd_;
367  
    auto& op = rd_;
368  
    op.reset();
368  
    op.reset();
369  

369  

370  
    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
370  
    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
371  
    op.iovec_count =
371  
    op.iovec_count =
372  
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
372  
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
373  

373  

374  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
374  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
375  
    {
375  
    {
376  
        op.empty_buffer_read = true;
376  
        op.empty_buffer_read = true;
377  
        op.h                 = h;
377  
        op.h                 = h;
378  
        op.ex                = ex;
378  
        op.ex                = ex;
379  
        op.ec_out            = ec;
379  
        op.ec_out            = ec;
380  
        op.bytes_out         = bytes_out;
380  
        op.bytes_out         = bytes_out;
381  
        op.start(token, this);
381  
        op.start(token, this);
382  
        op.impl_ptr = shared_from_this();
382  
        op.impl_ptr = shared_from_this();
383  
        op.complete(0, 0);
383  
        op.complete(0, 0);
384  
        svc_.post(&op);
384  
        svc_.post(&op);
385  
        return std::noop_coroutine();
385  
        return std::noop_coroutine();
386  
    }
386  
    }
387  

387  

388  
    for (int i = 0; i < op.iovec_count; ++i)
388  
    for (int i = 0; i < op.iovec_count; ++i)
389  
    {
389  
    {
390  
        op.iovecs[i].iov_base = bufs[i].data();
390  
        op.iovecs[i].iov_base = bufs[i].data();
391  
        op.iovecs[i].iov_len  = bufs[i].size();
391  
        op.iovecs[i].iov_len  = bufs[i].size();
392  
    }
392  
    }
393  

393  

394  
    // Speculative read
394  
    // Speculative read
395  
    ssize_t n;
395  
    ssize_t n;
396  
    do
396  
    do
397  
    {
397  
    {
398  
        n = ::readv(fd_, op.iovecs, op.iovec_count);
398  
        n = ::readv(fd_, op.iovecs, op.iovec_count);
399  
    }
399  
    }
400  
    while (n < 0 && errno == EINTR);
400  
    while (n < 0 && errno == EINTR);
401  

401  

402  
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
402  
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
403  
    {
403  
    {
404  
        int err    = (n < 0) ? errno : 0;
404  
        int err    = (n < 0) ? errno : 0;
405  
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
405  
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
406  

406  

407  
        if (svc_.scheduler().try_consume_inline_budget())
407  
        if (svc_.scheduler().try_consume_inline_budget())
408  
        {
408  
        {
409  
            if (err)
409  
            if (err)
410  
                *ec = make_err(err);
410  
                *ec = make_err(err);
411  
            else if (n == 0)
411  
            else if (n == 0)
412  
                *ec = capy::error::eof;
412  
                *ec = capy::error::eof;
413  
            else
413  
            else
414  
                *ec = {};
414  
                *ec = {};
415  
            *bytes_out = bytes;
415  
            *bytes_out = bytes;
416  
            return dispatch_coro(ex, h);
416  
            return dispatch_coro(ex, h);
417  
        }
417  
        }
418  
        op.h         = h;
418  
        op.h         = h;
419  
        op.ex        = ex;
419  
        op.ex        = ex;
420  
        op.ec_out    = ec;
420  
        op.ec_out    = ec;
421  
        op.bytes_out = bytes_out;
421  
        op.bytes_out = bytes_out;
422  
        op.start(token, this);
422  
        op.start(token, this);
423  
        op.impl_ptr = shared_from_this();
423  
        op.impl_ptr = shared_from_this();
424  
        op.complete(err, bytes);
424  
        op.complete(err, bytes);
425  
        svc_.post(&op);
425  
        svc_.post(&op);
426  
        return std::noop_coroutine();
426  
        return std::noop_coroutine();
427  
    }
427  
    }
428  

428  

429  
    // EAGAIN — register with reactor
429  
    // EAGAIN — register with reactor
430  
    op.h         = h;
430  
    op.h         = h;
431  
    op.ex        = ex;
431  
    op.ex        = ex;
432  
    op.ec_out    = ec;
432  
    op.ec_out    = ec;
433  
    op.bytes_out = bytes_out;
433  
    op.bytes_out = bytes_out;
434  
    op.fd        = fd_;
434  
    op.fd        = fd_;
435  
    op.start(token, this);
435  
    op.start(token, this);
436  
    op.impl_ptr = shared_from_this();
436  
    op.impl_ptr = shared_from_this();
437  

437  

438  
    register_op(
438  
    register_op(
439  
        op, desc_state_.read_op, desc_state_.read_ready,
439  
        op, desc_state_.read_op, desc_state_.read_ready,
440  
        desc_state_.read_cancel_pending);
440  
        desc_state_.read_cancel_pending);
441  
    return std::noop_coroutine();
441  
    return std::noop_coroutine();
442  
}
442  
}
443  

443  

444  
inline std::coroutine_handle<>
444  
inline std::coroutine_handle<>
445  
epoll_socket::write_some(
445  
epoll_socket::write_some(
446  
    std::coroutine_handle<> h,
446  
    std::coroutine_handle<> h,
447  
    capy::executor_ref ex,
447  
    capy::executor_ref ex,
448  
    io_buffer_param param,
448  
    io_buffer_param param,
449  
    std::stop_token token,
449  
    std::stop_token token,
450  
    std::error_code* ec,
450  
    std::error_code* ec,
451  
    std::size_t* bytes_out)
451  
    std::size_t* bytes_out)
452  
{
452  
{
453  
    auto& op = wr_;
453  
    auto& op = wr_;
454  
    op.reset();
454  
    op.reset();
455  

455  

456  
    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
456  
    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
457  
    op.iovec_count =
457  
    op.iovec_count =
458  
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
458  
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
459  

459  

460  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
460  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
461  
    {
461  
    {
462  
        op.h         = h;
462  
        op.h         = h;
463  
        op.ex        = ex;
463  
        op.ex        = ex;
464  
        op.ec_out    = ec;
464  
        op.ec_out    = ec;
465  
        op.bytes_out = bytes_out;
465  
        op.bytes_out = bytes_out;
466  
        op.start(token, this);
466  
        op.start(token, this);
467  
        op.impl_ptr = shared_from_this();
467  
        op.impl_ptr = shared_from_this();
468  
        op.complete(0, 0);
468  
        op.complete(0, 0);
469  
        svc_.post(&op);
469  
        svc_.post(&op);
470  
        return std::noop_coroutine();
470  
        return std::noop_coroutine();
471  
    }
471  
    }
472  

472  

473  
    for (int i = 0; i < op.iovec_count; ++i)
473  
    for (int i = 0; i < op.iovec_count; ++i)
474  
    {
474  
    {
475  
        op.iovecs[i].iov_base = bufs[i].data();
475  
        op.iovecs[i].iov_base = bufs[i].data();
476  
        op.iovecs[i].iov_len  = bufs[i].size();
476  
        op.iovecs[i].iov_len  = bufs[i].size();
477  
    }
477  
    }
478  

478  

479  
    // Speculative write
479  
    // Speculative write
480  
    msghdr msg{};
480  
    msghdr msg{};
481  
    msg.msg_iov    = op.iovecs;
481  
    msg.msg_iov    = op.iovecs;
482  
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
482  
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
483  

483  

484  
    ssize_t n;
484  
    ssize_t n;
485  
    do
485  
    do
486  
    {
486  
    {
487  
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
487  
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
488  
    }
488  
    }
489  
    while (n < 0 && errno == EINTR);
489  
    while (n < 0 && errno == EINTR);
490  

490  

491  
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
491  
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
492  
    {
492  
    {
493  
        int err    = (n < 0) ? errno : 0;
493  
        int err    = (n < 0) ? errno : 0;
494  
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
494  
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
495  

495  

496  
        if (svc_.scheduler().try_consume_inline_budget())
496  
        if (svc_.scheduler().try_consume_inline_budget())
497  
        {
497  
        {
498  
            *ec        = err ? make_err(err) : std::error_code{};
498  
            *ec        = err ? make_err(err) : std::error_code{};
499  
            *bytes_out = bytes;
499  
            *bytes_out = bytes;
500  
            return dispatch_coro(ex, h);
500  
            return dispatch_coro(ex, h);
501  
        }
501  
        }
502  
        op.h         = h;
502  
        op.h         = h;
503  
        op.ex        = ex;
503  
        op.ex        = ex;
504  
        op.ec_out    = ec;
504  
        op.ec_out    = ec;
505  
        op.bytes_out = bytes_out;
505  
        op.bytes_out = bytes_out;
506  
        op.start(token, this);
506  
        op.start(token, this);
507  
        op.impl_ptr = shared_from_this();
507  
        op.impl_ptr = shared_from_this();
508  
        op.complete(err, bytes);
508  
        op.complete(err, bytes);
509  
        svc_.post(&op);
509  
        svc_.post(&op);
510  
        return std::noop_coroutine();
510  
        return std::noop_coroutine();
511  
    }
511  
    }
512  

512  

513  
    // EAGAIN — register with reactor
513  
    // EAGAIN — register with reactor
514  
    op.h         = h;
514  
    op.h         = h;
515  
    op.ex        = ex;
515  
    op.ex        = ex;
516  
    op.ec_out    = ec;
516  
    op.ec_out    = ec;
517  
    op.bytes_out = bytes_out;
517  
    op.bytes_out = bytes_out;
518  
    op.fd        = fd_;
518  
    op.fd        = fd_;
519  
    op.start(token, this);
519  
    op.start(token, this);
520  
    op.impl_ptr = shared_from_this();
520  
    op.impl_ptr = shared_from_this();
521  

521  

522  
    register_op(
522  
    register_op(
523  
        op, desc_state_.write_op, desc_state_.write_ready,
523  
        op, desc_state_.write_op, desc_state_.write_ready,
524  
        desc_state_.write_cancel_pending);
524  
        desc_state_.write_cancel_pending);
525  
    return std::noop_coroutine();
525  
    return std::noop_coroutine();
526  
}
526  
}
527  

527  

528  
inline std::error_code
528  
inline std::error_code
529  
epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
529  
epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
530  
{
530  
{
531  
    int how;
531  
    int how;
532  
    switch (what)
532  
    switch (what)
533  
    {
533  
    {
534  
    case tcp_socket::shutdown_receive:
534  
    case tcp_socket::shutdown_receive:
535  
        how = SHUT_RD;
535  
        how = SHUT_RD;
536  
        break;
536  
        break;
537  
    case tcp_socket::shutdown_send:
537  
    case tcp_socket::shutdown_send:
538  
        how = SHUT_WR;
538  
        how = SHUT_WR;
539  
        break;
539  
        break;
540  
    case tcp_socket::shutdown_both:
540  
    case tcp_socket::shutdown_both:
541  
        how = SHUT_RDWR;
541  
        how = SHUT_RDWR;
542  
        break;
542  
        break;
543  
    default:
543  
    default:
544  
        return make_err(EINVAL);
544  
        return make_err(EINVAL);
545  
    }
545  
    }
546  
    if (::shutdown(fd_, how) != 0)
546  
    if (::shutdown(fd_, how) != 0)
547  
        return make_err(errno);
547  
        return make_err(errno);
548  
    return {};
548  
    return {};
549  
}
549  
}
550  

550  

551  
inline std::error_code
551  
inline std::error_code
552  
epoll_socket::set_option(
552  
epoll_socket::set_option(
553  
    int level, int optname,
553  
    int level, int optname,
554  
    void const* data, std::size_t size) noexcept
554  
    void const* data, std::size_t size) noexcept
555  
{
555  
{
556  
    if (::setsockopt(fd_, level, optname, data,
556  
    if (::setsockopt(fd_, level, optname, data,
557  
            static_cast<socklen_t>(size)) != 0)
557  
            static_cast<socklen_t>(size)) != 0)
558  
        return make_err(errno);
558  
        return make_err(errno);
559  
    return {};
559  
    return {};
560  
}
560  
}
561  

561  

562  
inline std::error_code
562  
inline std::error_code
563  
epoll_socket::get_option(
563  
epoll_socket::get_option(
564  
    int level, int optname,
564  
    int level, int optname,
565  
    void* data, std::size_t* size) const noexcept
565  
    void* data, std::size_t* size) const noexcept
566  
{
566  
{
567  
    socklen_t len = static_cast<socklen_t>(*size);
567  
    socklen_t len = static_cast<socklen_t>(*size);
568  
    if (::getsockopt(fd_, level, optname, data, &len) != 0)
568  
    if (::getsockopt(fd_, level, optname, data, &len) != 0)
569  
        return make_err(errno);
569  
        return make_err(errno);
570  
    *size = static_cast<std::size_t>(len);
570  
    *size = static_cast<std::size_t>(len);
571  
    return {};
571  
    return {};
572  
}
572  
}
573  

573  

574  
inline void
574  
inline void
575  
epoll_socket::cancel() noexcept
575  
epoll_socket::cancel() noexcept
576  
{
576  
{
577  
    auto self = weak_from_this().lock();
577  
    auto self = weak_from_this().lock();
578  
    if (!self)
578  
    if (!self)
579  
        return;
579  
        return;
580  

580  

581  
    conn_.request_cancel();
581  
    conn_.request_cancel();
582  
    rd_.request_cancel();
582  
    rd_.request_cancel();
583  
    wr_.request_cancel();
583  
    wr_.request_cancel();
584  

584  

585  
    epoll_op* conn_claimed = nullptr;
585  
    epoll_op* conn_claimed = nullptr;
586  
    epoll_op* rd_claimed   = nullptr;
586  
    epoll_op* rd_claimed   = nullptr;
587  
    epoll_op* wr_claimed   = nullptr;
587  
    epoll_op* wr_claimed   = nullptr;
588  
    {
588  
    {
589  
        std::lock_guard lock(desc_state_.mutex);
589  
        std::lock_guard lock(desc_state_.mutex);
590  
        if (desc_state_.connect_op == &conn_)
590  
        if (desc_state_.connect_op == &conn_)
591  
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
591  
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
592  
        else
592  
        else
593  
            desc_state_.connect_cancel_pending = true;
593  
            desc_state_.connect_cancel_pending = true;
594  
        if (desc_state_.read_op == &rd_)
594  
        if (desc_state_.read_op == &rd_)
595  
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
595  
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
596  
        else
596  
        else
597  
            desc_state_.read_cancel_pending = true;
597  
            desc_state_.read_cancel_pending = true;
598  
        if (desc_state_.write_op == &wr_)
598  
        if (desc_state_.write_op == &wr_)
599  
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
599  
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
600  
        else
600  
        else
601  
            desc_state_.write_cancel_pending = true;
601  
            desc_state_.write_cancel_pending = true;
602  
    }
602  
    }
603  

603  

604  
    if (conn_claimed)
604  
    if (conn_claimed)
605  
    {
605  
    {
606  
        conn_.impl_ptr = self;
606  
        conn_.impl_ptr = self;
607  
        svc_.post(&conn_);
607  
        svc_.post(&conn_);
608  
        svc_.work_finished();
608  
        svc_.work_finished();
609  
    }
609  
    }
610  
    if (rd_claimed)
610  
    if (rd_claimed)
611  
    {
611  
    {
612  
        rd_.impl_ptr = self;
612  
        rd_.impl_ptr = self;
613  
        svc_.post(&rd_);
613  
        svc_.post(&rd_);
614  
        svc_.work_finished();
614  
        svc_.work_finished();
615  
    }
615  
    }
616  
    if (wr_claimed)
616  
    if (wr_claimed)
617  
    {
617  
    {
618  
        wr_.impl_ptr = self;
618  
        wr_.impl_ptr = self;
619  
        svc_.post(&wr_);
619  
        svc_.post(&wr_);
620  
        svc_.work_finished();
620  
        svc_.work_finished();
621  
    }
621  
    }
622  
}
622  
}
623  

623  

624  
inline void
624  
inline void
625  
epoll_socket::cancel_single_op(epoll_op& op) noexcept
625  
epoll_socket::cancel_single_op(epoll_op& op) noexcept
626  
{
626  
{
627  
    auto self = weak_from_this().lock();
627  
    auto self = weak_from_this().lock();
628  
    if (!self)
628  
    if (!self)
629  
        return;
629  
        return;
630  

630  

631  
    op.request_cancel();
631  
    op.request_cancel();
632  

632  

633  
    epoll_op** desc_op_ptr = nullptr;
633  
    epoll_op** desc_op_ptr = nullptr;
634  
    if (&op == &conn_)
634  
    if (&op == &conn_)
635  
        desc_op_ptr = &desc_state_.connect_op;
635  
        desc_op_ptr = &desc_state_.connect_op;
636  
    else if (&op == &rd_)
636  
    else if (&op == &rd_)
637  
        desc_op_ptr = &desc_state_.read_op;
637  
        desc_op_ptr = &desc_state_.read_op;
638  
    else if (&op == &wr_)
638  
    else if (&op == &wr_)
639  
        desc_op_ptr = &desc_state_.write_op;
639  
        desc_op_ptr = &desc_state_.write_op;
640  

640  

641  
    if (desc_op_ptr)
641  
    if (desc_op_ptr)
642  
    {
642  
    {
643  
        epoll_op* claimed = nullptr;
643  
        epoll_op* claimed = nullptr;
644  
        {
644  
        {
645  
            std::lock_guard lock(desc_state_.mutex);
645  
            std::lock_guard lock(desc_state_.mutex);
646  
            if (*desc_op_ptr == &op)
646  
            if (*desc_op_ptr == &op)
647  
                claimed = std::exchange(*desc_op_ptr, nullptr);
647  
                claimed = std::exchange(*desc_op_ptr, nullptr);
648  
            else if (&op == &conn_)
648  
            else if (&op == &conn_)
649  
                desc_state_.connect_cancel_pending = true;
649  
                desc_state_.connect_cancel_pending = true;
650  
            else if (&op == &rd_)
650  
            else if (&op == &rd_)
651  
                desc_state_.read_cancel_pending = true;
651  
                desc_state_.read_cancel_pending = true;
652  
            else if (&op == &wr_)
652  
            else if (&op == &wr_)
653  
                desc_state_.write_cancel_pending = true;
653  
                desc_state_.write_cancel_pending = true;
654  
        }
654  
        }
655  
        if (claimed)
655  
        if (claimed)
656  
        {
656  
        {
657  
            op.impl_ptr = self;
657  
            op.impl_ptr = self;
658  
            svc_.post(&op);
658  
            svc_.post(&op);
659  
            svc_.work_finished();
659  
            svc_.work_finished();
660  
        }
660  
        }
661  
    }
661  
    }
662  
}
662  
}
663  

663  

664  
inline void
664  
inline void
665  
epoll_socket::close_socket() noexcept
665  
epoll_socket::close_socket() noexcept
666  
{
666  
{
667  
    auto self = weak_from_this().lock();
667  
    auto self = weak_from_this().lock();
668  
    if (self)
668  
    if (self)
669  
    {
669  
    {
670  
        conn_.request_cancel();
670  
        conn_.request_cancel();
671  
        rd_.request_cancel();
671  
        rd_.request_cancel();
672  
        wr_.request_cancel();
672  
        wr_.request_cancel();
673  

673  

674  
        epoll_op* conn_claimed = nullptr;
674  
        epoll_op* conn_claimed = nullptr;
675  
        epoll_op* rd_claimed   = nullptr;
675  
        epoll_op* rd_claimed   = nullptr;
676  
        epoll_op* wr_claimed   = nullptr;
676  
        epoll_op* wr_claimed   = nullptr;
677  
        {
677  
        {
678  
            std::lock_guard lock(desc_state_.mutex);
678  
            std::lock_guard lock(desc_state_.mutex);
679  
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
679  
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
680  
            rd_claimed   = std::exchange(desc_state_.read_op, nullptr);
680  
            rd_claimed   = std::exchange(desc_state_.read_op, nullptr);
681  
            wr_claimed   = std::exchange(desc_state_.write_op, nullptr);
681  
            wr_claimed   = std::exchange(desc_state_.write_op, nullptr);
682  
            desc_state_.read_ready             = false;
682  
            desc_state_.read_ready             = false;
683  
            desc_state_.write_ready            = false;
683  
            desc_state_.write_ready            = false;
684  
            desc_state_.read_cancel_pending    = false;
684  
            desc_state_.read_cancel_pending    = false;
685  
            desc_state_.write_cancel_pending   = false;
685  
            desc_state_.write_cancel_pending   = false;
686  
            desc_state_.connect_cancel_pending = false;
686  
            desc_state_.connect_cancel_pending = false;
687  
        }
687  
        }
688  

688  

689  
        if (conn_claimed)
689  
        if (conn_claimed)
690  
        {
690  
        {
691  
            conn_.impl_ptr = self;
691  
            conn_.impl_ptr = self;
692  
            svc_.post(&conn_);
692  
            svc_.post(&conn_);
693  
            svc_.work_finished();
693  
            svc_.work_finished();
694  
        }
694  
        }
695  
        if (rd_claimed)
695  
        if (rd_claimed)
696  
        {
696  
        {
697  
            rd_.impl_ptr = self;
697  
            rd_.impl_ptr = self;
698  
            svc_.post(&rd_);
698  
            svc_.post(&rd_);
699  
            svc_.work_finished();
699  
            svc_.work_finished();
700  
        }
700  
        }
701  
        if (wr_claimed)
701  
        if (wr_claimed)
702  
        {
702  
        {
703  
            wr_.impl_ptr = self;
703  
            wr_.impl_ptr = self;
704  
            svc_.post(&wr_);
704  
            svc_.post(&wr_);
705  
            svc_.work_finished();
705  
            svc_.work_finished();
706  
        }
706  
        }
707  

707  

708  
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
708  
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
709  
            desc_state_.impl_ref_ = self;
709  
            desc_state_.impl_ref_ = self;
710  
    }
710  
    }
711  

711  

712  
    if (fd_ >= 0)
712  
    if (fd_ >= 0)
713  
    {
713  
    {
714  
        if (desc_state_.registered_events != 0)
714  
        if (desc_state_.registered_events != 0)
715  
            svc_.scheduler().deregister_descriptor(fd_);
715  
            svc_.scheduler().deregister_descriptor(fd_);
716  
        ::close(fd_);
716  
        ::close(fd_);
717  
        fd_ = -1;
717  
        fd_ = -1;
718  
    }
718  
    }
719  

719  

720  
    desc_state_.fd                = -1;
720  
    desc_state_.fd                = -1;
721  
    desc_state_.registered_events = 0;
721  
    desc_state_.registered_events = 0;
722  

722  

723  
    local_endpoint_  = endpoint{};
723  
    local_endpoint_  = endpoint{};
724  
    remote_endpoint_ = endpoint{};
724  
    remote_endpoint_ = endpoint{};
725  
}
725  
}
726  

726  

727  
inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
727  
inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
728  
    : state_(
728  
    : state_(
729  
          std::make_unique<epoll_socket_state>(
729  
          std::make_unique<epoll_socket_state>(
730  
              ctx.use_service<epoll_scheduler>()))
730  
              ctx.use_service<epoll_scheduler>()))
731  
{
731  
{
732  
}
732  
}
733  

733  

734  
// Out-of-line destructor. Destroying state_ releases the owning socket
// pointers that shutdown() deliberately leaves in place.
inline epoll_socket_service::~epoll_socket_service() = default;
735  

735  

736  
inline void
736  
inline void
737  
epoll_socket_service::shutdown()
737  
epoll_socket_service::shutdown()
738  
{
738  
{
739  
    std::lock_guard lock(state_->mutex_);
739  
    std::lock_guard lock(state_->mutex_);
740  

740  

741  
    while (auto* impl = state_->socket_list_.pop_front())
741  
    while (auto* impl = state_->socket_list_.pop_front())
742  
        impl->close_socket();
742  
        impl->close_socket();
743  

743  

744  
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
744  
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
745  
    // drains completed_ops_, calling destroy() on each queued op. If we
745  
    // drains completed_ops_, calling destroy() on each queued op. If we
746  
    // released our shared_ptrs now, an epoll_op::destroy() could free the
746  
    // released our shared_ptrs now, an epoll_op::destroy() could free the
747  
    // last ref to an impl whose embedded descriptor_state is still linked
747  
    // last ref to an impl whose embedded descriptor_state is still linked
748  
    // in the queue — use-after-free on the next pop(). Letting ~state_
748  
    // in the queue — use-after-free on the next pop(). Letting ~state_
749  
    // release the ptrs (during service destruction, after scheduler
749  
    // release the ptrs (during service destruction, after scheduler
750  
    // shutdown) keeps every impl alive until all ops have been drained.
750  
    // shutdown) keeps every impl alive until all ops have been drained.
751  
}
751  
}
752  

752  

753  
// Allocate a new epoll_socket and register it with the service.
//
// The service stores the owning shared_ptr in socket_ptrs_ (keyed by the raw
// pointer) and tracks the raw pointer in socket_list_, which shutdown()
// drains. The returned raw pointer is what callers later hand to destroy().
inline io_object::implementation*
epoll_socket_service::construct()
{
    auto impl = std::make_shared<epoll_socket>(*this);
    auto* raw = impl.get();

    {
        std::lock_guard lock(state_->mutex_);
        // Insert into the owning map first. emplace() allocates a node and
        // may throw. If that happened after push_back(), socket_list_ would
        // keep a pointer to an epoll_socket that is destroyed as soon as
        // `impl` leaves scope, a dangling entry shutdown() would later use.
        state_->socket_ptrs_.emplace(raw, std::move(impl));
        state_->socket_list_.push_back(raw);
    }

    return raw;
}
767  

767  

768  
inline void
768  
inline void
769  
epoll_socket_service::destroy(io_object::implementation* impl)
769  
epoll_socket_service::destroy(io_object::implementation* impl)
770  
{
770  
{
771  
    auto* epoll_impl = static_cast<epoll_socket*>(impl);
771  
    auto* epoll_impl = static_cast<epoll_socket*>(impl);
772  
    epoll_impl->close_socket();
772  
    epoll_impl->close_socket();
773  
    std::lock_guard lock(state_->mutex_);
773  
    std::lock_guard lock(state_->mutex_);
774  
    state_->socket_list_.remove(epoll_impl);
774  
    state_->socket_list_.remove(epoll_impl);
775  
    state_->socket_ptrs_.erase(epoll_impl);
775  
    state_->socket_ptrs_.erase(epoll_impl);
776  
}
776  
}
777  

777  

778  
inline std::error_code
778  
inline std::error_code
779  
epoll_socket_service::open_socket(
779  
epoll_socket_service::open_socket(
780  
    tcp_socket::implementation& impl,
780  
    tcp_socket::implementation& impl,
781  
    int family, int type, int protocol)
781  
    int family, int type, int protocol)
782  
{
782  
{
783  
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
783  
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
784  
    epoll_impl->close_socket();
784  
    epoll_impl->close_socket();
785  

785  

786  
    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
786  
    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
787  
    if (fd < 0)
787  
    if (fd < 0)
788  
        return make_err(errno);
788  
        return make_err(errno);
789  

789  

790  
    if (family == AF_INET6)
790  
    if (family == AF_INET6)
791  
    {
791  
    {
792  
        int one = 1;
792  
        int one = 1;
793  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
793  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
794  
    }
794  
    }
795  

795  

796  
    epoll_impl->fd_ = fd;
796  
    epoll_impl->fd_ = fd;
797  

797  

798  
    // Register fd with epoll (edge-triggered mode)
798  
    // Register fd with epoll (edge-triggered mode)
799  
    epoll_impl->desc_state_.fd = fd;
799  
    epoll_impl->desc_state_.fd = fd;
800  
    {
800  
    {
801  
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
801  
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
802  
        epoll_impl->desc_state_.read_op    = nullptr;
802  
        epoll_impl->desc_state_.read_op    = nullptr;
803  
        epoll_impl->desc_state_.write_op   = nullptr;
803  
        epoll_impl->desc_state_.write_op   = nullptr;
804  
        epoll_impl->desc_state_.connect_op = nullptr;
804  
        epoll_impl->desc_state_.connect_op = nullptr;
805  
    }
805  
    }
806  
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
806  
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
807  

807  

808  
    return {};
808  
    return {};
809  
}
809  
}
810  

810  

811  
inline void
811  
inline void
812  
epoll_socket_service::close(io_object::handle& h)
812  
epoll_socket_service::close(io_object::handle& h)
813  
{
813  
{
814  
    static_cast<epoll_socket*>(h.get())->close_socket();
814  
    static_cast<epoll_socket*>(h.get())->close_socket();
815  
}
815  
}
816  

816  

817  
inline void
817  
inline void
818  
epoll_socket_service::post(epoll_op* op)
818  
epoll_socket_service::post(epoll_op* op)
819  
{
819  
{
820  
    state_->sched_.post(op);
820  
    state_->sched_.post(op);
821  
}
821  
}
822  

822  

823  
inline void
823  
inline void
824  
epoll_socket_service::work_started() noexcept
824  
epoll_socket_service::work_started() noexcept
825  
{
825  
{
826  
    state_->sched_.work_started();
826  
    state_->sched_.work_started();
827  
}
827  
}
828  

828  

829  
inline void
829  
inline void
830  
epoll_socket_service::work_finished() noexcept
830  
epoll_socket_service::work_finished() noexcept
831  
{
831  
{
832  
    state_->sched_.work_finished();
832  
    state_->sched_.work_finished();
833  
}
833  
}
834  

834  

835  
} // namespace boost::corosio::detail
835  
} // namespace boost::corosio::detail
836  

836  

837  
#endif // BOOST_COROSIO_HAS_EPOLL
837  
#endif // BOOST_COROSIO_HAS_EPOLL
838  

838  

839  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
839  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP