C++ Distributed Hash Table
dht.h
1 /*
2  * Copyright (C) 2014-2017 Savoir-faire Linux Inc.
3  * Authors: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4  * Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
5  * Sébastien Blin <sebastien.blin@savoirfairelinux.com>
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program. If not, see <https://www.gnu.org/licenses/>.
19  */
20 
21 #pragma once
22 
23 #include "infohash.h"
24 #include "value.h"
25 #include "utils.h"
26 #include "network_engine.h"
27 #include "scheduler.h"
28 #include "routing_table.h"
29 #include "callbacks.h"
30 #include "dht_interface.h"
31 
32 #include <string>
33 #include <array>
34 #include <vector>
35 #include <map>
36 #include <functional>
37 #include <memory>
38 
39 #ifdef _WIN32
40 #include <iso646.h>
41 #endif
42 
43 namespace dht {
44 
/* Forward declaration of the network-layer request type. In this header it is
 * only named through references and Sp<> handles (searchNodeGetDone,
 * searchNodeGetExpired, onError), so the full definition is not needed here. */
45 namespace net {
46 struct Request;
47 } /* namespace net */
48 
/* Forward declarations of storage/listener types defined elsewhere.
 * Storage, ValueStorage and StorageBucket are referenced by the Dht class
 * below; Listener and LocalListener are not referenced in this header. */
49 struct Storage;
50 struct ValueStorage;
51 class StorageBucket;
52 struct Listener;
53 struct LocalListener;
54 
62 class OPENDHT_PUBLIC Dht final : public DhtInterface {
63 public:
64 
/* Default constructor; defined out of line. */
65  Dht();
66 
/* Construct from two integer handles and a configuration; defined out of line.
 * NOTE(review): `s` and `s6` look like IPv4/IPv6 socket descriptors — confirm
 * against the implementation. */
71  Dht(const int& s, const int& s6, Config config);
/* Destructor; defined out of line. */
72  virtual ~Dht();
73 
77  inline const InfoHash& getNodeId() const { return myid; }
78 
82  NodeStatus getStatus(sa_family_t af) const;
83 
84  NodeStatus getStatus() const {
85  return std::max(getStatus(AF_INET), getStatus(AF_INET6));
86  }
87 
/* Defined out of line.
 * NOTE(review): presumably starts an orderly shutdown and reports completion
 * through `cb` — confirm against the implementation. */
91  void shutdown(ShutdownCallback cb);
92 
/* Defined out of line. `af` defaults to 0.
 * NOTE(review): 0 presumably means "any address family" (cf. the AF_UNSPEC
 * default used by getSearchLog) — confirm. */
99  bool isRunning(sa_family_t af = 0) const;
100 
101  virtual void registerType(const ValueType& type) {
102  types.registerType(type);
103  }
104  const ValueType& getType(ValueType::Id type_id) const {
105  return types.getType(type_id);
106  }
107 
113  void insertNode(const InfoHash& id, const SockAddr&);
114  void insertNode(const InfoHash& id, const sockaddr* sa, socklen_t salen) {
115  insertNode(id, SockAddr(sa, salen));
116  }
117  void insertNode(const NodeExport& n) {
118  insertNode(n.id, SockAddr(n.ss, n.sslen));
119  }
120 
/* Defined out of line. Takes a raw address/length pair and an optional
 * callback (defaults to an empty DoneCallbackSimple).
 * NOTE(review): presumably `cb` is invoked with the ping outcome — confirm. */
121  void pingNode(const sockaddr*, socklen_t, DoneCallbackSimple&& cb={});
122 
123  time_point periodic(const uint8_t *buf, size_t buflen, const SockAddr&);
124  time_point periodic(const uint8_t *buf, size_t buflen, const sockaddr* from, socklen_t fromlen) {
125  return periodic(buf, buflen, SockAddr(from, fromlen));
126  }
127 
138  virtual void get(const InfoHash& key, GetCallback cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {});
139  virtual void get(const InfoHash& key, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter&& f={}, Where&& w = {}) {
140  get(key, cb, bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w));
141  }
142  virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) {
143  get(key, bindGetCb(cb), donecb, std::forward<Value::Filter>(f), std::forward<Where>(w));
144  }
145  virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallbackSimple donecb, Value::Filter&& f={}, Where&& w = {}) {
146  get(key, bindGetCb(cb), bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w));
147  }
158  virtual void query(const InfoHash& key, QueryCallback cb, DoneCallback done_cb = {}, Query&& q = {});
159  virtual void query(const InfoHash& key, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query&& q = {}) {
160  query(key, cb, bindDoneCb(done_cb), std::forward<Query>(q));
161  }
162 
/* Defined out of line. Const accessor returning the values for `key` that
 * pass filter `f` (defaults to Value::AllFilter()).
 * NOTE(review): "local" presumably means this node's own storage, without a
 * network lookup — confirm. */
