libfilezilla
rate_limiter.hpp
Go to the documentation of this file.
1 #ifndef LIBFILEZILLA_RATE_LIMITER_HEADER
2 #define LIBFILEZILLA_RATE_LIMITER_HEADER
3 
12 #include "event_handler.hpp"
13 
14 #include <atomic>
15 #include <vector>
16 
17 namespace fz {
18 
19 namespace rate {
20 using type = uint64_t;
21 enum : type {
22  unlimited = static_cast<type>(-1)
23 };
24 }
25 
26 namespace direction {
27 enum type : size_t {
28  inbound,
29  outbound
30 };
31 }
32 
33 class rate_limiter;
34 
45 class FZ_PUBLIC_SYMBOL rate_limit_manager final : public event_handler
46 {
47 public:
48  explicit rate_limit_manager(event_loop & loop);
49  virtual ~rate_limit_manager();
50 
57  void add(rate_limiter* limiter);
58 
60  void set_burst_tolerance(rate::type tolerance);
61 
62 private:
63  friend class rate_limiter;
64  friend class bucket_base;
65  friend class bucket;
66 
67  void record_activity();
68 
69  void operator()(event_base const& ev);
70  void on_timer(timer_id const&);
71 
72  void process(rate_limiter* limiter, bool locked);
73 
74  mutex mtx_{false};
75  std::vector<rate_limiter*> limiters_;
76 
77  std::atomic<timer_id> timer_{};
78 
79  std::atomic<int> activity_{2};
80 
81  std::atomic<rate::type> burst_tolerance_{1};
82 };
83 
85 class FZ_PUBLIC_SYMBOL bucket_base
86 {
87 public:
88  virtual ~bucket_base() noexcept = default;
89 
95  void remove_bucket();
96 
97 protected:
98  friend class rate_limiter;
99 
105  virtual void lock_tree() { mtx_.lock(); }
106 
112  virtual void update_stats(bool & active) = 0;
113 
119  virtual size_t weight() const { return 1; }
120 
126  virtual size_t unsaturated(direction::type const /*d*/) const { return 0; }
127 
133  virtual void set_mgr_recursive(rate_limit_manager * mgr);
134 
144  virtual rate::type add_tokens(direction::type const /*d*/, rate::type /*tokens*/, rate::type /*limit*/) = 0;
145 
155  virtual rate::type distribute_overflow(direction::type const /*d*/, rate::type /*tokens*/) { return 0; }
156 
160  virtual void unlock_tree() { mtx_.unlock(); }
161 
167  virtual std::array<rate::type, 2> gather_unspent_for_removal() = 0;
168 
169  mutex mtx_{false};
170  rate_limit_manager * mgr_{};
171  void * parent_{};
172  size_t idx_{static_cast<size_t>(-1)};
173 };
174 
185 class FZ_PUBLIC_SYMBOL rate_limiter final : public bucket_base
186 {
187 public:
188  rate_limiter() = default;
189  explicit rate_limiter(rate_limit_manager * mgr);
190  virtual ~rate_limiter();
191 
198  void add(bucket_base* bucket);
199 
207  void set_limits(rate::type download_limit, rate::type upload_limit);
208 
210  rate::type limit(direction::type const d);
211 
212 private:
213  friend class bucket_base;
214  friend class rate_limit_manager;
215 
216  virtual void lock_tree() override;
217 
218  bool do_set_limit(direction::type const d, rate::type limit);
219 
220  virtual void update_stats(bool & active) override;
221  virtual size_t weight() const override { return weight_; }
222  virtual size_t unsaturated(direction::type const d) const override { return data_[d].unused_capacity_ ? data_[d].unsaturated_ : 0; }
223  virtual void set_mgr_recursive(rate_limit_manager * mgr) override;
224 
225  virtual rate::type add_tokens(direction::type const d, rate::type tokens, rate::type limit) override;
226  virtual rate::type distribute_overflow(direction::type const d, rate::type tokens) override;
227 
228  virtual void unlock_tree() override;
229 
230  void pay_debt(direction::type const d);
231 
232  virtual std::array<rate::type, 2> gather_unspent_for_removal() override;
233 
234  std::vector<bucket_base*> buckets_;
235  std::vector<size_t> scratch_buffer_;
236  size_t weight_{};
237 
238  struct data_t {
239  rate::type limit_{rate::unlimited};
240  rate::type merged_tokens_;
241  rate::type overflow_{};
242  rate::type debt_{};
243  rate::type unused_capacity_{};
244  rate::type carry_{};
245  size_t unsaturated_{};
246  } data_[2];
247 };
248 
252 class FZ_PUBLIC_SYMBOL bucket : public bucket_base
253 {
254 public:
255  virtual ~bucket();
256 
262  rate::type available(direction::type const d);
263 
270  void consume(direction::type const d, rate::type amount);
271 
272 protected:
278  virtual void wakeup(direction::type /*d*/) {}
279 
280 private:
281  virtual void update_stats(bool & active) override;
282  virtual size_t unsaturated(direction::type const d) const override { return data_[d].unsaturated_ ? 1 : 0; }
283 
284  virtual rate::type add_tokens(direction::type const d, rate::type tokens, rate::type limit) override;
285  virtual rate::type distribute_overflow(direction::type const d, rate::type tokens) override;
286 
287  virtual void unlock_tree() override;
288 
289  virtual std::array<rate::type, 2> gather_unspent_for_removal() override;
290 
291  struct data_t {
292  rate::type available_{rate::unlimited};
293  rate::type overflow_multiplier_{1};
294  rate::type bucket_size_{rate::unlimited};
295  bool waiting_{};
296  bool unsaturated_{};
297  } data_[2];
298 };
299 
300 }
301 
302 #endif
virtual size_t weight() const
Returns the weight of the tree.
Definition: rate_limiter.hpp:119
Context for rate_limiters.
Definition: rate_limiter.hpp:45
A rate-limited token bucket.
Definition: rate_limiter.hpp:252
Simple handler for asynchronous event processing.
Definition: event_handler.hpp:54
Declares the event_handler class.
virtual void unlock_tree()
Recursively unlocks the mutexes of self and all childen.
Definition: rate_limiter.hpp:160
virtual rate::type distribute_overflow(direction::type const, rate::type)
Recursively distributes overflow.
Definition: rate_limiter.hpp:155
virtual void wakeup(direction::type)
Called in response to unlock_tree if tokens have become available.
Definition: rate_limiter.hpp:278
type
Definition: logger.hpp:15
A threaded event loop that supports sending events and timers.
Definition: event_loop.hpp:33
A limiter for the attached buckets.
Definition: rate_limiter.hpp:185
Base class for buckets.
Definition: rate_limiter.hpp:85
The namespace used by libfilezilla.
Definition: apply.hpp:17
virtual size_t unsaturated(direction::type const) const
Returns the number of buckets not yet full.
Definition: rate_limiter.hpp:126
virtual void lock_tree()
Recursively locks the mutexes of self and all children.
Definition: rate_limiter.hpp:105
The process class manages an asynchronous process with redirected IO.
Definition: process.hpp:21
Lean replacement for std::(recursive_)mutex.
Definition: mutex.hpp:27
Common base class for all events.
Definition: event.hpp:22