Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
11 : #define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/buffers.hpp>
15 : #include <boost/capy/buffers/buffer_param.hpp>
16 : #include <boost/capy/concept/io_awaitable.hpp>
17 : #include <boost/capy/concept/write_stream.hpp>
18 : #include <boost/capy/coro.hpp>
19 : #include <boost/capy/ex/executor_ref.hpp>
20 : #include <boost/capy/io_result.hpp>
21 :
22 : #include <system_error>
23 :
24 : #include <concepts>
25 : #include <coroutine>
26 : #include <cstddef>
27 : #include <exception>
28 : #include <span>
29 : #include <stop_token>
30 : #include <utility>
31 :
32 : namespace boost {
33 : namespace capy {
34 :
35 : /** Type-erased wrapper for any WriteStream.
36 :
37 : This class provides type erasure for any type satisfying the
38 : @ref WriteStream concept, enabling runtime polymorphism for
39 : write operations. It uses a cached coroutine frame to achieve
40 : zero steady-state allocation after construction.
41 :
42 : The wrapper supports two construction modes:
43 : - **Owning**: Pass by value to transfer ownership. The wrapper
44 : allocates storage and owns the stream.
45 : - **Reference**: Pass a pointer to wrap without ownership. The
46 : pointed-to stream must outlive this wrapper.
47 :
48 : @par Frame Preallocation
49 : The constructor preallocates the internal coroutine frame.
50 : This reserves all virtual address space at server startup
51 : so memory usage can be measured up front, rather than
52 : allocating piecemeal as traffic arrives.
53 :
54 : @par Thread Safety
55 : Not thread-safe. Concurrent operations on the same wrapper
56 : are undefined behavior.
57 :
58 : @par Example
59 : @code
60 : // Owning - takes ownership of the stream
61 : any_write_stream stream(socket{ioc});
62 :
63 : // Reference - wraps without ownership
64 : socket sock(ioc);
65 : any_write_stream stream(&sock);
66 :
67 : const_buffer buf(data, size);
68 : auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
69 : @endcode
70 :
71 : @see any_read_stream, any_stream, WriteStream
72 : */
73 : class any_write_stream
74 : {
75 : struct vtable;
76 :
77 : template<WriteStream S>
78 : struct vtable_for_impl;
79 :
80 : struct write_op;
81 :
82 : void* stream_ = nullptr;
83 : vtable const* vt_ = nullptr;
84 : void* cached_frame_ = nullptr;
85 : std::size_t cached_size_ = 0;
86 : void* storage_ = nullptr;
87 :
88 : template<WriteStream S>
89 : static coro
90 : do_write_impl(
91 : void* stream,
92 : any_write_stream* wrapper,
93 : std::span<const_buffer const> buffers,
94 : coro h,
95 : executor_ref ex,
96 : std::stop_token token,
97 : std::error_code* ec,
98 : std::size_t* n);
99 :
100 : template<WriteStream S>
101 : static write_op
102 : write_coro(
103 : any_write_stream* wrapper,
104 : S& stream,
105 : std::span<const_buffer const> bufs,
106 : std::error_code* out_ec,
107 : std::size_t* out_n);
108 :
109 : void* alloc_frame(std::size_t size);
110 : void free_frame(void* p, std::size_t size);
111 :
112 : public:
113 : /** Destructor.
114 :
115 : Destroys the owned stream (if any) and releases the cached
116 : coroutine frame.
117 : */
118 : ~any_write_stream();
119 :
120 : /** Default constructor.
121 :
122 : Constructs an empty wrapper. Operations on a default-constructed
123 : wrapper result in undefined behavior.
124 : */
125 1 : any_write_stream() = default;
126 :
127 : /** Non-copyable.
128 :
129 : The frame cache is per-instance and cannot be shared.
130 : */
131 : any_write_stream(any_write_stream const&) = delete;
132 : any_write_stream& operator=(any_write_stream const&) = delete;
133 :
134 : /** Move constructor.
135 :
136 : Transfers ownership of the wrapped stream (if owned) and
137 : cached frame from `other`. After the move, `other` is
138 : in a default-constructed state.
139 :
140 : @param other The wrapper to move from.
141 : */
142 2 : any_write_stream(any_write_stream&& other) noexcept
143 2 : : stream_(std::exchange(other.stream_, nullptr))
144 2 : , vt_(std::exchange(other.vt_, nullptr))
145 2 : , cached_frame_(std::exchange(other.cached_frame_, nullptr))
146 2 : , cached_size_(std::exchange(other.cached_size_, 0))
147 2 : , storage_(std::exchange(other.storage_, nullptr))
148 : {
149 2 : }
150 :
151 : /** Move assignment operator.
152 :
153 : Destroys any owned stream and releases existing resources,
154 : then transfers ownership from `other`.
155 :
156 : @param other The wrapper to move from.
157 : @return Reference to this wrapper.
158 : */
159 : any_write_stream&
160 : operator=(any_write_stream&& other) noexcept;
161 :
162 : /** Construct by taking ownership of a WriteStream.
163 :
164 : Allocates storage and moves the stream into this wrapper.
165 : The wrapper owns the stream and will destroy it.
166 :
167 : @param s The stream to take ownership of.
168 : */
169 : template<WriteStream S>
170 : requires (!std::same_as<std::decay_t<S>, any_write_stream>)
171 : any_write_stream(S s);
172 :
173 : /** Construct by wrapping a WriteStream without ownership.
174 :
175 : Wraps the given stream by pointer. The stream must remain
176 : valid for the lifetime of this wrapper.
177 :
178 : @param s Pointer to the stream to wrap.
179 : */
180 : template<WriteStream S>
181 65 : any_write_stream(S* s) noexcept
182 65 : : stream_(s)
183 65 : , vt_(&vtable_for_impl<S>::value)
184 : {
185 : // Preallocate the coroutine frame
186 65 : write_coro<S>(this, *s, {}, nullptr, nullptr);
187 65 : }
188 : /** Check if the wrapper contains a valid stream.
189 :
190 : @return `true` if wrapping a stream, `false` if default-constructed
191 : or moved-from.
192 : */
193 : bool
194 15 : has_value() const noexcept
195 : {
196 15 : return stream_ != nullptr;
197 : }
198 :
199 : /** Check if the wrapper contains a valid stream.
200 :
201 : @return `true` if wrapping a stream, `false` if default-constructed
202 : or moved-from.
203 : */
204 : explicit
205 2 : operator bool() const noexcept
206 : {
207 2 : return has_value();
208 : }
209 :
210 : /** Initiate an asynchronous write operation.
211 :
212 : Writes data from the provided buffer sequence. The operation
213 : completes when at least one byte has been written, or an error
214 : occurs.
215 :
216 : @param buffers The buffer sequence containing data to write.
217 : Passed by value to ensure the sequence lives in the
218 : coroutine frame across suspension points.
219 :
220 : @return An awaitable yielding `(error_code,std::size_t)`.
221 :
222 : @par Preconditions
223 : The wrapper must contain a valid stream (`has_value() == true`).
224 : */
225 : template<ConstBufferSequence CB>
226 : auto
227 : write_some(CB buffers);
228 :
229 : protected:
230 : /** Rebind to a new stream after move.
231 :
232 : Updates the internal pointer to reference a new stream object.
233 : Used by owning wrappers after move assignment when the owned
234 : object has moved to a new location.
235 :
236 : @param new_stream The new stream to bind to. Must be the same
237 : type as the original stream.
238 :
239 : @note Terminates if called with a stream of different type
240 : than the original.
241 : */
242 : template<WriteStream S>
243 : void
244 : rebind(S& new_stream) noexcept
245 : {
246 : if(vt_ != &vtable_for_impl<S>::value)
247 : std::terminate();
248 : stream_ = &new_stream;
249 : }
250 : };
251 :
252 : //----------------------------------------------------------
253 :
254 : struct any_write_stream::vtable
255 : {
256 : void (*destroy)(void*) noexcept;
257 :
258 : coro (*do_write)(
259 : void* stream,
260 : any_write_stream* wrapper,
261 : std::span<const_buffer const> buffers,
262 : coro h,
263 : executor_ref ex,
264 : std::stop_token token,
265 : std::error_code* ec,
266 : std::size_t* n);
267 : };
268 :
269 : template<WriteStream S>
270 : struct any_write_stream::vtable_for_impl
271 : {
272 : static void
273 0 : do_destroy_impl(void* stream) noexcept
274 : {
275 0 : static_cast<S*>(stream)->~S();
276 0 : }
277 :
278 : static constexpr vtable value = {
279 : &do_destroy_impl,
280 : &any_write_stream::do_write_impl<S>
281 : };
282 : };
283 :
284 : //----------------------------------------------------------
285 :
286 : inline
287 72 : any_write_stream::~any_write_stream()
288 : {
289 72 : if(storage_)
290 : {
291 0 : vt_->destroy(stream_);
292 0 : ::operator delete(storage_);
293 : }
294 72 : if(cached_frame_)
295 65 : ::operator delete(cached_frame_);
296 72 : }
297 :
298 : inline any_write_stream&
299 3 : any_write_stream::operator=(any_write_stream&& other) noexcept
300 : {
301 3 : if(this != &other)
302 : {
303 3 : if(storage_)
304 : {
305 0 : vt_->destroy(stream_);
306 0 : ::operator delete(storage_);
307 : }
308 3 : if(cached_frame_)
309 0 : ::operator delete(cached_frame_);
310 3 : stream_ = std::exchange(other.stream_, nullptr);
311 3 : vt_ = std::exchange(other.vt_, nullptr);
312 3 : cached_frame_ = std::exchange(other.cached_frame_, nullptr);
313 3 : cached_size_ = std::exchange(other.cached_size_, 0);
314 3 : storage_ = std::exchange(other.storage_, nullptr);
315 : }
316 3 : return *this;
317 : }
318 :
319 : template<WriteStream S>
320 : requires (!std::same_as<std::decay_t<S>, any_write_stream>)
321 : any_write_stream::any_write_stream(S s)
322 : : vt_(&vtable_for_impl<S>::value)
323 : {
324 : struct guard {
325 : any_write_stream* self;
326 : bool committed = false;
327 : ~guard() {
328 : if(!committed && self->storage_) {
329 : self->vt_->destroy(self->stream_);
330 : ::operator delete(self->storage_);
331 : self->storage_ = nullptr;
332 : self->stream_ = nullptr;
333 : }
334 : }
335 : } g{this};
336 :
337 : storage_ = ::operator new(sizeof(S));
338 : stream_ = ::new(storage_) S(std::move(s));
339 :
340 : // Preallocate the coroutine frame
341 : auto& ref = *static_cast<S*>(stream_);
342 : write_coro<S>(this, ref, {}, nullptr, nullptr);
343 :
344 : g.committed = true;
345 : }
346 :
347 : //----------------------------------------------------------
348 :
349 : struct any_write_stream::write_op
350 : {
351 : struct promise_type
352 : {
353 : executor_ref executor_;
354 : std::stop_token stop_token_;
355 : coro caller_h_{};
356 :
357 125 : promise_type() = default;
358 :
359 : write_op
360 125 : get_return_object() noexcept
361 : {
362 : return write_op{
363 125 : std::coroutine_handle<promise_type>::from_promise(*this)};
364 : }
365 :
366 : std::suspend_always
367 125 : initial_suspend() noexcept
368 : {
369 125 : return {};
370 : }
371 :
372 : auto
373 43 : final_suspend() noexcept
374 : {
375 : struct awaiter
376 : {
377 : promise_type* p_;
378 :
379 43 : bool await_ready() const noexcept { return false; }
380 :
381 43 : coro await_suspend(coro) const noexcept
382 : {
383 43 : if(p_->caller_h_)
384 0 : return p_->caller_h_;
385 43 : return std::noop_coroutine();
386 : }
387 :
388 0 : void await_resume() const noexcept {}
389 : };
390 43 : return awaiter{this};
391 : }
392 :
393 : void
394 43 : return_void() noexcept
395 : {
396 43 : }
397 :
398 : void
399 17 : unhandled_exception()
400 : {
401 : // Store exception for later propagation
402 : // For now, just rethrow to let outer handler catch it
403 17 : throw;
404 : }
405 :
406 : template<class... Args>
407 : static void*
408 125 : operator new(
409 : std::size_t size,
410 : any_write_stream* wrapper,
411 : Args&&...)
412 : {
413 125 : return wrapper->alloc_frame(size);
414 : }
415 :
416 : template<class... Args>
417 : static void
418 : operator delete(void*, any_write_stream*, Args&&...) noexcept
419 : {
420 : }
421 :
422 : static void
423 125 : operator delete(void*, std::size_t) noexcept
424 : {
425 125 : }
426 :
427 : void
428 60 : set_executor(executor_ref ex) noexcept
429 : {
430 60 : executor_ = ex;
431 60 : }
432 :
433 : void
434 60 : set_stop_token(std::stop_token token) noexcept
435 : {
436 60 : stop_token_ = token;
437 60 : }
438 :
439 : void
440 0 : set_caller(coro h) noexcept
441 : {
442 0 : caller_h_ = h;
443 0 : }
444 :
445 : template<class Awaitable>
446 : struct transform_awaiter
447 : {
448 : std::decay_t<Awaitable> a_;
449 : promise_type* p_;
450 :
451 60 : bool await_ready()
452 : {
453 60 : return a_.await_ready();
454 : }
455 :
456 60 : decltype(auto) await_resume()
457 : {
458 60 : return a_.await_resume();
459 : }
460 :
461 0 : auto await_suspend(coro h)
462 : {
463 0 : return a_.await_suspend(h, p_->executor_, p_->stop_token_);
464 : }
465 : };
466 :
467 : template<class Awaitable>
468 60 : auto await_transform(Awaitable&& a)
469 : {
470 : using A = std::decay_t<Awaitable>;
471 : if constexpr (IoAwaitable<A>)
472 : {
473 : return transform_awaiter<Awaitable>{
474 60 : std::forward<Awaitable>(a), this};
475 : }
476 : else
477 : {
478 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
479 : }
480 : }
481 : };
482 :
483 : std::coroutine_handle<promise_type> h_;
484 :
485 125 : ~write_op()
486 : {
487 125 : if(h_)
488 82 : h_.destroy();
489 125 : }
490 :
491 : write_op(write_op const&) = delete;
492 : write_op& operator=(write_op const&) = delete;
493 :
494 : write_op(write_op&& other) noexcept
495 : : h_(std::exchange(other.h_, nullptr))
496 : {
497 : }
498 :
499 : write_op& operator=(write_op&& other) noexcept
500 : {
501 : if(this != &other)
502 : {
503 : if(h_)
504 : h_.destroy();
505 : h_ = std::exchange(other.h_, nullptr);
506 : }
507 : return *this;
508 : }
509 :
510 : private:
511 : explicit
512 125 : write_op(std::coroutine_handle<promise_type> h) noexcept
513 125 : : h_(h)
514 : {
515 125 : }
516 : };
517 :
518 : //----------------------------------------------------------
519 :
520 : inline void*
521 125 : any_write_stream::alloc_frame(std::size_t size)
522 : {
523 125 : if(cached_frame_ && cached_size_ >= size)
524 60 : return cached_frame_;
525 :
526 65 : if(cached_frame_)
527 0 : ::operator delete(cached_frame_);
528 :
529 65 : cached_frame_ = ::operator new(size);
530 65 : cached_size_ = size;
531 65 : return cached_frame_;
532 : }
533 :
534 : inline void
535 : any_write_stream::free_frame(void*, std::size_t)
536 : {
537 : // Keep the frame cached for reuse
538 : }
539 :
540 : #if defined(__GNUC__) && !defined(__clang__)
541 : #pragma GCC diagnostic push
542 : #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
543 : #endif
544 :
545 : template<WriteStream S>
546 : any_write_stream::write_op
547 125 : any_write_stream::write_coro(
548 : any_write_stream*,
549 : S& stream,
550 : std::span<const_buffer const> bufs,
551 : std::error_code* out_ec,
552 : std::size_t* out_n)
553 : {
554 : auto [err, bytes] = co_await stream.write_some(bufs);
555 :
556 : *out_ec = err;
557 : *out_n = bytes;
558 250 : }
559 :
560 : #if defined(__GNUC__) && !defined(__clang__)
561 : #pragma GCC diagnostic pop
562 : #endif
563 :
564 : template<WriteStream S>
565 : coro
566 60 : any_write_stream::do_write_impl(
567 : void* stream,
568 : any_write_stream* wrapper,
569 : std::span<const_buffer const> buffers,
570 : coro h,
571 : executor_ref ex,
572 : std::stop_token token,
573 : std::error_code* ec,
574 : std::size_t* n)
575 : {
576 60 : auto& s = *static_cast<S*>(stream);
577 :
578 : // Create coroutine - frame is cached in wrapper
579 60 : auto op = write_coro<S>(wrapper, s, buffers, ec, n);
580 :
581 : // Set executor and stop token on promise before resuming
582 60 : op.h_.promise().set_executor(ex);
583 60 : op.h_.promise().set_stop_token(token);
584 :
585 : // Resume the coroutine to start the operation
586 60 : op.h_.resume();
587 :
588 : // Check if operation completed synchronously
589 43 : if(op.h_.done())
590 : {
591 43 : op.h_.destroy();
592 43 : op.h_ = nullptr;
593 : // Return caller's handle via executor dispatch
594 43 : return ex.dispatch(h);
595 : }
596 :
597 : // Operation is pending - caller will be resumed via symmetric transfer
598 0 : op.h_.promise().set_caller(h);
599 0 : op.h_ = nullptr;
600 0 : return std::noop_coroutine();
601 60 : }
602 :
603 : //----------------------------------------------------------
604 :
605 : template<ConstBufferSequence CB>
606 : auto
607 60 : any_write_stream::write_some(CB buffers)
608 : {
609 : struct awaitable
610 : {
611 : any_write_stream* self_;
612 : buffer_param<CB> bp_;
613 : std::error_code ec_;
614 : std::size_t n_ = 0;
615 :
616 : bool
617 60 : await_ready() const noexcept
618 : {
619 60 : return false;
620 : }
621 :
622 : coro
623 60 : await_suspend(coro h, executor_ref ex, std::stop_token token)
624 : {
625 180 : return self_->vt_->do_write(
626 60 : self_->stream_,
627 : self_,
628 120 : bp_.data(),
629 : h,
630 : ex,
631 : token,
632 : &ec_,
633 86 : &n_);
634 : }
635 :
636 : io_result<std::size_t>
637 43 : await_resume() const noexcept
638 : {
639 43 : return {ec_, n_};
640 : }
641 : };
642 60 : return awaitable{this, buffer_param<CB>(buffers), {}, 0};
643 : }
644 :
645 : } // namespace capy
646 : } // namespace boost
647 :
648 : #endif
|