166  std::vector<Sp<Value>> getLocal(const InfoHash& key, Value::Filter f = Value::AllFilter()) const;
167 
/* Defined out of line. Const accessor for a single value identified by
 * (`key`, `vid`). NOTE(review): behaviour when no such value exists is not
 * visible here — confirm (likely an empty Sp). */
171  Sp<Value> getLocalById(const InfoHash& key, Value::Id vid) const;
172 
179  void put(const InfoHash& key,
180  Sp<Value>,
181  DoneCallback cb=nullptr,
182  time_point created=time_point::max(),
183  bool permanent = false);
184  void put(const InfoHash& key,
185  const Sp<Value>& v,
186  DoneCallbackSimple cb,
187  time_point created=time_point::max(),
188  bool permanent = false)
189  {
190  put(key, v, bindDoneCb(cb), created, permanent);
191  }
192 
193  void put(const InfoHash& key,
194  Value&& v,
195  DoneCallback cb=nullptr,
196  time_point created=time_point::max(),
197  bool permanent = false)
198  {
199  put(key, std::make_shared<Value>(std::move(v)), cb, created, permanent);
200  }
201  void put(const InfoHash& key,
202  Value&& v,
203  DoneCallbackSimple cb,
204  time_point created=time_point::max(),
205  bool permanent = false)
206  {
207  put(key, std::forward<Value>(v), bindDoneCb(cb), created, permanent);
208  }
209 
/* Defined out of line. Returns a list of values associated with the given hash.
 * NOTE(review): presumably the values this node is currently putting
 * (announcing) under that hash — confirm. */
213  std::vector<Sp<Value>> getPut(const InfoHash&);
214 
/* Defined out of line. Single-value variant identified by (hash, value id).
 * NOTE(review): result when nothing matches is not visible here — confirm. */
218  Sp<Value> getPut(const InfoHash&, const Value::Id&);
219 
/* Defined out of line.
 * NOTE(review): presumably stops announcing the given value and returns
 * whether a matching put existed — confirm. */
224  bool cancelPut(const InfoHash&, const Value::Id&);
225 
233  virtual size_t listen(const InfoHash&, ValueCallback, Value::Filter={}, Where={});
234 
235  virtual size_t listen(const InfoHash& key, GetCallback cb, Value::Filter f={}, Where w={}) {
236  return listen(key, [cb](const std::vector<Sp<Value>>& vals, bool expired){
237  if (not expired)
238  return cb(vals);
239  return true;
240  }, std::forward<Value::Filter>(f), std::forward<Where>(w));
241  }
242  virtual size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w={}) {
243  return listen(key, bindGetCb(cb), std::forward<Value::Filter>(f), std::forward<Where>(w));
244  }
245 
/* Defined out of line.
 * NOTE(review): `token` is presumably the value returned by listen(); the
 * meaning of the bool result is not visible here — confirm. */
246  virtual bool cancelListen(const InfoHash&, size_t token);
247 
253  void connectivityChanged(sa_family_t);
254  void connectivityChanged() {
255  reported_addr.clear();
256  connectivityChanged(AF_INET);
257  connectivityChanged(AF_INET6);
258  }
259 
/* Export/import and diagnostic accessors; all defined out of line. */

