-rw-r--r--  kafka/client.py                 | 45
-rw-r--r--  kafka/conn.py                   |  8
-rw-r--r--  kafka/consumer/multiprocess.py  | 15
-rw-r--r--  kafka/consumer/simple.py        | 27
-rw-r--r--  kafka/producer/base.py          |  2
-rw-r--r--  kafka/producer/keyed.py         | 53
-rw-r--r--  kafka/producer/simple.py        | 62
-rw-r--r--  kafka/protocol.py               | 28
8 files changed, 88 insertions(+), 152 deletions(-)
diff --git a/kafka/client.py b/kafka/client.py
index 63b33b3..2ef22b3 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -4,8 +4,8 @@ import copy
import functools
import logging
import time
-import kafka.common
+import kafka.common
from kafka.common import (TopicAndPartition, BrokerMetadata,
ConnectionError, FailedPayloadsError,
KafkaTimeoutError, KafkaUnavailableError,
@@ -22,7 +22,7 @@ log = logging.getLogger(__name__)
class KafkaClient(object):
- CLIENT_ID = b"kafka-python"
+ CLIENT_ID = b'kafka-python'
# NOTE: The timeout given to the client should always be greater than the
# one passed to SimpleConsumer.get_message(), otherwise you can get a
@@ -50,7 +50,7 @@ class KafkaClient(object):
##################
def _get_conn(self, host, port):
- "Get or create a connection to a broker using host and port"
+ """Get or create a connection to a broker using host and port"""
host_key = (host, port)
if host_key not in self.conns:
self.conns[host_key] = KafkaConnection(
@@ -111,6 +111,7 @@ class KafkaClient(object):
"""
for (host, port) in self.hosts:
requestId = self._next_id()
+ log.debug('Request %s: %s', requestId, payloads)
try:
conn = self._get_conn(host, port)
request = encoder_fn(client_id=self.client_id,
@@ -119,13 +120,15 @@ class KafkaClient(object):
conn.send(requestId, request)
response = conn.recv(requestId)
- return decoder_fn(response)
+ decoded = decoder_fn(response)
+ log.debug('Response %s: %s', requestId, decoded)
+ return decoded
except Exception:
- log.exception("Could not send request [%r] to server %s:%i, "
- "trying next server" % (requestId, host, port))
+ log.exception('Error sending request [%s] to server %s:%s, '
+ 'trying next server', requestId, host, port)
- raise KafkaUnavailableError("All servers failed to process request")
+ raise KafkaUnavailableError('All servers failed to process request')
def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
"""
@@ -150,9 +153,6 @@ class KafkaClient(object):
List of response objects in the same order as the supplied payloads
"""
-
- log.debug("Sending Payloads: %s" % payloads)
-
# Group the requests by topic+partition
brokers_for_payloads = []
payloads_by_broker = collections.defaultdict(list)
@@ -170,6 +170,7 @@ class KafkaClient(object):
broker_failures = []
for broker, payloads in payloads_by_broker.items():
requestId = self._next_id()
+ log.debug('Request %s to %s: %s', requestId, broker, payloads)
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId, payloads=payloads)
@@ -180,7 +181,7 @@ class KafkaClient(object):
except ConnectionError as e:
broker_failures.append(broker)
- log.warning("Could not send request [%s] to server %s: %s",
+ log.warning('Could not send request [%s] to server %s: %s',
binascii.b2a_hex(request), broker, e)
for payload in payloads:
@@ -201,15 +202,14 @@ class KafkaClient(object):
response = conn.recv(requestId)
except ConnectionError as e:
broker_failures.append(broker)
- log.warning("Could not receive response to request [%s] "
- "from server %s: %s",
+ log.warning('Could not receive response to request [%s] '
+ 'from server %s: %s',
binascii.b2a_hex(request), conn, e)
for payload in payloads:
responses_by_broker[broker].append(FailedPayloadsError(payload))
else:
-
for payload_response in decoder_fn(response):
responses_by_broker[broker].append(payload_response)
@@ -223,7 +223,6 @@ class KafkaClient(object):
# Return responses in the same order as provided
responses_by_payload = [responses_by_broker[broker].pop(0)
for broker in brokers_for_payloads]
- log.debug('Responses: %s' % responses_by_payload)
return responses_by_payload
def __repr__(self):
@@ -254,8 +253,11 @@ class KafkaClient(object):
def copy(self):
"""
- Create an inactive copy of the client object
- A reinit() has to be done on the copy before it can be used again
+ Create an inactive copy of the client object, suitable for passing
+ to a separate thread.
+
+ Note that the copied connections are not initialized, so reinit() must
+ be called on the returned copy.
"""
c = copy.deepcopy(self)
for key in c.conns:
@@ -297,7 +299,7 @@ class KafkaClient(object):
while not self.has_metadata_for_topic(topic):
if time.time() > start_time + timeout:
- raise KafkaTimeoutError("Unable to create topic {0}".format(topic))
+ raise KafkaTimeoutError('Unable to create topic {0}'.format(topic))
try:
self.load_metadata_for_topics(topic)
except LeaderNotAvailableError:
@@ -345,8 +347,8 @@ class KafkaClient(object):
resp = self.send_metadata_request(topics)
- log.debug("Received new broker metadata: %s", resp.brokers)
- log.debug("Received new topic metadata: %s", resp.topics)
+ log.debug('Received new broker metadata: %s', resp.brokers)
+ log.debug('Received new topic metadata: %s', resp.topics)
self.brokers = dict([(broker.nodeId, broker)
for broker in resp.brokers])
@@ -365,7 +367,7 @@ class KafkaClient(object):
raise
# Otherwise, just log a warning
- log.error("Error loading topic metadata for %s: %s", topic, type(e))
+ log.error('Error loading topic metadata for %s: %s', topic, type(e))
continue
self.topic_partitions[topic] = {}
@@ -406,7 +408,6 @@ class KafkaClient(object):
def send_metadata_request(self, payloads=[], fail_on_error=True,
callback=None):
-
encoder = KafkaProtocol.encode_metadata_request
decoder = KafkaProtocol.decode_metadata_response
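The copy()/reinit() pattern documented in the hunk above is what the async producer relies on when it hands a client to a background thread. A minimal usage sketch, assuming a kafka-python install of this era and a placeholder broker address:

    import threading
    from kafka import KafkaClient

    client = KafkaClient('localhost:9092')          # placeholder broker address

    def worker(inactive_copy):
        # the deep-copied client carries no live sockets; reconnect before use
        inactive_copy.reinit()
        inactive_copy.load_metadata_for_topics()

    t = threading.Thread(target=worker, args=(client.copy(),))
    t.start()
    t.join()
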
diff --git a/kafka/conn.py b/kafka/conn.py
index 7a49d8c..432e10b 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -161,9 +161,11 @@ class KafkaConnection(local):
def copy(self):
"""
- Create an inactive copy of the connection object
- A reinit() has to be done on the copy before it can be used again
- return a new KafkaConnection object
+ Create an inactive copy of the connection object, suitable for
+ passing to a background thread.
+
+ The returned copy is not connected; you must call reinit() before
+ using it.
"""
c = copy.deepcopy(self)
# Python 3 doesn't copy custom attributes of the threadlocal subclass
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index 8cec92d..d03eb95 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -1,22 +1,21 @@
from __future__ import absolute_import
-import logging
-import time
-
from collections import namedtuple
+import logging
from multiprocessing import Process, Manager as MPManager
-
try:
- from Queue import Empty, Full
-except ImportError: # python 2
- from queue import Empty, Full
+ from Queue import Empty, Full # python 2
+except ImportError:
+ from queue import Empty, Full # python 3
+import time
from .base import (
+ Consumer,
AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
NO_MESSAGES_WAIT_TIME_SECONDS,
FULL_QUEUE_WAIT_TIME_SECONDS
)
-from .simple import Consumer, SimpleConsumer
+from .simple import SimpleConsumer
log = logging.getLogger(__name__)
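The import reshuffle above leans on the usual two-way compatibility import: the capitalized Queue module exists only on Python 2, and Python 3 renamed it to queue, so whichever import succeeds wins. A standalone sketch of the same pattern:

    try:
        from Queue import Empty, Full    # Python 2 module name
    except ImportError:
        from queue import Empty, Full    # Python 3 renamed the module to 'queue'

    print(Empty, Full)                   # same exception classes on either version
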
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index 384fa8e..e4233ff 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -2,25 +2,18 @@ from __future__ import absolute_import
try:
from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611
-except ImportError: # python 2
- from itertools import izip_longest as izip_longest, repeat
+except ImportError:
+ from itertools import izip_longest as izip_longest, repeat # python 2
import logging
+try:
+ from Queue import Empty, Queue # python 2
+except ImportError:
+ from queue import Empty, Queue # python 3
+import sys
import time
import six
-import sys
-
-try:
- from Queue import Empty, Queue
-except ImportError: # python 2
- from queue import Empty, Queue
-from kafka.common import (
- FetchRequest, OffsetRequest,
- ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
- UnknownTopicOrPartitionError, NotLeaderForPartitionError,
- OffsetOutOfRangeError, FailedPayloadsError, check_error
-)
from .base import (
Consumer,
FETCH_DEFAULT_BLOCK_TIMEOUT,
@@ -33,6 +26,12 @@ from .base import (
ITER_TIMEOUT_SECONDS,
NO_MESSAGES_WAIT_TIME_SECONDS
)
+from ..common import (
+ FetchRequest, OffsetRequest,
+ ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
+ UnknownTopicOrPartitionError, NotLeaderForPartitionError,
+ OffsetOutOfRangeError, FailedPayloadsError, check_error
+)
log = logging.getLogger(__name__)
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 18af342..e0c086b 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -206,6 +206,8 @@ class Producer(object):
Arguments:
client (KafkaClient): instance to use for broker communications.
+ If async=True, the background thread will use client.copy(),
+ which is expected to return a thread-safe object.
codec (kafka.protocol.ALL_CODECS): compression codec to use.
req_acks (int, optional): A value indicating the acknowledgements that
the server must receive before responding to the request,
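The new sentence in the client argument docs is the reason copy() must produce a fully independent object: with async=True the producer spawns a sender thread that only ever talks to the copy. A sketch of that mode, assuming a reachable broker at a placeholder address and an existing topic:

    from kafka import KafkaClient, SimpleProducer

    client = KafkaClient('localhost:9092')            # placeholder broker address
    producer = SimpleProducer(client, async=True)     # sender thread works on client.copy()
    producer.send_messages(b'my-topic', b'hello')     # queued; returns without waiting for acks
    producer.stop()                                   # drain the queue and join the thread
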
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 5fe9b12..a5a26c9 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -3,14 +3,10 @@ from __future__ import absolute_import
import logging
import warnings
-from kafka.partitioner import HashedPartitioner
-from kafka.util import kafka_bytestring
+from .base import Producer
+from ..partitioner import HashedPartitioner
+from ..util import kafka_bytestring
-from .base import (
- Producer, BATCH_SEND_DEFAULT_INTERVAL,
- BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
- ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
-)
log = logging.getLogger(__name__)
@@ -19,46 +15,17 @@ class KeyedProducer(Producer):
"""
A producer which distributes messages to partitions based on the key
- Arguments:
- client: The kafka client instance
+ See Producer class for Arguments
- Keyword Arguments:
+ Additional Arguments:
partitioner: A partitioner class that will be used to get the partition
- to send the message to. Must be derived from Partitioner
- async: If True, the messages are sent asynchronously via another
- thread (process). We will not wait for a response to these
- ack_timeout: Value (in milliseconds) indicating a timeout for waiting
- for an acknowledgement
- batch_send: If True, messages are send in batches
- batch_send_every_n: If set, messages are send in batches of this size
- batch_send_every_t: If set, messages are send after this timeout
+ to send the message to. Must be derived from Partitioner.
+ Defaults to HashedPartitioner.
"""
- def __init__(self, client, partitioner=None, async=False,
- req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
- ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
- codec=None,
- batch_send=False,
- batch_send_every_n=BATCH_SEND_MSG_COUNT,
- batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
- async_retry_limit=ASYNC_RETRY_LIMIT,
- async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
- async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
- async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
- async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
- if not partitioner:
- partitioner = HashedPartitioner
- self.partitioner_class = partitioner
+ def __init__(self, *args, **kwargs):
+ self.partitioner_class = kwargs.pop('partitioner', HashedPartitioner)
self.partitioners = {}
-
- super(KeyedProducer, self).__init__(client, req_acks, ack_timeout,
- codec, async, batch_send,
- batch_send_every_n,
- batch_send_every_t,
- async_retry_limit,
- async_retry_backoff_ms,
- async_retry_on_timeouts,
- async_queue_maxsize,
- async_queue_put_timeout)
+ super(KeyedProducer, self).__init__(*args, **kwargs)
def _next_partition(self, topic, key):
if topic not in self.partitioners:
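With the *args/**kwargs rewrite, KeyedProducer only intercepts partitioner and forwards everything else to Producer, so existing call sites keep working. A usage sketch with placeholder broker, topic, and key values:

    from kafka import KafkaClient, KeyedProducer
    from kafka.partitioner import HashedPartitioner

    client = KafkaClient('localhost:9092')    # placeholder broker address
    producer = KeyedProducer(client,
                             partitioner=HashedPartitioner)          # popped before Producer.__init__
    producer.send_messages(b'my-topic', b'user-42', b'payload')      # key chooses the partition
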
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index 280a02e..13e60d9 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -1,68 +1,34 @@
from __future__ import absolute_import
+from itertools import cycle
import logging
import random
import six
-from itertools import cycle
-
from six.moves import xrange
-from .base import (
- Producer, BATCH_SEND_DEFAULT_INTERVAL,
- BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
- ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
-)
+from .base import Producer
+
log = logging.getLogger(__name__)
class SimpleProducer(Producer):
- """
- A simple, round-robin producer. Each message goes to exactly one partition
-
- Arguments:
- client: The Kafka client instance to use
-
- Keyword Arguments:
- async: If True, the messages are sent asynchronously via another
- thread (process). We will not wait for a response to these
- req_acks: A value indicating the acknowledgements that the server must
- receive before responding to the request
- ack_timeout: Value (in milliseconds) indicating a timeout for waiting
- for an acknowledgement
- batch_send: If True, messages are send in batches
- batch_send_every_n: If set, messages are send in batches of this size
- batch_send_every_t: If set, messages are send after this timeout
- random_start: If true, randomize the initial partition which the
+ """A simple, round-robin producer.
+
+ See Producer class for Base Arguments
+
+ Additional Arguments:
+ random_start (bool, optional): randomize the initial partition which
the first message block will be published to, otherwise
if false, the first message block will always publish
- to partition 0 before cycling through each partition
+ to partition 0 before cycling through each partition,
+ defaults to True.
"""
- def __init__(self, client, async=False,
- req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
- ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
- codec=None,
- batch_send=False,
- batch_send_every_n=BATCH_SEND_MSG_COUNT,
- batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
- random_start=True,
- async_retry_limit=ASYNC_RETRY_LIMIT,
- async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
- async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
- async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
- async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
+ def __init__(self, *args, **kwargs):
self.partition_cycles = {}
- self.random_start = random_start
- super(SimpleProducer, self).__init__(client, req_acks, ack_timeout,
- codec, async, batch_send,
- batch_send_every_n,
- batch_send_every_t,
- async_retry_limit,
- async_retry_backoff_ms,
- async_retry_on_timeouts,
- async_queue_maxsize,
- async_queue_put_timeout)
+ self.random_start = kwargs.pop('random_start', True)
+ super(SimpleProducer, self).__init__(*args, **kwargs)
def _next_partition(self, topic):
if topic not in self.partition_cycles:
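SimpleProducer gets the same treatment: random_start is popped from kwargs and everything else flows through to Producer. A sketch showing the one extra keyword, again with placeholder names:

    from kafka import KafkaClient, SimpleProducer

    client = KafkaClient('localhost:9092')                  # placeholder broker address
    producer = SimpleProducer(client, random_start=False)   # always start the cycle at partition 0
    producer.send_messages(b'my-topic', b'one', b'two')     # then round-robin across partitions
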
diff --git a/kafka/protocol.py b/kafka/protocol.py
index f12e6a3..d5adf89 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -232,12 +232,12 @@ class KafkaProtocol(object):
"""
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
- for i in range(num_topics):
+ for _ in range(num_topics):
((strlen,), cur) = relative_unpack('>h', data, cur)
topic = data[cur:cur + strlen]
cur += strlen
((num_partitions,), cur) = relative_unpack('>i', data, cur)
- for i in range(num_partitions):
+ for _ in range(num_partitions):
((partition, error, offset), cur) = relative_unpack('>ihq',
data, cur)
@@ -289,11 +289,11 @@ class KafkaProtocol(object):
"""
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
- for i in range(num_topics):
+ for _ in range(num_topics):
(topic, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
- for i in range(num_partitions):
+ for _ in range(num_partitions):
((partition, error, highwater_mark_offset), cur) = \
relative_unpack('>ihq', data, cur)
@@ -337,16 +337,16 @@ class KafkaProtocol(object):
"""
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
- for i in range(num_topics):
+ for _ in range(num_topics):
(topic, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
- for i in range(num_partitions):
+ for _ in range(num_partitions):
((partition, error, num_offsets,), cur) = \
relative_unpack('>ihi', data, cur)
offsets = []
- for j in range(num_offsets):
+ for _ in range(num_offsets):
((offset,), cur) = relative_unpack('>q', data, cur)
offsets.append(offset)
@@ -392,7 +392,7 @@ class KafkaProtocol(object):
# Broker info
brokers = []
- for i in range(numbrokers):
+ for _ in range(numbrokers):
((nodeId, ), cur) = relative_unpack('>i', data, cur)
(host, cur) = read_short_string(data, cur)
((port,), cur) = relative_unpack('>i', data, cur)
@@ -402,13 +402,13 @@ class KafkaProtocol(object):
((num_topics,), cur) = relative_unpack('>i', data, cur)
topic_metadata = []
- for i in range(num_topics):
+ for _ in range(num_topics):
((topic_error,), cur) = relative_unpack('>h', data, cur)
(topic_name, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
partition_metadata = []
- for j in range(num_partitions):
+ for _ in range(num_partitions):
((partition_error_code, partition, leader, numReplicas), cur) = \
relative_unpack('>hiii', data, cur)
@@ -471,11 +471,11 @@ class KafkaProtocol(object):
((correlation_id,), cur) = relative_unpack('>i', data, 0)
((num_topics,), cur) = relative_unpack('>i', data, cur)
- for i in xrange(num_topics):
+ for _ in xrange(num_topics):
(topic, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
- for i in xrange(num_partitions):
+ for _ in xrange(num_partitions):
((partition, error), cur) = relative_unpack('>ih', data, cur)
yield OffsetCommitResponse(topic, partition, error)
@@ -522,11 +522,11 @@ class KafkaProtocol(object):
((correlation_id,), cur) = relative_unpack('>i', data, 0)
((num_topics,), cur) = relative_unpack('>i', data, cur)
- for i in range(num_topics):
+ for _ in range(num_topics):
(topic, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
- for i in range(num_partitions):
+ for _ in range(num_partitions):
((partition, offset), cur) = relative_unpack('>iq', data, cur)
(metadata, cur) = read_short_string(data, cur)
((error,), cur) = relative_unpack('>h', data, cur)
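The protocol.py hunks are purely cosmetic: loop counters that are never read become the conventional _. The decode loops themselves just walk a byte buffer with struct-style unpacking; a self-contained sketch of the same shape, using a local stand-in for kafka.util.relative_unpack and a made-up two-level payload:

    import struct

    def relative_unpack(fmt, data, cur):
        # stand-in with the same contract as kafka.util.relative_unpack:
        # unpack fmt at offset cur, return (values, new offset)
        size = struct.calcsize(fmt)
        return struct.unpack(fmt, data[cur:cur + size]), cur + size

    def decode_partition_errors(data):
        ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
        for _ in range(num_topics):                       # counter unused, hence '_'
            ((num_partitions,), cur) = relative_unpack('>i', data, cur)
            for _ in range(num_partitions):
                ((partition, error), cur) = relative_unpack('>ih', data, cur)
                yield (partition, error)

    # hand-built buffer: correlation id 99, 1 topic, 2 partitions with error code 0
    buf = (struct.pack('>ii', 99, 1) + struct.pack('>i', 2) +
           struct.pack('>ih', 0, 0) + struct.pack('>ih', 1, 0))
    print(list(decode_partition_errors(buf)))             # [(0, 0), (1, 0)]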