diff options
author | Ask Solem <ask@celeryproject.org> | 2012-07-24 12:10:32 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2012-07-24 12:11:01 +0100 |
commit | 9f595a8ce2b3209e4f3907c026b9c25248978b76 (patch) | |
tree | 20e11bf8de92d31783caf175a572e364df1c011f | |
parent | 775a2ab809cf4b8d7e1edb931ef53fbb6ea239c0 (diff) | |
download | py-amqp-9f595a8ce2b3209e4f3907c026b9c25248978b76.tar.gz |
No longer any need to loop in Connection.__init__
-rw-r--r-- | amqp/connection.py | 71 | ||||
-rwxr-xr-x | tests/fake_redirect.py | 166 |
2 files changed, 35 insertions, 202 deletions
diff --git a/amqp/connection.py b/amqp/connection.py index ad09019..5b2605b 100644 --- a/amqp/connection.py +++ b/amqp/connection.py @@ -120,47 +120,46 @@ class Connection(AbstractChannel): self.known_hosts = '' self._method_override = {(60, 50): self._dispatch_basic_return} - while 1: - self.channels = {} - # The connection object itself is treated as channel 0 - super(Connection, self).__init__(self, 0) - - self.transport = None - - # Properties set in the Tune method - self.channel_max = channel_max - self.frame_max = frame_max - self.heartbeat = heartbeat - - # Properties set in the Start method - self.version_major = 0 - self.version_minor = 0 - self.server_properties = {} - self.mechanisms = [] - self.locales = [] - - # Let the transport.py module setup the actual - # socket connection to the broker. - # - self.transport = create_transport(host, connect_timeout, ssl) + self.channels = {} + # The connection object itself is treated as channel 0 + super(Connection, self).__init__(self, 0) + + self.transport = None + + # Properties set in the Tune method + self.channel_max = channel_max + self.frame_max = frame_max + self.heartbeat = heartbeat + + # Properties set in the Start method + self.version_major = 0 + self.version_minor = 0 + self.server_properties = {} + self.mechanisms = [] + self.locales = [] + + # Let the transport.py module setup the actual + # socket connection to the broker. + # + self.transport = create_transport(host, connect_timeout, ssl) - self.method_reader = MethodReader(self.transport) - self.method_writer = MethodWriter(self.transport, self.frame_max) + self.method_reader = MethodReader(self.transport) + self.method_writer = MethodWriter(self.transport, self.frame_max) - self.wait(allowed_methods=[ - (10, 10), # start - ]) + self.wait(allowed_methods=[ + (10, 10), # start + ]) - self._x_start_ok(d, login_method, login_response, locale) + self._x_start_ok(d, login_method, login_response, locale) - self._wait_tune_ok = True - while self._wait_tune_ok: - self.wait(allowed_methods=[ - (10, 20), # secure - (10, 30), # tune - ]) + self._wait_tune_ok = True + while self._wait_tune_ok: + self.wait(allowed_methods=[ + (10, 20), # secure + (10, 30), # tune + ]) - return self._x_open(virtual_host, insist=insist) + return self._x_open(virtual_host, insist=insist) def _do_close(self): try: diff --git a/tests/fake_redirect.py b/tests/fake_redirect.py deleted file mode 100755 index 94e1aee..0000000 --- a/tests/fake_redirect.py +++ /dev/null @@ -1,166 +0,0 @@ -#!/usr/bin/env python -""" -Fake AMQP Redirect - simulate an AMQP server that redirects connections to -another server. A bit ugly, but it's just to test that the client library -actually handles a redirect, without having to have an unbalanced cluster -of real AMQP servers. - -2007-12-08 Barry Pederson <bp@barryp.org> - -""" -# Copyright (C) 2007-2008 Barry Pederson <bp@barryp.org> -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 - -import socket -import sys -from optparse import OptionParser -from Queue import Queue - -import amqp -from amqp.connection import AMQP_PROTOCOL_HEADER, _MethodReader -from amqp.serialization import AMQPReader, AMQPWriter - -class FakeRedirectConnection(amqp.Connection): - def __init__(self, sock): - self.channels = {} - super(amqp.Connection, self).__init__(self, 0) - - self.out = AMQPWriter(sock.makefile('w')) - self.input = AMQPReader(sock.makefile('r')) - self.method_reader = _MethodReader(self.input) - - - def do_redirect(self, dest): - if self.input.read(8) != AMQP_PROTOCOL_HEADER: - print "Didn't receive AMQP 0-8 header" - return - - # major, minor seems backwards, but that's what RabbitMQ sends - self.start(8, 0, - {'product': 'fake_redirect_0_8.py'}, - ['AMQPLAIN'], - ['en_US']) - - self.wait(allowed_methods=[ - (10, 11), # start_ok - ]) - - self.tune(0, 0, 0) - - self.wait(allowed_methods=[ - (10, 31), # tune_ok - ]) - - self.wait(allowed_methods=[ - (10, 40), # open - ]) - - if self.insist: - self.close(reply_text="Can't redirect, insist was set to True") - else: - self.redirect(dest, '') - try: - self.wait(allowed_methods=[ - (10, 60), # close - ]) - except amqp.AMQPConnectionException: - pass - - print 'Redirect finished' - - - def fake_op(self, args): - """ - We're not really much interested in what the client sends for - start_ok, tune_ok - - """ - pass - - ############## - - def _open(self, args): - virtual_host = args.read_shortstr() - capabilities = args.read_shortstr() - self.insist = args.read_bit() - - - def redirect(self, host, known_hosts): - args = AMQPWriter() - args.write_shortstr(host) - args.write_shortstr(known_hosts) - self._send_channel_method_frame(0, (10, 50), args) - - - def start(self, version_major, version_minor, server_properties, - mechanisms, locales): - args = AMQPWriter() - args.write_octet(version_major) - args.write_octet(version_minor) - args.write_table(server_properties) - args.write_longstr(' '.join(mechanisms)) - args.write_longstr(' '.join(locales)) - self._send_channel_method_frame(0, (10, 10), args) - - - def tune(self, channel_max, frame_max, heartbeat): - args = AMQPWriter() - args.write_short(channel_max) - args.write_long(frame_max) - args.write_short(heartbeat) - self._send_channel_method_frame(0, (10, 30), args) - -# -# Monkeypatch the amqp.Connection _METHOD_MAP dict to -# work with our FakeRedirectConnection -# -amqp.Connection._METHOD_MAP[(10, 11)] = FakeRedirectConnection.fake_op -amqp.Connection._METHOD_MAP[(10, 31)] = FakeRedirectConnection.fake_op -amqp.Connection._METHOD_MAP[(10, 40)] = FakeRedirectConnection._open - - -def main(): - parser = OptionParser(usage='usage: %prog [options]\nexample: %prog --listen=127.0.0.1:5000 --redirect=127.0.0.1:5672') - parser.add_option('--listen', dest='listen', - help='ip:port to listen for an AMQP connection on', - default=None) - parser.add_option('--redirect', dest='redirect', - help='ip:port to redirect AMQP connection to', - default=None) - - options, args = parser.parse_args() - - if not options.listen or not options.redirect: - parser.print_help() - sys.exit(1) - - listen_ip, listen_port = options.listen.split(':', 1) - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.bind((listen_ip, int(listen_port))) - print 'listening for connection...' - s.listen(1) - - while True: - sock, addr = s.accept() - print 'Accepted connection from', addr - - conn = FakeRedirectConnection(sock) - conn.do_redirect(options.redirect) - - -if __name__ == '__main__': - main() - |