Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11 : #define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/buffers.hpp>
15 : #include <boost/capy/buffers/buffer_param.hpp>
16 : #include <boost/capy/concept/io_awaitable.hpp>
17 : #include <boost/capy/concept/write_sink.hpp>
18 : #include <boost/capy/coro.hpp>
19 : #include <boost/capy/ex/executor_ref.hpp>
20 : #include <boost/capy/io_result.hpp>
21 : #include <boost/capy/task.hpp>
22 :
23 : #include <system_error>
24 :
25 : #include <concepts>
26 : #include <coroutine>
27 : #include <cstddef>
28 : #include <exception>
29 : #include <span>
30 : #include <stop_token>
31 : #include <utility>
32 :
33 : namespace boost {
34 : namespace capy {
35 :
36 : /** Type-erased wrapper for any WriteSink.
37 :
38 : This class provides type erasure for any type satisfying the
39 : @ref WriteSink concept, enabling runtime polymorphism for
40 : sink write operations. It uses a cached coroutine frame to achieve
41 : zero steady-state allocation after construction.
42 :
43 : The wrapper supports two construction modes:
44 : - **Owning**: Pass by value to transfer ownership. The wrapper
45 : allocates storage and owns the sink.
46 : - **Reference**: Pass a pointer to wrap without ownership. The
47 : pointed-to sink must outlive this wrapper.
48 :
49 : @par Frame Preallocation
50 : The constructor preallocates the internal coroutine frame.
51 : This reserves all virtual address space at server startup
52 : so memory usage can be measured up front, rather than
53 : allocating piecemeal as traffic arrives.
54 :
55 : @par Thread Safety
56 : Not thread-safe. Concurrent operations on the same wrapper
57 : are undefined behavior.
58 :
59 : @par Example
60 : @code
61 : // Owning - takes ownership of the sink
62 : any_write_sink ws(some_sink{args...});
63 :
64 : // Reference - wraps without ownership
65 : some_sink sink;
66 : any_write_sink ws(&sink);
67 :
68 : const_buffer buf(data, size);
69 : auto [ec, n] = co_await ws.write(std::span(&buf, 1));
70 : auto [ec2] = co_await ws.write_eof();
71 : @endcode
72 :
73 : @see any_write_stream, WriteSink
74 : */
75 : class any_write_sink
76 : {
77 : struct vtable;
78 :
79 : template<WriteSink S>
80 : struct vtable_for_impl;
81 :
82 : struct write_op;
83 : struct write_eof_op;
84 :
85 : void* sink_ = nullptr;
86 : vtable const* vt_ = nullptr;
87 : void* cached_frame_ = nullptr;
88 : std::size_t cached_size_ = 0;
89 : void* storage_ = nullptr;
90 :
91 : template<WriteSink S>
92 : static coro
93 : do_write_impl(
94 : void* sink,
95 : any_write_sink* wrapper,
96 : std::span<const_buffer const> buffers,
97 : bool eof,
98 : coro h,
99 : executor_ref ex,
100 : std::stop_token token,
101 : std::error_code* ec,
102 : std::size_t* n);
103 :
104 : template<WriteSink S>
105 : static coro
106 : do_write_eof_impl(
107 : void* sink,
108 : any_write_sink* wrapper,
109 : coro h,
110 : executor_ref ex,
111 : std::stop_token token,
112 : std::error_code* ec);
113 :
114 : template<WriteSink S>
115 : static write_op
116 : write_coro(
117 : any_write_sink* wrapper,
118 : S& sink,
119 : std::span<const_buffer const> bufs,
120 : std::error_code* out_ec,
121 : std::size_t* out_n);
122 :
123 : template<WriteSink S>
124 : static write_op
125 : write_with_eof_coro(
126 : any_write_sink* wrapper,
127 : S& sink,
128 : std::span<const_buffer const> bufs,
129 : bool eof,
130 : std::error_code* out_ec,
131 : std::size_t* out_n);
132 :
133 : template<WriteSink S>
134 : static write_eof_op
135 : write_eof_coro(
136 : any_write_sink* wrapper,
137 : S& sink,
138 : std::error_code* out_ec);
139 :
140 : void* alloc_frame(std::size_t size);
141 : void free_frame(void* p, std::size_t size);
142 :
143 : public:
144 : /** Destructor.
145 :
146 : Destroys the owned sink (if any) and releases the cached
147 : coroutine frame.
148 : */
149 : ~any_write_sink();
150 :
151 : /** Default constructor.
152 :
153 : Constructs an empty wrapper. Operations on a default-constructed
154 : wrapper result in undefined behavior.
155 : */
156 : any_write_sink() = default;
157 :
158 : /** Non-copyable.
159 :
160 : The frame cache is per-instance and cannot be shared.
161 : */
162 : any_write_sink(any_write_sink const&) = delete;
163 : any_write_sink& operator=(any_write_sink const&) = delete;
164 :
165 : /** Move constructor.
166 :
167 : Transfers ownership of the wrapped sink (if owned) and
168 : cached frame from `other`. After the move, `other` is
169 : in a default-constructed state.
170 :
171 : @param other The wrapper to move from.
172 : */
173 1 : any_write_sink(any_write_sink&& other) noexcept
174 1 : : sink_(std::exchange(other.sink_, nullptr))
175 1 : , vt_(std::exchange(other.vt_, nullptr))
176 1 : , cached_frame_(std::exchange(other.cached_frame_, nullptr))
177 1 : , cached_size_(std::exchange(other.cached_size_, 0))
178 1 : , storage_(std::exchange(other.storage_, nullptr))
179 : {
180 1 : }
181 :
182 : /** Move assignment operator.
183 :
184 : Destroys any owned sink and releases existing resources,
185 : then transfers ownership from `other`.
186 :
187 : @param other The wrapper to move from.
188 : @return Reference to this wrapper.
189 : */
190 : any_write_sink&
191 : operator=(any_write_sink&& other) noexcept;
192 :
193 : /** Construct by taking ownership of a WriteSink.
194 :
195 : Allocates storage and moves the sink into this wrapper.
196 : The wrapper owns the sink and will destroy it.
197 :
198 : @param s The sink to take ownership of.
199 : */
200 : template<WriteSink S>
201 : requires (!std::same_as<std::decay_t<S>, any_write_sink>)
202 : any_write_sink(S s);
203 :
204 : /** Construct by wrapping a WriteSink without ownership.
205 :
206 : Wraps the given sink by pointer. The sink must remain
207 : valid for the lifetime of this wrapper.
208 :
209 : @param s Pointer to the sink to wrap.
210 : */
211 : template<WriteSink S>
212 96 : any_write_sink(S* s) noexcept
213 96 : : sink_(s)
214 96 : , vt_(&vtable_for_impl<S>::value)
215 : {
216 : // Preallocate coroutine frames to find max size
217 96 : write_coro<S>(this, *s, {}, nullptr, nullptr);
218 96 : write_with_eof_coro<S>(this, *s, {}, false, nullptr, nullptr);
219 96 : write_eof_coro<S>(this, *s, nullptr);
220 96 : }
221 :
222 : /** Check if the wrapper contains a valid sink.
223 :
224 : @return `true` if wrapping a sink, `false` if default-constructed
225 : or moved-from.
226 : */
227 : bool
228 9 : has_value() const noexcept
229 : {
230 9 : return sink_ != nullptr;
231 : }
232 :
233 : /** Check if the wrapper contains a valid sink.
234 :
235 : @return `true` if wrapping a sink, `false` if default-constructed
236 : or moved-from.
237 : */
238 : explicit
239 2 : operator bool() const noexcept
240 : {
241 2 : return has_value();
242 : }
243 :
244 : /** Initiate an asynchronous write operation.
245 :
246 : Writes data from the provided buffer sequence. The operation
247 : completes when all bytes have been consumed, or an error
248 : occurs.
249 :
250 : @param buffers The buffer sequence containing data to write.
251 : Passed by value to ensure the sequence lives in the
252 : coroutine frame across suspension points.
253 :
254 : @return An awaitable yielding `(error_code,std::size_t)`.
255 :
256 : @par Preconditions
257 : The wrapper must contain a valid sink (`has_value() == true`).
258 : */
259 : template<ConstBufferSequence CB>
260 : task<io_result<std::size_t>>
261 : write(CB buffers);
262 :
263 : /** Initiate an asynchronous write operation with optional EOF.
264 :
265 : Writes data from the provided buffer sequence, optionally
266 : finalizing the sink afterwards. The operation completes when
267 : all bytes have been consumed and (if eof is true) the sink
268 : is finalized, or an error occurs.
269 :
270 : @param buffers The buffer sequence containing data to write.
271 : Passed by value to ensure the sequence lives in the
272 : coroutine frame across suspension points.
273 :
274 : @param eof If `true`, the sink is finalized after writing
275 : the data.
276 :
277 : @return An awaitable yielding `(error_code,std::size_t)`.
278 :
279 : @par Preconditions
280 : The wrapper must contain a valid sink (`has_value() == true`).
281 : */
282 : template<ConstBufferSequence CB>
283 : task<io_result<std::size_t>>
284 : write(CB buffers, bool eof);
285 :
286 : /** Signal end of data.
287 :
288 : Indicates that no more data will be written to the sink.
289 : The operation completes when the sink is finalized, or
290 : an error occurs.
291 :
292 : @return An awaitable yielding `(error_code)`.
293 :
294 : @par Preconditions
295 : The wrapper must contain a valid sink (`has_value() == true`).
296 : */
297 : auto
298 : write_eof();
299 :
300 : protected:
301 : /** Rebind to a new sink after move.
302 :
303 : Updates the internal pointer to reference a new sink object.
304 : Used by owning wrappers after move assignment when the owned
305 : object has moved to a new location.
306 :
307 : @param new_sink The new sink to bind to. Must be the same
308 : type as the original sink.
309 :
310 : @note Terminates if called with a sink of different type
311 : than the original.
312 : */
313 : template<WriteSink S>
314 : void
315 : rebind(S& new_sink) noexcept
316 : {
317 : if(vt_ != &vtable_for_impl<S>::value)
318 : std::terminate();
319 : sink_ = &new_sink;
320 : }
321 :
322 : private:
323 : auto
324 : write_some_(std::span<const_buffer const> buffers, bool eof);
325 : };
326 :
327 : //----------------------------------------------------------
328 :
329 : struct any_write_sink::vtable
330 : {
331 : void (*destroy)(void*) noexcept;
332 :
333 : coro (*do_write)(
334 : void* sink,
335 : any_write_sink* wrapper,
336 : std::span<const_buffer const> buffers,
337 : bool eof,
338 : coro h,
339 : executor_ref ex,
340 : std::stop_token token,
341 : std::error_code* ec,
342 : std::size_t* n);
343 :
344 : coro (*do_write_eof)(
345 : void* sink,
346 : any_write_sink* wrapper,
347 : coro h,
348 : executor_ref ex,
349 : std::stop_token token,
350 : std::error_code* ec);
351 : };
352 :
353 : template<WriteSink S>
354 : struct any_write_sink::vtable_for_impl
355 : {
356 : static void
357 0 : do_destroy_impl(void* sink) noexcept
358 : {
359 0 : static_cast<S*>(sink)->~S();
360 0 : }
361 :
362 : static constexpr vtable value = {
363 : &do_destroy_impl,
364 : &any_write_sink::do_write_impl<S>,
365 : &any_write_sink::do_write_eof_impl<S>
366 : };
367 : };
368 :
369 : //----------------------------------------------------------
370 :
371 : inline
372 99 : any_write_sink::~any_write_sink()
373 : {
374 99 : if(storage_)
375 : {
376 0 : vt_->destroy(sink_);
377 0 : ::operator delete(storage_);
378 : }
379 99 : if(cached_frame_)
380 96 : ::operator delete(cached_frame_);
381 99 : }
382 :
383 : inline any_write_sink&
384 1 : any_write_sink::operator=(any_write_sink&& other) noexcept
385 : {
386 1 : if(this != &other)
387 : {
388 1 : if(storage_)
389 : {
390 0 : vt_->destroy(sink_);
391 0 : ::operator delete(storage_);
392 : }
393 1 : if(cached_frame_)
394 0 : ::operator delete(cached_frame_);
395 1 : sink_ = std::exchange(other.sink_, nullptr);
396 1 : vt_ = std::exchange(other.vt_, nullptr);
397 1 : cached_frame_ = std::exchange(other.cached_frame_, nullptr);
398 1 : cached_size_ = std::exchange(other.cached_size_, 0);
399 1 : storage_ = std::exchange(other.storage_, nullptr);
400 : }
401 1 : return *this;
402 : }
403 :
404 : template<WriteSink S>
405 : requires (!std::same_as<std::decay_t<S>, any_write_sink>)
406 : any_write_sink::any_write_sink(S s)
407 : : vt_(&vtable_for_impl<S>::value)
408 : {
409 : struct guard {
410 : any_write_sink* self;
411 : bool committed = false;
412 : ~guard() {
413 : if(!committed && self->storage_) {
414 : self->vt_->destroy(self->sink_);
415 : ::operator delete(self->storage_);
416 : self->storage_ = nullptr;
417 : self->sink_ = nullptr;
418 : }
419 : }
420 : } g{this};
421 :
422 : storage_ = ::operator new(sizeof(S));
423 : sink_ = ::new(storage_) S(std::move(s));
424 :
425 : // Preallocate coroutine frames to find max size
426 : auto& ref = *static_cast<S*>(sink_);
427 : write_coro<S>(this, ref, {}, nullptr, nullptr);
428 : write_with_eof_coro<S>(this, ref, {}, false, nullptr, nullptr);
429 : write_eof_coro<S>(this, ref, nullptr);
430 :
431 : g.committed = true;
432 : }
433 :
434 : //----------------------------------------------------------
435 :
436 : struct any_write_sink::write_op
437 : {
438 : struct promise_type
439 : {
440 : executor_ref executor_;
441 : std::stop_token stop_token_;
442 : coro caller_h_{};
443 :
444 318 : promise_type() = default;
445 :
446 : write_op
447 318 : get_return_object() noexcept
448 : {
449 : return write_op{
450 318 : std::coroutine_handle<promise_type>::from_promise(*this)};
451 : }
452 :
453 : std::suspend_always
454 318 : initial_suspend() noexcept
455 : {
456 318 : return {};
457 : }
458 :
459 : auto
460 98 : final_suspend() noexcept
461 : {
462 : struct awaiter
463 : {
464 : promise_type* p_;
465 :
466 98 : bool await_ready() const noexcept { return false; }
467 :
468 98 : coro await_suspend(coro) const noexcept
469 : {
470 98 : if(p_->caller_h_)
471 0 : return p_->caller_h_;
472 98 : return std::noop_coroutine();
473 : }
474 :
475 0 : void await_resume() const noexcept {}
476 : };
477 98 : return awaiter{this};
478 : }
479 :
480 : void
481 98 : return_void() noexcept
482 : {
483 98 : }
484 :
485 : void
486 28 : unhandled_exception()
487 : {
488 28 : throw;
489 : }
490 :
491 : template<class... Args>
492 : static void*
493 318 : operator new(
494 : std::size_t size,
495 : any_write_sink* wrapper,
496 : Args&&...)
497 : {
498 318 : return wrapper->alloc_frame(size);
499 : }
500 :
501 : template<class... Args>
502 : static void
503 : operator delete(void*, any_write_sink*, Args&&...) noexcept
504 : {
505 : }
506 :
507 : static void
508 318 : operator delete(void*, std::size_t) noexcept
509 : {
510 318 : }
511 :
512 : void
513 126 : set_executor(executor_ref ex) noexcept
514 : {
515 126 : executor_ = ex;
516 126 : }
517 :
518 : void
519 126 : set_stop_token(std::stop_token token) noexcept
520 : {
521 126 : stop_token_ = token;
522 126 : }
523 :
524 : void
525 0 : set_caller(coro h) noexcept
526 : {
527 0 : caller_h_ = h;
528 0 : }
529 :
530 : template<class Awaitable>
531 : struct transform_awaiter
532 : {
533 : std::decay_t<Awaitable> a_;
534 : promise_type* p_;
535 :
536 126 : bool await_ready()
537 : {
538 126 : return a_.await_ready();
539 : }
540 :
541 126 : decltype(auto) await_resume()
542 : {
543 126 : return a_.await_resume();
544 : }
545 :
546 0 : auto await_suspend(coro h)
547 : {
548 0 : return a_.await_suspend(h, p_->executor_, p_->stop_token_);
549 : }
550 : };
551 :
552 : template<class Awaitable>
553 126 : auto await_transform(Awaitable&& a)
554 : {
555 : using A = std::decay_t<Awaitable>;
556 : if constexpr (IoAwaitable<A>)
557 : {
558 : return transform_awaiter<Awaitable>{
559 126 : std::forward<Awaitable>(a), this};
560 : }
561 : else
562 : {
563 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
564 : }
565 : }
566 : };
567 :
568 : std::coroutine_handle<promise_type> h_;
569 :
570 318 : ~write_op()
571 : {
572 318 : if(h_)
573 220 : h_.destroy();
574 318 : }
575 :
576 : write_op(write_op const&) = delete;
577 : write_op& operator=(write_op const&) = delete;
578 :
579 : write_op(write_op&& other) noexcept
580 : : h_(std::exchange(other.h_, nullptr))
581 : {
582 : }
583 :
584 : write_op& operator=(write_op&& other) noexcept
585 : {
586 : if(this != &other)
587 : {
588 : if(h_)
589 : h_.destroy();
590 : h_ = std::exchange(other.h_, nullptr);
591 : }
592 : return *this;
593 : }
594 :
595 : private:
596 : explicit
597 318 : write_op(std::coroutine_handle<promise_type> h) noexcept
598 318 : : h_(h)
599 : {
600 318 : }
601 : };
602 :
603 : //----------------------------------------------------------
604 :
605 : struct any_write_sink::write_eof_op
606 : {
607 : struct promise_type
608 : {
609 : executor_ref executor_;
610 : std::stop_token stop_token_;
611 : coro caller_h_{};
612 :
613 120 : promise_type() = default;
614 :
615 : write_eof_op
616 120 : get_return_object() noexcept
617 : {
618 : return write_eof_op{
619 120 : std::coroutine_handle<promise_type>::from_promise(*this)};
620 : }
621 :
622 : std::suspend_always
623 120 : initial_suspend() noexcept
624 : {
625 120 : return {};
626 : }
627 :
628 : auto
629 17 : final_suspend() noexcept
630 : {
631 : struct awaiter
632 : {
633 : promise_type* p_;
634 :
635 17 : bool await_ready() const noexcept { return false; }
636 :
637 17 : coro await_suspend(coro) const noexcept
638 : {
639 17 : if(p_->caller_h_)
640 0 : return p_->caller_h_;
641 17 : return std::noop_coroutine();
642 : }
643 :
644 0 : void await_resume() const noexcept {}
645 : };
646 17 : return awaiter{this};
647 : }
648 :
649 : void
650 17 : return_void() noexcept
651 : {
652 17 : }
653 :
654 : void
655 7 : unhandled_exception()
656 : {
657 7 : throw;
658 : }
659 :
660 : template<class... Args>
661 : static void*
662 120 : operator new(
663 : std::size_t size,
664 : any_write_sink* wrapper,
665 : Args&&...)
666 : {
667 120 : return wrapper->alloc_frame(size);
668 : }
669 :
670 : template<class... Args>
671 : static void
672 : operator delete(void*, any_write_sink*, Args&&...) noexcept
673 : {
674 : }
675 :
676 : static void
677 120 : operator delete(void*, std::size_t) noexcept
678 : {
679 120 : }
680 :
681 : void
682 24 : set_executor(executor_ref ex) noexcept
683 : {
684 24 : executor_ = ex;
685 24 : }
686 :
687 : void
688 24 : set_stop_token(std::stop_token token) noexcept
689 : {
690 24 : stop_token_ = token;
691 24 : }
692 :
693 : void
694 0 : set_caller(coro h) noexcept
695 : {
696 0 : caller_h_ = h;
697 0 : }
698 :
699 : template<class Awaitable>
700 : struct transform_awaiter
701 : {
702 : std::decay_t<Awaitable> a_;
703 : promise_type* p_;
704 :
705 24 : bool await_ready()
706 : {
707 24 : return a_.await_ready();
708 : }
709 :
710 24 : decltype(auto) await_resume()
711 : {
712 24 : return a_.await_resume();
713 : }
714 :
715 0 : auto await_suspend(coro h)
716 : {
717 0 : return a_.await_suspend(h, p_->executor_, p_->stop_token_);
718 : }
719 : };
720 :
721 : template<class Awaitable>
722 24 : auto await_transform(Awaitable&& a)
723 : {
724 : using A = std::decay_t<Awaitable>;
725 : if constexpr (IoAwaitable<A>)
726 : {
727 : return transform_awaiter<Awaitable>{
728 24 : std::forward<Awaitable>(a), this};
729 : }
730 : else
731 : {
732 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
733 : }
734 : }
735 : };
736 :
737 : std::coroutine_handle<promise_type> h_;
738 :
739 120 : ~write_eof_op()
740 : {
741 120 : if(h_)
742 103 : h_.destroy();
743 120 : }
744 :
745 : write_eof_op(write_eof_op const&) = delete;
746 : write_eof_op& operator=(write_eof_op const&) = delete;
747 :
748 : write_eof_op(write_eof_op&& other) noexcept
749 : : h_(std::exchange(other.h_, nullptr))
750 : {
751 : }
752 :
753 : write_eof_op& operator=(write_eof_op&& other) noexcept
754 : {
755 : if(this != &other)
756 : {
757 : if(h_)
758 : h_.destroy();
759 : h_ = std::exchange(other.h_, nullptr);
760 : }
761 : return *this;
762 : }
763 :
764 : private:
765 : explicit
766 120 : write_eof_op(std::coroutine_handle<promise_type> h) noexcept
767 120 : : h_(h)
768 : {
769 120 : }
770 : };
771 :
772 : //----------------------------------------------------------
773 :
774 : inline void*
775 438 : any_write_sink::alloc_frame(std::size_t size)
776 : {
777 438 : if(cached_frame_ && cached_size_ >= size)
778 246 : return cached_frame_;
779 :
780 192 : if(cached_frame_)
781 96 : ::operator delete(cached_frame_);
782 :
783 192 : cached_frame_ = ::operator new(size);
784 192 : cached_size_ = size;
785 192 : return cached_frame_;
786 : }
787 :
788 : inline void
789 : any_write_sink::free_frame(void*, std::size_t)
790 : {
791 : // Keep the frame cached for reuse
792 : }
793 :
794 : #if defined(__GNUC__) && !defined(__clang__)
795 : #pragma GCC diagnostic push
796 : #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
797 : #endif
798 :
799 : template<WriteSink S>
800 : any_write_sink::write_op
801 222 : any_write_sink::write_coro(
802 : any_write_sink*,
803 : S& sink,
804 : std::span<const_buffer const> bufs,
805 : std::error_code* out_ec,
806 : std::size_t* out_n)
807 : {
808 : auto [err, bytes] = co_await sink.write(bufs);
809 :
810 : *out_ec = err;
811 : *out_n = bytes;
812 444 : }
813 :
814 : template<WriteSink S>
815 : any_write_sink::write_op
816 96 : any_write_sink::write_with_eof_coro(
817 : any_write_sink*,
818 : S& sink,
819 : std::span<const_buffer const> bufs,
820 : bool eof,
821 : std::error_code* out_ec,
822 : std::size_t* out_n)
823 : {
824 : auto [err, bytes] = co_await sink.write(bufs, eof);
825 :
826 : *out_ec = err;
827 : *out_n = bytes;
828 192 : }
829 :
830 : template<WriteSink S>
831 : any_write_sink::write_eof_op
832 120 : any_write_sink::write_eof_coro(
833 : any_write_sink*,
834 : S& sink,
835 : std::error_code* out_ec)
836 : {
837 : auto [err] = co_await sink.write_eof();
838 :
839 : *out_ec = err;
840 240 : }
841 :
842 : #if defined(__GNUC__) && !defined(__clang__)
843 : #pragma GCC diagnostic pop
844 : #endif
845 :
846 : template<WriteSink S>
847 : coro
848 126 : any_write_sink::do_write_impl(
849 : void* sink,
850 : any_write_sink* wrapper,
851 : std::span<const_buffer const> buffers,
852 : bool eof,
853 : coro h,
854 : executor_ref ex,
855 : std::stop_token token,
856 : std::error_code* ec,
857 : std::size_t* n)
858 : {
859 126 : auto& s = *static_cast<S*>(sink);
860 :
861 : // Create coroutine - frame is cached in wrapper
862 126 : auto op = eof
863 126 : ? write_with_eof_coro<S>(wrapper, s, buffers, eof, ec, n)
864 : : write_coro<S>(wrapper, s, buffers, ec, n);
865 :
866 : // Set executor and stop token on promise before resuming
867 126 : op.h_.promise().set_executor(ex);
868 126 : op.h_.promise().set_stop_token(token);
869 :
870 : // Resume the coroutine to start the operation
871 126 : op.h_.resume();
872 :
873 : // Check if operation completed synchronously
874 98 : if(op.h_.done())
875 : {
876 98 : op.h_.destroy();
877 98 : op.h_ = nullptr;
878 98 : return ex.dispatch(h);
879 : }
880 :
881 : // Operation is pending - caller will be resumed via symmetric transfer
882 0 : op.h_.promise().set_caller(h);
883 0 : op.h_ = nullptr;
884 0 : return std::noop_coroutine();
885 126 : }
886 :
887 : template<WriteSink S>
888 : coro
889 24 : any_write_sink::do_write_eof_impl(
890 : void* sink,
891 : any_write_sink* wrapper,
892 : coro h,
893 : executor_ref ex,
894 : std::stop_token token,
895 : std::error_code* ec)
896 : {
897 24 : auto& s = *static_cast<S*>(sink);
898 :
899 : // Create coroutine - frame is cached in wrapper
900 24 : auto op = write_eof_coro<S>(wrapper, s, ec);
901 :
902 : // Set executor and stop token on promise before resuming
903 24 : op.h_.promise().set_executor(ex);
904 24 : op.h_.promise().set_stop_token(token);
905 :
906 : // Resume the coroutine to start the operation
907 24 : op.h_.resume();
908 :
909 : // Check if operation completed synchronously
910 17 : if(op.h_.done())
911 : {
912 17 : op.h_.destroy();
913 17 : op.h_ = nullptr;
914 17 : return ex.dispatch(h);
915 : }
916 :
917 : // Operation is pending - caller will be resumed via symmetric transfer
918 0 : op.h_.promise().set_caller(h);
919 0 : op.h_ = nullptr;
920 0 : return std::noop_coroutine();
921 24 : }
922 :
923 : //----------------------------------------------------------
924 :
925 : inline auto
926 126 : any_write_sink::write_some_(
927 : std::span<const_buffer const> buffers,
928 : bool eof)
929 : {
930 : struct awaitable
931 : {
932 : any_write_sink* self_;
933 : std::span<const_buffer const> buffers_;
934 : bool eof_;
935 : std::error_code ec_;
936 : std::size_t n_ = 0;
937 :
938 : bool
939 126 : await_ready() const noexcept
940 : {
941 126 : return false;
942 : }
943 :
944 : coro
945 126 : await_suspend(coro h, executor_ref ex, std::stop_token token)
946 : {
947 252 : return self_->vt_->do_write(
948 126 : self_->sink_,
949 : self_,
950 : buffers_,
951 126 : eof_,
952 : h,
953 : ex,
954 : token,
955 : &ec_,
956 196 : &n_);
957 : }
958 :
959 : io_result<std::size_t>
960 98 : await_resume() const noexcept
961 : {
962 98 : return {ec_, n_};
963 : }
964 : };
965 126 : return awaitable{this, buffers, eof, {}, 0};
966 : }
967 :
968 : inline auto
969 24 : any_write_sink::write_eof()
970 : {
971 : struct awaitable
972 : {
973 : any_write_sink* self_;
974 : std::error_code ec_;
975 :
976 : bool
977 24 : await_ready() const noexcept
978 : {
979 24 : return false;
980 : }
981 :
982 : coro
983 24 : await_suspend(coro h, executor_ref ex, std::stop_token token)
984 : {
985 48 : return self_->vt_->do_write_eof(
986 24 : self_->sink_,
987 : self_,
988 : h,
989 : ex,
990 : token,
991 34 : &ec_);
992 : }
993 :
994 : io_result<>
995 17 : await_resume() const noexcept
996 : {
997 17 : return {ec_};
998 : }
999 : };
1000 24 : return awaitable{this, {}};
1001 : }
1002 :
1003 : template<ConstBufferSequence CB>
1004 : task<io_result<std::size_t>>
1005 66 : any_write_sink::write(CB buffers)
1006 : {
1007 66 : return write(buffers, false);
1008 : }
1009 :
1010 : template<ConstBufferSequence CB>
1011 : task<io_result<std::size_t>>
1012 98 : any_write_sink::write(CB buffers, bool eof)
1013 : {
1014 : buffer_param<CB> bp(buffers);
1015 : std::size_t total = 0;
1016 :
1017 : for(;;)
1018 : {
1019 : auto bufs = bp.data();
1020 : if(bufs.empty())
1021 : break;
1022 :
1023 : auto [ec, n] = co_await write_some_(bufs, false);
1024 : if(ec)
1025 : co_return {ec, total + n};
1026 : bp.consume(n);
1027 : total += n;
1028 : }
1029 :
1030 : if(eof)
1031 : {
1032 : auto [ec] = co_await write_eof();
1033 : if(ec)
1034 : co_return {ec, total};
1035 : }
1036 :
1037 : co_return {{}, total};
1038 196 : }
1039 :
1040 : } // namespace capy
1041 : } // namespace boost
1042 :
1043 : #endif
|