diff options
author | Greg Farnum <gregory.farnum@dreamhost.com> | 2012-04-18 18:20:30 -0700 |
---|---|---|
committer | Greg Farnum <gregory.farnum@dreamhost.com> | 2012-04-19 11:16:33 -0700 |
commit | c470e1a04bfc814dd5f7000dee38b7c811c8b462 (patch) | |
tree | e638a410fbedcf386dcb51ca51b4674c63991e04 | |
parent | db6d467b0c27333c1e01551fbac8361b5a8ae1bc (diff) | |
download | ceph-c470e1a04bfc814dd5f7000dee38b7c811c8b462.tar.gz |
msgr: start moving functions around in Pipe
-rw-r--r-- | src/msg/SimpleMessenger.h | 106 |
1 files changed, 57 insertions, 49 deletions
diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 25fbd16b47a..2c1cf057586 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -410,7 +410,64 @@ private: * to provide reliable Message delivery when it manages to reconnect. */ class Pipe : public RefCountedObject { + /** + * The Reader thread handles all reads off the socket -- not just + * Messages, but also acks and other protocol bits (excepting startup, + * when the Writer does a couple of reads). + * All the work is implemented in Pipe itself, of course. + */ + class Reader : public Thread { + Pipe *pipe; + public: + Reader(Pipe *p) : pipe(p) {} + void *entry() { pipe->reader(); return 0; } + } reader_thread; + friend class Reader; + + /** + * The Writer thread handles all writes to the socket (after startup). + * All the work is implemented in Pipe itself, of course. + */ + class Writer : public Thread { + Pipe *pipe; + public: + Writer(Pipe *p) : pipe(p) {} + void *entry() { pipe->writer(); return 0; } + } writer_thread; + friend class Writer; + public: + Pipe(SimpleMessenger *r, int st) : + reader_thread(this), writer_thread(this), + msgr(r), + sd(-1), + peer_type(-1), + pipe_lock("SimpleMessenger::Pipe::pipe_lock"), + state(st), + connection_state(new Connection), + reader_running(false), reader_joining(false), writer_running(false), + in_qlen(0), keepalive(false), halt_delivery(false), + close_on_empty(false), disposable(false), + connect_seq(0), peer_global_seq(0), + out_seq(0), in_seq(0), in_seq_acked(0) { + connection_state->pipe = get(); + msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms + if (msgr->timeout == 0) + msgr->timeout = -1; + } + ~Pipe() { + for (map<int, xlist<Pipe *>::item* >::iterator i = queue_items.begin(); + i != queue_items.end(); + ++i) { + assert(!i->second->is_on_list()); + delete i->second; + } + assert(out_q.empty()); + assert(sent.empty()); + if (connection_state) + connection_state->put(); + } + SimpleMessenger *msgr; ostream& _pipe_prefix(std::ostream *_dout); @@ -503,59 +560,10 @@ private: } } - // threads - class Reader : public Thread { - Pipe *pipe; public: - Reader(Pipe *p) : pipe(p) {} - void *entry() { pipe->reader(); return 0; } - } reader_thread; - friend class Reader; - - class Writer : public Thread { - Pipe *pipe; - public: - Writer(Pipe *p) : pipe(p) {} - void *entry() { pipe->writer(); return 0; } - } writer_thread; - friend class Writer; - - public: Pipe(const Pipe& other); const Pipe& operator=(const Pipe& other); - Pipe(SimpleMessenger *r, int st) : - msgr(r), - sd(-1), - peer_type(-1), - pipe_lock("SimpleMessenger::Pipe::pipe_lock"), - state(st), - connection_state(new Connection), - reader_running(false), reader_joining(false), writer_running(false), - in_qlen(0), keepalive(false), halt_delivery(false), - close_on_empty(false), disposable(false), - connect_seq(0), peer_global_seq(0), - out_seq(0), in_seq(0), in_seq_acked(0), - reader_thread(this), writer_thread(this) { - connection_state->pipe = get(); - msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms - if (msgr->timeout == 0) - msgr->timeout = -1; - } - ~Pipe() { - for (map<int, xlist<Pipe *>::item* >::iterator i = queue_items.begin(); - i != queue_items.end(); - ++i) { - assert(!i->second->is_on_list()); - delete i->second; - } - assert(out_q.empty()); - assert(sent.empty()); - if (connection_state) - connection_state->put(); - } - - void start_reader() { assert(pipe_lock.is_locked()); assert(!reader_running); |