summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOmer Katz <omer.drow@gmail.com>2019-01-09 13:21:20 +0200
committerOmer Katz <omer.drow@gmail.com>2019-01-09 13:21:20 +0200
commit662b11624042665cc5d7ef29c5946ea79d45f917 (patch)
treeb7d448827c80627b9ba7aa03cbe3583e732bc9bd
parenta28e7a27fbeeb296fd1a16d5dbebd26dfdcdfa6d (diff)
downloadkombu-refactor-connection-init.tar.gz
Refactor connection initialization code.refactor-connection-init
-rw-r--r--kombu/connection.py61
1 files changed, 36 insertions, 25 deletions
diff --git a/kombu/connection.py b/kombu/connection.py
index d74cb80d..fd69500d 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -160,7 +160,7 @@ class Connection(object):
transport_options=None, login_method=None, uri_prefix=None,
heartbeat=0, failover_strategy='round-robin',
alternates=None, **kwargs):
- alt = [] if alternates is None else alternates
+ self.alt = alternates or []
# have to spell the args out, just to get nice docstrings :(
params = self._initial_params = {
'hostname': hostname, 'userid': userid,
@@ -170,22 +170,37 @@ class Connection(object):
'login_method': login_method, 'heartbeat': heartbeat
}
- if hostname and not isinstance(hostname, string_t):
- alt.extend(hostname)
- hostname = alt[0]
- if hostname and '://' in hostname:
- if ';' in hostname:
- alt.extend(hostname.split(';'))
- hostname = alt[0]
+ if hostname:
+ hostname = self._parse_alternatives(hostname)
+ self._parse_transport_and_options_from_uri(hostname, transport, params)
+
+ self._init_params(**params)
+
+ # fallback hosts
+ self._initialize_failover_strategy(failover_strategy)
+
+ self.transport_options = transport_options or {}
+
+ if _log_connection: # pragma: no cover
+ self._logger = True
+
+ if uri_prefix:
+ self.uri_prefix = uri_prefix
+
+ self.declared_entities = set()
+
+ def _parse_transport_and_options_from_uri(self, hostname, transport, params):
+ if '://' in hostname:
if '+' in hostname[:hostname.index('://')]:
# e.g. sqla+mysql://root:masterkey@localhost/
params['transport'], params['hostname'] = \
hostname.split('+', 1)
self.uri_prefix = params['transport']
else:
+ # Figure out transport from arguments or URI
transport = transport or urlparse(hostname).scheme
if not get_transport_cls(transport).can_parse_url:
- # we must parse the URL
+ # We must parse the URL
url_params = parse_url(hostname)
params.update(
dictfilter(url_params),
@@ -194,10 +209,18 @@ class Connection(object):
params['transport'] = transport
- self._init_params(**params)
-
- # fallback hosts
- self.alt = alt
+ def _parse_alternatives(self, hostname):
+ if not isinstance(hostname, string_t):
+ # hostname is actually an iterable of hostnames
+ self.alt.extend(hostname)
+ hostname = self.alt[0]
+ elif ';' in hostname:
+ # hostname is a semicolon delimited list of hostnames
+ self.alt.extend(hostname.split(';'))
+ hostname = self.alt[0]
+ return hostname
+
+ def _initialize_failover_strategy(self, failover_strategy):
# keep text representation for .info
# only temporary solution as this won't work when
# passing a custom object (Issue celery/celery#3320).
@@ -208,18 +231,6 @@ class Connection(object):
self.cycle = self.failover_strategy(self.alt)
next(self.cycle) # skip first entry
- if transport_options is None:
- transport_options = {}
- self.transport_options = transport_options
-
- if _log_connection: # pragma: no cover
- self._logger = True
-
- if uri_prefix:
- self.uri_prefix = uri_prefix
-
- self.declared_entities = set()
-
def switch(self, url):
"""Switch connection parameters to use a new URL.