25 #include "callbacks.h" 27 #include "log_enable.h" 33 #include <condition_variable> 43 struct SecureDhtConfig;
// Callback type taking two NodeStatus arguments and returning nothing.
// NOTE(review): presumably the two arguments are the IPv4 and IPv6 statuses
// (cf. the status4 / status6 members) - confirm against the call site.
54 typedef std::function<void(NodeStatus, NodeStatus)> StatusCallback;
59 std::string proxy_server;
60 std::string push_node_id;
67 get(id, bindGetCb(cb), donecb, f, w);
// get() overload taking a GetCallbackSimple: adapts cb with bindGetCb() and
// forwards id, donecb, f and w unchanged to another get() overload.
70 void get(InfoHash id, GetCallbackSimple cb, DoneCallbackSimple donecb={}, Value::Filter f = Value::AllFilter(), Where w = {}) {
71 get(id, bindGetCb(cb), donecb, f, w);
// get() overload taking a GetCallback and a DoneCallback. Declaration only;
// the definition is not part of this excerpt.
// Unlike the neighbouring overloads, dcb has no default argument and f
// defaults to {} rather than Value::AllFilter().
74 void get(InfoHash
hash, GetCallback vcb, DoneCallback dcb, Value::Filter f={}, Where w = {});
// get() overload taking a DoneCallbackSimple: wraps donecb with bindDoneCb()
// and forwards id, cb, f and w unchanged to another get() overload.
76 void get(InfoHash id, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter f = Value::AllFilter(), Where w = {}) {
77 get(id, cb, bindDoneCb(donecb), f, w);
// get() overload keyed by a string instead of an InfoHash. Declaration only.
// NOTE(review): presumably the key is hashed into an InfoHash by the
// definition - confirm there.
79 void get(
const std::string& key, GetCallback vcb, DoneCallbackSimple dcb={}, Value::Filter f = Value::AllFilter(), Where w = {});
82 void get(InfoHash
hash, std::function<bool(std::vector<T>&&)> cb, DoneCallbackSimple dcb={})
84 get(
hash, [=](
const std::vector<std::shared_ptr<Value>>& vals) {
85 return cb(unpackVector<T>(vals));
91 void get(InfoHash
hash, std::function<bool(T&&)> cb, DoneCallbackSimple dcb={})
93 get(
hash, [=](
const std::vector<std::shared_ptr<Value>>& vals) {
94 for (
const auto& v : vals) {
96 if (not cb(Value::unpack<T>(*v)))
98 }
catch (
const std::exception&) {
108 std::future<std::vector<std::shared_ptr<dht::Value>>>
get(InfoHash key, Value::Filter f = Value::AllFilter(), Where w = {}) {
109 auto p = std::make_shared<std::promise<std::vector<std::shared_ptr< dht::Value >>>>();
110 auto values = std::make_shared<std::vector<std::shared_ptr< dht::Value >>>();
111 get(key, [=](
const std::vector<std::shared_ptr<dht::Value>>& vlist) {
112 values->insert(values->end(), vlist.begin(), vlist.end());
115 p->set_value(std::move(*values));
118 return p->get_future();
122 std::future<std::vector<T>>
get(InfoHash key) {
123 auto p = std::make_shared<std::promise<std::vector<T>>>();
124 auto values = std::make_shared<std::vector<T>>();
125 get<T>(key, [=](T&& v) {
126 values->emplace_back(std::move(v));
129 p->set_value(std::move(*values));
131 return p->get_future();
// query() overload taking a DoneCallback. Declaration only; the definition is
// not part of this excerpt. done_cb and q are both defaulted to {}.
134 void query(
const InfoHash& hash, QueryCallback cb, DoneCallback done_cb = {}, Query q = {});
// query() overload taking a DoneCallbackSimple: wraps done_cb with
// bindDoneCb() and forwards hash, cb and q unchanged.
// NOTE(review): this overload and the DoneCallback overload declared just
// before it both default done_cb to {}, so a call that omits done_cb looks
// ambiguous between the two - confirm whether callers always pass it.
135 void query(
const InfoHash& hash, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query q = {}) {
136 query(hash, cb, bindDoneCb(done_cb), q);
// listen() overload taking a ValueCallback. Declaration only.
// Returns a std::future<size_t>.
// NOTE(review): the size_t looks like the listener token accepted by
// cancelListen() - confirm in the definition.
139 std::future<size_t> listen(InfoHash key, ValueCallback vcb, Value::Filter f = Value::AllFilter(), Where w = {});
141 std::future<size_t> listen(InfoHash key, GetCallback cb, Value::Filter f={}, Where w={}) {
142 return listen(key, [cb](
const std::vector<Sp<Value>>& vals,
bool expired){
146 }, std::forward<Value::Filter>(f), std::forward<Where>(w));
// listen() overload keyed by a string instead of an InfoHash. Declaration only.
// NOTE(review): presumably the key is hashed into an InfoHash by the
// definition - confirm there.
148 std::future<size_t> listen(
const std::string& key, GetCallback vcb, Value::Filter f = Value::AllFilter(), Where w = {});
// listen() overload taking a GetCallbackSimple: adapts cb with bindGetCb()
// and returns the result of forwarding key, f and w to another listen() overload.
149 std::future<size_t> listen(InfoHash key, GetCallbackSimple cb, Value::Filter f = Value::AllFilter(), Where w = {}) {
150 return listen(key, bindGetCb(cb), f, w);
154 std::future<size_t> listen(InfoHash hash, std::function<
bool(std::vector<T>&&)> cb)
156 return listen(hash, [=](
const std::vector<std::shared_ptr<Value>>& vals) {
157 return cb(unpackVector<T>(vals));
162 std::future<size_t> listen(InfoHash hash, std::function<
bool(std::vector<T>&&,
bool)> cb)
164 return listen(hash, [=](
const std::vector<std::shared_ptr<Value>>& vals,
bool expired) {
165 return cb(unpackVector<T>(vals), expired);
170 template <
typename T>
171 std::future<size_t> listen(InfoHash hash, std::function<
bool(T&&)> cb, Value::Filter f = Value::AllFilter(), Where w = {})
173 return listen(hash, [=](
const std::vector<std::shared_ptr<Value>>& vals) {
174 for (
const auto& v : vals) {
176 if (not cb(Value::unpack<T>(*v)))
178 }
catch (
const std::exception&) {
184 getFilterSet<T>(f), w);
186 template <
typename T>
187 std::future<size_t> listen(InfoHash hash, std::function<
bool(T&&,
bool)> cb, Value::Filter f = Value::AllFilter(), Where w = {})
189 return listen(hash, [=](
const std::vector<std::shared_ptr<Value>>& vals,
bool expired) {
190 for (
const auto& v : vals) {
192 if (not cb(Value::unpack<T>(*v), expired))
194 }
catch (
const std::exception&) {
200 getFilterSet<T>(f), w);
// cancelListen(): two overloads, declarations only.
// The first takes the token as a plain size_t; the second takes it as a
// std::shared_future<size_t>.
// NOTE(review): the token presumably comes from the std::future<size_t>
// returned by listen() - confirm.
203 void cancelListen(InfoHash h,
size_t token);
204 void cancelListen(InfoHash h, std::shared_future<size_t> token);
// put() overload family.
// Every overload defaults `created` to time_point::max() and `permanent` to false.

// Shared-pointer value with a DoneCallback. Declaration only.
206 void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={}, time_point created=time_point::max(),
bool permanent =
false);
// Shared-pointer value with a DoneCallbackSimple: wraps cb with bindDoneCb()
// and forwards. cb has no default here, unlike the DoneCallback overload.
207 void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb, time_point created=time_point::max(),
bool permanent =
false) {
208 put(hash, value, bindDoneCb(cb), created, permanent);
// Rvalue Value with a DoneCallback. Declaration only.
211 void put(InfoHash hash, Value&& value, DoneCallback cb={}, time_point created=time_point::max(),
bool permanent =
false);
// Rvalue Value with a DoneCallbackSimple: wraps cb with bindDoneCb() and
// forwards. Value is not a deduced template parameter here, so
// std::forward<Value>(value) yields an rvalue, i.e. it behaves like std::move.
212 void put(InfoHash hash, Value&& value, DoneCallbackSimple cb, time_point created=time_point::max(),
bool permanent =
false) {
213 put(hash, std::forward<Value>(value), bindDoneCb(cb), created, permanent);
// String-keyed overload. Declaration only.
// NOTE(review): presumably the key is hashed into an InfoHash - confirm.
215 void put(
const std::string& key, Value&& value, DoneCallbackSimple cb={}, time_point created=time_point::max(),
bool permanent =
false);
// cancelPut(): declaration only. Identifies its target by an InfoHash and a
// Value::Id, both passed by const reference.
// NOTE(review): presumably stops a put() made with permanent = true - confirm.
217 void cancelPut(
const InfoHash& h,
const Value::Id&
id);
// putSigned() overload family. Same shape as the put() overloads, but without
// the `created` and `permanent` parameters.

// Shared-pointer value with a DoneCallback. Declaration only.
219 void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={});
// Shared-pointer value with a DoneCallbackSimple: wraps cb with bindDoneCb()
// and forwards.
220 void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb) {
221 putSigned(hash, value, bindDoneCb(cb));
// Rvalue Value with a DoneCallback. Declaration only.
224 void putSigned(InfoHash hash, Value&& value, DoneCallback cb={});
// Rvalue Value with a DoneCallbackSimple: wraps cb with bindDoneCb() and
// forwards the value as an rvalue.
225 void putSigned(InfoHash hash, Value&& value, DoneCallbackSimple cb) {
226 putSigned(hash, std::forward<Value>(value), bindDoneCb(cb));
// String-keyed overload. Declaration only.
228 void putSigned(
const std::string& key, Value&& value, DoneCallbackSimple cb={});
// putEncrypted() overload family. Each overload takes an extra InfoHash `to`.
// NOTE(review): `to` presumably identifies the recipient the value is
// encrypted for - confirm in the definition.

// Shared-pointer value with a DoneCallback. Declaration only.
230 void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb={});
// Shared-pointer value with a DoneCallbackSimple: wraps cb with bindDoneCb()
// and forwards.
231 void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallbackSimple cb) {
232 putEncrypted(hash, to, value, bindDoneCb(cb));
// Rvalue Value with a DoneCallback. Declaration only.
235 void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallback cb={});
// Rvalue Value with a DoneCallbackSimple: wraps cb with bindDoneCb() and
// forwards the value as an rvalue.
236 void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallbackSimple cb) {
237 putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb));
// String-keyed overload. Declaration only.
// NOTE(review): this one takes a DoneCallback, whereas the string-keyed
// put() and putSigned() overloads take a DoneCallbackSimple - confirm the
// inconsistency is intentional.
239 void putEncrypted(
const std::string& key, InfoHash to, Value&& value, DoneCallback cb={});
// bootstrap() overload family; declarations only.

// From a list of socket addresses, with an optional completion callback
// taken by rvalue reference.
245 void bootstrap(
const std::vector<SockAddr>& nodes, DoneCallbackSimple&& cb={});
// From a single socket address, with the same optional completion callback.
246 void bootstrap(
const SockAddr& addr, DoneCallbackSimple&& cb={});
// From a list of NodeExport entries; no completion callback.
// NOTE(review): presumably the output of exportNodes() - confirm.
252 void bootstrap(
const std::vector<NodeExport>& nodes);
// From a host / service string pair; no completion callback.
// NOTE(review): presumably stored in bootstrap_nodes and resolved later - confirm.
260 void bootstrap(
const std::string& host,
const std::string& service);
// Declarations only; the definitions are not part of this excerpt.
// clearBootstrap() and connectivityChanged() take no arguments and return nothing.
265 void clearBootstrap();
272 void connectivityChanged();
// Const member functions: dumpTables() returns nothing, getId() and
// getNodeId() each return an InfoHash by value.
// NOTE(review): the difference between getId() and getNodeId() is not visible
// here - confirm in the definitions.
274 void dumpTables()
const;
279 InfoHash getId()
const;
284 InfoHash getNodeId()
const;
291 return (f == AF_INET) ? bound4 : bound6;
299 return getBound(f).getPort();
// Storage accessors; declarations only.
// getStoreSize() is const and returns a pair of size_t.
// NOTE(review): the meaning of each pair element is not visible here - confirm.
302 std::pair<size_t, size_t> getStoreSize()
const;
// setStorageLimit() defaults its argument to DEFAULT_STORAGE_LIMIT.
304 void setStorageLimit(
size_t limit = DEFAULT_STORAGE_LIMIT);
// Export / import and configuration helpers; declarations only.

// Const exporters returning vectors of NodeExport and ValuesExport by value.
306 std::vector<NodeExport> exportNodes()
const;
308 std::vector<ValuesExport> exportValues()
const;
// Takes an InfoHash filter that defaults to a value-initialised InfoHash.
// NOTE(review): presumably the default clears the filter - confirm.
315 void setLogFilter(
const InfoHash& f = {});
317 void registerType(
const ValueType& type);
// Takes the same element type that exportValues() returns.
319 void importValues(
const std::vector<ValuesExport>& values);
321 bool isRunning()
const {
// Statistics and diagnostic-log accessors; all are const member functions
// and all are declarations only.

// Returns the statistics for one address family as a NodeStats value.
325 NodeStats getNodesStats(sa_family_t af)
const;
// Variant with four unsigned out-pointers and an unsigned return value.
// NOTE(review): whether null pointers are accepted is not visible here - confirm.
326 unsigned getNodesStats(sa_family_t af,
unsigned *good_return,
unsigned *dubious_return,
unsigned *cached_return,
unsigned *incoming_return)
const;
327 NodeInfo getNodeInfo()
const;
// `in` defaults to false.
// NOTE(review): presumably selects incoming vs. outgoing message counters - confirm.
329 std::vector<unsigned> getNodeMessageStats(
bool in =
false)
const;
// Log getters returning std::string; the searches / search variants default
// the address family to AF_UNSPEC, getRoutingTablesLog() has no default.
330 std::string getStorageLog()
const;
331 std::string getStorageLog(
const InfoHash&)
const;
332 std::string getRoutingTablesLog(sa_family_t af)
const;
333 std::string getSearchesLog(sa_family_t af = AF_UNSPEC)
const;
334 std::string getSearchLog(
const InfoHash&, sa_family_t af = AF_UNSPEC)
const;
// Public-address getters; declarations only. The address family defaults to
// AF_UNSPEC. Unlike the neighbouring getters, these are not const-qualified.
335 std::vector<SockAddr> getPublicAddress(sa_family_t af = AF_UNSPEC);
336 std::vector<std::string> getPublicAddressStr(sa_family_t af = AF_UNSPEC);
// Certificate helpers; declarations only.

// Takes a hash and a callback that receives a shared_ptr<crypto::Certificate>.
// NOTE(review): presumably the pointer is null when nothing is found - confirm.
340 void findCertificate(InfoHash hash, std::function<
void(
const std::shared_ptr<crypto::Certificate>)>);
341 void registerCertificate(std::shared_ptr<crypto::Certificate> cert);
// Takes the query method by rvalue reference.
342 void setLocalCertificateStore(CertificateStoreQuery&& query_method);
350 void run(in_port_t port = 4222,
const crypto::Identity identity = {},
bool threaded =
false, NetId network = 0) {
// run() overloads taking a Config by value; declarations only.

// Identified by a port number only.
366 void run(in_port_t port, Config config);
// Identified by explicit local IPv4 and IPv6 socket addresses.
376 void run(
const SockAddr& local4,
const SockAddr& local6, Config config);
// Identified by C-string IPv4 / IPv6 addresses plus a service string.
// NOTE(review): whether null pointers are accepted is not visible here - confirm.
381 void run(
const char* ip4,
const char* ip6,
const char* service, Config config);
// Stores the given status callback in statusCb by moving from cb; any
// previously stored callback is replaced.
// No lock is taken in the visible body.
383 void setOnStatusChanged(StatusCallback&& cb) {
384 statusCb = std::move(cb);
393 std::lock_guard<std::mutex> lck(dht_mtx);
394 time_point wakeup = time_point::min();
398 startNetwork(bound4, bound6);
// shutdown(): declaration only. Takes a ShutdownCallback by value with no default.
// NOTE(review): presumably cb is invoked once shutdown completes - confirm.
406 void shutdown(ShutdownCallback cb);
// Proxy configuration; declarations only.

// pushNodeId defaults to the empty string.
415 void setProxyServer(
const std::string& proxy,
const std::string& pushNodeId =
"");
// NOTE(review): presumably toggles the use_proxy flag and selects
// dht_via_proxy_ - confirm in the definition.
422 void enableProxy(
bool proxify);
// Push-notification and forwarding controls; declarations only.

// NOTE(review): presumably stored in pushToken_ - confirm.
429 void setPushNotificationToken(
const std::string& token);
// Receives the notification payload as a string-to-string map.
434 void pushNotificationReceived(
const std::map<std::string, std::string>& data);
437 void forwardAllMessages(
bool forward);
// Compile-time constant of 10 seconds.
// NOTE(review): presumably the delay between attempts made by
// tryBootstrapContinuously() - confirm.
440 static constexpr std::chrono::seconds BOOTSTRAP_PERIOD {10};
// Declaration only.
448 void tryBootstrapContinuously();
// Declaration only. Both socket addresses are taken as const values (copied),
// not by reference.
451 void startNetwork(
const SockAddr sin4,
const SockAddr sin6);
455 return std::max(status4, status6);
// Two uniquely-owned SecureDht instances.
// NOTE(review): presumably the direct and the proxied backend - confirm.
459 std::unique_ptr<SecureDht> dht_;
462 std::unique_ptr<SecureDht> dht_via_proxy_;
// Atomic flag, initially false.
465 std::atomic_bool use_proxy {
false};
// Const accessor returning a raw, non-owning SecureDht pointer. Declaration only.
// NOTE(review): presumably picks dht_ or dht_via_proxy_ from use_proxy - confirm.
477 SecureDht* activeDht()
const;
// Listener table keyed by size_t; the token counter starts at 1.
483 std::map<size_t, Listener> listeners_;
484 size_t listener_token_ {1};
// Declared mutable, which allows locking from const member functions.
486 mutable std::mutex dht_mtx {};
487 std::thread dht_thread {};
488 std::condition_variable cv {};
490 std::thread rcv_thread {};
491 std::mutex sock_mtx {};
493 struct ReceivedPacket {
// Queue of ReceivedPacket entries.
// NOTE(review): presumably filled by rcv_thread under sock_mtx - confirm.
498 std::queue<ReceivedPacket> rcv {};
// Atomic flag, initially false.
501 std::atomic_bool bootstraping {
false};
// Two lists of string pairs.
// NOTE(review): presumably (host, service) pairs as passed to
// bootstrap(host, service) - confirm.
503 std::vector<std::pair<std::string,std::string>> bootstrap_nodes_all {};
504 std::vector<std::pair<std::string,std::string>> bootstrap_nodes {};
505 std::thread bootstrap_thread {};
507 std::mutex bootstrap_mtx {};
508 std::condition_variable bootstrap_cv {};
// Two queues of operations, each operation taking a SecureDht&.
// NOTE(review): presumably pending_ops_prio is drained first, and
// storage_mtx guards both queues - confirm.
510 std::queue<std::function<void(SecureDht&)>> pending_ops_prio {};
511 std::queue<std::function<void(SecureDht&)>> pending_ops {};
512 std::mutex storage_mtx {};
// Atomic flags, both initially false.
514 std::atomic_bool running {
false};
515 std::atomic_bool running_network {
false};
// Both statuses start as NodeStatus::Disconnected; the callback starts empty.
517 NodeStatus status4 {NodeStatus::Disconnected},
518 status6 {NodeStatus::Disconnected};
519 StatusCallback statusCb {
nullptr};
// All three start at -1.
// NOTE(review): presumably -1 means "no descriptor open" - confirm.
521 int stop_writefd {-1};
522 int s4 {-1}, s6 {-1};
527 std::string pushToken_;
void NOLOG(char const *, va_list)
OPENDHT_PUBLIC Blob hash(const Blob &data, size_t hash_length=512/8)
void run(in_port_t port=4222, const crypto::Identity identity={}, bool threaded=false, NetId network=0)
std::vector< uint8_t > Blob
in_port_t getBoundPort(sa_family_t f=AF_INET) const
Serializable dht::Value filter.
const SockAddr & getBound(sa_family_t f=AF_INET) const