Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/beast2
8 : //
9 :
10 : #ifndef BOOST_BEAST2_SERVER_HTTP_STREAM_HPP
11 : #define BOOST_BEAST2_SERVER_HTTP_STREAM_HPP
12 :
13 : #include <boost/beast2/detail/config.hpp>
14 : #include <boost/beast2/log_service.hpp>
15 : #include <boost/beast2/format.hpp>
16 : #include <boost/beast2/read.hpp>
17 : #include <boost/beast2/write.hpp>
18 : #include <boost/beast2/server/any_lambda.hpp>
19 : #include <boost/beast2/server/basic_router.hpp>
20 : #include <boost/beast2/server/route_handler_asio.hpp>
21 : #include <boost/beast2/server/router_asio.hpp>
22 : #include <boost/beast2/error.hpp>
23 : #include <boost/beast2/detail/except.hpp>
24 : #include <boost/rts/application.hpp>
25 : #include <boost/http_proto/request_parser.hpp>
26 : #include <boost/http_proto/response.hpp>
27 : #include <boost/http_proto/serializer.hpp>
28 : #include <boost/http_proto/string_body.hpp>
29 : #include <boost/url/parse.hpp>
30 : #include <boost/asio/prepend.hpp>
31 :
32 : namespace boost {
33 : namespace beast2 {
34 :
35 : //------------------------------------------------
36 :
37 : /** An HTTP server stream which routes requests to handlers and sends reesponses.
38 :
39 : An object of this type wraps an asynchronous Boost.ASIO stream and implements
40 : a high level server connection which reads HTTP requests, routes them to
41 : handlers installed in a router, and sends the HTTP response.
42 :
43 : @par Requires
44 : `AsyncStream` must satisfy <em>AsyncReadStream</em> and <em>AsyncWriteStream</em>
45 :
46 : @tparam AsyncStream The type of asynchronous stream.
47 : */
48 : template<class AsyncStream>
49 : class http_stream
50 : : private detacher::owner
51 : {
52 : public:
53 : /** Constructor.
54 :
55 : This initializes a new HTTP connection object that operates on
56 : the given stream, uses the specified router to dispatch incoming
57 : requests, and calls the supplied completion function when the
58 : connection closes or fails.
59 :
60 : Construction does not start any I/O; call @ref on_stream_begin when
61 : the stream is connected to the remote peer to begin reading
62 : requests and processing them.
63 :
64 : @param app The owning application, used to access shared services
65 : such as logging and protocol objects.
66 : @param stream The underlying asynchronous stream to read from
67 : and write to. The caller is responsible for maintaining its
68 : lifetime for the duration of the session.
69 : @param routes The router used to dispatch incoming HTTP requests.
70 : @param close_fn The function invoked when the connection is closed
71 : or an unrecoverable error occurs.
72 : */
73 : http_stream(
74 : rts::application& app,
75 : AsyncStream& stream,
76 : router_asio<AsyncStream&> routes,
77 : any_lambda<void(system::error_code)> close_fn);
78 :
79 : /** Called to start a new HTTP session
80 :
81 : The stream must be in a connected,
82 : correct state for a new session.
83 : */
84 : void on_stream_begin(acceptor_config const& config);
85 :
86 : private:
87 : void do_read();
88 :
89 : void on_read(
90 : system::error_code ec,
91 : std::size_t bytes_transferred);
92 :
93 : void on_write(
94 : system::error_code const& ec,
95 : std::size_t bytes_transferred);
96 :
97 : void do_fail(core::string_view s,
98 : system::error_code const& ec);
99 :
100 : resumer do_detach() override;
101 :
102 : void do_resume(system::error_code const& ec) override;
103 :
104 : void do_resume2(system::error_code ec);
105 :
106 : protected:
107 0 : std::string id() const
108 : {
109 0 : return std::string("[") + std::to_string(id_) + "] ";
110 : }
111 :
112 : protected:
113 : section sect_;
114 : std::size_t id_ = 0;
115 : AsyncStream& stream_;
116 : router_asio<AsyncStream&> routes_;
117 : any_lambda<void(system::error_code)> close_;
118 : acceptor_config const* pconfig_ = nullptr;
119 :
120 : using work_guard = asio::executor_work_guard<decltype(
121 : std::declval<AsyncStream&>().get_executor())>;
122 : std::unique_ptr<work_guard> pwg_;
123 : Request req_;
124 : ResponseAsio<AsyncStream&> res_;
125 : };
126 :
127 : //------------------------------------------------
128 :
129 : template<class AsyncStream>
130 0 : http_stream<AsyncStream>::
131 : http_stream(
132 : rts::application& app,
133 : AsyncStream& stream,
134 : router_asio<AsyncStream&> routes,
135 : any_lambda<void(system::error_code)> close)
136 0 : : sect_(use_log_service(app).get_section("http_stream"))
137 0 : , id_(
138 0 : []() noexcept
139 : {
140 : static std::size_t n = 0;
141 0 : return ++n;
142 0 : }())
143 0 : , stream_(stream)
144 0 : , routes_(std::move(routes))
145 0 : , close_(close)
146 0 : , res_(stream_)
147 : {
148 0 : req_.parser = http_proto::request_parser(app);
149 :
150 0 : res_.serializer = http_proto::serializer(app);
151 0 : res_.detach = detacher(*this);
152 0 : }
153 :
154 : /** Called to start a new HTTP session
155 :
156 : The stream must be in a connected,
157 : correct state for a new session.
158 : */
159 : template<class AsyncStream>
160 : void
161 0 : http_stream<AsyncStream>::
162 : on_stream_begin(
163 : acceptor_config const& config)
164 : {
165 0 : pconfig_ = &config;
166 0 : req_.parser.reset();
167 0 : do_read();
168 0 : }
169 :
170 : template<class AsyncStream>
171 : void
172 0 : http_stream<AsyncStream>::
173 : do_read()
174 : {
175 0 : req_.parser.start();
176 0 : res_.serializer.reset();
177 0 : beast2::async_read(stream_, req_.parser,
178 0 : call_mf(&http_stream::on_read, this));
179 0 : }
180 :
181 : template<class AsyncStream>
182 : void
183 0 : http_stream<AsyncStream>::
184 : on_read(
185 : system::error_code ec,
186 : std::size_t bytes_transferred)
187 : {
188 : (void)bytes_transferred;
189 :
190 0 : if(ec.failed())
191 0 : return do_fail("http_stream::on_read", ec);
192 :
193 0 : LOG_TRC(this->sect_)(
194 : "{} http_stream::on_read bytes={}",
195 : this->id(), bytes_transferred);
196 :
197 0 : BOOST_ASSERT(req_.parser.is_complete());
198 :
199 : //----------------------------------------
200 : //
201 : // set up Request and Response objects
202 : //
203 :
204 : // VFALCO HACK for now we make a copy of the message
205 0 : req_.message = req_.parser.get();
206 :
207 : // copy version
208 0 : res_.message.set_version(req_.message.version());
209 :
210 : // copy keep-alive setting
211 0 : res_.message.set_start_line(
212 0 : http_proto::status::ok, req_.parser.get().version());
213 0 : res_.message.set_keep_alive(req_.parser.get().keep_alive());
214 :
215 : // parse the URL
216 : {
217 0 : auto rv = urls::parse_uri_reference(req_.parser.get().target());
218 0 : if(rv.has_value())
219 : {
220 0 : req_.url = rv.value();
221 0 : req_.base_path = "";
222 0 : req_.path = std::string(rv->encoded_path());
223 : }
224 : else
225 : {
226 : // error parsing URL
227 0 : res_.status(
228 : http_proto::status::bad_request);
229 0 : res_.set_body(
230 0 : "Bad Request: " + rv.error().message());
231 0 : goto do_write;
232 : }
233 : }
234 :
235 : // invoke handlers for the route
236 0 : BOOST_ASSERT(! pwg_);
237 0 : ec = routes_.dispatch(req_.message.method(), req_.url, req_, res_);
238 0 : if(ec == route::send)
239 0 : goto do_write;
240 :
241 0 : if(ec == route::next)
242 : {
243 : // unhandled
244 0 : res_.status(http_proto::status::not_found);
245 0 : std::string s;
246 0 : format_to(s, "The requested URL {} was not found on this server.", req_.url);
247 : //res_.message.set_keep_alive(false); // VFALCO?
248 0 : res_.set_body(s);
249 0 : goto do_write;
250 0 : }
251 :
252 0 : if(ec == route::detach)
253 : {
254 : // make sure they called detach()
255 0 : BOOST_ASSERT(pwg_);
256 0 : return;
257 : }
258 :
259 : // error message of last resort
260 : {
261 0 : BOOST_ASSERT(ec.failed());
262 0 : res_.status(http_proto::status::internal_server_error);
263 0 : std::string s;
264 0 : format_to(s, "An internal server error occurred: {}", ec.message());
265 : //res_.message.set_keep_alive(false); // VFALCO?
266 0 : res_.set_body(s);
267 0 : }
268 :
269 0 : do_write:
270 0 : if(res_.serializer.is_done())
271 : {
272 : // happens when the handler sends the response
273 0 : return on_write(system::error_code(), 0);
274 : }
275 :
276 0 : beast2::async_write(stream_, res_.serializer,
277 0 : call_mf(&http_stream::on_write, this));
278 : }
279 :
280 : template<class AsyncStream>
281 : void
282 0 : http_stream<AsyncStream>::
283 : on_write(
284 : system::error_code const& ec,
285 : std::size_t bytes_transferred)
286 : {
287 : (void)bytes_transferred;
288 :
289 0 : if(ec.failed())
290 0 : return do_fail("http_stream::on_write", ec);
291 :
292 0 : BOOST_ASSERT(res_.serializer.is_done());
293 :
294 0 : LOG_TRC(this->sect_)(
295 : "{} http_stream::on_write bytes={}",
296 : this->id(), bytes_transferred);
297 :
298 0 : if(res_.message.keep_alive())
299 0 : return do_read();
300 :
301 : // tidy up lingering objects
302 0 : req_.parser.reset();
303 0 : res_.serializer.reset();
304 0 : res_.message.clear();
305 :
306 0 : close_({});
307 : }
308 :
309 : template<class AsyncStream>
310 : void
311 0 : http_stream<AsyncStream>::
312 : do_fail(
313 : core::string_view s, system::error_code const& ec)
314 : {
315 0 : LOG_TRC(this->sect_)("{}: {}", s, ec.message());
316 :
317 : // tidy up lingering objects
318 0 : req_.parser.reset();
319 0 : res_.serializer.reset();
320 : //res_.clear();
321 : //preq_.reset();
322 :
323 0 : close_(ec);
324 0 : }
325 :
326 : template<class AsyncStream>
327 : auto
328 0 : http_stream<AsyncStream>::
329 : do_detach() ->
330 : resumer
331 : {
332 0 : BOOST_ASSERT(stream_.get_executor().running_in_this_thread());
333 :
334 : // can't call twice
335 0 : BOOST_ASSERT(! pwg_);
336 0 : pwg_.reset(new work_guard(stream_.get_executor()));
337 :
338 : // VFALCO cancel timer
339 :
340 0 : return resumer(*this);
341 : }
342 :
343 : template<class AsyncStream>
344 : void
345 0 : http_stream<AsyncStream>::
346 : do_resume(system::error_code const& ec)
347 : {
348 0 : asio::dispatch(
349 0 : stream_.get_executor(),
350 0 : asio::prepend(call_mf(
351 : &http_stream::do_resume2, this), ec));
352 0 : }
353 :
354 : template<class AsyncStream>
355 : void
356 0 : http_stream<AsyncStream>::
357 : do_resume2(system::error_code ec)
358 : {
359 0 : BOOST_ASSERT(stream_.get_executor().running_in_this_thread());
360 :
361 0 : BOOST_ASSERT(pwg_.get() != nullptr);
362 0 : pwg_.reset();
363 :
364 : // invoke handlers for the route
365 0 : BOOST_ASSERT(! pwg_);
366 0 : ec = routes_.resume(req_, res_, ec);
367 :
368 0 : if(ec == route::detach)
369 : {
370 : // make sure they called detach()
371 0 : BOOST_ASSERT(pwg_);
372 0 : return;
373 : }
374 :
375 0 : if(ec.failed())
376 : {
377 : // give a default error response?
378 : }
379 0 : beast2::async_write(stream_, res_.serializer,
380 0 : call_mf(&http_stream::on_write, this));
381 : }
382 :
383 : } // beast2
384 : } // boost
385 :
386 : #endif
|