diff options
author | Omer Katz <omer.drow@gmail.com> | 2019-01-09 13:21:20 +0200 |
---|---|---|
committer | Omer Katz <omer.drow@gmail.com> | 2019-01-09 13:21:20 +0200 |
commit | 662b11624042665cc5d7ef29c5946ea79d45f917 (patch) | |
tree | b7d448827c80627b9ba7aa03cbe3583e732bc9bd | |
parent | a28e7a27fbeeb296fd1a16d5dbebd26dfdcdfa6d (diff) | |
download | kombu-refactor-connection-init.tar.gz |
Refactor connection initialization code.refactor-connection-init
-rw-r--r-- | kombu/connection.py | 61 |
1 files changed, 36 insertions, 25 deletions
diff --git a/kombu/connection.py b/kombu/connection.py index d74cb80d..fd69500d 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -160,7 +160,7 @@ class Connection(object): transport_options=None, login_method=None, uri_prefix=None, heartbeat=0, failover_strategy='round-robin', alternates=None, **kwargs): - alt = [] if alternates is None else alternates + self.alt = alternates or [] # have to spell the args out, just to get nice docstrings :( params = self._initial_params = { 'hostname': hostname, 'userid': userid, @@ -170,22 +170,37 @@ class Connection(object): 'login_method': login_method, 'heartbeat': heartbeat } - if hostname and not isinstance(hostname, string_t): - alt.extend(hostname) - hostname = alt[0] - if hostname and '://' in hostname: - if ';' in hostname: - alt.extend(hostname.split(';')) - hostname = alt[0] + if hostname: + hostname = self._parse_alternatives(hostname) + self._parse_transport_and_options_from_uri(hostname, transport, params) + + self._init_params(**params) + + # fallback hosts + self._initialize_failover_strategy(failover_strategy) + + self.transport_options = transport_options or {} + + if _log_connection: # pragma: no cover + self._logger = True + + if uri_prefix: + self.uri_prefix = uri_prefix + + self.declared_entities = set() + + def _parse_transport_and_options_from_uri(self, hostname, transport, params): + if '://' in hostname: if '+' in hostname[:hostname.index('://')]: # e.g. sqla+mysql://root:masterkey@localhost/ params['transport'], params['hostname'] = \ hostname.split('+', 1) self.uri_prefix = params['transport'] else: + # Figure out transport from arguments or URI transport = transport or urlparse(hostname).scheme if not get_transport_cls(transport).can_parse_url: - # we must parse the URL + # We must parse the URL url_params = parse_url(hostname) params.update( dictfilter(url_params), @@ -194,10 +209,18 @@ class Connection(object): params['transport'] = transport - self._init_params(**params) - - # fallback hosts - self.alt = alt + def _parse_alternatives(self, hostname): + if not isinstance(hostname, string_t): + # hostname is actually an iterable of hostnames + self.alt.extend(hostname) + hostname = self.alt[0] + elif ';' in hostname: + # hostname is a semicolon delimited list of hostnames + self.alt.extend(hostname.split(';')) + hostname = self.alt[0] + return hostname + + def _initialize_failover_strategy(self, failover_strategy): # keep text representation for .info # only temporary solution as this won't work when # passing a custom object (Issue celery/celery#3320). @@ -208,18 +231,6 @@ class Connection(object): self.cycle = self.failover_strategy(self.alt) next(self.cycle) # skip first entry - if transport_options is None: - transport_options = {} - self.transport_options = transport_options - - if _log_connection: # pragma: no cover - self._logger = True - - if uri_prefix: - self.uri_prefix = uri_prefix - - self.declared_entities = set() - def switch(self, url): """Switch connection parameters to use a new URL. |