path: root/kafka/protocol.py
author Omar Ghishan <omar.ghishan@rd.io> 2013-12-18 18:36:32 -0800
committer Omar Ghishan <omar.ghishan@rd.io> 2014-01-06 15:14:50 -0800
commit 0c7cf2569e384fcdde67b86689d64bafbaed953f (patch)
tree df875fdc92e07a1b345f97c4b97ed8bbbe0fe96d /kafka/protocol.py
parent 5dd8d81c9e47ee21c22945b90221c67baa7852b9 (diff)
SimpleConsumer flow changes:

* Combine partition fetch requests into a single request
* Put the messages received in a queue and update offsets
* Grab as many messages from the queue as requested
* When the queue is empty, request more
* timeout param for get_messages() is the actual timeout for getting those messages
* Based on https://github.com/mumrah/kafka-python/pull/74 - don't increase min_bytes if the consumer fetch buffer size is too small.

Notes:

Change MultiProcessConsumer and _mp_consume() accordingly.

Previously, when querying each partition separately, it was possible to block waiting for messages on partition 0 even if there are new ones in partition 1. These changes allow us to block while waiting for messages on all partitions, and reduce total number of kafka requests.

Use Queue.Queue for single proc Queue instead of already imported multiprocessing.Queue because the latter doesn't seem to guarantee immediate availability of items after a put:

>>> from multiprocessing import Queue
>>> q = Queue()
>>> q.put(1); q.get_nowait()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 152, in get_nowait
    return self.get(False)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 134, in get
    raise Empty
Queue.Empty
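
The flow described in the commit message can be sketched roughly as below. This is an illustration only, not the code from this commit: the function signature, the `buffer` argument, and the `fetch` callable (standing in for one combined fetch request across all partitions) are assumptions made for the example, and it uses the Python 3 module name `queue` where the commit's Python 2.7 code would import `Queue`.

```python
import queue  # this module was named `Queue` in Python 2.7
import time


def get_messages(buffer, fetch, count=1, timeout=0.1):
    """Return up to `count` messages, waiting at most `timeout` seconds in total.

    `buffer` is a queue.Queue of already-fetched messages. `fetch` is a
    hypothetical callable that performs one combined request for all
    partitions and returns a (possibly empty) list of messages.
    """
    messages = []
    deadline = time.monotonic() + timeout
    while len(messages) < count:
        try:
            # Serve from the in-process queue before going to the network.
            messages.append(buffer.get_nowait())
            continue
        except queue.Empty:
            pass
        if time.monotonic() >= deadline:
            # The timeout bounds the whole call, not each individual fetch.
            break
        # Queue is empty: issue one combined request and buffer the results.
        # A real fetch would block on the broker instead of returning at once.
        for message in fetch():
            buffer.put(message)
    return messages
```

Because the buffer is a thread-safe in-process queue rather than a `multiprocessing.Queue`, an item is available to `get_nowait()` as soon as `put()` returns, which is the property the note above relies on.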
Diffstat (limited to 'kafka/protocol.py')
0 files changed, 0 insertions, 0 deletions