summaryrefslogtreecommitdiff
path: root/kafka/producer/sender.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer/sender.py')
-rw-r--r--kafka/producer/sender.py11
1 files changed, 11 insertions, 0 deletions
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index f10c34c..f0f77ee 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -26,6 +26,7 @@ class Sender(threading.Thread):
'acks': 1,
'retries': 0,
'request_timeout_ms': 30000,
+ 'guarantee_message_order': False,
'client_id': 'kafka-python-' + __version__,
'api_version': (0, 8, 0),
}
@@ -110,6 +111,12 @@ class Sender(threading.Thread):
batches_by_node = self._accumulator.drain(
self._metadata, ready_nodes, self.config['max_request_size'])
+ if self.config['guarantee_message_order']:
+ # Mute all the partitions drained
+ for batch_list in six.itervalues(batches_by_node):
+ for batch in batch_list:
+ self._accumulator.muted.add(batch.topic_partition)
+
expired_batches = self._accumulator.abort_expired_batches(
self.config['request_timeout_ms'], self._metadata)
@@ -222,6 +229,10 @@ class Sender(threading.Thread):
if getattr(error, 'invalid_metadata', False):
self._metadata.request_update()
+ # Unmute the completed partition.
+ if self.config['guarantee_message_order']:
+ self._accumulator.muted.remove(batch.topic_partition)
+
def _can_retry(self, batch, error):
"""
We can retry a send if the error is transient and the number of