7#ifndef BOOST_REDIS_CONNECTION_BASE_HPP
8#define BOOST_REDIS_CONNECTION_BASE_HPP
10#include <boost/redis/adapter/adapt.hpp>
11#include <boost/redis/detail/helper.hpp>
12#include <boost/redis/error.hpp>
13#include <boost/redis/operation.hpp>
14#include <boost/redis/request.hpp>
15#include <boost/redis/resp3/type.hpp>
16#include <boost/redis/config.hpp>
17#include <boost/redis/detail/runner.hpp>
18#include <boost/redis/usage.hpp>
20#include <boost/system.hpp>
21#include <boost/asio/basic_stream_socket.hpp>
22#include <boost/asio/bind_executor.hpp>
23#include <boost/asio/experimental/parallel_group.hpp>
24#include <boost/asio/ip/tcp.hpp>
25#include <boost/asio/steady_timer.hpp>
26#include <boost/asio/write.hpp>
27#include <boost/assert.hpp>
28#include <boost/core/ignore_unused.hpp>
29#include <boost/asio/ssl/stream.hpp>
30#include <boost/asio/read_until.hpp>
31#include <boost/asio/buffer.hpp>
32#include <boost/asio/experimental/channel.hpp>
43namespace boost::redis::detail
46template <
class DynamicBuffer>
47std::string_view buffer_view(DynamicBuffer buf)
noexcept
49 char const* start =
static_cast<char const*
>(buf.data(0, buf.size()).data());
50 return std::string_view{start, std::size(buf)};
53template <
class AsyncReadStream,
class DynamicBuffer>
56 AsyncReadStream& stream_;
58 std::size_t size_ = 0;
60 asio::coroutine coro_{};
63 append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
65 , buf_ {std::move(buf)}
70 void operator()( Self& self
71 , system::error_code ec = {}
74 BOOST_ASIO_CORO_REENTER (coro_)
80 stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
86 buf_.shrink(buf_.size() - tmp_ - n);
92template <
class AsyncReadStream,
class DynamicBuffer,
class CompletionToken>
95 AsyncReadStream& stream,
98 CompletionToken&& token)
100 return asio::async_compose
102 , void(system::error_code, std::size_t)
103 >(append_some_op<AsyncReadStream, DynamicBuffer> {stream, buffer, size}, token, stream);
108 using req_info_type =
typename Conn::req_info;
109 using adapter_type =
typename Conn::adapter_type;
111 Conn* conn_ =
nullptr;
112 std::shared_ptr<req_info_type> info_ =
nullptr;
113 asio::coroutine coro{};
115 template <
class Self>
116 void operator()(Self& self , system::error_code ec = {})
118 BOOST_ASIO_CORO_REENTER (coro)
122 if (info_->req_->get_config().cancel_if_not_connected && !conn_->is_open()) {
123 BOOST_ASIO_CORO_YIELD
124 asio::post(std::move(self));
128 conn_->add_request_info(info_);
131 BOOST_ASIO_CORO_YIELD
132 info_->async_wait(std::move(self));
133 BOOST_ASSERT(ec == asio::error::operation_aborted);
136 self.complete(info_->ec_, 0);
140 if (info_->stop_requested()) {
143 return self.complete(ec, 0);
146 if (is_cancelled(self)) {
147 if (info_->is_written()) {
148 using c_t = asio::cancellation_type;
149 auto const c = self.get_cancellation_state().cancelled();
150 if ((c & c_t::terminal) != c_t::none) {
154 return self.complete(ec, 0);
157 self.get_cancellation_state().clear();
165 conn_->remove_request(info_);
166 self.complete(ec, 0);
171 self.complete(info_->ec_, info_->read_size_);
176template <
class Conn,
class Logger>
178 Conn* conn =
nullptr;
180 asio::coroutine coro{};
182 template <
class Self>
183 void operator()( Self& self
184 , std::array<std::size_t, 2> order = {}
185 , system::error_code ec0 = {}
186 , system::error_code ec1 = {})
188 BOOST_ASIO_CORO_REENTER (coro)
192 BOOST_ASIO_CORO_YIELD
193 asio::experimental::make_parallel_group(
194 [
this](
auto token) {
return conn->reader(logger_, token);},
195 [
this](
auto token) {
return conn->writer(logger_, token);}
197 asio::experimental::wait_for_one(),
200 if (is_cancelled(self)) {
201 logger_.trace(
"run-op: canceled. Exiting ...");
202 self.complete(asio::error::operation_aborted);
206 logger_.on_run(ec0, ec1);
209 case 0: self.complete(ec0);
break;
210 case 1: self.complete(ec1);
break;
211 default: BOOST_ASSERT(
false);
217template <
class Conn,
class Logger>
221 asio::coroutine coro{};
223 template <
class Self>
224 void operator()( Self& self
225 , system::error_code ec = {}
230 BOOST_ASIO_CORO_REENTER (coro)
for (;;)
232 while (conn_->coalesce_requests()) {
233 if (conn_->use_ssl())
234 BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
236 BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer().next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
238 logger_.on_write(ec, conn_->write_buffer_);
241 logger_.trace(
"writer-op: error. Exiting ...");
247 if (is_cancelled(self)) {
248 logger_.trace(
"writer-op: canceled. Exiting ...");
249 self.complete(asio::error::operation_aborted);
258 if (!conn_->is_open()) {
259 logger_.trace(
"writer-op: canceled (2). Exiting ...");
265 BOOST_ASIO_CORO_YIELD
266 conn_->writer_timer_.async_wait(std::move(self));
267 if (!conn_->is_open() || is_cancelled(self)) {
268 logger_.trace(
"writer-op: canceled (3). Exiting ...");
279template <
class Conn,
class Logger>
281 using parse_result =
typename Conn::parse_result;
282 using parse_ret_type =
typename Conn::parse_ret_type;
285 parse_ret_type res_{parse_result::resp, 0};
286 asio::coroutine coro{};
288 template <
class Self>
289 void operator()( Self& self
290 , system::error_code ec = {}
295 BOOST_ASIO_CORO_REENTER (coro)
for (;;)
298 if ((res_.first == parse_result::needs_more) || std::empty(conn_->read_buffer_)) {
299 if (conn_->use_ssl()) {
300 BOOST_ASIO_CORO_YIELD
304 conn_->get_suggested_buffer_growth(),
307 BOOST_ASIO_CORO_YIELD
309 conn_->next_layer().next_layer(),
311 conn_->get_suggested_buffer_growth(),
315 logger_.on_read(ec, n);
318 if (ec == asio::error::eof) {
319 logger_.trace(
"reader-op: EOF received. Exiting ...");
321 return self.complete({});
326 logger_.trace(
"reader-op: error. Exiting ...");
335 if (!conn_->is_open() || is_cancelled(self)) {
336 logger_.trace(
"reader-op: canceled. Exiting ...");
342 res_ = conn_->on_read(buffer_view(conn_->dbuf_), ec);
344 logger_.trace(
"reader-op: parse error. Exiting ...");
350 if (res_.first == parse_result::push) {
351 if (!conn_->receive_channel_.try_send(ec, res_.second)) {
352 BOOST_ASIO_CORO_YIELD
353 conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
357 logger_.trace(
"reader-op: error. Exiting ...");
363 if (!conn_->is_open() || is_cancelled(self)) {
364 logger_.trace(
"reader-op: canceled (2). Exiting ...");
365 self.complete(asio::error::operation_aborted);
380template <
class Executor>
387 using next_layer_type = asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>>;
389 using clock_type = std::chrono::steady_clock;
390 using clock_traits_type = asio::wait_traits<clock_type>;
391 using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
398 asio::ssl::context::method method,
399 std::size_t max_read_size)
403 , receive_channel_{ex, 256}
405 , dbuf_{read_buffer_, max_read_size}
407 set_receive_response(
ignore);
408 writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)());
422 stream_ = std::make_unique<next_layer_type>(writer_timer_.get_executor(), ctx_);
448 template <
class Response,
class CompletionToken>
449 auto async_exec(
request const& req, Response& resp, CompletionToken token)
451 using namespace boost::redis::adapter;
452 auto f = boost_redis_adapt(resp);
453 BOOST_ASSERT_MSG(req.get_expected_responses() <= f.get_supported_response_size(),
"Request and response have incompatible sizes.");
455 auto info = std::make_shared<req_info>(req, f, get_executor());
457 return asio::async_compose
459 , void(system::error_code, std::size_t)
460 >(exec_op<this_type>{
this, info}, token, writer_timer_);
463 template <
class Response,
class CompletionToken>
464 [[deprecated(
"Set the response with set_receive_response and use the other overload.")]]
465 auto async_receive(Response&
response, CompletionToken token)
468 return receive_channel_.async_receive(std::move(token));
471 template <
class CompletionToken>
472 auto async_receive(CompletionToken token)
473 {
return receive_channel_.async_receive(std::move(token)); }
475 std::size_t
receive(system::error_code& ec)
477 std::size_t size = 0;
479 auto f = [&](system::error_code
const& ec2, std::size_t n)
485 auto const res = receive_channel_.try_receive(f);
495 template <
class Logger,
class CompletionToken>
496 auto async_run(
config const& cfg, Logger l, CompletionToken token)
498 runner_.set_config(cfg);
499 l.set_prefix(runner_.get_config().log_prefix);
500 return runner_.async_run(*
this, l, std::move(token));
503 template <
class Response>
504 void set_receive_response(Response&
response)
506 using namespace boost::redis::adapter;
507 auto g = boost_redis_adapt(
response);
508 receive_adapter_ = adapter::detail::make_adapter_wrapper(g);
511 usage get_usage() const noexcept
515 using receive_channel_type = asio::experimental::channel<executor_type, void(system::error_code, std::size_t)>;
516 using runner_type = runner<executor_type>;
520 auto use_ssl() const noexcept
521 {
return runner_.get_config().use_ssl;}
523 auto cancel_on_conn_lost() -> std::size_t
526 auto cond = [](
auto const& ptr)
528 BOOST_ASSERT(ptr !=
nullptr);
530 if (ptr->is_written()) {
531 return !ptr->req_->get_config().cancel_if_unresponded;
533 return !ptr->req_->get_config().cancel_on_connection_lost;
537 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond);
539 auto const ret = std::distance(point, std::end(reqs_));
541 std::for_each(point, std::end(reqs_), [](
auto const& ptr) {
545 reqs_.erase(point, std::end(reqs_));
546 std::for_each(std::begin(reqs_), std::end(reqs_), [](
auto const& ptr) {
547 return ptr->reset_status();
553 auto cancel_unwritten_requests() -> std::size_t
555 auto f = [](
auto const& ptr)
557 BOOST_ASSERT(ptr !=
nullptr);
558 return ptr->is_written();
561 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f);
563 auto const ret = std::distance(point, std::end(reqs_));
565 std::for_each(point, std::end(reqs_), [](
auto const& ptr) {
569 reqs_.erase(point, std::end(reqs_));
578 cancel_unwritten_requests();
583 writer_timer_.cancel();
584 receive_channel_.cancel();
585 cancel_on_conn_lost();
589 receive_channel_.cancel();
599 write_buffer_.clear();
602 cancel_push_requests();
606 std::for_each(std::begin(reqs_), std::end(reqs_), [](
auto const& ptr) {
607 BOOST_ASSERT_MSG(ptr !=
nullptr,
"Expects non-null pointer.");
608 if (ptr->is_staged())
616 using wrapped_adapter_type = std::function<void(node_type
const&, system::error_code&)>;
625 explicit req_info(
request const& req, adapter_type adapter, executor_type ex)
627 , action_{action::none}
630 , expected_responses_{req.get_expected_responses()}
631 , status_{status::none}
635 timer_.expires_at((std::chrono::steady_clock::time_point::max)());
637 adapter_ = [
this, adapter](node_type
const& nd, system::error_code& ec)
639 auto const i = req_->get_expected_responses() - expected_responses_;
647 action_ = action::proceed;
653 action_ = action::stop;
656 [[nodiscard]]
auto is_waiting_write() const noexcept
657 {
return !is_written() && !is_staged(); }
659 [[nodiscard]]
auto is_written() const noexcept
660 {
return status_ == status::written; }
662 [[nodiscard]]
auto is_staged() const noexcept
663 {
return status_ == status::staged; }
665 void mark_written() noexcept
666 { status_ = status::written; }
668 void mark_staged() noexcept
669 { status_ = status::staged; }
671 void reset_status() noexcept
672 { status_ = status::none; }
674 [[nodiscard]]
auto stop_requested() const noexcept
675 {
return action_ == action::stop;}
677 template <
class CompletionToken>
678 auto async_wait(CompletionToken token)
680 return timer_.async_wait(std::move(token));
693 wrapped_adapter_type adapter_;
696 std::size_t expected_responses_;
699 system::error_code ec_;
700 std::size_t read_size_;
703 void remove_request(std::shared_ptr<req_info>
const& info)
705 reqs_.erase(std::remove(std::begin(reqs_), std::end(reqs_), info));
708 using reqs_type = std::deque<std::shared_ptr<req_info>>;
710 template <
class,
class>
friend struct reader_op;
711 template <
class,
class>
friend struct writer_op;
712 template <
class,
class>
friend struct run_op;
713 template <
class>
friend struct exec_op;
714 template <
class,
class,
class>
friend struct run_all_op;
716 void cancel_push_requests()
718 auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](
auto const& ptr) {
719 return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0);
722 std::for_each(point, std::end(reqs_), [](
auto const& ptr) {
726 reqs_.erase(point, std::end(reqs_));
729 [[nodiscard]]
bool is_writing() const noexcept
731 return !write_buffer_.empty();
734 void add_request_info(std::shared_ptr<req_info>
const& info)
736 reqs_.push_back(info);
738 if (info->req_->has_hello_priority()) {
739 auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](
auto const& e) {
740 return e->is_waiting_write();
743 std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend);
746 if (is_open() && !is_writing())
747 writer_timer_.cancel();
750 template <
class CompletionToken,
class Logger>
751 auto reader(Logger l, CompletionToken&& token)
753 return asio::async_compose
755 , void(system::error_code)
756 >(reader_op<this_type, Logger>{
this, l}, token, writer_timer_);
759 template <
class CompletionToken,
class Logger>
760 auto writer(Logger l, CompletionToken&& token)
762 return asio::async_compose
764 , void(system::error_code)
765 >(writer_op<this_type, Logger>{
this, l}, token, writer_timer_);
768 template <
class Logger,
class CompletionToken>
769 auto async_run_lean(
config const& cfg, Logger l, CompletionToken token)
771 runner_.set_config(cfg);
772 l.set_prefix(runner_.get_config().log_prefix);
773 return asio::async_compose
775 , void(system::error_code)
776 >(run_op<this_type, Logger>{
this, l}, token, writer_timer_);
779 [[nodiscard]]
bool coalesce_requests()
783 auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](
auto const& ri) {
784 return !ri->is_waiting_write();
787 std::for_each(point, std::cend(reqs_), [
this](
auto const& ri) {
789 write_buffer_ += ri->req_->payload();
791 usage_.commands_sent += ri->expected_responses_;
794 usage_.bytes_sent += std::size(write_buffer_);
796 return point != std::cend(reqs_);
799 bool is_waiting_response() const noexcept
801 return !std::empty(reqs_) && reqs_.front()->is_written();
806 if (stream_->next_layer().is_open()) {
807 system::error_code ec;
808 stream_->next_layer().close(ec);
812 auto is_open() const noexcept {
return stream_->next_layer().is_open(); }
813 auto& lowest_layer() noexcept {
return stream_->lowest_layer(); }
840 BOOST_ASSERT(!read_buffer_.empty());
843 (resp3::to_type(read_buffer_.front()) == resp3::type::push)
845 || (!reqs_.empty() && reqs_.front()->expected_responses_ == 0)
846 || !is_waiting_response();
849 auto get_suggested_buffer_growth() const noexcept
851 return parser_.get_suggested_buffer_growth(4096);
854 enum class parse_result { needs_more,
push, resp };
856 using parse_ret_type = std::pair<parse_result, std::size_t>;
858 parse_ret_type on_finish_parsing(parse_result t)
860 if (t == parse_result::push) {
861 usage_.pushes_received += 1;
862 usage_.push_bytes_received += parser_.get_consumed();
864 usage_.responses_received += 1;
865 usage_.response_bytes_received += parser_.get_consumed();
869 dbuf_.consume(parser_.get_consumed());
870 auto const res = std::make_pair(t, parser_.get_consumed());
875 parse_ret_type on_read(std::string_view data, system::error_code& ec)
888 on_push_ = is_next_push();
891 if (!resp3::parse(parser_, data, receive_adapter_, ec))
892 return std::make_pair(parse_result::needs_more, 0);
895 return std::make_pair(parse_result::push, 0);
897 return on_finish_parsing(parse_result::push);
900 BOOST_ASSERT_MSG(is_waiting_response(),
"Not waiting for a response (using MONITOR command perhaps?)");
901 BOOST_ASSERT(!reqs_.empty());
902 BOOST_ASSERT(reqs_.front() !=
nullptr);
903 BOOST_ASSERT(reqs_.front()->expected_responses_ != 0);
905 if (!resp3::parse(parser_, data, reqs_.front()->adapter_, ec))
906 return std::make_pair(parse_result::needs_more, 0);
909 reqs_.front()->ec_ = ec;
910 reqs_.front()->proceed();
911 return std::make_pair(parse_result::resp, 0);
914 reqs_.front()->read_size_ += parser_.get_consumed();
916 if (--reqs_.front()->expected_responses_ == 0) {
918 reqs_.front()->proceed();
922 return on_finish_parsing(parse_result::resp);
927 write_buffer_.clear();
928 read_buffer_.clear();
933 asio::ssl::context ctx_;
934 std::unique_ptr<next_layer_type> stream_;
939 timer_type writer_timer_;
940 receive_channel_type receive_channel_;
942 receiver_adapter_type receive_adapter_;
944 using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;
946 std::string read_buffer_;
947 dyn_buffer_type dbuf_;
948 std::string write_buffer_;
950 resp3::parser parser_{};
951 bool on_push_ =
false;
Base class for high level Redis asynchronous connections.
auto get_executor()
Returns the associated executor.
void reset_stream()
Resets the underlying stream.
connection_base(executor_type ex, asio::ssl::context::method method, std::size_t max_read_size)
Constructs from an executor.
asio::ssl::stream< asio::basic_stream_socket< asio::ip::tcp, Executor > > next_layer_type
Type of the next layer.
auto & next_layer() noexcept
Returns a reference to the next layer.
auto & get_ssl_context() noexcept
Returns the ssl context.
Executor executor_type
Executor type.
auto const & get_ssl_context() const noexcept
Returns the ssl context.
void cancel(operation op)
Cancels specific operations.
auto const & next_layer() const noexcept
Returns a const reference to the next layer.
ignore_t ignore
Global ignore object.
std::tuple< adapter::result< Ts >... > response
Response with compile-time size.
operation
Connection operations that can be cancelled.
@ sync_receive_push_failed
Can't receive push synchronously without blocking.
@ not_connected
There is no stablished connection.
@ exec
Refers to connection::async_exec operations.
@ all
Refers to all operations.
@ run
Refers to connection::async_run operations.
@ receive
Refers to connection::async_receive operations.
Configure parameters used by the connection classes.
Connection usage information.
A node in the response tree.