libzypp  17.32.5
messagestream.cc
Go to the documentation of this file.
1 /*---------------------------------------------------------------------\
2 | ____ _ __ __ ___ |
3 | |__ / \ / / . \ . \ |
4 | / / \ V /| _/ _/ |
5 | / /__ | | | | | | |
6 | /_____||_| |_| |_| |
7 | |
8 ----------------------------------------------------------------------*/
9 
10 #include "messagestream.h"
11 
12 #include <zypp-core/AutoDispose.h>
13 #include <zypp-core/zyppng/base/AutoDisconnect>
14 #include <zypp-proto/core/envelope.pb.h>
15 
16 namespace zyppng {
17 
19  : _data( new zypp::proto::Envelope() )
20  { }
21 
22  RpcMessage::RpcMessage( zypp::proto::Envelope data )
23  : _data( new zypp::proto::Envelope( std::move(data) ) )
24  { }
25 
26  void RpcMessage::set_messagetypename(std::string name)
27  {
28  _data->set_messagetypename( std::move(name) );
29  }
30 
31  const std::string &RpcMessage::messagetypename() const
32  {
33  return _data->messagetypename();
34  }
35 
36  void RpcMessage::set_value(std::string name)
37  {
38  _data->set_value( std::move(name) );
39  }
40 
41  const std::string &RpcMessage::value() const
42  {
43  return _data->value();
44  }
45 
46  std::string RpcMessage::serialize() const
47  {
48  return _data->SerializeAsString();
49  }
50 
51  std::string RpcBaseType::serialize() const
52  {
53  std::string target;
54  serializeInto ( target );
55  return target;
56  }
57 
58 
60  : zypp::Exception( zypp::str::Str() << "Invalid Message received: (" << msg <<")" )
61  { }
62 
63 
65  {
67  _nextMessageTimer->setSingleShot(false);
68 
70  if ( _ioDev->isOpen () && _ioDev->canRead () )
71  readAllMessages ();
72  }
73 
75  {
76  if ( _pendingMessageSize == 0 ) {
77  if ( _ioDev->bytesAvailable() >= sizeof( rpc::HeaderSizeType ) ) {
78  _ioDev->read( reinterpret_cast<char *>( &_pendingMessageSize ), sizeof( rpc::HeaderSizeType ) );
79  }
80  }
81 
82  if ( _ioDev->bytesAvailable() < _pendingMessageSize ) {
83  return false;
84  }
85 
86  auto bytes = _ioDev->read( _pendingMessageSize );
88 
89  zypp::proto::Envelope m;
90  if (! m.ParseFromArray( bytes.data(), bytes.size() ) ) {
91  ERR << "Received malformed message from peer" << std::endl;
93  return false;
94  }
95 
96  _messages.emplace_back( std::move(m) );
97  _sigNextMessage.emit ();
98 
99  if ( _messages.size() ) {
100  // nag the user code until all messages have been used up
101  _nextMessageTimer->start(0);
102  }
103 
104  return true;
105  }
106 
108  {
109  if ( _messages.size() )
110  _sigNextMessage.emit();
111 
112  if ( !_messages.size() )
113  _nextMessageTimer->stop();
114  }
115 
116  std::optional<RpcMessage> zyppng::RpcMessageStream::nextMessage( const std::string &msgName )
117  {
118  if ( !_messages.size () ) {
119 
120  // try to read the next messages from the fd
121  {
122  _sigNextMessage.block ();
123  zypp::OnScopeExit unblock([&](){
124  _sigNextMessage.unblock();
125  });
126  readAllMessages();
127  }
128 
129  if ( !_messages.size () )
130  return {};
131  }
132 
133  std::optional<RpcMessage> res;
134 
135  if( msgName.empty() ) {
136  res = std::move( _messages.front () );
137  _messages.pop_front();
138 
139  } else {
140  const auto i = std::find_if( _messages.begin(), _messages.end(), [&]( const RpcMessage &env ) {
141  return env.messagetypename() == msgName;
142  });
143 
144  if ( i != _messages.end() ) {
145  res = std::move(*i);
146  _messages.erase(i);
147  }
148  }
149 
150  if ( _messages.size() )
151  _nextMessageTimer->start(0);
152  else
153  _nextMessageTimer->stop();
154 
155  return res;
156  }
157 
158  std::optional<RpcMessage> RpcMessageStream::nextMessageWait( const std::string &msgName )
159  {
160  // make sure the signal is not emitted until we have the next message
161  _sigNextMessage.block ();
162  zypp::OnScopeExit unblock([&](){
163  _sigNextMessage.unblock();
164  });
165 
166  bool receivedInvalidMsg = false;
168  receivedInvalidMsg = true;
169  }));
170 
171  const bool hasMsgName = msgName.size();
172  while ( !receivedInvalidMsg && _ioDev->isOpen() && _ioDev->canRead() ) {
173  if ( _messages.size() ) {
174  if ( hasMsgName ) {
175  std::optional<RpcMessage> msg = nextMessage(msgName);
176  if ( msg ) return msg;
177  }
178  else {
179  break;
180  }
181  }
182 
183  if ( !_ioDev->waitForReadyRead ( -1 ) ) {
184  // this can only mean that a error happened, like device was closed
185  return {};
186  }
187  }
188  return nextMessage (msgName);
189  }
190 
192  {
193  if ( !_ioDev->canWrite () )
194  return false;
195 
196  const auto &str = env._data->SerializeAsString();
197  rpc::HeaderSizeType msgSize = str.length();
198  _ioDev->write( (char *)(&msgSize), sizeof( rpc::HeaderSizeType ) );
199  _ioDev->write( str.data(), str.size() );
200  return true;
201  }
202 
204  {
205  return _sigNextMessage;
206  }
207 
209  {
211  }
212 
214  {
215  bool cont = true;
216  while ( cont && _ioDev->bytesAvailable() ) {
217  cont = readNextMessage ();
218  }
219  }
220 
221 }
222 
223 namespace zypp {
224  template<>
225  zypp::proto::Envelope* rwcowClone<zypp::proto::Envelope>( const zypp::proto::Envelope * rhs )
226  {
227  return new zypp::proto::Envelope(*rhs);
228  }
229 }
std::deque< RpcMessage > _messages
Signal< void()> _sigInvalidMessageReceived
Namespace intended to collect all environment variables we use.
Definition: Env.h:22
uint32_t HeaderSizeType
Definition: rpc.h:17
zyppng::rpc::HeaderSizeType _pendingMessageSize
zypp::RWCOW_pointer< zypp::proto::Envelope > _data
Definition: messagestream.h:84
std::string serialize() const
String related utilities and Regular expression matching.
SignalProxy< void()> sigInvalidMessageReceived()
virtual void serializeInto(std::string &str) const =0
Definition: Arch.h:363
SignalProxy< void()> sigMessageReceived()
bool sendMessage(const RpcMessage &env)
#define ERR
Definition: Logger.h:98
SignalProxy< void(Timer &t)> sigExpired()
This signal is always emitted when the timer expires.
Definition: timer.cc:120
void set_value(std::string name)
static auto connect(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, typename internal::MemberFunction< ReceiverFunc >::ClassType &recv, ReceiverFunc &&rFunc)
Definition: base.h:142
void set_messagetypename(std::string name)
std::optional< RpcMessage > nextMessageWait(const std::string &msgName="")
The Timer class provides repetitive and single-shot timers.
Definition: timer.h:44
const std::string & messagetypename() const
std::shared_ptr< IODevice > Ptr
Definition: iodevice.h:44
const std::string & value() const
Base class for Exception.
Definition: Exception.h:146
std::optional< RpcMessage > nextMessage(const std::string &msgName="")
SignalProxy< void()> sigReadyRead()
Definition: iodevice.cc:324
static auto connectFunc(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, ReceiverFunc &&rFunc, const Tracker &...trackers)
Definition: base.h:163
RpcMessageStream(IODevice::Ptr iostr)
void timeout(const zyppng::Timer &)
Signal< void()> _sigNextMessage
InvalidMessageReceivedException(const std::string &msg={})
Easy-to use interface to the ZYPP dependency resolver.
Definition: Application.cc:19
virtual std::string serialize() const