diff options
author | Dana Powers <dana.powers@gmail.com> | 2015-03-29 15:43:31 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2015-03-29 15:43:31 -0700 |
commit | d02276eb2b9cee77672b95a7a78e9fe9103d58b5 (patch) | |
tree | cf8f8612734a6634d96b149f20f8793e129eb831 | |
parent | 1a5cb0376cfc35d77bbc49072a6d55ca2ee3f7ce (diff) | |
parent | fa4338821976a66134b08eabab79bedefc2a0c67 (diff) | |
download | kafka-python-d02276eb2b9cee77672b95a7a78e9fe9103d58b5.tar.gz |
Merge pull request #337 from dpkp/gzip_context
Use context managers in gzip_encode / gzip_decode
-rw-r--r-- | kafka/codec.py | 55 |
1 files changed, 32 insertions, 23 deletions
diff --git a/kafka/codec.py b/kafka/codec.py index 2279200..19f405b 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -1,8 +1,7 @@ -from io import BytesIO import gzip +from io import BytesIO import struct -import six from six.moves import xrange _XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1) @@ -10,9 +9,9 @@ _XERIAL_V1_FORMAT = 'bccccccBii' try: import snappy - _has_snappy = True + _HAS_SNAPPY = True except ImportError: - _has_snappy = False + _HAS_SNAPPY = False def has_gzip(): @@ -20,26 +19,36 @@ def has_gzip(): def has_snappy(): - return _has_snappy + return _HAS_SNAPPY def gzip_encode(payload): - buffer = BytesIO() - handle = gzip.GzipFile(fileobj=buffer, mode="w") - handle.write(payload) - handle.close() - buffer.seek(0) - result = buffer.read() - buffer.close() + with BytesIO() as buf: + + # Gzip context manager introduced in python 2.6 + # so old-fashioned way until we decide to not support 2.6 + gzipper = gzip.GzipFile(fileobj=buf, mode="w") + try: + gzipper.write(payload) + finally: + gzipper.close() + + result = buf.getvalue() + return result def gzip_decode(payload): - buffer = BytesIO(payload) - handle = gzip.GzipFile(fileobj=buffer, mode='r') - result = handle.read() - handle.close() - buffer.close() + with BytesIO(payload) as buf: + + # Gzip context manager introduced in python 2.6 + # so old-fashioned way until we decide to not support 2.6 + gzipper = gzip.GzipFile(fileobj=buf, mode='r') + try: + result = gzipper.read() + finally: + gzipper.close() + return result @@ -47,8 +56,8 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024): """Encodes the given data with snappy if xerial_compatible is set then the stream is encoded in a fashion compatible with the xerial snappy library - The block size (xerial_blocksize) controls how frequent the blocking occurs - 32k is the default in the xerial library. + The block size (xerial_blocksize) controls how frequent the blocking + occurs 32k is the default in the xerial library. The format winds up being +-------------+------------+--------------+------------+--------------+ @@ -63,7 +72,7 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024): length will always be <= blocksize. """ - if not _has_snappy: + if not has_snappy(): raise NotImplementedError("Snappy codec is not available") if xerial_compatible: @@ -74,7 +83,7 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024): out = BytesIO() header = b''.join([struct.pack('!' + fmt, dat) for fmt, dat - in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)]) + in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)]) out.write(header) for chunk in _chunker(): @@ -113,13 +122,13 @@ def _detect_xerial_stream(payload): """ if len(payload) > 16: - header = header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16]) + header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16]) return header == _XERIAL_V1_HEADER return False def snappy_decode(payload): - if not _has_snappy: + if not has_snappy(): raise NotImplementedError("Snappy codec is not available") if _detect_xerial_stream(payload): |