author     David Arthur <mumrah@gmail.com>  2013-02-22 23:09:25 -0500
committer  David Arthur <mumrah@gmail.com>  2013-04-02 20:19:30 -0400
commit     2a3d231aa61642c57537bc2128dd4f2bd30f35dd (patch)
tree       6bfdfa13b228481df9c79bcb926c2036b476b891
parent     e87c561723be25fcfa2564030367196231aa366e (diff)
download   kafka-python-2a3d231aa61642c57537bc2128dd4f2bd30f35dd.tar.gz
Protocol and low-level client done, adding tests
m---------  kafka-src                          |   0
-rw-r--r--  kafka/client08.py                  | 180
-rw-r--r--  kafka/util.py                      |  16
-rw-r--r--  test/integration.py                | 407
-rw-r--r--  test/resources/log4j.properties    |   2
-rw-r--r--  test/resources/server.properties   |  63
6 files changed, 396 insertions, 272 deletions
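Before the file-by-file diff, a minimal sketch of the low-level client this commit completes, adapted from the new test/integration.py further down. The broker address and topic name are placeholders and assume a running Kafka 0.8 broker.

    from kafka.client08 import (KafkaClient, KafkaProtocol,
                                ProduceRequest, FetchRequest, OffsetRequest)

    client = KafkaClient("localhost", 9092)   # placeholder broker address

    # Produce two plain messages to partition 0 of topic "example"
    produce = ProduceRequest("example", 0, messages=[
        KafkaProtocol.create_message("hello"),
        KafkaProtocol.create_message("world"),
    ])
    for resp in client.send_produce_request([produce]):
        assert resp.error == 0

    # Fetch them back, starting at offset 0, up to 1024 bytes
    (fetch_resp,) = client.send_fetch_request([FetchRequest("example", 0, 0, 1024)])
    for m in fetch_resp.messages:
        print("%d: %s" % (m.offset, m.message.value))

    # Ask the broker for the latest offset (time=-1, at most 1 offset back)
    (offset_resp,) = client.send_offset_request([OffsetRequest("example", 0, -1, 1)])
    print(offset_resp.offsets[0])

    client.close()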
diff --git a/kafka-src b/kafka-src
-Subproject df3248b758340990010b6a83ebfced60b1339c4
+Subproject e7edb5e1e933f5535378d546bcf4d8b178d2e69
diff --git a/kafka/client08.py b/kafka/client08.py
index b048d68..49d786f 100644
--- a/kafka/client08.py
+++ b/kafka/client08.py
@@ -14,7 +14,7 @@ from .codec import snappy_encode, snappy_decode
from .util import read_short_string, read_int_string
from .util import relative_unpack
from .util import write_short_string, write_int_string
-from .util import group_list_by_key
+from .util import group_by_topic_and_partition
from .util import BufferUnderflowError, ChecksumError
log = logging.getLogger("kafka")
@@ -33,7 +33,7 @@ OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"])
# Response payloads
ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"])
FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error", "highwaterMark", "messages"])
-OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offset"])
+OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offsets"])
OffsetCommitResponse = namedtuple("OffsetCommitResponse", ["topic", "partition", "error"])
OffsetFetchResponse = namedtuple("OffsetFetchResponse", ["topic", "partition", "offset", "metadata", "error"])
BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
@@ -74,6 +74,9 @@ class KafkaProtocol(object):
OFFSET_FETCH_KEY = 7
ATTRIBUTE_CODEC_MASK = 0x03
+ CODEC_NONE = 0x00
+ CODEC_GZIP = 0x01
+ CODEC_SNAPPY = 0x02
###################
# Private API #
@@ -171,13 +174,13 @@ class KafkaProtocol(object):
(key, cur) = read_int_string(data, cur)
(value, cur) = read_int_string(data, cur)
- if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 0:
+ if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_NONE:
yield (offset, Message(magic, att, key, value))
- elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 1:
+ elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_GZIP:
gz = gzip_decode(value)
for (offset, message) in KafkaProtocol._decode_message_set_iter(gz):
yield (offset, message)
- elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 2:
+ elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_SNAPPY:
snp = snappy_decode(value)
for (offset, message) in KafkaProtocol._decode_message_set_iter(snp):
yield (offset, message)
@@ -214,8 +217,25 @@ class KafkaProtocol(object):
message_set = KafkaProtocol._encode_message_set(
[KafkaProtocol.create_message(payload) for payload in payloads])
gzipped = gzip_encode(message_set)
- return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & 0x01), key, gzipped)
+ return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP), key, gzipped)
+ @classmethod
+ def create_snappy_message(cls, payloads, key=None):
+ """
+ Construct a Snappy Message containing multiple Messages
+
+ The given payloads will be encoded, compressed, and sent as a single atomic
+ message to Kafka.
+
+ Params
+ ======
+ payloads: list(bytes), a list of payloads to be sent to Kafka
+ key: bytes, a key used for partition routing (optional)
+ """
+ message_set = KafkaProtocol._encode_message_set(
+ [KafkaProtocol.create_message(payload) for payload in payloads])
+ snapped = snappy_encode(message_set)
+ return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY), key, snapped)
@classmethod
def encode_produce_request(cls, client_id, correlation_id, payloads=[], acks=1, timeout=1000):
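A sketch of how these batching helpers are used: each returns a single Message whose value is a compressed message set, so a whole batch travels as one produce payload. Usage mirrors the new gzip/snappy tests in test/integration.py; the topic name is a placeholder.

    from kafka.client08 import KafkaProtocol, ProduceRequest

    batch = KafkaProtocol.create_snappy_message(
        ["event %d" % i for i in range(100)])          # one compressed Message
    produce = ProduceRequest("example", 0, messages=[batch])
    # send via client.send_produce_request([produce]), as in the sketch above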
@@ -234,14 +254,14 @@ class KafkaProtocol(object):
-1: waits for all replicas to be in sync
timeout: Maximum time the server will wait for acks from replicas. This is _not_ a socket timeout
"""
- payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+ grouped_payloads = group_by_topic_and_partition(payloads)
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.PRODUCE_KEY)
- message += struct.pack('>hii', acks, timeout, len(payloads_by_topic))
- for topic, payloads in payloads_by_topic.items():
- message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(payloads))
- for payload in payloads:
+ message += struct.pack('>hii', acks, timeout, len(grouped_payloads))
+ for topic, topic_payloads in grouped_payloads.items():
+ message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(topic_payloads))
+ for partition, payload in topic_payloads.items():
message_set = KafkaProtocol._encode_message_set(payload.messages)
- message += struct.pack('>ii%ds' % len(message_set), payload.partition, len(message_set), message_set)
+ message += struct.pack('>ii%ds' % len(message_set), partition, len(message_set), message_set)
return struct.pack('>i%ds' % len(message), len(message), message)
@classmethod
@@ -276,15 +296,15 @@ class KafkaProtocol(object):
max_wait_time: int, how long to block waiting on min_bytes of data
min_bytes: int, the minimum number of bytes to accumulate before returning the response
"""
-
- payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+
+ grouped_payloads = group_by_topic_and_partition(payloads)
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.FETCH_KEY)
- message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(payloads_by_topic)) # -1 is the replica id
- for topic, payloads in payloads_by_topic.items():
+ message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(grouped_payloads)) # -1 is the replica id
+ for topic, topic_payloads in grouped_payloads.items():
message += write_short_string(topic)
- message += struct.pack('>i', len(payloads))
- for payload in payloads:
- message += struct.pack('>iqi', payload.partition, payload.offset, payload.max_bytes)
+ message += struct.pack('>i', len(topic_payloads))
+ for partition, payload in topic_payloads.items():
+ message += struct.pack('>iqi', partition, payload.offset, payload.max_bytes)
return struct.pack('>i%ds' % len(message), len(message), message)
@classmethod
@@ -308,14 +328,14 @@ class KafkaProtocol(object):
@classmethod
def encode_offset_request(cls, client_id, correlation_id, payloads=[]):
- payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+ grouped_payloads = group_by_topic_and_partition(payloads)
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_KEY)
- message += struct.pack('>ii', -1, len(payloads_by_topic)) # -1 is the replica id
- for topic, payloads in payloads_by_topic.items():
+ message += struct.pack('>ii', -1, len(grouped_payloads)) # -1 is the replica id
+ for topic, topic_payloads in grouped_payloads.items():
message += write_short_string(topic)
- message += struct.pack('>i', len(payloads))
- for payload in payloads:
- message += struct.pack('>iqi', payload.partition, payload.time, payload.max_offsets)
+ message += struct.pack('>i', len(topic_payloads))
+ for partition, payload in topic_payloads.items():
+ message += struct.pack('>iqi', partition, payload.time, payload.max_offsets)
return struct.pack('>i%ds' % len(message), len(message), message)
@classmethod
@@ -332,8 +352,12 @@ class KafkaProtocol(object):
(topic, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
for i in range(num_partitions):
- ((partition, error, offset), cur) = relative_unpack('>ihq', data, cur)
- yield OffsetResponse(topic, partition, error, offset)
+ ((partition, error, num_offsets,), cur) = relative_unpack('>ihi', data, cur)
+ offsets = []
+ for j in range(num_offsets):
+ ((offset,), cur) = relative_unpack('>q', data, cur)
+ offsets.append(offset)
+ yield OffsetResponse(topic, partition, error, tuple(offsets))
@classmethod
def encode_metadata_request(cls, client_id, correlation_id, topics=[]):
@@ -400,15 +424,15 @@ class KafkaProtocol(object):
group: string, the consumer group you are committing offsets for
payloads: list of OffsetCommitRequest
"""
- payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+ grouped_payloads = group_by_topic_and_partition(payloads)
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_COMMIT_KEY)
message += write_short_string(group)
- message += struct.pack('>i', len(payloads_by_topic))
- for topic, payloads in payloads_by_topic.items():
+ message += struct.pack('>i', len(grouped_payloads))
+ for topic, topic_payloads in grouped_payloads.items():
message += write_short_string(topic)
- message += struct.pack('>i', len(payloads))
- for payload in payloads:
- message += struct.pack('>iq', payload.partition, payload.offset)
+ message += struct.pack('>i', len(topic_payloads))
+ for partition, payload in topic_payloads.items():
+ message += struct.pack('>iq', partition, payload.offset)
message += write_short_string(payload.metadata)
return struct.pack('>i%ds' % len(message), len(message), message)
@@ -421,6 +445,7 @@ class KafkaProtocol(object):
======
data: bytes to decode
"""
+ data = data[2:] # TODO remove me when versionId is removed
((correlation_id,), cur) = relative_unpack('>i', data, 0)
(client_id, cur) = read_short_string(data, cur)
((num_topics,), cur) = relative_unpack('>i', data, cur)
@@ -443,15 +468,15 @@ class KafkaProtocol(object):
group: string, the consumer group you are fetching offsets for
payloads: list of OffsetFetchRequest
"""
- payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+ grouped_payloads = group_by_topic_and_partition(payloads)
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_FETCH_KEY)
message += write_short_string(group)
- message += struct.pack('>i', len(payloads_by_topic))
- for topic, payloads in payloads_by_topic.items():
+ message += struct.pack('>i', len(grouped_payloads))
+ for topic, topic_payloads in grouped_payloads.items():
message += write_short_string(topic)
- message += struct.pack('>i', len(payloads))
- for payload in payloads:
- message += struct.pack('>i', payload.partition)
+ message += struct.pack('>i', len(topic_payloads))
+ for partition, payload in topic_payloads.items():
+ message += struct.pack('>i', partition)
return struct.pack('>i%ds' % len(message), len(message), message)
@classmethod
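Client-side usage of the offset commit/fetch requests encoded above, mirroring test_commit_fetch_offsets added below; the group name, topic, and broker address are placeholders.

    from kafka.client08 import KafkaClient, OffsetCommitRequest, OffsetFetchRequest

    client = KafkaClient("localhost", 9092)   # placeholder broker address
    (commit_resp,) = client.send_offset_commit_request(
        "my-group", [OffsetCommitRequest("example", 0, 42, "some metadata")])
    (fetch_resp,) = client.send_offset_fetch_request(
        "my-group", [OffsetFetchRequest("example", 0)])
    assert fetch_resp.offset == 42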
@@ -493,6 +518,9 @@ class KafkaConnection(object):
self._sock.connect((host, port))
self._sock.settimeout(10)
+ def __str__(self):
+ return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
+
###################
# Private API #
###################
@@ -536,6 +564,8 @@ class KafkaConnection(object):
# Public API #
##################
+ # TODO multiplex socket communication to allow for multi-threaded clients
+
def send(self, requestId, payload):
"Send a request to Kafka"
sent = self._sock.sendall(payload)
@@ -566,6 +596,10 @@ class KafkaClient(object):
self.topics_to_brokers = {} # topic_id -> broker_id
self.load_metadata_for_topics()
+ def close(self):
+ for conn in self.conns.values():
+ conn.close()
+
def get_conn_for_broker(self, broker):
"Get or create a connection to a broker"
if (broker.host, broker.port) not in self.conns:
@@ -626,20 +660,14 @@ class KafkaClient(object):
======
list of ProduceResponse or callback(ProduceResponse), in the order of input payloads
"""
- key_fn = lambda x: (x.topic, x.partition)
-
- # Note the order of the incoming payloads
- original_keys = [key_fn(payload) for payload in payloads]
-
- # Group the produce requests by topic+partition
- payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn)
-
# Group the produce requests by which broker they go to
+ original_keys = []
payloads_by_broker = defaultdict(list)
- for (topic, partition), payloads in payloads_by_topic_and_partition.items():
- payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads
+ for payload in payloads:
+ payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)].append(payload)
+ original_keys.append((payload.topic, payload.partition))
- # Accumulate the responses in a dictionary, keyed by key_fn
+ # Accumulate the responses in a dictionary
acc = {}
# For each broker, send the list of request payloads
@@ -657,11 +685,10 @@ class KafkaClient(object):
(TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error))
# Run the callback
if callback is not None:
- acc[key_fn(produce_response)] = callback(produce_response)
+ acc[(produce_response.topic, produce_response.partition)] = callback(produce_response)
else:
- acc[key_fn(produce_response)] = produce_response
+ acc[(produce_response.topic, produce_response.partition)] = produce_response
- print(acc)
# Order the accumulated responses by the original key order
return (acc[k] for k in original_keys)
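Responses come back in the order of the input payloads, and the optional callback is applied to each ProduceResponse before it is accumulated. A sketch of that behavior, assuming send_produce_request exposes the callback keyword handled above; the requests and broker address are placeholders.

    from kafka.client08 import KafkaClient, KafkaProtocol, ProduceRequest

    client = KafkaClient("localhost", 9092)   # placeholder address
    produce1 = ProduceRequest("example", 0, messages=[KafkaProtocol.create_message("a")])
    produce2 = ProduceRequest("example", 1, messages=[KafkaProtocol.create_message("b")])

    def extract_offset(resp):                 # hypothetical callback
        return resp.offset

    offsets = list(client.send_produce_request([produce1, produce2],
                                               callback=extract_offset))
    # offsets[0] pairs with produce1, offsets[1] with produce2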
@@ -672,20 +699,14 @@ class KafkaClient(object):
Payloads are grouped by topic and partition so they can be pipelined to the same
brokers.
"""
- key_fn = lambda x: (x.topic, x.partition)
-
- # Note the order of the incoming payloads
- original_keys = [key_fn(payload) for payload in payloads]
-
- # Group the produce requests by topic+partition
- payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn)
-
# Group the produce requests by which broker they go to
+ original_keys = []
payloads_by_broker = defaultdict(list)
- for (topic, partition), payloads in payloads_by_topic_and_partition.items():
- payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads
+ for payload in payloads:
+ payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)].append(payload)
+ original_keys.append((payload.topic, payload.partition))
- # Accumulate the responses in a dictionary, keyed by key_fn
+ # Accumulate the responses in a dictionary, keyed by topic+partition
acc = {}
# For each broker, send the list of request payloads
@@ -703,9 +724,9 @@ class KafkaClient(object):
(TopicAndPartition(fetch_response.topic, fetch_response.partition), fetch_response.error))
# Run the callback
if callback is not None:
- acc[key_fn(fetch_response)] = callback(fetch_response)
+ acc[(fetch_response.topic, fetch_response.partition)] = callback(fetch_response)
else:
- acc[key_fn(fetch_response)] = fetch_response
+ acc[(fetch_response.topic, fetch_response.partition)] = fetch_response
# Order the accumulated responses by the original key order
return (acc[k] for k in original_keys)
@@ -720,11 +741,30 @@ class KafkaClient(object):
conn.send(requestId, request)
response = conn.recv(requestId)
return response
- except Exception:
- log.warning("Could not commit offset to server %s, trying next server", conn)
+ except Exception, e:
+ log.warning("Could not send request [%r] to server %s, trying next server: %s" % (request, conn, e))
continue
return None
+ def send_offset_request(self, payloads=[], fail_on_error=True, callback=None):
+ requestId = self.next_id()
+ request = KafkaProtocol.encode_offset_request(KafkaClient.CLIENT_ID, requestId, payloads)
+ response = self.try_send_request(requestId, request)
+ if response is None:
+ if fail_on_error is True:
+ raise Exception("All servers failed to process request")
+ else:
+ return None
+ out = []
+ for offset_response in KafkaProtocol.decode_offset_response(response):
+ if fail_on_error == True and offset_response.error != 0:
+ raise Exception("OffsetRequest failed with errorcode=%s", offset_response.error)
+ if callback is not None:
+ out.append(callback(offset_response))
+ else:
+ out.append(offset_response)
+ return out
+
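Usage of the new send_offset_request, as exercised by the tests below: time=-1 asks the broker for the latest offset, and max_offsets caps how many offsets come back per partition. Topic and broker address are placeholders.

    from kafka.client08 import KafkaClient, OffsetRequest

    client = KafkaClient("localhost", 9092)   # placeholder address
    (resp,) = client.send_offset_request([OffsetRequest("example", 0, -1, 1)])
    latest = resp.offsets[0]   # OffsetResponse.offsets is now a tuple of offsets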
def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None):
requestId = self.next_id()
request = KafkaProtocol.encode_offset_commit_request(KafkaClient.CLIENT_ID, requestId, group, payloads)
@@ -737,6 +777,7 @@ class KafkaClient(object):
out = []
for offset_commit_response in KafkaProtocol.decode_offset_commit_response(response):
if fail_on_error == True and offset_commit_response.error != 0:
+ print(offset_commit_response)
raise Exception("OffsetCommitRequest failed with errorcode=%s", offset_commit_response.error)
if callback is not None:
out.append(callback(offset_commit_response))
@@ -770,7 +811,7 @@ if __name__ == "__main__":
topic = "foo8"
# Bootstrap connection
- conn = KafkaClient("localhost", 9092)
+ conn = KafkaClient("localhost", 49720)
# Create some Messages
messages = (KafkaProtocol.create_gzip_message(["GZIPPed"]),
@@ -799,7 +840,6 @@ if __name__ == "__main__":
return 0
else:
return offset_response.offset
-
# Load offsets
(offset1, offset2) = conn.send_offset_fetch_request(
group="group1",
diff --git a/kafka/util.py b/kafka/util.py
index cb8f7f5..509c5b8 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -1,15 +1,16 @@
+from collections import defaultdict
from itertools import groupby
import struct
def write_int_string(s):
if s is None:
- return struct.pack('>i', -1)
+ return struct.pack('>i', 0) # TODO change this to -1 when KAFKA-771 is accepted
else:
return struct.pack('>i%ds' % len(s), len(s), s)
def write_short_string(s):
if s is None:
- return struct.pack('>h', -1)
+ return struct.pack('>h', 0) # TODO change this to -1 when KAFKA-771 is accepted
else:
return struct.pack('>h%ds' % len(s), len(s), s)
@@ -44,12 +45,11 @@ def relative_unpack(fmt, data, cur):
out = struct.unpack(fmt, data[cur:cur+size])
return (out, cur+size)
-def group_list_by_key(it, key):
- sorted_it = sorted(it, key=key)
- out = {}
- for k, group in groupby(sorted_it, key=key):
- out[k] = list(group)
- return out
+def group_by_topic_and_partition(tuples):
+ out = defaultdict(dict)
+ for t in tuples:
+ out[t.topic][t.partition] = t
+ return out
class BufferUnderflowError(Exception):
pass
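A small sketch of what the new grouping helper produces: payloads are indexed by topic, then by partition, replacing the flat key-function grouping. The ProduceRequest payloads here are hypothetical.

    from kafka.client08 import ProduceRequest
    from kafka.util import group_by_topic_and_partition

    reqs = [ProduceRequest("a", 0, messages=[]),
            ProduceRequest("a", 1, messages=[]),
            ProduceRequest("b", 0, messages=[])]
    grouped = group_by_topic_and_partition(reqs)
    assert set(grouped.keys()) == set(["a", "b"])   # defaultdict keyed by topic
    assert grouped["a"][1] is reqs[1]               # then by partition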
diff --git a/test/integration.py b/test/integration.py
index 3971d3f..598b17a 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -11,8 +11,7 @@ from threading import Thread, Event
import time
import unittest
-from kafka.client import KafkaClient, ProduceRequest, FetchRequest, OffsetRequest
-from kafka.queue import KafkaQueue
+from kafka.client08 import *
def get_open_port():
sock = socket.socket()
@@ -27,12 +26,15 @@ def build_kafka_classpath():
jars += glob.glob(os.path.join(baseDir, "project/boot/scala-2.8.0/lib/*.jar"))
jars += glob.glob(os.path.join(baseDir, "core/target/scala_2.8.0/*.jar"))
jars += glob.glob(os.path.join(baseDir, "core/lib/*.jar"))
- jars += glob.glob(os.path.join(baseDir, "perf/target/scala_2.8.0/kafka*.jar"))
jars += glob.glob(os.path.join(baseDir, "core/lib_managed/scala_2.8.0/compile/*.jar"))
- return ":".join(["."] + [os.path.abspath(jar) for jar in jars])
+ jars += glob.glob(os.path.join(baseDir, "core/target/scala-2.8.0/kafka_2.8.0-*.jar"))
+ jars += glob.glob(os.path.join(baseDir, "/Users/mumrah/.ivy2/cache/org.slf4j/slf4j-api/jars/slf4j-api-1.6.4.jar"))
+ cp = ":".join(["."] + [os.path.abspath(jar) for jar in jars])
+ cp += ":" + os.path.abspath(os.path.join(baseDir, "conf/log4j.properties"))
+ return cp
class KafkaFixture(Thread):
- def __init__(self, port):
+ def __init__(self, host, port):
Thread.__init__(self)
self.port = port
self.capture = ""
@@ -57,7 +59,7 @@ class KafkaFixture(Thread):
# Start Kafka
args = shlex.split("java -Xmx256M -server -Dlog4j.configuration=%s -cp %s kafka.Kafka %s" % (logConfig, build_kafka_classpath(), configFile))
- proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env={"JMX_PORT":"%d" % get_open_port()})
+ proc = subprocess.Popen(args, bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env={"JMX_PORT":"%d" % get_open_port()})
killed = False
while True:
@@ -65,6 +67,7 @@ class KafkaFixture(Thread):
if proc.stdout in rlist:
read = proc.stdout.readline()
stdout.write(read)
+ stdout.flush()
self.capture += read
if self.shouldDie.is_set():
@@ -88,174 +91,254 @@ class KafkaFixture(Thread):
return True
time.sleep(0.100)
+ def close(self):
+ self.shouldDie.set()
-class IntegrationTest(unittest.TestCase):
+class ExternalKafkaFixture(object):
+ def __init__(self, host, port):
+ print("Using already running Kafka at %s:%d" % (host, port))
+
+ def close(self):
+ pass
+
+
+class TestKafkaClient(unittest.TestCase):
@classmethod
def setUpClass(cls):
- port = get_open_port()
- cls.server = KafkaFixture(port)
- cls.server.start()
- cls.server.wait_for("Kafka server started")
- cls.kafka = KafkaClient("localhost", port)
+ if os.environ.has_key('KAFKA_URI'):
+ parse = urlparse(os.environ['KAFKA_URI'])
+ (host, port) = (parse.hostname, parse.port)
+ cls.server = ExternalKafkaFixture(host, port)
+ cls.client = KafkaClient(host, port)
+ else:
+ port = get_open_port()
+ cls.server = KafkaFixture("localhost", port)
+ cls.server.start()
+ cls.server.wait_for("Kafka server started")
+ cls.client = KafkaClient("localhost", port)
@classmethod
def tearDownClass(cls):
- cls.kafka.close()
- cls.server.shouldDie.set()
-
- def test_send_simple(self):
- self.kafka.send_messages_simple("test-send-simple", "test 1", "test 2", "test 3")
- self.assertTrue(self.server.wait_for("Created log for 'test-send-simple'"))
- self.assertTrue(self.server.wait_for("Flushing log 'test-send-simple"))
-
- def test_produce(self):
- # Produce a message, check that the log got created
- req = ProduceRequest("test-produce", 0, [KafkaClient.create_message("testing")])
- self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for 'test-produce'-0"))
-
- # Same thing, different partition
- req = ProduceRequest("test-produce", 1, [KafkaClient.create_message("testing")])
- self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for 'test-produce'-1"))
-
- def _test_produce_consume(self, topic, create_func):
- # Send two messages and consume them
- message1 = create_func("testing 1")
- message2 = create_func("testing 2")
- req = ProduceRequest(topic, 0, [message1, message2])
- self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for '%s'-0" % topic))
- self.assertTrue(self.server.wait_for("Flushing log '%s-0'" % topic))
- req = FetchRequest(topic, 0, 0, 1024)
- (messages, req) = self.kafka.get_message_set(req)
- self.assertEquals(len(messages), 2)
- self.assertEquals(messages[0].payload, "testing 1")
- self.assertEquals(messages[1].payload, "testing 2")
-
- # Do the same, but for a different partition
- message3 = create_func("testing 3")
- message4 = create_func("testing 4")
- req = ProduceRequest(topic, 1, [message3, message4])
- self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for '%s'-1" % topic))
- self.assertTrue(self.server.wait_for("Flushing log '%s-1'" % topic))
- req = FetchRequest(topic, 1, 0, 1024)
- (messages, req) = self.kafka.get_message_set(req)
- self.assertEquals(len(messages), 2)
- self.assertEquals(messages[0].payload, "testing 3")
- self.assertEquals(messages[1].payload, "testing 4")
+ cls.client.close()
+ cls.server.close()
+
+ #####################
+ # Produce Tests #
+ #####################
+
+ def test_produce_many_simple(self):
+ produce = ProduceRequest("test_produce_many_simple", 0, messages=[
+ KafkaProtocol.create_message("Test message %d" % i) for i in range(100)
+ ])
+
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)])
+ self.assertEquals(offset.offsets[0], 100)
+
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 100)
+
+ (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)])
+ self.assertEquals(offset.offsets[0], 200)
+
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 200)
+
+ (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)])
+ self.assertEquals(offset.offsets[0], 300)
+
+ def test_produce_10k_simple(self):
+ produce = ProduceRequest("test_produce_10k_simple", 0, messages=[
+ KafkaProtocol.create_message("Test message %d" % i) for i in range(10000)
+ ])
+
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_10k_simple", 0, -1, 1)])
+ self.assertEquals(offset.offsets[0], 10000)
+
+ def test_produce_many_gzip(self):
+ message1 = KafkaProtocol.create_gzip_message(["Gzipped 1 %d" % i for i in range(100)])
+ message2 = KafkaProtocol.create_gzip_message(["Gzipped 2 %d" % i for i in range(100)])
+
+ produce = ProduceRequest("test_produce_many_gzip", 0, messages=[message1, message2])
+
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_gzip", 0, -1, 1)])
+ self.assertEquals(offset.offsets[0], 200)
+
+ def test_produce_many_snappy(self):
+ message1 = KafkaProtocol.create_snappy_message(["Snappy 1 %d" % i for i in range(100)])
+ message2 = KafkaProtocol.create_snappy_message(["Snappy 2 %d" % i for i in range(100)])
+
+ produce = ProduceRequest("test_produce_many_snappy", 0, messages=[message1, message2])
+
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_snappy", 0, -1, 1)])
+ self.assertEquals(offset.offsets[0], 200)
+
+ def test_produce_mixed(self):
+ message1 = KafkaProtocol.create_message("Just a plain message")
+ message2 = KafkaProtocol.create_gzip_message(["Gzipped %d" % i for i in range(100)])
+ message3 = KafkaProtocol.create_snappy_message(["Snappy %d" % i for i in range(100)])
+
+ produce = ProduceRequest("test_produce_mixed", 0, messages=[message1, message2, message3])
+
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_mixed", 0, -1, 1)])
+ self.assertEquals(offset.offsets[0], 201)
+
+
+ def test_produce_100k_gzipped(self):
+ produce = ProduceRequest("test_produce_100k_gzipped", 0, messages=[
+ KafkaProtocol.create_gzip_message(["Gzipped %d" % i for i in range(100000)])
+ ])
+
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_100k_gzipped", 0, -1, 1)])
+ self.assertEquals(offset.offsets[0], 100000)
+
+ #####################
+ # Consume Tests #
+ #####################
+
+ def test_consume_none(self):
+ fetch = FetchRequest("test_consume_none", 0, 0, 1024)
+
+ fetch_resp = self.client.send_fetch_request([fetch]).next()
+ self.assertEquals(fetch_resp.error, 0)
+ self.assertEquals(fetch_resp.topic, "test_consume_none")
+ self.assertEquals(fetch_resp.partition, 0)
+
+ messages = list(fetch_resp.messages)
+ self.assertEquals(len(messages), 0)
def test_produce_consume(self):
- self._test_produce_consume("test-produce-consume", KafkaClient.create_message)
-
- def test_produce_consume_snappy(self):
- self._test_produce_consume("test-produce-consume-snappy", KafkaClient.create_snappy_message)
-
- def test_produce_consume_gzip(self):
- self._test_produce_consume("test-produce-consume-gzip", KafkaClient.create_gzip_message)
-
- def test_check_offset(self):
- # Produce/consume a message, check that the next offset looks correct
- message1 = KafkaClient.create_message("testing 1")
- req = ProduceRequest("test-check-offset", 0, [message1])
- self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for 'test-check-offset'-0"))
- self.assertTrue(self.server.wait_for("Flushing log 'test-check-offset-0'"))
- req = FetchRequest("test-check-offset", 0, 0, 1024)
- (messages, nextReq) = self.kafka.get_message_set(req)
- self.assertEquals(len(messages), 1)
- self.assertEquals(messages[0], message1)
- self.assertEquals(nextReq.offset, len(KafkaClient.encode_message(message1)))
-
- # Produce another message, consume with the last offset
- message2 = KafkaClient.create_message("test 2")
- req = ProduceRequest("test-check-offset", 0, [message2])
- self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Flushing log 'test-check-offset-0'"))
-
- # Verify
- (messages, nextReq) = self.kafka.get_message_set(nextReq)
- self.assertEquals(len(messages), 1)
- self.assertEquals(messages[0], message2)
- self.assertEquals(nextReq.offset, len(KafkaClient.encode_message(message1)) + len(KafkaClient.encode_message(message2)))
-
- def test_iterator(self):
- # Produce 100 messages
- messages = []
- for i in range(100):
- messages.append(KafkaClient.create_message("testing %d" % i))
- req = ProduceRequest("test-iterator", 0, messages)
- self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for 'test-iterator'-0"))
- self.assertTrue(self.server.wait_for("Flushing log 'test-iterator-0'"))
-
- # Initialize an iterator of fetch size 64 bytes - big enough for one message
- # but not enough for all 100 messages
- cnt = 0
- for i, msg in enumerate(self.kafka.iter_messages("test-iterator", 0, 0, 64)):
- self.assertEquals(messages[i], msg)
- cnt += 1
- self.assertEquals(cnt, 100)
-
- # Same thing, but don't auto paginate
- cnt = 0
- for i, msg in enumerate(self.kafka.iter_messages("test-iterator", 0, 0, 64, False)):
- self.assertEquals(messages[i], msg)
- cnt += 1
- self.assertTrue(cnt < 100)
-
- def test_offset_request(self):
- # Produce a message to create the topic/partition
- message1 = KafkaClient.create_message("testing 1")
- req = ProduceRequest("test-offset-request", 0, [message1])
- self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for 'test-offset-request'-0"))
- self.assertTrue(self.server.wait_for("Flushing log 'test-offset-request-0'"))
-
- t1 = int(time.time()*1000) # now
- t2 = t1 + 60000 # one minute from now
- req = OffsetRequest("test-offset-request", 0, t1, 1024)
- self.kafka.get_offsets(req)
-
- req = OffsetRequest("test-offset-request", 0, t2, 1024)
- self.kafka.get_offsets(req)
-
- def test_10k_messages(self):
- msg_tmpl = "this is a test message with a few bytes in it. this is message number %d"
- # TODO 10k actually fails, why?
- msg = KafkaClient.create_gzip_message(*[msg_tmpl % i for i in range(1000)])
- req = ProduceRequest("test-10k", 0, [msg])
- self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for 'test-10k'-0"))
- self.assertTrue(self.server.wait_for("Flushing log 'test-10k-0'"))
- #self.assertTrue(self.server.wait_for("Created log for 'test-10k'-1"))
- #self.assertTrue(self.server.wait_for("Flushing log 'test-10k-1'"))
-
- def test_queue(self):
- # Send 1000 messages
- q = KafkaQueue(self.kafka, "test-queue", [0,1])
- t1 = time.time()
- for i in range(1000):
- q.put("test %d" % i)
- t2 = time.time()
+ produce = ProduceRequest("test_produce_consume", 0, messages=[
+ KafkaProtocol.create_message("Just a test message"),
+ KafkaProtocol.create_message("Message with a key", "foo"),
+ ])
- # Wait for the producer to fully flush
- time.sleep(2)
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
- # Copy all the messages into a list
- t1 = time.time()
- consumed = []
- for i in range(1000):
- consumed.append(q.get())
- t2 = time.time()
+ fetch = FetchRequest("test_produce_consume", 0, 0, 1024)
- # Verify everything is there
- for i in range(1000):
- self.assertTrue("test %d" % i in consumed)
+ fetch_resp = self.client.send_fetch_request([fetch]).next()
+ self.assertEquals(fetch_resp.error, 0)
+
+ messages = list(fetch_resp.messages)
+ self.assertEquals(len(messages), 2)
+ self.assertEquals(messages[0].offset, 0)
+ self.assertEquals(messages[0].message.value, "Just a test message")
+ self.assertEquals(messages[0].message.key, None)
+ self.assertEquals(messages[1].offset, 1)
+ self.assertEquals(messages[1].message.value, "Message with a key")
+ self.assertEquals(messages[1].message.key, "foo")
+
+ def test_produce_consume_many(self):
+ produce = ProduceRequest("test_produce_consume_many", 0, messages=[
+ KafkaProtocol.create_message("Test message %d" % i) for i in range(100)
+ ])
+
+ for resp in self.client.send_produce_request([produce]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ # 1024 is not enough for 100 messages...
+ fetch1 = FetchRequest("test_produce_consume_many", 0, 0, 1024)
+
+ (fetch_resp1,) = self.client.send_fetch_request([fetch1])
+
+ self.assertEquals(fetch_resp1.error, 0)
+ self.assertEquals(fetch_resp1.highwaterMark, 100)
+ messages = list(fetch_resp1.messages)
+ self.assertTrue(len(messages) < 100)
+
+ # 10240 should be enough
+ fetch2 = FetchRequest("test_produce_consume_many", 0, 0, 10240)
+ (fetch_resp2,) = self.client.send_fetch_request([fetch2])
+
+ self.assertEquals(fetch_resp2.error, 0)
+ self.assertEquals(fetch_resp2.highwaterMark, 100)
+ messages = list(fetch_resp2.messages)
+ self.assertEquals(len(messages), 100)
+ for i, message in enumerate(messages):
+ self.assertEquals(message.offset, i)
+ self.assertEquals(message.message.value, "Test message %d" % i)
+ self.assertEquals(message.message.key, None)
+
+ def test_produce_consume_two_partitions(self):
+ produce1 = ProduceRequest("test_produce_consume_two_partitions", 0, messages=[
+ KafkaProtocol.create_message("Partition 0 %d" % i) for i in range(10)
+ ])
+ produce2 = ProduceRequest("test_produce_consume_two_partitions", 1, messages=[
+ KafkaProtocol.create_message("Partition 1 %d" % i) for i in range(10)
+ ])
+
+ for resp in self.client.send_produce_request([produce1, produce2]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ fetch1 = FetchRequest("test_produce_consume_two_partitions", 0, 0, 1024)
+ fetch2 = FetchRequest("test_produce_consume_two_partitions", 1, 0, 1024)
+ fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2])
+ self.assertEquals(fetch_resp1.error, 0)
+ self.assertEquals(fetch_resp1.highwaterMark, 10)
+ messages = list(fetch_resp1.messages)
+ self.assertEquals(len(messages), 10)
+ for i, message in enumerate(messages):
+ self.assertEquals(message.offset, i)
+ self.assertEquals(message.message.value, "Partition 0 %d" % i)
+ self.assertEquals(message.message.key, None)
+ self.assertEquals(fetch_resp2.error, 0)
+ self.assertEquals(fetch_resp2.highwaterMark, 10)
+ messages = list(fetch_resp2.messages)
+ self.assertEquals(len(messages), 10)
+ for i, message in enumerate(messages):
+ self.assertEquals(message.offset, i)
+ self.assertEquals(message.message.value, "Partition 1 %d" % i)
+ self.assertEquals(message.message.key, None)
+
+ ####################
+ # Offset Tests #
+ ####################
+
+ def test_commit_fetch_offsets(self):
+ req = OffsetCommitRequest("test_commit_fetch_offsets", 0, 42, "metadata")
+ (resp,) = self.client.send_offset_commit_request("group", [req])
+ self.assertEquals(resp.error, 0)
+
+ req = OffsetFetchRequest("test_commit_fetch_offsets", 0)
+ (resp,) = self.client.send_offset_fetch_request("group", [req])
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 42)
+ self.assertEquals(resp.metadata, "metadata")
+
+
+
- # Shutdown the queue
- q.close()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
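Note on running the rewritten suite: setUpClass above only extracts the hostname and port from KAFKA_URI via urlparse, so the scheme is arbitrary. A sketch of pointing the tests at an already-running broker; the URI is a placeholder.

    import os
    os.environ["KAFKA_URI"] = "tcp://localhost:9092"   # host/port are parsed; scheme is ignored
    # run test/integration.py as usual; TestKafkaClient then uses ExternalKafkaFixture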
diff --git a/test/resources/log4j.properties b/test/resources/log4j.properties
index c4ecd2c..47a817a 100644
--- a/test/resources/log4j.properties
+++ b/test/resources/log4j.properties
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-log4j.rootLogger=DEBUG, stdout
+log4j.rootLogger=TRACE, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
diff --git a/test/resources/server.properties b/test/resources/server.properties
index 2eefe3b..0d01fca 100644
--- a/test/resources/server.properties
+++ b/test/resources/server.properties
@@ -17,31 +17,32 @@
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
-brokerid=0
-
-# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
-# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost
-# may not be what you want.
-#hostname=
-
+broker.id=0
############################# Socket Server Settings #############################
# The port the socket server listens on
port=%(kafka.port)d
-# The number of processor threads the socket server uses for receiving and answering requests.
-# Defaults to the number of cores on the machine
-num.threads=2
+# Hostname the broker will bind to and advertise to producers and consumers.
+# If not set, the server will bind to all interfaces and advertise the value returned from
+# from java.net.InetAddress.getCanonicalHostName().
+#host.name=localhost
+
+# The number of threads handling network requests
+num.network.threads=2
+
+# The number of threads doing disk I/O
+num.io.threads=2
# The send buffer (SO_SNDBUF) used by the socket server
-socket.send.buffer=1048576
+socket.send.buffer.bytes=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
-socket.receive.buffer=1048576
+socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
-max.socket.request.bytes=104857600
+socket.request.max.bytes=104857600
############################# Log Basics #############################
@@ -53,9 +54,6 @@ log.dir=%(kafka.tmp.dir)s
# for consumption, but also mean more files.
num.partitions=%(kafka.partitions)d
-# Overrides for for the default given by num.partitions on a per-topic basis
-#topic.partition.count.map=topic1:3, topic2:4
-
############################# Log Flush Policy #############################
# The following configurations control the flush of data to disk. This is the most
@@ -68,16 +66,13 @@ num.partitions=%(kafka.partitions)d
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
-log.flush.interval=1
+log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
-log.default.flush.interval.ms=10000
+log.flush.interval.ms=1000
-# Per-topic overrides for log.default.flush.interval.ms
-#topic.flush.intervals.ms=topic1:1000, topic2:3000
-
-# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
-log.default.flush.scheduler.interval.ms=10000
+# Per-topic overrides for log.flush.interval.ms
+#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
############################# Log Retention Policy #############################
@@ -90,11 +85,11 @@ log.default.flush.scheduler.interval.ms=10000
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
-# segments don't drop below log.retention.size.
-#log.retention.size=1073741824
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
-log.file.size=536870912
+log.segment.bytes=536870912
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
@@ -102,15 +97,21 @@ log.cleanup.interval.mins=1
############################# Zookeeper #############################
-# Enable connecting to zookeeper
-enable.zookeeper=false
-
# Zk connection string (see zk docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
-zk.connect=localhost:2181
+zk.connect=localhost:2181/kafka-python
# Timeout in ms for connecting to zookeeper
-zk.connectiontimeout.ms=1000000
+zk.connection.timeout.ms=1000000
+
+# metrics reporter properties
+kafka.metrics.polling.interval.secs=5
+kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
+kafka.csv.metrics.dir=/tmp/kafka_metrics
+# Disable csv reporting by default.
+kafka.csv.metrics.reporter.enabled=false
+
+log.cleanup.policy=delete
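The %(kafka.port)d-style tokens in this file are Python dict-interpolation placeholders filled in by the test fixture before the broker starts. A hypothetical rendering; the key names are inferred from the placeholders and the values are placeholders.

    with open("test/resources/server.properties") as f:
        template = f.read()
    config = template % {"kafka.port": 49720,
                         "kafka.tmp.dir": "/tmp/kafka-test-logs",
                         "kafka.partitions": 2}
    # `config` is what the fixture writes out and passes to kafka.Kafka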