summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatus Valo <matusvalo@gmail.com>2021-08-29 18:57:21 +0200
committerAsif Saif Uddin <auvipy@gmail.com>2021-08-30 14:02:20 +0600
commit7230665e5cd82c3e1b17dc9f5e16dce085994673 (patch)
treed530d0245ee9b478d2271856ea23c88904c3ab4d
parent24b0820d0bddcad074209cc1381d92d17207448c (diff)
downloadkombu-7230665e5cd82c3e1b17dc9f5e16dce085994673.tar.gz
Make BrokerState Transport specific
-rw-r--r--kombu/transport/filesystem.py7
-rw-r--r--kombu/transport/memory.py6
-rw-r--r--kombu/transport/pyro.py7
-rw-r--r--kombu/transport/virtual/base.py5
-rw-r--r--t/unit/transport/virtual/test_base.py7
5 files changed, 26 insertions, 6 deletions
diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py
index 8717ca78..d66c42d6 100644
--- a/kombu/transport/filesystem.py
+++ b/kombu/transport/filesystem.py
@@ -271,10 +271,15 @@ class Transport(virtual.Transport):
"""Filesystem Transport."""
Channel = Channel
-
+ # filesystem backend state is global.
+ global_state = virtual.BrokerState()
default_port = 0
driver_type = 'filesystem'
driver_name = 'filesystem'
+ def __init__(self, client, **kwargs):
+ super().__init__(client, **kwargs)
+ self.state = self.global_state
+
def driver_version(self):
return 'N/A'
diff --git a/kombu/transport/memory.py b/kombu/transport/memory.py
index 0a778294..3073d1cf 100644
--- a/kombu/transport/memory.py
+++ b/kombu/transport/memory.py
@@ -89,12 +89,16 @@ class Transport(virtual.Transport):
Channel = Channel
#: memory backend state is global.
- state = virtual.BrokerState()
+ global_state = virtual.BrokerState()
implements = base.Transport.implements
driver_type = 'memory'
driver_name = 'memory'
+ def __init__(self, client, **kwargs):
+ super().__init__(client, **kwargs)
+ self.state = self.global_state
+
def driver_version(self):
return 'N/A'
diff --git a/kombu/transport/pyro.py b/kombu/transport/pyro.py
index 4bae532c..833d9792 100644
--- a/kombu/transport/pyro.py
+++ b/kombu/transport/pyro.py
@@ -113,12 +113,17 @@ class Transport(virtual.Transport):
Channel = Channel
#: memory backend state is global.
- state = virtual.BrokerState()
+ # TODO: To be checked whether state can be per-Transport
+ global_state = virtual.BrokerState()
default_port = DEFAULT_PORT
driver_type = driver_name = 'pyro'
+ def __init__(self, client, **kwargs):
+ super().__init__(client, **kwargs)
+ self.state = self.global_state
+
def _open(self):
logger.debug("trying Pyro nameserver to find the broker daemon")
conninfo = self.client
diff --git a/kombu/transport/virtual/base.py b/kombu/transport/virtual/base.py
index f054d82f..95e539ac 100644
--- a/kombu/transport/virtual/base.py
+++ b/kombu/transport/virtual/base.py
@@ -871,9 +871,6 @@ class Transport(base.Transport):
Cycle = FairCycle
Management = Management
- #: Global :class:`BrokerState` containing declared exchanges and bindings.
- state = BrokerState()
-
#: :class:`~kombu.utils.scheduling.FairCycle` instance
#: used to fairly drain events from channels (set by constructor).
cycle = None
@@ -901,6 +898,8 @@ class Transport(base.Transport):
def __init__(self, client, **kwargs):
self.client = client
+ # :class:`BrokerState` containing declared exchanges and bindings.
+ self.state = BrokerState()
self.channels = []
self._avail_channels = []
self._callbacks = {}
diff --git a/t/unit/transport/virtual/test_base.py b/t/unit/transport/virtual/test_base.py
index 1a079a36..681841a0 100644
--- a/t/unit/transport/virtual/test_base.py
+++ b/t/unit/transport/virtual/test_base.py
@@ -550,6 +550,13 @@ class test_Transport:
def setup(self):
self.transport = client().transport
+ def test_state_is_transport_specific(self):
+ # Tests that each Transport of Connection instance
+ # has own state attribute
+ conn1 = client()
+ conn2 = client()
+ assert conn1.transport.state != conn2.transport.state
+
def test_custom_polling_interval(self):
x = client(transport_options={'polling_interval': 32.3})
assert x.transport.polling_interval == 32.3