summaryrefslogtreecommitdiff
path: root/amqp/connection.py
diff options
context:
space:
mode:
Diffstat (limited to 'amqp/connection.py')
-rw-r--r--amqp/connection.py22
1 files changed, 19 insertions, 3 deletions
diff --git a/amqp/connection.py b/amqp/connection.py
index 5af06e1..75f014a 100644
--- a/amqp/connection.py
+++ b/amqp/connection.py
@@ -4,6 +4,7 @@ from __future__ import absolute_import, unicode_literals
import logging
import socket
+import threading
import uuid
import warnings
@@ -464,12 +465,19 @@ class Connection(AbstractChannel):
self._transport = self.connection = self.channels = None
def _get_free_channel_id(self):
+ channel_id = -1 # Pick the first available channel id
+ # if threading.current_thread() != threading.main_thread():
+ # # If we're on a different thread than the main thread
+ # # make sure to pick a channel not used by any other thread
+ # channel_id = threading.get_ident() % self.channel_max
+
try:
- return self._avail_channel_ids.pop()
+ return self._avail_channel_ids.pop(channel_id)
except IndexError:
+ current_channel_id = len(self.channels) if channel_id == -1 else channel_id
raise ResourceError(
'No free channel ids, current={0}, channel_max={1}'.format(
- len(self.channels), self.channel_max), spec.Channel.Open)
+ current_channel_id, self.channel_max), spec.Channel.Open)
def _claim_channel_id(self, channel_id):
try:
@@ -484,10 +492,18 @@ class Connection(AbstractChannel):
create that object if it doesn't already exist.
"""
if self.channels is not None:
+ if channel_id is None:
+ channel = self.Channel(self,
+ on_open=callback)
+ channel.open()
+ return channel
+
try:
return self.channels[channel_id]
except KeyError:
- channel = self.Channel(self, channel_id, on_open=callback)
+ channel = self.Channel(self,
+ channel_id=channel_id,
+ on_open=callback)
channel.open()
return channel
raise RecoverableConnectionError('Connection already closed.')