/* Returns a list of NodeExport records (non-const member). */
264  std::vector<NodeExport> exportNodes();
265 
/* exportValues() and importValues() exchange the same ValuesExport record
 * type. NOTE(review): presumably importValues accepts what exportValues
 * produced — confirm. */
266  std::vector<ValuesExport> exportValues() const;
267  void importValues(const std::vector<ValuesExport>&);
268 
/* Statistics for one address family. */
269  NodeStats getNodesStats(sa_family_t af) const;
270 
/* String dumps of the storage: overall, or for one hash. */
271  std::string getStorageLog() const;
272  std::string getStorageLog(const InfoHash&) const;
273 
/* String dumps of routing tables and searches. getSearchLog defaults its
 * family to AF_UNSPEC. */
274  std::string getRoutingTablesLog(sa_family_t) const;
275  std::string getSearchesLog(sa_family_t) const;
276  std::string getSearchLog(const InfoHash&, sa_family_t af = AF_UNSPEC) const;
277 
/* NOTE(review): output destination is not visible here — confirm. */
278  void dumpTables() const;
279  std::vector<unsigned> getNodeMessageStats(bool in = false) {
280  return network_engine.getNodeMessageStats(in);
281  }
282 
286  void setStorageLimit(size_t limit = DEFAULT_STORAGE_LIMIT) {
287  max_store_size = limit;
288  }
289 
294  std::pair<size_t, size_t> getStoreSize() const {
295  return {total_store_size, total_values};
296  }
297 
/* Defined out of line. `family` defaults to 0.
 * NOTE(review): 0 presumably means "any address family" — confirm. */
298  std::vector<SockAddr> getPublicAddress(sa_family_t family = 0);
299 
/* Both functions below have empty bodies: this class ignores their arguments
 * and does nothing. */
300  void pushNotificationReceived(const std::map<std::string, std::string>&) {}
301  void resubscribe(unsigned) {}
302 
303 private:
304 
305  /* When performing a search, we search for up to SEARCH_NODES closest nodes
306  to the destination, and use the additional ones to backtrack if any of
307  the target 8 turn out to be dead. */
308  static constexpr unsigned SEARCH_NODES {14};
309 
310  /* The number of bad nodes is limited in order to help determine
311  * presence of connectivity changes. See
312  * https://github.com/savoirfairelinux/opendht/issues/137 for details.
313  *
314  * According to the tables, 25 is a good average value for big networks. If
315  * the network is small, normal search expiration process will handle the
316  * situation.
317  * */
318  static constexpr unsigned SEARCH_MAX_BAD_NODES {25};
319 
320  /* Concurrent search nodes requested count */
321  static constexpr unsigned MAX_REQUESTED_SEARCH_NODES {4};
322 
323  /* Number of listening nodes */
324  static constexpr unsigned LISTEN_NODES {4};
325 
326  /* The maximum number of hashes we're willing to track. */
327  static constexpr unsigned MAX_HASHES {64 * 1024};
328 
329  /* The maximum number of searches we keep data about. */
330  static constexpr unsigned MAX_SEARCHES {64 * 1024};
331 
/* 10 minutes. NOTE(review): presumably an upper bound used when scheduling
 * storage maintenance — confirm where it is read. */
332  static constexpr std::chrono::minutes MAX_STORAGE_MAINTENANCE_EXPIRE_TIME {10};
333 
334  /* The time after which we consider a search to be expirable. */
335  static constexpr std::chrono::minutes SEARCH_EXPIRE_TIME {62};
336 
337  /* Timeout for listen */
338  static constexpr std::chrono::seconds LISTEN_EXPIRE_TIME {30};
339 
/* 10 seconds. NOTE(review): presumably how far ahead of expiration a value is
 * re-announced — confirm where it is read. */
340  static constexpr std::chrono::seconds REANNOUNCE_MARGIN {10};
341 
/* 32. NOTE(review): presumably the size in bytes of the tokens handled by
 * makeToken/tokenMatch — confirm. */
