Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
11 : #define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/buffers.hpp>
15 : #include <boost/capy/buffers/buffer_copy.hpp>
16 : #include <boost/capy/buffers/slice.hpp>
17 : #include <boost/capy/concept/buffer_source.hpp>
18 : #include <boost/capy/concept/io_awaitable.hpp>
19 : #include <boost/capy/concept/read_source.hpp>
20 : #include <boost/capy/coro.hpp>
21 : #include <boost/capy/error.hpp>
22 : #include <boost/capy/ex/executor_ref.hpp>
23 : #include <boost/capy/io_result.hpp>
24 : #include <boost/capy/task.hpp>
25 :
26 : #include <system_error>
27 :
28 : #include <concepts>
29 : #include <coroutine>
30 : #include <cstddef>
31 : #include <exception>
32 : #include <span>
33 : #include <stop_token>
34 : #include <utility>
35 :
36 : namespace boost {
37 : namespace capy {
38 :
39 : /** Type-erased wrapper for any BufferSource.
40 :
41 : This class provides type erasure for any type satisfying the
42 : @ref BufferSource concept, enabling runtime polymorphism for
43 : buffer pull operations. The wrapper also satisfies @ref ReadSource,
44 : allowing it to be used with code expecting either interface.
45 : It uses a cached coroutine frame to achieve zero steady-state
46 : allocation after construction.
47 :
48 : The wrapper also satisfies @ref ReadSource through the templated
49 : @ref read method. This method copies data from the source's
50 : internal buffers into the caller's buffers, incurring one extra
51 : buffer copy compared to using @ref pull and @ref consume directly.
52 :
53 : The wrapper supports two construction modes:
54 : - **Owning**: Pass by value to transfer ownership. The wrapper
55 : allocates storage and owns the source.
56 : - **Reference**: Pass a pointer to wrap without ownership. The
57 : pointed-to source must outlive this wrapper.
58 :
59 : @par Frame Preallocation
60 : The constructor preallocates the internal coroutine frame.
61 : This reserves all virtual address space at server startup
62 : so memory usage can be measured up front, rather than
63 : allocating piecemeal as traffic arrives.
64 :
65 : @par Thread Safety
66 : Not thread-safe. Concurrent operations on the same wrapper
67 : are undefined behavior.
68 :
69 : @par Example
70 : @code
71 : // Owning - takes ownership of the source
72 : any_buffer_source abs(some_buffer_source{args...});
73 :
74 : // Reference - wraps without ownership
75 : some_buffer_source src;
76 : any_buffer_source abs(&src);
77 :
78 : const_buffer arr[16];
79 : auto [ec, count] = co_await abs.pull(arr, 16);
80 : @endcode
81 :
82 : @see any_write_sink, BufferSource, ReadSource
83 : */
84 : class any_buffer_source
85 : {
86 : struct vtable;
87 :
88 : template<BufferSource S>
89 : struct vtable_for_impl;
90 :
91 : struct pull_op;
92 :
93 : void* source_ = nullptr;
94 : vtable const* vt_ = nullptr;
95 : void* cached_frame_ = nullptr;
96 : std::size_t cached_size_ = 0;
97 : void* storage_ = nullptr;
98 :
99 : template<BufferSource S>
100 : static coro
101 : do_pull_impl(
102 : void* source,
103 : any_buffer_source* wrapper,
104 : const_buffer* arr,
105 : std::size_t max_count,
106 : coro h,
107 : executor_ref ex,
108 : std::stop_token token,
109 : std::error_code* ec,
110 : std::size_t* count);
111 :
112 : template<BufferSource S>
113 : static pull_op
114 : pull_coro(
115 : any_buffer_source* wrapper,
116 : S& source,
117 : const_buffer* arr,
118 : std::size_t max_count,
119 : std::error_code* out_ec,
120 : std::size_t* out_count);
121 :
122 : void* alloc_frame(std::size_t size);
123 : void free_frame(void* p, std::size_t size);
124 :
125 : public:
126 : /** Destructor.
127 :
128 : Destroys the owned source (if any) and releases the cached
129 : coroutine frame.
130 : */
131 : ~any_buffer_source();
132 :
133 : /** Default constructor.
134 :
135 : Constructs an empty wrapper. Operations on a default-constructed
136 : wrapper result in undefined behavior.
137 : */
138 : any_buffer_source() = default;
139 :
140 : /** Non-copyable.
141 :
142 : The frame cache is per-instance and cannot be shared.
143 : */
144 : any_buffer_source(any_buffer_source const&) = delete;
145 : any_buffer_source& operator=(any_buffer_source const&) = delete;
146 :
147 : /** Move constructor.
148 :
149 : Transfers ownership of the wrapped source (if owned) and
150 : cached frame from `other`. After the move, `other` is
151 : in a default-constructed state.
152 :
153 : @param other The wrapper to move from.
154 : */
155 1 : any_buffer_source(any_buffer_source&& other) noexcept
156 1 : : source_(std::exchange(other.source_, nullptr))
157 1 : , vt_(std::exchange(other.vt_, nullptr))
158 1 : , cached_frame_(std::exchange(other.cached_frame_, nullptr))
159 1 : , cached_size_(std::exchange(other.cached_size_, 0))
160 1 : , storage_(std::exchange(other.storage_, nullptr))
161 : {
162 1 : }
163 :
164 : /** Move assignment operator.
165 :
166 : Destroys any owned source and releases existing resources,
167 : then transfers ownership from `other`.
168 :
169 : @param other The wrapper to move from.
170 : @return Reference to this wrapper.
171 : */
172 : any_buffer_source&
173 : operator=(any_buffer_source&& other) noexcept;
174 :
175 : /** Construct by taking ownership of a BufferSource.
176 :
177 : Allocates storage and moves the source into this wrapper.
178 : The wrapper owns the source and will destroy it.
179 :
180 : @param s The source to take ownership of.
181 : */
182 : template<BufferSource S>
183 : requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
184 : any_buffer_source(S s);
185 :
186 : /** Construct by wrapping a BufferSource without ownership.
187 :
188 : Wraps the given source by pointer. The source must remain
189 : valid for the lifetime of this wrapper.
190 :
191 : @param s Pointer to the source to wrap.
192 : */
193 : template<BufferSource S>
194 56 : any_buffer_source(S* s) noexcept
195 56 : : source_(s)
196 56 : , vt_(&vtable_for_impl<S>::value)
197 : {
198 : // Preallocate coroutine frame to find max size
199 56 : pull_coro<S>(this, *s, nullptr, 0, nullptr, nullptr);
200 56 : }
201 :
202 : /** Check if the wrapper contains a valid source.
203 :
204 : @return `true` if wrapping a source, `false` if default-constructed
205 : or moved-from.
206 : */
207 : bool
208 9 : has_value() const noexcept
209 : {
210 9 : return source_ != nullptr;
211 : }
212 :
213 : /** Check if the wrapper contains a valid source.
214 :
215 : @return `true` if wrapping a source, `false` if default-constructed
216 : or moved-from.
217 : */
218 : explicit
219 2 : operator bool() const noexcept
220 : {
221 2 : return has_value();
222 : }
223 :
224 : /** Consume bytes from the source.
225 :
226 : Advances the internal read position of the underlying source
227 : by the specified number of bytes. The next call to @ref pull
228 : returns data starting after the consumed bytes.
229 :
230 : @param n The number of bytes to consume. Must not exceed the
231 : total size of buffers returned by the previous @ref pull.
232 :
233 : @par Preconditions
234 : The wrapper must contain a valid source (`has_value() == true`).
235 : */
236 : void
237 : consume(std::size_t n) noexcept;
238 :
239 : /** Pull buffer data from the source.
240 :
241 : Fills the provided array with buffer descriptors from the
242 : underlying source. The operation completes when data is
243 : available, the source is exhausted, or an error occurs.
244 :
245 : @param arr Pointer to array of const_buffer to fill.
246 : @param max_count Maximum number of buffers to fill.
247 :
248 : @return An awaitable yielding `(error_code,std::size_t)`.
249 : On success with data, `count > 0` indicates buffers filled.
250 : On success with `count == 0`, source is exhausted.
251 :
252 : @par Preconditions
253 : The wrapper must contain a valid source (`has_value() == true`).
254 : */
255 : auto
256 : pull(const_buffer* arr, std::size_t max_count);
257 :
258 : /** Read data into a mutable buffer sequence.
259 :
260 : Fills the provided buffer sequence by pulling data from the
261 : underlying source and copying it into the caller's buffers.
262 : This satisfies @ref ReadSource but incurs a copy; for zero-copy
263 : access, use @ref pull and @ref consume instead.
264 :
265 : @note This operation copies data from the source's internal
266 : buffers into the caller's buffers. For zero-copy reads,
267 : use @ref pull and @ref consume directly.
268 :
269 : @param buffers The buffer sequence to fill.
270 :
271 : @return An awaitable yielding `(error_code,std::size_t)`.
272 : On success, `n == buffer_size(buffers)`.
273 : On EOF, `ec == error::eof` and `n` is bytes transferred.
274 :
275 : @par Preconditions
276 : The wrapper must contain a valid source (`has_value() == true`).
277 :
278 : @see pull, consume
279 : */
280 : template<MutableBufferSequence MB>
281 : task<io_result<std::size_t>>
282 : read(MB buffers);
283 :
284 : protected:
285 : /** Rebind to a new source after move.
286 :
287 : Updates the internal pointer to reference a new source object.
288 : Used by owning wrappers after move assignment when the owned
289 : object has moved to a new location.
290 :
291 : @param new_source The new source to bind to. Must be the same
292 : type as the original source.
293 :
294 : @note Terminates if called with a source of different type
295 : than the original.
296 : */
297 : template<BufferSource S>
298 : void
299 : rebind(S& new_source) noexcept
300 : {
301 : if(vt_ != &vtable_for_impl<S>::value)
302 : std::terminate();
303 : source_ = &new_source;
304 : }
305 : };
306 :
307 : //----------------------------------------------------------
308 :
309 : struct any_buffer_source::vtable
310 : {
311 : void (*destroy)(void*) noexcept;
312 :
313 : coro (*do_pull)(
314 : void* source,
315 : any_buffer_source* wrapper,
316 : const_buffer* arr,
317 : std::size_t max_count,
318 : coro h,
319 : executor_ref ex,
320 : std::stop_token token,
321 : std::error_code* ec,
322 : std::size_t* count);
323 : void (*do_consume)(void* source, std::size_t n) noexcept;
324 : };
325 :
326 : template<BufferSource S>
327 : struct any_buffer_source::vtable_for_impl
328 : {
329 : static void
330 0 : do_destroy_impl(void* source) noexcept
331 : {
332 0 : static_cast<S*>(source)->~S();
333 0 : }
334 :
335 : static void
336 39 : do_consume_impl(void* source, std::size_t n) noexcept
337 : {
338 39 : static_cast<S*>(source)->consume(n);
339 39 : }
340 :
341 : static constexpr vtable value = {
342 : &do_destroy_impl,
343 : &any_buffer_source::do_pull_impl<S>,
344 : &do_consume_impl
345 : };
346 : };
347 :
348 : //----------------------------------------------------------
349 :
350 : inline
351 59 : any_buffer_source::~any_buffer_source()
352 : {
353 59 : if(storage_)
354 : {
355 0 : vt_->destroy(source_);
356 0 : ::operator delete(storage_);
357 : }
358 59 : if(cached_frame_)
359 56 : ::operator delete(cached_frame_);
360 59 : }
361 :
362 : inline any_buffer_source&
363 1 : any_buffer_source::operator=(any_buffer_source&& other) noexcept
364 : {
365 1 : if(this != &other)
366 : {
367 1 : if(storage_)
368 : {
369 0 : vt_->destroy(source_);
370 0 : ::operator delete(storage_);
371 : }
372 1 : if(cached_frame_)
373 0 : ::operator delete(cached_frame_);
374 1 : source_ = std::exchange(other.source_, nullptr);
375 1 : vt_ = std::exchange(other.vt_, nullptr);
376 1 : cached_frame_ = std::exchange(other.cached_frame_, nullptr);
377 1 : cached_size_ = std::exchange(other.cached_size_, 0);
378 1 : storage_ = std::exchange(other.storage_, nullptr);
379 : }
380 1 : return *this;
381 : }
382 :
383 : template<BufferSource S>
384 : requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
385 : any_buffer_source::any_buffer_source(S s)
386 : : vt_(&vtable_for_impl<S>::value)
387 : {
388 : struct guard {
389 : any_buffer_source* self;
390 : bool committed = false;
391 : ~guard() {
392 : if(!committed && self->storage_) {
393 : self->vt_->destroy(self->source_);
394 : ::operator delete(self->storage_);
395 : self->storage_ = nullptr;
396 : self->source_ = nullptr;
397 : }
398 : }
399 : } g{this};
400 :
401 : storage_ = ::operator new(sizeof(S));
402 : source_ = ::new(storage_) S(std::move(s));
403 :
404 : // Preallocate coroutine frame to find max size
405 : auto& ref = *static_cast<S*>(source_);
406 : pull_coro<S>(this, ref, nullptr, 0, nullptr, nullptr);
407 :
408 : g.committed = true;
409 : }
410 :
411 : //----------------------------------------------------------
412 :
413 : struct any_buffer_source::pull_op
414 : {
415 : struct promise_type
416 : {
417 : executor_ref executor_;
418 : std::stop_token stop_token_;
419 : coro caller_h_{};
420 :
421 148 : promise_type() = default;
422 :
423 : pull_op
424 148 : get_return_object() noexcept
425 : {
426 : return pull_op{
427 148 : std::coroutine_handle<promise_type>::from_promise(*this)};
428 : }
429 :
430 : std::suspend_always
431 148 : initial_suspend() noexcept
432 : {
433 148 : return {};
434 : }
435 :
436 : auto
437 73 : final_suspend() noexcept
438 : {
439 : struct awaiter
440 : {
441 : promise_type* p_;
442 :
443 73 : bool await_ready() const noexcept { return false; }
444 :
445 73 : coro await_suspend(coro) const noexcept
446 : {
447 73 : if(p_->caller_h_)
448 0 : return p_->caller_h_;
449 73 : return std::noop_coroutine();
450 : }
451 :
452 0 : void await_resume() const noexcept {}
453 : };
454 73 : return awaiter{this};
455 : }
456 :
457 : void
458 73 : return_void() noexcept
459 : {
460 73 : }
461 :
462 : void
463 19 : unhandled_exception()
464 : {
465 19 : throw;
466 : }
467 :
468 : template<class... Args>
469 : static void*
470 148 : operator new(
471 : std::size_t size,
472 : any_buffer_source* wrapper,
473 : Args&&...)
474 : {
475 148 : return wrapper->alloc_frame(size);
476 : }
477 :
478 : template<class... Args>
479 : static void
480 : operator delete(void*, any_buffer_source*, Args&&...) noexcept
481 : {
482 : }
483 :
484 : static void
485 148 : operator delete(void*, std::size_t) noexcept
486 : {
487 148 : }
488 :
489 : void
490 92 : set_executor(executor_ref ex) noexcept
491 : {
492 92 : executor_ = ex;
493 92 : }
494 :
495 : void
496 92 : set_stop_token(std::stop_token token) noexcept
497 : {
498 92 : stop_token_ = token;
499 92 : }
500 :
501 : void
502 0 : set_caller(coro h) noexcept
503 : {
504 0 : caller_h_ = h;
505 0 : }
506 :
507 : template<class Awaitable>
508 : struct transform_awaiter
509 : {
510 : std::decay_t<Awaitable> a_;
511 : promise_type* p_;
512 :
513 92 : bool await_ready()
514 : {
515 92 : return a_.await_ready();
516 : }
517 :
518 92 : decltype(auto) await_resume()
519 : {
520 92 : return a_.await_resume();
521 : }
522 :
523 0 : auto await_suspend(coro h)
524 : {
525 0 : return a_.await_suspend(h, p_->executor_, p_->stop_token_);
526 : }
527 : };
528 :
529 : template<class Awaitable>
530 92 : auto await_transform(Awaitable&& a)
531 : {
532 : using A = std::decay_t<Awaitable>;
533 : if constexpr (IoAwaitable<A>)
534 : {
535 : return transform_awaiter<Awaitable>{
536 92 : std::forward<Awaitable>(a), this};
537 : }
538 : else
539 : {
540 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
541 : }
542 : }
543 : };
544 :
545 : std::coroutine_handle<promise_type> h_;
546 :
547 148 : ~pull_op()
548 : {
549 148 : if(h_)
550 75 : h_.destroy();
551 148 : }
552 :
553 : pull_op(pull_op const&) = delete;
554 : pull_op& operator=(pull_op const&) = delete;
555 :
556 : pull_op(pull_op&& other) noexcept
557 : : h_(std::exchange(other.h_, nullptr))
558 : {
559 : }
560 :
561 : pull_op& operator=(pull_op&& other) noexcept
562 : {
563 : if(this != &other)
564 : {
565 : if(h_)
566 : h_.destroy();
567 : h_ = std::exchange(other.h_, nullptr);
568 : }
569 : return *this;
570 : }
571 :
572 : private:
573 : explicit
574 148 : pull_op(std::coroutine_handle<promise_type> h) noexcept
575 148 : : h_(h)
576 : {
577 148 : }
578 : };
579 :
580 : //----------------------------------------------------------
581 :
582 : inline void*
583 148 : any_buffer_source::alloc_frame(std::size_t size)
584 : {
585 148 : if(cached_frame_ && cached_size_ >= size)
586 92 : return cached_frame_;
587 :
588 56 : if(cached_frame_)
589 0 : ::operator delete(cached_frame_);
590 :
591 56 : cached_frame_ = ::operator new(size);
592 56 : cached_size_ = size;
593 56 : return cached_frame_;
594 : }
595 :
596 : inline void
597 : any_buffer_source::free_frame(void*, std::size_t)
598 : {
599 : // Keep the frame cached for reuse
600 : }
601 :
602 : #if defined(__GNUC__) && !defined(__clang__)
603 : #pragma GCC diagnostic push
604 : #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
605 : #endif
606 :
607 : template<BufferSource S>
608 : any_buffer_source::pull_op
609 148 : any_buffer_source::pull_coro(
610 : any_buffer_source*,
611 : S& source,
612 : const_buffer* arr,
613 : std::size_t max_count,
614 : std::error_code* out_ec,
615 : std::size_t* out_count)
616 : {
617 : auto [err, count] = co_await source.pull(arr, max_count);
618 :
619 : *out_ec = err;
620 : *out_count = count;
621 296 : }
622 :
623 : #if defined(__GNUC__) && !defined(__clang__)
624 : #pragma GCC diagnostic pop
625 : #endif
626 :
627 : template<BufferSource S>
628 : coro
629 92 : any_buffer_source::do_pull_impl(
630 : void* source,
631 : any_buffer_source* wrapper,
632 : const_buffer* arr,
633 : std::size_t max_count,
634 : coro h,
635 : executor_ref ex,
636 : std::stop_token token,
637 : std::error_code* ec,
638 : std::size_t* count)
639 : {
640 92 : auto& s = *static_cast<S*>(source);
641 :
642 : // Create coroutine - frame is cached in wrapper
643 92 : auto op = pull_coro<S>(wrapper, s, arr, max_count, ec, count);
644 :
645 : // Set executor and stop token on promise before resuming
646 92 : op.h_.promise().set_executor(ex);
647 92 : op.h_.promise().set_stop_token(token);
648 :
649 : // Resume the coroutine to start the operation
650 92 : op.h_.resume();
651 :
652 : // Check if operation completed synchronously
653 73 : if(op.h_.done())
654 : {
655 73 : op.h_.destroy();
656 73 : op.h_ = nullptr;
657 73 : return ex.dispatch(h);
658 : }
659 :
660 : // Operation is pending - caller will be resumed via symmetric transfer
661 0 : op.h_.promise().set_caller(h);
662 0 : op.h_ = nullptr;
663 0 : return std::noop_coroutine();
664 92 : }
665 :
666 : //----------------------------------------------------------
667 :
668 : inline void
669 39 : any_buffer_source::consume(std::size_t n) noexcept
670 : {
671 39 : vt_->do_consume(source_, n);
672 39 : }
673 :
674 : inline auto
675 92 : any_buffer_source::pull(
676 : const_buffer* arr,
677 : std::size_t max_count)
678 : {
679 : struct awaitable
680 : {
681 : any_buffer_source* self_;
682 : const_buffer* arr_;
683 : std::size_t max_count_;
684 : std::error_code ec_;
685 : std::size_t count_ = 0;
686 :
687 : bool
688 92 : await_ready() const noexcept
689 : {
690 92 : return false;
691 : }
692 :
693 : coro
694 92 : await_suspend(coro h, executor_ref ex, std::stop_token token)
695 : {
696 184 : return self_->vt_->do_pull(
697 92 : self_->source_,
698 : self_,
699 : arr_,
700 : max_count_,
701 : h,
702 : ex,
703 : token,
704 : &ec_,
705 146 : &count_);
706 : }
707 :
708 : io_result<std::size_t>
709 73 : await_resume() const noexcept
710 : {
711 73 : return {ec_, count_};
712 : }
713 : };
714 92 : return awaitable{this, arr, max_count, {}, 0};
715 : }
716 :
717 : template<MutableBufferSequence MB>
718 : task<io_result<std::size_t>>
719 : any_buffer_source::read(MB buffers)
720 : {
721 : std::size_t total = 0;
722 : auto dest = sans_prefix(buffers, 0);
723 :
724 : while(!buffer_empty(dest))
725 : {
726 : const_buffer arr[detail::max_iovec_];
727 : auto [ec, count] = co_await pull(arr, detail::max_iovec_);
728 :
729 : if(ec)
730 : co_return {ec, total};
731 :
732 : if(count == 0)
733 : co_return {error::eof, total};
734 :
735 : auto n = buffer_copy(dest, std::span(arr, count));
736 : consume(n);
737 : total += n;
738 : dest = sans_prefix(dest, n);
739 : }
740 :
741 : co_return {{}, total};
742 : }
743 :
744 : //----------------------------------------------------------
745 :
746 : static_assert(BufferSource<any_buffer_source>);
747 : static_assert(ReadSource<any_buffer_source>);
748 :
749 : } // namespace capy
750 : } // namespace boost
751 :
752 : #endif
|