diff options
Diffstat (limited to 'amqp/connection.py')
-rw-r--r-- | amqp/connection.py | 22 |
1 files changed, 19 insertions, 3 deletions
diff --git a/amqp/connection.py b/amqp/connection.py index 5af06e1..75f014a 100644 --- a/amqp/connection.py +++ b/amqp/connection.py @@ -4,6 +4,7 @@ from __future__ import absolute_import, unicode_literals import logging import socket +import threading import uuid import warnings @@ -464,12 +465,19 @@ class Connection(AbstractChannel): self._transport = self.connection = self.channels = None def _get_free_channel_id(self): + channel_id = -1 # Pick the first available channel id + # if threading.current_thread() != threading.main_thread(): + # # If we're on a different thread than the main thread + # # make sure to pick a channel not used by any other thread + # channel_id = threading.get_ident() % self.channel_max + try: - return self._avail_channel_ids.pop() + return self._avail_channel_ids.pop(channel_id) except IndexError: + current_channel_id = len(self.channels) if channel_id == -1 else channel_id raise ResourceError( 'No free channel ids, current={0}, channel_max={1}'.format( - len(self.channels), self.channel_max), spec.Channel.Open) + current_channel_id, self.channel_max), spec.Channel.Open) def _claim_channel_id(self, channel_id): try: @@ -484,10 +492,18 @@ class Connection(AbstractChannel): create that object if it doesn't already exist. """ if self.channels is not None: + if channel_id is None: + channel = self.Channel(self, + on_open=callback) + channel.open() + return channel + try: return self.channels[channel_id] except KeyError: - channel = self.Channel(self, channel_id, on_open=callback) + channel = self.Channel(self, + channel_id=channel_id, + on_open=callback) channel.open() return channel raise RecoverableConnectionError('Connection already closed.') |