diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/consumer.py | 3 | ||||
-rw-r--r-- | kafka/util.py | 8 |
2 files changed, 9 insertions, 2 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 3f8d8c2..98f18a0 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -3,6 +3,7 @@ from __future__ import absolute_import from itertools import izip_longest, repeat import logging import time +import numbers from threading import Lock from multiprocessing import Process, Queue as MPQueue, Event, Value from Queue import Empty, Queue @@ -81,6 +82,8 @@ class Consumer(object): if not partitions: partitions = self.client.topic_partitions[topic] + else: + assert all(isinstance(x, numbers.Integral) for x in partitions) # Variables for handling offset commits self.commit_lock = Lock() diff --git a/kafka/util.py b/kafka/util.py index 54052fb..0577a88 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -1,5 +1,6 @@ -from collections import defaultdict +import sys import struct +import collections from threading import Thread, Event from kafka.common import BufferUnderflowError @@ -15,6 +16,9 @@ def write_int_string(s): def write_short_string(s): if s is None: return struct.pack('>h', -1) + elif len(s) > 32767 and sys.version < (2,7): + # Python 2.6 issues a deprecation warning instead of a struct error + raise struct.error(len(s)) else: return struct.pack('>h%ds' % len(s), len(s), s) @@ -63,7 +67,7 @@ def relative_unpack(fmt, data, cur): def group_by_topic_and_partition(tuples): - out = defaultdict(dict) + out = collections.defaultdict(dict) for t in tuples: out[t.topic][t.partition] = t return out |