libs/capy/include/boost/capy/io/any_read_source.hpp

84.2% Lines (101/120) 88.9% Functions (32/36) 72.4% Branches (21/29)
libs/capy/include/boost/capy/io/any_read_source.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11 #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/buffers.hpp>
15 #include <boost/capy/buffers/buffer_param.hpp>
16 #include <boost/capy/concept/io_awaitable.hpp>
17 #include <boost/capy/concept/read_source.hpp>
18 #include <boost/capy/coro.hpp>
19 #include <boost/capy/ex/executor_ref.hpp>
20 #include <boost/capy/io_result.hpp>
21 #include <boost/capy/task.hpp>
22
23 #include <system_error>
24
25 #include <concepts>
26 #include <coroutine>
27 #include <cstddef>
28 #include <exception>
29 #include <span>
30 #include <stop_token>
31 #include <utility>
32
33 namespace boost {
34 namespace capy {
35
36 /** Type-erased wrapper for any ReadSource.
37
38 This class provides type erasure for any type satisfying the
39 @ref ReadSource concept, enabling runtime polymorphism for
40 source read operations. It uses a cached coroutine frame to achieve
41 zero steady-state allocation after construction.
42
43 The wrapper supports two construction modes:
44 - **Owning**: Pass by value to transfer ownership. The wrapper
45 allocates storage and owns the source.
46 - **Reference**: Pass a pointer to wrap without ownership. The
47 pointed-to source must outlive this wrapper.
48
49 @par Frame Preallocation
50 The constructor preallocates the internal coroutine frame.
51 This reserves all virtual address space at server startup
52 so memory usage can be measured up front, rather than
53 allocating piecemeal as traffic arrives.
54
55 @par Thread Safety
56 Not thread-safe. Concurrent operations on the same wrapper
57 are undefined behavior.
58
59 @par Example
60 @code
61 // Owning - takes ownership of the source
62 any_read_source rs(some_source{args...});
63
64 // Reference - wraps without ownership
65 some_source source;
66 any_read_source rs(&source);
67
68 mutable_buffer buf(data, size);
69 auto [ec, n] = co_await rs.read(std::span(&buf, 1));
70 @endcode
71
72 @see any_read_stream, ReadSource
73 */
74 class any_read_source
75 {
76 struct vtable;
77
78 template<ReadSource S>
79 struct vtable_for_impl;
80
81 struct read_op;
82
83 void* source_ = nullptr;
84 vtable const* vt_ = nullptr;
85 void* cached_frame_ = nullptr;
86 std::size_t cached_size_ = 0;
87 void* storage_ = nullptr;
88
89 template<ReadSource S>
90 static coro
91 do_read_impl(
92 void* source,
93 any_read_source* wrapper,
94 std::span<mutable_buffer const> buffers,
95 coro h,
96 executor_ref ex,
97 std::stop_token token,
98 std::error_code* ec,
99 std::size_t* n);
100
101 template<ReadSource S>
102 static read_op
103 read_coro(
104 any_read_source* wrapper,
105 S& source,
106 std::span<mutable_buffer const> bufs,
107 std::error_code* out_ec,
108 std::size_t* out_n);
109
110 void* alloc_frame(std::size_t size);
111 void free_frame(void* p, std::size_t size);
112
113 public:
114 /** Destructor.
115
116 Destroys the owned source (if any) and releases the cached
117 coroutine frame.
118 */
119 ~any_read_source();
120
121 /** Default constructor.
122
123 Constructs an empty wrapper. Operations on a default-constructed
124 wrapper result in undefined behavior.
125 */
126 any_read_source() = default;
127
128 /** Non-copyable.
129
130 The frame cache is per-instance and cannot be shared.
131 */
132 any_read_source(any_read_source const&) = delete;
133 any_read_source& operator=(any_read_source const&) = delete;
134
135 /** Move constructor.
136
137 Transfers ownership of the wrapped source (if owned) and
138 cached frame from `other`. After the move, `other` is
139 in a default-constructed state.
140
141 @param other The wrapper to move from.
142 */
143 1 any_read_source(any_read_source&& other) noexcept
144 1 : source_(std::exchange(other.source_, nullptr))
145 1 , vt_(std::exchange(other.vt_, nullptr))
146 1 , cached_frame_(std::exchange(other.cached_frame_, nullptr))
147 1 , cached_size_(std::exchange(other.cached_size_, 0))
148 1 , storage_(std::exchange(other.storage_, nullptr))
149 {
150 1 }
151
152 /** Move assignment operator.
153
154 Destroys any owned source and releases existing resources,
155 then transfers ownership from `other`.
156
157 @param other The wrapper to move from.
158 @return Reference to this wrapper.
159 */
160 any_read_source&
161 operator=(any_read_source&& other) noexcept;
162
163 /** Construct by taking ownership of a ReadSource.
164
165 Allocates storage and moves the source into this wrapper.
166 The wrapper owns the source and will destroy it.
167
168 @param s The source to take ownership of.
169 */
170 template<ReadSource S>
171 requires (!std::same_as<std::decay_t<S>, any_read_source>)
172 any_read_source(S s);
173
174 /** Construct by wrapping a ReadSource without ownership.
175
176 Wraps the given source by pointer. The source must remain
177 valid for the lifetime of this wrapper.
178
179 @param s Pointer to the source to wrap.
180 */
181 template<ReadSource S>
182 88 any_read_source(S* s) noexcept
183 88 : source_(s)
184 88 , vt_(&vtable_for_impl<S>::value)
185 {
186 // Preallocate the coroutine frame
187 88 read_coro<S>(this, *s, {}, nullptr, nullptr);
188 88 }
189
190 /** Check if the wrapper contains a valid source.
191
192 @return `true` if wrapping a source, `false` if default-constructed
193 or moved-from.
194 */
195 bool
196 9 has_value() const noexcept
197 {
198 9 return source_ != nullptr;
199 }
200
201 /** Check if the wrapper contains a valid source.
202
203 @return `true` if wrapping a source, `false` if default-constructed
204 or moved-from.
205 */
206 explicit
207 2 operator bool() const noexcept
208 {
209 2 return has_value();
210 }
211
212 /** Initiate an asynchronous read operation.
213
214 Reads data into the provided buffer sequence. The operation
215 completes when the entire buffer sequence is filled, end-of-file
216 is reached, or an error occurs.
217
218 @param buffers The buffer sequence to read into. Passed by
219 value to ensure the sequence lives in the coroutine frame
220 across suspension points.
221
222 @return An awaitable yielding `(error_code,std::size_t)`.
223
224 @par Postconditions
225 Exactly one of the following is true on return:
226 @li **Success**: `!ec` and `n == buffer_size(buffers)`.
227 The entire buffer was filled.
228 @li **End-of-stream or Error**: `ec` and `n` indicates
229 the number of bytes transferred before the failure.
230
231 @par Preconditions
232 The wrapper must contain a valid source (`has_value() == true`).
233 */
234 template<MutableBufferSequence MB>
235 task<io_result<std::size_t>>
236 read(MB buffers);
237
238 protected:
239 /** Rebind to a new source after move.
240
241 Updates the internal pointer to reference a new source object.
242 Used by owning wrappers after move assignment when the owned
243 object has moved to a new location.
244
245 @param new_source The new source to bind to. Must be the same
246 type as the original source.
247
248 @note Terminates if called with a source of different type
249 than the original.
250 */
251 template<ReadSource S>
252 void
253 rebind(S& new_source) noexcept
254 {
255 if(vt_ != &vtable_for_impl<S>::value)
256 std::terminate();
257 source_ = &new_source;
258 }
259
260 private:
261 auto
262 read_some_(std::span<mutable_buffer const> buffers);
263 };
264
265 //----------------------------------------------------------
266
267 struct any_read_source::vtable
268 {
269 void (*destroy)(void*) noexcept;
270
271 coro (*do_read)(
272 void* source,
273 any_read_source* wrapper,
274 std::span<mutable_buffer const> buffers,
275 coro h,
276 executor_ref ex,
277 std::stop_token token,
278 std::error_code* ec,
279 std::size_t* n);
280 };
281
282 template<ReadSource S>
283 struct any_read_source::vtable_for_impl
284 {
285 static void
286 do_destroy_impl(void* source) noexcept
287 {
288 static_cast<S*>(source)->~S();
289 }
290
291 static constexpr vtable value = {
292 &do_destroy_impl,
293 &any_read_source::do_read_impl<S>
294 };
295 };
296
297 //----------------------------------------------------------
298
299 inline
300 91 any_read_source::~any_read_source()
301 {
302
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 91 times.
91 if(storage_)
303 {
304 vt_->destroy(source_);
305 ::operator delete(storage_);
306 }
307
2/2
✓ Branch 0 taken 88 times.
✓ Branch 1 taken 3 times.
91 if(cached_frame_)
308 88 ::operator delete(cached_frame_);
309 91 }
310
311 inline any_read_source&
312 1 any_read_source::operator=(any_read_source&& other) noexcept
313 {
314
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if(this != &other)
315 {
316
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if(storage_)
317 {
318 vt_->destroy(source_);
319 ::operator delete(storage_);
320 }
321
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if(cached_frame_)
322 ::operator delete(cached_frame_);
323 1 source_ = std::exchange(other.source_, nullptr);
324 1 vt_ = std::exchange(other.vt_, nullptr);
325 1 cached_frame_ = std::exchange(other.cached_frame_, nullptr);
326 1 cached_size_ = std::exchange(other.cached_size_, 0);
327 1 storage_ = std::exchange(other.storage_, nullptr);
328 }
329 1 return *this;
330 }
331
332 template<ReadSource S>
333 requires (!std::same_as<std::decay_t<S>, any_read_source>)
334 any_read_source::any_read_source(S s)
335 : vt_(&vtable_for_impl<S>::value)
336 {
337 struct guard {
338 any_read_source* self;
339 bool committed = false;
340 ~guard() {
341 if(!committed && self->storage_) {
342 self->vt_->destroy(self->source_);
343 ::operator delete(self->storage_);
344 self->storage_ = nullptr;
345 self->source_ = nullptr;
346 }
347 }
348 } g{this};
349
350 storage_ = ::operator new(sizeof(S));
351 source_ = ::new(storage_) S(std::move(s));
352
353 // Preallocate the coroutine frame
354 auto& ref = *static_cast<S*>(source_);
355 read_coro<S>(this, ref, {}, nullptr, nullptr);
356
357 g.committed = true;
358 }
359
360 //----------------------------------------------------------
361
362 struct any_read_source::read_op
363 {
364 struct promise_type
365 {
366 executor_ref executor_;
367 std::stop_token stop_token_;
368 coro caller_h_{};
369
370 218 promise_type() = default;
371
372 read_op
373 218 get_return_object() noexcept
374 {
375 return read_op{
376 218 std::coroutine_handle<promise_type>::from_promise(*this)};
377 }
378
379 std::suspend_always
380 218 initial_suspend() noexcept
381 {
382 218 return {};
383 }
384
385 auto
386 99 final_suspend() noexcept
387 {
388 struct awaiter
389 {
390 promise_type* p_;
391
392 99 bool await_ready() const noexcept { return false; }
393
394 99 coro await_suspend(coro) const noexcept
395 {
396
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 99 times.
99 if(p_->caller_h_)
397 return p_->caller_h_;
398 99 return std::noop_coroutine();
399 }
400
401 void await_resume() const noexcept {}
402 };
403 99 return awaiter{this};
404 }
405
406 void
407 99 return_void() noexcept
408 {
409 99 }
410
411 void
412 31 unhandled_exception()
413 {
414 31 throw;
415 }
416
417 template<class... Args>
418 static void*
419 218 operator new(
420 std::size_t size,
421 any_read_source* wrapper,
422 Args&&...)
423 {
424 218 return wrapper->alloc_frame(size);
425 }
426
427 template<class... Args>
428 static void
429 operator delete(void*, any_read_source*, Args&&...) noexcept
430 {
431 }
432
433 static void
434 218 operator delete(void*, std::size_t) noexcept
435 {
436 218 }
437
438 void
439 130 set_executor(executor_ref ex) noexcept
440 {
441 130 executor_ = ex;
442 130 }
443
444 void
445 130 set_stop_token(std::stop_token token) noexcept
446 {
447 130 stop_token_ = token;
448 130 }
449
450 void
451 set_caller(coro h) noexcept
452 {
453 caller_h_ = h;
454 }
455
456 template<class Awaitable>
457 struct transform_awaiter
458 {
459 std::decay_t<Awaitable> a_;
460 promise_type* p_;
461
462 130 bool await_ready()
463 {
464 130 return a_.await_ready();
465 }
466
467 130 decltype(auto) await_resume()
468 {
469 130 return a_.await_resume();
470 }
471
472 auto await_suspend(coro h)
473 {
474 return a_.await_suspend(h, p_->executor_, p_->stop_token_);
475 }
476 };
477
478 template<class Awaitable>
479 130 auto await_transform(Awaitable&& a)
480 {
481 using A = std::decay_t<Awaitable>;
482 if constexpr (IoAwaitable<A>)
483 {
484 return transform_awaiter<Awaitable>{
485 130 std::forward<Awaitable>(a), this};
486 }
487 else
488 {
489 static_assert(sizeof(A) == 0, "requires IoAwaitable");
490 }
491 }
492 };
493
494 std::coroutine_handle<promise_type> h_;
495
496 218 ~read_op()
497 {
498
2/2
✓ Branch 1 taken 119 times.
✓ Branch 2 taken 99 times.
218 if(h_)
499 119 h_.destroy();
500 218 }
501
502 read_op(read_op const&) = delete;
503 read_op& operator=(read_op const&) = delete;
504
505 read_op(read_op&& other) noexcept
506 : h_(std::exchange(other.h_, nullptr))
507 {
508 }
509
510 read_op& operator=(read_op&& other) noexcept
511 {
512 if(this != &other)
513 {
514 if(h_)
515 h_.destroy();
516 h_ = std::exchange(other.h_, nullptr);
517 }
518 return *this;
519 }
520
521 private:
522 explicit
523 218 read_op(std::coroutine_handle<promise_type> h) noexcept
524 218 : h_(h)
525 {
526 218 }
527 };
528
529 //----------------------------------------------------------
530
531 inline void*
532 218 any_read_source::alloc_frame(std::size_t size)
533 {
534
3/4
✓ Branch 0 taken 130 times.
✓ Branch 1 taken 88 times.
✓ Branch 2 taken 130 times.
✗ Branch 3 not taken.
218 if(cached_frame_ && cached_size_ >= size)
535 130 return cached_frame_;
536
537
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 88 times.
88 if(cached_frame_)
538 ::operator delete(cached_frame_);
539
540 88 cached_frame_ = ::operator new(size);
541 88 cached_size_ = size;
542 88 return cached_frame_;
543 }
544
545 inline void
546 any_read_source::free_frame(void*, std::size_t)
547 {
548 // Keep the frame cached for reuse
549 }
550
551 #if defined(__GNUC__) && !defined(__clang__)
552 #pragma GCC diagnostic push
553 #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
554 #endif
555
556 template<ReadSource S>
557 any_read_source::read_op
558
1/1
✓ Branch 1 taken 218 times.
218 any_read_source::read_coro(
559 any_read_source*,
560 S& source,
561 std::span<mutable_buffer const> bufs,
562 std::error_code* out_ec,
563 std::size_t* out_n)
564 {
565 auto [err, bytes] = co_await source.read(bufs);
566
567 *out_ec = err;
568 *out_n = bytes;
569 436 }
570
571 #if defined(__GNUC__) && !defined(__clang__)
572 #pragma GCC diagnostic pop
573 #endif
574
575 template<ReadSource S>
576 coro
577 130 any_read_source::do_read_impl(
578 void* source,
579 any_read_source* wrapper,
580 std::span<mutable_buffer const> buffers,
581 coro h,
582 executor_ref ex,
583 std::stop_token token,
584 std::error_code* ec,
585 std::size_t* n)
586 {
587 130 auto& s = *static_cast<S*>(source);
588
589 // Create coroutine - frame is cached in wrapper
590
1/1
✓ Branch 1 taken 130 times.
130 auto op = read_coro<S>(wrapper, s, buffers, ec, n);
591
592 // Set executor and stop token on promise before resuming
593 130 op.h_.promise().set_executor(ex);
594 130 op.h_.promise().set_stop_token(token);
595
596 // Resume the coroutine to start the operation
597
1/1
✓ Branch 1 taken 99 times.
130 op.h_.resume();
598
599 // Check if operation completed synchronously
600
1/2
✓ Branch 1 taken 99 times.
✗ Branch 2 not taken.
99 if(op.h_.done())
601 {
602
1/1
✓ Branch 1 taken 99 times.
99 op.h_.destroy();
603 99 op.h_ = nullptr;
604
1/1
✓ Branch 1 taken 99 times.
99 return ex.dispatch(h);
605 }
606
607 // Operation is pending - caller will be resumed via symmetric transfer
608 op.h_.promise().set_caller(h);
609 op.h_ = nullptr;
610 return std::noop_coroutine();
611 130 }
612
613 //----------------------------------------------------------
614
615 inline auto
616 130 any_read_source::read_some_(std::span<mutable_buffer const> buffers)
617 {
618 struct awaitable
619 {
620 any_read_source* self_;
621 std::span<mutable_buffer const> buffers_;
622 std::error_code ec_;
623 std::size_t n_ = 0;
624
625 bool
626 130 await_ready() const noexcept
627 {
628 130 return false;
629 }
630
631 coro
632 130 await_suspend(coro h, executor_ref ex, std::stop_token token)
633 {
634 260 return self_->vt_->do_read(
635
1/1
✓ Branch 1 taken 99 times.
130 self_->source_,
636 self_,
637 buffers_,
638 h,
639 ex,
640 token,
641 &ec_,
642 198 &n_);
643 }
644
645 io_result<std::size_t>
646 99 await_resume() const noexcept
647 {
648 99 return {ec_, n_};
649 }
650 };
651 130 return awaitable{this, buffers, {}, 0};
652 }
653
654 template<MutableBufferSequence MB>
655 task<io_result<std::size_t>>
656
1/1
✓ Branch 1 taken 106 times.
106 any_read_source::read(MB buffers)
657 {
658 buffer_param<MB> bp(std::move(buffers));
659 std::size_t total = 0;
660
661 for(;;)
662 {
663 auto bufs = bp.data();
664 if(bufs.empty())
665 break;
666
667 auto [ec, n] = co_await read_some_(bufs);
668 total += n;
669 if(ec)
670 co_return {ec, total};
671 bp.consume(n);
672 }
673
674 co_return {{}, total};
675 212 }
676
677 } // namespace capy
678 } // namespace boost
679
680 #endif
681