33 , _queue(
std::move(q) )
37 if (!g_unix_open_pipe (
fds, FD_CLOEXEC, &error))
38 ERR <<
"Creating pipes for AsyncQueueWatch: " << error->message << std::endl;
40 if (!g_unix_set_fd_nonblocking (
fds[0], TRUE, &error) ||
41 !g_unix_set_fd_nonblocking (
fds[1], TRUE, &error))
42 ERR <<
"Set pipes non-blocking for AsyncQueueWatch: "<< error->message << std::endl;
63 std::shared_ptr<AsyncQueueWatch> ptr (
new AsyncQueueWatch( std::move(queue) ) );
64 auto d = ptr->d_func();
66 d->_queue->addWatch( *ptr );
75 d_func()->_queue->removeWatch( *
this );
85 res =
write (d->fds[1], &one,
sizeof one);
86 while (G_UNLIKELY (res == -1 && errno == EINTR));
91 return d_func()->_sigMessageAvailable;
100 while (
read (d->fds[0], buffer,
sizeof buffer) ==
sizeof buffer);
101 d->_sigMessageAvailable.emit();
~AsyncQueueWatchPrivate() override
void onFdReady(int fd, int events) override
void onSignal(int signal) override
std::map< std::string, std::string > read(const Pathname &_path)
Read sysconfig file path_r and return (key,value) pairs.
void removeWatch(AsyncQueueWatch &watch)
bool write(const Pathname &path_r, const std::string &key_r, const std::string &val_r, const std::string &newcomment_r)
Add or change a value in sysconfig file path_r.
std::recursive_mutex _watchLock
std::set< AsyncQueueWatch * > _watches
SignalProxy< void()> sigMessageAvailable()
void addWatch(AsyncQueueWatch &watch)
virtual ~AsyncQueueBase()
~AsyncQueueWatch() override
static std::shared_ptr< AsyncQueueWatch > create(std::shared_ptr< AsyncQueueBase > queue)
AsyncQueueWatchPrivate(std::shared_ptr< AsyncQueueBase > &&q, AsyncQueueWatch &p)
ZYPP_IMPL_PRIVATE(UnixSignalSource)
AsyncQueueWatch(std::shared_ptr< AsyncQueueBase > &&queue)