-rw-r--r-- | kafka/NOTES.md      | 10 |
-rw-r--r-- | kafka/client.py     | 54 |
-rw-r--r-- | test/integration.py | 27 |
3 files changed, 84 insertions, 7 deletions
diff --git a/kafka/NOTES.md b/kafka/NOTES.md
index 8be6282..7cec39e 100644
--- a/kafka/NOTES.md
+++ b/kafka/NOTES.md
@@ -5,3 +5,13 @@ There are a few levels of abstraction:
 * Protocol support: encode/decode the requests/responses
 * Socket support: send/recieve messages
 * API support: higher level APIs such as: get_topic_metadata
+
+
+# Methods of producing
+
+* Round robbin (each message to the next partition)
+* All-to-one (each message to one partition)
+* All-to-all? (each message to every partition)
+* Partitioned (run each message through a partitioning function)
+** HashPartitioned
+** FunctionPartition
diff --git a/kafka/client.py b/kafka/client.py
index fb99910..3f1fa39 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,7 +1,7 @@
 import base64
 from collections import namedtuple, defaultdict
 from functools import partial
-from itertools import groupby, count
+from itertools import count, cycle
 import logging
 from operator import attrgetter
 import socket
@@ -596,6 +596,7 @@ class KafkaClient(object):
         }
         self.brokers = {}           # broker_id -> BrokerMetadata
         self.topics_to_brokers = {} # topic_id -> broker_id
+        self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...]
         self.load_metadata_for_topics()
 
     def close(self):
@@ -635,6 +636,7 @@ class KafkaClient(object):
                     self.load_metadata_for_topics(topic)
                 else:
                     self.topics_to_brokers[TopicAndPartition(topic, partition)] = brokers[meta.leader]
+                    self.topic_partitions[topic].append(partition)
 
     def get_leader_for_partition(self, topic, partition):
         key = TopicAndPartition(topic, partition)
@@ -682,7 +684,7 @@ class KafkaClient(object):
             response = conn.recv(requestId)
             for produce_response in KafkaProtocol.decode_produce_response(response):
                 # Check for errors
-                if fail_on_error == True and produce_response.error != 0:
+                if fail_on_error == True and produce_response.error != ErrorMapping.NO_ERROR:
                     raise Exception("ProduceRequest for %s failed with errorcode=%d",
                             (TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error))
                 # Run the callback
@@ -721,7 +723,7 @@ class KafkaClient(object):
             response = conn.recv(requestId)
             for fetch_response in KafkaProtocol.decode_fetch_response_iter(response):
                 # Check for errors
-                if fail_on_error == True and fetch_response.error != 0:
+                if fail_on_error == True and fetch_response.error != ErrorMapping.NO_ERROR:
                     raise Exception("FetchRequest %s failed with errorcode=%d" %
                             (TopicAndPartition(fetch_response.topic, fetch_response.partition), fetch_response.error))
                 # Run the callback
@@ -759,7 +761,7 @@ class KafkaClient(object):
             return None
         out = []
         for offset_response in KafkaProtocol.decode_offset_response(response):
-            if fail_on_error == True and offset_response.error != 0:
+            if fail_on_error == True and offset_response.error != ErrorMapping.NO_ERROR:
                 raise Exception("OffsetRequest failed with errorcode=%s", offset_response.error)
             if callback is not None:
                 out.append(callback(offset_response))
@@ -779,7 +781,7 @@ class KafkaClient(object):
         out = []
         for offset_commit_response in KafkaProtocol.decode_offset_commit_response(response):
             log.debug(offset_commit_response)
-            if fail_on_error == True and offset_commit_response.error != 0:
+            if fail_on_error == True and offset_commit_response.error != ErrorMapping.NO_ERROR:
                 raise Exception("OffsetCommitRequest failed with errorcode=%s", offset_commit_response.error)
             if callback is not None:
                 out.append(callback(offset_commit_response))
@@ -798,7 +800,7 @@ class KafkaClient(object):
             return None
         out = []
         for offset_fetch_response in KafkaProtocol.decode_offset_fetch_response(response):
-            if fail_on_error == True and offset_fetch_response.error != 0:
+            if fail_on_error == True and offset_fetch_response.error != ErrorMapping.NO_ERROR:
                 raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % (
                     offset_fetch_response.topic, offset_fetch_response.partition, offset_fetch_response.error))
             if callback is not None:
@@ -806,3 +808,43 @@ class KafkaClient(object):
             else:
                 out.append(offset_fetch_response)
         return out
+
+class SimpleProducer(object):
+    """
+    A simple, round-robbin producer. Each message goes to exactly one partition
+    """
+    def __init__(self, client, topic):
+        self.client = client
+        self.topic = topic
+        self.client.load_metadata_for_topics(topic)
+        self.next_partition = cycle(self.client.topic_partitions[topic])
+
+    def send_message(self, msg):
+        req = ProduceRequest(self.topic, self.next_partition.next(),
+                messages=[KafkaProtocol.create_message(msg)])
+        resp = self.client.send_produce_request([req]).next()
+
+class SimpleConsumer(object):
+    def __init__(self, client, group, topic):
+        self.client = client
+        self.topic = topic
+        self.group = group
+        self.client.load_metadata_for_topics(topic)
+        self.offsets = {}
+
+        def get_or_init_offset(resp):
+            if resp.error == ErrorMapping.NO_ERROR:
+                return resp.offset
+            elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
+                return 0
+            else:
+                raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % (
+                    resp.topic, resp.partition, resp.error))
+
+        for partition in self.client.topic_partitions[topic]:
+            req = OffsetFetchRequest(topic, partition)
+            (offset,) = self.client.send_offset_fetch_request(group, [req], callback=get_or_init_offset, fail_on_error=False)
+            self.offsets[partition] = offset
+
+        print self.offsets
+
diff --git a/test/integration.py b/test/integration.py
index 9fa8538..91917e6 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -310,7 +310,6 @@ class TestKafkaClient(unittest.TestCase):
         for resp in self.client.send_produce_request([produce1, produce2]):
             self.assertEquals(resp.error, 0)
             self.assertEquals(resp.offset, 0)
-        return
 
         fetch1 = FetchRequest("test_produce_consume_two_partitions", 0, 0, 1024)
         fetch2 = FetchRequest("test_produce_consume_two_partitions", 1, 0, 1024)
@@ -347,6 +346,32 @@ class TestKafkaClient(unittest.TestCase):
             self.assertEquals(resp.offset, 42)
             self.assertEquals(resp.metadata, "") # Metadata isn't stored for now
 
+    # Producer Tests
+
+    def test_simple_producer(self):
+        producer = SimpleProducer(self.client, "test_simple_producer")
+        producer.send_message("one")
+        producer.send_message("two")
+
+        fetch1 = FetchRequest("test_simple_producer", 0, 0, 1024)
+        fetch2 = FetchRequest("test_simple_producer", 1, 0, 1024)
+        fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2])
+        self.assertEquals(fetch_resp1.error, 0)
+        self.assertEquals(fetch_resp1.highwaterMark, 1)
+        messages = list(fetch_resp1.messages)
+        self.assertEquals(len(messages), 1)
+        self.assertEquals(messages[0].message.value, "one")
+        self.assertEquals(fetch_resp2.error, 0)
+        self.assertEquals(fetch_resp2.highwaterMark, 1)
+        messages = list(fetch_resp2.messages)
+        self.assertEquals(len(messages), 1)
+        self.assertEquals(messages[0].message.value, "two")
+
+    # Consumer Tests
+
+    def test_consumer(self):
+        consumer = SimpleConsumer(self.client, "group1", "test_consumer")
+
 if __name__ == "__main__":
     logging.basicConfig(level=logging.INFO)
     unittest.main()
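
For context on how the new classes are meant to be used, here is a minimal usage sketch; it is not part of the diff. It assumes a single broker reachable at localhost:9092, an existing topic named "my_topic", and that KafkaClient(host, port) works with the default buffer size; the code is Python 2, like the rest of the module at this revision.

    from kafka.client import KafkaClient, SimpleProducer, SimpleConsumer

    client = KafkaClient("localhost", 9092)

    # Round-robin producer: each send_message() goes to the next partition of
    # the topic, cycling through client.topic_partitions["my_topic"].
    producer = SimpleProducer(client, "my_topic")
    producer.send_message("hello")
    producer.send_message("world")

    # At this revision SimpleConsumer only fetches (or initializes to 0) the
    # committed offset of each partition for the given consumer group.
    consumer = SimpleConsumer(client, "my_group", "my_topic")
    print consumer.offsets   # e.g. {0: 0, 1: 0}

    client.close()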
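
The NOTES.md section above also lists partitioned producing (HashPartitioned / FunctionPartition), which this diff does not implement. Purely as an illustration of that idea, a keyed producer in the same style as SimpleProducer might look like the sketch below; the class name and the hash(key) % len(partitions) scheme are assumptions, not code from this commit, and the sketch reuses the ProduceRequest, KafkaProtocol, and topic_partitions names from kafka/client.py above.

    # Hypothetical sketch only -- not part of this commit.
    class HashedPartitionProducer(object):
        """
        Send each message to the partition chosen by hashing a key, so
        messages with the same key always land on the same partition.
        """
        def __init__(self, client, topic):
            self.client = client
            self.topic = topic
            self.client.load_metadata_for_topics(topic)
            self.partitions = self.client.topic_partitions[topic]

        def send_message(self, key, msg):
            partition = self.partitions[hash(key) % len(self.partitions)]
            req = ProduceRequest(self.topic, partition,
                    messages=[KafkaProtocol.create_message(msg)])
            return self.client.send_produce_request([req]).next()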