summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormrtheb <mrlabbe@gmail.com>2014-01-31 22:43:59 -0500
committermrtheb <mrlabbe@gmail.com>2014-01-31 22:43:59 -0500
commit84de472a4d5b583ff3ed6cc6d92250a7c9291ceb (patch)
treee3d03da4eeecf8eab2dc63cf113a4daf82addf72
parent0bdff4e833f73518a7219fca04dfbc3ed201b06e (diff)
parent4abf7ee1fbbdc47c8cb7b35f2600e58f1f95e6bb (diff)
downloadkafka-python-84de472a4d5b583ff3ed6cc6d92250a7c9291ceb.tar.gz
Merge branch 'master' into multihosts
Conflicts: kafka/client.py kafka/conn.py setup.py test/test_integration.py test/test_unit.py
-rw-r--r--README.md31
m---------kafka-src0
-rw-r--r--kafka/client.py211
-rw-r--r--kafka/common.py35
-rw-r--r--kafka/conn.py93
-rw-r--r--kafka/consumer.py337
-rw-r--r--kafka/producer.py104
-rw-r--r--kafka/protocol.py30
-rw-r--r--kafka/queue.py2
-rw-r--r--setup.py4
-rw-r--r--test/fixtures.py43
-rw-r--r--test/test_integration.py318
-rw-r--r--test/test_unit.py476
13 files changed, 1017 insertions, 667 deletions
diff --git a/README.md b/README.md
index edf3931..a315db6 100644
--- a/README.md
+++ b/README.md
@@ -17,9 +17,8 @@ Copyright 2013, David Arthur under Apache License, v2.0. See `LICENSE`
# Status
-I'm following the version numbers of Kafka, plus one number to indicate the
-version of this project. The current version is 0.8.0-1. This version is under
-development, APIs are subject to change.
+The current version of this package is **0.9.0** and is compatible with
+Kafka brokers running version **0.8.1**.
# Usage
@@ -33,24 +32,24 @@ from kafka.producer import SimpleProducer, KeyedProducer
kafka = KafkaClient("localhost:9092")
# To send messages synchronously
-producer = SimpleProducer(kafka, "my-topic")
-producer.send_messages("some message")
-producer.send_messages("this method", "is variadic")
+producer = SimpleProducer(kafka)
+producer.send_messages("my-topic", "some message")
+producer.send_messages("my-topic", "this method", "is variadic")
# To send messages asynchronously
-producer = SimpleProducer(kafka, "my-topic", async=True)
-producer.send_messages("async message")
+producer = SimpleProducer(kafka, async=True)
+producer.send_messages("my-topic", "async message")
# To wait for acknowledgements
# ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
# a local log before sending response
# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
# by all in sync replicas before sending a response
-producer = SimpleProducer(kafka, "my-topic", async=False,
+producer = SimpleProducer(kafka, async=False,
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
- acks_timeout=2000)
+ ack_timeout=2000)
-response = producer.send_messages("async message")
+response = producer.send_messages("my-topic", "async message")
if response:
print(response[0].error)
@@ -63,7 +62,7 @@ if response:
# Notes:
# * If the producer dies before the messages are sent, there will be losses
# * Call producer.stop() to send the messages and cleanup
-producer = SimpleProducer(kafka, "my-topic", batch_send=True,
+producer = SimpleProducer(kafka, batch_send=True,
batch_send_every_n=20,
batch_send_every_t=60)
@@ -84,11 +83,11 @@ from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
kafka = KafkaClient("localhost:9092")
# HashedPartitioner is default
-producer = KeyedProducer(kafka, "my-topic")
-producer.send("key1", "some message")
-producer.send("key2", "this methode")
+producer = KeyedProducer(kafka)
+producer.send("my-topic", "key1", "some message")
+producer.send("my-topic", "key2", "this methode")
-producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
+producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
```
## Multiprocess consumer
diff --git a/kafka-src b/kafka-src
-Subproject 5bd33c1517bb2e7734166dc3e787ac90a4ef8f8
+Subproject 15bb3961d9171c1c54c4c840a554ce2c7616816
diff --git a/kafka/client.py b/kafka/client.py
index 81eec7d..33c4419 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,14 +1,16 @@
import copy
+import logging
+
from collections import defaultdict
from functools import partial
from itertools import count
-import logging
-import socket
-import time
-from kafka.common import ErrorMapping, TopicAndPartition
-from kafka.common import ConnectionError, FailedPayloadsException
-from kafka.conn import collect_hosts, KafkaConnection
+from kafka.common import (ErrorMapping, TopicAndPartition,
+ ConnectionError, FailedPayloadsError,
+ BrokerResponseError, PartitionUnavailableError,
+ KafkaUnavailableError, KafkaRequestError)
+
+from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
log = logging.getLogger("kafka")
@@ -19,19 +21,21 @@ class KafkaClient(object):
CLIENT_ID = "kafka-python"
ID_GEN = count()
- def __init__(self, hosts, bufsize=4096, client_id=CLIENT_ID):
+ # NOTE: The timeout given to the client should always be greater than the
+ # one passed to SimpleConsumer.get_message(), otherwise you can get a
+ # socket timeout.
+ def __init__(self, host, port, client_id=CLIENT_ID,
+ timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
# We need one connection to bootstrap
- self.bufsize = bufsize
self.client_id = client_id
-
- self.hosts = collect_hosts(hosts)
-
- # create connections only when we need them
- self.conns = {}
+ self.timeout = timeout
+ self.conns = { # (host, port) -> KafkaConnection
+ (host, port): KafkaConnection(host, port, timeout=timeout)
+ }
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
- self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...]
- self._load_metadata_for_topics()
+ self.topic_partitions = {} # topic_id -> [0, 1, 2, ...]
+ self.load_metadata_for_topics() # bootstrap with all metadata
##################
# Private API #
@@ -47,62 +51,25 @@ class KafkaClient(object):
return self.conns[host_key]
def _get_conn_for_broker(self, broker):
- "Get or create a connection to a broker"
+ """
+ Get or create a connection to a broker
+ """
+ if (broker.host, broker.port) not in self.conns:
+ self.conns[(broker.host, broker.port)] = \
+ KafkaConnection(broker.host, broker.port, timeout=self.timeout)
return self._get_conn(broker.host, broker.port)
def _get_leader_for_partition(self, topic, partition):
key = TopicAndPartition(topic, partition)
if key not in self.topics_to_brokers:
- self._load_metadata_for_topics(topic)
+ self.load_metadata_for_topics(topic)
if key not in self.topics_to_brokers:
- raise Exception("Partition does not exist: %s" % str(key))
+ raise KafkaRequestError("Partition does not exist: %s" % str(key))
return self.topics_to_brokers[key]
- def _load_metadata_for_topics(self, *topics):
- """
- Discover brokers and metadata for a set of topics. This method will
- recurse in the event of a retry.
- """
- request_id = self._next_id()
- request = KafkaProtocol.encode_metadata_request(self.client_id,
- request_id, topics)
-
- response = self._send_broker_unaware_request(request_id, request)
- if response is None:
- raise Exception("All servers failed to process request")
-
- (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
-
- log.debug("Broker metadata: %s", brokers)
- log.debug("Topic metadata: %s", topics)
-
- self.brokers = brokers
- self.topics_to_brokers = {}
-
- for topic, partitions in topics.items():
- # Clear the list once before we add it. This removes stale entries
- # and avoids duplicates
- self.topic_partitions.pop(topic, None)
-
- if not partitions:
- log.info("Partition is unassigned, delay for 1s and retry")
- time.sleep(1)
- self._load_metadata_for_topics(topic)
- break
-
- for partition, meta in partitions.items():
- if meta.leader == -1:
- log.info("Partition is unassigned, delay for 1s and retry")
- time.sleep(1)
- self._load_metadata_for_topics(topic)
- else:
- topic_part = TopicAndPartition(topic, partition)
- self.topics_to_brokers[topic_part] = brokers[meta.leader]
- self.topic_partitions[topic].append(partition)
-
def _next_id(self):
"""
Generate a new correlation id
@@ -125,7 +92,7 @@ class KafkaClient(object):
"trying next server: %s" % (request, conn, e))
continue
- return None
+ raise KafkaUnavailableError("All servers failed to process request")
def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
"""
@@ -156,6 +123,8 @@ class KafkaClient(object):
for payload in payloads:
leader = self._get_leader_for_partition(payload.topic,
payload.partition)
+ if leader == -1:
+ raise PartitionUnavailableError("Leader is unassigned for %s-%s" % payload.topic, payload.partition)
payloads_by_broker[leader].append(payload)
original_keys.append((payload.topic, payload.partition))
@@ -172,30 +141,73 @@ class KafkaClient(object):
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId, payloads=payloads)
+ failed = False
# Send the request, recv the response
try:
conn.send(requestId, request)
if decoder_fn is None:
continue
- response = conn.recv(requestId)
- except ConnectionError, e: # ignore BufferUnderflow for now
- log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
+ try:
+ response = conn.recv(requestId)
+ except ConnectionError, e:
+ log.warning("Could not receive response to request [%s] "
+ "from server %s: %s", request, conn, e)
+ failed = True
+ except ConnectionError, e:
+ log.warning("Could not send request [%s] to server %s: %s",
+ request, conn, e)
+ failed = True
+
+ if failed:
failed_payloads += payloads
- self.topics_to_brokers = {} # reset metadata
+ self.reset_all_metadata()
continue
for response in decoder_fn(response):
acc[(response.topic, response.partition)] = response
if failed_payloads:
- raise FailedPayloadsException(failed_payloads)
+ raise FailedPayloadsError(failed_payloads)
# Order the accumulated responses by the original key order
return (acc[k] for k in original_keys) if acc else ()
+ def __repr__(self):
+ return '<KafkaClient client_id=%s>' % (self.client_id)
+
+ def _raise_on_response_error(self, resp):
+ if resp.error == ErrorMapping.NO_ERROR:
+ return
+
+ if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON,
+ ErrorMapping.NOT_LEADER_FOR_PARTITION):
+ self.reset_topic_metadata(resp.topic)
+
+ raise BrokerResponseError(
+ "Request for %s failed with errorcode=%d" %
+ (TopicAndPartition(resp.topic, resp.partition), resp.error))
+
#################
# Public API #
#################
+ def reset_topic_metadata(self, *topics):
+ for topic in topics:
+ try:
+ partitions = self.topic_partitions[topic]
+ except KeyError:
+ continue
+
+ for partition in partitions:
+ self.topics_to_brokers.pop(TopicAndPartition(topic, partition), None)
+
+ del self.topic_partitions[topic]
+
+ def reset_all_metadata(self):
+ self.topics_to_brokers.clear()
+ self.topic_partitions.clear()
+
+ def has_metadata_for_topic(self, topic):
+ return topic in self.topic_partitions
def close(self):
for conn in self.conns.values():
@@ -215,6 +227,36 @@ class KafkaClient(object):
for conn in self.conns.values():
conn.reinit()
+ def load_metadata_for_topics(self, *topics):
+ """
+ Discover brokers and metadata for a set of topics. This function is called
+ lazily whenever metadata is unavailable.
+ """
+ request_id = self._next_id()
+ request = KafkaProtocol.encode_metadata_request(self.client_id,
+ request_id, topics)
+
+ response = self._send_broker_unaware_request(request_id, request)
+
+ (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
+
+ log.debug("Broker metadata: %s", brokers)
+ log.debug("Topic metadata: %s", topics)
+
+ self.brokers = brokers
+
+ for topic, partitions in topics.items():
+ self.reset_topic_metadata(topic)
+
+ if not partitions:
+ continue
+
+ self.topic_partitions[topic] = []
+ for partition, meta in partitions.items():
+ topic_part = TopicAndPartition(topic, partition)
+ self.topics_to_brokers[topic_part] = brokers[meta.leader]
+ self.topic_partitions[topic].append(partition)
+
def send_produce_request(self, payloads=[], acks=1, timeout=1000,
fail_on_error=True, callback=None):
"""
@@ -252,14 +294,9 @@ class KafkaClient(object):
out = []
for resp in resps:
- # Check for errors
- if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception(
- "ProduceRequest for %s failed with errorcode=%d" %
- (TopicAndPartition(resp.topic, resp.partition),
- resp.error))
-
- # Run the callback
+ if fail_on_error is True:
+ self._raise_on_response_error(resp)
+
if callback is not None:
out.append(callback(resp))
else:
@@ -285,14 +322,9 @@ class KafkaClient(object):
out = []
for resp in resps:
- # Check for errors
- if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception(
- "FetchRequest for %s failed with errorcode=%d" %
- (TopicAndPartition(resp.topic, resp.partition),
- resp.error))
-
- # Run the callback
+ if fail_on_error is True:
+ self._raise_on_response_error(resp)
+
if callback is not None:
out.append(callback(resp))
else:
@@ -308,9 +340,8 @@ class KafkaClient(object):
out = []
for resp in resps:
- if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception("OffsetRequest failed with errorcode=%s",
- resp.error)
+ if fail_on_error is True:
+ self._raise_on_response_error(resp)
if callback is not None:
out.append(callback(resp))
else:
@@ -326,9 +357,8 @@ class KafkaClient(object):
out = []
for resp in resps:
- if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception("OffsetCommitRequest failed with "
- "errorcode=%s", resp.error)
+ if fail_on_error is True:
+ self._raise_on_response_error(resp)
if callback is not None:
out.append(callback(resp))
@@ -346,9 +376,8 @@ class KafkaClient(object):
out = []
for resp in resps:
- if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception("OffsetCommitRequest failed with errorcode=%s",
- resp.error)
+ if fail_on_error is True:
+ self._raise_on_response_error(resp)
if callback is not None:
out.append(callback(resp))
else:
diff --git a/kafka/common.py b/kafka/common.py
index 6f0dd32..c0a1a6a 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -69,23 +69,46 @@ class ErrorMapping(object):
# Exceptions #
#################
-class FailedPayloadsException(Exception):
+
+class KafkaError(RuntimeError):
+ pass
+
+
+class KafkaRequestError(KafkaError):
+ pass
+
+
+class KafkaUnavailableError(KafkaError):
+ pass
+
+
+class BrokerResponseError(KafkaError):
pass
-class ConnectionError(Exception):
+
+class PartitionUnavailableError(KafkaError):
+ pass
+
+
+class FailedPayloadsError(KafkaError):
pass
-class BufferUnderflowError(Exception):
+
+class ConnectionError(KafkaError):
+ pass
+
+
+class BufferUnderflowError(KafkaError):
pass
-class ChecksumError(Exception):
+class ChecksumError(KafkaError):
pass
-class ConsumerFetchSizeTooSmall(Exception):
+class ConsumerFetchSizeTooSmall(KafkaError):
pass
-class ConsumerNoMoreData(Exception):
+class ConsumerNoMoreData(KafkaError):
pass
diff --git a/kafka/conn.py b/kafka/conn.py
index 614b1bb..de2d385 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -5,11 +5,11 @@ import struct
from random import shuffle
from threading import local
-from kafka.common import BufferUnderflowError
from kafka.common import ConnectionError
log = logging.getLogger("kafka")
+DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
def collect_hosts(hosts, randomize=True):
"""
@@ -39,64 +39,53 @@ class KafkaConnection(local):
by a call to `recv` in order to get the correct response. Eventually,
we can do something in here to facilitate multiplexed requests/responses
since the Kafka API includes a correlation id.
+
+ host: the host name or IP address of a kafka broker
+ port: the port number the kafka broker is listening on
+ timeout: default 120. The socket timeout for sending and receiving data
+ in seconds. None means no timeout, so a request can block forever.
"""
- def __init__(self, host, port, bufsize=4096, timeout=10):
+ def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
super(KafkaConnection, self).__init__()
self.host = host
self.port = port
- self.bufsize = bufsize
+ self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._sock.connect((host, port))
self.timeout = timeout
-
- self._sock = socket.create_connection((host, port), timeout=timeout)
+ self._sock.settimeout(self.timeout)
self._dirty = False
- def __str__(self):
+ def __repr__(self):
return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
###################
# Private API #
###################
- def _consume_response(self):
- """
- Fully consumer the response iterator
- """
- data = ""
- for chunk in self._consume_response_iter():
- data += chunk
- return data
-
- def _consume_response_iter(self):
- """
- This method handles the response header and error messages. It
- then returns an iterator for the chunks of the response
- """
- log.debug("Handling response from Kafka")
-
- # Read the size off of the header
- resp = self._sock.recv(4)
- if resp == "":
- self._raise_connection_error()
- (size,) = struct.unpack('>i', resp)
-
- messagesize = size - 4
- log.debug("About to read %d bytes from Kafka", messagesize)
-
- # Read the remainder of the response
- total = 0
- while total < messagesize:
- resp = self._sock.recv(self.bufsize)
- log.debug("Read %d bytes from Kafka", len(resp))
- if resp == "":
- raise BufferUnderflowError(
- "Not enough data to read this response")
-
- total += len(resp)
- yield resp
-
def _raise_connection_error(self):
self._dirty = True
- raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port))
+ raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port))
+
+ def _read_bytes(self, num_bytes):
+ bytes_left = num_bytes
+ resp = ''
+ log.debug("About to read %d bytes from Kafka", num_bytes)
+ if self._dirty:
+ self.reinit()
+ while bytes_left:
+ try:
+ data = self._sock.recv(bytes_left)
+ except socket.error:
+ log.exception('Unable to receive data from Kafka')
+ self._raise_connection_error()
+ if data == '':
+ log.error("Not enough data to read this response")
+ self._raise_connection_error()
+ bytes_left -= len(data)
+ log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
+ resp += data
+
+ return resp
##################
# Public API #
@@ -113,7 +102,7 @@ class KafkaConnection(local):
sent = self._sock.sendall(payload)
if sent is not None:
self._raise_connection_error()
- except socket.error:
+ except socket.error, e:
log.exception('Unable to send payload to Kafka')
self._raise_connection_error()
@@ -122,8 +111,14 @@ class KafkaConnection(local):
Get a response from Kafka
"""
log.debug("Reading response %d from Kafka" % request_id)
- self.data = self._consume_response()
- return self.data
+ # Read the size off of the header
+ resp = self._read_bytes(4)
+
+ (size,) = struct.unpack('>i', resp)
+
+ # Read the remainder of the response
+ resp = self._read_bytes(size)
+ return str(resp)
def copy(self):
"""
@@ -146,5 +141,7 @@ class KafkaConnection(local):
Re-initialize the socket connection
"""
self.close()
- self._sock = socket.create_connection((self.host, self.port), timeout=self.timeout)
+ self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._sock.connect((self.host, self.port))
+ self._sock.settimeout(self.timeout)
self._dirty = False
diff --git a/kafka/consumer.py b/kafka/consumer.py
index f2898ad..28b53ec 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -1,10 +1,11 @@
-from collections import defaultdict
+from __future__ import absolute_import
+
from itertools import izip_longest, repeat
import logging
import time
from threading import Lock
-from multiprocessing import Process, Queue, Event, Value
-from Queue import Empty
+from multiprocessing import Process, Queue as MPQueue, Event, Value
+from Queue import Empty, Queue
from kafka.common import (
ErrorMapping, FetchRequest,
@@ -22,6 +23,11 @@ AUTO_COMMIT_INTERVAL = 5000
FETCH_DEFAULT_BLOCK_TIMEOUT = 1
FETCH_MAX_WAIT_TIME = 100
FETCH_MIN_BYTES = 4096
+FETCH_BUFFER_SIZE_BYTES = 4096
+MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
+
+ITER_TIMEOUT_SECONDS = 60
+NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
class FetchContext(object):
@@ -32,13 +38,15 @@ class FetchContext(object):
self.consumer = consumer
self.block = block
- if block and not timeout:
- timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
-
- self.timeout = timeout * 1000
+ if block:
+ if not timeout:
+ timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
+ self.timeout = timeout * 1000
def __enter__(self):
"""Set fetch values based on blocking status"""
+ self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time
+ self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes
if self.block:
self.consumer.fetch_max_wait_time = self.timeout
self.consumer.fetch_min_bytes = 1
@@ -46,9 +54,9 @@ class FetchContext(object):
self.consumer.fetch_min_bytes = 0
def __exit__(self, type, value, traceback):
- """Reset values to default"""
- self.consumer.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
- self.consumer.fetch_min_bytes = FETCH_MIN_BYTES
+ """Reset values"""
+ self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time
+ self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes
class Consumer(object):
@@ -67,7 +75,7 @@ class Consumer(object):
self.client = client
self.topic = topic
self.group = group
- self.client._load_metadata_for_topics(topic)
+ self.client.load_metadata_for_topics(topic)
self.offsets = {}
if not partitions:
@@ -204,8 +212,14 @@ class SimpleConsumer(Consumer):
before a commit
auto_commit_every_t: default 5000. How much time (in milliseconds) to
wait before commit
-
fetch_size_bytes: number of bytes to request in a FetchRequest
+ buffer_size: default 4K. Initial number of bytes to tell kafka we
+ have available. This will double as needed.
+ max_buffer_size: default 16K. Max number of bytes to tell kafka we have
+ available. None means no limit.
+ iter_timeout: default None. How much time (in seconds) to wait for a
+ message in the iterator before exiting. None means no
+ timeout, so it will wait forever.
Auto commit details:
If both auto_commit_every_n and auto_commit_every_t are set, they will
@@ -216,13 +230,10 @@ class SimpleConsumer(Consumer):
def __init__(self, client, group, topic, auto_commit=True, partitions=None,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
- fetch_size_bytes=FETCH_MIN_BYTES):
-
- self.partition_info = False # Do not return partition info in msgs
- self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
- self.fetch_min_bytes = fetch_size_bytes
- self.fetch_started = defaultdict(bool) # defaults to false
-
+ fetch_size_bytes=FETCH_MIN_BYTES,
+ buffer_size=FETCH_BUFFER_SIZE_BYTES,
+ max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
+ iter_timeout=None):
super(SimpleConsumer, self).__init__(
client, group, topic,
partitions=partitions,
@@ -230,6 +241,23 @@ class SimpleConsumer(Consumer):
auto_commit_every_n=auto_commit_every_n,
auto_commit_every_t=auto_commit_every_t)
+ if max_buffer_size is not None and buffer_size > max_buffer_size:
+ raise ValueError("buffer_size (%d) is greater than "
+ "max_buffer_size (%d)" %
+ (buffer_size, max_buffer_size))
+ self.buffer_size = buffer_size
+ self.max_buffer_size = max_buffer_size
+ self.partition_info = False # Do not return partition info in msgs
+ self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
+ self.fetch_min_bytes = fetch_size_bytes
+ self.fetch_offsets = self.offsets.copy()
+ self.iter_timeout = iter_timeout
+ self.queue = Queue()
+
+ def __repr__(self):
+ return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
+ (self.group, self.topic, str(self.offsets.keys()))
+
def provide_partition_info(self):
"""
Indicates that partition info must be returned by the consumer
@@ -265,12 +293,6 @@ class SimpleConsumer(Consumer):
reqs.append(OffsetRequest(self.topic, partition, -2, 1))
elif whence == 2:
reqs.append(OffsetRequest(self.topic, partition, -1, 1))
-
- # The API returns back the next available offset
- # For eg: if the current offset is 18, the API will return
- # back 19. So, if we have to seek 5 points before, we will
- # end up going back to 14, instead of 13. Adjust this
- deltas[partition] -= 1
else:
pass
@@ -281,128 +303,148 @@ class SimpleConsumer(Consumer):
else:
raise ValueError("Unexpected value for `whence`, %d" % whence)
+ # Reset queue and fetch offsets since they are invalid
+ self.fetch_offsets = self.offsets.copy()
+ self.queue = Queue()
+
def get_messages(self, count=1, block=True, timeout=0.1):
"""
Fetch the specified number of messages
count: Indicates the maximum number of messages to be fetched
block: If True, the API will block till some messages are fetched.
- timeout: If None, and block=True, the API will block infinitely.
- If >0, API will block for specified time (in seconds)
+ timeout: If block is True, the function will block for the specified
+ time (in seconds) until count messages is fetched. If None,
+ it will block forever.
"""
messages = []
- iterator = self.__iter__()
-
- # HACK: This splits the timeout between available partitions
- timeout = timeout * 1.0 / len(self.offsets)
-
- with FetchContext(self, block, timeout):
- while count > 0:
- try:
- messages.append(next(iterator))
- except StopIteration:
- break
+ if timeout is not None:
+ max_time = time.time() + timeout
+
+ new_offsets = {}
+ while count > 0 and (timeout is None or timeout > 0):
+ result = self._get_message(block, timeout, get_partition_info=True,
+ update_offset=False)
+ if result:
+ partition, message = result
+ if self.partition_info:
+ messages.append(result)
+ else:
+ messages.append(message)
+ new_offsets[partition] = message.offset + 1
count -= 1
-
+ else:
+ # Ran out of messages for the last request.
+ if not block:
+ # If we're not blocking, break.
+ break
+ if timeout is not None:
+ # If we're blocking and have a timeout, reduce it to the
+ # appropriate value
+ timeout = max_time - time.time()
+
+ # Update and commit offsets if necessary
+ self.offsets.update(new_offsets)
+ self.count_since_commit += len(messages)
+ self._auto_commit()
return messages
- def __iter__(self):
+ def get_message(self, block=True, timeout=0.1, get_partition_info=None):
+ return self._get_message(block, timeout, get_partition_info)
+
+ def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
+ update_offset=True):
"""
- Create an iterate per partition. Iterate through them calling next()
- until they are all exhausted.
+ If no messages can be fetched, returns None.
+ If get_partition_info is None, it defaults to self.partition_info
+ If get_partition_info is True, returns (partition, message)
+ If get_partition_info is False, returns message
"""
- iters = {}
- for partition, offset in self.offsets.items():
- iters[partition] = self.__iter_partition__(partition, offset)
-
- if len(iters) == 0:
- return
-
- while True:
- if len(iters) == 0:
- break
-
- for partition, it in iters.items():
- try:
- if self.partition_info:
- yield (partition, it.next())
- else:
- yield it.next()
- except StopIteration:
- log.debug("Done iterating over partition %s" % partition)
- del iters[partition]
+ if self.queue.empty():
+ # We're out of messages, go grab some more.
+ with FetchContext(self, block, timeout):
+ self._fetch()
+ try:
+ partition, message = self.queue.get_nowait()
- # skip auto-commit since we didn't yield anything
- continue
+ if update_offset:
+ # Update partition offset
+ self.offsets[partition] = message.offset + 1
# Count, check and commit messages if necessary
self.count_since_commit += 1
self._auto_commit()
- def __iter_partition__(self, partition, offset):
- """
- Iterate over the messages in a partition. Create a FetchRequest
- to get back a batch of messages, yield them one at a time.
- After a batch is exhausted, start a new batch unless we've reached
- the end of this partition.
- """
-
- # The offset that is stored in the consumer is the offset that
- # we have consumed. In subsequent iterations, we are supposed to
- # fetch the next message (that is from the next offset)
- # However, for the 0th message, the offset should be as-is.
- # An OffsetFetchRequest to Kafka gives 0 for a new queue. This is
- # problematic, since 0 is offset of a message which we have not yet
- # consumed.
- if self.fetch_started[partition]:
- offset += 1
+ if get_partition_info is None:
+ get_partition_info = self.partition_info
+ if get_partition_info:
+ return partition, message
+ else:
+ return message
+ except Empty:
+ return None
- fetch_size = self.fetch_min_bytes
+ def __iter__(self):
+ if self.iter_timeout is None:
+ timeout = ITER_TIMEOUT_SECONDS
+ else:
+ timeout = self.iter_timeout
while True:
- # use MaxBytes = client's bufsize since we're only
- # fetching one topic + partition
- req = FetchRequest(
- self.topic, partition, offset, self.client.bufsize)
-
- (resp,) = self.client.send_fetch_request(
- [req],
- max_wait_time=self.fetch_max_wait_time,
- min_bytes=fetch_size)
-
- assert resp.topic == self.topic
- assert resp.partition == partition
-
- next_offset = None
- try:
- for message in resp.messages:
- next_offset = message.offset
-
- # update the offset before the message is yielded. This
- # is so that the consumer state is not lost in certain
- # cases.
- #
- # For eg: the message is yielded and consumed by the
- # caller, but the caller does not come back into the
- # generator again. The message will be consumed but the
- # status will not be updated in the consumer
- self.fetch_started[partition] = True
- self.offsets[partition] = message.offset
- yield message
- except ConsumerFetchSizeTooSmall, e:
- fetch_size *= 1.5
- log.warn(
- "Fetch size too small, increasing to %d (1.5x) and retry",
- fetch_size)
- continue
- except ConsumerNoMoreData, e:
- log.debug("Iteration was ended by %r", e)
-
- if next_offset is None:
- break
+ message = self.get_message(True, timeout)
+ if message:
+ yield message
+ elif self.iter_timeout is None:
+ # We did not receive any message yet but we don't have a
+ # timeout, so give up the CPU for a while before trying again
+ time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
else:
- offset = next_offset + 1
+ # Timed out waiting for a message
+ break
+ def _fetch(self):
+ # Create fetch request payloads for all the partitions
+ requests = []
+ partitions = self.fetch_offsets.keys()
+ while partitions:
+ for partition in partitions:
+ requests.append(FetchRequest(self.topic, partition,
+ self.fetch_offsets[partition],
+ self.buffer_size))
+ # Send request
+ responses = self.client.send_fetch_request(
+ requests,
+ max_wait_time=int(self.fetch_max_wait_time),
+ min_bytes=self.fetch_min_bytes)
+
+ retry_partitions = set()
+ for resp in responses:
+ partition = resp.partition
+ try:
+ for message in resp.messages:
+ # Put the message in our queue
+ self.queue.put((partition, message))
+ self.fetch_offsets[partition] = message.offset + 1
+ except ConsumerFetchSizeTooSmall, e:
+ if (self.max_buffer_size is not None and
+ self.buffer_size == self.max_buffer_size):
+ log.error("Max fetch size %d too small",
+ self.max_buffer_size)
+ raise e
+ if self.max_buffer_size is None:
+ self.buffer_size *= 2
+ else:
+ self.buffer_size = max(self.buffer_size * 2,
+ self.max_buffer_size)
+ log.warn("Fetch size too small, increase to %d (2x) "
+ "and retry", self.buffer_size)
+ retry_partitions.add(partition)
+ except ConsumerNoMoreData, e:
+ log.debug("Iteration was ended by %r", e)
+ except StopIteration:
+ # Stop iterating through this partition
+ log.debug("Done iterating over partition %s" % partition)
+ partitions = retry_partitions
def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
"""
@@ -440,8 +482,9 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
# indicates a specific number of messages, follow that advice
count = 0
- for partition, message in consumer:
- queue.put((partition, message))
+ message = consumer.get_message()
+ if message:
+ queue.put(message)
count += 1
# We have reached the required size. The controller might have
@@ -451,12 +494,11 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
# can reset the 'start' event
if count == size.value:
pause.wait()
- break
- # In case we did not receive any message, give up the CPU for
- # a while before we try again
- if count == 0:
- time.sleep(0.1)
+ else:
+ # In case we did not receive any message, give up the CPU for
+ # a while before we try again
+ time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
consumer.stop()
@@ -501,7 +543,7 @@ class MultiProcessConsumer(Consumer):
# Variables for managing and controlling the data flow from
# consumer child process to master
- self.queue = Queue(1024) # Child consumers dump messages into this
+ self.queue = MPQueue(1024) # Child consumers dump messages into this
self.start = Event() # Indicates the consumers to start fetch
self.exit = Event() # Requests the consumers to shutdown
self.pause = Event() # Requests the consumers to pause fetch
@@ -535,6 +577,10 @@ class MultiProcessConsumer(Consumer):
proc.start()
self.procs.append(proc)
+ def __repr__(self):
+ return '<MultiProcessConsumer group=%s, topic=%s, consumers=%d>' % \
+ (self.group, self.topic, len(self.procs))
+
def stop(self):
# Set exit and start off all waiting consumers
self.exit.set()
@@ -568,12 +614,11 @@ class MultiProcessConsumer(Consumer):
break
# Count, check and commit messages if necessary
- self.offsets[partition] = message.offset
+ self.offsets[partition] = message.offset + 1
self.start.clear()
- yield message
-
self.count_since_commit += 1
self._auto_commit()
+ yield message
self.start.clear()
@@ -583,8 +628,9 @@ class MultiProcessConsumer(Consumer):
count: Indicates the maximum number of messages to be fetched
block: If True, the API will block till some messages are fetched.
- timeout: If None, and block=True, the API will block infinitely.
- If >0, API will block for specified time (in seconds)
+ timeout: If block is True, the function will block for the specified
+ time (in seconds) until count messages is fetched. If None,
+ it will block forever.
"""
messages = []
@@ -595,7 +641,11 @@ class MultiProcessConsumer(Consumer):
self.size.value = count
self.pause.clear()
- while count > 0:
+ if timeout is not None:
+ max_time = time.time() + timeout
+
+ new_offsets = {}
+ while count > 0 and (timeout is None or timeout > 0):
# Trigger consumption only if the queue is empty
# By doing this, we will ensure that consumers do not
# go into overdrive and keep consuming thousands of
@@ -609,15 +659,18 @@ class MultiProcessConsumer(Consumer):
break
messages.append(message)
-
- # Count, check and commit messages if necessary
- self.offsets[partition] = message.offset
- self.count_since_commit += 1
- self._auto_commit()
+ new_offsets[partition] = message.offset + 1
count -= 1
+ if timeout is not None:
+ timeout = max_time - time.time()
self.size.value = 0
self.start.clear()
self.pause.set()
+ # Update and commit offsets if necessary
+ self.offsets.update(new_offsets)
+ self.count_since_commit += len(messages)
+ self._auto_commit()
+
return messages
diff --git a/kafka/producer.py b/kafka/producer.py
index 7ef7896..12a2934 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -1,15 +1,16 @@
+from __future__ import absolute_import
+
+import logging
+import time
+
+from Queue import Empty
from collections import defaultdict
-from datetime import datetime, timedelta
from itertools import cycle
from multiprocessing import Queue, Process
-from Queue import Empty
-import logging
-import sys
-from kafka.common import ProduceRequest
-from kafka.common import FailedPayloadsException
-from kafka.protocol import create_message
+from kafka.common import ProduceRequest, TopicAndPartition
from kafka.partitioner import HashedPartitioner
+from kafka.protocol import create_message
log = logging.getLogger("kafka")
@@ -19,7 +20,7 @@ BATCH_SEND_MSG_COUNT = 20
STOP_ASYNC_PRODUCER = -1
-def _send_upstream(topic, queue, client, batch_time, batch_size,
+def _send_upstream(queue, client, batch_time, batch_size,
req_acks, ack_timeout):
"""
Listen on the queue for a specified number of messages or till
@@ -36,38 +37,41 @@ def _send_upstream(topic, queue, client, batch_time, batch_size,
while not stop:
timeout = batch_time
count = batch_size
- send_at = datetime.now() + timedelta(seconds=timeout)
+ send_at = time.time() + timeout
msgset = defaultdict(list)
# Keep fetching till we gather enough messages or a
# timeout is reached
while count > 0 and timeout >= 0:
try:
- partition, msg = queue.get(timeout=timeout)
+ topic_partition, msg = queue.get(timeout=timeout)
+
except Empty:
break
# Check if the controller has requested us to stop
- if partition == STOP_ASYNC_PRODUCER:
+ if topic_partition == STOP_ASYNC_PRODUCER:
stop = True
break
# Adjust the timeout to match the remaining period
count -= 1
- timeout = (send_at - datetime.now()).total_seconds()
- msgset[partition].append(msg)
+ timeout = send_at - time.time()
+ msgset[topic_partition].append(msg)
# Send collected requests upstream
reqs = []
- for partition, messages in msgset.items():
- req = ProduceRequest(topic, partition, messages)
+ for topic_partition, messages in msgset.items():
+ req = ProduceRequest(topic_partition.topic,
+ topic_partition.partition,
+ messages)
reqs.append(req)
try:
client.send_produce_request(reqs,
acks=req_acks,
timeout=ack_timeout)
- except Exception as exp:
+ except Exception:
log.exception("Unable to send message")
@@ -77,7 +81,6 @@ class Producer(object):
Params:
client - The Kafka client instance to use
- topic - The topic for sending messages to
async - If set to true, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
req_acks - A value indicating the acknowledgements that the server must
@@ -118,8 +121,7 @@ class Producer(object):
if self.async:
self.queue = Queue() # Messages are sent through this queue
self.proc = Process(target=_send_upstream,
- args=(self.topic,
- self.queue,
+ args=(self.queue,
self.client.copy(),
batch_send_every_t,
batch_send_every_n,
@@ -130,23 +132,24 @@ class Producer(object):
self.proc.daemon = True
self.proc.start()
- def send_messages(self, partition, *msg):
+ def send_messages(self, topic, partition, *msg):
"""
Helper method to send produce requests
"""
if self.async:
for m in msg:
- self.queue.put((partition, create_message(m)))
+ self.queue.put((TopicAndPartition(topic, partition),
+ create_message(m)))
resp = []
else:
messages = [create_message(m) for m in msg]
- req = ProduceRequest(self.topic, partition, messages)
+ req = ProduceRequest(topic, partition, messages)
try:
resp = self.client.send_produce_request([req], acks=self.req_acks,
timeout=self.ack_timeout)
- except Exception as e:
+ except Exception:
log.exception("Unable to send messages")
- raise e
+ raise
return resp
def stop(self, timeout=1):
@@ -168,7 +171,6 @@ class SimpleProducer(Producer):
Params:
client - The Kafka client instance to use
- topic - The topic for sending messages to
async - If True, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
req_acks - A value indicating the acknowledgements that the server must
@@ -179,24 +181,31 @@ class SimpleProducer(Producer):
batch_send_every_n - If set, messages are send in batches of this size
batch_send_every_t - If set, messages are send after this timeout
"""
- def __init__(self, client, topic, async=False,
+ def __init__(self, client, async=False,
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
- self.topic = topic
- client._load_metadata_for_topics(topic)
- self.next_partition = cycle(client.topic_partitions[topic])
-
+ self.partition_cycles = {}
super(SimpleProducer, self).__init__(client, async, req_acks,
ack_timeout, batch_send,
batch_send_every_n,
batch_send_every_t)
- def send_messages(self, *msg):
- partition = self.next_partition.next()
- return super(SimpleProducer, self).send_messages(partition, *msg)
+ def _next_partition(self, topic):
+ if topic not in self.partition_cycles:
+ if topic not in self.client.topic_partitions:
+ self.client.load_metadata_for_topics(topic)
+ self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
+ return self.partition_cycles[topic].next()
+
+ def send_messages(self, topic, *msg):
+ partition = self._next_partition(topic)
+ return super(SimpleProducer, self).send_messages(topic, partition, *msg)
+
+ def __repr__(self):
+ return '<SimpleProducer batch=%s>' % self.async
class KeyedProducer(Producer):
@@ -205,7 +214,6 @@ class KeyedProducer(Producer):
Args:
client - The kafka client instance
- topic - The kafka topic to send messages to
partitioner - A partitioner class that will be used to get the partition
to send the message to. Must be derived from Partitioner
async - If True, the messages are sent asynchronously via another
@@ -216,26 +224,34 @@ class KeyedProducer(Producer):
batch_send_every_n - If set, messages are send in batches of this size
batch_send_every_t - If set, messages are send after this timeout
"""
- def __init__(self, client, topic, partitioner=None, async=False,
+ def __init__(self, client, partitioner=None, async=False,
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
- self.topic = topic
- client._load_metadata_for_topics(topic)
-
if not partitioner:
partitioner = HashedPartitioner
-
- self.partitioner = partitioner(client.topic_partitions[topic])
+ self.partitioner_class = partitioner
+ self.partitioners = {}
super(KeyedProducer, self).__init__(client, async, req_acks,
ack_timeout, batch_send,
batch_send_every_n,
batch_send_every_t)
- def send(self, key, msg):
- partitions = self.client.topic_partitions[self.topic]
- partition = self.partitioner.partition(key, partitions)
- return self.send_messages(partition, msg)
+ def _next_partition(self, topic, key):
+ if topic not in self.partitioners:
+ if topic not in self.client.topic_partitions:
+ self.client.load_metadata_for_topics(topic)
+ self.partitioners[topic] = \
+ self.partitioner_class(self.client.topic_partitions[topic])
+ partitioner = self.partitioners[topic]
+ return partitioner.partition(key, self.client.topic_partitions[topic])
+
+ def send(self, topic, key, msg):
+ partition = self._next_partition(topic, key)
+ return self.send_messages(topic, partition, msg)
+
+ def __repr__(self):
+ return '<KeyedProducer batch=%s>' % self.async
diff --git a/kafka/protocol.py b/kafka/protocol.py
index 612acf6..25be023 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -29,8 +29,8 @@ class KafkaProtocol(object):
FETCH_KEY = 1
OFFSET_KEY = 2
METADATA_KEY = 3
- OFFSET_COMMIT_KEY = 6
- OFFSET_FETCH_KEY = 7
+ OFFSET_COMMIT_KEY = 8
+ OFFSET_FETCH_KEY = 9
ATTRIBUTE_CODEC_MASK = 0x03
CODEC_NONE = 0x00
@@ -119,9 +119,17 @@ class KafkaProtocol(object):
read_message = True
yield OffsetAndMessage(offset, message)
except BufferUnderflowError:
+ # NOTE: Not sure this is correct error handling:
+ # Is it possible to get a BUE if the message set is somewhere
+ # in the middle of the fetch response? If so, we probably have
+ # an issue that's not fetch size too small.
+ # Aren't we ignoring errors if we fail to unpack data by
+ # raising StopIteration()?
+ # If _decode_message() raises a ChecksumError, couldn't that
+ # also be due to the fetch size being too small?
if read_message is False:
# If we get a partial read of a message, but haven't
- # yielded anyhting there's a problem
+ # yielded anything there's a problem
raise ConsumerFetchSizeTooSmall()
else:
raise StopIteration()
@@ -171,7 +179,7 @@ class KafkaProtocol(object):
Params
======
client_id: string
- correlation_id: string
+ correlation_id: int
payloads: list of ProduceRequest
acks: How "acky" you want the request to be
0: immediate response
@@ -231,7 +239,7 @@ class KafkaProtocol(object):
Params
======
client_id: string
- correlation_id: string
+ correlation_id: int
payloads: list of FetchRequest
max_wait_time: int, how long to block waiting on min_bytes of data
min_bytes: int, the minimum number of bytes to accumulate before
@@ -338,7 +346,7 @@ class KafkaProtocol(object):
Params
======
client_id: string
- correlation_id: string
+ correlation_id: int
topics: list of strings
"""
topics = [] if topics is None else topics
@@ -376,12 +384,16 @@ class KafkaProtocol(object):
topic_metadata = {}
for i in range(num_topics):
+ # NOTE: topic_error is discarded. Should probably be returned with
+ # the topic metadata.
((topic_error,), cur) = relative_unpack('>h', data, cur)
(topic_name, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
partition_metadata = {}
for j in range(num_partitions):
+ # NOTE: partition_error_code is discarded. Should probably be
+ # returned with the partition metadata.
((partition_error_code, partition, leader, numReplicas), cur) = \
relative_unpack('>hiii', data, cur)
@@ -408,7 +420,7 @@ class KafkaProtocol(object):
Params
======
client_id: string
- correlation_id: string
+ correlation_id: int
group: string, the consumer group you are committing offsets for
payloads: list of OffsetCommitRequest
"""
@@ -439,7 +451,6 @@ class KafkaProtocol(object):
data: bytes to decode
"""
((correlation_id,), cur) = relative_unpack('>i', data, 0)
- (client_id, cur) = read_short_string(data, cur)
((num_topics,), cur) = relative_unpack('>i', data, cur)
for i in xrange(num_topics):
@@ -459,7 +470,7 @@ class KafkaProtocol(object):
Params
======
client_id: string
- correlation_id: string
+ correlation_id: int
group: string, the consumer group you are fetching offsets for
payloads: list of OffsetFetchRequest
"""
@@ -490,7 +501,6 @@ class KafkaProtocol(object):
"""
((correlation_id,), cur) = relative_unpack('>i', data, 0)
- (client_id, cur) = read_short_string(data, cur)
((num_topics,), cur) = relative_unpack('>i', data, cur)
for i in range(num_topics):
diff --git a/kafka/queue.py b/kafka/queue.py
index a996369..ada495f 100644
--- a/kafka/queue.py
+++ b/kafka/queue.py
@@ -1,3 +1,5 @@
+from __future__ import absolute_import
+
from copy import copy
import logging
from multiprocessing import Process, Queue, Event
diff --git a/setup.py b/setup.py
index fedf139..4176135 100644
--- a/setup.py
+++ b/setup.py
@@ -20,9 +20,9 @@ class Tox(Command):
setup(
name="kafka-python",
- version="0.8.1-1",
+ version="0.9.0",
- install_requires=["distribute", "tox", "mock"],
+ install_requires=["distribute"],
tests_require=["tox"],
cmdclass={"test": Tox},
diff --git a/test/fixtures.py b/test/fixtures.py
index c771a58..9e283d3 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -208,9 +208,12 @@ class ZookeeperFixture(object):
self.tmp_dir = None
self.child = None
+ def out(self, message):
+ print("*** Zookeeper [%s:%d]: %s" % (self.host, self.port, message))
+
def open(self):
self.tmp_dir = tempfile.mkdtemp()
- print("*** Running local Zookeeper instance...")
+ self.out("Running local instance...")
print(" host = %s" % self.host)
print(" port = %s" % self.port)
print(" tmp_dir = %s" % self.tmp_dir)
@@ -229,16 +232,16 @@ class ZookeeperFixture(object):
self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt"))
# Party!
- print("*** Starting Zookeeper...")
+ self.out("Starting...")
self.child.start()
self.child.wait_for(r"Snapshotting")
- print("*** Done!")
+ self.out("Done!")
def close(self):
- print("*** Stopping Zookeeper...")
+ self.out("Stopping...")
self.child.stop()
self.child = None
- print("*** Done!")
+ self.out("Done!")
shutil.rmtree(self.tmp_dir)
@@ -272,10 +275,18 @@ class KafkaFixture(object):
self.tmp_dir = None
self.child = None
+ self.running = False
+
+ def out(self, message):
+ print("*** Kafka [%s:%d]: %s" % (self.host, self.port, message))
def open(self):
+ if self.running:
+ self.out("Instance already running")
+ return
+
self.tmp_dir = tempfile.mkdtemp()
- print("*** Running local Kafka instance")
+ self.out("Running local instance...")
print(" host = %s" % self.host)
print(" port = %s" % self.port)
print(" broker_id = %s" % self.broker_id)
@@ -303,25 +314,31 @@ class KafkaFixture(object):
self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt"))
# Party!
- print("*** Creating Zookeeper chroot node...")
+ self.out("Creating Zookeeper chroot node...")
proc = subprocess.Popen(kafka_run_class_args(
"org.apache.zookeeper.ZooKeeperMain",
"-server", "%s:%d" % (self.zk_host, self.zk_port),
"create", "/%s" % self.zk_chroot, "kafka-python"
))
if proc.wait() != 0:
- print("*** Failed to create Zookeeper chroot node")
+ self.out("Failed to create Zookeeper chroot node")
raise RuntimeError("Failed to create Zookeeper chroot node")
- print("*** Done!")
+ self.out("Done!")
- print("*** Starting Kafka...")
+ self.out("Starting...")
self.child.start()
self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id)
- print("*** Done!")
+ self.out("Done!")
+ self.running = True
def close(self):
- print("*** Stopping Kafka...")
+ if not self.running:
+ self.out("Instance already stopped")
+ return
+
+ self.out("Stopping...")
self.child.stop()
self.child = None
- print("*** Done!")
+ self.out("Done!")
shutil.rmtree(self.tmp_dir)
+ self.running = False
diff --git a/test/test_integration.py b/test/test_integration.py
index 1f37ebf..000f44a 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -8,10 +8,36 @@ import random
from kafka import * # noqa
from kafka.common import * # noqa
from kafka.codec import has_gzip, has_snappy
+from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
from .fixtures import ZookeeperFixture, KafkaFixture
-class TestKafkaClient(unittest.TestCase):
+def random_string(l):
+ s = "".join(random.choice(string.letters) for i in xrange(l))
+ return s
+
+
+def ensure_topic_creation(client, topic_name):
+ times = 0
+ while True:
+ times += 1
+ client.load_metadata_for_topics(topic_name)
+ if client.has_metadata_for_topic(topic_name):
+ break
+ print "Waiting for %s topic to be created" % topic_name
+ time.sleep(1)
+
+ if times > 30:
+ raise Exception("Unable to create topic %s" % topic_name)
+
+
+class KafkaTestCase(unittest.TestCase):
+ def setUp(self):
+ self.topic = "%s-%s" % (self.id()[self.id().rindex(".")+1:], random_string(10))
+ ensure_topic_creation(self.client, self.topic)
+
+
+class TestKafkaClient(KafkaTestCase):
@classmethod
def setUpClass(cls): # noqa
cls.zk = ZookeeperFixture.instance()
@@ -29,7 +55,8 @@ class TestKafkaClient(unittest.TestCase):
#####################
def test_produce_many_simple(self):
- produce = ProduceRequest("test_produce_many_simple", 0, messages=[
+
+ produce = ProduceRequest(self.topic, 0, messages=[
create_message("Test message %d" % i) for i in range(100)
])
@@ -37,25 +64,25 @@ class TestKafkaClient(unittest.TestCase):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)])
+ (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)])
self.assertEquals(offset.offsets[0], 100)
for resp in self.client.send_produce_request([produce]):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 100)
- (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)])
+ (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)])
self.assertEquals(offset.offsets[0], 200)
for resp in self.client.send_produce_request([produce]):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 200)
- (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)])
+ (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)])
self.assertEquals(offset.offsets[0], 300)
def test_produce_10k_simple(self):
- produce = ProduceRequest("test_produce_10k_simple", 0, messages=[
+ produce = ProduceRequest(self.topic, 0, messages=[
create_message("Test message %d" % i) for i in range(10000)
])
@@ -63,7 +90,7 @@ class TestKafkaClient(unittest.TestCase):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_10k_simple", 0, -1, 1)])
+ (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)])
self.assertEquals(offset.offsets[0], 10000)
def test_produce_many_gzip(self):
@@ -72,13 +99,13 @@ class TestKafkaClient(unittest.TestCase):
message1 = create_gzip_message(["Gzipped 1 %d" % i for i in range(100)])
message2 = create_gzip_message(["Gzipped 2 %d" % i for i in range(100)])
- produce = ProduceRequest("test_produce_many_gzip", 0, messages=[message1, message2])
+ produce = ProduceRequest(self.topic, 0, messages=[message1, message2])
for resp in self.client.send_produce_request([produce]):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_gzip", 0, -1, 1)])
+ (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)])
self.assertEquals(offset.offsets[0], 200)
def test_produce_many_snappy(self):
@@ -87,13 +114,13 @@ class TestKafkaClient(unittest.TestCase):
message1 = create_snappy_message(["Snappy 1 %d" % i for i in range(100)])
message2 = create_snappy_message(["Snappy 2 %d" % i for i in range(100)])
- produce = ProduceRequest("test_produce_many_snappy", 0, messages=[message1, message2])
+ produce = ProduceRequest(self.topic, 0, messages=[message1, message2])
for resp in self.client.send_produce_request([produce]):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_snappy", 0, -1, 1)])
+ (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)])
self.assertEquals(offset.offsets[0], 200)
def test_produce_mixed(self):
@@ -103,17 +130,17 @@ class TestKafkaClient(unittest.TestCase):
message2 = create_gzip_message(["Gzipped %d" % i for i in range(100)])
message3 = create_snappy_message(["Snappy %d" % i for i in range(100)])
- produce = ProduceRequest("test_produce_mixed", 0, messages=[message1, message2, message3])
+ produce = ProduceRequest(self.topic, 0, messages=[message1, message2, message3])
for resp in self.client.send_produce_request([produce]):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_mixed", 0, -1, 1)])
+ (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)])
self.assertEquals(offset.offsets[0], 201)
def test_produce_100k_gzipped(self):
- req1 = ProduceRequest("test_produce_100k_gzipped", 0, messages=[
+ req1 = ProduceRequest(self.topic, 0, messages=[
create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)])
])
@@ -121,10 +148,10 @@ class TestKafkaClient(unittest.TestCase):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_100k_gzipped", 0, -1, 1)])
+ (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)])
self.assertEquals(offset.offsets[0], 50000)
- req2 = ProduceRequest("test_produce_100k_gzipped", 0, messages=[
+ req2 = ProduceRequest(self.topic, 0, messages=[
create_gzip_message(["Gzipped batch 2, message %d" % i for i in range(50000)])
])
@@ -132,7 +159,7 @@ class TestKafkaClient(unittest.TestCase):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 50000)
- (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_100k_gzipped", 0, -1, 1)])
+ (offset, ) = self.client.send_offset_request([OffsetRequest(self.topic, 0, -1, 1)])
self.assertEquals(offset.offsets[0], 100000)
#####################
@@ -140,18 +167,18 @@ class TestKafkaClient(unittest.TestCase):
#####################
def test_consume_none(self):
- fetch = FetchRequest("test_consume_none", 0, 0, 1024)
+ fetch = FetchRequest(self.topic, 0, 0, 1024)
fetch_resp = self.client.send_fetch_request([fetch])[0]
self.assertEquals(fetch_resp.error, 0)
- self.assertEquals(fetch_resp.topic, "test_consume_none")
+ self.assertEquals(fetch_resp.topic, self.topic)
self.assertEquals(fetch_resp.partition, 0)
messages = list(fetch_resp.messages)
self.assertEquals(len(messages), 0)
def test_produce_consume(self):
- produce = ProduceRequest("test_produce_consume", 0, messages=[
+ produce = ProduceRequest(self.topic, 0, messages=[
create_message("Just a test message"),
create_message("Message with a key", "foo"),
])
@@ -160,7 +187,7 @@ class TestKafkaClient(unittest.TestCase):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- fetch = FetchRequest("test_produce_consume", 0, 0, 1024)
+ fetch = FetchRequest(self.topic, 0, 0, 1024)
fetch_resp = self.client.send_fetch_request([fetch])[0]
self.assertEquals(fetch_resp.error, 0)
@@ -175,7 +202,7 @@ class TestKafkaClient(unittest.TestCase):
self.assertEquals(messages[1].message.key, "foo")
def test_produce_consume_many(self):
- produce = ProduceRequest("test_produce_consume_many", 0, messages=[
+ produce = ProduceRequest(self.topic, 0, messages=[
create_message("Test message %d" % i) for i in range(100)
])
@@ -184,7 +211,7 @@ class TestKafkaClient(unittest.TestCase):
self.assertEquals(resp.offset, 0)
# 1024 is not enough for 100 messages...
- fetch1 = FetchRequest("test_produce_consume_many", 0, 0, 1024)
+ fetch1 = FetchRequest(self.topic, 0, 0, 1024)
(fetch_resp1,) = self.client.send_fetch_request([fetch1])
@@ -194,7 +221,7 @@ class TestKafkaClient(unittest.TestCase):
self.assertTrue(len(messages) < 100)
# 10240 should be enough
- fetch2 = FetchRequest("test_produce_consume_many", 0, 0, 10240)
+ fetch2 = FetchRequest(self.topic, 0, 0, 10240)
(fetch_resp2,) = self.client.send_fetch_request([fetch2])
self.assertEquals(fetch_resp2.error, 0)
@@ -207,10 +234,10 @@ class TestKafkaClient(unittest.TestCase):
self.assertEquals(message.message.key, None)
def test_produce_consume_two_partitions(self):
- produce1 = ProduceRequest("test_produce_consume_two_partitions", 0, messages=[
+ produce1 = ProduceRequest(self.topic, 0, messages=[
create_message("Partition 0 %d" % i) for i in range(10)
])
- produce2 = ProduceRequest("test_produce_consume_two_partitions", 1, messages=[
+ produce2 = ProduceRequest(self.topic, 1, messages=[
create_message("Partition 1 %d" % i) for i in range(10)
])
@@ -218,8 +245,8 @@ class TestKafkaClient(unittest.TestCase):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- fetch1 = FetchRequest("test_produce_consume_two_partitions", 0, 0, 1024)
- fetch2 = FetchRequest("test_produce_consume_two_partitions", 1, 0, 1024)
+ fetch1 = FetchRequest(self.topic, 0, 0, 1024)
+ fetch2 = FetchRequest(self.topic, 1, 0, 1024)
fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2])
self.assertEquals(fetch_resp1.error, 0)
self.assertEquals(fetch_resp1.highwaterMark, 10)
@@ -244,11 +271,11 @@ class TestKafkaClient(unittest.TestCase):
@unittest.skip('commmit offset not supported in this version')
def test_commit_fetch_offsets(self):
- req = OffsetCommitRequest("test_commit_fetch_offsets", 0, 42, "metadata")
+ req = OffsetCommitRequest(self.topic, 0, 42, "metadata")
(resp,) = self.client.send_offset_commit_request("group", [req])
self.assertEquals(resp.error, 0)
- req = OffsetFetchRequest("test_commit_fetch_offsets", 0)
+ req = OffsetFetchRequest(self.topic, 0)
(resp,) = self.client.send_offset_fetch_request("group", [req])
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 42)
@@ -257,8 +284,8 @@ class TestKafkaClient(unittest.TestCase):
# Producer Tests
def test_simple_producer(self):
- producer = SimpleProducer(self.client, "test_simple_producer")
- resp = producer.send_messages("one", "two")
+ producer = SimpleProducer(self.client)
+ resp = producer.send_messages(self.topic, "one", "two")
# Will go to partition 0
self.assertEquals(len(resp), 1)
@@ -266,13 +293,13 @@ class TestKafkaClient(unittest.TestCase):
self.assertEquals(resp[0].offset, 0) # offset of first msg
# Will go to partition 1
- resp = producer.send_messages("three")
+ resp = producer.send_messages(self.topic, "three")
self.assertEquals(len(resp), 1)
self.assertEquals(resp[0].error, 0)
self.assertEquals(resp[0].offset, 0) # offset of first msg
- fetch1 = FetchRequest("test_simple_producer", 0, 0, 1024)
- fetch2 = FetchRequest("test_simple_producer", 1, 0, 1024)
+ fetch1 = FetchRequest(self.topic, 0, 0, 1024)
+ fetch2 = FetchRequest(self.topic, 1, 0, 1024)
fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
fetch2])
self.assertEquals(fetch_resp1.error, 0)
@@ -288,7 +315,7 @@ class TestKafkaClient(unittest.TestCase):
self.assertEquals(messages[0].message.value, "three")
# Will go to partition 0
- resp = producer.send_messages("four", "five")
+ resp = producer.send_messages(self.topic, "four", "five")
self.assertEquals(len(resp), 1)
self.assertEquals(resp[0].error, 0)
self.assertEquals(resp[0].offset, 2) # offset of first msg
@@ -296,15 +323,15 @@ class TestKafkaClient(unittest.TestCase):
producer.stop()
def test_round_robin_partitioner(self):
- producer = KeyedProducer(self.client, "test_round_robin_partitioner",
+ producer = KeyedProducer(self.client,
partitioner=RoundRobinPartitioner)
- producer.send("key1", "one")
- producer.send("key2", "two")
- producer.send("key3", "three")
- producer.send("key4", "four")
+ producer.send(self.topic, "key1", "one")
+ producer.send(self.topic, "key2", "two")
+ producer.send(self.topic, "key3", "three")
+ producer.send(self.topic, "key4", "four")
- fetch1 = FetchRequest("test_round_robin_partitioner", 0, 0, 1024)
- fetch2 = FetchRequest("test_round_robin_partitioner", 1, 0, 1024)
+ fetch1 = FetchRequest(self.topic, 0, 0, 1024)
+ fetch2 = FetchRequest(self.topic, 1, 0, 1024)
fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
fetch2])
@@ -330,15 +357,15 @@ class TestKafkaClient(unittest.TestCase):
producer.stop()
def test_hashed_partitioner(self):
- producer = KeyedProducer(self.client, "test_hash_partitioner",
+ producer = KeyedProducer(self.client,
partitioner=HashedPartitioner)
- producer.send(1, "one")
- producer.send(2, "two")
- producer.send(3, "three")
- producer.send(4, "four")
+ producer.send(self.topic, 1, "one")
+ producer.send(self.topic, 2, "two")
+ producer.send(self.topic, 3, "three")
+ producer.send(self.topic, 4, "four")
- fetch1 = FetchRequest("test_hash_partitioner", 0, 0, 1024)
- fetch2 = FetchRequest("test_hash_partitioner", 1, 0, 1024)
+ fetch1 = FetchRequest(self.topic, 0, 0, 1024)
+ fetch2 = FetchRequest(self.topic, 1, 0, 1024)
fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
fetch2])
@@ -364,12 +391,12 @@ class TestKafkaClient(unittest.TestCase):
producer.stop()
def test_acks_none(self):
- producer = SimpleProducer(self.client, "test_acks_none",
+ producer = SimpleProducer(self.client,
req_acks=SimpleProducer.ACK_NOT_REQUIRED)
- resp = producer.send_messages("one")
+ resp = producer.send_messages(self.topic, "one")
self.assertEquals(len(resp), 0)
- fetch = FetchRequest("test_acks_none", 0, 0, 1024)
+ fetch = FetchRequest(self.topic, 0, 0, 1024)
fetch_resp = self.client.send_fetch_request([fetch])
self.assertEquals(fetch_resp[0].error, 0)
@@ -383,12 +410,12 @@ class TestKafkaClient(unittest.TestCase):
producer.stop()
def test_acks_local_write(self):
- producer = SimpleProducer(self.client, "test_acks_local_write",
+ producer = SimpleProducer(self.client,
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)
- resp = producer.send_messages("one")
+ resp = producer.send_messages(self.topic, "one")
self.assertEquals(len(resp), 1)
- fetch = FetchRequest("test_acks_local_write", 0, 0, 1024)
+ fetch = FetchRequest(self.topic, 0, 0, 1024)
fetch_resp = self.client.send_fetch_request([fetch])
self.assertEquals(fetch_resp[0].error, 0)
@@ -403,12 +430,12 @@ class TestKafkaClient(unittest.TestCase):
def test_acks_cluster_commit(self):
producer = SimpleProducer(
- self.client, "test_acks_cluster_commit",
+ self.client,
req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT)
- resp = producer.send_messages("one")
+ resp = producer.send_messages(self.topic, "one")
self.assertEquals(len(resp), 1)
- fetch = FetchRequest("test_acks_cluster_commit", 0, 0, 1024)
+ fetch = FetchRequest(self.topic, 0, 0, 1024)
fetch_resp = self.client.send_fetch_request([fetch])
self.assertEquals(fetch_resp[0].error, 0)
@@ -422,16 +449,14 @@ class TestKafkaClient(unittest.TestCase):
producer.stop()
def test_async_simple_producer(self):
- producer = SimpleProducer(self.client, "test_async_simple_producer",
- async=True)
-
- resp = producer.send_messages("one")
+ producer = SimpleProducer(self.client, async=True)
+ resp = producer.send_messages(self.topic, "one")
self.assertEquals(len(resp), 0)
# Give it some time
time.sleep(2)
- fetch = FetchRequest("test_async_simple_producer", 0, 0, 1024)
+ fetch = FetchRequest(self.topic, 0, 0, 1024)
fetch_resp = self.client.send_fetch_request([fetch])
self.assertEquals(fetch_resp[0].error, 0)
@@ -445,16 +470,15 @@ class TestKafkaClient(unittest.TestCase):
producer.stop()
def test_async_keyed_producer(self):
- producer = KeyedProducer(self.client, "test_async_keyed_producer",
- async=True)
+ producer = KeyedProducer(self.client, async=True)
- resp = producer.send("key1", "one")
+ resp = producer.send(self.topic, "key1", "one")
self.assertEquals(len(resp), 0)
# Give it some time
time.sleep(2)
- fetch = FetchRequest("test_async_keyed_producer", 0, 0, 1024)
+ fetch = FetchRequest(self.topic, 0, 0, 1024)
fetch_resp = self.client.send_fetch_request([fetch])
self.assertEquals(fetch_resp[0].error, 0)
@@ -468,14 +492,14 @@ class TestKafkaClient(unittest.TestCase):
producer.stop()
def test_batched_simple_producer(self):
- producer = SimpleProducer(self.client, "test_batched_simple_producer",
+ producer = SimpleProducer(self.client,
batch_send=True,
batch_send_every_n=10,
batch_send_every_t=20)
# Send 5 messages and do a fetch
msgs = ["message-%d" % i for i in range(0, 5)]
- resp = producer.send_messages(*msgs)
+ resp = producer.send_messages(self.topic, *msgs)
# Batch mode is async. No ack
self.assertEquals(len(resp), 0)
@@ -483,8 +507,8 @@ class TestKafkaClient(unittest.TestCase):
# Give it some time
time.sleep(2)
- fetch1 = FetchRequest("test_batched_simple_producer", 0, 0, 1024)
- fetch2 = FetchRequest("test_batched_simple_producer", 1, 0, 1024)
+ fetch1 = FetchRequest(self.topic, 0, 0, 1024)
+ fetch2 = FetchRequest(self.topic, 1, 0, 1024)
fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
fetch2])
@@ -498,13 +522,13 @@ class TestKafkaClient(unittest.TestCase):
# Send 5 more messages, wait for 2 seconds and do a fetch
msgs = ["message-%d" % i for i in range(5, 10)]
- resp = producer.send_messages(*msgs)
+ resp = producer.send_messages(self.topic, *msgs)
# Give it some time
time.sleep(2)
- fetch1 = FetchRequest("test_batched_simple_producer", 0, 0, 1024)
- fetch2 = FetchRequest("test_batched_simple_producer", 1, 0, 1024)
+ fetch1 = FetchRequest(self.topic, 0, 0, 1024)
+ fetch2 = FetchRequest(self.topic, 1, 0, 1024)
fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
fetch2])
@@ -518,12 +542,12 @@ class TestKafkaClient(unittest.TestCase):
# Send 7 messages and wait for 20 seconds
msgs = ["message-%d" % i for i in range(10, 15)]
- resp = producer.send_messages(*msgs)
+ resp = producer.send_messages(self.topic, *msgs)
msgs = ["message-%d" % i for i in range(15, 17)]
- resp = producer.send_messages(*msgs)
+ resp = producer.send_messages(self.topic, *msgs)
- fetch1 = FetchRequest("test_batched_simple_producer", 0, 5, 1024)
- fetch2 = FetchRequest("test_batched_simple_producer", 1, 5, 1024)
+ fetch1 = FetchRequest(self.topic, 0, 5, 1024)
+ fetch2 = FetchRequest(self.topic, 1, 5, 1024)
fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
fetch2])
@@ -535,8 +559,8 @@ class TestKafkaClient(unittest.TestCase):
# Give it some time
time.sleep(22)
- fetch1 = FetchRequest("test_batched_simple_producer", 0, 5, 1024)
- fetch2 = FetchRequest("test_batched_simple_producer", 1, 5, 1024)
+ fetch1 = FetchRequest(self.topic, 0, 5, 1024)
+ fetch2 = FetchRequest(self.topic, 1, 5, 1024)
fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
fetch2])
@@ -548,7 +572,7 @@ class TestKafkaClient(unittest.TestCase):
producer.stop()
-class TestConsumer(unittest.TestCase):
+class TestConsumer(KafkaTestCase):
@classmethod
def setUpClass(cls):
cls.zk = ZookeeperFixture.instance()
@@ -565,7 +589,7 @@ class TestConsumer(unittest.TestCase):
def test_simple_consumer(self):
# Produce 100 messages to partition 0
- produce1 = ProduceRequest("test_simple_consumer", 0, messages=[
+ produce1 = ProduceRequest(self.topic, 0, messages=[
create_message("Test message 0 %d" % i) for i in range(100)
])
@@ -574,7 +598,7 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 0)
# Produce 100 messages to partition 1
- produce2 = ProduceRequest("test_simple_consumer", 1, messages=[
+ produce2 = ProduceRequest(self.topic, 1, messages=[
create_message("Test message 1 %d" % i) for i in range(100)
])
@@ -583,7 +607,9 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 0)
# Start a consumer
- consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer", auto_commit=False)
+ consumer = SimpleConsumer(self.client, "group1",
+ self.topic, auto_commit=False,
+ iter_timeout=0)
all_messages = []
for message in consumer:
all_messages.append(message)
@@ -609,7 +635,9 @@ class TestConsumer(unittest.TestCase):
consumer.stop()
def test_simple_consumer_blocking(self):
- consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer_blocking", auto_commit=False)
+ consumer = SimpleConsumer(self.client, "group1",
+ self.topic,
+ auto_commit=False, iter_timeout=0)
# Blocking API
start = datetime.now()
@@ -619,7 +647,7 @@ class TestConsumer(unittest.TestCase):
self.assertEqual(len(messages), 0)
# Send 10 messages
- produce = ProduceRequest("test_simple_consumer_blocking", 0, messages=[
+ produce = ProduceRequest(self.topic, 0, messages=[
create_message("Test message 0 %d" % i) for i in range(10)
])
@@ -643,21 +671,22 @@ class TestConsumer(unittest.TestCase):
def test_simple_consumer_pending(self):
# Produce 10 messages to partition 0 and 1
- produce1 = ProduceRequest("test_simple_pending", 0, messages=[
+ produce1 = ProduceRequest(self.topic, 0, messages=[
create_message("Test message 0 %d" % i) for i in range(10)
])
for resp in self.client.send_produce_request([produce1]):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- produce2 = ProduceRequest("test_simple_pending", 1, messages=[
+ produce2 = ProduceRequest(self.topic, 1, messages=[
create_message("Test message 1 %d" % i) for i in range(10)
])
for resp in self.client.send_produce_request([produce2]):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- consumer = SimpleConsumer(self.client, "group1", "test_simple_pending", auto_commit=False)
+ consumer = SimpleConsumer(self.client, "group1", self.topic,
+ auto_commit=False, iter_timeout=0)
self.assertEquals(consumer.pending(), 20)
self.assertEquals(consumer.pending(partitions=[0]), 10)
self.assertEquals(consumer.pending(partitions=[1]), 10)
@@ -665,7 +694,7 @@ class TestConsumer(unittest.TestCase):
def test_multi_process_consumer(self):
# Produce 100 messages to partition 0
- produce1 = ProduceRequest("test_mpconsumer", 0, messages=[
+ produce1 = ProduceRequest(self.topic, 0, messages=[
create_message("Test message 0 %d" % i) for i in range(100)
])
@@ -674,7 +703,7 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 0)
# Produce 100 messages to partition 1
- produce2 = ProduceRequest("test_mpconsumer", 1, messages=[
+ produce2 = ProduceRequest(self.topic, 1, messages=[
create_message("Test message 1 %d" % i) for i in range(100)
])
@@ -683,7 +712,7 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.offset, 0)
# Start a consumer
- consumer = MultiProcessConsumer(self.client, "grp1", "test_mpconsumer", auto_commit=False)
+ consumer = MultiProcessConsumer(self.client, "grp1", self.topic, auto_commit=False)
all_messages = []
for message in consumer:
all_messages.append(message)
@@ -696,11 +725,11 @@ class TestConsumer(unittest.TestCase):
start = datetime.now()
messages = consumer.get_messages(block=True, timeout=5)
diff = (datetime.now() - start).total_seconds()
- self.assertGreaterEqual(diff, 5)
+ self.assertGreaterEqual(diff, 4.999)
self.assertEqual(len(messages), 0)
# Send 10 messages
- produce = ProduceRequest("test_mpconsumer", 0, messages=[
+ produce = ProduceRequest(self.topic, 0, messages=[
create_message("Test message 0 %d" % i) for i in range(10)
])
@@ -723,7 +752,7 @@ class TestConsumer(unittest.TestCase):
def test_multi_proc_pending(self):
# Produce 10 messages to partition 0 and 1
- produce1 = ProduceRequest("test_mppending", 0, messages=[
+ produce1 = ProduceRequest(self.topic, 0, messages=[
create_message("Test message 0 %d" % i) for i in range(10)
])
@@ -731,7 +760,7 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- produce2 = ProduceRequest("test_mppending", 1, messages=[
+ produce2 = ProduceRequest(self.topic, 1, messages=[
create_message("Test message 1 %d" % i) for i in range(10)
])
@@ -739,7 +768,7 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- consumer = MultiProcessConsumer(self.client, "group1", "test_mppending", auto_commit=False)
+ consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False)
self.assertEquals(consumer.pending(), 20)
self.assertEquals(consumer.pending(partitions=[0]), 10)
self.assertEquals(consumer.pending(partitions=[1]), 10)
@@ -749,74 +778,96 @@ class TestConsumer(unittest.TestCase):
def test_large_messages(self):
# Produce 10 "normal" size messages
messages1 = [create_message(random_string(1024)) for i in range(10)]
- produce1 = ProduceRequest("test_large_messages", 0, messages1)
+ produce1 = ProduceRequest(self.topic, 0, messages1)
for resp in self.client.send_produce_request([produce1]):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 0)
- # Produce 10 messages that are too large (bigger than default fetch size)
+ # Produce 10 messages that are large (bigger than default fetch size)
messages2 = [create_message(random_string(5000)) for i in range(10)]
- produce2 = ProduceRequest("test_large_messages", 0, messages2)
+ produce2 = ProduceRequest(self.topic, 0, messages2)
for resp in self.client.send_produce_request([produce2]):
self.assertEquals(resp.error, 0)
self.assertEquals(resp.offset, 10)
# Consumer should still get all of them
- consumer = SimpleConsumer(self.client, "group1", "test_large_messages", auto_commit=False)
+ consumer = SimpleConsumer(self.client, "group1", self.topic,
+ auto_commit=False, iter_timeout=0)
all_messages = messages1 + messages2
for i, message in enumerate(consumer):
self.assertEquals(all_messages[i], message.message)
self.assertEquals(i, 19)
-class TestFailover(unittest.TestCase):
+ # Produce 1 message that is too large (bigger than max fetch size)
+ big_message_size = MAX_FETCH_BUFFER_SIZE_BYTES + 10
+ big_message = create_message(random_string(big_message_size))
+ produce3 = ProduceRequest(self.topic, 0, [big_message])
+ for resp in self.client.send_produce_request([produce3]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 20)
- @classmethod
- def setUpClass(cls):
+ self.assertRaises(ConsumerFetchSizeTooSmall, consumer.get_message, False, 0.1)
+
+ # Create a consumer with no fetch size limit
+ big_consumer = SimpleConsumer(self.client, "group1", self.topic,
+ max_buffer_size=None, partitions=[0],
+ auto_commit=False, iter_timeout=0)
+ # Seek to the last message
+ big_consumer.seek(-1, 2)
+
+ # Consume giant message successfully
+ message = big_consumer.get_message(block=False, timeout=10)
+ self.assertIsNotNone(message)
+ self.assertEquals(message.message.value, big_message.value)
+
+
+class TestFailover(KafkaTestCase):
+
+ def setUp(self):
zk_chroot = random_string(10)
replicas = 2
partitions = 2
# mini zookeeper, 2 kafka brokers
- cls.zk = ZookeeperFixture.instance()
- kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
- cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
+ self.zk = ZookeeperFixture.instance()
+ kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
+ self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
hosts = ','.join(['%s:%d' % (b.host, b.port) for b in cls.brokers])
cls.client = KafkaClient(hosts)
+ super(TestFailover, self).setUp()
- @classmethod
- def tearDownClass(cls):
- cls.client.close()
- for broker in cls.brokers:
+ def tearDown(self):
+ self.client.close()
+ for broker in self.brokers:
broker.close()
- cls.zk.close()
+ self.zk.close()
def test_switch_leader(self):
-
- key, topic, partition = random_string(5), 'test_switch_leader', 0
- producer = SimpleProducer(self.client, topic)
+ key, topic, partition = random_string(5), self.topic, 0
+ producer = SimpleProducer(self.client)
for i in range(1, 4):
# XXX unfortunately, the conns dict needs to be warmed for this to work
# XXX unfortunately, for warming to work, we need at least as many partitions as brokers
- self._send_random_messages(producer, 10)
+ self._send_random_messages(producer, self.topic, 10)
# kil leader for partition 0
broker = self._kill_leader(topic, partition)
# expect failure, reload meta data
- with self.assertRaises(FailedPayloadsException):
- producer.send_messages('part 1')
- producer.send_messages('part 2')
+ with self.assertRaises(FailedPayloadsError):
+ producer.send_messages(self.topic, 'part 1')
+ producer.send_messages(self.topic, 'part 2')
time.sleep(1)
# send to new leader
- self._send_random_messages(producer, 10)
+ self._send_random_messages(producer, self.topic, 10)
broker.open()
time.sleep(3)
@@ -828,24 +879,23 @@ class TestFailover(unittest.TestCase):
producer.stop()
def test_switch_leader_async(self):
-
- key, topic, partition = random_string(5), 'test_switch_leader_async', 0
- producer = SimpleProducer(self.client, topic, async=True)
+ key, topic, partition = random_string(5), self.topic, 0
+ producer = SimpleProducer(self.client, async=True)
for i in range(1, 4):
- self._send_random_messages(producer, 10)
+ self._send_random_messages(producer, self.topic, 10)
# kil leader for partition 0
broker = self._kill_leader(topic, partition)
# expect failure, reload meta data
- producer.send_messages('part 1')
- producer.send_messages('part 2')
+ producer.send_messages(self.topic, 'part 1')
+ producer.send_messages(self.topic, 'part 2')
time.sleep(1)
# send to new leader
- self._send_random_messages(producer, 10)
+ self._send_random_messages(producer, self.topic, 10)
broker.open()
time.sleep(3)
@@ -856,9 +906,9 @@ class TestFailover(unittest.TestCase):
producer.stop()
- def _send_random_messages(self, producer, n):
+ def _send_random_messages(self, producer, topic, n):
for j in range(n):
- resp = producer.send_messages(random_string(10))
+ resp = producer.send_messages(topic, random_string(10))
if len(resp) > 0:
self.assertEquals(resp[0].error, 0)
time.sleep(1) # give it some time
@@ -871,10 +921,9 @@ class TestFailover(unittest.TestCase):
return broker
def _count_messages(self, group, topic):
-
hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port)
client = KafkaClient(hosts)
- consumer = SimpleConsumer(client, group, topic, auto_commit=False)
+ consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0)
all_messages = []
for message in consumer:
all_messages.append(message)
@@ -882,11 +931,6 @@ class TestFailover(unittest.TestCase):
client.close()
return len(all_messages)
-
-def random_string(l):
- s = "".join(random.choice(string.letters) for i in xrange(l))
- return s
-
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
unittest.main()
diff --git a/test/test_unit.py b/test/test_unit.py
index 4ea9442..4c78c1b 100644
--- a/test/test_unit.py
+++ b/test/test_unit.py
@@ -5,15 +5,17 @@ import unittest
from mock import patch
-from kafka.client import KafkaClient
+from kafka.common import (
+ ProduceRequest, FetchRequest, Message, ChecksumError,
+ ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
+ OffsetAndMessage, BrokerMetadata, PartitionMetadata
+)
from kafka.codec import (
- has_gzip, has_snappy,
- gzip_encode, gzip_decode,
+ has_gzip, has_snappy, gzip_encode, gzip_decode,
snappy_encode, snappy_decode
)
-from kafka.common import (
- ProduceRequest, FetchRequest,
- BrokerMetadata, PartitionMetadata, TopicAndPartition
+from kafka.protocol import (
+ create_gzip_message, create_message, create_snappy_message, KafkaProtocol
)
ITERATIONS = 1000
@@ -25,16 +27,13 @@ def random_string():
class TestPackage(unittest.TestCase):
- @unittest.expectedFailure
+
def test_top_level_namespace(self):
import kafka as kafka1
self.assertEquals(kafka1.KafkaClient.__name__, "KafkaClient")
- self.assertEquals(kafka1.gzip_encode.__name__, "gzip_encode")
- self.assertEquals(kafka1.snappy_encode.__name__, "snappy_encode")
self.assertEquals(kafka1.client.__name__, "kafka.client")
self.assertEquals(kafka1.codec.__name__, "kafka.codec")
- @unittest.expectedFailure
def test_submodule_namespace(self):
import kafka.client as client1
self.assertEquals(client1.__name__, "kafka.client")
@@ -53,173 +52,334 @@ class TestPackage(unittest.TestCase):
from kafka import KafkaClient as KafkaClient2
self.assertEquals(KafkaClient2.__name__, "KafkaClient")
- from kafka import gzip_encode as gzip_encode2
- self.assertEquals(gzip_encode2.__name__, "gzip_encode")
-
- from kafka import snappy_encode as snappy_encode2
- self.assertEquals(snappy_encode2.__name__, "snappy_encode")
-
-
-class TestMisc(unittest.TestCase):
- @unittest.expectedFailure
- def test_length_prefix(self):
- for i in xrange(ITERATIONS):
- s1 = random_string()
- self.assertEquals(struct.unpack('>i', s2[0:4])[0], len(s1))
+ from kafka.codec import snappy_encode
+ self.assertEquals(snappy_encode.__name__, "snappy_encode")
class TestCodec(unittest.TestCase):
+
+ @unittest.skipUnless(has_gzip(), "Gzip not available")
def test_gzip(self):
- if not has_gzip():
- return
for i in xrange(ITERATIONS):
s1 = random_string()
s2 = gzip_decode(gzip_encode(s1))
self.assertEquals(s1, s2)
+ @unittest.skipUnless(has_snappy(), "Snappy not available")
def test_snappy(self):
- if not has_snappy():
- return
for i in xrange(ITERATIONS):
s1 = random_string()
s2 = snappy_decode(snappy_encode(s1))
self.assertEquals(s1, s2)
-# XXX(sandello): These really should be protocol tests.
-class TestMessage(unittest.TestCase):
- @unittest.expectedFailure
- def test_create(self):
- msg = KafkaClient.create_message("testing")
- self.assertEquals(msg.payload, "testing")
- self.assertEquals(msg.magic, 1)
- self.assertEquals(msg.attributes, 0)
- self.assertEquals(msg.crc, -386704890)
+class TestProtocol(unittest.TestCase):
- @unittest.expectedFailure
+ def test_create_message(self):
+ payload = "test"
+ key = "key"
+ msg = create_message(payload, key)
+ self.assertEqual(msg.magic, 0)
+ self.assertEqual(msg.attributes, 0)
+ self.assertEqual(msg.key, key)
+ self.assertEqual(msg.value, payload)
+
+ @unittest.skipUnless(has_gzip(), "Snappy not available")
def test_create_gzip(self):
- msg = KafkaClient.create_gzip_message("testing")
- self.assertEquals(msg.magic, 1)
- self.assertEquals(msg.attributes, 1)
- # Can't check the crc or payload for gzip since it's non-deterministic
- (messages, _) = KafkaClient.read_message_set(gzip_decode(msg.payload))
- inner = messages[0]
- self.assertEquals(inner.magic, 1)
- self.assertEquals(inner.attributes, 0)
- self.assertEquals(inner.payload, "testing")
- self.assertEquals(inner.crc, -386704890)
-
- @unittest.expectedFailure
+ payloads = ["v1", "v2"]
+ msg = create_gzip_message(payloads)
+ self.assertEqual(msg.magic, 0)
+ self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK &
+ KafkaProtocol.CODEC_GZIP)
+ self.assertEqual(msg.key, None)
+ # Need to decode to check since gzipped payload is non-deterministic
+ decoded = gzip_decode(msg.value)
+ expect = ("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10L\x9f[\xc2"
+ "\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v1\x00\x00\x00\x00"
+ "\x00\x00\x00\x00\x00\x00\x00\x10\xd5\x96\nx\x00\x00\xff\xff"
+ "\xff\xff\x00\x00\x00\x02v2")
+ self.assertEqual(decoded, expect)
+
+ @unittest.skipUnless(has_snappy(), "Snappy not available")
def test_create_snappy(self):
- msg = KafkaClient.create_snappy_message("testing")
- self.assertEquals(msg.magic, 1)
- self.assertEquals(msg.attributes, 2)
- self.assertEquals(msg.crc, -62350868)
- (messages, _) = KafkaClient.read_message_set(snappy_decode(msg.payload))
- inner = messages[0]
- self.assertEquals(inner.magic, 1)
- self.assertEquals(inner.attributes, 0)
- self.assertEquals(inner.payload, "testing")
- self.assertEquals(inner.crc, -386704890)
-
- @unittest.expectedFailure
- def test_message_simple(self):
- msg = KafkaClient.create_message("testing")
- enc = KafkaClient.encode_message(msg)
- expect = "\x00\x00\x00\r\x01\x00\xe8\xf3Z\x06testing"
- self.assertEquals(enc, expect)
- (messages, read) = KafkaClient.read_message_set(enc)
- self.assertEquals(len(messages), 1)
- self.assertEquals(messages[0], msg)
-
- @unittest.expectedFailure
- def test_message_list(self):
- msgs = [
- KafkaClient.create_message("one"),
- KafkaClient.create_message("two"),
- KafkaClient.create_message("three")
- ]
- enc = KafkaClient.encode_message_set(msgs)
- expect = ("\x00\x00\x00\t\x01\x00zl\x86\xf1one\x00\x00\x00\t\x01\x00\x11"
- "\xca\x8aftwo\x00\x00\x00\x0b\x01\x00F\xc5\xd8\xf5three")
- self.assertEquals(enc, expect)
- (messages, read) = KafkaClient.read_message_set(enc)
- self.assertEquals(len(messages), 3)
- self.assertEquals(messages[0].payload, "one")
- self.assertEquals(messages[1].payload, "two")
- self.assertEquals(messages[2].payload, "three")
-
- @unittest.expectedFailure
- def test_message_gzip(self):
- msg = KafkaClient.create_gzip_message("one", "two", "three")
- enc = KafkaClient.encode_message(msg)
- # Can't check the bytes directly since Gzip is non-deterministic
- (messages, read) = KafkaClient.read_message_set(enc)
- self.assertEquals(len(messages), 3)
- self.assertEquals(messages[0].payload, "one")
- self.assertEquals(messages[1].payload, "two")
- self.assertEquals(messages[2].payload, "three")
-
- @unittest.expectedFailure
- def test_message_snappy(self):
- msg = KafkaClient.create_snappy_message("one", "two", "three")
- enc = KafkaClient.encode_message(msg)
- (messages, read) = KafkaClient.read_message_set(enc)
- self.assertEquals(len(messages), 3)
- self.assertEquals(messages[0].payload, "one")
- self.assertEquals(messages[1].payload, "two")
- self.assertEquals(messages[2].payload, "three")
-
- @unittest.expectedFailure
- def test_message_simple_random(self):
- for i in xrange(ITERATIONS):
- n = random.randint(0, 10)
- msgs = [KafkaClient.create_message(random_string()) for j in range(n)]
- enc = KafkaClient.encode_message_set(msgs)
- (messages, read) = KafkaClient.read_message_set(enc)
- self.assertEquals(len(messages), n)
- for j in range(n):
- self.assertEquals(messages[j], msgs[j])
-
- @unittest.expectedFailure
- def test_message_gzip_random(self):
- for i in xrange(ITERATIONS):
- n = random.randint(1, 10)
- strings = [random_string() for j in range(n)]
- msg = KafkaClient.create_gzip_message(*strings)
- enc = KafkaClient.encode_message(msg)
- (messages, read) = KafkaClient.read_message_set(enc)
- self.assertEquals(len(messages), n)
- for j in range(n):
- self.assertEquals(messages[j].payload, strings[j])
-
- @unittest.expectedFailure
- def test_message_snappy_random(self):
- for i in xrange(ITERATIONS):
- n = random.randint(1, 10)
- strings = [random_string() for j in range(n)]
- msg = KafkaClient.create_snappy_message(*strings)
- enc = KafkaClient.encode_message(msg)
- (messages, read) = KafkaClient.read_message_set(enc)
- self.assertEquals(len(messages), n)
- for j in range(n):
- self.assertEquals(messages[j].payload, strings[j])
-
-
-class TestRequests(unittest.TestCase):
- @unittest.expectedFailure
- def test_produce_request(self):
- req = ProduceRequest("my-topic", 0, [KafkaClient.create_message("testing")])
- enc = KafkaClient.encode_produce_request(req)
- expect = "\x00\x00\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00\x11\x00\x00\x00\r\x01\x00\xe8\xf3Z\x06testing"
- self.assertEquals(enc, expect)
-
- @unittest.expectedFailure
- def test_fetch_request(self):
- req = FetchRequest("my-topic", 0, 0, 1024)
- enc = KafkaClient.encode_fetch_request(req)
- expect = "\x00\x01\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00"
- self.assertEquals(enc, expect)
+ payloads = ["v1", "v2"]
+ msg = create_snappy_message(payloads)
+ self.assertEqual(msg.magic, 0)
+ self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK &
+ KafkaProtocol.CODEC_SNAPPY)
+ self.assertEqual(msg.key, None)
+ expect = ("8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff\xff\xff\xff"
+ "\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5\x96\nx\x00\x00\xff"
+ "\xff\xff\xff\x00\x00\x00\x02v2")
+ self.assertEqual(msg.value, expect)
+
+ def test_encode_message_header(self):
+ expect = '\x00\n\x00\x00\x00\x00\x00\x04\x00\x07client3'
+ encoded = KafkaProtocol._encode_message_header("client3", 4, 10)
+ self.assertEqual(encoded, expect)
+
+ def test_encode_message(self):
+ message = create_message("test", "key")
+ encoded = KafkaProtocol._encode_message(message)
+ expect = "\xaa\xf1\x8f[\x00\x00\x00\x00\x00\x03key\x00\x00\x00\x04test"
+ self.assertEqual(encoded, expect)
+
+ def test_encode_message_failure(self):
+ self.assertRaises(Exception, KafkaProtocol._encode_message,
+ Message(1, 0, "key", "test"))
+
+ def test_encode_message_set(self):
+ message_set = [create_message("v1", "k1"), create_message("v2", "k2")]
+ encoded = KafkaProtocol._encode_message_set(message_set)
+ expect = ("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x12W\xe7In\x00"
+ "\x00\x00\x00\x00\x02k1\x00\x00\x00\x02v1\x00\x00\x00\x00"
+ "\x00\x00\x00\x00\x00\x00\x00\x12\xff\x06\x02I\x00\x00\x00"
+ "\x00\x00\x02k2\x00\x00\x00\x02v2")
+ self.assertEqual(encoded, expect)
+
+ def test_decode_message(self):
+ encoded = "\xaa\xf1\x8f[\x00\x00\x00\x00\x00\x03key\x00\x00\x00\x04test"
+ offset = 10
+ (returned_offset, decoded_message) = \
+ list(KafkaProtocol._decode_message(encoded, offset))[0]
+ self.assertEqual(returned_offset, offset)
+ self.assertEqual(decoded_message, create_message("test", "key"))
+
+ def test_decode_message_set(self):
+ encoded = ('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10L\x9f[\xc2'
+ '\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v1\x00\x00\x00\x00'
+ '\x00\x00\x00\x00\x00\x00\x00\x10\xd5\x96\nx\x00\x00\xff'
+ '\xff\xff\xff\x00\x00\x00\x02v2')
+ iter = KafkaProtocol._decode_message_set_iter(encoded)
+ decoded = list(iter)
+ self.assertEqual(len(decoded), 2)
+ (returned_offset1, decoded_message1) = decoded[0]
+ self.assertEqual(returned_offset1, 0)
+ self.assertEqual(decoded_message1, create_message("v1"))
+ (returned_offset2, decoded_message2) = decoded[1]
+ self.assertEqual(returned_offset2, 0)
+ self.assertEqual(decoded_message2, create_message("v2"))
+
+ @unittest.skipUnless(has_gzip(), "Gzip not available")
+ def test_decode_message_gzip(self):
+ gzip_encoded = ('\xc0\x11\xb2\xf0\x00\x01\xff\xff\xff\xff\x00\x00\x000'
+ '\x1f\x8b\x08\x00\xa1\xc1\xc5R\x02\xffc`\x80\x03\x01'
+ '\x9f\xf9\xd1\x87\x18\x18\xfe\x03\x01\x90\xc7Tf\xc8'
+ '\x80$wu\x1aW\x05\x92\x9c\x11\x00z\xc0h\x888\x00\x00'
+ '\x00')
+ offset = 11
+ decoded = list(KafkaProtocol._decode_message(gzip_encoded, offset))
+ self.assertEqual(len(decoded), 2)
+ (returned_offset1, decoded_message1) = decoded[0]
+ self.assertEqual(returned_offset1, 0)
+ self.assertEqual(decoded_message1, create_message("v1"))
+ (returned_offset2, decoded_message2) = decoded[1]
+ self.assertEqual(returned_offset2, 0)
+ self.assertEqual(decoded_message2, create_message("v2"))
+
+ @unittest.skipUnless(has_snappy(), "Snappy not available")
+ def test_decode_message_snappy(self):
+ snappy_encoded = ('\xec\x80\xa1\x95\x00\x02\xff\xff\xff\xff\x00\x00'
+ '\x00,8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff'
+ '\xff\xff\xff\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5'
+ '\x96\nx\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v2')
+ offset = 11
+ decoded = list(KafkaProtocol._decode_message(snappy_encoded, offset))
+ self.assertEqual(len(decoded), 2)
+ (returned_offset1, decoded_message1) = decoded[0]
+ self.assertEqual(returned_offset1, 0)
+ self.assertEqual(decoded_message1, create_message("v1"))
+ (returned_offset2, decoded_message2) = decoded[1]
+ self.assertEqual(returned_offset2, 0)
+ self.assertEqual(decoded_message2, create_message("v2"))
+
+ def test_decode_message_checksum_error(self):
+ invalid_encoded_message = "This is not a valid encoded message"
+ iter = KafkaProtocol._decode_message(invalid_encoded_message, 0)
+ self.assertRaises(ChecksumError, list, iter)
+
+ # NOTE: The error handling in _decode_message_set_iter() is questionable.
+ # If it's modified, the next two tests might need to be fixed.
+ def test_decode_message_set_fetch_size_too_small(self):
+ iter = KafkaProtocol._decode_message_set_iter('a')
+ self.assertRaises(ConsumerFetchSizeTooSmall, list, iter)
+
+ def test_decode_message_set_stop_iteration(self):
+ encoded = ('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10L\x9f[\xc2'
+ '\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v1\x00\x00\x00\x00'
+ '\x00\x00\x00\x00\x00\x00\x00\x10\xd5\x96\nx\x00\x00\xff'
+ '\xff\xff\xff\x00\x00\x00\x02v2')
+ iter = KafkaProtocol._decode_message_set_iter(encoded + "@#$%(Y!")
+ decoded = list(iter)
+ self.assertEqual(len(decoded), 2)
+ (returned_offset1, decoded_message1) = decoded[0]
+ self.assertEqual(returned_offset1, 0)
+ self.assertEqual(decoded_message1, create_message("v1"))
+ (returned_offset2, decoded_message2) = decoded[1]
+ self.assertEqual(returned_offset2, 0)
+ self.assertEqual(decoded_message2, create_message("v2"))
+
+ def test_encode_produce_request(self):
+ requests = [ProduceRequest("topic1", 0, [create_message("a"),
+ create_message("b")]),
+ ProduceRequest("topic2", 1, [create_message("c")])]
+ expect = ('\x00\x00\x00\x94\x00\x00\x00\x00\x00\x00\x00\x02\x00\x07'
+ 'client1\x00\x02\x00\x00\x00d\x00\x00\x00\x02\x00\x06topic1'
+ '\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00'
+ '\x00\x00\x00\x00\x00\x00\x00\x00\x0fQ\xdf:2\x00\x00\xff\xff'
+ '\xff\xff\x00\x00\x00\x01a\x00\x00\x00\x00\x00\x00\x00\x00'
+ '\x00\x00\x00\x0f\xc8\xd6k\x88\x00\x00\xff\xff\xff\xff\x00'
+ '\x00\x00\x01b\x00\x06topic2\x00\x00\x00\x01\x00\x00\x00\x01'
+ '\x00\x00\x00\x1b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+ '\x00\x0f\xbf\xd1[\x1e\x00\x00\xff\xff\xff\xff\x00\x00\x00'
+ '\x01c')
+ encoded = KafkaProtocol.encode_produce_request("client1", 2, requests,
+ 2, 100)
+ self.assertEqual(encoded, expect)
+
+ def test_decode_produce_response(self):
+ t1 = "topic1"
+ t2 = "topic2"
+ encoded = struct.pack('>iih%dsiihqihqh%dsiihq' % (len(t1), len(t2)),
+ 2, 2, len(t1), t1, 2, 0, 0, 10L, 1, 1, 20L,
+ len(t2), t2, 1, 0, 0, 30L)
+ responses = list(KafkaProtocol.decode_produce_response(encoded))
+ self.assertEqual(responses,
+ [ProduceResponse(t1, 0, 0, 10L),
+ ProduceResponse(t1, 1, 1, 20L),
+ ProduceResponse(t2, 0, 0, 30L)])
+
+ def test_encode_fetch_request(self):
+ requests = [FetchRequest("topic1", 0, 10, 1024),
+ FetchRequest("topic2", 1, 20, 100)]
+ expect = ('\x00\x00\x00Y\x00\x01\x00\x00\x00\x00\x00\x03\x00\x07'
+ 'client1\xff\xff\xff\xff\x00\x00\x00\x02\x00\x00\x00d\x00'
+ '\x00\x00\x02\x00\x06topic1\x00\x00\x00\x01\x00\x00\x00\x00'
+ '\x00\x00\x00\x00\x00\x00\x00\n\x00\x00\x04\x00\x00\x06'
+ 'topic2\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x00'
+ '\x00\x00\x14\x00\x00\x00d')
+ encoded = KafkaProtocol.encode_fetch_request("client1", 3, requests, 2,
+ 100)
+ self.assertEqual(encoded, expect)
+
+ def test_decode_fetch_response(self):
+ t1 = "topic1"
+ t2 = "topic2"
+ msgs = map(create_message, ["message1", "hi", "boo", "foo", "so fun!"])
+ ms1 = KafkaProtocol._encode_message_set([msgs[0], msgs[1]])
+ ms2 = KafkaProtocol._encode_message_set([msgs[2]])
+ ms3 = KafkaProtocol._encode_message_set([msgs[3], msgs[4]])
+
+ encoded = struct.pack('>iih%dsiihqi%dsihqi%dsh%dsiihqi%ds' %
+ (len(t1), len(ms1), len(ms2), len(t2), len(ms3)),
+ 4, 2, len(t1), t1, 2, 0, 0, 10, len(ms1), ms1, 1,
+ 1, 20, len(ms2), ms2, len(t2), t2, 1, 0, 0, 30,
+ len(ms3), ms3)
+
+ responses = list(KafkaProtocol.decode_fetch_response(encoded))
+ def expand_messages(response):
+ return FetchResponse(response.topic, response.partition,
+ response.error, response.highwaterMark,
+ list(response.messages))
+
+ expanded_responses = map(expand_messages, responses)
+ expect = [FetchResponse(t1, 0, 0, 10, [OffsetAndMessage(0, msgs[0]),
+ OffsetAndMessage(0, msgs[1])]),
+ FetchResponse(t1, 1, 1, 20, [OffsetAndMessage(0, msgs[2])]),
+ FetchResponse(t2, 0, 0, 30, [OffsetAndMessage(0, msgs[3]),
+ OffsetAndMessage(0, msgs[4])])]
+ self.assertEqual(expanded_responses, expect)
+
+ def test_encode_metadata_request_no_topics(self):
+ encoded = KafkaProtocol.encode_metadata_request("cid", 4)
+ self.assertEqual(encoded, '\x00\x00\x00\x11\x00\x03\x00\x00\x00\x00'
+ '\x00\x04\x00\x03cid\x00\x00\x00\x00')
+
+ def test_encode_metadata_request_with_topics(self):
+ encoded = KafkaProtocol.encode_metadata_request("cid", 4, ["t1", "t2"])
+ self.assertEqual(encoded, '\x00\x00\x00\x19\x00\x03\x00\x00\x00\x00'
+ '\x00\x04\x00\x03cid\x00\x00\x00\x02\x00\x02'
+ 't1\x00\x02t2')
+
+ def _create_encoded_metadata_response(self, broker_data, topic_data,
+ topic_errors, partition_errors):
+ encoded = struct.pack('>ii', 3, len(broker_data))
+ for node_id, broker in broker_data.iteritems():
+ encoded += struct.pack('>ih%dsi' % len(broker.host), node_id,
+ len(broker.host), broker.host, broker.port)
+
+ encoded += struct.pack('>i', len(topic_data))
+ for topic, partitions in topic_data.iteritems():
+ encoded += struct.pack('>hh%dsi' % len(topic), topic_errors[topic],
+ len(topic), topic, len(partitions))
+ for partition, metadata in partitions.iteritems():
+ encoded += struct.pack('>hiii',
+ partition_errors[(topic, partition)],
+ partition, metadata.leader,
+ len(metadata.replicas))
+ if len(metadata.replicas) > 0:
+ encoded += struct.pack('>%di' % len(metadata.replicas),
+ *metadata.replicas)
+
+ encoded += struct.pack('>i', len(metadata.isr))
+ if len(metadata.isr) > 0:
+ encoded += struct.pack('>%di' % len(metadata.isr),
+ *metadata.isr)
+
+ return encoded
+
+ def test_decode_metadata_response(self):
+ node_brokers = {
+ 0: BrokerMetadata(0, "brokers1.kafka.rdio.com", 1000),
+ 1: BrokerMetadata(1, "brokers1.kafka.rdio.com", 1001),
+ 3: BrokerMetadata(3, "brokers2.kafka.rdio.com", 1000)
+ }
+ topic_partitions = {
+ "topic1": {
+ 0: PartitionMetadata("topic1", 0, 1, (0, 2), (2,)),
+ 1: PartitionMetadata("topic1", 1, 3, (0, 1), (0, 1))
+ },
+ "topic2": {
+ 0: PartitionMetadata("topic2", 0, 0, (), ())
+ }
+ }
+ topic_errors = {"topic1": 0, "topic2": 1}
+ partition_errors = {
+ ("topic1", 0): 0,
+ ("topic1", 1): 1,
+ ("topic2", 0): 0
+ }
+ encoded = self._create_encoded_metadata_response(node_brokers,
+ topic_partitions,
+ topic_errors,
+ partition_errors)
+ decoded = KafkaProtocol.decode_metadata_response(encoded)
+ self.assertEqual(decoded, (node_brokers, topic_partitions))
+
+ @unittest.skip("Not Implemented")
+ def test_encode_offset_request(self):
+ pass
+
+ @unittest.skip("Not Implemented")
+ def test_decode_offset_response(self):
+ pass
+
+
+ @unittest.skip("Not Implemented")
+ def test_encode_offset_commit_request(self):
+ pass
+
+ @unittest.skip("Not Implemented")
+ def test_decode_offset_commit_response(self):
+ pass
+
+ @unittest.skip("Not Implemented")
+ def test_encode_offset_fetch_request(self):
+ pass
+
+ @unittest.skip("Not Implemented")
+ def test_decode_offset_fetch_response(self):
+ pass
class TestKafkaClient(unittest.TestCase):