00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef HAVE_CONFIG_H
00018 # include <dtn-config.h>
00019 #endif
00020
00021 #include <oasys/io/IO.h>
00022 #include <oasys/tclcmd/TclCommand.h>
00023 #include <oasys/util/Time.h>
00024
00025 #include "Bundle.h"
00026 #include "BundleActions.h"
00027 #include "BundleEvent.h"
00028 #include "BundleDaemon.h"
00029 #include "BundleStatusReport.h"
00030 #include "BundleTimestamp.h"
00031 #include "CustodySignal.h"
00032 #include "ExpirationTimer.h"
00033 #include "FragmentManager.h"
00034 #include "contacts/Link.h"
00035 #include "contacts/Contact.h"
00036 #include "contacts/ContactManager.h"
00037 #include "conv_layers/ConvergenceLayer.h"
00038 #include "reg/AdminRegistration.h"
00039 #include "reg/APIRegistration.h"
00040 #include "reg/PingRegistration.h"
00041 #include "reg/Registration.h"
00042 #include "reg/RegistrationTable.h"
00043 #include "routing/BundleRouter.h"
00044 #include "routing/RouteTable.h"
00045 #include "session/Session.h"
00046 #include "storage/BundleStore.h"
00047 #include "storage/RegistrationStore.h"
00048
00049 #ifdef BSP_ENABLED
00050 # include "security/Ciphersuite.h"
00051 # include "security/SPD.h"
00052 # include "security/KeyDB.h"
00053 #endif
00054
00055 namespace dtn {
00056
00057 template <>
00058 BundleDaemon* oasys::Singleton<BundleDaemon, false>::instance_ = NULL;
00059
00060 BundleDaemon::Params::Params()
00061 : early_deletion_(true),
00062 suppress_duplicates_(true),
00063 accept_custody_(true),
00064 reactive_frag_enabled_(true),
00065 retry_reliable_unacked_(true),
00066 test_permuted_delivery_(false),
00067 injected_bundles_in_memory_(false) {}
00068
00069 BundleDaemon::Params BundleDaemon::params_;
00070
00071 bool BundleDaemon::shutting_down_ = false;
00072
00073
00074 BundleDaemon::BundleDaemon()
00075 : BundleEventHandler("BundleDaemon", "/dtn/bundle/daemon"),
00076 Thread("BundleDaemon", CREATE_JOINABLE)
00077 {
00078
00079 local_eid_.assign(EndpointID::NULL_EID());
00080
00081 actions_ = NULL;
00082 eventq_ = NULL;
00083
00084 memset(&stats_, 0, sizeof(stats_));
00085
00086 pending_bundles_ = new BundleList("pending_bundles");
00087 custody_bundles_ = new BundleList("custody_bundles");
00088
00089 contactmgr_ = new ContactManager();
00090 fragmentmgr_ = new FragmentManager();
00091 reg_table_ = new RegistrationTable();
00092
00093 router_ = 0;
00094
00095 app_shutdown_proc_ = NULL;
00096 app_shutdown_data_ = NULL;
00097
00098 rtr_shutdown_proc_ = 0;
00099 rtr_shutdown_data_ = 0;
00100 }
00101
00102
00103 BundleDaemon::~BundleDaemon()
00104 {
00105 delete pending_bundles_;
00106 delete custody_bundles_;
00107
00108 delete contactmgr_;
00109 delete fragmentmgr_;
00110 delete reg_table_;
00111 delete router_;
00112
00113 delete actions_;
00114 delete eventq_;
00115 }
00116
00117
00118 void
00119 BundleDaemon::do_init()
00120 {
00121 actions_ = new BundleActions();
00122 eventq_ = new oasys::MsgQueue<BundleEvent*>(logpath_);
00123 eventq_->notify_when_empty();
00124 BundleProtocol::init_default_processors();
00125 #ifdef BSP_ENABLED
00126 Ciphersuite::init_default_ciphersuites();
00127 SPD::init();
00128 KeyDB::init();
00129 #endif
00130 }
00131
00132
00133 void
00134 BundleDaemon::post(BundleEvent* event)
00135 {
00136 instance_->post_event(event);
00137 }
00138
00139
00140 void
00141 BundleDaemon::post_at_head(BundleEvent* event)
00142 {
00143 instance_->post_event(event, false);
00144 }
00145
00146
00147 bool
00148 BundleDaemon::post_and_wait(BundleEvent* event,
00149 oasys::Notifier* notifier,
00150 int timeout, bool at_back)
00151 {
00152
00153
00154
00155
00156 ASSERT(! oasys::Thread::start_barrier_enabled());
00157
00158 ASSERT(event->processed_notifier_ == NULL);
00159 event->processed_notifier_ = notifier;
00160 if (at_back) {
00161 post(event);
00162 } else {
00163 post_at_head(event);
00164 }
00165 return notifier->wait(NULL, timeout);
00166 }
00167
00168
00169 void
00170 BundleDaemon::post_event(BundleEvent* event, bool at_back)
00171 {
00172 log_debug("posting event (%p) with type %s (at %s)",
00173 event, event->type_str(), at_back ? "back" : "head");
00174 event->posted_time_.get_time();
00175 eventq_->push(event, at_back);
00176 }
00177
00178
00179 void
00180 BundleDaemon::get_routing_state(oasys::StringBuffer* buf)
00181 {
00182 router_->get_routing_state(buf);
00183 contactmgr_->dump(buf);
00184 }
00185
00186
00187 void
00188 BundleDaemon::get_bundle_stats(oasys::StringBuffer* buf)
00189 {
00190 buf->appendf("%zu pending -- "
00191 "%zu custody -- "
00192 "%u received -- "
00193 "%u delivered -- "
00194 "%u generated -- "
00195 "%u transmitted -- "
00196 "%u expired -- "
00197 "%u duplicate -- "
00198 "%u deleted -- "
00199 "%u injected",
00200 pending_bundles()->size(),
00201 custody_bundles()->size(),
00202 stats_.received_bundles_,
00203 stats_.delivered_bundles_,
00204 stats_.generated_bundles_,
00205 stats_.transmitted_bundles_,
00206 stats_.expired_bundles_,
00207 stats_.duplicate_bundles_,
00208 stats_.deleted_bundles_,
00209 stats_.injected_bundles_);
00210 }
00211
00212
00213 void
00214 BundleDaemon::get_daemon_stats(oasys::StringBuffer* buf)
00215 {
00216 buf->appendf("%zu pending_events -- "
00217 "%u processed_events -- "
00218 "%zu pending_timers",
00219 event_queue_size(),
00220 stats_.events_processed_,
00221 oasys::TimerSystem::instance()->num_pending_timers());
00222 }
00223
00224
00225
00226 void
00227 BundleDaemon::reset_stats()
00228 {
00229 memset(&stats_, 0, sizeof(stats_));
00230
00231 oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::reset_stats");
00232
00233 const LinkSet* links = contactmgr_->links();
00234 LinkSet::const_iterator iter;
00235 for (iter = links->begin(); iter != links->end(); ++iter) {
00236 (*iter)->reset_stats();
00237 }
00238 }
00239
00240
00241 void
00242 BundleDaemon::generate_status_report(Bundle* orig_bundle,
00243 BundleStatusReport::flag_t flag,
00244 status_report_reason_t reason)
00245 {
00246 log_debug("generating return receipt status report, "
00247 "flag = 0x%x, reason = 0x%x", flag, reason);
00248
00249 Bundle* report = new Bundle();
00250 BundleStatusReport::create_status_report(report, orig_bundle,
00251 local_eid_, flag, reason);
00252
00253 BundleReceivedEvent e(report, EVENTSRC_ADMIN);
00254 handle_event(&e);
00255 }
00256
00257
00258 void
00259 BundleDaemon::generate_custody_signal(Bundle* bundle, bool succeeded,
00260 custody_signal_reason_t reason)
00261 {
00262 if (bundle->local_custody()) {
00263 log_err("send_custody_signal(*%p): already have local custody",
00264 bundle);
00265 return;
00266 }
00267
00268 if (bundle->custodian().equals(EndpointID::NULL_EID())) {
00269 log_err("send_custody_signal(*%p): current custodian is NULL_EID",
00270 bundle);
00271 return;
00272 }
00273
00274 Bundle* signal = new Bundle();
00275 CustodySignal::create_custody_signal(signal, bundle, local_eid_,
00276 succeeded, reason);
00277
00278 BundleReceivedEvent e(signal, EVENTSRC_ADMIN);
00279 handle_event(&e);
00280 }
00281
00282
00283 void
00284 BundleDaemon::cancel_custody_timers(Bundle* bundle)
00285 {
00286 oasys::ScopeLock l(bundle->lock(), "BundleDaemon::cancel_custody_timers");
00287
00288 CustodyTimerVec::iterator iter;
00289 for (iter = bundle->custody_timers()->begin();
00290 iter != bundle->custody_timers()->end();
00291 ++iter)
00292 {
00293 bool ok = (*iter)->cancel();
00294 if (!ok) {
00295 log_crit("unexpected error cancelling custody timer for bundle *%p",
00296 bundle);
00297 }
00298
00299
00300
00301 }
00302
00303 bundle->custody_timers()->clear();
00304 }
00305
00306
00307 void
00308 BundleDaemon::accept_custody(Bundle* bundle)
00309 {
00310 log_info("accept_custody *%p", bundle);
00311
00312 if (bundle->local_custody()) {
00313 log_err("accept_custody(*%p): already have local custody",
00314 bundle);
00315 return;
00316 }
00317
00318 if (bundle->custodian().equals(local_eid_)) {
00319 log_err("send_custody_signal(*%p): "
00320 "current custodian is already local_eid",
00321 bundle);
00322 return;
00323 }
00324
00325
00326
00327 if (! bundle->custodian().equals(EndpointID::NULL_EID())) {
00328 generate_custody_signal(bundle, true, BundleProtocol::CUSTODY_NO_ADDTL_INFO);
00329 }
00330
00331
00332
00333 bundle->mutable_custodian()->assign(local_eid_);
00334 bundle->set_local_custody(true);
00335 actions_->store_update(bundle);
00336
00337 custody_bundles_->push_back(bundle);
00338
00339
00340
00341 if (bundle->custody_rcpt()) {
00342 generate_status_report(bundle,
00343 BundleStatusReport::STATUS_CUSTODY_ACCEPTED);
00344 }
00345 }
00346
00347
00348 void
00349 BundleDaemon::release_custody(Bundle* bundle)
00350 {
00351 log_info("release_custody *%p", bundle);
00352
00353 if (!bundle->local_custody()) {
00354 log_err("release_custody(*%p): don't have local custody",
00355 bundle);
00356 return;
00357 }
00358
00359 cancel_custody_timers(bundle);
00360
00361 bundle->mutable_custodian()->assign(EndpointID::NULL_EID());
00362 bundle->set_local_custody(false);
00363 actions_->store_update(bundle);
00364
00365 custody_bundles_->erase(bundle);
00366 }
00367
00368
00369 void
00370 BundleDaemon::deliver_to_registration(Bundle* bundle,
00371 Registration* registration)
00372 {
00373 ASSERT(!bundle->is_fragment());
00374
00375 ForwardingInfo::state_t state = bundle->fwdlog()->get_latest_entry(registration);
00376 if (state != ForwardingInfo::NONE)
00377 {
00378 ASSERT(state == ForwardingInfo::DELIVERED);
00379 log_debug("not delivering bundle *%p to registration %d (%s) "
00380 "since already delivered",
00381 bundle, registration->regid(),
00382 registration->endpoint().c_str());
00383 return;
00384 }
00385
00386
00387
00388
00389
00390 if (registration->session_flags() == Session::PUBLISH)
00391 {
00392 log_debug("not delivering bundle *%p to registration %d (%s) "
00393 "since it's a publish-only session registration",
00394 bundle, registration->regid(),
00395 registration->endpoint().c_str());
00396 return;
00397 }
00398
00399 log_debug("delivering bundle *%p to registration %d (%s)",
00400 bundle, registration->regid(),
00401 registration->endpoint().c_str());
00402
00403 if (registration->deliver_if_not_duplicate(bundle)) {
00404
00405
00406
00407 bundle->fwdlog()->add_entry(registration,
00408 ForwardingInfo::FORWARD_ACTION,
00409 ForwardingInfo::DELIVERED);
00410 } else {
00411 log_notice("suppressing duplicate delivery of bundle *%p "
00412 "to registration %d (%s)",
00413 bundle, registration->regid(),
00414 registration->endpoint().c_str());
00415 }
00416 }
00417
00418
00419 bool
00420 BundleDaemon::check_local_delivery(Bundle* bundle, bool deliver)
00421 {
00422 log_debug("checking for matching registrations for bundle *%p", bundle);
00423
00424 RegistrationList matches;
00425 RegistrationList::iterator iter;
00426
00427 reg_table_->get_matching(bundle->dest(), &matches);
00428
00429 if (deliver) {
00430 ASSERT(!bundle->is_fragment());
00431 for (iter = matches.begin(); iter != matches.end(); ++iter) {
00432 Registration* registration = *iter;
00433 deliver_to_registration(bundle, registration);
00434 }
00435 }
00436
00437 return (matches.size() > 0) || bundle->dest().subsume(local_eid_);
00438 }
00439
00440
00441 void
00442 BundleDaemon::check_and_deliver_to_registrations(Bundle* bundle, const EndpointID& reg_eid)
00443 {
00444 int num;
00445 log_debug("checking for matching entries in table for %s", reg_eid.c_str());
00446
00447 RegistrationList matches;
00448 RegistrationList::iterator iter;
00449
00450 num = reg_table_->get_matching(reg_eid, &matches);
00451
00452 for (iter = matches.begin(); iter != matches.end(); ++iter)
00453 {
00454 Registration* registration = *iter;
00455 deliver_to_registration(bundle, registration);
00456 }
00457 }
00458
00459
00460 void
00461 BundleDaemon::handle_bundle_delete(BundleDeleteRequest* request)
00462 {
00463 if (request->bundle_.object()) {
00464 log_info("BUNDLE_DELETE: bundle *%p (reason %s)",
00465 request->bundle_.object(),
00466 BundleStatusReport::reason_to_str(request->reason_));
00467 delete_bundle(request->bundle_, request->reason_);
00468 }
00469 }
00470
00471
00472 void
00473 BundleDaemon::handle_bundle_accept(BundleAcceptRequest* request)
00474 {
00475 *request->result_ =
00476 router_->accept_bundle(request->bundle_.object(), request->reason_);
00477
00478 log_info("BUNDLE_ACCEPT_REQUEST: bundle *%p %s (reason %s)",
00479 request->bundle_.object(),
00480 *request->result_ ? "accepted" : "not accepted",
00481 BundleStatusReport::reason_to_str(*request->reason_));
00482 }
00483
00484
00485 void
00486 BundleDaemon::handle_bundle_received(BundleReceivedEvent* event)
00487 {
00488 const BundleRef& bundleref = event->bundleref_;
00489 Bundle* bundle = bundleref.object();
00490
00491
00492 const char* source_str = "";
00493 switch (event->source_) {
00494 case EVENTSRC_PEER:
00495 stats_.received_bundles_++;
00496 break;
00497
00498 case EVENTSRC_APP:
00499 stats_.received_bundles_++;
00500 source_str = " (from app)";
00501 break;
00502
00503 case EVENTSRC_STORE:
00504 source_str = " (from data store)";
00505 break;
00506
00507 case EVENTSRC_ADMIN:
00508 stats_.generated_bundles_++;
00509 source_str = " (generated)";
00510 break;
00511
00512 case EVENTSRC_FRAGMENTATION:
00513 stats_.generated_bundles_++;
00514 source_str = " (from fragmentation)";
00515 break;
00516
00517 case EVENTSRC_ROUTER:
00518 stats_.generated_bundles_++;
00519 source_str = " (from router)";
00520 break;
00521
00522 default:
00523 NOTREACHED;
00524 }
00525
00526
00527
00528 if (log_enabled(oasys::LOG_DEBUG)) {
00529 oasys::StaticStringBuffer<1024> buf;
00530 buf.appendf("BUNDLE_RECEIVED%s: prevhop %s (%u bytes recvd)\n",
00531 source_str, event->prevhop_.c_str(), event->bytes_received_);
00532 bundle->format_verbose(&buf);
00533 log_multiline(oasys::LOG_DEBUG, buf.c_str());
00534 } else {
00535 log_info("BUNDLE_RECEIVED%s *%p prevhop %s (%u bytes recvd)",
00536 source_str, bundle, event->prevhop_.c_str(), event->bytes_received_);
00537 }
00538
00539
00540 if (event->source_ == EVENTSRC_PEER && event->link_ != NULL)
00541 {
00542 bundle->fwdlog()->add_entry(event->link_,
00543 ForwardingInfo::FORWARD_ACTION,
00544 ForwardingInfo::RECEIVED);
00545 }
00546 else if (event->source_ == EVENTSRC_APP)
00547 {
00548 if (event->registration_ != NULL) {
00549 bundle->fwdlog()->add_entry(event->registration_,
00550 ForwardingInfo::FORWARD_ACTION,
00551 ForwardingInfo::RECEIVED);
00552 }
00553 }
00554
00555
00556
00557
00558 if (bundle->expiration() == 0) {
00559 log_warn("bundle id %d arrived with zero expiration time",
00560 bundle->bundleid());
00561 }
00562
00563 u_int32_t now = BundleTimestamp::get_current_time();
00564 if ((bundle->creation_ts().seconds_ > now) &&
00565 (bundle->creation_ts().seconds_ - now > 30000))
00566 {
00567 log_warn("bundle id %d arrived with creation time in the future "
00568 "(%u > %u)",
00569 bundle->bundleid(), bundle->creation_ts().seconds_, now);
00570 }
00571
00572
00573
00574
00575
00576
00577 if (event->source_ == EVENTSRC_PEER)
00578 {
00579 if (bundle->prevhop() == EndpointID::NULL_EID() ||
00580 bundle->prevhop().str() == "")
00581 {
00582 bundle->mutable_prevhop()->assign(event->prevhop_);
00583 }
00584
00585 if (bundle->prevhop() != event->prevhop_)
00586 {
00587 log_warn("previous hop mismatch: prevhop header contains '%s' but "
00588 "convergence layer indicates prevhop is '%s'",
00589 bundle->prevhop().c_str(),
00590 event->prevhop_.c_str());
00591 }
00592 }
00593
00594
00595
00596
00597
00598 if (event->source_ == EVENTSRC_PEER) {
00599 ASSERT(event->bytes_received_ != 0);
00600 fragmentmgr_->try_to_convert_to_fragment(bundle);
00601 }
00602
00603
00604
00605
00606 if (event->source_ == EVENTSRC_PEER) {
00607
00608
00609
00610
00611 status_report_reason_t
00612 reception_reason = BundleProtocol::REASON_NO_ADDTL_INFO,
00613 deletion_reason = BundleProtocol::REASON_NO_ADDTL_INFO;
00614
00615 bool valid = BundleProtocol::validate(bundle,
00616 &reception_reason,
00617 &deletion_reason);
00618
00619
00620
00621
00622
00623
00624
00625 if (bundle->receive_rcpt() ||
00626 reception_reason != BundleProtocol::REASON_NO_ADDTL_INFO)
00627 {
00628 generate_status_report(bundle, BundleStatusReport::STATUS_RECEIVED,
00629 reception_reason);
00630 }
00631
00632
00633
00634
00635
00636 bool accept_bundle = false;
00637 if (valid) {
00638 int reason = BundleProtocol::REASON_NO_ADDTL_INFO;
00639 accept_bundle = router_->accept_bundle(bundle, &reason);
00640 deletion_reason = static_cast<BundleProtocol::status_report_reason_t>(reason);
00641 }
00642
00643
00644
00645
00646
00647
00648 if (!accept_bundle) {
00649 delete_bundle(bundleref, deletion_reason);
00650 event->daemon_only_ = true;
00651 return;
00652 }
00653 }
00654
00655
00656
00657
00658
00659
00660 Bundle* duplicate = find_duplicate(bundle);
00661 if (duplicate != NULL) {
00662 log_notice("got duplicate bundle: %s -> %s creation %u.%u",
00663 bundle->source().c_str(),
00664 bundle->dest().c_str(),
00665 bundle->creation_ts().seconds_,
00666 bundle->creation_ts().seqno_);
00667
00668 stats_.duplicate_bundles_++;
00669
00670 if (bundle->custody_requested() && duplicate->local_custody())
00671 {
00672 generate_custody_signal(bundle, false,
00673 BundleProtocol::CUSTODY_REDUNDANT_RECEPTION);
00674 }
00675
00676 if (params_.suppress_duplicates_) {
00677
00678
00679
00680
00681
00682 event->daemon_only_ = true;
00683 return;
00684 }
00685
00686
00687
00688
00689
00690
00691
00692
00693
00694
00695
00696
00697 if (bundle->custody_requested() && duplicate->local_custody()) {
00698 event->daemon_only_ = true;
00699 return;
00700 }
00701
00702 }
00703
00704
00705
00706
00707
00708
00709
00710
00711
00712
00713 bool ok_to_route =
00714 add_to_pending(bundle, (event->source_ != EVENTSRC_STORE));
00715
00716 if (!ok_to_route) {
00717 event->daemon_only_ = true;
00718 return;
00719 }
00720
00721
00722
00723
00724
00725
00726
00727 if (bundle->custody_requested() && params_.accept_custody_
00728 && (duplicate == NULL || !duplicate->local_custody()))
00729 {
00730 if (event->source_ != EVENTSRC_STORE) {
00731 accept_custody(bundle);
00732
00733 } else if (bundle->local_custody()) {
00734 custody_bundles_->push_back(bundle);
00735 }
00736 }
00737
00738
00739
00740
00741
00742
00743
00744
00745
00746 if (duplicate != NULL) {
00747 return;
00748 }
00749
00750
00751
00752
00753
00754 if (! bundle->is_fragment()) {
00755 fragmentmgr_->delete_obsoleted_fragments(bundle);
00756 }
00757
00758
00759
00760
00761
00762
00763 bool is_local =
00764 check_local_delivery(bundle,
00765 (event->source_ != EVENTSRC_ROUTER) &&
00766 (bundle->is_fragment() == false));
00767
00768
00769
00770
00771 if (bundle->is_fragment() && is_local) {
00772 log_debug("deferring delivery of bundle *%p "
00773 "since bundle is a fragment", bundle);
00774 fragmentmgr_->process_for_reassembly(bundle);
00775 }
00776
00777
00778
00779
00780
00781 }
00782
00783
00784 void
00785 BundleDaemon::handle_bundle_transmitted(BundleTransmittedEvent* event)
00786 {
00787 Bundle* bundle = event->bundleref_.object();
00788
00789 LinkRef link = event->link_;
00790 ASSERT(link != NULL);
00791
00792 log_debug("trying to find xmit blocks for bundle id:%d on link %s",
00793 bundle->bundleid(),link->name());
00794 BlockInfoVec* blocks = bundle->xmit_blocks()->find_blocks(link);
00795
00796
00797
00798
00799
00800
00801 if(blocks == NULL)
00802 {
00803 log_info("received a redundant/conflicting bundle_transmit event about "
00804 "bundle id:%d -> %s (%s)",
00805 bundle->bundleid(),
00806 link->name(),
00807 link->nexthop());
00808 return;
00809 }
00810
00811
00812
00813
00814
00815
00816
00817
00818 size_t total_len = BundleProtocol::total_length(blocks);
00819
00820 stats_.transmitted_bundles_++;
00821
00822 link->stats()->bundles_transmitted_++;
00823 link->stats()->bytes_transmitted_ += event->bytes_sent_;
00824
00825
00826 if (link->del_from_inflight(event->bundleref_, total_len)) {
00827 log_debug("removed bundle id:%d from link %s inflight queue",
00828 bundle->bundleid(),
00829 link->name());
00830 } else {
00831 log_warn("bundle id:%d not on link %s inflight queue",
00832 bundle->bundleid(),
00833 link->name());
00834 }
00835
00836
00837 if (link->del_from_queue(event->bundleref_, total_len)) {
00838 log_warn("bundle id:%d unexpectedly on link %s queue in transmitted event",
00839 bundle->bundleid(),
00840 link->name());
00841 }
00842
00843 log_info("BUNDLE_TRANSMITTED id:%d (%u bytes_sent/%u reliable) -> %s (%s)",
00844 bundle->bundleid(),
00845 event->bytes_sent_,
00846 event->reliably_sent_,
00847 link->name(),
00848 link->nexthop());
00849
00850
00851
00852
00853
00854
00855
00856
00857
00858
00859
00860
00861
00862
00863
00864
00865 if (params_.retry_reliable_unacked_ &&
00866 link->is_reliable() &&
00867 (event->bytes_sent_ != event->reliably_sent_) &&
00868 (event->reliably_sent_ == 0))
00869 {
00870 bundle->fwdlog()->update(link, ForwardingInfo::TRANSMIT_FAILED);
00871 log_debug("trying to delete xmit blocks for bundle id:%d on link %s",
00872 bundle->bundleid(),link->name());
00873 BundleProtocol::delete_blocks(bundle, link);
00874
00875 log_warn("XXX/demmer fixme transmitted special case");
00876
00877 return;
00878 }
00879
00880
00881
00882
00883
00884 ForwardingInfo fwdinfo;
00885 bool ok = bundle->fwdlog()->get_latest_entry(link, &fwdinfo);
00886 if(!ok)
00887 {
00888 oasys::StringBuffer buf;
00889 bundle->fwdlog()->dump(&buf);
00890 log_debug("%s",buf.c_str());
00891 }
00892 ASSERTF(ok, "no forwarding log entry for transmission");
00893
00894 if (fwdinfo.state() != ForwardingInfo::QUEUED) {
00895 log_err("*%p fwdinfo state %s != expected QUEUED",
00896 bundle, ForwardingInfo::state_to_str(fwdinfo.state()));
00897 }
00898
00899
00900
00901
00902
00903 log_debug("updating forwarding log entry on *%p for *%p to TRANSMITTED",
00904 bundle, link.object());
00905 bundle->fwdlog()->update(link, ForwardingInfo::TRANSMITTED);
00906
00907
00908
00909
00910
00911
00912
00913 if (link->reliable_) {
00914 fragmentmgr_->try_to_reactively_fragment(bundle,
00915 blocks,
00916 event->reliably_sent_);
00917 } else {
00918 fragmentmgr_->try_to_reactively_fragment(bundle,
00919 blocks,
00920 event->bytes_sent_);
00921 }
00922
00923
00924
00925
00926
00927 log_debug("trying to delete xmit blocks for bundle id:%d on link %s",
00928 bundle->bundleid(),link->name());
00929 BundleProtocol::delete_blocks(bundle, link);
00930 blocks = NULL;
00931
00932
00933
00934
00935 if (bundle->forward_rcpt()) {
00936 generate_status_report(bundle, BundleStatusReport::STATUS_FORWARDED);
00937 }
00938
00939
00940
00941
00942 if (bundle->local_custody()) {
00943 bundle->custody_timers()->push_back(
00944 new CustodyTimer(fwdinfo.timestamp(),
00945 fwdinfo.custody_spec(),
00946 bundle, link));
00947
00948
00949
00950
00951
00952
00953
00954 }
00955 }
00956
00957
00958 void
00959 BundleDaemon::handle_bundle_delivered(BundleDeliveredEvent* event)
00960 {
00961
00962 stats_.delivered_bundles_++;
00963
00964
00965
00966
00967 Bundle* bundle = event->bundleref_.object();
00968
00969 log_info("BUNDLE_DELIVERED id:%d (%zu bytes) -> regid %d (%s)",
00970 bundle->bundleid(), bundle->payload().length(),
00971 event->registration_->regid(),
00972 event->registration_->endpoint().c_str());
00973
00974
00975
00976
00977 if (bundle->delivery_rcpt())
00978 {
00979 generate_status_report(bundle, BundleStatusReport::STATUS_DELIVERED);
00980 }
00981
00982
00983
00984
00985
00986
00987
00988
00989 if (bundle->custody_requested())
00990 {
00991 if (bundle->local_custody()) {
00992 release_custody(bundle);
00993
00994 } else if (bundle->custodian().equals(EndpointID::NULL_EID())) {
00995 log_info("custodial bundle *%p delivered before custody accepted",
00996 bundle);
00997
00998 } else {
00999 generate_custody_signal(bundle, true,
01000 BundleProtocol::CUSTODY_NO_ADDTL_INFO);
01001 }
01002 }
01003 }
01004
01005
01006 void
01007 BundleDaemon::handle_bundle_expired(BundleExpiredEvent* event)
01008 {
01009
01010 stats_.expired_bundles_++;
01011
01012 const BundleRef& bundle = event->bundleref_;
01013
01014 log_info("BUNDLE_EXPIRED *%p", bundle.object());
01015
01016
01017
01018
01019
01020 delete_bundle(bundle, BundleProtocol::REASON_LIFETIME_EXPIRED);
01021
01022
01023 }
01024
01025
01026 void
01027 BundleDaemon::handle_bundle_send(BundleSendRequest* event)
01028 {
01029 LinkRef link = contactmgr_->find_link(event->link_.c_str());
01030 if (link == NULL){
01031 log_err("Cannot send bundle on unknown link %s", event->link_.c_str());
01032 return;
01033 }
01034
01035 BundleRef br = event->bundle_;
01036 if (! br.object()){
01037 log_err("NULL bundle object in BundleSendRequest");
01038 return;
01039 }
01040
01041 ForwardingInfo::action_t fwd_action =
01042 (ForwardingInfo::action_t)event->action_;
01043
01044 actions_->queue_bundle(br.object(), link,
01045 fwd_action, CustodyTimerSpec::defaults_);
01046 }
01047
01048
01049 void
01050 BundleDaemon::handle_bundle_cancel(BundleCancelRequest* event)
01051 {
01052 BundleRef br = event->bundle_;
01053
01054 if(!br.object()) {
01055 log_err("NULL bundle object in BundleCancelRequest");
01056 return;
01057 }
01058
01059
01060
01061 if (!event->link_.empty()) {
01062 LinkRef link = contactmgr_->find_link(event->link_.c_str());
01063 if (link == NULL) {
01064 log_err("BUNDLE_CANCEL no link with name %s", event->link_.c_str());
01065 return;
01066 }
01067
01068 log_info("BUNDLE_CANCEL bundle %d on link %s", br->bundleid(),
01069 event->link_.c_str());
01070
01071 actions_->cancel_bundle(br.object(), link);
01072 }
01073
01074
01075
01076 else {
01077 delete_bundle(br);
01078 }
01079 }
01080
01081
01082 void
01083 BundleDaemon::handle_bundle_cancelled(BundleSendCancelledEvent* event)
01084 {
01085 Bundle* bundle = event->bundleref_.object();
01086 LinkRef link = event->link_;
01087
01088 log_info("BUNDLE_CANCELLED id:%d -> %s (%s)",
01089 bundle->bundleid(),
01090 link->name(),
01091 link->nexthop());
01092
01093 log_debug("trying to find xmit blocks for bundle id:%d on link %s",
01094 bundle->bundleid(), link->name());
01095 BlockInfoVec* blocks = bundle->xmit_blocks()->find_blocks(link);
01096
01097
01098
01099
01100
01101
01102
01103 if (blocks == NULL)
01104 {
01105 log_info("received a redundant/conflicting bundle_cancelled event "
01106 "about bundle id:%d -> %s (%s)",
01107 bundle->bundleid(),
01108 link->name(),
01109 link->nexthop());
01110 return;
01111 }
01112
01113
01114
01115
01116
01117 if (link->queue()->contains(bundle))
01118 {
01119 log_warn("cancelled bundle id:%d still on link %s queue",
01120 bundle->bundleid(), link->name());
01121 }
01122
01123
01124
01125
01126
01127 if (link->inflight()->contains(bundle))
01128 {
01129 log_warn("cancelled bundle id:%d still on link %s inflight list",
01130 bundle->bundleid(), link->name());
01131 }
01132
01133
01134
01135
01136
01137 link->stats()->bundles_cancelled_++;
01138
01139
01140
01141
01142
01143 log_debug("trying to delete xmit blocks for bundle id:%d on link %s",
01144 bundle->bundleid(), link->name());
01145 BundleProtocol::delete_blocks(bundle, link);
01146 blocks = NULL;
01147
01148
01149
01150
01151 log_debug("trying to update the forwarding log for "
01152 "bundle id:%d on link %s to state CANCELLED",
01153 bundle->bundleid(), link->name());
01154 bundle->fwdlog()->update(link, ForwardingInfo::CANCELLED);
01155 }
01156
01157
01158 void
01159 BundleDaemon::handle_bundle_inject(BundleInjectRequest* event)
01160 {
01161
01162
01163
01164
01165
01166
01167
01168
01169
01170 EndpointID src(event->src_);
01171 EndpointID dest(event->dest_);
01172 if ((! src.valid()) || (! dest.valid())) return;
01173
01174
01175
01176 const RegistrationTable* reg_table =
01177 BundleDaemon::instance()->reg_table();
01178 std::string base_reg_str = src.uri().scheme() + "://" + src.uri().host();
01179
01180 if (!reg_table->get(EndpointIDPattern(base_reg_str)) &&
01181 src != EndpointID::NULL_EID()) {
01182 log_err("this node is not a member of the injected bundle's source "
01183 "EID (%s)", src.str().c_str());
01184 return;
01185 }
01186
01187
01188
01189 Bundle* bundle = new Bundle(params_.injected_bundles_in_memory_ ?
01190 BundlePayload::MEMORY : BundlePayload::DISK);
01191
01192 bundle->mutable_source()->assign(src);
01193 bundle->mutable_dest()->assign(dest);
01194
01195 if (! bundle->mutable_replyto()->assign(event->replyto_))
01196 bundle->mutable_replyto()->assign(EndpointID::NULL_EID());
01197
01198 if (! bundle->mutable_custodian()->assign(event->custodian_))
01199 bundle->mutable_custodian()->assign(EndpointID::NULL_EID());
01200
01201
01202 bundle->set_priority(event->priority_);
01203
01204
01205
01206 if(event->expiration_ == 0)
01207 bundle->set_expiration(300);
01208 else
01209 bundle->set_expiration(event->expiration_);
01210
01211
01212 bundle->mutable_payload()->
01213 replace_with_file(event->payload_file_.c_str());
01214 log_debug("bundle payload size after replace_with_file(): %zd",
01215 bundle->payload().length());
01216 oasys::IO::unlink(event->payload_file_.c_str(), logpath_);
01217
01218
01219
01220
01221
01222
01223 bool is_local = check_local_delivery(bundle, !bundle->is_fragment());
01224
01225
01226
01227
01228 if (bundle->is_fragment() && is_local) {
01229 log_debug("deferring delivery of injected bundle *%p "
01230 "since bundle is a fragment", bundle);
01231 fragmentmgr_->process_for_reassembly(bundle);
01232 }
01233
01234
01235
01236
01237
01238
01239 if (add_to_pending(bundle, 0))
01240 BundleDaemon::post(new BundleInjectedEvent(bundle, event->request_id_));
01241
01242 ++stats_.injected_bundles_;
01243 }
01244
01245
01246 void
01247 BundleDaemon::handle_bundle_query(BundleQueryRequest*)
01248 {
01249 BundleDaemon::post_at_head(new BundleReportEvent());
01250 }
01251
01252
01253 void
01254 BundleDaemon::handle_bundle_report(BundleReportEvent*)
01255 {
01256 }
01257
01258
01259 void
01260 BundleDaemon::handle_bundle_attributes_query(BundleAttributesQueryRequest* request)
01261 {
01262 BundleRef &br = request->bundle_;
01263 if (! br.object()) return;
01264
01265 log_debug(
01266 "BundleDaemon::handle_bundle_attributes_query: query %s, bundle *%p",
01267 request->query_id_.c_str(), br.object());
01268
01269
01270
01271 BundleDaemon::post(
01272 new BundleAttributesReportEvent(request->query_id_,
01273 br,
01274 request->attribute_names_,
01275 request->metadata_blocks_));
01276 }
01277
01278
01279 void
01280 BundleDaemon::handle_bundle_attributes_report(BundleAttributesReportEvent* event)
01281 {
01282 (void)event;
01283 log_debug("BundleDaemon::handle_bundle_attributes_report: query %s",
01284 event->query_id_.c_str());
01285 }
01286
01287
01288 void
01289 BundleDaemon::handle_registration_added(RegistrationAddedEvent* event)
01290 {
01291 Registration* registration = event->registration_;
01292 log_info("REGISTRATION_ADDED %d %s",
01293 registration->regid(), registration->endpoint().c_str());
01294
01295 if (!reg_table_->add(registration,
01296 (event->source_ == EVENTSRC_APP) ? true : false))
01297 {
01298 log_err("error adding registration %d to table",
01299 registration->regid());
01300 }
01301
01302 oasys::ScopeLock l(pending_bundles_->lock(),
01303 "BundleDaemon::handle_registration_added");
01304 BundleList::iterator iter;
01305 for (iter = pending_bundles_->begin();
01306 iter != pending_bundles_->end();
01307 ++iter)
01308 {
01309 Bundle* bundle = *iter;
01310
01311 if (!bundle->is_fragment() &&
01312 registration->endpoint().match(bundle->dest())) {
01313 deliver_to_registration(bundle, registration);
01314 }
01315 }
01316 }
01317
01318
01319 void
01320 BundleDaemon::handle_registration_removed(RegistrationRemovedEvent* event)
01321 {
01322 Registration* registration = event->registration_;
01323 log_info("REGISTRATION_REMOVED %d %s",
01324 registration->regid(), registration->endpoint().c_str());
01325
01326 if (!reg_table_->del(registration->regid())) {
01327 log_err("error removing registration %d from table",
01328 registration->regid());
01329 return;
01330 }
01331
01332 post(new RegistrationDeleteRequest(registration));
01333 }
01334
01335
01336 void
01337 BundleDaemon::handle_registration_expired(RegistrationExpiredEvent* event)
01338 {
01339 Registration* registration = event->registration_;
01340
01341 if (reg_table_->get(registration->regid()) == NULL) {
01342
01343 log_err("REGISTRATION_EXPIRED -- dead regid %d", registration->regid());
01344 return;
01345 }
01346
01347 registration->set_expired(true);
01348
01349 if (registration->active()) {
01350
01351
01352
01353 log_info("REGISTRATION_EXPIRED %d -- deferred until binding clears",
01354 registration->regid());
01355 } else {
01356
01357 log_info("REGISTRATION_EXPIRED %d", registration->regid());
01358 reg_table_->del(registration->regid());
01359 post_at_head(new RegistrationDeleteRequest(registration));
01360 }
01361 }
01362
01363
01364 void
01365 BundleDaemon::handle_registration_delete(RegistrationDeleteRequest* request)
01366 {
01367 log_info("REGISTRATION_DELETE %d", request->registration_->regid());
01368 delete request->registration_;
01369 }
01370
01371
01372 void
01373 BundleDaemon::handle_link_created(LinkCreatedEvent* event)
01374 {
01375 LinkRef link = event->link_;
01376 ASSERT(link != NULL);
01377
01378 if (link->isdeleted()) {
01379 log_warn("BundleDaemon::handle_link_created: "
01380 "link %s deleted prior to full creation", link->name());
01381 event->daemon_only_ = true;
01382 return;
01383 }
01384
01385 log_info("LINK_CREATED *%p", link.object());
01386 }
01387
01388
01389 void
01390 BundleDaemon::handle_link_deleted(LinkDeletedEvent* event)
01391 {
01392 LinkRef link = event->link_;
01393 ASSERT(link != NULL);
01394
01395 log_info("LINK_DELETED *%p", link.object());
01396 }
01397
01398
01399 void
01400 BundleDaemon::handle_link_available(LinkAvailableEvent* event)
01401 {
01402 LinkRef link = event->link_;
01403 ASSERT(link != NULL);
01404 ASSERT(link->isavailable());
01405
01406 if (link->isdeleted()) {
01407 log_warn("BundleDaemon::handle_link_available: "
01408 "link %s already deleted", link->name());
01409 event->daemon_only_ = true;
01410 return;
01411 }
01412
01413 log_info("LINK_AVAILABLE *%p", link.object());
01414 }
01415
01416
01417 void
01418 BundleDaemon::handle_link_unavailable(LinkUnavailableEvent* event)
01419 {
01420 LinkRef link = event->link_;
01421 ASSERT(link != NULL);
01422 ASSERT(!link->isavailable());
01423
01424 log_info("LINK UNAVAILABLE *%p", link.object());
01425 }
01426
01427
01428 void
01429 BundleDaemon::handle_link_state_change_request(LinkStateChangeRequest* request)
01430 {
01431 LinkRef link = request->link_;
01432 if (link == NULL) {
01433 log_warn("LINK_STATE_CHANGE_REQUEST received invalid link");
01434 return;
01435 }
01436
01437 Link::state_t new_state = Link::state_t(request->state_);
01438 Link::state_t old_state = Link::state_t(request->old_state_);
01439 int reason = request->reason_;
01440
01441 if (link->isdeleted() && new_state != Link::CLOSED) {
01442 log_warn("BundleDaemon::handle_link_state_change_request: "
01443 "link %s already deleted; cannot change link state to %s",
01444 link->name(), Link::state_to_str(new_state));
01445 return;
01446 }
01447
01448 if (link->contact() != request->contact_) {
01449 log_warn("stale LINK_STATE_CHANGE_REQUEST [%s -> %s] (%s) for "
01450 "link *%p: contact %p != current contact %p",
01451 Link::state_to_str(old_state), Link::state_to_str(new_state),
01452 ContactEvent::reason_to_str(reason), link.object(),
01453 request->contact_.object(), link->contact().object());
01454 return;
01455 }
01456
01457 log_info("LINK_STATE_CHANGE_REQUEST [%s -> %s] (%s) for link *%p",
01458 Link::state_to_str(old_state), Link::state_to_str(new_state),
01459 ContactEvent::reason_to_str(reason), link.object());
01460
01461
01462 oasys::ScopeLock l;
01463 if (new_state == Link::OPEN)
01464 {
01465 l.set_lock(contactmgr_->lock(), "BundleDaemon::handle_link_state_change_request");
01466 }
01467
01468 switch(new_state) {
01469 case Link::UNAVAILABLE:
01470 if (link->state() != Link::AVAILABLE) {
01471 log_err("LINK_STATE_CHANGE_REQUEST *%p: "
01472 "tried to set state UNAVAILABLE in state %s",
01473 link.object(), Link::state_to_str(link->state()));
01474 return;
01475 }
01476 link->set_state(new_state);
01477 post_at_head(new LinkUnavailableEvent(link,
01478 ContactEvent::reason_t(reason)));
01479 break;
01480
01481 case Link::AVAILABLE:
01482 if (link->state() == Link::UNAVAILABLE) {
01483 link->set_state(Link::AVAILABLE);
01484
01485 } else {
01486 log_err("LINK_STATE_CHANGE_REQUEST *%p: "
01487 "tried to set state AVAILABLE in state %s",
01488 link.object(), Link::state_to_str(link->state()));
01489 return;
01490 }
01491
01492 post_at_head(new LinkAvailableEvent(link,
01493 ContactEvent::reason_t(reason)));
01494 break;
01495
01496 case Link::OPENING:
01497 case Link::OPEN:
01498
01499 if (link->state() == Link::UNAVAILABLE) {
01500 link->set_state(Link::AVAILABLE);
01501 }
01502 actions_->open_link(link);
01503 break;
01504
01505 case Link::CLOSED:
01506
01507
01508
01509 if (! link->isopen() && ! link->isopening()) {
01510 log_err("LINK_STATE_CHANGE_REQUEST *%p: "
01511 "setting state CLOSED (%s) in unexpected state %s",
01512 link.object(), ContactEvent::reason_to_str(reason),
01513 Link::state_to_str(link->state()));
01514 break;
01515 }
01516
01517
01518 if (link->isopen()) {
01519 ASSERT(link->contact() != NULL);
01520 post_at_head(new ContactDownEvent(link->contact(),
01521 ContactEvent::reason_t(reason)));
01522 }
01523
01524
01525 actions_->close_link(link);
01526
01527
01528
01529 if (reason == ContactEvent::IDLE) {
01530 link->set_state(Link::AVAILABLE);
01531 } else {
01532 link->set_state(Link::UNAVAILABLE);
01533 post_at_head(new LinkUnavailableEvent(link,
01534 ContactEvent::reason_t(reason)));
01535 }
01536
01537 break;
01538
01539 default:
01540 PANIC("unhandled state %s", Link::state_to_str(new_state));
01541 }
01542 }
01543
01544
01545 void
01546 BundleDaemon::handle_link_create(LinkCreateRequest* request)
01547 {
01548
01549 ContactManager* cm = BundleDaemon::instance()->contactmgr();
01550 oasys::ScopeLock l(cm->lock(), "BundleDaemon::handle_link_create");
01551
01552 LinkRef linkCheck = cm->find_link(request->name_.c_str());
01553 if(linkCheck != NULL)
01554 {
01555 log_err( "Link already exists with name %s, aborting create", request->name_.c_str());
01556 request->daemon_only_ = true;
01557 return;
01558 }
01559
01560 std::string nexthop("");
01561
01562 int argc = request->parameters_.size();
01563 char* argv[argc];
01564 AttributeVector::iterator iter;
01565 int i = 0;
01566 for (iter = request->parameters_.begin();
01567 iter != request->parameters_.end();
01568 iter++)
01569 {
01570 if (iter->name() == "nexthop") {
01571 nexthop = iter->string_val();
01572 }
01573 else {
01574 std::string arg = iter->name() + iter->string_val();
01575 argv[i] = new char[arg.length()+1];
01576 memcpy(argv[i], arg.c_str(), arg.length()+1);
01577 i++;
01578 }
01579 }
01580 argc = i+1;
01581
01582 const char *invalidp;
01583 LinkRef link = Link::create_link(request->name_, request->link_type_,
01584 request->cla_, nexthop.c_str(), argc,
01585 (const char**)argv, &invalidp);
01586 for (i = 0; i < argc; i++) {
01587 delete argv[i];
01588 }
01589
01590 if (link == NULL) {
01591 log_err("LINK_CREATE %s failed", request->name_.c_str());
01592 return;
01593 }
01594 if (!contactmgr_->add_new_link(link)) {
01595 log_err("LINK_CREATE %s failed, already exists",
01596 request->name_.c_str());
01597 link->delete_link();
01598 return;
01599 }
01600 log_info("LINK_CREATE %s: *%p", request->name_.c_str(), link.object());
01601 }
01602
01603
01604 void
01605 BundleDaemon::handle_link_delete(LinkDeleteRequest* request)
01606 {
01607 LinkRef link = request->link_;
01608 ASSERT(link != NULL);
01609
01610 log_info("LINK_DELETE *%p", link.object());
01611 if (!link->isdeleted()) {
01612 contactmgr_->del_link(link);
01613 }
01614 }
01615
01616
01617 void
01618 BundleDaemon::handle_link_reconfigure(LinkReconfigureRequest *request)
01619 {
01620 LinkRef link = request->link_;
01621 ASSERT(link != NULL);
01622
01623 link->reconfigure_link(request->parameters_);
01624 log_info("LINK_RECONFIGURE *%p", link.object());
01625 }
01626
01627
01628 void
01629 BundleDaemon::handle_link_query(LinkQueryRequest*)
01630 {
01631 BundleDaemon::post_at_head(new LinkReportEvent());
01632 }
01633
01634
01635 void
01636 BundleDaemon::handle_link_report(LinkReportEvent*)
01637 {
01638 }
01639
01640
01641 void
01642 BundleDaemon::handle_bundle_queued_query(BundleQueuedQueryRequest* request)
01643 {
01644 LinkRef link = request->link_;
01645 ASSERT(link != NULL);
01646 ASSERT(link->clayer() != NULL);
01647
01648 log_debug("BundleDaemon::handle_bundle_queued_query: "
01649 "query %s, checking if bundle *%p is queued on link *%p",
01650 request->query_id_.c_str(),
01651 request->bundle_.object(), link.object());
01652
01653 bool is_queued = request->bundle_->is_queued_on(link->queue());
01654 BundleDaemon::post(
01655 new BundleQueuedReportEvent(request->query_id_, is_queued));
01656 }
01657
01658
01659 void
01660 BundleDaemon::handle_bundle_queued_report(BundleQueuedReportEvent* event)
01661 {
01662 (void)event;
01663 log_debug("BundleDaemon::handle_bundle_queued_report: query %s, %s",
01664 event->query_id_.c_str(),
01665 (event->is_queued_? "true" : "false"));
01666 }
01667
01668
01669 void
01670 BundleDaemon::handle_eid_reachable_query(EIDReachableQueryRequest* request)
01671 {
01672 Interface *iface = request->iface_;
01673 ASSERT(iface != NULL);
01674 ASSERT(iface->clayer() != NULL);
01675
01676 log_debug("BundleDaemon::handle_eid_reachable_query: query %s, "
01677 "checking if endpoint %s is reachable via interface *%p",
01678 request->query_id_.c_str(), request->endpoint_.c_str(), iface);
01679
01680 iface->clayer()->is_eid_reachable(request->query_id_,
01681 iface,
01682 request->endpoint_);
01683 }
01684
01685
01686 void
01687 BundleDaemon::handle_eid_reachable_report(EIDReachableReportEvent* event)
01688 {
01689 (void)event;
01690 log_debug("BundleDaemon::handle_eid_reachable_report: query %s, %s",
01691 event->query_id_.c_str(),
01692 (event->is_reachable_? "true" : "false"));
01693 }
01694
01695
01696 void
01697 BundleDaemon::handle_link_attribute_changed(LinkAttributeChangedEvent *event)
01698 {
01699 LinkRef link = event->link_;
01700
01701 if (link->isdeleted()) {
01702 log_debug("BundleDaemon::handle_link_attribute_changed: "
01703 "link %s deleted", link->name());
01704 event->daemon_only_ = true;
01705 return;
01706 }
01707
01708
01709 AttributeVector::iterator iter;
01710 for (iter = event->attributes_.begin();
01711 iter != event->attributes_.end();
01712 iter++)
01713 {
01714 if (iter->name() == "nexthop") {
01715 link->set_nexthop(iter->string_val());
01716 }
01717 else if (iter->name() == "how_reliable") {
01718 link->stats()->reliability_ = iter->u_int_val();
01719 }
01720 else if (iter->name() == "how_available") {
01721 link->stats()->availability_ = iter->u_int_val();
01722 }
01723 }
01724 log_info("LINK_ATTRIB_CHANGED *%p", link.object());
01725 }
01726
01727
01728 void
01729 BundleDaemon::handle_link_attributes_query(LinkAttributesQueryRequest* request)
01730 {
01731 LinkRef link = request->link_;
01732 ASSERT(link != NULL);
01733 ASSERT(link->clayer() != NULL);
01734
01735 log_debug("BundleDaemon::handle_link_attributes_query: query %s, link *%p",
01736 request->query_id_.c_str(), link.object());
01737
01738 link->clayer()->query_link_attributes(request->query_id_,
01739 link,
01740 request->attribute_names_);
01741 }
01742
01743
01744 void
01745 BundleDaemon::handle_link_attributes_report(LinkAttributesReportEvent* event)
01746 {
01747 (void)event;
01748 log_debug("BundleDaemon::handle_link_attributes_report: query %s",
01749 event->query_id_.c_str());
01750 }
01751
01752
01753 void
01754 BundleDaemon::handle_iface_attributes_query(
01755 IfaceAttributesQueryRequest* request)
01756 {
01757 Interface *iface = request->iface_;
01758 ASSERT(iface != NULL);
01759 ASSERT(iface->clayer() != NULL);
01760
01761 log_debug("BundleDaemon::handle_iface_attributes_query: "
01762 "query %s, interface *%p", request->query_id_.c_str(), iface);
01763
01764 iface->clayer()->query_iface_attributes(request->query_id_,
01765 iface,
01766 request->attribute_names_);
01767 }
01768
01769
01770 void
01771 BundleDaemon::handle_iface_attributes_report(IfaceAttributesReportEvent* event)
01772 {
01773 (void)event;
01774 log_debug("BundleDaemon::handle_iface_attributes_report: query %s",
01775 event->query_id_.c_str());
01776 }
01777
01778
01779 void
01780 BundleDaemon::handle_cla_parameters_query(CLAParametersQueryRequest* request)
01781 {
01782 ASSERT(request->cla_ != NULL);
01783
01784 log_debug("BundleDaemon::handle_cla_parameters_query: "
01785 "query %s, convergence layer %s",
01786 request->query_id_.c_str(), request->cla_->name());
01787
01788 request->cla_->query_cla_parameters(request->query_id_,
01789 request->parameter_names_);
01790 }
01791
01792
01793 void
01794 BundleDaemon::handle_cla_parameters_report(CLAParametersReportEvent* event)
01795 {
01796 (void)event;
01797 log_debug("Bundledaemon::handle_cla_parameters_report: query %s",
01798 event->query_id_.c_str());
01799 }
01800
01801
01802 void
01803 BundleDaemon::handle_contact_up(ContactUpEvent* event)
01804 {
01805 const ContactRef& contact = event->contact_;
01806 LinkRef link = contact->link();
01807 ASSERT(link != NULL);
01808
01809 if (link->isdeleted()) {
01810 log_debug("BundleDaemon::handle_contact_up: "
01811 "cannot bring contact up on deleted link %s", link->name());
01812 event->daemon_only_ = true;
01813 return;
01814 }
01815
01816
01817 oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::handle_contact_up");
01818 if (link->contact() != contact)
01819 {
01820 log_info("CONTACT_UP *%p (contact %p) being ignored (old contact)",
01821 link.object(), contact.object());
01822 return;
01823 }
01824
01825 log_info("CONTACT_UP *%p (contact %p)", link.object(), contact.object());
01826 link->set_state(Link::OPEN);
01827 link->stats_.contacts_++;
01828 }
01829
01830
01831 void
01832 BundleDaemon::handle_contact_down(ContactDownEvent* event)
01833 {
01834 const ContactRef& contact = event->contact_;
01835 int reason = event->reason_;
01836 LinkRef link = contact->link();
01837 ASSERT(link != NULL);
01838
01839 log_info("CONTACT_DOWN *%p (%s) (contact %p)",
01840 link.object(), ContactEvent::reason_to_str(reason),
01841 contact.object());
01842
01843
01844 link->stats_.uptime_ += (contact->start_time().elapsed_ms() / 1000);
01845 }
01846
01847
01848 void
01849 BundleDaemon::handle_contact_query(ContactQueryRequest*)
01850 {
01851 BundleDaemon::post_at_head(new ContactReportEvent());
01852 }
01853
01854
01855 void
01856 BundleDaemon::handle_contact_report(ContactReportEvent*)
01857 {
01858 }
01859
01860
01861 void
01862 BundleDaemon::handle_reassembly_completed(ReassemblyCompletedEvent* event)
01863 {
01864 log_info("REASSEMBLY_COMPLETED bundle id %d",
01865 event->bundle_->bundleid());
01866
01867
01868 BundleRef ref("BundleDaemon::handle_reassembly_completed temporary");
01869 while ((ref = event->fragments_.pop_front()) != NULL) {
01870 delete_bundle(ref);
01871 }
01872
01873
01874 post_at_head(new BundleReceivedEvent(event->bundle_.object(),
01875 EVENTSRC_FRAGMENTATION));
01876 }
01877
01878
01879
01880 void
01881 BundleDaemon::handle_route_add(RouteAddEvent* event)
01882 {
01883 log_info("ROUTE_ADD *%p", event->entry_);
01884 }
01885
01886
01887 void
01888 BundleDaemon::handle_route_del(RouteDelEvent* event)
01889 {
01890 log_info("ROUTE_DEL %s", event->dest_.c_str());
01891 }
01892
01893
01894 void
01895 BundleDaemon::handle_route_query(RouteQueryRequest*)
01896 {
01897 BundleDaemon::post_at_head(new RouteReportEvent());
01898 }
01899
01900
01901 void
01902 BundleDaemon::handle_route_report(RouteReportEvent*)
01903 {
01904 }
01905
01906
01907 void
01908 BundleDaemon::handle_custody_signal(CustodySignalEvent* event)
01909 {
01910 log_info("CUSTODY_SIGNAL: %s %u.%u %s (%s)",
01911 event->data_.orig_source_eid_.c_str(),
01912 event->data_.orig_creation_tv_.seconds_,
01913 event->data_.orig_creation_tv_.seqno_,
01914 event->data_.succeeded_ ? "succeeded" : "failed",
01915 CustodySignal::reason_to_str(event->data_.reason_));
01916
01917 GbofId gbof_id;
01918 gbof_id.source_ = event->data_.orig_source_eid_;
01919 gbof_id.creation_ts_ = event->data_.orig_creation_tv_;
01920 gbof_id.is_fragment_
01921 = event->data_.admin_flags_ & BundleProtocol::ADMIN_IS_FRAGMENT;
01922 gbof_id.frag_length_
01923 = gbof_id.is_fragment_ ? event->data_.orig_frag_length_ : 0;
01924 gbof_id.frag_offset_
01925 = gbof_id.is_fragment_ ? event->data_.orig_frag_offset_ : 0;
01926
01927 BundleRef orig_bundle =
01928 custody_bundles_->find(gbof_id);
01929
01930 if (orig_bundle == NULL) {
01931 log_warn("received custody signal for bundle %s %u.%u "
01932 "but don't have custody",
01933 event->data_.orig_source_eid_.c_str(),
01934 event->data_.orig_creation_tv_.seconds_,
01935 event->data_.orig_creation_tv_.seqno_);
01936 return;
01937 }
01938
01939
01940
01941 bool release = event->data_.succeeded_;
01942 if ((event->data_.succeeded_ == false) &&
01943 (event->data_.reason_ == BundleProtocol::CUSTODY_REDUNDANT_RECEPTION))
01944 {
01945 log_notice("releasing custody for bundle %s %u.%u "
01946 "due to redundant reception",
01947 event->data_.orig_source_eid_.c_str(),
01948 event->data_.orig_creation_tv_.seconds_,
01949 event->data_.orig_creation_tv_.seqno_);
01950
01951 release = true;
01952 }
01953
01954 if (release) {
01955 release_custody(orig_bundle.object());
01956 try_to_delete(orig_bundle);
01957 }
01958 }
01959
01960
01961 void
01962 BundleDaemon::handle_custody_timeout(CustodyTimeoutEvent* event)
01963 {
01964 Bundle* bundle = event->bundle_.object();
01965 LinkRef link = event->link_;
01966 ASSERT(link != NULL);
01967
01968 log_info("CUSTODY_TIMEOUT *%p, *%p", bundle, link.object());
01969
01970
01971 oasys::ScopeLock l(bundle->lock(), "BundleDaemon::handle_custody_timeout");
01972
01973 bool found = false;
01974 CustodyTimer* timer = NULL;
01975 CustodyTimerVec::iterator iter;
01976 for (iter = bundle->custody_timers()->begin();
01977 iter != bundle->custody_timers()->end();
01978 ++iter)
01979 {
01980 timer = *iter;
01981 if (timer->link_ == link)
01982 {
01983 if (timer->pending()) {
01984 log_err("multiple pending custody timers for link %s",
01985 link->nexthop());
01986 continue;
01987 }
01988
01989 found = true;
01990 bundle->custody_timers()->erase(iter);
01991 break;
01992 }
01993 }
01994
01995 if (!found) {
01996 log_err("custody timeout for *%p *%p: timer not found in bundle list",
01997 bundle, link.object());
01998 return;
01999 }
02000
02001 ASSERT(!timer->cancelled());
02002
02003 if (!pending_bundles_->contains(bundle)) {
02004 log_err("custody timeout for *%p *%p: bundle not in pending list",
02005 bundle, link.object());
02006 }
02007
02008
02009
02010
02011
02012 bool ok = bundle->fwdlog()->update(link, ForwardingInfo::CUSTODY_TIMEOUT);
02013 if (!ok) {
02014 log_err("custody timeout can't find ForwardingLog entry for link *%p",
02015 link.object());
02016 }
02017
02018 delete timer;
02019
02020
02021
02022 }
02023
02024
02025 void
02026 BundleDaemon::handle_shutdown_request(ShutdownRequest* request)
02027 {
02028 shutting_down_ = true;
02029
02030 (void)request;
02031
02032 log_notice("Received shutdown request");
02033
02034 oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::handle_shutdown");
02035
02036 const LinkSet* links = contactmgr_->links();
02037 LinkSet::const_iterator iter;
02038
02039
02040 for (iter = links->begin(); iter != links->end(); ++iter)
02041 {
02042 LinkRef link = *iter;
02043 if (link->isopen()) {
02044 log_debug("Shutdown: closing link *%p\n", link.object());
02045 link->close();
02046 }
02047 }
02048
02049
02050 ConvergenceLayer::shutdown_clayers();
02051
02052
02053 if (rtr_shutdown_proc_) {
02054 (*rtr_shutdown_proc_)(rtr_shutdown_data_);
02055 }
02056
02057
02058 if (app_shutdown_proc_) {
02059 (*app_shutdown_proc_)(app_shutdown_data_);
02060 }
02061
02062
02063 set_should_stop();
02064
02065
02066
02067 }
02068
02069
02070 void
02071 BundleDaemon::handle_cla_set_params(CLASetParamsRequest* request)
02072 {
02073 ASSERT(request->cla_ != NULL);
02074 request->cla_->set_cla_parameters(request->parameters_);
02075 }
02076
02077
02078 void
02079 BundleDaemon::handle_status_request(StatusRequest* request)
02080 {
02081 (void)request;
02082 log_info("Received status request");
02083 }
02084
02085
02086 void
02087 BundleDaemon::event_handlers_completed(BundleEvent* event)
02088 {
02089 log_debug("event handlers completed for (%p) %s", event, event->type_str());
02090
02096 BundleRef bundle("BundleDaemon::event_handlers_completed");
02097 if (event->type_ == BUNDLE_RECEIVED) {
02098 bundle = ((BundleReceivedEvent*)event)->bundleref_;
02099 } else if (event->type_ == BUNDLE_TRANSMITTED) {
02100 bundle = ((BundleTransmittedEvent*)event)->bundleref_;
02101 } else if (event->type_ == BUNDLE_DELIVERED) {
02102 bundle = ((BundleTransmittedEvent*)event)->bundleref_;
02103 }
02104
02105 if (bundle != NULL) {
02106 try_to_delete(bundle);
02107 }
02108
02113 if (event->type_ == BUNDLE_EXPIRED) {
02114 bundle = ((BundleExpiredEvent*)event)->bundleref_.object();
02115 size_t num_mappings = bundle->num_mappings();
02116 if (num_mappings != 0) {
02117 log_warn("expired bundle *%p still has %zu mappings",
02118 bundle.object(), num_mappings);
02119 }
02120 }
02121 }
02122
02123
02124 bool
02125 BundleDaemon::add_to_pending(Bundle* bundle, bool add_to_store)
02126 {
02127 log_debug("adding bundle *%p to pending list", bundle);
02128
02129 pending_bundles_->push_back(bundle);
02130
02131 if (add_to_store) {
02132 bundle->set_in_datastore(true);
02133 actions_->store_add(bundle);
02134 }
02135
02136 struct timeval now;
02137 gettimeofday(&now, 0);
02138
02139
02140 struct timeval expiration_time;
02141 expiration_time.tv_sec =
02142 BundleTimestamp::TIMEVAL_CONVERSION +
02143 bundle->creation_ts().seconds_ +
02144 bundle->expiration();
02145
02146 expiration_time.tv_usec = now.tv_usec;
02147
02148 long int when = expiration_time.tv_sec - now.tv_sec;
02149
02150 bool ok_to_route = true;
02151
02152 if (when > 0) {
02153 log_debug_p("/dtn/bundle/expiration",
02154 "scheduling expiration for bundle id %d at %u.%u "
02155 "(in %lu seconds)",
02156 bundle->bundleid(),
02157 (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec,
02158 when);
02159 } else {
02160 log_warn_p("/dtn/bundle/expiration",
02161 "scheduling IMMEDIATE expiration for bundle id %d: "
02162 "[expiration %u, creation time %u.%u, offset %u, now %u.%u]",
02163 bundle->bundleid(), bundle->expiration(),
02164 (u_int)bundle->creation_ts().seconds_,
02165 (u_int)bundle->creation_ts().seqno_,
02166 BundleTimestamp::TIMEVAL_CONVERSION,
02167 (u_int)now.tv_sec, (u_int)now.tv_usec);
02168 expiration_time = now;
02169 ok_to_route = false;
02170 }
02171
02172 bundle->set_expiration_timer(new ExpirationTimer(bundle));
02173 bundle->expiration_timer()->schedule_at(&expiration_time);
02174
02175 return ok_to_route;
02176 }
02177
02178
02179 bool
02180 BundleDaemon::delete_from_pending(const BundleRef& bundle)
02181 {
02182 log_debug("removing bundle *%p from pending list", bundle.object());
02183
02184
02185
02186 if (bundle->expiration_timer()) {
02187 log_debug("cancelling expiration timer for bundle id %d",
02188 bundle->bundleid());
02189
02190 bool cancelled = bundle->expiration_timer()->cancel();
02191 if (!cancelled) {
02192 log_crit("unexpected error cancelling expiration timer "
02193 "for bundle *%p", bundle.object());
02194 }
02195
02196 bundle->expiration_timer()->bundleref_.release();
02197 bundle->set_expiration_timer(NULL);
02198 }
02199
02200
02201
02202
02203 log_debug("pending_bundles size %zd", pending_bundles_->size());
02204
02205 oasys::Time now;
02206 now.get_time();
02207
02208 bool erased = pending_bundles_->erase(bundle);
02209
02210 log_debug("BundleDaemon: pending_bundles erasure took %u ms",
02211 now.elapsed_ms());
02212
02213 if (!erased) {
02214 log_err("unexpected error removing bundle from pending list");
02215 }
02216
02217 return erased;
02218 }
02219
02220
02221 bool
02222 BundleDaemon::try_to_delete(const BundleRef& bundle)
02223 {
02224
02225
02226
02227
02228
02229
02230
02231
02232 log_debug("pending_bundles size %zd", pending_bundles_->size());
02233 if (! bundle->is_queued_on(pending_bundles_))
02234 {
02235 if (bundle->expired()) {
02236 log_debug("try_to_delete(*%p): bundle already expired",
02237 bundle.object());
02238 return false;
02239 }
02240
02241 log_err("try_to_delete(*%p): bundle not in pending list!",
02242 bundle.object());
02243 return false;
02244 }
02245
02246 if (!params_.early_deletion_) {
02247 log_debug("try_to_delete(*%p): not deleting because "
02248 "early deletion disabled",
02249 bundle.object());
02250 return false;
02251 }
02252
02253 if (! router_->can_delete_bundle(bundle)) {
02254 log_debug("try_to_delete(*%p): not deleting because "
02255 "router wants to keep bundle",
02256 bundle.object());
02257 return false;
02258 }
02259
02260 return delete_bundle(bundle, BundleProtocol::REASON_NO_ADDTL_INFO);
02261 }
02262
02263
02264 bool
02265 BundleDaemon::delete_bundle(const BundleRef& bundleref,
02266 status_report_reason_t reason)
02267 {
02268 Bundle* bundle = bundleref.object();
02269
02270 ++stats_.deleted_bundles_;
02271
02272
02273
02274
02275 bool send_status = (bundle->local_custody() ||
02276 (bundle->deletion_rcpt() &&
02277 reason != BundleProtocol::REASON_NO_ADDTL_INFO));
02278
02279
02280 if (bundle->local_custody()) {
02281 release_custody(bundle);
02282 }
02283
02284
02285
02286
02287
02288
02289 if (bundle->is_fragment()) {
02290 fragmentmgr_->delete_fragment(bundle);
02291 }
02292
02293
02294 router_->delete_bundle(bundleref);
02295
02296
02297 log_debug("pending_bundles size %zd", pending_bundles_->size());
02298 bool erased = true;
02299 if (bundle->is_queued_on(pending_bundles_)) {
02300 erased = delete_from_pending(bundleref);
02301 }
02302
02303 if (erased && send_status) {
02304 generate_status_report(bundle, BundleStatusReport::STATUS_DELETED, reason);
02305 }
02306
02307
02308 oasys::Time now;
02309 now.get_time();
02310 oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::delete_bundle");
02311 const LinkSet* links = contactmgr_->links();
02312 LinkSet::const_iterator iter;
02313 for (iter = links->begin(); iter != links->end(); ++iter) {
02314 const LinkRef& link = *iter;
02315
02316 if (link->queue()->contains(bundle) ||
02317 link->inflight()->contains(bundle))
02318 {
02319 actions_->cancel_bundle(bundle, link);
02320 }
02321 }
02322
02323
02324
02325
02326 log_debug("BundleDaemon: canceling deleted bundle on all links took %u ms",
02327 now.elapsed_ms());
02328
02329 return erased;
02330 }
02331
02332
02333 Bundle*
02334 BundleDaemon::find_duplicate(Bundle* b)
02335 {
02336 oasys::ScopeLock l(pending_bundles_->lock(),
02337 "BundleDaemon::find_duplicate");
02338 log_debug("pending_bundles size %zd", pending_bundles_->size());
02339 Bundle *found = NULL;
02340 BundleList::iterator iter;
02341 for (iter = pending_bundles_->begin();
02342 iter != pending_bundles_->end();
02343 ++iter)
02344 {
02345 Bundle* b2 = *iter;
02346
02347 if ((b->source().equals(b2->source())) &&
02348 (b->creation_ts().seconds_ == b2->creation_ts().seconds_) &&
02349 (b->creation_ts().seqno_ == b2->creation_ts().seqno_) &&
02350 (b->is_fragment() == b2->is_fragment()) &&
02351 (b->frag_offset() == b2->frag_offset()) &&
02352
02353 (b->payload().length() == b2->payload().length()))
02354 {
02355
02356 found = b2;
02357
02358
02359
02360
02361
02362
02363
02364 if (params_.suppress_duplicates_ || b2->local_custody()) {
02365 break;
02366 }
02367 }
02368 }
02369
02370 return found;
02371 }
02372
02373
02374 void
02375 BundleDaemon::handle_bundle_free(BundleFreeEvent* event)
02376 {
02377 Bundle* bundle = event->bundle_;
02378 event->bundle_ = NULL;
02379 ASSERT(bundle->refcount() == 0);
02380
02381 bundle->lock()->lock("BundleDaemon::handle_bundle_free");
02382
02383 if (bundle->in_datastore()) {
02384 log_debug("removing freed bundle from data store");
02385 actions_->store_del(bundle);
02386 }
02387 log_debug("deleting freed bundle");
02388 delete bundle;
02389 }
02390
02391
02392 void
02393 BundleDaemon::handle_event(BundleEvent* event)
02394 {
02395 dispatch_event(event);
02396
02397 if (! event->daemon_only_) {
02398
02399 router_->handle_event(event);
02400 contactmgr_->handle_event(event);
02401 }
02402
02403 event_handlers_completed(event);
02404
02405 stats_.events_processed_++;
02406
02407 if (event->processed_notifier_) {
02408 event->processed_notifier_->notify();
02409 }
02410 }
02411
02412
02413 void
02414 BundleDaemon::load_registrations()
02415 {
02416 admin_reg_ = new AdminRegistration();
02417 {
02418 RegistrationAddedEvent e(admin_reg_, EVENTSRC_ADMIN);
02419 handle_event(&e);
02420 }
02421
02422 EndpointID ping_eid(local_eid());
02423 bool ok = ping_eid.append_service_tag("ping");
02424 if (!ok) {
02425 log_crit("local eid (%s) scheme must be able to append service tags",
02426 local_eid().c_str());
02427 exit(1);
02428 }
02429
02430 ping_reg_ = new PingRegistration(ping_eid);
02431 {
02432 RegistrationAddedEvent e(ping_reg_, EVENTSRC_ADMIN);
02433 handle_event(&e);
02434 }
02435
02436 Registration* reg;
02437 RegistrationStore* reg_store = RegistrationStore::instance();
02438 RegistrationStore::iterator* iter = reg_store->new_iterator();
02439
02440 while (iter->next() == 0) {
02441 reg = reg_store->get(iter->cur_val());
02442 if (reg == NULL) {
02443 log_err("error loading registration %d from data store",
02444 iter->cur_val());
02445 continue;
02446 }
02447
02448 RegistrationAddedEvent e(reg, EVENTSRC_STORE);
02449 handle_event(&e);
02450 }
02451
02452 delete iter;
02453 }
02454
02455
02456 void
02457 BundleDaemon::load_bundles()
02458 {
02459 Bundle* bundle;
02460 BundleStore* bundle_store = BundleStore::instance();
02461 BundleStore::iterator* iter = bundle_store->new_iterator();
02462
02463 log_notice("loading bundles from data store");
02464
02465 u_int64_t total_size = 0;
02466
02467 std::vector<Bundle*> doa_bundles;
02468
02469 for (iter->begin(); iter->more(); iter->next()) {
02470 bundle = bundle_store->get(iter->cur_val());
02471
02472 if (bundle == NULL) {
02473 log_err("error loading bundle %d from data store",
02474 iter->cur_val());
02475 continue;
02476 }
02477
02478 total_size += bundle->durable_size();
02479
02480
02481
02482
02483 if (bundle->payload().location() != BundlePayload::DISK) {
02484 log_err("error loading payload for *%p from data store",
02485 bundle);
02486 doa_bundles.push_back(bundle);
02487 continue;
02488 }
02489
02490 BundleProtocol::reload_post_process(bundle);
02491
02492 BundleReceivedEvent e(bundle, EVENTSRC_STORE);
02493 handle_event(&e);
02494
02495
02496
02497
02498
02499 }
02500
02501 bundle_store->set_total_size(total_size);
02502
02503 delete iter;
02504
02505
02506 for (unsigned int i = 0; i < doa_bundles.size(); ++i) {
02507 actions_->store_del(doa_bundles[i]);
02508 delete doa_bundles[i];
02509 }
02510 }
02511
02512
02513 bool
02514 BundleDaemon::DaemonIdleExit::is_idle(const struct timeval& tv)
02515 {
02516 oasys::Time now(tv.tv_sec, tv.tv_usec);
02517 u_int elapsed = (now - BundleDaemon::instance()->last_event_).in_milliseconds();
02518
02519 BundleDaemon* d = BundleDaemon::instance();
02520 d->logf(oasys::LOG_DEBUG,
02521 "checking if is_idle -- last event was %u msecs ago",
02522 elapsed);
02523
02524
02525 if (elapsed + 500 > interval_ * 1000) {
02526 d->logf(oasys::LOG_NOTICE,
02527 "more than %u seconds since last event, "
02528 "shutting down daemon due to idle timer",
02529 interval_);
02530
02531 return true;
02532 } else {
02533 return false;
02534 }
02535 }
02536
02537
02538 void
02539 BundleDaemon::init_idle_shutdown(int interval)
02540 {
02541 idle_exit_ = new DaemonIdleExit(interval);
02542 }
02543
02544
02545 void
02546 BundleDaemon::run()
02547 {
02548 static const char* LOOP_LOG = "/dtn/bundle/daemon/loop";
02549
02550 if (! BundleTimestamp::check_local_clock()) {
02551 exit(1);
02552 }
02553
02554 router_ = BundleRouter::create_router(BundleRouter::config_.type_.c_str());
02555 router_->initialize();
02556
02557 load_registrations();
02558 load_bundles();
02559
02560 BundleEvent* event;
02561
02562 oasys::TimerSystem* timersys = oasys::TimerSystem::instance();
02563
02564 last_event_.get_time();
02565
02566 struct pollfd pollfds[2];
02567 struct pollfd* event_poll = &pollfds[0];
02568 struct pollfd* timer_poll = &pollfds[1];
02569
02570 event_poll->fd = eventq_->read_fd();
02571 event_poll->events = POLLIN;
02572
02573 timer_poll->fd = timersys->notifier()->read_fd();
02574 timer_poll->events = POLLIN;
02575
02576 while (1) {
02577 if (should_stop()) {
02578 log_debug("BundleDaemon: stopping");
02579 break;
02580 }
02581
02582 int timeout = timersys->run_expired_timers();
02583
02584 log_debug_p(LOOP_LOG,
02585 "BundleDaemon: checking eventq_->size() > 0, its size is %zu",
02586 eventq_->size());
02587
02588 if (eventq_->size() > 0) {
02589 bool ok = eventq_->try_pop(&event);
02590 ASSERT(ok);
02591
02592 oasys::Time now;
02593 now.get_time();
02594
02595
02596 if (now >= event->posted_time_) {
02597 oasys::Time in_queue;
02598 in_queue = now - event->posted_time_;
02599 if (in_queue.sec_ > 2) {
02600 log_warn_p(LOOP_LOG, "event %s was in queue for %u.%u seconds",
02601 event->type_str(), in_queue.sec_, in_queue.usec_);
02602 }
02603 } else {
02604 log_warn_p(LOOP_LOG, "time moved backwards: "
02605 "now %u.%u, event posted_time %u.%u",
02606 now.sec_, now.usec_,
02607 event->posted_time_.sec_, event->posted_time_.usec_);
02608 }
02609
02610
02611 log_debug_p(LOOP_LOG, "BundleDaemon: handling event %s",
02612 event->type_str());
02613
02614 handle_event(event);
02615
02616 int elapsed = now.elapsed_ms();
02617 if (elapsed > 2000) {
02618 log_warn_p(LOOP_LOG, "event %s took %u ms to process",
02619 event->type_str(), elapsed);
02620 }
02621
02622
02623 last_event_.get_time();
02624
02625 log_debug_p(LOOP_LOG, "BundleDaemon: deleting event %s",
02626 event->type_str());
02627
02628 delete event;
02629
02630 continue;
02631 }
02632
02633 pollfds[0].revents = 0;
02634 pollfds[1].revents = 0;
02635
02636 log_debug_p(LOOP_LOG, "BundleDaemon: poll_multiple waiting for %d ms",
02637 timeout);
02638 int cc = oasys::IO::poll_multiple(pollfds, 2, timeout);
02639 log_debug_p(LOOP_LOG, "poll returned %d", cc);
02640
02641 if (cc == oasys::IOTIMEOUT) {
02642 log_debug_p(LOOP_LOG, "poll timeout");
02643 continue;
02644
02645 } else if (cc <= 0) {
02646 log_err_p(LOOP_LOG, "unexpected return %d from poll_multiple!", cc);
02647 continue;
02648 }
02649
02650
02651
02652 if (event_poll->revents != 0) {
02653 log_debug_p(LOOP_LOG, "poll returned new event to handle");
02654 }
02655
02656
02657
02658
02659 if (timer_poll->revents != 0) {
02660 log_debug_p(LOOP_LOG, "poll returned new timers to handle");
02661 timersys->notifier()->clear();
02662 }
02663 }
02664 }
02665
02666 }