-rw-r--r--  kafka/client.py                | 45
-rw-r--r--  kafka/conn.py                  |  8
-rw-r--r--  kafka/consumer/multiprocess.py | 15
-rw-r--r--  kafka/consumer/simple.py       | 27
-rw-r--r--  kafka/producer/base.py         |  2
-rw-r--r--  kafka/producer/keyed.py        | 53
-rw-r--r--  kafka/producer/simple.py       | 62
-rw-r--r--  kafka/protocol.py              | 28
8 files changed, 88 insertions, 152 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 63b33b3..2ef22b3 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -4,8 +4,8 @@ import copy
 import functools
 import logging
 import time
-import kafka.common
 
+import kafka.common
 from kafka.common import (TopicAndPartition, BrokerMetadata,
                           ConnectionError, FailedPayloadsError,
                           KafkaTimeoutError, KafkaUnavailableError,
@@ -22,7 +22,7 @@ log = logging.getLogger(__name__)
 
 class KafkaClient(object):
 
-    CLIENT_ID = b"kafka-python"
+    CLIENT_ID = b'kafka-python'
 
     # NOTE: The timeout given to the client should always be greater than the
     # one passed to SimpleConsumer.get_message(), otherwise you can get a
@@ -50,7 +50,7 @@ class KafkaClient(object):
     ##################
 
     def _get_conn(self, host, port):
-        "Get or create a connection to a broker using host and port"
+        """Get or create a connection to a broker using host and port"""
         host_key = (host, port)
         if host_key not in self.conns:
             self.conns[host_key] = KafkaConnection(
@@ -111,6 +111,7 @@ class KafkaClient(object):
         """
         for (host, port) in self.hosts:
             requestId = self._next_id()
+            log.debug('Request %s: %s', requestId, payloads)
             try:
                 conn = self._get_conn(host, port)
                 request = encoder_fn(client_id=self.client_id,
@@ -119,13 +120,15 @@ class KafkaClient(object):
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
 
-                return decoder_fn(response)
+                decoded = decoder_fn(response)
+                log.debug('Response %s: %s', requestId, decoded)
+                return decoded
 
             except Exception:
-                log.exception("Could not send request [%r] to server %s:%i, "
-                              "trying next server" % (requestId, host, port))
+                log.exception('Error sending request [%s] to server %s:%s, '
+                              'trying next server', requestId, host, port)
 
-        raise KafkaUnavailableError("All servers failed to process request")
+        raise KafkaUnavailableError('All servers failed to process request')
 
     def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
         """
@@ -150,9 +153,6 @@ class KafkaClient(object):
             List of response objects in the same order as the supplied payloads
         """
-
-        log.debug("Sending Payloads: %s" % payloads)
-
         # Group the requests by topic+partition
         brokers_for_payloads = []
         payloads_by_broker = collections.defaultdict(list)
@@ -170,6 +170,7 @@ class KafkaClient(object):
         broker_failures = []
         for broker, payloads in payloads_by_broker.items():
             requestId = self._next_id()
+            log.debug('Request %s to %s: %s', requestId, broker, payloads)
             request = encoder_fn(client_id=self.client_id,
                                  correlation_id=requestId, payloads=payloads)
@@ -180,7 +181,7 @@ class KafkaClient(object):
 
             except ConnectionError as e:
                 broker_failures.append(broker)
-                log.warning("Could not send request [%s] to server %s: %s",
+                log.warning('Could not send request [%s] to server %s: %s',
                             binascii.b2a_hex(request), broker, e)
 
                 for payload in payloads:
@@ -201,15 +202,14 @@ class KafkaClient(object):
                 response = conn.recv(requestId)
             except ConnectionError as e:
                 broker_failures.append(broker)
-                log.warning("Could not receive response to request [%s] "
-                            "from server %s: %s",
+                log.warning('Could not receive response to request [%s] '
+                            'from server %s: %s',
                             binascii.b2a_hex(request), conn, e)
 
                 for payload in payloads:
                     responses_by_broker[broker].append(FailedPayloadsError(payload))
 
             else:
-
                 for payload_response in decoder_fn(response):
                     responses_by_broker[broker].append(payload_response)
 
@@ -223,7 +223,6 @@ class KafkaClient(object):
         # Return responses in the same order as provided
         responses_by_payload = [responses_by_broker[broker].pop(0)
                                 for broker in brokers_for_payloads]
-        log.debug('Responses: %s' % responses_by_payload)
         return responses_by_payload
 
     def __repr__(self):
@@ -254,8 +253,11 @@ class KafkaClient(object):
 
     def copy(self):
         """
-        Create an inactive copy of the client object
-        A reinit() has to be done on the copy before it can be used again
+        Create an inactive copy of the client object, suitable for passing
+        to a separate thread.
+
+        Note that the copied connections are not initialized, so reinit() must
+        be called on the returned copy.
         """
         c = copy.deepcopy(self)
         for key in c.conns:
@@ -297,7 +299,7 @@ class KafkaClient(object):
 
         while not self.has_metadata_for_topic(topic):
             if time.time() > start_time + timeout:
-                raise KafkaTimeoutError("Unable to create topic {0}".format(topic))
+                raise KafkaTimeoutError('Unable to create topic {0}'.format(topic))
             try:
                 self.load_metadata_for_topics(topic)
             except LeaderNotAvailableError:
@@ -345,8 +347,8 @@ class KafkaClient(object):
 
         resp = self.send_metadata_request(topics)
 
-        log.debug("Received new broker metadata: %s", resp.brokers)
-        log.debug("Received new topic metadata: %s", resp.topics)
+        log.debug('Received new broker metadata: %s', resp.brokers)
+        log.debug('Received new topic metadata: %s', resp.topics)
 
         self.brokers = dict([(broker.nodeId, broker)
                              for broker in resp.brokers])
@@ -365,7 +367,7 @@ class KafkaClient(object):
                     raise
 
                 # Otherwise, just log a warning
-                log.error("Error loading topic metadata for %s: %s", topic, type(e))
+                log.error('Error loading topic metadata for %s: %s', topic, type(e))
                 continue
 
             self.topic_partitions[topic] = {}
@@ -406,7 +408,6 @@ class KafkaClient(object):
 
     def send_metadata_request(self, payloads=[], fail_on_error=True,
                               callback=None):
-
         encoder = KafkaProtocol.encode_metadata_request
         decoder = KafkaProtocol.decode_metadata_response
 
diff --git a/kafka/conn.py b/kafka/conn.py
index 7a49d8c..432e10b 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -161,9 +161,11 @@ class KafkaConnection(local):
 
     def copy(self):
         """
-        Create an inactive copy of the connection object
-        A reinit() has to be done on the copy before it can be used again
-        return a new KafkaConnection object
+        Create an inactive copy of the connection object, suitable for
+        passing to a background thread.
+
+        The returned copy is not connected; you must call reinit() before
+        using.
""" c = copy.deepcopy(self) # Python 3 doesn't copy custom attributes of the threadlocal subclass diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index 8cec92d..d03eb95 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -1,22 +1,21 @@ from __future__ import absolute_import -import logging -import time - from collections import namedtuple +import logging from multiprocessing import Process, Manager as MPManager - try: - from Queue import Empty, Full -except ImportError: # python 2 - from queue import Empty, Full + from Queue import Empty, Full # python 3 +except ImportError: + from queue import Empty, Full # python 2 +import time from .base import ( + Consumer, AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL, NO_MESSAGES_WAIT_TIME_SECONDS, FULL_QUEUE_WAIT_TIME_SECONDS ) -from .simple import Consumer, SimpleConsumer +from .simple import SimpleConsumer log = logging.getLogger(__name__) diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index 384fa8e..e4233ff 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -2,25 +2,18 @@ from __future__ import absolute_import try: from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611 -except ImportError: # python 2 - from itertools import izip_longest as izip_longest, repeat +except ImportError: + from itertools import izip_longest as izip_longest, repeat # python 2 import logging +try: + from Queue import Empty, Queue # python 3 +except ImportError: + from queue import Empty, Queue # python 2 +import sys import time import six -import sys - -try: - from Queue import Empty, Queue -except ImportError: # python 2 - from queue import Empty, Queue -from kafka.common import ( - FetchRequest, OffsetRequest, - ConsumerFetchSizeTooSmall, ConsumerNoMoreData, - UnknownTopicOrPartitionError, NotLeaderForPartitionError, - OffsetOutOfRangeError, FailedPayloadsError, check_error -) from .base import ( Consumer, FETCH_DEFAULT_BLOCK_TIMEOUT, @@ -33,6 +26,12 @@ from .base import ( ITER_TIMEOUT_SECONDS, NO_MESSAGES_WAIT_TIME_SECONDS ) +from ..common import ( + FetchRequest, OffsetRequest, + ConsumerFetchSizeTooSmall, ConsumerNoMoreData, + UnknownTopicOrPartitionError, NotLeaderForPartitionError, + OffsetOutOfRangeError, FailedPayloadsError, check_error +) log = logging.getLogger(__name__) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 18af342..e0c086b 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -206,6 +206,8 @@ class Producer(object): Arguments: client (KafkaClient): instance to use for broker communications. + If async=True, the background thread will use client.copy(), + which is expected to return a thread-safe object. codec (kafka.protocol.ALL_CODECS): compression codec to use. 
         req_acks (int, optional): A value indicating the acknowledgements that
             the server must receive before responding to the request,
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 5fe9b12..a5a26c9 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -3,14 +3,10 @@ from __future__ import absolute_import
 import logging
 import warnings
 
-from kafka.partitioner import HashedPartitioner
-from kafka.util import kafka_bytestring
+from .base import Producer
+from ..partitioner import HashedPartitioner
+from ..util import kafka_bytestring
 
-from .base import (
-    Producer, BATCH_SEND_DEFAULT_INTERVAL,
-    BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
-    ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
-)
 
 log = logging.getLogger(__name__)
 
@@ -19,46 +15,17 @@ class KeyedProducer(Producer):
     """
     A producer which distributes messages to partitions based on the key
 
-    Arguments:
-        client: The kafka client instance
+    See Producer class for Arguments
 
-    Keyword Arguments:
+    Additional Arguments:
         partitioner: A partitioner class that will be used to get the partition
-            to send the message to. Must be derived from Partitioner
-        async: If True, the messages are sent asynchronously via another
-            thread (process). We will not wait for a response to these
-        ack_timeout: Value (in milliseconds) indicating a timeout for waiting
-            for an acknowledgement
-        batch_send: If True, messages are send in batches
-        batch_send_every_n: If set, messages are send in batches of this size
-        batch_send_every_t: If set, messages are send after this timeout
+            to send the message to. Must be derived from Partitioner.
+            Defaults to HashedPartitioner.
     """
-    def __init__(self, client, partitioner=None, async=False,
-                 req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
-                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
-                 codec=None,
-                 batch_send=False,
-                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
-                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
-                 async_retry_limit=ASYNC_RETRY_LIMIT,
-                 async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
-                 async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
-                 async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
-                 async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
-        if not partitioner:
-            partitioner = HashedPartitioner
-        self.partitioner_class = partitioner
+    def __init__(self, *args, **kwargs):
+        self.partitioner_class = kwargs.pop('partitioner', HashedPartitioner)
         self.partitioners = {}
-
-        super(KeyedProducer, self).__init__(client, req_acks, ack_timeout,
-                                            codec, async, batch_send,
-                                            batch_send_every_n,
-                                            batch_send_every_t,
-                                            async_retry_limit,
-                                            async_retry_backoff_ms,
-                                            async_retry_on_timeouts,
-                                            async_queue_maxsize,
-                                            async_queue_put_timeout)
+        super(KeyedProducer, self).__init__(*args, **kwargs)
 
     def _next_partition(self, topic, key):
         if topic not in self.partitioners:
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index 280a02e..13e60d9 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -1,68 +1,34 @@
 from __future__ import absolute_import
 
+from itertools import cycle
 import logging
 import random
 
 import six
-
-from itertools import cycle
-
 from six.moves import xrange
 
-from .base import (
-    Producer, BATCH_SEND_DEFAULT_INTERVAL,
-    BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
-    ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
-)
+from .base import Producer
+
 
 log = logging.getLogger(__name__)
 
 
 class SimpleProducer(Producer):
-    """
-    A simple, round-robin producer.
-    Each message goes to exactly one partition
-
-    Arguments:
-        client: The Kafka client instance to use
-
-    Keyword Arguments:
-        async: If True, the messages are sent asynchronously via another
-            thread (process). We will not wait for a response to these
-        req_acks: A value indicating the acknowledgements that the server must
-            receive before responding to the request
-        ack_timeout: Value (in milliseconds) indicating a timeout for waiting
-            for an acknowledgement
-        batch_send: If True, messages are send in batches
-        batch_send_every_n: If set, messages are send in batches of this size
-        batch_send_every_t: If set, messages are send after this timeout
-        random_start: If true, randomize the initial partition which the
+    """A simple, round-robin producer.
+
+    See Producer class for Base Arguments
+
+    Additional Arguments:
+        random_start (bool, optional): randomize the initial partition which the
             first message block will be published to, otherwise
             if false, the first message block will always publish
-            to partition 0 before cycling through each partition
+            to partition 0 before cycling through each partition,
+            defaults to True.
     """
-    def __init__(self, client, async=False,
-                 req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
-                 ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
-                 codec=None,
-                 batch_send=False,
-                 batch_send_every_n=BATCH_SEND_MSG_COUNT,
-                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
-                 random_start=True,
-                 async_retry_limit=ASYNC_RETRY_LIMIT,
-                 async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
-                 async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
-                 async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
-                 async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
+    def __init__(self, *args, **kwargs):
         self.partition_cycles = {}
-        self.random_start = random_start
-        super(SimpleProducer, self).__init__(client, req_acks, ack_timeout,
-                                             codec, async, batch_send,
-                                             batch_send_every_n,
-                                             batch_send_every_t,
-                                             async_retry_limit,
-                                             async_retry_backoff_ms,
-                                             async_retry_on_timeouts,
-                                             async_queue_maxsize,
-                                             async_queue_put_timeout)
+        self.random_start = kwargs.pop('random_start', True)
+        super(SimpleProducer, self).__init__(*args, **kwargs)
 
     def _next_partition(self, topic):
         if topic not in self.partition_cycles:
diff --git a/kafka/protocol.py b/kafka/protocol.py
index f12e6a3..d5adf89 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -232,12 +232,12 @@ class KafkaProtocol(object):
         """
         ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
 
-        for i in range(num_topics):
+        for _ in range(num_topics):
             ((strlen,), cur) = relative_unpack('>h', data, cur)
             topic = data[cur:cur + strlen]
             cur += strlen
             ((num_partitions,), cur) = relative_unpack('>i', data, cur)
-            for i in range(num_partitions):
+            for _ in range(num_partitions):
                 ((partition, error, offset), cur) = relative_unpack('>ihq',
                                                                     data, cur)
 
@@ -289,11 +289,11 @@ class KafkaProtocol(object):
         """
         ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
 
-        for i in range(num_topics):
+        for _ in range(num_topics):
             (topic, cur) = read_short_string(data, cur)
             ((num_partitions,), cur) = relative_unpack('>i', data, cur)
 
-            for i in range(num_partitions):
+            for j in range(num_partitions):
                 ((partition, error, highwater_mark_offset), cur) = \
                     relative_unpack('>ihq', data, cur)
 
@@ -337,16 +337,16 @@ class KafkaProtocol(object):
         """
         ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
 
-        for i in range(num_topics):
+        for _ in range(num_topics):
             (topic, cur) = read_short_string(data, cur)
             ((num_partitions,), cur) = relative_unpack('>i', data, cur)
 
-            for i in range(num_partitions):
+            for _ in range(num_partitions):
                 ((partition, error, num_offsets,), cur) = \
                     relative_unpack('>ihi', data, cur)
 
                 offsets = []
-                for j in range(num_offsets):
+                for k in range(num_offsets):
                     ((offset,), cur) = relative_unpack('>q', data, cur)
                     offsets.append(offset)
 
@@ -392,7 +392,7 @@ class KafkaProtocol(object):
 
         # Broker info
         brokers = []
-        for i in range(numbrokers):
+        for _ in range(numbrokers):
            ((nodeId, ), cur) = relative_unpack('>i', data, cur)
            (host, cur) = read_short_string(data, cur)
            ((port,), cur) = relative_unpack('>i', data, cur)
@@ -402,13 +402,13 @@ class KafkaProtocol(object):
         ((num_topics,), cur) = relative_unpack('>i', data, cur)
         topic_metadata = []
 
-        for i in range(num_topics):
+        for _ in range(num_topics):
             ((topic_error,), cur) = relative_unpack('>h', data, cur)
             (topic_name, cur) = read_short_string(data, cur)
             ((num_partitions,), cur) = relative_unpack('>i', data, cur)
 
             partition_metadata = []
 
-            for j in range(num_partitions):
+            for _ in range(num_partitions):
                 ((partition_error_code, partition, leader, numReplicas), cur) = \
                     relative_unpack('>hiii', data, cur)
 
@@ -471,11 +471,11 @@ class KafkaProtocol(object):
         ((correlation_id,), cur) = relative_unpack('>i', data, 0)
         ((num_topics,), cur) = relative_unpack('>i', data, cur)
 
-        for i in xrange(num_topics):
+        for _ in xrange(num_topics):
             (topic, cur) = read_short_string(data, cur)
             ((num_partitions,), cur) = relative_unpack('>i', data, cur)
 
-            for i in xrange(num_partitions):
+            for _ in xrange(num_partitions):
                 ((partition, error), cur) = relative_unpack('>ih', data, cur)
                 yield OffsetCommitResponse(topic, partition, error)
 
@@ -522,11 +522,11 @@ class KafkaProtocol(object):
         ((correlation_id,), cur) = relative_unpack('>i', data, 0)
         ((num_topics,), cur) = relative_unpack('>i', data, cur)
 
-        for i in range(num_topics):
+        for _ in range(num_topics):
             (topic, cur) = read_short_string(data, cur)
             ((num_partitions,), cur) = relative_unpack('>i', data, cur)
 
-            for i in range(num_partitions):
+            for _ in range(num_partitions):
                 ((partition, offset), cur) = relative_unpack('>iq', data, cur)
                 (metadata, cur) = read_short_string(data, cur)
                 ((error,), cur) = relative_unpack('>h', data, cur)
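
Usage note (not part of the diff): a minimal sketch of how the slimmed-down producer constructors are intended to be called after this change, assuming the kafka-python API of this era. The broker address, topic, and message below are placeholders. SimpleProducer now pops only 'random_start' from **kwargs and KeyedProducer pops only 'partitioner'; everything else is forwarded unchanged to Producer.__init__.

from kafka import KafkaClient, SimpleProducer, KeyedProducer
from kafka.partitioner import HashedPartitioner

client = KafkaClient('localhost:9092')  # placeholder broker address

# random_start is popped from kwargs (defaults to True); remaining keywords
# such as req_acks or ack_timeout pass straight through to Producer.__init__.
simple = SimpleProducer(client, random_start=False)
simple.send_messages(b'example-topic', b'hello')  # placeholder topic/message

# partitioner is popped from kwargs (defaults to HashedPartitioner).
keyed = KeyedProducer(client, partitioner=HashedPartitioner)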