libzypp  17.35.16
stompframestream.cc
Go to the documentation of this file.
1 /*---------------------------------------------------------------------\
2 | ____ _ __ __ ___ |
3 | |__ / \ / / . \ . \ |
4 | / / \ V /| _/ _/ |
5 | / /__ | | | | | | |
6 | /_____||_| |_| |_| |
7 | |
8 ----------------------------------------------------------------------*/
9 
10 #include "stompframestream.h"
11 #include <zypp-core/ByteCount.h>
13 
14 #include <zypp-core/AutoDispose.h>
15 #include <zypp-core/zyppng/base/AutoDisconnect>
16 
17 namespace zyppng {
18 
// Size limits, in bytes, that the frame parser applies to data still waiting in the read buffer.
19  constexpr auto MAX_CMDLEN = 256; // upper bound for the command line of a frame
20  constexpr auto MAX_HDRLEN = 8 * 1024; // we might send long paths in headers
21  constexpr auto MAX_BODYLEN = 1024 * 1024; // 1 MiB for now, we do not want to use up all the memory
22 
// Initializer list and (empty) body of the InvalidMessageReceivedException constructor.
// Passes the text "Invalid Message received: (<msg>)" on to the zypp::Exception base.
// NOTE(review): the constructor's signature line is missing from this listing.
24  : zypp::Exception( zypp::str::Str() << "Invalid Message received: (" << msg <<")" )
25  { }
26 
// Body of what appears to be the StompFrameStream constructor.
// NOTE(review): the signature line and source lines 29 and 32 are missing from this listing;
// presumably they set up the timer and the device connections — confirm against the real file.
28  {
// repeating mode: the timer is not limited to a single shot
30  _nextMessageTimer->setSingleShot(false);
31 
// the device may already be open and readable at this point, so parse what it has buffered right away
33  if ( _ioDev->isOpen () && _ioDev->canRead () )
34  readAllMessages ();
35  }
36 
// One parser run over the data buffered in _ioDev, driven by _parserState.
// NOTE(review): the signature line is missing from this listing; presumably this is the
// readNextMessage() that the read loop near the end of the file calls.
// Returns true once a complete frame has been appended to _messages.
// Returns false when it stops without one: not enough data buffered yet, or an empty line
// was read where a command was expected.
38  {
// Error helper: discards the partially built frame and any pending body length.
// NOTE(review): source lines 40 and 43 are missing here — presumably they switch the parser
// into the ParseError state and notify listeners; confirm.
39  const auto &parseError = [this](){
41  _pendingMessage.reset();
42  _pendingBodyLen.reset();
44  };
45 
46  // ATTENTION: Remember to also update the parser logic in zypp-core/rpc/PluginFrame.cc
47  // if code here is changed or features are added.
48 
49  // loop until we have a full message, or we have no more data to read
// `break` inside a case leaves only the switch, so the loop re-dispatches on _parserState;
// `continue` does the same directly.
50  while(true) {
51  switch( _parserState ) {
52  case ParseError: {
53  // we got a parse error before, try to recover by reading until the next \0
54  bool gotTerm = false;
// consume the buffer one byte at a time so nothing after the terminator is lost
55  while ( _ioDev->bytesAvailable ( ) ) {
56  auto d = _ioDev->read (1);
57  if ( !d.size() )
58  break;
59 
60  if ( d.front () == '\0' ){
61  gotTerm = true;
// NOTE(review): source line 62 is missing — presumably it resets _parserState so the next
// iteration starts a fresh frame; confirm.
63  break;
64  }
65  }
66 
67  if ( gotTerm )
68  continue;
69 
// no terminator buffered yet: stay in this state and wait for more data
70  return false;
71  }
72  case ReceiveCommand: {
// buffer fill level is sampled before the line check; it is only used for the size limit below
73  const auto ba = _ioDev->readBufferCount();
74  if ( !_ioDev->canReadLine() ) {
// no complete line buffered: either the peer sent too much without a line end, or we have to wait
75  if ( ba > MAX_CMDLEN ) {
76  ERR << "Received malformed message from peer, CMD line exceeds: " << MAX_CMDLEN << " bytes" << std::endl;
77  parseError();
78  continue;
79  }
80  return false;
81  }
82 
// NOTE(review): the read is capped at MAX_CMDLEN while pop_back() assumes the last byte is
// the line end; confirm what readLine returns for a buffered line longer than the cap.
83  ByteArray command = _ioDev->readLine( MAX_CMDLEN );
84  command.pop_back(); // remove \n
85  if ( command.empty() ) {
86  // STOMP spec says multiple EOLs after a message are allowed, so we just ignore empty lines
87  // if they happen before a new frame starts
88  WAR << "Received empty line before command, ignoring" << std::endl;
// NOTE(review): returning false ends this run; the read loop at the end of the file stops on
// false, so data buffered behind the empty line is not parsed in the same pass — confirm intended.
89  return false;
90  }
91 
// NOTE(review): source lines 93 and 95 are missing — presumably line 93 creates the pending
// frame when none exists and line 95 advances the state to the header parser; confirm.
92  if ( !_pendingMessage )
94  _pendingMessage->setCommand( command.asString() );
96 
97  break;
98  }
99 
100  case ReceiveHeaders: {
// same pattern as for the command line, with the header size limit
101  const auto ba = _ioDev->readBufferCount();
102  if ( !_ioDev->canReadLine() ) {
103  if ( ba > MAX_HDRLEN ) {
104  ERR << "Received malformed message from peer, header line exceeds: " << MAX_HDRLEN << " bytes" << std::endl;
105  parseError();
106  continue;
107  }
108  return false;
109  }
110 
111  ByteArray header = _ioDev->readLine( MAX_HDRLEN );
112  header.pop_back(); // remove \n
113  if ( header.empty () ) {
114  // --> empty line sep. header and body
// NOTE(review): source line 115 is missing — presumably it advances the state to the body parser; confirm.
116 
117  // if we received a content-length header we set the flag for the body parser to know it has to read
118  // n bytes before expecting the \0 terminator
// an empty string is the fallback when the frame carries no content-length header
119  const auto &contentLen = _pendingMessage->getHeaderNT( zypp::PluginFrame::contentLengthHeader(), std::string() );
120  std::optional<uint64_t> cLen;
121  if ( !contentLen.empty() ) {
122  cLen = zyppng::str::safe_strtonum<uint64_t>(contentLen);
123  if ( !cLen ) {
124  ERR << "Received malformed message from peer: Invalid value for " << zypp::PluginFrame::contentLengthHeader() << ":" << contentLen << std::endl;
125  parseError();
126  continue;
127  }
// NOTE(review): the size cap below is compiled out, so a declared content-length is not
// limited by MAX_BODYLEN in this function.
128 #if 0
129  if ( (*cLen) > MAX_BODYLEN ) {
130  ERR << "Message body exceeds maximum length: " << zypp::ByteCount( *cLen ) << " vs " << zypp::ByteCount( MAX_BODYLEN ) << std::endl;
131  parseError();
132  continue;
133  }
134 #endif
135 
// NOTE(review): cLen is an unsigned 64-bit value; confirm the type of _pendingBodyLen can hold it.
136  _pendingBodyLen = *cLen;
138  }
139  } else {
// non-empty line: hand it to the frame as a raw header; a throw is treated as a parse error
140  try {
141  _pendingMessage->addRawHeader ( header );
142  } catch ( const zypp::Exception &e ) {
143  ZYPP_CAUGHT(e);
144  ERR << "Received malformed message from peer, header format invalid: " << header.asStringView() << " (" << e << ")" << std::endl;
145  parseError();
146  continue;
147  }
148  }
149  break;
150  }
151 
152  case ReceiveBody: {
153 
154  ByteArray body;
155  if ( _pendingBodyLen ) {
156  // we need to read the required body bytes plus the terminating \0
157  const auto reqBytes = (*_pendingBodyLen) + 1;
// nothing is consumed until body and terminator are buffered completely
158  if ( _ioDev->bytesAvailable() < reqBytes )
159  return false;
160 
161  body = _ioDev->read( reqBytes );
162  if ( body.back () != '\0' ) {
163  ERR << "Received malformed message from peer: Body was not terminated with \\0" << std::endl;
164  parseError();
165  continue;
166  }
167 
168  body.pop_back (); // remove \0
169 
170  } else {
171  // we do not know the body size, need to read until \0
172  const auto ba = _ioDev->readBufferCount();
173  if ( !_ioDev->canReadUntil( _ioDev->currentReadChannel (), '\0' ) ) {
// the size limit is checked only while no terminator is buffered yet
174  if ( ba > MAX_BODYLEN ) {
175  ERR << "Message body exceeds maximum length: " << zypp::ByteCount( _ioDev->readBufferCount() ) << " vs " << zypp::ByteCount( MAX_BODYLEN ) << std::endl;
176  parseError();
177  continue;
178  }
179  return false;
180  }
181 
182  body = _ioDev->channelReadUntil( _ioDev->currentReadChannel (), '\0' );
183  body.pop_back(); // remove the \0
184  }
185 
186  // if we reach this place we have a full message: store the body and queue the frame
187  _pendingMessage->setBody( std::move(body) );
188 
189  _messages.emplace_back( std::move(*_pendingMessage) );
190  _pendingMessage.reset();
191  _pendingBodyLen.reset();
// NOTE(review): source line 192 is missing — presumably it resets _parserState for the next frame; confirm.
193 
194  _sigNextMessage.emit ();
195 
// checked again after the emit; presumably a connected slot may already have taken the frame
196  if ( _messages.size() ) {
197  // nag the user code until all messages have been used up
198  _nextMessageTimer->start(0);
199  }
200 
201  // once we have a message, exit the loop so other things can be done
202  return true;
203  }
204  }
205  }
206  }
207 
// Emits _sigNextMessage while frames are queued and stops the timer once the queue is empty.
// NOTE(review): the signature line is missing from this listing; presumably this is the
// timer-expiry handler `timeout(const zyppng::Timer &)` — confirm.
209  {
210  if ( _messages.size() )
211  _sigNextMessage.emit();
212 
// separate check after the emit; presumably a connected slot may have emptied the queue meanwhile
213  if ( !_messages.size() )
214  _nextMessageTimer->stop();
215  }
216 
// Takes one frame out of the queue and returns it.
// With an empty msgName the frame at the front of the queue is returned; otherwise the first
// queued frame whose command equals msgName. If the queue is empty on entry, the device is
// read once before looking. Returns an empty optional when no suitable frame is queued.
// The timer is started while frames remain queued and stopped otherwise.
217  std::optional<zypp::PluginFrame> zyppng::StompFrameStream::nextMessage( const std::string &msgName )
218  {
219  if ( _messages.empty() ) {
220  // queue is empty: read from the device, keeping the signal blocked while doing so
221  {
222  _sigNextMessage.block ();
223  zypp::OnScopeExit restoreSignal([&](){
224  _sigNextMessage.unblock();
225  });
226  readAllMessages();
227  }
228 
229  // still nothing queued, so there is no frame to hand out
230  if ( _messages.empty() )
231  return {};
232  }
233 
234  std::optional<zypp::PluginFrame> frame;
235 
236  if ( !msgName.empty() ) {
237  // a command name was given: take the first queued frame carrying it, if any
238  const auto wanted = std::find_if( _messages.begin(), _messages.end(), [&]( const zypp::PluginFrame &queued ) {
239  return queued.command() == msgName;
240  });
241  if ( wanted != _messages.end() ) {
242  frame = std::move( *wanted );
243  _messages.erase( wanted );
244  }
245  } else {
246  // no filter: take the frame at the front of the queue
247  frame = std::move( _messages.front() );
248  _messages.pop_front();
249  }
250 
251  // the timer keeps running only while frames remain queued
252  if ( _messages.empty() )
253  _nextMessageTimer->stop();
254  else
255  _nextMessageTimer->start(0);
256  return frame;
257  }
258 
// Blocking counterpart of nextMessage(): waits on the device until a suitable frame is queued.
// With a non-empty msgName it returns as soon as nextMessage(msgName) yields a frame; with an
// empty msgName it stops waiting as soon as any frame is queued.
// Returns an empty optional when waitForReadyRead() fails; otherwise the result of a final
// nextMessage(msgName), which may itself be empty.
259  std::optional<zypp::PluginFrame> StompFrameStream::nextMessageWait( const std::string &msgName )
260  {
261  // make sure the signal is not emitted until we have the next message
262  _sigNextMessage.block ();
263  zypp::OnScopeExit unblock([&](){
264  _sigNextMessage.unblock();
265  });
266 
// NOTE(review): source line 268 is missing from this listing — presumably it connects the
// lambda below to the invalid-message signal for the lifetime of this call; confirm.
267  bool receivedInvalidMsg = false;
269  receivedInvalidMsg = true;
270  }));
271 
272  const bool hasMsgName = msgName.size();
// keep waiting while no invalid message was flagged and the device is still open and readable
273  while ( !receivedInvalidMsg && _ioDev->isOpen() && _ioDev->canRead() ) {
274  if ( _messages.size() ) {
275  if ( hasMsgName ) {
// frames are queued but may not be the requested one; if none matches we fall through and wait again
276  std::optional<zypp::PluginFrame> msg = nextMessage(msgName);
277  if ( msg ) return msg;
278  }
279  else {
// any frame will do: leave the loop and let the final nextMessage() call fetch it
280  break;
281  }
282  }
283 
// -1 is passed as the timeout argument; presumably this means "wait without a time limit"
284  if ( !_ioDev->waitForReadyRead ( -1 ) ) {
285  // this can only mean that a error happened, like device was closed
286  return {};
287  }
288  }
289  return nextMessage (msgName);
290  }
291 
// Serializes the frame `env` to the device.
// Returns false when the device is not writable or when writing throws a zypp::Exception,
// true otherwise. Exceptions of other types are not caught here.
// NOTE(review): the signature line is missing from this listing; presumably this is
// `bool sendFrame(const zypp::PluginFrame &)` — confirm.
293  {
294  if ( !_ioDev->canWrite () )
295  return false;
296 
297  try {
// the stream buffer and the stream live only inside the try block
298  IODeviceOStreamBuf ostrbuf(_ioDev);
299  std::ostream output(&ostrbuf);
300  env.writeTo ( output );
301  } catch ( const zypp::Exception &e ) {
302  ZYPP_CAUGHT(e);
303  ERR << "Failed to serialize message to stream" << std::endl;
304  return false;
305  }
306 
307  return true;
308  }
309 
// Accessor returning _sigNextMessage, the signal the parser emits after queueing a frame.
// NOTE(review): the signature line is missing from this listing; presumably this is
// sigMessageReceived() — confirm.
311  {
312  return _sigNextMessage;
313  }
314 
// NOTE(review): both the signature line and the single body line (source line 317) are missing
// from this listing; presumably this is sigInvalidMessageReceived() — confirm against the real file.
316  {
318  }
319 
// Calls readNextMessage() repeatedly, stopping when it returns false or the device reports
// no more available bytes.
// NOTE(review): the signature line is missing from this listing; the name readAllMessages()
// is taken from the call sites in this file.
321  {
322  bool cont = true;
323  while ( cont && _ioDev->bytesAvailable() ) {
324  cont = readNextMessage ();
325  }
326  }
327 }
bool sendFrame(const zypp::PluginFrame &message)
Namespace intended to collect all environment variables we use.
Definition: Env.h:22
constexpr auto MAX_BODYLEN
Command frame for communication with PluginScript.
Definition: PluginFrame.h:41
Signal< void()> _sigNextMessage
Store and operate with byte count.
Definition: ByteCount.h:31
SignalProxy< void()> sigInvalidMessageReceived()
std::string asString() const
Definition: ByteArray.h:24
String related utilities and Regular expression matching.
Definition: Arch.h:363
enum zyppng::StompFrameStream::ParserState _parserState
void timeout(const zyppng::Timer &)
#define ERR
Definition: Logger.h:102
SignalProxy< void(Timer &t)> sigExpired()
This signal is always emitted when the timer expires.
Definition: timer.cc:120
SignalProxy< void()> sigMessageReceived()
static auto connect(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, typename internal::MemberFunction< ReceiverFunc >::ClassType &recv, ReceiverFunc &&rFunc)
Definition: base.h:142
constexpr auto MAX_HDRLEN
#define WAR
Definition: Logger.h:101
std::optional< int64_t > _pendingBodyLen
The Timer class provides repetitive and single-shot timers.
Definition: timer.h:44
std::shared_ptr< IODevice > Ptr
Definition: iodevice.h:45
StompFrameStream(IODevice::Ptr iostr)
std::optional< zypp::PluginFrame > _pendingMessage
std::optional< zypp::PluginFrame > nextMessage(const std::string &msgName="")
#define ZYPP_CAUGHT(EXCPT)
Drops a logline telling the Exception was caught (in order to handle it).
Definition: Exception.h:440
Base class for Exception.
Definition: Exception.h:146
constexpr auto MAX_CMDLEN
SignalProxy< void()> sigReadyRead()
Definition: iodevice.cc:368
static auto connectFunc(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, ReceiverFunc &&rFunc, const Tracker &...trackers)
Definition: base.h:163
Signal< void()> _sigInvalidMessageReceived
std::optional< zypp::PluginFrame > nextMessageWait(const std::string &msgName="")
InvalidMessageReceivedException(const std::string &msg={})
Easy-to use interface to the ZYPP dependency resolver.
Definition: Application.cc:19
std::deque< zypp::PluginFrame > _messages
static const std::string & contentLengthHeader()
"content-length" header name
Definition: PluginFrame.cc:402