summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGreg Farnum <gregory.farnum@dreamhost.com>2012-04-18 18:20:30 -0700
committerGreg Farnum <gregory.farnum@dreamhost.com>2012-04-19 11:16:33 -0700
commitc470e1a04bfc814dd5f7000dee38b7c811c8b462 (patch)
treee638a410fbedcf386dcb51ca51b4674c63991e04
parentdb6d467b0c27333c1e01551fbac8361b5a8ae1bc (diff)
downloadceph-c470e1a04bfc814dd5f7000dee38b7c811c8b462.tar.gz
msgr: start moving functions around in Pipe
-rw-r--r--src/msg/SimpleMessenger.h106
1 files changed, 57 insertions, 49 deletions
diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h
index 25fbd16b47a..2c1cf057586 100644
--- a/src/msg/SimpleMessenger.h
+++ b/src/msg/SimpleMessenger.h
@@ -410,7 +410,64 @@ private:
* to provide reliable Message delivery when it manages to reconnect.
*/
class Pipe : public RefCountedObject {
+ /**
+ * The Reader thread handles all reads off the socket -- not just
+ * Messages, but also acks and other protocol bits (excepting startup,
+ * when the Writer does a couple of reads).
+ * All the work is implemented in Pipe itself, of course.
+ */
+ class Reader : public Thread {
+ Pipe *pipe;
+ public:
+ Reader(Pipe *p) : pipe(p) {}
+ void *entry() { pipe->reader(); return 0; }
+ } reader_thread;
+ friend class Reader;
+
+ /**
+ * The Writer thread handles all writes to the socket (after startup).
+ * All the work is implemented in Pipe itself, of course.
+ */
+ class Writer : public Thread {
+ Pipe *pipe;
+ public:
+ Writer(Pipe *p) : pipe(p) {}
+ void *entry() { pipe->writer(); return 0; }
+ } writer_thread;
+ friend class Writer;
+
public:
+ Pipe(SimpleMessenger *r, int st) :
+ reader_thread(this), writer_thread(this),
+ msgr(r),
+ sd(-1),
+ peer_type(-1),
+ pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
+ state(st),
+ connection_state(new Connection),
+ reader_running(false), reader_joining(false), writer_running(false),
+ in_qlen(0), keepalive(false), halt_delivery(false),
+ close_on_empty(false), disposable(false),
+ connect_seq(0), peer_global_seq(0),
+ out_seq(0), in_seq(0), in_seq_acked(0) {
+ connection_state->pipe = get();
+ msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms
+ if (msgr->timeout == 0)
+ msgr->timeout = -1;
+ }
+ ~Pipe() {
+ for (map<int, xlist<Pipe *>::item* >::iterator i = queue_items.begin();
+ i != queue_items.end();
+ ++i) {
+ assert(!i->second->is_on_list());
+ delete i->second;
+ }
+ assert(out_q.empty());
+ assert(sent.empty());
+ if (connection_state)
+ connection_state->put();
+ }
+
SimpleMessenger *msgr;
ostream& _pipe_prefix(std::ostream *_dout);
@@ -503,59 +560,10 @@ private:
}
}
- // threads
- class Reader : public Thread {
- Pipe *pipe;
public:
- Reader(Pipe *p) : pipe(p) {}
- void *entry() { pipe->reader(); return 0; }
- } reader_thread;
- friend class Reader;
-
- class Writer : public Thread {
- Pipe *pipe;
- public:
- Writer(Pipe *p) : pipe(p) {}
- void *entry() { pipe->writer(); return 0; }
- } writer_thread;
- friend class Writer;
-
- public:
Pipe(const Pipe& other);
const Pipe& operator=(const Pipe& other);
- Pipe(SimpleMessenger *r, int st) :
- msgr(r),
- sd(-1),
- peer_type(-1),
- pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
- state(st),
- connection_state(new Connection),
- reader_running(false), reader_joining(false), writer_running(false),
- in_qlen(0), keepalive(false), halt_delivery(false),
- close_on_empty(false), disposable(false),
- connect_seq(0), peer_global_seq(0),
- out_seq(0), in_seq(0), in_seq_acked(0),
- reader_thread(this), writer_thread(this) {
- connection_state->pipe = get();
- msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms
- if (msgr->timeout == 0)
- msgr->timeout = -1;
- }
- ~Pipe() {
- for (map<int, xlist<Pipe *>::item* >::iterator i = queue_items.begin();
- i != queue_items.end();
- ++i) {
- assert(!i->second->is_on_list());
- delete i->second;
- }
- assert(out_q.empty());
- assert(sent.empty());
- if (connection_state)
- connection_state->put();
- }
-
-
void start_reader() {
assert(pipe_lock.is_locked());
assert(!reader_running);