Commit message

…in memory logging. Address code review concerns
Bump version number to 0.9.1
Update readme to show supported Kafka/Python versions
Validate arguments in consumer.py, add initial consumer unit test
Make service kill() child processes when startup fails
Add tests for util.py, fix a Python 2.6-specific bug.

…integration tests, make skipped integration tests also skip setupClass, implement rudimentary offset support in consumer.py

Fetch requests can be repeated if we get a ConsumerFetchSizeTooSmall or if _fetch() is called multiple times for some reason. We don't want to re-fetch messages that are already in our queue, so store the offsets of the last enqueued messages from each partition.
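
A minimal sketch of that de-duplication; fetched_offsets and queue are illustrative names, not the project's actual fields:

    # Drop anything at or below the last offset already enqueued for
    # this partition; a repeated fetch then contributes nothing twice.
    last = self.fetched_offsets.get(partition, -1)
    for message in messages:
        if message.offset <= last:
            continue  # already enqueued by an earlier _fetch()
        self.queue.put((partition, message))
        last = message.offset
    self.fetched_offsets[partition] = last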

* Increment the offset before returning a message rather than when putting it in the internal queue. This prevents committing the wrong offsets.
* In MultiProcessConsumer, store the offset of the next message
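
A sketch of the first point, assuming an internal queue of (partition, message) pairs (attribute names illustrative): the stored offset advances only when the caller actually receives the message, so a commit can never cover messages still sitting in the queue.

    def __iter__(self):
        while True:
            partition, message = self.queue.get()
            # Advance only at hand-off; a commit fired between fetch
            # and delivery therefore cannot skip this message.
            self.offsets[partition] = message.offset + 1
            yield message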

…mahendra-repr
Conflicts:
	kafka/client.py
	kafka/consumer.py

Various changes/fixes, including:
* Allow customizing socket timeouts
* Read the correct number of bytes from Kafka
* Guarantee reading the expected number of bytes from the socket every time
* Remove bufsize from client and conn
* SimpleConsumer flow changes
* Fix some error handling
* Add optional upper limit to consumer fetch buffer size
* Add and fix unit and integration tests

…size is too small
Note: This can cause fetching a message to exceed a given timeout, but timeouts are not guaranteed anyway, and in this case it is the client's fault for not sending a big enough buffer size, rather than the Kafka server's. This can be bad if max_fetch_size is None (no limit) and some message in Kafka is enormous, but that is exactly why we should have a max_fetch_size.

We always store the offset of the next available message, so we shouldn't decrement the offset deltas by an extra 1 when seeking.
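
Stated as code (a bare illustration, not the project's actual seek() signature): with self.offset always naming the next message, a relative seek is plain addition.

    def seek(self, delta):
        # self.offset is the *next* message to fetch, so no extra
        # +/- 1 correction is needed when moving relative to it.
        self.offset += delta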

…iterator to exit when reached.
Also move constant timeout values into pre-defined constants.

…to block forever if it's reached.

* Combine partition fetch requests into a single request
* Put the messages received in a queue and update offsets
* Grab as many messages from the queue as requested
* When the queue is empty, request more
* timeout param for get_messages() is the actual timeout for getting those messages
* Based on https://github.com/mumrah/kafka-python/pull/74 -
  don't increase min_bytes if the consumer fetch buffer size is too small.
Notes:
Change MultiProcessConsumer and _mp_consume() accordingly.
Previously, when querying each partition separately, it was possible to block waiting for messages on partition 0 even if there were new ones in partition 1. These changes allow us to block while waiting for messages on all partitions, and reduce the total number of Kafka requests.
Use Queue.Queue for the single-process queue instead of the already-imported multiprocessing.Queue, because the latter doesn't seem to guarantee immediate availability of items after a put:
>>> from multiprocessing import Queue
>>> q = Queue()
>>> q.put(1); q.get_nowait()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 152, in get_nowait
    return self.get(False)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 134, in get
    raise Empty
Queue.Empty
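
For contrast, the same sequence with the stdlib Queue.Queue (the class this commit switches to; Python 2 spelling) hands the item back immediately:
>>> from Queue import Queue
>>> q = Queue()
>>> q.put(1); q.get_nowait()
1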

* Remove bufsize from client and conn, since they're not actually enforced
Notes:
This commit changes behavior a bit by raising a BufferUnderflowError, rather than a ConnectionError, when no data is received for the message size. Since bufsize is not actually enforced at the socket level but is used by the consumer when creating requests, it moves there until a better solution is implemented.
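
A minimal sketch of the guaranteed-length socket read described here; the helper name is illustrative, and BufferUnderflowError stands in for the project's exception:

    def read_exactly(sock, num_bytes):
        # A bare recv() may legally return fewer bytes than asked for,
        # so loop until exactly num_bytes have arrived.
        chunks = []
        remaining = num_bytes
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:  # peer closed the socket mid-message
                raise BufferUnderflowError("%d bytes short of expected size"
                                           % remaining)
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)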

Enable absolute imports for modules using Queue.
When running on Linux with code on a case-insensitive file system, imports of the `Queue` module fail because Python resolves the wrong file (it tries a relative import of `queue.py` in the kafka directory). This change forces absolute imports via PEP 328.
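
The PEP 328 mechanism in question: with this future import at the top of a module, `Queue` can only resolve to the standard library module, never to a sibling queue.py.

    from __future__ import absolute_import

    # Now guaranteed to be the stdlib module (Python 2 spelling),
    # even on a case-insensitive file system with kafka/queue.py present.
    from Queue import Empty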

Set FetchRequest MaxBytes value to bufsize instead of fetchsize (=MinBytes)

…Also move the exceptions to common instead of util

Related to #42
Adds a new ConsumerFetchSizeTooSmall exception that is thrown when `_decode_message_set_iter` gets a BufferUnderflowError but has not yet yielded a message.
In this event, SimpleConsumer will increase the fetch size by 1.5x and continue the fetching loop while _not_ increasing the offset (basically just retrying the request with a larger fetch size).
Once the consumer fetch size has been increased, it will remain increased while SimpleConsumer fetches from that partition.
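
A sketch of that retry loop with illustrative names (buffer_size and max_buffer_size follow the optional upper limit mentioned earlier; _fetch_from is an assumed helper, and the code lives inside a generator method):

    fetch_size = self.buffer_size
    while True:
        try:
            for message in self._fetch_from(partition, offset, fetch_size):
                yield message
            break
        except ConsumerFetchSizeTooSmall:
            if self.max_buffer_size and fetch_size >= self.max_buffer_size:
                raise  # already at the cap; refuse to grow without bound
            # Retry the same offset with a buffer 1.5x larger; the
            # increased size then sticks for this partition.
            fetch_size = int(fetch_size * 1.5)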

…Was hard-coded to 1024 bytes, which meant that larger messages were unconsumable, since they would always get split, causing the consumer to stop.
It would probably be best to automatically retry truncated messages with a larger request size so you don't have to know your max message size ahead of time.

In the current patch, get_messages(count=1) would return zero messages the first time it is invoked after a consumer is initialized.

This was hidden because of another bug in offset management.

Conflicts:
	kafka/consumer.py

The previous commit optimized the commit thread so that the timer started only when there were messages to be consumed. This commit goes a step further and ensures the following:
* Only one timer thread is created
* The main app does not block on exit (waiting for the timer thread to finish)
This is ensured by having a single thread that blocks on an event and keeps calling a function. We use events instead of time.sleep() to prevent the Python interpreter from waking every 50ms to check whether the timer has expired (logic copied from threading.Timer).
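
A minimal sketch of that pattern; the class and attribute names are illustrative, not the project's:

    import threading

    class RepeatingTimer(threading.Thread):
        def __init__(self, interval, fn):
            threading.Thread.__init__(self)
            self.daemon = True           # main app never blocks on exit
            self.interval = interval     # seconds between calls
            self.fn = fn
            self._stopped = threading.Event()

        def run(self):
            while True:
                # Block on the event: set() wakes us immediately,
                # while a timeout simply means it is time to fire.
                self._stopped.wait(self.interval)
                if self._stopped.is_set():
                    return
                self.fn()

        def stop(self):
            self._stopped.set()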

If there are no messages being consumed, the timer keeps creating new threads at the specified intervals. This may not be necessary. We can control this behaviour so that the timer thread is started only when a message is consumed.

Other changes:
* Put a message size restriction on the shared queue, to prevent message overload
* Wait for a while after each process is started (in the constructor)
* Wait for a while in each child if the consumer does not return any messages, just to be nice to the CPU
* Control the start event more granularly - this prevents infinite loops if control does not return to the generator (see the sketch after this list), e.g.:
      for msg in consumer:
          assert False
* Update message status before yield
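
A sketch of the child loop those points describe; the queue bound, sleep interval, and the get_messages() call shape are all assumptions:

    import time
    from multiprocessing import Event, Queue

    queue = Queue(maxsize=1024)  # size restriction: caps pending messages
    start = Event()              # granular start control

    def _child_loop(consumer, queue, start):
        while True:
            start.wait()         # consume only while the parent says go
            messages = consumer.get_messages(block=False)
            if not messages:
                time.sleep(0.1)  # idle child: be nice to the CPU
                continue
            for message in messages:
                queue.put(message)  # blocks when the bounded queue is full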