libs/capy/include/boost/capy/io/any_write_stream.hpp

84.2% Lines (101/120) 86.4% Functions (38/44) 72.4% Branches (21/29)
libs/capy/include/boost/capy/io/any_write_stream.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
11 #define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/buffers.hpp>
15 #include <boost/capy/buffers/buffer_param.hpp>
16 #include <boost/capy/concept/io_awaitable.hpp>
17 #include <boost/capy/concept/write_stream.hpp>
18 #include <boost/capy/coro.hpp>
19 #include <boost/capy/ex/executor_ref.hpp>
20 #include <boost/capy/io_result.hpp>
21
22 #include <system_error>
23
24 #include <concepts>
25 #include <coroutine>
26 #include <cstddef>
27 #include <exception>
28 #include <span>
29 #include <stop_token>
30 #include <utility>
31
32 namespace boost {
33 namespace capy {
34
35 /** Type-erased wrapper for any WriteStream.
36
37 This class provides type erasure for any type satisfying the
38 @ref WriteStream concept, enabling runtime polymorphism for
39 write operations. It uses a cached coroutine frame to achieve
40 zero steady-state allocation after construction.
41
42 The wrapper supports two construction modes:
43 - **Owning**: Pass by value to transfer ownership. The wrapper
44 allocates storage and owns the stream.
45 - **Reference**: Pass a pointer to wrap without ownership. The
46 pointed-to stream must outlive this wrapper.
47
48 @par Frame Preallocation
49 The constructor preallocates the internal coroutine frame.
50 This reserves all virtual address space at server startup
51 so memory usage can be measured up front, rather than
52 allocating piecemeal as traffic arrives.
53
54 @par Thread Safety
55 Not thread-safe. Concurrent operations on the same wrapper
56 are undefined behavior.
57
58 @par Example
59 @code
60 // Owning - takes ownership of the stream
61 any_write_stream stream(socket{ioc});
62
63 // Reference - wraps without ownership
64 socket sock(ioc);
65 any_write_stream stream(&sock);
66
67 const_buffer buf(data, size);
68 auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
69 @endcode
70
71 @see any_read_stream, any_stream, WriteStream
72 */
73 class any_write_stream
74 {
75 struct vtable;
76
77 template<WriteStream S>
78 struct vtable_for_impl;
79
80 struct write_op;
81
82 void* stream_ = nullptr;
83 vtable const* vt_ = nullptr;
84 void* cached_frame_ = nullptr;
85 std::size_t cached_size_ = 0;
86 void* storage_ = nullptr;
87
88 template<WriteStream S>
89 static coro
90 do_write_impl(
91 void* stream,
92 any_write_stream* wrapper,
93 std::span<const_buffer const> buffers,
94 coro h,
95 executor_ref ex,
96 std::stop_token token,
97 std::error_code* ec,
98 std::size_t* n);
99
100 template<WriteStream S>
101 static write_op
102 write_coro(
103 any_write_stream* wrapper,
104 S& stream,
105 std::span<const_buffer const> bufs,
106 std::error_code* out_ec,
107 std::size_t* out_n);
108
109 void* alloc_frame(std::size_t size);
110 void free_frame(void* p, std::size_t size);
111
112 public:
113 /** Destructor.
114
115 Destroys the owned stream (if any) and releases the cached
116 coroutine frame.
117 */
118 ~any_write_stream();
119
120 /** Default constructor.
121
122 Constructs an empty wrapper. Operations on a default-constructed
123 wrapper result in undefined behavior.
124 */
125 1 any_write_stream() = default;
126
127 /** Non-copyable.
128
129 The frame cache is per-instance and cannot be shared.
130 */
131 any_write_stream(any_write_stream const&) = delete;
132 any_write_stream& operator=(any_write_stream const&) = delete;
133
134 /** Move constructor.
135
136 Transfers ownership of the wrapped stream (if owned) and
137 cached frame from `other`. After the move, `other` is
138 in a default-constructed state.
139
140 @param other The wrapper to move from.
141 */
142 2 any_write_stream(any_write_stream&& other) noexcept
143 2 : stream_(std::exchange(other.stream_, nullptr))
144 2 , vt_(std::exchange(other.vt_, nullptr))
145 2 , cached_frame_(std::exchange(other.cached_frame_, nullptr))
146 2 , cached_size_(std::exchange(other.cached_size_, 0))
147 2 , storage_(std::exchange(other.storage_, nullptr))
148 {
149 2 }
150
151 /** Move assignment operator.
152
153 Destroys any owned stream and releases existing resources,
154 then transfers ownership from `other`.
155
156 @param other The wrapper to move from.
157 @return Reference to this wrapper.
158 */
159 any_write_stream&
160 operator=(any_write_stream&& other) noexcept;
161
162 /** Construct by taking ownership of a WriteStream.
163
164 Allocates storage and moves the stream into this wrapper.
165 The wrapper owns the stream and will destroy it.
166
167 @param s The stream to take ownership of.
168 */
169 template<WriteStream S>
170 requires (!std::same_as<std::decay_t<S>, any_write_stream>)
171 any_write_stream(S s);
172
173 /** Construct by wrapping a WriteStream without ownership.
174
175 Wraps the given stream by pointer. The stream must remain
176 valid for the lifetime of this wrapper.
177
178 @param s Pointer to the stream to wrap.
179 */
180 template<WriteStream S>
181 65 any_write_stream(S* s) noexcept
182 65 : stream_(s)
183 65 , vt_(&vtable_for_impl<S>::value)
184 {
185 // Preallocate the coroutine frame
186 65 write_coro<S>(this, *s, {}, nullptr, nullptr);
187 65 }
188 /** Check if the wrapper contains a valid stream.
189
190 @return `true` if wrapping a stream, `false` if default-constructed
191 or moved-from.
192 */
193 bool
194 15 has_value() const noexcept
195 {
196 15 return stream_ != nullptr;
197 }
198
199 /** Check if the wrapper contains a valid stream.
200
201 @return `true` if wrapping a stream, `false` if default-constructed
202 or moved-from.
203 */
204 explicit
205 2 operator bool() const noexcept
206 {
207 2 return has_value();
208 }
209
210 /** Initiate an asynchronous write operation.
211
212 Writes data from the provided buffer sequence. The operation
213 completes when at least one byte has been written, or an error
214 occurs.
215
216 @param buffers The buffer sequence containing data to write.
217 Passed by value to ensure the sequence lives in the
218 coroutine frame across suspension points.
219
220 @return An awaitable yielding `(error_code,std::size_t)`.
221
222 @par Preconditions
223 The wrapper must contain a valid stream (`has_value() == true`).
224 */
225 template<ConstBufferSequence CB>
226 auto
227 write_some(CB buffers);
228
229 protected:
230 /** Rebind to a new stream after move.
231
232 Updates the internal pointer to reference a new stream object.
233 Used by owning wrappers after move assignment when the owned
234 object has moved to a new location.
235
236 @param new_stream The new stream to bind to. Must be the same
237 type as the original stream.
238
239 @note Terminates if called with a stream of different type
240 than the original.
241 */
242 template<WriteStream S>
243 void
244 rebind(S& new_stream) noexcept
245 {
246 if(vt_ != &vtable_for_impl<S>::value)
247 std::terminate();
248 stream_ = &new_stream;
249 }
250 };
251
252 //----------------------------------------------------------
253
254 struct any_write_stream::vtable
255 {
256 void (*destroy)(void*) noexcept;
257
258 coro (*do_write)(
259 void* stream,
260 any_write_stream* wrapper,
261 std::span<const_buffer const> buffers,
262 coro h,
263 executor_ref ex,
264 std::stop_token token,
265 std::error_code* ec,
266 std::size_t* n);
267 };
268
269 template<WriteStream S>
270 struct any_write_stream::vtable_for_impl
271 {
272 static void
273 do_destroy_impl(void* stream) noexcept
274 {
275 static_cast<S*>(stream)->~S();
276 }
277
278 static constexpr vtable value = {
279 &do_destroy_impl,
280 &any_write_stream::do_write_impl<S>
281 };
282 };
283
284 //----------------------------------------------------------
285
286 inline
287 72 any_write_stream::~any_write_stream()
288 {
289
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 72 times.
72 if(storage_)
290 {
291 vt_->destroy(stream_);
292 ::operator delete(storage_);
293 }
294
2/2
✓ Branch 0 taken 65 times.
✓ Branch 1 taken 7 times.
72 if(cached_frame_)
295 65 ::operator delete(cached_frame_);
296 72 }
297
298 inline any_write_stream&
299 3 any_write_stream::operator=(any_write_stream&& other) noexcept
300 {
301
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if(this != &other)
302 {
303
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if(storage_)
304 {
305 vt_->destroy(stream_);
306 ::operator delete(storage_);
307 }
308
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if(cached_frame_)
309 ::operator delete(cached_frame_);
310 3 stream_ = std::exchange(other.stream_, nullptr);
311 3 vt_ = std::exchange(other.vt_, nullptr);
312 3 cached_frame_ = std::exchange(other.cached_frame_, nullptr);
313 3 cached_size_ = std::exchange(other.cached_size_, 0);
314 3 storage_ = std::exchange(other.storage_, nullptr);
315 }
316 3 return *this;
317 }
318
319 template<WriteStream S>
320 requires (!std::same_as<std::decay_t<S>, any_write_stream>)
321 any_write_stream::any_write_stream(S s)
322 : vt_(&vtable_for_impl<S>::value)
323 {
324 struct guard {
325 any_write_stream* self;
326 bool committed = false;
327 ~guard() {
328 if(!committed && self->storage_) {
329 self->vt_->destroy(self->stream_);
330 ::operator delete(self->storage_);
331 self->storage_ = nullptr;
332 self->stream_ = nullptr;
333 }
334 }
335 } g{this};
336
337 storage_ = ::operator new(sizeof(S));
338 stream_ = ::new(storage_) S(std::move(s));
339
340 // Preallocate the coroutine frame
341 auto& ref = *static_cast<S*>(stream_);
342 write_coro<S>(this, ref, {}, nullptr, nullptr);
343
344 g.committed = true;
345 }
346
347 //----------------------------------------------------------
348
349 struct any_write_stream::write_op
350 {
351 struct promise_type
352 {
353 executor_ref executor_;
354 std::stop_token stop_token_;
355 coro caller_h_{};
356
357 125 promise_type() = default;
358
359 write_op
360 125 get_return_object() noexcept
361 {
362 return write_op{
363 125 std::coroutine_handle<promise_type>::from_promise(*this)};
364 }
365
366 std::suspend_always
367 125 initial_suspend() noexcept
368 {
369 125 return {};
370 }
371
372 auto
373 43 final_suspend() noexcept
374 {
375 struct awaiter
376 {
377 promise_type* p_;
378
379 43 bool await_ready() const noexcept { return false; }
380
381 43 coro await_suspend(coro) const noexcept
382 {
383
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 43 times.
43 if(p_->caller_h_)
384 return p_->caller_h_;
385 43 return std::noop_coroutine();
386 }
387
388 void await_resume() const noexcept {}
389 };
390 43 return awaiter{this};
391 }
392
393 void
394 43 return_void() noexcept
395 {
396 43 }
397
398 void
399 17 unhandled_exception()
400 {
401 // Store exception for later propagation
402 // For now, just rethrow to let outer handler catch it
403 17 throw;
404 }
405
406 template<class... Args>
407 static void*
408 125 operator new(
409 std::size_t size,
410 any_write_stream* wrapper,
411 Args&&...)
412 {
413 125 return wrapper->alloc_frame(size);
414 }
415
416 template<class... Args>
417 static void
418 operator delete(void*, any_write_stream*, Args&&...) noexcept
419 {
420 }
421
422 static void
423 125 operator delete(void*, std::size_t) noexcept
424 {
425 125 }
426
427 void
428 60 set_executor(executor_ref ex) noexcept
429 {
430 60 executor_ = ex;
431 60 }
432
433 void
434 60 set_stop_token(std::stop_token token) noexcept
435 {
436 60 stop_token_ = token;
437 60 }
438
439 void
440 set_caller(coro h) noexcept
441 {
442 caller_h_ = h;
443 }
444
445 template<class Awaitable>
446 struct transform_awaiter
447 {
448 std::decay_t<Awaitable> a_;
449 promise_type* p_;
450
451 60 bool await_ready()
452 {
453 60 return a_.await_ready();
454 }
455
456 60 decltype(auto) await_resume()
457 {
458 60 return a_.await_resume();
459 }
460
461 auto await_suspend(coro h)
462 {
463 return a_.await_suspend(h, p_->executor_, p_->stop_token_);
464 }
465 };
466
467 template<class Awaitable>
468 60 auto await_transform(Awaitable&& a)
469 {
470 using A = std::decay_t<Awaitable>;
471 if constexpr (IoAwaitable<A>)
472 {
473 return transform_awaiter<Awaitable>{
474 60 std::forward<Awaitable>(a), this};
475 }
476 else
477 {
478 static_assert(sizeof(A) == 0, "requires IoAwaitable");
479 }
480 }
481 };
482
483 std::coroutine_handle<promise_type> h_;
484
485 125 ~write_op()
486 {
487
2/2
✓ Branch 1 taken 82 times.
✓ Branch 2 taken 43 times.
125 if(h_)
488 82 h_.destroy();
489 125 }
490
491 write_op(write_op const&) = delete;
492 write_op& operator=(write_op const&) = delete;
493
494 write_op(write_op&& other) noexcept
495 : h_(std::exchange(other.h_, nullptr))
496 {
497 }
498
499 write_op& operator=(write_op&& other) noexcept
500 {
501 if(this != &other)
502 {
503 if(h_)
504 h_.destroy();
505 h_ = std::exchange(other.h_, nullptr);
506 }
507 return *this;
508 }
509
510 private:
511 explicit
512 125 write_op(std::coroutine_handle<promise_type> h) noexcept
513 125 : h_(h)
514 {
515 125 }
516 };
517
518 //----------------------------------------------------------
519
520 inline void*
521 125 any_write_stream::alloc_frame(std::size_t size)
522 {
523
3/4
✓ Branch 0 taken 60 times.
✓ Branch 1 taken 65 times.
✓ Branch 2 taken 60 times.
✗ Branch 3 not taken.
125 if(cached_frame_ && cached_size_ >= size)
524 60 return cached_frame_;
525
526
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 65 times.
65 if(cached_frame_)
527 ::operator delete(cached_frame_);
528
529 65 cached_frame_ = ::operator new(size);
530 65 cached_size_ = size;
531 65 return cached_frame_;
532 }
533
534 inline void
535 any_write_stream::free_frame(void*, std::size_t)
536 {
537 // Keep the frame cached for reuse
538 }
539
540 #if defined(__GNUC__) && !defined(__clang__)
541 #pragma GCC diagnostic push
542 #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
543 #endif
544
545 template<WriteStream S>
546 any_write_stream::write_op
547
1/1
✓ Branch 1 taken 125 times.
125 any_write_stream::write_coro(
548 any_write_stream*,
549 S& stream,
550 std::span<const_buffer const> bufs,
551 std::error_code* out_ec,
552 std::size_t* out_n)
553 {
554 auto [err, bytes] = co_await stream.write_some(bufs);
555
556 *out_ec = err;
557 *out_n = bytes;
558 250 }
559
560 #if defined(__GNUC__) && !defined(__clang__)
561 #pragma GCC diagnostic pop
562 #endif
563
564 template<WriteStream S>
565 coro
566 60 any_write_stream::do_write_impl(
567 void* stream,
568 any_write_stream* wrapper,
569 std::span<const_buffer const> buffers,
570 coro h,
571 executor_ref ex,
572 std::stop_token token,
573 std::error_code* ec,
574 std::size_t* n)
575 {
576 60 auto& s = *static_cast<S*>(stream);
577
578 // Create coroutine - frame is cached in wrapper
579
1/1
✓ Branch 1 taken 60 times.
60 auto op = write_coro<S>(wrapper, s, buffers, ec, n);
580
581 // Set executor and stop token on promise before resuming
582 60 op.h_.promise().set_executor(ex);
583 60 op.h_.promise().set_stop_token(token);
584
585 // Resume the coroutine to start the operation
586
1/1
✓ Branch 1 taken 43 times.
60 op.h_.resume();
587
588 // Check if operation completed synchronously
589
1/2
✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
43 if(op.h_.done())
590 {
591
1/1
✓ Branch 1 taken 43 times.
43 op.h_.destroy();
592 43 op.h_ = nullptr;
593 // Return caller's handle via executor dispatch
594
1/1
✓ Branch 1 taken 43 times.
43 return ex.dispatch(h);
595 }
596
597 // Operation is pending - caller will be resumed via symmetric transfer
598 op.h_.promise().set_caller(h);
599 op.h_ = nullptr;
600 return std::noop_coroutine();
601 60 }
602
603 //----------------------------------------------------------
604
605 template<ConstBufferSequence CB>
606 auto
607 60 any_write_stream::write_some(CB buffers)
608 {
609 struct awaitable
610 {
611 any_write_stream* self_;
612 buffer_param<CB> bp_;
613 std::error_code ec_;
614 std::size_t n_ = 0;
615
616 bool
617 60 await_ready() const noexcept
618 {
619 60 return false;
620 }
621
622 coro
623 60 await_suspend(coro h, executor_ref ex, std::stop_token token)
624 {
625 180 return self_->vt_->do_write(
626
1/1
✓ Branch 1 taken 7 times.
60 self_->stream_,
627 self_,
628
1/1
✓ Branch 1 taken 10 times.
120 bp_.data(),
629 h,
630 ex,
631 token,
632 &ec_,
633 86 &n_);
634 }
635
636 io_result<std::size_t>
637 43 await_resume() const noexcept
638 {
639 43 return {ec_, n_};
640 }
641 };
642 60 return awaitable{this, buffer_param<CB>(buffers), {}, 0};
643 }
644
645 } // namespace capy
646 } // namespace boost
647
648 #endif
649