summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py54
1 files changed, 48 insertions, 6 deletions
diff --git a/kafka/client.py b/kafka/client.py
index fb99910..3f1fa39 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,7 +1,7 @@
import base64
from collections import namedtuple, defaultdict
from functools import partial
-from itertools import groupby, count
+from itertools import count, cycle
import logging
from operator import attrgetter
import socket
@@ -596,6 +596,7 @@ class KafkaClient(object):
}
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
+ self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...]
self.load_metadata_for_topics()
def close(self):
@@ -635,6 +636,7 @@ class KafkaClient(object):
self.load_metadata_for_topics(topic)
else:
self.topics_to_brokers[TopicAndPartition(topic, partition)] = brokers[meta.leader]
+ self.topic_partitions[topic].append(partition)
def get_leader_for_partition(self, topic, partition):
key = TopicAndPartition(topic, partition)
@@ -682,7 +684,7 @@ class KafkaClient(object):
response = conn.recv(requestId)
for produce_response in KafkaProtocol.decode_produce_response(response):
# Check for errors
- if fail_on_error == True and produce_response.error != 0:
+ if fail_on_error == True and produce_response.error != ErrorMapping.NO_ERROR:
raise Exception("ProduceRequest for %s failed with errorcode=%d",
(TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error))
# Run the callback
@@ -721,7 +723,7 @@ class KafkaClient(object):
response = conn.recv(requestId)
for fetch_response in KafkaProtocol.decode_fetch_response_iter(response):
# Check for errors
- if fail_on_error == True and fetch_response.error != 0:
+ if fail_on_error == True and fetch_response.error != ErrorMapping.NO_ERROR:
raise Exception("FetchRequest %s failed with errorcode=%d" %
(TopicAndPartition(fetch_response.topic, fetch_response.partition), fetch_response.error))
# Run the callback
@@ -759,7 +761,7 @@ class KafkaClient(object):
return None
out = []
for offset_response in KafkaProtocol.decode_offset_response(response):
- if fail_on_error == True and offset_response.error != 0:
+ if fail_on_error == True and offset_response.error != ErrorMapping.NO_ERROR:
raise Exception("OffsetRequest failed with errorcode=%s", offset_response.error)
if callback is not None:
out.append(callback(offset_response))
@@ -779,7 +781,7 @@ class KafkaClient(object):
out = []
for offset_commit_response in KafkaProtocol.decode_offset_commit_response(response):
log.debug(offset_commit_response)
- if fail_on_error == True and offset_commit_response.error != 0:
+ if fail_on_error == True and offset_commit_response.error != ErrorMapping.NO_ERROR:
raise Exception("OffsetCommitRequest failed with errorcode=%s", offset_commit_response.error)
if callback is not None:
out.append(callback(offset_commit_response))
@@ -798,7 +800,7 @@ class KafkaClient(object):
return None
out = []
for offset_fetch_response in KafkaProtocol.decode_offset_fetch_response(response):
- if fail_on_error == True and offset_fetch_response.error != 0:
+ if fail_on_error == True and offset_fetch_response.error != ErrorMapping.NO_ERROR:
raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % (
offset_fetch_response.topic, offset_fetch_response.partition, offset_fetch_response.error))
if callback is not None:
@@ -806,3 +808,43 @@ class KafkaClient(object):
else:
out.append(offset_fetch_response)
return out
+
+class SimpleProducer(object):
+ """
+ A simple, round-robbin producer. Each message goes to exactly one partition
+ """
+ def __init__(self, client, topic):
+ self.client = client
+ self.topic = topic
+ self.client.load_metadata_for_topics(topic)
+ self.next_partition = cycle(self.client.topic_partitions[topic])
+
+ def send_message(self, msg):
+ req = ProduceRequest(self.topic, self.next_partition.next(),
+ messages=[KafkaProtocol.create_message(msg)])
+ resp = self.client.send_produce_request([req]).next()
+
+class SimpleConsumer(object):
+ def __init__(self, client, group, topic):
+ self.client = client
+ self.topic = topic
+ self.group = group
+ self.client.load_metadata_for_topics(topic)
+ self.offsets = {}
+
+ def get_or_init_offset(resp):
+ if resp.error == ErrorMapping.NO_ERROR:
+ return resp.offset
+ elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
+ return 0
+ else:
+ raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % (
+ resp.topic, resp.partition, resp.error))
+
+ for partition in self.client.topic_partitions[topic]:
+ req = OffsetFetchRequest(topic, partition)
+ (offset,) = self.client.send_offset_fetch_request(group, [req], callback=get_or_init_offset, fail_on_error=False)
+ self.offsets[partition] = offset
+
+ print self.offsets
+