-rw-r--r--   kafka/client.py    | 42
-rw-r--r--   kafka/conn.py      | 20
-rw-r--r--   kafka/consumer.py  | 53
-rw-r--r--   kafka/protocol.py  | 38
-rw-r--r--   kafka/queue.py     | 19
-rw-r--r--   kafka/util.py      |  6
-rw-r--r--   test/test_unit.py  |  4
7 files changed, 100 insertions, 82 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 2fdb271..965cbc5 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,14 +1,10 @@
-import base64
 from collections import defaultdict
 from functools import partial
-from itertools import count, cycle
+from itertools import count
 import logging
-from operator import attrgetter
-import struct
 import time
-import zlib
 
-from kafka.common import *
+from kafka.common import ErrorMapping, TopicAndPartition
 from kafka.conn import KafkaConnection
 from kafka.protocol import KafkaProtocol
 
@@ -212,8 +208,10 @@ class KafkaClient(object):
             order of input payloads
         """
 
-        encoder = partial(KafkaProtocol.encode_produce_request,
-                          acks=acks, timeout=timeout)
+        encoder = partial(
+            KafkaProtocol.encode_produce_request,
+            acks=acks,
+            timeout=timeout)
 
         if acks == 0:
             decoder = None
@@ -226,10 +224,10 @@ class KafkaClient(object):
         for resp in resps:
             # Check for errors
             if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("ProduceRequest for %s failed with "
-                                "errorcode=%d" % (
-                                TopicAndPartition(resp.topic, resp.partition),
-                                resp.error))
+                raise Exception(
+                    "ProduceRequest for %s failed with errorcode=%d" %
+                    (TopicAndPartition(resp.topic, resp.partition),
+                     resp.error))
 
             # Run the callback
             if callback is not None:
@@ -251,17 +249,18 @@ class KafkaClient(object):
                           max_wait_time=max_wait_time,
                           min_bytes=min_bytes)
 
-        resps = self._send_broker_aware_request(payloads, encoder,
-                KafkaProtocol.decode_fetch_response)
+        resps = self._send_broker_aware_request(
+            payloads, encoder,
+            KafkaProtocol.decode_fetch_response)
 
         out = []
         for resp in resps:
             # Check for errors
             if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("FetchRequest for %s failed with "
-                                "errorcode=%d" % (
-                                TopicAndPartition(resp.topic, resp.partition),
-                                resp.error))
+                raise Exception(
+                    "FetchRequest for %s failed with errorcode=%d" %
+                    (TopicAndPartition(resp.topic, resp.partition),
+                     resp.error))
 
             # Run the callback
             if callback is not None:
@@ -272,9 +271,10 @@ class KafkaClient(object):
 
     def send_offset_request(self, payloads=[], fail_on_error=True,
                             callback=None):
-        resps = self._send_broker_aware_request(payloads,
-                                    KafkaProtocol.encode_offset_request,
-                                    KafkaProtocol.decode_offset_response)
+        resps = self._send_broker_aware_request(
+            payloads,
+            KafkaProtocol.encode_offset_request,
+            KafkaProtocol.decode_offset_response)
 
         out = []
         for resp in resps:
diff --git a/kafka/conn.py b/kafka/conn.py
index 29efbf1..e85fd11 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -3,6 +3,8 @@
 import socket
 import struct
 from threading import local
 
+from kafka.common import BufferUnderflowError
+
 log = logging.getLogger("kafka")
@@ -12,7 +14,7 @@ class KafkaConnection(local):
     A socket connection to a single Kafka broker
 
     This class is _not_ thread safe. Each call to `send` must be followed
-    by a call to `recv` in order to get the correct response. Eventually,
+    by a call to `recv` in order to get the correct response. Eventually,
     we can do something in here to facilitate multiplexed requests/responses
     since the Kafka API includes a correlation id.
     """
@@ -43,7 +45,7 @@ class KafkaConnection(local):
 
     def _consume_response_iter(self):
         """
-        This method handles the response header and error messages.
+        This method handles the response header and error messages.
         It then returns an iterator for the chunks of the response
         """
         log.debug("Handling response from Kafka")
@@ -57,13 +59,15 @@ class KafkaConnection(local):
         messagesize = size - 4
         log.debug("About to read %d bytes from Kafka", messagesize)
 
-        # Read the remainder of the response
+        # Read the remainder of the response
         total = 0
         while total < messagesize:
             resp = self._sock.recv(self.bufsize)
             log.debug("Read %d bytes from Kafka", len(resp))
             if resp == "":
-                raise BufferUnderflowError("Not enough data to read this response")
+                raise BufferUnderflowError(
+                    "Not enough data to read this response")
+
             total += len(resp)
             yield resp
@@ -75,9 +79,13 @@
 
     def send(self, request_id, payload):
         "Send a request to Kafka"
-        log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id))
+
+        log.debug(
+            "About to send %d bytes to Kafka, request %d" %
+            (len(payload), request_id))
+
         sent = self._sock.sendall(payload)
-        if sent != None:
+        if sent is not None:
             raise RuntimeError("Kafka went away")
 
     def recv(self, request_id):
diff --git a/kafka/consumer.py b/kafka/consumer.py
index a97e8c0..7d44f28 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -8,7 +8,7 @@
 from Queue import Empty
 
 from kafka.common import (
     ErrorMapping, FetchRequest,
-    OffsetRequest, OffsetFetchRequest, OffsetCommitRequest,
+    OffsetRequest, OffsetCommitRequest,
     ConsumerFetchSizeTooSmall, ConsumerNoMoreData
 )
@@ -223,11 +223,12 @@ class SimpleConsumer(Consumer):
         self.fetch_min_bytes = fetch_size_bytes
         self.fetch_started = defaultdict(bool)  # defaults to false
 
-        super(SimpleConsumer, self).__init__(client, group, topic,
-                                    partitions=partitions,
-                                    auto_commit=auto_commit,
-                                    auto_commit_every_n=auto_commit_every_n,
-                                    auto_commit_every_t=auto_commit_every_t)
+        super(SimpleConsumer, self).__init__(
+            client, group, topic,
+            partitions=partitions,
+            auto_commit=auto_commit,
+            auto_commit_every_n=auto_commit_every_n,
+            auto_commit_every_t=auto_commit_every_t)
 
     def provide_partition_info(self):
         """
@@ -275,8 +276,8 @@ class SimpleConsumer(Consumer):
 
             resps = self.client.send_offset_request(reqs)
             for resp in resps:
-                self.offsets[resp.partition] = resp.offsets[0] + \
-                    deltas[resp.partition]
+                self.offsets[resp.partition] = \
+                    resp.offsets[0] + deltas[resp.partition]
         else:
             raise ValueError("Unexpected value for `whence`, %d" % whence)
@@ -364,9 +365,10 @@ class SimpleConsumer(Consumer):
                 req = FetchRequest(
                     self.topic, partition, offset, self.client.bufsize)
 
-                (resp,) = self.client.send_fetch_request([req],
-                    max_wait_time=self.fetch_max_wait_time,
-                    min_bytes=fetch_size)
+                (resp,) = self.client.send_fetch_request(
+                    [req],
+                    max_wait_time=self.fetch_max_wait_time,
+                    min_bytes=fetch_size)
 
                 assert resp.topic == self.topic
                 assert resp.partition == partition
@@ -376,18 +378,22 @@ class SimpleConsumer(Consumer):
                 for message in resp.messages:
                     next_offset = message.offset
 
-                    # update the offset before the message is yielded. This is
-                    # so that the consumer state is not lost in certain cases.
-                    # For eg: the message is yielded and consumed by the caller,
-                    # but the caller does not come back into the generator again.
-                    # The message will be consumed but the status will not be
-                    # updated in the consumer
+                    # update the offset before the message is yielded. This
+                    # is so that the consumer state is not lost in certain
+                    # cases.
+                    #
+                    # For eg: the message is yielded and consumed by the
+                    # caller, but the caller does not come back into the
+                    # generator again. The message will be consumed but the
+                    # status will not be updated in the consumer
                     self.fetch_started[partition] = True
                     self.offsets[partition] = message.offset
                     yield message
             except ConsumerFetchSizeTooSmall, e:
-                log.warn("Fetch size is too small, increasing by 1.5x and retrying")
                 fetch_size *= 1.5
+                log.warn(
+                    "Fetch size too small, increasing to %d (1.5x) and retry",
+                    fetch_size)
                 continue
             except ConsumerNoMoreData, e:
                 log.debug("Iteration was ended by %r", e)
@@ -429,11 +435,12 @@ class MultiProcessConsumer(Consumer):
                  num_procs=1, partitions_per_proc=0):
 
         # Initiate the base consumer class
-        super(MultiProcessConsumer, self).__init__(client, group, topic,
-                                    partitions=None,
-                                    auto_commit=auto_commit,
-                                    auto_commit_every_n=auto_commit_every_n,
-                                    auto_commit_every_t=auto_commit_every_t)
+        super(MultiProcessConsumer, self).__init__(
+            client, group, topic,
+            partitions=None,
+            auto_commit=auto_commit,
+            auto_commit_every_n=auto_commit_every_n,
+            auto_commit_every_t=auto_commit_every_t)
 
         # Variables for managing and controlling the data flow from
         # consumer child process to master
diff --git a/kafka/protocol.py b/kafka/protocol.py
index 421e19b..612acf6 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -25,12 +25,12 @@ class KafkaProtocol(object):
     This class does not have any state associated with it, it is purely
     for organization.
     """
-    PRODUCE_KEY       = 0
-    FETCH_KEY         = 1
-    OFFSET_KEY        = 2
-    METADATA_KEY      = 3
+    PRODUCE_KEY = 0
+    FETCH_KEY = 1
+    OFFSET_KEY = 2
+    METADATA_KEY = 3
     OFFSET_COMMIT_KEY = 6
-    OFFSET_FETCH_KEY  = 7
+    OFFSET_FETCH_KEY = 7
 
     ATTRIBUTE_CODEC_MASK = 0x03
     CODEC_NONE = 0x00
@@ -120,8 +120,8 @@ class KafkaProtocol(object):
                 yield OffsetAndMessage(offset, message)
             except BufferUnderflowError:
                 if read_message is False:
-                    # If we get a partial read of a message, but haven't yielded anyhting
-                    # there's a problem
+                    # If we get a partial read of a message, but haven't
+                    # yielded anyhting there's a problem
                     raise ConsumerFetchSizeTooSmall()
                 else:
                     raise StopIteration()
@@ -274,14 +274,14 @@ class KafkaProtocol(object):
 
         for i in range(num_partitions):
             ((partition, error, highwater_mark_offset), cur) = \
-                    relative_unpack('>ihq', data, cur)
+                relative_unpack('>ihq', data, cur)
 
             (message_set, cur) = read_int_string(data, cur)
 
             yield FetchResponse(
-                    topic, partition, error,
-                    highwater_mark_offset,
-                    KafkaProtocol._decode_message_set_iter(message_set))
+                topic, partition, error,
+                highwater_mark_offset,
+                KafkaProtocol._decode_message_set_iter(message_set))
 
     @classmethod
     def encode_offset_request(cls, client_id, correlation_id, payloads=None):
@@ -321,7 +321,7 @@ class KafkaProtocol(object):
 
         for i in range(num_partitions):
             ((partition, error, num_offsets,), cur) = \
-                    relative_unpack('>ihi', data, cur)
+                relative_unpack('>ihi', data, cur)
 
             offsets = []
             for j in range(num_offsets):
@@ -383,17 +383,17 @@ class KafkaProtocol(object):
 
             for j in range(num_partitions):
                 ((partition_error_code, partition, leader, numReplicas), cur) = \
-                        relative_unpack('>hiii', data, cur)
+                    relative_unpack('>hiii', data, cur)
 
-                (replicas, cur) = relative_unpack('>%di' % numReplicas,
-                                                  data, cur)
+                (replicas, cur) = relative_unpack(
+                    '>%di' % numReplicas, data, cur)
 
                 ((num_isr,), cur) = relative_unpack('>i', data, cur)
                 (isr, cur) = relative_unpack('>%di' % num_isr, data, cur)
 
                 partition_metadata[partition] = \
-                        PartitionMetadata(topic_name, partition, leader,
-                                          replicas, isr)
+                    PartitionMetadata(
+                        topic_name, partition, leader, replicas, isr)
 
             topic_metadata[topic_name] = partition_metadata
@@ -531,7 +531,7 @@ def create_gzip_message(payloads, key=None):
     key: bytes, a key used for partition routing (optional)
     """
     message_set = KafkaProtocol._encode_message_set(
-            [create_message(payload) for payload in payloads])
+        [create_message(payload) for payload in payloads])
 
     gzipped = gzip_encode(message_set)
     codec = KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP
@@ -552,7 +552,7 @@ def create_snappy_message(payloads, key=None):
     key: bytes, a key used for partition routing (optional)
     """
     message_set = KafkaProtocol._encode_message_set(
-            [create_message(payload) for payload in payloads])
+        [create_message(payload) for payload in payloads])
 
     snapped = snappy_encode(message_set)
     codec = KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY
diff --git a/kafka/queue.py b/kafka/queue.py
index 41f1c31..a996369 100644
--- a/kafka/queue.py
+++ b/kafka/queue.py
@@ -25,8 +25,9 @@ class KafkaConsumerProcess(Process):
         Process.__init__(self)
 
     def __str__(self):
-        return "[KafkaConsumerProcess: topic=%s, partition=%s, sleep=%s]" % \
-            (self.topic, self.partition, self.consumer_sleep)
+        return "[KafkaConsumerProcess: topic=%s, \
+                 partition=%s, sleep=%s]" % \
+               (self.topic, self.partition, self.consumer_sleep)
 
     def run(self):
         self.barrier.wait()
@@ -70,10 +71,12 @@ class KafkaProducerProcess(Process):
         Process.__init__(self)
 
     def __str__(self):
-        return "[KafkaProducerProcess: topic=%s, flush_buffer=%s, \
-                flush_timeout=%s, timeout=%s]" % (
-                self.topic, self.producer_flush_buffer,
-                self.producer_flush_timeout, self.producer_timeout)
+        return "[KafkaProducerProcess: topic=%s, \
+                 flush_buffer=%s, flush_timeout=%s, timeout=%s]" % \
+               (self.topic,
+                self.producer_flush_buffer,
+                self.producer_flush_timeout,
+                self.producer_timeout)
 
     def run(self):
         self.barrier.wait()
@@ -104,8 +107,8 @@ class KafkaProducerProcess(Process):
                 last_produce = time.time()
 
             try:
-                msg = KafkaClient.create_message(self.in_queue.get(True,
-                                                 self.producer_timeout))
+                msg = KafkaClient.create_message(
+                    self.in_queue.get(True, self.producer_timeout))
                 messages.append(msg)
 
             except Empty:
diff --git a/kafka/util.py b/kafka/util.py
index 96b3745..54052fb 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -1,9 +1,8 @@
 from collections import defaultdict
-from itertools import groupby
 import struct
 from threading import Thread, Event
 
-from common import *
+from kafka.common import BufferUnderflowError
 
 
 def write_int_string(s):
@@ -39,7 +38,8 @@ def read_short_string(data, cur):
 def read_int_string(data, cur):
     if len(data) < cur + 4:
         raise BufferUnderflowError(
-            "Not enough data left to read string len (%d < %d)" % (len(data), cur + 4))
+            "Not enough data left to read string len (%d < %d)" %
+            (len(data), cur + 4))
 
     (strlen,) = struct.unpack('>i', data[cur:cur + 4])
     if strlen == -1:
diff --git a/test/test_unit.py b/test/test_unit.py
index c796c94..3f3af66 100644
--- a/test/test_unit.py
+++ b/test/test_unit.py
@@ -3,7 +3,8 @@
 import random
 import struct
 import unittest
 
-from kafka.client import KafkaClient, ProduceRequest, FetchRequest
+from kafka.client import KafkaClient
+from kafka.common import ProduceRequest, FetchRequest
 from kafka.codec import (
     has_gzip, has_snappy, gzip_encode, gzip_decode,
@@ -59,7 +60,6 @@ class TestMisc(unittest.TestCase):
 
     def test_length_prefix(self):
         for i in xrange(ITERATIONS):
            s1 = random_string()
-
            s2 = length_prefix_message(s1)
            self.assertEquals(struct.unpack('>i', s2[0:4])[0], len(s1))
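For orientation, the cleanup above does not change the public API it restyles. The following is a minimal usage sketch of the 0.8-era interfaces touched by this diff (KafkaClient, FetchRequest, SimpleConsumer, send_fetch_request); the broker address, topic, group, and wait/size numbers are illustrative placeholders, not values from the patch.

from kafka.client import KafkaClient
from kafka.common import FetchRequest
from kafka.consumer import SimpleConsumer

# Placeholder broker/topic/group names -- not part of the patch.
kafka = KafkaClient("localhost", 9092)

# Low-level path: the send_fetch_request() call restyled in client.py above.
reqs = [FetchRequest("my-topic", 0, 0, kafka.bufsize)]
(resp,) = kafka.send_fetch_request(reqs, max_wait_time=100, min_bytes=4096)
for offset_and_message in resp.messages:
    print(offset_and_message.offset)

# High-level path: SimpleConsumer drives the same request and, per the
# consumer.py hunk, grows fetch_size by 1.5x on ConsumerFetchSizeTooSmall.
consumer = SimpleConsumer(kafka, "my-group", "my-topic")
for message in consumer:
    print(message)

kafka.close()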