Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_READ_UNTIL_HPP
11 : #define BOOST_CAPY_READ_UNTIL_HPP
12 :
#include <boost/capy/detail/config.hpp>
#include <boost/capy/buffers.hpp>
#include <boost/capy/cond.hpp>
#include <boost/capy/coro.hpp>
#include <boost/capy/error.hpp>
#include <boost/capy/io_result.hpp>
#include <boost/capy/io_task.hpp>
#include <boost/capy/concept/dynamic_buffer.hpp>
#include <boost/capy/concept/match_condition.hpp>
#include <boost/capy/concept/read_stream.hpp>
#include <boost/capy/ex/executor_ref.hpp>

#include <algorithm>
#include <cstddef>
#include <optional>
#include <stop_token>
#include <string>
#include <string_view>
#include <type_traits>
31 :
32 : namespace boost {
33 : namespace capy {
34 :
35 : namespace detail {
36 :
37 : // Linearize a buffer sequence into a string
38 : inline
39 : std::string
40 0 : linearize_buffers(ConstBufferSequence auto const& data)
41 : {
42 0 : std::string linear;
43 0 : linear.reserve(buffer_size(data));
44 0 : auto const end_ = end(data);
45 0 : for(auto it = begin(data); it != end_; ++it)
46 0 : linear.append(
47 0 : static_cast<char const*>(it->data()),
48 : it->size());
49 0 : return linear;
50 0 : }
51 :
52 : // Search buffer using a MatchCondition, with single-buffer optimization
53 : template<MatchCondition M>
54 : std::size_t
55 240 : search_buffer_for_match(
56 : ConstBufferSequence auto const& data,
57 : M const& match,
58 : std::size_t* hint = nullptr)
59 : {
60 : // Fast path: single buffer - no linearization needed
61 240 : if(buffer_length(data) == 1)
62 : {
63 240 : auto const& buf = *begin(data);
64 720 : return match(std::string_view(
65 240 : static_cast<char const*>(buf.data()),
66 240 : buf.size()), hint);
67 : }
68 : // Multiple buffers - linearize
69 0 : return match(linearize_buffers(data), hint);
70 : }
71 :
72 : // Implementation coroutine for read_until with MatchCondition
73 : template<class Stream, class B, MatchCondition M>
74 : io_task<std::size_t>
75 126 : read_until_match_impl(
76 : Stream& stream,
77 : B& buffers,
78 : M match,
79 : std::size_t initial_amount)
80 : {
81 : std::size_t amount = initial_amount;
82 :
83 : for(;;)
84 : {
85 : // Check max_size before preparing
86 : if(buffers.size() >= buffers.max_size())
87 : co_return {error::not_found, 0};
88 :
89 : // Prepare space, respecting max_size
90 : std::size_t const available = buffers.max_size() - buffers.size();
91 : std::size_t const to_prepare = (std::min)(amount, available);
92 : if(to_prepare == 0)
93 : co_return {error::not_found, 0};
94 :
95 : auto mb = buffers.prepare(to_prepare);
96 : auto [ec, n] = co_await stream.read_some(mb);
97 : buffers.commit(n);
98 :
99 : if(n > 0)
100 : {
101 : auto pos = search_buffer_for_match(buffers.data(), match);
102 : if(pos != std::string_view::npos)
103 : co_return {{}, pos};
104 : }
105 :
106 : if(ec == cond::eof)
107 : co_return {error::eof, buffers.size()};
108 : if(ec)
109 : co_return {ec, buffers.size()};
110 :
111 : // Grow buffer size for next iteration
112 : if(n == buffer_size(mb))
113 : amount = amount / 2 + amount;
114 : }
115 252 : }
116 :
117 : template<class Stream, class B, MatchCondition M, bool OwnsBuffer>
118 : struct read_until_awaitable
119 : {
120 : Stream* stream_;
121 : M match_;
122 : std::size_t initial_amount_;
123 : std::optional<io_result<std::size_t>> immediate_;
124 : std::optional<io_task<std::size_t>> inner_;
125 :
126 : using storage_type = std::conditional_t<OwnsBuffer, B, B*>;
127 : storage_type buffers_storage_;
128 :
129 126 : B& buffers() noexcept
130 : {
131 : if constexpr(OwnsBuffer)
132 126 : return buffers_storage_;
133 : else
134 0 : return *buffers_storage_;
135 : }
136 :
137 : // Constructor for lvalue (pointer storage)
138 4 : read_until_awaitable(
139 : Stream& stream,
140 : B* buffers,
141 : M match,
142 : std::size_t initial_amount)
143 : requires (!OwnsBuffer)
144 4 : : stream_(std::addressof(stream))
145 4 : , match_(std::move(match))
146 4 : , initial_amount_(initial_amount)
147 4 : , buffers_storage_(buffers)
148 : {
149 4 : auto pos = search_buffer_for_match(
150 4 : buffers_storage_->data(), match_);
151 4 : if(pos != std::string_view::npos)
152 4 : immediate_.emplace(io_result<std::size_t>{{}, pos});
153 4 : }
154 :
155 : // Constructor for rvalue adapter (owned storage)
156 132 : read_until_awaitable(
157 : Stream& stream,
158 : B&& buffers,
159 : M match,
160 : std::size_t initial_amount)
161 : requires OwnsBuffer
162 132 : : stream_(std::addressof(stream))
163 132 : , match_(std::move(match))
164 132 : , initial_amount_(initial_amount)
165 132 : , buffers_storage_(std::move(buffers))
166 : {
167 132 : auto pos = search_buffer_for_match(
168 132 : buffers_storage_.data(), match_);
169 132 : if(pos != std::string_view::npos)
170 6 : immediate_.emplace(io_result<std::size_t>{{}, pos});
171 132 : }
172 :
173 : bool
174 136 : await_ready() const noexcept
175 : {
176 136 : return immediate_.has_value();
177 : }
178 :
179 : coro
180 126 : await_suspend(coro h, executor_ref ex, std::stop_token token)
181 : {
182 252 : inner_.emplace(read_until_match_impl(
183 126 : *stream_, buffers(), match_, initial_amount_));
184 126 : return inner_->await_suspend(h, ex, token);
185 : }
186 :
187 : io_result<std::size_t>
188 136 : await_resume()
189 : {
190 136 : if(immediate_)
191 10 : return *immediate_;
192 126 : return inner_->await_resume();
193 : }
194 : };
195 :
196 : } // namespace detail
197 :
/** Match condition that searches for a fixed delimiter string.

    Invoking the object looks for the first occurrence of `delim`
    in `data`.

    @li If `delim` is empty the result is `0`.
    @li If `delim` is found the result is the offset one past the
        end of the delimiter.
    @li Otherwise the result is `std::string_view::npos` and, when
        `hint` is not null, `*hint` is set to `delim.size() - 1`:
        the number of trailing bytes that could still be the start
        of a delimiter completed by a later read.
*/
struct match_delim
{
    /// The delimiter to search for. Only a view; the characters are not copied.
    std::string_view delim;

    std::size_t
    operator()(
        std::string_view data,
        std::size_t* hint) const noexcept
    {
        auto const len = delim.size();
        if(len == 0)
            return 0;

        auto const at = data.find(delim);
        if(at == std::string_view::npos)
        {
            // len >= 1 here, so len - 1 cannot wrap around
            if(hint != nullptr)
                *hint = len - 1;
            return std::string_view::npos;
        }
        return at + len;
    }
};
223 :
224 : /** Read data until a match condition is satisfied.
225 :
226 : This function reads data from the stream into the dynamic buffer
227 : until the match condition returns a valid position. The operation
228 : completes when a match is found, an error occurs, EOF is reached,
229 : or the buffer's max_size is reached.
230 :
231 : If the match condition is already satisfied by data in the buffer,
232 : the function returns immediately without performing any I/O.
233 :
234 : @tparam Stream The stream type, must satisfy @ref ReadStream.
235 : @tparam B The buffer type, must satisfy @ref DynamicBufferParam.
236 : @tparam M The match condition type, must satisfy @ref MatchCondition.
237 :
238 : @param stream The stream to read from.
239 : @param buffers The dynamic buffer to read into.
240 : @param match The match condition callable.
241 : @param initial_amount The initial number of bytes to read per
242 : iteration. Grows automatically for subsequent reads.
243 :
244 : @return An awaitable yielding `(error_code,std::size_t)`.
245 : On success, `ec` is default-constructed and `n` is the
246 : position returned by the match condition. On error:
247 : - `ec == cond::eof`: EOF reached before match, `n` is buffer size
248 : - `ec == cond::not_found`: max_size reached before match, `n` is 0
249 : - Other error: I/O error occurred, `n` is bytes read before error
250 :
251 : @par Example
252 : @code
253 : // Read until HTTP header end
254 : task<void> read_http_header(ReadStream auto& stream)
255 : {
256 : std::string header;
257 : auto [ec, n] = co_await read_until(
258 : stream, dynamic_buffer(header),
259 : [](std::string_view data, std::size_t* hint) {
260 : auto pos = data.find("\r\n\r\n");
261 : if(pos != std::string_view::npos)
262 : return pos + 4;
263 : if(hint)
264 : *hint = 3; // Partial match possible
265 : return std::string_view::npos;
266 : });
267 : if(ec)
268 : co_return;
269 : // header contains data including "\r\n\r\n"
270 : }
271 : @endcode
272 :
273 : @see ReadStream, DynamicBufferParam, MatchCondition
274 : */
275 : template<ReadStream Stream, class B, MatchCondition M>
276 : requires DynamicBufferParam<B&&>
277 : auto
278 136 : read_until(
279 : Stream& stream,
280 : B&& buffers,
281 : M match,
282 : std::size_t initial_amount = 2048)
283 : {
284 136 : constexpr bool is_lvalue = std::is_lvalue_reference_v<B&&>;
285 : using BareB = std::remove_reference_t<B>;
286 :
287 : if constexpr(is_lvalue)
288 : return detail::read_until_awaitable<Stream, BareB, M, false>(
289 4 : stream, std::addressof(buffers), std::move(match), initial_amount);
290 : else
291 : return detail::read_until_awaitable<Stream, BareB, M, true>(
292 132 : stream, std::move(buffers), std::move(match), initial_amount);
293 : }
294 :
295 : /** Read data until a delimiter is found.
296 :
297 : This function reads data from the stream into the dynamic buffer
298 : until the specified delimiter string is found. The operation
299 : completes when the delimiter is found, an error occurs, EOF is
300 : reached, or the buffer's max_size is reached.
301 :
302 : If the delimiter already exists in the buffer, the function
303 : returns immediately without performing any I/O.
304 :
305 : @tparam Stream The stream type, must satisfy @ref ReadStream.
306 : @tparam B The buffer type, must satisfy @ref DynamicBufferParam.
307 :
308 : @param stream The stream to read from.
309 : @param buffers The dynamic buffer to read into.
310 : @param delim The delimiter string to search for.
311 : @param initial_amount The initial number of bytes to read per
312 : iteration. Grows automatically for subsequent reads.
313 :
314 : @return An awaitable yielding `(error_code,std::size_t)`.
315 : On success, `ec` is default-constructed and `n` is the number
316 : of bytes up to and including the delimiter. On error:
317 : - `ec == cond::eof`: EOF reached before delimiter, `n` is buffer size
318 : - `ec == cond::not_found`: max_size reached before delimiter, `n` is 0
319 : - Other error: I/O error occurred, `n` is bytes read before error
320 :
321 : @par Example
322 : @code
323 : task<void> read_line(ReadStream auto& stream)
324 : {
325 : std::string line;
326 : auto [ec, n] = co_await read_until(
327 : stream, dynamic_buffer(line), "\r\n");
328 : if(ec)
329 : {
330 : // Handle error or EOF
331 : co_return;
332 : }
333 : // line contains data including "\r\n"
334 : line.resize(n - 2); // Remove delimiter
335 : }
336 : @endcode
337 :
338 : @see ReadStream, DynamicBufferParam
339 : */
340 : template<ReadStream Stream, class B>
341 : requires DynamicBufferParam<B&&>
342 : auto
343 108 : read_until(
344 : Stream& stream,
345 : B&& buffers,
346 : std::string_view delim,
347 : std::size_t initial_amount = 2048)
348 : {
349 : return read_until(
350 : stream,
351 : std::forward<B>(buffers),
352 : match_delim{delim},
353 108 : initial_amount);
354 : }
355 :
356 : } // namespace capy
357 : } // namespace boost
358 :
359 : #endif
|