summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kombu/compat.py24
-rw-r--r--kombu/connection.py25
-rw-r--r--kombu/pools.py4
-rw-r--r--kombu/transport/__init__.py2
-rw-r--r--kombu/utils/__init__.py17
5 files changed, 38 insertions, 34 deletions
diff --git a/kombu/compat.py b/kombu/compat.py
index cb3eef53..ab6398a7 100644
--- a/kombu/compat.py
+++ b/kombu/compat.py
@@ -36,15 +36,13 @@ class Publisher(messaging.Producer):
durable = True
auto_delete = False
_closed = False
- _provided_channel = None
+ _provided_channel = False
def __init__(self, connection, exchange=None, routing_key=None,
- exchange_type=None, durable=None, auto_delete=None, channel=None,
- **kwargs):
+ exchange_type=None, durable=None, auto_delete=None,
+ channel=None, **kwargs):
if channel:
- self._provided_channel = self.backend = channel
- else:
- self.backend = connection.channel()
+ connection, self._provided_channel = channel, True
self.exchange = exchange or self.exchange
self.exchange_type = exchange_type or self.exchange_type
@@ -61,20 +59,14 @@ class Publisher(messaging.Producer):
routing_key=self.routing_key,
auto_delete=self.auto_delete,
durable=self.durable)
-
- super(Publisher, self).__init__(self.backend, self.exchange,
- **kwargs)
+ super(Publisher, self).__init__(connection, self.exchange, **kwargs)
def send(self, *args, **kwargs):
return self.publish(*args, **kwargs)
- def revive(self, channel):
- self.backend = channel
- super(Publisher, self).revive(channel)
-
def close(self):
if not self._provided_channel:
- self.backend.close()
+ self.channel.close()
self._closed = True
def __enter__(self):
@@ -83,6 +75,10 @@ class Publisher(messaging.Producer):
def __exit__(self, *exc_info):
self.close()
+ @property
+ def backend(self):
+ return self.channel
+
class Consumer(messaging.Consumer):
queue = ""
diff --git a/kombu/connection.py b/kombu/connection.py
index 3a7dba15..88f9d21b 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -385,7 +385,7 @@ class BrokerConnection(object):
info[key] = value
return info
- def __hash__(self):
+ def __eqhash__(self):
return hash("|".join(map(str, self.info().itervalues())))
def as_uri(self, include_password=False):
@@ -634,22 +634,22 @@ class Resource(object):
if self.limit:
while 1:
try:
- resource = self._resource.get(block=block, timeout=timeout)
+ R = self._resource.get(block=block, timeout=timeout)
except Empty:
self._add_when_empty()
else:
- resource = self.prepare(resource)
- self._dirty.add(resource)
+ R = self.prepare(R)
+ self._dirty.add(R)
break
else:
- resource = self.prepare(self.new())
+ R = self.prepare(self.new())
@wraps(self.release)
def _release():
- self.release(resource)
- resource.release = _release
+ self.release(R)
+ R.release = _release
- return resource
+ return R
def prepare(self, resource):
return resource
@@ -665,9 +665,7 @@ class Resource(object):
of defective resources."""
if self.limit:
self._dirty.discard(resource)
- self.close_resource(resource)
- else:
- self.close_resource(resource)
+ self.close_resource(resource)
def release(self, resource):
"""Release resource so it can be used by another thread.
@@ -715,18 +713,21 @@ class Resource(object):
mutex.release()
if os.environ.get("KOMBU_DEBUG_POOL"):
-
_orig_acquire = acquire
_orig_release = release
_next_resource_id = 0
def acquire(self, *args, **kwargs): # noqa
+ import traceback
id = self._next_resource_id = self._next_resource_id + 1
print("+%s ACQUIRE %s" % (id, self.__class__.__name__, ))
r = self._orig_acquire(*args, **kwargs)
r._resource_id = id
print("-%s ACQUIRE %s" % (id, self.__class__.__name__, ))
+ if not hasattr(r, "acquired_by"):
+ r.acquired_by = []
+ r.acquired_by.append(traceback.format_stack())
return r
def release(self, resource): # noqa
diff --git a/kombu/pools.py b/kombu/pools.py
index 6061802b..98dd6cfa 100644
--- a/kombu/pools.py
+++ b/kombu/pools.py
@@ -16,7 +16,7 @@ from itertools import chain
from .connection import Resource
from .messaging import Producer
-from .utils import HashingDict
+from .utils import EqualityDict
__all__ = ["ProducerPool", "PoolGroup", "register_group",
"connections", "producers", "get_limit", "set_limit", "reset"]
@@ -64,7 +64,7 @@ class ProducerPool(Resource):
super(ProducerPool, self).release(resource)
-class PoolGroup(HashingDict):
+class PoolGroup(EqualityDict):
def __init__(self, limit=None):
self.limit = limit
diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py
index 4441b8d5..edf5f4e1 100644
--- a/kombu/transport/__init__.py
+++ b/kombu/transport/__init__.py
@@ -39,7 +39,7 @@ TRANSPORT_ALIASES = {
"amqp": "kombu.transport.amqplib.Transport",
"amqplib": "kombu.transport.amqplib.Transport",
"librabbitmq": "kombu.transport.librabbitmq.Transport",
- "pika": "kombu.transport.pika.AsyncoreTransport",
+ "pika": "kombu.transport.pika2.Transport",
"syncpika": "kombu.transport.pika.SyncTransport",
"memory": "kombu.transport.memory.Transport",
"redis": "kombu.transport.redis.Transport",
diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py
index 170fdeef..d01e153c 100644
--- a/kombu/utils/__init__.py
+++ b/kombu/utils/__init__.py
@@ -23,25 +23,32 @@ try:
except:
ctypes = None # noqa
-__all__ = ["HashingDict", "say", "uuid", "kwdict", "maybe_list",
+__all__ = ["EqualityDict", "say", "uuid", "kwdict", "maybe_list",
"fxrange", "fxrangemax", "retry_over_time",
"emergency_dump_state", "cached_property",
"reprkwargs", "reprcall", "nested"]
-class HashingDict(dict):
+def eqhash(o):
+ try:
+ return o.__eqhash__()
+ except AttributeError:
+ return hash(o)
+
+
+class EqualityDict(dict):
def __getitem__(self, key):
- h = hash(key)
+ h = eqhash(key)
if h not in self:
return self.__missing__(key)
return dict.__getitem__(self, h)
def __setitem__(self, key, value):
- return dict.__setitem__(self, hash(key), value)
+ return dict.__setitem__(self, eqhash(key), value)
def __delitem__(self, key):
- return dict.__delitem__(self, hash(key))
+ return dict.__delitem__(self, eqhash(key))
class HashingDict(dict):