author | Dana Powers <dana.powers@gmail.com> | 2016-02-15 18:56:45 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-02-15 18:56:45 -0800 |
commit | 876791430513ac819d37e9877661387958d50fe4 (patch) | |
tree | 7caec15019d8f0ffb077b871bf9c63670607ca55 | |
parent | 742755d8d813262d6ccf09907fb3130bc47fbdb5 (diff) | |
download | kafka-python-876791430513ac819d37e9877661387958d50fe4.tar.gz |
Remove unused internal sender lock
-rw-r--r-- | kafka/producer/kafka.py | 5
-rw-r--r-- | kafka/producer/sender.py | 39
2 files changed, 20 insertions, 24 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index e8601c8..0a40325 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -254,9 +254,8 @@ class KafkaProducer(object):
 
         self._accumulator = RecordAccumulator(**self.config)
         self._metadata = client.cluster
-        self._metadata_lock = threading.Condition()
-        self._sender = Sender(client, self._metadata, self._metadata_lock,
-                              self._accumulator, **self.config)
+        self._sender = Sender(client, self._metadata, self._accumulator,
+                              **self.config)
         self._sender.daemon = True
         self._sender.start()
         self._closed = False
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 1f637b4..0e6d6cd 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -32,7 +32,7 @@ class Sender(threading.Thread):
         'client_id': 'kafka-python-' + __version__,
     }
 
-    def __init__(self, client, metadata, lock, accumulator, **configs):
+    def __init__(self, client, metadata, accumulator, **configs):
         super(Sender, self).__init__()
         self.config = copy.copy(self._DEFAULT_CONFIG)
         for key in self.config:
@@ -43,7 +43,6 @@ class Sender(threading.Thread):
         self._client = client
         self._accumulator = accumulator
         self._metadata = client.cluster
-        self._lock = lock
         self._running = True
         self._force_close = False
         self._topics_to_add = []
@@ -98,8 +97,7 @@ class Sender(threading.Thread):
         # metadata update if unknown_leaders_exist
         if unknown_leaders_exist:
             log.debug('Unknown leaders exist, requesting metadata update')
-            with self._lock:
-                self._metadata.request_update()
+            self._metadata.request_update()
 
         # remove any nodes we aren't ready to send to
         not_ready_timeout = 999999999
@@ -131,23 +129,22 @@ class Sender(threading.Thread):
             log.debug("Created %d produce requests: %s", len(requests), requests) # trace
             poll_timeout_ms = 0
 
-        with self._lock:
-            for node_id, request in six.iteritems(requests):
-                batches = batches_by_node[node_id]
-                log.debug('Sending Produce Request: %r', request)
-                (self._client.send(node_id, request)
-                    .add_callback(
-                        self._handle_produce_response, batches)
-                    .add_errback(
-                        self._failed_produce, batches, node_id))
-
-            # if some partitions are already ready to be sent, the select time
-            # would be 0; otherwise if some partition already has some data
-            # accumulated but not ready yet, the select time will be the time
-            # difference between now and its linger expiry time; otherwise the
-            # select time will be the time difference between now and the
-            # metadata expiry time
-            self._client.poll(poll_timeout_ms, sleep=True)
+        for node_id, request in six.iteritems(requests):
+            batches = batches_by_node[node_id]
+            log.debug('Sending Produce Request: %r', request)
+            (self._client.send(node_id, request)
+                .add_callback(
+                    self._handle_produce_response, batches)
+                .add_errback(
+                    self._failed_produce, batches, node_id))
+
+        # if some partitions are already ready to be sent, the select time
+        # would be 0; otherwise if some partition already has some data
+        # accumulated but not ready yet, the select time will be the time
+        # difference between now and its linger expiry time; otherwise the
+        # select time will be the time difference between now and the
+        # metadata expiry time
+        self._client.poll(poll_timeout_ms, sleep=True)
 
     def initiate_close(self):
         """Start closing the sender (won't complete until all data is sent)."""
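For context on why the lock could be dropped: after this change the Sender owns its send/poll loop outright, and the state it shares with the producer thread (the record accumulator, the cluster metadata) is left to synchronize itself internally rather than being guarded by a Condition threaded through the constructor. The sketch below illustrates that pattern in miniature. It is a hand-rolled illustration, not kafka-python code; every name in it (Accumulator, SenderThread, append, drain) is hypothetical.

```python
# Sketch of the pattern this commit relies on: the background sender
# thread runs its loop without any externally supplied lock, because the
# only shared object (the accumulator) guards its own state internally.
import threading
import time


class Accumulator(object):
    """Thread-safe buffer: protects its records with an internal lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._records = []

    def append(self, record):
        # Called from the producer thread.
        with self._lock:
            self._records.append(record)

    def drain(self):
        # Called from the sender thread; swaps out the buffer atomically.
        with self._lock:
            drained, self._records = self._records, []
            return drained


class SenderThread(threading.Thread):
    """Daemon loop that drains the accumulator; no external lock needed."""

    def __init__(self, accumulator):
        super(SenderThread, self).__init__()
        self._accumulator = accumulator
        self._running = True

    def run(self):
        while self._running:
            for record in self._accumulator.drain():
                print('sending %r' % (record,))  # stand-in for network I/O
            time.sleep(0.1)  # stand-in for client.poll(timeout)

    def initiate_close(self):
        self._running = False


accumulator = Accumulator()
sender = SenderThread(accumulator)
sender.daemon = True
sender.start()

for i in range(3):
    accumulator.append('message-%d' % i)  # producer thread side

time.sleep(0.5)
sender.initiate_close()
sender.join()
```

The shape mirrors the diff above: KafkaProducer now constructs the Sender with just the client, metadata, and accumulator, and the sender thread calls self._metadata.request_update() and self._client.poll(...) directly, with no shared Condition in between.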