summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOmer Katz <omer.drow@gmail.com>2019-03-19 09:53:06 +0200
committerOmer Katz <omer.drow@gmail.com>2019-03-19 09:53:06 +0200
commitd760002e256f0a488c436d0d57e5896adb162d7b (patch)
treee5ccfa63a37e0df730089ee19f761ed486edf84c
parentf08e2106eed9878e73d7d553473226d9c74c7d3d (diff)
downloadpy-amqp-threadsafty.tar.gz
Added a test that proves that amqp is not threadsafe.threadsafty
-rw-r--r--amqp/connection.py22
-rw-r--r--t/integration/test_rmq.py20
2 files changed, 39 insertions, 3 deletions
diff --git a/amqp/connection.py b/amqp/connection.py
index 5af06e1..75f014a 100644
--- a/amqp/connection.py
+++ b/amqp/connection.py
@@ -4,6 +4,7 @@ from __future__ import absolute_import, unicode_literals
import logging
import socket
+import threading
import uuid
import warnings
@@ -464,12 +465,19 @@ class Connection(AbstractChannel):
self._transport = self.connection = self.channels = None
def _get_free_channel_id(self):
+ channel_id = -1 # Pick the first available channel id
+ # if threading.current_thread() != threading.main_thread():
+ # # If we're on a different thread than the main thread
+ # # make sure to pick a channel not used by any other thread
+ # channel_id = threading.get_ident() % self.channel_max
+
try:
- return self._avail_channel_ids.pop()
+ return self._avail_channel_ids.pop(channel_id)
except IndexError:
+ current_channel_id = len(self.channels) if channel_id == -1 else channel_id
raise ResourceError(
'No free channel ids, current={0}, channel_max={1}'.format(
- len(self.channels), self.channel_max), spec.Channel.Open)
+ current_channel_id, self.channel_max), spec.Channel.Open)
def _claim_channel_id(self, channel_id):
try:
@@ -484,10 +492,18 @@ class Connection(AbstractChannel):
create that object if it doesn't already exist.
"""
if self.channels is not None:
+ if channel_id is None:
+ channel = self.Channel(self,
+ on_open=callback)
+ channel.open()
+ return channel
+
try:
return self.channels[channel_id]
except KeyError:
- channel = self.Channel(self, channel_id, on_open=callback)
+ channel = self.Channel(self,
+ channel_id=channel_id,
+ on_open=callback)
channel.open()
return channel
raise RecoverableConnectionError('Connection already closed.')
diff --git a/t/integration/test_rmq.py b/t/integration/test_rmq.py
index e5f0496..71ecaf3 100644
--- a/t/integration/test_rmq.py
+++ b/t/integration/test_rmq.py
@@ -1,6 +1,8 @@
from __future__ import absolute_import, unicode_literals
import os
+import threading
+import logging
import pytest
@@ -146,3 +148,21 @@ class test_rabbitmq_operations():
queue='py-amqp-unittest',
)
assert msg is None
+
+
+def test_threadsafty(caplog, connection):
+ caplog.set_level(logging.DEBUG)
+ connection.connect()
+
+ def worker():
+ connection.channel()
+
+ threads = [threading.Thread(target=worker) for _ in range(2)]
+
+ for t in threads:
+ t.start()
+
+ for t in threads:
+ t.join(timeout=30)
+
+ connection.close()