libzypp  17.35.14
providequeue.cc
Go to the documentation of this file.
1 /*---------------------------------------------------------------------\
2 | ____ _ __ __ ___ |
3 | |__ / \ / / . \ . \ |
4 | / / \ V /| _/ _/ |
5 | / /__ | | | | | | |
6 | /_____||_| |_| |_| |
7 | |
8 \---------------------------------------------------------------------*/
9 
10 #include "private/providequeue_p.h"
11 #include "private/provideitem_p.h"
12 #include "private/provide_p.h"
13 #include "private/providedbg_p.h"
14 
15 #include <zypp-core/fs/PathInfo.h>
17 #include <zypp-core/base/StringV.h>
19 #include <zypp-media/MediaException>
20 #include <zypp-media/auth/CredentialManager>
21 
22 #include <zypp-core/Globals.h>
23 #include <bitset>
24 
25 namespace zyppng {
26 
// NOTE(review): the signature line of this definition is missing from this listing.
// Judging by the code it compares against, this is presumably Item::isAttachRequest() — confirm.
// Returns false when the item holds no request, otherwise whether the request's
// code equals ProvideMessage::Code::Attach.
28  {
29  if ( !_request )
30  return false;
31  return ( _request->code () == ProvideMessage::Code::Attach );
32  }
33 
// NOTE(review): the signature line of this definition is missing from this listing.
// Presumably this is Item::isFileRequest(), which is called elsewhere in this file — confirm.
// Returns false when the item holds no request, otherwise whether the request's
// code equals ProvideMessage::Code::Prov.
35  {
36  if ( !_request )
37  return false;
38  return ( _request->code () == ProvideMessage::Code::Prov );
39  }
40 
// NOTE(review): the signature line of this definition is missing from this listing.
// Judging by the code it compares against, this is presumably Item::isDetachRequest() — confirm.
// Returns false when the item holds no request, otherwise whether the request's
// code equals ProvideMessage::Code::Detach.
42  {
43  if ( !_request )
44  return false;
45  return ( _request->code () == ProvideMessage::Code::Detach );
46  }
47 
49  { }
50 
// NOTE(review): the signature line and one statement (listing lines 51 and 53) are missing
// from this listing, which is why the braces look unbalanced. Presumably this is the
// ProvideQueue destructor with an outer condition around the debug output — confirm.
52  {
// Log when the queue goes away while requests are still active or waiting.
54  if ( this->_activeItems.size() || this->_waitQueue.size() ) {
55  DBG << "Queue shutdown with Items still running" << std::endl;
56  }
57  }
// Hand a MediaException to immediateShutdown() as the reason for everything still queued.
58  immediateShutdown(std::make_exception_ptr(zypp::media::MediaException("Cancelled by queue shutdown")));
59  }
60 
// Prepares the worker for the given scheme and hands over to doStartup().
//
// The worker binary is looked up as "zypp-media-<workerScheme>" below the parent's
// worker path. It must exist and be executable for the current user, and \a workDir
// must be creatable via assert_dir().
//
// Returns true if a worker process object already exists (this is logged as an error),
// false if one of the checks fails, otherwise the result of doStartup().
//
// NOTE(review): listing lines 68, 89 and 91 are missing from this listing. In the visible
// lines \a hostname is never used and _workerProc is dereferenced without being created;
// presumably the missing lines take care of both — confirm against the real file.
61  bool ProvideQueue::startup(const std::string &workerScheme, const zypp::filesystem::Pathname &workDir, const std::string &hostname ) {
62 
63  if ( _workerProc ) {
64  ERR << "Queue Worker was already initialized" << std::endl;
65  return true;
66  }
67 
69 
// Build the path of the worker executable from the scheme name.
70  const auto &pN = _parent.workerPath() / ( "zypp-media-"+workerScheme ) ;
71  MIL << "Trying to start " << pN << std::endl;
72  const auto &pi = zypp::PathInfo( pN );
73  if ( !pi.isExist() ) {
74  ERR << "Failed to find worker for " << workerScheme << std::endl;
75  return false;
76  }
77 
78  if ( !pi.userMayX() ) {
79  ERR << "Failed to start worker for " << workerScheme << " binary " << pi.asString() << " is not executable." << std::endl;
80  return false;
81  }
82 
// assert_dir() returns non-zero when the directory could not be ensured.
83  if ( zypp::filesystem::assert_dir( workDir ) != 0 ) {
84  ERR << "Failed to assert working directory '" << workDir << "' for worker " << workerScheme << std::endl;
85  return false;
86  }
87 
// Remember the executable; doStartup() reads _currentExe to launch it.
88  _currentExe = pN;
90  _workerProc->setWorkingDirectory ( workDir );
92  return doStartup();
93  }
94 
95 
96  void ProvideQueue::enqueue( ProvideRequestRef request )
97  {
98  Item i;
99  i._request = request;
100  i._request->provideMessage().setRequestId( nextRequestId() );
101  request->setCurrentQueue( shared_this<ProvideQueue>() );
102  _waitQueue.push_back( std::move(i) );
103  if ( _parent.isRunning() )
104  scheduleNext();
105  }
106 
107  void ProvideQueue::cancel( ProvideRequest *item , std::exception_ptr error )
108  {
109  const auto &isSameItem = [item]( const Item &i ){
110  if ( i.isDetachRequest () )
111  return false;
112  return i._request.get() == item;
113  };
114 
115  if ( !item )
116  return;
117 
118  if ( item->code() != ProvideMessage::Code::Attach
119  && item->code() != ProvideMessage::Code::Prov ) {
120  ERR << "Can not cancel a " << item->code() << " request!" << std::endl;
121  return;
122  }
123 
124  if ( auto i = std::find_if( _waitQueue.begin(), _waitQueue.end(), isSameItem ); i != _waitQueue.end() ) {
125  auto &reqRef = i->_request;
126  reqRef->setCurrentQueue(nullptr);
127  if ( reqRef->owner() )
128  reqRef->owner()->finishReq( this, reqRef, error );
129  _waitQueue.erase(i);
130  _parent.schedule( ProvidePrivate::FinishReq ); // let the parent scheduler run since we have a open spot now
131  } else if ( auto i = std::find_if( _activeItems.begin(), _activeItems.end(), isSameItem ); i != _activeItems.end() ) {
132  cancelActiveItem(i, error);
133  }
134  }
135 
136  std::list<ProvideQueue::Item>::iterator ProvideQueue::dequeueActive( std::list<Item>::iterator it )
137  {
138  if ( it == _activeItems.end() )
139  return it;
140 
141  if ( it->_request )
142  it->_request->setCurrentQueue( nullptr );
143 
144  auto i = _activeItems.erase(it);
145  _parent.schedule ( ProvidePrivate::FinishReq ); // Trigger the scheduler
146  scheduleNext (); // keep the active items full
147  return i;
148  }
149 
150  void ProvideQueue::fatalWorkerError( const std::exception_ptr &reason )
151  {
152  immediateShutdown( reason ? reason : std::make_exception_ptr( zypp::media::MediaException("Fatal worker error")) );
153  }
154 
155  void ProvideQueue::immediateShutdown( const std::exception_ptr &reason )
156  {
157  _queueShuttingDown = true;
158 
159  while ( _waitQueue.size() ) {
160  auto &item = _waitQueue.front();
161  auto &reqRef = item._request;
162  if ( reqRef && reqRef->owner() && !item.isDetachRequest() )
163  reqRef->owner()->finishReq( this, reqRef, reason );
164  _waitQueue.pop_front();
165  }
166 
167  for ( auto i = _activeItems.begin(); i != _activeItems.end(); ) {
168  auto &reqRef = i->_request;
169  if ( reqRef && reqRef->owner() && !i->isDetachRequest() ) {
170  i = cancelActiveItem(i, reason );
171  } else {
172  i++;
173  }
174  }
175 
176  if ( _workerProc && _workerProc->isRunning() ) {
177  _workerProc->flush();
178  _workerProc->closeWriteChannel();
179  _workerProc->waitForExit();
180  readAllStderr();
181  }
182  }
183 
184  std::list< ProvideQueue::Item >::iterator ProvideQueue::cancelActiveItem( std::list< Item >::iterator i , const std::__exception_ptr::exception_ptr &error )
185  {
186  auto &reqRef = i->_request;
187 
188  // already in cancelling process or finished
189  if ( i->_state == Item::Cancelling || i->_state == Item::Finished )
190  return (++i);
191 
192  // not possible but lets be safe
193  if ( i->_state == Item::Pending ) {
194  reqRef->setCurrentQueue(nullptr);
195  if ( reqRef->owner() )
196  reqRef->owner()->finishReq( this, reqRef, error );
197  return dequeueActive(i);
198  }
199 
200  // we first need to cancel the item
201  auto c = ProvideMessage::createCancel ( i->_request->provideMessage().requestId() );
202  if( !_messageStream->sendMessage(c) )
203  ERR << "Failed to send cancel message to worker" << std::endl;
204 
205  i->_state = Item::Cancelling;
206  reqRef->setCurrentQueue(nullptr);
207  if ( reqRef->owner() )
208  reqRef->owner()->finishReq( this, reqRef, error );
209  reqRef.reset();
210  return (++i);
211  }
212 
// NOTE(review): the signature line of this definition (listing line 213) is missing.
// Presumably this is ProvideQueue::scheduleNext(), which is called elsewhere in this file — confirm.
//
// Moves items from the wait queue to the active list for as long as
// canScheduleMore() allows, sending each request's message to the worker.
// Does nothing once the queue is shutting down.
214  {
215  if ( _queueShuttingDown )
216  return;
217 
218  while ( _waitQueue.size() && canScheduleMore() ) {
219  auto item = std::move( _waitQueue.front() );
220  _waitQueue.pop_front();
221 
222  auto &reqRef = item._request;
// A request without an active URL is reported to its owner as failed and dropped.
223  if ( !reqRef->activeUrl() ) {
224  ERR << "Item without active URL enqueued, this is a BUG." << std::endl;
225  if ( reqRef->owner() )
226  reqRef->owner()->finishReq( this, reqRef, ZYPP_EXCPT_PTR (zypp::media::MediaException("Item needs a activeURL to be queued.")) );
227  continue;
228  }
229 
// Failing to send is treated as fatal: fatalWorkerError() shuts the whole queue down.
230  if ( !_messageStream->sendMessage( reqRef->provideMessage() ) ) {
231  ERR << "Failed to send message to worker process." << std::endl;
232  fatalWorkerError( ZYPP_EXCPT_PTR( zypp::media::MediaException("Failed to communicate with worker process.") ) );
233  return;
234  }
235 
236  item._state = Item::Queued;
237  _activeItems.push_back( std::move(item) );
// The queue has work again, so it is no longer idle.
238  _idleSince.reset();
239  }
240 
// Nothing waiting and nothing active: record when idling started (only once) and emit the idle signal.
241  if ( _waitQueue.empty() && _activeItems.empty() ) {
243  if ( !_idleSince )
244  _idleSince = std::chrono::steady_clock::now();
245  _sigIdle.emit();
246  }
247  }
248 
250  {
252  }
253 
254  bool ProvideQueue::isIdle() const
255  {
256  return ( empty() );
257  }
258 
259  std::optional<ProvideQueue::TimePoint> ProvideQueue::idleSince() const
260  {
261  return _idleSince;
262  }
263 
264  bool ProvideQueue::empty() const
265  {
266  return ( _activeItems.empty() && _waitQueue.empty() );
267  }
268 
// NOTE(review): the signature line of this definition is missing from this listing;
// presumably this is ProvideQueue::requestCount() — confirm.
// Returns the number of active items plus the number of items in the wait queue.
270  {
271  return _activeItems.size() + _waitQueue.size();
272  }
273 
// NOTE(review): the signature line of this definition is missing from this listing;
// presumably this is ProvideQueue::activeRequests() — confirm.
// Returns the number of items in the active list.
275  {
276  return _activeItems.size();
277  }
278 
// NOTE(review): the signature line of this definition is missing from this listing;
// presumably this is ProvideQueue::expectedProvideSize() — confirm.
//
// Sums the ExpectedFilesize field of all Prov requests in the wait queue and in the
// active list. Detach requests and requests with another code are skipped; a missing
// field counts as 0.
280  {
281  zypp::ByteCount dlSize;
282  for ( const auto &i : _waitQueue ) {
283  if ( i.isDetachRequest () )
284  continue;
285 
286  auto &reqRef = i._request;
287  if ( reqRef->code() != ProvideMessage::Code::Prov )
288  continue;
289  dlSize += reqRef->provideMessage().value( ProvideMsgFields::ExpectedFilesize, int64_t(0) ).asInt64();
290  }
// Same accumulation for the items that are already active.
291  for ( const auto &i : _activeItems ) {
292  if ( i.isDetachRequest () )
293  continue;
294  auto &reqRef = i._request;
295  if ( reqRef->code() != ProvideMessage::Code::Prov )
296  continue;
297  dlSize += reqRef->provideMessage().value( ProvideMsgFields::ExpectedFilesize, int64_t(0) ).asInt64();
298  }
299  return dlSize;
300  }
301 
302  const std::string &ProvideQueue::hostname() const
303  {
304  return _myHostname;
305  }
306 
// NOTE(review): the signature line of this definition is missing from this listing;
// presumably this is ProvideQueue::workerConfig() — confirm.
// Returns the stored worker capabilities.
308  {
309  return _capabilities;
310  }
311 
// NOTE(review): the signature line of this definition is missing from this listing;
// presumably this is ProvideQueue::sigIdle() — confirm.
// Gives access to the idle signal that scheduleNext() emits when no items are left.
313  {
314  return _sigIdle;
315  }
316 
// NOTE(review): the signature line (listing line 317) and several statements (listing
// lines 338 and 378-380) are missing from this listing. Presumably this is
// ProvideQueue::doStartup(), which startup() returns the result of — confirm.
//
// Starts the worker executable stored in _currentExe, sends it the initial configuration
// and waits for its capabilities message.
// Returns false if no executable is set, the process cannot be started, the configuration
// cannot be sent or no valid capabilities message arrives; true otherwise.
318  {
319  if ( _currentExe.empty() )
320  return false;
321 
322  //const char *argv[] = { "gdbserver", ":10000", _currentExe.c_str(), nullptr };
323  const char *argv[] = { _currentExe.c_str(), nullptr };
324  if ( !_workerProc->start( argv) ) {
325  ERR << "Failed to execute worker" << std::endl;
326 
327  _messageStream.reset ();
328  _workerProc.reset ();
329 
330  return false;
331  }
332 
333  // make sure the default read channel is StdOut so RpcMessageStream gets all the rpc messages
334  _workerProc->setReadChannel ( Process::StdOut );
335 
336  // we are ready to send the data
337 
// NOTE(review): the declaration of `conf` (listing line 338) is missing from this listing.
339  // @TODO actually write real config data :D
340  conf.insert ( { AGENT_STRING_CONF.data (), "ZYpp " LIBZYPP_VERSION_STRING } );
341  conf.insert ( { ATTACH_POINT.data (), _workerProc->workingDirectory().asString() } );
342  conf.insert ( { PROVIDER_ROOT.data (), _parent.z_func()->providerWorkdir().asString() } );
343 
// Shared failure path: collect the worker's stderr, drop the message stream and the
// process, and yield false so callers can write `return cleanupOnErr();`.
344  const auto &cleanupOnErr = [&](){
345  readAllStderr();
346  _messageStream.reset ();
347  _workerProc->close();
348  _workerProc.reset();
349  return false;
350  };
351 
352  if ( !_messageStream->sendMessage( conf ) ) {
353  ERR << "Failed to send initial message to queue worker" << std::endl;
354  return cleanupOnErr();
355  }
356 
357  // wait for the data to be written
358  _workerProc->flush ();
359 
360  // wait until we receive a message
361  const auto &caps = _messageStream->nextMessageWait();
362  if ( !caps || caps->command() != WorkerCaps::typeName ) {
363  ERR << "Worker did not sent a capabilities message, aborting" << std::endl;
364  return cleanupOnErr();
365  }
366 
// Parse the capabilities in a nested scope and keep them in _capabilities.
367  {
368  auto p = _messageStream->parseMessage<WorkerCaps>( *caps );
369  if ( !p )
370  return cleanupOnErr();
371 
372  _capabilities = std::move(*p);
373  }
374 
375  DBG << "Received config for worker: " << this->_currentExe.asString() << " Worker Type: " << this->_capabilities.worker_type() << " Flags: " << std::bitset<32>( _capabilities.cfg_flags() ).to_string() << std::endl;
376 
377  // now we can set up signals and start processing messages
// NOTE(review): listing lines 378-380 are missing here; presumably they connect the signals
// mentioned in the comment above — confirm.
381 
382  // make sure we do not miss messages
383  processMessage();
384  return true;
385  }
386 
// NOTE(review): the signature line (listing line 387) and most of the branch conditions that
// dispatch on the message code are missing from this listing. Wherever a `continue;` or
// `return;` is directly followed by more statements, an `if`/`else if` line was lost in
// between. Presumably this is ProvideQueue::processMessage() — confirm against the real file.
//
// Reads all messages currently available from the worker's message stream and handles
// them: results are forwarded to the owner of the matching request, and worker requests
// (authentication data, media change) are answered.
388 
// Looks up the active item whose request ID matches the message; logs and returns
// _activeItems.end() for invalid messages or unknown request IDs.
389  const auto &getRequest = [&]( const auto &exp ) -> decltype(_activeItems)::iterator {
390  if ( !exp ) {
391  ERR << "Ignoring invalid request!" << std::endl;
392  return _activeItems.end();
393  }
394 
395  auto i = std::find_if( _activeItems.begin(), _activeItems.end(), [&]( const auto &elem ) {
396  if ( ! elem._request )
397  return false;
398  return exp->requestId() == elem._request->provideMessage().requestId();
399  });
400 
401  if ( i == _activeItems.end() ) {
402  ERR << "Ignoring unknown request ID: " << exp->requestId() << std::endl;
403  return _activeItems.end();
404  }
405 
406  return i;
407  };
408 
// Sends an error response to the worker. Returns false after triggering
// fatalWorkerError() when sending fails, so callers must stop processing then.
409  const auto &sendErrorToWorker = [&]( const uint32_t reqId, const MessageCodes code, const std::string &reason, bool transient = false ) {
410  auto r = ProvideMessage::createErrorResponse ( reqId, code, reason, transient );
411  if ( !_messageStream->sendMessage( r ) ) {
412  ERR << "Failed to send Error message to worker process." << std::endl;
413  fatalWorkerError( ZYPP_EXCPT_PTR( zypp::media::MediaException("Failed to communicate with worker process.") ) );
414  return false;
415  }
416  return true;
417  };
418 
419  const bool doesDownload = this->_capabilities.worker_type() == Config::Downloading;
420  const bool fileNeedsCleanup = doesDownload || ( _capabilities.worker_type() == Config::CPUBound && _capabilities.cfg_flags() & Config::FileArtifacts );
421 
422  while ( auto msg = _messageStream->nextMessage () ) {
423 
424  if ( msg->command() == ProvideMessage::typeName ) {
425 
426  const auto &provMsg = ProvideMessage::create(*msg);
427  if ( !provMsg ) {
428  fatalWorkerError( provMsg.error() );
429  return;
430  }
431 
432  const auto &reqIter = getRequest( provMsg );
433  if ( reqIter == _activeItems.end() ) {
// No matching request: a finished file from a file-producing worker is removed
// unless the parent reports it as part of the cache.
434  if ( provMsg->code() == ProvideMessage::Code::ProvideFinished && fileNeedsCleanup ) {
435  const auto locFName = provMsg->value( ProvideFinishedMsgFields::LocalFilename ).asString();
436  if ( !_parent.isInCache(locFName) ) {
437  MIL << "Received a ProvideFinished message for a non existant request. Since this worker reported to create file artifacts, the file is cleaned up." << std::endl;
438  zypp::filesystem::unlink( locFName );
439  }
440  }
441  continue;
442  }
443 
444  auto &req = *reqIter;
445  auto &reqRef =req._request;
446 
447  const auto code = provMsg->code();
448 
// NOTE(review): branch condition (listing line 449) missing from this listing.
450 
451  // send the message to the item but don't dequeue
452  if ( reqRef && reqRef->owner() )
453  reqRef->owner()->informalMessage ( *this, reqRef, *provMsg );
454  continue;
455 
// NOTE(review): branch condition (listing line 456) missing from this listing.
457 
// A cancelled item only needs to be removed; its owner was already informed by cancelActiveItem().
458  if ( req._state == Item::Cancelling ) {
459  req._state = Item::Finished;
460  dequeueActive( reqIter );
461  continue;
462  }
463 
// NOTE(review): condition (listing line 464) missing; the closing brace at listing line 512 belongs to it.
465 
466  // we are going to register the file to the cache if this is a downloading worker, so it can not leak
467  // no matter if the item does the correct dance or not, this code is duplicated by all ProvideItems that receive ProvideFinished
468  // results that require file cleanups.
469  // we keep the ref around until after sending the result to the item. At that point it should take a reference
470  std::optional<zypp::ManagedFile> dataRef;
471 
472  if ( !reqIter->isFileRequest() ) {
473  ERR << "Invalid message for request ID: " << reqIter->_request->provideMessage().requestId() << std::endl;
// NOTE(review): statement (listing line 474) missing from this listing.
475  return;
476  }
477 
478  const auto &isCheckExistsOnlyVal = reqRef->provideMessage().value( ProvideMsgFields::CheckExistOnly );
479  bool isCheckExistsOnly = isCheckExistsOnlyVal.valid() ? isCheckExistsOnlyVal.asBool() : false;
480 
481  // when a worker is downloading we keep a internal book of cache files
482  if ( doesDownload && !isCheckExistsOnly ) {
483  const auto locFName = provMsg->value( ProvideFinishedMsgFields::LocalFilename ).asString();
484  if ( provMsg->value( ProvideFinishedMsgFields::CacheHit, false ).asBool()) {
485  dataRef = _parent.addToFileCache ( locFName );
486  if ( !dataRef ) {
// The reported cache file is gone: reset the request ID, put a copy of the item back
// at the front of the wait queue and drop the active entry.
487  MIL << "CACHE MISS, file " << locFName << " was already removed, queueing again" << std::endl;
488  if ( reqRef->owner() )
489  reqRef->owner()->cacheMiss( reqRef );
490  reqRef->provideMessage().setRequestId( InvalidId );
491  req._state = Item::Pending;
492  _waitQueue.push_front( req );
493  dequeueActive( reqIter );
494  continue;
495  }
496  } else {
497 
498  dataRef = _parent.addToFileCache ( locFName );
499 
500  // unlikely this can happen but better be safe than sorry
501  if ( !dataRef ) {
502  req._state = Item::Finished;
503  reqRef->setCurrentQueue(nullptr);
504  auto resp = ProvideMessage::createErrorResponse ( provMsg->requestId(), ProvideMessage::Code::InternalError, "File vanished between downloading and adding it to cache." );
505  if ( reqRef->owner() )
506  reqRef->owner()->finishReq( *this, reqRef, resp );
507  dequeueActive( reqIter );
508  continue;
509  }
510  }
511  }
512  }
513 
514  // send the message to the item and dequeue
515  reqRef->setCurrentQueue(nullptr);
516  if ( reqRef->owner() )
517  reqRef->owner()->finishReq( *this, reqRef, *provMsg );
518  req._state = Item::Finished;
519  dequeueActive( reqIter );
520  continue;
521 
// NOTE(review): branch condition (listing line 522) missing from this listing.
523 
524  if ( req._state == Item::Cancelling ) {
525  req._state = Item::Finished;
526  dequeueActive( reqIter );
527  continue;
528  }
529 
530  // send the message to the item and dequeue
531  reqRef->setCurrentQueue(nullptr);
532 
533  if ( reqRef->owner() )
534  reqRef->owner()->finishReq( *this, reqRef, *provMsg );
535 
536  req._state = Item::Finished;
537  dequeueActive( reqIter );
538  continue;
539 
// NOTE(review): branch condition (listing line 540) missing from this listing.
541 
542  // redir is like a finished message, we can simply forgot about a cancelling request
543  if ( req._state == Item::Cancelling ) {
544  req._state = Item::Finished;
545  dequeueActive( reqIter );
546  continue;
547  }
548 
549  // send the message to the item and dequeue
550  reqRef->setCurrentQueue(nullptr);
551  if ( reqRef->owner() )
552  reqRef->owner()->finishReq( *this, reqRef, *provMsg );
553  req._state = Item::Finished;
554  dequeueActive( reqIter );
555  continue;
556 
// NOTE(review): branch condition (listing line 557) missing from this listing.
558 
559  ERR << "Received Controller message from worker, this is a fatal error. Cancelling all requests!" << std::endl;
560  fatalWorkerError ( ZYPP_EXCPT_PTR( zypp::media::MediaException("Controller message received from worker.") ) );
561  return;
562 
// NOTE(review): branch conditions (listing lines 563 and 565) missing; judging by the code below,
// this part answers a worker's request for authentication data.
564 
566  if ( !reqIter->isFileRequest() && !reqIter->isAttachRequest() ) {
567  ERR << "Invalid message for request ID: " << reqRef->provideMessage().requestId() << std::endl;
// NOTE(review): statement (listing line 568) missing from this listing.
569  return;
570  }
571 
572  // if the file was cancelled we send a failure back
573  if( reqIter->_state == Item::Cancelling ) {
574  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, "Item was cancelled") )
575  return;
576  continue;
577  }
578 
579  // we need a owner item to fetch the auth data for us
580  if ( !reqRef->owner() ) {
581  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, "Request has no owner" ) )
582  return;
583  continue;
584  }
585 
586  if ( !reqRef->activeUrl() ) {
587  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, "Item has no active URL, this is a bug." ) )
588  return;
589  continue;
590  }
591 
592  try {
593  zypp::Url u( provMsg->value( AuthDataRequestMsgFields::EffectiveUrl ).asString() );
594 
// Collect string-valued headers of the message as extra values for the auth callback.
595  std::map<std::string, std::string> extraVals;
596  for( const auto &hdr : provMsg->headers() ) {
597 
// NOTE(review): the condition guarding this `continue` (listing lines 598-599) is missing from this listing.
600  continue;
601 
602  if ( !hdr.second.isString() ) {
603  WAR << "Ignoring non string value for " << hdr.first << std::endl;
604  continue;
605  }
606 
607  extraVals[hdr.first] = hdr.second.asString();
608  }
609 
610  const auto &authOpt = reqRef->owner()->authenticationRequired( *this, reqRef, u, provMsg->value( AuthDataRequestMsgFields::LastAuthTimestamp ).asInt64(), extraVals );
611  if ( !authOpt ) {
612  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, "No auth given by user." ) )
613  return;
614  continue;
615  }
616 
617  auto r = ProvideMessage::createAuthInfo ( reqRef->provideMessage().requestId(), authOpt->username(), authOpt->password(), authOpt->lastDatabaseUpdate(), authOpt->extraValues() );
618  if ( !_messageStream->sendMessage( r ) ) {
619  ERR << "Failed to send AuthorizationInfo to worker process." << std::endl;
620  fatalWorkerError( ZYPP_EXCPT_PTR( zypp::media::MediaException("Failed to communicate with worker process.") ) );
621  return;
622  }
623  continue;
624 
625  } catch ( const zypp::Exception &e ) {
// Any zypp exception raised above is reported back to the worker as NoAuthData.
626  ZYPP_CAUGHT(e);
627  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, e.asString() ) )
628  return;
629  continue;
630  }
631 
632  } else if ( code == ProvideMessage::Code::MediaChangeRequest ) {
633 
634  if ( !reqIter->isAttachRequest() ) {
635  ERR << "Invalid message for request ID: " << reqIter->_request->provideMessage().requestId() << std::endl;
// NOTE(review): statement (listing line 636) missing from this listing.
637  return;
638  }
639 
640  // if the file was cancelled we send a failure back
641  if( reqIter->_state == Item::Cancelling ) {
642  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeAbort, "Item was cancelled" ) )
643  return;
644  continue;
645  }
646 
647  MIL << "Worker sent a MediaChangeRequest, asking the user to insert the correct medium" << std::endl;
648 
649  //const std::string &label, const int32_t mediaNr, const std::vector<std::string> &devices, const std::optional<std::string> &desc
650  std::vector<std::string> freeDevs;
651  for ( const auto &val : provMsg->values( MediaChangeRequestMsgFields::Device) ) {
652  freeDevs.push_back( val.asString() );
653  }
654 
655  std::optional<std::string> desc;
656  const auto &descVal = provMsg->value( MediaChangeRequestMsgFields::Desc );
657  if ( descVal.valid () && descVal.isString() )
658  desc = descVal.asString();
659 
660  auto res = _parent._sigMediaChange.emit(
661  _parent.queueName(*this),
662  provMsg->value( MediaChangeRequestMsgFields::Label ).asString(),
663  provMsg->value( MediaChangeRequestMsgFields::MediaNr ).asInt(),
664  freeDevs,
665  desc
666  );
667 
// An empty signal result is treated like ABORT.
668  auto action = res ? *res : Provide::Action::ABORT;
669  switch ( action ) {
670  case Provide::Action::RETRY: {
671  MIL << "Sending back a MediaChanged message, retrying to find medium " << std::endl;
672  auto r = ProvideMessage::createMediaChanged ( reqIter->_request->provideMessage().requestId() );
673  if ( !_messageStream->sendMessage( r ) ){
674  ERR << "Failed to send MediaChanged to worker process." << std::endl;
675  fatalWorkerError( ZYPP_EXCPT_PTR( zypp::media::MediaException("Failed to communicate with worker process.") ) );
676  return;
677  }
678  continue;
679  }
680  case Provide::Action::ABORT: {
681  MIL << "Sending back a MediaChangeFailure message, request will fail " << std::endl;
682  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeAbort, "Cancelled by User" ) )
683  return;
684  continue;
685  }
686  case Provide::Action::SKIP: {
687  MIL << "Sending back a MediaChangeFailure message, request will fail " << std::endl;
688  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeSkip, "Skipped by User" ) )
689  return;
690  continue;
691  }
692  }
693  } else {
694  // if there is a unsupported worker request we need to stop immediately because the worker will be blocked until it gets a answer
695  ERR << "Unsupported worker request: "<<code<<", this is a fatal error!" << std::endl;
// NOTE(review): statement (listing line 696) missing from this listing.
697  return;
698  }
699 
700  } else {
701  // unknown code
702  ERR << "Received unsupported message " << msg->command() << " with code " << code << " ignoring! " << std::endl;
703  }
704 
705  } else {
706  ERR << "Received unsupported message " << msg->command() << "ignoring" << std::endl;
707  }
708  }
709  }
710 
// NOTE(review): the signature line of this definition is missing from this listing;
// presumably this is ProvideQueue::readAllStderr(), which is called elsewhere in this file — confirm.
// Reads lines from the worker's stderr channel until an empty result is returned and
// passes each one to forwardToLog().
716  {
717  // read all stderr data so we get the full logs
718  auto ba = _workerProc->channelReadLine(Process::StdErr);
719  while ( !ba.empty() ) {
720  forwardToLog(std::string( ba.data(), ba.size() ) );
721  ba = _workerProc->channelReadLine(Process::StdErr);
722  }
723  }
724 
// Writes a log line received from the worker into the log.
// Depending on a condition, the line is either handed to LogControl::logRawLine()
// unformatted, or written through MIL prefixed with the worker name.
// NOTE(review): the `if` line holding that condition (listing line 727) is missing from
// this listing, which leaves the `else` below dangling — confirm against the real file.
725  void ProvideQueue::forwardToLog( std::string &&logLine )
726  {
728  zypp::base::LogControl::instance ().logRawLine( std::move(logLine) );
729  else
730  MIL << "Message from worker: " << _capabilities.worker_name() << ":" << logLine << std::endl;
731  }
732 
733  void ProvideQueue::processReadyRead(int channel) {
734  // ignore stdout here
735  if ( channel == Process::StdOut )
736  return;
737 
738  // forward the stderr output to the log bypassing the formatter
739  // the worker already formatted the line
740  while ( _workerProc->canReadLine(Process::StdErr) ) {
741  const auto &data = _workerProc->channelReadLine( Process::StdErr );
742  if ( data.empty() )
743  return;
744 
745  forwardToLog(std::string( data.data(), data.size() ) );
746  }
747  }
748 
749  void ProvideQueue::procFinished(int exitCode)
750  {
751  // process all pending messages
752  processMessage();
753 
754  // get all of the log lines
755  readAllStderr();
756 
757  // shut down
758  // @todo implement worker restart in case of a unexpected exit
759  if ( !_queueShuttingDown )
760  immediateShutdown( ZYPP_EXCPT_PTR( zypp::media::MediaException("Unexpected queue worker exit!") ) );
761 
762 #if 0
763  if ( !_queueShuttingDown ) {
764 
765  _crashCounter++;
766  if ( _crashCounter > 3 ) {
767  immediateShutdown( ZYPP_EXCPT_PTR( zypp::media::MediaException("Unexpected queue worker exit!") ) );
768  return;
769  }
770 
771  MIL << "Unexpected queue worker exit with code: " << exitCode << std::endl;
772  // try to spawn the worker again, move active items back to wait list and start over
773 
774  if ( !doStartup () ) {
775 
776  }
777  }
778 #endif
779  }
780 
782  return _parent.nextRequestId();
783  }
784 }
std::list< Item > _activeItems
#define MIL
Definition: Logger.h:100
int assert_dir(const Pathname &path, unsigned mode)
Like 'mkdir -p'.
Definition: PathInfo.cc:324
static Ptr create(IODevice::Ptr iostr)
std::optional< zypp::ManagedFile > addToFileCache(const zypp::Pathname &downloadedFile)
Definition: provide.cc:759
Signal< Provide::MediaChangeAction(const std::string &, const std::string &, const int32_t, const std::vector< std::string > &, const std::optional< std::string > &) > _sigMediaChange
Definition: provide_p.h:96
SignalProxy< void(int)> sigFinished()
Definition: process.cpp:294
std::list< ProvideQueue::Item >::iterator dequeueActive(std::list< Item >::iterator it)
Store and operate with byte count.
Definition: ByteCount.h:31
static ProvideMessage createAuthInfo(const uint32_t reqId, const std::string &user, const std::string &pw, int64_t timestamp, const std::map< std::string, std::string > &extraValues={})
constexpr std::string_view ATTACH_POINT("zconfig://media/AttachPoint")
const zypp::Pathname & workerPath() const
Definition: provide.cc:859
const char * c_str() const
String representation.
Definition: Pathname.h:112
static ProvideMessage createMediaChanged(const uint32_t reqId)
void enqueue(ProvideRequestRef request)
Definition: providequeue.cc:96
bool provideDebugEnabled()
Definition: providedbg_p.h:28
static ProvideMessage createErrorResponse(const uint32_t reqId, const Code code, const std::string &reason, bool transient=false)
uint32_t nextRequestId()
Definition: provide.cc:946
constexpr std::string_view PROVIDER_ROOT("zconfig://media/ProviderRoot")
Provides API related macros.
bool canScheduleMore() const
std::optional< TimePoint > idleSince() const
StompFrameStreamRef _messageStream
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.
Definition: Exception.h:428
static constexpr uint32_t InvalidId
ProvideRequestRef _request
constexpr std::string_view Label("label")
#define ERR
Definition: Logger.h:102
Signal< void()> _sigIdle
void cancel(ProvideRequest *item, std::exception_ptr error)
SignalProxy< void()> sigMessageReceived()
static LogControl instance()
Singleton access.
Definition: LogControl.h:102
bool isInCache(const zypp::Pathname &downloadedFile) const
Definition: provide.cc:779
constexpr std::string_view CheckExistOnly("check_existance_only")
WeakPtr parent() const
Definition: base.cc:26
bool empty() const
Test for an empty path.
Definition: Pathname.h:116
uint32_t nextRequestId()
zypp::ByteCount expectedProvideSize() const
static expected< ProvideMessage > create(const zypp::PluginFrame &message)
void immediateShutdown(const std::exception_ptr &reason)
constexpr std::string_view LocalFilename("local_filename")
uint activeRequests() const
static Ptr create()
Definition: process.cpp:49
static auto connect(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, typename internal::MemberFunction< ReceiverFunc >::ClassType &recv, ReceiverFunc &&rFunc)
Definition: base.h:142
constexpr std::string_view LastAuthTimestamp("last_auth_timestamp")
const std::string & asString() const
String representation.
Definition: Pathname.h:93
const Config & workerConfig() const
std::string asString() const
Error message provided by dumpOn as string.
Definition: Exception.cc:111
void procFinished(int exitCode)
Just inherits Exception to separate media exceptions.
constexpr std::string_view Device("device")
#define WAR
Definition: Logger.h:101
constexpr std::string_view MediaNr("media_nr")
bool isRunning() const
Definition: provide.cc:873
void forwardToLog(std::string &&logLine)
std::optional< TimePoint > _idleSince
ProvideQueue(ProvidePrivate &parent)
Definition: providequeue.cc:48
ProvidePrivate & _parent
void schedule(ScheduleReason reason)
Definition: provide.cc:38
WorkerType worker_type() const
const std::string queueName(ProvideQueue &q) const
Definition: provide.cc:864
int unlink(const Pathname &path)
Like 'unlink'.
Definition: PathInfo.cc:705
constexpr std::string_view EffectiveUrl("effective_url")
static constexpr std::string_view typeName
Process::Ptr _workerProc
bool startup(const std::string &workerScheme, const zypp::Pathname &workDir, const std::string &hostname="")
Definition: providequeue.cc:61
const std::string & worker_name() const
SignalProxy< void()> sigIdle()
#define ZYPP_CAUGHT(EXCPT)
Drops a logline telling the Exception was caught (in order to handle it).
Definition: Exception.h:440
SignalProxy< void(uint)> sigChannelReadyRead()
Definition: iodevice.cc:373
zypp::Pathname _currentExe
Base class for Exception.
Definition: Exception.h:146
static ProvideMessage createCancel(const uint32_t reqId)
void logRawLine(std::string &&line)
will push a line to the logthread without formatting it
Definition: LogControl.cc:935
void fatalWorkerError(const std::exception_ptr &reason=nullptr)
constexpr std::string_view AGENT_STRING_CONF("zconfig://media/UserAgent")
Wrapper class for ::stat/::lstat.
Definition: PathInfo.h:221
std::list< ProvideQueue::Item >::iterator cancelActiveItem(std::list< Item >::iterator i, const std::exception_ptr &error)
Flags cfg_flags() const
static constexpr std::string_view typeName
uint requestCount() const
std::deque< Item > _waitQueue
constexpr std::string_view Desc("desc")
constexpr std::string_view CacheHit("cacheHit")
void processReadyRead(int channel)
Url manipulation class.
Definition: Url.h:91
#define DBG
Definition: Logger.h:99
constexpr std::string_view ExpectedFilesize("expected_filesize")
const std::string & hostname() const