author | Omar Ghishan <omar.ghishan@rd.io> | 2013-12-18 18:36:32 -0800 |
---|---|---|
committer | Omar Ghishan <omar.ghishan@rd.io> | 2014-01-06 15:14:50 -0800 |
commit | 0c7cf2569e384fcdde67b86689d64bafbaed953f | |
tree | df875fdc92e07a1b345f97c4b97ed8bbbe0fe96d /kafka/protocol.py | |
parent | 5dd8d81c9e47ee21c22945b90221c67baa7852b9 | |
SimpleConsumer flow changes:
* Combine partition fetch requests into a single request
* Put the messages received in a queue and update offsets
* Grab as many messages from the queue as requested
* When the queue is empty, request more
* timeout param for get_messages() is the actual timeout for getting those messages
* Based on https://github.com/mumrah/kafka-python/pull/74 -
don't increase min_bytes if the consumer fetch buffer size is too small.
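The flow in the list above can be sketched roughly as follows. This is a minimal illustration, not the code from this commit: `QueueBackedConsumer` and the `fetch_all_partitions` callable are hypothetical names, the callable is assumed to issue one combined request for every partition and to wait up to `timeout` seconds itself, and the Python 3 `queue` module stands in for Python 2's `Queue`.

```python
import queue
import time


class QueueBackedConsumer(object):
    """Hypothetical stand-in for the described flow; not the real SimpleConsumer."""

    def __init__(self, fetch_all_partitions):
        # Assumed signature: fetch_all_partitions(offsets, timeout)
        # returns a list of (partition, offset, message) tuples.
        self._fetch = fetch_all_partitions
        self._queue = queue.Queue()
        self.offsets = {}  # partition -> next offset to request

    def _fill_queue(self, timeout):
        # One combined request covering all partitions.
        fetched = self._fetch(dict(self.offsets), timeout)
        for partition, offset, message in fetched:
            self._queue.put((partition, message))
            self.offsets[partition] = offset + 1
        return len(fetched)

    def get_messages(self, count=1, timeout=0.1):
        # `timeout` bounds the whole call, not each individual fetch.
        deadline = time.monotonic() + timeout
        messages = []
        while len(messages) < count:
            try:
                messages.append(self._queue.get_nowait())
                continue
            except queue.Empty:
                pass
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Queue is empty: request more. An empty result is treated as
            # "the fetch waited and nothing arrived", so stop here.
            if self._fill_queue(remaining) == 0:
                break
        return messages
```

Because messages from every partition land in the same queue, a caller asking for two messages is served from whatever the last combined fetch returned, and a new request goes out only once the queue is drained.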
Notes:
Change MultiProcessConsumer and _mp_consume() accordingly.
Previously, when querying each partition separately, it was possible to
block waiting for messages on partition 0 even when new ones were available on partition 1.
These changes allow us to block while waiting for messages on all partitions
and reduce the total number of Kafka requests.
Use Queue.Queue for the single-process queue instead of the already imported
multiprocessing.Queue, because the latter doesn't seem to guarantee immediate
availability of items after a put:
>>> from multiprocessing import Queue
>>> q = Queue()
>>> q.put(1); q.get_nowait()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 152, in get_nowait
    return self.get(False)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 134, in get
    raise Empty
Queue.Empty
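For contrast, the same put-then-get sequence with the thread-safe in-process queue does not raise. The sketch below uses the Python 3 module name `queue` (it is `Queue` in the Python 2.7 session above); the variable names are illustrative only.

```python
import queue  # named `Queue` in Python 2

q = queue.Queue()
q.put(1)
# put() stores the item before returning, so it is available right away.
item = q.get_nowait()
print(item)
```

multiprocessing.Queue hands items to a background feeder thread that writes them to a pipe, which is why a get_nowait() issued immediately after put() can still see an empty queue.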