00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef HAVE_CONFIG_H
00018 # include <dtn-config.h>
00019 #endif
00020
00021 #include "BundleActions.h"
00022 #include "Bundle.h"
00023 #include "BundleDaemon.h"
00024 #include "BundleList.h"
00025 #include "conv_layers/ConvergenceLayer.h"
00026 #include "contacts/Link.h"
00027 #include "storage/BundleStore.h"
00028
00029 namespace dtn {
00030
00031
00032 void
00033 BundleActions::open_link(const LinkRef& link)
00034 {
00035 ASSERT(link != NULL);
00036 if (link->isdeleted()) {
00037 log_debug("BundleActions::open_link: "
00038 "cannot open deleted link %s", link->name());
00039 return;
00040 }
00041
00042 oasys::ScopeLock l(link->lock(), "BundleActions::open_link");
00043
00044 if (link->isopen() || link->contact() != NULL) {
00045 log_err("not opening link %s since already open", link->name());
00046 return;
00047 }
00048
00049 if (! link->isavailable()) {
00050 log_err("not opening link %s since not available", link->name());
00051 return;
00052 }
00053
00054 log_debug("BundleActions::open_link: opening link %s", link->name());
00055
00056 link->open();
00057 }
00058
00059
00060 void
00061 BundleActions::close_link(const LinkRef& link)
00062 {
00063 ASSERT(link != NULL);
00064
00065 if (! link->isopen() && ! link->isopening()) {
00066 log_err("not closing link %s since not open", link->name());
00067 return;
00068 }
00069
00070 log_debug("BundleActions::close_link: closing link %s", link->name());
00071
00072 link->close();
00073 ASSERT(link->contact() == NULL);
00074 }
00075
00076
00077 bool
00078 BundleActions::queue_bundle(Bundle* bundle, const LinkRef& link,
00079 ForwardingInfo::action_t action,
00080 const CustodyTimerSpec& custody_timer)
00081 {
00082 BundleRef bref(bundle, "BundleActions::queue_bundle");
00083
00084 ASSERT(link != NULL);
00085 if (link->isdeleted()) {
00086 log_warn("BundleActions::queue_bundle: "
00087 "failed to send bundle *%p on link %s",
00088 bundle, link->name());
00089 return false;
00090 }
00091
00092 log_debug("trying to find xmit blocks for bundle id:%d on link %s",
00093 bundle->bundleid(), link->name());
00094
00095 if (bundle->xmit_blocks()->find_blocks(link) != NULL) {
00096 log_err("BundleActions::queue_bundle: "
00097 "link not ready to handle bundle (block vector already exists), "
00098 "dropping send request");
00099 return false;
00100 }
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110 log_debug("trying to create xmit blocks for bundle id:%d on link %s",
00111 bundle->bundleid(), link->name());
00112 BlockInfoVec* blocks = BundleProtocol::prepare_blocks(bundle, link);
00113 size_t total_len = BundleProtocol::generate_blocks(bundle, blocks, link);
00114
00115 log_debug("queue bundle *%p on %s link %s (%s) (total len %zu)",
00116 bundle, link->type_str(), link->name(), link->nexthop(),
00117 total_len);
00118
00119 ForwardingInfo::state_t state = bundle->fwdlog()->get_latest_entry(link);
00120 if (state == ForwardingInfo::QUEUED) {
00121 log_err("queue bundle *%p on %s link %s (%s): "
00122 "already queued or in flight",
00123 bundle, link->type_str(), link->name(), link->nexthop());
00124 return false;
00125 }
00126
00127 if ((link->params().mtu_ != 0) && (total_len > link->params().mtu_)) {
00128 log_err("queue bundle *%p on %s link %s (%s): length %zu > mtu %u",
00129 bundle, link->type_str(), link->name(), link->nexthop(),
00130 total_len, link->params().mtu_);
00131 return false;
00132 }
00133
00134
00135
00136 if (link->queue()->contains(bundle))
00137 {
00138 log_err("queue bundle *%p on link *%p: already queued on link",
00139 bundle, link.object());
00140 return false;
00141 }
00142
00143 if (link->inflight()->contains(bundle))
00144 {
00145 log_err("queue bundle *%p on link *%p: already in flight on link",
00146 bundle, link.object());
00147 return false;
00148 }
00149
00150 log_debug("adding QUEUED forward log entry for %s link %s "
00151 "with nexthop %s and remote eid %s to *%p",
00152 link->type_str(), link->name(),
00153 link->nexthop(), link->remote_eid().c_str(), bundle);
00154
00155 bundle->fwdlog()->add_entry(link, action, ForwardingInfo::QUEUED,
00156 custody_timer);
00157
00158 log_debug("adding *%p to link %s's queue (length %u)",
00159 bundle, link->name(), link->bundles_queued());
00160
00161 if (! link->add_to_queue(bref, total_len)) {
00162 log_err("error adding bundle *%p to link *%p queue",
00163 bundle, link.object());
00164 }
00165
00166
00167 link->clayer()->bundle_queued(link, bref);
00168
00169 return true;
00170 }
00171
00172
00173 void
00174 BundleActions::cancel_bundle(Bundle* bundle, const LinkRef& link)
00175 {
00176 BundleRef bref(bundle, "BundleActions::cancel_bundle");
00177
00178 ASSERT(link != NULL);
00179 if (link->isdeleted()) {
00180 log_debug("BundleActions::cancel_bundle: "
00181 "cannot cancel bundle on deleted link %s", link->name());
00182 return;
00183 }
00184
00185 log_debug("BundleActions::cancel_bundle: cancelling *%p on *%p",
00186 bundle, link.object());
00187
00188
00189
00190
00191
00192
00193
00194
00195
00196
00197 BlockInfoVec* blocks = bundle->xmit_blocks()->find_blocks(link);
00198 if (blocks == NULL) {
00199 log_warn("BundleActions::cancel_bundle: "
00200 "cancel *%p but no blocks queued or inflight on *%p",
00201 bundle, link.object());
00202 return;
00203 }
00204
00205 size_t total_len = BundleProtocol::total_length(blocks);
00206
00207 if (link->del_from_queue(bref, total_len)) {
00208 BundleDaemon::post(new BundleSendCancelledEvent(bundle, link));
00209
00210 } else if (link->inflight()->contains(bundle)) {
00211 link->clayer()->cancel_bundle(link, bref);
00212 }
00213 else {
00214 log_warn("BundleActions::cancel_bundle: "
00215 "cancel *%p but not queued or inflight on *%p",
00216 bundle, link.object());
00217 }
00218 }
00219
00220
00221 void
00222 BundleActions::inject_bundle(Bundle* bundle)
00223 {
00224 PANIC("XXX/demmer fix inject bundle");
00225
00226 log_debug("inject bundle *%p", bundle);
00227 BundleDaemon::instance()->pending_bundles()->push_back(bundle);
00228 store_add(bundle);
00229 }
00230
00231
00232 bool
00233 BundleActions::delete_bundle(Bundle* bundle,
00234 BundleProtocol::status_report_reason_t reason,
00235 bool log_on_error)
00236 {
00237 BundleRef bref(bundle, "BundleActions::delete_bundle");
00238
00239 log_debug("attempting to delete bundle *%p from data store", bundle);
00240 bool del = BundleDaemon::instance()->delete_bundle(bref, reason);
00241
00242 if (log_on_error && !del) {
00243 log_err("Failed to delete bundle *%p from data store", bundle);
00244 }
00245 return del;
00246 }
00247
00248
00249 void
00250 BundleActions::store_add(Bundle* bundle)
00251 {
00252 log_debug("adding bundle %d to data store", bundle->bundleid());
00253 bool added = BundleStore::instance()->add(bundle);
00254 if (! added) {
00255 log_crit("error adding bundle %d to data store!!", bundle->bundleid());
00256 }
00257 }
00258
00259
00260 void
00261 BundleActions::store_update(Bundle* bundle)
00262 {
00263 log_debug("updating bundle %d in data store", bundle->bundleid());
00264 bool updated = BundleStore::instance()->update(bundle);
00265 if (! updated) {
00266 log_crit("error updating bundle %d in data store!!", bundle->bundleid());
00267 }
00268 }
00269
00270
00271 void
00272 BundleActions::store_del(Bundle* bundle)
00273 {
00274 log_debug("removing bundle %d from data store", bundle->bundleid());
00275 bool removed = BundleStore::instance()->del(bundle);
00276 if (! removed) {
00277 log_crit("error removing bundle %d from data store!!",
00278 bundle->bundleid());
00279 }
00280 }
00281
00282 }