Boost.Redis 1.4.2
A redis client library
Loading...
Searching...
No Matches
runner.hpp
1/* Copyright (c) 2018-2023 Marcelo Zimbres Silva (mzimbres@gmail.com)
2 *
3 * Distributed under the Boost Software License, Version 1.0. (See
4 * accompanying file LICENSE.txt)
5 */
6
7#ifndef BOOST_REDIS_RUNNER_HPP
8#define BOOST_REDIS_RUNNER_HPP
9
10#include <boost/redis/detail/health_checker.hpp>
11#include <boost/redis/config.hpp>
12#include <boost/redis/response.hpp>
13#include <boost/redis/detail/helper.hpp>
14#include <boost/redis/error.hpp>
15#include <boost/redis/logger.hpp>
16#include <boost/redis/operation.hpp>
17#include <boost/redis/detail/connector.hpp>
18#include <boost/redis/detail/resolver.hpp>
19#include <boost/redis/detail/handshaker.hpp>
20#include <boost/asio/compose.hpp>
21#include <boost/asio/connect.hpp>
22#include <boost/asio/coroutine.hpp>
23#include <boost/asio/experimental/parallel_group.hpp>
24#include <boost/asio/ip/tcp.hpp>
25#include <boost/asio/steady_timer.hpp>
26#include <string>
27#include <memory>
28#include <chrono>
29
30namespace boost::redis::detail
31{
32
33template <class Runner, class Connection, class Logger>
34struct hello_op {
35 Runner* runner_ = nullptr;
36 Connection* conn_ = nullptr;
37 Logger logger_;
38 asio::coroutine coro_{};
39
40 template <class Self>
41 void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
42 {
43 BOOST_ASIO_CORO_REENTER (coro_)
44 {
45 runner_->hello_req_.clear();
46 if (runner_->hello_resp_.has_value())
47 runner_->hello_resp_.value().clear();
48 runner_->add_hello();
49
50 BOOST_ASIO_CORO_YIELD
51 conn_->async_exec(runner_->hello_req_, runner_->hello_resp_, std::move(self));
52 logger_.on_hello(ec, runner_->hello_resp_);
53
54 if (ec || runner_->has_error_in_response() || is_cancelled(self)) {
55 logger_.trace("hello-op: error/canceled. Exiting ...");
56 conn_->cancel(operation::run);
57 self.complete(!!ec ? ec : asio::error::operation_aborted);
58 return;
59 }
60
61 self.complete({});
62 }
63 }
64};
65
66template <class Runner, class Connection, class Logger>
67class runner_op {
68private:
69 Runner* runner_ = nullptr;
70 Connection* conn_ = nullptr;
71 Logger logger_;
72 asio::coroutine coro_{};
73
74public:
75 runner_op(Runner* runner, Connection* conn, Logger l)
76 : runner_{runner}
77 , conn_{conn}
78 , logger_{l}
79 {}
80
81 template <class Self>
82 void operator()( Self& self
83 , std::array<std::size_t, 3> order = {}
84 , system::error_code ec0 = {}
85 , system::error_code ec1 = {}
86 , system::error_code ec2 = {}
87 , std::size_t = 0)
88 {
89 BOOST_ASIO_CORO_REENTER (coro_)
90 {
91 BOOST_ASIO_CORO_YIELD
92 asio::experimental::make_parallel_group(
93 [this](auto token) { return runner_->async_run_all(*conn_, logger_, token); },
94 [this](auto token) { return runner_->health_checker_.async_check_health(*conn_, logger_, token); },
95 [this](auto token) { return runner_->async_hello(*conn_, logger_, token); }
96 ).async_wait(
97 asio::experimental::wait_for_all(),
98 std::move(self));
99
100 logger_.on_runner(ec0, ec1, ec2);
101
102 if (is_cancelled(self)) {
103 self.complete(asio::error::operation_aborted);
104 return;
105 }
106
107 if (ec0 == error::connect_timeout || ec0 == error::resolve_timeout) {
108 self.complete(ec0);
109 return;
110 }
111
112 if (order[0] == 2 && !!ec2) {
113 self.complete(ec2);
114 return;
115 }
116
117 if (order[0] == 1 && ec1 == error::pong_timeout) {
118 self.complete(ec1);
119 return;
120 }
121
122 self.complete(ec0);
123 }
124 }
125};
126
127template <class Runner, class Connection, class Logger>
128struct run_all_op {
129 Runner* runner_ = nullptr;
130 Connection* conn_ = nullptr;
131 Logger logger_;
132 asio::coroutine coro_{};
133
134 template <class Self>
135 void operator()(Self& self, system::error_code ec = {}, std::size_t = 0)
136 {
137 BOOST_ASIO_CORO_REENTER (coro_)
138 {
139 BOOST_ASIO_CORO_YIELD
140 runner_->resv_.async_resolve(std::move(self));
141 logger_.on_resolve(ec, runner_->resv_.results());
142 BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
143
144 BOOST_ASIO_CORO_YIELD
145 runner_->ctor_.async_connect(conn_->next_layer().next_layer(), runner_->resv_.results(), std::move(self));
146 logger_.on_connect(ec, runner_->ctor_.endpoint());
147 BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
148
149 if (conn_->use_ssl()) {
150 BOOST_ASIO_CORO_YIELD
151 runner_->hsher_.async_handshake(conn_->next_layer(), std::move(self));
152 logger_.on_ssl_handshake(ec);
153 BOOST_REDIS_CHECK_OP0(conn_->cancel(operation::run);)
154 }
155
156 BOOST_ASIO_CORO_YIELD
157 conn_->async_run_lean(runner_->cfg_, logger_, std::move(self));
158 BOOST_REDIS_CHECK_OP0(;)
159 self.complete(ec);
160 }
161 }
162};
163
164template <class Executor>
165class runner {
166public:
167 runner(Executor ex, config cfg)
168 : resv_{ex}
169 , ctor_{ex}
170 , hsher_{ex}
171 , health_checker_{ex}
172 , cfg_{cfg}
173 { }
174
175 std::size_t cancel(operation op)
176 {
177 resv_.cancel(op);
178 ctor_.cancel(op);
179 hsher_.cancel(op);
180 health_checker_.cancel(op);
181 return 0U;
182 }
183
184 void set_config(config const& cfg)
185 {
186 cfg_ = cfg;
187 resv_.set_config(cfg);
188 ctor_.set_config(cfg);
189 hsher_.set_config(cfg);
190 health_checker_.set_config(cfg);
191 }
192
193 template <class Connection, class Logger, class CompletionToken>
194 auto async_run(Connection& conn, Logger l, CompletionToken token)
195 {
196 return asio::async_compose
197 < CompletionToken
198 , void(system::error_code)
199 >(runner_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
200 }
201
202 config const& get_config() const noexcept {return cfg_;}
203
204private:
205 using resolver_type = resolver<Executor>;
206 using connector_type = connector<Executor>;
207 using handshaker_type = detail::handshaker<Executor>;
208 using health_checker_type = health_checker<Executor>;
209 using timer_type = typename connector_type::timer_type;
210
211 template <class, class, class> friend struct run_all_op;
212 template <class, class, class> friend class runner_op;
213 template <class, class, class> friend struct hello_op;
214
215 template <class Connection, class Logger, class CompletionToken>
216 auto async_run_all(Connection& conn, Logger l, CompletionToken token)
217 {
218 return asio::async_compose
219 < CompletionToken
220 , void(system::error_code)
221 >(run_all_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
222 }
223
224 template <class Connection, class Logger, class CompletionToken>
225 auto async_hello(Connection& conn, Logger l, CompletionToken token)
226 {
227 return asio::async_compose
228 < CompletionToken
229 , void(system::error_code)
230 >(hello_op<runner, Connection, Logger>{this, &conn, l}, token, conn);
231 }
232
233 void add_hello()
234 {
235 if (!cfg_.username.empty() && !cfg_.password.empty() && !cfg_.clientname.empty())
236 hello_req_.push("HELLO", "3", "AUTH", cfg_.username, cfg_.password, "SETNAME", cfg_.clientname);
237 else if (cfg_.username.empty() && cfg_.password.empty() && cfg_.clientname.empty())
238 hello_req_.push("HELLO", "3");
239 else if (cfg_.clientname.empty())
240 hello_req_.push("HELLO", "3", "AUTH", cfg_.username, cfg_.password);
241 else
242 hello_req_.push("HELLO", "3", "SETNAME", cfg_.clientname);
243
244 if (cfg_.database_index && cfg_.database_index.value() != 0)
245 hello_req_.push("SELECT", cfg_.database_index.value());
246 }
247
248 bool has_error_in_response() const noexcept
249 {
250 if (!hello_resp_.has_value())
251 return true;
252
253 auto f = [](auto const& e)
254 {
255 switch (e.data_type) {
257 case resp3::type::blob_error: return true;
258 default: return false;
259 }
260 };
261
262 return std::any_of(std::cbegin(hello_resp_.value()), std::cend(hello_resp_.value()), f);
263 }
264
265 resolver_type resv_;
266 connector_type ctor_;
267 handshaker_type hsher_;
268 health_checker_type health_checker_;
269 request hello_req_;
270 generic_response hello_resp_;
271 config cfg_;
272};
273
274} // boost::redis::detail
275
276#endif // BOOST_REDIS_RUNNER_HPP
adapter::result< std::vector< resp3::node > > generic_response
A generic response to a request.
Definition: response.hpp:35
operation
Connection operations that can be cancelled.
Definition: operation.hpp:18
@ resolve_timeout
Resolve timeout.
@ pong_timeout
Connect timeout.
@ connect_timeout
Connect timeout.
@ run
Refers to connection::async_run operations.