libs/capy/include/boost/capy/io/any_buffer_source.hpp

84.7% Lines (105/124) 88.9% Functions (32/36) 71.4% Branches (20/28)
libs/capy/include/boost/capy/io/any_buffer_source.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
11 #define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/buffers.hpp>
15 #include <boost/capy/buffers/buffer_copy.hpp>
16 #include <boost/capy/buffers/slice.hpp>
17 #include <boost/capy/concept/buffer_source.hpp>
18 #include <boost/capy/concept/io_awaitable.hpp>
19 #include <boost/capy/concept/read_source.hpp>
20 #include <boost/capy/coro.hpp>
21 #include <boost/capy/error.hpp>
22 #include <boost/capy/ex/executor_ref.hpp>
23 #include <boost/capy/io_result.hpp>
24 #include <boost/capy/task.hpp>
25
26 #include <system_error>
27
28 #include <concepts>
29 #include <coroutine>
30 #include <cstddef>
31 #include <exception>
32 #include <span>
33 #include <stop_token>
34 #include <utility>
35
36 namespace boost {
37 namespace capy {
38
39 /** Type-erased wrapper for any BufferSource.
40
41 This class provides type erasure for any type satisfying the
42 @ref BufferSource concept, enabling runtime polymorphism for
43 buffer pull operations. The wrapper also satisfies @ref ReadSource,
44 allowing it to be used with code expecting either interface.
45 It uses a cached coroutine frame to achieve zero steady-state
46 allocation after construction.
47
48 The wrapper also satisfies @ref ReadSource through the templated
49 @ref read method. This method copies data from the source's
50 internal buffers into the caller's buffers, incurring one extra
51 buffer copy compared to using @ref pull and @ref consume directly.
52
53 The wrapper supports two construction modes:
54 - **Owning**: Pass by value to transfer ownership. The wrapper
55 allocates storage and owns the source.
56 - **Reference**: Pass a pointer to wrap without ownership. The
57 pointed-to source must outlive this wrapper.
58
59 @par Frame Preallocation
60 The constructor preallocates the internal coroutine frame.
61 This reserves all virtual address space at server startup
62 so memory usage can be measured up front, rather than
63 allocating piecemeal as traffic arrives.
64
65 @par Thread Safety
66 Not thread-safe. Concurrent operations on the same wrapper
67 are undefined behavior.
68
69 @par Example
70 @code
71 // Owning - takes ownership of the source
72 any_buffer_source abs(some_buffer_source{args...});
73
74 // Reference - wraps without ownership
75 some_buffer_source src;
76 any_buffer_source abs(&src);
77
78 const_buffer arr[16];
79 auto [ec, count] = co_await abs.pull(arr, 16);
80 @endcode
81
82 @see any_write_sink, BufferSource, ReadSource
83 */
84 class any_buffer_source
85 {
86 struct vtable;
87
88 template<BufferSource S>
89 struct vtable_for_impl;
90
91 struct pull_op;
92
93 void* source_ = nullptr;
94 vtable const* vt_ = nullptr;
95 void* cached_frame_ = nullptr;
96 std::size_t cached_size_ = 0;
97 void* storage_ = nullptr;
98
99 template<BufferSource S>
100 static coro
101 do_pull_impl(
102 void* source,
103 any_buffer_source* wrapper,
104 const_buffer* arr,
105 std::size_t max_count,
106 coro h,
107 executor_ref ex,
108 std::stop_token token,
109 std::error_code* ec,
110 std::size_t* count);
111
112 template<BufferSource S>
113 static pull_op
114 pull_coro(
115 any_buffer_source* wrapper,
116 S& source,
117 const_buffer* arr,
118 std::size_t max_count,
119 std::error_code* out_ec,
120 std::size_t* out_count);
121
122 void* alloc_frame(std::size_t size);
123 void free_frame(void* p, std::size_t size);
124
125 public:
126 /** Destructor.
127
128 Destroys the owned source (if any) and releases the cached
129 coroutine frame.
130 */
131 ~any_buffer_source();
132
133 /** Default constructor.
134
135 Constructs an empty wrapper. Operations on a default-constructed
136 wrapper result in undefined behavior.
137 */
138 any_buffer_source() = default;
139
140 /** Non-copyable.
141
142 The frame cache is per-instance and cannot be shared.
143 */
144 any_buffer_source(any_buffer_source const&) = delete;
145 any_buffer_source& operator=(any_buffer_source const&) = delete;
146
147 /** Move constructor.
148
149 Transfers ownership of the wrapped source (if owned) and
150 cached frame from `other`. After the move, `other` is
151 in a default-constructed state.
152
153 @param other The wrapper to move from.
154 */
155 1 any_buffer_source(any_buffer_source&& other) noexcept
156 1 : source_(std::exchange(other.source_, nullptr))
157 1 , vt_(std::exchange(other.vt_, nullptr))
158 1 , cached_frame_(std::exchange(other.cached_frame_, nullptr))
159 1 , cached_size_(std::exchange(other.cached_size_, 0))
160 1 , storage_(std::exchange(other.storage_, nullptr))
161 {
162 1 }
163
164 /** Move assignment operator.
165
166 Destroys any owned source and releases existing resources,
167 then transfers ownership from `other`.
168
169 @param other The wrapper to move from.
170 @return Reference to this wrapper.
171 */
172 any_buffer_source&
173 operator=(any_buffer_source&& other) noexcept;
174
175 /** Construct by taking ownership of a BufferSource.
176
177 Allocates storage and moves the source into this wrapper.
178 The wrapper owns the source and will destroy it.
179
180 @param s The source to take ownership of.
181 */
182 template<BufferSource S>
183 requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
184 any_buffer_source(S s);
185
186 /** Construct by wrapping a BufferSource without ownership.
187
188 Wraps the given source by pointer. The source must remain
189 valid for the lifetime of this wrapper.
190
191 @param s Pointer to the source to wrap.
192 */
193 template<BufferSource S>
194 56 any_buffer_source(S* s) noexcept
195 56 : source_(s)
196 56 , vt_(&vtable_for_impl<S>::value)
197 {
198 // Preallocate coroutine frame to find max size
199 56 pull_coro<S>(this, *s, nullptr, 0, nullptr, nullptr);
200 56 }
201
202 /** Check if the wrapper contains a valid source.
203
204 @return `true` if wrapping a source, `false` if default-constructed
205 or moved-from.
206 */
207 bool
208 9 has_value() const noexcept
209 {
210 9 return source_ != nullptr;
211 }
212
213 /** Check if the wrapper contains a valid source.
214
215 @return `true` if wrapping a source, `false` if default-constructed
216 or moved-from.
217 */
218 explicit
219 2 operator bool() const noexcept
220 {
221 2 return has_value();
222 }
223
224 /** Consume bytes from the source.
225
226 Advances the internal read position of the underlying source
227 by the specified number of bytes. The next call to @ref pull
228 returns data starting after the consumed bytes.
229
230 @param n The number of bytes to consume. Must not exceed the
231 total size of buffers returned by the previous @ref pull.
232
233 @par Preconditions
234 The wrapper must contain a valid source (`has_value() == true`).
235 */
236 void
237 consume(std::size_t n) noexcept;
238
239 /** Pull buffer data from the source.
240
241 Fills the provided array with buffer descriptors from the
242 underlying source. The operation completes when data is
243 available, the source is exhausted, or an error occurs.
244
245 @param arr Pointer to array of const_buffer to fill.
246 @param max_count Maximum number of buffers to fill.
247
248 @return An awaitable yielding `(error_code,std::size_t)`.
249 On success with data, `count > 0` indicates buffers filled.
250 On success with `count == 0`, source is exhausted.
251
252 @par Preconditions
253 The wrapper must contain a valid source (`has_value() == true`).
254 */
255 auto
256 pull(const_buffer* arr, std::size_t max_count);
257
258 /** Read data into a mutable buffer sequence.
259
260 Fills the provided buffer sequence by pulling data from the
261 underlying source and copying it into the caller's buffers.
262 This satisfies @ref ReadSource but incurs a copy; for zero-copy
263 access, use @ref pull and @ref consume instead.
264
265 @note This operation copies data from the source's internal
266 buffers into the caller's buffers. For zero-copy reads,
267 use @ref pull and @ref consume directly.
268
269 @param buffers The buffer sequence to fill.
270
271 @return An awaitable yielding `(error_code,std::size_t)`.
272 On success, `n == buffer_size(buffers)`.
273 On EOF, `ec == error::eof` and `n` is bytes transferred.
274
275 @par Preconditions
276 The wrapper must contain a valid source (`has_value() == true`).
277
278 @see pull, consume
279 */
280 template<MutableBufferSequence MB>
281 task<io_result<std::size_t>>
282 read(MB buffers);
283
284 protected:
285 /** Rebind to a new source after move.
286
287 Updates the internal pointer to reference a new source object.
288 Used by owning wrappers after move assignment when the owned
289 object has moved to a new location.
290
291 @param new_source The new source to bind to. Must be the same
292 type as the original source.
293
294 @note Terminates if called with a source of different type
295 than the original.
296 */
297 template<BufferSource S>
298 void
299 rebind(S& new_source) noexcept
300 {
301 if(vt_ != &vtable_for_impl<S>::value)
302 std::terminate();
303 source_ = &new_source;
304 }
305 };
306
307 //----------------------------------------------------------
308
309 struct any_buffer_source::vtable
310 {
311 void (*destroy)(void*) noexcept;
312
313 coro (*do_pull)(
314 void* source,
315 any_buffer_source* wrapper,
316 const_buffer* arr,
317 std::size_t max_count,
318 coro h,
319 executor_ref ex,
320 std::stop_token token,
321 std::error_code* ec,
322 std::size_t* count);
323 void (*do_consume)(void* source, std::size_t n) noexcept;
324 };
325
326 template<BufferSource S>
327 struct any_buffer_source::vtable_for_impl
328 {
329 static void
330 do_destroy_impl(void* source) noexcept
331 {
332 static_cast<S*>(source)->~S();
333 }
334
335 static void
336 39 do_consume_impl(void* source, std::size_t n) noexcept
337 {
338 39 static_cast<S*>(source)->consume(n);
339 39 }
340
341 static constexpr vtable value = {
342 &do_destroy_impl,
343 &any_buffer_source::do_pull_impl<S>,
344 &do_consume_impl
345 };
346 };
347
348 //----------------------------------------------------------
349
350 inline
351 59 any_buffer_source::~any_buffer_source()
352 {
353
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 59 times.
59 if(storage_)
354 {
355 vt_->destroy(source_);
356 ::operator delete(storage_);
357 }
358
2/2
✓ Branch 0 taken 56 times.
✓ Branch 1 taken 3 times.
59 if(cached_frame_)
359 56 ::operator delete(cached_frame_);
360 59 }
361
362 inline any_buffer_source&
363 1 any_buffer_source::operator=(any_buffer_source&& other) noexcept
364 {
365
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if(this != &other)
366 {
367
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if(storage_)
368 {
369 vt_->destroy(source_);
370 ::operator delete(storage_);
371 }
372
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if(cached_frame_)
373 ::operator delete(cached_frame_);
374 1 source_ = std::exchange(other.source_, nullptr);
375 1 vt_ = std::exchange(other.vt_, nullptr);
376 1 cached_frame_ = std::exchange(other.cached_frame_, nullptr);
377 1 cached_size_ = std::exchange(other.cached_size_, 0);
378 1 storage_ = std::exchange(other.storage_, nullptr);
379 }
380 1 return *this;
381 }
382
383 template<BufferSource S>
384 requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
385 any_buffer_source::any_buffer_source(S s)
386 : vt_(&vtable_for_impl<S>::value)
387 {
388 struct guard {
389 any_buffer_source* self;
390 bool committed = false;
391 ~guard() {
392 if(!committed && self->storage_) {
393 self->vt_->destroy(self->source_);
394 ::operator delete(self->storage_);
395 self->storage_ = nullptr;
396 self->source_ = nullptr;
397 }
398 }
399 } g{this};
400
401 storage_ = ::operator new(sizeof(S));
402 source_ = ::new(storage_) S(std::move(s));
403
404 // Preallocate coroutine frame to find max size
405 auto& ref = *static_cast<S*>(source_);
406 pull_coro<S>(this, ref, nullptr, 0, nullptr, nullptr);
407
408 g.committed = true;
409 }
410
411 //----------------------------------------------------------
412
413 struct any_buffer_source::pull_op
414 {
415 struct promise_type
416 {
417 executor_ref executor_;
418 std::stop_token stop_token_;
419 coro caller_h_{};
420
421 148 promise_type() = default;
422
423 pull_op
424 148 get_return_object() noexcept
425 {
426 return pull_op{
427 148 std::coroutine_handle<promise_type>::from_promise(*this)};
428 }
429
430 std::suspend_always
431 148 initial_suspend() noexcept
432 {
433 148 return {};
434 }
435
436 auto
437 73 final_suspend() noexcept
438 {
439 struct awaiter
440 {
441 promise_type* p_;
442
443 73 bool await_ready() const noexcept { return false; }
444
445 73 coro await_suspend(coro) const noexcept
446 {
447
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 73 times.
73 if(p_->caller_h_)
448 return p_->caller_h_;
449 73 return std::noop_coroutine();
450 }
451
452 void await_resume() const noexcept {}
453 };
454 73 return awaiter{this};
455 }
456
457 void
458 73 return_void() noexcept
459 {
460 73 }
461
462 void
463 19 unhandled_exception()
464 {
465 19 throw;
466 }
467
468 template<class... Args>
469 static void*
470 148 operator new(
471 std::size_t size,
472 any_buffer_source* wrapper,
473 Args&&...)
474 {
475 148 return wrapper->alloc_frame(size);
476 }
477
478 template<class... Args>
479 static void
480 operator delete(void*, any_buffer_source*, Args&&...) noexcept
481 {
482 }
483
484 static void
485 148 operator delete(void*, std::size_t) noexcept
486 {
487 148 }
488
489 void
490 92 set_executor(executor_ref ex) noexcept
491 {
492 92 executor_ = ex;
493 92 }
494
495 void
496 92 set_stop_token(std::stop_token token) noexcept
497 {
498 92 stop_token_ = token;
499 92 }
500
501 void
502 set_caller(coro h) noexcept
503 {
504 caller_h_ = h;
505 }
506
507 template<class Awaitable>
508 struct transform_awaiter
509 {
510 std::decay_t<Awaitable> a_;
511 promise_type* p_;
512
513 92 bool await_ready()
514 {
515 92 return a_.await_ready();
516 }
517
518 92 decltype(auto) await_resume()
519 {
520 92 return a_.await_resume();
521 }
522
523 auto await_suspend(coro h)
524 {
525 return a_.await_suspend(h, p_->executor_, p_->stop_token_);
526 }
527 };
528
529 template<class Awaitable>
530 92 auto await_transform(Awaitable&& a)
531 {
532 using A = std::decay_t<Awaitable>;
533 if constexpr (IoAwaitable<A>)
534 {
535 return transform_awaiter<Awaitable>{
536 92 std::forward<Awaitable>(a), this};
537 }
538 else
539 {
540 static_assert(sizeof(A) == 0, "requires IoAwaitable");
541 }
542 }
543 };
544
545 std::coroutine_handle<promise_type> h_;
546
547 148 ~pull_op()
548 {
549
2/2
✓ Branch 1 taken 75 times.
✓ Branch 2 taken 73 times.
148 if(h_)
550 75 h_.destroy();
551 148 }
552
553 pull_op(pull_op const&) = delete;
554 pull_op& operator=(pull_op const&) = delete;
555
556 pull_op(pull_op&& other) noexcept
557 : h_(std::exchange(other.h_, nullptr))
558 {
559 }
560
561 pull_op& operator=(pull_op&& other) noexcept
562 {
563 if(this != &other)
564 {
565 if(h_)
566 h_.destroy();
567 h_ = std::exchange(other.h_, nullptr);
568 }
569 return *this;
570 }
571
572 private:
573 explicit
574 148 pull_op(std::coroutine_handle<promise_type> h) noexcept
575 148 : h_(h)
576 {
577 148 }
578 };
579
580 //----------------------------------------------------------
581
582 inline void*
583 148 any_buffer_source::alloc_frame(std::size_t size)
584 {
585
3/4
✓ Branch 0 taken 92 times.
✓ Branch 1 taken 56 times.
✓ Branch 2 taken 92 times.
✗ Branch 3 not taken.
148 if(cached_frame_ && cached_size_ >= size)
586 92 return cached_frame_;
587
588
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 56 times.
56 if(cached_frame_)
589 ::operator delete(cached_frame_);
590
591 56 cached_frame_ = ::operator new(size);
592 56 cached_size_ = size;
593 56 return cached_frame_;
594 }
595
596 inline void
597 any_buffer_source::free_frame(void*, std::size_t)
598 {
599 // Keep the frame cached for reuse
600 }
601
602 #if defined(__GNUC__) && !defined(__clang__)
603 #pragma GCC diagnostic push
604 #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
605 #endif
606
607 template<BufferSource S>
608 any_buffer_source::pull_op
609
1/1
✓ Branch 1 taken 148 times.
148 any_buffer_source::pull_coro(
610 any_buffer_source*,
611 S& source,
612 const_buffer* arr,
613 std::size_t max_count,
614 std::error_code* out_ec,
615 std::size_t* out_count)
616 {
617 auto [err, count] = co_await source.pull(arr, max_count);
618
619 *out_ec = err;
620 *out_count = count;
621 296 }
622
623 #if defined(__GNUC__) && !defined(__clang__)
624 #pragma GCC diagnostic pop
625 #endif
626
627 template<BufferSource S>
628 coro
629 92 any_buffer_source::do_pull_impl(
630 void* source,
631 any_buffer_source* wrapper,
632 const_buffer* arr,
633 std::size_t max_count,
634 coro h,
635 executor_ref ex,
636 std::stop_token token,
637 std::error_code* ec,
638 std::size_t* count)
639 {
640 92 auto& s = *static_cast<S*>(source);
641
642 // Create coroutine - frame is cached in wrapper
643
1/1
✓ Branch 1 taken 92 times.
92 auto op = pull_coro<S>(wrapper, s, arr, max_count, ec, count);
644
645 // Set executor and stop token on promise before resuming
646 92 op.h_.promise().set_executor(ex);
647 92 op.h_.promise().set_stop_token(token);
648
649 // Resume the coroutine to start the operation
650
1/1
✓ Branch 1 taken 73 times.
92 op.h_.resume();
651
652 // Check if operation completed synchronously
653
1/2
✓ Branch 1 taken 73 times.
✗ Branch 2 not taken.
73 if(op.h_.done())
654 {
655
1/1
✓ Branch 1 taken 73 times.
73 op.h_.destroy();
656 73 op.h_ = nullptr;
657
1/1
✓ Branch 1 taken 73 times.
73 return ex.dispatch(h);
658 }
659
660 // Operation is pending - caller will be resumed via symmetric transfer
661 op.h_.promise().set_caller(h);
662 op.h_ = nullptr;
663 return std::noop_coroutine();
664 92 }
665
666 //----------------------------------------------------------
667
668 inline void
669 39 any_buffer_source::consume(std::size_t n) noexcept
670 {
671 39 vt_->do_consume(source_, n);
672 39 }
673
674 inline auto
675 92 any_buffer_source::pull(
676 const_buffer* arr,
677 std::size_t max_count)
678 {
679 struct awaitable
680 {
681 any_buffer_source* self_;
682 const_buffer* arr_;
683 std::size_t max_count_;
684 std::error_code ec_;
685 std::size_t count_ = 0;
686
687 bool
688 92 await_ready() const noexcept
689 {
690 92 return false;
691 }
692
693 coro
694 92 await_suspend(coro h, executor_ref ex, std::stop_token token)
695 {
696 184 return self_->vt_->do_pull(
697
1/1
✓ Branch 1 taken 73 times.
92 self_->source_,
698 self_,
699 arr_,
700 max_count_,
701 h,
702 ex,
703 token,
704 &ec_,
705 146 &count_);
706 }
707
708 io_result<std::size_t>
709 73 await_resume() const noexcept
710 {
711 73 return {ec_, count_};
712 }
713 };
714 92 return awaitable{this, arr, max_count, {}, 0};
715 }
716
717 template<MutableBufferSequence MB>
718 task<io_result<std::size_t>>
719 any_buffer_source::read(MB buffers)
720 {
721 std::size_t total = 0;
722 auto dest = sans_prefix(buffers, 0);
723
724 while(!buffer_empty(dest))
725 {
726 const_buffer arr[detail::max_iovec_];
727 auto [ec, count] = co_await pull(arr, detail::max_iovec_);
728
729 if(ec)
730 co_return {ec, total};
731
732 if(count == 0)
733 co_return {error::eof, total};
734
735 auto n = buffer_copy(dest, std::span(arr, count));
736 consume(n);
737 total += n;
738 dest = sans_prefix(dest, n);
739 }
740
741 co_return {{}, total};
742 }
743
744 //----------------------------------------------------------
745
746 static_assert(BufferSource<any_buffer_source>);
747 static_assert(ReadSource<any_buffer_source>);
748
749 } // namespace capy
750 } // namespace boost
751
752 #endif
753