include/boost/corosio/native/detail/select/select_socket_service.hpp

75.8% Lines (263/347) 93.1% Functions (27/29)
include/boost/corosio/native/detail/select/select_socket_service.hpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/socket_service.hpp>
20
21 #include <boost/corosio/native/detail/select/select_socket.hpp>
22 #include <boost/corosio/native/detail/select/select_scheduler.hpp>
23
24 #include <boost/corosio/detail/endpoint_convert.hpp>
25 #include <boost/corosio/detail/dispatch_coro.hpp>
26 #include <boost/corosio/detail/make_err.hpp>
27
28 #include <boost/corosio/detail/except.hpp>
29
30 #include <boost/capy/buffers.hpp>
31
32 #include <errno.h>
33 #include <fcntl.h>
34 #include <netinet/in.h>
35 #include <netinet/tcp.h>
36 #include <sys/socket.h>
37 #include <unistd.h>
38
39 #include <memory>
40 #include <mutex>
41 #include <unordered_map>
42
43 /*
44 select Socket Implementation
45 ============================
46
47 This mirrors the epoll_sockets design for behavioral consistency.
48 Each I/O operation follows the same pattern:
49 1. Try the syscall immediately (non-blocking socket)
50 2. If it succeeds or fails with a real error, post to completion queue
51 3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
52
53 Cancellation
54 ------------
55 See op.hpp for the completion/cancellation race handling via the
56 `registered` atomic. cancel() must complete pending operations (post
57 them with cancelled flag) so coroutines waiting on them can resume.
58 close_socket() calls cancel() first to ensure this.
59
60 Impl Lifetime with shared_ptr
61 -----------------------------
62 Socket impls use enable_shared_from_this. The service owns impls via
63 shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
64 removal. When a user calls close(), we call cancel() which posts pending
65 ops to the scheduler.
66
67 CRITICAL: The posted ops must keep the impl alive until they complete.
68 Otherwise the scheduler would process a freed op (use-after-free). The
69 cancel() method captures shared_from_this() into op.impl_ptr before
70 posting. When the op completes, impl_ptr is cleared, allowing the impl
71 to be destroyed if no other references exist.
72
73 Service Ownership
74 -----------------
75 select_socket_service owns all socket impls. destroy() removes the
76 shared_ptr from the map, but the impl may survive if ops still hold
77 impl_ptr refs. shutdown() closes all sockets and clears the map; any
78 in-flight ops will complete and release their refs.
79 */
80
81 namespace boost::corosio::detail {
82
/** State for select socket service.

    Bundles the scheduler reference with the bookkeeping that tracks every
    live socket impl: an intrusive list for iteration (e.g. at shutdown)
    and a raw-pointer-keyed map of shared_ptr owners for O(1) lookup and
    removal. See the file-header comment on impl lifetime.
*/
class select_socket_state
{
public:
    /// Bind this state to the scheduler that drives all socket I/O.
    explicit select_socket_state(select_scheduler& sched) noexcept
        : sched_(sched)
    {
    }

    select_scheduler& sched_;  // reactor shared by all sockets of this service
    std::mutex mutex_;         // guards socket_list_ and socket_ptrs_
    intrusive_list<select_socket> socket_list_;
    // Owning references, keyed by raw pointer for O(1) erase in destroy().
    std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
        socket_ptrs_;
};
98
/** select socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.
*/
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
{
public:
    /// Acquire the context's select_scheduler and allocate shared state.
    explicit select_socket_service(capy::execution_context& ctx);
    ~select_socket_service() override;

    // Non-copyable: owns per-context state and socket impls.
    select_socket_service(select_socket_service const&) = delete;
    select_socket_service& operator=(select_socket_service const&) = delete;

    /// Close all tracked sockets; owning refs are released later (see impl).
    void shutdown() override;

    /// Create a new socket impl owned by this service.
    io_object::implementation* construct() override;
    /// Close the impl and drop the service's owning reference.
    void destroy(io_object::implementation*) override;
    /// Close the socket held by a handle without destroying the impl.
    void close(io_object::handle&) override;
    /// Open a non-blocking, close-on-exec fd on an existing impl.
    std::error_code
    open_socket(tcp_socket::implementation& impl,
        int family, int type, int protocol) override;

    /// The scheduler this service registers fds with.
    select_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    /// Queue a completed/cancelled op for the scheduler to run.
    void post(select_op* op);
    /// Forward outstanding-work accounting to the scheduler.
    void work_started() noexcept;
    void work_finished() noexcept;

private:
    std::unique_ptr<select_socket_state> state_;
};
133
// Backward compatibility alias
using select_sockets = select_socket_service;

/// Stop-callback functor installed via the awaiting coroutine's
/// stop_token; forwards to the op's cancel() when stop is requested.
inline void
select_op::canceller::operator()() const noexcept
{
    op->cancel();
}
142
143 inline void
144 select_connect_op::cancel() noexcept
145 {
146 if (socket_impl_)
147 socket_impl_->cancel_single_op(*this);
148 else
149 request_cancel();
150 }
151
152 inline void
153 97 select_read_op::cancel() noexcept
154 {
155 97 if (socket_impl_)
156 97 socket_impl_->cancel_single_op(*this);
157 else
158 request_cancel();
159 97 }
160
161 inline void
162 select_write_op::cancel() noexcept
163 {
164 if (socket_impl_)
165 socket_impl_->cancel_single_op(*this);
166 else
167 request_cancel();
168 }
169
/// Completion handler for an async connect, run from the scheduler's
/// completion queue. Publishes the error code and byte count to the
/// awaiter's out-pointers, caches endpoints on success, then resumes
/// the awaiting coroutine.
inline void
select_connect_op::operator()()
{
    // Tear down the stop callback first so a late stop request cannot
    // race with completion.
    stop_cb.reset();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        endpoint local_ep;
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        // Best effort: on getsockname failure local_ep stays default.
        if (::getsockname(
            fd, reinterpret_cast<sockaddr*>(&local_storage),
            &local_len) == 0)
            local_ep = from_sockaddr(local_storage);
        static_cast<select_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    if (ec_out)
    {
        // Cancellation takes precedence over any errno captured.
        if (cancelled.load(std::memory_order_acquire))
            *ec_out = capy::error::canceled;
        else if (errn != 0)
            *ec_out = make_err(errn);
        else
            *ec_out = {};
    }

    if (bytes_out)
        *bytes_out = bytes_transferred;

    // Move to stack before destroying the frame
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    // Release the keep-alive ref BEFORE resuming: the resumed coroutine
    // may destroy the socket (and this op) as its next action.
    impl_ptr.reset();
    dispatch_coro(saved_ex, saved_h).resume();
}
210
/// Construct a socket impl bound to its owning service. No fd is
/// created here; open_socket() assigns one later.
inline select_socket::select_socket(select_socket_service& svc) noexcept
    : svc_(svc)
{
}
215
/// Begin an async connect to `ep`.
///
/// Attempts ::connect() immediately on the non-blocking fd:
/// - synchronous success or hard failure: the op is completed and posted
///   to the scheduler queue (completion is never inline),
/// - EINPROGRESS: the op is registered with the scheduler for write
///   readiness and completes later via the reactor.
///
/// Returns std::noop_coroutine() in every case; the awaiting coroutine
/// resumes only from the posted completion.
inline std::coroutine_handle<>
select_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep; // Store target for endpoint caching
    // Installs the stop callback; cancellation may fire from here on.
    op.start(token, this);

    sockaddr_storage storage{};
    socklen_t addrlen =
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
    int result =
        ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);

    if (result == 0)
    {
        // Sync success — cache endpoints immediately
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_storage),
                &local_len) == 0)
            local_endpoint_ = detail::from_sockaddr(local_storage);
        remote_endpoint_ = ep;

        op.complete(0, 0);
        // Keep the impl alive until the posted op runs (see file header).
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    if (errno == EINPROGRESS)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered. The reactor treats
        // registering the same as registered when claiming the op.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            // completion is always posted to scheduler queue, never inline.
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            // exchange-to-unregistered claims the op; only the claimer
            // may deregister/post, so this runs at most once per op.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    // Immediate hard failure: report the captured errno.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    // completion is always posted to scheduler queue, never inline.
    return std::noop_coroutine();
}
308
/// Begin an async scatter read into the buffers described by `param`.
///
/// Attempts ::readv() immediately on the non-blocking fd:
/// - n > 0: completed with bytes read,
/// - n == 0: completed with 0 bytes (peer closed / EOF),
/// - EAGAIN/EWOULDBLOCK: registered with the scheduler for read
///   readiness and completed later by the reactor,
/// - other errno: completed with that error.
///
/// Returns std::noop_coroutine() in every case; the completion is
/// always posted to the scheduler queue, never run inline.
inline std::coroutine_handle<>
select_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    // Installs the stop callback; cancellation may fire from here on.
    op.start(token, this);

    capy::mutable_buffer bufs[select_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));

    // Empty buffer: complete immediately with zero bytes, flagged so the
    // completion can distinguish it from EOF.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.empty_buffer_read = true;
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        // Keep the impl alive until the posted op runs (see file header).
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // n == 0 from readv on a non-empty buffer: peer closed the stream.
    if (n == 0)
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            // exchange-to-unregistered claims the op; only the claimer
            // may deregister/post, so this runs at most once per op.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_read);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard read error: report the captured errno.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
