summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py7
1 files changed, 2 insertions, 5 deletions
diff --git a/kafka/client.py b/kafka/client.py
index cb60d98..ca737c4 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -17,7 +17,6 @@ from kafka.common import (TopicAndPartition, BrokerMetadata, UnknownError,
from kafka.conn import collect_hosts, BrokerConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
-from kafka.util import kafka_bytestring
log = logging.getLogger(__name__)
@@ -212,7 +211,7 @@ class KafkaClient(object):
failed_payloads(broker_payloads)
continue
- conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
+ conn = self._get_conn(broker.host, broker.port)
request = encoder_fn(payloads=broker_payloads)
# decoder_fn=None signal that the server is expected to not
# send a response. This probably only applies to
@@ -305,7 +304,7 @@ class KafkaClient(object):
# Send the request, recv the response
try:
- conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
+ conn = self._get_conn(broker.host, broker.port)
conn.send(requestId, request)
except ConnectionError as e:
@@ -410,14 +409,12 @@ class KafkaClient(object):
self.topic_partitions.clear()
def has_metadata_for_topic(self, topic):
- topic = kafka_bytestring(topic)
return (
topic in self.topic_partitions
and len(self.topic_partitions[topic]) > 0
)
def get_partition_ids_for_topic(self, topic):
- topic = kafka_bytestring(topic)
if topic not in self.topic_partitions:
return []