author     Dana Powers <dana.powers@rd.io>    2015-12-09 15:37:17 -0800
committer  Dana Powers <dana.powers@rd.io>    2015-12-09 15:44:15 -0800
commit     a3ec9bd8e8c730c9f6715b536c0c590230fc2e28 (patch)
tree       eaebf6dc87ffb83d7256497355c5559f5eec5d72 /kafka/producer
parent     ad030ccd4df57305bb624b03eddaa2641f956160 (diff)
download   kafka-python-a3ec9bd8e8c730c9f6715b536c0c590230fc2e28.tar.gz
Update references to kafka.common Request/Response (now Payload)
Diffstat (limited to 'kafka/producer')
-rw-r--r--  kafka/producer/base.py  17
1 file changed, 9 insertions(+), 8 deletions(-)
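For orientation, a minimal sketch (not part of this commit) of how the renamed payload classes are used: ProduceRequestPayload keeps the old ProduceRequest (topic, partition, messages) namedtuple shape, and ProduceResponsePayload still carries a broker error code. The helper function names below are illustrative assumptions; only the constructor arguments and fields mirror the diff.

# Illustrative sketch only -- mirrors the constructor/field usage in the diff below.
from kafka.common import ProduceRequestPayload, ProduceResponsePayload

def build_payload(topic_partition, messages):
    # Same (topic, partition, messages) shape as the old ProduceRequest namedtuple
    return ProduceRequestPayload(topic_partition.topic,
                                 topic_partition.partition,
                                 tuple(messages))

def failed_responses(responses):
    # ProduceResponsePayload.error is the broker error code checked in _send_upstream
    return [r for r in responses
            if isinstance(r, ProduceResponsePayload) and r.error]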
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 39b1f84..3f2bba6 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -15,7 +15,7 @@ from threading import Thread, Event
 import six
 
 from kafka.common import (
-    ProduceRequest, ProduceResponse, TopicAndPartition, RetryOptions,
+    ProduceRequestPayload, ProduceResponsePayload, TopicAndPartition, RetryOptions,
     kafka_errors, UnsupportedCodecError, FailedPayloadsError,
     RequestTimedOutError, AsyncProducerQueueFull, UnknownError,
     RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES
@@ -133,9 +133,10 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
         # Send collected requests upstream
         for topic_partition, msg in msgset.items():
             messages = create_message_set(msg, codec, key, codec_compresslevel)
-            req = ProduceRequest(topic_partition.topic,
-                                 topic_partition.partition,
-                                 tuple(messages))
+            req = ProduceRequestPayload(
+                topic_partition.topic,
+                topic_partition.partition,
+                tuple(messages))
             request_tries[req] = 0
 
         if not request_tries:
@@ -169,13 +170,13 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
                 error_cls = response.__class__
                 orig_req = response.payload
 
-            elif isinstance(response, ProduceResponse) and response.error:
+            elif isinstance(response, ProduceResponsePayload) and response.error:
                 error_cls = kafka_errors.get(response.error, UnknownError)
                 orig_req = requests[i]
 
             if error_cls:
                 _handle_error(error_cls, orig_req)
-                log.error('%s sending ProduceRequest (#%d of %d) '
+                log.error('%s sending ProduceRequestPayload (#%d of %d) '
                           'to %s:%d with msgs %s',
                           error_cls.__name__, (i + 1), len(requests),
                           orig_req.topic, orig_req.partition,
@@ -210,7 +211,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
 
         # Log messages we are going to retry
         for orig_req in request_tries.keys():
-            log.info('Retrying ProduceRequest to %s:%d with msgs %s',
+            log.info('Retrying ProduceRequestPayload to %s:%d with msgs %s',
                      orig_req.topic, orig_req.partition,
                      orig_req.messages if log_messages_on_error
                      else hash(orig_req.messages))
@@ -404,7 +405,7 @@ class Producer(object):
             resp = []
         else:
             messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel)
-            req = ProduceRequest(topic, partition, messages)
+            req = ProduceRequestPayload(topic, partition, messages)
             try:
                 resp = self.client.send_produce_request(
                     [req], acks=self.req_acks, timeout=self.ack_timeout,