LCOV - code coverage report
Current view: top level - boost/capy/io - any_buffer_source.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 84.7 % 124 105
Test Date: 2026-01-30 23:43:15 Functions: 88.9 % 36 32

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
      11              : #define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/buffers.hpp>
      15              : #include <boost/capy/buffers/buffer_copy.hpp>
      16              : #include <boost/capy/buffers/slice.hpp>
      17              : #include <boost/capy/concept/buffer_source.hpp>
      18              : #include <boost/capy/concept/io_awaitable.hpp>
      19              : #include <boost/capy/concept/read_source.hpp>
      20              : #include <boost/capy/coro.hpp>
      21              : #include <boost/capy/error.hpp>
      22              : #include <boost/capy/ex/executor_ref.hpp>
      23              : #include <boost/capy/io_result.hpp>
      24              : #include <boost/capy/task.hpp>
      25              : 
      26              : #include <system_error>
      27              : 
      28              : #include <concepts>
      29              : #include <coroutine>
      30              : #include <cstddef>
      31              : #include <exception>
      32              : #include <span>
      33              : #include <stop_token>
      34              : #include <utility>
      35              : 
      36              : namespace boost {
      37              : namespace capy {
      38              : 
      39              : /** Type-erased wrapper for any BufferSource.
      40              : 
      41              :     This class provides type erasure for any type satisfying the
      42              :     @ref BufferSource concept, enabling runtime polymorphism for
      43              :     buffer pull operations. The wrapper also satisfies @ref ReadSource,
      44              :     allowing it to be used with code expecting either interface.
      45              :     It uses a cached coroutine frame to achieve zero steady-state
      46              :     allocation after construction.
      47              : 
      48              :     The wrapper also satisfies @ref ReadSource through the templated
      49              :     @ref read method. This method copies data from the source's
      50              :     internal buffers into the caller's buffers, incurring one extra
      51              :     buffer copy compared to using @ref pull and @ref consume directly.
      52              : 
      53              :     The wrapper supports two construction modes:
      54              :     - **Owning**: Pass by value to transfer ownership. The wrapper
      55              :       allocates storage and owns the source.
      56              :     - **Reference**: Pass a pointer to wrap without ownership. The
      57              :       pointed-to source must outlive this wrapper.
      58              : 
      59              :     @par Frame Preallocation
      60              :     The constructor preallocates the internal coroutine frame.
      61              :     This reserves all virtual address space at server startup
      62              :     so memory usage can be measured up front, rather than
      63              :     allocating piecemeal as traffic arrives.
      64              : 
      65              :     @par Thread Safety
      66              :     Not thread-safe. Concurrent operations on the same wrapper
      67              :     are undefined behavior.
      68              : 
      69              :     @par Example
      70              :     @code
      71              :     // Owning - takes ownership of the source
      72              :     any_buffer_source abs(some_buffer_source{args...});
      73              : 
      74              :     // Reference - wraps without ownership
      75              :     some_buffer_source src;
      76              :     any_buffer_source abs(&src);
      77              : 
      78              :     const_buffer arr[16];
      79              :     auto [ec, count] = co_await abs.pull(arr, 16);
      80              :     @endcode
      81              : 
      82              :     @see any_write_sink, BufferSource, ReadSource
      83              : */
      84              : class any_buffer_source
      85              : {
      86              :     struct vtable;
      87              : 
      88              :     template<BufferSource S>
      89              :     struct vtable_for_impl;
      90              : 
      91              :     struct pull_op;
      92              : 
      93              :     void* source_ = nullptr;
      94              :     vtable const* vt_ = nullptr;
      95              :     void* cached_frame_ = nullptr;
      96              :     std::size_t cached_size_ = 0;
      97              :     void* storage_ = nullptr;
      98              : 
      99              :     template<BufferSource S>
     100              :     static coro
     101              :     do_pull_impl(
     102              :         void* source,
     103              :         any_buffer_source* wrapper,
     104              :         const_buffer* arr,
     105              :         std::size_t max_count,
     106              :         coro h,
     107              :         executor_ref ex,
     108              :         std::stop_token token,
     109              :         std::error_code* ec,
     110              :         std::size_t* count);
     111              : 
     112              :     template<BufferSource S>
     113              :     static pull_op
     114              :     pull_coro(
     115              :         any_buffer_source* wrapper,
     116              :         S& source,
     117              :         const_buffer* arr,
     118              :         std::size_t max_count,
     119              :         std::error_code* out_ec,
     120              :         std::size_t* out_count);
     121              : 
     122              :     void* alloc_frame(std::size_t size);
     123              :     void free_frame(void* p, std::size_t size);
     124              : 
     125              : public:
     126              :     /** Destructor.
     127              : 
     128              :         Destroys the owned source (if any) and releases the cached
     129              :         coroutine frame.
     130              :     */
     131              :     ~any_buffer_source();
     132              : 
     133              :     /** Default constructor.
     134              : 
     135              :         Constructs an empty wrapper. Operations on a default-constructed
     136              :         wrapper result in undefined behavior.
     137              :     */
     138              :     any_buffer_source() = default;
     139              : 
     140              :     /** Non-copyable.
     141              : 
     142              :         The frame cache is per-instance and cannot be shared.
     143              :     */
     144              :     any_buffer_source(any_buffer_source const&) = delete;
     145              :     any_buffer_source& operator=(any_buffer_source const&) = delete;
     146              : 
     147              :     /** Move constructor.
     148              : 
     149              :         Transfers ownership of the wrapped source (if owned) and
     150              :         cached frame from `other`. After the move, `other` is
     151              :         in a default-constructed state.
     152              : 
     153              :         @param other The wrapper to move from.
     154              :     */
     155            1 :     any_buffer_source(any_buffer_source&& other) noexcept
     156            1 :         : source_(std::exchange(other.source_, nullptr))
     157            1 :         , vt_(std::exchange(other.vt_, nullptr))
     158            1 :         , cached_frame_(std::exchange(other.cached_frame_, nullptr))
     159            1 :         , cached_size_(std::exchange(other.cached_size_, 0))
     160            1 :         , storage_(std::exchange(other.storage_, nullptr))
     161              :     {
     162            1 :     }
     163              : 
     164              :     /** Move assignment operator.
     165              : 
     166              :         Destroys any owned source and releases existing resources,
     167              :         then transfers ownership from `other`.
     168              : 
     169              :         @param other The wrapper to move from.
     170              :         @return Reference to this wrapper.
     171              :     */
     172              :     any_buffer_source&
     173              :     operator=(any_buffer_source&& other) noexcept;
     174              : 
     175              :     /** Construct by taking ownership of a BufferSource.
     176              : 
     177              :         Allocates storage and moves the source into this wrapper.
     178              :         The wrapper owns the source and will destroy it.
     179              : 
     180              :         @param s The source to take ownership of.
     181              :     */
     182              :     template<BufferSource S>
     183              :         requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
     184              :     any_buffer_source(S s);
     185              : 
     186              :     /** Construct by wrapping a BufferSource without ownership.
     187              : 
     188              :         Wraps the given source by pointer. The source must remain
     189              :         valid for the lifetime of this wrapper.
     190              : 
     191              :         @param s Pointer to the source to wrap.
     192              :     */
     193              :     template<BufferSource S>
     194           56 :     any_buffer_source(S* s) noexcept
     195           56 :         : source_(s)
     196           56 :         , vt_(&vtable_for_impl<S>::value)
     197              :     {
     198              :         // Preallocate coroutine frame to find max size
     199           56 :         pull_coro<S>(this, *s, nullptr, 0, nullptr, nullptr);
     200           56 :     }
     201              : 
     202              :     /** Check if the wrapper contains a valid source.
     203              : 
     204              :         @return `true` if wrapping a source, `false` if default-constructed
     205              :             or moved-from.
     206              :     */
     207              :     bool
     208            9 :     has_value() const noexcept
     209              :     {
     210            9 :         return source_ != nullptr;
     211              :     }
     212              : 
     213              :     /** Check if the wrapper contains a valid source.
     214              : 
     215              :         @return `true` if wrapping a source, `false` if default-constructed
     216              :             or moved-from.
     217              :     */
     218              :     explicit
     219            2 :     operator bool() const noexcept
     220              :     {
     221            2 :         return has_value();
     222              :     }
     223              : 
     224              :     /** Consume bytes from the source.
     225              : 
     226              :         Advances the internal read position of the underlying source
     227              :         by the specified number of bytes. The next call to @ref pull
     228              :         returns data starting after the consumed bytes.
     229              : 
     230              :         @param n The number of bytes to consume. Must not exceed the
     231              :         total size of buffers returned by the previous @ref pull.
     232              : 
     233              :         @par Preconditions
     234              :         The wrapper must contain a valid source (`has_value() == true`).
     235              :     */
     236              :     void
     237              :     consume(std::size_t n) noexcept;
     238              : 
     239              :     /** Pull buffer data from the source.
     240              : 
     241              :         Fills the provided array with buffer descriptors from the
     242              :         underlying source. The operation completes when data is
     243              :         available, the source is exhausted, or an error occurs.
     244              : 
     245              :         @param arr Pointer to array of const_buffer to fill.
     246              :         @param max_count Maximum number of buffers to fill.
     247              : 
     248              :         @return An awaitable yielding `(error_code,std::size_t)`.
     249              :             On success with data, `count > 0` indicates buffers filled.
     250              :             On success with `count == 0`, source is exhausted.
     251              : 
     252              :         @par Preconditions
     253              :         The wrapper must contain a valid source (`has_value() == true`).
     254              :     */
     255              :     auto
     256              :     pull(const_buffer* arr, std::size_t max_count);
     257              : 
     258              :     /** Read data into a mutable buffer sequence.
     259              : 
     260              :         Fills the provided buffer sequence by pulling data from the
     261              :         underlying source and copying it into the caller's buffers.
     262              :         This satisfies @ref ReadSource but incurs a copy; for zero-copy
     263              :         access, use @ref pull and @ref consume instead.
     264              : 
     265              :         @note This operation copies data from the source's internal
     266              :         buffers into the caller's buffers. For zero-copy reads,
     267              :         use @ref pull and @ref consume directly.
     268              : 
     269              :         @param buffers The buffer sequence to fill.
     270              : 
     271              :         @return An awaitable yielding `(error_code,std::size_t)`.
     272              :             On success, `n == buffer_size(buffers)`.
     273              :             On EOF, `ec == error::eof` and `n` is bytes transferred.
     274              : 
     275              :         @par Preconditions
     276              :         The wrapper must contain a valid source (`has_value() == true`).
     277              : 
     278              :         @see pull, consume
     279              :     */
     280              :     template<MutableBufferSequence MB>
     281              :     task<io_result<std::size_t>>
     282              :     read(MB buffers);
     283              : 
     284              : protected:
     285              :     /** Rebind to a new source after move.
     286              : 
     287              :         Updates the internal pointer to reference a new source object.
     288              :         Used by owning wrappers after move assignment when the owned
     289              :         object has moved to a new location.
     290              : 
     291              :         @param new_source The new source to bind to. Must be the same
     292              :             type as the original source.
     293              : 
     294              :         @note Terminates if called with a source of different type
     295              :             than the original.
     296              :     */
     297              :     template<BufferSource S>
     298              :     void
     299              :     rebind(S& new_source) noexcept
     300              :     {
     301              :         if(vt_ != &vtable_for_impl<S>::value)
     302              :             std::terminate();
     303              :         source_ = &new_source;
     304              :     }
     305              : };
     306              : 
     307              : //----------------------------------------------------------
     308              : 
     309              : struct any_buffer_source::vtable
     310              : {
     311              :     void (*destroy)(void*) noexcept;
     312              : 
     313              :     coro (*do_pull)(
     314              :         void* source,
     315              :         any_buffer_source* wrapper,
     316              :         const_buffer* arr,
     317              :         std::size_t max_count,
     318              :         coro h,
     319              :         executor_ref ex,
     320              :         std::stop_token token,
     321              :         std::error_code* ec,
     322              :         std::size_t* count);
     323              :     void (*do_consume)(void* source, std::size_t n) noexcept;
     324              : };
     325              : 
     326              : template<BufferSource S>
     327              : struct any_buffer_source::vtable_for_impl
     328              : {
     329              :     static void
     330            0 :     do_destroy_impl(void* source) noexcept
     331              :     {
     332            0 :         static_cast<S*>(source)->~S();
     333            0 :     }
     334              : 
     335              :     static void
     336           39 :     do_consume_impl(void* source, std::size_t n) noexcept
     337              :     {
     338           39 :         static_cast<S*>(source)->consume(n);
     339           39 :     }
     340              : 
     341              :     static constexpr vtable value = {
     342              :         &do_destroy_impl,
     343              :         &any_buffer_source::do_pull_impl<S>,
     344              :         &do_consume_impl
     345              :     };
     346              : };
     347              : 
     348              : //----------------------------------------------------------
     349              : 
     350              : inline
     351           59 : any_buffer_source::~any_buffer_source()
     352              : {
     353           59 :     if(storage_)
     354              :     {
     355            0 :         vt_->destroy(source_);
     356            0 :         ::operator delete(storage_);
     357              :     }
     358           59 :     if(cached_frame_)
     359           56 :         ::operator delete(cached_frame_);
     360           59 : }
     361              : 
     362              : inline any_buffer_source&
     363            1 : any_buffer_source::operator=(any_buffer_source&& other) noexcept
     364              : {
     365            1 :     if(this != &other)
     366              :     {
     367            1 :         if(storage_)
     368              :         {
     369            0 :             vt_->destroy(source_);
     370            0 :             ::operator delete(storage_);
     371              :         }
     372            1 :         if(cached_frame_)
     373            0 :             ::operator delete(cached_frame_);
     374            1 :         source_ = std::exchange(other.source_, nullptr);
     375            1 :         vt_ = std::exchange(other.vt_, nullptr);
     376            1 :         cached_frame_ = std::exchange(other.cached_frame_, nullptr);
     377            1 :         cached_size_ = std::exchange(other.cached_size_, 0);
     378            1 :         storage_ = std::exchange(other.storage_, nullptr);
     379              :     }
     380            1 :     return *this;
     381              : }
     382              : 
     383              : template<BufferSource S>
     384              :     requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
     385              : any_buffer_source::any_buffer_source(S s)
     386              :     : vt_(&vtable_for_impl<S>::value)
     387              : {
     388              :     struct guard {
     389              :         any_buffer_source* self;
     390              :         bool committed = false;
     391              :         ~guard() {
     392              :             if(!committed && self->storage_) {
     393              :                 self->vt_->destroy(self->source_);
     394              :                 ::operator delete(self->storage_);
     395              :                 self->storage_ = nullptr;
     396              :                 self->source_ = nullptr;
     397              :             }
     398              :         }
     399              :     } g{this};
     400              : 
     401              :     storage_ = ::operator new(sizeof(S));
     402              :     source_ = ::new(storage_) S(std::move(s));
     403              : 
     404              :     // Preallocate coroutine frame to find max size
     405              :     auto& ref = *static_cast<S*>(source_);
     406              :     pull_coro<S>(this, ref, nullptr, 0, nullptr, nullptr);
     407              : 
     408              :     g.committed = true;
     409              : }
     410              : 
     411              : //----------------------------------------------------------
     412              : 
     413              : struct any_buffer_source::pull_op
     414              : {
     415              :     struct promise_type
     416              :     {
     417              :         executor_ref executor_;
     418              :         std::stop_token stop_token_;
     419              :         coro caller_h_{};
     420              : 
     421          148 :         promise_type() = default;
     422              : 
     423              :         pull_op
     424          148 :         get_return_object() noexcept
     425              :         {
     426              :             return pull_op{
     427          148 :                 std::coroutine_handle<promise_type>::from_promise(*this)};
     428              :         }
     429              : 
     430              :         std::suspend_always
     431          148 :         initial_suspend() noexcept
     432              :         {
     433          148 :             return {};
     434              :         }
     435              : 
     436              :         auto
     437           73 :         final_suspend() noexcept
     438              :         {
     439              :             struct awaiter
     440              :             {
     441              :                 promise_type* p_;
     442              : 
     443           73 :                 bool await_ready() const noexcept { return false; }
     444              : 
     445           73 :                 coro await_suspend(coro) const noexcept
     446              :                 {
     447           73 :                     if(p_->caller_h_)
     448            0 :                         return p_->caller_h_;
     449           73 :                     return std::noop_coroutine();
     450              :                 }
     451              : 
     452            0 :                 void await_resume() const noexcept {}
     453              :             };
     454           73 :             return awaiter{this};
     455              :         }
     456              : 
     457              :         void
     458           73 :         return_void() noexcept
     459              :         {
     460           73 :         }
     461              : 
     462              :         void
     463           19 :         unhandled_exception()
     464              :         {
     465           19 :             throw;
     466              :         }
     467              : 
     468              :         template<class... Args>
     469              :         static void*
     470          148 :         operator new(
     471              :             std::size_t size,
     472              :             any_buffer_source* wrapper,
     473              :             Args&&...)
     474              :         {
     475          148 :             return wrapper->alloc_frame(size);
     476              :         }
     477              : 
     478              :         template<class... Args>
     479              :         static void
     480              :         operator delete(void*, any_buffer_source*, Args&&...) noexcept
     481              :         {
     482              :         }
     483              : 
     484              :         static void
     485          148 :         operator delete(void*, std::size_t) noexcept
     486              :         {
     487          148 :         }
     488              : 
     489              :         void
     490           92 :         set_executor(executor_ref ex) noexcept
     491              :         {
     492           92 :             executor_ = ex;
     493           92 :         }
     494              : 
     495              :         void
     496           92 :         set_stop_token(std::stop_token token) noexcept
     497              :         {
     498           92 :             stop_token_ = token;
     499           92 :         }
     500              : 
     501              :         void
     502            0 :         set_caller(coro h) noexcept
     503              :         {
     504            0 :             caller_h_ = h;
     505            0 :         }
     506              : 
     507              :         template<class Awaitable>
     508              :         struct transform_awaiter
     509              :         {
     510              :             std::decay_t<Awaitable> a_;
     511              :             promise_type* p_;
     512              : 
     513           92 :             bool await_ready()
     514              :             {
     515           92 :                 return a_.await_ready();
     516              :             }
     517              : 
     518           92 :             decltype(auto) await_resume()
     519              :             {
     520           92 :                 return a_.await_resume();
     521              :             }
     522              : 
     523            0 :             auto await_suspend(coro h)
     524              :             {
     525            0 :                 return a_.await_suspend(h, p_->executor_, p_->stop_token_);
     526              :             }
     527              :         };
     528              : 
     529              :         template<class Awaitable>
     530           92 :         auto await_transform(Awaitable&& a)
     531              :         {
     532              :             using A = std::decay_t<Awaitable>;
     533              :             if constexpr (IoAwaitable<A>)
     534              :             {
     535              :                 return transform_awaiter<Awaitable>{
     536           92 :                     std::forward<Awaitable>(a), this};
     537              :             }
     538              :             else
     539              :             {
     540              :                 static_assert(sizeof(A) == 0, "requires IoAwaitable");
     541              :             }
     542              :         }
     543              :     };
     544              : 
     545              :     std::coroutine_handle<promise_type> h_;
     546              : 
     547          148 :     ~pull_op()
     548              :     {
     549          148 :         if(h_)
     550           75 :             h_.destroy();
     551          148 :     }
     552              : 
     553              :     pull_op(pull_op const&) = delete;
     554              :     pull_op& operator=(pull_op const&) = delete;
     555              : 
     556              :     pull_op(pull_op&& other) noexcept
     557              :         : h_(std::exchange(other.h_, nullptr))
     558              :     {
     559              :     }
     560              : 
     561              :     pull_op& operator=(pull_op&& other) noexcept
     562              :     {
     563              :         if(this != &other)
     564              :         {
     565              :             if(h_)
     566              :                 h_.destroy();
     567              :             h_ = std::exchange(other.h_, nullptr);
     568              :         }
     569              :         return *this;
     570              :     }
     571              : 
     572              : private:
     573              :     explicit
     574          148 :     pull_op(std::coroutine_handle<promise_type> h) noexcept
     575          148 :         : h_(h)
     576              :     {
     577          148 :     }
     578              : };
     579              : 
     580              : //----------------------------------------------------------
     581              : 
     582              : inline void*
     583          148 : any_buffer_source::alloc_frame(std::size_t size)
     584              : {
     585          148 :     if(cached_frame_ && cached_size_ >= size)
     586           92 :         return cached_frame_;
     587              : 
     588           56 :     if(cached_frame_)
     589            0 :         ::operator delete(cached_frame_);
     590              : 
     591           56 :     cached_frame_ = ::operator new(size);
     592           56 :     cached_size_ = size;
     593           56 :     return cached_frame_;
     594              : }
     595              : 
     596              : inline void
     597              : any_buffer_source::free_frame(void*, std::size_t)
     598              : {
     599              :     // Keep the frame cached for reuse
     600              : }
     601              : 
     602              : #if defined(__GNUC__) && !defined(__clang__)
     603              : #pragma GCC diagnostic push
     604              : #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
     605              : #endif
     606              : 
     607              : template<BufferSource S>
     608              : any_buffer_source::pull_op
     609          148 : any_buffer_source::pull_coro(
     610              :     any_buffer_source*,
     611              :     S& source,
     612              :     const_buffer* arr,
     613              :     std::size_t max_count,
     614              :     std::error_code* out_ec,
     615              :     std::size_t* out_count)
     616              : {
     617              :     auto [err, count] = co_await source.pull(arr, max_count);
     618              : 
     619              :     *out_ec = err;
     620              :     *out_count = count;
     621          296 : }
     622              : 
     623              : #if defined(__GNUC__) && !defined(__clang__)
     624              : #pragma GCC diagnostic pop
     625              : #endif
     626              : 
     627              : template<BufferSource S>
     628              : coro
     629           92 : any_buffer_source::do_pull_impl(
     630              :     void* source,
     631              :     any_buffer_source* wrapper,
     632              :     const_buffer* arr,
     633              :     std::size_t max_count,
     634              :     coro h,
     635              :     executor_ref ex,
     636              :     std::stop_token token,
     637              :     std::error_code* ec,
     638              :     std::size_t* count)
     639              : {
     640           92 :     auto& s = *static_cast<S*>(source);
     641              : 
     642              :     // Create coroutine - frame is cached in wrapper
     643           92 :     auto op = pull_coro<S>(wrapper, s, arr, max_count, ec, count);
     644              : 
     645              :     // Set executor and stop token on promise before resuming
     646           92 :     op.h_.promise().set_executor(ex);
     647           92 :     op.h_.promise().set_stop_token(token);
     648              : 
     649              :     // Resume the coroutine to start the operation
     650           92 :     op.h_.resume();
     651              : 
     652              :     // Check if operation completed synchronously
     653           73 :     if(op.h_.done())
     654              :     {
     655           73 :         op.h_.destroy();
     656           73 :         op.h_ = nullptr;
     657           73 :         return ex.dispatch(h);
     658              :     }
     659              : 
     660              :     // Operation is pending - caller will be resumed via symmetric transfer
     661            0 :     op.h_.promise().set_caller(h);
     662            0 :     op.h_ = nullptr;
     663            0 :     return std::noop_coroutine();
     664           92 : }
     665              : 
     666              : //----------------------------------------------------------
     667              : 
     668              : inline void
     669           39 : any_buffer_source::consume(std::size_t n) noexcept
     670              : {
     671           39 :     vt_->do_consume(source_, n);
     672           39 : }
     673              : 
     674              : inline auto
     675           92 : any_buffer_source::pull(
     676              :     const_buffer* arr,
     677              :     std::size_t max_count)
     678              : {
     679              :     struct awaitable
     680              :     {
     681              :         any_buffer_source* self_;
     682              :         const_buffer* arr_;
     683              :         std::size_t max_count_;
     684              :         std::error_code ec_;
     685              :         std::size_t count_ = 0;
     686              : 
     687              :         bool
     688           92 :         await_ready() const noexcept
     689              :         {
     690           92 :             return false;
     691              :         }
     692              : 
     693              :         coro
     694           92 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     695              :         {
     696          184 :             return self_->vt_->do_pull(
     697           92 :                 self_->source_,
     698              :                 self_,
     699              :                 arr_,
     700              :                 max_count_,
     701              :                 h,
     702              :                 ex,
     703              :                 token,
     704              :                 &ec_,
     705          146 :                 &count_);
     706              :         }
     707              : 
     708              :         io_result<std::size_t>
     709           73 :         await_resume() const noexcept
     710              :         {
     711           73 :             return {ec_, count_};
     712              :         }
     713              :     };
     714           92 :     return awaitable{this, arr, max_count, {}, 0};
     715              : }
     716              : 
     717              : template<MutableBufferSequence MB>
     718              : task<io_result<std::size_t>>
     719              : any_buffer_source::read(MB buffers)
     720              : {
     721              :     std::size_t total = 0;
     722              :     auto dest = sans_prefix(buffers, 0);
     723              : 
     724              :     while(!buffer_empty(dest))
     725              :     {
     726              :         const_buffer arr[detail::max_iovec_];
     727              :         auto [ec, count] = co_await pull(arr, detail::max_iovec_);
     728              : 
     729              :         if(ec)
     730              :             co_return {ec, total};
     731              : 
     732              :         if(count == 0)
     733              :             co_return {error::eof, total};
     734              : 
     735              :         auto n = buffer_copy(dest, std::span(arr, count));
     736              :         consume(n);
     737              :         total += n;
     738              :         dest = sans_prefix(dest, n);
     739              :     }
     740              : 
     741              :     co_return {{}, total};
     742              : }
     743              : 
     744              : //----------------------------------------------------------
     745              : 
     746              : static_assert(BufferSource<any_buffer_source>);
     747              : static_assert(ReadSource<any_buffer_source>);
     748              : 
     749              : } // namespace capy
     750              : } // namespace boost
     751              : 
     752              : #endif
        

Generated by: LCOV version 2.3