Boost.Redis 1.4.2
A redis client library
Loading...
Searching...
No Matches
connection.hpp
1/* Copyright (c) 2018-2023 Marcelo Zimbres Silva (mzimbres@gmail.com)
2 *
3 * Distributed under the Boost Software License, Version 1.0. (See
4 * accompanying file LICENSE.txt)
5 */
6
7#ifndef BOOST_REDIS_CONNECTION_HPP
8#define BOOST_REDIS_CONNECTION_HPP
9
10#include <boost/redis/detail/connection_base.hpp>
11#include <boost/redis/logger.hpp>
12#include <boost/redis/config.hpp>
13#include <boost/asio/io_context.hpp>
14#include <boost/asio/coroutine.hpp>
15#include <boost/asio/steady_timer.hpp>
16#include <boost/asio/any_io_executor.hpp>
17#include <boost/asio/any_completion_handler.hpp>
18
19#include <chrono>
20#include <memory>
21#include <limits>
22
23namespace boost::redis {
24namespace detail
25{
26template <class Connection, class Logger>
27struct reconnection_op {
28 Connection* conn_ = nullptr;
29 Logger logger_;
30 asio::coroutine coro_{};
31
32 template <class Self>
33 void operator()(Self& self, system::error_code ec = {})
34 {
35 BOOST_ASIO_CORO_REENTER (coro_) for (;;)
36 {
37 BOOST_ASIO_CORO_YIELD
38 conn_->impl_.async_run(conn_->cfg_, logger_, std::move(self));
39 conn_->cancel(operation::receive);
40 logger_.on_connection_lost(ec);
41 if (!conn_->will_reconnect() || is_cancelled(self)) {
42 conn_->cancel(operation::reconnection);
43 self.complete(!!ec ? ec : asio::error::operation_aborted);
44 return;
45 }
46
47 conn_->timer_.expires_after(conn_->cfg_.reconnect_wait_interval);
48 BOOST_ASIO_CORO_YIELD
49 conn_->timer_.async_wait(std::move(self));
50 BOOST_REDIS_CHECK_OP0(;)
51 if (!conn_->will_reconnect()) {
52 self.complete(asio::error::operation_aborted);
53 return;
54 }
55 conn_->reset_stream();
56 }
57 }
58};
59} // detail
60
71template <class Executor>
73public:
75 using executor_type = Executor;
76
79 { return impl_.get_executor(); }
80
82 template <class Executor1>
84 {
87 };
88
90 explicit
93 asio::ssl::context::method method = asio::ssl::context::tls_client,
94 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
95 : impl_{ex, method, max_read_size}
96 , timer_{ex}
97 { }
98
100 explicit
102 asio::io_context& ioc,
103 asio::ssl::context::method method = asio::ssl::context::tls_client,
104 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)())
105 : basic_connection(ioc.get_executor(), method, max_read_size)
106 { }
107
145 template <
146 class Logger = logger,
147 class CompletionToken = asio::default_completion_token_t<executor_type>>
148 auto
150 config const& cfg = {},
151 Logger l = Logger{},
152 CompletionToken token = CompletionToken{})
153 {
154 using this_type = basic_connection<executor_type>;
155
156 cfg_ = cfg;
157 l.set_prefix(cfg_.log_prefix);
158 return asio::async_compose
159 < CompletionToken
160 , void(system::error_code)
161 >(detail::reconnection_op<this_type, Logger>{this, l}, token, timer_);
162 }
163
186 template <class CompletionToken = asio::default_completion_token_t<executor_type>>
187 auto async_receive(CompletionToken token = CompletionToken{})
188 { return impl_.async_receive(std::move(token)); }
189
190
202 std::size_t receive(system::error_code& ec)
203 {
204 return impl_.receive(ec);
205 }
206
207 template <
208 class Response = ignore_t,
209 class CompletionToken = asio::default_completion_token_t<executor_type>
210 >
211 [[deprecated("Set the response with set_receive_response and use the other overload.")]]
212 auto
214 Response& response,
215 CompletionToken token = CompletionToken{})
216 {
217 return impl_.async_receive(response, token);
218 }
219
243 template <
244 class Response = ignore_t,
245 class CompletionToken = asio::default_completion_token_t<executor_type>
246 >
247 auto
249 request const& req,
250 Response& resp = ignore,
251 CompletionToken&& token = CompletionToken{})
252 {
253 return impl_.async_exec(req, resp, std::forward<CompletionToken>(token));
254 }
255
268 {
269 switch (op) {
271 case operation::all:
272 cfg_.reconnect_wait_interval = std::chrono::seconds::zero();
273 timer_.cancel();
274 break;
275 default: /* ignore */;
276 }
277
278 impl_.cancel(op);
279 }
280
282 bool will_reconnect() const noexcept
283 { return cfg_.reconnect_wait_interval != std::chrono::seconds::zero();}
284
286 auto const& get_ssl_context() const noexcept
287 { return impl_.get_ssl_context();}
288
290 auto& get_ssl_context() noexcept
291 { return impl_.get_ssl_context();}
292
295 { impl_.reset_stream(); }
296
298 auto& next_layer() noexcept
299 { return impl_.next_layer(); }
300
302 auto const& next_layer() const noexcept
303 { return impl_.next_layer(); }
304
306 template <class Response>
308 { impl_.set_receive_response(response); }
309
311 usage get_usage() const noexcept
312 { return impl_.get_usage(); }
313
314private:
315 using timer_type =
316 asio::basic_waitable_timer<
317 std::chrono::steady_clock,
318 asio::wait_traits<std::chrono::steady_clock>,
319 Executor>;
320
321 template <class, class> friend struct detail::reconnection_op;
322
323 config cfg_;
325 timer_type timer_;
326};
327
338public:
340 using executor_type = asio::any_io_executor;
341
343 explicit
345 executor_type ex,
346 asio::ssl::context::method method = asio::ssl::context::tls_client,
347 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)());
348
350 explicit
352 asio::io_context& ioc,
353 asio::ssl::context::method method = asio::ssl::context::tls_client,
354 std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)());
355
358 { return impl_.get_executor(); }
359
361 template <class CompletionToken>
362 auto async_run(config const& cfg, logger l, CompletionToken token)
363 {
364 return asio::async_initiate<
365 CompletionToken, void(boost::system::error_code)>(
366 [](auto handler, connection* self, config const* cfg, logger l)
367 {
368 self->async_run_impl(*cfg, l, std::move(handler));
369 }, token, this, &cfg, l);
370 }
371
373 template <class Response, class CompletionToken>
374 [[deprecated("Set the response with set_receive_response and use the other overload.")]]
375 auto async_receive(Response& response, CompletionToken token)
376 {
377 return impl_.async_receive(response, std::move(token));
378 }
379
381 template <class CompletionToken>
382 auto async_receive(CompletionToken token)
383 { return impl_.async_receive(std::move(token)); }
384
386 std::size_t receive(system::error_code& ec)
387 {
388 return impl_.receive(ec);
389 }
390
392 template <class Response, class CompletionToken>
393 auto async_exec(request const& req, Response& resp, CompletionToken token)
394 {
395 return impl_.async_exec(req, resp, std::move(token));
396 }
397
400
402 bool will_reconnect() const noexcept
403 { return impl_.will_reconnect();}
404
406 auto& next_layer() noexcept
407 { return impl_.next_layer(); }
408
410 auto const& next_layer() const noexcept
411 { return impl_.next_layer(); }
412
415 { impl_.reset_stream();}
416
418 template <class Response>
421
423 usage get_usage() const noexcept
424 { return impl_.get_usage(); }
425
426private:
427 void
428 async_run_impl(
429 config const& cfg,
430 logger l,
431 asio::any_completion_handler<void(boost::system::error_code)> token);
432
434};
435
436} // boost::redis
437
438#endif // BOOST_REDIS_CONNECTION_HPP
A SSL connection to the Redis server.
Definition: connection.hpp:72
void reset_stream()
Resets the underlying stream.
Definition: connection.hpp:294
bool will_reconnect() const noexcept
Returns true if the connection was canceled.
Definition: connection.hpp:282
executor_type get_executor() noexcept
Returns the underlying executor.
Definition: connection.hpp:78
auto async_exec(request const &req, Response &resp=ignore, CompletionToken &&token=CompletionToken{})
Executes commands on the Redis server asynchronously.
Definition: connection.hpp:248
basic_connection(executor_type ex, asio::ssl::context::method method=asio::ssl::context::tls_client, std::size_t max_read_size=(std::numeric_limits< std::size_t >::max)())
Contructs from an executor.
Definition: connection.hpp:91
usage get_usage() const noexcept
Returns connection usage information.
Definition: connection.hpp:311
auto & get_ssl_context() noexcept
Returns the ssl context.
Definition: connection.hpp:290
auto const & get_ssl_context() const noexcept
Returns the ssl context.
Definition: connection.hpp:286
auto const & next_layer() const noexcept
Returns a const reference to the next layer.
Definition: connection.hpp:302
void set_receive_response(Response &response)
Sets the response object of async_receive operations.
Definition: connection.hpp:307
basic_connection(asio::io_context &ioc, asio::ssl::context::method method=asio::ssl::context::tls_client, std::size_t max_read_size=(std::numeric_limits< std::size_t >::max)())
Contructs from a context.
Definition: connection.hpp:101
void cancel(operation op=operation::all)
Cancel operations.
Definition: connection.hpp:267
auto & next_layer() noexcept
Returns a reference to the next layer.
Definition: connection.hpp:298
std::size_t receive(system::error_code &ec)
Receives server pushes synchronously without blocking.
Definition: connection.hpp:202
auto async_receive(CompletionToken token=CompletionToken{})
Receives server side pushes asynchronously.
Definition: connection.hpp:187
auto async_run(config const &cfg={}, Logger l=Logger{}, CompletionToken token=CompletionToken{})
Starts underlying connection operations.
Definition: connection.hpp:149
Executor executor_type
Executor type.
Definition: connection.hpp:75
Rebinds the socket type to another executor.
Definition: connection.hpp:84
A basic_connection that type erases the executor.
Definition: connection.hpp:337
bool will_reconnect() const noexcept
Calls boost::redis::basic_connection::will_reconnect.
Definition: connection.hpp:402
auto async_exec(request const &req, Response &resp, CompletionToken token)
Calls boost::redis::basic_connection::async_exec.
Definition: connection.hpp:393
auto async_receive(Response &response, CompletionToken token)
Calls boost::redis::basic_connection::async_receive.
Definition: connection.hpp:375
std::size_t receive(system::error_code &ec)
Calls boost::redis::basic_connection::receive.
Definition: connection.hpp:386
auto async_run(config const &cfg, logger l, CompletionToken token)
Calls boost::redis::basic_connection::async_run.
Definition: connection.hpp:362
asio::any_io_executor executor_type
Executor type.
Definition: connection.hpp:340
connection(executor_type ex, asio::ssl::context::method method=asio::ssl::context::tls_client, std::size_t max_read_size=(std::numeric_limits< std::size_t >::max)())
Contructs from an executor.
auto & next_layer() noexcept
Calls boost::redis::basic_connection::next_layer.
Definition: connection.hpp:406
void set_receive_response(Response &response)
Sets the response object of async_receive operations.
Definition: connection.hpp:419
auto const & next_layer() const noexcept
Calls boost::redis::basic_connection::next_layer.
Definition: connection.hpp:410
void cancel(operation op=operation::all)
Calls boost::redis::basic_connection::cancel.
connection(asio::io_context &ioc, asio::ssl::context::method method=asio::ssl::context::tls_client, std::size_t max_read_size=(std::numeric_limits< std::size_t >::max)())
Contructs from a context.
usage get_usage() const noexcept
Returns connection usage information.
Definition: connection.hpp:423
auto async_receive(CompletionToken token)
Calls boost::redis::basic_connection::async_receive.
Definition: connection.hpp:382
executor_type get_executor() noexcept
Returns the underlying executor.
Definition: connection.hpp:357
void reset_stream()
Calls boost::redis::basic_connection::reset_stream.
Definition: connection.hpp:414
Base class for high level Redis asynchronous connections.
auto get_executor()
Returns the associated executor.
void reset_stream()
Resets the underlying stream.
auto & next_layer() noexcept
Returns a reference to the next layer.
auto const & get_ssl_context() const noexcept
Returns the ssl context.
void cancel(operation op)
Cancels specific operations.
Logger class.
Definition: logger.hpp:27
Creates Redis requests.
Definition: request.hpp:46
std::chrono::steady_clock::duration reconnect_wait_interval
Time waited before trying a reconnection.
Definition: config.hpp:80
std::string log_prefix
Logger prefix, see boost::redis::logger.
Definition: config.hpp:59
ignore_t ignore
Global ignore object.
std::decay_t< decltype(std::ignore)> ignore_t
Type used to ignore responses.
Definition: ignore.hpp:31
std::tuple< adapter::result< Ts >... > response
Response with compile-time size.
Definition: response.hpp:25
operation
Connection operations that can be cancelled.
Definition: operation.hpp:18
@ reconnection
Cancels reconnection.
@ all
Refers to all operations.
@ receive
Refers to connection::async_receive operations.
Configure parameters used by the connection classes.
Definition: config.hpp:30
Connection usage information.
Definition: usage.hpp:21