C++ Distributed Hash Table
dhtrunner.h
1 /*
2  * Copyright (C) 2014-2017 Savoir-faire Linux Inc.
3  * Authors: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4  * Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
5  * Sébastien Blin <sebastien.blin@savoirfairelinux.com>
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program. If not, see <https://www.gnu.org/licenses/>.
19  */
20 
21 #pragma once
22 
23 #include "infohash.h"
24 #include "value.h"
25 #include "callbacks.h"
26 #include "sockaddr.h"
27 #include "log_enable.h"
28 #include "def.h"
29 
30 #include <thread>
31 #include <mutex>
32 #include <atomic>
33 #include <condition_variable>
34 #include <future>
35 #include <exception>
36 #include <queue>
37 #include <chrono>
38 
39 namespace dht {
40 
// Forward declarations.
//  - Node is not otherwise referenced in the visible part of this header.
//  - SecureDht is only used through std::unique_ptr, pointer and reference.
//  - SecureDhtConfig is stored by value in DhtRunner::Config, so its complete
//    definition must already be visible at that point (presumably from one of
//    the project headers included above — TODO confirm).
41 struct Node;
42 class SecureDht;
43 struct SecureDhtConfig;
44 
51 class OPENDHT_PUBLIC DhtRunner {
52 
53 public:
54  typedef std::function<void(NodeStatus, NodeStatus)> StatusCallback;
55 
/**
 * Settings bundle accepted by the run() overloads that take a Config.
 */
56  struct Config {
// Configuration of the secure DHT layer.
57  SecureDhtConfig dht_config;
// Filled from the `threaded` argument by the convenience run() overload.
// NOTE(review): presumably selects whether the runner drives the DHT from its
// own thread — confirm in the implementation.
58  bool threaded;
// Proxy server to use; the convenience run() overload sets it to "".
59  std::string proxy_server;
// Push node identifier; the convenience run() overload sets it to "".
60  std::string push_node_id;
61  };
62 
// Default constructor and virtual destructor; only declared in this header.
63  DhtRunner();
64  virtual ~DhtRunner();
65 
66  void get(InfoHash id, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter f = Value::AllFilter(), Where w = {}) {
67  get(id, bindGetCb(cb), donecb, f, w);
68  }
69 
70  void get(InfoHash id, GetCallbackSimple cb, DoneCallbackSimple donecb={}, Value::Filter f = Value::AllFilter(), Where w = {}) {
71  get(id, bindGetCb(cb), donecb, f, w);
72  }
73 
/**
 * get() overload taking a GetCallback and a full DoneCallback; declared here
 * only. Note that `dcb` has no default value in this overload.
 */
74  void get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f={}, Where w = {});
75 
/**
 * Convenience overload: converts `donecb` with bindDoneCb() and forwards the
 * other arguments unchanged to another get() overload.
 */
76  void get(InfoHash id, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter f = Value::AllFilter(), Where w = {}) {
77  get(id, cb, bindDoneCb(donecb), f, w);
78  }
// Same request keyed by a string; declared here only.
// NOTE(review): presumably the string is hashed into an InfoHash — confirm in the implementation.
79  void get(const std::string& key, GetCallback vcb, DoneCallbackSimple dcb={}, Value::Filter f = Value::AllFilter(), Where w = {});
80 
81  template <class T>
82  void get(InfoHash hash, std::function<bool(std::vector<T>&&)> cb, DoneCallbackSimple dcb={})
83  {
84  get(hash, [=](const std::vector<std::shared_ptr<Value>>& vals) {
85  return cb(unpackVector<T>(vals));
86  },
87  dcb,
88  getFilterSet<T>());
89  }
90  template <class T>
91  void get(InfoHash hash, std::function<bool(T&&)> cb, DoneCallbackSimple dcb={})
92  {
93  get(hash, [=](const std::vector<std::shared_ptr<Value>>& vals) {
94  for (const auto& v : vals) {
95  try {
96  if (not cb(Value::unpack<T>(*v)))
97  return false;
98  } catch (const std::exception&) {
99  continue;
100  }
101  }
102  return true;
103  },
104  dcb,
105  getFilterSet<T>());
106  }
107 
108  std::future<std::vector<std::shared_ptr<dht::Value>>> get(InfoHash key, Value::Filter f = Value::AllFilter(), Where w = {}) {
109  auto p = std::make_shared<std::promise<std::vector<std::shared_ptr< dht::Value >>>>();
110  auto values = std::make_shared<std::vector<std::shared_ptr< dht::Value >>>();
111  get(key, [=](const std::vector<std::shared_ptr<dht::Value>>& vlist) {
112  values->insert(values->end(), vlist.begin(), vlist.end());
113  return true;
114  }, [=](bool) {
115  p->set_value(std::move(*values));
116  },
117  f, w);
118  return p->get_future();
119  }
120 
121  template <class T>
122  std::future<std::vector<T>> get(InfoHash key) {
123  auto p = std::make_shared<std::promise<std::vector<T>>>();
124  auto values = std::make_shared<std::vector<T>>();
125  get<T>(key, [=](T&& v) {
126  values->emplace_back(std::move(v));
127  return true;
128  }, [=](bool) {
129  p->set_value(std::move(*values));
130  });
131  return p->get_future();
132  }
133 
134  void query(const InfoHash& hash, QueryCallback cb, DoneCallback done_cb = {}, Query q = {});
135  void query(const InfoHash& hash, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query q = {}) {
136  query(hash, cb, bindDoneCb(done_cb), q);
137  }
138 
/**
 * listen() overload taking a ValueCallback; declared here only. The inline
 * listen() adapters in this class end up calling it with a callback of the
 * form (values, bool expired).
 * Returns a future yielding a size_t; cancelListen() accepts such a token.
 */
139  std::future<size_t> listen(InfoHash key, ValueCallback vcb, Value::Filter f = Value::AllFilter(), Where w = {});
140 
141  std::future<size_t> listen(InfoHash key, GetCallback cb, Value::Filter f={}, Where w={}) {
142  return listen(key, [cb](const std::vector<Sp<Value>>& vals, bool expired){
143  if (not expired)
144  return cb(vals);
145  return true;
146  }, std::forward<Value::Filter>(f), std::forward<Where>(w));
147  }
// listen() keyed by a string; declared here only.
// NOTE(review): presumably the string is hashed into an InfoHash — confirm in the implementation.
148  std::future<size_t> listen(const std::string& key, GetCallback vcb, Value::Filter f = Value::AllFilter(), Where w = {});
// Convenience overload: adapts the GetCallbackSimple with bindGetCb() and
// forwards key, filter and Where clause unchanged to another listen() overload.
149  std::future<size_t> listen(InfoHash key, GetCallbackSimple cb, Value::Filter f = Value::AllFilter(), Where w = {}) {
150  return listen(key, bindGetCb(cb), f, w);
151  }
152 
153  template <class T>
154  std::future<size_t> listen(InfoHash hash, std::function<bool(std::vector<T>&&)> cb)
155  {
156  return listen(hash, [=](const std::vector<std::shared_ptr<Value>>& vals) {
157  return cb(unpackVector<T>(vals));
158  },
159  getFilterSet<T>());
160  }
161  template <class T>
162  std::future<size_t> listen(InfoHash hash, std::function<bool(std::vector<T>&&, bool)> cb)
163  {
164  return listen(hash, [=](const std::vector<std::shared_ptr<Value>>& vals, bool expired) {
165  return cb(unpackVector<T>(vals), expired);
166  },
167  getFilterSet<T>());
168  }
169 
170  template <typename T>
171  std::future<size_t> listen(InfoHash hash, std::function<bool(T&&)> cb, Value::Filter f = Value::AllFilter(), Where w = {})
172  {
173  return listen(hash, [=](const std::vector<std::shared_ptr<Value>>& vals) {
174  for (const auto& v : vals) {
175  try {
176  if (not cb(Value::unpack<T>(*v)))
177  return false;
178  } catch (const std::exception&) {
179  continue;
180  }
181  }
182  return true;
183  },
184  getFilterSet<T>(f), w);
185  }
186  template <typename T>
187  std::future<size_t> listen(InfoHash hash, std::function<bool(T&&, bool)> cb, Value::Filter f = Value::AllFilter(), Where w = {})
188  {
189  return listen(hash, [=](const std::vector<std::shared_ptr<Value>>& vals, bool expired) {
190  for (const auto& v : vals) {
191  try {
192  if (not cb(Value::unpack<T>(*v), expired))
193  return false;
194  } catch (const std::exception&) {
195  continue;
196  }
197  }
198  return true;
199  },
200  getFilterSet<T>(f), w);
201  }
202 
// Cancel a listen operation on `h`, identified either by the size_t token
// directly or by a shared_future of it (listen() returns std::future<size_t>).
// Declared here only.
203  void cancelListen(InfoHash h, size_t token);
204  void cancelListen(InfoHash h, std::shared_future<size_t> token);
205 
206  void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={}, time_point created=time_point::max(), bool permanent = false);
207  void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb, time_point created=time_point::max(), bool permanent = false) {
208  put(hash, value, bindDoneCb(cb), created, permanent);
209  }
210 
211  void put(InfoHash hash, Value&& value, DoneCallback cb={}, time_point created=time_point::max(), bool permanent = false);
212  void put(InfoHash hash, Value&& value, DoneCallbackSimple cb, time_point created=time_point::max(), bool permanent = false) {
213  put(hash, std::forward<Value>(value), bindDoneCb(cb), created, permanent);
214  }
215  void put(const std::string& key, Value&& value, DoneCallbackSimple cb={}, time_point created=time_point::max(), bool permanent = false);
216 
// Cancel a put identified by key `h` and value id `id`; declared here only.
// NOTE(review): presumably only meaningful for puts that are still being maintained — confirm in the implementation.
217  void cancelPut(const InfoHash& h, const Value::Id& id);
218 
219  void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={});
220  void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb) {
221  putSigned(hash, value, bindDoneCb(cb));
222  }
223 
224  void putSigned(InfoHash hash, Value&& value, DoneCallback cb={});
225  void putSigned(InfoHash hash, Value&& value, DoneCallbackSimple cb) {
226  putSigned(hash, std::forward<Value>(value), bindDoneCb(cb));
227  }
228  void putSigned(const std::string& key, Value&& value, DoneCallbackSimple cb={});
229 
230  void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb={});
231  void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallbackSimple cb) {
232  putEncrypted(hash, to, value, bindDoneCb(cb));
233  }
234 
235  void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallback cb={});
236  void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallbackSimple cb) {
237  putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb));
238  }
239  void putEncrypted(const std::string& key, InfoHash to, Value&& value, DoneCallback cb={});
240 
// Bootstrap from one or several socket addresses, with an optional completion
// callback. Declared here only.
245  void bootstrap(const std::vector<SockAddr>& nodes, DoneCallbackSimple&& cb={});
246  void bootstrap(const SockAddr& addr, DoneCallbackSimple&& cb={});
247 
// Bootstrap from NodeExport entries (the type returned by exportNodes()).
252  void bootstrap(const std::vector<NodeExport>& nodes);
253 
// Bootstrap from a (host, service) pair given as strings.
// NOTE(review): presumably resolved asynchronously and retried via
// tryBootstrapContinuously() — confirm in the implementation.
260  void bootstrap(const std::string& host, const std::string& service);
261 
// NOTE(review): presumably clears the stored bootstrap node list — confirm in the implementation.
265  void clearBootstrap();
266 
// Notification entry point; declared here only.
// NOTE(review): presumably to be called by the application when network connectivity changes — confirm.
272  void connectivityChanged();
273 
// Diagnostic dump; const, declared here only. Output destination is not visible in this header.
274  void dumpTables() const;
275 
// Identifier accessors returning an InfoHash; declared here only.
// NOTE(review): presumably getId() is the identity (key) id and getNodeId() the DHT node id — confirm.
279  InfoHash getId() const;
280 
284  InfoHash getNodeId() const;
285 
290  const SockAddr& getBound(sa_family_t f = AF_INET) const {
291  return (f == AF_INET) ? bound4 : bound6;
292  }
293 
298  in_port_t getBoundPort(sa_family_t f = AF_INET) const {
299  return getBound(f).getPort();
300  }
301 
// Returns a pair of sizes describing the store; declared here only.
// NOTE(review): the meaning of each element (e.g. bytes vs. value count) is not visible here — confirm.
302  std::pair<size_t, size_t> getStoreSize() const;
303 
// Sets the storage limit; defaults to DEFAULT_STORAGE_LIMIT.
304  void setStorageLimit(size_t limit = DEFAULT_STORAGE_LIMIT);
305 
// Export known nodes as NodeExport entries (accepted by bootstrap()).
306  std::vector<NodeExport> exportNodes() const;
307 
// Export stored values as ValuesExport entries (accepted by importValues()).
308  std::vector<ValuesExport> exportValues() const;
309 
// Install log sinks for the error, warning and debug levels; each defaults to NOLOG.
310  void setLoggers(LogMethod err = NOLOG, LogMethod warn = NOLOG, LogMethod debug = NOLOG);
311 
// Set the log filter hash; defaults to a default-constructed InfoHash.
// NOTE(review): presumably an empty hash disables filtering — confirm.
315  void setLogFilter(const InfoHash& f = {});
316 
// Register a value type; declared here only.
317  void registerType(const ValueType& type);
318 
// Import values previously produced in ValuesExport form.
319  void importValues(const std::vector<ValuesExport>& values);
320 
321  bool isRunning() const {
322  return running;
323  }
324 
// Statistics and diagnostics accessors; all declared here only.
// Node statistics for the address family `af`.
325  NodeStats getNodesStats(sa_family_t af) const;
// Variant reporting through out-parameters.
// NOTE(review): whether null pointers are accepted and what the return value means is not visible here — confirm.
326  unsigned getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_return, unsigned *cached_return, unsigned *incoming_return) const;
327  NodeInfo getNodeInfo() const;
328 
// Per-message-type counters; `in` presumably selects incoming vs. outgoing — confirm.
329  std::vector<unsigned> getNodeMessageStats(bool in = false) const;
// Textual logs of the storage, routing tables and searches.
330  std::string getStorageLog() const;
331  std::string getStorageLog(const InfoHash&) const;
332  std::string getRoutingTablesLog(sa_family_t af) const;
333  std::string getSearchesLog(sa_family_t af = AF_UNSPEC) const;
334  std::string getSearchLog(const InfoHash&, sa_family_t af = AF_UNSPEC) const;
// Public address accessors, as SockAddr or as strings; not const.
335  std::vector<SockAddr> getPublicAddress(sa_family_t af = AF_UNSPEC);
336  std::vector<std::string> getPublicAddressStr(sa_family_t af = AF_UNSPEC);
337 
338  // securedht methods
339 
// Look up a certificate by hash; the result is delivered to the given callback.
// NOTE(review): presumably the callback receives a null pointer when nothing is found — confirm.
340  void findCertificate(InfoHash hash, std::function<void(const std::shared_ptr<crypto::Certificate>)>);
// Register a certificate; declared here only.
341  void registerCertificate(std::shared_ptr<crypto::Certificate> cert);
// Install the query method used as local certificate store (taken by rvalue reference).
342  void setLocalCertificateStore(CertificateStoreQuery&& query_method);
343 
350  void run(in_port_t port = 4222, const crypto::Identity identity = {}, bool threaded = false, NetId network = 0) {
351  run(port, {
352  /*.dht_config = */{
353  /*.node_config = */{
354  /*.node_id = */{},
355  /*.network = */network,
356  /*.is_bootstrap = */false,
357  /*.maintain_storage*/false
358  },
359  /*.id = */identity
360  },
361  /*.threaded = */threaded,
362  /*.proxy_server = */"",
363  /*.push_node_id = */""
364  });
365  }
// Start the runner on `port` with an explicit Config; the convenience run()
// overload delegates here. Declared here only.
366  void run(in_port_t port, Config config);
367 
// Start the runner with explicit local addresses (`local4`, `local6`).
// NOTE(review): presumably the IPv4 and IPv6 addresses to bind — confirm.
376  void run(const SockAddr& local4, const SockAddr& local6, Config config);
377 
// Same, with the addresses and service given as C strings.
// NOTE(review): whether null pointers are accepted is not visible here — confirm.
381  void run(const char* ip4, const char* ip6, const char* service, Config config);
382 
// Installs the status-change callback by moving `cb` into `statusCb`,
// replacing any callback stored before. No lock is taken in this function.
383  void setOnStatusChanged(StatusCallback&& cb) {
384  statusCb = std::move(cb);
385  }
386 
392  time_point loop() {
393  std::lock_guard<std::mutex> lck(dht_mtx);
394  time_point wakeup = time_point::min();
395  try {
396  wakeup = loop_();
397  } catch (const dht::SocketException& e) {
398  startNetwork(bound4, bound6);
399  }
400  return wakeup;
401  }
402 
// Shut the runner down, reporting through `cb`; declared here only.
// NOTE(review): presumably asynchronous, with `cb` invoked on completion — confirm.
406  void shutdown(ShutdownCallback cb);
407 
// NOTE(review): presumably stops and joins the runner's threads — confirm in the implementation.
413  void join();
414 
// Set the proxy server and, optionally, the push node id (empty by default).
415  void setProxyServer(const std::string& proxy, const std::string& pushNodeId = "");
416 
// Enable or disable use of the proxy; declared here only.
422  void enableProxy(bool proxify);
423 
424  /* Push notification methods */
425 
// Store the push notification token; declared here only.
429  void setPushNotificationToken(const std::string& token);
430 
// Hand a received push notification (key/value string map) to the runner.
434  void pushNotificationReceived(const std::map<std::string, std::string>& data);
435 
436  /* Proxy server methods */
// NOTE(review): presumably toggles forwarding of all messages when acting as a proxy server — confirm.
437  void forwardAllMessages(bool forward);
438 
439 private:
// Bootstrap period: 10 seconds.
// NOTE(review): presumably the retry interval used by tryBootstrapContinuously() — confirm.
440  static constexpr std::chrono::seconds BOOTSTRAP_PERIOD {10};
441 
// Declared here only; see the bootstrap_* members for the state it presumably uses.
448  void tryBootstrapContinuously();
449 
// Network lifecycle helpers. startNetwork() takes both addresses by value;
// loop() calls it with (bound4, bound6) after a SocketException.
450  void stopNetwork();
451  void startNetwork(const SockAddr sin4, const SockAddr sin6);
// Single loop iteration; loop() calls it with dht_mtx held and returns its result.
452  time_point loop_();
453 
454  NodeStatus getStatus() const {
455  return std::max(status4, status6);
456  }
457 
// Owned SecureDht instance.
459  std::unique_ptr<SecureDht> dht_;
460 
// Second owned SecureDht instance. NOTE(review): presumably the proxy-client
// backed one, judging by the name — confirm.
462  std::unique_ptr<SecureDht> dht_via_proxy_;
463 
// Atomic flag, false by default. NOTE(review): presumably selects which of the
// two instances activeDht() returns — confirm.
465  std::atomic_bool use_proxy {false};
466 
// Stored configuration of type Config.
468  Config config_;
469 
// Declared here only.
473  void resetDht();
// Returns a non-owning pointer to a SecureDht; declared here only.
477  SecureDht* activeDht() const;
478 
// Listener is defined elsewhere; instances are stored keyed by a size_t.
// NOTE(review): presumably the key is the token returned by listen(), generated
// from listener_token_ (initially 1) — confirm.
482  struct Listener;
483  std::map<size_t, Listener> listeners_;
484  size_t listener_token_ {1};
485 
// Mutex locked by loop() around loop_(); mutable so const members can lock it.
486  mutable std::mutex dht_mtx {};
// NOTE(review): presumably the thread running the DHT loop, with `cv` used to wake it — confirm.
487  std::thread dht_thread {};
488  std::condition_variable cv {};
489 
// NOTE(review): presumably the packet-receiving thread, with sock_mtx protecting `rcv` — confirm.
490  std::thread rcv_thread {};
491  std::mutex sock_mtx {};
492 
// One received packet: payload bytes, sender address and reception time.
493  struct ReceivedPacket {
494  Blob data;
495  SockAddr from;
496  time_point received;
497  };
// FIFO queue of received packets.
498  std::queue<ReceivedPacket> rcv {};
499 
// Atomic flag, false by default. NOTE(review): presumably true while a
// continuous bootstrap is in progress — confirm.
501  std::atomic_bool bootstraping {false};
502  /* bootstrap nodes given as (host, service) pairs */
// NOTE(review): presumably `bootstrap_nodes_all` keeps every node ever given and
// `bootstrap_nodes` the ones still to try — confirm.
503  std::vector<std::pair<std::string,std::string>> bootstrap_nodes_all {};
504  std::vector<std::pair<std::string,std::string>> bootstrap_nodes {};
// Thread, mutex and condition variable dedicated to bootstrapping.
505  std::thread bootstrap_thread {};
507  std::mutex bootstrap_mtx {};
508  std::condition_variable bootstrap_cv {};
509 
// Queues of deferred operations, each a callable taking a SecureDht&.
// NOTE(review): presumably `pending_ops_prio` is drained before `pending_ops`,
// and storage_mtx protects both — confirm.
510  std::queue<std::function<void(SecureDht&)>> pending_ops_prio {};
511  std::queue<std::function<void(SecureDht&)>> pending_ops {};
512  std::mutex storage_mtx {};
513 
// `running` is the flag returned by isRunning(); both flags start false.
514  std::atomic_bool running {false};
515  std::atomic_bool running_network {false};
516 
// Two status values, both initially Disconnected; getStatus() returns the greater one.
// NOTE(review): presumably the IPv4 and IPv6 statuses — confirm.
517  NodeStatus status4 {NodeStatus::Disconnected},
518  status6 {NodeStatus::Disconnected};
// Callback installed by setOnStatusChanged(); empty by default.
519  StatusCallback statusCb {nullptr};
520 
// File/socket descriptors, -1 by default.
// NOTE(review): presumably s4/s6 are the IPv4/IPv6 sockets and stop_writefd the
// write end of a pipe used to interrupt the receive thread — confirm.
521  int stop_writefd {-1};
522  int s4 {-1}, s6 {-1};
// Addresses returned by getBound(): bound4 for AF_INET, bound6 otherwise.
523  SockAddr bound4 {};
524  SockAddr bound6 {};
525 
// Push token string. NOTE(review): presumably set by setPushNotificationToken() — confirm.
527  std::string pushToken_;
528 };
529 
530 }
void NOLOG(char const *, va_list)
Definition: log_enable.h:38
OPENDHT_PUBLIC Blob hash(const Blob &data, size_t hash_length=512/8)
void run(in_port_t port=4222, const crypto::Identity identity={}, bool threaded=false, NetId network=0)
Definition: dhtrunner.h:350
NodeStatus
Definition: callbacks.h:41
std::vector< uint8_t > Blob
Definition: utils.h:114
in_port_t getBoundPort(sa_family_t f=AF_INET) const
Definition: dhtrunner.h:298
Serializable dht::Value filter.
Definition: value.h:749
time_point loop()
Definition: dhtrunner.h:392
const SockAddr & getBound(sa_family_t f=AF_INET) const
Definition: dhtrunner.h:290
Definition: callbacks.h:34