Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
11 : #define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/buffers.hpp>
15 : #include <boost/capy/buffers/buffer_param.hpp>
16 : #include <boost/capy/concept/io_awaitable.hpp>
17 : #include <boost/capy/concept/read_stream.hpp>
18 : #include <boost/capy/coro.hpp>
19 : #include <boost/capy/ex/executor_ref.hpp>
20 : #include <boost/capy/io_result.hpp>
21 :
22 : #include <system_error>
23 :
24 : #include <concepts>
25 : #include <coroutine>
26 : #include <cstddef>
27 : #include <exception>
28 : #include <span>
29 : #include <stop_token>
30 : #include <utility>
31 :
32 : namespace boost {
33 : namespace capy {
34 :
35 : /** Type-erased wrapper for any ReadStream.
36 :
37 : This class provides type erasure for any type satisfying the
38 : @ref ReadStream concept, enabling runtime polymorphism for
39 : read operations. It uses a cached coroutine frame to achieve
40 : zero steady-state allocation after construction.
41 :
42 : The wrapper supports two construction modes:
43 : - **Owning**: Pass by value to transfer ownership. The wrapper
44 : allocates storage and owns the stream.
45 : - **Reference**: Pass a pointer to wrap without ownership. The
46 : pointed-to stream must outlive this wrapper.
47 :
48 : @par Frame Preallocation
49 : The constructor preallocates the internal coroutine frame.
50 : This reserves all virtual address space at server startup
51 : so memory usage can be measured up front, rather than
52 : allocating piecemeal as traffic arrives.
53 :
54 : @par Thread Safety
55 : Not thread-safe. Concurrent operations on the same wrapper
56 : are undefined behavior.
57 :
58 : @par Example
59 : @code
60 : // Owning - takes ownership of the stream
61 : any_read_stream stream(socket{ioc});
62 :
63 : // Reference - wraps without ownership
64 : socket sock(ioc);
65 : any_read_stream stream(&sock);
66 :
67 : mutable_buffer buf(data, size);
68 : auto [ec, n] = co_await stream.read_some(std::span(&buf, 1));
69 : @endcode
70 :
71 : @see any_write_stream, any_stream, ReadStream
72 : */
73 : class any_read_stream
74 : {
75 : struct vtable;
76 :
77 : template<ReadStream S>
78 : struct vtable_for_impl;
79 :
80 : struct read_op;
81 :
82 : void* stream_ = nullptr;
83 : vtable const* vt_ = nullptr;
84 : void* cached_frame_ = nullptr;
85 : std::size_t cached_size_ = 0;
86 : void* storage_ = nullptr;
87 :
88 : template<ReadStream S>
89 : static coro
90 : do_read_impl(
91 : void* stream,
92 : any_read_stream* wrapper,
93 : std::span<mutable_buffer const> buffers,
94 : coro h,
95 : executor_ref ex,
96 : std::stop_token token,
97 : std::error_code* ec,
98 : std::size_t* n);
99 :
100 : template<ReadStream S>
101 : static read_op
102 : read_coro(
103 : any_read_stream* wrapper,
104 : S& stream,
105 : std::span<mutable_buffer const> bufs,
106 : std::error_code* out_ec,
107 : std::size_t* out_n);
108 :
109 : void* alloc_frame(std::size_t size);
110 : void free_frame(void* p, std::size_t size);
111 :
112 : public:
113 : /** Destructor.
114 :
115 : Destroys the owned stream (if any) and releases the cached
116 : coroutine frame.
117 : */
118 : ~any_read_stream();
119 :
120 : /** Default constructor.
121 :
122 : Constructs an empty wrapper. Operations on a default-constructed
123 : wrapper result in undefined behavior.
124 : */
125 1 : any_read_stream() = default;
126 :
127 : /** Non-copyable.
128 :
129 : The frame cache is per-instance and cannot be shared.
130 : */
131 : any_read_stream(any_read_stream const&) = delete;
132 : any_read_stream& operator=(any_read_stream const&) = delete;
133 :
134 : /** Move constructor.
135 :
136 : Transfers ownership of the wrapped stream (if owned) and
137 : cached frame from `other`. After the move, `other` is
138 : in a default-constructed state.
139 :
140 : @param other The wrapper to move from.
141 : */
142 2 : any_read_stream(any_read_stream&& other) noexcept
143 2 : : stream_(std::exchange(other.stream_, nullptr))
144 2 : , vt_(std::exchange(other.vt_, nullptr))
145 2 : , cached_frame_(std::exchange(other.cached_frame_, nullptr))
146 2 : , cached_size_(std::exchange(other.cached_size_, 0))
147 2 : , storage_(std::exchange(other.storage_, nullptr))
148 : {
149 2 : }
150 :
151 : /** Move assignment operator.
152 :
153 : Destroys any owned stream and releases existing resources,
154 : then transfers ownership from `other`.
155 :
156 : @param other The wrapper to move from.
157 : @return Reference to this wrapper.
158 : */
159 : any_read_stream&
160 : operator=(any_read_stream&& other) noexcept;
161 :
162 : /** Construct by taking ownership of a ReadStream.
163 :
164 : Allocates storage and moves the stream into this wrapper.
165 : The wrapper owns the stream and will destroy it.
166 :
167 : @param s The stream to take ownership of.
168 : */
169 : template<ReadStream S>
170 : requires (!std::same_as<std::decay_t<S>, any_read_stream>)
171 : any_read_stream(S s);
172 :
173 : /** Construct by wrapping a ReadStream without ownership.
174 :
175 : Wraps the given stream by pointer. The stream must remain
176 : valid for the lifetime of this wrapper.
177 :
178 : @param s Pointer to the stream to wrap.
179 : */
180 : template<ReadStream S>
181 71 : any_read_stream(S* s) noexcept
182 71 : : stream_(s)
183 71 : , vt_(&vtable_for_impl<S>::value)
184 : {
185 : // Preallocate the coroutine frame
186 71 : read_coro<S>(this, *s, {}, nullptr, nullptr);
187 71 : }
188 : /** Check if the wrapper contains a valid stream.
189 :
190 : @return `true` if wrapping a stream, `false` if default-constructed
191 : or moved-from.
192 : */
193 : bool
194 19 : has_value() const noexcept
195 : {
196 19 : return stream_ != nullptr;
197 : }
198 :
199 : /** Check if the wrapper contains a valid stream.
200 :
201 : @return `true` if wrapping a stream, `false` if default-constructed
202 : or moved-from.
203 : */
204 : explicit
205 2 : operator bool() const noexcept
206 : {
207 2 : return has_value();
208 : }
209 :
210 : /** Initiate an asynchronous read operation.
211 :
212 : Reads data into the provided buffer sequence. The operation
213 : completes when at least one byte has been read, or an error
214 : occurs.
215 :
216 : @param buffers The buffer sequence to read into. Passed by
217 : value to ensure the sequence lives in the coroutine frame
218 : across suspension points.
219 :
220 : @return An awaitable yielding `(error_code,std::size_t)`.
221 :
222 : @par Preconditions
223 : The wrapper must contain a valid stream (`has_value() == true`).
224 : */
225 : template<MutableBufferSequence MB>
226 : auto
227 : read_some(MB buffers);
228 :
229 : protected:
230 : /** Rebind to a new stream after move.
231 :
232 : Updates the internal pointer to reference a new stream object.
233 : Used by owning wrappers after move assignment when the owned
234 : object has moved to a new location.
235 :
236 : @param new_stream The new stream to bind to. Must be the same
237 : type as the original stream.
238 :
239 : @note Terminates if called with a stream of different type
240 : than the original.
241 : */
242 : template<ReadStream S>
243 : void
244 : rebind(S& new_stream) noexcept
245 : {
246 : if(vt_ != &vtable_for_impl<S>::value)
247 : std::terminate();
248 : stream_ = &new_stream;
249 : }
250 : };
251 :
252 : //----------------------------------------------------------
253 :
254 : struct any_read_stream::vtable
255 : {
256 : void (*destroy)(void*) noexcept;
257 :
258 : coro (*do_read)(
259 : void* stream,
260 : any_read_stream* wrapper,
261 : std::span<mutable_buffer const> buffers,
262 : coro h,
263 : executor_ref ex,
264 : std::stop_token token,
265 : std::error_code* ec,
266 : std::size_t* n);
267 : };
268 :
269 : template<ReadStream S>
270 : struct any_read_stream::vtable_for_impl
271 : {
272 : static void
273 0 : do_destroy_impl(void* stream) noexcept
274 : {
275 0 : static_cast<S*>(stream)->~S();
276 0 : }
277 :
278 : static constexpr vtable value = {
279 : &do_destroy_impl,
280 : &any_read_stream::do_read_impl<S>
281 : };
282 : };
283 :
284 : //----------------------------------------------------------
285 :
286 : inline
287 78 : any_read_stream::~any_read_stream()
288 : {
289 78 : if(storage_)
290 : {
291 0 : vt_->destroy(stream_);
292 0 : ::operator delete(storage_);
293 : }
294 78 : if(cached_frame_)
295 71 : ::operator delete(cached_frame_);
296 78 : }
297 :
298 : inline any_read_stream&
299 3 : any_read_stream::operator=(any_read_stream&& other) noexcept
300 : {
301 3 : if(this != &other)
302 : {
303 3 : if(storage_)
304 : {
305 0 : vt_->destroy(stream_);
306 0 : ::operator delete(storage_);
307 : }
308 3 : if(cached_frame_)
309 0 : ::operator delete(cached_frame_);
310 3 : stream_ = std::exchange(other.stream_, nullptr);
311 3 : vt_ = std::exchange(other.vt_, nullptr);
312 3 : cached_frame_ = std::exchange(other.cached_frame_, nullptr);
313 3 : cached_size_ = std::exchange(other.cached_size_, 0);
314 3 : storage_ = std::exchange(other.storage_, nullptr);
315 : }
316 3 : return *this;
317 : }
318 :
319 : template<ReadStream S>
320 : requires (!std::same_as<std::decay_t<S>, any_read_stream>)
321 : any_read_stream::any_read_stream(S s)
322 : : vt_(&vtable_for_impl<S>::value)
323 : {
324 : struct guard {
325 : any_read_stream* self;
326 : bool committed = false;
327 : ~guard() {
328 : if(!committed && self->storage_) {
329 : self->vt_->destroy(self->stream_);
330 : ::operator delete(self->storage_);
331 : self->storage_ = nullptr;
332 : self->stream_ = nullptr;
333 : }
334 : }
335 : } g{this};
336 :
337 : storage_ = ::operator new(sizeof(S));
338 : stream_ = ::new(storage_) S(std::move(s));
339 :
340 : // Preallocate the coroutine frame
341 : auto& ref = *static_cast<S*>(stream_);
342 : read_coro<S>(this, ref, {}, nullptr, nullptr);
343 :
344 : g.committed = true;
345 : }
346 :
347 : //----------------------------------------------------------
348 :
349 : struct any_read_stream::read_op
350 : {
351 : struct promise_type
352 : {
353 : executor_ref executor_;
354 : std::stop_token stop_token_;
355 : coro caller_h_{};
356 :
357 141 : promise_type() = default;
358 :
359 : read_op
360 141 : get_return_object() noexcept
361 : {
362 : return read_op{
363 141 : std::coroutine_handle<promise_type>::from_promise(*this)};
364 : }
365 :
366 : std::suspend_always
367 141 : initial_suspend() noexcept
368 : {
369 141 : return {};
370 : }
371 :
372 : auto
373 50 : final_suspend() noexcept
374 : {
375 : struct awaiter
376 : {
377 : promise_type* p_;
378 :
379 50 : bool await_ready() const noexcept { return false; }
380 :
381 50 : coro await_suspend(coro) const noexcept
382 : {
383 50 : if(p_->caller_h_)
384 0 : return p_->caller_h_;
385 50 : return std::noop_coroutine();
386 : }
387 :
388 0 : void await_resume() const noexcept {}
389 : };
390 50 : return awaiter{this};
391 : }
392 :
393 : void
394 50 : return_void() noexcept
395 : {
396 50 : }
397 :
398 : void
399 20 : unhandled_exception()
400 : {
401 : // Store exception for later propagation
402 : // For now, just rethrow to let outer handler catch it
403 20 : throw;
404 : }
405 :
406 : template<class... Args>
407 : static void*
408 141 : operator new(
409 : std::size_t size,
410 : any_read_stream* wrapper,
411 : Args&&...)
412 : {
413 141 : return wrapper->alloc_frame(size);
414 : }
415 :
416 : template<class... Args>
417 : static void
418 : operator delete(void*, any_read_stream*, Args&&...) noexcept
419 : {
420 : }
421 :
422 : static void
423 141 : operator delete(void*, std::size_t) noexcept
424 : {
425 141 : }
426 :
427 : void
428 70 : set_executor(executor_ref ex) noexcept
429 : {
430 70 : executor_ = ex;
431 70 : }
432 :
433 : void
434 70 : set_stop_token(std::stop_token token) noexcept
435 : {
436 70 : stop_token_ = token;
437 70 : }
438 :
439 : void
440 0 : set_caller(coro h) noexcept
441 : {
442 0 : caller_h_ = h;
443 0 : }
444 :
445 : template<class Awaitable>
446 : struct transform_awaiter
447 : {
448 : std::decay_t<Awaitable> a_;
449 : promise_type* p_;
450 :
451 70 : bool await_ready()
452 : {
453 70 : return a_.await_ready();
454 : }
455 :
456 70 : decltype(auto) await_resume()
457 : {
458 70 : return a_.await_resume();
459 : }
460 :
461 0 : auto await_suspend(coro h)
462 : {
463 0 : return a_.await_suspend(h, p_->executor_, p_->stop_token_);
464 : }
465 : };
466 :
467 : template<class Awaitable>
468 70 : auto await_transform(Awaitable&& a)
469 : {
470 : using A = std::decay_t<Awaitable>;
471 : if constexpr (IoAwaitable<A>)
472 : {
473 : return transform_awaiter<Awaitable>{
474 70 : std::forward<Awaitable>(a), this};
475 : }
476 : else
477 : {
478 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
479 : }
480 : }
481 : };
482 :
483 : std::coroutine_handle<promise_type> h_;
484 :
485 141 : ~read_op()
486 : {
487 141 : if(h_)
488 91 : h_.destroy();
489 141 : }
490 :
491 : read_op(read_op const&) = delete;
492 : read_op& operator=(read_op const&) = delete;
493 :
494 : read_op(read_op&& other) noexcept
495 : : h_(std::exchange(other.h_, nullptr))
496 : {
497 : }
498 :
499 : read_op& operator=(read_op&& other) noexcept
500 : {
501 : if(this != &other)
502 : {
503 : if(h_)
504 : h_.destroy();
505 : h_ = std::exchange(other.h_, nullptr);
506 : }
507 : return *this;
508 : }
509 :
510 : private:
511 : explicit
512 141 : read_op(std::coroutine_handle<promise_type> h) noexcept
513 141 : : h_(h)
514 : {
515 141 : }
516 : };
517 :
518 : //----------------------------------------------------------
519 :
520 : inline void*
521 141 : any_read_stream::alloc_frame(std::size_t size)
522 : {
523 141 : if(cached_frame_ && cached_size_ >= size)
524 70 : return cached_frame_;
525 :
526 71 : if(cached_frame_)
527 0 : ::operator delete(cached_frame_);
528 :
529 71 : cached_frame_ = ::operator new(size);
530 71 : cached_size_ = size;
531 71 : return cached_frame_;
532 : }
533 :
534 : inline void
535 : any_read_stream::free_frame(void*, std::size_t)
536 : {
537 : // Keep the frame cached for reuse
538 : }
539 :
540 : #if defined(__GNUC__) && !defined(__clang__)
541 : #pragma GCC diagnostic push
542 : #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
543 : #endif
544 :
545 : template<ReadStream S>
546 : any_read_stream::read_op
547 141 : any_read_stream::read_coro(
548 : any_read_stream*,
549 : S& stream,
550 : std::span<mutable_buffer const> bufs,
551 : std::error_code* out_ec,
552 : std::size_t* out_n)
553 : {
554 : auto [err, bytes] = co_await stream.read_some(bufs);
555 :
556 : *out_ec = err;
557 : *out_n = bytes;
558 282 : }
559 :
560 : #if defined(__GNUC__) && !defined(__clang__)
561 : #pragma GCC diagnostic pop
562 : #endif
563 :
564 : template<ReadStream S>
565 : coro
566 70 : any_read_stream::do_read_impl(
567 : void* stream,
568 : any_read_stream* wrapper,
569 : std::span<mutable_buffer const> buffers,
570 : coro h,
571 : executor_ref ex,
572 : std::stop_token token,
573 : std::error_code* ec,
574 : std::size_t* n)
575 : {
576 70 : auto& s = *static_cast<S*>(stream);
577 :
578 : // Create coroutine - frame is cached in wrapper
579 70 : auto op = read_coro<S>(wrapper, s, buffers, ec, n);
580 :
581 : // Set executor and stop token on promise before resuming
582 70 : op.h_.promise().set_executor(ex);
583 70 : op.h_.promise().set_stop_token(token);
584 :
585 : // Resume the coroutine to start the operation
586 70 : op.h_.resume();
587 :
588 : // Check if operation completed synchronously
589 50 : if(op.h_.done())
590 : {
591 50 : op.h_.destroy();
592 50 : op.h_ = nullptr;
593 : // Return caller's handle via executor dispatch
594 50 : return ex.dispatch(h);
595 : }
596 :
597 : // Operation is pending - caller will be resumed via symmetric transfer
598 0 : op.h_.promise().set_caller(h);
599 0 : op.h_ = nullptr;
600 0 : return std::noop_coroutine();
601 70 : }
602 :
603 : //----------------------------------------------------------
604 :
605 : template<MutableBufferSequence MB>
606 : auto
607 70 : any_read_stream::read_some(MB buffers)
608 : {
609 : struct awaitable
610 : {
611 : any_read_stream* self_;
612 : buffer_param<MB> bp_;
613 : std::error_code ec_;
614 : std::size_t n_ = 0;
615 :
616 : bool
617 70 : await_ready() const noexcept
618 : {
619 70 : return false;
620 : }
621 :
622 : coro
623 70 : await_suspend(coro h, executor_ref ex, std::stop_token token)
624 : {
625 210 : return self_->vt_->do_read(
626 70 : self_->stream_,
627 : self_,
628 140 : bp_.data(),
629 : h,
630 : ex,
631 : token,
632 : &ec_,
633 100 : &n_);
634 : }
635 :
636 : io_result<std::size_t>
637 50 : await_resume() const noexcept
638 : {
639 50 : return {ec_, n_};
640 : }
641 : };
642 70 : return awaitable{this, buffer_param<MB>(buffers), {}, 0};
643 : }
644 :
645 : } // namespace capy
646 : } // namespace boost
647 :
648 : #endif
|