00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #include "Controller.h"
00018
00019 #define NEXT_INSTANCE \
00020 (++next_instance_ == 0) ? ++next_instance_ : next_instance_
00021
00022 #define LOG(_level, _args...) core_->print_log( \
00023 "controller", BundleCore::_level, _args )
00024
00025 namespace prophet
00026 {
00027
00028 Controller::Controller(BundleCore* core, Repository* repository,
00029 ProphetParams* params)
00030 : ExpirationHandler("controller"),
00031 core_(core),
00032 params_(params),
00033 max_route_(0),
00034 nodes_(core,"local",true),
00035 next_instance_(0),
00036 timeout_(params_->hello_interval_ * 100),
00037 repository_(repository)
00038 {
00039 if (core == NULL || params == NULL || repository == NULL)
00040
00041 {
00042 LOG(LOG_ERR,"constructor invoked with NULL parameter");
00043 return;
00044 }
00045
00046 if (repository_->get_comparator()->qp() != params_->qp())
00047 {
00048 QueueComp* qc = QueuePolicy::policy(params_->qp(), &stats_,
00049 &nodes_, params_->min_forward());
00050 repository_->set_comparator(qc);
00051 }
00052
00053
00054 alarm_ = core_->create_alarm(this,timeout_ * 100);
00055
00056
00057 max_route_ = params->max_table_size_;
00058 nodes_.set_max_route(max_route_);
00059
00060 LOG(LOG_DEBUG,"constructor");
00061 }
00062
00063 Controller::~Controller()
00064 {
00065 List::iterator i = list_.begin();
00066 while (i != list_.end())
00067 {
00068 delete *i;
00069 list_.erase(i++);
00070 }
00071 }
00072
00073 bool
00074 Controller::find(const Link* link, List::iterator& i)
00075 {
00076 if (link == NULL) return false;
00077
00078 i = list_.begin();
00079 while (i != list_.end())
00080 if ((*i)->nexthop() == link)
00081 break;
00082 else
00083 i++;
00084 return (i != list_.end());
00085 }
00086
00087 bool
00088 Controller::is_prophet_control(const Bundle* b) const
00089 {
00090 if (b == NULL) return false;
00091
00092 std::string str = b->destination_id();
00093 return (str.rfind("/prophet") ==
00094 str.length() - strlen("/prophet"));
00095 }
00096
00097 void
00098 Controller::new_neighbor(const Link* link)
00099 {
00100 if (link == NULL) return;
00101
00102 LOG(LOG_DEBUG,"new_neighbor(%s)",link->remote_eid());
00103
00104
00105 List::iterator i;
00106 if (find(link,i))
00107 {
00108 if ((*i)->neighbor_gone())
00109 {
00110 LOG(LOG_INFO,"encounter-%d: neighbor gone",
00111 (*i)->local_instance());
00112 delete *i;
00113 list_.erase(i);
00114 }
00115 else
00116 {
00117 LOG(LOG_DEBUG,"session exists (%s)",(*i)->name());
00118 return;
00119 }
00120 }
00121
00122
00123 Encounter* e = new Encounter(link,this,NEXT_INSTANCE);
00124 if (e->neighbor_gone())
00125 {
00126 LOG(LOG_INFO,"encounter-%d: neighbor gone",
00127 e->local_instance());
00128 delete e;
00129 }
00130 else
00131 list_.push_back(e);
00132 }
00133
00134 void
00135 Controller::neighbor_gone(const Link* link)
00136 {
00137 if (link == NULL) return;
00138
00139 LOG(LOG_DEBUG,"neighbor_gone(%s)",link->remote_eid());
00140
00141
00142 List::iterator i;
00143 if (find(link,i))
00144 {
00145 Encounter* e = (*i);
00146 list_.erase(i);
00147 delete e;
00148 }
00149 }
00150
00151 bool
00152 Controller::accept_bundle(const Bundle* b)
00153 {
00154 if (b == NULL) return false;
00155
00156 LOG(LOG_DEBUG,"accept_bundle(%u)",b->sequence_num());
00157
00158
00159 if (params_->relay_node() == false)
00160 {
00161 std::string eid(core_->local_eid());
00162
00163
00164 if (!core_->is_route(eid,b->destination_id()) &&
00165 !core_->is_route(eid,b->source_id()))
00166 {
00167 return false;
00168 }
00169 }
00170
00171
00172 return true;
00173 }
00174
00175 void
00176 Controller::handle_bundle_received(const Bundle* b, const Link* link)
00177 {
00178 if (b == NULL) return;
00179
00180 LOG(LOG_DEBUG,"handle_bundle_received(%d,%s)",
00181 b->sequence_num(), (link == NULL) ? "NULL" : link->remote_eid());
00182
00183 if (link == NULL)
00184 return;
00185
00186 Encounter* e = NULL;
00187 List::iterator i;
00188
00189 if (find(link,i))
00190 {
00191 e = *i;
00192 if (e->neighbor_gone())
00193 {
00194 LOG(LOG_INFO,"encounter-%d: neighbor gone",
00195 e->local_instance());
00196 delete e;
00197 list_.erase(i);
00198 e = NULL;
00199 }
00200 }
00201 if (e == NULL)
00202 {
00203 LOG(LOG_INFO,"unable to find session to assign bundle from %s",
00204 b->source_id().c_str());
00205 return;
00206 }
00207
00208 if (is_prophet_control(b))
00209 {
00210
00211 size_t len = b->size();
00212 u_char* buf = new u_char[len];
00213 if (!core_->read_bundle(b,buf,len))
00214 {
00215 delete [] buf;
00216
00217 LOG(LOG_ERR,"BundleCore failed to read bundle buffer");
00218 core_->drop_bundle(b);
00219 return;
00220 }
00221
00222 ProphetTLV* tlv = ProphetTLV::deserialize(b->source_id(),
00223 b->destination_id(), buf, len);
00224
00225 delete [] buf;
00226
00227 if (tlv == NULL)
00228 {
00229
00230 LOG(LOG_ERR,"ProphetTLV failed to deserialize");
00231 core_->drop_bundle(b);
00232 return;
00233 }
00234
00235 e->receive_tlv(tlv);
00236 if (e->neighbor_gone())
00237 {
00238 LOG(LOG_INFO,"encounter-%d: neighbor gone",
00239 e->local_instance());
00240 delete e;
00241 list_.erase(i);
00242 }
00243
00244
00245 core_->drop_bundle(b);
00246 }
00247 else
00248 {
00249
00250 e->handle_bundle_received(b);
00251 }
00252 }
00253
00254 void
00255 Controller::handle_bundle_transmitted(const Bundle* b, const Link* l)
00256 {
00257 if (b == NULL) return;
00258 if (l == NULL) return;
00259 if (is_prophet_control(b))
00260 {
00261 core()->drop_bundle(b);
00262 return;
00263 }
00264 stats_.update_stats(b,nodes_.p_value(l->remote_eid()));
00265 repository_->change_priority(b);
00266 }
00267
00268 void
00269 Controller::ack(const Bundle* b)
00270 {
00271 if (b == NULL || is_prophet_control(b)) return;
00272
00273 LOG(LOG_DEBUG,"ack(%u)", b->sequence_num());
00274
00275 Oracle::ack(b);
00276 }
00277
00278 void
00279 Controller::set_queue_policy()
00280 {
00281
00282 QueueComp* qc = const_cast<QueueComp*>(repository_->get_comparator());
00283
00284 if (qc->qp() != params_->qp() || params_->min_forward() != qc->min_fwd_)
00285 {
00286 LOG(LOG_DEBUG,"set_queue_policy (%s -> %s)",
00287 QueuePolicy::qp_to_str(qc->qp()),
00288 QueuePolicy::qp_to_str(params_->qp()));
00289
00290 qc = QueuePolicy::policy(params_->qp(), &stats_, &nodes_,
00291 params_->min_forward());
00292
00293 repository_->set_comparator(qc);
00294 }
00295 }
00296
00297 void
00298 Controller::set_hello_interval()
00299 {
00300 if (timeout_ != ((u_int)(params_->hello_interval_ * 100)))
00301 {
00302 LOG(LOG_DEBUG,"set_hello_interval (%u -> %u)",
00303 timeout_/100, params_->hello_interval_);
00304
00305 timeout_ = params_->hello_interval_ * 100;
00306
00307 for (List::iterator i = list_.begin(); i != list_.end(); i++)
00308 (*i)->hello_interval_changed();
00309 }
00310 }
00311
00312 void
00313 Controller::set_max_route()
00314 {
00315 if (max_route_ != ((u_int)(params_->max_table_size_)))
00316 {
00317 LOG(LOG_DEBUG,"set_max_route (%u -> %u)",
00318 max_route_, params_->max_table_size_);
00319 nodes_.set_max_route(max_route_);
00320 }
00321 }
00322
00323 void
00324 Controller::handle_timeout()
00325 {
00326 LOG(LOG_DEBUG,"handle_timeout");
00327
00328 acks_.expire();
00329 nodes_.age_nodes();
00330 nodes_.truncate(params_->epsilon());
00331
00332
00333 alarm_ = core_->create_alarm(this,timeout_ * 100);
00334
00335 List::iterator i = list_.begin();
00336 while (i != list_.end())
00337 {
00338 if ((*i)->neighbor_gone())
00339 {
00340 LOG(LOG_INFO,"encounter-%d: neighbor gone",
00341 (*i)->local_instance());
00342 delete *i;
00343 list_.erase(i++);
00344 }
00345 else
00346 i++;
00347 }
00348 }
00349
00350 void
00351 Controller::shutdown()
00352 {
00353 LOG(LOG_DEBUG,"shutdown");
00354 if (alarm_ != NULL && alarm_->pending())
00355
00356 alarm_->cancel();
00357 while (! list_.empty())
00358 {
00359 delete list_.front();
00360 list_.pop_front();
00361 }
00362 }
00363
00364 };