LCOV - code coverage report
Current view: top level - include/boost/corosio - io_stream.hpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 97.1 % 35 34
Test Date: 2026-02-12 21:00:53 Functions: 100.0 % 27 27

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #ifndef BOOST_COROSIO_IO_STREAM_HPP
      11              : #define BOOST_COROSIO_IO_STREAM_HPP
      12              : 
      13              : #include <boost/corosio/detail/config.hpp>
      14              : #include <boost/corosio/io_object.hpp>
      15              : #include <boost/capy/io_result.hpp>
      16              : #include <boost/corosio/io_buffer_param.hpp>
      17              : #include <boost/capy/ex/executor_ref.hpp>
      18              : #include <boost/capy/ex/io_env.hpp>
      19              : #include <system_error>
      20              : 
      21              : #include <coroutine>
      22              : #include <cstddef>
      23              : #include <stop_token>
      24              : 
      25              : namespace boost::corosio {
      26              : 
      27              : /** Platform stream with read/write operations.
      28              : 
      29              :     This base class provides the fundamental async read and write
      30              :     operations for kernel-level stream I/O. Derived classes wrap
      31              :     OS-specific stream implementations (sockets, pipes, etc.) and
      32              :     satisfy @ref capy::ReadStream and @ref capy::WriteStream concepts.
      33              : 
      34              :     @par Semantics
      35              :     Concrete classes wrap direct platform I/O completed by the kernel.
      36              :     Functions taking `io_stream&` signal "platform implementation
      37              :     required" - use this when you need actual kernel I/O rather than
      38              :     a mock or test double.
      39              : 
      40              :     For generic stream algorithms that work with test mocks,
      41              :     use `template<capy::Stream S>` instead of `io_stream&`.
      42              : 
      43              :     @par Thread Safety
      44              :     Distinct objects: Safe.
      45              :     Shared objects: Unsafe. All calls to a single stream must be made
      46              :     from the same implicit or explicit serialization context.
      47              : 
      48              :     @par Example
      49              :     @code
      50              :     // Read until buffer full or EOF
      51              :     capy::task<> read_all( io_stream& stream, std::span<char> buf )
      52              :     {
      53              :         std::size_t total = 0;
      54              :         while( total < buf.size() )
      55              :         {
      56              :             auto [ec, n] = co_await stream.read_some(
      57              :                 capy::buffer( buf.data() + total, buf.size() - total ) );
      58              :             if( ec == capy::cond::eof )
      59              :                 break;
      60              :             if( ec.failed() )
      61              :                 capy::detail::throw_system_error( ec );
      62              :             total += n;
      63              :         }
      64              :     }
      65              :     @endcode
      66              : 
      67              :     @see capy::Stream, capy::ReadStream, capy::WriteStream, tcp_socket
      68              : */
      69              : class BOOST_COROSIO_DECL io_stream : public io_object
      70              : {
      71              : public:
      72              :     /** Asynchronously read data from the stream.
      73              : 
      74              :         This operation suspends the calling coroutine and initiates a
      75              :         kernel-level read. The coroutine resumes when the operation
      76              :         completes.
      77              : 
      78              :         The operation completes when any of the following occurs:
      79              :         @li At least one byte has been read into the buffer sequence
      80              :         @li The peer closes the connection (EOF)
      81              :         @li An error occurs
      82              :         @li The operation is cancelled via stop token or `cancel()`
      83              : 
      84              :         @par Concurrency
      85              :         At most one write operation may be in flight concurrently with
      86              :         this read. No other read operations may be in flight until this
      87              :         operation completes. Note that concurrent in-flight operations
      88              :         do not imply the initiating calls may be made concurrently;
      89              :         all calls must be serialized.
      90              : 
      91              :         @par Cancellation
      92              :         Supports cancellation via `std::stop_token` propagated through
      93              :         the IoAwaitable protocol, or via the I/O object's `cancel()`
      94              :         member. When cancelled, the operation completes with an error
      95              :         that compares equal to `capy::cond::canceled`.
      96              : 
      97              :         @par Preconditions
      98              :         The stream must be open and connected.
      99              : 
     100              :         @param buffers The buffer sequence to read data into. The caller
     101              :             retains ownership and must ensure validity until the
     102              :             operation completes.
     103              : 
     104              :         @return An awaitable yielding `(error_code, std::size_t)`.
     105              :             On success, `bytes_transferred` contains the number of bytes
     106              :             read. Compare error codes to conditions, not specific values:
     107              :             @li `capy::cond::eof` - Peer closed connection (TCP FIN)
     108              :             @li `capy::cond::canceled` - Operation was cancelled
     109              : 
     110              :         @par Example
     111              :         @code
     112              :         // Simple read with error handling
     113              :         auto [ec, n] = co_await stream.read_some( capy::buffer( buf ) );
     114              :         if( ec == capy::cond::eof )
     115              :             co_return;  // Connection closed gracefully
     116              :         if( ec.failed() )
     117              :             capy::detail::throw_system_error( ec );
     118              :         process( buf, n );
     119              :         @endcode
     120              : 
     121              :         @note This operation may read fewer bytes than the buffer
     122              :             capacity. Use a loop or `capy::async_read` to read an
     123              :             exact amount.
     124              : 
     125              :         @see write_some, capy::async_read
     126              :     */
     127              :     template<capy::MutableBufferSequence MB>
     128       204992 :     auto read_some(MB const& buffers)
     129              :     {
     130       204992 :         return read_some_awaitable<MB>(*this, buffers);
     131              :     }
     132              : 
     133              :     /** Asynchronously write data to the stream.
     134              : 
     135              :         This operation suspends the calling coroutine and initiates a
     136              :         kernel-level write. The coroutine resumes when the operation
     137              :         completes.
     138              : 
     139              :         The operation completes when any of the following occurs:
     140              :         @li At least one byte has been written from the buffer sequence
     141              :         @li An error occurs (including connection reset by peer)
     142              :         @li The operation is cancelled via stop token or `cancel()`
     143              : 
     144              :         @par Concurrency
     145              :         At most one read operation may be in flight concurrently with
     146              :         this write. No other write operations may be in flight until
     147              :         this operation completes. Note that concurrent in-flight
     148              :         operations do not imply the initiating calls may be made
     149              :         concurrently; all calls must be serialized.
     150              : 
     151              :         @par Cancellation
     152              :         Supports cancellation via `std::stop_token` propagated through
     153              :         the IoAwaitable protocol, or via the I/O object's `cancel()`
     154              :         member. When cancelled, the operation completes with an error
     155              :         that compares equal to `capy::cond::canceled`.
     156              : 
     157              :         @par Preconditions
     158              :         The stream must be open and connected.
     159              : 
     160              :         @param buffers The buffer sequence containing data to write.
     161              :             The caller retains ownership and must ensure validity
     162              :             until the operation completes.
     163              : 
     164              :         @return An awaitable yielding `(error_code, std::size_t)`.
     165              :             On success, `bytes_transferred` contains the number of bytes
     166              :             written. Compare error codes to conditions, not specific
     167              :             values:
     168              :             @li `capy::cond::canceled` - Operation was cancelled
     169              :             @li `std::errc::broken_pipe` - Peer closed connection
     170              : 
     171              :         @par Example
     172              :         @code
     173              :         // Write all data
     174              :         std::string_view data = "Hello, World!";
     175              :         std::size_t written = 0;
     176              :         while( written < data.size() )
     177              :         {
     178              :             auto [ec, n] = co_await stream.write_some(
     179              :                 capy::buffer( data.data() + written,
     180              :                               data.size() - written ) );
     181              :             if( ec.failed() )
     182              :                 capy::detail::throw_system_error( ec );
     183              :             written += n;
     184              :         }
     185              :         @endcode
     186              : 
     187              :         @note This operation may write fewer bytes than the buffer
     188              :             contains. Use a loop or `capy::async_write` to write
     189              :             all data.
     190              : 
     191              :         @see read_some, capy::async_write
     192              :     */
     193              :     template<capy::ConstBufferSequence CB>
     194       204633 :     auto write_some(CB const& buffers)
     195              :     {
     196       204633 :         return write_some_awaitable<CB>(*this, buffers);
     197              :     }
     198              : 
     199              : protected:
     200              :     /// Awaitable for async read operations.
     201              :     template<class MutableBufferSequence>
     202              :     struct read_some_awaitable
     203              :     {
     204              :         io_stream& ios_;
     205              :         MutableBufferSequence buffers_;
     206              :         std::stop_token token_;
     207              :         mutable std::error_code ec_;
     208              :         mutable std::size_t bytes_transferred_ = 0;
     209              : 
     210       204992 :         read_some_awaitable(
     211              :             io_stream& ios,
     212              :             MutableBufferSequence buffers) noexcept
     213       204992 :             : ios_(ios)
     214       204992 :             , buffers_(std::move(buffers))
     215              :         {
     216       204992 :         }
     217              : 
     218       204992 :         bool await_ready() const noexcept
     219              :         {
     220       204992 :             return token_.stop_requested();
     221              :         }
     222              : 
     223       204992 :         capy::io_result<std::size_t> await_resume() const noexcept
     224              :         {
     225       204992 :             if (token_.stop_requested())
     226          195 :                 return {make_error_code(std::errc::operation_canceled), 0};
     227       204797 :             return {ec_, bytes_transferred_};
     228              :         }
     229              : 
     230       204992 :         auto await_suspend(
     231              :             std::coroutine_handle<> h,
     232              :             capy::io_env const* env) -> std::coroutine_handle<>
     233              :         {
     234       204992 :             token_ = env->stop_token;
     235       204992 :             return ios_.get().read_some(h, env->executor, buffers_, token_, &ec_, &bytes_transferred_);
     236              :         }
     237              :     };
     238              : 
     239              :     /// Awaitable for async write operations.
     240              :     template<class ConstBufferSequence>
     241              :     struct write_some_awaitable
     242              :     {
     243              :         io_stream& ios_;
     244              :         ConstBufferSequence buffers_;
     245              :         std::stop_token token_;
     246              :         mutable std::error_code ec_;
     247              :         mutable std::size_t bytes_transferred_ = 0;
     248              : 
     249       204633 :         write_some_awaitable(
     250              :             io_stream& ios,
     251              :             ConstBufferSequence buffers) noexcept
     252       204633 :             : ios_(ios)
     253       204633 :             , buffers_(std::move(buffers))
     254              :         {
     255       204633 :         }
     256              : 
     257       204633 :         bool await_ready() const noexcept
     258              :         {
     259       204633 :             return token_.stop_requested();
     260              :         }
     261              : 
     262       204633 :         capy::io_result<std::size_t> await_resume() const noexcept
     263              :         {
     264       204633 :             if (token_.stop_requested())
     265            0 :                 return {make_error_code(std::errc::operation_canceled), 0};
     266       204633 :             return {ec_, bytes_transferred_};
     267              :         }
     268              : 
     269       204633 :         auto await_suspend(
     270              :             std::coroutine_handle<> h,
     271              :             capy::io_env const* env) -> std::coroutine_handle<>
     272              :         {
     273       204633 :             token_ = env->stop_token;
     274       204633 :             return ios_.get().write_some(h, env->executor, buffers_, token_, &ec_, &bytes_transferred_);
     275              :         }
     276              :     };
     277              : 
     278              : public:
     279              :     /** Platform-specific stream implementation interface.
     280              : 
     281              :         Derived classes implement this interface to provide kernel-level
     282              :         read and write operations for each supported platform (IOCP,
     283              :         epoll, kqueue, io_uring).
     284              :     */
     285              :     struct io_stream_impl : io_object_impl
     286              :     {
     287              :         /// Initiate platform read operation.
     288              :         virtual std::coroutine_handle<> read_some(
     289              :             std::coroutine_handle<>,
     290              :             capy::executor_ref,
     291              :             io_buffer_param,
     292              :             std::stop_token,
     293              :             std::error_code*,
     294              :             std::size_t*) = 0;
     295              : 
     296              :         /// Initiate platform write operation.
     297              :         virtual std::coroutine_handle<> write_some(
     298              :             std::coroutine_handle<>,
     299              :             capy::executor_ref,
     300              :             io_buffer_param,
     301              :             std::stop_token,
     302              :             std::error_code*,
     303              :             std::size_t*) = 0;
     304              :     };
     305              : 
     306              : protected:
     307              :     /// Construct stream bound to the given execution context.
     308              :     explicit
     309        16342 :     io_stream(
     310              :         capy::execution_context& ctx) noexcept
     311        16342 :         : io_object(ctx)
     312              :     {
     313        16342 :     }
     314              : 
     315              : private:
     316              :     /// Return implementation downcasted to stream interface.
     317       409625 :     io_stream_impl& get() const noexcept
     318              :     {
     319       409625 :         return *static_cast<io_stream_impl*>(impl_);
     320              :     }
     321              : };
     322              : 
     323              : } // namespace boost::corosio
     324              : 
     325              : #endif
        

Generated by: LCOV version 2.3