summaryrefslogtreecommitdiff
path: root/kafka/producer.py
diff options
context:
space:
mode:
authorPatrick Lucas <plucas@yelp.com>2014-05-07 10:02:57 -0700
committerPatrick Lucas <plucas@yelp.com>2014-05-07 10:08:14 -0700
commit805b52a34da9ce0dead80a64d7315412f2034673 (patch)
tree8d9a6d6b378e40ce3177d6fe7ee4030abf362718 /kafka/producer.py
parent39796ec49162a0895be98a8aeb0e42e8319e5e30 (diff)
downloadkafka-python-805b52a34da9ce0dead80a64d7315412f2034673.tar.gz
Improve error handling and tests w.r.t. codecs
Add function kafka.protocol.create_message_set() that takes a list of payloads and a codec and returns a message set with the desired encoding. Introduce kafka.common.UnsupportedCodecError, raised if an unknown codec is specified. Include a test for the new function.
Diffstat (limited to 'kafka/producer.py')
-rw-r--r--kafka/producer.py29
1 files changed, 9 insertions, 20 deletions
diff --git a/kafka/producer.py b/kafka/producer.py
index 9ecb341..8e40be5 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -9,12 +9,11 @@ from collections import defaultdict
from itertools import cycle
from multiprocessing import Queue, Process
-from kafka.common import ProduceRequest, TopicAndPartition
-from kafka.partitioner import HashedPartitioner
-from kafka.protocol import (
- CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, ALL_CODECS,
- create_message, create_gzip_message, create_snappy_message,
+from kafka.common import (
+ ProduceRequest, TopicAndPartition, UnsupportedCodecError
)
+from kafka.partitioner import HashedPartitioner
+from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
log = logging.getLogger("kafka")
@@ -66,13 +65,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
# Send collected requests upstream
reqs = []
for topic_partition, msg in msgset.items():
- if codec == CODEC_GZIP:
- messages = [create_gzip_message(msg)]
- elif codec == CODEC_SNAPPY:
- messages = [create_snappy_message(msg)]
- else:
- messages = [create_message(m) for m in msg]
-
+ messages = create_message_set(msg, codec)
req = ProduceRequest(topic_partition.topic,
topic_partition.partition,
messages)
@@ -132,7 +125,9 @@ class Producer(object):
if codec is None:
codec = CODEC_NONE
- assert codec in ALL_CODECS
+ elif codec not in ALL_CODECS:
+ raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
+
self.codec = codec
if self.async:
@@ -159,13 +154,7 @@ class Producer(object):
self.queue.put((TopicAndPartition(topic, partition), m))
resp = []
else:
- if self.codec == CODEC_GZIP:
- messages = [create_gzip_message(msg)]
- elif self.codec == CODEC_SNAPPY:
- messages = [create_snappy_message(msg)]
- else:
- messages = [create_message(m) for m in msg]
-
+ messages = create_message_set(msg, self.codec)
req = ProduceRequest(topic, partition, messages)
try:
resp = self.client.send_produce_request([req], acks=self.req_acks,