//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//
9  

9  

10  
#ifndef BOOST_COROSIO_IO_STREAM_HPP
10  
#ifndef BOOST_COROSIO_IO_STREAM_HPP
11  
#define BOOST_COROSIO_IO_STREAM_HPP
11  
#define BOOST_COROSIO_IO_STREAM_HPP
12  

12  

13  
#include <boost/corosio/detail/config.hpp>
13  
#include <boost/corosio/detail/config.hpp>
14  
#include <boost/corosio/io_object.hpp>
14  
#include <boost/corosio/io_object.hpp>
15  
#include <boost/capy/io_result.hpp>
15  
#include <boost/capy/io_result.hpp>
16  
#include <boost/corosio/io_buffer_param.hpp>
16  
#include <boost/corosio/io_buffer_param.hpp>
17  
#include <boost/capy/ex/executor_ref.hpp>
17  
#include <boost/capy/ex/executor_ref.hpp>
18  
#include <boost/capy/ex/io_env.hpp>
18  
#include <boost/capy/ex/io_env.hpp>
19  
#include <system_error>
19  
#include <system_error>
20  

20  

21  
#include <coroutine>
21  
#include <coroutine>
22  
#include <cstddef>
22  
#include <cstddef>
23  
#include <stop_token>
23  
#include <stop_token>
24  

24  

