C++ Distributed Hash Table
pht.h
1 /*
2  * Copyright (C) 2014-2017 Savoir-faire Linux Inc.
3  * Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4  * Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
5  * Nicolas Reynaud <nicolas.reynaud@savoirfairelinux.com>
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program. If not, see <https://www.gnu.org/licenses/>.
19  */
20 
21 #pragma once
22 
23 #include <string>
24 #include <vector>
25 #include <memory>
26 #include <map>
27 #include <functional>
28 #include <stdexcept>
29 #include <bitset>
30 #include <iostream>
31 #include <sstream>
32 
33 #include "../value.h"
34 #include "../dhtrunner.h"
35 #include "../infohash.h"
36 
37 namespace dht {
38 namespace indexation {
39 
49 struct OPENDHT_PUBLIC Prefix {
50  Prefix() {}
51  Prefix(InfoHash h) : size_(h.size() * 8), content_(h.begin(), h.end()) { }
52  Prefix(const Blob& d, const Blob& f={}) : size_(d.size()*8), flags_(f), content_(d) { }
53 
54  Prefix(const Prefix& p, size_t first) :
55  size_(std::min(first, p.content_.size()*8)),
56  content_(Blob(p.content_.begin(), p.content_.begin()+size_/8))
57  {
58 
59  auto rem = size_ % 8;
60  if ( not p.flags_.empty() ) {
61  flags_ = Blob(p.flags_.begin(), p.flags_.begin()+size_/8);
62  if (rem)
63  flags_.push_back(p.flags_[size_/8] & (0xFF << (8 - rem)));
64  }
65 
66  if (rem)
67  content_.push_back(p.content_[size_/8] & (0xFF << (8 - rem)));
68  }
69 
82  Prefix getPrefix(ssize_t len) const {
83  if ((size_t)std::abs(len) >= content_.size() * 8)
84  throw std::out_of_range("len larger than prefix size.");
85  if (len < 0)
86  len += size_;
87 
88  return Prefix(*this, len);
89  }
90 
97  bool isFlagActive(size_t pos) const {
98  return flags_.empty() or isActiveBit(flags_, pos);
99  }
100 
104  bool isContentBitActive(size_t pos) const {
105  return isActiveBit(content_, pos);
106  }
107 
108  Prefix getFullSize() { return Prefix(*this, content_.size()*8); }
109 
115  Prefix getSibling() const {
116  Prefix copy = *this;
117  if ( size_ )
118  copy.swapContentBit(size_ - 1);
119 
120  return copy;
121  }
122 
123  InfoHash hash() const {
124  Blob copy(content_);
125  copy.push_back(size_);
126  return InfoHash::get(copy);
127  }
128 
136  static inline unsigned commonBits(const Prefix& p1, const Prefix& p2) {
137  unsigned i, j;
138  uint8_t x;
139  auto longest_prefix_size = std::min(p1.size_, p2.size_);
140 
141  for (i = 0; i < longest_prefix_size; i++) {
142  if (p1.content_.data()[i] != p2.content_.data()[i]
143  or not p1.isFlagActive(i)
144  or not p2.isFlagActive(i) ) {
145 
146  break;
147  }
148  }
149 
150  if (i == longest_prefix_size)
151  return 8*longest_prefix_size;
152 
153  x = p1.content_.data()[i] ^ p2.content_.data()[i];
154 
155  j = 0;
156  while ((x & 0x80) == 0) {
157  x <<= 1;
158  j++;
159  }
160 
161  return 8 * i + j;
162  }
163 
167  void swapContentBit(size_t bit) {
168  swapBit(content_, bit);
169  }
170 
174  void swapFlagBit(size_t bit) {
175  swapBit(flags_, bit);
176  }
177 
181  void addPaddingContent(size_t size) {
182  content_ = addPadding(content_, size);
183  }
184 
185  void updateFlags() {
186  /* Fill first known bit */
187  auto csize = size_ - flags_.size() * 8;
188  while(csize >= 8) {
189  flags_.push_back(0xFF);
190  csize -= 8;
191  }
192 
193  /* if needed fill remaining bit */
194  if ( csize )
195  flags_.push_back(0xFF << (8 - csize));
196 
197  /* Complet vector space missing */
198  for ( auto i = flags_.size(); i < content_.size(); i++ )
199  flags_.push_back(0xFF);
200  }
201 
202  std::string toString() const;
203 
204  size_t size_ {0};
205 
206  /* Will contain flags according to content_.
207  If flags_[i] == 0, then content_[i] is unknown
208  else if flags_[i] == 1, then content_[i] is known */
209  Blob flags_ {};
210  Blob content_ {};
211 
212 private:
213 
222  Blob addPadding(Blob toP, size_t size) {
223  Blob copy = toP;
224  for ( auto i = copy.size(); i < size; i++ )
225  copy.push_back(0);
226 
227  swapBit(copy, size_ + 1);
228  return copy;
229  }
230 
241  bool isActiveBit(const Blob &b, size_t pos) const {
242  if ( pos >= content_.size() * 8 )
243  throw std::out_of_range("Can't detect active bit at pos, pos larger than prefix size or empty prefix");
244 
245  return ((b[pos / 8] >> (7 - (pos % 8)) ) & 1) == 1;
246  }
247 
258  void swapBit(Blob &b, size_t bit) {
259  if ( bit >= b.size() * 8 )
260  throw std::out_of_range("bit larger than prefix size.");
261 
262  size_t offset_bit = (8 - bit) % 8;
263  b[bit / 8] ^= (1 << offset_bit);
264  }
265 };
266 
267 using Value = std::pair<InfoHash, dht::Value::Id>;
268 struct OPENDHT_PUBLIC IndexEntry : public dht::Value::Serializable<IndexEntry> {
269  static const ValueType TYPE;
270 
271  virtual void unpackValue(const dht::Value& v) {
272  Serializable<IndexEntry>::unpackValue(v);
273  name = v.user_type;
274  }
275 
276  virtual dht::Value packValue() const {
277  auto pack = Serializable<IndexEntry>::packValue();
278  pack.user_type = name;
279  return pack;
280  }
281 
282  Blob prefix;
283  Value value;
284  std::string name;
285  MSGPACK_DEFINE_MAP(prefix, value)
286 };
287 
288 class OPENDHT_PUBLIC Pht {
289  static constexpr const char* INVALID_KEY = "Key does not match the PHT key spec.";
290 
291  /* Prefixes the user_type for all dht values put on the DHT */
292  static constexpr const char* INDEX_PREFIX = "index.pht.";
293 
294 public:
295 
296  /* This is the maximum number of entries per node. This parameter is
297  * critical and influences the traffic a lot during a lookup operation.
298  */
299  static constexpr const size_t MAX_NODE_ENTRY_COUNT {16};
300 
301  /* A key for a an index entry */
302  using Key = std::map<std::string, Blob>;
303 
304  /* Specifications of the keys. It defines the number, the length and the
305  * serialization order of fields. */
306  using KeySpec = std::map<std::string, size_t>;
307  using LookupCallback = std::function<void(std::vector<std::shared_ptr<Value>>& values, const Prefix& p)>;
308 
309  typedef void (*LookupCallbackRaw)(std::vector<std::shared_ptr<Value>>* values, Prefix* p, void *user_data);
310  static LookupCallback
311  bindLookupCb(LookupCallbackRaw raw_cb, void* user_data) {
312  if (not raw_cb) return {};
313  return [=](std::vector<std::shared_ptr<Value>>& values, const Prefix& p) {
314  raw_cb((std::vector<std::shared_ptr<Value>>*) &values, (Prefix*) &p, user_data);
315  };
316  }
317  using LookupCallbackSimple = std::function<void(std::vector<std::shared_ptr<Value>>& values)>;
318  typedef void (*LookupCallbackSimpleRaw)(std::vector<std::shared_ptr<Value>>* values, void *user_data);
319  static LookupCallbackSimple
320  bindLookupCbSimple(LookupCallbackSimpleRaw raw_cb, void* user_data) {
321  if (not raw_cb) return {};
322  return [=](std::vector<std::shared_ptr<Value>>& values) {
323  raw_cb((std::vector<std::shared_ptr<Value>>*) &values, user_data);
324  };
325  }
326 
327  Pht(std::string name, KeySpec k_spec, std::shared_ptr<DhtRunner> dht)
328  : name_(INDEX_PREFIX + name), canary_(name_ + ".canary"), keySpec_(k_spec), dht_(dht) {}
329 
330  virtual ~Pht () { }
331 
335  void lookup(Key k, LookupCallback cb = {}, DoneCallbackSimple done_cb = {}, bool exact_match = true);
336  void lookup(Key k, LookupCallbackSimple cb = {}, DoneCallbackSimple done_cb = {}, bool exact_match = true)
337  {
338  lookup(k, [=](std::vector<std::shared_ptr<Value>>& values, Prefix) { cb(values); }, done_cb, exact_match);
339  }
340 
348  void insert(Key k, Value v, DoneCallbackSimple done_cb = {}) {
349  Prefix p = linearize(k);
350 
351  auto lo = std::make_shared<int>(0);
352  auto hi = std::make_shared<int>(p.size_);
353 
354  IndexEntry entry;
355  entry.value = v;
356  entry.prefix = p.content_;
357  entry.name = name_;
358 
359  Pht::insert(p, entry, lo, hi, clock::now(), true, done_cb);
360  }
361 
362 private:
363 
376  void insert(const Prefix& kp, IndexEntry entry, std::shared_ptr<int> lo, std::shared_ptr<int> hi, time_point time_p,
377  bool check_split, DoneCallbackSimple done_cb = {});
378 
379  class Cache {
380  public:
385  void insert(const Prefix& p);
386 
395  int lookup(const Prefix& p);
396 
397  private:
398  static constexpr const size_t MAX_ELEMENT {1024};
399  static constexpr const std::chrono::minutes NODE_EXPIRE_TIME {5};
400 
401  struct Node {
402  time_point last_reply; /* Made the assocation between leaves and leaves multimap */
403  std::shared_ptr<Node> parent; /* Share_ptr to the parent, it allow the self destruction of tree */
404  std::weak_ptr<Node> left_child; /* Left child, for bit equal to 1 */
405  std::weak_ptr<Node> right_child; /* Right child, for bit equal to 0 */
406  };
407 
408  std::weak_ptr<Node> root_; /* Root of the tree */
409 
415  std::multimap<time_point, std::shared_ptr<Node>> leaves_;
416  };
417 
418  /* Callback used for insert value by using the pht */
419  using RealInsertCallback = std::function<void(const Prefix& p, IndexEntry entry)>;
420  using LookupCallbackWrapper = std::function<void(std::vector<std::shared_ptr<IndexEntry>>& values, const Prefix& p)>;
421 
436  void lookupStep(Prefix k, std::shared_ptr<int> lo, std::shared_ptr<int> hi,
437  std::shared_ptr<std::vector<std::shared_ptr<IndexEntry>>> vals,
438  LookupCallbackWrapper cb, DoneCallbackSimple done_cb,
439  std::shared_ptr<unsigned> max_common_prefix_len,
440  int start = -1, bool all_values = false);
441 
449  Prefix zcurve(const std::vector<Prefix>& all_prefix) const;
450 
459  virtual Prefix linearize(Key k) const;
460 
469  void getRealPrefix(const std::shared_ptr<Prefix>& p, IndexEntry entry, RealInsertCallback end_cb );
470 
479  void checkPhtUpdate(Prefix p, IndexEntry entry, time_point time_p);
480 
488  static size_t findSplitLocation(const Prefix& compared, const std::vector<std::shared_ptr<IndexEntry>>& vals) {
489  for ( size_t i = 0; i < compared.content_.size() * 8 - 1; i++ )
490  for ( auto const& v : vals)
491  if ( Prefix(v->prefix).isContentBitActive(i) != compared.isContentBitActive(i) )
492  return i + 1;
493  return compared.content_.size() * 8 - 1;
494  }
495 
504  void split(const Prefix& insert, const std::vector<std::shared_ptr<IndexEntry>>& vals, IndexEntry entry, RealInsertCallback end_cb);
505 
509  bool validKey(const Key& k) const {
510  return k.size() == keySpec_.size() and
511  std::equal(k.begin(), k.end(), keySpec_.begin(),
512  [&](const Key::value_type& key, const KeySpec::value_type& key_spec) {
513  return key.first == key_spec.first and key.second.size() <= key_spec.second;
514  }
515  );
516  }
517 
522  void updateCanary(Prefix p);
523 
524  const std::string name_;
525  const std::string canary_;
526  const KeySpec keySpec_;
527  Cache cache_;
528  std::shared_ptr<DhtRunner> dht_;
529 };
530 
531 } /* indexation */
532 } /* dht */
533 
void swapContentBit(size_t bit)
Definition: pht.h:167
void swapFlagBit(size_t bit)
Definition: pht.h:174
void insert(Key k, Value v, DoneCallbackSimple done_cb={})
Definition: pht.h:348
bool isFlagActive(size_t pos) const
Definition: pht.h:97
std::string user_type
Definition: value.h:577
bool isContentBitActive(size_t pos) const
Definition: pht.h:104
A blob structure which prefixes a Key in the PHT.
Definition: pht.h:49
std::vector< uint8_t > Blob
Definition: utils.h:114
Prefix getPrefix(ssize_t len) const
Definition: pht.h:82
void addPaddingContent(size_t size)
Definition: pht.h:181
Prefix getSibling() const
Definition: pht.h:115
static unsigned commonBits(const Prefix &p1, const Prefix &p2)
Definition: pht.h:136
Definition: callbacks.h:34