C++ Distributed Hash Table
network_engine.h
1 /*
2  * Copyright (C) 2014-2018 Savoir-faire Linux Inc.
3  * Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4  * Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program. If not, see <https://www.gnu.org/licenses/>.
18  */
19 
20 #pragma once
21 
22 #include "node_cache.h"
23 #include "value.h"
24 #include "infohash.h"
25 #include "node.h"
26 #include "scheduler.h"
27 #include "utils.h"
28 #include "rng.h"
29 #include "rate_limiter.h"
30 
31 #include <vector>
32 #include <string>
33 #include <functional>
34 #include <algorithm>
35 #include <memory>
36 #include <queue>
37 
38 namespace dht {
39 namespace net {
40 
41 struct Request;
42 struct Socket;
43 struct TransId;
44 
45 #ifndef MSG_CONFIRM
46 #define MSG_CONFIRM 0
47 #endif
48 
50 public:
51  // sent to another peer (http-like).
52  static const constexpr uint16_t NON_AUTHORITATIVE_INFORMATION {203}; /* incomplete request packet. */
53  static const constexpr uint16_t UNAUTHORIZED {401}; /* wrong tokens. */
54  static const constexpr uint16_t NOT_FOUND {404}; /* storage not found */
55  // for internal use (custom).
56  static const constexpr uint16_t INVALID_TID_SIZE {421}; /* id was truncated. */
57  static const constexpr uint16_t UNKNOWN_TID {422}; /* unknown tid */
58  static const constexpr uint16_t WRONG_NODE_INFO_BUF_LEN {423}; /* node info length is wrong */
59 
60  static const std::string GET_NO_INFOHASH; /* received "get" request with no infohash */
61  static const std::string LISTEN_NO_INFOHASH; /* got "listen" request without infohash */
62  static const std::string LISTEN_WRONG_TOKEN; /* wrong token in "listen" request */
63  static const std::string PUT_NO_INFOHASH; /* no infohash in "put" request */
64  static const std::string PUT_WRONG_TOKEN; /* got "put" request with wrong token */
65  static const std::string STORAGE_NOT_FOUND; /* got access request for an unknown storage */
66  static const std::string PUT_INVALID_ID; /* invalid id in "put" request */
67 
68  DhtProtocolException(uint16_t code, const std::string& msg="", InfoHash failing_node_id={})
69  : DhtException(msg), msg(msg), code(code), failing_node_id(failing_node_id) {}
70 
71  std::string getMsg() const { return msg; }
72  uint16_t getCode() const { return code; }
73  const InfoHash getNodeId() const { return failing_node_id; }
74 
75 private:
76  std::string msg;
77  uint16_t code;
78  const InfoHash failing_node_id;
79 };
80 
81 struct ParsedMessage;
82 
86 struct RequestAnswer {
87  Blob ntoken {};
88  Value::Id vid {};
89  std::vector<Sp<Value>> values {};
90  std::vector<Value::Id> refreshed_values {};
91  std::vector<Value::Id> expired_values {};
92  std::vector<Sp<FieldValueIndex>> fields {};
93  std::vector<Sp<Node>> nodes4 {};
94  std::vector<Sp<Node>> nodes6 {};
95  RequestAnswer() {}
96  RequestAnswer(ParsedMessage&& msg);
97 };
98 
117 class NetworkEngine final
118 {
119 private:
123  std::function<void(Sp<Request>, DhtProtocolException)> onError;
124 
131  std::function<void(const Sp<Node>&, int)> onNewNode;
138  std::function<void(const InfoHash&, const SockAddr&)> onReportedAddr;
144  std::function<RequestAnswer(Sp<Node>)> onPing {};
153  std::function<RequestAnswer(Sp<Node>, const InfoHash&, want_t)> onFindNode {};
162  std::function<RequestAnswer(Sp<Node>, const InfoHash&, want_t, const Query&)> onGetValues {};
171  std::function<RequestAnswer(Sp<Node>,
172  const InfoHash&,
173  const Blob&,
174  Tid,
175  const Query&)> onListen {};
185  std::function<RequestAnswer(Sp<Node>,
186  const InfoHash&,
187  const Blob&,
188  const std::vector<Sp<Value>>&,
189  const time_point&)> onAnnounce {};
198  std::function<RequestAnswer(Sp<Node>,
199  const InfoHash&,
200  const Blob&,
201  const Value::Id&)> onRefresh {};
202 
203 public:
204  using RequestCb = std::function<void(const Request&, RequestAnswer&&)>;
205  using RequestExpiredCb = std::function<void(const Request&, bool)>;
206 
207  NetworkEngine(Logger& log, Scheduler& scheduler, const int& s = -1, const int& s6 = -1);
208  NetworkEngine(InfoHash& myid, NetId net, const int& s, const int& s6, Logger& log, Scheduler& scheduler,
209  decltype(NetworkEngine::onError) onError,
210  decltype(NetworkEngine::onNewNode) onNewNode,
211  decltype(NetworkEngine::onReportedAddr) onReportedAddr,
212  decltype(NetworkEngine::onPing) onPing,
213  decltype(NetworkEngine::onFindNode) onFindNode,
214  decltype(NetworkEngine::onGetValues) onGetValues,
215  decltype(NetworkEngine::onListen) onListen,
216  decltype(NetworkEngine::onAnnounce) onAnnounce,
217  decltype(NetworkEngine::onRefresh) onRefresh);
218 
219  virtual ~NetworkEngine();
220 
221  void clear();
222 
236  void tellListener(Sp<Node> n, Tid socket_id, const InfoHash& hash, want_t want, const Blob& ntoken,
237  std::vector<Sp<Node>>&& nodes, std::vector<Sp<Node>>&& nodes6,
238  std::vector<Sp<Value>>&& values, const Query& q);
239 
240  void tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash& hash, const Blob& ntoken, const std::vector<Value::Id>& values);
241  void tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash& hash, const Blob& ntoken, const std::vector<Value::Id>& values);
242 
243  bool isRunning(sa_family_t af) const;
244  inline want_t want () const { return dht_socket >= 0 && dht_socket6 >= 0 ? (WANT4 | WANT6) : -1; }
245 
246  void connectivityChanged(sa_family_t);
247 
248  /**************
249  * Requests *
250  **************/
251 
261  Sp<Request>
262  sendPing(Sp<Node> n, RequestCb&& on_done, RequestExpiredCb&& on_expired);
273  Sp<Request>
274  sendPing(const sockaddr* sa, socklen_t salen, RequestCb&& on_done, RequestExpiredCb&& on_expired) {
275  return sendPing(std::make_shared<Node>(zeroes, sa, salen),
276  std::forward<RequestCb>(on_done),
277  std::forward<RequestExpiredCb>(on_expired));
278  }
291  Sp<Request> sendFindNode(Sp<Node> n,
292  const InfoHash& hash,
293  want_t want = -1,
294  RequestCb&& on_done = {},
295  RequestExpiredCb&& on_expired = {});
310  Sp<Request> sendGetValues(Sp<Node> n,
311  const InfoHash& hash,
312  const Query& query,
313  want_t want,
314  RequestCb&& on_done,
315  RequestExpiredCb&& on_expired);
339  Sp<Request> sendListen(Sp<Node> n,
340  const InfoHash& hash,
341  const Query& query,
342  const Blob& token,
343  Sp<Request> previous,
344  RequestCb&& on_done,
345  RequestExpiredCb&& on_expired,
346  SocketCb&& socket_cb);
360  Sp<Request> sendAnnounceValue(Sp<Node> n,
361  const InfoHash& hash,
362  const Sp<Value>& v,
363  time_point created,
364  const Blob& token,
365  RequestCb&& on_done,
366  RequestExpiredCb&& on_expired);
380  Sp<Request> sendRefreshValue(Sp<Node> n,
381  const InfoHash& hash,
382  const Value::Id& vid,
383  const Blob& token,
384  RequestCb&& on_done,
385  RequestExpiredCb&& on_expired);
386 
396  void processMessage(const uint8_t *buf, size_t buflen, const SockAddr& addr);
397 
398  Sp<Node> insertNode(const InfoHash& myid, const SockAddr& addr) {
399  auto n = cache.getNode(myid, addr, scheduler.time(), 0);
400  onNewNode(n, 0);
401  return n;
402  }
403 
404  std::vector<unsigned> getNodeMessageStats(bool in) {
405  auto& st = in ? in_stats : out_stats;
406  std::vector<unsigned> stats {st.ping, st.find, st.get, st.listen, st.put};
407  st = {};
408  return stats;
409  }
410 
411  void blacklistNode(const Sp<Node>& n);
412 
413  std::vector<Sp<Node>> getCachedNodes(const InfoHash& id, sa_family_t sa_f, size_t count) {
414  return cache.getCachedNodes(id, sa_f, count);
415  }
416 
417 private:
418 
419  struct PartialMessage;
420 
421  /***************
422  * Constants *
423  ***************/
424  static constexpr size_t MAX_REQUESTS_PER_SEC {1600};
425  /* the length of a node info buffer in ipv4 format */
426  static const constexpr size_t NODE4_INFO_BUF_LEN {HASH_LEN + sizeof(in_addr) + sizeof(in_port_t)};
427  /* the length of a node info buffer in ipv6 format */
428  static const constexpr size_t NODE6_INFO_BUF_LEN {HASH_LEN + sizeof(in6_addr) + sizeof(in_port_t)};
429  /* after a UDP reply, the period during which we tell the link layer about it */
430  static constexpr std::chrono::seconds UDP_REPLY_TIME {15};
431 
432  /* Max. time to receive a full fragmented packet */
433  static constexpr std::chrono::seconds RX_MAX_PACKET_TIME {10};
434  /* Max. time between packet fragments */
435  static constexpr std::chrono::seconds RX_TIMEOUT {3};
436  /* The maximum number of nodes that we snub. There is probably little
437  reason to increase this value. */
438  static constexpr unsigned BLACKLISTED_MAX {10};
439 
440  static constexpr size_t MTU {1280};
441  static constexpr size_t MAX_PACKET_VALUE_SIZE {600};
442 
443  static const std::string my_v;
444 
445  void process(std::unique_ptr<ParsedMessage>&&, const SockAddr& from);
446 
447  bool rateLimit(const SockAddr& addr);
448 
449  static bool isMartian(const SockAddr& addr);
450  bool isNodeBlacklisted(const SockAddr& addr) const;
451 
452  void requestStep(Sp<Request> req);
453 
458  void sendRequest(const Sp<Request>& request);
459 
460  struct MessageStats {
461  unsigned ping {0};
462  unsigned find {0};
463  unsigned get {0};
464  unsigned put {0};
465  unsigned listen {0};
466  unsigned refresh {0};
467  };
468 
469 
470  // basic wrapper for socket sendto function
471  int send(const char *buf, size_t len, int flags, const SockAddr& addr);
472 
473  void sendValueParts(const TransId& tid, const std::vector<Blob>& svals, const SockAddr& addr);
474  std::vector<Blob> packValueHeader(msgpack::sbuffer&, const std::vector<Sp<Value>>&);
475  void maintainRxBuffer(Tid tid);
476 
477  /*************
478  * Answers *
479  *************/
480  /* answer to a ping request */
481  void sendPong(const SockAddr& addr, Tid tid);
482  /* answer to findnodes/getvalues request */
483  void sendNodesValues(const SockAddr& addr,
484  Tid tid,
485  const Blob& nodes,
486  const Blob& nodes6,
487  const std::vector<Sp<Value>>& st,
488  const Query& query,
489  const Blob& token);
490  Blob bufferNodes(sa_family_t af, const InfoHash& id, std::vector<Sp<Node>>& nodes);
491 
492  std::pair<Blob, Blob> bufferNodes(sa_family_t af,
493  const InfoHash& id,
494  want_t want,
495  std::vector<Sp<Node>>& nodes,
496  std::vector<Sp<Node>>& nodes6);
497  /* answer to a listen request */
498  void sendListenConfirmation(const SockAddr& addr, Tid tid);
499  /* answer to put request */
500  void sendValueAnnounced(const SockAddr& addr, Tid, Value::Id);
501  /* answer in case of error */
502  void sendError(const SockAddr& addr,
503  Tid tid,
504  uint16_t code,
505  const std::string& message,
506  bool include_id=false);
507 
508  void deserializeNodes(ParsedMessage& msg, const SockAddr& from);
509 
510  /* DHT info */
511  const InfoHash& myid;
512  const NetId network {0};
513  const int& dht_socket;
514  const int& dht_socket6;
515  const Logger& DHT_LOG;
516 
517  NodeCache cache {};
518 
519  // global limiting should be triggered by at least 8 different IPs
520  using IpLimiter = RateLimiter<MAX_REQUESTS_PER_SEC/8>;
521  using IpLimiterMap = std::map<SockAddr, IpLimiter, SockAddr::ipCmp>;
522  IpLimiterMap address_rate_limiter {};
523  RateLimiter<MAX_REQUESTS_PER_SEC> rate_limiter {};
524  size_t limiter_maintenance {0};
525 
526  // requests handling
527  std::map<Tid, Sp<Request>> requests {};
528  std::map<Tid, PartialMessage> partial_messages;
529 
530  MessageStats in_stats {}, out_stats {};
531  std::set<SockAddr> blacklist {};
532 
533  Scheduler& scheduler;
534 
535  bool logIncoming_ {false};
536 };
537 
538 } /* namespace net */
539 } /* namespace dht */
const time_point & time() const
Definition: scheduler.h:116
Job scheduler.
Definition: scheduler.h:37
Sp< Request > sendPing(Sp< Node > n, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendRefreshValue(Sp< Node > n, const InfoHash &hash, const Value::Id &vid, const Blob &token, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendPing(const sockaddr *sa, socklen_t salen, RequestCb &&on_done, RequestExpiredCb &&on_expired)
An abstraction of communication protocol on the network.
Sp< Request > sendListen(Sp< Node > n, const InfoHash &hash, const Query &query, const Blob &token, Sp< Request > previous, RequestCb &&on_done, RequestExpiredCb &&on_expired, SocketCb &&socket_cb)
Sp< Request > sendFindNode(Sp< Node > n, const InfoHash &hash, want_t want=-1, RequestCb &&on_done={}, RequestExpiredCb &&on_expired={})
std::vector< uint8_t > Blob
Definition: utils.h:114
Sp< Request > sendAnnounceValue(Sp< Node > n, const InfoHash &hash, const Sp< Value > &v, time_point created, const Blob &token, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Describes a query destined to another peer.
Definition: value.h:873
void tellListener(Sp< Node > n, Tid socket_id, const InfoHash &hash, want_t want, const Blob &ntoken, std::vector< Sp< Node >> &&nodes, std::vector< Sp< Node >> &&nodes6, std::vector< Sp< Value >> &&values, const Query &q)
void processMessage(const uint8_t *buf, size_t buflen, const SockAddr &addr)
Definition: callbacks.h:34
Sp< Request > sendGetValues(Sp< Node > n, const InfoHash &hash, const Query &query, want_t want, RequestCb &&on_done, RequestExpiredCb &&on_expired)