libs/capy/include/boost/capy/io/any_buffer_sink.hpp

87.3% Lines (144/165) 89.5% Functions (51/57) 82.9% Branches (34/41)
libs/capy/include/boost/capy/io/any_buffer_sink.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11 #define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/buffers.hpp>
15 #include <boost/capy/buffers/buffer_copy.hpp>
16 #include <boost/capy/buffers/buffer_param.hpp>
17 #include <boost/capy/concept/buffer_sink.hpp>
18 #include <boost/capy/concept/io_awaitable.hpp>
19 #include <boost/capy/concept/write_sink.hpp>
20 #include <boost/capy/coro.hpp>
21 #include <boost/capy/ex/executor_ref.hpp>
22 #include <boost/capy/io_result.hpp>
23 #include <boost/capy/task.hpp>
24
25 #include <system_error>
26
27 #include <concepts>
28 #include <coroutine>
29 #include <cstddef>
30 #include <exception>
31 #include <stop_token>
32 #include <utility>
33
34 namespace boost {
35 namespace capy {
36
37 /** Type-erased wrapper for any BufferSink.
38
39 This class provides type erasure for any type satisfying the
40 @ref BufferSink concept, enabling runtime polymorphism for
41 buffer sink operations. It uses a cached coroutine frame to achieve
42 zero steady-state allocation after construction.
43
44 The wrapper also satisfies @ref WriteSink through templated
45 @ref write methods. These methods copy data from the caller's
46 buffers into the sink's internal storage, incurring one extra
47 buffer copy compared to using @ref prepare and @ref commit
48 directly.
49
50 The wrapper supports two construction modes:
51 - **Owning**: Pass by value to transfer ownership. The wrapper
52 allocates storage and owns the sink.
53 - **Reference**: Pass a pointer to wrap without ownership. The
54 pointed-to sink must outlive this wrapper.
55
56 @par Frame Preallocation
57 The constructor preallocates the internal coroutine frame.
58 This reserves all virtual address space at server startup
59 so memory usage can be measured up front, rather than
60 allocating piecemeal as traffic arrives.
61
62 @par Thread Safety
63 Not thread-safe. Concurrent operations on the same wrapper
64 are undefined behavior.
65
66 @par Example
67 @code
68 // Owning - takes ownership of the sink
69 any_buffer_sink abs(some_buffer_sink{args...});
70
71 // Reference - wraps without ownership
72 some_buffer_sink sink;
73 any_buffer_sink abs(&sink);
74
75 mutable_buffer arr[16];
76 std::size_t count = abs.prepare(arr, 16);
77 // Write data into arr[0..count)
78 auto [ec] = co_await abs.commit(bytes_written);
79 auto [ec2] = co_await abs.commit_eof();
80 @endcode
81
82 @see any_buffer_source, BufferSink, WriteSink
83 */
84 class any_buffer_sink
85 {
86 struct vtable;
87
88 template<BufferSink S>
89 struct vtable_for_impl;
90
91 struct commit_op;
92
93 void* sink_ = nullptr;
94 vtable const* vt_ = nullptr;
95 void* cached_frame_ = nullptr;
96 std::size_t cached_size_ = 0;
97 void* storage_ = nullptr;
98
99 template<BufferSink S>
100 static std::size_t
101 do_prepare_impl(
102 void* sink,
103 mutable_buffer* arr,
104 std::size_t max_count);
105
106 template<BufferSink S>
107 static coro
108 do_commit_impl(
109 void* sink,
110 any_buffer_sink* wrapper,
111 std::size_t n,
112 bool eof,
113 coro h,
114 executor_ref ex,
115 std::stop_token token,
116 std::error_code* ec);
117
118 template<BufferSink S>
119 static coro
120 do_commit_eof_impl(
121 void* sink,
122 any_buffer_sink* wrapper,
123 coro h,
124 executor_ref ex,
125 std::stop_token token,
126 std::error_code* ec);
127
128 template<BufferSink S>
129 static commit_op
130 commit_coro(
131 any_buffer_sink* wrapper,
132 S& sink,
133 std::size_t n,
134 std::error_code* out_ec);
135
136 template<BufferSink S>
137 static commit_op
138 commit_with_eof_coro(
139 any_buffer_sink* wrapper,
140 S& sink,
141 std::size_t n,
142 bool eof,
143 std::error_code* out_ec);
144
145 template<BufferSink S>
146 static commit_op
147 commit_eof_coro(
148 any_buffer_sink* wrapper,
149 S& sink,
150 std::error_code* out_ec);
151
152 void* alloc_frame(std::size_t size);
153 void free_frame(void* p, std::size_t size);
154
155 public:
156 /** Destructor.
157
158 Destroys the owned sink (if any) and releases the cached
159 coroutine frame.
160 */
161 ~any_buffer_sink();
162
163 /** Default constructor.
164
165 Constructs an empty wrapper. Operations on a default-constructed
166 wrapper result in undefined behavior.
167 */
168 any_buffer_sink() = default;
169
170 /** Non-copyable.
171
172 The frame cache is per-instance and cannot be shared.
173 */
174 any_buffer_sink(any_buffer_sink const&) = delete;
175 any_buffer_sink& operator=(any_buffer_sink const&) = delete;
176
177 /** Move constructor.
178
179 Transfers ownership of the wrapped sink (if owned) and
180 cached frame from `other`. After the move, `other` is
181 in a default-constructed state.
182
183 @param other The wrapper to move from.
184 */
185 1 any_buffer_sink(any_buffer_sink&& other) noexcept
186 1 : sink_(std::exchange(other.sink_, nullptr))
187 1 , vt_(std::exchange(other.vt_, nullptr))
188 1 , cached_frame_(std::exchange(other.cached_frame_, nullptr))
189 1 , cached_size_(std::exchange(other.cached_size_, 0))
190 1 , storage_(std::exchange(other.storage_, nullptr))
191 {
192 1 }
193
194 /** Move assignment operator.
195
196 Destroys any owned sink and releases existing resources,
197 then transfers ownership from `other`.
198
199 @param other The wrapper to move from.
200 @return Reference to this wrapper.
201 */
202 any_buffer_sink&
203 operator=(any_buffer_sink&& other) noexcept;
204
205 /** Construct by taking ownership of a BufferSink.
206
207 Allocates storage and moves the sink into this wrapper.
208 The wrapper owns the sink and will destroy it.
209
210 @param s The sink to take ownership of.
211 */
212 template<BufferSink S>
213 requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
214 any_buffer_sink(S s);
215
216 /** Construct by wrapping a BufferSink without ownership.
217
218 Wraps the given sink by pointer. The sink must remain
219 valid for the lifetime of this wrapper.
220
221 @param s Pointer to the sink to wrap.
222 */
223 template<BufferSink S>
224 60 any_buffer_sink(S* s) noexcept
225 60 : sink_(s)
226 60 , vt_(&vtable_for_impl<S>::value)
227 {
228 // Preallocate coroutine frames to find max size
229 60 commit_coro<S>(this, *s, 0, nullptr);
230 60 commit_with_eof_coro<S>(this, *s, 0, false, nullptr);
231 60 commit_eof_coro<S>(this, *s, nullptr);
232 60 }
233
234 /** Check if the wrapper contains a valid sink.
235
236 @return `true` if wrapping a sink, `false` if default-constructed
237 or moved-from.
238 */
239 bool
240 9 has_value() const noexcept
241 {
242 9 return sink_ != nullptr;
243 }
244
245 /** Check if the wrapper contains a valid sink.
246
247 @return `true` if wrapping a sink, `false` if default-constructed
248 or moved-from.
249 */
250 explicit
251 2 operator bool() const noexcept
252 {
253 2 return has_value();
254 }
255
256 /** Prepare writable buffers.
257
258 Fills the provided array with mutable buffer descriptors
259 pointing to the underlying sink's internal storage. This
260 operation is synchronous.
261
262 @param arr Pointer to array of mutable_buffer to fill.
263 @param max_count Maximum number of buffers to fill.
264
265 @return The number of buffers filled.
266
267 @par Preconditions
268 The wrapper must contain a valid sink (`has_value() == true`).
269 */
270 std::size_t
271 prepare(mutable_buffer* arr, std::size_t max_count);
272
273 /** Commit bytes written to the prepared buffers.
274
275 Commits `n` bytes written to the buffers returned by the
276 most recent call to @ref prepare. The operation may trigger
277 underlying I/O.
278
279 @param n The number of bytes to commit.
280
281 @return An awaitable yielding `(error_code)`.
282
283 @par Preconditions
284 The wrapper must contain a valid sink (`has_value() == true`).
285 */
286 auto
287 commit(std::size_t n);
288
289 /** Commit bytes written with optional end-of-stream.
290
291 Commits `n` bytes written to the buffers returned by the
292 most recent call to @ref prepare. If `eof` is true, also
293 signals end-of-stream.
294
295 @param n The number of bytes to commit.
296 @param eof If true, signals end-of-stream after committing.
297
298 @return An awaitable yielding `(error_code)`.
299
300 @par Preconditions
301 The wrapper must contain a valid sink (`has_value() == true`).
302 */
303 auto
304 commit(std::size_t n, bool eof);
305
306 /** Signal end-of-stream.
307
308 Indicates that no more data will be written to the sink.
309 The operation completes when the sink is finalized, or
310 an error occurs.
311
312 @return An awaitable yielding `(error_code)`.
313
314 @par Preconditions
315 The wrapper must contain a valid sink (`has_value() == true`).
316 */
317 auto
318 commit_eof();
319
320 /** Write data from a buffer sequence.
321
322 Writes all data from the buffer sequence to the underlying
323 sink. This method satisfies the @ref WriteSink concept.
324
325 @note This operation copies data from the caller's buffers
326 into the sink's internal buffers. For zero-copy writes,
327 use @ref prepare and @ref commit directly.
328
329 @param buffers The buffer sequence to write.
330
331 @return An awaitable yielding `(error_code,std::size_t)`.
332
333 @par Preconditions
334 The wrapper must contain a valid sink (`has_value() == true`).
335 */
336 template<ConstBufferSequence CB>
337 task<io_result<std::size_t>>
338 write(CB buffers);
339
340 /** Write data with optional end-of-stream.
341
342 Writes all data from the buffer sequence to the underlying
343 sink, optionally finalizing it afterwards. This method
344 satisfies the @ref WriteSink concept.
345
346 @note This operation copies data from the caller's buffers
347 into the sink's internal buffers. For zero-copy writes,
348 use @ref prepare and @ref commit directly.
349
350 @param buffers The buffer sequence to write.
351 @param eof If true, finalize the sink after writing.
352
353 @return An awaitable yielding `(error_code,std::size_t)`.
354
355 @par Preconditions
356 The wrapper must contain a valid sink (`has_value() == true`).
357 */
358 template<ConstBufferSequence CB>
359 task<io_result<std::size_t>>
360 write(CB buffers, bool eof);
361
362 /** Signal end-of-stream.
363
364 Indicates that no more data will be written to the sink.
365 This method satisfies the @ref WriteSink concept.
366
367 @return An awaitable yielding `(error_code)`.
368
369 @par Preconditions
370 The wrapper must contain a valid sink (`has_value() == true`).
371 */
372 auto
373 write_eof();
374
375 protected:
376 /** Rebind to a new sink after move.
377
378 Updates the internal pointer to reference a new sink object.
379 Used by owning wrappers after move assignment when the owned
380 object has moved to a new location.
381
382 @param new_sink The new sink to bind to. Must be the same
383 type as the original sink.
384
385 @note Terminates if called with a sink of different type
386 than the original.
387 */
388 template<BufferSink S>
389 void
390 rebind(S& new_sink) noexcept
391 {
392 if(vt_ != &vtable_for_impl<S>::value)
393 std::terminate();
394 sink_ = &new_sink;
395 }
396 };
397
398 //----------------------------------------------------------
399
400 struct any_buffer_sink::vtable
401 {
402 void (*destroy)(void*) noexcept;
403
404 std::size_t (*do_prepare)(
405 void* sink,
406 mutable_buffer* arr,
407 std::size_t max_count);
408
409 coro (*do_commit)(
410 void* sink,
411 any_buffer_sink* wrapper,
412 std::size_t n,
413 bool eof,
414 coro h,
415 executor_ref ex,
416 std::stop_token token,
417 std::error_code* ec);
418
419 coro (*do_commit_eof)(
420 void* sink,
421 any_buffer_sink* wrapper,
422 coro h,
423 executor_ref ex,
424 std::stop_token token,
425 std::error_code* ec);
426 };
427
428 template<BufferSink S>
429 struct any_buffer_sink::vtable_for_impl
430 {
431 static void
432 do_destroy_impl(void* sink) noexcept
433 {
434 static_cast<S*>(sink)->~S();
435 }
436
437 static constexpr vtable value = {
438 &do_destroy_impl,
439 &any_buffer_sink::do_prepare_impl<S>,
440 &any_buffer_sink::do_commit_impl<S>,
441 &any_buffer_sink::do_commit_eof_impl<S>
442 };
443 };
444
445 //----------------------------------------------------------
446
447 inline
448 63 any_buffer_sink::~any_buffer_sink()
449 {
450
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 63 times.
63 if(storage_)
451 {
452 vt_->destroy(sink_);
453 ::operator delete(storage_);
454 }
455
2/2
✓ Branch 0 taken 60 times.
✓ Branch 1 taken 3 times.
63 if(cached_frame_)
456 60 ::operator delete(cached_frame_);
457 63 }
458
459 inline any_buffer_sink&
460 1 any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
461 {
462
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if(this != &other)
463 {
464
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if(storage_)
465 {
466 vt_->destroy(sink_);
467 ::operator delete(storage_);
468 }
469
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if(cached_frame_)
470 ::operator delete(cached_frame_);
471 1 sink_ = std::exchange(other.sink_, nullptr);
472 1 vt_ = std::exchange(other.vt_, nullptr);
473 1 cached_frame_ = std::exchange(other.cached_frame_, nullptr);
474 1 cached_size_ = std::exchange(other.cached_size_, 0);
475 1 storage_ = std::exchange(other.storage_, nullptr);
476 }
477 1 return *this;
478 }
479
480 template<BufferSink S>
481 requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
482 any_buffer_sink::any_buffer_sink(S s)
483 : vt_(&vtable_for_impl<S>::value)
484 {
485 struct guard {
486 any_buffer_sink* self;
487 bool committed = false;
488 ~guard() {
489 if(!committed && self->storage_) {
490 self->vt_->destroy(self->sink_);
491 ::operator delete(self->storage_);
492 self->storage_ = nullptr;
493 self->sink_ = nullptr;
494 }
495 }
496 } g{this};
497
498 storage_ = ::operator new(sizeof(S));
499 sink_ = ::new(storage_) S(std::move(s));
500
501 // Preallocate coroutine frames to find max size
502 auto& ref = *static_cast<S*>(sink_);
503 commit_coro<S>(this, ref, 0, nullptr);
504 commit_with_eof_coro<S>(this, ref, 0, false, nullptr);
505 commit_eof_coro<S>(this, ref, nullptr);
506
507 g.committed = true;
508 }
509
510 //----------------------------------------------------------
511
512 struct any_buffer_sink::commit_op
513 {
514 struct promise_type
515 {
516 executor_ref executor_;
517 std::stop_token stop_token_;
518 coro caller_h_{};
519
520 246 promise_type() = default;
521
522 commit_op
523 246 get_return_object() noexcept
524 {
525 return commit_op{
526 246 std::coroutine_handle<promise_type>::from_promise(*this)};
527 }
528
529 std::suspend_always
530 246 initial_suspend() noexcept
531 {
532 246 return {};
533 }
534
535 auto
536 50 final_suspend() noexcept
537 {
538 struct awaiter
539 {
540 promise_type* p_;
541
542 50 bool await_ready() const noexcept { return false; }
543
544 50 coro await_suspend(coro) const noexcept
545 {
546
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 50 times.
50 if(p_->caller_h_)
547 return p_->caller_h_;
548 50 return std::noop_coroutine();
549 }
550
551 void await_resume() const noexcept {}
552 };
553 50 return awaiter{this};
554 }
555
556 void
557 50 return_void() noexcept
558 {
559 50 }
560
561 void
562 16 unhandled_exception()
563 {
564 16 throw;
565 }
566
567 template<class... Args>
568 static void*
569 246 operator new(
570 std::size_t size,
571 any_buffer_sink* wrapper,
572 Args&&...)
573 {
574 246 return wrapper->alloc_frame(size);
575 }
576
577 template<class... Args>
578 static void
579 operator delete(void*, any_buffer_sink*, Args&&...) noexcept
580 {
581 }
582
583 static void
584 246 operator delete(void*, std::size_t) noexcept
585 {
586 246 }
587
588 void
589 66 set_executor(executor_ref ex) noexcept
590 {
591 66 executor_ = ex;
592 66 }
593
594 void
595 66 set_stop_token(std::stop_token token) noexcept
596 {
597 66 stop_token_ = token;
598 66 }
599
600 void
601 set_caller(coro h) noexcept
602 {
603 caller_h_ = h;
604 }
605
606 template<class Awaitable>
607 struct transform_awaiter
608 {
609 std::decay_t<Awaitable> a_;
610 promise_type* p_;
611
612 66 bool await_ready()
613 {
614 66 return a_.await_ready();
615 }
616
617 66 auto await_resume()
618 {
619 66 return a_.await_resume();
620 }
621
622 auto await_suspend(coro h)
623 {
624 return a_.await_suspend(h, p_->executor_, p_->stop_token_);
625 }
626 };
627
628 template<class Awaitable>
629 66 auto await_transform(Awaitable&& a)
630 {
631 using A = std::decay_t<Awaitable>;
632 if constexpr (IoAwaitable<A>)
633 {
634 return transform_awaiter<Awaitable>{
635 66 std::forward<Awaitable>(a), this};
636 }
637 else
638 {
639 static_assert(sizeof(A) == 0, "requires IoAwaitable");
640 }
641 }
642 };
643
644 std::coroutine_handle<promise_type> h_;
645
646 246 ~commit_op()
647 {
648
2/2
✓ Branch 1 taken 196 times.
✓ Branch 2 taken 50 times.
246 if(h_)
649 196 h_.destroy();
650 246 }
651
652 commit_op(commit_op const&) = delete;
653 commit_op& operator=(commit_op const&) = delete;
654
655 commit_op(commit_op&& other) noexcept
656 : h_(std::exchange(other.h_, nullptr))
657 {
658 }
659
660 commit_op& operator=(commit_op&& other) noexcept
661 {
662 if(this != &other)
663 {
664 if(h_)
665 h_.destroy();
666 h_ = std::exchange(other.h_, nullptr);
667 }
668 return *this;
669 }
670
671 private:
672 explicit
673 246 commit_op(std::coroutine_handle<promise_type> h) noexcept
674 246 : h_(h)
675 {
676 246 }
677 };
678
679 //----------------------------------------------------------
680
681 inline void*
682 246 any_buffer_sink::alloc_frame(std::size_t size)
683 {
684
4/4
✓ Branch 0 taken 186 times.
✓ Branch 1 taken 60 times.
✓ Branch 2 taken 126 times.
✓ Branch 3 taken 60 times.
246 if(cached_frame_ && cached_size_ >= size)
685 126 return cached_frame_;
686
687
2/2
✓ Branch 0 taken 60 times.
✓ Branch 1 taken 60 times.
120 if(cached_frame_)
688 60 ::operator delete(cached_frame_);
689
690 120 cached_frame_ = ::operator new(size);
691 120 cached_size_ = size;
692 120 return cached_frame_;
693 }
694
695 inline void
696 any_buffer_sink::free_frame(void*, std::size_t)
697 {
698 // Keep the frame cached for reuse
699 }
700
701 template<BufferSink S>
702 std::size_t
703 68 any_buffer_sink::do_prepare_impl(
704 void* sink,
705 mutable_buffer* arr,
706 std::size_t max_count)
707 {
708 68 auto& s = *static_cast<S*>(sink);
709 68 return s.prepare(arr, max_count);
710 }
711
712 #if defined(__GNUC__) && !defined(__clang__)
713 #pragma GCC diagnostic push
714 #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
715 #endif
716
717 template<BufferSink S>
718 any_buffer_sink::commit_op
719
1/1
✓ Branch 1 taken 98 times.
98 any_buffer_sink::commit_coro(
720 any_buffer_sink*,
721 S& sink,
722 std::size_t n,
723 std::error_code* out_ec)
724 {
725 auto [err] = co_await sink.commit(n);
726 *out_ec = err;
727 196 }
728
729 template<BufferSink S>
730 any_buffer_sink::commit_op
731
1/1
✓ Branch 1 taken 70 times.
70 any_buffer_sink::commit_with_eof_coro(
732 any_buffer_sink*,
733 S& sink,
734 std::size_t n,
735 bool eof,
736 std::error_code* out_ec)
737 {
738 auto [err] = co_await sink.commit(n, eof);
739 *out_ec = err;
740 140 }
741
742 template<BufferSink S>
743 any_buffer_sink::commit_op
744
1/1
✓ Branch 1 taken 78 times.
78 any_buffer_sink::commit_eof_coro(
745 any_buffer_sink*,
746 S& sink,
747 std::error_code* out_ec)
748 {
749 auto [err] = co_await sink.commit_eof();
750 *out_ec = err;
751 156 }
752
753 #if defined(__GNUC__) && !defined(__clang__)
754 #pragma GCC diagnostic pop
755 #endif
756
757 template<BufferSink S>
758 coro
759 48 any_buffer_sink::do_commit_impl(
760 void* sink,
761 any_buffer_sink* wrapper,
762 std::size_t n,
763 bool eof,
764 coro h,
765 executor_ref ex,
766 std::stop_token token,
767 std::error_code* ec)
768 {
769 48 auto& s = *static_cast<S*>(sink);
770
771 // Create coroutine - frame is cached in wrapper
772
2/2
✓ Branch 1 taken 10 times.
✓ Branch 4 taken 38 times.
48 auto op = eof
773
2/2
✓ Branch 0 taken 10 times.
✓ Branch 1 taken 38 times.
48 ? commit_with_eof_coro<S>(wrapper, s, n, eof, ec)
774 : commit_coro<S>(wrapper, s, n, ec);
775
776 // Set executor and stop token on promise before resuming
777 48 op.h_.promise().set_executor(ex);
778 48 op.h_.promise().set_stop_token(token);
779
780 // Resume the coroutine to start the operation
781
1/1
✓ Branch 1 taken 37 times.
48 op.h_.resume();
782
783 // Check if operation completed synchronously
784
1/2
✓ Branch 1 taken 37 times.
✗ Branch 2 not taken.
37 if(op.h_.done())
785 {
786
1/1
✓ Branch 1 taken 37 times.
37 op.h_.destroy();
787 37 op.h_ = nullptr;
788
1/1
✓ Branch 1 taken 37 times.
37 return ex.dispatch(h);
789 }
790
791 // Operation is pending - caller will be resumed via symmetric transfer
792 op.h_.promise().set_caller(h);
793 op.h_ = nullptr;
794 return std::noop_coroutine();
795 48 }
796
797 template<BufferSink S>
798 coro
799 18 any_buffer_sink::do_commit_eof_impl(
800 void* sink,
801 any_buffer_sink* wrapper,
802 coro h,
803 executor_ref ex,
804 std::stop_token token,
805 std::error_code* ec)
806 {
807 18 auto& s = *static_cast<S*>(sink);
808
809 // Create coroutine - frame is cached in wrapper
810
1/1
✓ Branch 1 taken 18 times.
18 auto op = commit_eof_coro<S>(wrapper, s, ec);
811
812 // Set executor and stop token on promise before resuming
813 18 op.h_.promise().set_executor(ex);
814 18 op.h_.promise().set_stop_token(token);
815
816 // Resume the coroutine to start the operation
817
1/1
✓ Branch 1 taken 13 times.
18 op.h_.resume();
818
819 // Check if operation completed synchronously
820
1/2
✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
13 if(op.h_.done())
821 {
822
1/1
✓ Branch 1 taken 13 times.
13 op.h_.destroy();
823 13 op.h_ = nullptr;
824
1/1
✓ Branch 1 taken 13 times.
13 return ex.dispatch(h);
825 }
826
827 // Operation is pending - caller will be resumed via symmetric transfer
828 op.h_.promise().set_caller(h);
829 op.h_ = nullptr;
830 return std::noop_coroutine();
831 18 }
832
833 //----------------------------------------------------------
834
835 inline std::size_t
836 68 any_buffer_sink::prepare(
837 mutable_buffer* arr,
838 std::size_t max_count)
839 {
840 68 return vt_->do_prepare(sink_, arr, max_count);
841 }
842
843 inline auto
844 38 any_buffer_sink::commit(std::size_t n)
845 {
846 struct awaitable
847 {
848 any_buffer_sink* self_;
849 std::size_t n_;
850 std::error_code ec_;
851
852 bool
853 38 await_ready() const noexcept
854 {
855 38 return false;
856 }
857
858 coro
859 38 await_suspend(coro h, executor_ref ex, std::stop_token token)
860 {
861 76 return self_->vt_->do_commit(
862
1/1
✓ Branch 1 taken 30 times.
38 self_->sink_,
863 self_,
864 n_,
865 false,
866 h,
867 ex,
868 token,
869 60 &ec_);
870 }
871
872 io_result<>
873 30 await_resume() const noexcept
874 {
875 30 return {ec_};
876 }
877 };
878 38 return awaitable{this, n, {}};
879 }
880
881 inline auto
882 10 any_buffer_sink::commit(std::size_t n, bool eof)
883 {
884 struct awaitable
885 {
886 any_buffer_sink* self_;
887 std::size_t n_;
888 bool eof_;
889 std::error_code ec_;
890
891 bool
892 10 await_ready() const noexcept
893 {
894 10 return false;
895 }
896
897 coro
898 10 await_suspend(coro h, executor_ref ex, std::stop_token token)
899 {
900 20 return self_->vt_->do_commit(
901 10 self_->sink_,
902 self_,
903 n_,
904
1/1
✓ Branch 1 taken 7 times.
10 eof_,
905 h,
906 ex,
907 token,
908 14 &ec_);
909 }
910
911 io_result<>
912 7 await_resume() const noexcept
913 {
914 7 return {ec_};
915 }
916 };
917 10 return awaitable{this, n, eof, {}};
918 }
919
920 inline auto
921 18 any_buffer_sink::commit_eof()
922 {
923 struct awaitable
924 {
925 any_buffer_sink* self_;
926 std::error_code ec_;
927
928 bool
929 18 await_ready() const noexcept
930 {
931 18 return false;
932 }
933
934 coro
935 18 await_suspend(coro h, executor_ref ex, std::stop_token token)
936 {
937 36 return self_->vt_->do_commit_eof(
938
1/1
✓ Branch 1 taken 13 times.
18 self_->sink_,
939 self_,
940 h,
941 ex,
942 token,
943 26 &ec_);
944 }
945
946 io_result<>
947 13 await_resume() const noexcept
948 {
949 13 return {ec_};
950 }
951 };
952 18 return awaitable{this, {}};
953 }
954
955 //----------------------------------------------------------
956
957 template<ConstBufferSequence CB>
958 task<io_result<std::size_t>>
959 any_buffer_sink::write(CB buffers)
960 {
961 return write(buffers, false);
962 }
963
964 template<ConstBufferSequence CB>
965 task<io_result<std::size_t>>
966 any_buffer_sink::write(CB buffers, bool eof)
967 {
968 buffer_param<CB> bp(buffers);
969 std::size_t total = 0;
970
971 for(;;)
972 {
973 auto src = bp.data();
974 if(src.empty())
975 break;
976
977 mutable_buffer arr[detail::max_iovec_];
978 std::size_t count = prepare(arr, detail::max_iovec_);
979 if(count == 0)
980 {
981 auto [ec] = co_await commit(0);
982 if(ec)
983 co_return {ec, total};
984 continue;
985 }
986
987 auto n = buffer_copy(std::span(arr, count), src);
988 auto [ec] = co_await commit(n);
989 if(ec)
990 co_return {ec, total};
991 bp.consume(n);
992 total += n;
993 }
994
995 if(eof)
996 {
997 auto [ec] = co_await commit_eof();
998 if(ec)
999 co_return {ec, total};
1000 }
1001
1002 co_return {{}, total};
1003 }
1004
1005 inline auto
1006 any_buffer_sink::write_eof()
1007 {
1008 return commit_eof();
1009 }
1010
1011 //----------------------------------------------------------
1012
1013 static_assert(WriteSink<any_buffer_sink>);
1014
1015 } // namespace capy
1016 } // namespace boost
1017
1018 #endif
1019