summaryrefslogtreecommitdiff
path: root/kafka/codec.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-03-08 23:41:46 -0700
committerDana Powers <dana.powers@rd.io>2015-03-09 00:03:16 -0700
commitfa4338821976a66134b08eabab79bedefc2a0c67 (patch)
treeeb8743c274c57ac12f908fe9e5f64618c41f8695 /kafka/codec.py
parent4d59678dd38393fcdc2ef627ca717d5e87d0e744 (diff)
downloadkafka-python-fa4338821976a66134b08eabab79bedefc2a0c67.tar.gz
Take the linter to kafka/codec.py
Diffstat (limited to 'kafka/codec.py')
-rw-r--r--kafka/codec.py21
1 files changed, 10 insertions, 11 deletions
diff --git a/kafka/codec.py b/kafka/codec.py
index 56689ce..19f405b 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -1,8 +1,7 @@
-from io import BytesIO
import gzip
+from io import BytesIO
import struct
-import six
from six.moves import xrange
_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
@@ -10,9 +9,9 @@ _XERIAL_V1_FORMAT = 'bccccccBii'
try:
import snappy
- _has_snappy = True
+ _HAS_SNAPPY = True
except ImportError:
- _has_snappy = False
+ _HAS_SNAPPY = False
def has_gzip():
@@ -20,7 +19,7 @@ def has_gzip():
def has_snappy():
- return _has_snappy
+ return _HAS_SNAPPY
def gzip_encode(payload):
@@ -57,8 +56,8 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
"""Encodes the given data with snappy if xerial_compatible is set then the
stream is encoded in a fashion compatible with the xerial snappy library
- The block size (xerial_blocksize) controls how frequent the blocking occurs
- 32k is the default in the xerial library.
+ The block size (xerial_blocksize) controls how frequent the blocking
+ occurs 32k is the default in the xerial library.
The format winds up being
+-------------+------------+--------------+------------+--------------+
@@ -73,7 +72,7 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
length will always be <= blocksize.
"""
- if not _has_snappy:
+ if not has_snappy():
raise NotImplementedError("Snappy codec is not available")
if xerial_compatible:
@@ -84,7 +83,7 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
out = BytesIO()
header = b''.join([struct.pack('!' + fmt, dat) for fmt, dat
- in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
+ in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
out.write(header)
for chunk in _chunker():
@@ -123,13 +122,13 @@ def _detect_xerial_stream(payload):
"""
if len(payload) > 16:
- header = header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
+ header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
return header == _XERIAL_V1_HEADER
return False
def snappy_decode(payload):
- if not _has_snappy:
+ if not has_snappy():
raise NotImplementedError("Snappy codec is not available")
if _detect_xerial_stream(payload):