diff options
author | David Arthur <mumrah@gmail.com> | 2013-05-28 10:36:55 -0400 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2013-05-28 10:36:55 -0400 |
commit | 222ef82983fae6c9d9277bc3806bc7e8bbe39dbf (patch) | |
tree | 29a210e877acd662948cd50e24b1dd18570d5347 | |
parent | bf8fc04a9f6d5f0e2fe7634e08f9840de050ac64 (diff) | |
parent | f4a326f490e347a28aa57b8b9d445c87972dc220 (diff) | |
download | kafka-python-222ef82983fae6c9d9277bc3806bc7e8bbe39dbf.tar.gz |
Merge branch 'issue-22'
Conflicts:
kafka/consumer.py
-rw-r--r-- | kafka/consumer.py | 24 | ||||
-rw-r--r-- | test/integration.py | 21 |
2 files changed, 45 insertions, 0 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 5c39cb7..93da316 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -103,6 +103,30 @@ class SimpleConsumer(object): else: raise ValueError("Unexpected value for `whence`, %d" % whence) + def pending(self, partitions=[]): + """ + Gets the pending message count + + partitions: list of partitions to check for, default is to check all + """ + if len(partitions) == 0: + partitions = self.offsets.keys() + + total = 0 + reqs = [] + + for partition in partitions: + reqs.append(OffsetRequest(self.topic, partition, -1, 1)) + + resps = self.client.send_offset_request(reqs) + for resp in resps: + partition = resp.partition + pending = resp.offsets[0] + offset = self.offsets[partition] + total += pending - offset - (1 if offset > 0 else 0) + + return total + def _timed_commit(self): """ Commit offsets as part of timer diff --git a/test/integration.py b/test/integration.py index 609cfc6..68e0e25 100644 --- a/test/integration.py +++ b/test/integration.py @@ -456,6 +456,27 @@ class TestConsumer(unittest.TestCase): self.assertEquals(len(all_messages), 13) + def test_pending(self): + # Produce 10 messages to partition 0 and 1 + + produce1 = ProduceRequest("test_pending", 0, messages=[ + create_message("Test message 0 %d" % i) for i in range(10) + ]) + for resp in self.client.send_produce_request([produce1]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + produce2 = ProduceRequest("test_pending", 1, messages=[ + create_message("Test message 1 %d" % i) for i in range(10) + ]) + for resp in self.client.send_produce_request([produce2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + consumer = SimpleConsumer(self.client, "group1", "test_pending") + self.assertEquals(consumer.pending(), 20) + self.assertEquals(consumer.pending(partitions=[0]), 10) + self.assertEquals(consumer.pending(partitions=[1]), 10) if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG) |