diff options
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 63 |
1 files changed, 33 insertions, 30 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 067dc0f..cd79e7f 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -518,36 +518,39 @@ class BrokerConnection(object): # Exchange tokens until authentication either succeeds or fails received_token = None - while not ctx_Context.complete: - # calculate an output token from kafka token (or None if first iteration) - # exceptions raised here are uncaught and will be sent to the user - output_token = ctx_Context.step(received_token) - - # pass output token to kafka - try: - self._sock.setblocking(True) - msg = output_token - size = Int32.encode(len(msg)) - self._sock.sendall(size + msg) - # The server will send a token back. Processing of this token either - # establishes a security context, or it needs further token exchange. - # The gssapi will be able to identify the needed next step. - # The connection is closed on failure. - response = self._sock.recv(2000) - self._sock.setblocking(False) - - except (AssertionError, ConnectionError) as e: - log.exception("%s: Error receiving reply from server", self) - error = Errors.ConnectionError("%s: %s" % (self, e)) - self.close(error=error) - return future.failure(error) - - # pass the received token back to gssapi, strip the first 4 bytes - # dpkp note: what are the first four bytes here? - # it seems likely that this is the encoded message size - # which we should receive and parse first, then use to parse - # the remainder of the token - received_token = response[4:] + try: + while not ctx_Context.complete: + # calculate an output token from kafka token (or None if first iteration) + output_token = ctx_Context.step(received_token) + + # pass output token to kafka + try: + self._sock.setblocking(True) + msg = output_token + size = Int32.encode(len(msg)) + self._sock.sendall(size + msg) + # The server will send a token back. Processing of this token either + # establishes a security context, or it needs further token exchange. + # The gssapi will be able to identify the needed next step. + # The connection is closed on failure. + response = self._sock.recv(2000) + self._sock.setblocking(False) + + except ConnectionError as e: + log.exception("%s: Error receiving reply from server", self) + error = Errors.ConnectionError("%s: %s" % (self, e)) + self.close(error=error) + return future.failure(error) + + # pass the received token back to gssapi, strip the first 4 bytes + # dpkp note: what are the first four bytes here? + # it seems likely that this is the encoded message size + # which we should receive and parse first, then use to parse + # the remainder of the token + received_token = response[4:] + + except Exception as e: + return future.failure(e) log.info('%s: Authenticated as %s via GSSAPI', self, gssname) return future.success(True) |