author     David Arthur <mumrah@gmail.com>    2012-09-28 11:28:56 -0400
committer  David Arthur <mumrah@gmail.com>    2012-09-28 11:28:56 -0400
commit     51d8bbb231b29c006323d0705f8819f592aeb6e7 (patch)
tree       d84c834c8b6d5b8157e6616baab88be3f56a9206
parent     2a332d2983ae306a692b3796e0a309a5e7504097 (diff)
download   kafka-python-51d8bbb231b29c006323d0705f8819f592aeb6e7.tar.gz
Starting integration tests
-rw-r--r--   README.md                              26
-rw-r--r--   test/integration.py                   100
-rw-r--r--   test/resources/log4j.properties        30
-rw-r--r--   test/resources/server.properties      116
-rw-r--r--   test/unit.py (renamed from test.py)     0
5 files changed, 269 insertions, 3 deletions
diff --git a/README.md b/README.md
index 7b6f47c..73894d1 100644
--- a/README.md
+++ b/README.md
@@ -12,14 +12,34 @@ Copyright 2012, David Arthur under Apache License, v2.0. See `LICENSE`
This project is very much alpha. The API is in flux and not all the features are fully implemented.
-# Usage
+# Tests
+
+## Run the unit tests
+
+```shell
+python -m test.unit
+```
+
+## Run the integration tests
-## Run the tests
+First, check out the Kafka source:
```shell
-python -m unittest -v test
+git submodule init
+git submodule update
+cd kafka-src
+./sbt update
+./sbt package
```
+Then, from the root directory, run the integration tests:
+
+```shell
+python -m test.integration
+```
+
+# Usage
+
## Send a message to a topic
You need to specify the topic and partition
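
A minimal producer sketch, mirroring the integration test added below (the broker address, port, and topic name are placeholders):

```python
from kafka import KafkaClient, ProduceRequest

# Connect to a broker; host and port are illustrative
kafka = KafkaClient("localhost", 9092)

# A ProduceRequest names the topic and the partition to write to
req = ProduceRequest("my-topic", 0, [KafkaClient.create_message("Hello, world!")])
kafka.send_message_set(req)
kafka.close()
```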
diff --git a/test/integration.py b/test/integration.py
new file mode 100644
index 0000000..0ce8b66
--- /dev/null
+++ b/test/integration.py
@@ -0,0 +1,100 @@
+import os
+import select
+import shlex
+import shutil
+import socket
+import subprocess
+import sys
+import tempfile
+from threading import Thread, Event
+import time
+import unittest
+
+from kafka import KafkaClient, ProduceRequest, FetchRequest
+
+def get_open_port():
+    sock = socket.socket()
+    # Bind to port 0 so the OS picks an unused port, then release it
+    sock.bind(('', 0))
+    port = sock.getsockname()[1]
+    sock.close()
+    return port
+
+class KafkaFixture(Thread):
+    def __init__(self, port):
+        Thread.__init__(self)
+        self.port = port
+        self.capture = ""
+        self.shouldDie = Event()
+        self.tmpDir = tempfile.mkdtemp()
+
+    def run(self):
+        # Create the log directory
+        logDir = os.path.join(self.tmpDir, 'logs')
+        os.mkdir(logDir)
+
+        # Render the config template with the chosen port, log dir, and partition count
+        configFile = os.path.join(self.tmpDir, 'server.properties')
+        with open('test/resources/server.properties', 'r') as f:
+            props = f.read()
+        with open(configFile, 'w') as f:
+            f.write(props % {'kafka.port': self.port,
+                             'kafka.tmp.dir': logDir,
+                             'kafka.partitions': 2})
+
+        # Start Kafka, giving JMX a free port so concurrent brokers don't collide
+        args = shlex.split("./kafka-src/bin/kafka-server-start.sh %s" % configFile)
+        proc = subprocess.Popen(args,
+                                stdout=subprocess.PIPE,
+                                stderr=subprocess.STDOUT,
+                                env={"JMX_PORT": "%d" % get_open_port()})
+
+        killed = False
+        while True:
+            # Echo the broker's output and accumulate it for wait_for()
+            (rlist, wlist, xlist) = select.select([proc.stdout], [], [], 1)
+            if proc.stdout in rlist:
+                read = proc.stdout.readline()
+                sys.stdout.write(read)
+                self.capture += read
+
+            if self.shouldDie.is_set() and not killed:
+                proc.terminate()
+                killed = True
+
+            if proc.poll() is not None:
+                shutil.rmtree(self.tmpDir)
+                if killed:
+                    break
+                else:
+                    raise RuntimeError("Kafka died. Aborting.")
+
+    def wait_for(self, target, timeout=10):
+        # Poll the captured broker output until `target` appears or we time out
+        t1 = time.time()
+        while True:
+            t2 = time.time()
+            if t2 - t1 >= timeout:
+                return False
+            if target in self.capture:
+                return True
+            time.sleep(1)
+
+
+class IntegrationTest(unittest.TestCase):
+    def setUp(self):
+        port = get_open_port()
+        self.server = KafkaFixture(port)
+        self.server.start()
+        self.server.wait_for("Kafka server started")
+        self.kafka = KafkaClient("localhost", port)
+
+    def test_produce(self):
+        req = ProduceRequest("my-topic", 0, [KafkaClient.create_message("testing")])
+        self.kafka.send_message_set(req)
+        self.assertTrue(self.server.wait_for("Created log for 'my-topic'-0"))
+
+        req = ProduceRequest("my-topic", 1, [KafkaClient.create_message("testing")])
+        self.kafka.send_message_set(req)
+        self.assertTrue(self.server.wait_for("Created log for 'my-topic'-1"))
+
+    def tearDown(self):
+        self.kafka.close()
+        self.server.shouldDie.set()
+
+
+if __name__ == "__main__":
+    unittest.main()
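
For reference, a minimal sketch of driving the fixture outside of unittest, using only the names defined above (the startup log line is the one the tests wait on):

```python
# Hypothetical standalone driver for the KafkaFixture defined above
port = get_open_port()
server = KafkaFixture(port)
server.start()
if not server.wait_for("Kafka server started"):
    raise RuntimeError("Broker did not start in time")
# ... talk to the broker on `port` here ...
server.shouldDie.set()  # signal the fixture thread to terminate the broker
```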
diff --git a/test/resources/log4j.properties b/test/resources/log4j.properties
new file mode 100644
index 0000000..47a817a
--- /dev/null
+++ b/test/resources/log4j.properties
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+log4j.rootLogger=TRACE, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+#log4j.appender.fileAppender=org.apache.log4j.FileAppender
+#log4j.appender.fileAppender.File=kafka-request.log
+#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
+#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n
+
+
+# Turn on all our debugging info
+#log4j.logger.kafka=INFO
+#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+
diff --git a/test/resources/server.properties b/test/resources/server.properties
new file mode 100644
index 0000000..cd2ad9a
--- /dev/null
+++ b/test/resources/server.properties
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+brokerid=0
+
+# Hostname the broker will advertise to consumers. If not set, Kafka will use the value
+# returned from InetAddress.getLocalHost(). If there are multiple interfaces, getLocalHost
+# may not be what you want.
+#hostname=
+
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=%(kafka.port)d
+
+# The number of processor threads the socket server uses for receiving and answering requests.
+# Defaults to the number of cores on the machine
+num.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+max.socket.request.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=%(kafka.tmp.dir)s
+
+# The number of logical partitions per topic per server. More partitions allow greater parallelism
+# for consumption, but also mean more files.
+num.partitions=%(kafka.partitions)d
+
+# Overrides for the default given by num.partitions on a per-topic basis
+#topic.partition.count.map=topic1:3, topic2:4
+
+############################# Log Flush Policy #############################
+
+# The following configurations control the flush of data to disk. This is the most
+# important performance knob in kafka.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
+# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
+# 3. Throughput: The flush is generally the most expensive operation.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.default.flush.interval.ms=1000
+
+# Per-topic overrides for log.default.flush.interval.ms
+#topic.flush.intervals.ms=topic1:1000, topic2:3000
+
+# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
+log.default.flush.scheduler.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.size.
+#log.retention.size=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.file.size=536870912
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.cleanup.interval.mins=1
+
+############################# Zookeeper #############################
+
+# Enable connecting to zookeeper
+enable.zookeeper=false
+
+# Zk connection string (see zk docs for details).
+# This is a comma-separated list of host:port pairs, each corresponding to a zk
+# server, e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zk.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
diff --git a/test.py b/test/unit.py
index 5c48f27..5c48f27 100644
--- a/test.py
+++ b/test/unit.py