summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-10-08 12:37:03 -0700
committerDana Powers <dana.powers@gmail.com>2017-10-08 12:37:03 -0700
commit4d6d9ec485f9b44bc7fa989c843d8b1dfdca9bb2 (patch)
treed0004411d6208c0af906333807fe62e4a9990a80
parent758a21be88d7cbb51445d5df6bd003bb3d1b34bb (diff)
downloadkafka-python-4d6d9ec485f9b44bc7fa989c843d8b1dfdca9bb2.tar.gz
Return gssapi errors via future -- outer layer will raise to user
-rw-r--r--kafka/conn.py63
1 files changed, 33 insertions, 30 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 067dc0f..cd79e7f 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -518,36 +518,39 @@ class BrokerConnection(object):
# Exchange tokens until authentication either succeeds or fails
received_token = None
- while not ctx_Context.complete:
- # calculate an output token from kafka token (or None if first iteration)
- # exceptions raised here are uncaught and will be sent to the user
- output_token = ctx_Context.step(received_token)
-
- # pass output token to kafka
- try:
- self._sock.setblocking(True)
- msg = output_token
- size = Int32.encode(len(msg))
- self._sock.sendall(size + msg)
- # The server will send a token back. Processing of this token either
- # establishes a security context, or it needs further token exchange.
- # The gssapi will be able to identify the needed next step.
- # The connection is closed on failure.
- response = self._sock.recv(2000)
- self._sock.setblocking(False)
-
- except (AssertionError, ConnectionError) as e:
- log.exception("%s: Error receiving reply from server", self)
- error = Errors.ConnectionError("%s: %s" % (self, e))
- self.close(error=error)
- return future.failure(error)
-
- # pass the received token back to gssapi, strip the first 4 bytes
- # dpkp note: what are the first four bytes here?
- # it seems likely that this is the encoded message size
- # which we should receive and parse first, then use to parse
- # the remainder of the token
- received_token = response[4:]
+ try:
+ while not ctx_Context.complete:
+ # calculate an output token from kafka token (or None if first iteration)
+ output_token = ctx_Context.step(received_token)
+
+ # pass output token to kafka
+ try:
+ self._sock.setblocking(True)
+ msg = output_token
+ size = Int32.encode(len(msg))
+ self._sock.sendall(size + msg)
+ # The server will send a token back. Processing of this token either
+ # establishes a security context, or it needs further token exchange.
+ # The gssapi will be able to identify the needed next step.
+ # The connection is closed on failure.
+ response = self._sock.recv(2000)
+ self._sock.setblocking(False)
+
+ except ConnectionError as e:
+ log.exception("%s: Error receiving reply from server", self)
+ error = Errors.ConnectionError("%s: %s" % (self, e))
+ self.close(error=error)
+ return future.failure(error)
+
+ # pass the received token back to gssapi, strip the first 4 bytes
+ # dpkp note: what are the first four bytes here?
+ # it seems likely that this is the encoded message size
+ # which we should receive and parse first, then use to parse
+ # the remainder of the token
+ received_token = response[4:]
+
+ except Exception as e:
+ return future.failure(e)
log.info('%s: Authenticated as %s via GSSAPI', self, gssname)
return future.success(True)