C++ Distributed Hash Table
dht_proxy_server.h
1 /*
2  * Copyright (C) 2017-2018 Savoir-faire Linux Inc.
3  * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
4  * Adrien Béraud <adrien.beraud@savoirfairelinux.com>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program. If not, see <https://www.gnu.org/licenses/>.
18  */
19 
20 #if OPENDHT_PROXY_SERVER
21 
22 #pragma once
23 
24 #include "callbacks.h"
25 #include "def.h"
26 #include "infohash.h"
27 #include "proxy.h"
28 #include "scheduler.h"
29 #include "sockaddr.h"
30 #include "value.h"
31 
32 #include <thread>
33 #include <memory>
34 #include <mutex>
35 #include <restbed>
36 
37 #ifdef OPENDHT_JSONCPP
38 #include <json/json.h>
39 #endif
40 
41 namespace Json {
42  class Value;
43 }
44 
45 namespace dht {
46 
47 class DhtRunner;
48 
52 class OPENDHT_PUBLIC DhtProxyServer
53 {
54 public:
63  DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port = 8000, const std::string& pushServer = "");
64  virtual ~DhtProxyServer();
65 
66  DhtProxyServer(const DhtProxyServer& other) = delete;
67  DhtProxyServer(DhtProxyServer&& other) = delete;
68  DhtProxyServer& operator=(const DhtProxyServer& other) = delete;
69  DhtProxyServer& operator=(DhtProxyServer&& other) = delete;
70 
71  struct ServerStats {
73  size_t listenCount;
75  size_t putCount;
77  size_t pushListenersCount;
79  double requestRate;
81  NodeInfo nodeInfo;
82 
83  std::string toString() const {
84  std::ostringstream ss;
85  ss << "Listens: " << listenCount << " Puts: " << putCount << " PushListeners: " << pushListenersCount << std::endl;
86  ss << "Requests: " << requestRate << " per second." << std::endl;
87  auto& ni = nodeInfo;
88  auto& ipv4 = ni.ipv4;
89  if (ipv4.table_depth > 1) {
90  ss << "IPv4 Network estimation: " << ipv4.getNetworkSizeEstimation() << std::endl;;
91  }
92  auto& ipv6 = ni.ipv6;
93  if (ipv6.table_depth > 1) {
94  ss << "IPv6 Network estimation: " << ipv6.getNetworkSizeEstimation() << std::endl;;
95  }
96  return ss.str();
97  }
98 
99 #ifdef OPENDHT_JSONCPP
100 
103  Json::Value toJson() const {
104  Json::Value result;
105  result["listenCount"] = static_cast<Json::UInt64>(listenCount);
106  result["putCount"] = static_cast<Json::UInt64>(putCount);
107  result["pushListenersCount"] = static_cast<Json::UInt64>(pushListenersCount);
108  result["requestRate"] = requestRate;
109  result["nodeInfo"] = nodeInfo.toJson();
110  return result;
111  }
112 #endif
113  };
114 
115  ServerStats stats() const { return stats_; }
116 
117  void updateStats() const;
118 
119  std::shared_ptr<DhtRunner> getNode() const { return dht_; }
120 
124  void stop();
125 
126 private:
134  void getNodeInfo(const std::shared_ptr<restbed::Session>& session) const;
135 
142  void getStats(const std::shared_ptr<restbed::Session>& session) const;
143 
154  void get(const std::shared_ptr<restbed::Session>& session) const;
155 
166  void listen(const std::shared_ptr<restbed::Session>& session);
167 
177  void put(const std::shared_ptr<restbed::Session>& session);
178 
179  void cancelPut(const InfoHash& key, Value::Id vid);
180 
181 #if OPENDHT_PROXY_SERVER_IDENTITY
182 
191  void putSigned(const std::shared_ptr<restbed::Session>& session) const;
192 
202  void putEncrypted(const std::shared_ptr<restbed::Session>& session) const;
203 #endif // OPENDHT_PROXY_SERVER_IDENTITY
204 
215  void getFiltered(const std::shared_ptr<restbed::Session>& session) const;
216 
224  void handleOptionsMethod(const std::shared_ptr<restbed::Session>& session) const;
225 
230  void removeClosedListeners(bool testSession = true);
231 
232 #if OPENDHT_PUSH_NOTIFICATIONS
233 
242  void subscribe(const std::shared_ptr<restbed::Session>& session);
250  void unsubscribe(const std::shared_ptr<restbed::Session>& session);
256  void sendPushNotification(const std::string& key, const Json::Value& json, bool isAndroid) const;
257 
264  void cancelPushListen(const std::string& pushToken, const InfoHash& key, const std::string& clientId);
265 
266 
267 #endif //OPENDHT_PUSH_NOTIFICATIONS
268 
269  using clock = std::chrono::steady_clock;
270  using time_point = clock::time_point;
271 
272  std::thread server_thread {};
273  std::unique_ptr<restbed::Service> service_;
274  std::shared_ptr<DhtRunner> dht_;
275 
276  std::mutex schedulerLock_;
277  std::condition_variable schedulerCv_;
278  Scheduler scheduler_;
279  std::thread schedulerThread_;
280 
281  Sp<Scheduler::Job> printStatsJob_;
282  mutable std::mutex statsMutex_;
283  mutable NodeInfo nodeInfo_ {};
284 
285  // Handle client quit for listen.
286  // NOTE: can be simplified when we will supports restbed 5.0
287  std::thread listenThread_;
288  struct SessionToHashToken {
289  std::shared_ptr<restbed::Session> session;
290  InfoHash hash;
291  std::future<size_t> token;
292  };
293  std::vector<SessionToHashToken> currentListeners_;
294  std::mutex lockListener_;
295  std::atomic_bool stopListeners {false};
296 
297  struct PermanentPut;
298  struct SearchPuts;
299  std::map<InfoHash, SearchPuts> puts_;
300 
301  mutable std::atomic<size_t> requestNum_ {0};
302  mutable std::atomic<time_point> lastStatsReset_ {time_point::min()};
303 
304  const std::string pushServer_;
305 
306  mutable ServerStats stats_;
307 
308 #if OPENDHT_PUSH_NOTIFICATIONS
309  struct Listener;
310  struct PushListener;
311  std::mutex lockPushListeners_;
312  std::map<std::string, PushListener> pushListeners_;
313  proxy::ListenToken tokenPushNotif_ {0};
314 #endif //OPENDHT_PUSH_NOTIFICATIONS
315 };
316 
317 }
318 
319 #endif //OPENDHT_PROXY_SERVER
OPENDHT_PUBLIC Blob hash(const Blob &data, size_t hash_length=512/8)
Definition: callbacks.h:34