LCOV - code coverage report
Current view: top level - boost/capy/io - any_write_sink.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 85.9 % 199 171
Test Date: 2026-01-30 23:43:15 Functions: 84.3 % 70 59

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
      11              : #define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/buffers.hpp>
      15              : #include <boost/capy/buffers/buffer_param.hpp>
      16              : #include <boost/capy/concept/io_awaitable.hpp>
      17              : #include <boost/capy/concept/write_sink.hpp>
      18              : #include <boost/capy/coro.hpp>
      19              : #include <boost/capy/ex/executor_ref.hpp>
      20              : #include <boost/capy/io_result.hpp>
      21              : #include <boost/capy/task.hpp>
      22              : 
      23              : #include <system_error>
      24              : 
      25              : #include <concepts>
      26              : #include <coroutine>
      27              : #include <cstddef>
      28              : #include <exception>
      29              : #include <span>
      30              : #include <stop_token>
      31              : #include <utility>
      32              : 
      33              : namespace boost {
      34              : namespace capy {
      35              : 
      36              : /** Type-erased wrapper for any WriteSink.
      37              : 
      38              :     This class provides type erasure for any type satisfying the
      39              :     @ref WriteSink concept, enabling runtime polymorphism for
      40              :     sink write operations. It uses a cached coroutine frame to achieve
      41              :     zero steady-state allocation after construction.
      42              : 
      43              :     The wrapper supports two construction modes:
      44              :     - **Owning**: Pass by value to transfer ownership. The wrapper
      45              :       allocates storage and owns the sink.
      46              :     - **Reference**: Pass a pointer to wrap without ownership. The
      47              :       pointed-to sink must outlive this wrapper.
      48              : 
      49              :     @par Frame Preallocation
      50              :     The sink-taking constructors preallocate the internal coroutine
      51              :     frame, so the memory a wrapper needs is reserved when it is built
      52              :     (for example at server startup) and can be measured up front,
      53              :     rather than being allocated piecemeal as traffic arrives.
      54              : 
      55              :     @par Thread Safety
      56              :     Not thread-safe. Concurrent operations on the same wrapper
      57              :     are undefined behavior.
      58              : 
      59              :     @par Example
      60              :     @code
      61              :     // Owning - takes ownership of the sink
      62              :     any_write_sink ws(some_sink{args...});
      63              : 
      64              :     // Reference - wraps without ownership
      65              :     some_sink sink;
      66              :     any_write_sink ws(&sink);
      67              : 
      68              :     const_buffer buf(data, size);
      69              :     auto [ec, n] = co_await ws.write(std::span(&buf, 1));
      70              :     auto [ec2] = co_await ws.write_eof();
      71              :     @endcode
      72              : 
      73              :     @see any_write_stream, WriteSink
      74              : */
      75              : class any_write_sink
      76              : {
      77              :     struct vtable;  // function-pointer table: destroy / do_write / do_write_eof
      78              : 
      79              :     template<WriteSink S>
      80              :     struct vtable_for_impl;  // provides the static vtable instance for a concrete sink type S
      81              : 
      82              :     struct write_op;      // coroutine return type of write_coro and write_with_eof_coro
      83              :     struct write_eof_op;  // coroutine return type of write_eof_coro
      84              : 
      85              :     void* sink_ = nullptr;          // type-erased pointer to the wrapped sink; null when empty (see has_value)
      86              :     vtable const* vt_ = nullptr;    // operations matching the dynamic type behind sink_
      87              :     void* cached_frame_ = nullptr;  // cached coroutine frame; the destructor releases it with ::operator delete
      88              :     std::size_t cached_size_ = 0;   // presumably the byte size of cached_frame_, maintained by alloc_frame - confirm
      89              :     void* storage_ = nullptr;       // non-null only in owning mode: the heap block that holds the sink
      90              : 
      91              :     template<WriteSink S>
      92              :     static coro
      93              :     do_write_impl(  // installed as vtable::do_write by vtable_for_impl<S>; definition is not in this section
      94              :         void* sink,
      95              :         any_write_sink* wrapper,
      96              :         std::span<const_buffer const> buffers,
      97              :         bool eof,
      98              :         coro h,
      99              :         executor_ref ex,
     100              :         std::stop_token token,
     101              :         std::error_code* ec,
     102              :         std::size_t* n);
     103              : 
     104              :     template<WriteSink S>
     105              :     static coro
     106              :     do_write_eof_impl(  // installed as vtable::do_write_eof by vtable_for_impl<S>; definition is not in this section
     107              :         void* sink,
     108              :         any_write_sink* wrapper,
     109              :         coro h,
     110              :         executor_ref ex,
     111              :         std::stop_token token,
     112              :         std::error_code* ec);
     113              : 
     114              :     template<WriteSink S>
     115              :     static write_op
     116              :     write_coro(  // coroutine; `wrapper` comes first so write_op's promise operator new can take the frame from it
     117              :         any_write_sink* wrapper,
     118              :         S& sink,
     119              :         std::span<const_buffer const> bufs,
     120              :         std::error_code* out_ec,
     121              :         std::size_t* out_n);
     122              : 
     123              :     template<WriteSink S>
     124              :     static write_op
     125              :     write_with_eof_coro(  // same shape as write_coro plus the `eof` flag
     126              :         any_write_sink* wrapper,
     127              :         S& sink,
     128              :         std::span<const_buffer const> bufs,
     129              :         bool eof,
     130              :         std::error_code* out_ec,
     131              :         std::size_t* out_n);
     132              : 
     133              :     template<WriteSink S>
     134              :     static write_eof_op
     135              :     write_eof_coro(  // coroutine counterpart for the EOF-only operation
     136              :         any_write_sink* wrapper,
     137              :         S& sink,
     138              :         std::error_code* out_ec);
     139              : 
     140              :     void* alloc_frame(std::size_t size);         // called by the promise types' operator new to obtain frame storage
     141              :     void free_frame(void* p, std::size_t size);  // NOTE(review): no caller is visible in this section - confirm it is used
     142              : 
     143              : public:
     144              :     /** Destructor.
     145              : 
     146              :         Destroys the owned sink (if any) and releases the cached
     147              :         coroutine frame.
     148              :     */
     149              :     ~any_write_sink();
     150              : 
     151              :     /** Default constructor.
     152              : 
     153              :         Constructs an empty wrapper. Operations on a default-constructed
     154              :         wrapper result in undefined behavior.
     155              :     */
     156              :     any_write_sink() = default;
     157              : 
     158              :     /** Non-copyable.
     159              : 
     160              :         The frame cache is per-instance and cannot be shared.
     161              :     */
     162              :     any_write_sink(any_write_sink const&) = delete;
     163              :     any_write_sink& operator=(any_write_sink const&) = delete;
     164              : 
     165              :     /** Move constructor.
     166              : 
     167              :         Transfers ownership of the wrapped sink (if owned) and
     168              :         cached frame from `other`. After the move, `other` is
     169              :         in a default-constructed state.
     170              : 
     171              :         @param other The wrapper to move from.
     172              :     */
     173            1 :     any_write_sink(any_write_sink&& other) noexcept
     174            1 :         : sink_(std::exchange(other.sink_, nullptr))  // each exchange takes the value and resets `other` to its default
     175            1 :         , vt_(std::exchange(other.vt_, nullptr))
     176            1 :         , cached_frame_(std::exchange(other.cached_frame_, nullptr))
     177            1 :         , cached_size_(std::exchange(other.cached_size_, 0))
     178            1 :         , storage_(std::exchange(other.storage_, nullptr))
     179              :     {
     180            1 :     }
     181              : 
     182              :     /** Move assignment operator.
     183              : 
     184              :         Destroys any owned sink and releases existing resources,
     185              :         then transfers ownership from `other`.
     186              : 
     187              :         @param other The wrapper to move from.
     188              :         @return Reference to this wrapper.
     189              :     */
     190              :     any_write_sink&
     191              :     operator=(any_write_sink&& other) noexcept;
     192              : 
     193              :     /** Construct by taking ownership of a WriteSink.
     194              : 
     195              :         Allocates storage and moves the sink into this wrapper.
     196              :         The wrapper owns the sink and will destroy it.
     197              : 
     198              :         @param s The sink to take ownership of.
     199              :     */
     200              :     template<WriteSink S>
     201              :         requires (!std::same_as<std::decay_t<S>, any_write_sink>)
     202              :     any_write_sink(S s);
     203              : 
     204              :     /** Construct by wrapping a WriteSink without ownership.
     205              : 
     206              :         Wraps the given sink by pointer. The sink must remain
     207              :         valid for the lifetime of this wrapper.
     208              : 
     209              :         @param s Pointer to the sink to wrap. Must not be null:
     210              :             the constructor dereferences it.
     211              :     */
     212           96 :     any_write_sink(S* s) noexcept  // NOTE(review): alloc_frame is not declared noexcept; if it can throw, this constructor terminates - confirm
     213           96 :         : sink_(s)
     214           96 :         , vt_(&vtable_for_impl<S>::value)
     215              :     {
     216              :         // Preallocate: create one operation of each kind and discard it, so alloc_frame sees every frame size up front
     217           96 :         write_coro<S>(this, *s, {}, nullptr, nullptr);  // never resumed: initial_suspend always suspends and the temporary destroys the handle
     218           96 :         write_with_eof_coro<S>(this, *s, {}, false, nullptr, nullptr);
     219           96 :         write_eof_coro<S>(this, *s, nullptr);
     220           96 :     }
     221              : 
     222              :     /** Check if the wrapper contains a valid sink.
     223              : 
     224              :         @return `true` if wrapping a sink, `false` if default-constructed
     225              :             or moved-from.
     226              :     */
     227              :     bool
     228            9 :     has_value() const noexcept
     229              :     {
     230            9 :         return sink_ != nullptr;
     231              :     }
     232              : 
     233              :     /** Explicit conversion to `bool`; equivalent to `has_value()`.
     234              : 
     235              :         @return `true` if wrapping a sink, `false` if default-constructed
     236              :             or moved-from.
     237              :     */
     238              :     explicit
     239            2 :     operator bool() const noexcept
     240              :     {
     241            2 :         return has_value();
     242              :     }
     243              : 
     244              :     /** Initiate an asynchronous write operation.
     245              : 
     246              :         Writes data from the provided buffer sequence. The operation
     247              :         completes when all bytes have been consumed, or an error
     248              :         occurs.
     249              : 
     250              :         @param buffers The buffer sequence containing data to write.
     251              :             Passed by value to ensure the sequence lives in the
     252              :             coroutine frame across suspension points.
     253              : 
     254              :         @return An awaitable yielding `(error_code,std::size_t)`.
     255              : 
     256              :         @par Preconditions
     257              :         The wrapper must contain a valid sink (`has_value() == true`).
     258              :     */
     259              :     template<ConstBufferSequence CB>
     260              :     task<io_result<std::size_t>>
     261              :     write(CB buffers);
     262              : 
     263              :     /** Initiate an asynchronous write operation with optional EOF.
     264              : 
     265              :         Writes data from the provided buffer sequence, optionally
     266              :         finalizing the sink afterwards. The operation completes when
     267              :         all bytes have been consumed and (if eof is true) the sink
     268              :         is finalized, or an error occurs.
     269              : 
     270              :         @param buffers The buffer sequence containing data to write.
     271              :             Passed by value to ensure the sequence lives in the
     272              :             coroutine frame across suspension points.
     273              : 
     274              :         @param eof If `true`, the sink is finalized after writing
     275              :             the data.
     276              : 
     277              :         @return An awaitable yielding `(error_code,std::size_t)`.
     278              : 
     279              :         @par Preconditions
     280              :         The wrapper must contain a valid sink (`has_value() == true`).
     281              :     */
     282              :     template<ConstBufferSequence CB>
     283              :     task<io_result<std::size_t>>
     284              :     write(CB buffers, bool eof);
     285              : 
     286              :     /** Signal end of data.
     287              : 
     288              :         Indicates that no more data will be written to the sink.
     289              :         The operation completes when the sink is finalized, or
     290              :         an error occurs.
     291              : 
     292              :         @return An awaitable yielding `(error_code)`.
     293              : 
     294              :         @par Preconditions
     295              :         The wrapper must contain a valid sink (`has_value() == true`).
     296              :     */
     297              :     auto
     298              :     write_eof();  // deduced return type: the definition must be visible before any call
     299              : 
     300              : protected:
     301              :     /** Rebind to a new sink after move.
     302              : 
     303              :         Updates the internal pointer to reference a new sink object.
     304              :         Used by owning wrappers after move assignment when the owned
     305              :         object has moved to a new location.
     306              : 
     307              :         @param new_sink The new sink to bind to. Must be the same
     308              :             type as the original sink.
     309              : 
     310              :         @note Terminates if called with a sink of different type
     311              :             than the original.
     312              :     */
     313              :     template<WriteSink S>
     314              :     void
     315              :     rebind(S& new_sink) noexcept
     316              :     {
     317              :         if(vt_ != &vtable_for_impl<S>::value)  // type check by vtable identity; also fails for an empty wrapper (vt_ == nullptr)
     318              :             std::terminate();
     319              :         sink_ = &new_sink;  // only the pointer changes; storage_ and the cached frame are untouched
     320              :     }
     321              : 
     322              : private:
     323              :     auto
     324              :     write_some_(std::span<const_buffer const> buffers, bool eof);  // defined outside this section
     325              : };
     326              : 
     327              : //----------------------------------------------------------
     328              : 
     329              : struct any_write_sink::vtable  // Per-sink-type table of function pointers used for type erasure.
     330              : {
     331              :     void (*destroy)(void*) noexcept;  // runs the sink's destructor only; callers free the storage separately
     332              : 
     333              :     coro (*do_write)(  // same parameter list as any_write_sink::do_write_impl<S>
     334              :         void* sink,
     335              :         any_write_sink* wrapper,
     336              :         std::span<const_buffer const> buffers,
     337              :         bool eof,
     338              :         coro h,
     339              :         executor_ref ex,
     340              :         std::stop_token token,
     341              :         std::error_code* ec,
     342              :         std::size_t* n);
     343              : 
     344              :     coro (*do_write_eof)(  // same parameter list as any_write_sink::do_write_eof_impl<S>
     345              :         void* sink,
     346              :         any_write_sink* wrapper,
     347              :         coro h,
     348              :         executor_ref ex,
     349              :         std::stop_token token,
     350              :         std::error_code* ec);
     351              : };
     352              : 
     353              : template<WriteSink S>
     354              : struct any_write_sink::vtable_for_impl  // Builds the vtable for one concrete sink type S.
     355              : {
     356              :     static void
     357            0 :     do_destroy_impl(void* sink) noexcept
     358              :     {
     359            0 :         static_cast<S*>(sink)->~S();  // destructor call only; `sink` must point at a live S
     360            0 :     }
     361              : 
     362              :     static constexpr vtable value = {  // initializer order must match vtable's member order
     363              :         &do_destroy_impl,
     364              :         &any_write_sink::do_write_impl<S>,
     365              :         &any_write_sink::do_write_eof_impl<S>
     366              :     };
     367              : };
     368              : 
     369              : //----------------------------------------------------------
     370              : 
     371              : inline
     372           99 : any_write_sink::~any_write_sink()  // Destroys the owned sink, if any, then releases the cached coroutine frame.
     373              : {
     374           99 :     if(storage_)  // set only by the owning constructor; a pointer-wrapped sink is left alone
     375              :     {
     376            0 :         vt_->destroy(sink_);          // run the sink's destructor through the type-erased table
     377            0 :         ::operator delete(storage_);  // then free the block it lived in
     378              :     }
     379           99 :     if(cached_frame_)
     380           96 :         ::operator delete(cached_frame_);
     381           99 : }
     382              : 
     383              : inline any_write_sink&
     384            1 : any_write_sink::operator=(any_write_sink&& other) noexcept  // Releases current resources, then takes over those of `other`.
     385              : {
     386            1 :     if(this != &other)  // self-assignment leaves the wrapper untouched
     387              :     {
     388            1 :         if(storage_)  // same teardown sequence as the destructor
     389              :         {
     390            0 :             vt_->destroy(sink_);
     391            0 :             ::operator delete(storage_);
     392              :         }
     393            1 :         if(cached_frame_)
     394            0 :             ::operator delete(cached_frame_);
     395            1 :         sink_ = std::exchange(other.sink_, nullptr);  // each exchange also resets `other` to its default value
     396            1 :         vt_ = std::exchange(other.vt_, nullptr);
     397            1 :         cached_frame_ = std::exchange(other.cached_frame_, nullptr);
     398            1 :         cached_size_ = std::exchange(other.cached_size_, 0);
     399            1 :         storage_ = std::exchange(other.storage_, nullptr);
     400              :     }
     401            1 :     return *this;
     402              : }
     403              : 
     404              : template<WriteSink S>  // Owning constructor: moves `s` into heap storage and preallocates the coroutine frames.
     405              :     requires (!std::same_as<std::decay_t<S>, any_write_sink>)
     406              : any_write_sink::any_write_sink(S s)
     407              :     : vt_(&vtable_for_impl<S>::value)
     408              : {
     409              :     struct guard {  // rollback helper: the wrapper's destructor does not run if this constructor throws
     410              :         any_write_sink* self;
     411              :         bool committed = false;
     412              :         ~guard() {
     413              :             if(!committed && self->storage_) {
     414              :                 if(self->sink_) self->vt_->destroy(self->sink_);  // sink_ is still null when S's move constructor threw: nothing to destroy
     415              :                 ::operator delete(self->storage_);
     416              :                 self->storage_ = nullptr;
     417              :                 self->sink_ = nullptr;
     418              :             }  // NOTE(review): cached_frame_ is not released on this path - confirm against alloc_frame
     419              :         }
     420              :     } g{this};
     421              : 
     422              :     storage_ = ::operator new(sizeof(S));  // NOTE(review): no alignment argument - confirm over-aligned S is not expected
     423              :     sink_ = ::new(storage_) S(std::move(s));  // may throw; storage_ is already set, so the guard frees it
     424              : 
     425              :     // Preallocate: create one operation of each kind and discard it, so alloc_frame sees every frame size up front
     426              :     auto& ref = *static_cast<S*>(sink_);
     427              :     write_coro<S>(this, ref, {}, nullptr, nullptr);
     428              :     write_with_eof_coro<S>(this, ref, {}, false, nullptr, nullptr);
     429              :     write_eof_coro<S>(this, ref, nullptr);
     430              : 
     431              :     g.committed = true;  // success: disarm the rollback
     432              : }
     433              : 
     434              : //----------------------------------------------------------
     435              : 
     436              : struct any_write_sink::write_op  // Move-only owner of a write coroutine whose frame comes from the wrapper.
     437              : {
     438              :     struct promise_type
     439              :     {
     440              :         executor_ref executor_;       // passed to awaited operations by transform_awaiter::await_suspend
     441              :         std::stop_token stop_token_;  // passed to awaited operations by transform_awaiter::await_suspend
     442              :         coro caller_h_{};             // resumed at final suspend when set
     443              : 
     444          318 :         promise_type() = default;
     445              : 
     446              :         write_op
     447          318 :         get_return_object() noexcept
     448              :         {
     449              :             return write_op{
     450          318 :                 std::coroutine_handle<promise_type>::from_promise(*this)};
     451              :         }
     452              : 
     453              :         std::suspend_always
     454          318 :         initial_suspend() noexcept  // always suspends: the body runs only once the handle is resumed
     455              :         {
     456          318 :             return {};
     457              :         }
     458              : 
     459              :         auto
     460           98 :         final_suspend() noexcept
     461              :         {
     462              :             struct awaiter
     463              :             {
     464              :                 promise_type* p_;
     465              : 
     466           98 :                 bool await_ready() const noexcept { return false; }  // always suspend at the final suspend point
     467              : 
     468           98 :                 coro await_suspend(coro) const noexcept
     469              :                 {
     470           98 :                     if(p_->caller_h_)
     471            0 :                         return p_->caller_h_;  // hand control to the registered caller
     472           98 :                     return std::noop_coroutine();  // no caller registered: return to whoever resumed this coroutine
     473              :                 }
     474              : 
     475            0 :                 void await_resume() const noexcept {}
     476              :             };
     477           98 :             return awaiter{this};
     478              :         }
     479              : 
     480              :         void
     481           98 :         return_void() noexcept
     482              :         {
     483           98 :         }
     484              : 
     485              :         void
     486           28 :         unhandled_exception()
     487              :         {
     488           28 :             throw;  // rethrow so the exception propagates out of the resume call instead of being stored
     489              :         }
     490              : 
     491              :         template<class... Args>
     492              :         static void*
     493          318 :         operator new(  // frame allocation: selected because the coroutine's first argument is the wrapper pointer
     494              :             std::size_t size,
     495              :             any_write_sink* wrapper,
     496              :             Args&&...)
     497              :         {
     498          318 :             return wrapper->alloc_frame(size);
     499              :         }
     500              : 
     501              :         template<class... Args>
     502              :         static void
     503              :         operator delete(void*, any_write_sink*, Args&&...) noexcept  // placement form matching operator new above; empty
     504              :         {
     505              :         }
     506              : 
     507              :         static void
     508          318 :         operator delete(void*, std::size_t) noexcept  // empty: destroying the coroutine does not free the frame storage
     509              :         {
     510          318 :         }
     511              : 
     512              :         void
     513          126 :         set_executor(executor_ref ex) noexcept
     514              :         {
     515          126 :             executor_ = ex;
     516          126 :         }
     517              : 
     518              :         void
     519          126 :         set_stop_token(std::stop_token token) noexcept
     520              :         {
     521          126 :             stop_token_ = token;
     522          126 :         }
     523              : 
     524              :         void
     525            0 :         set_caller(coro h) noexcept
     526              :         {
     527            0 :             caller_h_ = h;
     528            0 :         }
     529              : 
     530              :         template<class Awaitable>
     531              :         struct transform_awaiter  // adapts a three-argument await_suspend to the standard one-argument protocol
     532              :         {
     533              :             std::decay_t<Awaitable> a_;  // the wrapped awaitable, held by value
     534              :             promise_type* p_;
     535              : 
     536          126 :             bool await_ready()
     537              :             {
     538          126 :                 return a_.await_ready();
     539              :             }
     540              : 
     541          126 :             decltype(auto) await_resume()
     542              :             {
     543          126 :                 return a_.await_resume();
     544              :             }
     545              : 
     546            0 :             auto await_suspend(coro h)
     547              :             {
     548            0 :                 return a_.await_suspend(h, p_->executor_, p_->stop_token_);  // supplies this promise's executor and stop token
     549              :             }
     550              :         };
     551              : 
     552              :         template<class Awaitable>
     553          126 :         auto await_transform(Awaitable&& a)  // applied to every co_await expression in the coroutine body
     554              :         {
     555              :             using A = std::decay_t<Awaitable>;
     556              :             if constexpr (IoAwaitable<A>)
     557              :             {
     558              :                 return transform_awaiter<Awaitable>{
     559          126 :                     std::forward<Awaitable>(a), this};
     560              :             }
     561              :             else
     562              :             {
     563              :                 static_assert(sizeof(A) == 0, "requires IoAwaitable");  // type-dependent false: fires only when this branch is instantiated
     564              :             }
     565              :         }
     566              :     };
     567              : 
     568              :     std::coroutine_handle<promise_type> h_;  // owned handle; null after being moved from
     569              : 
     570          318 :     ~write_op()
     571              :     {
     572          318 :         if(h_)
     573          220 :             h_.destroy();  // destroys the coroutine; frame storage is not freed here (operator delete is empty)
     574          318 :     }
     575              : 
     576              :     write_op(write_op const&) = delete;
     577              :     write_op& operator=(write_op const&) = delete;
     578              : 
     579              :     write_op(write_op&& other) noexcept
     580              :         : h_(std::exchange(other.h_, nullptr))
     581              :     {
     582              :     }
     583              : 
     584              :     write_op& operator=(write_op&& other) noexcept
     585              :     {
     586              :         if(this != &other)
     587              :         {
     588              :             if(h_)
     589              :                 h_.destroy();  // drop the currently owned coroutine before taking over the other
     590              :             h_ = std::exchange(other.h_, nullptr);
     591              :         }
     592              :         return *this;
     593              :     }
     594              : 
     595              : private:
     596              :     explicit
     597          318 :     write_op(std::coroutine_handle<promise_type> h) noexcept  // used by promise_type::get_return_object
     598          318 :         : h_(h)
     599              :     {
     600          318 :     }
     601              : };
     602              : 
     603              : //----------------------------------------------------------
     604              : 
     605              : struct any_write_sink::write_eof_op
     606              : {
     607              :     struct promise_type
     608              :     {
     609              :         executor_ref executor_;
     610              :         std::stop_token stop_token_;
     611              :         coro caller_h_{};
     612              : 
     613          120 :         promise_type() = default;
     614              : 
     615              :         write_eof_op
     616          120 :         get_return_object() noexcept
     617              :         {
     618              :             return write_eof_op{
     619          120 :                 std::coroutine_handle<promise_type>::from_promise(*this)};
     620              :         }
     621              : 
     622              :         std::suspend_always
     623          120 :         initial_suspend() noexcept
     624              :         {
     625          120 :             return {};
     626              :         }
     627              : 
     628              :         auto
     629           17 :         final_suspend() noexcept
     630              :         {
     631              :             struct awaiter
     632              :             {
     633              :                 promise_type* p_;
     634              : 
     635           17 :                 bool await_ready() const noexcept { return false; }
     636              : 
     637           17 :                 coro await_suspend(coro) const noexcept
     638              :                 {
     639           17 :                     if(p_->caller_h_)
     640            0 :                         return p_->caller_h_;
     641           17 :                     return std::noop_coroutine();
     642              :                 }
     643              : 
     644            0 :                 void await_resume() const noexcept {}
     645              :             };
     646           17 :             return awaiter{this};
     647              :         }
     648              : 
     649              :         void
     650           17 :         return_void() noexcept
     651              :         {
     652           17 :         }
     653              : 
     654              :         void
     655            7 :         unhandled_exception()
     656              :         {
     657            7 :             throw;
     658              :         }
     659              : 
     660              :         template<class... Args>
     661              :         static void*
     662          120 :         operator new(
     663              :             std::size_t size,
     664              :             any_write_sink* wrapper,
     665              :             Args&&...)
     666              :         {
     667          120 :             return wrapper->alloc_frame(size);
     668              :         }
     669              : 
     670              :         template<class... Args>
     671              :         static void
     672              :         operator delete(void*, any_write_sink*, Args&&...) noexcept
     673              :         {
     674              :         }
     675              : 
     676              :         static void
     677          120 :         operator delete(void*, std::size_t) noexcept
     678              :         {
     679          120 :         }
     680              : 
     681              :         void
     682           24 :         set_executor(executor_ref ex) noexcept
     683              :         {
     684           24 :             executor_ = ex;
     685           24 :         }
     686              : 
     687              :         void
     688           24 :         set_stop_token(std::stop_token token) noexcept
     689              :         {
     690           24 :             stop_token_ = token;
     691           24 :         }
     692              : 
     693              :         void
     694            0 :         set_caller(coro h) noexcept
     695              :         {
     696            0 :             caller_h_ = h;
     697            0 :         }
     698              : 
     699              :         template<class Awaitable>
     700              :         struct transform_awaiter
     701              :         {
     702              :             std::decay_t<Awaitable> a_;
     703              :             promise_type* p_;
     704              : 
     705           24 :             bool await_ready()
     706              :             {
     707           24 :                 return a_.await_ready();
     708              :             }
     709              : 
     710           24 :             decltype(auto) await_resume()
     711              :             {
     712           24 :                 return a_.await_resume();
     713              :             }
     714              : 
     715            0 :             auto await_suspend(coro h)
     716              :             {
     717            0 :                 return a_.await_suspend(h, p_->executor_, p_->stop_token_);
     718              :             }
     719              :         };
     720              : 
     721              :         template<class Awaitable>
     722           24 :         auto await_transform(Awaitable&& a)
     723              :         {
     724              :             using A = std::decay_t<Awaitable>;
     725              :             if constexpr (IoAwaitable<A>)
     726              :             {
     727              :                 return transform_awaiter<Awaitable>{
     728           24 :                     std::forward<Awaitable>(a), this};
     729              :             }
     730              :             else
     731              :             {
     732              :                 static_assert(sizeof(A) == 0, "requires IoAwaitable");
     733              :             }
     734              :         }
     735              :     };
     736              : 
     737              :     std::coroutine_handle<promise_type> h_;
     738              : 
     739          120 :     ~write_eof_op()
     740              :     {
     741          120 :         if(h_)
     742          103 :             h_.destroy();
     743          120 :     }
     744              : 
     745              :     write_eof_op(write_eof_op const&) = delete;
     746              :     write_eof_op& operator=(write_eof_op const&) = delete;
     747              : 
     748              :     write_eof_op(write_eof_op&& other) noexcept
     749              :         : h_(std::exchange(other.h_, nullptr))
     750              :     {
     751              :     }
     752              : 
     753              :     write_eof_op& operator=(write_eof_op&& other) noexcept
     754              :     {
     755              :         if(this != &other)
     756              :         {
     757              :             if(h_)
     758              :                 h_.destroy();
     759              :             h_ = std::exchange(other.h_, nullptr);
     760              :         }
     761              :         return *this;
     762              :     }
     763              : 
     764              : private:
     765              :     explicit
     766          120 :     write_eof_op(std::coroutine_handle<promise_type> h) noexcept
     767          120 :         : h_(h)
     768              :     {
     769          120 :     }
     770              : };
     771              : 
     772              : //----------------------------------------------------------
     773              : 
     774              : inline void*
     775          438 : any_write_sink::alloc_frame(std::size_t size)
     776              : {
     777          438 :     if(cached_frame_ && cached_size_ >= size)
     778          246 :         return cached_frame_;
     779              : 
     780          192 :     if(cached_frame_)
     781           96 :         ::operator delete(cached_frame_);
     782              : 
     783          192 :     cached_frame_ = ::operator new(size);
     784          192 :     cached_size_ = size;
     785          192 :     return cached_frame_;
     786              : }
     787              : 
     788              : inline void
     789              : any_write_sink::free_frame(void*, std::size_t)
     790              : {
     791              :     // Keep the frame cached for reuse
     792              : }
     793              : 
     794              : #if defined(__GNUC__) && !defined(__clang__)
     795              : #pragma GCC diagnostic push
     796              : #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
     797              : #endif
     798              : 
     799              : template<WriteSink S>
     800              : any_write_sink::write_op
     801          222 : any_write_sink::write_coro(
     802              :     any_write_sink*,
     803              :     S& sink,
     804              :     std::span<const_buffer const> bufs,
     805              :     std::error_code* out_ec,
     806              :     std::size_t* out_n)
     807              : {
     808              :     auto [err, bytes] = co_await sink.write(bufs);
     809              : 
     810              :     *out_ec = err;
     811              :     *out_n = bytes;
     812          444 : }
     813              : 
     814              : template<WriteSink S>
     815              : any_write_sink::write_op
     816           96 : any_write_sink::write_with_eof_coro(
     817              :     any_write_sink*,
     818              :     S& sink,
     819              :     std::span<const_buffer const> bufs,
     820              :     bool eof,
     821              :     std::error_code* out_ec,
     822              :     std::size_t* out_n)
     823              : {
     824              :     auto [err, bytes] = co_await sink.write(bufs, eof);
     825              : 
     826              :     *out_ec = err;
     827              :     *out_n = bytes;
     828          192 : }
     829              : 
     830              : template<WriteSink S>
     831              : any_write_sink::write_eof_op
     832          120 : any_write_sink::write_eof_coro(
     833              :     any_write_sink*,
     834              :     S& sink,
     835              :     std::error_code* out_ec)
     836              : {
     837              :     auto [err] = co_await sink.write_eof();
     838              : 
     839              :     *out_ec = err;
     840          240 : }
     841              : 
     842              : #if defined(__GNUC__) && !defined(__clang__)
     843              : #pragma GCC diagnostic pop
     844              : #endif
     845              : 
     846              : template<WriteSink S>
     847              : coro
     848          126 : any_write_sink::do_write_impl(
     849              :     void* sink,
     850              :     any_write_sink* wrapper,
     851              :     std::span<const_buffer const> buffers,
     852              :     bool eof,
     853              :     coro h,
     854              :     executor_ref ex,
     855              :     std::stop_token token,
     856              :     std::error_code* ec,
     857              :     std::size_t* n)
     858              : {
     859          126 :     auto& s = *static_cast<S*>(sink);
     860              : 
     861              :     // Create coroutine - frame is cached in wrapper
     862          126 :     auto op = eof
     863          126 :         ? write_with_eof_coro<S>(wrapper, s, buffers, eof, ec, n)
     864              :         : write_coro<S>(wrapper, s, buffers, ec, n);
     865              : 
     866              :     // Set executor and stop token on promise before resuming
     867          126 :     op.h_.promise().set_executor(ex);
     868          126 :     op.h_.promise().set_stop_token(token);
     869              : 
     870              :     // Resume the coroutine to start the operation
     871          126 :     op.h_.resume();
     872              : 
     873              :     // Check if operation completed synchronously
     874           98 :     if(op.h_.done())
     875              :     {
     876           98 :         op.h_.destroy();
     877           98 :         op.h_ = nullptr;
     878           98 :         return ex.dispatch(h);
     879              :     }
     880              : 
     881              :     // Operation is pending - caller will be resumed via symmetric transfer
     882            0 :     op.h_.promise().set_caller(h);
     883            0 :     op.h_ = nullptr;
     884            0 :     return std::noop_coroutine();
     885          126 : }
     886              : 
     887              : template<WriteSink S>
     888              : coro
     889           24 : any_write_sink::do_write_eof_impl(
     890              :     void* sink,
     891              :     any_write_sink* wrapper,
     892              :     coro h,
     893              :     executor_ref ex,
     894              :     std::stop_token token,
     895              :     std::error_code* ec)
     896              : {
     897           24 :     auto& s = *static_cast<S*>(sink);
     898              : 
     899              :     // Create coroutine - frame is cached in wrapper
     900           24 :     auto op = write_eof_coro<S>(wrapper, s, ec);
     901              : 
     902              :     // Set executor and stop token on promise before resuming
     903           24 :     op.h_.promise().set_executor(ex);
     904           24 :     op.h_.promise().set_stop_token(token);
     905              : 
     906              :     // Resume the coroutine to start the operation
     907           24 :     op.h_.resume();
     908              : 
     909              :     // Check if operation completed synchronously
     910           17 :     if(op.h_.done())
     911              :     {
     912           17 :         op.h_.destroy();
     913           17 :         op.h_ = nullptr;
     914           17 :         return ex.dispatch(h);
     915              :     }
     916              : 
     917              :     // Operation is pending - caller will be resumed via symmetric transfer
     918            0 :     op.h_.promise().set_caller(h);
     919            0 :     op.h_ = nullptr;
     920            0 :     return std::noop_coroutine();
     921           24 : }
     922              : 
     923              : //----------------------------------------------------------
     924              : 
     925              : inline auto
     926          126 : any_write_sink::write_some_(
     927              :     std::span<const_buffer const> buffers,
     928              :     bool eof)
     929              : {
     930              :     struct awaitable
     931              :     {
     932              :         any_write_sink* self_;
     933              :         std::span<const_buffer const> buffers_;
     934              :         bool eof_;
     935              :         std::error_code ec_;
     936              :         std::size_t n_ = 0;
     937              : 
     938              :         bool
     939          126 :         await_ready() const noexcept
     940              :         {
     941          126 :             return false;
     942              :         }
     943              : 
     944              :         coro
     945          126 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     946              :         {
     947          252 :             return self_->vt_->do_write(
     948          126 :                 self_->sink_,
     949              :                 self_,
     950              :                 buffers_,
     951          126 :                 eof_,
     952              :                 h,
     953              :                 ex,
     954              :                 token,
     955              :                 &ec_,
     956          196 :                 &n_);
     957              :         }
     958              : 
     959              :         io_result<std::size_t>
     960           98 :         await_resume() const noexcept
     961              :         {
     962           98 :             return {ec_, n_};
     963              :         }
     964              :     };
     965          126 :     return awaitable{this, buffers, eof, {}, 0};
     966              : }
     967              : 
     968              : inline auto
     969           24 : any_write_sink::write_eof()
     970              : {
     971              :     struct awaitable
     972              :     {
     973              :         any_write_sink* self_;
     974              :         std::error_code ec_;
     975              : 
     976              :         bool
     977           24 :         await_ready() const noexcept
     978              :         {
     979           24 :             return false;
     980              :         }
     981              : 
     982              :         coro
     983           24 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     984              :         {
     985           48 :             return self_->vt_->do_write_eof(
     986           24 :                 self_->sink_,
     987              :                 self_,
     988              :                 h,
     989              :                 ex,
     990              :                 token,
     991           34 :                 &ec_);
     992              :         }
     993              : 
     994              :         io_result<>
     995           17 :         await_resume() const noexcept
     996              :         {
     997           17 :             return {ec_};
     998              :         }
     999              :     };
    1000           24 :     return awaitable{this, {}};
    1001              : }
    1002              : 
    1003              : template<ConstBufferSequence CB>
    1004              : task<io_result<std::size_t>>
    1005           66 : any_write_sink::write(CB buffers)
    1006              : {
    1007           66 :     return write(buffers, false);
    1008              : }
    1009              : 
    1010              : template<ConstBufferSequence CB>
    1011              : task<io_result<std::size_t>>
    1012           98 : any_write_sink::write(CB buffers, bool eof)
    1013              : {
    1014              :     buffer_param<CB> bp(buffers);
    1015              :     std::size_t total = 0;
    1016              : 
    1017              :     for(;;)
    1018              :     {
    1019              :         auto bufs = bp.data();
    1020              :         if(bufs.empty())
    1021              :             break;
    1022              : 
    1023              :         auto [ec, n] = co_await write_some_(bufs, false);
    1024              :         if(ec)
    1025              :             co_return {ec, total + n};
    1026              :         bp.consume(n);
    1027              :         total += n;
    1028              :     }
    1029              : 
    1030              :     if(eof)
    1031              :     {
    1032              :         auto [ec] = co_await write_eof();
    1033              :         if(ec)
    1034              :             co_return {ec, total};
    1035              :     }
    1036              : 
    1037              :     co_return {{}, total};
    1038          196 : }
    1039              : 
    1040              : } // namespace capy
    1041              : } // namespace boost
    1042              : 
    1043              : #endif
        

Generated by: LCOV version 2.3