22 #include "node_cache.h" 26 #include "scheduler.h" 29 #include "rate_limiter.h" 52 static const constexpr uint16_t NON_AUTHORITATIVE_INFORMATION {203};
// Numeric codes identifying failure conditions. 401 and 404 use the same
// numbers as the HTTP statuses of the same name; 421-423 are additional codes
// whose names refer to transaction ids ("TID") and node-info buffers.
// NOTE(review): presumably these are the values exposed through getCode();
// confirm against the enclosing class.
// `constexpr` already implies `const`, so the extra qualifier is omitted.
static constexpr uint16_t UNAUTHORIZED {401};
static constexpr uint16_t NOT_FOUND {404};

static constexpr uint16_t INVALID_TID_SIZE {421};
static constexpr uint16_t UNKNOWN_TID {422};
static constexpr uint16_t WRONG_NODE_INFO_BUF_LEN {423};
60 static const std::string GET_NO_INFOHASH;
61 static const std::string LISTEN_NO_INFOHASH;
62 static const std::string LISTEN_WRONG_TOKEN;
63 static const std::string PUT_NO_INFOHASH;
64 static const std::string PUT_WRONG_TOKEN;
65 static const std::string STORAGE_NOT_FOUND;
66 static const std::string PUT_INVALID_ID;
69 :
DhtException(msg), msg(msg), code(code), failing_node_id(failing_node_id) {}
71 std::string getMsg()
const {
return msg; }
72 uint16_t getCode()
const {
return code; }
73 const InfoHash getNodeId()
const {
return failing_node_id; }
89 std::vector<Sp<Value>> values {};
90 std::vector<Value::Id> refreshed_values {};
91 std::vector<Value::Id> expired_values {};
92 std::vector<Sp<FieldValueIndex>> fields {};
93 std::vector<Sp<Node>> nodes4 {};
94 std::vector<Sp<Node>> nodes6 {};
131 std::function<void(const Sp<Node>&, int)> onNewNode;
138 std::function<void(const InfoHash&, const SockAddr&)> onReportedAddr;
144 std::function<RequestAnswer(Sp<Node>)> onPing {};
153 std::function<RequestAnswer(Sp<Node>,
const InfoHash&, want_t)> onFindNode {};
162 std::function<RequestAnswer(Sp<Node>,
const InfoHash&, want_t,
const Query&)> onGetValues {};
171 std::function<RequestAnswer(Sp<Node>,
175 const Query&)> onListen {};
185 std::function<RequestAnswer(Sp<Node>,
188 const std::vector<Sp<Value>>&,
189 const time_point&)> onAnnounce {};
198 std::function<RequestAnswer(Sp<Node>,
201 const Value::Id&)> onRefresh {};
204 using RequestCb = std::function<void(const Request&, RequestAnswer&&)>;
205 using RequestExpiredCb = std::function<void(const Request&, bool)>;
209 decltype(NetworkEngine::onError) onError,
210 decltype(NetworkEngine::onNewNode) onNewNode,
211 decltype(NetworkEngine::onReportedAddr) onReportedAddr,
212 decltype(NetworkEngine::onPing) onPing,
213 decltype(NetworkEngine::onFindNode) onFindNode,
214 decltype(NetworkEngine::onGetValues) onGetValues,
215 decltype(NetworkEngine::onListen) onListen,
216 decltype(NetworkEngine::onAnnounce) onAnnounce,
217 decltype(NetworkEngine::onRefresh) onRefresh);
237 std::vector<Sp<Node>>&& nodes, std::vector<Sp<Node>>&& nodes6,
238 std::vector<Sp<Value>>&& values,
const Query& q);
// Declaration only; the definition is not part of this excerpt.
// NOTE(review): judging by the name, informs node `n` that the values listed
// in `values` were refreshed for `hash` — confirm in the implementation.
//   n          node the call is addressed to
//   socket_id  Tid passed along with the call (presumably identifies the
//              listen socket on the remote side — confirm)
//   hash       InfoHash the value ids relate to
//   ntoken     token blob passed along with the call
//   values     ids of the values concerned
240 void tellListenerRefreshed(Sp<Node> n, Tid socket_id,
const InfoHash& hash,
const Blob& ntoken,
const std::vector<Value::Id>& values);
// Same parameter list as tellListenerRefreshed.
// NOTE(review): judging by the name, reports value ids that have expired
// rather than ones that were refreshed — confirm in the implementation.
241 void tellListenerExpired(Sp<Node> n, Tid socket_id,
const InfoHash& hash,
const Blob& ntoken,
const std::vector<Value::Id>& values);
243 bool isRunning(sa_family_t af)
const;
244 inline want_t want ()
const {
return dht_socket >= 0 && dht_socket6 >= 0 ? (WANT4 | WANT6) : -1; }
246 void connectivityChanged(sa_family_t);
262 sendPing(Sp<Node> n, RequestCb&& on_done, RequestExpiredCb&& on_expired);
274 sendPing(
const sockaddr* sa, socklen_t salen, RequestCb&& on_done, RequestExpiredCb&& on_expired) {
275 return sendPing(std::make_shared<Node>(zeroes, sa, salen),
276 std::forward<RequestCb>(on_done),
277 std::forward<RequestExpiredCb>(on_expired));
294 RequestCb&& on_done = {},
295 RequestExpiredCb&& on_expired = {});
311 const InfoHash& hash,
315 RequestExpiredCb&& on_expired);
340 const InfoHash& hash,
343 Sp<Request> previous,
345 RequestExpiredCb&& on_expired,
346 SocketCb&& socket_cb);
361 const InfoHash& hash,
366 RequestExpiredCb&& on_expired);
381 const InfoHash& hash,
382 const Value::Id& vid,
385 RequestExpiredCb&& on_expired);
// Declaration only; the definition is not part of this excerpt.
// Takes a message as a raw byte buffer together with a socket address.
//   buf     pointer to the message bytes (read-only)
//   buflen  number of bytes available at `buf`
//   addr    socket address associated with the message
//           (presumably the sender — confirm in the implementation)
396 void processMessage(
const uint8_t *buf,
size_t buflen,
const SockAddr& addr);
398 Sp<Node> insertNode(
const InfoHash& myid,
const SockAddr& addr) {
399 auto n = cache.getNode(myid, addr, scheduler.
time(), 0);
404 std::vector<unsigned> getNodeMessageStats(
bool in) {
405 auto& st = in ? in_stats : out_stats;
406 std::vector<unsigned> stats {st.ping, st.find, st.get, st.listen, st.put};
411 void blacklistNode(
const Sp<Node>& n);
413 std::vector<Sp<Node>> getCachedNodes(
const InfoHash&
id, sa_family_t sa_f,
size_t count) {
414 return cache.getCachedNodes(
id, sa_f, count);
419 struct PartialMessage;
424 static constexpr
size_t MAX_REQUESTS_PER_SEC {1600};
426 static const constexpr
size_t NODE4_INFO_BUF_LEN {HASH_LEN +
sizeof(in_addr) +
sizeof(in_port_t)};
428 static const constexpr
size_t NODE6_INFO_BUF_LEN {HASH_LEN +
sizeof(in6_addr) +
sizeof(in_port_t)};
430 static constexpr std::chrono::seconds UDP_REPLY_TIME {15};
433 static constexpr std::chrono::seconds RX_MAX_PACKET_TIME {10};
435 static constexpr std::chrono::seconds RX_TIMEOUT {3};
438 static constexpr
unsigned BLACKLISTED_MAX {10};
440 static constexpr
size_t MTU {1280};
441 static constexpr
size_t MAX_PACKET_VALUE_SIZE {600};
443 static const std::string my_v;
// Declarations only; none of the definitions are part of this excerpt.

// Receives a parsed message as an rvalue unique_ptr, plus the address `from`.
445 void process(std::unique_ptr<ParsedMessage>&&,
const SockAddr& from);
// Returns a bool for the given address.
// NOTE(review): the meaning of true/false (allowed vs. limited) is not
// visible here — confirm in the implementation.
447 bool rateLimit(
const SockAddr& addr);
// Static predicate on an address; needs no engine state.
// NOTE(review): "martian" conventionally means an address that should never
// be seen as a peer — confirm the exact checks in the implementation.
449 static bool isMartian(
const SockAddr& addr);
// Const predicate on an address.
// NOTE(review): presumably looks `addr` up in the `blacklist` member — confirm.
450 bool isNodeBlacklisted(
const SockAddr& addr)
const;
// Takes a shared request handle by value.
// NOTE(review): presumably advances the request by one attempt — confirm.
452 void requestStep(Sp<Request> req);
// Takes a shared request handle by const reference.
458 void sendRequest(
const Sp<Request>& request);
460 struct MessageStats {
466 unsigned refresh {0};
471 int send(
const char *buf,
size_t len,
int flags,
const SockAddr& addr);
473 void sendValueParts(
const TransId& tid,
const std::vector<Blob>& svals,
const SockAddr& addr);
474 std::vector<Blob> packValueHeader(msgpack::sbuffer&,
const std::vector<Sp<Value>>&);
475 void maintainRxBuffer(Tid tid);
481 void sendPong(
const SockAddr& addr, Tid tid);
483 void sendNodesValues(
const SockAddr& addr,
487 const std::vector<Sp<Value>>& st,
490 Blob bufferNodes(sa_family_t af,
const InfoHash&
id, std::vector<Sp<Node>>& nodes);
492 std::pair<Blob, Blob> bufferNodes(sa_family_t af,
495 std::vector<Sp<Node>>& nodes,
496 std::vector<Sp<Node>>& nodes6);
498 void sendListenConfirmation(
const SockAddr& addr, Tid tid);
500 void sendValueAnnounced(
const SockAddr& addr, Tid, Value::Id);
502 void sendError(
const SockAddr& addr,
505 const std::string& message,
506 bool include_id=
false);
508 void deserializeNodes(ParsedMessage& msg,
const SockAddr& from);
511 const InfoHash& myid;
512 const NetId network {0};
513 const int& dht_socket;
514 const int& dht_socket6;
515 const Logger& DHT_LOG;
// Limiter type instantiated with one eighth of MAX_REQUESTS_PER_SEC.
520 using IpLimiter = RateLimiter<MAX_REQUESTS_PER_SEC/8>;
// One IpLimiter per SockAddr key; keys are ordered with SockAddr::ipCmp.
521 using IpLimiterMap = std::map<SockAddr, IpLimiter, SockAddr::ipCmp>;
// Per-address limiters, initially empty.
522 IpLimiterMap address_rate_limiter {};
// Single limiter instantiated with the full MAX_REQUESTS_PER_SEC.
523 RateLimiter<MAX_REQUESTS_PER_SEC> rate_limiter {};
// Counter starting at 0.
// NOTE(review): presumably used to schedule clean-up of address_rate_limiter — confirm.
524 size_t limiter_maintenance {0};
527 std::map<Tid, Sp<Request>> requests {};
528 std::map<Tid, PartialMessage> partial_messages;
530 MessageStats in_stats {}, out_stats {};
531 std::set<SockAddr> blacklist {};
533 Scheduler& scheduler;
535 bool logIncoming_ {
false};
const time_point & time() const
Sp< Request > sendPing(Sp< Node > n, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendRefreshValue(Sp< Node > n, const InfoHash &hash, const Value::Id &vid, const Blob &token, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendPing(const sockaddr *sa, socklen_t salen, RequestCb &&on_done, RequestExpiredCb &&on_expired)
An abstraction of the communication protocol used on the network.
Sp< Request > sendListen(Sp< Node > n, const InfoHash &hash, const Query &query, const Blob &token, Sp< Request > previous, RequestCb &&on_done, RequestExpiredCb &&on_expired, SocketCb &&socket_cb)
Sp< Request > sendFindNode(Sp< Node > n, const InfoHash &hash, want_t want=-1, RequestCb &&on_done={}, RequestExpiredCb &&on_expired={})
std::vector< uint8_t > Blob
Sp< Request > sendAnnounceValue(Sp< Node > n, const InfoHash &hash, const Sp< Value > &v, time_point created, const Blob &token, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Describes a query destined to another peer.
void tellListener(Sp< Node > n, Tid socket_id, const InfoHash &hash, want_t want, const Blob &ntoken, std::vector< Sp< Node >> &&nodes, std::vector< Sp< Node >> &&nodes6, std::vector< Sp< Value >> &&values, const Query &q)
void processMessage(const uint8_t *buf, size_t buflen, const SockAddr &addr)
Sp< Request > sendGetValues(Sp< Node > n, const InfoHash &hash, const Query &query, want_t want, RequestCb &&on_done, RequestExpiredCb &&on_expired)