Per the multiprocessing module's documentation, objects passed to the
Process() class must be picklable on Windows, so the async producer did
not work there. To fix this, code that uses multiprocessing must follow
certain rules:
* The target= function must not be a member function
* Objects like socket() cannot be passed to multiprocessing
This ticket fixes these issues. For KafkaClient and KafkaConnection
objects, we make copies of the object and reinit() them inside the
child processes.
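A minimal sketch of that pattern, assuming the client exposes a copy()
that drops the live socket and a reinit() that re-opens it, as the
commit describes; the worker and helper names are invented for
illustration:

    from multiprocessing import Process, Queue

    def _worker(client, queue):
        # A module-level function, not a bound method, so it can be
        # pickled on Windows.
        client.reinit()            # re-open the socket inside the child
        while True:
            msg = queue.get()
            if msg is None:        # stop sentinel
                break
            # ... send msg via client ...

    def start_async_producer(client, queue):
        # Pass a socket-free copy rather than the live client object,
        # which would not pickle.
        proc = Process(target=_worker, args=(client.copy(), queue))
        proc.start()
        return proc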
Conflicts:
    kafka/util.py
Set FetchRequest MaxBytes value to bufsize instead of fetchsize (=MinBytes)
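For illustration, a self-contained sketch of the two values; the field
layout is simplified and the concrete numbers are assumptions:

    from collections import namedtuple

    # MinBytes is the lower bound the broker waits for before answering;
    # MaxBytes caps how much data one fetch may return. The bug used the
    # MinBytes-style fetchsize as the cap; the fix uses the buffer size.
    FetchRequest = namedtuple("FetchRequest",
                              ["topic", "partition", "offset", "max_bytes"])

    fetchsize = 1024   # MinBytes-style threshold (assumed value)
    bufsize = 4096     # consumer buffer size (assumed value)

    req = FetchRequest("my-topic", 0, 0, max_bytes=bufsize)  # not fetchsize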
Also move the exceptions to common instead of util
Related to #42
Adds a new ConsumerFetchSizeTooSmall exception that is thrown when
`_decode_message_set_iter` gets a BufferUnderflowError but has not yet
yielded a message.
In this event, SimpleConsumer will increase the fetch size by 1.5x and
continue the fetching loop while _not_ increasing the offset (it
basically just retries the request with a larger fetch size).
Once the fetch size has been increased, it remains increased while
SimpleConsumer fetches from that partition.
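A sketch of that retry policy; `fetch` is a stand-in for the real
request logic, not kafka-python API:

    class ConsumerFetchSizeTooSmall(Exception):
        """Raised when a fetch yields no complete message."""

    def fetch_with_retry(fetch, offset, fetch_size):
        # Retry the same offset with a 1.5x larger fetch size until at
        # least one whole message fits; return the grown size so it is
        # kept for subsequent fetches from this partition.
        while True:
            try:
                return fetch(offset, fetch_size), fetch_size
            except ConsumerFetchSizeTooSmall:
                fetch_size = int(fetch_size * 1.5)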
The buffer size was hard-coded to 1024 bytes, which meant that larger
messages were unconsumable: they would always get split, causing the
consumer to stop.
It would probably be best to automatically retry truncated messages
with a larger request size, so you don't have to know your maximum
message size ahead of time.
Conflicts:
    kafka/__init__.py
    kafka/consumer.py
    test/test_integration.py
In the current patch, get_messages(count=1) would return zero messages
the first time it is invoked after a consumer was initialized.
This was hidden because of another bug in offset management
Conflicts:
    kafka/consumer.py
Other changes:
* Put a size restriction on the shared queue, to prevent message overload
* Wait for a while after each process is started (in the constructor)
* Wait for a while in each child if the consumer does not return any
  messages, just to be nice to the CPU
* Control the start event more granularly; this prevents infinite loops
  if control does not return to the generator (see the sketch after this
  list). For example:
      for msg in consumer:
          assert False
* Update the message status before yield
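A rough sketch of the queue bound and the start event; the limit, the
timeout, and all names are assumptions, not the actual implementation:

    from multiprocessing import Event, Process, Queue
    from queue import Empty

    def _child(queue, start):
        # Run only while the start event is set, so the owner can stop
        # the child even if control never returns to a generator.
        while start.is_set():
            try:
                msg = queue.get(timeout=1)   # back off when idle
            except Empty:
                continue
            # ... process msg ...

    if __name__ == "__main__":
        queue = Queue(maxsize=1024)    # bound the backlog (assumed limit)
        start = Event()
        start.set()                    # allow the child to run
        proc = Process(target=_child, args=(queue, start))
        proc.start()
        # ... later: start.clear(); proc.join() to stop the child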
The implementation is done by using simple options to the Kafka Fetch
Request.
Also, in the SimpleConsumer iterator, update the offset before the
message is yielded, so that consumer state is not lost in certain
cases. For example: the message is yielded and consumed by the caller,
but the caller never comes back into the generator. The message would
then be consumed without the status being updated in the consumer.
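A sketch of the iterator behaviour, with invented helpers for fetching
and offset bookkeeping:

    def iter_messages(fetch, commit_offset, offset=0):
        # Commit progress *before* yielding: if the caller consumes the
        # message but never resumes the generator, no state is lost.
        while True:
            for message in fetch(offset):
                offset += 1
                commit_offset(offset)
                yield message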
Currently the Kafka SimpleConsumer consumes messages from all
partitions. This commit ensures that data is consumed only from the
partitions specified during init.
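A hedged usage sketch: the partitions keyword follows the commit
description, while the import paths and constructor signature are
assumptions about the kafka-python API of this era:

    from kafka.client import KafkaClient
    from kafka.consumer import SimpleConsumer

    client = KafkaClient("localhost", 9092)
    # Only fetch from partitions 0 and 2 instead of all partitions.
    consumer = SimpleConsumer(client, "my-group", "my-topic",
                              partitions=[0, 2])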
Support for async producer
Merged locally, tests pass, +1
Also improve the logic for stopping the async Processor instance:
ensure that unsent messages are sent before it is stopped.
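One way to get that behaviour, sketched with assumed names: a stop
sentinel is enqueued behind any pending messages, so the worker flushes
its backlog before exiting:

    from multiprocessing import Process, Queue

    def _send_loop(queue):
        while True:
            msg = queue.get()
            if msg is None:      # sentinel: all earlier messages were sent
                break
            # ... send msg to the broker ...

    def stop(queue, proc):
        queue.put(None)          # lands behind any unsent messages
        proc.join()              # returns once the backlog is flushed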
Also, ensure that the 'no-acks' case works correctly.
In conn.send(), do not wait for the response; wait for it only in
conn.recv(). This behaviour is fine now, since the connection is not
shared among consumer threads etc.
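A sketch of that split, with assumed names; conn stands for a
KafkaConnection-like object and req_acks for the producer option:

    def send_produce_request(conn, request_bytes, req_acks):
        conn.send(request_bytes)   # fire the request; do not block here
        if req_acks == 0:          # 'no-acks': the broker sends no reply
            return None
        return conn.recv()         # block only when an ack is expected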
Add support for two options in the producer: req_acks and ack_timeout.
The acks, if any, are passed back to the caller directly.
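A hedged usage sketch: the option names come from the commit message,
while the constructor and send call are assumptions about the producer
API of this era:

    from kafka.client import KafkaClient
    from kafka.producer import SimpleProducer

    client = KafkaClient("localhost", 9092)
    producer = SimpleProducer(client, "my-topic",
                              req_acks=1,        # wait for the leader's ack
                              ack_timeout=1000)  # broker-side timeout (ms)
    acks = producer.send_messages("payload")     # acks go back to the caller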
Conflicts:
    kafka/producer.py
The Java/Scala Kafka client supports a mechanism for sending messages
asynchronously using a queue and a thread: messages are put on the
queue, and a worker thread keeps sending them to the broker.
This ticket implements that feature in Python, using multiprocessing
instead of threads to send the messages.
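The core pattern, reduced to a runnable sketch with a stubbed network
send; none of these names are kafka-python API:

    from multiprocessing import Process, Queue

    def send_to_broker(msg):
        print("sending", msg)          # stub for the real network call

    def _send_loop(queue):
        while True:
            msg = queue.get()
            if msg is None:            # stop sentinel
                break
            send_to_broker(msg)

    if __name__ == "__main__":
        queue = Queue()
        proc = Process(target=_send_loop, args=(queue,))
        proc.start()
        queue.put(b"hello")            # returns at once; the child does the I/O
        queue.put(None)
        proc.join()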
Also start a changelog for important changes like this.
Closes #34