TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/socket_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
23 :
24 : #include <boost/corosio/detail/endpoint_convert.hpp>
25 : #include <boost/corosio/detail/make_err.hpp>
26 : #include <boost/corosio/detail/dispatch_coro.hpp>
27 : #include <boost/corosio/detail/except.hpp>
28 : #include <boost/capy/buffers.hpp>
29 :
30 : #include <coroutine>
31 : #include <mutex>
32 : #include <unordered_map>
33 : #include <utility>
34 :
35 : #include <errno.h>
36 : #include <netinet/in.h>
37 : #include <netinet/tcp.h>
38 : #include <sys/epoll.h>
39 : #include <sys/socket.h>
40 : #include <unistd.h>
41 :
42 : /*
43 : epoll Socket Implementation
44 : ===========================
45 :
46 : Each I/O operation follows the same pattern:
47 : 1. Try the syscall immediately (non-blocking socket)
48 : 2. If it succeeds or fails with a real error, post to completion queue
49 : 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
50 :
51 : This "try first" approach avoids unnecessary epoll round-trips for
52 : operations that can complete immediately (common for small reads/writes
53 : on fast local connections).
54 :
55 : One-Shot Registration
56 : ---------------------
57 : We use one-shot epoll registration: each operation registers, waits for
58 : one event, then unregisters. This simplifies the state machine since we
59 : don't need to track whether an fd is currently registered or handle
60 : re-arming. The tradeoff is slightly more epoll_ctl calls, but the
61 : simplicity is worth it.
62 :
63 : Cancellation
64 : ------------
65 : See op.hpp for the completion/cancellation race handling via the
66 : `registered` atomic. cancel() must complete pending operations (post
67 : them with cancelled flag) so coroutines waiting on them can resume.
68 : close_socket() calls cancel() first to ensure this.
69 :
70 : Impl Lifetime with shared_ptr
71 : -----------------------------
72 : Socket impls use enable_shared_from_this. The service owns impls via
73 : shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
74 : removal. When a user calls close(), we call cancel() which posts pending
75 : ops to the scheduler.
76 :
77 : CRITICAL: The posted ops must keep the impl alive until they complete.
78 : Otherwise the scheduler would process a freed op (use-after-free). The
79 : cancel() method captures shared_from_this() into op.impl_ptr before
80 : posting. When the op completes, impl_ptr is cleared, allowing the impl
81 : to be destroyed if no other references exist.
82 :
83 : Service Ownership
84 : -----------------
85 : epoll_socket_service owns all socket impls. destroy_impl() removes the
86 : shared_ptr from the map, but the impl may survive if ops still hold
87 : impl_ptr refs. shutdown() closes all sockets and clears the map; any
88 : in-flight ops will complete and release their refs.
89 : */
90 :
91 : namespace boost::corosio::detail {
92 :
/** State for epoll socket service.

    Owned by epoll_socket_service via unique_ptr. Holds the scheduler
    reference plus the containers tracking every socket impl the service
    has constructed. Container access is serialized by mutex_.
*/
class epoll_socket_state
{
public:
    explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
    {
    }

    epoll_scheduler& sched_;        // reactor used for registration and posting
    std::mutex mutex_;              // guards socket_list_ and socket_ptrs_
    intrusive_list<epoll_socket> socket_list_;   // all live impls, walked by shutdown()
    std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
        socket_ptrs_;               // owning refs, keyed by raw ptr for O(1) erase
};
107 :
/** epoll socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.
*/
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
{
public:
    explicit epoll_socket_service(capy::execution_context& ctx);
    ~epoll_socket_service() override;

    epoll_socket_service(epoll_socket_service const&) = delete;
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;

    /// Close every socket at context teardown (see impl for lifetime notes).
    void shutdown() override;

    /// Create a new socket impl owned by this service.
    io_object::implementation* construct() override;
    /// Close and release an impl previously returned by construct().
    void destroy(io_object::implementation*) override;
    /// Close the socket held by the given handle, cancelling pending ops.
    void close(io_object::handle&) override;
    /// Open a non-blocking fd for impl and register it with epoll.
    std::error_code
    open_socket(tcp_socket::implementation& impl,
        int family, int type, int protocol) override;

    /// The reactor shared by all sockets of this service.
    epoll_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    void post(epoll_op* op);       ///< queue op on the completion queue
    void work_started() noexcept;  ///< note one more outstanding operation
    void work_finished() noexcept; ///< note one completed operation

private:
    std::unique_ptr<epoll_socket_state> state_;
};
142 :
143 : //--------------------------------------------------------------------------
144 : //
145 : // Implementation
146 : //
147 : //--------------------------------------------------------------------------
148 :
// Register an op with the reactor, handling cached edge events.
// Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
inline void
epoll_socket::register_op(
    epoll_op& op,
    epoll_op*& desc_slot,
    bool& ready_flag,
    bool& cancel_flag) noexcept
{
    // Count the op as outstanding work before it can park.
    svc_.work_started();

    std::lock_guard lock(desc_state_.mutex);
    bool io_done = false;
    if (ready_flag)
    {
        // An edge-triggered readiness event arrived before we parked.
        // Consume the cached edge by retrying the I/O now; otherwise the
        // edge would be lost and the op could wait forever.
        ready_flag = false;
        op.perform_io();
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
        if (!io_done)
            op.errn = 0;  // still not ready: clear so the op parks cleanly
    }

    if (cancel_flag)
    {
        // A cancel raced ahead of registration; latch it onto the op.
        cancel_flag = false;
        op.cancelled.store(true, std::memory_order_relaxed);
    }

    if (io_done || op.cancelled.load(std::memory_order_acquire))
    {
        // Completed (or cancelled) without waiting: hand straight to the
        // completion queue and balance the work count taken above.
        svc_.post(&op);
        svc_.work_finished();
    }
    else
    {
        // Park the op; the reactor completes it on the next event.
        desc_slot = &op;
    }
}
187 :
// Stop-token callback: forwards to the op's virtual cancel().
inline void
epoll_op::canceller::operator()() const noexcept
{
    op->cancel();
}
193 :
194 : inline void
195 MIS 0 : epoll_connect_op::cancel() noexcept
196 : {
197 0 : if (socket_impl_)
198 0 : socket_impl_->cancel_single_op(*this);
199 : else
200 0 : request_cancel();
201 0 : }
202 :
203 : inline void
204 HIT 98 : epoll_read_op::cancel() noexcept
205 : {
206 98 : if (socket_impl_)
207 98 : socket_impl_->cancel_single_op(*this);
208 : else
209 MIS 0 : request_cancel();
210 HIT 98 : }
211 :
212 : inline void
213 MIS 0 : epoll_write_op::cancel() noexcept
214 : {
215 0 : if (socket_impl_)
216 0 : socket_impl_->cancel_single_op(*this);
217 : else
218 0 : request_cancel();
219 0 : }
220 :
// Completion handler: translate op state into (*ec_out, *bytes_out) and
// resume the awaiting coroutine on its executor.
inline void
epoll_op::operator()()
{
    // Disarm the stop callback first so a late cancel can't fire while
    // we are completing.
    stop_cb.reset();

    socket_impl_->svc_.scheduler().reset_inline_budget();

    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    else if (is_read_operation() && bytes_transferred == 0)
        *ec_out = capy::error::eof;  // 0-byte read: peer closed
    else
        *ec_out = {};

    *bytes_out = bytes_transferred;

    // Move to stack before resuming coroutine. The coroutine might close
    // the socket, releasing the last wrapper ref. If impl_ptr were the
    // last ref and we destroyed it while still in operator(), we'd have
    // use-after-free. Moving to local ensures destruction happens at
    // function exit, after all member accesses are complete.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
249 :
// Connect completion handler: caches endpoints on success, sets *ec_out,
// then resumes the awaiting coroutine.
inline void
epoll_connect_op::operator()()
{
    stop_cb.reset();  // disarm cancellation before completing

    socket_impl_->svc_.scheduler().reset_inline_budget();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        endpoint local_ep;
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_storage),
                &local_len) == 0)
            local_ep = from_sockaddr(local_storage);
        static_cast<epoll_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    else
        *ec_out = {};

    // Move to stack before resuming. See epoll_op::operator()() for rationale.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
