diff options
Diffstat (limited to 'kafka/producer/sender.py')
-rw-r--r-- | kafka/producer/sender.py | 11 |
1 files changed, 11 insertions, 0 deletions
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index f10c34c..f0f77ee 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -26,6 +26,7 @@ class Sender(threading.Thread): 'acks': 1, 'retries': 0, 'request_timeout_ms': 30000, + 'guarantee_message_order': False, 'client_id': 'kafka-python-' + __version__, 'api_version': (0, 8, 0), } @@ -110,6 +111,12 @@ class Sender(threading.Thread): batches_by_node = self._accumulator.drain( self._metadata, ready_nodes, self.config['max_request_size']) + if self.config['guarantee_message_order']: + # Mute all the partitions drained + for batch_list in six.itervalues(batches_by_node): + for batch in batch_list: + self._accumulator.muted.add(batch.topic_partition) + expired_batches = self._accumulator.abort_expired_batches( self.config['request_timeout_ms'], self._metadata) @@ -222,6 +229,10 @@ class Sender(threading.Thread): if getattr(error, 'invalid_metadata', False): self._metadata.request_update() + # Unmute the completed partition. + if self.config['guarantee_message_order']: + self._accumulator.muted.remove(batch.topic_partition) + def _can_retry(self, batch, error): """ We can retry a send if the error is transient and the number of |