34 #include "../dhtrunner.h" 35 #include "../infohash.h" 38 namespace indexation {
51 Prefix(
InfoHash h) : size_(h.size() * 8), content_(h.begin(), h.end()) { }
52 Prefix(
const Blob& d,
const Blob& f={}) : size_(d.size()*8), flags_(f), content_(d) { }
55 size_(std::min(first, p.content_.size()*8)),
56 content_(
Blob(p.content_.begin(), p.content_.begin()+size_/8))
60 if ( not p.flags_.empty() ) {
61 flags_ =
Blob(p.flags_.begin(), p.flags_.begin()+size_/8);
63 flags_.push_back(p.flags_[size_/8] & (0xFF << (8 - rem)));
67 content_.push_back(p.content_[size_/8] & (0xFF << (8 - rem)));
83 if ((
size_t)std::abs(len) >= content_.size() * 8)
84 throw std::out_of_range(
"len larger than prefix size.");
98 return flags_.empty() or isActiveBit(flags_, pos);
105 return isActiveBit(content_, pos);
108 Prefix getFullSize() {
return Prefix(*
this, content_.size()*8); }
125 copy.push_back(size_);
126 return InfoHash::get(copy);
139 auto longest_prefix_size = std::min(p1.size_, p2.size_);
141 for (i = 0; i < longest_prefix_size; i++) {
142 if (p1.content_.data()[i] != p2.content_.data()[i]
150 if (i == longest_prefix_size)
151 return 8*longest_prefix_size;
153 x = p1.content_.data()[i] ^ p2.content_.data()[i];
156 while ((x & 0x80) == 0) {
168 swapBit(content_, bit);
175 swapBit(flags_, bit);
182 content_ = addPadding(content_, size);
187 auto csize = size_ - flags_.size() * 8;
189 flags_.push_back(0xFF);
195 flags_.push_back(0xFF << (8 - csize));
198 for (
auto i = flags_.size(); i < content_.size(); i++ )
199 flags_.push_back(0xFF);
202 std::string toString()
const;
222 Blob addPadding(
Blob toP,
size_t size) {
224 for (
auto i = copy.size(); i < size; i++ )
227 swapBit(copy, size_ + 1);
241 bool isActiveBit(
const Blob &b,
size_t pos)
const {
242 if ( pos >= content_.size() * 8 )
243 throw std::out_of_range(
"Can't detect active bit at pos, pos larger than prefix size or empty prefix");
245 return ((b[pos / 8] >> (7 - (pos % 8)) ) & 1) == 1;
258 void swapBit(
Blob &b,
size_t bit) {
259 if ( bit >= b.size() * 8 )
260 throw std::out_of_range(
"bit larger than prefix size.");
262 size_t offset_bit = (8 - bit) % 8;
263 b[bit / 8] ^= (1 << offset_bit);
267 using Value = std::pair<InfoHash, dht::Value::Id>;
271 virtual void unpackValue(
const dht::Value& v) {
272 Serializable<IndexEntry>::unpackValue(v);
277 auto pack = Serializable<IndexEntry>::packValue();
278 pack.user_type = name;
285 MSGPACK_DEFINE_MAP(prefix, value)
288 class OPENDHT_PUBLIC
Pht {
289 static constexpr
const char* INVALID_KEY =
"Key does not match the PHT key spec.";
292 static constexpr
const char* INDEX_PREFIX =
"index.pht.";
299 static constexpr
const size_t MAX_NODE_ENTRY_COUNT {16};
302 using Key = std::map<std::string, Blob>;
306 using KeySpec = std::map<std::string, size_t>;
307 using LookupCallback = std::function<void(std::vector<std::shared_ptr<Value>>& values,
const Prefix& p)>;
309 typedef void (*LookupCallbackRaw)(std::vector<std::shared_ptr<Value>>* values,
Prefix* p,
void *user_data);
310 static LookupCallback
311 bindLookupCb(LookupCallbackRaw raw_cb,
void* user_data) {
312 if (not raw_cb)
return {};
313 return [=](std::vector<std::shared_ptr<Value>>& values,
const Prefix& p) {
314 raw_cb((std::vector<std::shared_ptr<Value>>*) &values, (
Prefix*) &p, user_data);
317 using LookupCallbackSimple = std::function<void(std::vector<std::shared_ptr<Value>>& values)>;
318 typedef void (*LookupCallbackSimpleRaw)(std::vector<std::shared_ptr<Value>>* values,
void *user_data);
319 static LookupCallbackSimple
320 bindLookupCbSimple(LookupCallbackSimpleRaw raw_cb,
void* user_data) {
321 if (not raw_cb)
return {};
322 return [=](std::vector<std::shared_ptr<Value>>& values) {
323 raw_cb((std::vector<std::shared_ptr<Value>>*) &values, user_data);
327 Pht(std::string name, KeySpec k_spec, std::shared_ptr<DhtRunner>
dht)
328 : name_(INDEX_PREFIX + name), canary_(name_ +
".canary"), keySpec_(k_spec), dht_(
dht) {}
335 void lookup(Key k, LookupCallback cb = {}, DoneCallbackSimple done_cb = {},
bool exact_match =
true);
336 void lookup(Key k, LookupCallbackSimple cb = {}, DoneCallbackSimple done_cb = {},
bool exact_match =
true)
338 lookup(k, [=](std::vector<std::shared_ptr<Value>>& values,
Prefix) { cb(values); }, done_cb, exact_match);
348 void insert(Key k, Value v, DoneCallbackSimple done_cb = {}) {
351 auto lo = std::make_shared<int>(0);
352 auto hi = std::make_shared<int>(p.size_);
356 entry.prefix = p.content_;
359 Pht::insert(p, entry, lo, hi, clock::now(),
true, done_cb);
376 void insert(
const Prefix& kp, IndexEntry entry, std::shared_ptr<int> lo, std::shared_ptr<int> hi, time_point time_p,
377 bool check_split, DoneCallbackSimple done_cb = {});
385 void insert(
const Prefix& p);
395 int lookup(
const Prefix& p);
398 static constexpr
const size_t MAX_ELEMENT {1024};
399 static constexpr
const std::chrono::minutes NODE_EXPIRE_TIME {5};
402 time_point last_reply;
403 std::shared_ptr<Node> parent;
404 std::weak_ptr<Node> left_child;
405 std::weak_ptr<Node> right_child;
408 std::weak_ptr<Node> root_;
415 std::multimap<time_point, std::shared_ptr<Node>> leaves_;
419 using RealInsertCallback = std::function<void(const Prefix& p, IndexEntry entry)>;
420 using LookupCallbackWrapper = std::function<void(std::vector<std::shared_ptr<IndexEntry>>& values,
const Prefix& p)>;
436 void lookupStep(Prefix k, std::shared_ptr<int> lo, std::shared_ptr<int> hi,
437 std::shared_ptr<std::vector<std::shared_ptr<IndexEntry>>> vals,
438 LookupCallbackWrapper cb, DoneCallbackSimple done_cb,
439 std::shared_ptr<unsigned> max_common_prefix_len,
440 int start = -1,
bool all_values =
false);
449 Prefix zcurve(
const std::vector<Prefix>& all_prefix)
const;
459 virtual Prefix linearize(Key k)
const;
469 void getRealPrefix(
const std::shared_ptr<Prefix>& p, IndexEntry entry, RealInsertCallback end_cb );
479 void checkPhtUpdate(Prefix p, IndexEntry entry, time_point time_p);
488 static size_t findSplitLocation(
const Prefix& compared,
const std::vector<std::shared_ptr<IndexEntry>>& vals) {
489 for (
size_t i = 0; i < compared.content_.size() * 8 - 1; i++ )
490 for (
auto const& v : vals)
491 if ( Prefix(v->prefix).isContentBitActive(i) != compared.isContentBitActive(i) )
493 return compared.content_.size() * 8 - 1;
504 void split(
const Prefix& insert,
const std::vector<std::shared_ptr<IndexEntry>>& vals, IndexEntry entry, RealInsertCallback end_cb);
509 bool validKey(
const Key& k)
const {
510 return k.size() == keySpec_.size() and
511 std::equal(k.begin(), k.end(), keySpec_.begin(),
512 [&](
const Key::value_type& key,
const KeySpec::value_type& key_spec) {
513 return key.first == key_spec.first and key.second.size() <= key_spec.second;
522 void updateCanary(Prefix p);
524 const std::string name_;
525 const std::string canary_;
526 const KeySpec keySpec_;
528 std::shared_ptr<DhtRunner> dht_;
void swapContentBit(size_t bit)
void swapFlagBit(size_t bit)
void insert(Key k, Value v, DoneCallbackSimple done_cb={})
bool isFlagActive(size_t pos) const
bool isContentBitActive(size_t pos) const
A blob structure which prefixes a Key in the PHT.
std::vector< uint8_t > Blob
Prefix getPrefix(ssize_t len) const
void addPaddingContent(size_t size)
Prefix getSibling() const
static unsigned commonBits(const Prefix &p1, const Prefix &p2)