00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef HAVE_CONFIG_H
00018 # include <dtn-config.h>
00019 #endif
00020
00021 #include <queue>
00022
00023 #include <oasys/util/OptParser.h>
00024 #include <oasys/util/StringBuffer.h>
00025 #include <oasys/util/TokenBucket.h>
00026
00027 #include "SimConvergenceLayer.h"
00028 #include "Connectivity.h"
00029 #include "Node.h"
00030 #include "Simulator.h"
00031 #include "Topology.h"
00032 #include "bundling/Bundle.h"
00033 #include "bundling/BundleEvent.h"
00034 #include "bundling/BundleList.h"
00035 #include "contacts/ContactManager.h"
00036
00037 namespace dtnsim {
00038
00039 class InFlightBundle;
00040
00041
00042 class SimLink : public CLInfo,
00043 public oasys::Logger {
00044 public:
00045 struct Params;
00046
00047 SimLink(const LinkRef& link,
00048 const SimLink::Params& params)
00049 : Logger("SimLink", "/dtn/cl/sim/%s", link->name()),
00050 link_(link.object(), "SimLink"),
00051 params_(params),
00052 tb_(((std::string)logpath_ + "/tb").c_str(),
00053 params_.capacity_,
00054 0xffffffff ),
00055 inflight_timer_(this, PendingEventTimer::INFLIGHT),
00056 arrival_timer_(this, PendingEventTimer::ARRIVAL),
00057 transmitted_timer_(this, PendingEventTimer::TRANSMITTED)
00058 {
00059 }
00060
00061 ~SimLink() {};
00062
00063 void start_next_bundle();
00064 void timeout(const oasys::Time& now);
00065 void handle_pending_inflight(const oasys::Time& now);
00066 void handle_arrival_events(const oasys::Time& now);
00067 void handle_transmitted_events(const oasys::Time& now);
00068 void reschedule_timers();
00069
00071 LinkRef link_;
00072
00073 struct Params {
00076 bool deliver_partial_;
00077
00081 bool reliable_;
00082
00084 u_int capacity_;
00085
00087 bool set_remote_eid_;
00088
00090 bool set_prevhop_;
00091
00092 } params_;
00093
00095 Node* peer_node_;
00096
00098 oasys::TokenBucket tb_;
00099
00101 u_char buf_[65536];
00102
00105 struct PendingEvent {
00106 PendingEvent(Bundle* bundle,
00107 size_t total_len,
00108 const oasys::Time& time)
00109 : bundle_(bundle, "SimCL::PendingEvent"),
00110 total_len_(total_len),
00111 time_(time) {}
00112
00113 BundleRef bundle_;
00114 size_t total_len_;
00115 oasys::Time time_;
00116 };
00117
00119 PendingEvent* pending_inflight_;
00120
00122 std::queue<PendingEvent*> arrival_events_;
00123
00125 std::queue<PendingEvent*> transmitted_events_;
00126
00128 class PendingEventTimer : public oasys::Timer {
00129 public:
00130 typedef enum { INFLIGHT, ARRIVAL, TRANSMITTED } type_t;
00131
00132 PendingEventTimer(SimLink* link, type_t type)
00133 : link_(link), type_(type) {}
00134
00135 void timeout(const timeval& now);
00136
00137 protected:
00138 SimLink* link_;
00139 type_t type_;
00140 };
00141
00144 PendingEventTimer inflight_timer_;
00145 PendingEventTimer arrival_timer_;
00146 PendingEventTimer transmitted_timer_;
00148 };
00149
00150
00151 void
00152 SimLink::start_next_bundle()
00153 {
00154 ASSERT(!link_->queue()->empty());
00155 ASSERT(pending_inflight_ == NULL);
00156
00157 Node* src_node = Node::active_node();
00158 ASSERT(src_node != peer_node_);
00159
00160 const ConnState* cs = Connectivity::instance()->lookup(src_node, peer_node_);
00161 ASSERT(cs);
00162
00163 BundleRef src_bundle("SimLink::start_next_bundle");
00164 src_bundle = link_->queue()->front();
00165
00166 BlockInfoVec* blocks = src_bundle->xmit_blocks()->find_blocks(link_);
00167 ASSERT(blocks != NULL);
00168
00169
00170
00171
00172 if (src_bundle->payload().location() == BundlePayload::NODATA) {
00173 BlockInfo* payload = const_cast<BlockInfo*>(
00174 blocks->find_block(BundleProtocol::PAYLOAD_BLOCK));
00175 ASSERT(payload != NULL);
00176 payload->set_data_length(0);
00177 }
00178
00179 bool complete = false;
00180 size_t len = BundleProtocol::produce(src_bundle.object(), blocks,
00181 buf_, 0, sizeof(buf_),
00182 &complete);
00183 ASSERTF(complete, "BundleProtocol non-payload blocks must fit in "
00184 "65 K buffer size");
00185
00186 size_t total_len = len;
00187
00188 if (src_bundle->payload().location() == BundlePayload::NODATA)
00189 total_len += src_bundle->payload().length();
00190
00191 complete = false;
00192 Bundle* dst_bundle = new Bundle(src_bundle->payload().location());
00193 int cc = BundleProtocol::consume(dst_bundle, buf_, len, &complete);
00194 ASSERT(cc == (int)len);
00195 ASSERT(complete);
00196
00197 if (src_bundle->payload().location() == BundlePayload::NODATA) {
00198 dst_bundle->mutable_payload()->set_length(src_bundle->payload().length());
00199 }
00200
00201 tb_.drain(total_len * 8);
00202
00203 oasys::Time bw_delay = tb_.time_to_level(0);
00204 oasys::Time inflight_time = oasys::Time(Simulator::time()) + bw_delay;
00205 oasys::Time arrival_time = inflight_time + cs->latency_;
00206 oasys::Time transmitted_time;
00207
00208
00209
00210 if (params_.reliable_) {
00211 transmitted_time = inflight_time + (cs->latency_ * 2);
00212 } else {
00213 transmitted_time = inflight_time;
00214 }
00215
00216 log_debug("send_bundle src %d dst %d: total len %zu, "
00217 "inflight_time %u.%u arrival_time %u.%u transmitted_time %u.%u",
00218 src_bundle->bundleid(), dst_bundle->bundleid(), total_len,
00219 inflight_time.sec_, inflight_time.usec_,
00220 arrival_time.sec_, arrival_time.usec_,
00221 transmitted_time.sec_, transmitted_time.usec_);
00222
00223 pending_inflight_ = new PendingEvent(src_bundle.object(), total_len, inflight_time);
00224 arrival_events_.push(new PendingEvent(dst_bundle, total_len, arrival_time));
00225 transmitted_events_.push(new PendingEvent(src_bundle.object(), total_len, transmitted_time));
00226
00227 reschedule_timers();
00228 }
00229
00230
00231 void
00232 SimLink::reschedule_timers()
00233 {
00234
00235
00236
00237
00238 if (! inflight_timer_.pending() && pending_inflight_ != NULL)
00239 {
00240 inflight_timer_.schedule_at(pending_inflight_->time_);
00241 }
00242
00243 if (! arrival_timer_.pending() && !arrival_events_.empty())
00244 {
00245 arrival_timer_.schedule_at(arrival_events_.front()->time_);
00246 }
00247
00248 if (! transmitted_timer_.pending() && !transmitted_events_.empty())
00249 {
00250 transmitted_timer_.schedule_at(transmitted_events_.front()->time_);
00251 }
00252 }
00253
00254
00255 void
00256 SimLink::PendingEventTimer::timeout(const timeval& tv)
00257 {
00258 oasys::Time now(tv.tv_sec, tv.tv_usec);
00259 switch (type_) {
00260 case INFLIGHT:
00261 link_->handle_pending_inflight(now);
00262 break;
00263 case ARRIVAL:
00264 link_->handle_arrival_events(now);
00265 break;
00266 case TRANSMITTED:
00267 link_->handle_transmitted_events(now);
00268 break;
00269 default:
00270 NOTREACHED;
00271 }
00272 }
00273
00274
00275 void
00276 SimLink::handle_pending_inflight(const oasys::Time& now)
00277 {
00278 ASSERT(pending_inflight_ != NULL);
00279
00280
00281 if (pending_inflight_->time_ <= now) {
00282 const BundleRef& bundle = pending_inflight_->bundle_;
00283
00284 log_debug("putting *%p in flight", bundle.object());
00285 link_->add_to_inflight(bundle, pending_inflight_->total_len_);
00286 link_->del_from_queue(bundle, pending_inflight_->total_len_);
00287
00288
00289
00290 delete pending_inflight_;
00291 pending_inflight_ = NULL;
00292
00293 if (! link_->queue()->empty()) {
00294 start_next_bundle();
00295 }
00296 }
00297
00298 reschedule_timers();
00299 }
00300
00301
00302 void
00303 SimLink::handle_arrival_events(const oasys::Time& now)
00304 {
00305 ASSERT(! arrival_events_.empty());
00306
00307
00308 while (! arrival_events_.empty()) {
00309 PendingEvent* next = arrival_events_.front();
00310 if (next->time_ <= now) {
00311 const BundleRef& bundle = next->bundle_;
00312 arrival_events_.pop();
00313
00314 log_debug("*%p arrived", bundle.object());
00315
00316 BundleReceivedEvent* rcv_event =
00317 new BundleReceivedEvent(bundle.object(),
00318 EVENTSRC_PEER,
00319 next->total_len_,
00320 params_.set_prevhop_ ?
00321 Node::active_node()->local_eid() :
00322 EndpointID::NULL_EID());
00323 peer_node_->post_event(rcv_event);
00324
00325 delete next;
00326
00327 } else {
00328 break;
00329 }
00330 }
00331
00332 reschedule_timers();
00333 }
00334
00335
00336 void
00337 SimLink::handle_transmitted_events(const oasys::Time& now)
00338 {
00339 ASSERT(! transmitted_events_.empty());
00340
00341
00342 while (! transmitted_events_.empty()) {
00343 PendingEvent* next = transmitted_events_.front();
00344 if (next->time_ <= now) {
00345 const BundleRef& bundle = next->bundle_;
00346 transmitted_events_.pop();
00347
00348 log_debug("*%p transmitted", bundle.object());
00349
00350 ASSERT(link_->contact() != NULL);
00351
00352 BundleTransmittedEvent* xmit_event =
00353 new BundleTransmittedEvent(bundle.object(), link_->contact(), link_,
00354 next->total_len_,
00355 params_.reliable_ ? next->total_len_ : 0);
00356 BundleDaemon::post(xmit_event);
00357
00358 delete next;
00359 } else {
00360 break;
00361 }
00362 }
00363
00364 reschedule_timers();
00365 }
00366
00367
00368 SimConvergenceLayer* SimConvergenceLayer::instance_;
00369
00370 SimConvergenceLayer::SimConvergenceLayer()
00371 : ConvergenceLayer("SimConvergenceLayer", "sim")
00372 {
00373 }
00374
00375
00376 bool
00377 SimConvergenceLayer::init_link(const LinkRef& link,
00378 int argc, const char* argv[])
00379 {
00380 ASSERT(link != NULL);
00381 ASSERT(!link->isdeleted());
00382 ASSERT(link->cl_info() == NULL);
00383
00384 oasys::OptParser p;
00385 SimLink::Params params;
00386
00387 params.deliver_partial_ = true;
00388 params.reliable_ = true;
00389 params.capacity_ = 0;
00390 params.set_remote_eid_ = true;
00391 params.set_prevhop_ = true;
00392
00393 p.addopt(new oasys::BoolOpt("deliver_partial", ¶ms.deliver_partial_));
00394 p.addopt(new oasys::BoolOpt("reliable", ¶ms.reliable_));
00395 p.addopt(new oasys::UIntOpt("capacity", ¶ms.capacity_));
00396 p.addopt(new oasys::BoolOpt("set_remote_eid", ¶ms.set_remote_eid_));
00397 p.addopt(new oasys::BoolOpt("set_prevhop", ¶ms.set_prevhop_));
00398
00399 const char* invalid;
00400 if (! p.parse(argc, argv, &invalid)) {
00401 log_err("error parsing link options: invalid option %s", invalid);
00402 return false;
00403 }
00404
00405 SimLink* sl = new SimLink(link, params);
00406 sl->peer_node_ = Topology::find_node(link->nexthop());
00407
00408 ASSERT(sl->peer_node_);
00409 link->set_cl_info(sl);
00410
00411 return true;
00412 }
00413
00414
00415 void
00416 SimConvergenceLayer::delete_link(const LinkRef& link)
00417 {
00418 ASSERT(link != NULL);
00419 ASSERT(!link->isdeleted());
00420 ASSERT(link->cl_info() != NULL);
00421
00422 log_debug("SimConvergenceLayer::delete_link: "
00423 "deleting link %s", link->name());
00424
00425 delete link->cl_info();
00426 link->set_cl_info(NULL);
00427 }
00428
00429
00430 bool
00431 SimConvergenceLayer::open_contact(const ContactRef& contact)
00432 {
00433 log_debug("opening contact for link [*%p]", contact.object());
00434
00435
00436 SimLink* sl = (SimLink*)contact->link()->cl_info();
00437 ASSERT(sl);
00438
00439 const ConnState* cs = Connectivity::instance()->
00440 lookup(Node::active_node(), sl->peer_node_);
00441 if (cs != NULL && cs->open_) {
00442 log_debug("opening contact");
00443 if (sl->params_.set_remote_eid_) {
00444 contact->link()->set_remote_eid(sl->peer_node_->local_eid());
00445 }
00446 update_connectivity(Node::active_node(), sl->peer_node_, *cs);
00447 BundleDaemon::post(new ContactUpEvent(contact));
00448
00449
00450 if (! contact->link()->queue()->empty()) {
00451 sl->start_next_bundle();
00452 }
00453
00454 } else {
00455 log_debug("connectivity is down when trying to open contact");
00456 BundleDaemon::post(
00457 new LinkStateChangeRequest(contact->link(),
00458 Link::CLOSED,
00459 ContactEvent::BROKEN));
00460 }
00461
00462 return true;
00463 }
00464
00465
00466 void
00467 SimConvergenceLayer::bundle_queued(const LinkRef& link, const BundleRef& bundle)
00468 {
00469 (void)bundle;
00470
00471 ASSERT(!link->isdeleted());
00472 ASSERT(link->cl_info() != NULL);
00473
00474 log_debug("bundle_queued *%p on link *%p", bundle.object(), link.object());
00475
00476 SimLink* sl = (SimLink*)link->cl_info();
00477 ASSERT(sl);
00478
00479 if (link->isopen() && (sl->pending_inflight_ == NULL)) {
00480 sl->start_next_bundle();
00481 }
00482 }
00483
00484
00485 void
00486 SimConvergenceLayer::update_connectivity(Node* n1, Node* n2, const ConnState& cs)
00487 {
00488 ASSERT(n1 != NULL);
00489 ASSERT(n2 != NULL);
00490
00491 n1->set_active();
00492
00493 ContactManager* cm = n1->contactmgr();;
00494
00495 oasys::ScopeLock l(cm->lock(), "SimConvergenceLayer::update_connectivity");
00496 const LinkSet* links = cm->links();
00497
00498 for (LinkSet::iterator iter = links->begin();
00499 iter != links->end();
00500 ++iter)
00501 {
00502 LinkRef link = *iter;
00503 SimLink* sl = (SimLink*)link->cl_info();
00504 ASSERT(sl);
00505
00506
00507 sl->tb_.set_rate(cs.bw_);
00508
00509 if (sl->peer_node_ != n2)
00510 continue;
00511
00512 log_debug("update_connectivity: checking node %s link %s",
00513 n1->name(), link->name());
00514
00515 if (cs.open_ == false && link->state() == Link::OPEN) {
00516 log_debug("update_connectivity: closing link %s", link->name());
00517 n1->post_event(
00518 new LinkStateChangeRequest(link, Link::CLOSED,
00519 ContactEvent::BROKEN));
00520 }
00521 }
00522 }
00523
00524
00525 }