Boost.Redis 1.4.2
A Redis client library
connection_base.hpp
/* Copyright (c) 2018-2023 Marcelo Zimbres Silva (mzimbres@gmail.com)
 *
 * Distributed under the Boost Software License, Version 1.0. (See
 * accompanying file LICENSE.txt)
 */

#ifndef BOOST_REDIS_CONNECTION_BASE_HPP
#define BOOST_REDIS_CONNECTION_BASE_HPP

#include <boost/redis/adapter/adapt.hpp>
#include <boost/redis/detail/helper.hpp>
#include <boost/redis/error.hpp>
#include <boost/redis/operation.hpp>
#include <boost/redis/request.hpp>
#include <boost/redis/resp3/type.hpp>
#include <boost/redis/config.hpp>
#include <boost/redis/detail/runner.hpp>
#include <boost/redis/usage.hpp>

#include <boost/system.hpp>
#include <boost/asio/basic_stream_socket.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/experimental/parallel_group.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/write.hpp>
#include <boost/assert.hpp>
#include <boost/core/ignore_unused.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/experimental/channel.hpp>

#include <algorithm>
#include <array>
#include <chrono>
#include <deque>
#include <memory>
#include <string_view>
#include <type_traits>
#include <functional>

namespace boost::redis::detail
{

template <class DynamicBuffer>
std::string_view buffer_view(DynamicBuffer buf) noexcept
{
   char const* start = static_cast<char const*>(buf.data(0, buf.size()).data());
   return std::string_view{start, std::size(buf)};
}

template <class AsyncReadStream, class DynamicBuffer>
class append_some_op {
private:
   AsyncReadStream& stream_;
   DynamicBuffer buf_;
   std::size_t size_ = 0;
   std::size_t tmp_ = 0;
   asio::coroutine coro_{};

public:
   append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
   : stream_ {stream}
   , buf_ {std::move(buf)}
   , size_{size}
   { }

   template <class Self>
   void operator()( Self& self
                  , system::error_code ec = {}
                  , std::size_t n = 0)
   {
      BOOST_ASIO_CORO_REENTER (coro_)
      {
         tmp_ = buf_.size();
         buf_.grow(size_);

         BOOST_ASIO_CORO_YIELD
         stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
         if (ec) {
            self.complete(ec, 0);
            return;
         }

         buf_.shrink(buf_.size() - tmp_ - n);
         self.complete({}, n);
      }
   }
};

template <class AsyncReadStream, class DynamicBuffer, class CompletionToken>
auto
async_append_some(
   AsyncReadStream& stream,
   DynamicBuffer buffer,
   std::size_t size,
   CompletionToken&& token)
{
   return asio::async_compose
      < CompletionToken
      , void(system::error_code, std::size_t)
      >(append_some_op<AsyncReadStream, DynamicBuffer> {stream, buffer, size}, token, stream);
}
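// A minimal usage sketch of async_append_some (names such as `sock`,
// `storage` and `ex` are illustrative, not part of this header): the
// operation grows the dynamic buffer by `size` bytes, reads into the newly
// grown region and shrinks the buffer back to the number of bytes actually
// received.
//
//    std::string storage;
//    auto buf = asio::dynamic_buffer(storage);
//    asio::ip::tcp::socket sock{ex};
//    // ... connect sock ...
//    detail::async_append_some(sock, buf, 4096,
//       [&](boost::system::error_code ec, std::size_t n) {
//          // On success, `storage` now ends with the n bytes just read.
//       });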

template <class Conn>
struct exec_op {
   using req_info_type = typename Conn::req_info;
   using adapter_type = typename Conn::adapter_type;

   Conn* conn_ = nullptr;
   std::shared_ptr<req_info_type> info_ = nullptr;
   asio::coroutine coro{};

   template <class Self>
   void operator()(Self& self , system::error_code ec = {})
   {
      BOOST_ASIO_CORO_REENTER (coro)
      {
         // Check whether the user wants to wait for the connection to
         // be established.
         if (info_->req_->get_config().cancel_if_not_connected && !conn_->is_open()) {
            BOOST_ASIO_CORO_YIELD
            asio::post(std::move(self));
            return self.complete(error::not_connected, 0);
         }

         conn_->add_request_info(info_);

EXEC_OP_WAIT:
         BOOST_ASIO_CORO_YIELD
         info_->async_wait(std::move(self));
         BOOST_ASSERT(ec == asio::error::operation_aborted);

         if (info_->ec_) {
            self.complete(info_->ec_, 0);
            return;
         }

         if (info_->stop_requested()) {
            // There is no need to call remove_request here as it has
            // already been done by cancel(exec).
            return self.complete(ec, 0);
         }

         if (is_cancelled(self)) {
            if (info_->is_written()) {
               using c_t = asio::cancellation_type;
               auto const c = self.get_cancellation_state().cancelled();
               if ((c & c_t::terminal) != c_t::none) {
                  // Cancellation requires closing the connection,
                  // otherwise it stays in an inconsistent state.
                  conn_->cancel(operation::run);
                  return self.complete(ec, 0);
               } else {
                  // Can't implement other cancellation types, ignoring.
                  self.get_cancellation_state().clear();

                  // TODO: Find out a better way to ignore
                  // cancellation.
                  goto EXEC_OP_WAIT;
               }
            } else {
               // Cancellation can be honored.
               conn_->remove_request(info_);
               self.complete(ec, 0);
               return;
            }
         }

         self.complete(info_->ec_, info_->read_size_);
      }
   }
};
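// A minimal sketch of the per-request cancellation flags consulted above
// and in cancel_on_conn_lost below (the member names match those read by
// this file; constructing a request from a request::config is an
// assumption about the public API, shown here only for illustration):
//
//    boost::redis::request::config cfg;
//    cfg.cancel_if_not_connected = true;   // fail async_exec with error::not_connected
//    cfg.cancel_on_connection_lost = true; // drop the request if it was never written
//    cfg.cancel_if_unresponded = true;     // drop it if written but not yet answered
//    boost::redis::request req{cfg};
//    req.push("PING");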

template <class Conn, class Logger>
struct run_op {
   Conn* conn = nullptr;
   Logger logger_;
   asio::coroutine coro{};

   template <class Self>
   void operator()( Self& self
                  , std::array<std::size_t, 2> order = {}
                  , system::error_code ec0 = {}
                  , system::error_code ec1 = {})
   {
      BOOST_ASIO_CORO_REENTER (coro)
      {
         conn->reset();

         BOOST_ASIO_CORO_YIELD
         asio::experimental::make_parallel_group(
            [this](auto token) { return conn->reader(logger_, token);},
            [this](auto token) { return conn->writer(logger_, token);}
         ).async_wait(
            asio::experimental::wait_for_one(),
            std::move(self));

         if (is_cancelled(self)) {
            logger_.trace("run-op: canceled. Exiting ...");
            self.complete(asio::error::operation_aborted);
            return;
         }

         logger_.on_run(ec0, ec1);

         switch (order[0]) {
            case 0: self.complete(ec0); break;
            case 1: self.complete(ec1); break;
            default: BOOST_ASSERT(false);
         }
      }
   }
};
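// A standalone sketch of the parallel_group pattern used above (purely
// illustrative; `t1`, `t2` and `ex` are hypothetical): wait_for_one()
// cancels the remaining operation as soon as the first one completes, and
// order[0] reports which operation finished first.
//
//    asio::steady_timer t1{ex, std::chrono::seconds{1}};
//    asio::steady_timer t2{ex, std::chrono::seconds{5}};
//    asio::experimental::make_parallel_group(
//       [&](auto token) { return t1.async_wait(token); },
//       [&](auto token) { return t2.async_wait(token); }
//    ).async_wait(
//       asio::experimental::wait_for_one(),
//       [](std::array<std::size_t, 2> order,
//          boost::system::error_code ec1,
//          boost::system::error_code ec2) {
//          // order[0] == 0 here: t1 fires first and t2 is canceled.
//       });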

template <class Conn, class Logger>
struct writer_op {
   Conn* conn_;
   Logger logger_;
   asio::coroutine coro{};

   template <class Self>
   void operator()( Self& self
                  , system::error_code ec = {}
                  , std::size_t n = 0)
   {
      ignore_unused(n);

      BOOST_ASIO_CORO_REENTER (coro) for (;;)
      {
         while (conn_->coalesce_requests()) {
            if (conn_->use_ssl())
               BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));
            else
               BOOST_ASIO_CORO_YIELD asio::async_write(conn_->next_layer().next_layer(), asio::buffer(conn_->write_buffer_), std::move(self));

            logger_.on_write(ec, conn_->write_buffer_);

            if (ec) {
               logger_.trace("writer-op: error. Exiting ...");
               conn_->cancel(operation::run);
               self.complete(ec);
               return;
            }

            if (is_cancelled(self)) {
               logger_.trace("writer-op: canceled. Exiting ...");
               self.complete(asio::error::operation_aborted);
               return;
            }

            conn_->on_write();

            // A socket.close() may have been called while a successful
            // write might have already been queued, so we have to check
            // here before proceeding.
            if (!conn_->is_open()) {
               logger_.trace("writer-op: canceled (2). Exiting ...");
               self.complete({});
               return;
            }
         }

         BOOST_ASIO_CORO_YIELD
         conn_->writer_timer_.async_wait(std::move(self));
         if (!conn_->is_open() || is_cancelled(self)) {
            logger_.trace("writer-op: canceled (3). Exiting ...");
            // Notice this is not an error of the op: stopping was
            // requested from the outside, so we complete with success.
            self.complete({});
            return;
         }
      }
   }
};
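// A minimal sketch of the "timer as condition variable" idiom the writer
// relies on (illustrative names, not part of this header): the timer never
// expires on its own, so async_wait() parks the writer until someone calls
// cancel(), which wakes it with asio::error::operation_aborted.
//
//    asio::steady_timer event{ex};
//    event.expires_at((std::chrono::steady_clock::time_point::max)());
//    event.async_wait([](boost::system::error_code) {
//       // Woken up: check for new work (here, requests to coalesce).
//    });
//    // Later, from another handler on the same executor:
//    event.cancel();  // "notify"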

template <class Conn, class Logger>
struct reader_op {
   using parse_result = typename Conn::parse_result;
   using parse_ret_type = typename Conn::parse_ret_type;
   Conn* conn_;
   Logger logger_;
   parse_ret_type res_{parse_result::resp, 0};
   asio::coroutine coro{};

   template <class Self>
   void operator()( Self& self
                  , system::error_code ec = {}
                  , std::size_t n = 0)
   {
      ignore_unused(n);

      BOOST_ASIO_CORO_REENTER (coro) for (;;)
      {
         // Appends some data to the buffer if necessary.
         if ((res_.first == parse_result::needs_more) || std::empty(conn_->read_buffer_)) {
            if (conn_->use_ssl()) {
               BOOST_ASIO_CORO_YIELD
               async_append_some(
                  conn_->next_layer(),
                  conn_->dbuf_,
                  conn_->get_suggested_buffer_growth(),
                  std::move(self));
            } else {
               BOOST_ASIO_CORO_YIELD
               async_append_some(
                  conn_->next_layer().next_layer(),
                  conn_->dbuf_,
                  conn_->get_suggested_buffer_growth(),
                  std::move(self));
            }

            logger_.on_read(ec, n);

            // EOF is not treated as error.
            if (ec == asio::error::eof) {
               logger_.trace("reader-op: EOF received. Exiting ...");
               conn_->cancel(operation::run);
               return self.complete({}); // EOFINAE: EOF is not an error.
            }

            // The connection is not viable after an error.
            if (ec) {
               logger_.trace("reader-op: error. Exiting ...");
               conn_->cancel(operation::run);
               self.complete(ec);
               return;
            }

            // Somebody might have canceled implicitly or explicitly
            // while we were suspended and after queueing, so we have
            // to check.
            if (!conn_->is_open() || is_cancelled(self)) {
               logger_.trace("reader-op: canceled. Exiting ...");
               self.complete(ec);
               return;
            }
         }

         res_ = conn_->on_read(buffer_view(conn_->dbuf_), ec);
         if (ec) {
            logger_.trace("reader-op: parse error. Exiting ...");
            conn_->cancel(operation::run);
            self.complete(ec);
            return;
         }

         if (res_.first == parse_result::push) {
            if (!conn_->receive_channel_.try_send(ec, res_.second)) {
               BOOST_ASIO_CORO_YIELD
               conn_->receive_channel_.async_send(ec, res_.second, std::move(self));
            }

            if (ec) {
               logger_.trace("reader-op: error. Exiting ...");
               conn_->cancel(operation::run);
               self.complete(ec);
               return;
            }

            if (!conn_->is_open() || is_cancelled(self)) {
               logger_.trace("reader-op: canceled (2). Exiting ...");
               self.complete(asio::error::operation_aborted);
               return;
            }

         }
      }
   }
};
373
380template <class Executor>
382public:
384 using executor_type = Executor;
385
387 using next_layer_type = asio::ssl::stream<asio::basic_stream_socket<asio::ip::tcp, Executor>>;
388
389 using clock_type = std::chrono::steady_clock;
390 using clock_traits_type = asio::wait_traits<clock_type>;
391 using timer_type = asio::basic_waitable_timer<clock_type, clock_traits_type, executor_type>;
392
394
397 executor_type ex,
398 asio::ssl::context::method method,
399 std::size_t max_read_size)
400 : ctx_{method}
401 , stream_{std::make_unique<next_layer_type>(ex, ctx_)}
402 , writer_timer_{ex}
403 , receive_channel_{ex, 256}
404 , runner_{ex, {}}
405 , dbuf_{read_buffer_, max_read_size}
406 {
407 set_receive_response(ignore);
408 writer_timer_.expires_at((std::chrono::steady_clock::time_point::max)());
409 }
410
412 auto const& get_ssl_context() const noexcept
413 { return ctx_;}
414
416 auto& get_ssl_context() noexcept
417 { return ctx_;}
418
421 {
422 stream_ = std::make_unique<next_layer_type>(writer_timer_.get_executor(), ctx_);
423 }
424
   /// Returns a reference to the next layer.
   auto& next_layer() noexcept { return *stream_; }

   /// Returns a const reference to the next layer.
   auto const& next_layer() const noexcept { return *stream_; }

   /// Returns the associated executor.
   auto get_executor() {return writer_timer_.get_executor();}

   /// Cancels specific operations.
   void cancel(operation op)
   {
      runner_.cancel(op);
      if (op == operation::all) {
         cancel_impl(operation::run);
         cancel_impl(operation::receive);
         cancel_impl(operation::exec);
         return;
      }

      cancel_impl(op);
   }

   template <class Response, class CompletionToken>
   auto async_exec(request const& req, Response& resp, CompletionToken token)
   {
      using namespace boost::redis::adapter;
      auto f = boost_redis_adapt(resp);
      BOOST_ASSERT_MSG(req.get_expected_responses() <= f.get_supported_response_size(), "Request and response have incompatible sizes.");

      auto info = std::make_shared<req_info>(req, f, get_executor());

      return asio::async_compose
         < CompletionToken
         , void(system::error_code, std::size_t)
         >(exec_op<this_type>{this, info}, token, writer_timer_);
   }
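   // A minimal async_exec usage sketch (assuming the public
   // boost::redis::connection wrapper and a response<> sized to match the
   // request; `conn` and the coroutine context are illustrative):
   //
   //    boost::redis::request req;
   //    req.push("PING", "Hello");
   //    boost::redis::response<std::string> resp;
   //    co_await conn.async_exec(req, resp, asio::use_awaitable);
   //    // std::get<0>(resp).value() == "Hello"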

   template <class Response, class CompletionToken>
   [[deprecated("Set the response with set_receive_response and use the other overload.")]]
   auto async_receive(Response& response, CompletionToken token)
   {
      set_receive_response(response);
      return receive_channel_.async_receive(std::move(token));
   }

   template <class CompletionToken>
   auto async_receive(CompletionToken token)
   { return receive_channel_.async_receive(std::move(token)); }
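   // A minimal server-push sketch (assuming the public
   // boost::redis::connection wrapper and its generic_response type; `conn`
   // is illustrative): pushes produced by e.g. SUBSCRIBE are delivered
   // through the receive channel drained by async_receive.
   //
   //    boost::redis::request req;
   //    req.push("SUBSCRIBE", "channel");
   //
   //    boost::redis::generic_response push_resp;
   //    conn.set_receive_response(push_resp);
   //    conn.async_receive([&](boost::system::error_code ec, std::size_t n) {
   //       // push_resp now holds the nodes of one push, n bytes in total.
   //    });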

   std::size_t receive(system::error_code& ec)
   {
      std::size_t size = 0;

      auto f = [&](system::error_code const& ec2, std::size_t n)
      {
         ec = ec2;
         size = n;
      };

      auto const res = receive_channel_.try_receive(f);
      if (ec)
         return 0;

      if (!res)
         ec = error::sync_receive_push_failed;

      return size;
   }

   template <class Logger, class CompletionToken>
   auto async_run(config const& cfg, Logger l, CompletionToken token)
   {
      runner_.set_config(cfg);
      l.set_prefix(runner_.get_config().log_prefix);
      return runner_.async_run(*this, l, std::move(token));
   }
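   // A minimal async_run usage sketch (assuming the public
   // boost::redis::connection wrapper held in a shared_ptr named `conn`;
   // host and port values are illustrative): async_run keeps the reader and
   // writer loops alive while async_exec calls are issued independently.
   //
   //    boost::redis::config cfg;
   //    cfg.addr.host = "127.0.0.1";
   //    cfg.addr.port = "6379";
   //    conn->async_run(cfg, {}, asio::consign(asio::detached, conn));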

   template <class Response>
   void set_receive_response(Response& response)
   {
      using namespace boost::redis::adapter;
      auto g = boost_redis_adapt(response);
      receive_adapter_ = adapter::detail::make_adapter_wrapper(g);
   }

   usage get_usage() const noexcept
   { return usage_; }

private:
   using receive_channel_type = asio::experimental::channel<executor_type, void(system::error_code, std::size_t)>;
   using runner_type = runner<executor_type>;
   using adapter_type = std::function<void(std::size_t, resp3::basic_node<std::string_view> const&, system::error_code&)>;
   using receiver_adapter_type = std::function<void(resp3::basic_node<std::string_view> const&, system::error_code&)>;

   auto use_ssl() const noexcept
   { return runner_.get_config().use_ssl;}

   auto cancel_on_conn_lost() -> std::size_t
   {
      // Must return false if the request should be removed.
      auto cond = [](auto const& ptr)
      {
         BOOST_ASSERT(ptr != nullptr);

         if (ptr->is_written()) {
            return !ptr->req_->get_config().cancel_if_unresponded;
         } else {
            return !ptr->req_->get_config().cancel_on_connection_lost;
         }
      };

      auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), cond);

      auto const ret = std::distance(point, std::end(reqs_));

      std::for_each(point, std::end(reqs_), [](auto const& ptr) {
         ptr->stop();
      });

      reqs_.erase(point, std::end(reqs_));
      std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
         return ptr->reset_status();
      });

      return ret;
   }

   auto cancel_unwritten_requests() -> std::size_t
   {
      auto f = [](auto const& ptr)
      {
         BOOST_ASSERT(ptr != nullptr);
         return ptr->is_written();
      };

      auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), f);

      auto const ret = std::distance(point, std::end(reqs_));

      std::for_each(point, std::end(reqs_), [](auto const& ptr) {
         ptr->stop();
      });

      reqs_.erase(point, std::end(reqs_));
      return ret;
   }

   void cancel_impl(operation op)
   {
      switch (op) {
         case operation::exec:
         {
            cancel_unwritten_requests();
         } break;
         case operation::run:
         {
            close();
            writer_timer_.cancel();
            receive_channel_.cancel();
            cancel_on_conn_lost();
         } break;
         case operation::receive:
         {
            receive_channel_.cancel();
         } break;
         default: /* ignore */;
      }
   }

   void on_write()
   {
      // We have to clear the payload right after writing it so that it
      // can serve as a flag indicating there is no ongoing write.
      write_buffer_.clear();

      // Notice this must come before the for-each below.
      cancel_push_requests();

      // There is a small optimization possible here: traverse only the
      // partition of unwritten requests instead of all of them.
      std::for_each(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
         BOOST_ASSERT_MSG(ptr != nullptr, "Expects non-null pointer.");
         if (ptr->is_staged())
            ptr->mark_written();
      });
   }

   struct req_info {
   public:
      using node_type = resp3::basic_node<std::string_view>;
      using wrapped_adapter_type = std::function<void(node_type const&, system::error_code&)>;

      enum class action
      {
         stop,
         proceed,
         none,
      };

      explicit req_info(request const& req, adapter_type adapter, executor_type ex)
      : timer_{ex}
      , action_{action::none}
      , req_{&req}
      , adapter_{}
      , expected_responses_{req.get_expected_responses()}
      , status_{status::none}
      , ec_{{}}
      , read_size_{0}
      {
         timer_.expires_at((std::chrono::steady_clock::time_point::max)());

         adapter_ = [this, adapter](node_type const& nd, system::error_code& ec)
         {
            auto const i = req_->get_expected_responses() - expected_responses_;
            adapter(i, nd, ec);
         };
      }

      auto proceed()
      {
         timer_.cancel();
         action_ = action::proceed;
      }

      void stop()
      {
         timer_.cancel();
         action_ = action::stop;
      }

      [[nodiscard]] auto is_waiting_write() const noexcept
      { return !is_written() && !is_staged(); }

      [[nodiscard]] auto is_written() const noexcept
      { return status_ == status::written; }

      [[nodiscard]] auto is_staged() const noexcept
      { return status_ == status::staged; }

      void mark_written() noexcept
      { status_ = status::written; }

      void mark_staged() noexcept
      { status_ = status::staged; }

      void reset_status() noexcept
      { status_ = status::none; }

      [[nodiscard]] auto stop_requested() const noexcept
      { return action_ == action::stop;}

      template <class CompletionToken>
      auto async_wait(CompletionToken token)
      {
         return timer_.async_wait(std::move(token));
      }

   //private:
      enum class status
      { none
      , staged
      , written
      };

      timer_type timer_;
      action action_;
      request const* req_;
      wrapped_adapter_type adapter_;

      // Contains the number of commands that haven't been read yet.
      std::size_t expected_responses_;
      status status_;

      system::error_code ec_;
      std::size_t read_size_;
   };

   void remove_request(std::shared_ptr<req_info> const& info)
   {
      reqs_.erase(std::remove(std::begin(reqs_), std::end(reqs_), info));
   }

   using reqs_type = std::deque<std::shared_ptr<req_info>>;

   template <class, class> friend struct reader_op;
   template <class, class> friend struct writer_op;
   template <class, class> friend struct run_op;
   template <class> friend struct exec_op;
   template <class, class, class> friend struct run_all_op;

   void cancel_push_requests()
   {
      auto point = std::stable_partition(std::begin(reqs_), std::end(reqs_), [](auto const& ptr) {
         return !(ptr->is_staged() && ptr->req_->get_expected_responses() == 0);
      });

      std::for_each(point, std::end(reqs_), [](auto const& ptr) {
         ptr->proceed();
      });

      reqs_.erase(point, std::end(reqs_));
   }

   [[nodiscard]] bool is_writing() const noexcept
   {
      return !write_buffer_.empty();
   }

   void add_request_info(std::shared_ptr<req_info> const& info)
   {
      reqs_.push_back(info);

      if (info->req_->has_hello_priority()) {
         auto rend = std::partition_point(std::rbegin(reqs_), std::rend(reqs_), [](auto const& e) {
            return e->is_waiting_write();
         });

         std::rotate(std::rbegin(reqs_), std::rbegin(reqs_) + 1, rend);
      }

      if (is_open() && !is_writing())
         writer_timer_.cancel();
   }

   template <class CompletionToken, class Logger>
   auto reader(Logger l, CompletionToken&& token)
   {
      return asio::async_compose
         < CompletionToken
         , void(system::error_code)
         >(reader_op<this_type, Logger>{this, l}, token, writer_timer_);
   }

   template <class CompletionToken, class Logger>
   auto writer(Logger l, CompletionToken&& token)
   {
      return asio::async_compose
         < CompletionToken
         , void(system::error_code)
         >(writer_op<this_type, Logger>{this, l}, token, writer_timer_);
   }

   template <class Logger, class CompletionToken>
   auto async_run_lean(config const& cfg, Logger l, CompletionToken token)
   {
      runner_.set_config(cfg);
      l.set_prefix(runner_.get_config().log_prefix);
      return asio::async_compose
         < CompletionToken
         , void(system::error_code)
         >(run_op<this_type, Logger>{this, l}, token, writer_timer_);
   }

   [[nodiscard]] bool coalesce_requests()
   {
      // Coalesces the requests and marks them staged. After a
      // successful write staged requests will be marked as written.
      auto const point = std::partition_point(std::cbegin(reqs_), std::cend(reqs_), [](auto const& ri) {
         return !ri->is_waiting_write();
      });

      std::for_each(point, std::cend(reqs_), [this](auto const& ri) {
         // Stage the request.
         write_buffer_ += ri->req_->payload();
         ri->mark_staged();
         usage_.commands_sent += ri->expected_responses_;
      });

      usage_.bytes_sent += std::size(write_buffer_);

      return point != std::cend(reqs_);
   }
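   // A minimal sketch of the automatic pipelining enabled by
   // coalesce_requests (assuming the public boost::redis::connection
   // wrapper held in a shared_ptr named `conn`; purely illustrative):
   // requests issued while a write is in flight are queued and then
   // concatenated into a single payload for the next write.
   //
   //    boost::redis::request r1, r2;
   //    r1.push("INCR", "counter");
   //    r2.push("GET", "counter");
   //    boost::redis::response<long long> resp1;
   //    boost::redis::response<std::string> resp2;
   //    conn->async_exec(r1, resp1, asio::detached);
   //    conn->async_exec(r2, resp2, asio::detached);  // likely coalesced with r1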

   bool is_waiting_response() const noexcept
   {
      return !std::empty(reqs_) && reqs_.front()->is_written();
   }

   void close()
   {
      if (stream_->next_layer().is_open()) {
         system::error_code ec;
         stream_->next_layer().close(ec);
      }
   }

   auto is_open() const noexcept { return stream_->next_layer().is_open(); }
   auto& lowest_layer() noexcept { return stream_->lowest_layer(); }

   auto is_next_push()
   {
      // We handle unsolicited events in the following way:
      //
      // 1. Its resp3 type is a push.
      //
      // 2. A non-push type is received with an empty request queue. I
      //    have noticed this is possible (e.g. -MISCONF). I would
      //    expect them to have type push so we could distinguish them
      //    from responses to commands, but they arrive as a
      //    simple-error. If we are lucky enough to receive them when
      //    the command queue is empty we can treat them as server
      //    pushes, otherwise it is impossible to handle them properly.
      //
      // 3. The request does not expect any response but we got one.
      //    This may happen if, for example, SUBSCRIBE is sent with
      //    wrong syntax.
      //
      // Useful links:
      //
      // - https://github.com/redis/redis/issues/11784
      // - https://github.com/redis/redis/issues/6426
      //

      BOOST_ASSERT(!read_buffer_.empty());

      return
         (resp3::to_type(read_buffer_.front()) == resp3::type::push)
         || reqs_.empty()
         || (!reqs_.empty() && reqs_.front()->expected_responses_ == 0)
         || !is_waiting_response(); // Added to deal with MONITOR.
   }
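   // For reference, the push check above keys off the RESP3 type marker of
   // the first byte in the read buffer: push frames start with '>', while
   // e.g. simple strings start with '+' (a sketch, not part of this header):
   //
   //    assert(resp3::to_type('>') == resp3::type::push);
   //    assert(resp3::to_type('+') != resp3::type::push); // simple string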

   auto get_suggested_buffer_growth() const noexcept
   {
      return parser_.get_suggested_buffer_growth(4096);
   }

   enum class parse_result { needs_more, push, resp };

   using parse_ret_type = std::pair<parse_result, std::size_t>;

   parse_ret_type on_finish_parsing(parse_result t)
   {
      if (t == parse_result::push) {
         usage_.pushes_received += 1;
         usage_.push_bytes_received += parser_.get_consumed();
      } else {
         usage_.responses_received += 1;
         usage_.response_bytes_received += parser_.get_consumed();
      }

      on_push_ = false;
      dbuf_.consume(parser_.get_consumed());
      auto const res = std::make_pair(t, parser_.get_consumed());
      parser_.reset();
      return res;
   }

   parse_ret_type on_read(std::string_view data, system::error_code& ec)
   {
      // We arrive here in two states:
      //
      // 1. While we are parsing a message. In this case we don't want
      //    to determine the type of the message in the buffer
      //    (i.e. response vs push) but leave it untouched until the
      //    parsing of a complete message ends.
      //
      // 2. On a new message, in which case we have to determine
      //    whether the next message is a push or a response.
      //
      if (!on_push_) // Prepare for new message.
         on_push_ = is_next_push();

      if (on_push_) {
         if (!resp3::parse(parser_, data, receive_adapter_, ec))
            return std::make_pair(parse_result::needs_more, 0);

         if (ec)
            return std::make_pair(parse_result::push, 0);

         return on_finish_parsing(parse_result::push);
      }

      BOOST_ASSERT_MSG(is_waiting_response(), "Not waiting for a response (using MONITOR command perhaps?)");
      BOOST_ASSERT(!reqs_.empty());
      BOOST_ASSERT(reqs_.front() != nullptr);
      BOOST_ASSERT(reqs_.front()->expected_responses_ != 0);

      if (!resp3::parse(parser_, data, reqs_.front()->adapter_, ec))
         return std::make_pair(parse_result::needs_more, 0);

      if (ec) {
         reqs_.front()->ec_ = ec;
         reqs_.front()->proceed();
         return std::make_pair(parse_result::resp, 0);
      }

      reqs_.front()->read_size_ += parser_.get_consumed();

      if (--reqs_.front()->expected_responses_ == 0) {
         // Done with this request.
         reqs_.front()->proceed();
         reqs_.pop_front();
      }

      return on_finish_parsing(parse_result::resp);
   }

   void reset()
   {
      write_buffer_.clear();
      read_buffer_.clear();
      parser_.reset();
      on_push_ = false;
   }

   asio::ssl::context ctx_;
   std::unique_ptr<next_layer_type> stream_;

   // Notice we use a timer to simulate a condition variable. It is more
   // suitable than a channel here because the notify operation does not
   // suspend.
   timer_type writer_timer_;
   receive_channel_type receive_channel_;
   runner_type runner_;
   receiver_adapter_type receive_adapter_;

   using dyn_buffer_type = asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>>;

   std::string read_buffer_;
   dyn_buffer_type dbuf_;
   std::string write_buffer_;
   reqs_type reqs_;
   resp3::parser parser_{};
   bool on_push_ = false;

   usage usage_;
};

} // boost::redis::detail

#endif // BOOST_REDIS_CONNECTION_BASE_HPP
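A minimal end-to-end sketch of how the operations implemented above are
typically driven through the public boost::redis::connection wrapper. This is
illustrative only and not part of this header; it assumes C++20 coroutines and
the public connection API (constructor from an io_context, async_run with a
config and default logger).

   #include <boost/redis/connection.hpp>
   #include <boost/asio/co_spawn.hpp>
   #include <boost/asio/consign.hpp>
   #include <boost/asio/detached.hpp>
   #include <boost/asio/io_context.hpp>
   #include <boost/asio/use_awaitable.hpp>
   #include <iostream>
   #include <memory>

   namespace asio = boost::asio;
   using boost::redis::config;
   using boost::redis::connection;
   using boost::redis::request;
   using boost::redis::response;

   auto ping(std::shared_ptr<connection> conn) -> asio::awaitable<void>
   {
      request req;
      req.push("PING", "Hello world");

      response<std::string> resp;
      co_await conn->async_exec(req, resp, asio::use_awaitable);
      conn->cancel();  // stop async_run once we are done

      std::cout << std::get<0>(resp).value() << std::endl;
   }

   int main()
   {
      asio::io_context ioc;
      auto conn = std::make_shared<connection>(ioc);
      asio::co_spawn(ioc, ping(conn), asio::detached);
      conn->async_run(config{}, {}, asio::consign(asio::detached, conn));
      ioc.run();
   }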