libs/corosio/include/boost/corosio/io_stream.hpp

97.1% Lines (34/35) 100.0% Functions (27/27) 83.3% Branches (5/6)
libs/corosio/include/boost/corosio/io_stream.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_IO_STREAM_HPP
11 #define BOOST_COROSIO_IO_STREAM_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/corosio/io_object.hpp>
15 #include <boost/capy/io_result.hpp>
16 #include <boost/corosio/io_buffer_param.hpp>
17 #include <boost/capy/ex/executor_ref.hpp>
18 #include <boost/capy/ex/io_env.hpp>
19 #include <system_error>
20
21 #include <coroutine>
22 #include <cstddef>
23 #include <stop_token>
24
25 namespace boost::corosio {
26
27 /** Platform stream with read/write operations.
28
29 This base class provides the fundamental async read and write
30 operations for kernel-level stream I/O. Derived classes wrap
31 OS-specific stream implementations (sockets, pipes, etc.) and
32 satisfy @ref capy::ReadStream and @ref capy::WriteStream concepts.
33
34 @par Semantics
35 Concrete classes wrap direct platform I/O completed by the kernel.
36 Functions taking `io_stream&` signal "platform implementation
37 required" - use this when you need actual kernel I/O rather than
38 a mock or test double.
39
40 For generic stream algorithms that work with test mocks,
41 use `template<capy::Stream S>` instead of `io_stream&`.
42
43 @par Thread Safety
44 Distinct objects: Safe.
45 Shared objects: Unsafe. All calls to a single stream must be made
46 from the same implicit or explicit serialization context.
47
48 @par Example
49 @code
50 // Read until buffer full or EOF
51 capy::task<> read_all( io_stream& stream, std::span<char> buf )
52 {
53 std::size_t total = 0;
54 while( total < buf.size() )
55 {
56 auto [ec, n] = co_await stream.read_some(
57 capy::buffer( buf.data() + total, buf.size() - total ) );
58 if( ec == capy::cond::eof )
59 break;
60 if( ec.failed() )
61 capy::detail::throw_system_error( ec );
62 total += n;
63 }
64 }
65 @endcode
66
67 @see capy::Stream, capy::ReadStream, capy::WriteStream, tcp_socket
68 */
69 class BOOST_COROSIO_DECL io_stream : public io_object
70 {
71 public:
72 /** Asynchronously read data from the stream.
73
74 This operation suspends the calling coroutine and initiates a
75 kernel-level read. The coroutine resumes when the operation
76 completes.
77
78 @li The operation completes when:
79 @li At least one byte has been read into the buffer sequence
80 @li The peer closes the connection (EOF)
81 @li An error occurs
82 @li The operation is cancelled via stop token or `cancel()`
83
84 @par Concurrency
85 At most one write operation may be in flight concurrently with
86 this read. No other read operations may be in flight until this
87 operation completes. Note that allowing operations to be in flight
88 concurrently does not imply the initiating calls may be made
89 concurrently; all calls must be serialized.
90
91 @par Cancellation
92 Supports cancellation via `std::stop_token` propagated through
93 the IoAwaitable protocol, or via the I/O object's `cancel()`
94 member. When cancelled, the operation completes with an error
95 that compares equal to `capy::cond::canceled`.
96
97 @par Preconditions
98 The stream must be open and connected.
99
100 @param buffers The buffer sequence to read data into. The caller
101 retains ownership and must ensure validity until the
102 operation completes.
103
104 @return An awaitable yielding `(error_code, std::size_t)`.
105 On success, `bytes_transferred` contains the number of bytes
106 read. Compare error codes to conditions, not specific values:
107 @li `capy::cond::eof` - Peer closed connection (TCP FIN)
108 @li `capy::cond::canceled` - Operation was cancelled
109
110 @par Example
111 @code
112 // Simple read with error handling
113 auto [ec, n] = co_await stream.read_some( capy::buffer( buf ) );
114 if( ec == capy::cond::eof )
115 co_return; // Connection closed gracefully
116 if( ec.failed() )
117 capy::detail::throw_system_error( ec );
118 process( buf, n );
119 @endcode
120
121 @note This operation may read fewer bytes than the buffer
122 capacity. Use a loop or `capy::async_read` to read an
123 exact amount.
124
125 @see write_some, capy::async_read
126 */
127 template<capy::MutableBufferSequence MB>
128 204992 auto read_some(MB const& buffers)
129 {
130 204992 return read_some_awaitable<MB>(*this, buffers); // only builds the awaitable (which takes its own copy of the `buffers` object); the implementation is called later, from await_suspend
131 }
132
133 /** Asynchronously write data to the stream.
134
135 This operation suspends the calling coroutine and initiates a
136 kernel-level write. The coroutine resumes when the operation
137 completes.
138
139 @li The operation completes when:
140 @li At least one byte has been written from the buffer sequence
141 @li An error occurs (including connection reset by peer)
142 @li The operation is cancelled via stop token or `cancel()`
143
144 @par Concurrency
145 At most one read operation may be in flight concurrently with
146 this write. No other write operations may be in flight until
147 this operation completes. Note that allowing operations to be
148 in flight concurrently does not imply the initiating calls may
149 be made concurrently; all calls must be serialized.
150
151 @par Cancellation
152 Supports cancellation via `std::stop_token` propagated through
153 the IoAwaitable protocol, or via the I/O object's `cancel()`
154 member. When cancelled, the operation completes with an error
155 that compares equal to `capy::cond::canceled`.
156
157 @par Preconditions
158 The stream must be open and connected.
159
160 @param buffers The buffer sequence containing data to write.
161 The caller retains ownership and must ensure validity
162 until the operation completes.
163
164 @return An awaitable yielding `(error_code, std::size_t)`.
165 On success, `bytes_transferred` contains the number of bytes
166 written. Compare error codes to conditions, not specific
167 values:
168 @li `capy::cond::canceled` - Operation was cancelled
169 @li `std::errc::broken_pipe` - Peer closed connection
170
171 @par Example
172 @code
173 // Write all data
174 std::string_view data = "Hello, World!";
175 std::size_t written = 0;
176 while( written < data.size() )
177 {
178 auto [ec, n] = co_await stream.write_some(
179 capy::buffer( data.data() + written,
180 data.size() - written ) );
181 if( ec.failed() )
182 capy::detail::throw_system_error( ec );
183 written += n;
184 }
185 @endcode
186
187 @note This operation may write fewer bytes than the buffer
188 contains. Use a loop or `capy::async_write` to write
189 all data.
190
191 @see read_some, capy::async_write
192 */
193 template<capy::ConstBufferSequence CB>
194 204633 auto write_some(CB const& buffers)
195 {
196 204633 return write_some_awaitable<CB>(*this, buffers); // only builds the awaitable (which takes its own copy of the `buffers` object); the implementation is called later, from await_suspend
197 }
198
199 protected:
200 /// Awaitable for async read operations: returned by read_some(), it calls the implementation's read_some from await_suspend and yields (error_code, bytes) from await_resume.
201 template<class MutableBufferSequence>
202 struct read_some_awaitable
203 {
204 io_stream& ios_; // stream whose implementation performs the read
205 MutableBufferSequence buffers_; // buffer sequence object, held by value (moved in by the constructor)
206 std::stop_token token_; // default-constructed here; replaced with env->stop_token in await_suspend
207 mutable std::error_code ec_; // result slot: its address is passed to the implementation in await_suspend
208 mutable std::size_t bytes_transferred_ = 0; // result slot: its address is passed to the implementation in await_suspend
209
210 204992 read_some_awaitable(
211 io_stream& ios,
212 MutableBufferSequence buffers) noexcept
213 204992 : ios_(ios)
214 204992 , buffers_(std::move(buffers))
215 {
216 204992 }
217
218 204992 bool await_ready() const noexcept
219 {
220 204992 return token_.stop_requested(); // NOTE(review): token_ is only assigned in await_suspend, so when await_ready runs first (the usual co_await order) this tests a default-constructed token - confirm that is intended
221 }
222
223 204992 capy::io_result<std::size_t> await_resume() const noexcept
224 {
225
2/2
✓ Branch 1 taken 195 times.
✓ Branch 2 taken 204797 times.
204992 if (token_.stop_requested()) // stop requested on the stored token: report cancellation instead of the stored result
226 195 return {make_error_code(std::errc::operation_canceled), 0}; // NOTE(review): ec_ and bytes_transferred_ are ignored on this path, so any byte count the implementation stored is dropped - confirm intended
227 204797 return {ec_, bytes_transferred_}; // otherwise hand back whatever is in the two result slots
228 }
229
230 204992 auto await_suspend(
231 std::coroutine_handle<> h,
232 capy::io_env const* env) -> std::coroutine_handle<>
233 {
234 204992 token_ = env->stop_token; // keep the environment's token so await_resume can test it after resumption
235
1/1
✓ Branch 4 taken 204992 times.
204992 return ios_.get().read_some(h, env->executor, buffers_, token_, &ec_, &bytes_transferred_); // delegate to the implementation and return the coroutine handle it returns
236 }
237 };
238
239 /// Awaitable for async write operations: returned by write_some(), it calls the implementation's write_some from await_suspend and yields (error_code, bytes) from await_resume.
240 template<class ConstBufferSequence>
241 struct write_some_awaitable
242 {
243 io_stream& ios_; // stream whose implementation performs the write
244 ConstBufferSequence buffers_; // buffer sequence object, held by value (moved in by the constructor)
245 std::stop_token token_; // default-constructed here; replaced with env->stop_token in await_suspend
246 mutable std::error_code ec_; // result slot: its address is passed to the implementation in await_suspend
247 mutable std::size_t bytes_transferred_ = 0; // result slot: its address is passed to the implementation in await_suspend
248
249 204633 write_some_awaitable(
250 io_stream& ios,
251 ConstBufferSequence buffers) noexcept
252 204633 : ios_(ios)
253 204633 , buffers_(std::move(buffers))
254 {
255 204633 }
256
257 204633 bool await_ready() const noexcept
258 {
259 204633 return token_.stop_requested(); // NOTE(review): token_ is only assigned in await_suspend, so when await_ready runs first (the usual co_await order) this tests a default-constructed token - confirm that is intended
260 }
261
262 204633 capy::io_result<std::size_t> await_resume() const noexcept
263 {
264
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 204633 times.
204633 if (token_.stop_requested()) // stop requested on the stored token: report cancellation instead of the stored result
265 return {make_error_code(std::errc::operation_canceled), 0}; // NOTE(review): ec_ and bytes_transferred_ are ignored on this path, so any byte count the implementation stored is dropped - confirm intended
266 204633 return {ec_, bytes_transferred_}; // otherwise hand back whatever is in the two result slots
267 }
268
269 204633 auto await_suspend(
270 std::coroutine_handle<> h,
271 capy::io_env const* env) -> std::coroutine_handle<>
272 {
273 204633 token_ = env->stop_token; // keep the environment's token so await_resume can test it after resumption
274
1/1
✓ Branch 4 taken 204633 times.
204633 return ios_.get().write_some(h, env->executor, buffers_, token_, &ec_, &bytes_transferred_); // delegate to the implementation and return the coroutine handle it returns
275 }
276 };
277
278 public:
279 /** Platform-specific stream implementation interface.
280
281 Derived classes implement this interface to provide kernel-level
282 read and write operations for each supported platform (IOCP,
283 epoll, kqueue, io_uring).
284 */
285 struct io_stream_impl : io_object_impl
286 {
287 /// Initiate platform read operation. As called by read_some_awaitable, the arguments are, in order: the suspended coroutine's handle, env->executor, the buffer sequence, the stop token, and the addresses of the error-code and byte-count result slots; the returned handle is returned from await_suspend.
288 virtual std::coroutine_handle<> read_some(
289 std::coroutine_handle<>,
290 capy::executor_ref,
291 io_buffer_param,
292 std::stop_token,
293 std::error_code*,
294 std::size_t*) = 0;
295
296 /// Initiate platform write operation. As called by write_some_awaitable, the arguments are, in order: the suspended coroutine's handle, env->executor, the buffer sequence, the stop token, and the addresses of the error-code and byte-count result slots; the returned handle is returned from await_suspend.
297 virtual std::coroutine_handle<> write_some(
298 std::coroutine_handle<>,
299 capy::executor_ref,
300 io_buffer_param,
301 std::stop_token,
302 std::error_code*,
303 std::size_t*) = 0;
304 };
305
306 protected:
307 /// Construct stream bound to the given execution context. Protected, so only derived classes can call it; `ctx` is forwarded unchanged to the io_object base.
308 explicit
309 16342 io_stream(
310 capy::execution_context& ctx) noexcept
311 16342 : io_object(ctx)
312 {
313 16342 }
314
315 private:
316 /// Return the implementation downcast to the stream interface. The cast is an unchecked static_cast and the pointer is dereferenced without a null check; impl_ is not declared in this file (presumably inherited from io_object - confirm).
317 409625 io_stream_impl& get() const noexcept
318 {
319 409625 return *static_cast<io_stream_impl*>(impl_);
320 }
321 };
322
323 } // namespace boost::corosio
324
325 #endif
326