Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11 : #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/buffers.hpp>
15 : #include <boost/capy/buffers/buffer_param.hpp>
16 : #include <boost/capy/concept/io_awaitable.hpp>
17 : #include <boost/capy/concept/read_source.hpp>
18 : #include <boost/capy/coro.hpp>
19 : #include <boost/capy/ex/executor_ref.hpp>
20 : #include <boost/capy/io_result.hpp>
21 : #include <boost/capy/task.hpp>
22 :
23 : #include <system_error>
24 :
25 : #include <concepts>
26 : #include <coroutine>
27 : #include <cstddef>
28 : #include <exception>
29 : #include <span>
30 : #include <stop_token>
31 : #include <utility>
32 :
33 : namespace boost {
34 : namespace capy {
35 :
36 : /** Type-erased wrapper for any ReadSource.
37 :
38 : This class provides type erasure for any type satisfying the
39 : @ref ReadSource concept, enabling runtime polymorphism for
40 : source read operations. It uses a cached coroutine frame to achieve
41 : zero steady-state allocation after construction.
42 :
43 : The wrapper supports two construction modes:
44 : - **Owning**: Pass by value to transfer ownership. The wrapper
45 : allocates storage and owns the source.
46 : - **Reference**: Pass a pointer to wrap without ownership. The
47 : pointed-to source must outlive this wrapper.
48 :
49 : @par Frame Preallocation
50 : The constructor preallocates the internal coroutine frame.
51 : This reserves all virtual address space at server startup
52 : so memory usage can be measured up front, rather than
53 : allocating piecemeal as traffic arrives.
54 :
55 : @par Thread Safety
56 : Not thread-safe. Concurrent operations on the same wrapper
57 : are undefined behavior.
58 :
59 : @par Example
60 : @code
61 : // Owning - takes ownership of the source
62 : any_read_source rs(some_source{args...});
63 :
64 : // Reference - wraps without ownership
65 : some_source source;
66 : any_read_source rs(&source);
67 :
68 : mutable_buffer buf(data, size);
69 : auto [ec, n] = co_await rs.read(std::span(&buf, 1));
70 : @endcode
71 :
72 : @see any_read_stream, ReadSource
73 : */
74 : class any_read_source
75 : {
76 : struct vtable;
77 :
78 : template<ReadSource S>
79 : struct vtable_for_impl;
80 :
81 : struct read_op;
82 :
83 : void* source_ = nullptr;
84 : vtable const* vt_ = nullptr;
85 : void* cached_frame_ = nullptr;
86 : std::size_t cached_size_ = 0;
87 : void* storage_ = nullptr;
88 :
89 : template<ReadSource S>
90 : static coro
91 : do_read_impl(
92 : void* source,
93 : any_read_source* wrapper,
94 : std::span<mutable_buffer const> buffers,
95 : coro h,
96 : executor_ref ex,
97 : std::stop_token token,
98 : std::error_code* ec,
99 : std::size_t* n);
100 :
101 : template<ReadSource S>
102 : static read_op
103 : read_coro(
104 : any_read_source* wrapper,
105 : S& source,
106 : std::span<mutable_buffer const> bufs,
107 : std::error_code* out_ec,
108 : std::size_t* out_n);
109 :
110 : void* alloc_frame(std::size_t size);
111 : void free_frame(void* p, std::size_t size);
112 :
113 : public:
114 : /** Destructor.
115 :
116 : Destroys the owned source (if any) and releases the cached
117 : coroutine frame.
118 : */
119 : ~any_read_source();
120 :
121 : /** Default constructor.
122 :
123 : Constructs an empty wrapper. Operations on a default-constructed
124 : wrapper result in undefined behavior.
125 : */
126 : any_read_source() = default;
127 :
128 : /** Non-copyable.
129 :
130 : The frame cache is per-instance and cannot be shared.
131 : */
132 : any_read_source(any_read_source const&) = delete;
133 : any_read_source& operator=(any_read_source const&) = delete;
134 :
135 : /** Move constructor.
136 :
137 : Transfers ownership of the wrapped source (if owned) and
138 : cached frame from `other`. After the move, `other` is
139 : in a default-constructed state.
140 :
141 : @param other The wrapper to move from.
142 : */
143 1 : any_read_source(any_read_source&& other) noexcept
144 1 : : source_(std::exchange(other.source_, nullptr))
145 1 : , vt_(std::exchange(other.vt_, nullptr))
146 1 : , cached_frame_(std::exchange(other.cached_frame_, nullptr))
147 1 : , cached_size_(std::exchange(other.cached_size_, 0))
148 1 : , storage_(std::exchange(other.storage_, nullptr))
149 : {
150 1 : }
151 :
152 : /** Move assignment operator.
153 :
154 : Destroys any owned source and releases existing resources,
155 : then transfers ownership from `other`.
156 :
157 : @param other The wrapper to move from.
158 : @return Reference to this wrapper.
159 : */
160 : any_read_source&
161 : operator=(any_read_source&& other) noexcept;
162 :
163 : /** Construct by taking ownership of a ReadSource.
164 :
165 : Allocates storage and moves the source into this wrapper.
166 : The wrapper owns the source and will destroy it.
167 :
168 : @param s The source to take ownership of.
169 : */
170 : template<ReadSource S>
171 : requires (!std::same_as<std::decay_t<S>, any_read_source>)
172 : any_read_source(S s);
173 :
174 : /** Construct by wrapping a ReadSource without ownership.
175 :
176 : Wraps the given source by pointer. The source must remain
177 : valid for the lifetime of this wrapper.
178 :
179 : @param s Pointer to the source to wrap.
180 : */
181 : template<ReadSource S>
182 88 : any_read_source(S* s) noexcept
183 88 : : source_(s)
184 88 : , vt_(&vtable_for_impl<S>::value)
185 : {
186 : // Preallocate the coroutine frame
187 88 : read_coro<S>(this, *s, {}, nullptr, nullptr);
188 88 : }
189 :
190 : /** Check if the wrapper contains a valid source.
191 :
192 : @return `true` if wrapping a source, `false` if default-constructed
193 : or moved-from.
194 : */
195 : bool
196 9 : has_value() const noexcept
197 : {
198 9 : return source_ != nullptr;
199 : }
200 :
201 : /** Check if the wrapper contains a valid source.
202 :
203 : @return `true` if wrapping a source, `false` if default-constructed
204 : or moved-from.
205 : */
206 : explicit
207 2 : operator bool() const noexcept
208 : {
209 2 : return has_value();
210 : }
211 :
212 : /** Initiate an asynchronous read operation.
213 :
214 : Reads data into the provided buffer sequence. The operation
215 : completes when the entire buffer sequence is filled, end-of-file
216 : is reached, or an error occurs.
217 :
218 : @param buffers The buffer sequence to read into. Passed by
219 : value to ensure the sequence lives in the coroutine frame
220 : across suspension points.
221 :
222 : @return An awaitable yielding `(error_code,std::size_t)`.
223 :
224 : @par Postconditions
225 : Exactly one of the following is true on return:
226 : @li **Success**: `!ec` and `n == buffer_size(buffers)`.
227 : The entire buffer was filled.
228 : @li **End-of-stream or Error**: `ec` and `n` indicates
229 : the number of bytes transferred before the failure.
230 :
231 : @par Preconditions
232 : The wrapper must contain a valid source (`has_value() == true`).
233 : */
234 : template<MutableBufferSequence MB>
235 : task<io_result<std::size_t>>
236 : read(MB buffers);
237 :
238 : protected:
239 : /** Rebind to a new source after move.
240 :
241 : Updates the internal pointer to reference a new source object.
242 : Used by owning wrappers after move assignment when the owned
243 : object has moved to a new location.
244 :
245 : @param new_source The new source to bind to. Must be the same
246 : type as the original source.
247 :
248 : @note Terminates if called with a source of different type
249 : than the original.
250 : */
251 : template<ReadSource S>
252 : void
253 : rebind(S& new_source) noexcept
254 : {
255 : if(vt_ != &vtable_for_impl<S>::value)
256 : std::terminate();
257 : source_ = &new_source;
258 : }
259 :
260 : private:
261 : auto
262 : read_some_(std::span<mutable_buffer const> buffers);
263 : };
264 :
265 : //----------------------------------------------------------
266 :
267 : struct any_read_source::vtable
268 : {
269 : void (*destroy)(void*) noexcept;
270 :
271 : coro (*do_read)(
272 : void* source,
273 : any_read_source* wrapper,
274 : std::span<mutable_buffer const> buffers,
275 : coro h,
276 : executor_ref ex,
277 : std::stop_token token,
278 : std::error_code* ec,
279 : std::size_t* n);
280 : };
281 :
282 : template<ReadSource S>
283 : struct any_read_source::vtable_for_impl
284 : {
285 : static void
286 0 : do_destroy_impl(void* source) noexcept
287 : {
288 0 : static_cast<S*>(source)->~S();
289 0 : }
290 :
291 : static constexpr vtable value = {
292 : &do_destroy_impl,
293 : &any_read_source::do_read_impl<S>
294 : };
295 : };
296 :
297 : //----------------------------------------------------------
298 :
299 : inline
300 91 : any_read_source::~any_read_source()
301 : {
302 91 : if(storage_)
303 : {
304 0 : vt_->destroy(source_);
305 0 : ::operator delete(storage_);
306 : }
307 91 : if(cached_frame_)
308 88 : ::operator delete(cached_frame_);
309 91 : }
310 :
311 : inline any_read_source&
312 1 : any_read_source::operator=(any_read_source&& other) noexcept
313 : {
314 1 : if(this != &other)
315 : {
316 1 : if(storage_)
317 : {
318 0 : vt_->destroy(source_);
319 0 : ::operator delete(storage_);
320 : }
321 1 : if(cached_frame_)
322 0 : ::operator delete(cached_frame_);
323 1 : source_ = std::exchange(other.source_, nullptr);
324 1 : vt_ = std::exchange(other.vt_, nullptr);
325 1 : cached_frame_ = std::exchange(other.cached_frame_, nullptr);
326 1 : cached_size_ = std::exchange(other.cached_size_, 0);
327 1 : storage_ = std::exchange(other.storage_, nullptr);
328 : }
329 1 : return *this;
330 : }
331 :
332 : template<ReadSource S>
333 : requires (!std::same_as<std::decay_t<S>, any_read_source>)
334 : any_read_source::any_read_source(S s)
335 : : vt_(&vtable_for_impl<S>::value)
336 : {
337 : struct guard {
338 : any_read_source* self;
339 : bool committed = false;
340 : ~guard() {
341 : if(!committed && self->storage_) {
342 : self->vt_->destroy(self->source_);
343 : ::operator delete(self->storage_);
344 : self->storage_ = nullptr;
345 : self->source_ = nullptr;
346 : }
347 : }
348 : } g{this};
349 :
350 : storage_ = ::operator new(sizeof(S));
351 : source_ = ::new(storage_) S(std::move(s));
352 :
353 : // Preallocate the coroutine frame
354 : auto& ref = *static_cast<S*>(source_);
355 : read_coro<S>(this, ref, {}, nullptr, nullptr);
356 :
357 : g.committed = true;
358 : }
359 :
360 : //----------------------------------------------------------
361 :
362 : struct any_read_source::read_op
363 : {
364 : struct promise_type
365 : {
366 : executor_ref executor_;
367 : std::stop_token stop_token_;
368 : coro caller_h_{};
369 :
370 218 : promise_type() = default;
371 :
372 : read_op
373 218 : get_return_object() noexcept
374 : {
375 : return read_op{
376 218 : std::coroutine_handle<promise_type>::from_promise(*this)};
377 : }
378 :
379 : std::suspend_always
380 218 : initial_suspend() noexcept
381 : {
382 218 : return {};
383 : }
384 :
385 : auto
386 99 : final_suspend() noexcept
387 : {
388 : struct awaiter
389 : {
390 : promise_type* p_;
391 :
392 99 : bool await_ready() const noexcept { return false; }
393 :
394 99 : coro await_suspend(coro) const noexcept
395 : {
396 99 : if(p_->caller_h_)
397 0 : return p_->caller_h_;
398 99 : return std::noop_coroutine();
399 : }
400 :
401 0 : void await_resume() const noexcept {}
402 : };
403 99 : return awaiter{this};
404 : }
405 :
406 : void
407 99 : return_void() noexcept
408 : {
409 99 : }
410 :
411 : void
412 31 : unhandled_exception()
413 : {
414 31 : throw;
415 : }
416 :
417 : template<class... Args>
418 : static void*
419 218 : operator new(
420 : std::size_t size,
421 : any_read_source* wrapper,
422 : Args&&...)
423 : {
424 218 : return wrapper->alloc_frame(size);
425 : }
426 :
427 : template<class... Args>
428 : static void
429 : operator delete(void*, any_read_source*, Args&&...) noexcept
430 : {
431 : }
432 :
433 : static void
434 218 : operator delete(void*, std::size_t) noexcept
435 : {
436 218 : }
437 :
438 : void
439 130 : set_executor(executor_ref ex) noexcept
440 : {
441 130 : executor_ = ex;
442 130 : }
443 :
444 : void
445 130 : set_stop_token(std::stop_token token) noexcept
446 : {
447 130 : stop_token_ = token;
448 130 : }
449 :
450 : void
451 0 : set_caller(coro h) noexcept
452 : {
453 0 : caller_h_ = h;
454 0 : }
455 :
456 : template<class Awaitable>
457 : struct transform_awaiter
458 : {
459 : std::decay_t<Awaitable> a_;
460 : promise_type* p_;
461 :
462 130 : bool await_ready()
463 : {
464 130 : return a_.await_ready();
465 : }
466 :
467 130 : decltype(auto) await_resume()
468 : {
469 130 : return a_.await_resume();
470 : }
471 :
472 0 : auto await_suspend(coro h)
473 : {
474 0 : return a_.await_suspend(h, p_->executor_, p_->stop_token_);
475 : }
476 : };
477 :
478 : template<class Awaitable>
479 130 : auto await_transform(Awaitable&& a)
480 : {
481 : using A = std::decay_t<Awaitable>;
482 : if constexpr (IoAwaitable<A>)
483 : {
484 : return transform_awaiter<Awaitable>{
485 130 : std::forward<Awaitable>(a), this};
486 : }
487 : else
488 : {
489 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
490 : }
491 : }
492 : };
493 :
494 : std::coroutine_handle<promise_type> h_;
495 :
496 218 : ~read_op()
497 : {
498 218 : if(h_)
499 119 : h_.destroy();
500 218 : }
501 :
502 : read_op(read_op const&) = delete;
503 : read_op& operator=(read_op const&) = delete;
504 :
505 : read_op(read_op&& other) noexcept
506 : : h_(std::exchange(other.h_, nullptr))
507 : {
508 : }
509 :
510 : read_op& operator=(read_op&& other) noexcept
511 : {
512 : if(this != &other)
513 : {
514 : if(h_)
515 : h_.destroy();
516 : h_ = std::exchange(other.h_, nullptr);
517 : }
518 : return *this;
519 : }
520 :
521 : private:
522 : explicit
523 218 : read_op(std::coroutine_handle<promise_type> h) noexcept
524 218 : : h_(h)
525 : {
526 218 : }
527 : };
528 :
529 : //----------------------------------------------------------
530 :
531 : inline void*
532 218 : any_read_source::alloc_frame(std::size_t size)
533 : {
534 218 : if(cached_frame_ && cached_size_ >= size)
535 130 : return cached_frame_;
536 :
537 88 : if(cached_frame_)
538 0 : ::operator delete(cached_frame_);
539 :
540 88 : cached_frame_ = ::operator new(size);
541 88 : cached_size_ = size;
542 88 : return cached_frame_;
543 : }
544 :
545 : inline void
546 : any_read_source::free_frame(void*, std::size_t)
547 : {
548 : // Keep the frame cached for reuse
549 : }
550 :
551 : #if defined(__GNUC__) && !defined(__clang__)
552 : #pragma GCC diagnostic push
553 : #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
554 : #endif
555 :
556 : template<ReadSource S>
557 : any_read_source::read_op
558 218 : any_read_source::read_coro(
559 : any_read_source*,
560 : S& source,
561 : std::span<mutable_buffer const> bufs,
562 : std::error_code* out_ec,
563 : std::size_t* out_n)
564 : {
565 : auto [err, bytes] = co_await source.read(bufs);
566 :
567 : *out_ec = err;
568 : *out_n = bytes;
569 436 : }
570 :
571 : #if defined(__GNUC__) && !defined(__clang__)
572 : #pragma GCC diagnostic pop
573 : #endif
574 :
575 : template<ReadSource S>
576 : coro
577 130 : any_read_source::do_read_impl(
578 : void* source,
579 : any_read_source* wrapper,
580 : std::span<mutable_buffer const> buffers,
581 : coro h,
582 : executor_ref ex,
583 : std::stop_token token,
584 : std::error_code* ec,
585 : std::size_t* n)
586 : {
587 130 : auto& s = *static_cast<S*>(source);
588 :
589 : // Create coroutine - frame is cached in wrapper
590 130 : auto op = read_coro<S>(wrapper, s, buffers, ec, n);
591 :
592 : // Set executor and stop token on promise before resuming
593 130 : op.h_.promise().set_executor(ex);
594 130 : op.h_.promise().set_stop_token(token);
595 :
596 : // Resume the coroutine to start the operation
597 130 : op.h_.resume();
598 :
599 : // Check if operation completed synchronously
600 99 : if(op.h_.done())
601 : {
602 99 : op.h_.destroy();
603 99 : op.h_ = nullptr;
604 99 : return ex.dispatch(h);
605 : }
606 :
607 : // Operation is pending - caller will be resumed via symmetric transfer
608 0 : op.h_.promise().set_caller(h);
609 0 : op.h_ = nullptr;
610 0 : return std::noop_coroutine();
611 130 : }
612 :
613 : //----------------------------------------------------------
614 :
615 : inline auto
616 130 : any_read_source::read_some_(std::span<mutable_buffer const> buffers)
617 : {
618 : struct awaitable
619 : {
620 : any_read_source* self_;
621 : std::span<mutable_buffer const> buffers_;
622 : std::error_code ec_;
623 : std::size_t n_ = 0;
624 :
625 : bool
626 130 : await_ready() const noexcept
627 : {
628 130 : return false;
629 : }
630 :
631 : coro
632 130 : await_suspend(coro h, executor_ref ex, std::stop_token token)
633 : {
634 260 : return self_->vt_->do_read(
635 130 : self_->source_,
636 : self_,
637 : buffers_,
638 : h,
639 : ex,
640 : token,
641 : &ec_,
642 198 : &n_);
643 : }
644 :
645 : io_result<std::size_t>
646 99 : await_resume() const noexcept
647 : {
648 99 : return {ec_, n_};
649 : }
650 : };
651 130 : return awaitable{this, buffers, {}, 0};
652 : }
653 :
654 : template<MutableBufferSequence MB>
655 : task<io_result<std::size_t>>
656 106 : any_read_source::read(MB buffers)
657 : {
658 : buffer_param<MB> bp(std::move(buffers));
659 : std::size_t total = 0;
660 :
661 : for(;;)
662 : {
663 : auto bufs = bp.data();
664 : if(bufs.empty())
665 : break;
666 :
667 : auto [ec, n] = co_await read_some_(bufs);
668 : total += n;
669 : if(ec)
670 : co_return {ec, total};
671 : bp.consume(n);
672 : }
673 :
674 : co_return {{}, total};
675 212 : }
676 :
677 : } // namespace capy
678 : } // namespace boost
679 :
680 : #endif
|