diff options
-rw-r--r-- | kafka/consumer/subscription_state.py | 31 | ||||
-rw-r--r-- | test/test_substription_state.py | 25 |
2 files changed, 53 insertions, 3 deletions
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 19046ae..3d4dfef 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -43,6 +43,10 @@ class SubscriptionState(object): " (2) subscribe to topics matching a regex pattern," " (3) assign itself specific topic-partitions.") + # Taken from: https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L29 + _MAX_NAME_LENGTH = 249 + _TOPIC_LEGAL_CHARS = re.compile('^[a-zA-Z0-9._-]+$') + def __init__(self, offset_reset_strategy='earliest'): """Initialize a SubscriptionState instance @@ -120,6 +124,24 @@ class SubscriptionState(object): raise TypeError('listener must be a ConsumerRebalanceListener') self.listener = listener + def _ensure_valid_topic_name(self, topic): + """ Ensures that the topic name is valid according to the kafka source. """ + + # See Kafka Source: + # https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java + if topic is None: + raise TypeError('All topics must not be None') + if not isinstance(topic, six.string_types): + raise TypeError('All topics must be strings') + if len(topic) == 0: + raise ValueError('All topics must be non-empty strings') + if topic == '.' or topic == '..': + raise ValueError('Topic name cannot be "." or ".."') + if len(topic) > self._MAX_NAME_LENGTH: + raise ValueError('Topic name is illegal, it can\'t be longer than {0} characters, topic: "{1}"'.format(self._MAX_NAME_LENGTH, topic)) + if not self._TOPIC_LEGAL_CHARS.match(topic): + raise ValueError('Topic name "{0}" is illegal, it contains a character other than ASCII alphanumerics, ".", "_" and "-"'.format(topic)) + def change_subscription(self, topics): """Change the topic subscription. @@ -128,7 +150,10 @@ class SubscriptionState(object): Raises: IllegalStateErrror: if assign_from_user has been used already - TypeError: if a non-str topic is given + TypeError: if a topic is None or a non-str + ValueError: if a topic is an empty string or + - a topic name is '.' or '..' or + - a topic name does not consist of ASCII-characters/'-'/'_'/'.' """ if self._user_assignment: raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) @@ -141,8 +166,8 @@ class SubscriptionState(object): topics) return - if any(not isinstance(t, six.string_types) for t in topics): - raise TypeError('All topics must be strings') + for t in topics: + self._ensure_valid_topic_name(t) log.info('Updating subscribed topics to: %s', topics) self.subscription = set(topics) diff --git a/test/test_substription_state.py b/test/test_substription_state.py new file mode 100644 index 0000000..9718f6a --- /dev/null +++ b/test/test_substription_state.py @@ -0,0 +1,25 @@ +# pylint: skip-file +from __future__ import absolute_import + +import pytest + +from kafka.consumer.subscription_state import SubscriptionState + +@pytest.mark.parametrize(('topic_name', 'expectation'), [ + (0, pytest.raises(TypeError)), + (None, pytest.raises(TypeError)), + ('', pytest.raises(ValueError)), + ('.', pytest.raises(ValueError)), + ('..', pytest.raises(ValueError)), + ('a' * 250, pytest.raises(ValueError)), + ('abc/123', pytest.raises(ValueError)), + ('/abc/123', pytest.raises(ValueError)), + ('/abc123', pytest.raises(ValueError)), + ('name with space', pytest.raises(ValueError)), + ('name*with*stars', pytest.raises(ValueError)), + ('name+with+plus', pytest.raises(ValueError)), +]) +def test_topic_name_validation(topic_name, expectation): + state = SubscriptionState() + with expectation: + state._ensure_valid_topic_name(topic_name) |