summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-02-15 18:56:45 -0800
committerDana Powers <dana.powers@gmail.com>2016-02-15 18:56:45 -0800
commit876791430513ac819d37e9877661387958d50fe4 (patch)
tree7caec15019d8f0ffb077b871bf9c63670607ca55
parent742755d8d813262d6ccf09907fb3130bc47fbdb5 (diff)
downloadkafka-python-876791430513ac819d37e9877661387958d50fe4.tar.gz
Remove unused internal sender lock
-rw-r--r--kafka/producer/kafka.py5
-rw-r--r--kafka/producer/sender.py39
2 files changed, 20 insertions, 24 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index e8601c8..0a40325 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -254,9 +254,8 @@ class KafkaProducer(object):
self._accumulator = RecordAccumulator(**self.config)
self._metadata = client.cluster
- self._metadata_lock = threading.Condition()
- self._sender = Sender(client, self._metadata, self._metadata_lock,
- self._accumulator, **self.config)
+ self._sender = Sender(client, self._metadata, self._accumulator,
+ **self.config)
self._sender.daemon = True
self._sender.start()
self._closed = False
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 1f637b4..0e6d6cd 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -32,7 +32,7 @@ class Sender(threading.Thread):
'client_id': 'kafka-python-' + __version__,
}
- def __init__(self, client, metadata, lock, accumulator, **configs):
+ def __init__(self, client, metadata, accumulator, **configs):
super(Sender, self).__init__()
self.config = copy.copy(self._DEFAULT_CONFIG)
for key in self.config:
@@ -43,7 +43,6 @@ class Sender(threading.Thread):
self._client = client
self._accumulator = accumulator
self._metadata = client.cluster
- self._lock = lock
self._running = True
self._force_close = False
self._topics_to_add = []
@@ -98,8 +97,7 @@ class Sender(threading.Thread):
# metadata update
if unknown_leaders_exist:
log.debug('Unknown leaders exist, requesting metadata update')
- with self._lock:
- self._metadata.request_update()
+ self._metadata.request_update()
# remove any nodes we aren't ready to send to
not_ready_timeout = 999999999
@@ -131,23 +129,22 @@ class Sender(threading.Thread):
log.debug("Created %d produce requests: %s", len(requests), requests) # trace
poll_timeout_ms = 0
- with self._lock:
- for node_id, request in six.iteritems(requests):
- batches = batches_by_node[node_id]
- log.debug('Sending Produce Request: %r', request)
- (self._client.send(node_id, request)
- .add_callback(
- self._handle_produce_response, batches)
- .add_errback(
- self._failed_produce, batches, node_id))
-
- # if some partitions are already ready to be sent, the select time
- # would be 0; otherwise if some partition already has some data
- # accumulated but not ready yet, the select time will be the time
- # difference between now and its linger expiry time; otherwise the
- # select time will be the time difference between now and the
- # metadata expiry time
- self._client.poll(poll_timeout_ms, sleep=True)
+ for node_id, request in six.iteritems(requests):
+ batches = batches_by_node[node_id]
+ log.debug('Sending Produce Request: %r', request)
+ (self._client.send(node_id, request)
+ .add_callback(
+ self._handle_produce_response, batches)
+ .add_errback(
+ self._failed_produce, batches, node_id))
+
+ # if some partitions are already ready to be sent, the select time
+ # would be 0; otherwise if some partition already has some data
+ # accumulated but not ready yet, the select time will be the time
+ # difference between now and its linger expiry time; otherwise the
+ # select time will be the time difference between now and the
+ # metadata expiry time
+ self._client.poll(poll_timeout_ms, sleep=True)
def initiate_close(self):
"""Start closing the sender (won't complete until all data is sent)."""