LCOV - code coverage report
Current view: top level - boost/capy/io - any_buffer_sink.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 87.3 % 165 144
Test Date: 2026-01-30 23:43:15 Functions: 89.5 % 57 51

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
      11              : #define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/buffers.hpp>
      15              : #include <boost/capy/buffers/buffer_copy.hpp>
      16              : #include <boost/capy/buffers/buffer_param.hpp>
      17              : #include <boost/capy/concept/buffer_sink.hpp>
      18              : #include <boost/capy/concept/io_awaitable.hpp>
      19              : #include <boost/capy/concept/write_sink.hpp>
      20              : #include <boost/capy/coro.hpp>
      21              : #include <boost/capy/ex/executor_ref.hpp>
      22              : #include <boost/capy/io_result.hpp>
      23              : #include <boost/capy/task.hpp>
      24              : 
      25              : #include <system_error>
      26              : 
      27              : #include <concepts>
      28              : #include <coroutine>
      29              : #include <cstddef>
      30              : #include <exception>
      31              : #include <stop_token>
      32              : #include <utility>
      33              : 
      34              : namespace boost {
      35              : namespace capy {
      36              : 
      37              : /** Type-erased wrapper for any BufferSink.
      38              : 
      39              :     This class provides type erasure for any type satisfying the
      40              :     @ref BufferSink concept, enabling runtime polymorphism for
      41              :     buffer sink operations. It uses a cached coroutine frame to achieve
      42              :     zero steady-state allocation after construction.
      43              : 
      44              :     The wrapper also satisfies @ref WriteSink through templated
      45              :     @ref write methods. These methods copy data from the caller's
      46              :     buffers into the sink's internal storage, incurring one extra
      47              :     buffer copy compared to using @ref prepare and @ref commit
      48              :     directly.
      49              : 
      50              :     The wrapper supports two construction modes:
      51              :     - **Owning**: Pass by value to transfer ownership. The wrapper
      52              :       allocates storage and owns the sink.
      53              :     - **Reference**: Pass a pointer to wrap without ownership. The
      54              :       pointed-to sink must outlive this wrapper.
      55              : 
      56              :     @par Frame Preallocation
      57              :     The constructor preallocates the internal coroutine frame.
      58              :     This reserves all virtual address space at server startup
      59              :     so memory usage can be measured up front, rather than
      60              :     allocating piecemeal as traffic arrives.
      61              : 
      62              :     @par Thread Safety
      63              :     Not thread-safe. Concurrent operations on the same wrapper
      64              :     are undefined behavior.
      65              : 
      66              :     @par Example
      67              :     @code
      68              :     // Owning - takes ownership of the sink
      69              :     any_buffer_sink abs(some_buffer_sink{args...});
      70              : 
      71              :     // Reference - wraps without ownership
      72              :     some_buffer_sink sink;
      73              :     any_buffer_sink abs(&sink);
      74              : 
      75              :     mutable_buffer arr[16];
      76              :     std::size_t count = abs.prepare(arr, 16);
      77              :     // Write data into arr[0..count)
      78              :     auto [ec] = co_await abs.commit(bytes_written);
      79              :     auto [ec2] = co_await abs.commit_eof();
      80              :     @endcode
      81              : 
      82              :     @see any_buffer_source, BufferSink, WriteSink
      83              : */
      84              : class any_buffer_sink
      85              : {
      86              :     struct vtable;
      87              : 
      88              :     template<BufferSink S>
      89              :     struct vtable_for_impl;
      90              : 
      91              :     struct commit_op;
      92              : 
      93              :     void* sink_ = nullptr; // the wrapped sink; null when the wrapper is empty
      94              :     vtable const* vt_ = nullptr; // operations table for the sink's concrete type
      95              :     void* cached_frame_ = nullptr; // coroutine frame memory owned by the wrapper; released in the destructor
      96              :     std::size_t cached_size_ = 0; // NOTE(review): presumably the byte size of cached_frame_ - confirm in alloc_frame
      97              :     void* storage_ = nullptr; // allocation holding an owned sink; stays null in reference mode
      98              : 
      99              :     template<BufferSink S>
     100              :     static std::size_t
     101              :     do_prepare_impl(
     102              :         void* sink,
     103              :         mutable_buffer* arr,
     104              :         std::size_t max_count);
     105              : 
     106              :     template<BufferSink S>
     107              :     static coro
     108              :     do_commit_impl(
     109              :         void* sink,
     110              :         any_buffer_sink* wrapper,
     111              :         std::size_t n,
     112              :         bool eof,
     113              :         coro h,
     114              :         executor_ref ex,
     115              :         std::stop_token token,
     116              :         std::error_code* ec);
     117              : 
     118              :     template<BufferSink S>
     119              :     static coro
     120              :     do_commit_eof_impl(
     121              :         void* sink,
     122              :         any_buffer_sink* wrapper,
     123              :         coro h,
     124              :         executor_ref ex,
     125              :         std::stop_token token,
     126              :         std::error_code* ec);
     127              : 
     128              :     template<BufferSink S>
     129              :     static commit_op
     130              :     commit_coro(
     131              :         any_buffer_sink* wrapper,
     132              :         S& sink,
     133              :         std::size_t n,
     134              :         std::error_code* out_ec);
     135              : 
     136              :     template<BufferSink S>
     137              :     static commit_op
     138              :     commit_with_eof_coro(
     139              :         any_buffer_sink* wrapper,
     140              :         S& sink,
     141              :         std::size_t n,
     142              :         bool eof,
     143              :         std::error_code* out_ec);
     144              : 
     145              :     template<BufferSink S>
     146              :     static commit_op
     147              :     commit_eof_coro(
     148              :         any_buffer_sink* wrapper,
     149              :         S& sink,
     150              :         std::error_code* out_ec);
     151              : 
     152              :     void* alloc_frame(std::size_t size); // supplies frame memory to commit_op coroutines (called from promise_type::operator new)
     153              :     void free_frame(void* p, std::size_t size);
     154              : 
     155              : public:
     156              :     /** Destructor.
     157              : 
     158              :         Destroys the owned sink (if any) and releases the cached
     159              :         coroutine frame.
     160              :     */
     161              :     ~any_buffer_sink();
     162              : 
     163              :     /** Default constructor.
     164              : 
     165              :         Constructs an empty wrapper. Apart from has_value(), operator bool,
     166              :         move operations and destruction, operations on it are undefined behavior.
     167              :     */
     168              :     any_buffer_sink() = default;
     169              : 
     170              :     /** Non-copyable.
     171              : 
     172              :         The frame cache is per-instance and cannot be shared.
     173              :     */
     174              :     any_buffer_sink(any_buffer_sink const&) = delete;
     175              :     any_buffer_sink& operator=(any_buffer_sink const&) = delete;
     176              : 
     177              :     /** Move constructor.
     178              : 
     179              :         Transfers ownership of the wrapped sink (if owned) and
     180              :         cached frame from `other`. After the move, `other` is
     181              :         in a default-constructed state.
     182              : 
     183              :         @param other The wrapper to move from.
     184              :     */
     185            1 :     any_buffer_sink(any_buffer_sink&& other) noexcept
     186            1 :         : sink_(std::exchange(other.sink_, nullptr))
     187            1 :         , vt_(std::exchange(other.vt_, nullptr))
     188            1 :         , cached_frame_(std::exchange(other.cached_frame_, nullptr))
     189            1 :         , cached_size_(std::exchange(other.cached_size_, 0))
     190            1 :         , storage_(std::exchange(other.storage_, nullptr))
     191              :     {
     192            1 :     }
     193              : 
     194              :     /** Move assignment operator.
     195              : 
     196              :         Destroys any owned sink and releases existing resources,
     197              :         then transfers ownership from `other`.
     198              : 
     199              :         @param other The wrapper to move from.
     200              :         @return Reference to this wrapper.
     201              :     */
     202              :     any_buffer_sink&
     203              :     operator=(any_buffer_sink&& other) noexcept;
     204              : 
     205              :     /** Construct by taking ownership of a BufferSink.
     206              : 
     207              :         Allocates storage and moves the sink into this wrapper.
     208              :         The wrapper owns the sink and will destroy it.
     209              : 
     210              :         @param s The sink to take ownership of.
     211              :     */
     212              :     template<BufferSink S>
     213              :         requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
     214              :     any_buffer_sink(S s);
     215              : 
     216              :     /** Construct by wrapping a BufferSink without ownership.
     217              : 
     218              :         Wraps the given sink by pointer. The sink must remain
     219              :         valid for the lifetime of this wrapper.
     220              : 
     221              :         @param s Pointer to the sink to wrap. Must not be null (it is dereferenced here).
     222              :     */
     223              :     template<BufferSink S>
     224           60 :     any_buffer_sink(S* s) noexcept
     225           60 :         : sink_(s)
     226           60 :         , vt_(&vtable_for_impl<S>::value)
     227              :     {
     228              :         // Preallocate coroutine frames to find max size; each temporary commit_op is destroyed unresumed
     229           60 :         commit_coro<S>(this, *s, 0, nullptr);
     230           60 :         commit_with_eof_coro<S>(this, *s, 0, false, nullptr);
     231           60 :         commit_eof_coro<S>(this, *s, nullptr);
     232           60 :     }
     233              : 
     234              :     /** Check if the wrapper contains a valid sink.
     235              : 
     236              :         @return `true` if wrapping a sink, `false` if default-constructed
     237              :             or moved-from.
     238              :     */
     239              :     bool
     240            9 :     has_value() const noexcept
     241              :     {
     242            9 :         return sink_ != nullptr;
     243              :     }
     244              : 
     245              :     /** Check if the wrapper contains a valid sink.
     246              : 
     247              :         @return `true` if wrapping a sink, `false` if default-constructed
     248              :             or moved-from.
     249              :     */
     250              :     explicit
     251            2 :     operator bool() const noexcept
     252              :     {
     253            2 :         return has_value();
     254              :     }
     255              : 
     256              :     /** Prepare writable buffers.
     257              : 
     258              :         Fills the provided array with mutable buffer descriptors
     259              :         pointing to the underlying sink's internal storage. This
     260              :         operation is synchronous.
     261              : 
     262              :         @param arr Pointer to array of mutable_buffer to fill.
     263              :         @param max_count Maximum number of buffers to fill.
     264              : 
     265              :         @return The number of buffers filled.
     266              : 
     267              :         @par Preconditions
     268              :         The wrapper must contain a valid sink (`has_value() == true`).
     269              :     */
     270              :     std::size_t
     271              :     prepare(mutable_buffer* arr, std::size_t max_count);
     272              : 
     273              :     /** Commit bytes written to the prepared buffers.
     274              : 
     275              :         Commits `n` bytes written to the buffers returned by the
     276              :         most recent call to @ref prepare. The operation may trigger
     277              :         underlying I/O.
     278              : 
     279              :         @param n The number of bytes to commit.
     280              : 
     281              :         @return An awaitable yielding `(error_code)`.
     282              : 
     283              :         @par Preconditions
     284              :         The wrapper must contain a valid sink (`has_value() == true`).
     285              :     */
     286              :     auto
     287              :     commit(std::size_t n);
     288              : 
     289              :     /** Commit bytes written with optional end-of-stream.
     290              : 
     291              :         Commits `n` bytes written to the buffers returned by the
     292              :         most recent call to @ref prepare. If `eof` is true, also
     293              :         signals end-of-stream.
     294              : 
     295              :         @param n The number of bytes to commit.
     296              :         @param eof If true, signals end-of-stream after committing.
     297              : 
     298              :         @return An awaitable yielding `(error_code)`.
     299              : 
     300              :         @par Preconditions
     301              :         The wrapper must contain a valid sink (`has_value() == true`).
     302              :     */
     303              :     auto
     304              :     commit(std::size_t n, bool eof);
     305              : 
     306              :     /** Signal end-of-stream.
     307              : 
     308              :         Indicates that no more data will be written to the sink.
     309              :         The operation completes when the sink is finalized, or
     310              :         an error occurs.
     311              : 
     312              :         @return An awaitable yielding `(error_code)`.
     313              : 
     314              :         @par Preconditions
     315              :         The wrapper must contain a valid sink (`has_value() == true`).
     316              :     */
     317              :     auto
     318              :     commit_eof();
     319              : 
     320              :     /** Write data from a buffer sequence.
     321              : 
     322              :         Writes all data from the buffer sequence to the underlying
     323              :         sink. This method satisfies the @ref WriteSink concept.
     324              : 
     325              :         @note This operation copies data from the caller's buffers
     326              :         into the sink's internal buffers. For zero-copy writes,
     327              :         use @ref prepare and @ref commit directly.
     328              : 
     329              :         @param buffers The buffer sequence to write.
     330              : 
     331              :         @return An awaitable yielding `(error_code,std::size_t)`.
     332              : 
     333              :         @par Preconditions
     334              :         The wrapper must contain a valid sink (`has_value() == true`).
     335              :     */
     336              :     template<ConstBufferSequence CB>
     337              :     task<io_result<std::size_t>>
     338              :     write(CB buffers);
     339              : 
     340              :     /** Write data with optional end-of-stream.
     341              : 
     342              :         Writes all data from the buffer sequence to the underlying
     343              :         sink, optionally finalizing it afterwards. This method
     344              :         satisfies the @ref WriteSink concept.
     345              : 
     346              :         @note This operation copies data from the caller's buffers
     347              :         into the sink's internal buffers. For zero-copy writes,
     348              :         use @ref prepare and @ref commit directly.
     349              : 
     350              :         @param buffers The buffer sequence to write.
     351              :         @param eof If true, finalize the sink after writing.
     352              : 
     353              :         @return An awaitable yielding `(error_code,std::size_t)`.
     354              : 
     355              :         @par Preconditions
     356              :         The wrapper must contain a valid sink (`has_value() == true`).
     357              :     */
     358              :     template<ConstBufferSequence CB>
     359              :     task<io_result<std::size_t>>
     360              :     write(CB buffers, bool eof);
     361              : 
     362              :     /** Signal end-of-stream.
     363              : 
     364              :         Indicates that no more data will be written to the sink.
     365              :         This method satisfies the @ref WriteSink concept.
     366              : 
     367              :         @return An awaitable yielding `(error_code)`.
     368              : 
     369              :         @par Preconditions
     370              :         The wrapper must contain a valid sink (`has_value() == true`).
     371              :     */
     372              :     auto
     373              :     write_eof();
     374              : 
     375              : protected:
     376              :     /** Rebind to a new sink after move.
     377              : 
     378              :         Updates the internal pointer to reference a new sink object.
     379              :         Used by owning wrappers after move assignment when the owned
     380              :         object has moved to a new location.
     381              : 
     382              :         @param new_sink The new sink to bind to. Must be the same
     383              :             type as the original sink.
     384              : 
     385              :         @note Terminates if called with a sink of different type
     386              :             than the original.
     387              :     */
     388              :     template<BufferSink S>
     389              :     void
     390              :     rebind(S& new_sink) noexcept
     391              :     {
     392              :         if(vt_ != &vtable_for_impl<S>::value) // the vtable address identifies the concrete sink type
     393              :             std::terminate();
     394              :         sink_ = &new_sink;
     395              :     }
     396              : };
     397              : 
     398              : //----------------------------------------------------------
     399              : 
     400              : struct any_buffer_sink::vtable // function-pointer table through which the wrapper reaches the concrete sink
     401              : {
     402              :     void (*destroy)(void*) noexcept; // runs the sink's destructor only; callers free the storage themselves
     403              : 
     404              :     std::size_t (*do_prepare)( // bound to do_prepare_impl<S>
     405              :         void* sink,
     406              :         mutable_buffer* arr,
     407              :         std::size_t max_count);
     408              : 
     409              :     coro (*do_commit)( // bound to do_commit_impl<S>
     410              :         void* sink,
     411              :         any_buffer_sink* wrapper,
     412              :         std::size_t n,
     413              :         bool eof,
     414              :         coro h,
     415              :         executor_ref ex,
     416              :         std::stop_token token,
     417              :         std::error_code* ec); // NOTE(review): presumably receives the operation's result - confirm in do_commit_impl
     418              : 
     419              :     coro (*do_commit_eof)( // bound to do_commit_eof_impl<S>
     420              :         void* sink,
     421              :         any_buffer_sink* wrapper,
     422              :         coro h,
     423              :         executor_ref ex,
     424              :         std::stop_token token,
     425              :         std::error_code* ec);
     426              : };
     427              : 
     428              : template<BufferSink S>
     429              : struct any_buffer_sink::vtable_for_impl // provides the single vtable instance for concrete sink type S
     430              : {
     431              :     static void
     432            0 :     do_destroy_impl(void* sink) noexcept // in-place destruction; the memory itself is not freed here
     433              :     {
     434            0 :         static_cast<S*>(sink)->~S();
     435            0 :     }
     436              : 
     437              :     static constexpr vtable value = { // initializers are positional: keep them in vtable's member order
     438              :         &do_destroy_impl,
     439              :         &any_buffer_sink::do_prepare_impl<S>,
     440              :         &any_buffer_sink::do_commit_impl<S>,
     441              :         &any_buffer_sink::do_commit_eof_impl<S>
     442              :     };
     443              : };
     444              : 
     445              : //----------------------------------------------------------
     446              : 
     447              : inline
     448           63 : any_buffer_sink::~any_buffer_sink()
     449              : {
     450           63 :     if(storage_) // set only by the owning constructor; reference-mode wrappers skip this
     451              :     {
     452            0 :         vt_->destroy(sink_); // run the sink's destructor first...
     453            0 :         ::operator delete(storage_); // ...then release the raw storage it lived in
     454              :     }
     455           63 :     if(cached_frame_) // freed here because promise_type's operator delete overloads are no-ops
     456           60 :         ::operator delete(cached_frame_);
     457           63 : }
     458              : 
     459              : inline any_buffer_sink&
     460            1 : any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
     461              : {
     462            1 :     if(this != &other) // self-move-assignment leaves the wrapper untouched
     463              :     {
     464            1 :         if(storage_) // release the currently owned sink, same sequence as the destructor
     465              :         {
     466            0 :             vt_->destroy(sink_);
     467            0 :             ::operator delete(storage_);
     468              :         }
     469            1 :         if(cached_frame_) // the old frame is discarded; other's cached frame is adopted below
     470            0 :             ::operator delete(cached_frame_);
     471            1 :         sink_ = std::exchange(other.sink_, nullptr); // take every field and null it in other,
     472            1 :         vt_ = std::exchange(other.vt_, nullptr); // leaving other in the default-constructed state
     473            1 :         cached_frame_ = std::exchange(other.cached_frame_, nullptr);
     474            1 :         cached_size_ = std::exchange(other.cached_size_, 0);
     475            1 :         storage_ = std::exchange(other.storage_, nullptr);
     476              :     }
     477            1 :     return *this;
     478              : }
     479              : 
     480              : // Owning constructor: moves `s` into storage allocated here and owned by this
     481              : // wrapper, then preallocates the cached coroutine frame. Rolls back on throw.
     482              : template<BufferSink S>
     483              :     requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
     484              : any_buffer_sink::any_buffer_sink(S s)
     485              :     : vt_(&vtable_for_impl<S>::value)
     486              : {
     487              :     struct guard { // rollback: ~any_buffer_sink() does not run if this body throws
     488              :         any_buffer_sink* self;
     489              :         bool committed = false;
     490              :         ~guard() {
     491              :             if(committed) return;
     492              :             if(self->sink_) // still null if S's move constructor threw: nothing to destroy
     493              :                 self->vt_->destroy(self->sink_);
     494              :             ::operator delete(self->storage_); // no-op when storage_ is null
     495              :             self->sink_ = self->storage_ = nullptr;
     496              :         }
     497              :     } g{this};
     498              :     storage_ = ::operator new(sizeof(S)); // NOTE(review): size-only allocation assumes S is not over-aligned
     499              :     sink_ = ::new(storage_) S(std::move(s)); // assigned only after construction succeeds
     500              : 
     501              :     // Preallocate coroutine frames to find max size. Each call creates a suspended
     502              :     // coroutine whose temporary commit_op handle is destroyed at once, unresumed.
     503              :     auto& ref = *static_cast<S*>(sink_);
     504              :     commit_coro<S>(this, ref, 0, nullptr);
     505              :     commit_with_eof_coro<S>(this, ref, 0, false, nullptr);
     506              :     commit_eof_coro<S>(this, ref, nullptr);
     507              :     g.committed = true;
     508              : }
     509              : 
     510              : //----------------------------------------------------------
     511              : 
     512              : struct any_buffer_sink::commit_op
     513              : {
     514              :     struct promise_type
     515              :     {
     516              :         executor_ref executor_;
     517              :         std::stop_token stop_token_;
     518              :         coro caller_h_{};
     519              : 
     520          246 :         promise_type() = default;
     521              : 
     522              :         commit_op
     523          246 :         get_return_object() noexcept
     524              :         {
     525              :             return commit_op{
     526          246 :                 std::coroutine_handle<promise_type>::from_promise(*this)};
     527              :         }
     528              : 
     529              :         std::suspend_always
     530          246 :         initial_suspend() noexcept
     531              :         {
     532          246 :             return {};
     533              :         }
     534              : 
     535              :         auto
     536           50 :         final_suspend() noexcept
     537              :         {
     538              :             struct awaiter
     539              :             {
     540              :                 promise_type* p_;
     541              : 
     542           50 :                 bool await_ready() const noexcept { return false; }
     543              : 
     544           50 :                 coro await_suspend(coro) const noexcept
     545              :                 {
     546           50 :                     if(p_->caller_h_)
     547            0 :                         return p_->caller_h_;
     548           50 :                     return std::noop_coroutine();
     549              :                 }
     550              : 
     551            0 :                 void await_resume() const noexcept {}
     552              :             };
     553           50 :             return awaiter{this};
     554              :         }
     555              : 
     556              :         void
     557           50 :         return_void() noexcept
     558              :         {
     559           50 :         }
     560              : 
     561              :         void
     562           16 :         unhandled_exception()
     563              :         {
     564           16 :             throw;
     565              :         }
     566              : 
     567              :         template<class... Args>
     568              :         static void*
     569          246 :         operator new(
     570              :             std::size_t size,
     571              :             any_buffer_sink* wrapper,
     572              :             Args&&...)
     573              :         {
     574          246 :             return wrapper->alloc_frame(size);
     575              :         }
     576              : 
     577              :         template<class... Args>
     578              :         static void
     579              :         operator delete(void*, any_buffer_sink*, Args&&...) noexcept
     580              :         {
     581              :         }
     582              : 
     583              :         static void
     584          246 :         operator delete(void*, std::size_t) noexcept
     585              :         {
     586          246 :         }
     587              : 
     588              :         void
     589           66 :         set_executor(executor_ref ex) noexcept
     590              :         {
     591           66 :             executor_ = ex;
     592           66 :         }
     593              : 
     594              :         void
     595           66 :         set_stop_token(std::stop_token token) noexcept
     596              :         {
     597           66 :             stop_token_ = token;
     598           66 :         }
     599              : 
     600              :         void
     601            0 :         set_caller(coro h) noexcept
     602              :         {
     603            0 :             caller_h_ = h;
     604            0 :         }
     605              : 
     606              :         template<class Awaitable>
     607              :         struct transform_awaiter
     608              :         {
                                    // Awaiter adapter produced by await_transform.
                                    //
                                    // a_ is a decayed copy of the awaited object and p_ points
                                    // at the promise that created this adapter. await_ready
                                    // and await_resume forward to a_ unchanged. await_suspend
                                    // calls a_'s three-argument await_suspend, supplying the
                                    // executor_ and stop_token_ stored in the promise.
     609              :             std::decay_t<Awaitable> a_;
     610              :             promise_type* p_;
     611              : 
     612           66 :             bool await_ready()
     613              :             {
     614           66 :                 return a_.await_ready();
     615              :             }
     616              : 
     617           66 :             auto await_resume()
     618              :             {
     619           66 :                 return a_.await_resume();
     620              :             }
     621              : 
     622            0 :             auto await_suspend(coro h)
     623              :             {
                                        // 0 recorded hits: no awaited operation suspended
                                        // during the measured test run.
     624            0 :                 return a_.await_suspend(h, p_->executor_, p_->stop_token_);
     625              :             }
     626              :         };
     627              : 
     628              :         template<class Awaitable>
     629           66 :         auto await_transform(Awaitable&& a)
     630              :         {
                                    // Wraps the operand of a co_await expression in a
                                    // transform_awaiter bound to this promise. Only operands
                                    // whose decayed type satisfies IoAwaitable are accepted;
                                    // any other type is rejected at compile time.
     631              :             using A = std::decay_t<Awaitable>;
     632              :             if constexpr (IoAwaitable<A>)
     633              :             {
     634              :                 return transform_awaiter<Awaitable>{
     635           66 :                     std::forward<Awaitable>(a), this};
     636              :             }
     637              :             else
     638              :             {
                                        // sizeof(A) == 0 depends on the template parameter
                                        // and is never true, so the assertion fires only
                                        // when this branch is instantiated.
     639              :                 static_assert(sizeof(A) == 0, "requires IoAwaitable");
     640              :             }
     641              :         }
     642              :     };
     643              : 
     644              :     std::coroutine_handle<promise_type> h_;
     645              : 
     646          246 :     ~commit_op()
     647              :     {
                                // Destroys the coroutine through h_ when the handle is still
                                // non-null. Code that has already destroyed or handed off
                                // the coroutine sets h_ to nullptr first (see do_commit_impl),
                                // which makes this a no-op.
     648          246 :         if(h_)
     649          196 :             h_.destroy();
     650          246 :     }
     651              : 
     652              :     commit_op(commit_op const&) = delete;
                            // Copy construction (above) and copy assignment (below) are
                            // both deleted. The destructor calls h_.destroy(), so two
                            // objects holding the same handle would destroy it twice.
     653              :     commit_op& operator=(commit_op const&) = delete;
     654              : 
     655              :     commit_op(commit_op&& other) noexcept
     656              :         : h_(std::exchange(other.h_, nullptr))
     657              :     {
                                // Takes over other's handle and leaves other.h_ null, so the
                                // destructor of other will not destroy the coroutine.
     658              :     }
     659              : 
     660              :     commit_op& operator=(commit_op&& other) noexcept
     661              :     {
                                // Move assignment: destroys the coroutine currently held (if
                                // any), then takes over other's handle and leaves other.h_
                                // null. Self-assignment is detected and changes nothing.
     662              :         if(this != &other)
     663              :         {
     664              :             if(h_)
     665              :                 h_.destroy();
     666              :             h_ = std::exchange(other.h_, nullptr);
     667              :         }
     668              :         return *this;
     669              :     }
     670              : 
     671              : private:
     672              :     explicit
     673          246 :     commit_op(std::coroutine_handle<promise_type> h) noexcept
     674          246 :         : h_(h)
     675              :     {
                                // Stores h without resuming it; from here on the destructor
                                // will destroy the coroutine unless h_ is cleared.
                                // NOTE(review): private, so presumably called only by
                                // promise_type (get_return_object is outside this excerpt).
     676          246 :     }
     677              : };
     678              : 
     679              : //----------------------------------------------------------
     680              : 
     681              : inline void*
     682          246 : any_buffer_sink::alloc_frame(std::size_t size)
     683              : {
                            // Returns a block of at least `size` bytes.
                            //
                            // The block held in cached_frame_ is returned as-is when its
                            // recorded size is large enough. Otherwise it is released and
                            // replaced by a new allocation of exactly `size` bytes, which
                            // becomes the new cached block.
                            //
                            // Nothing here tracks whether the cached block is in use, so
                            // two calls without a size increase return the same pointer.
                            // NOTE(review): presumably only one commit operation per
                            // any_buffer_sink is alive at a time -- confirm.
     684          246 :     if(cached_frame_ && cached_size_ >= size)
     685          126 :         return cached_frame_;
     686              : 
                            // NOTE(review): if ::operator new below throws, cached_frame_
                            // still holds the pointer released here and cached_size_ keeps
                            // the old size, so a later call could return or delete freed
                            // memory. Consider resetting both members before allocating.
     687          120 :     if(cached_frame_)
     688           60 :         ::operator delete(cached_frame_);
     689              : 
     690          120 :     cached_frame_ = ::operator new(size);
     691          120 :     cached_size_ = size;
     692          120 :     return cached_frame_;
     693              : }
     694              : 
     695              : inline void
     696              : any_buffer_sink::free_frame(void*, std::size_t)
     697              : {
                            // Both parameters are unnamed and unused; no memory is
                            // released here.
     698              :     // Keep the frame cached for reuse
     699              : }
     700              : 
     701              : template<BufferSink S>
     702              : std::size_t
     703           68 : any_buffer_sink::do_prepare_impl(
     704              :     void* sink,
     705              :     mutable_buffer* arr,
     706              :     std::size_t max_count)
     707              : {
                            // Type-erasure thunk: casts the untyped `sink` pointer back to
                            // S and returns the result of S::prepare(arr, max_count).
                            // `sink` must really point at an S; nothing checks this.
     708           68 :     auto& s = *static_cast<S*>(sink);
     709           68 :     return s.prepare(arr, max_count);
     710              : }
     711              : 
     712              : #if defined(__GNUC__) && !defined(__clang__)
     713              : #pragma GCC diagnostic push
     714              : #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
     715              : #endif
     716              : 
     717              : template<BufferSink S>
     718              : any_buffer_sink::commit_op
     719           98 : any_buffer_sink::commit_coro(
     720              :     any_buffer_sink*,
     721              :     S& sink,
     722              :     std::size_t n,
     723              :     std::error_code* out_ec)
     724              : {
                            // Coroutine: awaits sink.commit(n) and writes the error code
                            // from its result through out_ec. out_ec must stay valid until
                            // the coroutine has finished.
                            // NOTE(review): the unnamed any_buffer_sink* is not used in the
                            // body; presumably promise_type's allocation functions receive
                            // it to reach the cached frame -- confirm.
     725              :     auto [err] = co_await sink.commit(n);
     726              :     *out_ec = err;
     727          196 : }
     728              : 
     729              : template<BufferSink S>
     730              : any_buffer_sink::commit_op
     731           70 : any_buffer_sink::commit_with_eof_coro(
     732              :     any_buffer_sink*,
     733              :     S& sink,
     734              :     std::size_t n,
     735              :     bool eof,
     736              :     std::error_code* out_ec)
     737              : {
                            // Coroutine: awaits sink.commit(n, eof) and writes the error
                            // code from its result through out_ec. out_ec must stay valid
                            // until the coroutine has finished.
                            // NOTE(review): the unnamed any_buffer_sink* is not used in the
                            // body; presumably promise_type's allocation functions receive
                            // it to reach the cached frame -- confirm.
     738              :     auto [err] = co_await sink.commit(n, eof);
     739              :     *out_ec = err;
     740          140 : }
     741              : 
     742              : template<BufferSink S>
     743              : any_buffer_sink::commit_op
     744           78 : any_buffer_sink::commit_eof_coro(
     745              :     any_buffer_sink*,
     746              :     S& sink,
     747              :     std::error_code* out_ec)
     748              : {
                            // Coroutine: awaits sink.commit_eof() and writes the error code
                            // from its result through out_ec. out_ec must stay valid until
                            // the coroutine has finished.
                            // NOTE(review): the unnamed any_buffer_sink* is not used in the
                            // body; presumably promise_type's allocation functions receive
                            // it to reach the cached frame -- confirm.
     749              :     auto [err] = co_await sink.commit_eof();
     750              :     *out_ec = err;
     751          156 : }
     752              : 
     753              : #if defined(__GNUC__) && !defined(__clang__)
     754              : #pragma GCC diagnostic pop
     755              : #endif
     756              : 
     757              : template<BufferSink S>
     758              : coro
     759           48 : any_buffer_sink::do_commit_impl(
     760              :     void* sink,
     761              :     any_buffer_sink* wrapper,
     762              :     std::size_t n,
     763              :     bool eof,
     764              :     coro h,
     765              :     executor_ref ex,
     766              :     std::stop_token token,
     767              :     std::error_code* ec)
     768              : {
                            // Type-erasure thunk behind any_buffer_sink::commit.
                            //
                            // Casts `sink` back to S, starts a helper coroutine that
                            // awaits S's commit and stores the error code through `ec`,
                            // and returns the handle that the awaiting coroutine's
                            // await_suspend hands back:
                            //   - helper finished during resume(): the result of
                            //     ex.dispatch(h);
                            //   - helper still suspended: std::noop_coroutine(), after
                            //     recording h in the helper's promise.
     769           48 :     auto& s = *static_cast<S*>(sink);
     770              : 
     771              :     // Create coroutine - frame is cached in wrapper
                            // With eof set the two-argument commit is awaited, otherwise
                            // the one-argument form.
     772           48 :     auto op = eof
     773           48 :         ? commit_with_eof_coro<S>(wrapper, s, n, eof, ec)
     774              :         : commit_coro<S>(wrapper, s, n, ec);
     775              : 
     776              :     // Set executor and stop token on promise before resuming
     777           48 :     op.h_.promise().set_executor(ex);
     778           48 :     op.h_.promise().set_stop_token(token);
     779              : 
     780              :     // Resume the coroutine to start the operation
     781           48 :     op.h_.resume();
     782              : 
     783              :     // Check if operation completed synchronously
     784           37 :     if(op.h_.done())
     785              :     {
                                // Destroy the helper here and clear h_ so that op's
                                // destructor does not destroy it a second time.
     786           37 :         op.h_.destroy();
     787           37 :         op.h_ = nullptr;
     788           37 :         return ex.dispatch(h);
     789              :     }
     790              : 
     791              :     // Operation is pending - caller will be resumed via symmetric transfer
                            // h_ is cleared so op's destructor leaves the suspended helper
                            // alive.
                            // NOTE(review): set_caller runs only after resume() returned;
                            // if the helper can finish on another thread in between, the
                            // caller handle may not be stored yet -- confirm.
                            // These lines have 0 recorded hits in the measured test run.
     792            0 :     op.h_.promise().set_caller(h);
     793            0 :     op.h_ = nullptr;
     794            0 :     return std::noop_coroutine();
     795           48 : }
     796              : 
     797              : template<BufferSink S>
     798              : coro
     799           18 : any_buffer_sink::do_commit_eof_impl(
     800              :     void* sink,
     801              :     any_buffer_sink* wrapper,
     802              :     coro h,
     803              :     executor_ref ex,
     804              :     std::stop_token token,
     805              :     std::error_code* ec)
     806              : {
                            // Type-erasure thunk behind any_buffer_sink::commit_eof.
                            //
                            // Casts `sink` back to S, starts a helper coroutine that
                            // awaits S::commit_eof() and stores the error code through
                            // `ec`, and returns the handle that the awaiting coroutine's
                            // await_suspend hands back:
                            //   - helper finished during resume(): the result of
                            //     ex.dispatch(h);
                            //   - helper still suspended: std::noop_coroutine(), after
                            //     recording h in the helper's promise.
     807           18 :     auto& s = *static_cast<S*>(sink);
     808              : 
     809              :     // Create coroutine - frame is cached in wrapper
     810           18 :     auto op = commit_eof_coro<S>(wrapper, s, ec);
     811              : 
     812              :     // Set executor and stop token on promise before resuming
     813           18 :     op.h_.promise().set_executor(ex);
     814           18 :     op.h_.promise().set_stop_token(token);
     815              : 
     816              :     // Resume the coroutine to start the operation
     817           18 :     op.h_.resume();
     818              : 
     819              :     // Check if operation completed synchronously
     820           13 :     if(op.h_.done())
     821              :     {
                                // Destroy the helper here and clear h_ so that op's
                                // destructor does not destroy it a second time.
     822           13 :         op.h_.destroy();
     823           13 :         op.h_ = nullptr;
     824           13 :         return ex.dispatch(h);
     825              :     }
     826              : 
     827              :     // Operation is pending - caller will be resumed via symmetric transfer
                            // h_ is cleared so op's destructor leaves the suspended helper
                            // alive.
                            // NOTE(review): set_caller runs only after resume() returned;
                            // if the helper can finish on another thread in between, the
                            // caller handle may not be stored yet -- confirm.
                            // These lines have 0 recorded hits in the measured test run.
     828            0 :     op.h_.promise().set_caller(h);
     829            0 :     op.h_ = nullptr;
     830            0 :     return std::noop_coroutine();
     831           18 : }
     832              : 
     833              : //----------------------------------------------------------
     834              : 
     835              : inline std::size_t
     836           68 : any_buffer_sink::prepare(
     837              :     mutable_buffer* arr,
     838              :     std::size_t max_count)
     839              : {
                            // Forwards to the do_prepare entry of the table vt_ points to,
                            // passing the stored sink_ pointer, and returns its result
                            // unchanged.
     840           68 :     return vt_->do_prepare(sink_, arr, max_count);
     841              : }
     842              : 
     843              : inline auto
     844           38 : any_buffer_sink::commit(std::size_t n)
     845              : {
                            // Returns an awaitable that commits n through the wrapped sink
                            // without the eof flag. Nothing happens until it is awaited.
     846              :     struct awaitable
     847              :     {
                                // self_ is the wrapper, n_ the amount to commit, and ec_
                                // receives the error code written by do_commit.
     848              :         any_buffer_sink* self_;
     849              :         std::size_t n_;
     850              :         std::error_code ec_;
     851              : 
     852              :         bool
     853           38 :         await_ready() const noexcept
     854              :         {
                                    // Always suspend; the work is started in await_suspend.
     855           38 :             return false;
     856              :         }
     857              : 
     858              :         coro
     859           38 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     860              :         {
                                    // Hands the request to the do_commit entry of vt_ with
                                    // eof fixed to false, and returns the handle it yields.
                                    // &ec_ points into this awaitable.
     861           76 :             return self_->vt_->do_commit(
     862           38 :                 self_->sink_,
     863              :                 self_,
     864              :                 n_,
     865              :                 false,
     866              :                 h,
     867              :                 ex,
     868              :                 token,
     869           60 :                 &ec_);
     870              :         }
     871              : 
     872              :         io_result<>
     873           30 :         await_resume() const noexcept
     874              :         {
                                    // The co_await result is an io_result<> built from ec_.
     875           30 :             return {ec_};
     876              :         }
     877              :     };
     878           38 :     return awaitable{this, n, {}};
     879              : }
     880              : 
     881              : inline auto
     882           10 : any_buffer_sink::commit(std::size_t n, bool eof)
     883              : {
                            // Returns an awaitable that commits n through the wrapped sink
                            // and passes the eof flag along. Nothing happens until it is
                            // awaited.
     884              :     struct awaitable
     885              :     {
                                // self_ is the wrapper, n_ the amount to commit, eof_ the
                                // caller's flag, and ec_ receives the error code written
                                // by do_commit.
     886              :         any_buffer_sink* self_;
     887              :         std::size_t n_;
     888              :         bool eof_;
     889              :         std::error_code ec_;
     890              : 
     891              :         bool
     892           10 :         await_ready() const noexcept
     893              :         {
                                    // Always suspend; the work is started in await_suspend.
     894           10 :             return false;
     895              :         }
     896              : 
     897              :         coro
     898           10 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     899              :         {
                                    // Hands the request to the do_commit entry of vt_ and
                                    // returns the handle it yields. &ec_ points into this
                                    // awaitable.
     900           20 :             return self_->vt_->do_commit(
     901           10 :                 self_->sink_,
     902              :                 self_,
     903              :                 n_,
     904           10 :                 eof_,
     905              :                 h,
     906              :                 ex,
     907              :                 token,
     908           14 :                 &ec_);
     909              :         }
     910              : 
     911              :         io_result<>
     912            7 :         await_resume() const noexcept
     913              :         {
                                    // The co_await result is an io_result<> built from ec_.
     914            7 :             return {ec_};
     915              :         }
     916              :     };
     917           10 :     return awaitable{this, n, eof, {}};
     918              : }
     919              : 
     920              : inline auto
     921           18 : any_buffer_sink::commit_eof()
     922              : {
                            // Returns an awaitable that invokes the wrapped sink's
                            // commit_eof path. Nothing happens until it is awaited.
     923              :     struct awaitable
     924              :     {
                                // self_ is the wrapper; ec_ receives the error code written
                                // by do_commit_eof.
     925              :         any_buffer_sink* self_;
     926              :         std::error_code ec_;
     927              : 
     928              :         bool
     929           18 :         await_ready() const noexcept
     930              :         {
                                    // Always suspend; the work is started in await_suspend.
     931           18 :             return false;
     932              :         }
     933              : 
     934              :         coro
     935           18 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     936              :         {
                                    // Hands the request to the do_commit_eof entry of vt_
                                    // and returns the handle it yields. &ec_ points into
                                    // this awaitable.
     937           36 :             return self_->vt_->do_commit_eof(
     938           18 :                 self_->sink_,
     939              :                 self_,
     940              :                 h,
     941              :                 ex,
     942              :                 token,
     943           26 :                 &ec_);
     944              :         }
     945              : 
     946              :         io_result<>
     947           13 :         await_resume() const noexcept
     948              :         {
                                    // The co_await result is an io_result<> built from ec_.
     949           13 :             return {ec_};
     950              :         }
     951              :     };
     952           18 :     return awaitable{this, {}};
     953              : }
     954              : 
     955              : //----------------------------------------------------------
     956              : 
     957              : template<ConstBufferSequence CB>
     958              : task<io_result<std::size_t>>
     959              : any_buffer_sink::write(CB buffers)
     960              : {
                            // Convenience overload: returns the task produced by
                            // write(buffers, false), i.e. no end-of-stream is signalled.
     961              :     return write(buffers, false);
     962              : }
     963              : 
     964              : template<ConstBufferSequence CB>
     965              : task<io_result<std::size_t>>
     966              : any_buffer_sink::write(CB buffers, bool eof)
     967              : {
                            // Coroutine that copies `buffers` into the sink piece by piece.
                            //
                            // Each round asks the sink for writable buffers with prepare(),
                            // copies as much pending data as buffer_copy reports, and
                            // commits that amount. When `eof` is true, commit_eof() is
                            // awaited once all data has been consumed.
                            //
                            // The result pairs an error code with `total`, the sum of the
                            // amounts whose commit succeeded. On the first failing commit
                            // the coroutine returns at once with that error and the total
                            // reached so far.
     968              :     buffer_param<CB> bp(buffers);
     969              :     std::size_t total = 0;
     970              : 
     971              :     for(;;)
     972              :     {
                                // Stop once bp reports no remaining data.
     973              :         auto src = bp.data();
     974              :         if(src.empty())
     975              :             break;
     976              : 
     977              :         mutable_buffer arr[detail::max_iovec_];
     978              :         std::size_t count = prepare(arr, detail::max_iovec_);
     979              :         if(count == 0)
     980              :         {
                                    // The sink offered no buffers: commit zero bytes and
                                    // try again. This repeats until prepare() yields a
                                    // buffer or a commit reports an error.
     981              :             auto [ec] = co_await commit(0);
     982              :             if(ec)
     983              :                 co_return {ec, total};
     984              :             continue;
     985              :         }
     986              : 
     987              :         auto n = buffer_copy(std::span(arr, count), src);
     988              :         auto [ec] = co_await commit(n);
     989              :         if(ec)
     990              :             co_return {ec, total};
                                // Only a successful commit advances the source and the
                                // running total.
     991              :         bp.consume(n);
     992              :         total += n;
     993              :     }
     994              : 
     995              :     if(eof)
     996              :     {
     997              :         auto [ec] = co_await commit_eof();
     998              :         if(ec)
     999              :             co_return {ec, total};
    1000              :     }
    1001              : 
    1002              :     co_return {{}, total};
    1003              : }
    1004              : 
    1005              : inline auto
    1006              : any_buffer_sink::write_eof()
    1007              : {
                            // Same operation as commit_eof(): returns its awaitable
                            // unchanged.
    1008              :     return commit_eof();
    1009              : }
    1010              : 
    1011              : //----------------------------------------------------------
    1012              : 
    1013              : static_assert(WriteSink<any_buffer_sink>);
    1014              : 
    1015              : } // namespace capy
    1016              : } // namespace boost
    1017              : 
    1018              : #endif
        

Generated by: LCOV version 2.3