Also move the exceptions to common instead of util
Related to #42. Adds a new ConsumerFetchSizeTooSmall exception that is
thrown when `_decode_message_set_iter` gets a BufferUnderflowError but
has not yet yielded a message.
In this event, SimpleConsumer will increase the fetch size by a factor
of 1.5 and continue the fetching loop while _not_ increasing the offset
(it basically just retries the request with a larger fetch size).
Once the consumer fetch size has been increased, it remains increased
while SimpleConsumer fetches from that partition.
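A minimal, self-contained sketch of that retry behaviour. Only
ConsumerFetchSizeTooSmall and the 1.5x retry come from the commit; the
toy fetch and decode helpers stand in for the real SimpleConsumer
internals:

    class ConsumerFetchSizeTooSmall(Exception):
        """Raised when a fetch is truncated before one whole message."""

    def fetch(offset, fetch_size):
        # Toy stand-in for a Kafka fetch: a stream of 100-byte messages,
        # truncated to at most fetch_size bytes.
        return (b"x" * 100 * 5)[:fetch_size]

    def decode_message_set(buf):
        # Mirrors _decode_message_set_iter: raise only when the buffer
        # runs out before the *first* complete message is yielded.
        if len(buf) < 100:
            raise ConsumerFetchSizeTooSmall()
        for i in range(0, len(buf) - len(buf) % 100, 100):
            yield buf[i:i + 100]

    def fetch_with_retry(offset, fetch_size=64):
        while True:
            try:
                # Same offset on every attempt: the request is retried as-is.
                return list(decode_message_set(fetch(offset, fetch_size))), fetch_size
            except ConsumerFetchSizeTooSmall:
                fetch_size = int(fetch_size * 1.5)  # sticky 1.5x increase

    messages, size = fetch_with_retry(0)  # 64 -> 96 -> 144 bytes, then succeeds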
The fetch size was hard-coded to 1024 bytes, which meant that larger
messages were unconsumable, since they would always get split, causing
the consumer to stop.
It would probably be best to automatically retry truncated messages
with a larger request size, so you don't have to know your maximum
message size ahead of time.
In the current patch, get_messages(count=1) would return zero messages
the first time it was invoked after a consumer was initialized.
This was hidden because of another bug in offset management
Conflicts:
	kafka/consumer.py
The previous commit optimized the commit thread such that the timer
started only when there were messages to be consumed. This commit
goes a step further and ensures the following:
* Only one timer thread is created.
* The main app does not block on exit (waiting for the timer thread to
  finish).
This is ensured by having a single thread that blocks on an event and
keeps calling a function. We use events instead of time.sleep() so as
to prevent the Python interpreter from waking up every 50ms to check
whether the timer has expired (logic copied from threading.Timer).
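A minimal sketch of that design, assuming a callback and an interval in
seconds; the class body is illustrative, not the library's exact code:

    import threading

    class ReentrantTimer(object):
        """One long-lived thread that calls fn every `interval` seconds."""

        def __init__(self, interval, fn):
            self.interval = interval
            self.fn = fn
            self._stop = threading.Event()
            self._thread = threading.Thread(target=self._run)
            self._thread.daemon = True

        def _run(self):
            # Event.wait(timeout) returns False when the interval elapses
            # (fire fn again) and True as soon as stop() sets the event,
            # so shutdown is immediate and never blocks the main app.
            while not self._stop.wait(self.interval):
                self.fn()

        def start(self):
            self._thread.start()

        def stop(self):
            self._stop.set()
            self._thread.join()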
If there are no messages being consumed, the timer keeps creating new
threads at the specified interval. This may not be necessary. We can
control this behaviour so that the timer thread is started only when a
message is consumed.
Other changes:
* Put a size restriction on the shared queue, to prevent message
  overload.
* Wait for a while after each process is started (in the constructor).
* Wait for a while in each child if the consumer does not return any
  messages, just to be nice to the CPU.
* Control the start event more granularly; this prevents infinite
  loops if control does not return to the generator, e.g.:
      for msg in consumer:
          assert False
  (see the sketch after this list)
* Update message status before yield.
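A self-contained sketch of the bounded queue plus start-event control;
all names, sizes, and intervals here are illustrative:

    import multiprocessing
    import time

    def worker(out_queue, start, stop):
        # Child process: fetch only while the parent has the start event
        # set; otherwise just poll the event, which is easy on the CPU.
        n = 0
        while not stop.is_set():
            if not start.wait(timeout=0.1):
                continue
            out_queue.put(n)  # blocks once the bounded queue is full
            n += 1

    def consume(out_queue, start, count):
        # Parent: set the start event only around each get() and clear it
        # before yielding, so children cannot loop forever if the caller
        # never re-enters the generator.
        for _ in range(count):
            start.set()
            msg = out_queue.get()
            start.clear()
            yield msg

    if __name__ == "__main__":
        q = multiprocessing.Queue(maxsize=100)  # cap on pending messages
        start, stop = multiprocessing.Event(), multiprocessing.Event()
        p = multiprocessing.Process(target=worker, args=(q, start, stop))
        p.start()
        time.sleep(0.5)  # wait a while after starting the process
        print(list(consume(q, start, 3)))
        stop.set()
        p.join()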
The implementation is done by using simple options to the Kafka Fetch
Request.
Also, in the SimpleConsumer iterator, update the offset before the
message is yielded. This is so that the consumer state is not lost in
certain cases. For example: the message is yielded and consumed by the
caller, but the caller does not come back into the generator again.
The message will have been consumed, but the status is not updated in
the consumer.
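A toy illustration of that yield ordering, with a list of
(offset, value) pairs standing in for fetched messages:

    class IteratorSketch(object):
        def __init__(self, messages):
            self.messages = messages  # fetched (offset, value) pairs
            self.offset = 0

        def __iter__(self):
            for offset, value in self.messages:
                # Update consumer state *before* yielding: if the caller
                # takes this message but never re-enters the generator,
                # the offset already reflects the delivered message.
                self.offset = offset + 1
                yield value

    consumer = IteratorSketch([(0, "a"), (1, "b")])
    it = iter(consumer)
    next(it)                     # caller consumes "a", abandons the loop
    assert consumer.offset == 1  # "a" will not be replayed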
Currently the Kafka SimpleConsumer consumes messages from all
partitions. This commit ensures that data is consumed only from the
partitions specified during init.
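A usage sketch; the constructor signatures and partition numbers are
assumptions about this era of the library, not taken from the commit:

    from kafka.client import KafkaClient
    from kafka.consumer import SimpleConsumer

    client = KafkaClient("localhost", 9092)

    # Consume only partitions 0 and 2 of "my-topic"; omitting the
    # `partitions` argument keeps the old behaviour of reading them all.
    consumer = SimpleConsumer(client, "my-group", "my-topic",
                              partitions=[0, 2])
    for message in consumer:
        print(message)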
* When you initiate a producer with a non-existent queue, the queue is
  created. However, this partition info is not reflected in
  KafkaClient() immediately, so we wait for a second and try loading it
  again. Without this fix, if we do producer.send_messages() after
  creating a new queue, the library will throw a StopIteration
  exception.
* In SimpleConsumer(), the defaults are not as mentioned in the
  comments. Fix this (or do we change the documentation?).
* There was a problem with the way the consumer iterator worked.
  For example, assume that there are 10 messages in the queue/topic and
  you iterate over it as:
      for msg in consumer:
          print(msg)
  At the end of this, the 'offset' that is saved is 10. So, if you run
  the above loop again, the last message (10) is repeated. This can be
  fixed by adjusting the offset counter before fetching the message
  (see the sketch after this list).
* Avoid some code repetition in consumer.commit().
* Fix a bug in the send_offset_commit_request() invocation in
  consumer.py.
* Fix missing imports.
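A toy version of the off-by-one fix, showing why the counter is
advanced before the fetch (the message store and names here are
illustrative):

    messages = {i: "msg-%d" % i for i in range(1, 11)}  # offsets 1..10

    class Consumer(object):
        def __init__(self):
            self.offset = 0  # last message handed to the caller

        def __iter__(self):
            while self.offset + 1 in messages:
                # Advance the counter *before* fetching, so the saved
                # offset names the last delivered message and a fresh
                # loop starts at the next unseen one, not a repeat.
                self.offset += 1
                yield messages[self.offset]

    c = Consumer()
    assert list(c) == ["msg-%d" % i for i in range(1, 11)]
    assert list(c) == []  # a second loop repeats nothing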
Conflicts:
	kafka/consumer.py
Removed get_messages API, added test for get_pending
This will be easier to use in some cases where we have to get only a
specified number of messages. This API uses the __iter__ API
internally, but maintains the state needed to give back only the
required set of messages.
The API is get_messages(count=1).
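A sketch of such a wrapper, written as a free function over any
consumer whose __iter__ maintains the offset state (the real API is a
method on the consumer itself):

    def get_messages(consumer, count=1):
        # Drain up to `count` items from the consumer's own iterator and
        # cache that iterator on the consumer, so the next call resumes
        # exactly where this one stopped.
        it = getattr(consumer, "_iterator", None)
        if it is None:
            it = consumer._iterator = iter(consumer)
        out = []
        for _ in range(count):
            try:
                out.append(next(it))
            except StopIteration:
                break
        return out

    class Numbers(object):  # stand-in consumer
        def __iter__(self):
            return iter([1, 2, 3])

    n = Numbers()
    assert get_messages(n, count=2) == [1, 2]
    assert get_messages(n, count=2) == [3]  # resumes, then runs dry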
The auto-commit timer is one-shot: after the first commit, it does not
fire again. This ticket fixes the issue.
Also, some duplicate code in util.ReentrantTimer() was cleaned up.
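A sketch of the fix: threading.Timer is one-shot by design, so the
callback has to re-arm it. The class body is illustrative, and the
later commits above replace this approach with a single event-driven
thread:

    import threading

    class RepeatingTimer(object):
        """Illustrative stand-in for util.ReentrantTimer after the fix."""

        def __init__(self, interval, fn):
            self.interval = interval
            self.fn = fn
            self._timer = None

        def _fire(self):
            self.fn()
            self._arm()  # re-arm after every firing, not just the first

        def _arm(self):
            self._timer = threading.Timer(self.interval, self._fire)
            self._timer.daemon = True
            self._timer.start()

        def start(self):
            self._arm()

        def stop(self):
            if self._timer is not None:
                self._timer.cancel()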
Marking some stuff as not compatible with 0.8 (will be added in 0.8.1)