Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11 : #define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/buffers.hpp>
15 : #include <boost/capy/buffers/buffer_copy.hpp>
16 : #include <boost/capy/buffers/buffer_param.hpp>
17 : #include <boost/capy/concept/buffer_sink.hpp>
18 : #include <boost/capy/concept/io_awaitable.hpp>
19 : #include <boost/capy/concept/write_sink.hpp>
20 : #include <boost/capy/coro.hpp>
21 : #include <boost/capy/ex/executor_ref.hpp>
22 : #include <boost/capy/io_result.hpp>
23 : #include <boost/capy/task.hpp>
24 :
25 : #include <system_error>
26 :
27 : #include <concepts>
28 : #include <coroutine>
29 : #include <cstddef>
30 : #include <exception>
31 : #include <stop_token>
32 : #include <utility>
33 :
34 : namespace boost {
35 : namespace capy {
36 :
37 : /** Type-erased wrapper for any BufferSink.
38 :
39 : This class provides type erasure for any type satisfying the
40 : @ref BufferSink concept, enabling runtime polymorphism for
41 : buffer sink operations. It uses a cached coroutine frame to achieve
42 : zero steady-state allocation after construction.
43 :
44 : The wrapper also satisfies @ref WriteSink through templated
45 : @ref write methods. These methods copy data from the caller's
46 : buffers into the sink's internal storage, incurring one extra
47 : buffer copy compared to using @ref prepare and @ref commit
48 : directly.
49 :
50 : The wrapper supports two construction modes:
51 : - **Owning**: Pass by value to transfer ownership. The wrapper
52 : allocates storage and owns the sink.
53 : - **Reference**: Pass a pointer to wrap without ownership. The
54 : pointed-to sink must outlive this wrapper.
55 :
56 : @par Frame Preallocation
57 : The constructor preallocates the internal coroutine frame.
58 : This reserves all virtual address space at server startup
59 : so memory usage can be measured up front, rather than
60 : allocating piecemeal as traffic arrives.
61 :
62 : @par Thread Safety
63 : Not thread-safe. Concurrent operations on the same wrapper
64 : are undefined behavior.
65 :
66 : @par Example
67 : @code
68 : // Owning - takes ownership of the sink
69 : any_buffer_sink abs(some_buffer_sink{args...});
70 :
71 : // Reference - wraps without ownership
72 : some_buffer_sink sink;
73 : any_buffer_sink abs(&sink);
74 :
75 : mutable_buffer arr[16];
76 : std::size_t count = abs.prepare(arr, 16);
77 : // Write data into arr[0..count)
78 : auto [ec] = co_await abs.commit(bytes_written);
79 : auto [ec2] = co_await abs.commit_eof();
80 : @endcode
81 :
82 : @see any_buffer_source, BufferSink, WriteSink
83 : */
84 : class any_buffer_sink
85 : {
86 : struct vtable;
87 :
88 : template<BufferSink S>
89 : struct vtable_for_impl;
90 :
91 : struct commit_op;
92 :
93 : void* sink_ = nullptr;
94 : vtable const* vt_ = nullptr;
95 : void* cached_frame_ = nullptr;
96 : std::size_t cached_size_ = 0;
97 : void* storage_ = nullptr;
98 :
99 : template<BufferSink S>
100 : static std::size_t
101 : do_prepare_impl(
102 : void* sink,
103 : mutable_buffer* arr,
104 : std::size_t max_count);
105 :
106 : template<BufferSink S>
107 : static coro
108 : do_commit_impl(
109 : void* sink,
110 : any_buffer_sink* wrapper,
111 : std::size_t n,
112 : bool eof,
113 : coro h,
114 : executor_ref ex,
115 : std::stop_token token,
116 : std::error_code* ec);
117 :
118 : template<BufferSink S>
119 : static coro
120 : do_commit_eof_impl(
121 : void* sink,
122 : any_buffer_sink* wrapper,
123 : coro h,
124 : executor_ref ex,
125 : std::stop_token token,
126 : std::error_code* ec);
127 :
128 : template<BufferSink S>
129 : static commit_op
130 : commit_coro(
131 : any_buffer_sink* wrapper,
132 : S& sink,
133 : std::size_t n,
134 : std::error_code* out_ec);
135 :
136 : template<BufferSink S>
137 : static commit_op
138 : commit_with_eof_coro(
139 : any_buffer_sink* wrapper,
140 : S& sink,
141 : std::size_t n,
142 : bool eof,
143 : std::error_code* out_ec);
144 :
145 : template<BufferSink S>
146 : static commit_op
147 : commit_eof_coro(
148 : any_buffer_sink* wrapper,
149 : S& sink,
150 : std::error_code* out_ec);
151 :
152 : void* alloc_frame(std::size_t size);
153 : void free_frame(void* p, std::size_t size);
154 :
155 : public:
156 : /** Destructor.
157 :
158 : Destroys the owned sink (if any) and releases the cached
159 : coroutine frame.
160 : */
161 : ~any_buffer_sink();
162 :
163 : /** Default constructor.
164 :
165 : Constructs an empty wrapper. Operations on a default-constructed
166 : wrapper result in undefined behavior.
167 : */
168 : any_buffer_sink() = default;
169 :
170 : /** Non-copyable.
171 :
172 : The frame cache is per-instance and cannot be shared.
173 : */
174 : any_buffer_sink(any_buffer_sink const&) = delete;
175 : any_buffer_sink& operator=(any_buffer_sink const&) = delete;
176 :
177 : /** Move constructor.
178 :
179 : Transfers ownership of the wrapped sink (if owned) and
180 : cached frame from `other`. After the move, `other` is
181 : in a default-constructed state.
182 :
183 : @param other The wrapper to move from.
184 : */
185 1 : any_buffer_sink(any_buffer_sink&& other) noexcept
186 1 : : sink_(std::exchange(other.sink_, nullptr))
187 1 : , vt_(std::exchange(other.vt_, nullptr))
188 1 : , cached_frame_(std::exchange(other.cached_frame_, nullptr))
189 1 : , cached_size_(std::exchange(other.cached_size_, 0))
190 1 : , storage_(std::exchange(other.storage_, nullptr))
191 : {
192 1 : }
193 :
194 : /** Move assignment operator.
195 :
196 : Destroys any owned sink and releases existing resources,
197 : then transfers ownership from `other`.
198 :
199 : @param other The wrapper to move from.
200 : @return Reference to this wrapper.
201 : */
202 : any_buffer_sink&
203 : operator=(any_buffer_sink&& other) noexcept;
204 :
205 : /** Construct by taking ownership of a BufferSink.
206 :
207 : Allocates storage and moves the sink into this wrapper.
208 : The wrapper owns the sink and will destroy it.
209 :
210 : @param s The sink to take ownership of.
211 : */
212 : template<BufferSink S>
213 : requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
214 : any_buffer_sink(S s);
215 :
216 : /** Construct by wrapping a BufferSink without ownership.
217 :
218 : Wraps the given sink by pointer. The sink must remain
219 : valid for the lifetime of this wrapper.
220 :
221 : @param s Pointer to the sink to wrap.
222 : */
223 : template<BufferSink S>
224 60 : any_buffer_sink(S* s) noexcept
225 60 : : sink_(s)
226 60 : , vt_(&vtable_for_impl<S>::value)
227 : {
228 : // Preallocate coroutine frames to find max size
229 60 : commit_coro<S>(this, *s, 0, nullptr);
230 60 : commit_with_eof_coro<S>(this, *s, 0, false, nullptr);
231 60 : commit_eof_coro<S>(this, *s, nullptr);
232 60 : }
233 :
234 : /** Check if the wrapper contains a valid sink.
235 :
236 : @return `true` if wrapping a sink, `false` if default-constructed
237 : or moved-from.
238 : */
239 : bool
240 9 : has_value() const noexcept
241 : {
242 9 : return sink_ != nullptr;
243 : }
244 :
245 : /** Check if the wrapper contains a valid sink.
246 :
247 : @return `true` if wrapping a sink, `false` if default-constructed
248 : or moved-from.
249 : */
250 : explicit
251 2 : operator bool() const noexcept
252 : {
253 2 : return has_value();
254 : }
255 :
256 : /** Prepare writable buffers.
257 :
258 : Fills the provided array with mutable buffer descriptors
259 : pointing to the underlying sink's internal storage. This
260 : operation is synchronous.
261 :
262 : @param arr Pointer to array of mutable_buffer to fill.
263 : @param max_count Maximum number of buffers to fill.
264 :
265 : @return The number of buffers filled.
266 :
267 : @par Preconditions
268 : The wrapper must contain a valid sink (`has_value() == true`).
269 : */
270 : std::size_t
271 : prepare(mutable_buffer* arr, std::size_t max_count);
272 :
273 : /** Commit bytes written to the prepared buffers.
274 :
275 : Commits `n` bytes written to the buffers returned by the
276 : most recent call to @ref prepare. The operation may trigger
277 : underlying I/O.
278 :
279 : @param n The number of bytes to commit.
280 :
281 : @return An awaitable yielding `(error_code)`.
282 :
283 : @par Preconditions
284 : The wrapper must contain a valid sink (`has_value() == true`).
285 : */
286 : auto
287 : commit(std::size_t n);
288 :
289 : /** Commit bytes written with optional end-of-stream.
290 :
291 : Commits `n` bytes written to the buffers returned by the
292 : most recent call to @ref prepare. If `eof` is true, also
293 : signals end-of-stream.
294 :
295 : @param n The number of bytes to commit.
296 : @param eof If true, signals end-of-stream after committing.
297 :
298 : @return An awaitable yielding `(error_code)`.
299 :
300 : @par Preconditions
301 : The wrapper must contain a valid sink (`has_value() == true`).
302 : */
303 : auto
304 : commit(std::size_t n, bool eof);
305 :
306 : /** Signal end-of-stream.
307 :
308 : Indicates that no more data will be written to the sink.
309 : The operation completes when the sink is finalized, or
310 : an error occurs.
311 :
312 : @return An awaitable yielding `(error_code)`.
313 :
314 : @par Preconditions
315 : The wrapper must contain a valid sink (`has_value() == true`).
316 : */
317 : auto
318 : commit_eof();
319 :
320 : /** Write data from a buffer sequence.
321 :
322 : Writes all data from the buffer sequence to the underlying
323 : sink. This method satisfies the @ref WriteSink concept.
324 :
325 : @note This operation copies data from the caller's buffers
326 : into the sink's internal buffers. For zero-copy writes,
327 : use @ref prepare and @ref commit directly.
328 :
329 : @param buffers The buffer sequence to write.
330 :
331 : @return An awaitable yielding `(error_code,std::size_t)`.
332 :
333 : @par Preconditions
334 : The wrapper must contain a valid sink (`has_value() == true`).
335 : */
336 : template<ConstBufferSequence CB>
337 : task<io_result<std::size_t>>
338 : write(CB buffers);
339 :
340 : /** Write data with optional end-of-stream.
341 :
342 : Writes all data from the buffer sequence to the underlying
343 : sink, optionally finalizing it afterwards. This method
344 : satisfies the @ref WriteSink concept.
345 :
346 : @note This operation copies data from the caller's buffers
347 : into the sink's internal buffers. For zero-copy writes,
348 : use @ref prepare and @ref commit directly.
349 :
350 : @param buffers The buffer sequence to write.
351 : @param eof If true, finalize the sink after writing.
352 :
353 : @return An awaitable yielding `(error_code,std::size_t)`.
354 :
355 : @par Preconditions
356 : The wrapper must contain a valid sink (`has_value() == true`).
357 : */
358 : template<ConstBufferSequence CB>
359 : task<io_result<std::size_t>>
360 : write(CB buffers, bool eof);
361 :
362 : /** Signal end-of-stream.
363 :
364 : Indicates that no more data will be written to the sink.
365 : This method satisfies the @ref WriteSink concept.
366 :
367 : @return An awaitable yielding `(error_code)`.
368 :
369 : @par Preconditions
370 : The wrapper must contain a valid sink (`has_value() == true`).
371 : */
372 : auto
373 : write_eof();
374 :
375 : protected:
376 : /** Rebind to a new sink after move.
377 :
378 : Updates the internal pointer to reference a new sink object.
379 : Used by owning wrappers after move assignment when the owned
380 : object has moved to a new location.
381 :
382 : @param new_sink The new sink to bind to. Must be the same
383 : type as the original sink.
384 :
385 : @note Terminates if called with a sink of different type
386 : than the original.
387 : */
388 : template<BufferSink S>
389 : void
390 : rebind(S& new_sink) noexcept
391 : {
392 : if(vt_ != &vtable_for_impl<S>::value)
393 : std::terminate();
394 : sink_ = &new_sink;
395 : }
396 : };
397 :
398 : //----------------------------------------------------------
399 :
400 : struct any_buffer_sink::vtable
401 : {
402 : void (*destroy)(void*) noexcept;
403 :
404 : std::size_t (*do_prepare)(
405 : void* sink,
406 : mutable_buffer* arr,
407 : std::size_t max_count);
408 :
409 : coro (*do_commit)(
410 : void* sink,
411 : any_buffer_sink* wrapper,
412 : std::size_t n,
413 : bool eof,
414 : coro h,
415 : executor_ref ex,
416 : std::stop_token token,
417 : std::error_code* ec);
418 :
419 : coro (*do_commit_eof)(
420 : void* sink,
421 : any_buffer_sink* wrapper,
422 : coro h,
423 : executor_ref ex,
424 : std::stop_token token,
425 : std::error_code* ec);
426 : };
427 :
428 : template<BufferSink S>
429 : struct any_buffer_sink::vtable_for_impl
430 : {
431 : static void
432 0 : do_destroy_impl(void* sink) noexcept
433 : {
434 0 : static_cast<S*>(sink)->~S();
435 0 : }
436 :
437 : static constexpr vtable value = {
438 : &do_destroy_impl,
439 : &any_buffer_sink::do_prepare_impl<S>,
440 : &any_buffer_sink::do_commit_impl<S>,
441 : &any_buffer_sink::do_commit_eof_impl<S>
442 : };
443 : };
444 :
445 : //----------------------------------------------------------
446 :
447 : inline
448 63 : any_buffer_sink::~any_buffer_sink()
449 : {
450 63 : if(storage_)
451 : {
452 0 : vt_->destroy(sink_);
453 0 : ::operator delete(storage_);
454 : }
455 63 : if(cached_frame_)
456 60 : ::operator delete(cached_frame_);
457 63 : }
458 :
459 : inline any_buffer_sink&
460 1 : any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
461 : {
462 1 : if(this != &other)
463 : {
464 1 : if(storage_)
465 : {
466 0 : vt_->destroy(sink_);
467 0 : ::operator delete(storage_);
468 : }
469 1 : if(cached_frame_)
470 0 : ::operator delete(cached_frame_);
471 1 : sink_ = std::exchange(other.sink_, nullptr);
472 1 : vt_ = std::exchange(other.vt_, nullptr);
473 1 : cached_frame_ = std::exchange(other.cached_frame_, nullptr);
474 1 : cached_size_ = std::exchange(other.cached_size_, 0);
475 1 : storage_ = std::exchange(other.storage_, nullptr);
476 : }
477 1 : return *this;
478 : }
479 :
480 : template<BufferSink S>
481 : requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
482 : any_buffer_sink::any_buffer_sink(S s)
483 : : vt_(&vtable_for_impl<S>::value)
484 : {
485 : struct guard {
486 : any_buffer_sink* self;
487 : bool committed = false;
488 : ~guard() {
489 : if(!committed && self->storage_) {
490 : self->vt_->destroy(self->sink_);
491 : ::operator delete(self->storage_);
492 : self->storage_ = nullptr;
493 : self->sink_ = nullptr;
494 : }
495 : }
496 : } g{this};
497 :
498 : storage_ = ::operator new(sizeof(S));
499 : sink_ = ::new(storage_) S(std::move(s));
500 :
501 : // Preallocate coroutine frames to find max size
502 : auto& ref = *static_cast<S*>(sink_);
503 : commit_coro<S>(this, ref, 0, nullptr);
504 : commit_with_eof_coro<S>(this, ref, 0, false, nullptr);
505 : commit_eof_coro<S>(this, ref, nullptr);
506 :
507 : g.committed = true;
508 : }
509 :
510 : //----------------------------------------------------------
511 :
512 : struct any_buffer_sink::commit_op
513 : {
514 : struct promise_type
515 : {
516 : executor_ref executor_;
517 : std::stop_token stop_token_;
518 : coro caller_h_{};
519 :
520 246 : promise_type() = default;
521 :
522 : commit_op
523 246 : get_return_object() noexcept
524 : {
525 : return commit_op{
526 246 : std::coroutine_handle<promise_type>::from_promise(*this)};
527 : }
528 :
529 : std::suspend_always
530 246 : initial_suspend() noexcept
531 : {
532 246 : return {};
533 : }
534 :
535 : auto
536 50 : final_suspend() noexcept
537 : {
538 : struct awaiter
539 : {
540 : promise_type* p_;
541 :
542 50 : bool await_ready() const noexcept { return false; }
543 :
544 50 : coro await_suspend(coro) const noexcept
545 : {
546 50 : if(p_->caller_h_)
547 0 : return p_->caller_h_;
548 50 : return std::noop_coroutine();
549 : }
550 :
551 0 : void await_resume() const noexcept {}
552 : };
553 50 : return awaiter{this};
554 : }
555 :
556 : void
557 50 : return_void() noexcept
558 : {
559 50 : }
560 :
561 : void
562 16 : unhandled_exception()
563 : {
564 16 : throw;
565 : }
566 :
567 : template<class... Args>
568 : static void*
569 246 : operator new(
570 : std::size_t size,
571 : any_buffer_sink* wrapper,
572 : Args&&...)
573 : {
574 246 : return wrapper->alloc_frame(size);
575 : }
576 :
577 : template<class... Args>
578 : static void
579 : operator delete(void*, any_buffer_sink*, Args&&...) noexcept
580 : {
581 : }
582 :
583 : static void
584 246 : operator delete(void*, std::size_t) noexcept
585 : {
586 246 : }
587 :
588 : void
589 66 : set_executor(executor_ref ex) noexcept
590 : {
591 66 : executor_ = ex;
592 66 : }
593 :
594 : void
595 66 : set_stop_token(std::stop_token token) noexcept
596 : {
597 66 : stop_token_ = token;
598 66 : }
599 :
600 : void
601 0 : set_caller(coro h) noexcept
602 : {
603 0 : caller_h_ = h;
604 0 : }
605 :
606 : template<class Awaitable>
607 : struct transform_awaiter
608 : {
609 : std::decay_t<Awaitable> a_;
610 : promise_type* p_;
611 :
612 66 : bool await_ready()
613 : {
614 66 : return a_.await_ready();
615 : }
616 :
617 66 : auto await_resume()
618 : {
619 66 : return a_.await_resume();
620 : }
621 :
622 0 : auto await_suspend(coro h)
623 : {
624 0 : return a_.await_suspend(h, p_->executor_, p_->stop_token_);
625 : }
626 : };
627 :
628 : template<class Awaitable>
629 66 : auto await_transform(Awaitable&& a)
630 : {
631 : using A = std::decay_t<Awaitable>;
632 : if constexpr (IoAwaitable<A>)
633 : {
634 : return transform_awaiter<Awaitable>{
635 66 : std::forward<Awaitable>(a), this};
636 : }
637 : else
638 : {
639 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
640 : }
641 : }
642 : };
643 :
644 : std::coroutine_handle<promise_type> h_;
645 :
646 246 : ~commit_op()
647 : {
648 246 : if(h_)
649 196 : h_.destroy();
650 246 : }
651 :
652 : commit_op(commit_op const&) = delete;
653 : commit_op& operator=(commit_op const&) = delete;
654 :
655 : commit_op(commit_op&& other) noexcept
656 : : h_(std::exchange(other.h_, nullptr))
657 : {
658 : }
659 :
660 : commit_op& operator=(commit_op&& other) noexcept
661 : {
662 : if(this != &other)
663 : {
664 : if(h_)
665 : h_.destroy();
666 : h_ = std::exchange(other.h_, nullptr);
667 : }
668 : return *this;
669 : }
670 :
671 : private:
672 : explicit
673 246 : commit_op(std::coroutine_handle<promise_type> h) noexcept
674 246 : : h_(h)
675 : {
676 246 : }
677 : };
678 :
679 : //----------------------------------------------------------
680 :
681 : inline void*
682 246 : any_buffer_sink::alloc_frame(std::size_t size)
683 : {
684 246 : if(cached_frame_ && cached_size_ >= size)
685 126 : return cached_frame_;
686 :
687 120 : if(cached_frame_)
688 60 : ::operator delete(cached_frame_);
689 :
690 120 : cached_frame_ = ::operator new(size);
691 120 : cached_size_ = size;
692 120 : return cached_frame_;
693 : }
694 :
695 : inline void
696 : any_buffer_sink::free_frame(void*, std::size_t)
697 : {
698 : // Keep the frame cached for reuse
699 : }
700 :
701 : template<BufferSink S>
702 : std::size_t
703 68 : any_buffer_sink::do_prepare_impl(
704 : void* sink,
705 : mutable_buffer* arr,
706 : std::size_t max_count)
707 : {
708 68 : auto& s = *static_cast<S*>(sink);
709 68 : return s.prepare(arr, max_count);
710 : }
711 :
712 : #if defined(__GNUC__) && !defined(__clang__)
713 : #pragma GCC diagnostic push
714 : #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
715 : #endif
716 :
717 : template<BufferSink S>
718 : any_buffer_sink::commit_op
719 98 : any_buffer_sink::commit_coro(
720 : any_buffer_sink*,
721 : S& sink,
722 : std::size_t n,
723 : std::error_code* out_ec)
724 : {
725 : auto [err] = co_await sink.commit(n);
726 : *out_ec = err;
727 196 : }
728 :
729 : template<BufferSink S>
730 : any_buffer_sink::commit_op
731 70 : any_buffer_sink::commit_with_eof_coro(
732 : any_buffer_sink*,
733 : S& sink,
734 : std::size_t n,
735 : bool eof,
736 : std::error_code* out_ec)
737 : {
738 : auto [err] = co_await sink.commit(n, eof);
739 : *out_ec = err;
740 140 : }
741 :
742 : template<BufferSink S>
743 : any_buffer_sink::commit_op
744 78 : any_buffer_sink::commit_eof_coro(
745 : any_buffer_sink*,
746 : S& sink,
747 : std::error_code* out_ec)
748 : {
749 : auto [err] = co_await sink.commit_eof();
750 : *out_ec = err;
751 156 : }
752 :
753 : #if defined(__GNUC__) && !defined(__clang__)
754 : #pragma GCC diagnostic pop
755 : #endif
756 :
757 : template<BufferSink S>
758 : coro
759 48 : any_buffer_sink::do_commit_impl(
760 : void* sink,
761 : any_buffer_sink* wrapper,
762 : std::size_t n,
763 : bool eof,
764 : coro h,
765 : executor_ref ex,
766 : std::stop_token token,
767 : std::error_code* ec)
768 : {
769 48 : auto& s = *static_cast<S*>(sink);
770 :
771 : // Create coroutine - frame is cached in wrapper
772 48 : auto op = eof
773 48 : ? commit_with_eof_coro<S>(wrapper, s, n, eof, ec)
774 : : commit_coro<S>(wrapper, s, n, ec);
775 :
776 : // Set executor and stop token on promise before resuming
777 48 : op.h_.promise().set_executor(ex);
778 48 : op.h_.promise().set_stop_token(token);
779 :
780 : // Resume the coroutine to start the operation
781 48 : op.h_.resume();
782 :
783 : // Check if operation completed synchronously
784 37 : if(op.h_.done())
785 : {
786 37 : op.h_.destroy();
787 37 : op.h_ = nullptr;
788 37 : return ex.dispatch(h);
789 : }
790 :
791 : // Operation is pending - caller will be resumed via symmetric transfer
792 0 : op.h_.promise().set_caller(h);
793 0 : op.h_ = nullptr;
794 0 : return std::noop_coroutine();
795 48 : }
796 :
797 : template<BufferSink S>
798 : coro
799 18 : any_buffer_sink::do_commit_eof_impl(
800 : void* sink,
801 : any_buffer_sink* wrapper,
802 : coro h,
803 : executor_ref ex,
804 : std::stop_token token,
805 : std::error_code* ec)
806 : {
807 18 : auto& s = *static_cast<S*>(sink);
808 :
809 : // Create coroutine - frame is cached in wrapper
810 18 : auto op = commit_eof_coro<S>(wrapper, s, ec);
811 :
812 : // Set executor and stop token on promise before resuming
813 18 : op.h_.promise().set_executor(ex);
814 18 : op.h_.promise().set_stop_token(token);
815 :
816 : // Resume the coroutine to start the operation
817 18 : op.h_.resume();
818 :
819 : // Check if operation completed synchronously
820 13 : if(op.h_.done())
821 : {
822 13 : op.h_.destroy();
823 13 : op.h_ = nullptr;
824 13 : return ex.dispatch(h);
825 : }
826 :
827 : // Operation is pending - caller will be resumed via symmetric transfer
828 0 : op.h_.promise().set_caller(h);
829 0 : op.h_ = nullptr;
830 0 : return std::noop_coroutine();
831 18 : }
832 :
833 : //----------------------------------------------------------
834 :
835 : inline std::size_t
836 68 : any_buffer_sink::prepare(
837 : mutable_buffer* arr,
838 : std::size_t max_count)
839 : {
840 68 : return vt_->do_prepare(sink_, arr, max_count);
841 : }
842 :
843 : inline auto
844 38 : any_buffer_sink::commit(std::size_t n)
845 : {
846 : struct awaitable
847 : {
848 : any_buffer_sink* self_;
849 : std::size_t n_;
850 : std::error_code ec_;
851 :
852 : bool
853 38 : await_ready() const noexcept
854 : {
855 38 : return false;
856 : }
857 :
858 : coro
859 38 : await_suspend(coro h, executor_ref ex, std::stop_token token)
860 : {
861 76 : return self_->vt_->do_commit(
862 38 : self_->sink_,
863 : self_,
864 : n_,
865 : false,
866 : h,
867 : ex,
868 : token,
869 60 : &ec_);
870 : }
871 :
872 : io_result<>
873 30 : await_resume() const noexcept
874 : {
875 30 : return {ec_};
876 : }
877 : };
878 38 : return awaitable{this, n, {}};
879 : }
880 :
881 : inline auto
882 10 : any_buffer_sink::commit(std::size_t n, bool eof)
883 : {
884 : struct awaitable
885 : {
886 : any_buffer_sink* self_;
887 : std::size_t n_;
888 : bool eof_;
889 : std::error_code ec_;
890 :
891 : bool
892 10 : await_ready() const noexcept
893 : {
894 10 : return false;
895 : }
896 :
897 : coro
898 10 : await_suspend(coro h, executor_ref ex, std::stop_token token)
899 : {
900 20 : return self_->vt_->do_commit(
901 10 : self_->sink_,
902 : self_,
903 : n_,
904 10 : eof_,
905 : h,
906 : ex,
907 : token,
908 14 : &ec_);
909 : }
910 :
911 : io_result<>
912 7 : await_resume() const noexcept
913 : {
914 7 : return {ec_};
915 : }
916 : };
917 10 : return awaitable{this, n, eof, {}};
918 : }
919 :
920 : inline auto
921 18 : any_buffer_sink::commit_eof()
922 : {
923 : struct awaitable
924 : {
925 : any_buffer_sink* self_;
926 : std::error_code ec_;
927 :
928 : bool
929 18 : await_ready() const noexcept
930 : {
931 18 : return false;
932 : }
933 :
934 : coro
935 18 : await_suspend(coro h, executor_ref ex, std::stop_token token)
936 : {
937 36 : return self_->vt_->do_commit_eof(
938 18 : self_->sink_,
939 : self_,
940 : h,
941 : ex,
942 : token,
943 26 : &ec_);
944 : }
945 :
946 : io_result<>
947 13 : await_resume() const noexcept
948 : {
949 13 : return {ec_};
950 : }
951 : };
952 18 : return awaitable{this, {}};
953 : }
954 :
955 : //----------------------------------------------------------
956 :
957 : template<ConstBufferSequence CB>
958 : task<io_result<std::size_t>>
959 : any_buffer_sink::write(CB buffers)
960 : {
961 : return write(buffers, false);
962 : }
963 :
964 : template<ConstBufferSequence CB>
965 : task<io_result<std::size_t>>
966 : any_buffer_sink::write(CB buffers, bool eof)
967 : {
968 : buffer_param<CB> bp(buffers);
969 : std::size_t total = 0;
970 :
971 : for(;;)
972 : {
973 : auto src = bp.data();
974 : if(src.empty())
975 : break;
976 :
977 : mutable_buffer arr[detail::max_iovec_];
978 : std::size_t count = prepare(arr, detail::max_iovec_);
979 : if(count == 0)
980 : {
981 : auto [ec] = co_await commit(0);
982 : if(ec)
983 : co_return {ec, total};
984 : continue;
985 : }
986 :
987 : auto n = buffer_copy(std::span(arr, count), src);
988 : auto [ec] = co_await commit(n);
989 : if(ec)
990 : co_return {ec, total};
991 : bp.consume(n);
992 : total += n;
993 : }
994 :
995 : if(eof)
996 : {
997 : auto [ec] = co_await commit_eof();
998 : if(ec)
999 : co_return {ec, total};
1000 : }
1001 :
1002 : co_return {{}, total};
1003 : }
1004 :
1005 : inline auto
1006 : any_buffer_sink::write_eof()
1007 : {
1008 : return commit_eof();
1009 : }
1010 :
1011 : //----------------------------------------------------------
1012 :
1013 : static_assert(WriteSink<any_buffer_sink>);
1014 :
1015 : } // namespace capy
1016 : } // namespace boost
1017 :
1018 : #endif
|