summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-03-13 12:42:47 -0700
committerGitHub <noreply@github.com>2017-03-13 12:42:47 -0700
commitce57dac0c6c620371a1c484b9619e2deb83be82e (patch)
treeca8ec78932b552adca256c71d226214274bf3146
parent899f11730db5f209c03cfad20111ec131ee4c70b (diff)
downloadkafka-python-ce57dac0c6c620371a1c484b9619e2deb83be82e.tar.gz
Return copy of consumer subscription set (#1029)
-rw-r--r--kafka/consumer/group.py2
-rw-r--r--test/test_consumer.py8
2 files changed, 9 insertions, 1 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index f2b1699..32f4556 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -819,7 +819,7 @@ class KafkaConsumer(six.Iterator):
Returns:
set: {topic, ...}
"""
- return self._subscription.subscription
+ return self._subscription.subscription.copy()
def unsubscribe(self):
"""Unsubscribe from all topics and clear all assigned partitions."""
diff --git a/test/test_consumer.py b/test/test_consumer.py
index 073a3af..e5dd946 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -24,6 +24,14 @@ class TestKafkaConsumer(unittest.TestCase):
with self.assertRaises(KafkaConfigurationError):
KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=41000, request_timeout_ms=40000)
+ def test_subscription_copy(self):
+ consumer = KafkaConsumer('foo', api_version=(0, 10))
+ sub = consumer.subscription()
+ assert sub is not consumer.subscription()
+ assert sub == set(['foo'])
+ sub.add('fizz')
+ assert consumer.subscription() == set(['foo'])
+
class TestMultiProcessConsumer(unittest.TestCase):
@unittest.skipIf(sys.platform.startswith('win'), 'test mocking fails on windows')