include/boost/corosio/native/detail/epoll/epoll_socket_service.hpp

81.0% Lines (337/416) 93.3% Functions (28/30)
include/boost/corosio/native/detail/epoll/epoll_socket_service.hpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/socket_service.hpp>
20
21 #include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
23
24 #include <boost/corosio/detail/endpoint_convert.hpp>
25 #include <boost/corosio/detail/make_err.hpp>
26 #include <boost/corosio/detail/dispatch_coro.hpp>
27 #include <boost/corosio/detail/except.hpp>
28 #include <boost/capy/buffers.hpp>
29
30 #include <coroutine>
31 #include <mutex>
32 #include <unordered_map>
33 #include <utility>
34
35 #include <errno.h>
36 #include <netinet/in.h>
37 #include <netinet/tcp.h>
38 #include <sys/epoll.h>
39 #include <sys/socket.h>
40 #include <unistd.h>
41
42 /*
43 epoll Socket Implementation
44 ===========================
45
46 Each I/O operation follows the same pattern:
47 1. Try the syscall immediately (non-blocking socket)
48 2. If it succeeds or fails with a real error, post to completion queue
49 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
50
51 This "try first" approach avoids unnecessary epoll round-trips for
52 operations that can complete immediately (common for small reads/writes
53 on fast local connections).
54
55 One-Shot Registration
56 ---------------------
57 We use one-shot epoll registration: each operation registers, waits for
58 one event, then unregisters. This simplifies the state machine since we
59 don't need to track whether an fd is currently registered or handle
60 re-arming. The tradeoff is slightly more epoll_ctl calls, but the
61 simplicity is worth it.
62
63 Cancellation
64 ------------
65 See op.hpp for the completion/cancellation race handling via the
66 `registered` atomic. cancel() must complete pending operations (post
67 them with cancelled flag) so coroutines waiting on them can resume.
68 close_socket() calls cancel() first to ensure this.
69
70 Impl Lifetime with shared_ptr
71 -----------------------------
72 Socket impls use enable_shared_from_this. The service owns impls via
73 shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
74 removal. When a user calls close(), we call cancel() which posts pending
75 ops to the scheduler.
76
77 CRITICAL: The posted ops must keep the impl alive until they complete.
78 Otherwise the scheduler would process a freed op (use-after-free). The
79 cancel() method captures shared_from_this() into op.impl_ptr before
80 posting. When the op completes, impl_ptr is cleared, allowing the impl
81 to be destroyed if no other references exist.
82
83 Service Ownership
84 -----------------
85 epoll_socket_service owns all socket impls. destroy_impl() removes the
86 shared_ptr from the map, but the impl may survive if ops still hold
87 impl_ptr refs. shutdown() closes all sockets and clears the map; any
88 in-flight ops will complete and release their refs.
89 */
90
91 namespace boost::corosio::detail {
92
93 /** State for epoll socket service. */
94 class epoll_socket_state
95 {
96 public:
97 224 explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
98 {
99 224 }
100
101 epoll_scheduler& sched_;
102 std::mutex mutex_;
103 intrusive_list<epoll_socket> socket_list_;
104 std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
105 socket_ptrs_;
106 };
107
108 /** epoll socket service implementation.
109
110 Inherits from socket_service to enable runtime polymorphism.
111 Uses key_type = socket_service for service lookup.
112 */
113 class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
114 {
115 public:
116 explicit epoll_socket_service(capy::execution_context& ctx);
117 ~epoll_socket_service() override;
118
119 epoll_socket_service(epoll_socket_service const&) = delete;
120 epoll_socket_service& operator=(epoll_socket_service const&) = delete;
121
122 void shutdown() override;
123
124 io_object::implementation* construct() override;
125 void destroy(io_object::implementation*) override;
126 void close(io_object::handle&) override;
127 std::error_code
128 open_socket(tcp_socket::implementation& impl,
129 int family, int type, int protocol) override;
130
131 278266 epoll_scheduler& scheduler() const noexcept
132 {
133 278266 return state_->sched_;
134 }
135 void post(epoll_op* op);
136 void work_started() noexcept;
137 void work_finished() noexcept;
138
139 private:
140 std::unique_ptr<epoll_socket_state> state_;
141 };
142
143 //--------------------------------------------------------------------------
144 //
145 // Implementation
146 //
147 //--------------------------------------------------------------------------
148
149 // Register an op with the reactor, handling cached edge events.
150 // Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
151 inline void
152 2301 epoll_socket::register_op(
153 epoll_op& op,
154 epoll_op*& desc_slot,
155 bool& ready_flag,
156 bool& cancel_flag) noexcept
157 {
158 2301 svc_.work_started();
159
160 2301 std::lock_guard lock(desc_state_.mutex);
161 2301 bool io_done = false;
162 2301 if (ready_flag)
163 {
164 140 ready_flag = false;
165 140 op.perform_io();
166 140 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
167 140 if (!io_done)
168 140 op.errn = 0;
169 }
170
171 2301 if (cancel_flag)
172 {
173 93 cancel_flag = false;
174 93 op.cancelled.store(true, std::memory_order_relaxed);
175 }
176
177 2301 if (io_done || op.cancelled.load(std::memory_order_acquire))
178 {
179 93 svc_.post(&op);
180 93 svc_.work_finished();
181 }
182 else
183 {
184 2208 desc_slot = &op;
185 }
186 2301 }
187
188 inline void
189 104 epoll_op::canceller::operator()() const noexcept
190 {
191 104 op->cancel();
192 104 }
193
194 inline void
195 epoll_connect_op::cancel() noexcept
196 {
197 if (socket_impl_)
198 socket_impl_->cancel_single_op(*this);
199 else
200 request_cancel();
201 }
202
203 inline void
204 98 epoll_read_op::cancel() noexcept
205 {
206 98 if (socket_impl_)
207 98 socket_impl_->cancel_single_op(*this);
208 else
209 request_cancel();
210 98 }
211
212 inline void
213 epoll_write_op::cancel() noexcept
214 {
215 if (socket_impl_)
216 socket_impl_->cancel_single_op(*this);
217 else
218 request_cancel();
219 }
220
221 inline void
222 44729 epoll_op::operator()()
223 {
224 44729 stop_cb.reset();
225
226 44729 socket_impl_->svc_.scheduler().reset_inline_budget();
227
228 44729 if (cancelled.load(std::memory_order_acquire))
229 203 *ec_out = capy::error::canceled;
230 44526 else if (errn != 0)
231 *ec_out = make_err(errn);
232 44526 else if (is_read_operation() && bytes_transferred == 0)
233 *ec_out = capy::error::eof;
234 else
235 44526 *ec_out = {};
236
237 44729 *bytes_out = bytes_transferred;
238
239 // Move to stack before resuming coroutine. The coroutine might close
240 // the socket, releasing the last wrapper ref. If impl_ptr were the
241 // last ref and we destroyed it while still in operator(), we'd have
242 // use-after-free. Moving to local ensures destruction happens at
243 // function exit, after all member accesses are complete.
244 44729 capy::executor_ref saved_ex(ex);
245 44729 std::coroutine_handle<> saved_h(h);
246 44729 auto prevent_premature_destruction = std::move(impl_ptr);
247 44729 dispatch_coro(saved_ex, saved_h).resume();
248 44729 }
249
250 inline void
251 2102 epoll_connect_op::operator()()
252 {
253 2102 stop_cb.reset();
254
255 2102 socket_impl_->svc_.scheduler().reset_inline_budget();
256
257 2102 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
258
259 // Cache endpoints on successful connect
260 2102 if (success && socket_impl_)
261 {
262 2100 endpoint local_ep;
263 2100 sockaddr_storage local_storage{};
264 2100 socklen_t local_len = sizeof(local_storage);
265 2100 if (::getsockname(
266 fd, reinterpret_cast<sockaddr*>(&local_storage),
267 2100 &local_len) == 0)
268 2100 local_ep = from_sockaddr(local_storage);
269 2100 static_cast<epoll_socket*>(socket_impl_)
270 2100 ->set_endpoints(local_ep, target_endpoint);
271 }
272
273 2102 if (cancelled.load(std::memory_order_acquire))
274 *ec_out = capy::error::canceled;
275 2102 else if (errn != 0)
276 2 *ec_out = make_err(errn);
277 else
278 2100 *ec_out = {};
279
280 // Move to stack before resuming. See epoll_op::operator()() for rationale.
281 2102 capy::executor_ref saved_ex(ex);
282 2102 std::coroutine_handle<> saved_h(h);
283 2102 auto prevent_premature_destruction = std::move(impl_ptr);
284 2102 dispatch_coro(saved_ex, saved_h).resume();
285 2102 }
286
287 6363 inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
288 6363 : svc_(svc)
289 {
290 6363 }
291
292 6363 inline epoll_socket::~epoll_socket() = default;
293
294 inline std::coroutine_handle<>
295 2102 epoll_socket::connect(
296 std::coroutine_handle<> h,
297 capy::executor_ref ex,
298 endpoint ep,
299 std::stop_token token,
300 std::error_code* ec)
301 {
302 2102 auto& op = conn_;
303
304 2102 sockaddr_storage storage{};
305 socklen_t addrlen =
306 2102 detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
307 int result =
308 2102 ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
309
310 2102 if (result == 0)
311 {
312 sockaddr_storage local_storage{};
313 socklen_t local_len = sizeof(local_storage);
314 if (::getsockname(
315 fd_, reinterpret_cast<sockaddr*>(&local_storage),
316 &local_len) == 0)
317 local_endpoint_ = detail::from_sockaddr(local_storage);
318 remote_endpoint_ = ep;
319 }
320
321 2102 if (result == 0 || errno != EINPROGRESS)
322 {
323 int err = (result < 0) ? errno : 0;
324 if (svc_.scheduler().try_consume_inline_budget())
325 {
326 *ec = err ? make_err(err) : std::error_code{};
327 return dispatch_coro(ex, h);
328 }
329 op.reset();
330 op.h = h;
331 op.ex = ex;
332 op.ec_out = ec;
333 op.fd = fd_;
334 op.target_endpoint = ep;
335 op.start(token, this);
336 op.impl_ptr = shared_from_this();
337 op.complete(err, 0);
338 svc_.post(&op);
339 return std::noop_coroutine();
340 }
341
342 // EINPROGRESS — register with reactor
343 2102 op.reset();
344 2102 op.h = h;
345 2102 op.ex = ex;
346 2102 op.ec_out = ec;
347 2102 op.fd = fd_;
348 2102 op.target_endpoint = ep;
349 2102 op.start(token, this);
350 2102 op.impl_ptr = shared_from_this();
351
352 2102 register_op(
353 2102 op, desc_state_.connect_op, desc_state_.write_ready,
354 2102 desc_state_.connect_cancel_pending);
355 2102 return std::noop_coroutine();
356 }
357
358 inline std::coroutine_handle<>
359 111700 epoll_socket::read_some(
360 std::coroutine_handle<> h,
361 capy::executor_ref ex,
362 io_buffer_param param,
363 std::stop_token token,
364 std::error_code* ec,
365 std::size_t* bytes_out)
366 {
367 111700 auto& op = rd_;
368 111700 op.reset();
369
370 111700 capy::mutable_buffer bufs[epoll_read_op::max_buffers];
371 111700 op.iovec_count =
372 111700 static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
373
374 111700 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
375 {
376 1 op.empty_buffer_read = true;
377 1 op.h = h;
378 1 op.ex = ex;
379 1 op.ec_out = ec;
380 1 op.bytes_out = bytes_out;
381 1 op.start(token, this);
382 1 op.impl_ptr = shared_from_this();
383 1 op.complete(0, 0);
384 1 svc_.post(&op);
385 1 return std::noop_coroutine();
386 }
387
388 223398 for (int i = 0; i < op.iovec_count; ++i)
389 {
390 111699 op.iovecs[i].iov_base = bufs[i].data();
391 111699 op.iovecs[i].iov_len = bufs[i].size();
392 }
393
394 // Speculative read
395 ssize_t n;
396 do
397 {
398 111699 n = ::readv(fd_, op.iovecs, op.iovec_count);
399 }
400 111699 while (n < 0 && errno == EINTR);
401
402 111699 if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
403 {
404 111500 int err = (n < 0) ? errno : 0;
405 111500 auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
406
407 111500 if (svc_.scheduler().try_consume_inline_budget())
408 {
409 89245 if (err)
410 *ec = make_err(err);
411 89245 else if (n == 0)
412 5 *ec = capy::error::eof;
413 else
414 89240 *ec = {};
415 89245 *bytes_out = bytes;
416 89245 return dispatch_coro(ex, h);
417 }
418 22255 op.h = h;
419 22255 op.ex = ex;
420 22255 op.ec_out = ec;
421 22255 op.bytes_out = bytes_out;
422 22255 op.start(token, this);
423 22255 op.impl_ptr = shared_from_this();
424 22255 op.complete(err, bytes);
425 22255 svc_.post(&op);
426 22255 return std::noop_coroutine();
427 }
428
429 // EAGAIN — register with reactor
430 199 op.h = h;
431 199 op.ex = ex;
432 199 op.ec_out = ec;
433 199 op.bytes_out = bytes_out;
434 199 op.fd = fd_;
435 199 op.start(token, this);
436 199 op.impl_ptr = shared_from_this();
437
438 199 register_op(
439 199 op, desc_state_.read_op, desc_state_.read_ready,
440 199 desc_state_.read_cancel_pending);
441 199 return std::noop_coroutine();
442 }
443
444 inline std::coroutine_handle<>
445 111502 epoll_socket::write_some(
446 std::coroutine_handle<> h,
447 capy::executor_ref ex,
448 io_buffer_param param,
449 std::stop_token token,
450 std::error_code* ec,
451 std::size_t* bytes_out)
452 {
453 111502 auto& op = wr_;
454 111502 op.reset();
455
456 111502 capy::mutable_buffer bufs[epoll_write_op::max_buffers];
457 111502 op.iovec_count =
458 111502 static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
459
460 111502 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
461 {
462 1 op.h = h;
463 1 op.ex = ex;
464 1 op.ec_out = ec;
465 1 op.bytes_out = bytes_out;
466 1 op.start(token, this);
467 1 op.impl_ptr = shared_from_this();
468 1 op.complete(0, 0);
469 1 svc_.post(&op);
470 1 return std::noop_coroutine();
471 }
472
473 223002 for (int i = 0; i < op.iovec_count; ++i)
474 {
475 111501 op.iovecs[i].iov_base = bufs[i].data();
476 111501 op.iovecs[i].iov_len = bufs[i].size();
477 }
478
479 // Speculative write
480 111501 msghdr msg{};
481 111501 msg.msg_iov = op.iovecs;
482 111501 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
483
484 ssize_t n;
485 do
486 {
487 111501 n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
488 }
489 111501 while (n < 0 && errno == EINTR);
490
491 111501 if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
492 {
493 111501 int err = (n < 0) ? errno : 0;
494 111501 auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
495
496 111501 if (svc_.scheduler().try_consume_inline_budget())
497 {
498 89228 *ec = err ? make_err(err) : std::error_code{};
499 89228 *bytes_out = bytes;
500 89228 return dispatch_coro(ex, h);
501 }
502 22273 op.h = h;
503 22273 op.ex = ex;
504 22273 op.ec_out = ec;
505 22273 op.bytes_out = bytes_out;
506 22273 op.start(token, this);
507 22273 op.impl_ptr = shared_from_this();
508 22273 op.complete(err, bytes);
509 22273 svc_.post(&op);
510 22273 return std::noop_coroutine();
511 }
512
513 // EAGAIN — register with reactor
514 op.h = h;
515 op.ex = ex;
516 op.ec_out = ec;
517 op.bytes_out = bytes_out;
518 op.fd = fd_;
519 op.start(token, this);
520 op.impl_ptr = shared_from_this();
521
522 register_op(
523 op, desc_state_.write_op, desc_state_.write_ready,
524 desc_state_.write_cancel_pending);
525 return std::noop_coroutine();
526 }
527
528 inline std::error_code
529 3 epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
530 {
531 int how;
532 3 switch (what)
533 {
534 1 case tcp_socket::shutdown_receive:
535 1 how = SHUT_RD;
536 1 break;
537 1 case tcp_socket::shutdown_send:
538 1 how = SHUT_WR;
539 1 break;
540 1 case tcp_socket::shutdown_both:
541 1 how = SHUT_RDWR;
542 1 break;
543 default:
544 return make_err(EINVAL);
545 }
546 3 if (::shutdown(fd_, how) != 0)
547 return make_err(errno);
548 3 return {};
549 }
550
551 inline std::error_code
552 32 epoll_socket::set_option(
553 int level, int optname,
554 void const* data, std::size_t size) noexcept
555 {
556 32 if (::setsockopt(fd_, level, optname, data,
557 32 static_cast<socklen_t>(size)) != 0)
558 return make_err(errno);
559 32 return {};
560 }
561
562 inline std::error_code
563 31 epoll_socket::get_option(
564 int level, int optname,
565 void* data, std::size_t* size) const noexcept
566 {
567 31 socklen_t len = static_cast<socklen_t>(*size);
568 31 if (::getsockopt(fd_, level, optname, data, &len) != 0)
569 return make_err(errno);
570 31 *size = static_cast<std::size_t>(len);
571 31 return {};
572 }
573
574 inline void
575 183 epoll_socket::cancel() noexcept
576 {
577 183 auto self = weak_from_this().lock();
578 183 if (!self)
579 return;
580
581 183 conn_.request_cancel();
582 183 rd_.request_cancel();
583 183 wr_.request_cancel();
584
585 183 epoll_op* conn_claimed = nullptr;
586 183 epoll_op* rd_claimed = nullptr;
587 183 epoll_op* wr_claimed = nullptr;
588 {
589 183 std::lock_guard lock(desc_state_.mutex);
590 183 if (desc_state_.connect_op == &conn_)
591 conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
592 else
593 183 desc_state_.connect_cancel_pending = true;
594 183 if (desc_state_.read_op == &rd_)
595 3 rd_claimed = std::exchange(desc_state_.read_op, nullptr);
596 else
597 180 desc_state_.read_cancel_pending = true;
598 183 if (desc_state_.write_op == &wr_)
599 wr_claimed = std::exchange(desc_state_.write_op, nullptr);
600 else
601 183 desc_state_.write_cancel_pending = true;
602 183 }
603
604 183 if (conn_claimed)
605 {
606 conn_.impl_ptr = self;
607 svc_.post(&conn_);
608 svc_.work_finished();
609 }
610 183 if (rd_claimed)
611 {
612 3 rd_.impl_ptr = self;
613 3 svc_.post(&rd_);
614 3 svc_.work_finished();
615 }
616 183 if (wr_claimed)
617 {
618 wr_.impl_ptr = self;
619 svc_.post(&wr_);
620 svc_.work_finished();
621 }
622 183 }
623
624 inline void
625 98 epoll_socket::cancel_single_op(epoll_op& op) noexcept
626 {
627 98 auto self = weak_from_this().lock();
628 98 if (!self)
629 return;
630
631 98 op.request_cancel();
632
633 98 epoll_op** desc_op_ptr = nullptr;
634 98 if (&op == &conn_)
635 desc_op_ptr = &desc_state_.connect_op;
636 98 else if (&op == &rd_)
637 98 desc_op_ptr = &desc_state_.read_op;
638 else if (&op == &wr_)
639 desc_op_ptr = &desc_state_.write_op;
640
641 98 if (desc_op_ptr)
642 {
643 98 epoll_op* claimed = nullptr;
644 {
645 98 std::lock_guard lock(desc_state_.mutex);
646 98 if (*desc_op_ptr == &op)
647 98 claimed = std::exchange(*desc_op_ptr, nullptr);
648 else if (&op == &conn_)
649 desc_state_.connect_cancel_pending = true;
650 else if (&op == &rd_)
651 desc_state_.read_cancel_pending = true;
652 else if (&op == &wr_)
653 desc_state_.write_cancel_pending = true;
654 98 }
655 98 if (claimed)
656 {
657 98 op.impl_ptr = self;
658 98 svc_.post(&op);
659 98 svc_.work_finished();
660 }
661 }
662 98 }
663
664 inline void
665 19060 epoll_socket::close_socket() noexcept
666 {
667 19060 auto self = weak_from_this().lock();
668 19060 if (self)
669 {
670 19060 conn_.request_cancel();
671 19060 rd_.request_cancel();
672 19060 wr_.request_cancel();
673
674 19060 epoll_op* conn_claimed = nullptr;
675 19060 epoll_op* rd_claimed = nullptr;
676 19060 epoll_op* wr_claimed = nullptr;
677 {
678 19060 std::lock_guard lock(desc_state_.mutex);
679 19060 conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
680 19060 rd_claimed = std::exchange(desc_state_.read_op, nullptr);
681 19060 wr_claimed = std::exchange(desc_state_.write_op, nullptr);
682 19060 desc_state_.read_ready = false;
683 19060 desc_state_.write_ready = false;
684 19060 desc_state_.read_cancel_pending = false;
685 19060 desc_state_.write_cancel_pending = false;
686 19060 desc_state_.connect_cancel_pending = false;
687 19060 }
688
689 19060 if (conn_claimed)
690 {
691 conn_.impl_ptr = self;
692 svc_.post(&conn_);
693 svc_.work_finished();
694 }
695 19060 if (rd_claimed)
696 {
697 1 rd_.impl_ptr = self;
698 1 svc_.post(&rd_);
699 1 svc_.work_finished();
700 }
701 19060 if (wr_claimed)
702 {
703 wr_.impl_ptr = self;
704 svc_.post(&wr_);
705 svc_.work_finished();
706 }
707
708 19060 if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
709 97 desc_state_.impl_ref_ = self;
710 }
711
712 19060 if (fd_ >= 0)
713 {
714 4217 if (desc_state_.registered_events != 0)
715 4217 svc_.scheduler().deregister_descriptor(fd_);
716 4217 ::close(fd_);
717 4217 fd_ = -1;
718 }
719
720 19060 desc_state_.fd = -1;
721 19060 desc_state_.registered_events = 0;
722
723 19060 local_endpoint_ = endpoint{};
724 19060 remote_endpoint_ = endpoint{};
725 19060 }
726
727 224 inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
728 224 : state_(
729 std::make_unique<epoll_socket_state>(
730 224 ctx.use_service<epoll_scheduler>()))
731 {
732 224 }
733
734 448 inline epoll_socket_service::~epoll_socket_service() {}
735
736 inline void
737 224 epoll_socket_service::shutdown()
738 {
739 224 std::lock_guard lock(state_->mutex_);
740
741 224 while (auto* impl = state_->socket_list_.pop_front())
742 impl->close_socket();
743
744 // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
745 // drains completed_ops_, calling destroy() on each queued op. If we
746 // released our shared_ptrs now, an epoll_op::destroy() could free the
747 // last ref to an impl whose embedded descriptor_state is still linked
748 // in the queue — use-after-free on the next pop(). Letting ~state_
749 // release the ptrs (during service destruction, after scheduler
750 // shutdown) keeps every impl alive until all ops have been drained.
751 224 }
752
753 inline io_object::implementation*
754 6363 epoll_socket_service::construct()
755 {
756 6363 auto impl = std::make_shared<epoll_socket>(*this);
757 6363 auto* raw = impl.get();
758
759 {
760 6363 std::lock_guard lock(state_->mutex_);
761 6363 state_->socket_list_.push_back(raw);
762 6363 state_->socket_ptrs_.emplace(raw, std::move(impl));
763 6363 }
764
765 6363 return raw;
766 6363 }
767
768 inline void
769 6363 epoll_socket_service::destroy(io_object::implementation* impl)
770 {
771 6363 auto* epoll_impl = static_cast<epoll_socket*>(impl);
772 6363 epoll_impl->close_socket();
773 6363 std::lock_guard lock(state_->mutex_);
774 6363 state_->socket_list_.remove(epoll_impl);
775 6363 state_->socket_ptrs_.erase(epoll_impl);
776 6363 }
777
778 inline std::error_code
779 2117 epoll_socket_service::open_socket(
780 tcp_socket::implementation& impl,
781 int family, int type, int protocol)
782 {
783 2117 auto* epoll_impl = static_cast<epoll_socket*>(&impl);
784 2117 epoll_impl->close_socket();
785
786 2117 int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
787 2117 if (fd < 0)
788 return make_err(errno);
789
790 2117 if (family == AF_INET6)
791 {
792 5 int one = 1;
793 5 ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
794 }
795
796 2117 epoll_impl->fd_ = fd;
797
798 // Register fd with epoll (edge-triggered mode)
799 2117 epoll_impl->desc_state_.fd = fd;
800 {
801 2117 std::lock_guard lock(epoll_impl->desc_state_.mutex);
802 2117 epoll_impl->desc_state_.read_op = nullptr;
803 2117 epoll_impl->desc_state_.write_op = nullptr;
804 2117 epoll_impl->desc_state_.connect_op = nullptr;
805 2117 }
806 2117 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
807
808 2117 return {};
809 }
810
811 inline void
812 10580 epoll_socket_service::close(io_object::handle& h)
813 {
814 10580 static_cast<epoll_socket*>(h.get())->close_socket();
815 10580 }
816
817 inline void
818 44725 epoll_socket_service::post(epoll_op* op)
819 {
820 44725 state_->sched_.post(op);
821 44725 }
822
823 inline void
824 2301 epoll_socket_service::work_started() noexcept
825 {
826 2301 state_->sched_.work_started();
827 2301 }
828
829 inline void
830 195 epoll_socket_service::work_finished() noexcept
831 {
832 195 state_->sched_.work_finished();
833 195 }
834
835 } // namespace boost::corosio::detail
836
837 #endif // BOOST_COROSIO_HAS_EPOLL
838
839 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
840