Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_TEST_READ_STREAM_HPP
11 : #define BOOST_CAPY_TEST_READ_STREAM_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/buffers.hpp>
15 : #include <boost/capy/buffers/buffer_copy.hpp>
16 : #include <boost/capy/buffers/make_buffer.hpp>
17 : #include <boost/capy/cond.hpp>
18 : #include <boost/capy/coro.hpp>
19 : #include <boost/capy/ex/executor_ref.hpp>
20 : #include <boost/capy/io_result.hpp>
21 : #include <boost/capy/test/fuse.hpp>
22 :
23 : #include <stop_token>
24 : #include <string>
25 : #include <string_view>
26 :
27 : namespace boost {
28 : namespace capy {
29 : namespace test {
30 :
31 : /** A mock stream for testing read operations.
32 :
33 : Use this to verify code that performs reads without needing
34 : real I/O. Call @ref provide to supply data, then @ref read_some
35 : to consume it. The associated @ref fuse enables error injection
36 : at controlled points. An optional `max_read_size` constructor
37 : parameter limits bytes per read to simulate chunked delivery.
38 :
39 : @par Thread Safety
40 : Not thread-safe.
41 :
42 : @par Example
43 : @code
44 : fuse f;
45 : read_stream rs( f );
46 : rs.provide( "Hello, " );
47 : rs.provide( "World!" );
48 :
49 : auto r = f.armed( [&]( fuse& ) -> task<void> {
50 : char buf[32];
51 : auto [ec, n] = co_await rs.read_some(
52 : mutable_buffer( buf, sizeof( buf ) ) );
53 : if( ec )
54 : co_return;
55 : // buf contains "Hello, World!"
56 : } );
57 : @endcode
58 :
59 : @see fuse
60 : */
61 : class read_stream
62 : {
63 : fuse* f_;
64 : std::string data_;
65 : std::size_t pos_ = 0;
66 : std::size_t max_read_size_;
67 :
68 : public:
69 : /** Construct a read stream.
70 :
71 : @param f The fuse used to inject errors during reads.
72 :
73 : @param max_read_size Maximum bytes returned per read.
74 : Use to simulate chunked network delivery.
75 : */
76 1309 : explicit read_stream(
77 : fuse& f,
78 : std::size_t max_read_size = std::size_t(-1)) noexcept
79 1309 : : f_(&f)
80 1309 : , max_read_size_(max_read_size)
81 : {
82 1309 : }
83 :
84 : /** Append data to be returned by subsequent reads.
85 :
86 : Multiple calls accumulate data that @ref read_some returns.
87 :
88 : @param sv The data to append.
89 : */
90 : void
91 1283 : provide(std::string_view sv)
92 : {
93 1283 : data_.append(sv);
94 1283 : }
95 :
96 : /// Clear all data and reset the read position.
97 : void
98 : clear() noexcept
99 : {
100 : data_.clear();
101 : pos_ = 0;
102 : }
103 :
104 : /// Return the number of bytes available for reading.
105 : std::size_t
106 2 : available() const noexcept
107 : {
108 2 : return data_.size() - pos_;
109 : }
110 :
111 : /** Asynchronously read data from the stream.
112 :
113 : Transfers up to `buffer_size( buffers )` bytes from the internal
114 : buffer to the provided mutable buffer sequence. If no data remains,
115 : returns `error::eof`. Before every read, the attached @ref fuse is
116 : consulted to possibly inject an error for testing fault scenarios.
117 : The returned `std::size_t` is the number of bytes transferred.
118 :
119 : @par Effects
120 : On success, advances the internal read position by the number of
121 : bytes copied. If an error is injected by the fuse, the read position
122 : remains unchanged.
123 :
124 : @par Exception Safety
125 : No-throw guarantee.
126 :
127 : @param buffers The mutable buffer sequence to receive data.
128 :
129 : @return An awaitable yielding ( error_code, std::size_t ).
130 :
131 : @see fuse
132 : */
133 : template<MutableBufferSequence MB>
134 : auto
135 1570 : read_some(MB buffers)
136 : {
137 : struct awaitable
138 : {
139 : read_stream* self_;
140 : MB buffers_;
141 :
142 1570 : bool await_ready() const noexcept { return true; }
143 :
144 0 : void await_suspend(
145 : coro,
146 : executor_ref,
147 : std::stop_token) const noexcept
148 : {
149 0 : }
150 :
151 : io_result<std::size_t>
152 1570 : await_resume()
153 : {
154 1570 : auto ec = self_->f_->maybe_fail();
155 1374 : if(ec)
156 196 : return {ec, 0};
157 :
158 1178 : if(self_->pos_ >= self_->data_.size())
159 84 : return {error::eof, 0};
160 :
161 1094 : std::size_t avail = self_->data_.size() - self_->pos_;
162 1094 : if(avail > self_->max_read_size_)
163 236 : avail = self_->max_read_size_;
164 1094 : auto src = make_buffer(self_->data_.data() + self_->pos_, avail);
165 1094 : std::size_t const n = buffer_copy(buffers_, src);
166 1094 : self_->pos_ += n;
167 1094 : return {{}, n};
168 : }
169 : };
170 1570 : return awaitable{this, buffers};
171 : }
172 : };
173 :
174 : } // test
175 : } // capy
176 : } // boost
177 :
178 : #endif
|