Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_TEST_BUFFER_SOURCE_HPP
11 : #define BOOST_CAPY_TEST_BUFFER_SOURCE_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/buffers.hpp>
15 : #include <boost/capy/buffers/make_buffer.hpp>
16 : #include <boost/capy/coro.hpp>
17 : #include <boost/capy/ex/executor_ref.hpp>
18 : #include <boost/capy/io_result.hpp>
19 : #include <boost/capy/test/fuse.hpp>
20 :
21 : #include <algorithm>
22 : #include <stop_token>
23 : #include <string>
24 : #include <string_view>
25 :
26 : namespace boost {
27 : namespace capy {
28 : namespace test {
29 :
30 : /** A mock buffer source for testing push operations.
31 :
32 : Use this to verify code that transfers data from a buffer source to
33 : a sink without needing real I/O. Call @ref provide to supply data,
34 : then @ref pull to retrieve buffer descriptors. The associated
35 : @ref fuse enables error injection at controlled points.
36 :
37 : This class satisfies the @ref BufferSource concept by providing
38 : a pull interface that fills an array of buffer descriptors and
39 : a consume interface to indicate bytes used.
40 :
41 : @par Thread Safety
42 : Not thread-safe.
43 :
44 : @par Example
45 : @code
46 : fuse f;
47 : buffer_source bs( f );
48 : bs.provide( "Hello, " );
49 : bs.provide( "World!" );
50 :
51 : auto r = f.armed( [&]( fuse& ) -> task<void> {
52 : const_buffer arr[16];
53 : auto [ec, count] = co_await bs.pull( arr, 16 );
54 : if( ec )
55 : co_return;
56 : // arr[0..count) contains buffer descriptors
57 : std::size_t n = buffer_size( std::span( arr, count ) );
58 : bs.consume( n );
59 : } );
60 : @endcode
61 :
62 : @see fuse, BufferSource
63 : */
64 : class buffer_source
65 : {
66 : fuse* f_;
67 : std::string data_;
68 : std::size_t pos_ = 0;
69 : std::size_t max_pull_size_;
70 :
71 : public:
72 : /** Construct a buffer source.
73 :
74 : @param f The fuse used to inject errors during pulls.
75 :
76 : @param max_pull_size Maximum bytes returned per pull.
77 : Use to simulate chunked delivery.
78 : */
79 288 : explicit buffer_source(
80 : fuse& f,
81 : std::size_t max_pull_size = std::size_t(-1)) noexcept
82 288 : : f_(&f)
83 288 : , max_pull_size_(max_pull_size)
84 : {
85 288 : }
86 :
87 : /** Append data to be returned by subsequent pulls.
88 :
89 : Multiple calls accumulate data that @ref pull returns.
90 :
91 : @param sv The data to append.
92 : */
93 : void
94 302 : provide(std::string_view sv)
95 : {
96 302 : data_.append(sv);
97 302 : }
98 :
99 : /// Clear all data and reset the read position.
100 : void
101 : clear() noexcept
102 : {
103 : data_.clear();
104 : pos_ = 0;
105 : }
106 :
107 : /// Return the number of bytes available for pulling.
108 : std::size_t
109 : available() const noexcept
110 : {
111 : return data_.size() - pos_;
112 : }
113 :
114 : /** Consume bytes from the source.
115 :
116 : Advances the internal read position by the specified number
117 : of bytes. The next call to @ref pull returns data starting
118 : after the consumed bytes.
119 :
120 : @param n The number of bytes to consume. Must not exceed the
121 : total size of buffers returned by the previous @ref pull.
122 : */
123 : void
124 267 : consume(std::size_t n) noexcept
125 : {
126 267 : pos_ += n;
127 267 : }
128 :
129 : /** Pull buffer data from the source.
130 :
131 : Fills the provided array with buffer descriptors pointing to
132 : internal data starting from the current unconsumed position.
133 : Returns the number of buffers filled. When no data remains,
134 : returns count=0 to signal completion.
135 :
136 : Calling pull multiple times without intervening @ref consume
137 : returns the same data. Use consume to advance past processed
138 : bytes.
139 :
140 : @param arr Pointer to array of const_buffer to fill.
141 : @param max_count Maximum number of buffers to fill.
142 :
143 : @return An awaitable yielding `(error_code,std::size_t)`.
144 :
145 : @see consume, fuse
146 : */
147 : auto
148 540 : pull(const_buffer* arr, std::size_t max_count)
149 : {
150 : struct awaitable
151 : {
152 : buffer_source* self_;
153 : const_buffer* arr_;
154 : std::size_t max_count_;
155 :
156 540 : bool await_ready() const noexcept { return true; }
157 :
158 0 : void await_suspend(
159 : coro,
160 : executor_ref,
161 : std::stop_token) const noexcept
162 : {
163 0 : }
164 :
165 : io_result<std::size_t>
166 540 : await_resume()
167 : {
168 540 : auto ec = self_->f_->maybe_fail();
169 459 : if(ec)
170 81 : return {ec, 0};
171 :
172 378 : if(self_->pos_ >= self_->data_.size())
173 66 : return {{}, 0}; // Source exhausted
174 :
175 312 : std::size_t avail = self_->data_.size() - self_->pos_;
176 312 : std::size_t to_return = (std::min)(avail, self_->max_pull_size_);
177 :
178 312 : if(max_count_ == 0)
179 0 : return {{}, 0};
180 :
181 : // Fill a single buffer descriptor
182 312 : arr_[0] = make_buffer(
183 312 : self_->data_.data() + self_->pos_,
184 : to_return);
185 :
186 312 : return {{}, 1};
187 : }
188 : };
189 540 : return awaitable{this, arr, max_count};
190 : }
191 : };
192 :
193 : } // test
194 : } // capy
195 : } // boost
196 :
197 : #endif
|