summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Widman <jeff@jeffwidman.com>2018-05-28 15:58:26 -0700
committerJeff Widman <jeff@jeffwidman.com>2018-06-05 14:32:01 -0700
commitbc4cc434cddf403a35d0393d68ecfdbfad17c8e5 (patch)
treeb74c5190a0fd74afffb5318d4bd34b59ae33e25c
parent81cda595b3ecf17737b4e4d86efa230db2e9bd31 (diff)
downloadkafka-python-bc4cc434cddf403a35d0393d68ecfdbfad17c8e5.tar.gz
Don't use `kafka.common` internally (tag: 1.3.5)
This finishes the split from `kafka.common` to `kafka.errors`/`kafka.structs`.
-rw-r--r--kafka/__init__.py2
-rw-r--r--kafka/consumer/multiprocess.py4
-rw-r--r--kafka/consumer/simple.py6
-rw-r--r--kafka/coordinator/assignors/roundrobin.py2
-rw-r--r--kafka/coordinator/consumer.py2
-rw-r--r--kafka/producer/base.py4
-rw-r--r--kafka/producer/kafka.py8
-rw-r--r--kafka/producer/record_accumulator.py4
-rw-r--r--kafka/protocol/legacy.py5
-rw-r--r--kafka/structs.py4
-rw-r--r--test/test_client_async.py3
-rw-r--r--test/test_conn.py2
-rw-r--r--test/test_coordinator.py6
-rw-r--r--test/test_fetcher.py4
-rw-r--r--test/test_util.py2
-rw-r--r--test/testutil.py10
16 files changed, 32 insertions, 36 deletions
diff --git a/kafka/__init__.py b/kafka/__init__.py
index f108eff..ff364d3 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -25,8 +25,8 @@ from kafka.conn import BrokerConnection
from kafka.protocol import (
create_message, create_gzip_message, create_snappy_message)
from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner
-from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.serializer import Serializer, Deserializer
+from kafka.structs import TopicPartition, OffsetAndMetadata
# To be deprecated when KafkaProducer interface is released
from kafka.client import SimpleClient
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index 1da4a33..758bb92 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -8,7 +8,7 @@ import warnings
from kafka.vendor.six.moves import queue # pylint: disable=import-error
-from kafka.common import KafkaError
+from kafka.errors import KafkaError
from kafka.consumer.base import (
Consumer,
AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
@@ -92,7 +92,7 @@ def _mp_consume(client, group, topic, message_queue, size, events, **consumer_op
except KafkaError as e:
# Retry with exponential backoff
- log.error("Problem communicating with Kafka (%s), retrying in %d seconds..." % (e, interval))
+ log.exception("Problem communicating with Kafka, retrying in %d seconds...", interval)
time.sleep(interval)
interval = interval*2 if interval*2 < MAX_BACKOFF_SECONDS else MAX_BACKOFF_SECONDS
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index c0c1b1e..b60a586 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -24,13 +24,13 @@ from kafka.consumer.base import (
ITER_TIMEOUT_SECONDS,
NO_MESSAGES_WAIT_TIME_SECONDS
)
-from kafka.common import (
- FetchRequestPayload, KafkaError, OffsetRequestPayload,
- ConsumerFetchSizeTooSmall,
+from kafka.errors import (
+ KafkaError, ConsumerFetchSizeTooSmall,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
OffsetOutOfRangeError, FailedPayloadsError, check_error
)
from kafka.protocol.message import PartialMessage
+from kafka.structs import FetchRequestPayload, OffsetRequestPayload
log = logging.getLogger(__name__)
diff --git a/kafka/coordinator/assignors/roundrobin.py b/kafka/coordinator/assignors/roundrobin.py
index a831033..2d24a5c 100644
--- a/kafka/coordinator/assignors/roundrobin.py
+++ b/kafka/coordinator/assignors/roundrobin.py
@@ -7,8 +7,8 @@ import logging
from kafka.vendor import six
from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
-from kafka.common import TopicPartition
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
+from kafka.structs import TopicPartition
log = logging.getLogger(__name__)
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index cb1de0d..f90d182 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -11,7 +11,7 @@ from kafka.coordinator.base import BaseCoordinator, Generation
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.coordinator.protocol import ConsumerProtocol
-from kafka import errors as Errors
+import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics import AnonMeasurable
from kafka.metrics.stats import Avg, Count, Max, Rate
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index c9dd6c3..956cef6 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -14,13 +14,13 @@ from threading import Thread, Event
from kafka.vendor import six
-from kafka.structs import (
- ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions)
from kafka.errors import (
kafka_errors, UnsupportedCodecError, FailedPayloadsError,
RequestTimedOutError, AsyncProducerQueueFull, UnknownError,
RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
+from kafka.structs import (
+ ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions)
log = logging.getLogger('kafka.producer')
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index f285ab4..7d52bdf 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -10,18 +10,18 @@ import weakref
from kafka.vendor import six
-from kafka import errors as Errors
+import kafka.errors as Errors
from kafka.client_async import KafkaClient, selectors
from kafka.codec import has_gzip, has_snappy, has_lz4
from kafka.metrics import MetricConfig, Metrics
from kafka.partitioner.default import DefaultPartitioner
+from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
+from kafka.producer.record_accumulator import AtomicInteger, RecordAccumulator
+from kafka.producer.sender import Sender
from kafka.record.default_records import DefaultRecordBatchBuilder
from kafka.record.legacy_records import LegacyRecordBatchBuilder
from kafka.serializer import Serializer
from kafka.structs import TopicPartition
-from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
-from kafka.producer.record_accumulator import AtomicInteger, RecordAccumulator
-from kafka.producer.sender import Sender
log = logging.getLogger(__name__)
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 61f1e0e..1cd5413 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -6,12 +6,12 @@ import logging
import threading
import time
-from kafka import errors as Errors
+import kafka.errors as Errors
from kafka.producer.buffer import SimpleBufferPool
from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
-from kafka.structs import TopicPartition
from kafka.record.memory_records import MemoryRecordsBuilder
from kafka.record.legacy_records import LegacyRecordBatchBuilder
+from kafka.structs import TopicPartition
log = logging.getLogger(__name__)
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index b8f84e7..7dd2580 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -15,7 +15,6 @@ import kafka.structs
from kafka.codec import gzip_encode, snappy_encode
from kafka.errors import ProtocolError, UnsupportedCodecError
-from kafka.structs import ConsumerMetadataResponse
from kafka.util import (
crc32, read_short_string, relative_unpack,
write_int_string, group_by_topic_and_partition)
@@ -322,7 +321,7 @@ class KafkaProtocol(object):
@classmethod
def decode_consumer_metadata_response(cls, data):
"""
- Decode bytes to a ConsumerMetadataResponse
+ Decode bytes to a kafka.structs.ConsumerMetadataResponse
Arguments:
data: bytes to decode
@@ -331,7 +330,7 @@ class KafkaProtocol(object):
(host, cur) = read_short_string(data, cur)
((port,), cur) = relative_unpack('>i', data, cur)
- return ConsumerMetadataResponse(error, nodeId, host, port)
+ return kafka.structs.ConsumerMetadataResponse(error, nodeId, host, port)
@classmethod
def encode_offset_commit_request(cls, group, payloads):
diff --git a/kafka/structs.py b/kafka/structs.py
index 62f36dd..e15e92e 100644
--- a/kafka/structs.py
+++ b/kafka/structs.py
@@ -93,7 +93,3 @@ KafkaMessage = namedtuple("KafkaMessage",
# Limit value: int >= 0, 0 means no retries
RetryOptions = namedtuple("RetryOptions",
["limit", "backoff_ms", "retry_on_timeouts"])
-
-
-# Support legacy imports from kafka.common
-from kafka.errors import *
diff --git a/test/test_client_async.py b/test/test_client_async.py
index eccb564..09781ac 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -13,14 +13,13 @@ import time
import pytest
from kafka.client_async import KafkaClient, IdleConnectionManager
+from kafka.cluster import ClusterMetadata
from kafka.conn import ConnectionStates
import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.metadata import MetadataResponse, MetadataRequest
from kafka.protocol.produce import ProduceRequest
from kafka.structs import BrokerMetadata
-from kafka.cluster import ClusterMetadata
-from kafka.future import Future
@pytest.fixture
diff --git a/test/test_conn.py b/test/test_conn.py
index fbdeeb9..27d77be 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -13,7 +13,7 @@ from kafka.protocol.api import RequestHeader
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.produce import ProduceRequest
-import kafka.common as Errors
+import kafka.errors as Errors
@pytest.fixture
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 7a2627e..4afdcd9 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -5,7 +5,6 @@ import time
import pytest
from kafka.client_async import KafkaClient
-from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.consumer.subscription_state import (
SubscriptionState, ConsumerRebalanceListener)
from kafka.coordinator.assignors.range import RangePartitionAssignor
@@ -21,6 +20,7 @@ from kafka.protocol.commit import (
OffsetCommitRequest, OffsetCommitResponse,
OffsetFetchRequest, OffsetFetchResponse)
from kafka.protocol.metadata import MetadataResponse
+from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.util import WeakMethod
@@ -34,7 +34,7 @@ def coordinator(client):
def test_init(client, coordinator):
- # metadata update on init
+ # metadata update on init
assert client.cluster._need_update is True
assert WeakMethod(coordinator._handle_metadata_update) in client.cluster._listeners
@@ -542,7 +542,7 @@ def test_send_offset_fetch_request_success(patched_coord, partitions):
response = OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])])
_f.success(response)
patched_coord._handle_offset_fetch_response.assert_called_with(
- future, response)
+ future, response)
@pytest.mark.parametrize('response,error,dead', [
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index fc031f7..c821018 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -12,16 +12,16 @@ from kafka.consumer.fetcher import (
CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError
)
from kafka.consumer.subscription_state import SubscriptionState
+from kafka.future import Future
from kafka.metrics import Metrics
from kafka.protocol.fetch import FetchRequest, FetchResponse
from kafka.protocol.offset import OffsetResponse
-from kafka.structs import TopicPartition
-from kafka.future import Future
from kafka.errors import (
StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
UnknownTopicOrPartitionError, OffsetOutOfRangeError
)
from kafka.record.memory_records import MemoryRecordsBuilder, MemoryRecords
+from kafka.structs import TopicPartition
@pytest.fixture
diff --git a/test/test_util.py b/test/test_util.py
index 58e5ab8..fb592e8 100644
--- a/test/test_util.py
+++ b/test/test_util.py
@@ -5,8 +5,8 @@ import six
from . import unittest
import kafka.errors
-import kafka.util
import kafka.structs
+import kafka.util
class UtilTest(unittest.TestCase):
diff --git a/test/testutil.py b/test/testutil.py
index 365e47f..a1383a0 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -11,10 +11,12 @@ import pytest
from . import unittest
from kafka import SimpleClient, create_message
-from kafka.errors import LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError
-from kafka.structs import OffsetRequestPayload, ProduceRequestPayload, \
- NotLeaderForPartitionError, UnknownTopicOrPartitionError, \
- FailedPayloadsError
+from kafka.errors import (
+ LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError,
+ NotLeaderForPartitionError, UnknownTopicOrPartitionError,
+ FailedPayloadsError
+)
+from kafka.structs import OffsetRequestPayload, ProduceRequestPayload
from test.fixtures import random_string, version_str_to_list, version as kafka_version #pylint: disable=wrong-import-order
def kafka_versions(*versions):