summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2012-07-24 12:10:32 +0100
committerAsk Solem <ask@celeryproject.org>2012-07-24 12:11:01 +0100
commit9f595a8ce2b3209e4f3907c026b9c25248978b76 (patch)
tree20e11bf8de92d31783caf175a572e364df1c011f
parent775a2ab809cf4b8d7e1edb931ef53fbb6ea239c0 (diff)
downloadpy-amqp-9f595a8ce2b3209e4f3907c026b9c25248978b76.tar.gz
No longer any need to loop in Connection.__init__
-rw-r--r--amqp/connection.py71
-rwxr-xr-xtests/fake_redirect.py166
2 files changed, 35 insertions, 202 deletions
diff --git a/amqp/connection.py b/amqp/connection.py
index ad09019..5b2605b 100644
--- a/amqp/connection.py
+++ b/amqp/connection.py
@@ -120,47 +120,46 @@ class Connection(AbstractChannel):
self.known_hosts = ''
self._method_override = {(60, 50): self._dispatch_basic_return}
- while 1:
- self.channels = {}
- # The connection object itself is treated as channel 0
- super(Connection, self).__init__(self, 0)
-
- self.transport = None
-
- # Properties set in the Tune method
- self.channel_max = channel_max
- self.frame_max = frame_max
- self.heartbeat = heartbeat
-
- # Properties set in the Start method
- self.version_major = 0
- self.version_minor = 0
- self.server_properties = {}
- self.mechanisms = []
- self.locales = []
-
- # Let the transport.py module setup the actual
- # socket connection to the broker.
- #
- self.transport = create_transport(host, connect_timeout, ssl)
+ self.channels = {}
+ # The connection object itself is treated as channel 0
+ super(Connection, self).__init__(self, 0)
+
+ self.transport = None
+
+ # Properties set in the Tune method
+ self.channel_max = channel_max
+ self.frame_max = frame_max
+ self.heartbeat = heartbeat
+
+ # Properties set in the Start method
+ self.version_major = 0
+ self.version_minor = 0
+ self.server_properties = {}
+ self.mechanisms = []
+ self.locales = []
+
+ # Let the transport.py module setup the actual
+ # socket connection to the broker.
+ #
+ self.transport = create_transport(host, connect_timeout, ssl)
- self.method_reader = MethodReader(self.transport)
- self.method_writer = MethodWriter(self.transport, self.frame_max)
+ self.method_reader = MethodReader(self.transport)
+ self.method_writer = MethodWriter(self.transport, self.frame_max)
- self.wait(allowed_methods=[
- (10, 10), # start
- ])
+ self.wait(allowed_methods=[
+ (10, 10), # start
+ ])
- self._x_start_ok(d, login_method, login_response, locale)
+ self._x_start_ok(d, login_method, login_response, locale)
- self._wait_tune_ok = True
- while self._wait_tune_ok:
- self.wait(allowed_methods=[
- (10, 20), # secure
- (10, 30), # tune
- ])
+ self._wait_tune_ok = True
+ while self._wait_tune_ok:
+ self.wait(allowed_methods=[
+ (10, 20), # secure
+ (10, 30), # tune
+ ])
- return self._x_open(virtual_host, insist=insist)
+ return self._x_open(virtual_host, insist=insist)
def _do_close(self):
try:
diff --git a/tests/fake_redirect.py b/tests/fake_redirect.py
deleted file mode 100755
index 94e1aee..0000000
--- a/tests/fake_redirect.py
+++ /dev/null
@@ -1,166 +0,0 @@
-#!/usr/bin/env python
-"""
-Fake AMQP Redirect - simulate an AMQP server that redirects connections to
-another server. A bit ugly, but it's just to test that the client library
-actually handles a redirect, without having to have an unbalanced cluster
-of real AMQP servers.
-
-2007-12-08 Barry Pederson <bp@barryp.org>
-
-"""
-# Copyright (C) 2007-2008 Barry Pederson <bp@barryp.org>
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-
-import socket
-import sys
-from optparse import OptionParser
-from Queue import Queue
-
-import amqp
-from amqp.connection import AMQP_PROTOCOL_HEADER, _MethodReader
-from amqp.serialization import AMQPReader, AMQPWriter
-
-class FakeRedirectConnection(amqp.Connection):
- def __init__(self, sock):
- self.channels = {}
- super(amqp.Connection, self).__init__(self, 0)
-
- self.out = AMQPWriter(sock.makefile('w'))
- self.input = AMQPReader(sock.makefile('r'))
- self.method_reader = _MethodReader(self.input)
-
-
- def do_redirect(self, dest):
- if self.input.read(8) != AMQP_PROTOCOL_HEADER:
- print "Didn't receive AMQP 0-8 header"
- return
-
- # major, minor seems backwards, but that's what RabbitMQ sends
- self.start(8, 0,
- {'product': 'fake_redirect_0_8.py'},
- ['AMQPLAIN'],
- ['en_US'])
-
- self.wait(allowed_methods=[
- (10, 11), # start_ok
- ])
-
- self.tune(0, 0, 0)
-
- self.wait(allowed_methods=[
- (10, 31), # tune_ok
- ])
-
- self.wait(allowed_methods=[
- (10, 40), # open
- ])
-
- if self.insist:
- self.close(reply_text="Can't redirect, insist was set to True")
- else:
- self.redirect(dest, '')
- try:
- self.wait(allowed_methods=[
- (10, 60), # close
- ])
- except amqp.AMQPConnectionException:
- pass
-
- print 'Redirect finished'
-
-
- def fake_op(self, args):
- """
- We're not really much interested in what the client sends for
- start_ok, tune_ok
-
- """
- pass
-
- ##############
-
- def _open(self, args):
- virtual_host = args.read_shortstr()
- capabilities = args.read_shortstr()
- self.insist = args.read_bit()
-
-
- def redirect(self, host, known_hosts):
- args = AMQPWriter()
- args.write_shortstr(host)
- args.write_shortstr(known_hosts)
- self._send_channel_method_frame(0, (10, 50), args)
-
-
- def start(self, version_major, version_minor, server_properties,
- mechanisms, locales):
- args = AMQPWriter()
- args.write_octet(version_major)
- args.write_octet(version_minor)
- args.write_table(server_properties)
- args.write_longstr(' '.join(mechanisms))
- args.write_longstr(' '.join(locales))
- self._send_channel_method_frame(0, (10, 10), args)
-
-
- def tune(self, channel_max, frame_max, heartbeat):
- args = AMQPWriter()
- args.write_short(channel_max)
- args.write_long(frame_max)
- args.write_short(heartbeat)
- self._send_channel_method_frame(0, (10, 30), args)
-
-#
-# Monkeypatch the amqp.Connection _METHOD_MAP dict to
-# work with our FakeRedirectConnection
-#
-amqp.Connection._METHOD_MAP[(10, 11)] = FakeRedirectConnection.fake_op
-amqp.Connection._METHOD_MAP[(10, 31)] = FakeRedirectConnection.fake_op
-amqp.Connection._METHOD_MAP[(10, 40)] = FakeRedirectConnection._open
-
-
-def main():
- parser = OptionParser(usage='usage: %prog [options]\nexample: %prog --listen=127.0.0.1:5000 --redirect=127.0.0.1:5672')
- parser.add_option('--listen', dest='listen',
- help='ip:port to listen for an AMQP connection on',
- default=None)
- parser.add_option('--redirect', dest='redirect',
- help='ip:port to redirect AMQP connection to',
- default=None)
-
- options, args = parser.parse_args()
-
- if not options.listen or not options.redirect:
- parser.print_help()
- sys.exit(1)
-
- listen_ip, listen_port = options.listen.split(':', 1)
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.bind((listen_ip, int(listen_port)))
- print 'listening for connection...'
- s.listen(1)
-
- while True:
- sock, addr = s.accept()
- print 'Accepted connection from', addr
-
- conn = FakeRedirectConnection(sock)
- conn.do_redirect(options.redirect)
-
-
-if __name__ == '__main__':
- main()
-