LCOV - code coverage report
Current view: top level - boost/capy/io - any_read_source.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 84.2 % 120 101
Test Date: 2026-01-30 23:43:15 Functions: 88.9 % 36 32

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
      11              : #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/buffers.hpp>
      15              : #include <boost/capy/buffers/buffer_param.hpp>
      16              : #include <boost/capy/concept/io_awaitable.hpp>
      17              : #include <boost/capy/concept/read_source.hpp>
      18              : #include <boost/capy/coro.hpp>
      19              : #include <boost/capy/ex/executor_ref.hpp>
      20              : #include <boost/capy/io_result.hpp>
      21              : #include <boost/capy/task.hpp>
      22              : 
      23              : #include <system_error>
      24              : 
      25              : #include <concepts>
      26              : #include <coroutine>
      27              : #include <cstddef>
      28              : #include <exception>
      29              : #include <span>
      30              : #include <stop_token>
      31              : #include <utility>
      32              : 
      33              : namespace boost {
      34              : namespace capy {
      35              : 
      36              : /** Type-erased wrapper for any ReadSource.
      37              : 
      38              :     This class provides type erasure for any type satisfying the
      39              :     @ref ReadSource concept, enabling runtime polymorphism for
      40              :     source read operations. It uses a cached coroutine frame to achieve
      41              :     zero steady-state allocation after construction.
      42              : 
      43              :     The wrapper supports two construction modes:
      44              :     - **Owning**: Pass by value to transfer ownership. The wrapper
      45              :       allocates storage and owns the source.
      46              :     - **Reference**: Pass a pointer to wrap without ownership. The
      47              :       pointed-to source must outlive this wrapper.
      48              : 
      49              :     @par Frame Preallocation
      50              :     The constructor preallocates the internal coroutine frame.
      51              :     This reserves all virtual address space at server startup
      52              :     so memory usage can be measured up front, rather than
      53              :     allocating piecemeal as traffic arrives.
      54              : 
      55              :     @par Thread Safety
      56              :     Not thread-safe. Concurrent operations on the same wrapper
      57              :     are undefined behavior.
      58              : 
      59              :     @par Example
      60              :     @code
      61              :     // Owning - takes ownership of the source
      62              :     any_read_source rs(some_source{args...});
      63              : 
      64              :     // Reference - wraps without ownership
      65              :     some_source source;
      66              :     any_read_source rs(&source);
      67              : 
      68              :     mutable_buffer buf(data, size);
      69              :     auto [ec, n] = co_await rs.read(std::span(&buf, 1));
      70              :     @endcode
      71              : 
      72              :     @see any_read_stream, ReadSource
      73              : */
      74              : class any_read_source
      75              : {
      76              :     struct vtable;
      77              : 
      78              :     template<ReadSource S>
      79              :     struct vtable_for_impl;
      80              : 
      81              :     struct read_op;
      82              : 
      83              :     void* source_ = nullptr;
      84              :     vtable const* vt_ = nullptr;
      85              :     void* cached_frame_ = nullptr;
      86              :     std::size_t cached_size_ = 0;
      87              :     void* storage_ = nullptr;
      88              : 
      89              :     template<ReadSource S>
      90              :     static coro
      91              :     do_read_impl(
      92              :         void* source,
      93              :         any_read_source* wrapper,
      94              :         std::span<mutable_buffer const> buffers,
      95              :         coro h,
      96              :         executor_ref ex,
      97              :         std::stop_token token,
      98              :         std::error_code* ec,
      99              :         std::size_t* n);
     100              : 
     101              :     template<ReadSource S>
     102              :     static read_op
     103              :     read_coro(
     104              :         any_read_source* wrapper,
     105              :         S& source,
     106              :         std::span<mutable_buffer const> bufs,
     107              :         std::error_code* out_ec,
     108              :         std::size_t* out_n);
     109              : 
     110              :     void* alloc_frame(std::size_t size);
     111              :     void free_frame(void* p, std::size_t size);
     112              : 
     113              : public:
     114              :     /** Destructor.
     115              : 
     116              :         Destroys the owned source (if any) and releases the cached
     117              :         coroutine frame.
     118              :     */
     119              :     ~any_read_source();
     120              : 
     121              :     /** Default constructor.
     122              : 
     123              :         Constructs an empty wrapper. Operations on a default-constructed
     124              :         wrapper result in undefined behavior.
     125              :     */
     126              :     any_read_source() = default;
     127              : 
     128              :     /** Non-copyable.
     129              : 
     130              :         The frame cache is per-instance and cannot be shared.
     131              :     */
     132              :     any_read_source(any_read_source const&) = delete;
     133              :     any_read_source& operator=(any_read_source const&) = delete;
     134              : 
     135              :     /** Move constructor.
     136              : 
     137              :         Transfers ownership of the wrapped source (if owned) and
     138              :         cached frame from `other`. After the move, `other` is
     139              :         in a default-constructed state.
     140              : 
     141              :         @param other The wrapper to move from.
     142              :     */
     143            1 :     any_read_source(any_read_source&& other) noexcept
     144            1 :         : source_(std::exchange(other.source_, nullptr))
     145            1 :         , vt_(std::exchange(other.vt_, nullptr))
     146            1 :         , cached_frame_(std::exchange(other.cached_frame_, nullptr))
     147            1 :         , cached_size_(std::exchange(other.cached_size_, 0))
     148            1 :         , storage_(std::exchange(other.storage_, nullptr))
     149              :     {
     150            1 :     }
     151              : 
     152              :     /** Move assignment operator.
     153              : 
     154              :         Destroys any owned source and releases existing resources,
     155              :         then transfers ownership from `other`.
     156              : 
     157              :         @param other The wrapper to move from.
     158              :         @return Reference to this wrapper.
     159              :     */
     160              :     any_read_source&
     161              :     operator=(any_read_source&& other) noexcept;
     162              : 
     163              :     /** Construct by taking ownership of a ReadSource.
     164              : 
     165              :         Allocates storage and moves the source into this wrapper.
     166              :         The wrapper owns the source and will destroy it.
     167              : 
     168              :         @param s The source to take ownership of.
     169              :     */
     170              :     template<ReadSource S>
     171              :         requires (!std::same_as<std::decay_t<S>, any_read_source>)
     172              :     any_read_source(S s);
     173              : 
     174              :     /** Construct by wrapping a ReadSource without ownership.
     175              : 
     176              :         Wraps the given source by pointer. The source must remain
     177              :         valid for the lifetime of this wrapper.
     178              : 
     179              :         @param s Pointer to the source to wrap.
     180              :     */
     181              :     template<ReadSource S>
     182           88 :     any_read_source(S* s) noexcept
     183           88 :         : source_(s)
     184           88 :         , vt_(&vtable_for_impl<S>::value)
     185              :     {
     186              :         // Preallocate the coroutine frame
     187           88 :         read_coro<S>(this, *s, {}, nullptr, nullptr);
     188           88 :     }
     189              : 
     190              :     /** Check if the wrapper contains a valid source.
     191              : 
     192              :         @return `true` if wrapping a source, `false` if default-constructed
     193              :             or moved-from.
     194              :     */
     195              :     bool
     196            9 :     has_value() const noexcept
     197              :     {
     198            9 :         return source_ != nullptr;
     199              :     }
     200              : 
     201              :     /** Check if the wrapper contains a valid source.
     202              : 
     203              :         @return `true` if wrapping a source, `false` if default-constructed
     204              :             or moved-from.
     205              :     */
     206              :     explicit
     207            2 :     operator bool() const noexcept
     208              :     {
     209            2 :         return has_value();
     210              :     }
     211              : 
     212              :     /** Initiate an asynchronous read operation.
     213              : 
     214              :         Reads data into the provided buffer sequence. The operation
     215              :         completes when the entire buffer sequence is filled, end-of-file
     216              :         is reached, or an error occurs.
     217              : 
     218              :         @param buffers The buffer sequence to read into. Passed by
     219              :             value to ensure the sequence lives in the coroutine frame
     220              :             across suspension points.
     221              : 
     222              :         @return An awaitable yielding `(error_code,std::size_t)`.
     223              : 
     224              :         @par Postconditions
     225              :         Exactly one of the following is true on return:
     226              :         @li **Success**: `!ec` and `n == buffer_size(buffers)`.
     227              :             The entire buffer was filled.
     228              :         @li **End-of-stream or Error**: `ec` and `n` indicates
     229              :             the number of bytes transferred before the failure.
     230              : 
     231              :         @par Preconditions
     232              :         The wrapper must contain a valid source (`has_value() == true`).
     233              :     */
     234              :     template<MutableBufferSequence MB>
     235              :     task<io_result<std::size_t>>
     236              :     read(MB buffers);
     237              : 
     238              : protected:
     239              :     /** Rebind to a new source after move.
     240              : 
     241              :         Updates the internal pointer to reference a new source object.
     242              :         Used by owning wrappers after move assignment when the owned
     243              :         object has moved to a new location.
     244              : 
     245              :         @param new_source The new source to bind to. Must be the same
     246              :             type as the original source.
     247              : 
     248              :         @note Terminates if called with a source of different type
     249              :             than the original.
     250              :     */
     251              :     template<ReadSource S>
     252              :     void
     253              :     rebind(S& new_source) noexcept
     254              :     {
     255              :         if(vt_ != &vtable_for_impl<S>::value)
     256              :             std::terminate();
     257              :         source_ = &new_source;
     258              :     }
     259              : 
     260              : private:
     261              :     auto
     262              :     read_some_(std::span<mutable_buffer const> buffers);
     263              : };
     264              : 
     265              : //----------------------------------------------------------
     266              : 
     267              : struct any_read_source::vtable
     268              : {
     269              :     void (*destroy)(void*) noexcept;
     270              : 
     271              :     coro (*do_read)(
     272              :         void* source,
     273              :         any_read_source* wrapper,
     274              :         std::span<mutable_buffer const> buffers,
     275              :         coro h,
     276              :         executor_ref ex,
     277              :         std::stop_token token,
     278              :         std::error_code* ec,
     279              :         std::size_t* n);
     280              : };
     281              : 
     282              : template<ReadSource S>
     283              : struct any_read_source::vtable_for_impl
     284              : {
     285              :     static void
     286            0 :     do_destroy_impl(void* source) noexcept
     287              :     {
     288            0 :         static_cast<S*>(source)->~S();
     289            0 :     }
     290              : 
     291              :     static constexpr vtable value = {
     292              :         &do_destroy_impl,
     293              :         &any_read_source::do_read_impl<S>
     294              :     };
     295              : };
     296              : 
     297              : //----------------------------------------------------------
     298              : 
     299              : inline
     300           91 : any_read_source::~any_read_source()
     301              : {
     302           91 :     if(storage_)
     303              :     {
     304            0 :         vt_->destroy(source_);
     305            0 :         ::operator delete(storage_);
     306              :     }
     307           91 :     if(cached_frame_)
     308           88 :         ::operator delete(cached_frame_);
     309           91 : }
     310              : 
     311              : inline any_read_source&
     312            1 : any_read_source::operator=(any_read_source&& other) noexcept
     313              : {
     314            1 :     if(this != &other)
     315              :     {
     316            1 :         if(storage_)
     317              :         {
     318            0 :             vt_->destroy(source_);
     319            0 :             ::operator delete(storage_);
     320              :         }
     321            1 :         if(cached_frame_)
     322            0 :             ::operator delete(cached_frame_);
     323            1 :         source_ = std::exchange(other.source_, nullptr);
     324            1 :         vt_ = std::exchange(other.vt_, nullptr);
     325            1 :         cached_frame_ = std::exchange(other.cached_frame_, nullptr);
     326            1 :         cached_size_ = std::exchange(other.cached_size_, 0);
     327            1 :         storage_ = std::exchange(other.storage_, nullptr);
     328              :     }
     329            1 :     return *this;
     330              : }
     331              : 
     332              : template<ReadSource S>
     333              :     requires (!std::same_as<std::decay_t<S>, any_read_source>)
     334              : any_read_source::any_read_source(S s)
     335              :     : vt_(&vtable_for_impl<S>::value)
     336              : {
     337              :     struct guard {
     338              :         any_read_source* self;
     339              :         bool committed = false;
     340              :         ~guard() {
     341              :             if(!committed && self->storage_) {
     342              :                 self->vt_->destroy(self->source_);
     343              :                 ::operator delete(self->storage_);
     344              :                 self->storage_ = nullptr;
     345              :                 self->source_ = nullptr;
     346              :             }
     347              :         }
     348              :     } g{this};
     349              : 
     350              :     storage_ = ::operator new(sizeof(S));
     351              :     source_ = ::new(storage_) S(std::move(s));
     352              : 
     353              :     // Preallocate the coroutine frame
     354              :     auto& ref = *static_cast<S*>(source_);
     355              :     read_coro<S>(this, ref, {}, nullptr, nullptr);
     356              : 
     357              :     g.committed = true;
     358              : }
     359              : 
     360              : //----------------------------------------------------------
     361              : 
     362              : struct any_read_source::read_op
     363              : {
     364              :     struct promise_type
     365              :     {
     366              :         executor_ref executor_;
     367              :         std::stop_token stop_token_;
     368              :         coro caller_h_{};
     369              : 
     370          218 :         promise_type() = default;
     371              : 
     372              :         read_op
     373          218 :         get_return_object() noexcept
     374              :         {
     375              :             return read_op{
     376          218 :                 std::coroutine_handle<promise_type>::from_promise(*this)};
     377              :         }
     378              : 
     379              :         std::suspend_always
     380          218 :         initial_suspend() noexcept
     381              :         {
     382          218 :             return {};
     383              :         }
     384              : 
     385              :         auto
     386           99 :         final_suspend() noexcept
     387              :         {
     388              :             struct awaiter
     389              :             {
     390              :                 promise_type* p_;
     391              : 
     392           99 :                 bool await_ready() const noexcept { return false; }
     393              : 
     394           99 :                 coro await_suspend(coro) const noexcept
     395              :                 {
     396           99 :                     if(p_->caller_h_)
     397            0 :                         return p_->caller_h_;
     398           99 :                     return std::noop_coroutine();
     399              :                 }
     400              : 
     401            0 :                 void await_resume() const noexcept {}
     402              :             };
     403           99 :             return awaiter{this};
     404              :         }
     405              : 
     406              :         void
     407           99 :         return_void() noexcept
     408              :         {
     409           99 :         }
     410              : 
     411              :         void
     412           31 :         unhandled_exception()
     413              :         {
     414           31 :             throw;
     415              :         }
     416              : 
     417              :         template<class... Args>
     418              :         static void*
     419          218 :         operator new(
     420              :             std::size_t size,
     421              :             any_read_source* wrapper,
     422              :             Args&&...)
     423              :         {
     424          218 :             return wrapper->alloc_frame(size);
     425              :         }
     426              : 
     427              :         template<class... Args>
     428              :         static void
     429              :         operator delete(void*, any_read_source*, Args&&...) noexcept
     430              :         {
     431              :         }
     432              : 
     433              :         static void
     434          218 :         operator delete(void*, std::size_t) noexcept
     435              :         {
     436          218 :         }
     437              : 
     438              :         void
     439          130 :         set_executor(executor_ref ex) noexcept
     440              :         {
     441          130 :             executor_ = ex;
     442          130 :         }
     443              : 
     444              :         void
     445          130 :         set_stop_token(std::stop_token token) noexcept
     446              :         {
     447          130 :             stop_token_ = token;
     448          130 :         }
     449              : 
     450              :         void
     451            0 :         set_caller(coro h) noexcept
     452              :         {
     453            0 :             caller_h_ = h;
     454            0 :         }
     455              : 
     456              :         template<class Awaitable>
     457              :         struct transform_awaiter
     458              :         {
     459              :             std::decay_t<Awaitable> a_;
     460              :             promise_type* p_;
     461              : 
     462          130 :             bool await_ready()
     463              :             {
     464          130 :                 return a_.await_ready();
     465              :             }
     466              : 
     467          130 :             decltype(auto) await_resume()
     468              :             {
     469          130 :                 return a_.await_resume();
     470              :             }
     471              : 
     472            0 :             auto await_suspend(coro h)
     473              :             {
     474            0 :                 return a_.await_suspend(h, p_->executor_, p_->stop_token_);
     475              :             }
     476              :         };
     477              : 
     478              :         template<class Awaitable>
     479          130 :         auto await_transform(Awaitable&& a)
     480              :         {
     481              :             using A = std::decay_t<Awaitable>;
     482              :             if constexpr (IoAwaitable<A>)
     483              :             {
     484              :                 return transform_awaiter<Awaitable>{
     485          130 :                     std::forward<Awaitable>(a), this};
     486              :             }
     487              :             else
     488              :             {
     489              :                 static_assert(sizeof(A) == 0, "requires IoAwaitable");
     490              :             }
     491              :         }
     492              :     };
     493              : 
     494              :     std::coroutine_handle<promise_type> h_;
     495              : 
     496          218 :     ~read_op()
     497              :     {
     498          218 :         if(h_)
     499          119 :             h_.destroy();
     500          218 :     }
     501              : 
     502              :     read_op(read_op const&) = delete;
     503              :     read_op& operator=(read_op const&) = delete;
     504              : 
     505              :     read_op(read_op&& other) noexcept
     506              :         : h_(std::exchange(other.h_, nullptr))
     507              :     {
     508              :     }
     509              : 
     510              :     read_op& operator=(read_op&& other) noexcept
     511              :     {
     512              :         if(this != &other)
     513              :         {
     514              :             if(h_)
     515              :                 h_.destroy();
     516              :             h_ = std::exchange(other.h_, nullptr);
     517              :         }
     518              :         return *this;
     519              :     }
     520              : 
     521              : private:
     522              :     explicit
     523          218 :     read_op(std::coroutine_handle<promise_type> h) noexcept
     524          218 :         : h_(h)
     525              :     {
     526          218 :     }
     527              : };
     528              : 
     529              : //----------------------------------------------------------
     530              : 
     531              : inline void*
     532          218 : any_read_source::alloc_frame(std::size_t size)
     533              : {
     534          218 :     if(cached_frame_ && cached_size_ >= size)
     535          130 :         return cached_frame_;
     536              : 
     537           88 :     if(cached_frame_)
     538            0 :         ::operator delete(cached_frame_);
     539              : 
     540           88 :     cached_frame_ = ::operator new(size);
     541           88 :     cached_size_ = size;
     542           88 :     return cached_frame_;
     543              : }
     544              : 
     545              : inline void
     546              : any_read_source::free_frame(void*, std::size_t)
     547              : {
     548              :     // Keep the frame cached for reuse
     549              : }
     550              : 
     551              : #if defined(__GNUC__) && !defined(__clang__)
     552              : #pragma GCC diagnostic push
     553              : #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
     554              : #endif
     555              : 
     556              : template<ReadSource S>
     557              : any_read_source::read_op
     558          218 : any_read_source::read_coro(
     559              :     any_read_source*,
     560              :     S& source,
     561              :     std::span<mutable_buffer const> bufs,
     562              :     std::error_code* out_ec,
     563              :     std::size_t* out_n)
     564              : {
     565              :     auto [err, bytes] = co_await source.read(bufs);
     566              : 
     567              :     *out_ec = err;
     568              :     *out_n = bytes;
     569          436 : }
     570              : 
     571              : #if defined(__GNUC__) && !defined(__clang__)
     572              : #pragma GCC diagnostic pop
     573              : #endif
     574              : 
     575              : template<ReadSource S>
     576              : coro
     577          130 : any_read_source::do_read_impl(
     578              :     void* source,
     579              :     any_read_source* wrapper,
     580              :     std::span<mutable_buffer const> buffers,
     581              :     coro h,
     582              :     executor_ref ex,
     583              :     std::stop_token token,
     584              :     std::error_code* ec,
     585              :     std::size_t* n)
     586              : {
     587          130 :     auto& s = *static_cast<S*>(source);
     588              : 
     589              :     // Create coroutine - frame is cached in wrapper
     590          130 :     auto op = read_coro<S>(wrapper, s, buffers, ec, n);
     591              : 
     592              :     // Set executor and stop token on promise before resuming
     593          130 :     op.h_.promise().set_executor(ex);
     594          130 :     op.h_.promise().set_stop_token(token);
     595              : 
     596              :     // Resume the coroutine to start the operation
     597          130 :     op.h_.resume();
     598              : 
     599              :     // Check if operation completed synchronously
     600           99 :     if(op.h_.done())
     601              :     {
     602           99 :         op.h_.destroy();
     603           99 :         op.h_ = nullptr;
     604           99 :         return ex.dispatch(h);
     605              :     }
     606              : 
     607              :     // Operation is pending - caller will be resumed via symmetric transfer
     608            0 :     op.h_.promise().set_caller(h);
     609            0 :     op.h_ = nullptr;
     610            0 :     return std::noop_coroutine();
     611          130 : }
     612              : 
     613              : //----------------------------------------------------------
     614              : 
     615              : inline auto
     616          130 : any_read_source::read_some_(std::span<mutable_buffer const> buffers)
     617              : {
     618              :     struct awaitable
     619              :     {
     620              :         any_read_source* self_;
     621              :         std::span<mutable_buffer const> buffers_;
     622              :         std::error_code ec_;
     623              :         std::size_t n_ = 0;
     624              : 
     625              :         bool
     626          130 :         await_ready() const noexcept
     627              :         {
     628          130 :             return false;
     629              :         }
     630              : 
     631              :         coro
     632          130 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     633              :         {
     634          260 :             return self_->vt_->do_read(
     635          130 :                 self_->source_,
     636              :                 self_,
     637              :                 buffers_,
     638              :                 h,
     639              :                 ex,
     640              :                 token,
     641              :                 &ec_,
     642          198 :                 &n_);
     643              :         }
     644              : 
     645              :         io_result<std::size_t>
     646           99 :         await_resume() const noexcept
     647              :         {
     648           99 :             return {ec_, n_};
     649              :         }
     650              :     };
     651          130 :     return awaitable{this, buffers, {}, 0};
     652              : }
     653              : 
     654              : template<MutableBufferSequence MB>
     655              : task<io_result<std::size_t>>
     656          106 : any_read_source::read(MB buffers)
     657              : {
     658              :     buffer_param<MB> bp(std::move(buffers));
     659              :     std::size_t total = 0;
     660              : 
     661              :     for(;;)
     662              :     {
     663              :         auto bufs = bp.data();
     664              :         if(bufs.empty())
     665              :             break;
     666              : 
     667              :         auto [ec, n] = co_await read_some_(bufs);
     668              :         total += n;
     669              :         if(ec)
     670              :             co_return {ec, total};
     671              :         bp.consume(n);
     672              :     }
     673              : 
     674              :     co_return {{}, total};
     675          212 : }
     676              : 
     677              : } // namespace capy
     678              : } // namespace boost
     679              : 
     680              : #endif
        

Generated by: LCOV version 2.3