diff options
author | David Arthur <mumrah@gmail.com> | 2013-03-08 19:56:51 -0500 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2013-04-02 20:19:30 -0400 |
commit | 1b721325fe6b170cdfe001749dbd7b750fe59512 (patch) | |
tree | 41a2560b287875944df737ca367aa823058914f3 /test/integration.py | |
parent | 8fab55ad198eea41513a5daac3e92a709cb39247 (diff) | |
download | kafka-python-1b721325fe6b170cdfe001749dbd7b750fe59512.tar.gz |
Started on a simple producer and consumer
Diffstat (limited to 'test/integration.py')
-rw-r--r-- | test/integration.py | 27 |
1 files changed, 26 insertions, 1 deletions
diff --git a/test/integration.py b/test/integration.py index 9fa8538..91917e6 100644 --- a/test/integration.py +++ b/test/integration.py @@ -310,7 +310,6 @@ class TestKafkaClient(unittest.TestCase): for resp in self.client.send_produce_request([produce1, produce2]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - return fetch1 = FetchRequest("test_produce_consume_two_partitions", 0, 0, 1024) fetch2 = FetchRequest("test_produce_consume_two_partitions", 1, 0, 1024) @@ -347,6 +346,32 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(resp.offset, 42) self.assertEquals(resp.metadata, "") # Metadata isn't stored for now + # Producer Tests + + def test_simple_producer(self): + producer = SimpleProducer(self.client, "test_simple_producer") + producer.send_message("one") + producer.send_message("two") + + fetch1 = FetchRequest("test_simple_producer", 0, 0, 1024) + fetch2 = FetchRequest("test_simple_producer", 1, 0, 1024) + fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) + self.assertEquals(fetch_resp1.error, 0) + self.assertEquals(fetch_resp1.highwaterMark, 1) + messages = list(fetch_resp1.messages) + self.assertEquals(len(messages), 1) + self.assertEquals(messages[0].message.value, "one") + self.assertEquals(fetch_resp2.error, 0) + self.assertEquals(fetch_resp2.highwaterMark, 1) + messages = list(fetch_resp2.messages) + self.assertEquals(len(messages), 1) + self.assertEquals(messages[0].message.value, "two") + + # Consumer Tests + + def test_consumer(self): + consumer = SimpleConsumer(self.client, "group1", "test_consumer") + if __name__ == "__main__": logging.basicConfig(level=logging.INFO) unittest.main() |