summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2013-05-28 10:36:55 -0400
committerDavid Arthur <mumrah@gmail.com>2013-05-28 10:36:55 -0400
commit222ef82983fae6c9d9277bc3806bc7e8bbe39dbf (patch)
tree29a210e877acd662948cd50e24b1dd18570d5347
parentbf8fc04a9f6d5f0e2fe7634e08f9840de050ac64 (diff)
parentf4a326f490e347a28aa57b8b9d445c87972dc220 (diff)
downloadkafka-python-222ef82983fae6c9d9277bc3806bc7e8bbe39dbf.tar.gz
Merge branch 'issue-22'
Conflicts: kafka/consumer.py
-rw-r--r--kafka/consumer.py24
-rw-r--r--test/integration.py21
2 files changed, 45 insertions, 0 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 5c39cb7..93da316 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -103,6 +103,30 @@ class SimpleConsumer(object):
else:
raise ValueError("Unexpected value for `whence`, %d" % whence)
+ def pending(self, partitions=[]):
+ """
+ Gets the pending message count
+
+ partitions: list of partitions to check for, default is to check all
+ """
+ if len(partitions) == 0:
+ partitions = self.offsets.keys()
+
+ total = 0
+ reqs = []
+
+ for partition in partitions:
+ reqs.append(OffsetRequest(self.topic, partition, -1, 1))
+
+ resps = self.client.send_offset_request(reqs)
+ for resp in resps:
+ partition = resp.partition
+ pending = resp.offsets[0]
+ offset = self.offsets[partition]
+ total += pending - offset - (1 if offset > 0 else 0)
+
+ return total
+
def _timed_commit(self):
"""
Commit offsets as part of timer
diff --git a/test/integration.py b/test/integration.py
index 609cfc6..68e0e25 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -456,6 +456,27 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(len(all_messages), 13)
+ def test_pending(self):
+ # Produce 10 messages to partition 0 and 1
+
+ produce1 = ProduceRequest("test_pending", 0, messages=[
+ create_message("Test message 0 %d" % i) for i in range(10)
+ ])
+ for resp in self.client.send_produce_request([produce1]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ produce2 = ProduceRequest("test_pending", 1, messages=[
+ create_message("Test message 1 %d" % i) for i in range(10)
+ ])
+ for resp in self.client.send_produce_request([produce2]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ consumer = SimpleConsumer(self.client, "group1", "test_pending")
+ self.assertEquals(consumer.pending(), 20)
+ self.assertEquals(consumer.pending(partitions=[0]), 10)
+ self.assertEquals(consumer.pending(partitions=[1]), 10)
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)