342  static constexpr size_t TOKEN_SIZE {32};
343 
344  // internal structures
/* Nested types, only forward-declared here; their definitions live elsewhere. */
345  struct SearchNode;
346  struct Get;
347  struct Announce;
348  struct Search;
349 
350  // prevent copy
/* Copy construction and copy assignment are deleted. */
351  Dht(const Dht&) = delete;
352  Dht& operator=(const Dht&) = delete;
353 
/* This node's identifier, exposed through getNodeId(). */
354  InfoHash myid {};
355 
/* NOTE(review): presumably the current and previous secrets behind
 * makeToken()/tokenMatch(), swapped by rotateSecrets() — confirm. */
356  uint64_t secret {};
357  uint64_t oldsecret {};
358 
359  // registered types
/* Backing store for registerType()/getType(). */
360  TypeStore types;
361 
362  // are we a bootstrap node ?
363  // note: Any running node can be used as a bootstrap node.
364  // Only nodes running only as bootstrap nodes should
365  // be put in bootstrap mode.
366  const bool is_bootstrap {false};
/* Const flag, false unless a constructor overrides the initializer. */
367  const bool maintain_storage {false};
368 
369  // the stuff
/* One routing table per address family; see buckets(). */
370  RoutingTable buckets4 {};
371  RoutingTable buckets6 {};
372 
/* Stored data keyed by hash, plus per-address StorageBucket entries ordered
 * with SockAddr::ipCmp. */
373  std::map<InfoHash, Storage> store;
374  std::map<SockAddr, StorageBucket, SockAddr::ipCmp> store_quota;
/* Counters reported by getStoreSize(); limit set by setStorageLimit(). */
375  size_t total_values {0};
376  size_t total_store_size {0};
377  size_t max_store_size {DEFAULT_STORAGE_LIMIT};
378 
/* One search map per address family; see searches(). */
379  using SearchMap = std::map<InfoHash, Sp<Search>>;
380  SearchMap searches4 {};
381  SearchMap searches6 {};
382  uint16_t search_id {0};
383 
384  // map a global listen token to IPv4, IPv6 specific listen tokens.
385  // 0 is the invalid token.
/* NOTE(review): the mapped tuple holds three size_t values while the comment
 * above names only the IPv4 and IPv6 tokens — confirm what the remaining
 * element is and its position. */
386  std::map<size_t, std::tuple<size_t, size_t, size_t>> listeners {};
/* Starts at 1, consistent with 0 being the invalid token. */
387  size_t listener_token {1};
388 
389  // timing
390  Scheduler scheduler;
391  Sp<Scheduler::Job> nextNodesConfirmation {};
392  Sp<Scheduler::Job> nextStorageMaintenance {};
393 
/* Network engine; getNodeMessageStats() reads its statistics. */
394  net::NetworkEngine network_engine;
395  unsigned pending_pings4 {0};
396  unsigned pending_pings6 {0};
397 
/* List of (unsigned, address) pairs; cleared by connectivityChanged().
 * NOTE(review): the unsigned is presumably a report count — confirm. */
398  using ReportedAddr = std::pair<unsigned, SockAddr>;
399  std::vector<ReportedAddr> reported_addr;
400 
/* 64-bit Mersenne Twister obtained from crypto::getSeededRandomEngine. */
401  std::mt19937_64 rd {crypto::getSeededRandomEngine<std::mt19937_64>()};
402 
/* Secret and token helpers; all defined out of line.
 * NOTE(review): rotateSecrets presumably moves `secret` into `oldsecret` and
 * draws a new one; makeToken's `old` flag presumably selects which of the two
 * secrets is used — confirm. */
403  void rotateSecrets();
404 
405  Blob makeToken(const SockAddr&, bool old) const;
406  bool tokenMatch(const Blob& token, const SockAddr&) const;
407 
408  void reportedAddr(const SockAddr&);
409 
410  // Storage
/* Storage helpers; all defined out of line. The bool results of
 * storageStore/storageErase/storageRefresh are not explained by anything
 * visible in this header. */
