diff options
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 54 |
1 files changed, 48 insertions, 6 deletions
diff --git a/kafka/client.py b/kafka/client.py index fb99910..3f1fa39 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -1,7 +1,7 @@ import base64 from collections import namedtuple, defaultdict from functools import partial -from itertools import groupby, count +from itertools import count, cycle import logging from operator import attrgetter import socket @@ -596,6 +596,7 @@ class KafkaClient(object): } self.brokers = {} # broker_id -> BrokerMetadata self.topics_to_brokers = {} # topic_id -> broker_id + self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...] self.load_metadata_for_topics() def close(self): @@ -635,6 +636,7 @@ class KafkaClient(object): self.load_metadata_for_topics(topic) else: self.topics_to_brokers[TopicAndPartition(topic, partition)] = brokers[meta.leader] + self.topic_partitions[topic].append(partition) def get_leader_for_partition(self, topic, partition): key = TopicAndPartition(topic, partition) @@ -682,7 +684,7 @@ class KafkaClient(object): response = conn.recv(requestId) for produce_response in KafkaProtocol.decode_produce_response(response): # Check for errors - if fail_on_error == True and produce_response.error != 0: + if fail_on_error == True and produce_response.error != ErrorMapping.NO_ERROR: raise Exception("ProduceRequest for %s failed with errorcode=%d", (TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error)) # Run the callback @@ -721,7 +723,7 @@ class KafkaClient(object): response = conn.recv(requestId) for fetch_response in KafkaProtocol.decode_fetch_response_iter(response): # Check for errors - if fail_on_error == True and fetch_response.error != 0: + if fail_on_error == True and fetch_response.error != ErrorMapping.NO_ERROR: raise Exception("FetchRequest %s failed with errorcode=%d" % (TopicAndPartition(fetch_response.topic, fetch_response.partition), fetch_response.error)) # Run the callback @@ -759,7 +761,7 @@ class KafkaClient(object): return None out = [] for offset_response in KafkaProtocol.decode_offset_response(response): - if fail_on_error == True and offset_response.error != 0: + if fail_on_error == True and offset_response.error != ErrorMapping.NO_ERROR: raise Exception("OffsetRequest failed with errorcode=%s", offset_response.error) if callback is not None: out.append(callback(offset_response)) @@ -779,7 +781,7 @@ class KafkaClient(object): out = [] for offset_commit_response in KafkaProtocol.decode_offset_commit_response(response): log.debug(offset_commit_response) - if fail_on_error == True and offset_commit_response.error != 0: + if fail_on_error == True and offset_commit_response.error != ErrorMapping.NO_ERROR: raise Exception("OffsetCommitRequest failed with errorcode=%s", offset_commit_response.error) if callback is not None: out.append(callback(offset_commit_response)) @@ -798,7 +800,7 @@ class KafkaClient(object): return None out = [] for offset_fetch_response in KafkaProtocol.decode_offset_fetch_response(response): - if fail_on_error == True and offset_fetch_response.error != 0: + if fail_on_error == True and offset_fetch_response.error != ErrorMapping.NO_ERROR: raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % ( offset_fetch_response.topic, offset_fetch_response.partition, offset_fetch_response.error)) if callback is not None: @@ -806,3 +808,43 @@ class KafkaClient(object): else: out.append(offset_fetch_response) return out + +class SimpleProducer(object): + """ + A simple, round-robbin producer. Each message goes to exactly one partition + """ + def __init__(self, client, topic): + self.client = client + self.topic = topic + self.client.load_metadata_for_topics(topic) + self.next_partition = cycle(self.client.topic_partitions[topic]) + + def send_message(self, msg): + req = ProduceRequest(self.topic, self.next_partition.next(), + messages=[KafkaProtocol.create_message(msg)]) + resp = self.client.send_produce_request([req]).next() + +class SimpleConsumer(object): + def __init__(self, client, group, topic): + self.client = client + self.topic = topic + self.group = group + self.client.load_metadata_for_topics(topic) + self.offsets = {} + + def get_or_init_offset(resp): + if resp.error == ErrorMapping.NO_ERROR: + return resp.offset + elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON: + return 0 + else: + raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % ( + resp.topic, resp.partition, resp.error)) + + for partition in self.client.topic_partitions[topic]: + req = OffsetFetchRequest(topic, partition) + (offset,) = self.client.send_offset_fetch_request(group, [req], callback=get_or_init_offset, fail_on_error=False) + self.offsets[partition] = offset + + print self.offsets + |