summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Widman <jeff@jeffwidman.com>2018-10-29 00:09:45 -0700
committerJeff Widman <jeff@jeffwidman.com>2018-10-29 02:34:36 -0700
commit9eb63e180e090f121ffa4bb3369bea5ac51ac548 (patch)
tree1049f4e5a96def4e3505a5aafb4fa96e8ebdedcb
parent3e332e83258f7cfffd4df13ba9f17647dc302c43 (diff)
downloadkafka-python-migrate-from-unittest-to-pytest.tar.gz
-rw-r--r--test/test_consumer_integration.py19
-rw-r--r--test/testutil.py26
2 files changed, 13 insertions, 32 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 9a7790e..9f76f7f 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -25,20 +25,21 @@ from kafka.structs import (
from test.conftest import version
from test.fixtures import ZookeeperFixture, KafkaFixture, random_string
-from test.testutil import (
- KafkaIntegrationTestCase, kafka_versions, Timer,
- send_messages
-)
+from test.testutil import KafkaIntegrationTestCase, kafka_versions, Timer
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
-def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
- """Test KafkaConsumer
- """
+def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory):
+ """Test KafkaConsumer"""
kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest')
- send_messages(simple_client, topic, 0, range(0, 100))
- send_messages(simple_client, topic, 1, range(100, 200))
+ # TODO replace this with a `send_messages()` pytest fixture
+ # as we will likely need this elsewhere
+ for i in range(0, 100):
+ kafka_producer.send(topic, partition=0, value=str(i).encode())
+ for i in range(100, 200):
+ kafka_producer.send(topic, partition=1, value=str(i).encode())
+ kafka_producer.flush()
cnt = 0
messages = {0: set(), 1: set()}
diff --git a/test/testutil.py b/test/testutil.py
index feb6f6d..6f6cafb 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -3,20 +3,19 @@ from __future__ import absolute_import
import functools
import operator
import os
-import socket
import time
import uuid
import pytest
from . import unittest
-from kafka import SimpleClient, create_message
+from kafka import SimpleClient
from kafka.errors import (
LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError,
NotLeaderForPartitionError, UnknownTopicOrPartitionError,
FailedPayloadsError
)
-from kafka.structs import OffsetRequestPayload, ProduceRequestPayload
+from kafka.structs import OffsetRequestPayload
from test.fixtures import random_string, version_str_to_list, version as kafka_version #pylint: disable=wrong-import-order
@@ -67,26 +66,6 @@ def kafka_versions(*versions):
return real_kafka_versions
-_MESSAGES = {}
-def msg(message):
- """Format, encode and deduplicate a message
- """
- global _MESSAGES #pylint: disable=global-statement
- if message not in _MESSAGES:
- _MESSAGES[message] = '%s-%s' % (message, str(uuid.uuid4()))
-
- return _MESSAGES[message].encode('utf-8')
-
-def send_messages(client, topic, partition, messages):
- """Send messages to a topic's partition
- """
- messages = [create_message(msg(str(m))) for m in messages]
- produce = ProduceRequestPayload(topic, partition, messages=messages)
- resp, = client.send_produce_request([produce])
- assert resp.error == 0
-
- return [x.value for x in messages]
-
def current_offset(client, topic, partition, kafka_broker=None):
"""Get the current offset of a topic's partition
"""
@@ -101,6 +80,7 @@ def current_offset(client, topic, partition, kafka_broker=None):
else:
return offsets.offsets[0]
+
class KafkaIntegrationTestCase(unittest.TestCase):
create_client = True
topic = None