286 :
// Construct an impl bound to its owning service. No fd is opened here;
// that happens later via epoll_socket_service::open_socket().
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
    : svc_(svc)
{
}

inline epoll_socket::~epoll_socket() = default;
293 :
// Begin an async connect to ep. Tries the syscall speculatively; only
// registers with the reactor on EINPROGRESS. Returns the coroutine to
// resume next (the caller's handle when completing inline).
inline std::coroutine_handle<>
epoll_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;

    // Speculative non-blocking connect: may complete immediately on
    // loopback/fast paths, skipping the reactor round-trip.
    sockaddr_storage storage{};
    socklen_t addrlen =
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
    int result =
        ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);

    if (result == 0)
    {
        // Connected synchronously; cache endpoints now since no
        // completion handler will run to do it for us.
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_storage),
                &local_len) == 0)
            local_endpoint_ = detail::from_sockaddr(local_storage);
        remote_endpoint_ = ep;
    }

    if (result == 0 || errno != EINPROGRESS)
    {
        // Immediate completion: success or a hard error (e.g. ECONNREFUSED).
        int err = (result < 0) ? errno : 0;
        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Resume the awaiter inline (bounded by the scheduler's
            // inline budget to avoid starving other work).
            *ec = err ? make_err(err) : std::error_code{};
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: complete through the scheduler queue instead.
        op.reset();
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.fd = fd_;
        op.target_endpoint = ep;
        op.start(token, this);
        op.impl_ptr = shared_from_this();  // keep impl alive until op runs
        op.complete(err, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EINPROGRESS — register with reactor
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    // Connect completion is signalled by writability.
    register_op(
        op, desc_state_.connect_op, desc_state_.write_ready,
        desc_state_.connect_cancel_pending);
    return std::noop_coroutine();
}
357 :
// Begin an async scatter read into the caller's buffers. Tries readv()
// speculatively and only registers with the reactor on EAGAIN. Returns
// the coroutine to resume next (the caller's handle when inline).
inline std::coroutine_handle<>
epoll_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();

    // Gather the user's buffer sequence into an iovec array for readv().
    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));

    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        // Zero-length read: complete immediately with 0 bytes. Flagged so
        // completion does not misreport bytes==0 as EOF.
        op.empty_buffer_read = true;
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    // Speculative read (socket is non-blocking); retry if a signal
    // interrupts the syscall.
    ssize_t n;
    do
    {
        n = ::readv(fd_, op.iovecs, op.iovec_count);
    }
    while (n < 0 && errno == EINTR);

    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        // Completed immediately: data, EOF (n == 0), or a hard error.
        int err = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Resume the awaiter inline (budget-bounded).
            if (err)
                *ec = make_err(err);
            else if (n == 0)
                *ec = capy::error::eof;
            else
                *ec = {};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: complete through the scheduler queue.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();  // keep impl alive until op runs
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.read_op, desc_state_.read_ready,
        desc_state_.read_cancel_pending);
    return std::noop_coroutine();
}
443 :
// Begin an async gather write from the caller's buffers. Tries sendmsg()
// speculatively (MSG_NOSIGNAL suppresses SIGPIPE) and only registers
// with the reactor on EAGAIN.
inline std::coroutine_handle<>
epoll_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();

    // Gather the user's buffer sequence into an iovec array.
    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));

    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        // Zero-length write: complete immediately with 0 bytes.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    // Speculative write
    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    // Retry if a signal interrupts the syscall.
    ssize_t n;
    do
    {
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
    }
    while (n < 0 && errno == EINTR);

    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        // Completed immediately (possibly a short write) or hard error.
        int err = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Resume the awaiter inline (budget-bounded).
            *ec = err ? make_err(err) : std::error_code{};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: complete through the scheduler queue.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();  // keep impl alive until op runs
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.write_op, desc_state_.write_ready,
        desc_state_.write_cancel_pending);
    return std::noop_coroutine();
}
527 :
528 : inline std::error_code
529 HIT 3 : epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
530 : {
531 : int how;
532 3 : switch (what)
533 : {
534 1 : case tcp_socket::shutdown_receive:
535 1 : how = SHUT_RD;
536 1 : break;
537 1 : case tcp_socket::shutdown_send:
538 1 : how = SHUT_WR;
539 1 : break;
540 1 : case tcp_socket::shutdown_both:
541 1 : how = SHUT_RDWR;
542 1 : break;
543 MIS 0 : default:
544 0 : return make_err(EINVAL);
545 : }
546 HIT 3 : if (::shutdown(fd_, how) != 0)
547 MIS 0 : return make_err(errno);
548 HIT 3 : return {};
549 : }
550 :
551 : inline std::error_code
552 32 : epoll_socket::set_option(
553 : int level, int optname,
554 : void const* data, std::size_t size) noexcept
555 : {
556 32 : if (::setsockopt(fd_, level, optname, data,
557 32 : static_cast<socklen_t>(size)) != 0)
558 MIS 0 : return make_err(errno);
559 HIT 32 : return {};
560 : }
561 :
562 : inline std::error_code
563 31 : epoll_socket::get_option(
564 : int level, int optname,
565 : void* data, std::size_t* size) const noexcept
566 : {
567 31 : socklen_t len = static_cast<socklen_t>(*size);
568 31 : if (::getsockopt(fd_, level, optname, data, &len) != 0)
569 MIS 0 : return make_err(errno);
570 HIT 31 : *size = static_cast<std::size_t>(len);
571 31 : return {};
572 : }
573 :
// Cancel all outstanding operations on this socket so their coroutines
// resume with 'canceled'. Safe no-op if the impl is already expiring.
inline void
epoll_socket::cancel() noexcept
{
    // Take a strong ref; if none can be obtained the impl is mid-teardown
    // and there is nothing left to cancel safely.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Flag all three ops before claiming reactor slots so any concurrent
    // completion path observes the cancel.
    conn_.request_cancel();
    rd_.request_cancel();
    wr_.request_cancel();

    epoll_op* conn_claimed = nullptr;
    epoll_op* rd_claimed = nullptr;
    epoll_op* wr_claimed = nullptr;
    {
        // Per op: if it is parked in the reactor, claim it so we can post
        // its completion ourselves; otherwise latch a pending cancel for
        // register_op() to pick up.
        std::lock_guard lock(desc_state_.mutex);
        if (desc_state_.connect_op == &conn_)
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
        else
            desc_state_.connect_cancel_pending = true;
        if (desc_state_.read_op == &rd_)
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
        else
            desc_state_.read_cancel_pending = true;
        if (desc_state_.write_op == &wr_)
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
        else
            desc_state_.write_cancel_pending = true;
    }

    // Post claimed ops outside the lock; each balances the work count
    // taken at registration, and impl_ptr pins the impl until completion.
    if (conn_claimed)
    {
        conn_.impl_ptr = self;
        svc_.post(&conn_);
        svc_.work_finished();
    }
    if (rd_claimed)
    {
        rd_.impl_ptr = self;
        svc_.post(&rd_);
        svc_.work_finished();
    }
    if (wr_claimed)
    {
        wr_.impl_ptr = self;
        svc_.post(&wr_);
        svc_.work_finished();
    }
}
623 :
// Cancel one specific operation (invoked from that op's stop callback).
inline void
epoll_socket::cancel_single_op(epoll_op& op) noexcept
{
    // Strong ref or bail: impl is mid-teardown otherwise.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    op.request_cancel();

    // Identify which reactor slot this op would occupy, by identity.
    epoll_op** desc_op_ptr = nullptr;
    if (&op == &conn_)
        desc_op_ptr = &desc_state_.connect_op;
    else if (&op == &rd_)
        desc_op_ptr = &desc_state_.read_op;
    else if (&op == &wr_)
        desc_op_ptr = &desc_state_.write_op;

    if (desc_op_ptr)
    {
        epoll_op* claimed = nullptr;
        {
            // Claim the parked op, or latch a pending cancel if it hasn't
            // registered yet (register_op() will observe the flag).
            std::lock_guard lock(desc_state_.mutex);
            if (*desc_op_ptr == &op)
                claimed = std::exchange(*desc_op_ptr, nullptr);
            else if (&op == &conn_)
                desc_state_.connect_cancel_pending = true;
            else if (&op == &rd_)
                desc_state_.read_cancel_pending = true;
            else if (&op == &wr_)
                desc_state_.write_cancel_pending = true;
        }
        if (claimed)
        {
            // Post outside the lock; impl_ptr pins the impl until the op
            // completes, and work_finished balances registration.
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    }
}
663 :
// Close the descriptor: claim and post all parked ops (so waiting
// coroutines resume with 'canceled'), deregister from epoll, close the
// fd, and reset cached state. Idempotent.
inline void
epoll_socket::close_socket() noexcept
{
    auto self = weak_from_this().lock();
    if (self)
    {
        conn_.request_cancel();
        rd_.request_cancel();
        wr_.request_cancel();

        epoll_op* conn_claimed = nullptr;
        epoll_op* rd_claimed = nullptr;
        epoll_op* wr_claimed = nullptr;
        {
            // Unconditionally claim every slot and clear all cached
            // readiness/cancel state — the fd is going away.
            std::lock_guard lock(desc_state_.mutex);
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
            desc_state_.read_ready = false;
            desc_state_.write_ready = false;
            desc_state_.read_cancel_pending = false;
            desc_state_.write_cancel_pending = false;
            desc_state_.connect_cancel_pending = false;
        }

        // Post claimed ops outside the lock; impl_ptr keeps the impl
        // alive until each op's completion runs.
        if (conn_claimed)
        {
            conn_.impl_ptr = self;
            svc_.post(&conn_);
            svc_.work_finished();
        }
        if (rd_claimed)
        {
            rd_.impl_ptr = self;
            svc_.post(&rd_);
            svc_.work_finished();
        }
        if (wr_claimed)
        {
            wr_.impl_ptr = self;
            svc_.post(&wr_);
            svc_.work_finished();
        }

        // If the descriptor state is still queued inside the scheduler,
        // pin the impl until that queue entry drains.
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
            desc_state_.impl_ref_ = self;
    }

    if (fd_ >= 0)
    {
        if (desc_state_.registered_events != 0)
            svc_.scheduler().deregister_descriptor(fd_);
        ::close(fd_);
        fd_ = -1;
    }

    desc_state_.fd = -1;
    desc_state_.registered_events = 0;

    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}
726 :
// Construct the service, acquiring (or creating) the context's epoll
// scheduler and stashing it in the heap-allocated state block.
inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
    : state_(
          std::make_unique<epoll_socket_state>(
              ctx.use_service<epoll_scheduler>()))
{
}
733 :
734 448 : inline epoll_socket_service::~epoll_socket_service() {}
735 :
// Service shutdown: close every live socket. Called by the execution
// context before the scheduler shuts down.
inline void
epoll_socket_service::shutdown()
{
    std::lock_guard lock(state_->mutex_);

    // Drain the intrusive list; close_socket() cancels and posts any
    // in-flight ops so their coroutines can resume.
    while (auto* impl = state_->socket_list_.pop_front())
        impl->close_socket();

    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
    // drains completed_ops_, calling destroy() on each queued op. If we
    // released our shared_ptrs now, an epoll_op::destroy() could free the
    // last ref to an impl whose embedded descriptor_state is still linked
    // in the queue — use-after-free on the next pop(). Letting ~state_
    // release the ptrs (during service destruction, after scheduler
    // shutdown) keeps every impl alive until all ops have been drained.
}
752 :
753 : inline io_object::implementation*
754 6363 : epoll_socket_service::construct()
755 : {
756 6363 : auto impl = std::make_shared<epoll_socket>(*this);
757 6363 : auto* raw = impl.get();
758 :
759 : {
760 6363 : std::lock_guard lock(state_->mutex_);
761 6363 : state_->socket_list_.push_back(raw);
762 6363 : state_->socket_ptrs_.emplace(raw, std::move(impl));
763 6363 : }
764 :
765 6363 : return raw;
766 6363 : }
767 :
768 : inline void
769 6363 : epoll_socket_service::destroy(io_object::implementation* impl)
770 : {
771 6363 : auto* epoll_impl = static_cast<epoll_socket*>(impl);
772 6363 : epoll_impl->close_socket();
773 6363 : std::lock_guard lock(state_->mutex_);
774 6363 : state_->socket_list_.remove(epoll_impl);
775 6363 : state_->socket_ptrs_.erase(epoll_impl);
776 6363 : }
777 :
// Open a fresh non-blocking, close-on-exec socket for impl (replacing
// any existing fd) and register it with the reactor.
inline std::error_code
epoll_socket_service::open_socket(
    tcp_socket::implementation& impl,
    int family, int type, int protocol)
{
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
    // Release any previously-open descriptor first.
    epoll_impl->close_socket();

    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
    if (fd < 0)
        return make_err(errno);

    if (family == AF_INET6)
    {
        // Keep v6 sockets v6-only. Best-effort: the setsockopt return is
        // deliberately ignored.
        int one = 1;
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
    }

    epoll_impl->fd_ = fd;

    // Register fd with epoll (edge-triggered mode)
    epoll_impl->desc_state_.fd = fd;
    {
        // Clear any stale op slots before the reactor can deliver events.
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
        epoll_impl->desc_state_.read_op = nullptr;
        epoll_impl->desc_state_.write_op = nullptr;
        epoll_impl->desc_state_.connect_op = nullptr;
    }
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);

    return {};
}
810 :
// Close the socket behind handle h, cancelling any pending operations.
inline void
epoll_socket_service::close(io_object::handle& h)
{
    static_cast<epoll_socket*>(h.get())->close_socket();
}
816 :
// Queue op on the scheduler's completion queue.
inline void
epoll_socket_service::post(epoll_op* op)
{
    state_->sched_.post(op);
}

// Record one more outstanding operation (keeps the scheduler running).
inline void
epoll_socket_service::work_started() noexcept
{
    state_->sched_.work_started();
}

// Record one completed operation, balancing work_started().
inline void
epoll_socket_service::work_finished() noexcept
{
    state_->sched_.work_finished();
}
834 :
835 : } // namespace boost::corosio::detail
836 :
837 : #endif // BOOST_COROSIO_HAS_EPOLL
838 :
839 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
|