411  void storageAddListener(const InfoHash& id, const Sp<Node>& node, size_t tid, Query&& = {});
412  bool storageStore(const InfoHash& id, const Sp<Value>& value, time_point created, const SockAddr& sa = {}, bool permanent = false);
413  bool storageErase(const InfoHash& id, Value::Id vid);
414  bool storageRefresh(const InfoHash& id, Value::Id vid);
/* Expiration entry points: whole store, one hash (taken by value), or one
 * entry designated by an iterator into `store`. */
415  void expireStore();
416  void expireStorage(InfoHash h);
417  void expireStore(decltype(store)::iterator);
418 
419  void storageChanged(const InfoHash& id, Storage& st, ValueStorage&, bool newValue);
/* Formats one (hash, Storage) entry of `store` as a string. */
420  std::string printStorageLog(const decltype(store)::value_type&) const;
421 
427  void dataPersistence(InfoHash id);
/* Operates on one (hash, Storage) entry of `store`; `force` defaults to false
 * and `donecb` to nullptr. NOTE(review): meaning of the size_t result is not
 * visible here — confirm. */
428  size_t maintainStorage(decltype(store)::value_type&, bool force=false, DoneCallback donecb=nullptr);
429 
430  // Buckets
431  RoutingTable& buckets(sa_family_t af) { return af == AF_INET ? buckets4 : buckets6; }
432  const RoutingTable& buckets(sa_family_t af) const { return af == AF_INET ? buckets4 : buckets6; }
433  Bucket* findBucket(const InfoHash& id, sa_family_t af) {
434  RoutingTable::iterator b;
435  switch (af) {
436  case AF_INET:
437  b = buckets4.findBucket(id);
438  return b == buckets4.end() ? nullptr : &(*b);
439  case AF_INET6:
440  b = buckets6.findBucket(id);
441  return b == buckets6.end() ? nullptr : &(*b);
442  default:
443  return nullptr;
444  }
445  }
446  const Bucket* findBucket(const InfoHash& id, sa_family_t af) const {
447  return const_cast<Dht*>(this)->findBucket(id, af);
448  }
449 
/* Routing-table maintenance helpers; all defined out of line.
 * NOTE(review): the bool returned by bucketMaintenance is not explained by
 * anything visible here — confirm. */
450  void expireBuckets(RoutingTable&);
451  void sendCachedPing(Bucket& b);
452  bool bucketMaintenance(RoutingTable&);
/* Writes a description of `b` to `out`. */
453  void dumpBucket(const Bucket& b, std::ostream& out) const;
454 
455  // Nodes
/* Node helpers; all defined out of line.
 * NOTE(review): meaning of `confirm` and of trySearchInsert's result is not
 * visible here — confirm. */
456  void onNewNode(const Sp<Node>& node, int confirm);
457  const Sp<Node> findNode(const InfoHash& id, sa_family_t af) const;
458  bool trySearchInsert(const Sp<Node>& node);
459 
460  // Searches
461 
462  inline SearchMap& searches(sa_family_t af) { return af == AF_INET ? searches4 : searches6; }
463  inline const SearchMap& searches(sa_family_t af) const { return af == AF_INET ? searches4 : searches6; }
464 
/* Search entry points; all defined out of line. Every callback, the filter
 * and the query are optional in search(). */
469  Sp<Search> search(const InfoHash& id, sa_family_t af, GetCallback = {}, QueryCallback = {}, DoneCallback = {}, Value::Filter = {}, const Sp<Query>& q = {});
470 
/* Per-family counterparts of the public put()/listen() API (same trailing
 * parameters and defaults). NOTE(review): presumably what put()/listen() call
 * for each address family — confirm. */
471  void announce(const InfoHash& id, sa_family_t af, Sp<Value> value, DoneCallback callback, time_point created=time_point::max(), bool permanent = false);
472  size_t listenTo(const InfoHash& id, sa_family_t af, ValueCallback cb, Value::Filter f = Value::AllFilter(), const Sp<Query>& q = {});
473 
/* NOTE(review): the unsigned result is presumably the number of nodes added
 * to the search — confirm. */
