path: root/kafka/consumer.py
Commit history (most recent first):
...
* Fix #44: Add missing exception class  (tag: v0.8.0; David Arthur, 2013-09-24; 1 file, -4/+3)
  Also move the exceptions to common instead of util.
* Auto-adjusting consumer fetch size  (David Arthur, 2013-09-09; 1 file, -15/+25)
  Related to #42. Adds a new ConsumerFetchSizeTooSmall exception, thrown when
  `_decode_message_set_iter` gets a BufferUnderflowError before it has yielded
  any message. In that event, SimpleConsumer increases the fetch size by a
  factor of 1.5 and continues the fetch loop without advancing the offset
  (basically it just retries the request with a larger fetch size). Once
  increased, the fetch size stays increased while SimpleConsumer fetches from
  that partition.
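The retry loop this commit message describes can be sketched roughly as follows. The names `fetch_messages`, `fetch`, and the standalone `ConsumerFetchSizeTooSmall` class here are illustrative stand-ins, not the actual kafka-python internals:

```python
class ConsumerFetchSizeTooSmall(Exception):
    """Raised when a fetch response contains only a truncated message."""

def fetch_messages(fetch, offset, fetch_size=1024):
    """Retry the same offset with a 1.5x larger fetch size until a whole
    message fits; the enlarged size is then kept for later fetches.
    `fetch(offset, size)` is a hypothetical stand-in for the broker call."""
    while True:
        try:
            return fetch(offset, fetch_size), fetch_size
        except ConsumerFetchSizeTooSmall:
            # Do not advance the offset; just ask for more bytes next time.
            fetch_size = int(fetch_size * 1.5)
```

Note that the offset is deliberately untouched on failure, so the retried request covers exactly the same message.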
* Fixed #42: Make fetch size configurable  (David Arthur, 2013-09-08; 1 file, -4/+7)
  The fetch size was hard-coded to 1024 bytes, which made larger messages
  unconsumable: they would always get truncated, causing the consumer to stop.
  It would probably be best to automatically retry truncated messages with a
  larger request size, so you don't have to know your max message size ahead
  of time.
* Fix minor bug in offset management  (Mahendra M, 2013-07-01; 1 file, -1/+4)
  Before this patch, get_messages(count=1) would return zero messages the
  first time it was invoked after a consumer was initialized.
* Add more cleanup in consumer.stop()  (Mahendra M, 2013-06-28; 1 file, -5/+7)
* Fix handling of the single-partition case  (Mahendra M, 2013-06-28; 1 file, -2/+3)
* Add TODO comments  (Mahendra M, 2013-06-27; 1 file, -0/+2)
* Re-init the sockets in the new process  (Mahendra M, 2013-06-27; 1 file, -1/+3)
* Fix a bug in seek  (Mahendra M, 2013-06-27; 1 file, -0/+6)
  This was hidden by another bug in offset management.
* Merge branch 'master' into partition  (Mahendra M, 2013-06-25; 1 file, -17/+9)
  Conflicts: kafka/consumer.py
* Optimize auto-commit thread  (Mahendra M, 2013-06-12; 1 file, -21/+10)
  The previous commit optimized the commit thread so that the timer started
  only when there were messages to be consumed. This commit goes a step
  further and ensures the following:
  * Only one timer thread is created.
  * The main app does not block on exit, waiting for the timer thread to
    finish.
  This is achieved by having a single thread block on an event and repeatedly
  call a function. Events are used instead of time.sleep() to prevent the
  Python interpreter from waking every 50 ms to check whether the timer has
  expired (logic copied from threading.Timer).
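The event-based design this commit message describes can be sketched as below. The real class is `util.ReentrantTimer` in this repo; `CommitTimer` here is a simplified illustrative stand-in:

```python
import threading

class CommitTimer:
    """Single long-lived thread that fires `fn` every `interval` seconds.

    Event.wait() blocks in the OS until the timeout elapses or stop() is
    called, instead of the interpreter polling every 50 ms as a sleep-based
    loop would."""

    def __init__(self, interval, fn):
        self.interval = interval
        self.fn = fn
        self._stop = threading.Event()
        # Daemon thread: the main app does not block on exit waiting for it.
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def _run(self):
        # wait() returns False on timeout (fire again) and True once
        # stop() has set the event (exit the loop).
        while not self._stop.wait(self.interval):
            self.fn()

    def stop(self):
        self._stop.set()
        self._thread.join()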
* Spawn the commit thread only if necessary  (Mahendra M, 2013-06-11; 1 file, -2/+5)
  If no messages are being consumed, the timer keeps creating new threads at
  the specified interval, which may not be necessary. We can control this
  behaviour so that the timer thread is started only when a message is
  consumed.
* Got MultiProcessConsumer working  (Mahendra M, 2013-06-25; 1 file, -10/+20)
  Other changes:
  * Put a message size restriction on the shared queue, to prevent message
    overload.
  * Wait for a while after each process is started (in the constructor).
  * Wait for a while in each child if the consumer does not return any
    messages, just to be nice to the CPU.
  * Control the start event more granularly; this prevents infinite loops if
    control does not return to the generator, e.g.:
        for msg in consumer:
            assert False
  * Update message status before yield.
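Two of the control points listed in this commit message, the bounded shared queue and the start event gating the workers, can be sketched as follows. This is illustrated with threads for brevity (the real MultiProcessConsumer uses multiprocessing with analogous primitives), and `worker` is a hypothetical stand-in, not the actual child loop:

```python
import queue
import threading
import time

def worker(start, stop, out):
    """Toy child loop: produce only while `start` is set, stop on `stop`,
    and let the bounded queue apply backpressure."""
    while not stop.is_set():
        # Fetch only while a caller is actually iterating.
        if not start.wait(timeout=0.05):
            continue
        try:
            out.put("msg", timeout=0.05)  # bounded queue limits the backlog
        except queue.Full:
            time.sleep(0.05)              # be nice to the CPU
```

Clearing the start event when the caller abandons the generator is what prevents the children from spinning forever with no consumer on the other end of the queue.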
* Added some comments about message state  (Mahendra M, 2013-06-25; 1 file, -0/+7)
* Implement blocking get_messages for SimpleConsumer  (Mahendra M, 2013-06-25; 1 file, -13/+80)
  Implemented using simple options to the Kafka Fetch request. Also, in the
  SimpleConsumer iterator, the offset is now updated before the message is
  yielded, so that consumer state is not lost in certain cases. For example:
  the message is yielded and consumed by the caller, but the caller never
  comes back into the generator. The message has been consumed, but the
  status was not updated in the consumer.
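The update-before-yield pattern this commit message describes can be shown with a toy iterator. `iter_messages` and the `state` dict are illustrative, not the real SimpleConsumer code:

```python
def iter_messages(messages, state):
    """Toy iterator over a preloaded list; `state['offset']` plays the role
    of the consumer's saved offset."""
    while state["offset"] < len(messages):
        msg = messages[state["offset"]]
        state["offset"] += 1   # update consumer state first ...
        yield msg              # ... then hand the message to the caller
```

If the increment came after the `yield`, a caller that takes one message and never resumes the generator would leave the offset unchanged, and the same message would be delivered again on the next iteration.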
* Added more documentation and cleaned up duplicate code  (Mahendra M, 2013-06-25; 1 file, -86/+76)
* Minor bug fix  (Mahendra M, 2013-06-24; 1 file, -2/+3)
* Add support for multi-process consumer  (Mahendra M, 2013-06-24; 1 file, -48/+286)
* | Add support to consume messages from specific partitionsMahendra M2013-06-121-7/+11
|/ | | | | | Currently the kafka SimpleConsumer consumes messages from all partitions. This commit will ensure that data is consumed only from partitions specified during init
* Refactor and update integration tests  (Ivan Pouzyrevsky, 2013-06-07; 1 file, -0/+5)
* Finish making the remaining files PEP 8 ready  (Mahendra M, 2013-06-04; 1 file, -34/+59)
* Removing the bit about offsets  (David Arthur, 2013-05-29; 1 file, -5/+0)
* Minor bug fixes  (Mahendra M, 2013-05-29; 1 file, -12/+29)
  * When you initiate a producer with a non-existent queue, the queue is
    created, but the partition info is not reflected in KafkaClient()
    immediately; so we wait for a second and try loading it again. Without
    this fix, calling producer.send_messages() right after creating a new
    queue makes the library throw a StopIteration exception.
  * In SimpleConsumer(), the defaults are not as mentioned in the comments.
    Fix this (or do we change the documentation?).
  * There was a problem with the way the consumer iterator worked. For
    example, assume there are 10 messages in the queue/topic and you iterate
    over it as:
        for msg in consumer:
            print(msg)
    At the end of this, the saved 'offset' is 10, so if you run the loop
    again the last message (10) is repeated. This is fixed by adjusting the
    offset counter before fetching the message.
  * Avoid some code repetition in consumer.commit().
  * Fix a bug in the send_offset_commit_request() invocation in consumer.py.
  * Fix missing imports.
* Merge branch 'issue-22'  (David Arthur, 2013-05-28; 1 file, -0/+24)
  Conflicts: kafka/consumer.py
* Closes #22  (David Arthur, 2013-05-28; 1 file, -20/+0)
  Removed the get_messages API; added a test for get_pending.
* Missed a docstring  (Mahendra M, 2013-05-27; 1 file, -0/+2)
* New API for getting a specified set of messages  (Mahendra M, 2013-05-27; 1 file, -0/+18)
  This is easier to use in cases where we need only a specified number of
  messages. It uses the __iter__ API internally, but maintains state so that
  it hands back only the requested messages. The API is:
  get_messages(count=1).
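Layering a counted API on the iterator, as this commit message describes, can be sketched with `itertools.islice`. `ToyConsumer` is a hypothetical stand-in backed by a preloaded list rather than the real broker-fetching consumer:

```python
from itertools import islice

class ToyConsumer:
    """Minimal model of a consumer whose get_messages() is built on its
    own iterator, sharing state across calls."""

    def __init__(self, messages):
        self._messages = iter(messages)

    def __iter__(self):
        return self._messages

    def get_messages(self, count=1):
        # Pull at most `count` messages; the shared iterator keeps its
        # position between calls, so nothing is re-delivered.
        return list(islice(self._messages, count))
```

Because both entry points drain the same iterator, mixing `for msg in consumer` with `consumer.get_messages()` never yields a message twice.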
* New API for checking pending message count  (Mahendra M, 2013-05-27; 1 file, -0/+24)
* Auto-commit timer is not periodic  (Mahendra M, 2013-05-28; 1 file, -7/+18)
  The auto-commit timer was one-shot: after the first commit it did not fire
  again. This ticket fixes the issue. Also cleaned up some duplicate code in
  util.ReentrantTimer().
* Fixes #14  (David Arthur, 2013-04-11; 1 file, -1/+1)
* Update kafka-src to latest trunk, enable 0.8.1 features  (David Arthur, 2013-04-02; 1 file, -1/+0)
* Refactoring a bit, cleanup for 0.8  (David Arthur, 2013-04-02; 1 file, -20/+51)
  Marks some things as not compatible with 0.8 (to be added in 0.8.1).
* Big code re-org  (David Arthur, 2013-04-02; 1 file, -0/+159)