summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/consumer/subscription_state.py31
-rw-r--r--test/test_substription_state.py25
2 files changed, 53 insertions, 3 deletions
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 19046ae..3d4dfef 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -43,6 +43,10 @@ class SubscriptionState(object):
" (2) subscribe to topics matching a regex pattern,"
" (3) assign itself specific topic-partitions.")
+ # Taken from: https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L29
+ _MAX_NAME_LENGTH = 249
+ # Anchored with \Z rather than $: in Python, $ also matches just before a
+ # trailing newline, which would let a name such as "topic\n" slip through.
+ _TOPIC_LEGAL_CHARS = re.compile(r'^[a-zA-Z0-9._-]+\Z')
+
def __init__(self, offset_reset_strategy='earliest'):
"""Initialize a SubscriptionState instance
@@ -120,6 +124,24 @@ class SubscriptionState(object):
raise TypeError('listener must be a ConsumerRebalanceListener')
self.listener = listener
+ def _ensure_valid_topic_name(self, topic):
+     """Validate a topic name against the rules used by the Kafka source.
+
+     Reference implementation:
+     https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
+
+     Arguments:
+         topic: candidate topic name to check.
+
+     Raises:
+         TypeError: if topic is None or is not a string.
+         ValueError: if topic is empty, is "." or "..", is longer than
+             _MAX_NAME_LENGTH characters, or does not match
+             _TOPIC_LEGAL_CHARS.
+     """
+     # Type checks come first so the later length/regex checks only ever
+     # see string input.
+     if topic is None:
+         raise TypeError('All topics must not be None')
+     if not isinstance(topic, six.string_types):
+         raise TypeError('All topics must be strings')
+
+     if not topic:
+         raise ValueError('All topics must be non-empty strings')
+     if topic in ('.', '..'):
+         raise ValueError('Topic name cannot be "." or ".."')
+
+     max_length = self._MAX_NAME_LENGTH
+     if len(topic) > max_length:
+         too_long = 'Topic name is illegal, it can\'t be longer than {0} characters, topic: "{1}"'
+         raise ValueError(too_long.format(max_length, topic))
+
+     if self._TOPIC_LEGAL_CHARS.match(topic) is None:
+         bad_chars = 'Topic name "{0}" is illegal, it contains a character other than ASCII alphanumerics, ".", "_" and "-"'
+         raise ValueError(bad_chars.format(topic))
+
def change_subscription(self, topics):
"""Change the topic subscription.
@@ -128,7 +150,10 @@ class SubscriptionState(object):
Raises:
IllegalStateErrror: if assign_from_user has been used already
- TypeError: if a non-str topic is given
+ TypeError: if a topic is None or a non-str
+ ValueError: if a topic is an empty string or
+ - a topic name is '.' or '..' or
+ - a topic name is longer than 249 characters or
+ - a topic name contains a character other than ASCII alphanumerics, '-', '_' or '.'
"""
if self._user_assignment:
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
@@ -141,8 +166,8 @@ class SubscriptionState(object):
topics)
return
- if any(not isinstance(t, six.string_types) for t in topics):
- raise TypeError('All topics must be strings')
+ for t in topics:
+ self._ensure_valid_topic_name(t)
log.info('Updating subscribed topics to: %s', topics)
self.subscription = set(topics)
diff --git a/test/test_substription_state.py b/test/test_substription_state.py
new file mode 100644
index 0000000..9718f6a
--- /dev/null
+++ b/test/test_substription_state.py
@@ -0,0 +1,25 @@
+# pylint: skip-file
+from __future__ import absolute_import
+
+import pytest
+
+from kafka.consumer.subscription_state import SubscriptionState
+
+@pytest.mark.parametrize(
+     'topic_name,expectation',
+     [(bad_name, pytest.raises(error_type)) for bad_name, error_type in (
+         # Not a string at all.
+         (0, TypeError),
+         (None, TypeError),
+         # Empty or reserved names.
+         ('', ValueError),
+         ('.', ValueError),
+         ('..', ValueError),
+         # One character longer than the 249-character limit.
+         ('a' * 250, ValueError),
+         # Characters other than ASCII alphanumerics, '.', '_' and '-'.
+         ('abc/123', ValueError),
+         ('/abc/123', ValueError),
+         ('/abc123', ValueError),
+         ('name with space', ValueError),
+         ('name*with*stars', ValueError),
+         ('name+with+plus', ValueError),
+     )])
+ def test_topic_name_validation(topic_name, expectation):
+     """Each malformed topic name must be rejected with its paired exception."""
+     subscription = SubscriptionState()
+     with expectation:
+         subscription._ensure_valid_topic_name(topic_name)