LCOV - code coverage report
Current view: top level - boost/capy/io - any_write_stream.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 84.2 % 120 101
Test Date: 2026-01-30 23:43:15 Functions: 89.3 % 56 50

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
      11              : #define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/buffers.hpp>
      15              : #include <boost/capy/buffers/buffer_param.hpp>
      16              : #include <boost/capy/concept/io_awaitable.hpp>
      17              : #include <boost/capy/concept/write_stream.hpp>
      18              : #include <boost/capy/coro.hpp>
      19              : #include <boost/capy/ex/executor_ref.hpp>
      20              : #include <boost/capy/io_result.hpp>
      21              : 
      22              : #include <system_error>
      23              : 
      24              : #include <concepts>
      25              : #include <coroutine>
      26              : #include <cstddef>
      27              : #include <exception>
      28              : #include <span>
      29              : #include <stop_token>
      30              : #include <utility>
      31              : 
      32              : namespace boost {
      33              : namespace capy {
      34              : 
      35              : /** Type-erased wrapper for any WriteStream.
      36              : 
      37              :     This class provides type erasure for any type satisfying the
      38              :     @ref WriteStream concept, enabling runtime polymorphism for
      39              :     write operations. It uses a cached coroutine frame to achieve
      40              :     zero steady-state allocation after construction.
      41              : 
      42              :     The wrapper supports two construction modes:
      43              :     - **Owning**: Pass by value to transfer ownership. The wrapper
      44              :       allocates storage and owns the stream.
      45              :     - **Reference**: Pass a pointer to wrap without ownership. The
      46              :       pointed-to stream must outlive this wrapper.
      47              : 
      48              :     @par Frame Preallocation
      49              :     The constructor preallocates the internal coroutine frame.
      50              :     This reserves all virtual address space at server startup
      51              :     so memory usage can be measured up front, rather than
      52              :     allocating piecemeal as traffic arrives.
      53              : 
      54              :     @par Thread Safety
      55              :     Not thread-safe. Concurrent operations on the same wrapper
      56              :     are undefined behavior.
      57              : 
      58              :     @par Example
      59              :     @code
      60              :     // Owning - takes ownership of the stream
      61              :     any_write_stream stream(socket{ioc});
      62              : 
      63              :     // Reference - wraps without ownership
      64              :     socket sock(ioc);
      65              :     any_write_stream stream(&sock);
      66              : 
      67              :     const_buffer buf(data, size);
      68              :     auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
      69              :     @endcode
      70              : 
      71              :     @see any_read_stream, any_stream, WriteStream
      72              : */
      73              : class any_write_stream
      74              : {
      75              :     struct vtable;
      76              : 
      77              :     template<WriteStream S>
      78              :     struct vtable_for_impl;
      79              : 
      80              :     struct write_op;
      81              : 
      82              :     void* stream_ = nullptr;
      83              :     vtable const* vt_ = nullptr;
      84              :     void* cached_frame_ = nullptr;
      85              :     std::size_t cached_size_ = 0;
      86              :     void* storage_ = nullptr;
      87              : 
      88              :     template<WriteStream S>
      89              :     static coro
      90              :     do_write_impl(
      91              :         void* stream,
      92              :         any_write_stream* wrapper,
      93              :         std::span<const_buffer const> buffers,
      94              :         coro h,
      95              :         executor_ref ex,
      96              :         std::stop_token token,
      97              :         std::error_code* ec,
      98              :         std::size_t* n);
      99              : 
     100              :     template<WriteStream S>
     101              :     static write_op
     102              :     write_coro(
     103              :         any_write_stream* wrapper,
     104              :         S& stream,
     105              :         std::span<const_buffer const> bufs,
     106              :         std::error_code* out_ec,
     107              :         std::size_t* out_n);
     108              : 
     109              :     void* alloc_frame(std::size_t size);
     110              :     void free_frame(void* p, std::size_t size);
     111              : 
     112              : public:
     113              :     /** Destructor.
     114              : 
     115              :         Destroys the owned stream (if any) and releases the cached
     116              :         coroutine frame.
     117              :     */
     118              :     ~any_write_stream();
     119              : 
     120              :     /** Default constructor.
     121              : 
     122              :         Constructs an empty wrapper. Operations on a default-constructed
     123              :         wrapper result in undefined behavior.
     124              :     */
     125            1 :     any_write_stream() = default;
     126              : 
     127              :     /** Non-copyable.
     128              : 
     129              :         The frame cache is per-instance and cannot be shared.
     130              :     */
     131              :     any_write_stream(any_write_stream const&) = delete;
     132              :     any_write_stream& operator=(any_write_stream const&) = delete;
     133              : 
     134              :     /** Move constructor.
     135              : 
     136              :         Transfers ownership of the wrapped stream (if owned) and
     137              :         cached frame from `other`. After the move, `other` is
     138              :         in a default-constructed state.
     139              : 
     140              :         @param other The wrapper to move from.
     141              :     */
     142            2 :     any_write_stream(any_write_stream&& other) noexcept
     143            2 :         : stream_(std::exchange(other.stream_, nullptr))
     144            2 :         , vt_(std::exchange(other.vt_, nullptr))
     145            2 :         , cached_frame_(std::exchange(other.cached_frame_, nullptr))
     146            2 :         , cached_size_(std::exchange(other.cached_size_, 0))
     147            2 :         , storage_(std::exchange(other.storage_, nullptr))
     148              :     {
     149            2 :     }
     150              : 
     151              :     /** Move assignment operator.
     152              : 
     153              :         Destroys any owned stream and releases existing resources,
     154              :         then transfers ownership from `other`.
     155              : 
     156              :         @param other The wrapper to move from.
     157              :         @return Reference to this wrapper.
     158              :     */
     159              :     any_write_stream&
     160              :     operator=(any_write_stream&& other) noexcept;
     161              : 
     162              :     /** Construct by taking ownership of a WriteStream.
     163              : 
     164              :         Allocates storage and moves the stream into this wrapper.
     165              :         The wrapper owns the stream and will destroy it.
     166              : 
     167              :         @param s The stream to take ownership of.
     168              :     */
     169              :     template<WriteStream S>
     170              :         requires (!std::same_as<std::decay_t<S>, any_write_stream>)
     171              :     any_write_stream(S s);
     172              : 
     173              :     /** Construct by wrapping a WriteStream without ownership.
     174              : 
     175              :         Wraps the given stream by pointer. The stream must remain
     176              :         valid for the lifetime of this wrapper.
     177              : 
     178              :         @param s Pointer to the stream to wrap.
     179              :     */
     180              :     template<WriteStream S>
     181           65 :     any_write_stream(S* s) noexcept
     182           65 :         : stream_(s)
     183           65 :         , vt_(&vtable_for_impl<S>::value)
     184              :     {
     185              :         // Preallocate the coroutine frame
     186           65 :         write_coro<S>(this, *s, {}, nullptr, nullptr);
     187           65 :     }
     188              :     /** Check if the wrapper contains a valid stream.
     189              : 
     190              :         @return `true` if wrapping a stream, `false` if default-constructed
     191              :             or moved-from.
     192              :     */
     193              :     bool
     194           15 :     has_value() const noexcept
     195              :     {
     196           15 :         return stream_ != nullptr;
     197              :     }
     198              : 
     199              :     /** Check if the wrapper contains a valid stream.
     200              : 
     201              :         @return `true` if wrapping a stream, `false` if default-constructed
     202              :             or moved-from.
     203              :     */
     204              :     explicit
     205            2 :     operator bool() const noexcept
     206              :     {
     207            2 :         return has_value();
     208              :     }
     209              : 
     210              :     /** Initiate an asynchronous write operation.
     211              : 
     212              :         Writes data from the provided buffer sequence. The operation
     213              :         completes when at least one byte has been written, or an error
     214              :         occurs.
     215              : 
     216              :         @param buffers The buffer sequence containing data to write.
     217              :             Passed by value to ensure the sequence lives in the
     218              :             coroutine frame across suspension points.
     219              : 
     220              :         @return An awaitable yielding `(error_code,std::size_t)`.
     221              : 
     222              :         @par Preconditions
     223              :         The wrapper must contain a valid stream (`has_value() == true`).
     224              :     */
     225              :     template<ConstBufferSequence CB>
     226              :     auto
     227              :     write_some(CB buffers);
     228              : 
     229              : protected:
     230              :     /** Rebind to a new stream after move.
     231              : 
     232              :         Updates the internal pointer to reference a new stream object.
     233              :         Used by owning wrappers after move assignment when the owned
     234              :         object has moved to a new location.
     235              : 
     236              :         @param new_stream The new stream to bind to. Must be the same
     237              :             type as the original stream.
     238              : 
     239              :         @note Terminates if called with a stream of different type
     240              :             than the original.
     241              :     */
     242              :     template<WriteStream S>
     243              :     void
     244              :     rebind(S& new_stream) noexcept
     245              :     {
     246              :         if(vt_ != &vtable_for_impl<S>::value)
     247              :             std::terminate();
     248              :         stream_ = &new_stream;
     249              :     }
     250              : };
     251              : 
     252              : //----------------------------------------------------------
     253              : 
     254              : struct any_write_stream::vtable
     255              : {
     256              :     void (*destroy)(void*) noexcept;
     257              : 
     258              :     coro (*do_write)(
     259              :         void* stream,
     260              :         any_write_stream* wrapper,
     261              :         std::span<const_buffer const> buffers,
     262              :         coro h,
     263              :         executor_ref ex,
     264              :         std::stop_token token,
     265              :         std::error_code* ec,
     266              :         std::size_t* n);
     267              : };
     268              : 
     269              : template<WriteStream S>
     270              : struct any_write_stream::vtable_for_impl
     271              : {
     272              :     static void
     273            0 :     do_destroy_impl(void* stream) noexcept
     274              :     {
     275            0 :         static_cast<S*>(stream)->~S();
     276            0 :     }
     277              : 
     278              :     static constexpr vtable value = {
     279              :         &do_destroy_impl,
     280              :         &any_write_stream::do_write_impl<S>
     281              :     };
     282              : };
     283              : 
     284              : //----------------------------------------------------------
     285              : 
     286              : inline
     287           72 : any_write_stream::~any_write_stream()
     288              : {
     289           72 :     if(storage_)
     290              :     {
     291            0 :         vt_->destroy(stream_);
     292            0 :         ::operator delete(storage_);
     293              :     }
     294           72 :     if(cached_frame_)
     295           65 :         ::operator delete(cached_frame_);
     296           72 : }
     297              : 
     298              : inline any_write_stream&
     299            3 : any_write_stream::operator=(any_write_stream&& other) noexcept
     300              : {
     301            3 :     if(this != &other)
     302              :     {
     303            3 :         if(storage_)
     304              :         {
     305            0 :             vt_->destroy(stream_);
     306            0 :             ::operator delete(storage_);
     307              :         }
     308            3 :         if(cached_frame_)
     309            0 :             ::operator delete(cached_frame_);
     310            3 :         stream_ = std::exchange(other.stream_, nullptr);
     311            3 :         vt_ = std::exchange(other.vt_, nullptr);
     312            3 :         cached_frame_ = std::exchange(other.cached_frame_, nullptr);
     313            3 :         cached_size_ = std::exchange(other.cached_size_, 0);
     314            3 :         storage_ = std::exchange(other.storage_, nullptr);
     315              :     }
     316            3 :     return *this;
     317              : }
     318              : 
     319              : template<WriteStream S>
     320              :     requires (!std::same_as<std::decay_t<S>, any_write_stream>)
     321              : any_write_stream::any_write_stream(S s)
     322              :     : vt_(&vtable_for_impl<S>::value)
     323              : {
     324              :     struct guard {
     325              :         any_write_stream* self;
     326              :         bool committed = false;
     327              :         ~guard() {
     328              :             if(!committed && self->storage_) {
     329              :                 self->vt_->destroy(self->stream_);
     330              :                 ::operator delete(self->storage_);
     331              :                 self->storage_ = nullptr;
     332              :                 self->stream_ = nullptr;
     333              :             }
     334              :         }
     335              :     } g{this};
     336              : 
     337              :     storage_ = ::operator new(sizeof(S));
     338              :     stream_ = ::new(storage_) S(std::move(s));
     339              : 
     340              :     // Preallocate the coroutine frame
     341              :     auto& ref = *static_cast<S*>(stream_);
     342              :     write_coro<S>(this, ref, {}, nullptr, nullptr);
     343              : 
     344              :     g.committed = true;
     345              : }
     346              : 
     347              : //----------------------------------------------------------
     348              : 
     349              : struct any_write_stream::write_op
     350              : {
     351              :     struct promise_type
     352              :     {
     353              :         executor_ref executor_;
     354              :         std::stop_token stop_token_;
     355              :         coro caller_h_{};
     356              : 
     357          125 :         promise_type() = default;
     358              : 
     359              :         write_op
     360          125 :         get_return_object() noexcept
     361              :         {
     362              :             return write_op{
     363          125 :                 std::coroutine_handle<promise_type>::from_promise(*this)};
     364              :         }
     365              : 
     366              :         std::suspend_always
     367          125 :         initial_suspend() noexcept
     368              :         {
     369          125 :             return {};
     370              :         }
     371              : 
     372              :         auto
     373           43 :         final_suspend() noexcept
     374              :         {
     375              :             struct awaiter
     376              :             {
     377              :                 promise_type* p_;
     378              : 
     379           43 :                 bool await_ready() const noexcept { return false; }
     380              : 
     381           43 :                 coro await_suspend(coro) const noexcept
     382              :                 {
     383           43 :                     if(p_->caller_h_)
     384            0 :                         return p_->caller_h_;
     385           43 :                     return std::noop_coroutine();
     386              :                 }
     387              : 
     388            0 :                 void await_resume() const noexcept {}
     389              :             };
     390           43 :             return awaiter{this};
     391              :         }
     392              : 
     393              :         void
     394           43 :         return_void() noexcept
     395              :         {
     396           43 :         }
     397              : 
     398              :         void
     399           17 :         unhandled_exception()
     400              :         {
     401              :             // Store exception for later propagation
     402              :             // For now, just rethrow to let outer handler catch it
     403           17 :             throw;
     404              :         }
     405              : 
     406              :         template<class... Args>
     407              :         static void*
     408          125 :         operator new(
     409              :             std::size_t size,
     410              :             any_write_stream* wrapper,
     411              :             Args&&...)
     412              :         {
     413          125 :             return wrapper->alloc_frame(size);
     414              :         }
     415              : 
     416              :         template<class... Args>
     417              :         static void
     418              :         operator delete(void*, any_write_stream*, Args&&...) noexcept
     419              :         {
     420              :         }
     421              : 
     422              :         static void
     423          125 :         operator delete(void*, std::size_t) noexcept
     424              :         {
     425          125 :         }
     426              : 
     427              :         void
     428           60 :         set_executor(executor_ref ex) noexcept
     429              :         {
     430           60 :             executor_ = ex;
     431           60 :         }
     432              : 
     433              :         void
     434           60 :         set_stop_token(std::stop_token token) noexcept
     435              :         {
     436           60 :             stop_token_ = token;
     437           60 :         }
     438              : 
     439              :         void
     440            0 :         set_caller(coro h) noexcept
     441              :         {
     442            0 :             caller_h_ = h;
     443            0 :         }
     444              : 
     445              :         template<class Awaitable>
     446              :         struct transform_awaiter
     447              :         {
     448              :             std::decay_t<Awaitable> a_;
     449              :             promise_type* p_;
     450              : 
     451           60 :             bool await_ready()
     452              :             {
     453           60 :                 return a_.await_ready();
     454              :             }
     455              : 
     456           60 :             decltype(auto) await_resume()
     457              :             {
     458           60 :                 return a_.await_resume();
     459              :             }
     460              : 
     461            0 :             auto await_suspend(coro h)
     462              :             {
     463            0 :                 return a_.await_suspend(h, p_->executor_, p_->stop_token_);
     464              :             }
     465              :         };
     466              : 
     467              :         template<class Awaitable>
     468           60 :         auto await_transform(Awaitable&& a)
     469              :         {
     470              :             using A = std::decay_t<Awaitable>;
     471              :             if constexpr (IoAwaitable<A>)
     472              :             {
     473              :                 return transform_awaiter<Awaitable>{
     474           60 :                     std::forward<Awaitable>(a), this};
     475              :             }
     476              :             else
     477              :             {
     478              :                 static_assert(sizeof(A) == 0, "requires IoAwaitable");
     479              :             }
     480              :         }
     481              :     };
     482              : 
     483              :     std::coroutine_handle<promise_type> h_;
     484              : 
     485          125 :     ~write_op()
     486              :     {
     487          125 :         if(h_)
     488           82 :             h_.destroy();
     489          125 :     }
     490              : 
     491              :     write_op(write_op const&) = delete;
     492              :     write_op& operator=(write_op const&) = delete;
     493              : 
     494              :     write_op(write_op&& other) noexcept
     495              :         : h_(std::exchange(other.h_, nullptr))
     496              :     {
     497              :     }
     498              : 
     499              :     write_op& operator=(write_op&& other) noexcept
     500              :     {
     501              :         if(this != &other)
     502              :         {
     503              :             if(h_)
     504              :                 h_.destroy();
     505              :             h_ = std::exchange(other.h_, nullptr);
     506              :         }
     507              :         return *this;
     508              :     }
     509              : 
     510              : private:
     511              :     explicit
     512          125 :     write_op(std::coroutine_handle<promise_type> h) noexcept
     513          125 :         : h_(h)
     514              :     {
     515          125 :     }
     516              : };
     517              : 
     518              : //----------------------------------------------------------
     519              : 
     520              : inline void*
     521          125 : any_write_stream::alloc_frame(std::size_t size)
     522              : {
     523          125 :     if(cached_frame_ && cached_size_ >= size)
     524           60 :         return cached_frame_;
     525              : 
     526           65 :     if(cached_frame_)
     527            0 :         ::operator delete(cached_frame_);
     528              : 
     529           65 :     cached_frame_ = ::operator new(size);
     530           65 :     cached_size_ = size;
     531           65 :     return cached_frame_;
     532              : }
     533              : 
     534              : inline void
     535              : any_write_stream::free_frame(void*, std::size_t)
     536              : {
     537              :     // Keep the frame cached for reuse
     538              : }
     539              : 
     540              : #if defined(__GNUC__) && !defined(__clang__)
     541              : #pragma GCC diagnostic push
     542              : #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
     543              : #endif
     544              : 
     545              : template<WriteStream S>
     546              : any_write_stream::write_op
     547          125 : any_write_stream::write_coro(
     548              :     any_write_stream*,
     549              :     S& stream,
     550              :     std::span<const_buffer const> bufs,
     551              :     std::error_code* out_ec,
     552              :     std::size_t* out_n)
     553              : {
     554              :     auto [err, bytes] = co_await stream.write_some(bufs);
     555              : 
     556              :     *out_ec = err;
     557              :     *out_n = bytes;
     558          250 : }
     559              : 
     560              : #if defined(__GNUC__) && !defined(__clang__)
     561              : #pragma GCC diagnostic pop
     562              : #endif
     563              : 
     564              : template<WriteStream S>
     565              : coro
     566           60 : any_write_stream::do_write_impl(
     567              :     void* stream,
     568              :     any_write_stream* wrapper,
     569              :     std::span<const_buffer const> buffers,
     570              :     coro h,
     571              :     executor_ref ex,
     572              :     std::stop_token token,
     573              :     std::error_code* ec,
     574              :     std::size_t* n)
     575              : {
     576           60 :     auto& s = *static_cast<S*>(stream);
     577              : 
     578              :     // Create coroutine - frame is cached in wrapper
     579           60 :     auto op = write_coro<S>(wrapper, s, buffers, ec, n);
     580              : 
     581              :     // Set executor and stop token on promise before resuming
     582           60 :     op.h_.promise().set_executor(ex);
     583           60 :     op.h_.promise().set_stop_token(token);
     584              : 
     585              :     // Resume the coroutine to start the operation
     586           60 :     op.h_.resume();
     587              : 
     588              :     // Check if operation completed synchronously
     589           43 :     if(op.h_.done())
     590              :     {
     591           43 :         op.h_.destroy();
     592           43 :         op.h_ = nullptr;
     593              :         // Return caller's handle via executor dispatch
     594           43 :         return ex.dispatch(h);
     595              :     }
     596              : 
     597              :     // Operation is pending - caller will be resumed via symmetric transfer
     598            0 :     op.h_.promise().set_caller(h);
     599            0 :     op.h_ = nullptr;
     600            0 :     return std::noop_coroutine();
     601           60 : }
     602              : 
     603              : //----------------------------------------------------------
     604              : 
     605              : template<ConstBufferSequence CB>
     606              : auto
     607           60 : any_write_stream::write_some(CB buffers)
     608              : {
     609              :     struct awaitable
     610              :     {
     611              :         any_write_stream* self_;
     612              :         buffer_param<CB> bp_;
     613              :         std::error_code ec_;
     614              :         std::size_t n_ = 0;
     615              : 
     616              :         bool
     617           60 :         await_ready() const noexcept
     618              :         {
     619           60 :             return false;
     620              :         }
     621              : 
     622              :         coro
     623           60 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     624              :         {
     625          180 :             return self_->vt_->do_write(
     626           60 :                 self_->stream_,
     627              :                 self_,
     628          120 :                 bp_.data(),
     629              :                 h,
     630              :                 ex,
     631              :                 token,
     632              :                 &ec_,
     633           86 :                 &n_);
     634              :         }
     635              : 
     636              :         io_result<std::size_t>
     637           43 :         await_resume() const noexcept
     638              :         {
     639           43 :             return {ec_, n_};
     640              :         }
     641              :     };
     642           60 :     return awaitable{this, buffer_param<CB>(buffers), {}, 0};
     643              : }
     644              : 
     645              : } // namespace capy
     646              : } // namespace boost
     647              : 
     648              : #endif
        

Generated by: LCOV version 2.3