author     Dana Powers <dana.powers@gmail.com>    2016-04-05 09:34:48 -0700
committer  Dana Powers <dana.powers@gmail.com>    2016-04-05 09:35:45 -0700
commit     5a14bd8c947251d1a8f848175cc3cf2b07af3411 (patch)
tree       a251ddbc60c84405762365429de9b04727653e6c
parent     221f56d8a05cdc2d37f85018e4af352b4b2a95c5 (diff)
download   kafka-python-5a14bd8c947251d1a8f848175cc3cf2b07af3411.tar.gz
Update imports from kafka.common -> kafka.errors / kafka.structs
41 files changed, 111 insertions, 127 deletions
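The change is a pure import reorganization: exception classes that previously lived in kafka.common are now imported from kafka.errors, while the namedtuple payloads and metadata structs move to kafka.structs. A minimal before/after sketch of the new import style (illustrative only; the topic name and the raise/except flow are examples, not code from this commit):

    # Old layout (before this commit): errors and structs both came from kafka.common
    # from kafka.common import KafkaError, LeaderNotAvailableError, TopicPartition

    # New layout (after this commit): errors and structs are separate modules
    from kafka.errors import KafkaError, LeaderNotAvailableError
    from kafka.structs import TopicPartition

    tp = TopicPartition(topic='my-topic', partition=0)
    try:
        # LeaderNotAvailableError remains a KafkaError subclass, so a broad
        # `except KafkaError` still catches it after the module move.
        raise LeaderNotAvailableError(tp)
    except KafkaError as err:
        print('handled %s for %s' % (type(err).__name__, tp))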
diff --git a/docs/simple.rst b/docs/simple.rst
index 253f543..8192a8b 100644
--- a/docs/simple.rst
+++ b/docs/simple.rst
@@ -129,10 +129,9 @@ SimpleClient (DEPRECATED)
     import time
 
     from kafka import SimpleClient
-    from kafka.common import (
-        LeaderNotAvailableError, NotLeaderForPartitionError,
-        ProduceRequestPayload)
+    from kafka.errors import LeaderNotAvailableError, NotLeaderForPartitionError
     from kafka.protocol import create_message
+    from kafka.structs import ProduceRequestPayload
 
     kafka = SimpleClient('localhost:9092')
     payload = ProduceRequestPayload(topic='my-topic', partition=0,
diff --git a/docs/usage.rst b/docs/usage.rst
index 85fc44f..0ee9894 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -56,7 +56,7 @@ KafkaProducer
 .. code:: python
 
     from kafka import KafkaProducer
-    from kafka.common import KafkaError
+    from kafka.errors import KafkaError
 
     producer = KafkaProducer(bootstrap_servers=['broker1:1234'])
diff --git a/kafka/__init__.py b/kafka/__init__.py
index 3f0d8bd..6b2ba97 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -22,7 +22,7 @@ from kafka.conn import BrokerConnection
 from kafka.protocol import (
     create_message, create_gzip_message, create_snappy_message)
 from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner
-from kafka.common import TopicPartition
+from kafka.structs import TopicPartition
 
 # To be deprecated when KafkaProducer interface is released
 from kafka.client import SimpleClient
diff --git a/kafka/client.py b/kafka/client.py
index 99d6fec..2bd2324 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -7,12 +7,12 @@ import time
 
 import six
 
-import kafka.common
-from kafka.common import (TopicPartition, BrokerMetadata, UnknownError,
-                          ConnectionError, FailedPayloadsError,
+import kafka.errors
+from kafka.errors import (UnknownError, ConnectionError, FailedPayloadsError,
                           KafkaTimeoutError, KafkaUnavailableError,
                           LeaderNotAvailableError, UnknownTopicOrPartitionError,
                           NotLeaderForPartitionError, ReplicaNotAvailableError)
+from kafka.structs import TopicPartition, BrokerMetadata
 
 from kafka.conn import (
     collect_hosts, BrokerConnection,
     DEFAULT_SOCKET_TIMEOUT_SECONDS,
@@ -123,7 +123,7 @@ class SimpleClient(object):
 
         # If there's a problem with finding the coordinator, raise the
         # provided error
-        kafka.common.check_error(resp)
+        kafka.errors.check_error(resp)
 
         # Otherwise return the BrokerMetadata
         return BrokerMetadata(resp.nodeId, resp.host, resp.port)
@@ -389,7 +389,7 @@ class SimpleClient(object):
 
             # Or a server api error response
             try:
-                kafka.common.check_error(resp)
+                kafka.errors.check_error(resp)
             except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
                 self.reset_topic_metadata(resp.topic)
                 raise
@@ -509,7 +509,7 @@ class SimpleClient(object):
         for error, topic, partitions in resp.topics:
             # Errors expected for new topics
             if error:
-                error_type = kafka.common.kafka_errors.get(error, UnknownError)
+                error_type = kafka.errors.kafka_errors.get(error, UnknownError)
                 if error_type in (UnknownTopicOrPartitionError, LeaderNotAvailableError):
                     log.error('Error loading topic metadata for %s: %s (%s)',
                               topic, error_type, error)
@@ -530,7 +530,7 @@ class SimpleClient(object):
 
                 # Check for partition errors
                 if error:
-                    error_type = kafka.common.kafka_errors.get(error, UnknownError)
+                    error_type = kafka.errors.kafka_errors.get(error, UnknownError)
 
                     # If No Leader, topics_to_brokers topic_partition -> None
                     if error_type is LeaderNotAvailableError:
diff --git a/kafka/client_async.py b/kafka/client_async.py
index d70e4f2..b77ead5 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -11,10 +11,9 @@ import time
 
 import six
 
-import kafka.common as Errors # TODO: make Errors a separate class
-
 from .cluster import ClusterMetadata
 from .conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
+from . import errors as Errors
 from .future import Future
 from .protocol.metadata import MetadataRequest
 from .protocol.produce import ProduceRequest
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 9ab6e6e..f7940e6 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -9,9 +9,9 @@ import time
 
 import six
 
-import kafka.common as Errors
-from kafka.common import BrokerMetadata, PartitionMetadata, TopicPartition
+from . import errors as Errors
 from .future import Future
+from .structs import BrokerMetadata, PartitionMetadata, TopicPartition
 
 log = logging.getLogger(__name__)
diff --git a/kafka/conn.py b/kafka/conn.py
index ffc839e..dc7dd23 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -13,7 +13,7 @@ import warnings
 
 import six
 
-import kafka.common as Errors
+import kafka.errors as Errors
 from kafka.future import Future
 from kafka.protocol.api import RequestHeader
 from kafka.protocol.commit import GroupCoordinatorResponse
@@ -149,7 +149,7 @@ class BrokerConnection(object):
         Arguments:
             error (Exception, optional): pending in-flight-requests will
                 be failed with this exception.
-                Default: kafka.common.ConnectionError.
+                Default: kafka.errors.ConnectionError.
         """
         if self._sock:
             self._sock.close()
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 75c3ee1..d2d9e8d 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -6,12 +6,10 @@ import numbers
 from threading import Lock
 import warnings
 
-import kafka.common
-from kafka.common import (
-    OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
-    UnknownTopicOrPartitionError, check_error, KafkaError
-)
-
+from kafka.errors import (
+    UnknownTopicOrPartitionError, check_error, KafkaError)
+from kafka.structs import (
+    OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload)
 from kafka.util import ReentrantTimer
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 7112c7e..2c9c0b9 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -6,12 +6,12 @@ import logging
 
 import six
 
-import kafka.common as Errors
-from kafka.common import TopicPartition
+import kafka.errors as Errors
 from kafka.future import Future
 from kafka.protocol.fetch import FetchRequest
 from kafka.protocol.message import PartialMessage
 from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
+from kafka.structs import TopicPartition
 
 log = logging.getLogger(__name__)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 9172040..6c85c21 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -7,13 +7,13 @@ import time
 import six
 
 from kafka.client_async import KafkaClient
-from kafka.common import TopicPartition
 from kafka.consumer.fetcher import Fetcher
 from kafka.consumer.subscription_state import SubscriptionState
 from kafka.coordinator.consumer import ConsumerCoordinator
 from kafka.coordinator.assignors.range import RangePartitionAssignor
 from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
 from kafka.protocol.offset import OffsetResetStrategy
+from kafka.structs import TopicPartition
 from kafka.version import __version__
 
 log = logging.getLogger(__name__)
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 3d170ae..1c045aa 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -6,8 +6,9 @@ import re
 
 import six
 
-from kafka.common import IllegalStateError, OffsetAndMetadata
+from kafka.errors import IllegalStateError
 from kafka.protocol.offset import OffsetResetStrategy
+from kafka.structs import OffsetAndMetadata
 
 log = logging.getLogger(__name__)
diff --git a/kafka/context.py b/kafka/context.py
index 376fad1..d6c15fe 100644
--- a/kafka/context.py
+++ b/kafka/context.py
@@ -3,7 +3,8 @@ Context manager to commit/rollback consumer offsets.
 """
 from logging import getLogger
 
-from kafka.common import check_error, OffsetCommitRequestPayload, OffsetOutOfRangeError
+from kafka.errors import check_error, OffsetOutOfRangeError
+from kafka.structs import OffsetCommitRequestPayload
 
 
 class OffsetCommitContext(object):
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index b0a0981..fcf3901 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -6,7 +6,7 @@ import weakref
 
 import six
 
-import kafka.common as Errors
+import kafka.errors as Errors
 from kafka.future import Future
 from kafka.protocol.commit import (GroupCoordinatorRequest,
                                    OffsetCommitRequest_v2 as OffsetCommitRequest)
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index b2ef1ea..ae2344f 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -12,14 +12,14 @@ from .base import BaseCoordinator
 from .assignors.range import RangePartitionAssignor
 from .assignors.roundrobin import RoundRobinPartitionAssignor
 from .protocol import ConsumerProtocol
-from ..common import OffsetAndMetadata, TopicPartition
+from .. import errors as Errors
 from ..future import Future
 from ..protocol.commit import (
     OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0,
     OffsetFetchRequest_v0, OffsetFetchRequest_v1)
+from ..structs import OffsetAndMetadata, TopicPartition
 from ..util import WeakMethod
 
-import kafka.common as Errors
 
 log = logging.getLogger(__name__)
diff --git a/kafka/coordinator/heartbeat.py b/kafka/coordinator/heartbeat.py
index 4ddcf09..e73b3e5 100644
--- a/kafka/coordinator/heartbeat.py
+++ b/kafka/coordinator/heartbeat.py
@@ -1,7 +1,7 @@
 import copy
 import time
 
-import kafka.common as Errors
+import kafka.errors as Errors
 
 
 class Heartbeat(object):
diff --git a/kafka/coordinator/protocol.py b/kafka/coordinator/protocol.py
index 9e37397..56a3901 100644
--- a/kafka/coordinator/protocol.py
+++ b/kafka/coordinator/protocol.py
@@ -1,8 +1,8 @@
 from __future__ import absolute_import
 
-from kafka.common import TopicPartition
 from kafka.protocol.struct import Struct
 from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
+from kafka.structs import TopicPartition
 
 
 class ConsumerProtocolMemberMetadata(Struct):
diff --git a/kafka/future.py b/kafka/future.py
index c7e0b14..b379272 100644
--- a/kafka/future.py
+++ b/kafka/future.py
@@ -1,7 +1,7 @@
 import functools
 import logging
 
-import kafka.common as Errors
+import kafka.errors as Errors
 
 log = logging.getLogger(__name__)
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 2067c7e..07e61d5 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -14,13 +14,12 @@ from threading import Thread, Event
 
 import six
 
-from kafka.common import (
-    ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions,
+from kafka.structs import (
+    ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions)
+from kafka.errors import (
     kafka_errors, UnsupportedCodecError, FailedPayloadsError,
     RequestTimedOutError, AsyncProducerQueueFull, UnknownError,
-    RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES
-)
-
+    RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
 from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
 
 log = logging.getLogger('kafka.producer')
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index 8c83ffc..b2ac747 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -7,10 +7,10 @@ import time
 
 from ..codec import (has_gzip, has_snappy, has_lz4,
                      gzip_encode, snappy_encode, lz4_encode)
+from .. import errors as Errors
 from ..protocol.types import Int32, Int64
 from ..protocol.message import MessageSet, Message
 
-import kafka.common as Errors
-
 
 class MessageSetBuffer(object):
diff --git a/kafka/producer/future.py b/kafka/producer/future.py
index 5a7a9dc..35520d8 100644
--- a/kafka/producer/future.py
+++ b/kafka/producer/future.py
@@ -3,10 +3,9 @@ from __future__ import absolute_import
 import collections
 import threading
 
+from .. import errors as Errors
 from ..future import Future
 
-import kafka.common as Errors
-
 
 class FutureProduceResult(Future):
     def __init__(self, topic_partition):
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index e1a0374..dd8e71f 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -8,14 +8,14 @@ import threading
 import time
 
 from ..client_async import KafkaClient
-from ..common import TopicPartition
+from ..structs import TopicPartition
 from ..partitioner.default import DefaultPartitioner
 from ..protocol.message import Message, MessageSet
+from .. import errors as Errors
 from .future import FutureRecordMetadata, FutureProduceResult
 from .record_accumulator import AtomicInteger, RecordAccumulator
 from .sender import Sender
 
-import kafka.common as Errors
 
 log = logging.getLogger(__name__)
 PRODUCER_CLIENT_ID_SEQUENCE = AtomicInteger()
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 19dc199..b3abaa3 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -8,12 +8,12 @@ import time
 
 import six
 
-from ..common import TopicPartition
+from .. import errors as Errors
+from ..structs import TopicPartition
 from ..protocol.message import Message, MessageSet
 from .buffer import MessageSetBuffer, SimpleBufferPool
 from .future import FutureRecordMetadata, FutureProduceResult
 
-import kafka.common as Errors
 
 log = logging.getLogger(__name__)
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 9a86a16..3cafb26 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -7,11 +7,11 @@ import threading
 
 import six
 
-from ..common import TopicPartition
+from .. import errors as Errors
+from ..structs import TopicPartition
 from ..version import __version__
 from ..protocol.produce import ProduceRequest
 
-import kafka.common as Errors
 
 log = logging.getLogger(__name__)
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index 1835521..e4745f1 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -7,26 +7,21 @@ import six
 
 from six.moves import xrange
 
-import kafka.common
 import kafka.protocol.commit
 import kafka.protocol.fetch
 import kafka.protocol.message
 import kafka.protocol.metadata
 import kafka.protocol.offset
 import kafka.protocol.produce
+import kafka.structs
 
 from kafka.codec import (
-    gzip_encode, gzip_decode, snappy_encode, snappy_decode
-)
-from kafka.common import (
-    ProtocolError, ChecksumError,
-    UnsupportedCodecError,
-    ConsumerMetadataResponse
-)
+    gzip_encode, gzip_decode, snappy_encode, snappy_decode)
+from kafka.errors import ProtocolError, ChecksumError, UnsupportedCodecError
+from kafka.structs import ConsumerMetadataResponse
 from kafka.util import (
     crc32, read_short_string, read_int_string, relative_unpack,
-    write_short_string, write_int_string, group_by_topic_and_partition
-)
+    write_short_string, write_int_string, group_by_topic_and_partition)
 
 log = logging.getLogger(__name__)
@@ -166,7 +161,7 @@ class KafkaProtocol(object):
             Return: list of ProduceResponsePayload
         """
         return [
-            kafka.common.ProduceResponsePayload(topic, partition, error, offset)
+            kafka.structs.ProduceResponsePayload(topic, partition, error, offset)
             for topic, partitions in response.topics
             for partition, error, offset in partitions
         ]
@@ -207,9 +202,9 @@ class KafkaProtocol(object):
             response: FetchResponse
         """
         return [
-            kafka.common.FetchResponsePayload(
+            kafka.structs.FetchResponsePayload(
                 topic, partition, error, highwater_offset, [
-                    kafka.common.OffsetAndMessage(offset, message)
+                    kafka.structs.OffsetAndMessage(offset, message)
                     for offset, _, message in messages])
             for topic, partitions in response.topics
             for partition, error, highwater_offset, messages in partitions
@@ -239,7 +234,7 @@ class KafkaProtocol(object):
             Returns: list of OffsetResponsePayloads
         """
         return [
-            kafka.common.OffsetResponsePayload(topic, partition, error, tuple(offsets))
+            kafka.structs.OffsetResponsePayload(topic, partition, error, tuple(offsets))
            for topic, partitions in response.topics
            for partition, error, offsets in partitions
        ]
@@ -323,7 +318,7 @@ class KafkaProtocol(object):
             response: OffsetCommitResponse
         """
         return [
-            kafka.common.OffsetCommitResponsePayload(topic, partition, error)
+            kafka.structs.OffsetCommitResponsePayload(topic, partition, error)
             for topic, partitions in response.topics
             for partition, error in partitions
         ]
@@ -362,7 +357,7 @@ class KafkaProtocol(object):
             response: OffsetFetchResponse
         """
         return [
-            kafka.common.OffsetFetchResponsePayload(
+            kafka.structs.OffsetFetchResponsePayload(
                 topic, partition, offset, metadata, error
             )
             for topic, partitions in response.topics
@@ -379,7 +374,7 @@ def create_message(payload, key=None):
         key: bytes, a key used for partition routing (optional)
 
     """
-    return kafka.common.Message(0, 0, key, payload)
+    return kafka.structs.Message(0, 0, key, payload)
 
 
 def create_gzip_message(payloads, key=None, compresslevel=None):
@@ -400,7 +395,7 @@ def create_gzip_message(payloads, key=None, compresslevel=None):
     gzipped = gzip_encode(message_set, compresslevel=compresslevel)
     codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
 
-    return kafka.common.Message(0, 0x00 | codec, key, gzipped)
+    return kafka.structs.Message(0, 0x00 | codec, key, gzipped)
 
 
 def create_snappy_message(payloads, key=None):
@@ -421,7 +416,7 @@ def create_snappy_message(payloads, key=None):
     snapped = snappy_encode(message_set)
     codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
 
-    return kafka.common.Message(0, 0x00 | codec, key, snapped)
+    return kafka.structs.Message(0, 0x00 | codec, key, snapped)
 
 
 def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None):
diff --git a/kafka/util.py b/kafka/util.py
index 7a11910..18c39a4 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -7,7 +7,7 @@ import weakref
 
 import six
 
-from kafka.common import BufferUnderflowError
+from kafka.errors import BufferUnderflowError
 
 
 def crc32(data):
diff --git a/test/test_client.py b/test/test_client.py
index 6980434..42d7dbd 100644
--- a/test/test_client.py
+++ b/test/test_client.py
@@ -6,17 +6,14 @@ import six
 from . import unittest
 
 from kafka import SimpleClient
-from kafka.common import (
-    ProduceRequestPayload,
-    BrokerMetadata,
-    TopicPartition, KafkaUnavailableError,
-    LeaderNotAvailableError, UnknownTopicOrPartitionError,
-    KafkaTimeoutError, ConnectionError, FailedPayloadsError
-)
 from kafka.conn import KafkaConnection
+from kafka.errors import (
+    KafkaUnavailableError, LeaderNotAvailableError, KafkaTimeoutError,
+    UnknownTopicOrPartitionError, ConnectionError, FailedPayloadsError)
 from kafka.future import Future
 from kafka.protocol import KafkaProtocol, create_message
 from kafka.protocol.metadata import MetadataResponse
+from kafka.structs import ProduceRequestPayload, BrokerMetadata, TopicPartition
 
 from test.testutil import Timer
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 884686d..eaac8e1 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -4,12 +4,12 @@ import socket
 import pytest
 
 from kafka.client_async import KafkaClient
-from kafka.common import BrokerMetadata
-import kafka.common as Errors
 from kafka.conn import ConnectionStates
+import kafka.errors as Errors
 from kafka.future import Future
 from kafka.protocol.metadata import MetadataResponse, MetadataRequest
 from kafka.protocol.produce import ProduceRequest
+from kafka.structs import BrokerMetadata
 
 
 @pytest.mark.parametrize("bootstrap,expected_hosts", [
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index c5d3b58..742572d 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -1,10 +1,10 @@
 import os
 
-from kafka.common import (
-    FetchRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
-    KafkaTimeoutError, ProduceRequestPayload
-)
+from kafka.errors import KafkaTimeoutError
 from kafka.protocol import create_message
+from kafka.structs import (
+    FetchRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
+    ProduceRequestPayload)
 
 from test.fixtures import ZookeeperFixture, KafkaFixture
 from test.testutil import KafkaIntegrationTestCase, kafka_versions
diff --git a/test/test_conn_legacy.py b/test/test_conn_legacy.py
index f0ef8fb..347588e 100644
--- a/test/test_conn_legacy.py
+++ b/test/test_conn_legacy.py
@@ -5,9 +5,10 @@ from threading import Thread
 
 import mock
 from . import unittest
 
-from kafka.common import ConnectionError
+from kafka.errors import ConnectionError
 from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SECONDS
 
+
 class ConnTest(unittest.TestCase):
     def setUp(self):
diff --git a/test/test_consumer.py b/test/test_consumer.py
index e664292..f3dad16 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -4,11 +4,11 @@ from mock import MagicMock, patch
 from . import unittest
 
 from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
-from kafka.common import (
-    KafkaConfigurationError, FetchResponsePayload, OffsetFetchResponsePayload,
-    FailedPayloadsError, OffsetAndMessage,
-    NotLeaderForPartitionError, UnknownTopicOrPartitionError
-)
+from kafka.errors import (
+    FailedPayloadsError, KafkaConfigurationError, NotLeaderForPartitionError,
+    UnknownTopicOrPartitionError)
+from kafka.structs import (
+    FetchResponsePayload, OffsetAndMessage, OffsetFetchResponsePayload)
 
 
 class TestKafkaConsumer(unittest.TestCase):
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index 5fcfbe2..c02eddc 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -7,11 +7,11 @@ import pytest
 import six
 
 from kafka import SimpleClient
-from kafka.common import TopicPartition
 from kafka.conn import ConnectionStates
 from kafka.consumer.group import KafkaConsumer
 from kafka.future import Future
 from kafka.protocol.metadata import MetadataResponse
+from kafka.structs import TopicPartition
 
 from test.conftest import version
 from test.testutil import random_string
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 1b60c95..4e081ce 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -7,11 +7,9 @@ from . import unittest
 from kafka import (
     KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message
 )
-from kafka.common import (
-    ProduceRequestPayload, ConsumerFetchSizeTooSmall,
-    OffsetOutOfRangeError, TopicPartition
-)
 from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
+from kafka.errors import ConsumerFetchSizeTooSmall, OffsetOutOfRangeError
+from kafka.structs import ProduceRequestPayload, TopicPartition
 
 from test.fixtures import ZookeeperFixture, KafkaFixture
 from test.testutil import (
diff --git a/test/test_context.py b/test/test_context.py
index da9b22f..3d41ba6 100644
--- a/test/test_context.py
+++ b/test/test_context.py
@@ -5,8 +5,8 @@ from . import unittest
 
 from mock import MagicMock, patch
 
-from kafka.common import OffsetOutOfRangeError
 from kafka.context import OffsetCommitContext
+from kafka.errors import OffsetOutOfRangeError
 
 
 class TestOffsetCommitContext(unittest.TestCase):
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 44db808..d6df983 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -4,7 +4,7 @@ from __future__ import absolute_import
 import pytest
 
 from kafka.client_async import KafkaClient
-from kafka.common import TopicPartition, OffsetAndMetadata
+from kafka.structs import TopicPartition, OffsetAndMetadata
 from kafka.consumer.subscription_state import (
     SubscriptionState, ConsumerRebalanceListener)
 from kafka.coordinator.assignors.range import RangePartitionAssignor
@@ -13,6 +13,7 @@ from kafka.coordinator.consumer import ConsumerCoordinator
 from kafka.coordinator.protocol import (
     ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)
 from kafka.conn import ConnectionStates
+import kafka.errors as Errors
 from kafka.future import Future
 from kafka.protocol.commit import (
     OffsetCommitRequest_v0, OffsetCommitRequest_v1, OffsetCommitRequest_v2,
@@ -21,8 +22,6 @@ from kafka.protocol.commit import (
 from kafka.protocol.metadata import MetadataResponse
 from kafka.util import WeakMethod
 
-import kafka.common as Errors
-
 
 @pytest.fixture
 def conn(mocker):
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 9409241..58e9463 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -3,10 +3,10 @@ import os
 import time
 
 from kafka import SimpleClient, SimpleConsumer, KeyedProducer
-from kafka.common import (
-    TopicPartition, FailedPayloadsError, ConnectionError, RequestTimedOutError
-)
+from kafka.errors import (
+    FailedPayloadsError, ConnectionError, RequestTimedOutError)
 from kafka.producer.base import Producer
+from kafka.structs import TopicPartition
 
 from test.fixtures import ZookeeperFixture, KafkaFixture
 from test.testutil import KafkaIntegrationTestCase, random_string
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index a252f6c..cdd324f 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -4,13 +4,12 @@ from __future__ import absolute_import
 import pytest
 
 from kafka.client_async import KafkaClient
-from kafka.common import TopicPartition, OffsetAndMetadata
 from kafka.consumer.fetcher import Fetcher
 from kafka.consumer.subscription_state import SubscriptionState
+import kafka.errors as Errors
 from kafka.future import Future
 from kafka.protocol.fetch import FetchRequest
-
-import kafka.common as Errors
+from kafka.structs import TopicPartition, OffsetAndMetadata
 
 
 @pytest.fixture
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index d631402..176c99e 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -10,11 +10,9 @@ from kafka import (
     RoundRobinPartitioner, HashedPartitioner
 )
 from kafka.codec import has_snappy
-from kafka.common import (
-    FetchRequestPayload, ProduceRequestPayload,
-    UnknownTopicOrPartitionError, LeaderNotAvailableError
-)
+from kafka.errors import UnknownTopicOrPartitionError, LeaderNotAvailableError
 from kafka.producer.base import Producer
+from kafka.structs import FetchRequestPayload, ProduceRequestPayload
 
 from test.fixtures import ZookeeperFixture, KafkaFixture
 from test.testutil import KafkaIntegrationTestCase, kafka_versions
diff --git a/test/test_producer_legacy.py b/test/test_producer_legacy.py
index 850cb80..9b87c76 100644
--- a/test/test_producer_legacy.py
+++ b/test/test_producer_legacy.py
@@ -9,12 +9,12 @@ from mock import MagicMock, patch
 from . import unittest
 
 from kafka import SimpleClient, SimpleProducer, KeyedProducer
-from kafka.common import (
-    AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
-    ProduceResponsePayload, RetryOptions, TopicPartition
-)
+from kafka.errors import (
+    AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError)
 from kafka.producer.base import Producer, _send_upstream
 from kafka.protocol import CODEC_NONE
+from kafka.structs import (
+    ProduceResponsePayload, RetryOptions, TopicPartition)
 
 from six.moves import queue, xrange
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 1d91e7d..d705e3a 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -7,21 +7,21 @@ from mock import patch, sentinel
 from . import unittest
 
 from kafka.codec import has_snappy, gzip_decode, snappy_decode
-from kafka.common import (
+from kafka.errors import (
+    ChecksumError, KafkaUnavailableError, UnsupportedCodecError,
+    ConsumerFetchSizeTooSmall, ProtocolError)
+from kafka.protocol import (
+    ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
+    create_message, create_gzip_message, create_snappy_message,
+    create_message_set)
+from kafka.structs import (
     OffsetRequestPayload, OffsetResponsePayload,
     OffsetCommitRequestPayload, OffsetCommitResponsePayload,
     OffsetFetchRequestPayload, OffsetFetchResponsePayload,
     ProduceRequestPayload, ProduceResponsePayload,
     FetchRequestPayload, FetchResponsePayload,
-    Message, ChecksumError, OffsetAndMessage, BrokerMetadata,
-    KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall,
-    ProtocolError, ConsumerMetadataResponse
-)
-from kafka.protocol import (
-    ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
-    create_message, create_gzip_message, create_snappy_message,
-    create_message_set
-)
+    Message, OffsetAndMessage, BrokerMetadata, ConsumerMetadataResponse)
+
 
 class TestProtocol(unittest.TestCase):
     def test_create_message(self):
diff --git a/test/test_util.py b/test/test_util.py
index 7f0432b..5fc3f69 100644
--- a/test/test_util.py
+++ b/test/test_util.py
@@ -4,8 +4,9 @@ import struct
 
 import six
 from . import unittest
 
-import kafka.common
+import kafka.errors
 import kafka.util
+import kafka.structs
 
 
 class UtilTest(unittest.TestCase):
@@ -48,7 +49,7 @@ class UtilTest(unittest.TestCase):
         self.assertEqual(kafka.util.read_int_string(b'\x00\x00\x00\x0bsome string', 0), (b'some string', 15))
 
     def test_read_int_string__insufficient_data(self):
-        with self.assertRaises(kafka.common.BufferUnderflowError):
+        with self.assertRaises(kafka.errors.BufferUnderflowError):
             kafka.util.read_int_string(b'\x00\x00\x00\x021', 0)
 
     def test_write_short_string(self):
@@ -90,7 +91,7 @@ class UtilTest(unittest.TestCase):
         self.assertEqual(kafka.util.read_short_string(b'\x00\x0bsome string', 0), (b'some string', 13))
 
    def test_read_int_string__insufficient_data2(self):
-        with self.assertRaises(kafka.common.BufferUnderflowError):
+        with self.assertRaises(kafka.errors.BufferUnderflowError):
             kafka.util.read_int_string('\x00\x021', 0)
 
     def test_relative_unpack2(self):
@@ -100,11 +101,11 @@ class UtilTest(unittest.TestCase):
         )
 
     def test_relative_unpack3(self):
-        with self.assertRaises(kafka.common.BufferUnderflowError):
+        with self.assertRaises(kafka.errors.BufferUnderflowError):
             kafka.util.relative_unpack('>hh', '\x00', 0)
 
     def test_group_by_topic_and_partition(self):
-        t = kafka.common.TopicPartition
+        t = kafka.structs.TopicPartition
 
         l = [
             t("a", 1),
diff --git a/test/testutil.py b/test/testutil.py
index 1d1f6ea..a6f4421 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -12,7 +12,7 @@ from six.moves import xrange
 from . import unittest
 
 from kafka import SimpleClient
-from kafka.common import OffsetRequestPayload
+from kafka.structs import OffsetRequestPayload
 
 __all__ = [
     'random_string',
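Downstream code that needs to run across the rename can guard the new import locations and fall back to the old kafka.common module on older releases. A rough sketch of that pattern (an assumption about how a consumer of the library might adapt, not part of this commit):

    # Hypothetical compatibility import for third-party code, not from this commit.
    try:
        # Module layout introduced by this change
        from kafka.errors import KafkaError
        from kafka.structs import TopicPartition
    except ImportError:
        # Older kafka-python releases exported both names from kafka.common
        from kafka.common import KafkaError, TopicPartition

    print(TopicPartition('my-topic', 0), KafkaError.__name__)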