-rw-r--r--  kafka/client.py            31
-rw-r--r--  kafka/conn.py              76
-rw-r--r--  kafka/consumer.py         269
-rw-r--r--  kafka/producer.py           6
-rw-r--r--  kafka/protocol.py          24
-rw-r--r--  test/fixtures.py           43
-rw-r--r--  test/test_integration.py   68
-rw-r--r--  test/test_unit.py         475
8 files changed, 613 insertions, 379 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 71ededa..33c6d77 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -3,11 +3,12 @@ from collections import defaultdict
from functools import partial
from itertools import count
import logging
-import socket
import time
-from kafka.common import ErrorMapping, TopicAndPartition
-from kafka.common import ConnectionError, FailedPayloadsException
+from kafka.common import (
+ ErrorMapping, TopicAndPartition, ConnectionError,
+ FailedPayloadsException
+)
from kafka.conn import KafkaConnection
from kafka.protocol import KafkaProtocol
@@ -19,12 +20,12 @@ class KafkaClient(object):
CLIENT_ID = "kafka-python"
ID_GEN = count()
- def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID):
+ def __init__(self, host, port, client_id=CLIENT_ID, timeout=10):
# We need one connection to bootstrap
- self.bufsize = bufsize
self.client_id = client_id
+ self.timeout = timeout
self.conns = { # (host, port) -> KafkaConnection
- (host, port): KafkaConnection(host, port, bufsize)
+ (host, port): KafkaConnection(host, port, timeout=timeout)
}
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
@@ -41,7 +42,7 @@ class KafkaClient(object):
"""
if (broker.host, broker.port) not in self.conns:
self.conns[(broker.host, broker.port)] = \
- KafkaConnection(broker.host, broker.port, self.bufsize)
+ KafkaConnection(broker.host, broker.port, timeout=self.timeout)
return self.conns[(broker.host, broker.port)]
@@ -165,14 +166,24 @@ class KafkaClient(object):
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId, payloads=payloads)
+ failed = False
# Send the request, recv the response
try:
conn.send(requestId, request)
if decoder_fn is None:
continue
- response = conn.recv(requestId)
- except ConnectionError, e: # ignore BufferUnderflow for now
- log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
+ try:
+ response = conn.recv(requestId)
+ except ConnectionError, e:
+ log.warning("Could not receive response to request [%s] "
+ "from server %s: %s", request, conn, e)
+ failed = True
+ except ConnectionError, e:
+ log.warning("Could not send request [%s] to server %s: %s",
+ request, conn, e)
+ failed = True
+
+ if failed:
failed_payloads += payloads
self.topics_to_brokers = {} # reset metadata
continue
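
Note for callers: the user-visible change in client.py is the constructor, where bufsize is gone and a per-socket timeout (in seconds) takes its place. A minimal before/after sketch, assuming a broker at localhost:9092 (placeholder host/port):

    from kafka.client import KafkaClient

    # before this patch: callers sized the receive buffer themselves
    #   client = KafkaClient("localhost", 9092, bufsize=8192)

    # after: the connection reads exact byte counts, so only a timeout remains
    client = KafkaClient("localhost", 9092, timeout=10)
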
diff --git a/kafka/conn.py b/kafka/conn.py
index 1a3e260..c80f428 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -4,7 +4,6 @@ import socket
import struct
from threading import local
-from kafka.common import BufferUnderflowError
from kafka.common import ConnectionError
log = logging.getLogger("kafka")
@@ -19,14 +18,14 @@ class KafkaConnection(local):
we can do something in here to facilitate multiplexed requests/responses
since the Kafka API includes a correlation id.
"""
- def __init__(self, host, port, bufsize=4096):
+ def __init__(self, host, port, timeout=10):
super(KafkaConnection, self).__init__()
self.host = host
self.port = port
- self.bufsize = bufsize
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.connect((host, port))
- self._sock.settimeout(10)
+ self.timeout = timeout
+ self._sock.settimeout(self.timeout)
self._dirty = False
def __str__(self):
@@ -36,44 +35,31 @@ class KafkaConnection(local):
# Private API #
###################
- def _consume_response(self):
- """
- Fully consume the response iterator
- """
- return "".join(self._consume_response_iter())
-
- def _consume_response_iter(self):
- """
- This method handles the response header and error messages. It
- then returns an iterator for the chunks of the response
- """
- log.debug("Handling response from Kafka")
-
- # Read the size off of the header
- resp = self._sock.recv(4)
- if resp == "":
- self._raise_connection_error()
- (size,) = struct.unpack('>i', resp)
-
- messagesize = size - 4
- log.debug("About to read %d bytes from Kafka", messagesize)
-
- # Read the remainder of the response
- total = 0
- while total < messagesize:
- resp = self._sock.recv(self.bufsize)
- log.debug("Read %d bytes from Kafka", len(resp))
- if resp == "":
- raise BufferUnderflowError(
- "Not enough data to read this response")
-
- total += len(resp)
- yield resp
-
def _raise_connection_error(self):
self._dirty = True
raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port))
+ def _read_bytes(self, num_bytes):
+ bytes_left = num_bytes
+ resp = ''
+ log.debug("About to read %d bytes from Kafka", num_bytes)
+ if self._dirty:
+ self.reinit()
+ while bytes_left:
+ try:
+ data = self._sock.recv(bytes_left)
+ except socket.error:
+ log.exception('Unable to receive data from Kafka')
+ self._raise_connection_error()
+ if data == '':
+ log.error("Not enough data to read this response")
+ self._raise_connection_error()
+ bytes_left -= len(data)
+ log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
+ resp += data
+
+ return resp
+
##################
# Public API #
##################
@@ -89,7 +75,7 @@ class KafkaConnection(local):
sent = self._sock.sendall(payload)
if sent is not None:
self._raise_connection_error()
- except socket.error:
+ except socket.error, e:
log.exception('Unable to send payload to Kafka')
self._raise_connection_error()
@@ -98,8 +84,14 @@ class KafkaConnection(local):
Get a response from Kafka
"""
log.debug("Reading response %d from Kafka" % request_id)
- self.data = self._consume_response()
- return self.data
+ # Read the size off of the header
+ resp = self._read_bytes(4)
+
+ (size,) = struct.unpack('>i', resp)
+
+ # Read the remainder of the response
+ resp = self._read_bytes(size)
+ return str(resp)
def copy(self):
"""
@@ -124,5 +116,5 @@ class KafkaConnection(local):
self.close()
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.connect((self.host, self.port))
- self._sock.settimeout(10)
+ self._sock.settimeout(self.timeout)
self._dirty = False
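
The new recv() path is plain length-prefixed framing: read a 4-byte big-endian size, then loop on recv() until exactly that many bytes arrive, since a single recv() may return less than requested. The same pattern as a standalone sketch (helper names are hypothetical, not part of the patch):

    import socket
    import struct

    def read_exact(sock, num_bytes):
        # recv() may return fewer bytes than asked for, so keep looping
        chunks = []
        bytes_left = num_bytes
        while bytes_left:
            data = sock.recv(bytes_left)
            if data == '':
                raise socket.error("connection closed mid-response")
            chunks.append(data)
            bytes_left -= len(data)
        return ''.join(chunks)

    def read_response(sock):
        # Kafka frames every response with a 4-byte big-endian length prefix
        (size,) = struct.unpack('>i', read_exact(sock, 4))
        return read_exact(sock, size)
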
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 226700e..eba2912 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -5,8 +5,8 @@ from itertools import izip_longest, repeat
import logging
import time
from threading import Lock
-from multiprocessing import Process, Queue, Event, Value
-from Queue import Empty
+from multiprocessing import Process, Queue as MPQueue, Event, Value
+from Queue import Empty, Queue
from kafka.common import (
ErrorMapping, FetchRequest,
@@ -24,6 +24,11 @@ AUTO_COMMIT_INTERVAL = 5000
FETCH_DEFAULT_BLOCK_TIMEOUT = 1
FETCH_MAX_WAIT_TIME = 100
FETCH_MIN_BYTES = 4096
+FETCH_BUFFER_SIZE_BYTES = 4096
+MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
+
+ITER_TIMEOUT_SECONDS = 60
+NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
class FetchContext(object):
@@ -34,13 +39,15 @@ class FetchContext(object):
self.consumer = consumer
self.block = block
- if block and not timeout:
- timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
-
- self.timeout = timeout * 1000
+ if block:
+ if not timeout:
+ timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
+ self.timeout = timeout * 1000
def __enter__(self):
"""Set fetch values based on blocking status"""
+ self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time
+ self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes
if self.block:
self.consumer.fetch_max_wait_time = self.timeout
self.consumer.fetch_min_bytes = 1
@@ -48,9 +55,9 @@ class FetchContext(object):
self.consumer.fetch_min_bytes = 0
def __exit__(self, type, value, traceback):
- """Reset values to default"""
- self.consumer.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
- self.consumer.fetch_min_bytes = FETCH_MIN_BYTES
+ """Reset values"""
+ self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time
+ self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes
class Consumer(object):
@@ -206,8 +213,14 @@ class SimpleConsumer(Consumer):
before a commit
auto_commit_every_t: default 5000. How much time (in milliseconds) to
wait before commit
-
fetch_size_bytes: number of bytes to request in a FetchRequest
+ buffer_size: default 4K. Initial number of bytes to tell kafka we
+ have available. This will double as needed.
+    max_buffer_size: default 32K. Max number of bytes to tell kafka we have
+ available. None means no limit.
+ iter_timeout: default None. How much time (in seconds) to wait for a
+ message in the iterator before exiting. None means no
+ timeout, so it will wait forever.
Auto commit details:
If both auto_commit_every_n and auto_commit_every_t are set, they will
@@ -218,12 +231,23 @@ class SimpleConsumer(Consumer):
def __init__(self, client, group, topic, auto_commit=True, partitions=None,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
- fetch_size_bytes=FETCH_MIN_BYTES):
-
+ fetch_size_bytes=FETCH_MIN_BYTES,
+ buffer_size=FETCH_BUFFER_SIZE_BYTES,
+ max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
+ iter_timeout=None):
+
+ if max_buffer_size is not None and buffer_size > max_buffer_size:
+ raise ValueError("buffer_size (%d) is greater than "
+ "max_buffer_size (%d)" %
+ (buffer_size, max_buffer_size))
+ self.buffer_size = buffer_size
+ self.max_buffer_size = max_buffer_size
self.partition_info = False # Do not return partition info in msgs
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
self.fetch_min_bytes = fetch_size_bytes
self.fetch_started = defaultdict(bool) # defaults to false
+ self.iter_timeout = iter_timeout
+ self.queue = Queue()
super(SimpleConsumer, self).__init__(
client, group, topic,
@@ -267,12 +291,6 @@ class SimpleConsumer(Consumer):
reqs.append(OffsetRequest(self.topic, partition, -2, 1))
elif whence == 2:
reqs.append(OffsetRequest(self.topic, partition, -1, 1))
-
- # The API returns back the next available offset
- # For eg: if the current offset is 18, the API will return
- # back 19. So, if we have to seek 5 points before, we will
- # end up going back to 14, instead of 13. Adjust this
- deltas[partition] -= 1
else:
pass
@@ -289,123 +307,111 @@ class SimpleConsumer(Consumer):
count: Indicates the maximum number of messages to be fetched
block: If True, the API will block till some messages are fetched.
- timeout: If None, and block=True, the API will block infinitely.
- If >0, API will block for specified time (in seconds)
+        timeout: If block is True, the function will block for the specified
+                 time (in seconds) until count messages are fetched. If None,
+                 it will block forever.
"""
messages = []
- iterator = self.__iter__()
-
- # HACK: This splits the timeout between available partitions
if timeout:
- timeout = timeout * 1.0 / len(self.offsets)
+ max_time = time.time() + timeout
- with FetchContext(self, block, timeout):
- while count > 0:
- try:
- messages.append(next(iterator))
- except StopIteration:
- break
+ while count > 0 and (timeout is None or timeout > 0):
+ message = self.get_message(block, timeout)
+ if message:
+ messages.append(message)
count -= 1
+ else:
+ # Ran out of messages for the last request.
+ if not block:
+ # If we're not blocking, break.
+ break
+ if timeout:
+ # If we're blocking and have a timeout, reduce it to the
+ # appropriate value
+ timeout = max_time - time.time()
return messages
- def __iter__(self):
- """
- Create an iterate per partition. Iterate through them calling next()
- until they are all exhausted.
- """
- iters = {}
- for partition, offset in self.offsets.items():
- iters[partition] = self.__iter_partition__(partition, offset)
+ def get_message(self, block=True, timeout=0.1):
+ if self.queue.empty():
+ # We're out of messages, go grab some more.
+ with FetchContext(self, block, timeout):
+ self._fetch()
+ try:
+ return self.queue.get_nowait()
+ except Empty:
+ return None
- if len(iters) == 0:
- return
+ def __iter__(self):
+ if self.iter_timeout is None:
+ timeout = ITER_TIMEOUT_SECONDS
+ else:
+ timeout = self.iter_timeout
while True:
- if len(iters) == 0:
+ message = self.get_message(True, timeout)
+ if message:
+ yield message
+ elif self.iter_timeout is None:
+ # We did not receive any message yet but we don't have a
+ # timeout, so give up the CPU for a while before trying again
+ time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
+ else:
+ # Timed out waiting for a message
break
- for partition, it in iters.items():
+ def _fetch(self):
+ # Create fetch request payloads for all the partitions
+ requests = []
+ partitions = self.offsets.keys()
+ while partitions:
+ for partition in partitions:
+ requests.append(FetchRequest(self.topic, partition,
+ self.offsets[partition],
+ self.buffer_size))
+ # Send request
+ responses = self.client.send_fetch_request(
+ requests,
+ max_wait_time=int(self.fetch_max_wait_time),
+ min_bytes=self.fetch_min_bytes)
+
+ retry_partitions = set()
+ for resp in responses:
+ partition = resp.partition
try:
- if self.partition_info:
- yield (partition, it.next())
+ for message in resp.messages:
+ # Update partition offset
+ self.offsets[partition] = message.offset + 1
+
+ # Count, check and commit messages if necessary
+ self.count_since_commit += 1
+ self._auto_commit()
+
+ # Put the message in our queue
+ if self.partition_info:
+ self.queue.put((partition, message))
+ else:
+ self.queue.put(message)
+ except ConsumerFetchSizeTooSmall, e:
+ if (self.max_buffer_size is not None and
+ self.buffer_size == self.max_buffer_size):
+ log.error("Max fetch size %d too small",
+ self.max_buffer_size)
+ raise e
+ if self.max_buffer_size is None:
+ self.buffer_size *= 2
else:
- yield it.next()
+                        self.buffer_size = min(self.buffer_size * 2,
+                                               self.max_buffer_size)
+ log.warn("Fetch size too small, increase to %d (2x) "
+ "and retry", self.buffer_size)
+ retry_partitions.add(partition)
+ except ConsumerNoMoreData, e:
+ log.debug("Iteration was ended by %r", e)
except StopIteration:
+ # Stop iterating through this partition
log.debug("Done iterating over partition %s" % partition)
- del iters[partition]
-
- # skip auto-commit since we didn't yield anything
- continue
-
- # Count, check and commit messages if necessary
- self.count_since_commit += 1
- self._auto_commit()
-
- def __iter_partition__(self, partition, offset):
- """
- Iterate over the messages in a partition. Create a FetchRequest
- to get back a batch of messages, yield them one at a time.
- After a batch is exhausted, start a new batch unless we've reached
- the end of this partition.
- """
-
- # The offset that is stored in the consumer is the offset that
- # we have consumed. In subsequent iterations, we are supposed to
- # fetch the next message (that is from the next offset)
- # However, for the 0th message, the offset should be as-is.
- # An OffsetFetchRequest to Kafka gives 0 for a new queue. This is
- # problematic, since 0 is offset of a message which we have not yet
- # consumed.
- if self.fetch_started[partition]:
- offset += 1
-
- fetch_size = self.fetch_min_bytes
-
- while True:
- # use MaxBytes = client's bufsize since we're only
- # fetching one topic + partition
- req = FetchRequest(
- self.topic, partition, offset, self.client.bufsize)
-
- (resp,) = self.client.send_fetch_request(
- [req],
- max_wait_time=self.fetch_max_wait_time,
- min_bytes=fetch_size)
-
- assert resp.topic == self.topic
- assert resp.partition == partition
-
- next_offset = None
- try:
- for message in resp.messages:
- next_offset = message.offset
-
- # update the offset before the message is yielded. This
- # is so that the consumer state is not lost in certain
- # cases.
- #
- # For eg: the message is yielded and consumed by the
- # caller, but the caller does not come back into the
- # generator again. The message will be consumed but the
- # status will not be updated in the consumer
- self.fetch_started[partition] = True
- self.offsets[partition] = message.offset
- yield message
- except ConsumerFetchSizeTooSmall, e:
- fetch_size *= 1.5
- log.warn(
- "Fetch size too small, increasing to %d (1.5x) and retry",
- fetch_size)
- continue
- except ConsumerNoMoreData, e:
- log.debug("Iteration was ended by %r", e)
-
- if next_offset is None:
- break
- else:
- offset = next_offset + 1
-
+ partitions = retry_partitions
def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
"""
@@ -443,8 +449,9 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
# indicates a specific number of messages, follow that advice
count = 0
- for partition, message in consumer:
- queue.put((partition, message))
+ message = consumer.get_message()
+ if message:
+ queue.put(message)
count += 1
# We have reached the required size. The controller might have
@@ -454,12 +461,11 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
# can reset the 'start' event
if count == size.value:
pause.wait()
- break
- # In case we did not receive any message, give up the CPU for
- # a while before we try again
- if count == 0:
- time.sleep(0.1)
+ else:
+ # In case we did not receive any message, give up the CPU for
+ # a while before we try again
+ time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
consumer.stop()
@@ -504,7 +510,7 @@ class MultiProcessConsumer(Consumer):
# Variables for managing and controlling the data flow from
# consumer child process to master
- self.queue = Queue(1024) # Child consumers dump messages into this
+ self.queue = MPQueue(1024) # Child consumers dump messages into this
self.start = Event() # Indicates the consumers to start fetch
self.exit = Event() # Requests the consumers to shutdown
self.pause = Event() # Requests the consumers to pause fetch
@@ -586,8 +592,9 @@ class MultiProcessConsumer(Consumer):
count: Indicates the maximum number of messages to be fetched
block: If True, the API will block till some messages are fetched.
- timeout: If None, and block=True, the API will block infinitely.
- If >0, API will block for specified time (in seconds)
+        timeout: If block is True, the function will block for the specified
+                 time (in seconds) until count messages are fetched. If None,
+                 it will block forever.
"""
messages = []
@@ -598,7 +605,10 @@ class MultiProcessConsumer(Consumer):
self.size.value = count
self.pause.clear()
- while count > 0:
+ if timeout:
+ max_time = time.time() + timeout
+
+ while count > 0 and (timeout is None or timeout > 0):
# Trigger consumption only if the queue is empty
# By doing this, we will ensure that consumers do not
# go into overdrive and keep consuming thousands of
@@ -618,6 +628,7 @@ class MultiProcessConsumer(Consumer):
self.count_since_commit += 1
self._auto_commit()
count -= 1
+ timeout = max_time - time.time()
self.size.value = 0
self.start.clear()
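
Taken together, SimpleConsumer now buffers fetched messages in an internal queue and serves them through get_message(); the iterator is a thin loop over it. A usage sketch, assuming a reachable broker and an existing topic ("my-group" and "my-topic" are placeholders):

    from kafka.client import KafkaClient
    from kafka.consumer import SimpleConsumer

    client = KafkaClient("localhost", 9092)
    consumer = SimpleConsumer(client, "my-group", "my-topic",
                              max_buffer_size=None,  # uncapped; doubles as needed
                              iter_timeout=5)        # iterator exits after 5 idle seconds

    # Pull a single message; returns None if nothing is available
    msg = consumer.get_message(block=False)

    # Or iterate; with iter_timeout set, the loop ends instead of spinning forever
    for msg in consumer:
        print(msg.message.value)
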
diff --git a/kafka/producer.py b/kafka/producer.py
index a82d99b..5aead43 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -69,7 +69,7 @@ def _send_upstream(topic, queue, client, batch_time, batch_size,
client.send_produce_request(reqs,
acks=req_acks,
timeout=ack_timeout)
- except Exception as exp:
+ except Exception:
log.exception("Unable to send message")
@@ -146,9 +146,9 @@ class Producer(object):
try:
resp = self.client.send_produce_request([req], acks=self.req_acks,
timeout=self.ack_timeout)
- except Exception as e:
+ except Exception:
log.exception("Unable to send messages")
- raise e
+ raise
return resp
def stop(self, timeout=1):
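
The producer fix swaps raise e for a bare raise: in Python 2, raise e resets the traceback to the re-raise site, while a bare raise preserves the original one. A minimal illustration:

    import traceback

    def fail():
        raise ValueError("boom")

    def passthrough():
        try:
            fail()
        except Exception:
            raise  # bare raise: the traceback keeps pointing into fail();
                   # `raise e` in Python 2 would restart it here

    try:
        passthrough()
    except ValueError:
        traceback.print_exc()  # origin shown as fail(), not passthrough()
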
diff --git a/kafka/protocol.py b/kafka/protocol.py
index 612acf6..54b8eee 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -119,9 +119,17 @@ class KafkaProtocol(object):
read_message = True
yield OffsetAndMessage(offset, message)
except BufferUnderflowError:
+ # NOTE: Not sure this is correct error handling:
+ # Is it possible to get a BUE if the message set is somewhere
+ # in the middle of the fetch response? If so, we probably have
+ # an issue that's not fetch size too small.
+ # Aren't we ignoring errors if we fail to unpack data by
+ # raising StopIteration()?
+ # If _decode_message() raises a ChecksumError, couldn't that
+ # also be due to the fetch size being too small?
if read_message is False:
# If we get a partial read of a message, but haven't
- # yielded anyhting there's a problem
+ # yielded anything there's a problem
raise ConsumerFetchSizeTooSmall()
else:
raise StopIteration()
@@ -171,7 +179,7 @@ class KafkaProtocol(object):
Params
======
client_id: string
- correlation_id: string
+ correlation_id: int
payloads: list of ProduceRequest
acks: How "acky" you want the request to be
0: immediate response
@@ -231,7 +239,7 @@ class KafkaProtocol(object):
Params
======
client_id: string
- correlation_id: string
+ correlation_id: int
payloads: list of FetchRequest
max_wait_time: int, how long to block waiting on min_bytes of data
min_bytes: int, the minimum number of bytes to accumulate before
@@ -338,7 +346,7 @@ class KafkaProtocol(object):
Params
======
client_id: string
- correlation_id: string
+ correlation_id: int
topics: list of strings
"""
topics = [] if topics is None else topics
@@ -376,12 +384,16 @@ class KafkaProtocol(object):
topic_metadata = {}
for i in range(num_topics):
+ # NOTE: topic_error is discarded. Should probably be returned with
+ # the topic metadata.
((topic_error,), cur) = relative_unpack('>h', data, cur)
(topic_name, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
partition_metadata = {}
for j in range(num_partitions):
+ # NOTE: partition_error_code is discarded. Should probably be
+ # returned with the partition metadata.
((partition_error_code, partition, leader, numReplicas), cur) = \
relative_unpack('>hiii', data, cur)
@@ -408,7 +420,7 @@ class KafkaProtocol(object):
Params
======
client_id: string
- correlation_id: string
+ correlation_id: int
group: string, the consumer group you are committing offsets for
payloads: list of OffsetCommitRequest
"""
@@ -459,7 +471,7 @@ class KafkaProtocol(object):
Params
======
client_id: string
- correlation_id: string
+ correlation_id: int
group: string, the consumer group you are fetching offsets for
payloads: list of OffsetFetchRequest
"""
diff --git a/test/fixtures.py b/test/fixtures.py
index c771a58..9e283d3 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -208,9 +208,12 @@ class ZookeeperFixture(object):
self.tmp_dir = None
self.child = None
+ def out(self, message):
+ print("*** Zookeeper [%s:%d]: %s" % (self.host, self.port, message))
+
def open(self):
self.tmp_dir = tempfile.mkdtemp()
- print("*** Running local Zookeeper instance...")
+ self.out("Running local instance...")
print(" host = %s" % self.host)
print(" port = %s" % self.port)
print(" tmp_dir = %s" % self.tmp_dir)
@@ -229,16 +232,16 @@ class ZookeeperFixture(object):
self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt"))
# Party!
- print("*** Starting Zookeeper...")
+ self.out("Starting...")
self.child.start()
self.child.wait_for(r"Snapshotting")
- print("*** Done!")
+ self.out("Done!")
def close(self):
- print("*** Stopping Zookeeper...")
+ self.out("Stopping...")
self.child.stop()
self.child = None
- print("*** Done!")
+ self.out("Done!")
shutil.rmtree(self.tmp_dir)
@@ -272,10 +275,18 @@ class KafkaFixture(object):
self.tmp_dir = None
self.child = None
+ self.running = False
+
+ def out(self, message):
+ print("*** Kafka [%s:%d]: %s" % (self.host, self.port, message))
def open(self):
+ if self.running:
+ self.out("Instance already running")
+ return
+
self.tmp_dir = tempfile.mkdtemp()
- print("*** Running local Kafka instance")
+ self.out("Running local instance...")
print(" host = %s" % self.host)
print(" port = %s" % self.port)
print(" broker_id = %s" % self.broker_id)
@@ -303,25 +314,31 @@ class KafkaFixture(object):
self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt"))
# Party!
- print("*** Creating Zookeeper chroot node...")
+ self.out("Creating Zookeeper chroot node...")
proc = subprocess.Popen(kafka_run_class_args(
"org.apache.zookeeper.ZooKeeperMain",
"-server", "%s:%d" % (self.zk_host, self.zk_port),
"create", "/%s" % self.zk_chroot, "kafka-python"
))
if proc.wait() != 0:
- print("*** Failed to create Zookeeper chroot node")
+ self.out("Failed to create Zookeeper chroot node")
raise RuntimeError("Failed to create Zookeeper chroot node")
- print("*** Done!")
+ self.out("Done!")
- print("*** Starting Kafka...")
+ self.out("Starting...")
self.child.start()
self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id)
- print("*** Done!")
+ self.out("Done!")
+ self.running = True
def close(self):
- print("*** Stopping Kafka...")
+ if not self.running:
+ self.out("Instance already stopped")
+ return
+
+ self.out("Stopping...")
self.child.stop()
self.child = None
- print("*** Done!")
+ self.out("Done!")
shutil.rmtree(self.tmp_dir)
+ self.running = False
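
open() and close() are now idempotent behind a running flag, which the per-test setUp/tearDown in test_integration.py depends on. The guard pattern, reduced to a sketch (class name is hypothetical):

    class ProcessFixture(object):
        def __init__(self):
            self.running = False

        def open(self):
            if self.running:
                return  # repeated open() is a no-op
            # ... start the child process and wait for readiness ...
            self.running = True

        def close(self):
            if not self.running:
                return  # repeated close() is a no-op
            # ... stop the child process and remove tmp_dir ...
            self.running = False
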
diff --git a/test/test_integration.py b/test/test_integration.py
index a10dae2..eaf432d 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -8,6 +8,7 @@ import random
from kafka import * # noqa
from kafka.common import * # noqa
from kafka.codec import has_gzip, has_snappy
+from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
from .fixtures import ZookeeperFixture, KafkaFixture
@@ -554,7 +555,7 @@ class TestConsumer(unittest.TestCase):
cls.zk = ZookeeperFixture.instance()
cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
- cls.client = KafkaClient(cls.server2.host, cls.server2.port, bufsize=8192)
+ cls.client = KafkaClient(cls.server2.host, cls.server2.port)
@classmethod
def tearDownClass(cls): # noqa
@@ -583,7 +584,9 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 0)
# Start a consumer
- consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer", auto_commit=False)
+ consumer = SimpleConsumer(self.client, "group1",
+ "test_simple_consumer", auto_commit=False,
+ iter_timeout=0)
all_messages = []
for message in consumer:
all_messages.append(message)
@@ -609,7 +612,9 @@ class TestConsumer(unittest.TestCase):
consumer.stop()
def test_simple_consumer_blocking(self):
- consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer_blocking", auto_commit=False)
+ consumer = SimpleConsumer(self.client, "group1",
+ "test_simple_consumer_blocking",
+ auto_commit=False, iter_timeout=0)
# Blocking API
start = datetime.now()
@@ -657,7 +662,8 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- consumer = SimpleConsumer(self.client, "group1", "test_simple_pending", auto_commit=False)
+ consumer = SimpleConsumer(self.client, "group1", "test_simple_pending",
+ auto_commit=False, iter_timeout=0)
self.assertEquals(consumer.pending(), 20)
self.assertEquals(consumer.pending(partitions=[0]), 10)
self.assertEquals(consumer.pending(partitions=[1]), 10)
@@ -755,7 +761,7 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- # Produce 10 messages that are too large (bigger than default fetch size)
+ # Produce 10 messages that are large (bigger than default fetch size)
messages2 = [create_message(random_string(5000)) for i in range(10)]
produce2 = ProduceRequest("test_large_messages", 0, messages2)
@@ -764,33 +770,55 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 10)
# Consumer should still get all of them
- consumer = SimpleConsumer(self.client, "group1", "test_large_messages", auto_commit=False)
+ consumer = SimpleConsumer(self.client, "group1", "test_large_messages",
+ auto_commit=False, iter_timeout=0)
all_messages = messages1 + messages2
for i, message in enumerate(consumer):
self.assertEquals(all_messages[i], message.message)
self.assertEquals(i, 19)
+ # Produce 1 message that is too large (bigger than max fetch size)
+ big_message_size = MAX_FETCH_BUFFER_SIZE_BYTES + 10
+ big_message = create_message(random_string(big_message_size))
+ produce3 = ProduceRequest("test_large_messages", 0, [big_message])
+ for resp in self.client.send_produce_request([produce3]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 20)
+
+ self.assertRaises(ConsumerFetchSizeTooSmall, consumer.get_message, False, 0.1)
+
+ # Create a consumer with no fetch size limit
+ big_consumer = SimpleConsumer(self.client, "group1", "test_large_messages",
+ max_buffer_size=None, partitions=[0],
+ auto_commit=False, iter_timeout=0)
+
+ # Seek to the last message
+ big_consumer.seek(-1, 2)
+
+ # Consume giant message successfully
+ message = big_consumer.get_message(block=False, timeout=10)
+ self.assertIsNotNone(message)
+ self.assertEquals(message.message.value, big_message.value)
+
class TestFailover(unittest.TestCase):
- @classmethod
- def setUpClass(cls):
+ def setUp(self):
- zk_chroot = random_string(10)
+ zk_chroot = random_string(10)
replicas = 2
partitions = 2
# mini zookeeper, 2 kafka brokers
- cls.zk = ZookeeperFixture.instance()
- kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
- cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
- cls.client = KafkaClient(cls.brokers[0].host, cls.brokers[0].port)
-
- @classmethod
- def tearDownClass(cls):
- cls.client.close()
- for broker in cls.brokers:
+ self.zk = ZookeeperFixture.instance()
+ kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
+ self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
+ self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
+
+ def tearDown(self):
+ self.client.close()
+ for broker in self.brokers:
broker.close()
- cls.zk.close()
+ self.zk.close()
def test_switch_leader(self):
@@ -869,7 +897,7 @@ class TestFailover(unittest.TestCase):
def _count_messages(self, group, topic):
client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
- consumer = SimpleConsumer(client, group, topic, auto_commit=False)
+ consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0)
all_messages = []
for message in consumer:
all_messages.append(message)
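
The large-message test exercises the buffer-growth path: with the defaults, the fetch buffer doubles from 4096 bytes up to the 32768-byte cap (FETCH_BUFFER_SIZE_BYTES * 8), so a message bigger than the cap raises ConsumerFetchSizeTooSmall unless max_buffer_size=None. The doubling schedule, spelled out:

    FETCH_BUFFER_SIZE_BYTES = 4096
    MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8  # 32768

    size = FETCH_BUFFER_SIZE_BYTES
    while size < MAX_FETCH_BUFFER_SIZE_BYTES:
        size = min(size * 2, MAX_FETCH_BUFFER_SIZE_BYTES)
        print(size)  # 8192, 16384, 32768 -- then fetches stop growing
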
diff --git a/test/test_unit.py b/test/test_unit.py
index 3f3af66..e3fd4bb 100644
--- a/test/test_unit.py
+++ b/test/test_unit.py
@@ -3,13 +3,18 @@ import random
import struct
import unittest
-from kafka.client import KafkaClient
-from kafka.common import ProduceRequest, FetchRequest
+from kafka.common import (
+ ProduceRequest, FetchRequest, Message, ChecksumError,
+ ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
+ OffsetAndMessage, BrokerMetadata, PartitionMetadata
+)
from kafka.codec import (
- has_gzip, has_snappy,
- gzip_encode, gzip_decode,
+ has_gzip, has_snappy, gzip_encode, gzip_decode,
snappy_encode, snappy_decode
)
+from kafka.protocol import (
+ create_gzip_message, create_message, create_snappy_message, KafkaProtocol
+)
ITERATIONS = 1000
STRLEN = 100
@@ -20,16 +25,13 @@ def random_string():
class TestPackage(unittest.TestCase):
- @unittest.expectedFailure
+
def test_top_level_namespace(self):
import kafka as kafka1
self.assertEquals(kafka1.KafkaClient.__name__, "KafkaClient")
- self.assertEquals(kafka1.gzip_encode.__name__, "gzip_encode")
- self.assertEquals(kafka1.snappy_encode.__name__, "snappy_encode")
self.assertEquals(kafka1.client.__name__, "kafka.client")
self.assertEquals(kafka1.codec.__name__, "kafka.codec")
- @unittest.expectedFailure
def test_submodule_namespace(self):
import kafka.client as client1
self.assertEquals(client1.__name__, "kafka.client")
@@ -48,173 +50,334 @@ class TestPackage(unittest.TestCase):
from kafka import KafkaClient as KafkaClient2
self.assertEquals(KafkaClient2.__name__, "KafkaClient")
- from kafka import gzip_encode as gzip_encode2
- self.assertEquals(gzip_encode2.__name__, "gzip_encode")
-
- from kafka import snappy_encode as snappy_encode2
- self.assertEquals(snappy_encode2.__name__, "snappy_encode")
-
-
-class TestMisc(unittest.TestCase):
- @unittest.expectedFailure
- def test_length_prefix(self):
- for i in xrange(ITERATIONS):
- s1 = random_string()
- self.assertEquals(struct.unpack('>i', s2[0:4])[0], len(s1))
+ from kafka.codec import snappy_encode
+ self.assertEquals(snappy_encode.__name__, "snappy_encode")
class TestCodec(unittest.TestCase):
+
+ @unittest.skipUnless(has_gzip(), "Gzip not available")
def test_gzip(self):
- if not has_gzip():
- return
for i in xrange(ITERATIONS):
s1 = random_string()
s2 = gzip_decode(gzip_encode(s1))
self.assertEquals(s1, s2)
+ @unittest.skipUnless(has_snappy(), "Snappy not available")
def test_snappy(self):
- if not has_snappy():
- return
for i in xrange(ITERATIONS):
s1 = random_string()
s2 = snappy_decode(snappy_encode(s1))
self.assertEquals(s1, s2)
-# XXX(sandello): These really should be protocol tests.
-class TestMessage(unittest.TestCase):
- @unittest.expectedFailure
- def test_create(self):
- msg = KafkaClient.create_message("testing")
- self.assertEquals(msg.payload, "testing")
- self.assertEquals(msg.magic, 1)
- self.assertEquals(msg.attributes, 0)
- self.assertEquals(msg.crc, -386704890)
+class TestProtocol(unittest.TestCase):
+
+ def test_create_message(self):
+ payload = "test"
+ key = "key"
+ msg = create_message(payload, key)
+ self.assertEqual(msg.magic, 0)
+ self.assertEqual(msg.attributes, 0)
+ self.assertEqual(msg.key, key)
+ self.assertEqual(msg.value, payload)
- @unittest.expectedFailure
+    @unittest.skipUnless(has_gzip(), "Gzip not available")
def test_create_gzip(self):
- msg = KafkaClient.create_gzip_message("testing")
- self.assertEquals(msg.magic, 1)
- self.assertEquals(msg.attributes, 1)
- # Can't check the crc or payload for gzip since it's non-deterministic
- (messages, _) = KafkaClient.read_message_set(gzip_decode(msg.payload))
- inner = messages[0]
- self.assertEquals(inner.magic, 1)
- self.assertEquals(inner.attributes, 0)
- self.assertEquals(inner.payload, "testing")
- self.assertEquals(inner.crc, -386704890)
-
- @unittest.expectedFailure
+ payloads = ["v1", "v2"]
+ msg = create_gzip_message(payloads)
+ self.assertEqual(msg.magic, 0)
+ self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK &
+ KafkaProtocol.CODEC_GZIP)
+ self.assertEqual(msg.key, None)
+ # Need to decode to check since gzipped payload is non-deterministic
+ decoded = gzip_decode(msg.value)
+ expect = ("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10L\x9f[\xc2"
+ "\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v1\x00\x00\x00\x00"
+ "\x00\x00\x00\x00\x00\x00\x00\x10\xd5\x96\nx\x00\x00\xff\xff"
+ "\xff\xff\x00\x00\x00\x02v2")
+ self.assertEqual(decoded, expect)
+
+ @unittest.skipUnless(has_snappy(), "Snappy not available")
def test_create_snappy(self):
- msg = KafkaClient.create_snappy_message("testing")
- self.assertEquals(msg.magic, 1)
- self.assertEquals(msg.attributes, 2)
- self.assertEquals(msg.crc, -62350868)
- (messages, _) = KafkaClient.read_message_set(snappy_decode(msg.payload))
- inner = messages[0]
- self.assertEquals(inner.magic, 1)
- self.assertEquals(inner.attributes, 0)
- self.assertEquals(inner.payload, "testing")
- self.assertEquals(inner.crc, -386704890)
-
- @unittest.expectedFailure
- def test_message_simple(self):
- msg = KafkaClient.create_message("testing")
- enc = KafkaClient.encode_message(msg)
- expect = "\x00\x00\x00\r\x01\x00\xe8\xf3Z\x06testing"
- self.assertEquals(enc, expect)
- (messages, read) = KafkaClient.read_message_set(enc)
- self.assertEquals(len(messages), 1)
- self.assertEquals(messages[0], msg)
-
- @unittest.expectedFailure
- def test_message_list(self):
- msgs = [
- KafkaClient.create_message("one"),
- KafkaClient.create_message("two"),
- KafkaClient.create_message("three")
- ]
- enc = KafkaClient.encode_message_set(msgs)
- expect = ("\x00\x00\x00\t\x01\x00zl\x86\xf1one\x00\x00\x00\t\x01\x00\x11"
- "\xca\x8aftwo\x00\x00\x00\x0b\x01\x00F\xc5\xd8\xf5three")
- self.assertEquals(enc, expect)
- (messages, read) = KafkaClient.read_message_set(enc)
- self.assertEquals(len(messages), 3)
- self.assertEquals(messages[0].payload, "one")
- self.assertEquals(messages[1].payload, "two")
- self.assertEquals(messages[2].payload, "three")
-
- @unittest.expectedFailure
- def test_message_gzip(self):
- msg = KafkaClient.create_gzip_message("one", "two", "three")
- enc = KafkaClient.encode_message(msg)
- # Can't check the bytes directly since Gzip is non-deterministic
- (messages, read) = KafkaClient.read_message_set(enc)
- self.assertEquals(len(messages), 3)
- self.assertEquals(messages[0].payload, "one")
- self.assertEquals(messages[1].payload, "two")
- self.assertEquals(messages[2].payload, "three")
-
- @unittest.expectedFailure
- def test_message_snappy(self):
- msg = KafkaClient.create_snappy_message("one", "two", "three")
- enc = KafkaClient.encode_message(msg)
- (messages, read) = KafkaClient.read_message_set(enc)
- self.assertEquals(len(messages), 3)
- self.assertEquals(messages[0].payload, "one")
- self.assertEquals(messages[1].payload, "two")
- self.assertEquals(messages[2].payload, "three")
-
- @unittest.expectedFailure
- def test_message_simple_random(self):
- for i in xrange(ITERATIONS):
- n = random.randint(0, 10)
- msgs = [KafkaClient.create_message(random_string()) for j in range(n)]
- enc = KafkaClient.encode_message_set(msgs)
- (messages, read) = KafkaClient.read_message_set(enc)
- self.assertEquals(len(messages), n)
- for j in range(n):
- self.assertEquals(messages[j], msgs[j])
-
- @unittest.expectedFailure
- def test_message_gzip_random(self):
- for i in xrange(ITERATIONS):
- n = random.randint(1, 10)
- strings = [random_string() for j in range(n)]
- msg = KafkaClient.create_gzip_message(*strings)
- enc = KafkaClient.encode_message(msg)
- (messages, read) = KafkaClient.read_message_set(enc)
- self.assertEquals(len(messages), n)
- for j in range(n):
- self.assertEquals(messages[j].payload, strings[j])
-
- @unittest.expectedFailure
- def test_message_snappy_random(self):
- for i in xrange(ITERATIONS):
- n = random.randint(1, 10)
- strings = [random_string() for j in range(n)]
- msg = KafkaClient.create_snappy_message(*strings)
- enc = KafkaClient.encode_message(msg)
- (messages, read) = KafkaClient.read_message_set(enc)
- self.assertEquals(len(messages), n)
- for j in range(n):
- self.assertEquals(messages[j].payload, strings[j])
-
-
-class TestRequests(unittest.TestCase):
- @unittest.expectedFailure
- def test_produce_request(self):
- req = ProduceRequest("my-topic", 0, [KafkaClient.create_message("testing")])
- enc = KafkaClient.encode_produce_request(req)
- expect = "\x00\x00\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00\x11\x00\x00\x00\r\x01\x00\xe8\xf3Z\x06testing"
- self.assertEquals(enc, expect)
-
- @unittest.expectedFailure
- def test_fetch_request(self):
- req = FetchRequest("my-topic", 0, 0, 1024)
- enc = KafkaClient.encode_fetch_request(req)
- expect = "\x00\x01\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00"
- self.assertEquals(enc, expect)
+ payloads = ["v1", "v2"]
+ msg = create_snappy_message(payloads)
+ self.assertEqual(msg.magic, 0)
+ self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK &
+ KafkaProtocol.CODEC_SNAPPY)
+ self.assertEqual(msg.key, None)
+ expect = ("8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff\xff\xff\xff"
+ "\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5\x96\nx\x00\x00\xff"
+ "\xff\xff\xff\x00\x00\x00\x02v2")
+ self.assertEqual(msg.value, expect)
+
+ def test_encode_message_header(self):
+ expect = '\x00\n\x00\x00\x00\x00\x00\x04\x00\x07client3'
+ encoded = KafkaProtocol._encode_message_header("client3", 4, 10)
+ self.assertEqual(encoded, expect)
+
+ def test_encode_message(self):
+ message = create_message("test", "key")
+ encoded = KafkaProtocol._encode_message(message)
+ expect = "\xaa\xf1\x8f[\x00\x00\x00\x00\x00\x03key\x00\x00\x00\x04test"
+ self.assertEqual(encoded, expect)
+
+ def test_encode_message_failure(self):
+ self.assertRaises(Exception, KafkaProtocol._encode_message,
+ Message(1, 0, "key", "test"))
+
+ def test_encode_message_set(self):
+ message_set = [create_message("v1", "k1"), create_message("v2", "k2")]
+ encoded = KafkaProtocol._encode_message_set(message_set)
+ expect = ("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x12W\xe7In\x00"
+ "\x00\x00\x00\x00\x02k1\x00\x00\x00\x02v1\x00\x00\x00\x00"
+ "\x00\x00\x00\x00\x00\x00\x00\x12\xff\x06\x02I\x00\x00\x00"
+ "\x00\x00\x02k2\x00\x00\x00\x02v2")
+ self.assertEqual(encoded, expect)
+
+ def test_decode_message(self):
+ encoded = "\xaa\xf1\x8f[\x00\x00\x00\x00\x00\x03key\x00\x00\x00\x04test"
+ offset = 10
+ (returned_offset, decoded_message) = \
+ list(KafkaProtocol._decode_message(encoded, offset))[0]
+ self.assertEqual(returned_offset, offset)
+ self.assertEqual(decoded_message, create_message("test", "key"))
+
+ def test_decode_message_set(self):
+ encoded = ('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10L\x9f[\xc2'
+ '\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v1\x00\x00\x00\x00'
+ '\x00\x00\x00\x00\x00\x00\x00\x10\xd5\x96\nx\x00\x00\xff'
+ '\xff\xff\xff\x00\x00\x00\x02v2')
+ iter = KafkaProtocol._decode_message_set_iter(encoded)
+ decoded = list(iter)
+ self.assertEqual(len(decoded), 2)
+ (returned_offset1, decoded_message1) = decoded[0]
+ self.assertEqual(returned_offset1, 0)
+ self.assertEqual(decoded_message1, create_message("v1"))
+ (returned_offset2, decoded_message2) = decoded[1]
+ self.assertEqual(returned_offset2, 0)
+ self.assertEqual(decoded_message2, create_message("v2"))
+
+ @unittest.skipUnless(has_gzip(), "Gzip not available")
+ def test_decode_message_gzip(self):
+ gzip_encoded = ('\xc0\x11\xb2\xf0\x00\x01\xff\xff\xff\xff\x00\x00\x000'
+ '\x1f\x8b\x08\x00\xa1\xc1\xc5R\x02\xffc`\x80\x03\x01'
+ '\x9f\xf9\xd1\x87\x18\x18\xfe\x03\x01\x90\xc7Tf\xc8'
+ '\x80$wu\x1aW\x05\x92\x9c\x11\x00z\xc0h\x888\x00\x00'
+ '\x00')
+ offset = 11
+ decoded = list(KafkaProtocol._decode_message(gzip_encoded, offset))
+ self.assertEqual(len(decoded), 2)
+ (returned_offset1, decoded_message1) = decoded[0]
+ self.assertEqual(returned_offset1, 0)
+ self.assertEqual(decoded_message1, create_message("v1"))
+ (returned_offset2, decoded_message2) = decoded[1]
+ self.assertEqual(returned_offset2, 0)
+ self.assertEqual(decoded_message2, create_message("v2"))
+
+ @unittest.skipUnless(has_snappy(), "Snappy not available")
+ def test_decode_message_snappy(self):
+ snappy_encoded = ('\xec\x80\xa1\x95\x00\x02\xff\xff\xff\xff\x00\x00'
+ '\x00,8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff'
+ '\xff\xff\xff\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5'
+ '\x96\nx\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v2')
+ offset = 11
+ decoded = list(KafkaProtocol._decode_message(snappy_encoded, offset))
+ self.assertEqual(len(decoded), 2)
+ (returned_offset1, decoded_message1) = decoded[0]
+ self.assertEqual(returned_offset1, 0)
+ self.assertEqual(decoded_message1, create_message("v1"))
+ (returned_offset2, decoded_message2) = decoded[1]
+ self.assertEqual(returned_offset2, 0)
+ self.assertEqual(decoded_message2, create_message("v2"))
+
+ def test_decode_message_checksum_error(self):
+ invalid_encoded_message = "This is not a valid encoded message"
+ iter = KafkaProtocol._decode_message(invalid_encoded_message, 0)
+ self.assertRaises(ChecksumError, list, iter)
+
+ # NOTE: The error handling in _decode_message_set_iter() is questionable.
+ # If it's modified, the next two tests might need to be fixed.
+ def test_decode_message_set_fetch_size_too_small(self):
+ iter = KafkaProtocol._decode_message_set_iter('a')
+ self.assertRaises(ConsumerFetchSizeTooSmall, list, iter)
+
+ def test_decode_message_set_stop_iteration(self):
+ encoded = ('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10L\x9f[\xc2'
+ '\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v1\x00\x00\x00\x00'
+ '\x00\x00\x00\x00\x00\x00\x00\x10\xd5\x96\nx\x00\x00\xff'
+ '\xff\xff\xff\x00\x00\x00\x02v2')
+ iter = KafkaProtocol._decode_message_set_iter(encoded + "@#$%(Y!")
+ decoded = list(iter)
+ self.assertEqual(len(decoded), 2)
+ (returned_offset1, decoded_message1) = decoded[0]
+ self.assertEqual(returned_offset1, 0)
+ self.assertEqual(decoded_message1, create_message("v1"))
+ (returned_offset2, decoded_message2) = decoded[1]
+ self.assertEqual(returned_offset2, 0)
+ self.assertEqual(decoded_message2, create_message("v2"))
+
+ def test_encode_produce_request(self):
+ requests = [ProduceRequest("topic1", 0, [create_message("a"),
+ create_message("b")]),
+ ProduceRequest("topic2", 1, [create_message("c")])]
+ expect = ('\x00\x00\x00\x94\x00\x00\x00\x00\x00\x00\x00\x02\x00\x07'
+ 'client1\x00\x02\x00\x00\x00d\x00\x00\x00\x02\x00\x06topic1'
+ '\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00'
+ '\x00\x00\x00\x00\x00\x00\x00\x00\x0fQ\xdf:2\x00\x00\xff\xff'
+ '\xff\xff\x00\x00\x00\x01a\x00\x00\x00\x00\x00\x00\x00\x00'
+ '\x00\x00\x00\x0f\xc8\xd6k\x88\x00\x00\xff\xff\xff\xff\x00'
+ '\x00\x00\x01b\x00\x06topic2\x00\x00\x00\x01\x00\x00\x00\x01'
+ '\x00\x00\x00\x1b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+ '\x00\x0f\xbf\xd1[\x1e\x00\x00\xff\xff\xff\xff\x00\x00\x00'
+ '\x01c')
+ encoded = KafkaProtocol.encode_produce_request("client1", 2, requests,
+ 2, 100)
+ self.assertEqual(encoded, expect)
+
+ def test_decode_produce_response(self):
+ t1 = "topic1"
+ t2 = "topic2"
+ encoded = struct.pack('>iih%dsiihqihqh%dsiihq' % (len(t1), len(t2)),
+ 2, 2, len(t1), t1, 2, 0, 0, 10L, 1, 1, 20L,
+ len(t2), t2, 1, 0, 0, 30L)
+ responses = list(KafkaProtocol.decode_produce_response(encoded))
+ self.assertEqual(responses,
+ [ProduceResponse(t1, 0, 0, 10L),
+ ProduceResponse(t1, 1, 1, 20L),
+ ProduceResponse(t2, 0, 0, 30L)])
+
+ def test_encode_fetch_request(self):
+ requests = [FetchRequest("topic1", 0, 10, 1024),
+ FetchRequest("topic2", 1, 20, 100)]
+ expect = ('\x00\x00\x00Y\x00\x01\x00\x00\x00\x00\x00\x03\x00\x07'
+ 'client1\xff\xff\xff\xff\x00\x00\x00\x02\x00\x00\x00d\x00'
+ '\x00\x00\x02\x00\x06topic1\x00\x00\x00\x01\x00\x00\x00\x00'
+ '\x00\x00\x00\x00\x00\x00\x00\n\x00\x00\x04\x00\x00\x06'
+ 'topic2\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x00'
+ '\x00\x00\x14\x00\x00\x00d')
+ encoded = KafkaProtocol.encode_fetch_request("client1", 3, requests, 2,
+ 100)
+ self.assertEqual(encoded, expect)
+
+ def test_decode_fetch_response(self):
+ t1 = "topic1"
+ t2 = "topic2"
+ msgs = map(create_message, ["message1", "hi", "boo", "foo", "so fun!"])
+ ms1 = KafkaProtocol._encode_message_set([msgs[0], msgs[1]])
+ ms2 = KafkaProtocol._encode_message_set([msgs[2]])
+ ms3 = KafkaProtocol._encode_message_set([msgs[3], msgs[4]])
+
+ encoded = struct.pack('>iih%dsiihqi%dsihqi%dsh%dsiihqi%ds' %
+ (len(t1), len(ms1), len(ms2), len(t2), len(ms3)),
+ 4, 2, len(t1), t1, 2, 0, 0, 10, len(ms1), ms1, 1,
+ 1, 20, len(ms2), ms2, len(t2), t2, 1, 0, 0, 30,
+ len(ms3), ms3)
+
+ responses = list(KafkaProtocol.decode_fetch_response(encoded))
+ def expand_messages(response):
+ return FetchResponse(response.topic, response.partition,
+ response.error, response.highwaterMark,
+ list(response.messages))
+
+ expanded_responses = map(expand_messages, responses)
+ expect = [FetchResponse(t1, 0, 0, 10, [OffsetAndMessage(0, msgs[0]),
+ OffsetAndMessage(0, msgs[1])]),
+ FetchResponse(t1, 1, 1, 20, [OffsetAndMessage(0, msgs[2])]),
+ FetchResponse(t2, 0, 0, 30, [OffsetAndMessage(0, msgs[3]),
+ OffsetAndMessage(0, msgs[4])])]
+ self.assertEqual(expanded_responses, expect)
+
+ def test_encode_metadata_request_no_topics(self):
+ encoded = KafkaProtocol.encode_metadata_request("cid", 4)
+ self.assertEqual(encoded, '\x00\x00\x00\x11\x00\x03\x00\x00\x00\x00'
+ '\x00\x04\x00\x03cid\x00\x00\x00\x00')
+
+ def test_encode_metadata_request_with_topics(self):
+ encoded = KafkaProtocol.encode_metadata_request("cid", 4, ["t1", "t2"])
+ self.assertEqual(encoded, '\x00\x00\x00\x19\x00\x03\x00\x00\x00\x00'
+ '\x00\x04\x00\x03cid\x00\x00\x00\x02\x00\x02'
+ 't1\x00\x02t2')
+
+ def _create_encoded_metadata_response(self, broker_data, topic_data,
+ topic_errors, partition_errors):
+ encoded = struct.pack('>ii', 3, len(broker_data))
+ for node_id, broker in broker_data.iteritems():
+ encoded += struct.pack('>ih%dsi' % len(broker.host), node_id,
+ len(broker.host), broker.host, broker.port)
+
+ encoded += struct.pack('>i', len(topic_data))
+ for topic, partitions in topic_data.iteritems():
+ encoded += struct.pack('>hh%dsi' % len(topic), topic_errors[topic],
+ len(topic), topic, len(partitions))
+ for partition, metadata in partitions.iteritems():
+ encoded += struct.pack('>hiii',
+ partition_errors[(topic, partition)],
+ partition, metadata.leader,
+ len(metadata.replicas))
+ if len(metadata.replicas) > 0:
+ encoded += struct.pack('>%di' % len(metadata.replicas),
+ *metadata.replicas)
+
+ encoded += struct.pack('>i', len(metadata.isr))
+ if len(metadata.isr) > 0:
+ encoded += struct.pack('>%di' % len(metadata.isr),
+ *metadata.isr)
+
+ return encoded
+
+ def test_decode_metadata_response(self):
+ node_brokers = {
+ 0: BrokerMetadata(0, "brokers1.kafka.rdio.com", 1000),
+ 1: BrokerMetadata(1, "brokers1.kafka.rdio.com", 1001),
+ 3: BrokerMetadata(3, "brokers2.kafka.rdio.com", 1000)
+ }
+ topic_partitions = {
+ "topic1": {
+ 0: PartitionMetadata("topic1", 0, 1, (0, 2), (2,)),
+ 1: PartitionMetadata("topic1", 1, 3, (0, 1), (0, 1))
+ },
+ "topic2": {
+ 0: PartitionMetadata("topic2", 0, 0, (), ())
+ }
+ }
+ topic_errors = {"topic1": 0, "topic2": 1}
+ partition_errors = {
+ ("topic1", 0): 0,
+ ("topic1", 1): 1,
+ ("topic2", 0): 0
+ }
+ encoded = self._create_encoded_metadata_response(node_brokers,
+ topic_partitions,
+ topic_errors,
+ partition_errors)
+ decoded = KafkaProtocol.decode_metadata_response(encoded)
+ self.assertEqual(decoded, (node_brokers, topic_partitions))
+
+ @unittest.skip("Not Implemented")
+ def test_encode_offset_request(self):
+ pass
+
+ @unittest.skip("Not Implemented")
+ def test_decode_offset_response(self):
+ pass
+
+ @unittest.skip("Not Implemented")
+ def test_encode_offset_commit_request(self):
+ pass
+
+ @unittest.skip("Not Implemented")
+ def test_decode_offset_commit_response(self):
+ pass
+
+ @unittest.skip("Not Implemented")
+ def test_encode_offset_fetch_request(self):
+ pass
+
+ @unittest.skip("Not Implemented")
+ def test_decode_offset_fetch_response(self):
+ pass
if __name__ == '__main__':