411
/// Begin an async gather write of the buffers described by `param`.
///
/// Attempts ::sendmsg() immediately on the non-blocking fd (with
/// MSG_NOSIGNAL to suppress SIGPIPE):
/// - n > 0: completed with bytes written,
/// - EAGAIN/EWOULDBLOCK: registered with the scheduler for write
///   readiness and completed later by the reactor,
/// - otherwise: completed with errno, or EIO if errno is 0 (covers a
///   zero-byte sendmsg return that left errno unset).
///
/// Returns std::noop_coroutine() in every case; the completion is
/// always posted to the scheduler queue, never run inline.
inline std::coroutine_handle<>
select_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    // Installs the stop callback; cancellation may fire from here on.
    op.start(token, this);

    capy::mutable_buffer bufs[select_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));

    // Empty buffer: nothing to send; complete immediately with 0 bytes.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        // Keep the impl alive until the posted op runs (see file header).
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            // exchange-to-unregistered claims the op; only the claimer
            // may deregister/post, so this runs at most once per op.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard write error; fall back to EIO when errno was not set
    // (e.g. sendmsg returned 0).
    op.complete(errno ? errno : EIO, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
509
510 inline std::error_code
511 3 select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
512 {
513 int how;
514 3 switch (what)
515 {
516 1 case tcp_socket::shutdown_receive:
517 1 how = SHUT_RD;
518 1 break;
519 1 case tcp_socket::shutdown_send:
520 1 how = SHUT_WR;
521 1 break;
522 1 case tcp_socket::shutdown_both:
523 1 how = SHUT_RDWR;
524 1 break;
525 default:
526 return make_err(EINVAL);
527 }
528 3 if (::shutdown(fd_, how) != 0)
529 return make_err(errno);
530 3 return {};
531 }
532
533 inline std::error_code
534 28 select_socket::set_option(
535 int level, int optname,
536 void const* data, std::size_t size) noexcept
537 {
538 28 if (::setsockopt(fd_, level, optname, data,
539 28 static_cast<socklen_t>(size)) != 0)
540 return make_err(errno);
541 28 return {};
542 }
543
544 inline std::error_code
545 31 select_socket::get_option(
546 int level, int optname,
547 void* data, std::size_t* size) const noexcept
548 {
549 31 socklen_t len = static_cast<socklen_t>(*size);
550 31 if (::getsockopt(fd_, level, optname, data, &len) != 0)
551 return make_err(errno);
552 31 *size = static_cast<std::size_t>(len);
553 31 return {};
554 }
555
/// Cancel all pending operations (connect, read, write) on this socket.
///
/// Uses weak_from_this() so a call racing with impl destruction is a
/// safe no-op. For each op, the exchange to `unregistered` atomically
/// claims it; only the claimer deregisters the fd and posts the op, so
/// each op is posted at most once even if the reactor fires
/// concurrently.
inline void
select_socket::cancel() noexcept
{
    auto self = weak_from_this().lock();
    if (!self)
        return;

    auto cancel_op = [this, &self](select_op& op, int events) {
        // Claim the op: whoever moves the state to unregistered owns it.
        auto prev = op.registered.exchange(
            select_registration_state::unregistered, std::memory_order_acq_rel);
        op.request_cancel();
        if (prev != select_registration_state::unregistered)
        {
            svc_.scheduler().deregister_fd(fd_, events);
            // Keep the impl alive until the posted op completes.
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    };

    cancel_op(conn_, select_scheduler::event_write);
    cancel_op(rd_, select_scheduler::event_read);
    cancel_op(wr_, select_scheduler::event_write);
}
580
/// Cancel one specific pending operation on this socket.
///
/// Same claim protocol as cancel(): the exchange to `unregistered`
/// decides ownership, and only the claimer deregisters/posts. Safe
/// against impl destruction via weak_from_this().
inline void
select_socket::cancel_single_op(select_op& op) noexcept
{
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Called from stop_token callback to cancel a specific pending operation.
    auto prev = op.registered.exchange(
        select_registration_state::unregistered, std::memory_order_acq_rel);
    op.request_cancel();

    if (prev != select_registration_state::unregistered)
    {
        // Determine which event type to deregister
        int events = 0;
        if (&op == &conn_ || &op == &wr_)
            events = select_scheduler::event_write;
        else if (&op == &rd_)
            events = select_scheduler::event_read;

        svc_.scheduler().deregister_fd(fd_, events);

        // Keep the impl alive until the posted op completes.
        op.impl_ptr = self;
        svc_.post(&op);
        svc_.work_finished();
    }
}
609
/// Close the socket: cancel every pending op, deregister the fd from
/// the reactor, close the descriptor, and clear cached endpoints.
///
/// Cancellation uses the same claim protocol as cancel(). If the impl
/// is already being destroyed (weak_from_this() fails to lock), the
/// cancel phase is skipped but the fd is still closed.
inline void
select_socket::close_socket() noexcept
{
    auto self = weak_from_this().lock();
    if (self)
    {
        auto cancel_op = [this, &self](select_op& op, int events) {
            // Claim the op: whoever moves the state to unregistered owns it.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            op.request_cancel();
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, events);
                // Keep the impl alive until the posted op completes.
                op.impl_ptr = self;
                svc_.post(&op);
                svc_.work_finished();
            }
        };

        cancel_op(conn_, select_scheduler::event_write);
        cancel_op(rd_, select_scheduler::event_read);
        cancel_op(wr_, select_scheduler::event_write);
    }

    if (fd_ >= 0)
    {
        // Remove any remaining interest before closing the descriptor.
        svc_.scheduler().deregister_fd(
            fd_, select_scheduler::event_read | select_scheduler::event_write);
        ::close(fd_);
        fd_ = -1;
    }

    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}
