summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorEnrico Canzonieri <ecanzonieri@gmail.com>2015-10-24 16:50:46 -0700
committerEnrico Canzonieri <enrico@yelp.com>2015-11-10 17:58:04 -0800
commitc2adeeab057b825c8cccae67aac822be02293211 (patch)
treeee745273c341f5bb482ebe98fda89496bee97b7c /kafka/client.py
parent04920bb89f9d73e4028dbd487719975c65954592 (diff)
downloadkafka-python-c2adeeab057b825c8cccae67aac822be02293211.tar.gz
Add tests. Bug fix. Rename socket_conn dict.
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py14
1 files changed, 7 insertions, 7 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 68277ed..6603a47 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -179,9 +179,9 @@ class KafkaClient(object):
# and collect the responses and errors
broker_failures = []
- # For each KafkaConnection we store the real socket so that we can use
+ # For each KafkaConnection keep the real socket so that we can use
# a select to perform unblocking I/O
- socket_connection = {}
+ connections_by_socket = {}
for broker, payloads in payloads_by_broker.items():
requestId = self._next_id()
log.debug('Request %s to %s: %s', requestId, broker, payloads)
@@ -216,13 +216,13 @@ class KafkaClient(object):
responses[topic_partition] = None
continue
else:
- socket_connection[conn.get_connected_socket()] = (conn, broker)
+ connections_by_socket[conn.get_connected_socket()] = (conn, broker)
conn = None
- while socket_connection:
- sockets = socket_connection.keys()
+ while connections_by_socket:
+ sockets = connections_by_socket.keys()
rlist, _, _ = select.select(sockets, [], [], None)
- conn, broker = socket_connection.pop(rlist[0])
+ conn, broker = connections_by_socket.pop(rlist[0])
try:
response = conn.recv(requestId)
except ConnectionError as e:
@@ -231,7 +231,7 @@ class KafkaClient(object):
'response to request %s from server %s: %s',
requestId, broker, e)
- for payload in payloads:
+ for payload in payloads_by_broker[broker]:
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = FailedPayloadsError(payload)