Support for multiple hosts on KafkaClient bootstrap (improves on #70)
Conflicts:
    kafka/client.py
    kafka/conn.py
    setup.py
    test/test_integration.py
    test/test_unit.py
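A hypothetical usage sketch of the multiple-host bootstrap above, assuming a comma-separated host:port string is accepted; the exact constructor signature is not shown in this log:

```python
# Hypothetical sketch: bootstrap against several brokers so cluster metadata
# can still be fetched if the first host is down. The comma-separated string
# form is an assumption, not necessarily the exact API.
from kafka.client import KafkaClient

client = KafkaClient("kafka01.example.com:9092,kafka02.example.com:9092")
```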
Fixes mumrah/kafka-python#126
TL;DR
=====
This makes it possible to read and write snappy-compressed streams that
are compatible with the Java and Scala Kafka clients (the xerial
blocking format).
Xerial Details
==============
Kafka supports transparent compression of messages, both in transit and
at rest; one of the allowable compression algorithms is Google's snappy,
an algorithm with excellent speed at the cost of compression ratio.
The specific implementation of snappy used in Kafka is xerial-snappy, a
readily available Java library for snappy.
As part of this implementation, there is a specialised blocking format
that is somewhat non-standard in the snappy world.
Xerial Format
-------------
The blocking mode of the xerial snappy library is fairly simple, using a
magic header to identify itself and then a size + block scheme, unless
otherwise noted all items in xerials blocking format are assumed to be
big-endian.
A block size (```xerial_blocksize``` in implementation) controls how
frequent the blocking occurs 32k is the default in the xerial library,
this blocking controls the size of the uncompressed chunks that will be
fed to snappy to be compressed.
The format winds up being
| Header | Block1 len | Block1 data | Blockn len | Blockn data |
| ----------- | ---------- | ------------ | ---------- | ------------ |
| 16 bytes | BE int32 | snappy bytes | BE int32 | snappy bytes |
It is important to note that the blocksize is the amount of uncompressed
data presented to snappy at each block, whereas the blocklen is the
number of compressed bytes that will be present in the stream; that is,
blocklen will always be <= blocksize.
Xerial blocking header
----------------------
| Marker | Magic String | Null / Pad | Version  | Compat   |
| ------ | ------------ | ---------- | -------- | -------- |
| byte   | c-string     | byte       | int32    | int32    |
| -126   | 'SNAPPY'     | \0         | variable | variable |
The pad appears to be there to ensure that 'SNAPPY' is a valid C string
and to align the header on a word boundary.
The version is the version of this format as written by xerial; in the
wild this is currently 1, so we only support v1.
Compat declares the minimum version that can read an xerial block
stream; presently, in the wild, this is 1.
Implementation specific details
===============================
The implementation presented here follows the Xerial implementation as
of its v1 blocking format; no attempt is made to check for future
versions. Since non-xerial-aware clients might have persisted plain
snappy-compressed messages to Kafka brokers, we allow clients to turn on
xerial compatibility for message sending, and perform header sniffing to
detect xerial vs plain snappy payloads.
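The framing above is simple enough to sketch directly. The following is a minimal illustration of the format, assuming the python-snappy package (```import snappy```); the function names are examples written for this description, not necessarily the names used in kafka/codec.py:

```python
# Minimal sketch of the xerial blocking format described above.
# Assumes the python-snappy package; function names are illustrative only.
import struct
import snappy

# 16-byte header: marker byte, "SNAPPY" c-string with its \0 pad, then
# big-endian int32 version and compat fields (both 1 in the wild).
_XERIAL_V1_HEADER = struct.pack('!bccccccBii',
                                -126, b'S', b'N', b'A', b'P', b'P', b'Y',
                                0, 1, 1)

def xerial_encode(payload, blocksize=32 * 1024):
    """Compress payload into header + (BE int32 length, snappy block) pairs."""
    chunks = [_XERIAL_V1_HEADER]
    for i in range(0, len(payload), blocksize):
        block = snappy.compress(payload[i:i + blocksize])
        chunks.append(struct.pack('!i', len(block)))  # blocklen <= blocksize
        chunks.append(block)
    return b''.join(chunks)

def xerial_decode(data):
    """Decompress a v1 xerial stream; assumes the 16-byte header is present."""
    pos, chunks = 16, []
    while pos < len(data):
        (block_len,) = struct.unpack('!i', data[pos:pos + 4])
        pos += 4
        chunks.append(snappy.decompress(data[pos:pos + block_len]))
        pos += block_len
    return b''.join(chunks)

def looks_like_xerial(data):
    """Header sniffing: match marker, magic and pad (version/compat vary)."""
    return data[:8] == _XERIAL_V1_HEADER[:8]
```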
Make producers take a topic argument at send time rather than at init time -- fixes Issue #110, but breaks backwards compatibility with the previous Producer interface.
This allows a single producer to be used to send to multiple topics.
See https://github.com/mumrah/kafka-python/issues/110
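A before/after sketch of the interface change; the constructor and method signatures shown here are assumptions based on the description above:

```python
# Sketch of the interface change; exact signatures are assumed, not verified.
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

client = KafkaClient("localhost:9092")  # constructor form assumed

# Old interface (assumed): topic bound at init time, one producer per topic.
#   producer = SimpleProducer(client, "topic-a")
#   producer.send_messages(b"payload")

# New interface: topic passed at send time, so one producer can serve
# multiple topics.
producer = SimpleProducer(client)
producer.send_messages("topic-a", b"payload for a")
producer.send_messages("topic-b", b"payload for b")
```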
Fetch requests can be repeated if we get a ConsumerFetchSizeTooSmall
or if _fetch() is called multiple times for some reason. We don't want
to re-fetch messages that are already in our queue, so store the offsets
of the last enqueued messages from each partition.
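A rough sketch of that bookkeeping; the class and attribute names here are illustrative, not taken from the actual consumer code:

```python
# Illustrative sketch of skipping already-enqueued messages on repeated
# fetches; names are examples, not the actual consumer attributes.
class EnqueueTracker(object):
    def __init__(self):
        # partition -> offset of the last message already put on the queue
        self.last_enqueued = {}

    def enqueue_new(self, partition, messages, out_queue):
        seen_up_to = self.last_enqueued.get(partition, -1)
        for message in messages:
            if message.offset <= seen_up_to:
                continue  # a repeated fetch returned this message again
            out_queue.put((partition, message))
            self.last_enqueued[partition] = message.offset
```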
* Increment the offset before returning a message rather than when
putting it in the internal queue. This prevents committing the wrong offsets.
* In MultiProcessConsumer, store the offset of the next message
Increase default connection timeout
This fixes the default behavior, which used to cause a socket timeout
when waiting for 10 seconds for a message to be produced.
Changes for aligning code with offset fetch and commit APIs (Kafka 0.8.1)
mahendra-repr
Conflicts:
    kafka/client.py
    kafka/consumer.py
Branch fix: No infinite loops during metadata requests, invalidate metadata more, exception hierarchy
Various changes/fixes, including:
* Allow customizing socket timeouts
* Read the correct number of bytes from kafka
* Guarantee reading the expected number of bytes from the socket every time
* Remove bufsize from client and conn
* SimpleConsumer flow changes
* Fix some error handling
* Add optional upper limit to consumer fetch buffer size
* Add and fix unit and integration tests
Both errors are handled the same way when raised and caught, so this makes sense.
* If the connection is dirty, reinit
* If we get a BufferUnderflowError, the server could have gone away, so mark it dirty
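A rough sketch of that reconnect policy; the class and exception names are stand-ins rather than the library's actual ones:

```python
# Illustrative sketch of the dirty/reinit policy; names are stand-ins.
import socket

class BufferUnderflowError(Exception):
    """Stand-in for the library's short-read exception."""

class Conn(object):
    def __init__(self, host, port):
        self.host, self.port = host, port
        self._sock, self._dirty = None, True

    def reinit(self):
        # Rebuild the socket from scratch and clear the dirty flag.
        self._sock = socket.create_connection((self.host, self.port))
        self._dirty = False

    def recv_exactly(self, num_bytes):
        if self._dirty:
            self.reinit()
        data = b""
        while len(data) < num_bytes:
            chunk = self._sock.recv(num_bytes - len(data))
            if not chunk:
                # The server may have gone away; mark the connection dirty
                # so the next call reinitializes it.
                self._dirty = True
                raise BufferUnderflowError("short read from %s:%d"
                                           % (self.host, self.port))
            data += chunk
        return data
```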
size is too small
Note: This can cause fetching a message to exceed a given timeout, but timeouts are not guaranteed anyway, and in this case it is the client's fault for not sending a big enough buffer size, rather than the Kafka server's. This can be bad if max_fetch_size is None (no limit) and there is some message in Kafka that is extremely large, but that is why we should have some max_fetch_size.
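A simplified sketch of that retry behaviour; the doubling strategy and names are illustrative, not the actual consumer code:

```python
# Illustrative sketch: retry a fetch with a larger buffer until the message
# fits or the optional max_fetch_size cap is hit.
class ConsumerFetchSizeTooSmall(Exception):
    """Stand-in for the library's exception."""

def fetch_with_growing_buffer(do_fetch, buffer_size, max_fetch_size=None):
    while True:
        try:
            return do_fetch(buffer_size)
        except ConsumerFetchSizeTooSmall:
            if max_fetch_size is not None and buffer_size >= max_fetch_size:
                raise  # already at the cap; refuse to grow without bound
            buffer_size *= 2
            if max_fetch_size is not None:
                buffer_size = min(buffer_size, max_fetch_size)
```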
This differentiates between errors that occur when sending the request
and receiving the response, and adds BufferUnderflowError handling.
Also, log.exception() is unhelpfully noisy. Use log.error() with some error details in the message instead.
We always store the offset of the next available message, so we shouldn't
decrement the offset deltas by an extra 1 when seeking.
Will remove once any error handling issues are resolved.
iterator to exit when reached.
Also move hard-coded timeout values into pre-defined constants.
to block forever if it's reached.
* Combine partition fetch requests into a single request
* Put the messages received in a queue and update offsets
* Grab as many messages from the queue as requested
* When the queue is empty, request more
* timeout param for get_messages() is the actual timeout for getting those messages
* Based on https://github.com/mumrah/kafka-python/pull/74 -
don't increase min_bytes if the consumer fetch buffer size is too small.
Notes:
Change MultiProcessConsumer and _mp_consume() accordingly.
Previously, when querying each partition separately, it was possible to
block waiting for messages on partition 0 even if there were new ones in partition 1.
These changes allow us to block while waiting for messages on all partitions,
and reduce the total number of Kafka requests.
Use Queue.Queue for the single-process queue instead of the already imported
multiprocessing.Queue, because the latter doesn't seem to guarantee immediate
availability of items after a put:
>>> from multiprocessing import Queue
>>> q = Queue()
>>> q.put(1); q.get_nowait()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 152, in get_nowait
return self.get(False)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 134, in get
raise Empty
Queue.Empty
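A simplified sketch of the queue-draining loop implied by the points above, with the timeout treated as a single deadline for the whole batch; names and structure are illustrative only:

```python
# Illustrative sketch: drain a shared Queue.Queue against one deadline so the
# timeout covers the whole batch of requested messages.
import time
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

def get_messages(msg_queue, count, fetch_more, timeout=None):
    messages = []
    deadline = None if timeout is None else time.time() + timeout
    while len(messages) < count:
        if msg_queue.empty():
            fetch_more()  # queue drained: issue another combined fetch request
        remaining = None if deadline is None else max(0.0, deadline - time.time())
        try:
            messages.append(msg_queue.get(timeout=remaining))
        except queue.Empty:
            break  # deadline hit before enough messages arrived
    return messages
```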