646
/// Construct the service, acquiring (or lazily creating) the context's
/// select_scheduler and allocating the shared state block.
inline select_socket_service::select_socket_service(
    capy::execution_context& ctx)
    : state_(
        std::make_unique<select_socket_state>(
            ctx.use_service<select_scheduler>()))
{
}

// Destroying state_ releases any impls still held in socket_ptrs_.
inline select_socket_service::~select_socket_service() {}
656
657 inline void
658 154 select_socket_service::shutdown()
659 {
660 154 std::lock_guard lock(state_->mutex_);
661
662 154 while (auto* impl = state_->socket_list_.pop_front())
663 impl->close_socket();
664
665 // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
666 // drains completed_ops_, calling destroy() on each queued op. Letting
667 // ~state_ release the ptrs (during service destruction, after scheduler
668 // shutdown) keeps every impl alive until all ops have been drained.
669 154 }
670
671 inline io_object::implementation*
672 8069 select_socket_service::construct()
673 {
674 8069 auto impl = std::make_shared<select_socket>(*this);
675 8069 auto* raw = impl.get();
676
677 {
678 8069 std::lock_guard lock(state_->mutex_);
679 8069 state_->socket_list_.push_back(raw);
680 8069 state_->socket_ptrs_.emplace(raw, std::move(impl));
681 8069 }
682
683 8069 return raw;
684 8069 }
685
686 inline void
687 8069 select_socket_service::destroy(io_object::implementation* impl)
688 {
689 8069 auto* select_impl = static_cast<select_socket*>(impl);
690 8069 select_impl->close_socket();
691 8069 std::lock_guard lock(state_->mutex_);
692 8069 state_->socket_list_.remove(select_impl);
693 8069 state_->socket_ptrs_.erase(select_impl);
694 8069 }
695
696 inline std::error_code
697 2698 select_socket_service::open_socket(
698 tcp_socket::implementation& impl,
699 int family, int type, int protocol)
700 {
701 2698 auto* select_impl = static_cast<select_socket*>(&impl);
702 2698 select_impl->close_socket();
703
704 2698 int fd = ::socket(family, type, protocol);
705 2698 if (fd < 0)
706 return make_err(errno);
707
708 2698 if (family == AF_INET6)
709 {
710 5 int one = 1;
711 5 ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
712 }
713
714 // Set non-blocking and close-on-exec
715 2698 int flags = ::fcntl(fd, F_GETFL, 0);
716 2698 if (flags == -1)
717 {
718 int errn = errno;
719 ::close(fd);
720 return make_err(errn);
721 }
722 2698 if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
723 {
724 int errn = errno;
725 ::close(fd);
726 return make_err(errn);
727 }
728 2698 if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
729 {
730 int errn = errno;
731 ::close(fd);
732 return make_err(errn);
733 }
734
735 // Check fd is within select() limits
736 2698 if (fd >= FD_SETSIZE)
737 {
738 ::close(fd);
739 return make_err(EMFILE); // Too many open files
740 }
741
742 2698 select_impl->fd_ = fd;
743 2698 return {};
744 }
745
746 inline void
747 13448 select_socket_service::close(io_object::handle& h)
748 {
749 13448 static_cast<select_socket*>(h.get())->close_socket();
750 13448 }
751
752 inline void
753 130411 select_socket_service::post(select_op* op)
754 {
755 130411 state_->sched_.post(op);
756 130411 }
757
758 inline void
759 2962 select_socket_service::work_started() noexcept
760 {
761 2962 state_->sched_.work_started();
762 2962 }
763
764 inline void
765 157 select_socket_service::work_finished() noexcept
766 {
767 157 state_->sched_.work_finished();
768 157 }
769
770 } // namespace boost::corosio::detail
771
772 #endif // BOOST_COROSIO_HAS_SELECT
773
774 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
775