working on intermittent failures in test_encode_fetch_request and test_encode_produce_request
TopicAndPartition fix for when a partition has no leader (leader = -1)
Conflicts:
    test/test_unit.py
clarity
Support for multiple hosts on KafkaClient bootstrap (improves on #70)
Conflicts:
    kafka/client.py
    kafka/conn.py
    setup.py
    test/test_integration.py
    test/test_unit.py
Fixes mumrah/kafka-python#126
TL;DR
=====
This makes it possible to read and write snappy-compressed streams that
are compatible with the Java and Scala Kafka clients (the xerial
blocking format).
Xerial Details
==============
Kafka supports transparent compression of messages, both in transit and
at rest. One of the allowable compression algorithms is Google's snappy,
an algorithm with excellent performance at the cost of compression
efficiency.
The specific implementation of snappy used in Kafka is the xerial-snappy
implementation, a readily available Java library for snappy. As part of
this implementation, there is a specialised blocking format that is
somewhat non-standard in the snappy world.
Xerial Format
-------------
The blocking mode of the xerial snappy library is fairly simple: it uses
a magic header to identify itself, followed by a size + block scheme.
Unless otherwise noted, all items in xerial's blocking format are
assumed to be big-endian.
A block size (```xerial_blocksize``` in the implementation) controls how
frequently the blocking occurs; 32k is the default in the xerial
library. This blocking controls the size of the uncompressed chunks that
will be fed to snappy to be compressed.
The format winds up being:

| Header      | Block1 len | Block1 data  | Blockn len | Blockn data  |
| ----------- | ---------- | ------------ | ---------- | ------------ |
| 16 bytes    | BE int32   | snappy bytes | BE int32   | snappy bytes |
It is important to note that the blocksize is the amount of uncompressed
data presented to snappy at each block, whereas the blocklen is the
number of compressed bytes that will be present in the stream; that
length will usually be <= blocksize (snappy can slightly expand
incompressible data).
Xerial blocking header
----------------------
| Marker | Magic String | Null / Pad | Version  | Compat   |
| ------ | ------------ | ---------- | -------- | -------- |
| byte   | c-string     | byte       | int32    | int32    |
| -126   | 'SNAPPY'     | \0         | variable | variable |
The pad appears to be there to ensure that 'SNAPPY' is a valid C string,
and to align the header on a word boundary.
The version is the version of this format as written by xerial; in the
wild this is currently 1, and as such we only support v1.
Compat is there to claim the minimum supported version that can read a
xerial block stream; presently, in the wild, this is also 1.
Implementation-specific details
===============================
The implementation presented here follows the Xerial implementation as
of its v1 blocking format; no attempt is made to check for future
versions. Since non-xerial-aware clients might have persisted plain
snappy-compressed messages to Kafka brokers, we allow clients to turn on
xerial compatibility for message sending, and we perform header sniffing
to detect xerial vs. plain snappy payloads.
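As a rough illustration of the framing described above, here is a
minimal sketch of encoding and sniffing/decoding a xerial v1 block
stream. It assumes the python-snappy bindings (```snappy.compress``` /
```snappy.decompress```); the names ```xerial_encode``` and
```xerial_decode``` are illustrative, not the exact functions in
kafka.codec.

```python
import struct

import snappy  # assumes the python-snappy bindings are installed

# 16-byte xerial v1 header: marker byte, "SNAPPY" magic, NUL pad,
# version=1, compat=1 -- multi-byte fields big-endian, per the tables above.
_XERIAL_V1_HEADER = struct.pack('>b6sBii', -126, b'SNAPPY', 0, 1, 1)

def xerial_encode(payload, blocksize=32 * 1024):
    """Sketch: frame `payload` as a xerial v1 block stream."""
    chunks = [_XERIAL_V1_HEADER]
    for i in range(0, len(payload), blocksize):
        # Each block compresses at most `blocksize` uncompressed bytes.
        block = snappy.compress(payload[i:i + blocksize])
        chunks.append(struct.pack('>i', len(block)))  # BE int32 block length
        chunks.append(block)
    return b''.join(chunks)

def xerial_decode(data):
    """Sketch: sniff the header, then decompress each length-prefixed block."""
    if data[:8] != _XERIAL_V1_HEADER[:8]:  # marker + magic + pad
        return snappy.decompress(data)    # fall back to plain snappy
    out, pos = [], len(_XERIAL_V1_HEADER)
    while pos < len(data):
        (block_len,) = struct.unpack_from('>i', data, pos)
        pos += 4
        out.append(snappy.decompress(data[pos:pos + block_len]))
        pos += block_len
    return b''.join(out)
```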
This allows a single producer to be used to send to multiple topics.
See https://github.com/mumrah/kafka-python/issues/110
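A hedged usage sketch of what this enables; the constructor and method
signatures below are assumptions about the API of that era, not verbatim
from the change:

```python
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

# Assumed-era API: the topic is passed per send call rather than
# fixed when the producer is constructed, so one producer can
# publish to several topics.
kafka = KafkaClient('localhost', 9092)
producer = SimpleProducer(kafka)
producer.send_messages('topic-one', 'some message')
producer.send_messages('topic-two', 'another message')
```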
This is better since the tests stop/start brokers, and if something goes wrong
they can affect each other.
integration tests
If some of the tests stop brokers and then error out, the teardown method will try to close the
same brokers and fail. This change allows it to continue.
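A minimal sketch of the tolerant teardown described here; the
```close_all``` helper and ```broker.close()``` method are hypothetical
stand-ins for the test suite's actual fixture code:

```python
import logging

log = logging.getLogger(__name__)

def close_all(brokers):
    """Sketch: keep closing brokers even if a test already stopped some."""
    for broker in brokers:
        try:
            broker.close()
        except Exception:
            # Likely already stopped by the test itself; log and continue
            # so the remaining brokers still get closed.
            log.exception("Failed to close broker during teardown")
```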
This is pretty much a rewrite. The tests that involve offset requests/responses
are not implemented, since that API is not supported in Kafka 0.8 yet.
Only kafka.codec and kafka.protocol are currently tested, so there is more work to be done here.
So we can run tests against an already-running Kafka instance:
    KAFKA_URI=tcp://localhost:9092 python -m test.test_integration
failed_messages
- add integration tests for sync producer
- add integration tests for async producer with leadership election
- use log.exception
This reverts commit e39e05f8a50b7528a22fed99dc67d561cbd79c41.
Sync tests and fixtures with kafka 0.8.0-beta1 tag
Conflicts:
    README.md
    kafka-src
Related to #42
Adds a new ConsumerFetchSizeTooSmall exception that is thrown when
`_decode_message_set_iter` gets a BufferUnderflowError but has not yet
yielded a message.
In this event, SimpleConsumer will increase the fetch size by a factor
of 1.5 and continue the fetching loop while _not_ increasing the offset
(basically it just retries the request with a larger fetch size).
Once the consumer fetch size has been increased, it will remain
increased while SimpleConsumer fetches from that partition.
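A minimal sketch of the retry loop this describes; ```do_fetch``` is a
hypothetical callable standing in for the real fetch request, while the
1.5x growth factor and the stay-at-the-same-offset behaviour mirror the
description above:

```python
class ConsumerFetchSizeTooSmall(Exception):
    """Fetch returned a truncated message set before yielding anything."""

def fetch_with_retry(do_fetch, fetch_size):
    # `do_fetch` is a hypothetical callable: given a fetch size, it either
    # returns the decoded messages or raises ConsumerFetchSizeTooSmall.
    while True:
        try:
            # Return the grown size too, so later fetches from this
            # partition keep using it.
            return do_fetch(fetch_size), fetch_size
        except ConsumerFetchSizeTooSmall:
            # Retry the same offset with a 1.5x larger buffer; the
            # offset is NOT advanced.
            fetch_size = int(fetch_size * 1.5)
```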
Was hard-coded to 1024 bytes, which meant that larger messages were
unconsumable, since they would always get split, causing the consumer to
stop.
It would probably be best to automatically retry truncated messages with
a larger request size so you don't have to know your max message size
ahead of time.
Conflicts:
    kafka/__init__.py
    kafka/consumer.py
    test/test_integration.py
This alleviates IPv4 vs. IPv6 issues in ZK and Kafka.