summaryrefslogtreecommitdiff
path: root/kafka/producer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer.py')
-rw-r--r--kafka/producer.py29
1 files changed, 9 insertions, 20 deletions
diff --git a/kafka/producer.py b/kafka/producer.py
index 9ecb341..8e40be5 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -9,12 +9,11 @@ from collections import defaultdict
from itertools import cycle
from multiprocessing import Queue, Process
-from kafka.common import ProduceRequest, TopicAndPartition
-from kafka.partitioner import HashedPartitioner
-from kafka.protocol import (
- CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, ALL_CODECS,
- create_message, create_gzip_message, create_snappy_message,
+from kafka.common import (
+ ProduceRequest, TopicAndPartition, UnsupportedCodecError
)
+from kafka.partitioner import HashedPartitioner
+from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
log = logging.getLogger("kafka")
@@ -66,13 +65,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
# Send collected requests upstream
reqs = []
for topic_partition, msg in msgset.items():
- if codec == CODEC_GZIP:
- messages = [create_gzip_message(msg)]
- elif codec == CODEC_SNAPPY:
- messages = [create_snappy_message(msg)]
- else:
- messages = [create_message(m) for m in msg]
-
+ messages = create_message_set(msg, codec)
req = ProduceRequest(topic_partition.topic,
topic_partition.partition,
messages)
@@ -132,7 +125,9 @@ class Producer(object):
if codec is None:
codec = CODEC_NONE
- assert codec in ALL_CODECS
+ elif codec not in ALL_CODECS:
+ raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
+
self.codec = codec
if self.async:
@@ -159,13 +154,7 @@ class Producer(object):
self.queue.put((TopicAndPartition(topic, partition), m))
resp = []
else:
- if self.codec == CODEC_GZIP:
- messages = [create_gzip_message(msg)]
- elif self.codec == CODEC_SNAPPY:
- messages = [create_snappy_message(msg)]
- else:
- messages = [create_message(m) for m in msg]
-
+ messages = create_message_set(msg, self.codec)
req = ProduceRequest(topic, partition, messages)
try:
resp = self.client.send_produce_request([req], acks=self.req_acks,