From 610f01e96d3bfd9f632371de5bd6cf911a8e71ef Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 8 Mar 2015 20:01:57 -0700 Subject: Use context managers in gzip_encode / gzip_decode --- kafka/codec.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) (limited to 'kafka/codec.py') diff --git a/kafka/codec.py b/kafka/codec.py index 2279200..7883158 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -24,22 +24,17 @@ def has_snappy(): def gzip_encode(payload): - buffer = BytesIO() - handle = gzip.GzipFile(fileobj=buffer, mode="w") - handle.write(payload) - handle.close() - buffer.seek(0) - result = buffer.read() - buffer.close() + with BytesIO() as buf: + with gzip.GzipFile(fileobj=buf, mode="w") as gzipper: + gzipper.write(payload) + result = buf.getvalue() return result def gzip_decode(payload): - buffer = BytesIO(payload) - handle = gzip.GzipFile(fileobj=buffer, mode='r') - result = handle.read() - handle.close() - buffer.close() + with BytesIO(payload) as buf: + with gzip.GzipFile(fileobj=buf, mode='r') as gzipper: + result = gzipper.read() return result -- cgit v1.2.1 From 4d59678dd38393fcdc2ef627ca717d5e87d0e744 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 8 Mar 2015 23:41:10 -0700 Subject: Gzip context manager not supported in py2.6, so use try/finally instead --- kafka/codec.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) (limited to 'kafka/codec.py') diff --git a/kafka/codec.py b/kafka/codec.py index 7883158..56689ce 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -25,16 +25,31 @@ def has_snappy(): def gzip_encode(payload): with BytesIO() as buf: - with gzip.GzipFile(fileobj=buf, mode="w") as gzipper: + + # Gzip context manager introduced in python 2.6 + # so old-fashioned way until we decide to not support 2.6 + gzipper = gzip.GzipFile(fileobj=buf, mode="w") + try: gzipper.write(payload) + finally: + gzipper.close() + result = buf.getvalue() + return result def gzip_decode(payload): with BytesIO(payload) as buf: - with gzip.GzipFile(fileobj=buf, mode='r') as gzipper: + + # Gzip context manager introduced in python 2.6 + # so old-fashioned way until we decide to not support 2.6 + gzipper = gzip.GzipFile(fileobj=buf, mode='r') + try: result = gzipper.read() + finally: + gzipper.close() + return result -- cgit v1.2.1 From fa4338821976a66134b08eabab79bedefc2a0c67 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 8 Mar 2015 23:41:46 -0700 Subject: Take the linter to kafka/codec.py --- kafka/codec.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) (limited to 'kafka/codec.py') diff --git a/kafka/codec.py b/kafka/codec.py index 56689ce..19f405b 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -1,8 +1,7 @@ -from io import BytesIO import gzip +from io import BytesIO import struct -import six from six.moves import xrange _XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1) @@ -10,9 +9,9 @@ _XERIAL_V1_FORMAT = 'bccccccBii' try: import snappy - _has_snappy = True + _HAS_SNAPPY = True except ImportError: - _has_snappy = False + _HAS_SNAPPY = False def has_gzip(): @@ -20,7 +19,7 @@ def has_gzip(): def has_snappy(): - return _has_snappy + return _HAS_SNAPPY def gzip_encode(payload): @@ -57,8 +56,8 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024): """Encodes the given data with snappy if xerial_compatible is set then the stream is encoded in a fashion compatible with the xerial snappy library - The block size (xerial_blocksize) controls how frequent the blocking occurs - 32k is the default in the xerial library. + The block size (xerial_blocksize) controls how frequent the blocking + occurs 32k is the default in the xerial library. The format winds up being +-------------+------------+--------------+------------+--------------+ @@ -73,7 +72,7 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024): length will always be <= blocksize. """ - if not _has_snappy: + if not has_snappy(): raise NotImplementedError("Snappy codec is not available") if xerial_compatible: @@ -84,7 +83,7 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024): out = BytesIO() header = b''.join([struct.pack('!' + fmt, dat) for fmt, dat - in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)]) + in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)]) out.write(header) for chunk in _chunker(): @@ -123,13 +122,13 @@ def _detect_xerial_stream(payload): """ if len(payload) > 16: - header = header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16]) + header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16]) return header == _XERIAL_V1_HEADER return False def snappy_decode(payload): - if not _has_snappy: + if not has_snappy(): raise NotImplementedError("Snappy codec is not available") if _detect_xerial_stream(payload): -- cgit v1.2.1