libs/capy/include/boost/capy/io/any_read_stream.hpp

84.2% Lines (101/120) 86.4% Functions (38/44) 72.4% Branches (21/29)
libs/capy/include/boost/capy/io/any_read_stream.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
11 #define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/buffers.hpp>
15 #include <boost/capy/buffers/buffer_param.hpp>
16 #include <boost/capy/concept/io_awaitable.hpp>
17 #include <boost/capy/concept/read_stream.hpp>
18 #include <boost/capy/coro.hpp>
19 #include <boost/capy/ex/executor_ref.hpp>
20 #include <boost/capy/io_result.hpp>
21
22 #include <system_error>
23
24 #include <concepts>
25 #include <coroutine>
26 #include <cstddef>
27 #include <exception>
28 #include <span>
29 #include <stop_token>
30 #include <utility>
31
32 namespace boost {
33 namespace capy {
34
35 /** Type-erased wrapper for any ReadStream.
36
37 This class provides type erasure for any type satisfying the
38 @ref ReadStream concept, enabling runtime polymorphism for
39 read operations. It uses a cached coroutine frame to achieve
40 zero steady-state allocation after construction.
41
42 The wrapper supports two construction modes:
43 - **Owning**: Pass by value to transfer ownership. The wrapper
44 allocates storage and owns the stream.
45 - **Reference**: Pass a pointer to wrap without ownership. The
46 pointed-to stream must outlive this wrapper.
47
48 @par Frame Preallocation
49 The constructor preallocates the internal coroutine frame.
50 This reserves all virtual address space at server startup
51 so memory usage can be measured up front, rather than
52 allocating piecemeal as traffic arrives.
53
54 @par Thread Safety
55 Not thread-safe. Concurrent operations on the same wrapper
56 are undefined behavior.
57
58 @par Example
59 @code
60 // Owning - takes ownership of the stream
61 any_read_stream stream(socket{ioc});
62
63 // Reference - wraps without ownership
64 socket sock(ioc);
65 any_read_stream stream(&sock);
66
67 mutable_buffer buf(data, size);
68 auto [ec, n] = co_await stream.read_some(std::span(&buf, 1));
69 @endcode
70
71 @see any_write_stream, any_stream, ReadStream
72 */
73 class any_read_stream
74 {
75 struct vtable;
76
77 template<ReadStream S>
78 struct vtable_for_impl;
79
80 struct read_op;
81
82 void* stream_ = nullptr;
83 vtable const* vt_ = nullptr;
84 void* cached_frame_ = nullptr;
85 std::size_t cached_size_ = 0;
86 void* storage_ = nullptr;
87
88 template<ReadStream S>
89 static coro
90 do_read_impl(
91 void* stream,
92 any_read_stream* wrapper,
93 std::span<mutable_buffer const> buffers,
94 coro h,
95 executor_ref ex,
96 std::stop_token token,
97 std::error_code* ec,
98 std::size_t* n);
99
100 template<ReadStream S>
101 static read_op
102 read_coro(
103 any_read_stream* wrapper,
104 S& stream,
105 std::span<mutable_buffer const> bufs,
106 std::error_code* out_ec,
107 std::size_t* out_n);
108
109 void* alloc_frame(std::size_t size);
110 void free_frame(void* p, std::size_t size);
111
112 public:
113 /** Destructor.
114
115 Destroys the owned stream (if any) and releases the cached
116 coroutine frame.
117 */
118 ~any_read_stream();
119
120 /** Default constructor.
121
122 Constructs an empty wrapper. Operations on a default-constructed
123 wrapper result in undefined behavior.
124 */
125 1 any_read_stream() = default;
126
127 /** Non-copyable.
128
129 The frame cache is per-instance and cannot be shared.
130 */
131 any_read_stream(any_read_stream const&) = delete;
132 any_read_stream& operator=(any_read_stream const&) = delete;
133
134 /** Move constructor.
135
136 Transfers ownership of the wrapped stream (if owned) and
137 cached frame from `other`. After the move, `other` is
138 in a default-constructed state.
139
140 @param other The wrapper to move from.
141 */
142 2 any_read_stream(any_read_stream&& other) noexcept
143 2 : stream_(std::exchange(other.stream_, nullptr))
144 2 , vt_(std::exchange(other.vt_, nullptr))
145 2 , cached_frame_(std::exchange(other.cached_frame_, nullptr))
146 2 , cached_size_(std::exchange(other.cached_size_, 0))
147 2 , storage_(std::exchange(other.storage_, nullptr))
148 {
149 2 }
150
151 /** Move assignment operator.
152
153 Destroys any owned stream and releases existing resources,
154 then transfers ownership from `other`.
155
156 @param other The wrapper to move from.
157 @return Reference to this wrapper.
158 */
159 any_read_stream&
160 operator=(any_read_stream&& other) noexcept;
161
162 /** Construct by taking ownership of a ReadStream.
163
164 Allocates storage and moves the stream into this wrapper.
165 The wrapper owns the stream and will destroy it.
166
167 @param s The stream to take ownership of.
168 */
169 template<ReadStream S>
170 requires (!std::same_as<std::decay_t<S>, any_read_stream>)
171 any_read_stream(S s);
172
173 /** Construct by wrapping a ReadStream without ownership.
174
175 Wraps the given stream by pointer. The stream must remain
176 valid for the lifetime of this wrapper.
177
178 @param s Pointer to the stream to wrap.
179 */
180 template<ReadStream S>
181 71 any_read_stream(S* s) noexcept
182 71 : stream_(s)
183 71 , vt_(&vtable_for_impl<S>::value)
184 {
185 // Preallocate the coroutine frame
186 71 read_coro<S>(this, *s, {}, nullptr, nullptr);
187 71 }
188 /** Check if the wrapper contains a valid stream.
189
190 @return `true` if wrapping a stream, `false` if default-constructed
191 or moved-from.
192 */
193 bool
194 19 has_value() const noexcept
195 {
196 19 return stream_ != nullptr;
197 }
198
199 /** Check if the wrapper contains a valid stream.
200
201 @return `true` if wrapping a stream, `false` if default-constructed
202 or moved-from.
203 */
204 explicit
205 2 operator bool() const noexcept
206 {
207 2 return has_value();
208 }
209
210 /** Initiate an asynchronous read operation.
211
212 Reads data into the provided buffer sequence. The operation
213 completes when at least one byte has been read, or an error
214 occurs.
215
216 @param buffers The buffer sequence to read into. Passed by
217 value to ensure the sequence lives in the coroutine frame
218 across suspension points.
219
220 @return An awaitable yielding `(error_code,std::size_t)`.
221
222 @par Preconditions
223 The wrapper must contain a valid stream (`has_value() == true`).
224 */
225 template<MutableBufferSequence MB>
226 auto
227 read_some(MB buffers);
228
229 protected:
230 /** Rebind to a new stream after move.
231
232 Updates the internal pointer to reference a new stream object.
233 Used by owning wrappers after move assignment when the owned
234 object has moved to a new location.
235
236 @param new_stream The new stream to bind to. Must be the same
237 type as the original stream.
238
239 @note Terminates if called with a stream of different type
240 than the original.
241 */
242 template<ReadStream S>
243 void
244 rebind(S& new_stream) noexcept
245 {
246 if(vt_ != &vtable_for_impl<S>::value)
247 std::terminate();
248 stream_ = &new_stream;
249 }
250 };
251
252 //----------------------------------------------------------
253
254 struct any_read_stream::vtable
255 {
256 void (*destroy)(void*) noexcept;
257
258 coro (*do_read)(
259 void* stream,
260 any_read_stream* wrapper,
261 std::span<mutable_buffer const> buffers,
262 coro h,
263 executor_ref ex,
264 std::stop_token token,
265 std::error_code* ec,
266 std::size_t* n);
267 };
268
269 template<ReadStream S>
270 struct any_read_stream::vtable_for_impl
271 {
272 static void
273 do_destroy_impl(void* stream) noexcept
274 {
275 static_cast<S*>(stream)->~S();
276 }
277
278 static constexpr vtable value = {
279 &do_destroy_impl,
280 &any_read_stream::do_read_impl<S>
281 };
282 };
283
284 //----------------------------------------------------------
285
286 inline
287 78 any_read_stream::~any_read_stream()
288 {
289
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 78 times.
78 if(storage_)
290 {
291 vt_->destroy(stream_);
292 ::operator delete(storage_);
293 }
294
2/2
✓ Branch 0 taken 71 times.
✓ Branch 1 taken 7 times.
78 if(cached_frame_)
295 71 ::operator delete(cached_frame_);
296 78 }
297
298 inline any_read_stream&
299 3 any_read_stream::operator=(any_read_stream&& other) noexcept
300 {
301
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if(this != &other)
302 {
303
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if(storage_)
304 {
305 vt_->destroy(stream_);
306 ::operator delete(storage_);
307 }
308
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if(cached_frame_)
309 ::operator delete(cached_frame_);
310 3 stream_ = std::exchange(other.stream_, nullptr);
311 3 vt_ = std::exchange(other.vt_, nullptr);
312 3 cached_frame_ = std::exchange(other.cached_frame_, nullptr);
313 3 cached_size_ = std::exchange(other.cached_size_, 0);
314 3 storage_ = std::exchange(other.storage_, nullptr);
315 }
316 3 return *this;
317 }
318
319 template<ReadStream S>
320 requires (!std::same_as<std::decay_t<S>, any_read_stream>)
321 any_read_stream::any_read_stream(S s)
322 : vt_(&vtable_for_impl<S>::value)
323 {
324 struct guard {
325 any_read_stream* self;
326 bool committed = false;
327 ~guard() {
328 if(!committed && self->storage_) {
329 self->vt_->destroy(self->stream_);
330 ::operator delete(self->storage_);
331 self->storage_ = nullptr;
332 self->stream_ = nullptr;
333 }
334 }
335 } g{this};
336
337 storage_ = ::operator new(sizeof(S));
338 stream_ = ::new(storage_) S(std::move(s));
339
340 // Preallocate the coroutine frame
341 auto& ref = *static_cast<S*>(stream_);
342 read_coro<S>(this, ref, {}, nullptr, nullptr);
343
344 g.committed = true;
345 }
346
347 //----------------------------------------------------------
348
349 struct any_read_stream::read_op
350 {
351 struct promise_type
352 {
353 executor_ref executor_;
354 std::stop_token stop_token_;
355 coro caller_h_{};
356
357 141 promise_type() = default;
358
359 read_op
360 141 get_return_object() noexcept
361 {
362 return read_op{
363 141 std::coroutine_handle<promise_type>::from_promise(*this)};
364 }
365
366 std::suspend_always
367 141 initial_suspend() noexcept
368 {
369 141 return {};
370 }
371
372 auto
373 50 final_suspend() noexcept
374 {
375 struct awaiter
376 {
377 promise_type* p_;
378
379 50 bool await_ready() const noexcept { return false; }
380
381 50 coro await_suspend(coro) const noexcept
382 {
383
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 50 times.
50 if(p_->caller_h_)
384 return p_->caller_h_;
385 50 return std::noop_coroutine();
386 }
387
388 void await_resume() const noexcept {}
389 };
390 50 return awaiter{this};
391 }
392
393 void
394 50 return_void() noexcept
395 {
396 50 }
397
398 void
399 20 unhandled_exception()
400 {
401 // Store exception for later propagation
402 // For now, just rethrow to let outer handler catch it
403 20 throw;
404 }
405
406 template<class... Args>
407 static void*
408 141 operator new(
409 std::size_t size,
410 any_read_stream* wrapper,
411 Args&&...)
412 {
413 141 return wrapper->alloc_frame(size);
414 }
415
416 template<class... Args>
417 static void
418 operator delete(void*, any_read_stream*, Args&&...) noexcept
419 {
420 }
421
422 static void
423 141 operator delete(void*, std::size_t) noexcept
424 {
425 141 }
426
427 void
428 70 set_executor(executor_ref ex) noexcept
429 {
430 70 executor_ = ex;
431 70 }
432
433 void
434 70 set_stop_token(std::stop_token token) noexcept
435 {
436 70 stop_token_ = token;
437 70 }
438
439 void
440 set_caller(coro h) noexcept
441 {
442 caller_h_ = h;
443 }
444
445 template<class Awaitable>
446 struct transform_awaiter
447 {
448 std::decay_t<Awaitable> a_;
449 promise_type* p_;
450
451 70 bool await_ready()
452 {
453 70 return a_.await_ready();
454 }
455
456 70 decltype(auto) await_resume()
457 {
458 70 return a_.await_resume();
459 }
460
461 auto await_suspend(coro h)
462 {
463 return a_.await_suspend(h, p_->executor_, p_->stop_token_);
464 }
465 };
466
467 template<class Awaitable>
468 70 auto await_transform(Awaitable&& a)
469 {
470 using A = std::decay_t<Awaitable>;
471 if constexpr (IoAwaitable<A>)
472 {
473 return transform_awaiter<Awaitable>{
474 70 std::forward<Awaitable>(a), this};
475 }
476 else
477 {
478 static_assert(sizeof(A) == 0, "requires IoAwaitable");
479 }
480 }
481 };
482
483 std::coroutine_handle<promise_type> h_;
484
485 141 ~read_op()
486 {
487
2/2
✓ Branch 1 taken 91 times.
✓ Branch 2 taken 50 times.
141 if(h_)
488 91 h_.destroy();
489 141 }
490
491 read_op(read_op const&) = delete;
492 read_op& operator=(read_op const&) = delete;
493
494 read_op(read_op&& other) noexcept
495 : h_(std::exchange(other.h_, nullptr))
496 {
497 }
498
499 read_op& operator=(read_op&& other) noexcept
500 {
501 if(this != &other)
502 {
503 if(h_)
504 h_.destroy();
505 h_ = std::exchange(other.h_, nullptr);
506 }
507 return *this;
508 }
509
510 private:
511 explicit
512 141 read_op(std::coroutine_handle<promise_type> h) noexcept
513 141 : h_(h)
514 {
515 141 }
516 };
517
518 //----------------------------------------------------------
519
520 inline void*
521 141 any_read_stream::alloc_frame(std::size_t size)
522 {
523
3/4
✓ Branch 0 taken 70 times.
✓ Branch 1 taken 71 times.
✓ Branch 2 taken 70 times.
✗ Branch 3 not taken.
141 if(cached_frame_ && cached_size_ >= size)
524 70 return cached_frame_;
525
526
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 71 times.
71 if(cached_frame_)
527 ::operator delete(cached_frame_);
528
529 71 cached_frame_ = ::operator new(size);
530 71 cached_size_ = size;
531 71 return cached_frame_;
532 }
533
534 inline void
535 any_read_stream::free_frame(void*, std::size_t)
536 {
537 // Keep the frame cached for reuse
538 }
539
540 #if defined(__GNUC__) && !defined(__clang__)
541 #pragma GCC diagnostic push
542 #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
543 #endif
544
545 template<ReadStream S>
546 any_read_stream::read_op
547
1/1
✓ Branch 1 taken 141 times.
141 any_read_stream::read_coro(
548 any_read_stream*,
549 S& stream,
550 std::span<mutable_buffer const> bufs,
551 std::error_code* out_ec,
552 std::size_t* out_n)
553 {
554 auto [err, bytes] = co_await stream.read_some(bufs);
555
556 *out_ec = err;
557 *out_n = bytes;
558 282 }
559
560 #if defined(__GNUC__) && !defined(__clang__)
561 #pragma GCC diagnostic pop
562 #endif
563
564 template<ReadStream S>
565 coro
566 70 any_read_stream::do_read_impl(
567 void* stream,
568 any_read_stream* wrapper,
569 std::span<mutable_buffer const> buffers,
570 coro h,
571 executor_ref ex,
572 std::stop_token token,
573 std::error_code* ec,
574 std::size_t* n)
575 {
576 70 auto& s = *static_cast<S*>(stream);
577
578 // Create coroutine - frame is cached in wrapper
579
1/1
✓ Branch 1 taken 70 times.
70 auto op = read_coro<S>(wrapper, s, buffers, ec, n);
580
581 // Set executor and stop token on promise before resuming
582 70 op.h_.promise().set_executor(ex);
583 70 op.h_.promise().set_stop_token(token);
584
585 // Resume the coroutine to start the operation
586
1/1
✓ Branch 1 taken 50 times.
70 op.h_.resume();
587
588 // Check if operation completed synchronously
589
1/2
✓ Branch 1 taken 50 times.
✗ Branch 2 not taken.
50 if(op.h_.done())
590 {
591
1/1
✓ Branch 1 taken 50 times.
50 op.h_.destroy();
592 50 op.h_ = nullptr;
593 // Return caller's handle via executor dispatch
594
1/1
✓ Branch 1 taken 50 times.
50 return ex.dispatch(h);
595 }
596
597 // Operation is pending - caller will be resumed via symmetric transfer
598 op.h_.promise().set_caller(h);
599 op.h_ = nullptr;
600 return std::noop_coroutine();
601 70 }
602
603 //----------------------------------------------------------
604
605 template<MutableBufferSequence MB>
606 auto
607 70 any_read_stream::read_some(MB buffers)
608 {
609 struct awaitable
610 {
611 any_read_stream* self_;
612 buffer_param<MB> bp_;
613 std::error_code ec_;
614 std::size_t n_ = 0;
615
616 bool
617 70 await_ready() const noexcept
618 {
619 70 return false;
620 }
621
622 coro
623 70 await_suspend(coro h, executor_ref ex, std::stop_token token)
624 {
625 210 return self_->vt_->do_read(
626
1/1
✓ Branch 1 taken 10 times.
70 self_->stream_,
627 self_,
628
1/1
✓ Branch 1 taken 14 times.
140 bp_.data(),
629 h,
630 ex,
631 token,
632 &ec_,
633 100 &n_);
634 }
635
636 io_result<std::size_t>
637 50 await_resume() const noexcept
638 {
639 50 return {ec_, n_};
640 }
641 };
642 70 return awaitable{this, buffer_param<MB>(buffers), {}, 0};
643 }
644
645 } // namespace capy
646 } // namespace boost
647
648 #endif
649