summaryrefslogtreecommitdiff
path: root/kafka/util.py
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2013-02-22 23:09:25 -0500
committerDavid Arthur <mumrah@gmail.com>2013-04-02 20:19:30 -0400
commit2a3d231aa61642c57537bc2128dd4f2bd30f35dd (patch)
tree6bfdfa13b228481df9c79bcb926c2036b476b891 /kafka/util.py
parente87c561723be25fcfa2564030367196231aa366e (diff)
downloadkafka-python-2a3d231aa61642c57537bc2128dd4f2bd30f35dd.tar.gz
Protocol and low-level client done, adding tests
Diffstat (limited to 'kafka/util.py')
-rw-r--r--kafka/util.py16
1 files changed, 8 insertions, 8 deletions
diff --git a/kafka/util.py b/kafka/util.py
index cb8f7f5..509c5b8 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -1,15 +1,16 @@
+from collections import defaultdict
from itertools import groupby
import struct
def write_int_string(s):
if s is None:
- return struct.pack('>i', -1)
+ return struct.pack('>i', 0) # TODO change this to -1 when KAFKA-771 is accepted
else:
return struct.pack('>i%ds' % len(s), len(s), s)
def write_short_string(s):
if s is None:
- return struct.pack('>h', -1)
+ return struct.pack('>h', 0) # TODO change this to -1 when KAFKA-771 is accepted
else:
return struct.pack('>h%ds' % len(s), len(s), s)
@@ -44,12 +45,11 @@ def relative_unpack(fmt, data, cur):
out = struct.unpack(fmt, data[cur:cur+size])
return (out, cur+size)
-def group_list_by_key(it, key):
- sorted_it = sorted(it, key=key)
- out = {}
- for k, group in groupby(sorted_it, key=key):
- out[k] = list(group)
- return out
+def group_by_topic_and_partition(tuples):
+ out = defaultdict(dict)
+ for t in tuples:
+ out[t.topic][t.partition] = t
+ return out
class BufferUnderflowError(Exception):
pass