Commit messages
* Remove bufsize from client and conn, since it is not actually enforced

  Notes:
  This commit changes behavior slightly by raising a BufferUnderflowError
  when no data is received for the message size, rather than a
  ConnectionError.
  Since bufsize is not actually enforced by the socket but is used by the
  consumer when creating requests, it is moved there until a better
  solution is implemented.
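
  A minimal sketch of where bufsize ends up, assuming hypothetical names
  (SimpleConsumerSketch and this FetchRequest shape are illustrative, not
  the project's actual API):

      from collections import namedtuple

      # Illustrative stand-in for the fetch request structure
      FetchRequest = namedtuple(
          "FetchRequest", ["topic", "partition", "offset", "max_bytes"])

      class SimpleConsumerSketch(object):
          """The consumer, not the connection, owns the buffer size."""

          def __init__(self, client, topic, buffer_size=4096):
              self.client = client
              self.topic = topic
              self.buffer_size = buffer_size

          def build_fetch_request(self, partition, offset):
              # The size cap is applied when building requests, since the
              # socket layer never enforced it anyway.
              return FetchRequest(self.topic, partition, offset,
                                  self.buffer_size)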
* According to the protocol documentation, the 4-byte integer at the
  beginning of a response represents the size of the payload only, not
  including those four bytes. See http://goo.gl/rg5uom
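
  A minimal sketch of reading such a size-prefixed response (the helper
  names are illustrative, not the project's actual API):

      import struct

      def recv_exact(sock, n):
          """Read exactly n bytes, or raise if the peer closes early."""
          chunks = []
          while n > 0:
              chunk = sock.recv(n)
              if not chunk:
                  raise ConnectionError("socket closed mid-response")
              chunks.append(chunk)
              n -= len(chunk)
          return b"".join(chunks)

      def read_response(sock):
          # The 4-byte big-endian prefix counts only the payload that
          # follows it, not the prefix itself.
          (size,) = struct.unpack(">i", recv_exact(sock, 4))
          return recv_exact(sock, size)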
* Previously, if you tried to consume a message with a timeout greater than
  10 seconds but received no data within those 10 seconds, a socket.timeout
  exception was raised. This change allows a higher socket timeout to be
  set, or even None for no timeout.
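
  An illustrative sketch of a configurable socket timeout (the function and
  constant names are assumptions, not the project's actual API):

      import socket

      DEFAULT_SOCKET_TIMEOUT = 10  # the previously hard-coded value

      def connect(host, port, timeout=DEFAULT_SOCKET_TIMEOUT):
          sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
          # None means block indefinitely; a number is the timeout in
          # seconds, which may now exceed the old 10-second ceiling.
          sock.settimeout(timeout)
          sock.connect((host, port))
          return sock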
* Enable absolute imports for modules using Queue.
  When running on Linux with the code on a case-insensitive file system,
  imports of the `Queue` module fail because Python resolves the wrong file
  (it attempts a relative import of `queue.py` in the kafka directory).
  This change forces absolute imports via PEP 328.
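
  A minimal sketch of the fix on Python 2, where the bug occurs (the module
  chosen is illustrative):

      # kafka/consumer.py (illustrative)
      # Force PEP 328 semantics so "import Queue" always resolves the
      # stdlib module and never a sibling queue.py.
      from __future__ import absolute_import

      from Queue import Empty, Queue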
* Replace _send_upstream datetime logic with simpler time().
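
  An illustrative before/after of the kind of simplification described
  (the function shape is assumed, not the project's actual code):

      import time
      try:
          import queue            # Python 3
      except ImportError:
          import Queue as queue   # Python 2

      def send_upstream_sketch(q, batch_time):
          """Drain q for up to batch_time seconds using plain time()."""
          # Before: datetime.now() plus timedelta arithmetic decided when
          # the batch window closed. After: a float deadline in seconds.
          deadline = time.time() + batch_time
          batch = []
          while True:
              remaining = deadline - time.time()
              if remaining <= 0:
                  break
              try:
                  batch.append(q.get(timeout=remaining))
              except queue.Empty:
                  break
          return batch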
* Conflicts:
  kafka/producer.py
* failed_messages
  - add integration tests for sync producer
  - add integration tests for async producer with leadership election
  - use log.exception
* As per the multiprocessing module's documentation, objects passed to the
  Process() class must be picklable on Windows, so the async producer did
  not work on Windows. To fix this, code that uses multiprocessing has to
  follow certain rules:
  * The target= callable should not be a member function
  * Objects like socket() cannot be passed to multiprocessing
  This ticket fixes these issues. For KafkaClient and KafkaConnection
  objects, we make copies of the object and reinit() them inside the
  child processes.
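
  A minimal sketch of the pattern (copy() and reinit() mirror the methods
  named above; everything else, including send(), is illustrative):

      import multiprocessing

      def _producer_loop(client_copy, queue):
          # Module-level function: picklable on Windows, unlike a bound
          # method.
          client_copy.reinit()  # re-open sockets inside the child process
          while True:
              msg = queue.get()
              if msg is None:
                  break
              client_copy.send(msg)

      def start_async_producer(client, queue):
          # Pass a copy that carries no live socket; the child reconnects.
          proc = multiprocessing.Process(target=_producer_loop,
                                         args=(client.copy(), queue))
          proc.daemon = True
          proc.start()
          return proc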
* Conflicts:
  kafka/util.py
* Set FetchRequest MaxBytes value to bufsize instead of fetchsize (=MinBytes)
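
  An illustrative sketch of the distinction (field layout assumed from the
  Kafka fetch API, where MinBytes is how much data the broker should wait
  for and MaxBytes caps the response):

      from collections import namedtuple

      FetchRequest = namedtuple(
          "FetchRequest", ["topic", "partition", "offset", "max_bytes"])

      bufsize = 4096    # the consumer's receive buffer
      fetchsize = 1024  # MinBytes: the least the broker should return

      # Before: max_bytes was mistakenly capped at fetchsize (= MinBytes).
      # After: cap the response at the full buffer size instead.
      request = FetchRequest("my-topic", 0, 0, max_bytes=bufsize)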
* Also move the exceptions to common instead of util
* Related to #42
  Adds a new ConsumerFetchSizeTooSmall exception that is thrown when
  `_decode_message_set_iter` gets a BufferUnderflowError but has not yet
  yielded a message.
  In this event, SimpleConsumer will increase the fetch size by 1.5x and
  continue the fetching loop while _not_ increasing the offset (basically
  it just retries the request with a larger fetch size).
  Once the consumer fetch size has been increased, it remains increased
  while SimpleConsumer fetches from that partition.
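
  A sketch of the retry loop described above (the exception name follows
  the commit text; the helpers are illustrative):

      class ConsumerFetchSizeTooSmall(Exception):
          """Raised when a fetch response is too truncated to decode."""

      def fetch_with_retry(send_fetch, offset, fetch_size):
          """Retry the same offset with a 1.5x larger fetch size as needed."""
          while True:
              try:
                  # Return the messages plus the (possibly grown) fetch
                  # size, which sticks for later fetches on this partition.
                  return send_fetch(offset, fetch_size), fetch_size
              except ConsumerFetchSizeTooSmall:
                  # Same offset, bigger request.
                  fetch_size = int(fetch_size * 1.5)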
* Was hard-coded to 1024 bytes, which meant that larger messages were
  unconsumable: they would always get split, causing the consumer to stop.
  It would probably be best to automatically retry truncated messages with
  a larger request size so you don't have to know your max message size
  ahead of time.
* Conflicts:
  kafka/__init__.py
  kafka/consumer.py
  test/test_integration.py
* In the current patch, get_messages(count=1) would return zero messages
  the first time it was invoked after a consumer was initialized.
* This was hidden because of another bug in offset management.
* Conflicts:
  kafka/consumer.py
* Other changes:
  * Put a message size restriction on the shared queue, to prevent message
    overload
  * Wait for a while after each process is started (in the constructor)
  * Wait for a while in each child if the consumer does not return any
    messages, just to be nice to the CPU
  * Control the start event more granularly - this prevents infinite loops
    if control does not return to the generator (see the sketch after this
    list), e.g.:
        for msg in consumer:
            assert False
  * Update message status before yield
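
  A sketch of the granular start-event idea (names and structure are
  illustrative, not the project's actual internals):

      import multiprocessing

      def fetch_one_batch():
          # Stand-in for the real fetch; returns one batch of messages.
          return []

      def _fetcher(start, queue):
          """Child process: fetch only while the start event is set."""
          while True:
              start.wait()  # block until a caller is actually iterating
              queue.put(fetch_one_batch())

      def iter_messages(start, queue):
          """Generator: arm the event only around the caller's iteration."""
          start.set()
          try:
              while True:
                  yield queue.get()
          finally:
              # Runs when the caller breaks out (as in the `assert False`
              # example above); clearing the event stops the children from
              # fetching forever.
              start.clear()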
* The implementation is done by using simple options to the Kafka Fetch
  Request.
  Also, in the SimpleConsumer iterator, update the offset before the
  message is yielded, so that consumer state is not lost in certain cases -
  for example, when the message is yielded and consumed by the caller, but
  the caller never comes back into the generator. The message would then
  be consumed, but the status would not be updated in the consumer.
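
  A sketch of the iterator ordering (illustrative code, not the project's
  actual iterator):

      def iter_partition(fetch, offsets, partition, offset):
          """Yield messages, committing each offset before the yield."""
          while True:
              for message in fetch(partition, offset):
                  # Advance the stored offset *before* yielding: if the
                  # caller never re-enters the generator, the consumed
                  # message is still accounted for.
                  offset = message.offset + 1
                  offsets[partition] = offset
                  yield message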
* Currently the Kafka SimpleConsumer consumes messages from all partitions.
  This commit ensures that data is consumed only from the partitions
  specified during init.
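
  An illustrative call (the module paths and the partitions keyword are
  assumed from the commit description):

      from kafka.client import KafkaClient
      from kafka.consumer import SimpleConsumer

      client = KafkaClient("localhost", 9092)
      consumer = SimpleConsumer(client, "my-group", "my-topic",
                                partitions=[0, 2])  # fetch only these two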
* Support for async producer
  Merged locally, tests pass, +1