summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoger Meier <roger@apache.org>2013-07-08 23:35:25 +0200
committerRoger Meier <roger@apache.org>2013-07-08 23:35:25 +0200
commit6f2a5037105ccad05eb84ec0a60da3389c85eb3f (patch)
tree65cf256233ee1a4559310a4c4d7844cf5f67b4f8
parent6b9e1c6a87d745c224f6737b07b3ed7d72fcd6e0 (diff)
downloadthrift-6f2a5037105ccad05eb84ec0a60da3389c85eb3f.tar.gz
THRIFT-1442 TNonblockingServer: Refactor to allow multiple IO Threads
Patch: Pavlin Radoslavov
-rw-r--r--lib/cpp/src/thrift/server/TNonblockingServer.cpp61
-rw-r--r--lib/cpp/src/thrift/server/TNonblockingServer.h49
2 files changed, 76 insertions, 34 deletions
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index 398eadecc..dcc90e25b 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -1191,13 +1191,12 @@ void TNonblockingServer::stop() {
}
}
-/**
- * Main workhorse function, starts up the server listening on a port and
- * loops over the libevent handler.
- */
-void TNonblockingServer::serve() {
+void TNonblockingServer::registerEvents(event_base* user_event_base) {
+ userEventBase_ = user_event_base;
+
// init listen socket
- createAndListenOnSocket();
+ if (serverSocket_ < 0)
+ createAndListenOnSocket();
// set up the IO threads
assert(ioThreads_.empty());
@@ -1248,6 +1247,18 @@ void TNonblockingServer::serve() {
}
}
+ // Register the events for the primary (listener) IO thread
+ ioThreads_[0]->registerEvents();
+}
+
+/**
+ * Main workhorse function, starts up the server listening on a port and
+ * loops over the libevent handler.
+ */
+void TNonblockingServer::serve() {
+
+ registerEvents(NULL);
+
// Run the primary (listener) IO thread loop in our main thread; this will
// only return when the server is shutting down.
ioThreads_[0]->run();
@@ -1267,7 +1278,8 @@ TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
, number_(number)
, listenSocket_(listenSocket)
, useHighPriority_(useHighPriority)
- , eventBase_(NULL) {
+ , eventBase_(NULL)
+ , ownEventBase_(false) {
notificationPipeFDs_[0] = -1;
notificationPipeFDs_[1] = -1;
}
@@ -1276,8 +1288,9 @@ TNonblockingIOThread::~TNonblockingIOThread() {
// make sure our associated thread is fully finished
join();
- if (eventBase_) {
+ if (eventBase_ && ownEventBase_) {
event_base_free(eventBase_);
+ ownEventBase_ = false;
}
if (listenSocket_ >= 0) {
@@ -1330,6 +1343,22 @@ void TNonblockingIOThread::createNotificationPipe() {
* Register the core libevent events onto the proper base.
*/
void TNonblockingIOThread::registerEvents() {
+ threadId_ = Thread::get_current();
+
+ assert(eventBase_ == 0);
+ eventBase_ = getServer()->getUserEventBase();
+ if (eventBase_ == NULL) {
+ eventBase_ = event_base_new();
+ ownEventBase_ = true;
+ }
+
+ // Print some libevent stats
+ if (number_ == 0) {
+ GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
+ event_get_version(),
+ event_base_get_method(eventBase_));
+ }
+
if (listenSocket_ >= 0) {
// Register the server event
event_set(&serverEvent_,
@@ -1478,20 +1507,8 @@ void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
}
void TNonblockingIOThread::run() {
- threadId_ = Thread::get_current();
-
- assert(eventBase_ == 0);
- eventBase_ = event_base_new();
-
- // Print some libevent stats
- if (number_ == 0) {
- GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
- event_get_version(),
- event_base_get_method(eventBase_));
- }
-
-
- registerEvents();
+ if (eventBase_ == NULL)
+ registerEvents();
GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...",
number_);
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.h b/lib/cpp/src/thrift/server/TNonblockingServer.h
index 9e6ba170a..585aa79bc 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.h
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.h
@@ -160,6 +160,9 @@ class TNonblockingServer : public TServer {
/// Port server runs on
int port_;
+ /// The optional user-provided event-base (for single-thread servers)
+ event_base* userEventBase_;
+
/// For processing via thread pool, may be NULL
boost::shared_ptr<ThreadManager> threadManager_;
@@ -279,6 +282,7 @@ class TNonblockingServer : public TServer {
nextIOThread_ = 0;
useHighPriorityIOThreads_ = false;
port_ = port;
+ userEventBase_ = NULL;
threadPoolProcessing_ = false;
numTConnections_ = 0;
numActiveProcessors_ = 0;
@@ -756,15 +760,6 @@ class TNonblockingServer : public TServer {
*/
void stop();
- private:
- /**
- * Callback function that the threadmanager calls when a task reaches
- * its expiration time. It is needed to clean up the expired connection.
- *
- * @param task the runnable associated with the expired task.
- */
- void expireClose(boost::shared_ptr<Runnable> task);
-
/// Creates a socket to listen on and binds it to the local port.
void createAndListenOnSocket();
@@ -775,6 +770,32 @@ class TNonblockingServer : public TServer {
* @param fd descriptor of socket to be initialized/
*/
void listenSocket(THRIFT_SOCKET fd);
+
+ /**
+ * Register the optional user-provided event-base (for single-thread servers)
+ *
+ * This method should be used when the server is running in a single-thread
+ * mode, and the event base is provided by the user (i.e., the caller).
+ *
+ * @param user_event_base the user-provided event-base. The user is
+ * responsible for freeing the event base memory.
+ */
+ void registerEvents(event_base* user_event_base);
+
+ /**
+ * Returns the optional user-provided event-base (for single-thread servers).
+ */
+ event_base* getUserEventBase() const { return userEventBase_; }
+
+ private:
+ /**
+ * Callback function that the threadmanager calls when a task reaches
+ * its expiration time. It is needed to clean up the expired connection.
+ *
+ * @param task the runnable associated with the expired task.
+ */
+ void expireClose(boost::shared_ptr<Runnable> task);
+
/**
* Return an initialized connection object. Creates or recovers from
* pool a TConnection and initializes it with the provided socket FD
@@ -847,6 +868,9 @@ class TNonblockingIOThread : public Runnable {
// Ensures that the event-loop thread is fully finished and shut down.
void join();
+ /// Registers the events for the notification & listen sockets
+ void registerEvents();
+
private:
/**
* C-callable event handler for signaling task completion. Provides a
@@ -873,9 +897,6 @@ class TNonblockingIOThread : public Runnable {
/// Exits the loop ASAP in case of shutdown or error.
void breakLoop(bool error);
- /// Registers the events for the notification & listen sockets
- void registerEvents();
-
/// Create the pipe used to notify I/O process of task completion.
void createNotificationPipe();
@@ -904,6 +925,10 @@ class TNonblockingIOThread : public Runnable {
/// pointer to eventbase to be used for looping
event_base* eventBase_;
+ /// Set to true if this class is responsible for freeing the event base
+ /// memory.
+ bool ownEventBase_;
+
/// Used with eventBase_ for connection events (only in listener thread)
struct event serverEvent_;