summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-05 09:34:48 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-05 09:35:45 -0700
commit5a14bd8c947251d1a8f848175cc3cf2b07af3411 (patch)
treea251ddbc60c84405762365429de9b04727653e6c
parent221f56d8a05cdc2d37f85018e4af352b4b2a95c5 (diff)
downloadkafka-python-5a14bd8c947251d1a8f848175cc3cf2b07af3411.tar.gz
Update imports from kafka.common -> kafka.errors / kafka.structs
-rw-r--r--docs/simple.rst5
-rw-r--r--docs/usage.rst2
-rw-r--r--kafka/__init__.py2
-rw-r--r--kafka/client.py14
-rw-r--r--kafka/client_async.py3
-rw-r--r--kafka/cluster.py4
-rw-r--r--kafka/conn.py4
-rw-r--r--kafka/consumer/base.py10
-rw-r--r--kafka/consumer/fetcher.py4
-rw-r--r--kafka/consumer/group.py2
-rw-r--r--kafka/consumer/subscription_state.py3
-rw-r--r--kafka/context.py3
-rw-r--r--kafka/coordinator/base.py2
-rw-r--r--kafka/coordinator/consumer.py4
-rw-r--r--kafka/coordinator/heartbeat.py2
-rw-r--r--kafka/coordinator/protocol.py2
-rw-r--r--kafka/future.py2
-rw-r--r--kafka/producer/base.py9
-rw-r--r--kafka/producer/buffer.py2
-rw-r--r--kafka/producer/future.py3
-rw-r--r--kafka/producer/kafka.py4
-rw-r--r--kafka/producer/record_accumulator.py4
-rw-r--r--kafka/producer/sender.py4
-rw-r--r--kafka/protocol/legacy.py33
-rw-r--r--kafka/util.py2
-rw-r--r--test/test_client.py11
-rw-r--r--test/test_client_async.py4
-rw-r--r--test/test_client_integration.py8
-rw-r--r--test/test_conn_legacy.py3
-rw-r--r--test/test_consumer.py10
-rw-r--r--test/test_consumer_group.py2
-rw-r--r--test/test_consumer_integration.py6
-rw-r--r--test/test_context.py2
-rw-r--r--test/test_coordinator.py5
-rw-r--r--test/test_failover_integration.py6
-rw-r--r--test/test_fetcher.py5
-rw-r--r--test/test_producer_integration.py6
-rw-r--r--test/test_producer_legacy.py8
-rw-r--r--test/test_protocol.py20
-rw-r--r--test/test_util.py11
-rw-r--r--test/testutil.py2
41 files changed, 111 insertions, 127 deletions
diff --git a/docs/simple.rst b/docs/simple.rst
index 253f543..8192a8b 100644
--- a/docs/simple.rst
+++ b/docs/simple.rst
@@ -129,10 +129,9 @@ SimpleClient (DEPRECATED)
import time
from kafka import SimpleClient
- from kafka.common import (
- LeaderNotAvailableError, NotLeaderForPartitionError,
- ProduceRequestPayload)
+ from kafka.errors import LeaderNotAvailableError, NotLeaderForPartitionError
from kafka.protocol import create_message
+ from kafka.structs import ProduceRequestPayload
kafka = SimpleClient('localhost:9092')
payload = ProduceRequestPayload(topic='my-topic', partition=0,
diff --git a/docs/usage.rst b/docs/usage.rst
index 85fc44f..0ee9894 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -56,7 +56,7 @@ KafkaProducer
.. code:: python
from kafka import KafkaProducer
- from kafka.common import KafkaError
+ from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers=['broker1:1234'])
diff --git a/kafka/__init__.py b/kafka/__init__.py
index 3f0d8bd..6b2ba97 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -22,7 +22,7 @@ from kafka.conn import BrokerConnection
from kafka.protocol import (
create_message, create_gzip_message, create_snappy_message)
from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner
-from kafka.common import TopicPartition
+from kafka.structs import TopicPartition
# To be deprecated when KafkaProducer interface is released
from kafka.client import SimpleClient
diff --git a/kafka/client.py b/kafka/client.py
index 99d6fec..2bd2324 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -7,12 +7,12 @@ import time
import six
-import kafka.common
-from kafka.common import (TopicPartition, BrokerMetadata, UnknownError,
- ConnectionError, FailedPayloadsError,
+import kafka.errors
+from kafka.errors import (UnknownError, ConnectionError, FailedPayloadsError,
KafkaTimeoutError, KafkaUnavailableError,
LeaderNotAvailableError, UnknownTopicOrPartitionError,
NotLeaderForPartitionError, ReplicaNotAvailableError)
+from kafka.structs import TopicPartition, BrokerMetadata
from kafka.conn import (
collect_hosts, BrokerConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS,
@@ -123,7 +123,7 @@ class SimpleClient(object):
# If there's a problem with finding the coordinator, raise the
# provided error
- kafka.common.check_error(resp)
+ kafka.errors.check_error(resp)
# Otherwise return the BrokerMetadata
return BrokerMetadata(resp.nodeId, resp.host, resp.port)
@@ -389,7 +389,7 @@ class SimpleClient(object):
# Or a server api error response
try:
- kafka.common.check_error(resp)
+ kafka.errors.check_error(resp)
except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
self.reset_topic_metadata(resp.topic)
raise
@@ -509,7 +509,7 @@ class SimpleClient(object):
for error, topic, partitions in resp.topics:
# Errors expected for new topics
if error:
- error_type = kafka.common.kafka_errors.get(error, UnknownError)
+ error_type = kafka.errors.kafka_errors.get(error, UnknownError)
if error_type in (UnknownTopicOrPartitionError, LeaderNotAvailableError):
log.error('Error loading topic metadata for %s: %s (%s)',
topic, error_type, error)
@@ -530,7 +530,7 @@ class SimpleClient(object):
# Check for partition errors
if error:
- error_type = kafka.common.kafka_errors.get(error, UnknownError)
+ error_type = kafka.errors.kafka_errors.get(error, UnknownError)
# If No Leader, topics_to_brokers topic_partition -> None
if error_type is LeaderNotAvailableError:
diff --git a/kafka/client_async.py b/kafka/client_async.py
index d70e4f2..b77ead5 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -11,10 +11,9 @@ import time
import six
-import kafka.common as Errors # TODO: make Errors a separate class
-
from .cluster import ClusterMetadata
from .conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
+from . import errors as Errors
from .future import Future
from .protocol.metadata import MetadataRequest
from .protocol.produce import ProduceRequest
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 9ab6e6e..f7940e6 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -9,9 +9,9 @@ import time
import six
-import kafka.common as Errors
-from kafka.common import BrokerMetadata, PartitionMetadata, TopicPartition
+from . import errors as Errors
from .future import Future
+from .structs import BrokerMetadata, PartitionMetadata, TopicPartition
log = logging.getLogger(__name__)
diff --git a/kafka/conn.py b/kafka/conn.py
index ffc839e..dc7dd23 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -13,7 +13,7 @@ import warnings
import six
-import kafka.common as Errors
+import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.api import RequestHeader
from kafka.protocol.commit import GroupCoordinatorResponse
@@ -149,7 +149,7 @@ class BrokerConnection(object):
Arguments:
error (Exception, optional): pending in-flight-requests
will be failed with this exception.
- Default: kafka.common.ConnectionError.
+ Default: kafka.errors.ConnectionError.
"""
if self._sock:
self._sock.close()
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 75c3ee1..d2d9e8d 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -6,12 +6,10 @@ import numbers
from threading import Lock
import warnings
-import kafka.common
-from kafka.common import (
- OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
- UnknownTopicOrPartitionError, check_error, KafkaError
-)
-
+from kafka.errors import (
+ UnknownTopicOrPartitionError, check_error, KafkaError)
+from kafka.structs import (
+ OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload)
from kafka.util import ReentrantTimer
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 7112c7e..2c9c0b9 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -6,12 +6,12 @@ import logging
import six
-import kafka.common as Errors
-from kafka.common import TopicPartition
+import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.message import PartialMessage
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
+from kafka.structs import TopicPartition
log = logging.getLogger(__name__)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 9172040..6c85c21 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -7,13 +7,13 @@ import time
import six
from kafka.client_async import KafkaClient
-from kafka.common import TopicPartition
from kafka.consumer.fetcher import Fetcher
from kafka.consumer.subscription_state import SubscriptionState
from kafka.coordinator.consumer import ConsumerCoordinator
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.protocol.offset import OffsetResetStrategy
+from kafka.structs import TopicPartition
from kafka.version import __version__
log = logging.getLogger(__name__)
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 3d170ae..1c045aa 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -6,8 +6,9 @@ import re
import six
-from kafka.common import IllegalStateError, OffsetAndMetadata
+from kafka.errors import IllegalStateError
from kafka.protocol.offset import OffsetResetStrategy
+from kafka.structs import OffsetAndMetadata
log = logging.getLogger(__name__)
diff --git a/kafka/context.py b/kafka/context.py
index 376fad1..d6c15fe 100644
--- a/kafka/context.py
+++ b/kafka/context.py
@@ -3,7 +3,8 @@ Context manager to commit/rollback consumer offsets.
"""
from logging import getLogger
-from kafka.common import check_error, OffsetCommitRequestPayload, OffsetOutOfRangeError
+from kafka.errors import check_error, OffsetOutOfRangeError
+from kafka.structs import OffsetCommitRequestPayload
class OffsetCommitContext(object):
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index b0a0981..fcf3901 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -6,7 +6,7 @@ import weakref
import six
-import kafka.common as Errors
+import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.commit import (GroupCoordinatorRequest,
OffsetCommitRequest_v2 as OffsetCommitRequest)
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index b2ef1ea..ae2344f 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -12,14 +12,14 @@ from .base import BaseCoordinator
from .assignors.range import RangePartitionAssignor
from .assignors.roundrobin import RoundRobinPartitionAssignor
from .protocol import ConsumerProtocol
-from ..common import OffsetAndMetadata, TopicPartition
+from .. import errors as Errors
from ..future import Future
from ..protocol.commit import (
OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0,
OffsetFetchRequest_v0, OffsetFetchRequest_v1)
+from ..structs import OffsetAndMetadata, TopicPartition
from ..util import WeakMethod
-import kafka.common as Errors
log = logging.getLogger(__name__)
diff --git a/kafka/coordinator/heartbeat.py b/kafka/coordinator/heartbeat.py
index 4ddcf09..e73b3e5 100644
--- a/kafka/coordinator/heartbeat.py
+++ b/kafka/coordinator/heartbeat.py
@@ -1,7 +1,7 @@
import copy
import time
-import kafka.common as Errors
+import kafka.errors as Errors
class Heartbeat(object):
diff --git a/kafka/coordinator/protocol.py b/kafka/coordinator/protocol.py
index 9e37397..56a3901 100644
--- a/kafka/coordinator/protocol.py
+++ b/kafka/coordinator/protocol.py
@@ -1,8 +1,8 @@
from __future__ import absolute_import
-from kafka.common import TopicPartition
from kafka.protocol.struct import Struct
from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
+from kafka.structs import TopicPartition
class ConsumerProtocolMemberMetadata(Struct):
diff --git a/kafka/future.py b/kafka/future.py
index c7e0b14..b379272 100644
--- a/kafka/future.py
+++ b/kafka/future.py
@@ -1,7 +1,7 @@
import functools
import logging
-import kafka.common as Errors
+import kafka.errors as Errors
log = logging.getLogger(__name__)
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 2067c7e..07e61d5 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -14,13 +14,12 @@ from threading import Thread, Event
import six
-from kafka.common import (
- ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions,
+from kafka.structs import (
+ ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions)
+from kafka.errors import (
kafka_errors, UnsupportedCodecError, FailedPayloadsError,
RequestTimedOutError, AsyncProducerQueueFull, UnknownError,
- RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES
-)
-
+ RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
log = logging.getLogger('kafka.producer')
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index 8c83ffc..b2ac747 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -7,10 +7,10 @@ import time
from ..codec import (has_gzip, has_snappy, has_lz4,
gzip_encode, snappy_encode, lz4_encode)
+from .. import errors as Errors
from ..protocol.types import Int32, Int64
from ..protocol.message import MessageSet, Message
-import kafka.common as Errors
class MessageSetBuffer(object):
diff --git a/kafka/producer/future.py b/kafka/producer/future.py
index 5a7a9dc..35520d8 100644
--- a/kafka/producer/future.py
+++ b/kafka/producer/future.py
@@ -3,10 +3,9 @@ from __future__ import absolute_import
import collections
import threading
+from .. import errors as Errors
from ..future import Future
-import kafka.common as Errors
-
class FutureProduceResult(Future):
def __init__(self, topic_partition):
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index e1a0374..dd8e71f 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -8,14 +8,14 @@ import threading
import time
from ..client_async import KafkaClient
-from ..common import TopicPartition
+from ..structs import TopicPartition
from ..partitioner.default import DefaultPartitioner
from ..protocol.message import Message, MessageSet
+from .. import errors as Errors
from .future import FutureRecordMetadata, FutureProduceResult
from .record_accumulator import AtomicInteger, RecordAccumulator
from .sender import Sender
-import kafka.common as Errors
log = logging.getLogger(__name__)
PRODUCER_CLIENT_ID_SEQUENCE = AtomicInteger()
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 19dc199..b3abaa3 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -8,12 +8,12 @@ import time
import six
-from ..common import TopicPartition
+from .. import errors as Errors
+from ..structs import TopicPartition
from ..protocol.message import Message, MessageSet
from .buffer import MessageSetBuffer, SimpleBufferPool
from .future import FutureRecordMetadata, FutureProduceResult
-import kafka.common as Errors
log = logging.getLogger(__name__)
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 9a86a16..3cafb26 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -7,11 +7,11 @@ import threading
import six
-from ..common import TopicPartition
+from .. import errors as Errors
+from ..structs import TopicPartition
from ..version import __version__
from ..protocol.produce import ProduceRequest
-import kafka.common as Errors
log = logging.getLogger(__name__)
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index 1835521..e4745f1 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -7,26 +7,21 @@ import six
from six.moves import xrange
-import kafka.common
import kafka.protocol.commit
import kafka.protocol.fetch
import kafka.protocol.message
import kafka.protocol.metadata
import kafka.protocol.offset
import kafka.protocol.produce
+import kafka.structs
from kafka.codec import (
- gzip_encode, gzip_decode, snappy_encode, snappy_decode
-)
-from kafka.common import (
- ProtocolError, ChecksumError,
- UnsupportedCodecError,
- ConsumerMetadataResponse
-)
+ gzip_encode, gzip_decode, snappy_encode, snappy_decode)
+from kafka.errors import ProtocolError, ChecksumError, UnsupportedCodecError
+from kafka.structs import ConsumerMetadataResponse
from kafka.util import (
crc32, read_short_string, read_int_string, relative_unpack,
- write_short_string, write_int_string, group_by_topic_and_partition
-)
+ write_short_string, write_int_string, group_by_topic_and_partition)
log = logging.getLogger(__name__)
@@ -166,7 +161,7 @@ class KafkaProtocol(object):
Return: list of ProduceResponsePayload
"""
return [
- kafka.common.ProduceResponsePayload(topic, partition, error, offset)
+ kafka.structs.ProduceResponsePayload(topic, partition, error, offset)
for topic, partitions in response.topics
for partition, error, offset in partitions
]
@@ -207,9 +202,9 @@ class KafkaProtocol(object):
response: FetchResponse
"""
return [
- kafka.common.FetchResponsePayload(
+ kafka.structs.FetchResponsePayload(
topic, partition, error, highwater_offset, [
- kafka.common.OffsetAndMessage(offset, message)
+ kafka.structs.OffsetAndMessage(offset, message)
for offset, _, message in messages])
for topic, partitions in response.topics
for partition, error, highwater_offset, messages in partitions
@@ -239,7 +234,7 @@ class KafkaProtocol(object):
Returns: list of OffsetResponsePayloads
"""
return [
- kafka.common.OffsetResponsePayload(topic, partition, error, tuple(offsets))
+ kafka.structs.OffsetResponsePayload(topic, partition, error, tuple(offsets))
for topic, partitions in response.topics
for partition, error, offsets in partitions
]
@@ -323,7 +318,7 @@ class KafkaProtocol(object):
response: OffsetCommitResponse
"""
return [
- kafka.common.OffsetCommitResponsePayload(topic, partition, error)
+ kafka.structs.OffsetCommitResponsePayload(topic, partition, error)
for topic, partitions in response.topics
for partition, error in partitions
]
@@ -362,7 +357,7 @@ class KafkaProtocol(object):
response: OffsetFetchResponse
"""
return [
- kafka.common.OffsetFetchResponsePayload(
+ kafka.structs.OffsetFetchResponsePayload(
topic, partition, offset, metadata, error
)
for topic, partitions in response.topics
@@ -379,7 +374,7 @@ def create_message(payload, key=None):
key: bytes, a key used for partition routing (optional)
"""
- return kafka.common.Message(0, 0, key, payload)
+ return kafka.structs.Message(0, 0, key, payload)
def create_gzip_message(payloads, key=None, compresslevel=None):
@@ -400,7 +395,7 @@ def create_gzip_message(payloads, key=None, compresslevel=None):
gzipped = gzip_encode(message_set, compresslevel=compresslevel)
codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
- return kafka.common.Message(0, 0x00 | codec, key, gzipped)
+ return kafka.structs.Message(0, 0x00 | codec, key, gzipped)
def create_snappy_message(payloads, key=None):
@@ -421,7 +416,7 @@ def create_snappy_message(payloads, key=None):
snapped = snappy_encode(message_set)
codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
- return kafka.common.Message(0, 0x00 | codec, key, snapped)
+ return kafka.structs.Message(0, 0x00 | codec, key, snapped)
def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None):
diff --git a/kafka/util.py b/kafka/util.py
index 7a11910..18c39a4 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -7,7 +7,7 @@ import weakref
import six
-from kafka.common import BufferUnderflowError
+from kafka.errors import BufferUnderflowError
def crc32(data):
diff --git a/test/test_client.py b/test/test_client.py
index 6980434..42d7dbd 100644
--- a/test/test_client.py
+++ b/test/test_client.py
@@ -6,17 +6,14 @@ import six
from . import unittest
from kafka import SimpleClient
-from kafka.common import (
- ProduceRequestPayload,
- BrokerMetadata,
- TopicPartition, KafkaUnavailableError,
- LeaderNotAvailableError, UnknownTopicOrPartitionError,
- KafkaTimeoutError, ConnectionError, FailedPayloadsError
-)
from kafka.conn import KafkaConnection
+from kafka.errors import (
+ KafkaUnavailableError, LeaderNotAvailableError, KafkaTimeoutError,
+ UnknownTopicOrPartitionError, ConnectionError, FailedPayloadsError)
from kafka.future import Future
from kafka.protocol import KafkaProtocol, create_message
from kafka.protocol.metadata import MetadataResponse
+from kafka.structs import ProduceRequestPayload, BrokerMetadata, TopicPartition
from test.testutil import Timer
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 884686d..eaac8e1 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -4,12 +4,12 @@ import socket
import pytest
from kafka.client_async import KafkaClient
-from kafka.common import BrokerMetadata
-import kafka.common as Errors
from kafka.conn import ConnectionStates
+import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.metadata import MetadataResponse, MetadataRequest
from kafka.protocol.produce import ProduceRequest
+from kafka.structs import BrokerMetadata
@pytest.mark.parametrize("bootstrap,expected_hosts", [
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index c5d3b58..742572d 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -1,10 +1,10 @@
import os
-from kafka.common import (
- FetchRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
- KafkaTimeoutError, ProduceRequestPayload
-)
+from kafka.errors import KafkaTimeoutError
from kafka.protocol import create_message
+from kafka.structs import (
+ FetchRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
+ ProduceRequestPayload)
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import KafkaIntegrationTestCase, kafka_versions
diff --git a/test/test_conn_legacy.py b/test/test_conn_legacy.py
index f0ef8fb..347588e 100644
--- a/test/test_conn_legacy.py
+++ b/test/test_conn_legacy.py
@@ -5,9 +5,10 @@ from threading import Thread
import mock
from . import unittest
-from kafka.common import ConnectionError
+from kafka.errors import ConnectionError
from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SECONDS
+
class ConnTest(unittest.TestCase):
def setUp(self):
diff --git a/test/test_consumer.py b/test/test_consumer.py
index e664292..f3dad16 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -4,11 +4,11 @@ from mock import MagicMock, patch
from . import unittest
from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
-from kafka.common import (
- KafkaConfigurationError, FetchResponsePayload, OffsetFetchResponsePayload,
- FailedPayloadsError, OffsetAndMessage,
- NotLeaderForPartitionError, UnknownTopicOrPartitionError
-)
+from kafka.errors import (
+ FailedPayloadsError, KafkaConfigurationError, NotLeaderForPartitionError,
+ UnknownTopicOrPartitionError)
+from kafka.structs import (
+ FetchResponsePayload, OffsetAndMessage, OffsetFetchResponsePayload)
class TestKafkaConsumer(unittest.TestCase):
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index 5fcfbe2..c02eddc 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -7,11 +7,11 @@ import pytest
import six
from kafka import SimpleClient
-from kafka.common import TopicPartition
from kafka.conn import ConnectionStates
from kafka.consumer.group import KafkaConsumer
from kafka.future import Future
from kafka.protocol.metadata import MetadataResponse
+from kafka.structs import TopicPartition
from test.conftest import version
from test.testutil import random_string
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 1b60c95..4e081ce 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -7,11 +7,9 @@ from . import unittest
from kafka import (
KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message
)
-from kafka.common import (
- ProduceRequestPayload, ConsumerFetchSizeTooSmall,
- OffsetOutOfRangeError, TopicPartition
-)
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
+from kafka.errors import ConsumerFetchSizeTooSmall, OffsetOutOfRangeError
+from kafka.structs import ProduceRequestPayload, TopicPartition
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import (
diff --git a/test/test_context.py b/test/test_context.py
index da9b22f..3d41ba6 100644
--- a/test/test_context.py
+++ b/test/test_context.py
@@ -5,8 +5,8 @@ from . import unittest
from mock import MagicMock, patch
-from kafka.common import OffsetOutOfRangeError
from kafka.context import OffsetCommitContext
+from kafka.errors import OffsetOutOfRangeError
class TestOffsetCommitContext(unittest.TestCase):
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 44db808..d6df983 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -4,7 +4,7 @@ from __future__ import absolute_import
import pytest
from kafka.client_async import KafkaClient
-from kafka.common import TopicPartition, OffsetAndMetadata
+from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.consumer.subscription_state import (
SubscriptionState, ConsumerRebalanceListener)
from kafka.coordinator.assignors.range import RangePartitionAssignor
@@ -13,6 +13,7 @@ from kafka.coordinator.consumer import ConsumerCoordinator
from kafka.coordinator.protocol import (
ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)
from kafka.conn import ConnectionStates
+import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.commit import (
OffsetCommitRequest_v0, OffsetCommitRequest_v1, OffsetCommitRequest_v2,
@@ -21,8 +22,6 @@ from kafka.protocol.commit import (
from kafka.protocol.metadata import MetadataResponse
from kafka.util import WeakMethod
-import kafka.common as Errors
-
@pytest.fixture
def conn(mocker):
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 9409241..58e9463 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -3,10 +3,10 @@ import os
import time
from kafka import SimpleClient, SimpleConsumer, KeyedProducer
-from kafka.common import (
- TopicPartition, FailedPayloadsError, ConnectionError, RequestTimedOutError
-)
+from kafka.errors import (
+ FailedPayloadsError, ConnectionError, RequestTimedOutError)
from kafka.producer.base import Producer
+from kafka.structs import TopicPartition
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import KafkaIntegrationTestCase, random_string
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index a252f6c..cdd324f 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -4,13 +4,12 @@ from __future__ import absolute_import
import pytest
from kafka.client_async import KafkaClient
-from kafka.common import TopicPartition, OffsetAndMetadata
from kafka.consumer.fetcher import Fetcher
from kafka.consumer.subscription_state import SubscriptionState
+import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.fetch import FetchRequest
-
-import kafka.common as Errors
+from kafka.structs import TopicPartition, OffsetAndMetadata
@pytest.fixture
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index d631402..176c99e 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -10,11 +10,9 @@ from kafka import (
RoundRobinPartitioner, HashedPartitioner
)
from kafka.codec import has_snappy
-from kafka.common import (
- FetchRequestPayload, ProduceRequestPayload,
- UnknownTopicOrPartitionError, LeaderNotAvailableError
-)
+from kafka.errors import UnknownTopicOrPartitionError, LeaderNotAvailableError
from kafka.producer.base import Producer
+from kafka.structs import FetchRequestPayload, ProduceRequestPayload
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import KafkaIntegrationTestCase, kafka_versions
diff --git a/test/test_producer_legacy.py b/test/test_producer_legacy.py
index 850cb80..9b87c76 100644
--- a/test/test_producer_legacy.py
+++ b/test/test_producer_legacy.py
@@ -9,12 +9,12 @@ from mock import MagicMock, patch
from . import unittest
from kafka import SimpleClient, SimpleProducer, KeyedProducer
-from kafka.common import (
- AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
- ProduceResponsePayload, RetryOptions, TopicPartition
-)
+from kafka.errors import (
+ AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError)
from kafka.producer.base import Producer, _send_upstream
from kafka.protocol import CODEC_NONE
+from kafka.structs import (
+ ProduceResponsePayload, RetryOptions, TopicPartition)
from six.moves import queue, xrange
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 1d91e7d..d705e3a 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -7,21 +7,21 @@ from mock import patch, sentinel
from . import unittest
from kafka.codec import has_snappy, gzip_decode, snappy_decode
-from kafka.common import (
+from kafka.errors import (
+ ChecksumError, KafkaUnavailableError, UnsupportedCodecError,
+ ConsumerFetchSizeTooSmall, ProtocolError)
+from kafka.protocol import (
+ ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
+ create_message, create_gzip_message, create_snappy_message,
+ create_message_set)
+from kafka.structs import (
OffsetRequestPayload, OffsetResponsePayload,
OffsetCommitRequestPayload, OffsetCommitResponsePayload,
OffsetFetchRequestPayload, OffsetFetchResponsePayload,
ProduceRequestPayload, ProduceResponsePayload,
FetchRequestPayload, FetchResponsePayload,
- Message, ChecksumError, OffsetAndMessage, BrokerMetadata,
- KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall,
- ProtocolError, ConsumerMetadataResponse
-)
-from kafka.protocol import (
- ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
- create_message, create_gzip_message, create_snappy_message,
- create_message_set
-)
+ Message, OffsetAndMessage, BrokerMetadata, ConsumerMetadataResponse)
+
class TestProtocol(unittest.TestCase):
def test_create_message(self):
diff --git a/test/test_util.py b/test/test_util.py
index 7f0432b..5fc3f69 100644
--- a/test/test_util.py
+++ b/test/test_util.py
@@ -4,8 +4,9 @@ import struct
import six
from . import unittest
-import kafka.common
+import kafka.errors
import kafka.util
+import kafka.structs
class UtilTest(unittest.TestCase):
@@ -48,7 +49,7 @@ class UtilTest(unittest.TestCase):
self.assertEqual(kafka.util.read_int_string(b'\x00\x00\x00\x0bsome string', 0), (b'some string', 15))
def test_read_int_string__insufficient_data(self):
- with self.assertRaises(kafka.common.BufferUnderflowError):
+ with self.assertRaises(kafka.errors.BufferUnderflowError):
kafka.util.read_int_string(b'\x00\x00\x00\x021', 0)
def test_write_short_string(self):
@@ -90,7 +91,7 @@ class UtilTest(unittest.TestCase):
self.assertEqual(kafka.util.read_short_string(b'\x00\x0bsome string', 0), (b'some string', 13))
def test_read_int_string__insufficient_data2(self):
- with self.assertRaises(kafka.common.BufferUnderflowError):
+ with self.assertRaises(kafka.errors.BufferUnderflowError):
kafka.util.read_int_string('\x00\x021', 0)
def test_relative_unpack2(self):
@@ -100,11 +101,11 @@ class UtilTest(unittest.TestCase):
)
def test_relative_unpack3(self):
- with self.assertRaises(kafka.common.BufferUnderflowError):
+ with self.assertRaises(kafka.errors.BufferUnderflowError):
kafka.util.relative_unpack('>hh', '\x00', 0)
def test_group_by_topic_and_partition(self):
- t = kafka.common.TopicPartition
+ t = kafka.structs.TopicPartition
l = [
t("a", 1),
diff --git a/test/testutil.py b/test/testutil.py
index 1d1f6ea..a6f4421 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -12,7 +12,7 @@ from six.moves import xrange
from . import unittest
from kafka import SimpleClient
-from kafka.common import OffsetRequestPayload
+from kafka.structs import OffsetRequestPayload
__all__ = [
'random_string',