summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-11-13 19:28:23 +0000
committerAlan Conway <aconway@apache.org>2006-11-13 19:28:23 +0000
commit922d1249a1a48c13a2e37f1efc1b3204ca5e9813 (patch)
tree0a5284b22967b19a97d02284d3bafe2ae9d79df0 /cpp/src
parentfc5ad32cc8f2d4d793c0cf79382755e9fca2357c (diff)
downloadqpid-python-922d1249a1a48c13a2e37f1efc1b3204ca5e9813.tar.gz
EventChannel classes, start of epoll-based posix implementation.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@474452 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/Exception.h63
-rw-r--r--cpp/src/qpid/posix/EpollEventChannel.cpp74
-rw-r--r--cpp/src/qpid/posix/EpollEventChannel.h48
-rw-r--r--cpp/src/qpid/posix/Socket.cpp12
-rw-r--r--cpp/src/qpid/posix/check.cpp14
-rw-r--r--cpp/src/qpid/posix/check.h8
-rw-r--r--cpp/src/qpid/sys/EventChannel.h239
-rw-r--r--cpp/src/qpid/sys/Monitor.h28
-rw-r--r--cpp/src/qpid/sys/SessionHandlerFactory.h2
-rw-r--r--cpp/src/qpid/sys/Thread.h4
-rw-r--r--cpp/src/qpid/sys/signal.h46
-rw-r--r--cpp/src/qpidd.cpp4
12 files changed, 492 insertions, 50 deletions
diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h
index 2ba3620095..2aba43586d 100644
--- a/cpp/src/qpid/Exception.h
+++ b/cpp/src/qpid/Exception.h
@@ -24,26 +24,55 @@
#include <exception>
#include <string>
+#include <memory>
namespace qpid
{
- /**
- * Exception base class for all Qpid exceptions.
- */
- class Exception : public std::exception
- {
- protected:
- std::string whatStr;
-
- public:
- Exception() throw() {}
- Exception(const std::string& str) throw() : whatStr(str) {}
- Exception(const char* str) throw() : whatStr(str) {}
- virtual ~Exception() throw();
-
- const char* what() const throw() { return whatStr.c_str(); }
- virtual std::string toString() const throw() { return whatStr; }
- };
+/**
+ * Exception base class for all Qpid exceptions.
+ */
+class Exception : public std::exception
+{
+ protected:
+ std::string whatStr;
+
+ public:
+ Exception() throw() {}
+ Exception(const std::string& str) throw() : whatStr(str) {}
+ Exception(const char* str) throw() : whatStr(str) {}
+ virtual ~Exception() throw();
+
+ virtual const char* what() const throw() { return whatStr.c_str(); }
+ virtual std::string toString() const throw() { return whatStr; }
+};
+
+/**
+ * Wrapper for heap-allocated exceptions. Use like this:
+ * <code>
+ * std::auto_ptr<Exception> ex = new SomeEx(...)
+ * HeapException hex(ex); // Takes ownership
+ * throw hex; // Auto-deletes ex
+ * </code>
+ */
+class HeapException : public Exception, public std::auto_ptr<Exception>
+{
+ public:
+ HeapException() {}
+ HeapException(std::auto_ptr<Exception> e) : std::auto_ptr<Exception>(e) {}
+
+ HeapException& operator=(std::auto_ptr<Exception>& e) {
+ std::auto_ptr<Exception>::operator=(e);
+ return *this;
+ }
+
+ ~HeapException() throw() {}
+
+ virtual const char* what() const throw() { return (*this)->what(); }
+ virtual std::string toString() const throw() {
+ return (*this)->toString();
+ }
+};
+
}
#endif /*!_Exception_*/
diff --git a/cpp/src/qpid/posix/EpollEventChannel.cpp b/cpp/src/qpid/posix/EpollEventChannel.cpp
new file mode 100644
index 0000000000..1418507542
--- /dev/null
+++ b/cpp/src/qpid/posix/EpollEventChannel.cpp
@@ -0,0 +1,74 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <qpid/sys/EventChannel.h>
+#include <sys/epoll.h>
+#include "EpollEventChannel.h"
+
+namespace qpid {
+namespace sys {
+
+EventChannel::shared_ptr EventChannel::create()
+{
+ return EventChannel::shared_ptr(new EpollEventChannel());
+}
+
+EpollEventChannel::EpollEventChannel()
+{
+ // TODO aconway 2006-11-13: How to choose size parameter?
+ static const size_t estimatedFdsForEpoll = 1000;
+ epollFd = epoll_create(estimatedFdsForEpoll);
+}
+
+void
+EpollEventChannel::post(ReadEvent& /*event*/)
+{
+}
+
+void
+EpollEventChannel::post(WriteEvent& /*event*/)
+{
+}
+
+void
+EpollEventChannel::post(AcceptEvent& /*event*/)
+{
+}
+
+void
+EpollEventChannel::post(NotifyEvent& /*event*/)
+{
+}
+
+inline void
+EpollEventChannel::post(Event& /*event*/)
+{
+}
+
+Event*
+EpollEventChannel::getEvent()
+{
+ return 0;
+}
+
+void
+EpollEventChannel::dispose(void* /*buffer*/, size_t)
+{
+}
+
+}}
diff --git a/cpp/src/qpid/posix/EpollEventChannel.h b/cpp/src/qpid/posix/EpollEventChannel.h
new file mode 100644
index 0000000000..8128d5276f
--- /dev/null
+++ b/cpp/src/qpid/posix/EpollEventChannel.h
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <qpid/sys/EventChannel.h>
+
+namespace qpid {
+namespace sys {
+
+/** Epoll-based implementation of the event channel */
+class EpollEventChannel : public EventChannel
+{
+ public:
+
+ EpollEventChannel();
+ ~EpollEventChannel();
+
+ virtual void post(ReadEvent& event);
+ virtual void post(WriteEvent& event);
+ virtual void post(AcceptEvent& event);
+ virtual void post(NotifyEvent& event);
+
+ inline void post(Event& event);
+
+ virtual Event* getEvent();
+
+ virtual void dispose(void* buffer, size_t size);
+
+ private:
+ int epollFd;
+
+};
+
+}}
diff --git a/cpp/src/qpid/posix/Socket.cpp b/cpp/src/qpid/posix/Socket.cpp
index 3101be54f4..1321ae6b0d 100644
--- a/cpp/src/qpid/posix/Socket.cpp
+++ b/cpp/src/qpid/posix/Socket.cpp
@@ -33,7 +33,7 @@ using namespace qpid::sys;
Socket::Socket() : socket(::socket (PF_INET, SOCK_STREAM, 0))
{
- if (socket == 0) CHECK(-1);
+ CHECKNN(socket == 0);
}
void
@@ -53,16 +53,16 @@ Socket::connect(const std::string& host, int port)
name.sin_family = AF_INET;
name.sin_port = htons(port);
struct hostent* hp = gethostbyname ( host.c_str() );
- if (hp == 0) CHECK(-1); // TODO aconway 2006-11-09: error message?
+ if (hp == 0) CHECK0(-1); // TODO aconway 2006-11-09: error message?
memcpy(&name.sin_addr.s_addr, hp->h_addr_list[0], hp->h_length);
- CHECK(::connect(socket, (struct sockaddr*)(&name), sizeof(name)));
+ CHECK0(::connect(socket, (struct sockaddr*)(&name), sizeof(name)));
}
void
Socket::close()
{
if (socket == 0) return;
- CHECK(::close(socket));
+ CHECK0(::close(socket));
socket = 0;
}
@@ -73,7 +73,7 @@ Socket::send(const char* data, size_t size)
if (sent < 0) {
if (errno == ECONNRESET) return SOCKET_EOF;
if (errno == ETIMEDOUT) return SOCKET_TIMEOUT;
- CHECK(sent);
+ CHECK0(sent);
}
return sent;
}
@@ -84,7 +84,7 @@ Socket::recv(char* data, size_t size)
ssize_t received = ::recv(socket, data, size, 0);
if (received < 0) {
if (errno == ETIMEDOUT) return SOCKET_TIMEOUT;
- CHECK(received);
+ CHECK0(received);
}
return received;
}
diff --git a/cpp/src/qpid/posix/check.cpp b/cpp/src/qpid/posix/check.cpp
index 5e8e9e7ca2..3470906639 100644
--- a/cpp/src/qpid/posix/check.cpp
+++ b/cpp/src/qpid/posix/check.cpp
@@ -22,10 +22,12 @@
#include <qpid/QpidError.h>
#include "check.h"
-void qpid::sys::check(long result, const char* file, const int line) {
- if (result != 0) {
- char buf[512];
- char* msg = strerror_r(errno, buf, sizeof(buf));
- throw QpidError(errno, msg, file, line);
- }
+namespace qpid {
+namespace sys {
+
+std::string strError() {
+ char buf[512];
+ return strerror_r(errno, buf, sizeof(buf));
}
+
+}}
diff --git a/cpp/src/qpid/posix/check.h b/cpp/src/qpid/posix/check.h
index d1de9360a4..666637b1c2 100644
--- a/cpp/src/qpid/posix/check.h
+++ b/cpp/src/qpid/posix/check.h
@@ -23,14 +23,16 @@
*/
#include <errno.h>
+#include <string>
+#include <qpid/QpidError.h>
namespace qpid {
namespace sys {
-void check(long result, const char* file, const int line);
-
-#define CHECK(N) qpid::sys::check(N, __FILE__, __LINE__)
+std::string errnoToString();
+#define CHECK0(N) if ((N)!=0) THROW_QPID_ERROR(INTERNAL_ERROR, errnoToString())
+#define CHECKNN(N) if ((N)<0) THROW_QPID_ERROR(INTERNAL_ERROR, errnoToString())
}}
diff --git a/cpp/src/qpid/sys/EventChannel.h b/cpp/src/qpid/sys/EventChannel.h
new file mode 100644
index 0000000000..dd857c02c7
--- /dev/null
+++ b/cpp/src/qpid/sys/EventChannel.h
@@ -0,0 +1,239 @@
+#ifndef _sys_EventChannel_h
+#define _sys_EventChannel_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <qpid/SharedObject.h>
+#include <qpid/Exception.h>
+#include <qpid/sys/Time.h>
+
+namespace qpid {
+namespace sys {
+
+class EventChannel;
+
+class Event;
+class ReadEvent;
+class WriteEvent;
+class AcceptEvent;
+class NotifyEvent;
+
+/**
+ Active event channel. Events represent async IO requests or
+ inter-task synchronization. Posting an Event registers interest in
+ the IO or sync event. When it occurs the posted Event is
+ corresponding IO or sync event occurs they are returned to one
+ of the threads waiting on the channel. For more details see
+ the Event hierarchy.
+*/
+class EventChannel : public qpid::SharedObject<EventChannel>
+{
+ public:
+ static EventChannel::shared_ptr create();
+
+ virtual ~EventChannel() {}
+
+ virtual void post(ReadEvent& event) = 0;
+ virtual void post(WriteEvent& event) = 0;
+ virtual void post(AcceptEvent& event) = 0;
+ virtual void post(NotifyEvent& event) = 0;
+
+ inline void post(Event& event);
+
+ /**
+ * Wait for the next completed event.
+ * @return An Event or 0 to indicate the calling thread should shut down.
+ */
+ virtual Event* getEvent() = 0;
+
+ /** Dispose of a system-allocated buffer. Called by ReadEvent */
+ virtual void dispose(void* buffer, size_t size) = 0;
+
+ protected:
+ EventChannel() {}
+};
+
+
+/**
+ * Base class for all events. There are two possible styles of use:
+ *
+ * Task style: the event is allocated as a local variable on the initiating
+ * task, which blocks in wait(). Event::dispatch() resumes that task
+ * with the event data available.
+ *
+ * Proactor style: Threads post events but do not
+ * wait. Event::dispatch() processes the event in the dispatching
+ * thread and then deletes itself.
+ *
+ * Tasks give less kernel context switching and blocking AND simpler
+ * coding. Tasks can call any number of pseudo-blocking opereations
+ * that are actually event post/wait pairs. At each such point the
+ * current thread can continue with the task or switch to another task
+ * to minimise blocking.
+ *
+ * With Proactor style dispatch() is an atomic unit of work as far as
+ * the EventChannel is concerned. To avoid such blocking the
+ * application has to be written as a collection of non-blocking
+ * dispatch() callbacks, which is more complex than tasks that can
+ * call pseudo-blocking operations.
+ */
+class Event : private boost::noncopyable
+{
+ public:
+ virtual ~Event() {}
+
+ /** Post this event to the channel */
+ virtual void post(EventChannel& channel) = 0;
+
+ /**
+ * Block till the event is delivered.
+ * At most one task can wait on an event.
+ */
+ virtual void wait() const = 0;
+
+ /**
+ * Dispatch the event. Runs some event-specific code, may switch
+ * context to resume a waiting task.
+ */
+ virtual void dispatch() = 0;
+};
+
+
+/**
+ * Base class for asynchronous request events, provides exception
+ * handling.
+ */
+class RequestEvent : public Event
+{
+ public:
+ /** True if the async request failed */
+ bool hasException() const { return ex.get(); }
+
+ const qpid::Exception& getException() const { return *ex; }
+
+ void setException(std::auto_ptr<qpid::Exception>& e) { ex = e; }
+
+ /** If the event has an exception throw it, else do nothing */
+ void verify() const { if (ex.get()) throw *ex; }
+
+ void post(EventChannel& channel) { channel.post(*this); }
+
+ private:
+ qpid::HeapException ex;
+};
+
+
+/** An asynchronous read event. */
+class ReadEvent : public RequestEvent {
+ public:
+ /**
+ * Read data from fd.
+ */
+ ReadEvent(int fileDescriptor, void* buffer, size_t bytesToRead) :
+ fd(fileDescriptor), data(buffer), size(bytesToRead) {}
+
+ /** Number of bytes read. */
+ size_t getBytesRead() const { verify(); return size; }
+
+ /**
+ * If the system supports direct access to DMA buffers then
+ * it may provide a direct pointer to such a buffer to avoid
+ * a copy into the user buffer.
+ * @return true if getData() is returning a system-supplied buffer.
+ */
+ bool isSystemData() const { verify(); return channel != 0; }
+
+ /**
+ * Pointer to data read. Note if isSystemData() is true then this
+ * is NOT the same buffer that was supplied to the constructor.
+ * The user buffer is untouched. See dispose().
+ */
+ void* getData() const { verify(); return data; }
+
+ /** Called by the event channel on completion. */
+ void complete(EventChannel::shared_ptr ec, void* _data, size_t _size) {
+ if (data != _data) channel = ec; data = _data; size = _size;
+ }
+
+ /**
+ * Dispose of system-provided data buffer, if any. This is
+ * automatically called by the destructor.
+ */
+ void dispose() { if(channel && data) channel->dispose(data,size); data=0; }
+
+ ~ReadEvent() { dispose(); }
+
+ void post(EventChannel& channel) { channel.post(*this); }
+
+ private:
+ int fd;
+ void* data;
+ size_t size;
+ EventChannel::shared_ptr channel;
+};
+
+/** Asynchronous write event */
+class WriteEvent : public RequestEvent {
+ public:
+ WriteEvent(int fileDescriptor, void* buffer, size_t bytesToWrite) :
+ fd(fileDescriptor), data(buffer), size(bytesToWrite) {}
+
+ /** Number of bytes written */
+ size_t getBytesWritten() const { verify(); return size; }
+
+ void post(EventChannel& channel) { channel.post(*this); }
+
+ private:
+ int fd;
+ void* data;
+ size_t size;
+};
+
+/** Asynchronous socket accept event */
+class AcceptEvent : public RequestEvent {
+ public:
+ /** Accept a connection on listeningFd */
+ AcceptEvent(int listeningFd) : listen(listeningFd) {}
+
+ /** Get accepted file descriptor */
+ int getAcceptedFd() const { verify(); return accepted; }
+
+ void post(EventChannel& channel) { channel.post(*this); }
+
+ private:
+ int listen;
+ int accepted;
+};
+
+/**
+ * NotifyEvent is delievered immediately to be dispatched by an
+ * EventChannel thread.
+ */
+class NotifyEvent : public RequestEvent {
+ public:
+ void post(EventChannel& channel) { channel.post(*this); }
+};
+
+
+inline void EventChannel::post(Event& event) { event.post(*this); }
+
+}}
+
+
+#endif /*!_sys_EventChannel_h*/
diff --git a/cpp/src/qpid/sys/Monitor.h b/cpp/src/qpid/sys/Monitor.h
index 54956777de..a3abe37748 100644
--- a/cpp/src/qpid/sys/Monitor.h
+++ b/cpp/src/qpid/sys/Monitor.h
@@ -164,46 +164,46 @@ struct PODMutex
void PODMutex::lock() {
- CHECK(pthread_mutex_lock(&mutex));
+ CHECK0(pthread_mutex_lock(&mutex));
}
void PODMutex::unlock() {
- CHECK(pthread_mutex_unlock(&mutex));
+ CHECK0(pthread_mutex_unlock(&mutex));
}
void PODMutex::trylock() {
- CHECK(pthread_mutex_trylock(&mutex));
+ CHECK0(pthread_mutex_trylock(&mutex));
}
Mutex::Mutex() {
- CHECK(pthread_mutex_init(&mutex, 0));
+ CHECK0(pthread_mutex_init(&mutex, 0));
}
Mutex::~Mutex(){
- CHECK(pthread_mutex_destroy(&mutex));
+ CHECK0(pthread_mutex_destroy(&mutex));
}
void Mutex::lock() {
- CHECK(pthread_mutex_lock(&mutex));
+ CHECK0(pthread_mutex_lock(&mutex));
}
void Mutex::unlock() {
- CHECK(pthread_mutex_unlock(&mutex));
+ CHECK0(pthread_mutex_unlock(&mutex));
}
void Mutex::trylock() {
- CHECK(pthread_mutex_trylock(&mutex));
+ CHECK0(pthread_mutex_trylock(&mutex));
}
Monitor::Monitor() {
- CHECK(pthread_cond_init(&condition, 0));
+ CHECK0(pthread_cond_init(&condition, 0));
}
Monitor::~Monitor() {
- CHECK(pthread_cond_destroy(&condition));
+ CHECK0(pthread_cond_destroy(&condition));
}
void Monitor::wait() {
- CHECK(pthread_cond_wait(&condition, &mutex));
+ CHECK0(pthread_cond_wait(&condition, &mutex));
}
bool Monitor::wait(int64_t nsecs){
@@ -211,17 +211,17 @@ bool Monitor::wait(int64_t nsecs){
int status = pthread_cond_timedwait(&condition, &mutex, &t.getTimespec());
if(status != 0) {
if (errno == ETIMEDOUT) return false;
- CHECK(status);
+ CHECK0(status);
}
return true;
}
void Monitor::notify(){
- CHECK(pthread_cond_signal(&condition));
+ CHECK0(pthread_cond_signal(&condition));
}
void Monitor::notifyAll(){
- CHECK(pthread_cond_broadcast(&condition));
+ CHECK0(pthread_cond_broadcast(&condition));
}
diff --git a/cpp/src/qpid/sys/SessionHandlerFactory.h b/cpp/src/qpid/sys/SessionHandlerFactory.h
index da58f5928e..2a01aebcb0 100644
--- a/cpp/src/qpid/sys/SessionHandlerFactory.h
+++ b/cpp/src/qpid/sys/SessionHandlerFactory.h
@@ -21,6 +21,8 @@
#ifndef _SessionHandlerFactory_
#define _SessionHandlerFactory_
+#include <boost/noncopyable.hpp>
+
namespace qpid {
namespace sys {
diff --git a/cpp/src/qpid/sys/Thread.h b/cpp/src/qpid/sys/Thread.h
index ed42dfd811..79dfc184e0 100644
--- a/cpp/src/qpid/sys/Thread.h
+++ b/cpp/src/qpid/sys/Thread.h
@@ -87,11 +87,11 @@ Thread Thread::current(){
#else
Thread::Thread(Runnable* runnable) {
- CHECK(pthread_create(&thread, NULL, runRunnable, runnable));
+ CHECK0(pthread_create(&thread, NULL, runRunnable, runnable));
}
void Thread::join(){
- if (thread != 0) CHECK(pthread_join(thread, 0));
+ if (thread != 0) CHECK0(pthread_join(thread, 0));
}
Thread::Thread(pthread_t thr) : thread(thr) {}
diff --git a/cpp/src/qpid/sys/signal.h b/cpp/src/qpid/sys/signal.h
new file mode 100644
index 0000000000..6cae2b9e4a
--- /dev/null
+++ b/cpp/src/qpid/sys/signal.h
@@ -0,0 +1,46 @@
+#ifndef _sys_signal_h
+#define _sys_signal_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifdef USE_APR
+# include <apr-1/apr_signal.h>
+#else
+# include <signal.h>
+#endif
+
+namespace qpid {
+namespace sys {
+
+typedef void (*SignalHandler)(int);
+
+SignalHandler signal(int sig, SignalHandler handler)
+{
+#ifdef USE_APR
+ return apr_signal(sig, handler);
+#else
+ return ::signal (sig, handler);
+#endif
+}
+
+}}
+
+
+
+#endif /*!_sys_signal_h*/
diff --git a/cpp/src/qpidd.cpp b/cpp/src/qpidd.cpp
index f4afad0467..cd50760f17 100644
--- a/cpp/src/qpidd.cpp
+++ b/cpp/src/qpidd.cpp
@@ -20,7 +20,7 @@
*/
#include <qpid/broker/Broker.h>
#include <qpid/broker/Configuration.h>
-#include <apr-1/apr_signal.h>
+#include <qpid/sys/signal.h>
#include <iostream>
#include <memory>
@@ -43,7 +43,7 @@ int main(int argc, char** argv)
config.usage();
}else{
broker = Broker::create(config);
- apr_signal(SIGINT, handle_signal);
+ qpid::sys::signal(SIGINT, handle_signal);
broker->run();
}
return 0;