libs/capy/include/boost/capy/io/any_write_sink.hpp

85.9% Lines (171/199) 84.3% Functions (59/70) 77.8% Branches (35/45)
libs/capy/include/boost/capy/io/any_write_sink.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11 #define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/buffers.hpp>
15 #include <boost/capy/buffers/buffer_param.hpp>
16 #include <boost/capy/concept/io_awaitable.hpp>
17 #include <boost/capy/concept/write_sink.hpp>
18 #include <boost/capy/coro.hpp>
19 #include <boost/capy/ex/executor_ref.hpp>
20 #include <boost/capy/io_result.hpp>
21 #include <boost/capy/task.hpp>
22
23 #include <system_error>
24
25 #include <concepts>
26 #include <coroutine>
27 #include <cstddef>
28 #include <exception>
29 #include <span>
30 #include <stop_token>
31 #include <utility>
32
33 namespace boost {
34 namespace capy {
35
36 /** Type-erased wrapper for any WriteSink.
37
38 This class provides type erasure for any type satisfying the
39 @ref WriteSink concept, enabling runtime polymorphism for
40 sink write operations. It uses a cached coroutine frame to achieve
41 zero steady-state allocation after construction.
42
43 The wrapper supports two construction modes:
44 - **Owning**: Pass by value to transfer ownership. The wrapper
45 allocates storage and owns the sink.
46 - **Reference**: Pass a pointer to wrap without ownership. The
47 pointed-to sink must outlive this wrapper.
48
49 @par Frame Preallocation
50 The constructor preallocates the internal coroutine frame.
51 This reserves all virtual address space at server startup
52 so memory usage can be measured up front, rather than
53 allocating piecemeal as traffic arrives.
54
55 @par Thread Safety
56 Not thread-safe. Concurrent operations on the same wrapper
57 are undefined behavior.
58
59 @par Example
60 @code
61 // Owning - takes ownership of the sink
62 any_write_sink ws(some_sink{args...});
63
64 // Reference - wraps without ownership
65 some_sink sink;
66 any_write_sink ws(&sink);
67
68 const_buffer buf(data, size);
69 auto [ec, n] = co_await ws.write(std::span(&buf, 1));
70 auto [ec2] = co_await ws.write_eof();
71 @endcode
72
73 @see any_write_stream, WriteSink
74 */
75 class any_write_sink
76 {
77 struct vtable;
78
79 template<WriteSink S>
80 struct vtable_for_impl;
81
82 struct write_op;
83 struct write_eof_op;
84
85 void* sink_ = nullptr;
86 vtable const* vt_ = nullptr;
87 void* cached_frame_ = nullptr;
88 std::size_t cached_size_ = 0;
89 void* storage_ = nullptr;
90
91 template<WriteSink S>
92 static coro
93 do_write_impl(
94 void* sink,
95 any_write_sink* wrapper,
96 std::span<const_buffer const> buffers,
97 bool eof,
98 coro h,
99 executor_ref ex,
100 std::stop_token token,
101 std::error_code* ec,
102 std::size_t* n);
103
104 template<WriteSink S>
105 static coro
106 do_write_eof_impl(
107 void* sink,
108 any_write_sink* wrapper,
109 coro h,
110 executor_ref ex,
111 std::stop_token token,
112 std::error_code* ec);
113
114 template<WriteSink S>
115 static write_op
116 write_coro(
117 any_write_sink* wrapper,
118 S& sink,
119 std::span<const_buffer const> bufs,
120 std::error_code* out_ec,
121 std::size_t* out_n);
122
123 template<WriteSink S>
124 static write_op
125 write_with_eof_coro(
126 any_write_sink* wrapper,
127 S& sink,
128 std::span<const_buffer const> bufs,
129 bool eof,
130 std::error_code* out_ec,
131 std::size_t* out_n);
132
133 template<WriteSink S>
134 static write_eof_op
135 write_eof_coro(
136 any_write_sink* wrapper,
137 S& sink,
138 std::error_code* out_ec);
139
140 void* alloc_frame(std::size_t size);
141 void free_frame(void* p, std::size_t size);
142
143 public:
144 /** Destructor.
145
146 Destroys the owned sink (if any) and releases the cached
147 coroutine frame.
148 */
149 ~any_write_sink();
150
151 /** Default constructor.
152
153 Constructs an empty wrapper. Operations on a default-constructed
154 wrapper result in undefined behavior.
155 */
156 any_write_sink() = default;
157
158 /** Non-copyable.
159
160 The frame cache is per-instance and cannot be shared.
161 */
162 any_write_sink(any_write_sink const&) = delete;
163 any_write_sink& operator=(any_write_sink const&) = delete;
164
165 /** Move constructor.
166
167 Transfers ownership of the wrapped sink (if owned) and
168 cached frame from `other`. After the move, `other` is
169 in a default-constructed state.
170
171 @param other The wrapper to move from.
172 */
173 1 any_write_sink(any_write_sink&& other) noexcept
174 1 : sink_(std::exchange(other.sink_, nullptr))
175 1 , vt_(std::exchange(other.vt_, nullptr))
176 1 , cached_frame_(std::exchange(other.cached_frame_, nullptr))
177 1 , cached_size_(std::exchange(other.cached_size_, 0))
178 1 , storage_(std::exchange(other.storage_, nullptr))
179 {
180 1 }
181
182 /** Move assignment operator.
183
184 Destroys any owned sink and releases existing resources,
185 then transfers ownership from `other`.
186
187 @param other The wrapper to move from.
188 @return Reference to this wrapper.
189 */
190 any_write_sink&
191 operator=(any_write_sink&& other) noexcept;
192
193 /** Construct by taking ownership of a WriteSink.
194
195 Allocates storage and moves the sink into this wrapper.
196 The wrapper owns the sink and will destroy it.
197
198 @param s The sink to take ownership of.
199 */
200 template<WriteSink S>
201 requires (!std::same_as<std::decay_t<S>, any_write_sink>)
202 any_write_sink(S s);
203
204 /** Construct by wrapping a WriteSink without ownership.
205
206 Wraps the given sink by pointer. The sink must remain
207 valid for the lifetime of this wrapper.
208
209 @param s Pointer to the sink to wrap.
210 */
211 template<WriteSink S>
212 96 any_write_sink(S* s) noexcept
213 96 : sink_(s)
214 96 , vt_(&vtable_for_impl<S>::value)
215 {
216 // Preallocate coroutine frames to find max size
217 96 write_coro<S>(this, *s, {}, nullptr, nullptr);
218 96 write_with_eof_coro<S>(this, *s, {}, false, nullptr, nullptr);
219 96 write_eof_coro<S>(this, *s, nullptr);
220 96 }
221
222 /** Check if the wrapper contains a valid sink.
223
224 @return `true` if wrapping a sink, `false` if default-constructed
225 or moved-from.
226 */
227 bool
228 9 has_value() const noexcept
229 {
230 9 return sink_ != nullptr;
231 }
232
233 /** Check if the wrapper contains a valid sink.
234
235 @return `true` if wrapping a sink, `false` if default-constructed
236 or moved-from.
237 */
238 explicit
239 2 operator bool() const noexcept
240 {
241 2 return has_value();
242 }
243
244 /** Initiate an asynchronous write operation.
245
246 Writes data from the provided buffer sequence. The operation
247 completes when all bytes have been consumed, or an error
248 occurs.
249
250 @param buffers The buffer sequence containing data to write.
251 Passed by value to ensure the sequence lives in the
252 coroutine frame across suspension points.
253
254 @return An awaitable yielding `(error_code,std::size_t)`.
255
256 @par Preconditions
257 The wrapper must contain a valid sink (`has_value() == true`).
258 */
259 template<ConstBufferSequence CB>
260 task<io_result<std::size_t>>
261 write(CB buffers);
262
263 /** Initiate an asynchronous write operation with optional EOF.
264
265 Writes data from the provided buffer sequence, optionally
266 finalizing the sink afterwards. The operation completes when
267 all bytes have been consumed and (if eof is true) the sink
268 is finalized, or an error occurs.
269
270 @param buffers The buffer sequence containing data to write.
271 Passed by value to ensure the sequence lives in the
272 coroutine frame across suspension points.
273
274 @param eof If `true`, the sink is finalized after writing
275 the data.
276
277 @return An awaitable yielding `(error_code,std::size_t)`.
278
279 @par Preconditions
280 The wrapper must contain a valid sink (`has_value() == true`).
281 */
282 template<ConstBufferSequence CB>
283 task<io_result<std::size_t>>
284 write(CB buffers, bool eof);
285
286 /** Signal end of data.
287
288 Indicates that no more data will be written to the sink.
289 The operation completes when the sink is finalized, or
290 an error occurs.
291
292 @return An awaitable yielding `(error_code)`.
293
294 @par Preconditions
295 The wrapper must contain a valid sink (`has_value() == true`).
296 */
297 auto
298 write_eof();
299
300 protected:
301 /** Rebind to a new sink after move.
302
303 Updates the internal pointer to reference a new sink object.
304 Used by owning wrappers after move assignment when the owned
305 object has moved to a new location.
306
307 @param new_sink The new sink to bind to. Must be the same
308 type as the original sink.
309
310 @note Terminates if called with a sink of different type
311 than the original.
312 */
313 template<WriteSink S>
314 void
315 rebind(S& new_sink) noexcept
316 {
317 if(vt_ != &vtable_for_impl<S>::value)
318 std::terminate();
319 sink_ = &new_sink;
320 }
321
322 private:
323 auto
324 write_some_(std::span<const_buffer const> buffers, bool eof);
325 };
326
327 //----------------------------------------------------------
328
329 struct any_write_sink::vtable
330 {
331 void (*destroy)(void*) noexcept;
332
333 coro (*do_write)(
334 void* sink,
335 any_write_sink* wrapper,
336 std::span<const_buffer const> buffers,
337 bool eof,
338 coro h,
339 executor_ref ex,
340 std::stop_token token,
341 std::error_code* ec,
342 std::size_t* n);
343
344 coro (*do_write_eof)(
345 void* sink,
346 any_write_sink* wrapper,
347 coro h,
348 executor_ref ex,
349 std::stop_token token,
350 std::error_code* ec);
351 };
352
353 template<WriteSink S>
354 struct any_write_sink::vtable_for_impl
355 {
356 static void
357 do_destroy_impl(void* sink) noexcept
358 {
359 static_cast<S*>(sink)->~S();
360 }
361
362 static constexpr vtable value = {
363 &do_destroy_impl,
364 &any_write_sink::do_write_impl<S>,
365 &any_write_sink::do_write_eof_impl<S>
366 };
367 };
368
369 //----------------------------------------------------------
370
371 inline
372 99 any_write_sink::~any_write_sink()
373 {
374
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 99 times.
99 if(storage_)
375 {
376 vt_->destroy(sink_);
377 ::operator delete(storage_);
378 }
379
2/2
✓ Branch 0 taken 96 times.
✓ Branch 1 taken 3 times.
99 if(cached_frame_)
380 96 ::operator delete(cached_frame_);
381 99 }
382
383 inline any_write_sink&
384 1 any_write_sink::operator=(any_write_sink&& other) noexcept
385 {
386
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if(this != &other)
387 {
388
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if(storage_)
389 {
390 vt_->destroy(sink_);
391 ::operator delete(storage_);
392 }
393
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if(cached_frame_)
394 ::operator delete(cached_frame_);
395 1 sink_ = std::exchange(other.sink_, nullptr);
396 1 vt_ = std::exchange(other.vt_, nullptr);
397 1 cached_frame_ = std::exchange(other.cached_frame_, nullptr);
398 1 cached_size_ = std::exchange(other.cached_size_, 0);
399 1 storage_ = std::exchange(other.storage_, nullptr);
400 }
401 1 return *this;
402 }
403
404 template<WriteSink S>
405 requires (!std::same_as<std::decay_t<S>, any_write_sink>)
406 any_write_sink::any_write_sink(S s)
407 : vt_(&vtable_for_impl<S>::value)
408 {
409 struct guard {
410 any_write_sink* self;
411 bool committed = false;
412 ~guard() {
413 if(!committed && self->storage_) {
414 self->vt_->destroy(self->sink_);
415 ::operator delete(self->storage_);
416 self->storage_ = nullptr;
417 self->sink_ = nullptr;
418 }
419 }
420 } g{this};
421
422 storage_ = ::operator new(sizeof(S));
423 sink_ = ::new(storage_) S(std::move(s));
424
425 // Preallocate coroutine frames to find max size
426 auto& ref = *static_cast<S*>(sink_);
427 write_coro<S>(this, ref, {}, nullptr, nullptr);
428 write_with_eof_coro<S>(this, ref, {}, false, nullptr, nullptr);
429 write_eof_coro<S>(this, ref, nullptr);
430
431 g.committed = true;
432 }
433
434 //----------------------------------------------------------
435
436 struct any_write_sink::write_op
437 {
438 struct promise_type
439 {
440 executor_ref executor_;
441 std::stop_token stop_token_;
442 coro caller_h_{};
443
444 318 promise_type() = default;
445
446 write_op
447 318 get_return_object() noexcept
448 {
449 return write_op{
450 318 std::coroutine_handle<promise_type>::from_promise(*this)};
451 }
452
453 std::suspend_always
454 318 initial_suspend() noexcept
455 {
456 318 return {};
457 }
458
459 auto
460 98 final_suspend() noexcept
461 {
462 struct awaiter
463 {
464 promise_type* p_;
465
466 98 bool await_ready() const noexcept { return false; }
467
468 98 coro await_suspend(coro) const noexcept
469 {
470
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 98 times.
98 if(p_->caller_h_)
471 return p_->caller_h_;
472 98 return std::noop_coroutine();
473 }
474
475 void await_resume() const noexcept {}
476 };
477 98 return awaiter{this};
478 }
479
480 void
481 98 return_void() noexcept
482 {
483 98 }
484
485 void
486 28 unhandled_exception()
487 {
488 28 throw;
489 }
490
491 template<class... Args>
492 static void*
493 318 operator new(
494 std::size_t size,
495 any_write_sink* wrapper,
496 Args&&...)
497 {
498 318 return wrapper->alloc_frame(size);
499 }
500
501 template<class... Args>
502 static void
503 operator delete(void*, any_write_sink*, Args&&...) noexcept
504 {
505 }
506
507 static void
508 318 operator delete(void*, std::size_t) noexcept
509 {
510 318 }
511
512 void
513 126 set_executor(executor_ref ex) noexcept
514 {
515 126 executor_ = ex;
516 126 }
517
518 void
519 126 set_stop_token(std::stop_token token) noexcept
520 {
521 126 stop_token_ = token;
522 126 }
523
524 void
525 set_caller(coro h) noexcept
526 {
527 caller_h_ = h;
528 }
529
530 template<class Awaitable>
531 struct transform_awaiter
532 {
533 std::decay_t<Awaitable> a_;
534 promise_type* p_;
535
536 126 bool await_ready()
537 {
538 126 return a_.await_ready();
539 }
540
541 126 decltype(auto) await_resume()
542 {
543 126 return a_.await_resume();
544 }
545
546 auto await_suspend(coro h)
547 {
548 return a_.await_suspend(h, p_->executor_, p_->stop_token_);
549 }
550 };
551
552 template<class Awaitable>
553 126 auto await_transform(Awaitable&& a)
554 {
555 using A = std::decay_t<Awaitable>;
556 if constexpr (IoAwaitable<A>)
557 {
558 return transform_awaiter<Awaitable>{
559 126 std::forward<Awaitable>(a), this};
560 }
561 else
562 {
563 static_assert(sizeof(A) == 0, "requires IoAwaitable");
564 }
565 }
566 };
567
568 std::coroutine_handle<promise_type> h_;
569
570 318 ~write_op()
571 {
572
2/2
✓ Branch 1 taken 220 times.
✓ Branch 2 taken 98 times.
318 if(h_)
573 220 h_.destroy();
574 318 }
575
576 write_op(write_op const&) = delete;
577 write_op& operator=(write_op const&) = delete;
578
579 write_op(write_op&& other) noexcept
580 : h_(std::exchange(other.h_, nullptr))
581 {
582 }
583
584 write_op& operator=(write_op&& other) noexcept
585 {
586 if(this != &other)
587 {
588 if(h_)
589 h_.destroy();
590 h_ = std::exchange(other.h_, nullptr);
591 }
592 return *this;
593 }
594
595 private:
596 explicit
597 318 write_op(std::coroutine_handle<promise_type> h) noexcept
598 318 : h_(h)
599 {
600 318 }
601 };
602
603 //----------------------------------------------------------
604
605 struct any_write_sink::write_eof_op
606 {
607 struct promise_type
608 {
609 executor_ref executor_;
610 std::stop_token stop_token_;
611 coro caller_h_{};
612
613 120 promise_type() = default;
614
615 write_eof_op
616 120 get_return_object() noexcept
617 {
618 return write_eof_op{
619 120 std::coroutine_handle<promise_type>::from_promise(*this)};
620 }
621
622 std::suspend_always
623 120 initial_suspend() noexcept
624 {
625 120 return {};
626 }
627
628 auto
629 17 final_suspend() noexcept
630 {
631 struct awaiter
632 {
633 promise_type* p_;
634
635 17 bool await_ready() const noexcept { return false; }
636
637 17 coro await_suspend(coro) const noexcept
638 {
639
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 17 times.
17 if(p_->caller_h_)
640 return p_->caller_h_;
641 17 return std::noop_coroutine();
642 }
643
644 void await_resume() const noexcept {}
645 };
646 17 return awaiter{this};
647 }
648
649 void
650 17 return_void() noexcept
651 {
652 17 }
653
654 void
655 7 unhandled_exception()
656 {
657 7 throw;
658 }
659
660 template<class... Args>
661 static void*
662 120 operator new(
663 std::size_t size,
664 any_write_sink* wrapper,
665 Args&&...)
666 {
667 120 return wrapper->alloc_frame(size);
668 }
669
670 template<class... Args>
671 static void
672 operator delete(void*, any_write_sink*, Args&&...) noexcept
673 {
674 }
675
676 static void
677 120 operator delete(void*, std::size_t) noexcept
678 {
679 120 }
680
681 void
682 24 set_executor(executor_ref ex) noexcept
683 {
684 24 executor_ = ex;
685 24 }
686
687 void
688 24 set_stop_token(std::stop_token token) noexcept
689 {
690 24 stop_token_ = token;
691 24 }
692
693 void
694 set_caller(coro h) noexcept
695 {
696 caller_h_ = h;
697 }
698
699 template<class Awaitable>
700 struct transform_awaiter
701 {
702 std::decay_t<Awaitable> a_;
703 promise_type* p_;
704
705 24 bool await_ready()
706 {
707 24 return a_.await_ready();
708 }
709
710 24 decltype(auto) await_resume()
711 {
712 24 return a_.await_resume();
713 }
714
715 auto await_suspend(coro h)
716 {
717 return a_.await_suspend(h, p_->executor_, p_->stop_token_);
718 }
719 };
720
721 template<class Awaitable>
722 24 auto await_transform(Awaitable&& a)
723 {
724 using A = std::decay_t<Awaitable>;
725 if constexpr (IoAwaitable<A>)
726 {
727 return transform_awaiter<Awaitable>{
728 24 std::forward<Awaitable>(a), this};
729 }
730 else
731 {
732 static_assert(sizeof(A) == 0, "requires IoAwaitable");
733 }
734 }
735 };
736
737 std::coroutine_handle<promise_type> h_;
738
739 120 ~write_eof_op()
740 {
741
2/2
✓ Branch 1 taken 103 times.
✓ Branch 2 taken 17 times.
120 if(h_)
742 103 h_.destroy();
743 120 }
744
745 write_eof_op(write_eof_op const&) = delete;
746 write_eof_op& operator=(write_eof_op const&) = delete;
747
748 write_eof_op(write_eof_op&& other) noexcept
749 : h_(std::exchange(other.h_, nullptr))
750 {
751 }
752
753 write_eof_op& operator=(write_eof_op&& other) noexcept
754 {
755 if(this != &other)
756 {
757 if(h_)
758 h_.destroy();
759 h_ = std::exchange(other.h_, nullptr);
760 }
761 return *this;
762 }
763
764 private:
765 explicit
766 120 write_eof_op(std::coroutine_handle<promise_type> h) noexcept
767 120 : h_(h)
768 {
769 120 }
770 };
771
772 //----------------------------------------------------------
773
774 inline void*
775 438 any_write_sink::alloc_frame(std::size_t size)
776 {
777
4/4
✓ Branch 0 taken 342 times.
✓ Branch 1 taken 96 times.
✓ Branch 2 taken 246 times.
✓ Branch 3 taken 96 times.
438 if(cached_frame_ && cached_size_ >= size)
778 246 return cached_frame_;
779
780
2/2
✓ Branch 0 taken 96 times.
✓ Branch 1 taken 96 times.
192 if(cached_frame_)
781 96 ::operator delete(cached_frame_);
782
783 192 cached_frame_ = ::operator new(size);
784 192 cached_size_ = size;
785 192 return cached_frame_;
786 }
787
788 inline void
789 any_write_sink::free_frame(void*, std::size_t)
790 {
791 // Keep the frame cached for reuse
792 }
793
794 #if defined(__GNUC__) && !defined(__clang__)
795 #pragma GCC diagnostic push
796 #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
797 #endif
798
799 template<WriteSink S>
800 any_write_sink::write_op
801
1/1
✓ Branch 1 taken 222 times.
222 any_write_sink::write_coro(
802 any_write_sink*,
803 S& sink,
804 std::span<const_buffer const> bufs,
805 std::error_code* out_ec,
806 std::size_t* out_n)
807 {
808 auto [err, bytes] = co_await sink.write(bufs);
809
810 *out_ec = err;
811 *out_n = bytes;
812 444 }
813
814 template<WriteSink S>
815 any_write_sink::write_op
816
1/1
✓ Branch 1 taken 96 times.
96 any_write_sink::write_with_eof_coro(
817 any_write_sink*,
818 S& sink,
819 std::span<const_buffer const> bufs,
820 bool eof,
821 std::error_code* out_ec,
822 std::size_t* out_n)
823 {
824 auto [err, bytes] = co_await sink.write(bufs, eof);
825
826 *out_ec = err;
827 *out_n = bytes;
828 192 }
829
830 template<WriteSink S>
831 any_write_sink::write_eof_op
832
1/1
✓ Branch 1 taken 120 times.
120 any_write_sink::write_eof_coro(
833 any_write_sink*,
834 S& sink,
835 std::error_code* out_ec)
836 {
837 auto [err] = co_await sink.write_eof();
838
839 *out_ec = err;
840 240 }
841
842 #if defined(__GNUC__) && !defined(__clang__)
843 #pragma GCC diagnostic pop
844 #endif
845
846 template<WriteSink S>
847 coro
848 126 any_write_sink::do_write_impl(
849 void* sink,
850 any_write_sink* wrapper,
851 std::span<const_buffer const> buffers,
852 bool eof,
853 coro h,
854 executor_ref ex,
855 std::stop_token token,
856 std::error_code* ec,
857 std::size_t* n)
858 {
859 126 auto& s = *static_cast<S*>(sink);
860
861 // Create coroutine - frame is cached in wrapper
862
1/2
✗ Branch 1 not taken.
✓ Branch 4 taken 126 times.
126 auto op = eof
863
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 126 times.
126 ? write_with_eof_coro<S>(wrapper, s, buffers, eof, ec, n)
864 : write_coro<S>(wrapper, s, buffers, ec, n);
865
866 // Set executor and stop token on promise before resuming
867 126 op.h_.promise().set_executor(ex);
868 126 op.h_.promise().set_stop_token(token);
869
870 // Resume the coroutine to start the operation
871
1/1
✓ Branch 1 taken 98 times.
126 op.h_.resume();
872
873 // Check if operation completed synchronously
874
1/2
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
98 if(op.h_.done())
875 {
876
1/1
✓ Branch 1 taken 98 times.
98 op.h_.destroy();
877 98 op.h_ = nullptr;
878
1/1
✓ Branch 1 taken 98 times.
98 return ex.dispatch(h);
879 }
880
881 // Operation is pending - caller will be resumed via symmetric transfer
882 op.h_.promise().set_caller(h);
883 op.h_ = nullptr;
884 return std::noop_coroutine();
885 126 }
886
887 template<WriteSink S>
888 coro
889 24 any_write_sink::do_write_eof_impl(
890 void* sink,
891 any_write_sink* wrapper,
892 coro h,
893 executor_ref ex,
894 std::stop_token token,
895 std::error_code* ec)
896 {
897 24 auto& s = *static_cast<S*>(sink);
898
899 // Create coroutine - frame is cached in wrapper
900
1/1
✓ Branch 1 taken 24 times.
24 auto op = write_eof_coro<S>(wrapper, s, ec);
901
902 // Set executor and stop token on promise before resuming
903 24 op.h_.promise().set_executor(ex);
904 24 op.h_.promise().set_stop_token(token);
905
906 // Resume the coroutine to start the operation
907
1/1
✓ Branch 1 taken 17 times.
24 op.h_.resume();
908
909 // Check if operation completed synchronously
910
1/2
✓ Branch 1 taken 17 times.
✗ Branch 2 not taken.
17 if(op.h_.done())
911 {
912
1/1
✓ Branch 1 taken 17 times.
17 op.h_.destroy();
913 17 op.h_ = nullptr;
914
1/1
✓ Branch 1 taken 17 times.
17 return ex.dispatch(h);
915 }
916
917 // Operation is pending - caller will be resumed via symmetric transfer
918 op.h_.promise().set_caller(h);
919 op.h_ = nullptr;
920 return std::noop_coroutine();
921 24 }
922
923 //----------------------------------------------------------
924
925 inline auto
926 126 any_write_sink::write_some_(
927 std::span<const_buffer const> buffers,
928 bool eof)
929 {
930 struct awaitable
931 {
932 any_write_sink* self_;
933 std::span<const_buffer const> buffers_;
934 bool eof_;
935 std::error_code ec_;
936 std::size_t n_ = 0;
937
938 bool
939 126 await_ready() const noexcept
940 {
941 126 return false;
942 }
943
944 coro
945 126 await_suspend(coro h, executor_ref ex, std::stop_token token)
946 {
947 252 return self_->vt_->do_write(
948 126 self_->sink_,
949 self_,
950 buffers_,
951
1/1
✓ Branch 1 taken 98 times.
126 eof_,
952 h,
953 ex,
954 token,
955 &ec_,
956 196 &n_);
957 }
958
959 io_result<std::size_t>
960 98 await_resume() const noexcept
961 {
962 98 return {ec_, n_};
963 }
964 };
965 126 return awaitable{this, buffers, eof, {}, 0};
966 }
967
968 inline auto
969 24 any_write_sink::write_eof()
970 {
971 struct awaitable
972 {
973 any_write_sink* self_;
974 std::error_code ec_;
975
976 bool
977 24 await_ready() const noexcept
978 {
979 24 return false;
980 }
981
982 coro
983 24 await_suspend(coro h, executor_ref ex, std::stop_token token)
984 {
985 48 return self_->vt_->do_write_eof(
986
1/1
✓ Branch 1 taken 17 times.
24 self_->sink_,
987 self_,
988 h,
989 ex,
990 token,
991 34 &ec_);
992 }
993
994 io_result<>
995 17 await_resume() const noexcept
996 {
997 17 return {ec_};
998 }
999 };
1000 24 return awaitable{this, {}};
1001 }
1002
1003 template<ConstBufferSequence CB>
1004 task<io_result<std::size_t>>
1005 66 any_write_sink::write(CB buffers)
1006 {
1007 66 return write(buffers, false);
1008 }
1009
1010 template<ConstBufferSequence CB>
1011 task<io_result<std::size_t>>
1012
1/1
✓ Branch 1 taken 98 times.
98 any_write_sink::write(CB buffers, bool eof)
1013 {
1014 buffer_param<CB> bp(buffers);
1015 std::size_t total = 0;
1016
1017 for(;;)
1018 {
1019 auto bufs = bp.data();
1020 if(bufs.empty())
1021 break;
1022
1023 auto [ec, n] = co_await write_some_(bufs, false);
1024 if(ec)
1025 co_return {ec, total + n};
1026 bp.consume(n);
1027 total += n;
1028 }
1029
1030 if(eof)
1031 {
1032 auto [ec] = co_await write_eof();
1033 if(ec)
1034 co_return {ec, total};
1035 }
1036
1037 co_return {{}, total};
1038 196 }
1039
1040 } // namespace capy
1041 } // namespace boost
1042
1043 #endif
1044