diff options
author | Omer Katz <omer.drow@gmail.com> | 2019-03-19 09:53:06 +0200 |
---|---|---|
committer | Omer Katz <omer.drow@gmail.com> | 2019-03-19 09:53:06 +0200 |
commit | d760002e256f0a488c436d0d57e5896adb162d7b (patch) | |
tree | e5ccfa63a37e0df730089ee19f761ed486edf84c | |
parent | f08e2106eed9878e73d7d553473226d9c74c7d3d (diff) | |
download | py-amqp-threadsafty.tar.gz |
Added a test that proves that amqp is not threadsafe.threadsafty
-rw-r--r-- | amqp/connection.py | 22 | ||||
-rw-r--r-- | t/integration/test_rmq.py | 20 |
2 files changed, 39 insertions, 3 deletions
diff --git a/amqp/connection.py b/amqp/connection.py index 5af06e1..75f014a 100644 --- a/amqp/connection.py +++ b/amqp/connection.py @@ -4,6 +4,7 @@ from __future__ import absolute_import, unicode_literals import logging import socket +import threading import uuid import warnings @@ -464,12 +465,19 @@ class Connection(AbstractChannel): self._transport = self.connection = self.channels = None def _get_free_channel_id(self): + channel_id = -1 # Pick the first available channel id + # if threading.current_thread() != threading.main_thread(): + # # If we're on a different thread than the main thread + # # make sure to pick a channel not used by any other thread + # channel_id = threading.get_ident() % self.channel_max + try: - return self._avail_channel_ids.pop() + return self._avail_channel_ids.pop(channel_id) except IndexError: + current_channel_id = len(self.channels) if channel_id == -1 else channel_id raise ResourceError( 'No free channel ids, current={0}, channel_max={1}'.format( - len(self.channels), self.channel_max), spec.Channel.Open) + current_channel_id, self.channel_max), spec.Channel.Open) def _claim_channel_id(self, channel_id): try: @@ -484,10 +492,18 @@ class Connection(AbstractChannel): create that object if it doesn't already exist. """ if self.channels is not None: + if channel_id is None: + channel = self.Channel(self, + on_open=callback) + channel.open() + return channel + try: return self.channels[channel_id] except KeyError: - channel = self.Channel(self, channel_id, on_open=callback) + channel = self.Channel(self, + channel_id=channel_id, + on_open=callback) channel.open() return channel raise RecoverableConnectionError('Connection already closed.') diff --git a/t/integration/test_rmq.py b/t/integration/test_rmq.py index e5f0496..71ecaf3 100644 --- a/t/integration/test_rmq.py +++ b/t/integration/test_rmq.py @@ -1,6 +1,8 @@ from __future__ import absolute_import, unicode_literals import os +import threading +import logging import pytest @@ -146,3 +148,21 @@ class test_rabbitmq_operations(): queue='py-amqp-unittest', ) assert msg is None + + +def test_threadsafty(caplog, connection): + caplog.set_level(logging.DEBUG) + connection.connect() + + def worker(): + connection.channel() + + threads = [threading.Thread(target=worker) for _ in range(2)] + + for t in threads: + t.start() + + for t in threads: + t.join(timeout=30) + + connection.close() |