25  
namespace boost::corosio {
25  
namespace boost::corosio {
26  

26  

27  
/** Platform stream with read/write operations.
27  
/** Platform stream with read/write operations.
28  

28  

29  
    This base class provides the fundamental async read and write
29  
    This base class provides the fundamental async read and write
30  
    operations for kernel-level stream I/O. Derived classes wrap
30  
    operations for kernel-level stream I/O. Derived classes wrap
31  
    OS-specific stream implementations (sockets, pipes, etc.) and
31  
    OS-specific stream implementations (sockets, pipes, etc.) and
32  
    satisfy @ref capy::ReadStream and @ref capy::WriteStream concepts.
32  
    satisfy @ref capy::ReadStream and @ref capy::WriteStream concepts.
33  

33  

34  
    @par Semantics
34  
    @par Semantics
35  
    Concrete classes wrap direct platform I/O completed by the kernel.
35  
    Concrete classes wrap direct platform I/O completed by the kernel.
36  
    Functions taking `io_stream&` signal "platform implementation
36  
    Functions taking `io_stream&` signal "platform implementation
37  
    required" - use this when you need actual kernel I/O rather than
37  
    required" - use this when you need actual kernel I/O rather than
38  
    a mock or test double.
38  
    a mock or test double.
39  

39  

40  
    For generic stream algorithms that work with test mocks,
40  
    For generic stream algorithms that work with test mocks,
41  
    use `template<capy::Stream S>` instead of `io_stream&`.
41  
    use `template<capy::Stream S>` instead of `io_stream&`.
42  

42  

43  
    @par Thread Safety
43  
    @par Thread Safety
44  
    Distinct objects: Safe.
44  
    Distinct objects: Safe.
45  
    Shared objects: Unsafe. All calls to a single stream must be made
45  
    Shared objects: Unsafe. All calls to a single stream must be made
46  
    from the same implicit or explicit serialization context.
46  
    from the same implicit or explicit serialization context.
47  

47  

48  
    @par Example
48  
    @par Example
49  
    @code
49  
    @code
50  
    // Read until buffer full or EOF
50  
    // Read until buffer full or EOF
51  
    capy::task<> read_all( io_stream& stream, std::span<char> buf )
51  
    capy::task<> read_all( io_stream& stream, std::span<char> buf )
52  
    {
52  
    {
53  
        std::size_t total = 0;
53  
        std::size_t total = 0;
54  
        while( total < buf.size() )
54  
        while( total < buf.size() )
55  
        {
55  
        {
56  
            auto [ec, n] = co_await stream.read_some(
56  
            auto [ec, n] = co_await stream.read_some(
57  
                capy::buffer( buf.data() + total, buf.size() - total ) );
57  
                capy::buffer( buf.data() + total, buf.size() - total ) );
58  
            if( ec == capy::cond::eof )
58  
            if( ec == capy::cond::eof )
59  
                break;
59  
                break;
60  
            if( ec.failed() )
60  
            if( ec.failed() )
61  
                capy::detail::throw_system_error( ec );
61  
                capy::detail::throw_system_error( ec );
62  
            total += n;
62  
            total += n;
63  
        }
63  
        }
64  
    }
64  
    }
65  
    @endcode
65  
    @endcode
66  

66  

67  
    @see capy::Stream, capy::ReadStream, capy::WriteStream, tcp_socket
67  
    @see capy::Stream, capy::ReadStream, capy::WriteStream, tcp_socket
68  
*/
68  
*/
69  
class BOOST_COROSIO_DECL io_stream : public io_object
69  
class BOOST_COROSIO_DECL io_stream : public io_object
70  
{
70  
{
71  
public:
71  
public:
72  
    /** Asynchronously read data from the stream.
72  
    /** Asynchronously read data from the stream.
73  

73  

74  
        This operation suspends the calling coroutine and initiates a
74  
        This operation suspends the calling coroutine and initiates a
75  
        kernel-level read. The coroutine resumes when the operation
75  
        kernel-level read. The coroutine resumes when the operation
76  
        completes.
76  
        completes.
77  

77  

78  
        @li The operation completes when:
78  
        @li The operation completes when:
79  
        @li At least one byte has been read into the buffer sequence
79  
        @li At least one byte has been read into the buffer sequence
80  
        @li The peer closes the connection (EOF)
80  
        @li The peer closes the connection (EOF)
81  
        @li An error occurs
81  
        @li An error occurs
82  
        @li The operation is cancelled via stop token or `cancel()`
82  
        @li The operation is cancelled via stop token or `cancel()`
83  

83  

84  
        @par Concurrency
84  
        @par Concurrency
85  
        At most one write operation may be in flight concurrently with
85  
        At most one write operation may be in flight concurrently with
86  
        this read. No other read operations may be in flight until this
86  
        this read. No other read operations may be in flight until this
87  
        operation completes. Note that concurrent in-flight operations
87  
        operation completes. Note that concurrent in-flight operations
88  
        does not imply the initiating calls may be made concurrently;
88  
        does not imply the initiating calls may be made concurrently;
89  
        all calls must be serialized.
89  
        all calls must be serialized.
90  

90  

91  
        @par Cancellation
91  
        @par Cancellation
92  
        Supports cancellation via `std::stop_token` propagated through
92  
        Supports cancellation via `std::stop_token` propagated through
93  
        the IoAwaitable protocol, or via the I/O object's `cancel()`
93  
        the IoAwaitable protocol, or via the I/O object's `cancel()`
94  
        member. When cancelled, the operation completes with an error
94  
        member. When cancelled, the operation completes with an error
95  
        that compares equal to `capy::cond::canceled`.
95  
        that compares equal to `capy::cond::canceled`.
96  

96  

97  
        @par Preconditions
97  
        @par Preconditions
98  
        The stream must be open and connected.
98  
        The stream must be open and connected.
99  

99  

100  
        @param buffers The buffer sequence to read data into. The caller
100  
        @param buffers The buffer sequence to read data into. The caller
101  
            retains ownership and must ensure validity until the
101  
            retains ownership and must ensure validity until the
102  
            operation completes.
102  
            operation completes.
103  

103  

104  
        @return An awaitable yielding `(error_code, std::size_t)`.
104  
        @return An awaitable yielding `(error_code, std::size_t)`.
105  
            On success, `bytes_transferred` contains the number of bytes
105  
            On success, `bytes_transferred` contains the number of bytes
106  
            read. Compare error codes to conditions, not specific values:
106  
            read. Compare error codes to conditions, not specific values:
107  
            @li `capy::cond::eof` - Peer closed connection (TCP FIN)
107  
            @li `capy::cond::eof` - Peer closed connection (TCP FIN)
108  
            @li `capy::cond::canceled` - Operation was cancelled
108  
            @li `capy::cond::canceled` - Operation was cancelled
109  

109  

110  
        @par Example
110  
        @par Example
111  
        @code
111  
        @code
112  
        // Simple read with error handling
112  
        // Simple read with error handling
113  
        auto [ec, n] = co_await stream.read_some( capy::buffer( buf ) );
113  
        auto [ec, n] = co_await stream.read_some( capy::buffer( buf ) );
114  
        if( ec == capy::cond::eof )
114  
        if( ec == capy::cond::eof )
115  
            co_return;  // Connection closed gracefully
115  
            co_return;  // Connection closed gracefully
116  
        if( ec.failed() )
116  
        if( ec.failed() )
117  
            capy::detail::throw_system_error( ec );
117  
            capy::detail::throw_system_error( ec );
118  
        process( buf, n );
118  
        process( buf, n );
119  
        @endcode
119  
        @endcode
120  

120  

121  
        @note This operation may read fewer bytes than the buffer
121  
        @note This operation may read fewer bytes than the buffer
122  
            capacity. Use a loop or `capy::async_read` to read an
122  
            capacity. Use a loop or `capy::async_read` to read an
123  
            exact amount.
123  
            exact amount.
124  

124  

125  
        @see write_some, capy::async_read
125  
        @see write_some, capy::async_read
126  
    */
126  
    */
127  
    template<capy::MutableBufferSequence MB>
127  
    template<capy::MutableBufferSequence MB>
128  
    auto read_some(MB const& buffers)
128  
    auto read_some(MB const& buffers)
129  
    {
129  
    {
130  
        return read_some_awaitable<MB>(*this, buffers);
130  
        return read_some_awaitable<MB>(*this, buffers);
131  
    }
131  
    }
132  

132  

133  
    /** Asynchronously write data to the stream.
133  
    /** Asynchronously write data to the stream.
134  

134  

135  
        This operation suspends the calling coroutine and initiates a
135  
        This operation suspends the calling coroutine and initiates a
136  
        kernel-level write. The coroutine resumes when the operation
136  
        kernel-level write. The coroutine resumes when the operation
137  
        completes.
137  
        completes.
138  

138  

139  
        @li The operation completes when:
139  
        @li The operation completes when:
140  
        @li At least one byte has been written from the buffer sequence
140  
        @li At least one byte has been written from the buffer sequence
141  
        @li An error occurs (including connection reset by peer)
141  
        @li An error occurs (including connection reset by peer)
142  
        @li The operation is cancelled via stop token or `cancel()`
142  
        @li The operation is cancelled via stop token or `cancel()`
143  

143  

144  
        @par Concurrency
144  
        @par Concurrency
145  
        At most one read operation may be in flight concurrently with
145  
        At most one read operation may be in flight concurrently with
146  
        this write. No other write operations may be in flight until
146  
        this write. No other write operations may be in flight until
147  
        this operation completes. Note that concurrent in-flight
147  
        this operation completes. Note that concurrent in-flight
148  
        operations does not imply the initiating calls may be made
148  
        operations does not imply the initiating calls may be made
149  
        concurrently; all calls must be serialized.
149  
        concurrently; all calls must be serialized.
150  

150  

151  
        @par Cancellation
151  
        @par Cancellation
152  
        Supports cancellation via `std::stop_token` propagated through
152  
        Supports cancellation via `std::stop_token` propagated through
153  
        the IoAwaitable protocol, or via the I/O object's `cancel()`
153  
        the IoAwaitable protocol, or via the I/O object's `cancel()`
154  
        member. When cancelled, the operation completes with an error
154  
        member. When cancelled, the operation completes with an error
155  
        that compares equal to `capy::cond::canceled`.
155  
        that compares equal to `capy::cond::canceled`.
156  

156  

157  
        @par Preconditions
157  
        @par Preconditions
158  
        The stream must be open and connected.
158  
        The stream must be open and connected.
159  

159  

160  
        @param buffers The buffer sequence containing data to write.
160  
        @param buffers The buffer sequence containing data to write.
161  
            The caller retains ownership and must ensure validity
161  
            The caller retains ownership and must ensure validity
162  
            until the operation completes.
162  
            until the operation completes.
163  

163  

164  
        @return An awaitable yielding `(error_code, std::size_t)`.
164  
        @return An awaitable yielding `(error_code, std::size_t)`.
165  
            On success, `bytes_transferred` contains the number of bytes
165  
            On success, `bytes_transferred` contains the number of bytes
166  
            written. Compare error codes to conditions, not specific
166  
            written. Compare error codes to conditions, not specific
167  
            values:
167  
            values:
168  
            @li `capy::cond::canceled` - Operation was cancelled
168  
            @li `capy::cond::canceled` - Operation was cancelled
169  
            @li `std::errc::broken_pipe` - Peer closed connection
169  
            @li `std::errc::broken_pipe` - Peer closed connection
170  

170  

171  
        @par Example
171  
        @par Example
172  
        @code
172  
        @code
173  
        // Write all data
173  
        // Write all data
174  
        std::string_view data = "Hello, World!";
174  
        std::string_view data = "Hello, World!";
175  
        std::size_t written = 0;
175  
        std::size_t written = 0;
176  
        while( written < data.size() )
176  
        while( written < data.size() )
177  
        {
177  
        {
178  
            auto [ec, n] = co_await stream.write_some(
178  
            auto [ec, n] = co_await stream.write_some(
179  
                capy::buffer( data.data() + written,
179  
                capy::buffer( data.data() + written,
180  
                              data.size() - written ) );
180  
                              data.size() - written ) );
181  
            if( ec.failed() )
181  
            if( ec.failed() )
182  
                capy::detail::throw_system_error( ec );
182  
                capy::detail::throw_system_error( ec );
183  
            written += n;
183  
            written += n;
184  
        }
184  
        }
185  
        @endcode
185  
        @endcode
186  

186  

187  
        @note This operation may write fewer bytes than the buffer
187  
        @note This operation may write fewer bytes than the buffer
188  
            contains. Use a loop or `capy::async_write` to write
188  
            contains. Use a loop or `capy::async_write` to write
189  
            all data.
189  
            all data.
190  

190  

191  
        @see read_some, capy::async_write
191  
        @see read_some, capy::async_write
192  
    */
192  
    */
193  
    template<capy::ConstBufferSequence CB>
193  
    template<capy::ConstBufferSequence CB>
194  
    auto write_some(CB const& buffers)
194  
    auto write_some(CB const& buffers)
195  
    {
195  
    {
196  
        return write_some_awaitable<CB>(*this, buffers);
196  
        return write_some_awaitable<CB>(*this, buffers);
197  
    }
197  
    }
198  

198  

199  
protected:
199  
protected:
200  
    /// Awaitable for async read operations.
200  
    /// Awaitable for async read operations.
201  
    template<class MutableBufferSequence>
201  
    template<class MutableBufferSequence>
202  
    struct read_some_awaitable
202  
    struct read_some_awaitable
203  
    {
203  
    {
204  
        io_stream& ios_;
204  
        io_stream& ios_;
205  
        MutableBufferSequence buffers_;
205  
        MutableBufferSequence buffers_;
206  
        std::stop_token token_;
206  
        std::stop_token token_;
207  
        mutable std::error_code ec_;
207  
        mutable std::error_code ec_;
208  
        mutable std::size_t bytes_transferred_ = 0;
208  
        mutable std::size_t bytes_transferred_ = 0;
209  

209  

210  
        read_some_awaitable(
210  
        read_some_awaitable(
211  
            io_stream& ios,
211  
            io_stream& ios,
212  
            MutableBufferSequence buffers) noexcept
212  
            MutableBufferSequence buffers) noexcept
213  
            : ios_(ios)
213  
            : ios_(ios)
214  
            , buffers_(std::move(buffers))
214  
            , buffers_(std::move(buffers))
215  
        {
215  
        {
216  
        }
216  
        }
217  

217  

218  
        bool await_ready() const noexcept
218  
        bool await_ready() const noexcept
219  
        {
219  
        {
220  
            return token_.stop_requested();
220  
            return token_.stop_requested();
221  
        }
221  
        }
222  

222  

223  
        capy::io_result<std::size_t> await_resume() const noexcept
223  
        capy::io_result<std::size_t> await_resume() const noexcept
224  
        {
224  
        {
225  
            if (token_.stop_requested())
225  
            if (token_.stop_requested())
226  
                return {make_error_code(std::errc::operation_canceled), 0};
226  
                return {make_error_code(std::errc::operation_canceled), 0};
227  
            return {ec_, bytes_transferred_};
227  
            return {ec_, bytes_transferred_};
228  
        }
228  
        }
229  

229  

230  
        auto await_suspend(
230  
        auto await_suspend(
231  
            std::coroutine_handle<> h,
231  
            std::coroutine_handle<> h,
232  
            capy::io_env const* env) -> std::coroutine_handle<>
232  
            capy::io_env const* env) -> std::coroutine_handle<>
233  
        {
233  
        {
234  
            token_ = env->stop_token;
234  
            token_ = env->stop_token;
235  
            return ios_.get().read_some(h, env->executor, buffers_, token_, &ec_, &bytes_transferred_);
235  
            return ios_.get().read_some(h, env->executor, buffers_, token_, &ec_, &bytes_transferred_);
236  
        }
236  
        }
237  
    };
237  
    };
238  

238  

239  
    /// Awaitable for async write operations.
239  
    /// Awaitable for async write operations.
240  
    template<class ConstBufferSequence>
240  
    template<class ConstBufferSequence>
241  
    struct write_some_awaitable
241  
    struct write_some_awaitable
242  
    {
242  
    {
243  
        io_stream& ios_;
243  
        io_stream& ios_;
244  
        ConstBufferSequence buffers_;
244  
        ConstBufferSequence buffers_;
245  
        std::stop_token token_;
245  
        std::stop_token token_;
246  
        mutable std::error_code ec_;
246  
        mutable std::error_code ec_;
247  
        mutable std::size_t bytes_transferred_ = 0;
247  
        mutable std::size_t bytes_transferred_ = 0;
248  

248  

249  
        write_some_awaitable(
249  
        write_some_awaitable(
250  
            io_stream& ios,
250  
            io_stream& ios,
251  
            ConstBufferSequence buffers) noexcept
251  
            ConstBufferSequence buffers) noexcept
252  
            : ios_(ios)
252  
            : ios_(ios)
253  
            , buffers_(std::move(buffers))
253  
            , buffers_(std::move(buffers))
254  
        {
254  
        {
255  
        }
255  
        }
256  

256  

257  
        bool await_ready() const noexcept
257  
        bool await_ready() const noexcept
258  
        {
258  
        {
259  
            return token_.stop_requested();
259  
            return token_.stop_requested();
260  
        }
260  
        }
261  

261  

262  
        capy::io_result<std::size_t> await_resume() const noexcept
262  
        capy::io_result<std::size_t> await_resume() const noexcept
263  
        {
263  
        {
264  
            if (token_.stop_requested())
264  
            if (token_.stop_requested())
265  
                return {make_error_code(std::errc::operation_canceled), 0};
265  
                return {make_error_code(std::errc::operation_canceled), 0};
266  
            return {ec_, bytes_transferred_};
266  
            return {ec_, bytes_transferred_};
267  
        }
267  
        }
268  

268  

269  
        auto await_suspend(
269  
        auto await_suspend(
270  
            std::coroutine_handle<> h,
270  
            std::coroutine_handle<> h,
271  
            capy::io_env const* env) -> std::coroutine_handle<>
271  
            capy::io_env const* env) -> std::coroutine_handle<>
272  
        {
272  
        {
273  
            token_ = env->stop_token;
273  
            token_ = env->stop_token;
274  
            return ios_.get().write_some(h, env->executor, buffers_, token_, &ec_, &bytes_transferred_);
274  
            return ios_.get().write_some(h, env->executor, buffers_, token_, &ec_, &bytes_transferred_);
275  
        }
275  
        }
276  
    };
276  
    };
277  

277  

278  
public:
278  
public:
279  
    /** Platform-specific stream implementation interface.
279  
    /** Platform-specific stream implementation interface.
280  

280  

281  
        Derived classes implement this interface to provide kernel-level
281  
        Derived classes implement this interface to provide kernel-level
282  
        read and write operations for each supported platform (IOCP,
282  
        read and write operations for each supported platform (IOCP,
283  
        epoll, kqueue, io_uring).
283  
        epoll, kqueue, io_uring).
284  
    */
284  
    */
285  
    struct io_stream_impl : io_object_impl
285  
    struct io_stream_impl : io_object_impl
286  
    {
286  
    {
287  
        /// Initiate platform read operation.
287  
        /// Initiate platform read operation.
288  
        virtual std::coroutine_handle<> read_some(
288  
        virtual std::coroutine_handle<> read_some(
289  
            std::coroutine_handle<>,
289  
            std::coroutine_handle<>,
290  
            capy::executor_ref,
290  
            capy::executor_ref,
291  
            io_buffer_param,
291  
            io_buffer_param,
292  
            std::stop_token,
292  
            std::stop_token,
293  
            std::error_code*,
293  
            std::error_code*,
294  
            std::size_t*) = 0;
294  
            std::size_t*) = 0;
295  

295  

296  
        /// Initiate platform write operation.
296  
        /// Initiate platform write operation.
297  
        virtual std::coroutine_handle<> write_some(
297  
        virtual std::coroutine_handle<> write_some(
298  
            std::coroutine_handle<>,
298  
            std::coroutine_handle<>,
299  
            capy::executor_ref,
299  
            capy::executor_ref,
300  
            io_buffer_param,
300  
            io_buffer_param,
301  
            std::stop_token,
301  
            std::stop_token,
302  
            std::error_code*,
302  
            std::error_code*,
303  
            std::size_t*) = 0;
303  
            std::size_t*) = 0;
304  
    };
304  
    };
305  

305  

306  
protected:
306  
protected:
307  
    /// Construct stream bound to the given execution context.
307  
    /// Construct stream bound to the given execution context.
308  
    explicit
308  
    explicit
309  
    io_stream(
309  
    io_stream(
310  
        capy::execution_context& ctx) noexcept
310  
        capy::execution_context& ctx) noexcept
311  
        : io_object(ctx)
311  
        : io_object(ctx)
312  
    {
312  
    {
313  
    }
313  
    }
314  

314  

315  
private:
315  
private:
316  
    /// Return implementation downcasted to stream interface.
316  
    /// Return implementation downcasted to stream interface.
317  
    io_stream_impl& get() const noexcept
317  
    io_stream_impl& get() const noexcept
318  
    {
318  
    {
319  
        return *static_cast<io_stream_impl*>(impl_);
319  
        return *static_cast<io_stream_impl*>(impl_);
320  
    }
320  
    }
321  
};
321  
};
322  

322  

323  
} // namespace boost::corosio
323  
} // namespace boost::corosio
324  

324  

325  
#endif
325  
#endif