481  unsigned refill(Search& sr);
482  void expireSearches();
483 
484  void confirmNodes();
485  void expire();
486 
/* Completion/expiration handlers for a "get" request sent to a search node.
 * The search is held through a weak_ptr. */
495  void searchNodeGetDone(const net::Request& status,
496  net::RequestAnswer&& answer,
497  std::weak_ptr<Search> ws,
498  Sp<Query> query);
499 
509  void searchNodeGetExpired(const net::Request& status, bool over, std::weak_ptr<Search> ws, Sp<Query> query);
510 
518  void paginate(std::weak_ptr<Search> ws, Sp<Query> query, SearchNode* n);
519 
/* `n` defaults to nullptr and `update` to true.
 * NOTE(review): a null `n` presumably lets the function pick the node itself,
 * the result being the node a request was sent to — confirm. */
523  SearchNode* searchSendGetValues(Sp<Search> sr, SearchNode *n = nullptr, bool update = true);
524 
531  void searchSendAnnounceValue(const Sp<Search>& sr);
532 
540  void searchStep(Sp<Search>);
541  void searchSynchedNodeListen(const Sp<Search>&, SearchNode&);
542 
/* Writes a description of `sr` to `out`. */
543  void dumpSearch(const Search& sr, std::ostream& out) const;
544 
545  bool neighbourhoodMaintenance(RoutingTable&);
546 
/* Handlers below are defined out of line. The on<Request> functions return a
 * net::RequestAnswer; the on<Request>Done functions receive one. */
547  void onError(Sp<net::Request> node, net::DhtProtocolException e);
548  /* when our address is reported by a distant peer. */
549  void onReportedAddr(const InfoHash& id, const SockAddr&);
550  /* when we receive a ping request */
551  net::RequestAnswer onPing(Sp<Node> node);
552  /* when we receive a "find node" request */
553  net::RequestAnswer onFindNode(Sp<Node> node, const InfoHash& hash, want_t want);
554  void onFindNodeDone(const Sp<Node>& status,
555  net::RequestAnswer& a,
556  Sp<Search> sr);
557  /* when we receive a "get values" request */
558  net::RequestAnswer onGetValues(Sp<Node> node,
559  const InfoHash& hash,
560  want_t want,
561  const Query& q);
562  void onGetValuesDone(const Sp<Node>& status,
563  net::RequestAnswer& a,
564  Sp<Search>& sr,
565  const Sp<Query>& orig_query);
566  /* when we receive a listen request */
567  net::RequestAnswer onListen(Sp<Node> node,
568  const InfoHash& hash,
569  const Blob& token,
570  size_t socket_id,
571  const Query& query);
572  void onListenDone(const Sp<Node>& status,
573  net::RequestAnswer& a,
574  Sp<Search>& sr);
575  /* when we receive an announce request */
576  net::RequestAnswer onAnnounce(Sp<Node> node,
577  const InfoHash& hash,
578  const Blob& token,
579  const std::vector<Sp<Value>>& v,
580  const time_point& created);
/* NOTE(review): presumably handles a request refreshing the stored value
 * `vid` under `hash` — confirm. */
581  net::RequestAnswer onRefresh(Sp<Node> node,
582  const InfoHash& hash,
583  const Blob& token,
584  const Value::Id& vid);
585  void onAnnounceDone(const Sp<Node>& status,
586  net::RequestAnswer& a,
587  Sp<Search>& sr);
588 };
589 
590 }
std::pair< size_t, size_t > getStoreSize() const
Definition: dht.h:294
const InfoHash & getNodeId() const
Definition: dht.h:77
NodeStatus
Definition: callbacks.h:41
std::vector< uint8_t > Blob
Definition: utils.h:114
Serializable dht::Value filter.
Definition: value.h:749
void pushNotificationReceived(const std::map< std::string, std::string > &)
Definition: dht.h:300
virtual size_t listen(const InfoHash &key, GetCallback cb, Value::Filter f={}, Where w={})
Definition: dht.h:235
Definition: callbacks.h:34
Definition: dht.h:62
void setStorageLimit(size_t limit=DEFAULT_STORAGE_LIMIT)
Definition: dht.h:286