C++ Distributed Hash Table
dht_proxy_client.h
1 /*
2  * Copyright (C) 2016-2018 Savoir-faire Linux Inc.
3  * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
4  * Adrien Béraud <adrien.beraud@savoirfairelinux.com>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program. If not, see <https://www.gnu.org/licenses/>.
18  */
19 
20 #if OPENDHT_PROXY_CLIENT
21 
22 #pragma once
23 
#include <algorithm>
#include <atomic>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "callbacks.h"
#include "def.h"
#include "dht_interface.h"
#include "scheduler.h"
#include "proxy.h"
33 
// Forward declarations: this header only names these types behind
// std::shared_ptr (restbed::Request) or references (Json::Value), so the
// complete definitions are not required here.
namespace restbed { class Request; }

namespace Json { class Value; }
41 
42 namespace dht {
43 
44 class SearchCache;
45 
46 class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface {
47 public:
48 
49  DhtProxyClient();
50 
51  explicit DhtProxyClient(std::function<void()> loopSignal, const std::string& serverHost, const std::string& pushClientId = "");
52 
53  virtual void setPushNotificationToken(const std::string& token) {
54 #if OPENDHT_PUSH_NOTIFICATIONS
55  deviceKey_ = token;
56 #endif
57  }
58 
59  virtual ~DhtProxyClient();
60 
64  inline const InfoHash& getNodeId() const { return myid; }
65 
69  NodeStatus getStatus(sa_family_t af) const;
70  NodeStatus getStatus() const {
71  return std::max(getStatus(AF_INET), getStatus(AF_INET6));
72  }
73 
77  void shutdown(ShutdownCallback cb);
78 
85  bool isRunning(sa_family_t af = 0) const;
86 
97  virtual void get(const InfoHash& key, GetCallback cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {});
98  virtual void get(const InfoHash& key, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter&& f={}, Where&& w = {}) {
99  get(key, cb, bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w));
100  }
101  virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) {
102  get(key, bindGetCb(cb), donecb, std::forward<Value::Filter>(f), std::forward<Where>(w));
103  }
104  virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallbackSimple donecb, Value::Filter&& f={}, Where&& w = {}) {
105  get(key, bindGetCb(cb), bindDoneCb(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w));
106  }
107 
115  void put(const InfoHash& key,
116  Sp<Value>,
117  DoneCallback cb=nullptr,
118  time_point created=time_point::max(),
119  bool permanent = false);
120  void put(const InfoHash& key,
121  const Sp<Value>& v,
122  DoneCallbackSimple cb,
123  time_point created=time_point::max(),
124  bool permanent = false)
125  {
126  put(key, v, bindDoneCb(cb), created, permanent);
127  }
128 
129  void put(const InfoHash& key,
130  Value&& v,
131  DoneCallback cb=nullptr,
132  time_point created=time_point::max(),
133  bool permanent = false)
134  {
135  put(key, std::make_shared<Value>(std::move(v)), cb, created, permanent);
136  }
137  void put(const InfoHash& key,
138  Value&& v,
139  DoneCallbackSimple cb,
140  time_point created=time_point::max(),
141  bool permanent = false)
142  {
143  put(key, std::forward<Value>(v), bindDoneCb(cb), created, permanent);
144  }
145 
150  NodeStats getNodesStats(sa_family_t af) const;
151 
156  std::vector<SockAddr> getPublicAddress(sa_family_t family = 0);
157 
165  virtual size_t listen(const InfoHash&, ValueCallback, Value::Filter={}, Where={});
166 
167  virtual size_t listen(const InfoHash& key, GetCallback cb, Value::Filter f={}, Where w={}) {
168  return listen(key, [cb](const std::vector<Sp<Value>>& vals, bool expired){
169  if (not expired)
170  return cb(vals);
171  return true;
172  }, std::forward<Value::Filter>(f), std::forward<Where>(w));
173  }
174  virtual size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w={}) {
175  return listen(key, bindGetCb(cb), std::forward<Value::Filter>(f), std::forward<Where>(w));
176  }
177  virtual bool cancelListen(const InfoHash& key, size_t token);
178 
183  void pushNotificationReceived(const std::map<std::string, std::string>& notification);
184 
185  time_point periodic(const uint8_t*, size_t, const SockAddr&);
186  time_point periodic(const uint8_t *buf, size_t buflen, const sockaddr* from, socklen_t fromlen) {
187  return periodic(buf, buflen, SockAddr(from, fromlen));
188  }
189 
190 
201  virtual void query(const InfoHash& /*key*/, QueryCallback /*cb*/, DoneCallback /*done_cb*/ = {}, Query&& /*q*/ = {}) { }
202  virtual void query(const InfoHash& key, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query&& q = {}) {
203  query(key, cb, bindDoneCb(done_cb), std::forward<Query>(q));
204  }
205 
209  std::vector<Sp<Value>> getPut(const InfoHash&);
210 
214  Sp<Value> getPut(const InfoHash&, const Value::Id&);
215 
220  bool cancelPut(const InfoHash&, const Value::Id&);
221 
222  void pingNode(const sockaddr*, socklen_t, DoneCallbackSimple&& /*cb*/={}) { }
223 
224  virtual void registerType(const ValueType& type) {
225  types.registerType(type);
226  }
227  const ValueType& getType(ValueType::Id type_id) const {
228  return types.getType(type_id);
229  }
230 
231  std::vector<Sp<Value>> getLocal(const InfoHash& k, Value::Filter filter) const;
232  Sp<Value> getLocalById(const InfoHash& k, Value::Id id) const;
233 
238  void insertNode(const InfoHash&, const SockAddr&) { }
239  void insertNode(const InfoHash&, const sockaddr*, socklen_t) { }
240  void insertNode(const NodeExport&) { }
241  std::pair<size_t, size_t> getStoreSize() const { return {}; }
242  std::vector<NodeExport> exportNodes() { return {}; }
243  std::vector<ValuesExport> exportValues() const { return {}; }
244  void importValues(const std::vector<ValuesExport>&) {}
245  std::string getStorageLog() const { return {}; }
246  std::string getStorageLog(const InfoHash&) const { return {}; }
247  std::string getRoutingTablesLog(sa_family_t) const { return {}; }
248  std::string getSearchesLog(sa_family_t) const { return {}; }
249  std::string getSearchLog(const InfoHash&, sa_family_t) const { return {}; }
250  void dumpTables() const {}
251  std::vector<unsigned> getNodeMessageStats(bool) { return {}; }
252  void setStorageLimit(size_t) {}
253  void connectivityChanged(sa_family_t) {
254  restartListeners();
255  }
256  void connectivityChanged() {
257  getProxyInfos();
258  restartListeners();
259  loopSignal_();
260  }
261 
262 private:
266  void startProxy();
267 
272  struct InfoState;
273  void getProxyInfos();
274  void onProxyInfos(const Json::Value& val, sa_family_t family);
275  SockAddr parsePublicAddress(const Json::Value& val);
276 
277  void opFailed();
278 
279  size_t doListen(const InfoHash& key, ValueCallback, Value::Filter);
280  bool doCancelListen(const InfoHash& key, size_t token);
281 
282  struct ListenState;
283  void sendListen(const std::shared_ptr<restbed::Request>& request, const ValueCallback&, const Value::Filter& filter, const Sp<ListenState>& state);
284  void sendSubscribe(const std::shared_ptr<restbed::Request>& request, const Sp<proxy::ListenToken>&, const Sp<ListenState>& state);
285 
286  void doPut(const InfoHash&, Sp<Value>, DoneCallback, time_point created, bool permanent);
287 
291  void getConnectivityStatus();
295  void cancelAllListeners();
299  void cancelAllOperations();
300 
301  std::string serverHost_;
302  std::string pushClientId_;
303 
304  mutable std::mutex lockCurrentProxyInfos_;
305  NodeStatus statusIpv4_ {NodeStatus::Disconnected};
306  NodeStatus statusIpv6_ {NodeStatus::Disconnected};
307  NodeStats stats4_ {};
308  NodeStats stats6_ {};
309  SockAddr publicAddressV4_;
310  SockAddr publicAddressV6_;
311 
312  InfoHash myid {};
313 
314  // registred types
315  TypeStore types;
316 
320  struct Listener;
321  struct ProxySearch;
322 
323  size_t listenerToken_ {0};
324  std::map<InfoHash, ProxySearch> searches_;
325  mutable std::mutex searchLock_;
326 
330  struct Operation
331  {
332  std::shared_ptr<restbed::Request> req;
333  std::thread thread;
334  std::shared_ptr<std::atomic_bool> finished;
335  };
336  std::vector<Operation> operations_;
337  std::mutex lockOperations_;
341  std::vector<std::function<void()>> callbacks_;
342  std::mutex lockCallbacks;
343 
344  Sp<InfoState> infoState_;
345  std::thread statusThread_;
346  mutable std::mutex statusLock_;
347 
348  Scheduler scheduler;
352  void confirmProxy();
353  Sp<Scheduler::Job> nextProxyConfirmation {};
354  Sp<Scheduler::Job> listenerRestart {};
355 
359  void restartListeners();
360 
365  void resubscribe(const InfoHash& key, Listener& listener);
366 
371  std::string deviceKey_ {};
372 
373  const std::function<void()> loopSignal_;
374 
375 #if OPENDHT_PUSH_NOTIFICATIONS
376  void fillBodyToGetToken(std::shared_ptr<restbed::Request> request, unsigned token = 0);
377  void getPushRequest(Json::Value&) const;
378 #endif // OPENDHT_PUSH_NOTIFICATIONS
379 
380  std::atomic_bool isDestroying_ {false};
381 };
382 
383 }
384 
385 #endif // OPENDHT_PROXY_CLIENT
NodeStatus
Definition: callbacks.h:41
Definition: callbacks.h:34