Commit messages

Fetch requests can be repeated if we get a ConsumerFetchSizeTooSmall
or if _fetch() is called multiple times for some reason. We don't want
to re-fetch messages that are already in our queue, so store the offsets
of the last enqueued messages from each partition.
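
A minimal sketch of that bookkeeping, assuming a message object with an offset attribute; the names here are illustrative, not the module's actual ones:

    # Hypothetical sketch: remember the highest offset already enqueued per
    # partition, and drop anything a repeated fetch returns at or below it.
    last_enqueued = {}  # partition -> offset of the last message put on the queue

    def enqueue_messages(queue, partition, messages):
        for message in messages:
            if partition in last_enqueued and message.offset <= last_enqueued[partition]:
                continue  # already queued by an earlier fetch; skip the duplicate
            queue.put((partition, message))
            last_enqueued[partition] = message.offset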

* Increment the offset before returning a message rather than when
putting it in the internal queue. This prevents committing the wrong offsets.
* In MultiProcessConsumer, store the offset of the next message
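
The point of the change, as a hedged sketch (the queue, offsets dict, and message shape are assumed for illustration):

    # Hypothetical sketch: advance the tracked offset only when a message is
    # delivered to the caller, not when it is buffered internally, so a
    # commit can never cover a message the caller has not seen yet.
    def get_message(self):
        partition, message = self.queue.get()
        self.offsets[partition] = message.offset + 1  # offset of the next message
        return message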

Increase default connection timeout
This fixes the default behavior, which used to cause a socket timeout
when waiting for 10 seconds for a message to be produced.

Changes for aligning code with offset fetch and commit APIs (Kafka 0.8.1)

mahendra-repr
Conflicts:
kafka/client.py
kafka/consumer.py

Branch fix: No infinite loops during metadata requests, invalidate metadata more, exception hierarchy

Various changes/fixes, including:
* Allow customizing socket timeouts
* Read the correct number of bytes from Kafka
* Guarantee reading the expected number of bytes from the socket every time (see the sketch after this list)
* Remove bufsize from client and conn
* SimpleConsumer flow changes
* Fix some error handling
* Add optional upper limit to consumer fetch buffer size
* Add and fix unit and integration tests
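
One common way to provide that read guarantee is a recv() loop that keeps reading until the full count arrives; this is a generic sketch, not the library's exact code:

    import socket

    def read_exact(sock, num_bytes):
        # A single recv() may legally return fewer bytes than requested,
        # so loop until exactly num_bytes have been read.
        chunks = []
        bytes_left = num_bytes
        while bytes_left > 0:
            chunk = sock.recv(bytes_left)
            if not chunk:
                # The peer closed the connection before sending everything.
                raise socket.error("socket closed with %d bytes left unread" % bytes_left)
            chunks.append(chunk)
            bytes_left -= len(chunk)
        return b"".join(chunks)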

Both errors are handled the same way when raised and caught, so this makes sense.

* If the connection is dirty, reinit
* If we get a BufferUnderflowError, the server could have gone away, so mark it dirty
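
A sketch of that dirty-flag flow; the class shape and the BufferUnderflowError stand-in are illustrative, not the library's actual definitions:

    import socket

    class BufferUnderflowError(Exception):
        """Stand-in for the library's error type."""

    class Connection(object):
        def __init__(self, host, port):
            self.host, self.port = host, port
            self._sock = None
            self._dirty = True  # no live socket yet

        def reinit(self):
            if self._sock:
                self._sock.close()
            self._sock = socket.create_connection((self.host, self.port))
            self._dirty = False

        def recv_exactly(self, size):
            if self._dirty:
                self.reinit()  # dirty connection: rebuild the socket first
            data = self._sock.recv(size)
            if len(data) < size:  # (real code would loop; shortened here)
                self._dirty = True  # server may have gone away
                raise BufferUnderflowError("expected %d bytes, got %d" % (size, len(data)))
            return data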

size is too small
Note: This can cause fetching a message to exceed a given timeout, but timeouts are
not guaranteed anyway, and in this case it's the client's fault for not sending a big
enough buffer size rather than the Kafka server's. This can be bad if max_fetch_size
is None (no limit) and there is some message in Kafka that is crazy huge, but that is
why we should have some max_fetch_size.
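
A hedged sketch of the retry-with-a-bigger-buffer idea described above; fetch_fn is a hypothetical callable, and only the exception name comes from the log:

    class ConsumerFetchSizeTooSmall(Exception):
        """Stand-in for the library's error type."""

    def fetch_with_growing_buffer(fetch_fn, buffer_size, max_fetch_size=None):
        # Double the buffer and retry until the message fits, bounded by
        # max_fetch_size when one is set -- hence the need for a limit.
        while True:
            try:
                return fetch_fn(buffer_size)
            except ConsumerFetchSizeTooSmall:
                if max_fetch_size is not None and buffer_size * 2 > max_fetch_size:
                    raise  # refuse to grow without bound
                buffer_size *= 2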

This differentiates between errors that occur when sending the request
and receiving the response, and adds BufferUnderflowError handling.

Also, log.exception() is unhelpfully noisy. Use log.error() with some error details in the message instead.
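
The difference in practice (send_request() is a placeholder, not a real function in the library):

    import logging
    log = logging.getLogger("kafka")

    try:
        send_request()  # hypothetical operation that may fail
    except Exception as e:
        # log.exception() would also emit the full traceback, which gets
        # noisy; a one-line error carrying the details is usually enough.
        log.error("Unable to send request: %r", e)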

We always store the offset of the next available message, so we shouldn't
decrement offset deltas by an extra 1 when seeking.

Will remove once any error handling issues are resolved.

iterator to exit when reached.
Also put constant timeout values in pre-defined constants

to block forever if it's reached.
|
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | | |
* Combine partition fetch requests into a single request
* Put the messages received in a queue and update offsets
* Grab as many messages from the queue as requested
* When the queue is empty, request more
* timeout param for get_messages() is the actual timeout for getting those messages
* Based on https://github.com/mumrah/kafka-python/pull/74 -
don't increase min_bytes if the consumer fetch buffer size is too small.
Notes:
Change MultiProcessConsumer and _mp_consume() accordingly.
Previously, when querying each partition separately, it was possible to
block waiting for messages on partition 0 even if there were new ones in partition 1.
These changes allow us to block while waiting for messages on all partitions,
and reduce the total number of Kafka requests.
Use Queue.Queue for the single-process queue instead of the already-imported
multiprocessing.Queue, because the latter doesn't seem to guarantee immediate
availability of items after a put:
>>> from multiprocessing import Queue
>>> q = Queue()
>>> q.put(1); q.get_nowait()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 152, in get_nowait
return self.get(False)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 134, in get
raise Empty
Queue.Empty
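
For contrast, the thread-safe Queue.Queue makes an item visible as soon as put() returns (Python 2 session):

>>> import Queue
>>> q = Queue.Queue()
>>> q.put(1); q.get_nowait()
1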

* Remove bufsize from client and conn, since they're not actually enforced
Notes:
This commit changes behavior a bit by raising a BufferUnderflowError when
no data is received for the message size, rather than a ConnectionError.
Since bufsize on the socket is not actually enforced but is used by the consumer
when creating requests, it has been moved to the consumer until a better solution is implemented.

According to the protocol documentation, the 4-byte integer at the beginning
of a response represents the size of the payload only, not including those bytes.
See http://goo.gl/rg5uom
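
So reading a response amounts to: read 4 bytes, decode a big-endian int32, then read exactly that many payload bytes. A sketch, reusing the read_exact() helper sketched earlier:

    import struct

    def read_response(sock):
        # First 4 bytes: big-endian int32 giving the payload size only;
        # the 4 size bytes themselves are not included in the count.
        (size,) = struct.unpack(">i", read_exact(sock, 4))
        return read_exact(sock, size)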

Previously, if you tried to consume a message with a timeout greater than 10 seconds
but did not receive data within those 10 seconds, a socket.timeout exception was raised.
This change allows a higher socket timeout to be set, or even None for no timeout.
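
An illustration of the stdlib behavior involved; 9092 is just Kafka's default port, and the address is illustrative:

    import socket

    sock = socket.create_connection(("localhost", 9092))
    sock.settimeout(30.0)  # recv() now raises socket.timeout only after 30s
    sock.settimeout(None)  # or block indefinitely until data arrives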

Enable absolute imports for modules using Queue.
|
| | |/
| |/|
| | |
| | |
| | |
| | |
| | | |
When running on Linux with code on a case-insensitive file system,
imports of the `Queue` module fail because Python resolves the
wrong file (it tries a relative import of `queue.py` in
the kafka directory). This change forces absolute imports via PEP 328.
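
The fix itself is a one-line future import; a sketch of the pattern rather than the exact diff:

    from __future__ import absolute_import

    # With PEP 328 semantics forced on, this resolves to the stdlib Queue
    # module even when a sibling kafka/queue.py would otherwise shadow it
    # on a case-insensitive file system.
    import Queue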

Replace _send_upstream datetime logic with simpler time().
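
A sketch of the simpler deadline pattern; the interval, stop_event, and flush() here are illustrative, not _send_upstream's actual code:

    import time

    def send_loop(interval, stop_event, flush):
        # Plain time.time() floats make deadline math trivial compared
        # with datetime/timedelta bookkeeping.
        deadline = time.time() + interval
        while not stop_event.is_set():
            if time.time() >= deadline:
                flush()
                deadline = time.time() + interval
            time.sleep(0.1)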

Conflicts:
kafka/producer.py

failed_messages
- add integration tests for sync producer
- add integration tests for async producer with leadership election
- use log.exception