libzypp  17.35.12
provideworker.cc
Go to the documentation of this file.
1 /*---------------------------------------------------------------------\
2 | ____ _ __ __ ___ |
3 | |__ / \ / / . \ . \ |
4 | / / \ V /| _/ _/ |
5 | / /__ | | | | | | |
6 | /_____||_| |_| |_| |
7 | |
8 \---------------------------------------------------------------------*/
9 
10 #include "provideworker.h"
11 #include <zypp-core/base/DtorReset>
12 #include <zypp-core/AutoDispose.h>
13 #include <zypp-core/Url.h>
14 #include <zypp-core/Date.h>
15 #include <zypp-core/zyppng/pipelines/AsyncResult>
17 #include <zypp-core/fs/PathInfo.h>
18 #include <zypp-core/fs/TmpPath.h>
20 #include <zypp-core/zyppng/base/AutoDisconnect>
21 #include <zypp-core/zyppng/base/EventDispatcher>
22 #include <zypp-media/MediaConfig>
23 #include <ostream>
24 
26 
27 #undef ZYPP_BASE_LOGGER_LOGGROUP
28 #define ZYPP_BASE_LOGGER_LOGGROUP "ProvideWorker"
29 
30 namespace zyppng::worker {
31 
32  using namespace zyppng::operators;
33 
34  RequestCancelException::RequestCancelException() : zypp::media::MediaException ("Request was cancelled")
35  { }
36 
// Constructs the worker: remembers the worker name in _workerName, names the current
// thread after it and switches the two timers (_msgAvail, _delayedShutdown) to
// single-shot mode.
// NOTE(review): this listing skips original source lines 41, 45, 49 and 50 (the embedded
// numbering jumps). Presumably those lines enable log forwarding and create/connect the
// two timers - confirm against the real file before relying on this.
37  ProvideWorker::ProvideWorker(std::string_view workerName) : _workerName(workerName)
38  {
39  // do not change the order of these calls, otherwise showing the threadname does not work
40  // enableLogForwardingMode will initialize the log which would override the current thread name
42  ThreadData::current().setName( workerName );
43 
44  // we use a singleshot timer that triggers message handling
46  _msgAvail->setSingleShot(true);
47 
48  // another timer to trigger a delayed shutdown
    // (the start of the statement closed by the next line is not visible in this listing)
51  }, *this );
52  _delayedShutdown->setSingleShot(true);
53  }
54 
56  { }
57 
58  StompFrameStreamRef ProvideWorker::messageStream() const
59  {
60  return _stream;
61  }
62 
// Entry point of the worker: opens the control file descriptors, performs the handshake
// and then runs the event loop until it is quit.
//   recv / send : file descriptors handed to _controlIO->openFds() for reading / writing.
//   returns     : an error if the control FDs cannot be opened or the handshake fails,
//                 otherwise the result produced after _loop->run() returns.
// Must not be called while a previous run() is active (asserted via _isRunning).
// NOTE(review): original source lines 68, 79, 84, 85, 87, 91, 92 and 96 are missing from
// this listing; the creation of _loop/_controlIO/_stream, the signal connections inside
// disC[] and the statement guarded by `if ( _fatalError )` are therefore not visible.
63  expected<void> ProvideWorker::run( int recv, int send )
64  {
65  // reentry not supported
66  assert ( !_isRunning );
67 
69  _isRunning = true;
70 
71  initLog();
72 
    // scope guard: drops the stream, the control IO object and the loop when run() is left
73  zypp::OnScopeExit cleanup([&](){
74  _stream.reset();
75  _controlIO.reset();
76  _loop.reset();
77  });
78 
80  if ( !_controlIO->openFds( { recv }, send ) ) {
81  return expected<void>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to open control FDs")) );
82  }
83 
86 
88 
    // the event loop is only entered once the handshake succeeded
89  return executeHandshake () | and_then( [&]() {
90  AutoDisconnect disC[] = {
93  };
94  _loop->run();
95  if ( _fatalError )
97  return expected<void>::success();
98  });
99  }
100 
101  std::deque<ProvideWorkerItemRef> &ProvideWorker::requestQueue()
102  {
103  return _pendingProvides;
104  }
105 
107  return _provNotificationMode;
108  }
109 
112  }
113 
// NOTE(review): the signature line (original line 114) and the only statement of this
// body (original line 117) are missing from this listing.
115  {
116  // by default we log to stderr, if user code wants to change that it can overload this function
118  }
119 
120  ProvideWorkerItemRef ProvideWorker::makeItem( ProvideMessage &&spec )
121  {
122  return std::make_shared<ProvideWorkerItem>( std::move(spec) );
123  }
124 
125  void ProvideWorker::provideStart(const uint32_t id, const zypp::Url &url, const zypp::filesystem::Pathname &localFile, const zypp::Pathname &stagingFile )
126  {
127  if ( !_stream->sendMessage( ProvideMessage::createProvideStarted ( id
128  , url
129  , localFile.empty () ? std::optional<std::string>() : localFile.asString ()
130  , stagingFile.empty () ? std::optional<std::string>() : stagingFile.asString ()
131  ) ) ) {
132  ERR << "Failed to send ProvideStart message" << std::endl;
133  }
134  }
135 
136  void ProvideWorker::provideSuccess(const uint32_t id, bool cacheHit, const zypp::filesystem::Pathname &localFile, const HeaderValueMap extra )
137  {
138  MIL_PRV << "Sending provideSuccess for id " << id << " file " << localFile << std::endl;
139  auto msg = ProvideMessage::createProvideFinished( id ,localFile.asString() ,cacheHit);
140  for ( auto i = extra.beginList (); i != extra.endList(); i++ ) {
141  for ( const auto &val : i->second )
142  msg.addValue( i->first, val );
143  }
144  if ( !_stream->sendMessage( msg ) ) {
145  ERR << "Failed to send ProvideSuccess message" << std::endl;
146  }
147  }
148 
149  void ProvideWorker::provideFailed(const uint32_t id, const ProvideMessage::Code code, const std::string &reason, const bool transient, const HeaderValueMap extra )
150  {
151  MIL_PRV << "Sending provideFailed for request " << id << " err: " << reason << std::endl;
152  auto msg = ProvideMessage::createErrorResponse ( id, code, reason, transient );
153  for ( auto i = extra.beginList (); i != extra.endList(); i++ ) {
154  for ( const auto &val : i->second )
155  msg.addValue( i->first, val );
156  }
157  if ( !_stream->sendMessage( msg ) ) {
158  ERR << "Failed to send ProvideFailed message" << std::endl;
159  }
160  }
161 
162 
// Overload taking an exception: forwards to the string based provideFailed() using
// e.asUserString() as the reason text and `extra` as additional headers.
// NOTE(review): original source lines 165 and 167 are missing from this listing, so the
// declaration of `extra` and the body of the history check are not visible. Presumably
// the exception history is added to `extra` when it is not empty - confirm.
163  void ProvideWorker::provideFailed ( const uint32_t id, const ProvideMessage::Code code, const bool transient, const zypp::Exception &e )
164  {
166  if ( !e.historyEmpty() ) {
168  }
169  provideFailed( id
170  , code
171  , e.asUserString()
172  , transient
173  , extra );
174  }
175 
176 
177  void ProvideWorker::attachSuccess(const uint32_t id, const std::optional<std::string> &localMountPoint)
178  {
179  MIL_PRV << "Sending attachSuccess for request " << id << std::endl;
180  if ( !_stream->sendMessage( ProvideMessage::createAttachFinished ( id, localMountPoint ) ) ) {
181  ERR << "Failed to send AttachFinished message" << std::endl;
182  } else {
183  MIL << "Sent back attach success" << std::endl;
184  }
185  }
186 
187  void ProvideWorker::detachSuccess(const uint32_t id)
188  {
189  MIL_PRV << "Sending detachSuccess for request " << id << std::endl;
190  if ( !_stream->sendMessage( ProvideMessage::createDetachFinished ( id ) ) ) {
191  ERR << "Failed to send DetachFinished message" << std::endl;
192  }
193  }
194 
// Sends `request` over _stream and blocks until a message arrives whose code is listed
// in `responseCodes`.
//   request       : message to send
//   responseCodes : codes that count as the awaited answer
//   returns       : the matching message, or an error when sending fails or when
//                   receiving/parsing a message fails
// Messages with any other code are appended to _pendingMessages and the _msgAvail timer
// is started so they get handled later.
// NOTE(review): original source lines 212 and 235 are missing from this listing: the
// statement executed when _fatalError is set while waiting, and the statement that runs
// after the while loop ends.
195  expected<ProvideMessage> ProvideWorker::sendAndWaitForResponse( const ProvideMessage &request , const std::vector<uint> &responseCodes )
196  {
197  // make sure immediateShutdown is not called while we are blocking here
    // (delayedReset re-assigns _inControllerRequest when this scope is left)
198  zypp::DtorReset delayedReset( _inControllerRequest );
199  _inControllerRequest = true;
200 
201  if ( !_stream->sendMessage( request ) )
202  return expected<ProvideMessage>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to send message")) );
203 
204  // flush the io device, this will block until all bytes are written
205  _controlIO->flush();
206 
207  while ( !_fatalError ) {
208 
    // wait for the next frame, turn "no frame" into an error, then parse the frame
209  const auto &msg = _stream->nextMessageWait() | [&]( auto &&nextMessage ) {
210  if ( !nextMessage ) {
211  if ( _fatalError )
213  else
214  return expected<zypp::PluginFrame>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to wait for response")) );
215  }
216  return expected<zypp::PluginFrame>::success( std::move(*nextMessage) );
217  } | and_then ( [&]( auto && m) {
218  return parseReceivedMessage(m);
219  } );
220 
221  if ( !msg ) {
222  ERR << "Failed to receive message" << std::endl;
223  return msg;
224  }
225 
    // one of the awaited response codes: hand the message to the caller
226  if ( std::find( responseCodes.begin (), responseCodes.end(), msg->code() ) != responseCodes.end() ) {
227  return msg;
228  }
229 
230  // remember other messages for later
231  MIL << "Remembering message for later: " << msg->code () << std::endl;
232  _pendingMessages.push_back(*msg);
233  _msgAvail->start(0);
234  }
236  }
237 
// Maps the answer to a media change request onto a MediaChangeRes value:
//   - no answer could be obtained      -> ABORT
//   - answer code is MediaChanged      -> SUCCESS
//   - answer code is MediaChangeSkip   -> SKIP
//   - any other answer code            -> ABORT
// NOTE(review): original source line 240 is missing from this listing. It presumably
// holds the `return sendAndWaitForResponse( ... )` expression that feeds the lambda
// below - confirm; how id/label/mediaNr/devices/desc are used is not visible here.
238  ProvideWorker::MediaChangeRes ProvideWorker::requestMediaChange(const uint32_t id, const std::string &label, const int32_t mediaNr, const std::vector<std::string> &devices, const std::optional<std::string> &desc )
239  {
241  | [&]( expected<ProvideMessage> &&m ) {
242  if ( !m ) {
243  MIL << "Failed to wait for message, aborting the request " << std::endl;
244  return ProvideWorker::MediaChangeRes::ABORT;
245  }
246  MIL << "Wait finished, with messages still pending: " << this->_pendingMessages.size() << " and provs still pending: " << this->_pendingProvides.size() << std::endl;
247  if ( m->code() == ProvideMessage::Code::MediaChanged )
248  return ProvideWorker::MediaChangeRes::SUCCESS;
249  else if ( m->code() == ProvideMessage::Code::MediaChangeSkip )
250  return ProvideWorker::MediaChangeRes::SKIP;
251  else
252  return ProvideWorker::MediaChangeRes::ABORT;
253  };
254  }
255 
// Sends an auth data request and waits for either an AuthInfo or a NoAuthData answer.
// For an AuthInfo answer the message headers are copied into an AuthInfo struct:
// the username, password and auth timestamp headers fill the dedicated fields, every
// other header is stored in inf.extraKeys.
//   returns : the collected AuthInfo on an AuthInfo answer; errors from
//             sendAndWaitForResponse() are passed through by and_then.
// NOTE(review): original source line 280 is missing from this listing, so the result for
// a non-AuthInfo answer (presumably an error) is not visible - confirm.
256  expected<AuthInfo> ProvideWorker::requireAuthorization( const uint32_t id, const zypp::Url &url, const std::string &lastTriedUsername, const int64_t lastTimestamp, const std::map<std::string, std::string> &extraFields )
257  {
258  return sendAndWaitForResponse( ProvideMessage::createAuthDataRequest( id, url, lastTriedUsername, lastTimestamp, extraFields ), { ProvideMessage::Code::AuthInfo, ProvideMessage::Code::NoAuthData } )
259  | and_then( [&]( ProvideMessage &&m ) {
260  if ( m.code() == ProvideMessage::Code::AuthInfo ) {
261 
262  AuthInfo inf;
263  for( const auto &hdr : m.headers () ) {
264  if ( hdr.first == AuthInfoMsgFields::Username ) {
265  inf.username = hdr.second.asString();
266  } else if ( hdr.first == AuthInfoMsgFields::Password ) {
267  inf.password = hdr.second.asString();
268  } else if ( hdr.first == AuthInfoMsgFields::AuthTimestamp ) {
269  inf.last_auth_timestamp = hdr.second.asInt64();
270  } else {
    // NOTE(review): the log below says the value is ignored, but there is no `continue`
    // or `else` after the check, so asString() is still called on the non-string value
    // and the result is stored in extraKeys. Looks like a missing `continue` - confirm.
271  if ( !hdr.second.isString() ) {
272  ERR << "Ignoring invalid extra value, " << hdr.first << " is not of type string" << std::endl;
273  }
274  inf.extraKeys[hdr.first] = hdr.second.asString();
275  }
276  }
277  return expected<AuthInfo>::success(inf);
278 
279  }
281  });
282  }
283 
// Returns the object held by _controlIO by dereferencing the pointer; no null check is
// done here.
// NOTE(review): the signature line (original line 284) is missing from this listing.
285  {
286  return *_controlIO.get();
287  }
288 
// Handshake sequence:
//   1. block until the first frame arrives and parse it as a worker Configuration,
//   2. store it in _workerConf and copy matching "zconfig://main/..." entries into
//      zypp::MediaConfig,
//   3. call initialize() with the configuration and send the returned capabilities back.
// Returns an error if no frame arrives, the frame cannot be parsed, initialize() fails
// or the capabilities cannot be sent.
// NOTE(review): the signature line (original line 289) is missing from this listing.
290  {
291  const auto &helo = _stream->nextMessageWait();
292  if ( !helo ) {
293  ERR << "Could not receive a handshake message, aborting" << std::endl;
294  return expected<void>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to receive handshake message")) );;
295  }
296 
297  auto exp = _stream->parseMessage<zyppng::worker::Configuration>( *helo );
298  if ( !exp ) {
299  invalidMessageReceived( exp.error() );
300  return expected<void>::error(exp.error());
301  }
302 
303  return std::move(*exp) | [&]( auto conf ) {
304 
305  _workerConf = std::move(conf);
306 
    // only keys that parse as a URL with scheme "zconfig" and authority "main" are
    // applied; the basename of the URL path is used as the config key
307  auto &mediaConf = zypp::MediaConfig::instance();
308  for( const auto &[key,value] : _workerConf ) {
309  zypp::Url keyUrl( key );
310  if ( keyUrl.getScheme() == "zconfig" && keyUrl.getAuthority() == "main" ) {
311  mediaConf.setConfigValue( keyUrl.getAuthority(), zypp::Pathname(keyUrl.getPathName()).basename(), value );
312  }
313  }
314 
315  return initialize( _workerConf ) | and_then([&]( WorkerCaps &&caps ){
316 
    // NOTE(review): if _workerName is a std::string_view, data() is not guaranteed to be
    // NUL-terminated - confirm the member type and what set_worker_name() expects.
317  caps.set_worker_name( _workerName.data() );
318 
    // the ZyppLogFormat flag is always added to whatever flags initialize() returned
319  caps.set_cfg_flags ( WorkerCaps::Flags(caps.cfg_flags() | WorkerCaps::ZyppLogFormat) );
320  if ( !_stream->sendMessage ( caps ) ) {
321  return expected<void>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to send capabilities")) );
322  }
323  return expected<void>::success ();
324  });
325  };
326  }
327 
// Drains _pendingMessages, then calls provide() if requests are queued, and re-arms the
// _msgAvail timer while there is still work left. Does nothing once _fatalError is set.
// NOTE(review): the signature line (original line 328) and original line 336 are missing
// from this listing; presumably line 336 hands the popped message `m` to the message
// handler - confirm.
329  {
330  if ( _fatalError )
331  return;
332 
333  while ( _pendingMessages.size () ) {
334  auto m = _pendingMessages.front ();
335  _pendingMessages.pop_front ();
337  }
338 
339  if ( !_fatalError && _pendingProvides.size() ) {
340  provide();
341  }
342 
343  // keep poking until there are no provides anymore
    // (pending provides only re-arm the timer in QUEUE_NOT_EMTPY notification mode)
344  if ( !_fatalError && ( _pendingMessages.size() || ( _pendingProvides.size () && _provNotificationMode == QUEUE_NOT_EMTPY ) ) ) {
345  _msgAvail->start(0);
346  }
347 
348  }
349 
// Quits the event loop, unless a blocking request is currently in progress
// (_inControllerRequest): in that case the _delayedShutdown timer is started instead and
// the function returns without quitting.
// NOTE(review): the signature line (original line 350) and original line 357 are missing
// from this listing, so the function name and the statement before _loop->quit() are not
// visible.
351  {
352  if ( _inControllerRequest ) {
353  _delayedShutdown->start(0);
354  return;
355  }
356 
358  _loop->quit ();
359  }
360 
// Logs that the read file descriptor was closed.
// NOTE(review): the signature line (original line 361) and original line 364 are missing
// from this listing; the action taken after logging is not visible.
362  {
363  MIL << "Read FD closed, exiting." << std::endl;
365  }
366 
// Logs that the write file descriptor was closed.
// NOTE(review): the signature line (original line 367) and original line 370 are missing
// from this listing; the action taken after logging is not visible.
368  {
369  MIL << "Write FD closed, exiting." << std::endl;
371  }
372 
// Pulls every message currently available from _stream and passes each one to
// pushSingleMessage(); stops early as soon as _fatalError is set.
// NOTE(review): the signature line (original line 373) is missing from this listing.
374  {
375  while ( auto message = _stream->nextMessage() ) {
376  if ( _fatalError )
377  break;
378  pushSingleMessage(*message);
379  }
380  }
381 
// Forwards to invalidMessageReceived() with an empty exception_ptr.
// NOTE(review): the signature line (original line 382) is missing from this listing.
383  {
384  invalidMessageReceived( std::exception_ptr() );
385  }
386 
// Error path for an invalid incoming message: logs the problem, records `p` as
// _fatalError when it is non-null and quits the event loop.
//   p : exception describing the problem; may be empty.
// NOTE(review): original source lines 393 and 394 are missing from this listing, so the
// `else` branch (what is stored when `p` is empty) is not visible. With those lines
// absent it cannot be told from here whether _loop->quit() is unconditional - confirm.
387  void ProvideWorker::invalidMessageReceived( std::exception_ptr p )
388  {
389  ERR << "Received a invalid message on the input stream, aborting" << std::endl;
390  if ( p )
391  _fatalError = p;
392  else
395  _loop->quit();
396  }
397 
// Handles one parsed message `provide`.
// A Cancel message is matched against _pendingProvides by request id and treated
// according to the state of the matching item; anything that reaches the end of the
// function is logged as unsupported.
// NOTE(review): the signature line (original line 398) and original lines 402, 410, 414,
// 417 and 425 are missing from this listing. Presumably 410/414/417 are the `case`
// labels of the switch and 425 is the condition that accepts ordinary requests - confirm.
399  {
400  const auto code = provide.code();
401  // we only accept requests here
403 
404  MIL_PRV << "Received request: " << code << std::endl;
405 
406  if ( code == ProvideMessage::Code::Cancel ) {
407  const auto &i = std::find_if( _pendingProvides.begin (), _pendingProvides.end(), [ id = provide.requestId() ]( const auto &it ){ return it->_spec.requestId() == id; } );
408  if ( i != _pendingProvides.end() ) {
409  switch ( (*i)->_state ) {
    // first visible branch: answer with a Cancelled error and drop the item from the queue
411  _stream->sendMessage ( ProvideMessage::createErrorResponse ( provide.requestId (), ProvideMessage::Code::Cancelled, "Cancelled by user." ) );
412  _pendingProvides.erase(i);
413  break;
    // second visible branch: delegate to cancel()
415  cancel(i);
416  break;
418  break;
419  }
    // NOTE(review): going by the visible braces this log line is inside the
    // `i != end()` block, i.e. it is printed after a request WAS found, and nothing is
    // logged for a truly unknown id. Looks like it belongs in an else branch - confirm.
420  MIL << "Received Cancel for unknown request: " << provide.requestId() << ", ignoring!" << std::endl;
421  }
422  return;
423  }
424 
426  return;
427  }
428  ERR << "Unsupported request with code: " << code << " received!" << std::endl;
429  }
430 
// Parses one raw frame `message`; on success the parsed message is queued in
// _pendingMessages and the _msgAvail timer is started. On failure the error is replaced
// by an "Unknown message received" exception, which is then only logged here.
// NOTE(review): the signature line (original line 431) is missing from this listing.
432  {
433  const auto &handle = [&]( const zypp::PluginFrame &message ){
434  return parseReceivedMessage (message )
435  | and_then( [&]( ProvideMessage &&provide ){
    // NOTE(review): `provide` is a named rvalue reference, so push_back copies it;
    // std::move(provide) would avoid the copy - confirm nothing else needs `provide`.
436  _pendingMessages.push_back(provide);
437  _msgAvail->start(0);
438  return expected<void>::success();
439  })
440  | or_else( [&]( std::exception_ptr ) -> expected<void> {
441  return expected<void>::error( ZYPP_EXCPT_PTR( std::invalid_argument(zypp::str::Str()<<"Unknown message received: " << message.command() )) );
442  });
443  };
444 
445  const auto &exp = handle( message );
446  if ( !exp ) {
    // rethrow only to dispatch on the exception type for logging; nothing escapes
447  try {
448  std::rethrow_exception ( exp.error () );
449  } catch ( const zypp::Exception &e ) {
450  ERR << "Catched exception during message handling: " << e << std::endl;
451  } catch ( const std::exception &e ) {
452  ERR << "Catched exception during message handling: " << e.what()<< std::endl;
453  } catch ( ... ) {
454  ERR << "Unknown Exception during message handling" << std::endl;
455  }
456  }
457  }
458 
// Converts the frame `m` into a ProvideMessage via ProvideMessage::create(). When that
// fails, invalidMessageReceived() is called with the error before the (failed) result is
// returned to the caller.
// NOTE(review): the signature line (original line 459) is missing from this listing.
460  {
461  auto exp = ProvideMessage::create(m);
462  if ( !exp )
463  invalidMessageReceived( exp.error() );
464  return exp;
465  }
466 }
std::string getScheme() const
Returns the scheme name of the URL.
Definition: Url.cc:537
void pushSingleMessage(const zypp::PluginFrame &msg)
void enableLogForwardingMode(bool enable=true)
Definition: LogControl.cc:903
#define MIL
Definition: Logger.h:98
ProvideNotificatioMode provNotificationMode() const
static Ptr create(IODevice::Ptr iostr)
ProvideNotificatioMode _provNotificationMode
std::exception_ptr _fatalError
SignalProxy< void(AsyncDataSource::ChannelCloseReason)> sigWriteFdClosed()
virtual void cancel(const std::deque< ProvideWorkerItemRef >::iterator &request)=0
Command frame for communication with PluginScript.
Definition: PluginFrame.h:41
StompFrameStreamRef messageStream() const
const std::string & command() const
Return the frame command.
Definition: PluginFrame.cc:439
SignalProxy< void()> sigInvalidMessageReceived()
ValueMap::iterator endList()
expected< void > run(int recv=STDIN_FILENO, int send=STDOUT_FILENO)
static ProvideMessage createErrorResponse(const uint32_t reqId, const Code code, const std::string &reason, bool transient=false)
void readFdClosed(uint, AsyncDataSource::ChannelCloseReason)
void provideStart(const uint32_t id, const zypp::Url &url, const zypp::Pathname &localFile, const zypp::Pathname &stagingFile={})
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.
Definition: Exception.h:428
void addValue(const std::string &name, const FieldVal &value)
MediaChangeRes requestMediaChange(const uint32_t id, const std::string &label, const int32_t mediaNr, const std::vector< std::string > &devices, const std::optional< std::string > &desc={})
SignalProxy< void(uint, AsyncDataSource::ChannelCloseReason)> sigReadFdClosed()
#define ERR
Definition: Logger.h:100
static ProvideMessage createProvideStarted(const uint32_t reqId, const zypp::Url &url, const std::optional< std::string > &localFilename={}, const std::optional< std::string > &stagingFilename={})
constexpr std::string_view Password("password")
SignalProxy< void(Timer &t)> sigExpired()
This signal is always emitted when the timer expires.
Definition: timer.cc:120
void invalidMessageReceived(std::exception_ptr p)
constexpr std::string_view Username("username")
expected< ProvideMessage > parseReceivedMessage(const zypp::PluginFrame &m)
SignalProxy< void()> sigMessageReceived()
ProviderConfiguration _workerConf
static LogControl instance()
Singleton access.
Definition: LogControl.h:102
expected< ProvideMessage > sendAndWaitForResponse(const ProvideMessage &request, const std::vector< uint > &responseCodes)
Assign a variable a certain value when going out of scope.
Definition: dtorreset.h:49
ResultType or_else(const expected< T, E > &exp, Function &&f)
Definition: expected.h:463
void logToStdErr()
Log to std::err.
Definition: LogControl.cc:916
bool empty() const
Test for an empty path.
Definition: Pathname.h:116
void setProvNotificationMode(const ProvideNotificatioMode &provNotificationMode)
virtual void immediateShutdown()
Definition: provideworker.h:85
static expected< ProvideMessage > create(const zypp::PluginFrame &message)
Convenient building of std::string via std::ostringstream Basically a std::ostringstream autoconverti...
Definition: String.h:211
static auto connect(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, typename internal::MemberFunction< ReceiverFunc >::ClassType &recv, ReceiverFunc &&rFunc)
Definition: base.h:142
ProvideWorker(std::string_view workerName)
const std::string & asString() const
String representation.
Definition: Pathname.h:93
Just inherits Exception to separate media exceptions.
std::string asUserString() const
Translated error message as string suitable for the user.
Definition: Exception.cc:118
The Timer class provides repetitive and single-shot timers.
Definition: timer.h:44
static ProvideMessage createAuthDataRequest(const uint32_t reqId, const zypp::Url &effectiveUrl, const std::string &lastTriedUser="", const std::optional< int64_t > &lastAuthTimestamp={}, const std::map< std::string, std::string > &extraValues={})
virtual ProvideWorkerItemRef makeItem(ProvideMessage &&spec)
bool historyEmpty() const
Whether the history list is empty.
Definition: Exception.h:263
AsyncDataSource & controlIO()
std::string historyAsString() const
The history as string.
Definition: Exception.cc:182
expected< void > executeHandshake()
std::string getAuthority() const
Returns the encoded authority component of the URL.
Definition: Url.cc:545
ValueMap::iterator beginList()
static expected success(ConsParams &&...params)
Definition: expected.h:115
#define MIL_PRV
Definition: providedbg_p.h:35
std::deque< ProvideWorkerItemRef > & requestQueue()
constexpr std::string_view AuthTimestamp("auth_timestamp")
std::map< std::string, std::string > extraKeys
Definition: provideworker.h:38
static ZYPP_API ThreadData & current()
Definition: threaddata.cc:16
void attachSuccess(const uint32_t id, const std::optional< std::string > &localMountPoint={})
Base class for Exception.
Definition: Exception.h:146
static auto connectFunc(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, ReceiverFunc &&rFunc, const Tracker &...trackers)
Definition: base.h:163
void handleSingleMessage(const ProvideMessage &provide)
std::string getPathName(EEncoding eflag=zypp::url::E_DECODED) const
Returns the path name from the URL.
Definition: Url.cc:608
static ProvideMessage createProvideFinished(const uint32_t reqId, const std::string &localFilename, bool cacheHit)
AsyncDataSource::Ptr _controlIO
void detachSuccess(const uint32_t id)
constexpr std::string_view History("history")
ResultType and_then(const expected< T, E > &exp, Function &&f)
Definition: expected.h:423
expected< AuthInfo > requireAuthorization(const uint32_t id, const zypp::Url &url, const std::string &lastTriedUsername="", const int64_t lastTimestamp=-1, const std::map< std::string, std::string > &extraFields={})
void writeFdClosed(AsyncDataSource::ChannelCloseReason)
void setName(T &&name)
Definition: threaddata_p.h:18
std::deque< ProvideWorkerItemRef > _pendingProvides
void provideFailed(const uint32_t id, const ProvideMessage::Code code, const std::string &reason, const bool transient, const HeaderValueMap extra={})
Easy-to use interface to the ZYPP dependency resolver.
Definition: Application.cc:19
void provideSuccess(const uint32_t id, bool cacheHit, const zypp::Pathname &localFile, const HeaderValueMap extra={})
static ProvideMessage createDetachFinished(const uint32_t reqId)
std::deque< ProvideMessage > _pendingMessages
bool provide(const Pathname &delta_r, const Pathname &new_r, const Progress &report_r)
Apply a binary delta to on-disk data to re-create a new rpm.
Url manipulation class.
Definition: Url.h:91
virtual expected< WorkerCaps > initialize(const Configuration &conf)=0
static ProvideMessage createMediaChangeRequest(const uint32_t reqId, const std::string &label, int32_t mediaNr, const std::vector< std::string > &devices, const std::optional< std::string > &desc)
static MediaConfig & instance()
Definition: mediaconfig.cc:46
StompFrameStreamRef _stream
static ProvideMessage createAttachFinished(const uint32_t reqId, const std::optional< std::string > &localMountPoint={})