diff options
Diffstat (limited to 'kafka/producer/kafka.py')
-rw-r--r-- | kafka/producer/kafka.py | 19 |
1 files changed, 8 insertions, 11 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 95e797a..3ff1a09 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -23,6 +23,7 @@ from kafka.record.legacy_records import LegacyRecordBatchBuilder from kafka.serializer import Serializer from kafka.structs import TopicPartition + log = logging.getLogger(__name__) PRODUCER_CLIENT_ID_SEQUENCE = AtomicInteger() @@ -375,13 +376,13 @@ class KafkaProducer(object): reporters = [reporter() for reporter in self.config['metric_reporters']] self._metrics = Metrics(metric_config, reporters) - self._client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer', - wakeup_timeout_ms=self.config['max_block_ms'], - **self.config) + client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer', + wakeup_timeout_ms=self.config['max_block_ms'], + **self.config) # Get auto-discovered version from client if necessary if self.config['api_version'] is None: - self.config['api_version'] = self._client.config['api_version'] + self.config['api_version'] = client.config['api_version'] if self.config['compression_type'] == 'lz4': assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers' @@ -397,9 +398,9 @@ class KafkaProducer(object): message_version = self._max_usable_produce_magic() self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config) - self._metadata = self._client.cluster + self._metadata = client.cluster guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1) - self._sender = Sender(self._client, self._metadata, + self._sender = Sender(client, self._metadata, self._accumulator, self._metrics, guarantee_message_order=guarantee_message_order, **self.config) @@ -413,20 +414,16 @@ class KafkaProducer(object): def bootstrap_connected(self): """Return True if the bootstrap is connected.""" - if self._client._bootstrap_fails > 0: - return False - return True + return self._sender.bootstrap_connected() def _cleanup_factory(self): """Build a cleanup clojure that doesn't increase our ref count""" _self = weakref.proxy(self) - def wrapper(): try: _self.close(timeout=0) except (ReferenceError, AttributeError): pass - return wrapper def _unregister_cleanup(self): |