From be6b5ededa5654ca43cea67927667456e32523a3 Mon Sep 17 00:00:00 2001 From: Paul Brown Date: Tue, 14 Dec 2021 10:03:11 +0000 Subject: reduce memory usage of Connection (#377) * reduce memory usage of Connection * allow ValueError on _used_channel_ids.remove --- amqp/channel.py | 6 +++++- amqp/connection.py | 22 ++++++++++++---------- t/unit/test_connection.py | 6 +++++- 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/amqp/channel.py b/amqp/channel.py index b271820..77cfaab 100644 --- a/amqp/channel.py +++ b/amqp/channel.py @@ -150,7 +150,11 @@ class Channel(AbstractChannel): connection, self.connection = self.connection, None if connection: connection.channels.pop(channel_id, None) - connection._avail_channel_ids.append(channel_id) + try: + connection._used_channel_ids.remove(channel_id) + except ValueError: + # channel id already removed + pass self.callbacks.clear() self.cancel_callbacks.clear() self.events.clear() diff --git a/amqp/connection.py b/amqp/connection.py index 9917ec7..3e9097f 100644 --- a/amqp/connection.py +++ b/amqp/connection.py @@ -267,7 +267,7 @@ class Connection(AbstractChannel): self.on_unblocked = on_unblocked self.on_open = ensure_promise(on_open) - self._avail_channel_ids = array('H', range(self.channel_max, 0, -1)) + self._used_channel_ids = array('H') # Properties set in the Start method self.version_major = 0 @@ -482,18 +482,20 @@ class Connection(AbstractChannel): self._transport = self.connection = self.channels = None def _get_free_channel_id(self): - try: - return self._avail_channel_ids.pop() - except IndexError: - raise ResourceError( - 'No free channel ids, current={}, channel_max={}'.format( - len(self.channels), self.channel_max), spec.Channel.Open) + for channel_id in range(1, self.channel_max): + if channel_id not in self._used_channel_ids: + return channel_id + + raise ResourceError( + 'No free channel ids, current={}, channel_max={}'.format( + len(self.channels), self.channel_max), spec.Channel.Open) def _claim_channel_id(self, channel_id): - try: - return self._avail_channel_ids.remove(channel_id) - except ValueError: + if channel_id in self._used_channel_ids: raise ConnectionError(f'Channel {channel_id!r} already open') + else: + self._used_channel_ids.append(channel_id) + return channel_id def channel(self, channel_id=None, callback=None): """Create new channel. diff --git a/t/unit/test_connection.py b/t/unit/test_connection.py index a2997e6..03f0258 100644 --- a/t/unit/test_connection.py +++ b/t/unit/test_connection.py @@ -1,6 +1,7 @@ import re import socket import warnings +from array import array from unittest.mock import Mock, call, patch import pytest @@ -347,8 +348,11 @@ class test_Connection: self.conn.collect() self.conn.collect() + def test_get_free_channel_id(self): + assert self.conn._get_free_channel_id() == 1 + def test_get_free_channel_id__raises_IndexError(self): - self.conn._avail_channel_ids = [] + self.conn._used_channel_ids = array('H', range(1, self.conn.channel_max)) with pytest.raises(ResourceError): self.conn._get_free_channel_id() -- cgit v1.2.1