path: root/kafka/client.py
author     mrtheb <mrlabbe@gmail.com>  2014-01-31 22:43:59 -0500
committer  mrtheb <mrlabbe@gmail.com>  2014-01-31 22:43:59 -0500
commit     84de472a4d5b583ff3ed6cc6d92250a7c9291ceb (patch)
tree       e3d03da4eeecf8eab2dc63cf113a4daf82addf72 /kafka/client.py
parent     0bdff4e833f73518a7219fca04dfbc3ed201b06e (diff)
parent     4abf7ee1fbbdc47c8cb7b35f2600e58f1f95e6bb (diff)
download   kafka-python-84de472a4d5b583ff3ed6cc6d92250a7c9291ceb.tar.gz
Merge branch 'master' into multihosts
Conflicts:
	kafka/client.py
	kafka/conn.py
	setup.py
	test/test_integration.py
	test/test_unit.py
Diffstat (limited to 'kafka/client.py')
-rw-r--r--  kafka/client.py  211
1 file changed, 120 insertions(+), 91 deletions(-)
diff --git a/kafka/client.py b/kafka/client.py
index 81eec7d..33c4419 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,14 +1,16 @@
import copy
+import logging
+
from collections import defaultdict
from functools import partial
from itertools import count
-import logging
-import socket
-import time
-from kafka.common import ErrorMapping, TopicAndPartition
-from kafka.common import ConnectionError, FailedPayloadsException
-from kafka.conn import collect_hosts, KafkaConnection
+from kafka.common import (ErrorMapping, TopicAndPartition,
+ ConnectionError, FailedPayloadsError,
+ BrokerResponseError, PartitionUnavailableError,
+ KafkaUnavailableError, KafkaRequestError)
+
+from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
log = logging.getLogger("kafka")
@@ -19,19 +21,21 @@ class KafkaClient(object):
CLIENT_ID = "kafka-python"
ID_GEN = count()
- def __init__(self, hosts, bufsize=4096, client_id=CLIENT_ID):
+ # NOTE: The timeout given to the client should always be greater than the
+ # one passed to SimpleConsumer.get_message(), otherwise you can get a
+ # socket timeout.
+ def __init__(self, host, port, client_id=CLIENT_ID,
+ timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
# We need one connection to bootstrap
- self.bufsize = bufsize
self.client_id = client_id
-
- self.hosts = collect_hosts(hosts)
-
- # create connections only when we need them
- self.conns = {}
+ self.timeout = timeout
+ self.conns = { # (host, port) -> KafkaConnection
+ (host, port): KafkaConnection(host, port, timeout=timeout)
+ }
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
- self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...]
- self._load_metadata_for_topics()
+ self.topic_partitions = {} # topic_id -> [0, 1, 2, ...]
+ self.load_metadata_for_topics() # bootstrap with all metadata
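
The note above is the practical constraint on the new signature: the client timeout bounds every socket operation. A minimal bootstrap sketch (host, port, and timeout values are illustrative, not part of this patch):

    from kafka.client import KafkaClient

    # Connects to one broker and eagerly loads metadata for all topics.
    # Keep this timeout larger than any timeout handed to
    # SimpleConsumer.get_message(), or recv() can time out mid-poll.
    kafka = KafkaClient("localhost", 9092, timeout=10)
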
##################
# Private API #
@@ -47,62 +51,25 @@ class KafkaClient(object):
return self.conns[host_key]
def _get_conn_for_broker(self, broker):
- "Get or create a connection to a broker"
+ """
+ Get or create a connection to a broker
+ """
+ if (broker.host, broker.port) not in self.conns:
+ self.conns[(broker.host, broker.port)] = \
+ KafkaConnection(broker.host, broker.port, timeout=self.timeout)
return self._get_conn(broker.host, broker.port)
def _get_leader_for_partition(self, topic, partition):
key = TopicAndPartition(topic, partition)
if key not in self.topics_to_brokers:
- self._load_metadata_for_topics(topic)
+ self.load_metadata_for_topics(topic)
if key not in self.topics_to_brokers:
- raise Exception("Partition does not exist: %s" % str(key))
+ raise KafkaRequestError("Partition does not exist: %s" % str(key))
return self.topics_to_brokers[key]
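
A missing partition now raises a typed error instead of a bare Exception, so callers can catch it precisely. A sketch, assuming the FetchRequest namedtuple from kafka.common and a made-up topic name:

    from kafka.common import FetchRequest, KafkaRequestError

    try:
        kafka.send_fetch_request([FetchRequest("no-such-topic", 99, 0, 4096)])
    except KafkaRequestError:
        pass  # partition still unknown after a metadata refresh
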
- def _load_metadata_for_topics(self, *topics):
- """
- Discover brokers and metadata for a set of topics. This method will
- recurse in the event of a retry.
- """
- request_id = self._next_id()
- request = KafkaProtocol.encode_metadata_request(self.client_id,
- request_id, topics)
-
- response = self._send_broker_unaware_request(request_id, request)
- if response is None:
- raise Exception("All servers failed to process request")
-
- (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
-
- log.debug("Broker metadata: %s", brokers)
- log.debug("Topic metadata: %s", topics)
-
- self.brokers = brokers
- self.topics_to_brokers = {}
-
- for topic, partitions in topics.items():
- # Clear the list once before we add it. This removes stale entries
- # and avoids duplicates
- self.topic_partitions.pop(topic, None)
-
- if not partitions:
- log.info("Partition is unassigned, delay for 1s and retry")
- time.sleep(1)
- self._load_metadata_for_topics(topic)
- break
-
- for partition, meta in partitions.items():
- if meta.leader == -1:
- log.info("Partition is unassigned, delay for 1s and retry")
- time.sleep(1)
- self._load_metadata_for_topics(topic)
- else:
- topic_part = TopicAndPartition(topic, partition)
- self.topics_to_brokers[topic_part] = brokers[meta.leader]
- self.topic_partitions[topic].append(partition)
-
def _next_id(self):
"""
Generate a new correlation id
@@ -125,7 +92,7 @@ class KafkaClient(object):
"trying next server: %s" % (request, conn, e))
continue
- return None
+ raise KafkaUnavailableError("All servers failed to process request")
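
Raising here instead of returning None means a failed bootstrap is no longer silent; callers can catch it at construction time. Sketch with illustrative connection values:

    from kafka.common import KafkaUnavailableError

    try:
        kafka = KafkaClient("localhost", 9092)
    except KafkaUnavailableError:
        log.error("no broker answered the bootstrap metadata request")
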
def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
"""
@@ -156,6 +123,8 @@ class KafkaClient(object):
for payload in payloads:
leader = self._get_leader_for_partition(payload.topic,
payload.partition)
+ if leader == -1:
+ raise PartitionUnavailableError("Leader is unassigned for %s-%s" % (payload.topic, payload.partition))
payloads_by_broker[leader].append(payload)
original_keys.append((payload.topic, payload.partition))
@@ -172,30 +141,73 @@ class KafkaClient(object):
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId, payloads=payloads)
+ failed = False
# Send the request, recv the response
try:
conn.send(requestId, request)
if decoder_fn is None:
continue
- response = conn.recv(requestId)
- except ConnectionError, e: # ignore BufferUnderflow for now
- log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
+ try:
+ response = conn.recv(requestId)
+ except ConnectionError, e:
+ log.warning("Could not receive response to request [%s] "
+ "from server %s: %s", request, conn, e)
+ failed = True
+ except ConnectionError, e:
+ log.warning("Could not send request [%s] to server %s: %s",
+ request, conn, e)
+ failed = True
+
+ if failed:
failed_payloads += payloads
- self.topics_to_brokers = {} # reset metadata
+ self.reset_all_metadata()
continue
for response in decoder_fn(response):
acc[(response.topic, response.partition)] = response
if failed_payloads:
- raise FailedPayloadsException(failed_payloads)
+ raise FailedPayloadsError(failed_payloads)
# Order the accumulated responses by the original key order
return (acc[k] for k in original_keys) if acc else ()
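
Because FailedPayloadsError is constructed with the list of unacknowledged payloads, a caller can retry exactly those. A hedged sketch (the single-retry policy is hypothetical):

    from kafka.common import FailedPayloadsError

    try:
        kafka.send_produce_request(reqs)
    except FailedPayloadsError as e:
        failed = e.args[0]  # payloads that got no response
        kafka.send_produce_request(failed)  # naive one-shot retry
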
+ def __repr__(self):
+ return '<KafkaClient client_id=%s>' % (self.client_id)
+
+ def _raise_on_response_error(self, resp):
+ if resp.error == ErrorMapping.NO_ERROR:
+ return
+
+ if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON,
+ ErrorMapping.NOT_LEADER_FOR_PARTITION):
+ self.reset_topic_metadata(resp.topic)
+
+ raise BrokerResponseError(
+ "Request for %s failed with errorcode=%d" %
+ (TopicAndPartition(resp.topic, resp.partition), resp.error))
+
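
Since _raise_on_response_error invalidates the topic's cached metadata before raising, a simple retry re-resolves the leader lazily. Sketch:

    from kafka.common import BrokerResponseError

    try:
        kafka.send_produce_request(reqs)
    except BrokerResponseError:
        # cached leaders for the topic were dropped; this retry triggers
        # load_metadata_for_topics() and may reach the new leader
        kafka.send_produce_request(reqs)
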
#################
# Public API #
#################
+ def reset_topic_metadata(self, *topics):
+ for topic in topics:
+ try:
+ partitions = self.topic_partitions[topic]
+ except KeyError:
+ continue
+
+ for partition in partitions:
+ self.topics_to_brokers.pop(TopicAndPartition(topic, partition), None)
+
+ del self.topic_partitions[topic]
+
+ def reset_all_metadata(self):
+ self.topics_to_brokers.clear()
+ self.topic_partitions.clear()
+
+ def has_metadata_for_topic(self, topic):
+ return topic in self.topic_partitions
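
These helpers make the metadata cache explicit to callers; for example (topic name is a placeholder):

    if not kafka.has_metadata_for_topic("my-topic"):
        kafka.load_metadata_for_topics("my-topic")

    # Drop cached leaders for one topic without touching the others
    kafka.reset_topic_metadata("my-topic")
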
def close(self):
for conn in self.conns.values():
@@ -215,6 +227,36 @@ class KafkaClient(object):
for conn in self.conns.values():
conn.reinit()
+ def load_metadata_for_topics(self, *topics):
+ """
+ Discover brokers and metadata for a set of topics. This function is called
+ lazily whenever metadata is unavailable.
+ """
+ request_id = self._next_id()
+ request = KafkaProtocol.encode_metadata_request(self.client_id,
+ request_id, topics)
+
+ response = self._send_broker_unaware_request(request_id, request)
+
+ (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
+
+ log.debug("Broker metadata: %s", brokers)
+ log.debug("Topic metadata: %s", topics)
+
+ self.brokers = brokers
+
+ for topic, partitions in topics.items():
+ self.reset_topic_metadata(topic)
+
+ if not partitions:
+ continue
+
+ self.topic_partitions[topic] = []
+ for partition, meta in partitions.items():
+ topic_part = TopicAndPartition(topic, partition)
+ self.topics_to_brokers[topic_part] = brokers[meta.leader]
+ self.topic_partitions[topic].append(partition)
+
def send_produce_request(self, payloads=[], acks=1, timeout=1000,
fail_on_error=True, callback=None):
"""
@@ -252,14 +294,9 @@ class KafkaClient(object):
out = []
for resp in resps:
- # Check for errors
- if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception(
- "ProduceRequest for %s failed with errorcode=%d" %
- (TopicAndPartition(resp.topic, resp.partition),
- resp.error))
-
- # Run the callback
+ if fail_on_error is True:
+ self._raise_on_response_error(resp)
+
if callback is not None:
out.append(callback(resp))
else:
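
The public send_* paths now share _raise_on_response_error; with fail_on_error=False the caller inspects errors itself. A sketch, assuming the ProduceRequest namedtuple and the create_message helper from this era of the library:

    from kafka.common import ProduceRequest, ErrorMapping
    from kafka.protocol import create_message

    req = ProduceRequest("my-topic", 0, [create_message("hello")])
    resps = kafka.send_produce_request([req], acks=1, fail_on_error=False)
    for resp in resps:
        if resp.error != ErrorMapping.NO_ERROR:
            log.warning("produce to %s-%d failed with errorcode=%d",
                        resp.topic, resp.partition, resp.error)
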
@@ -285,14 +322,9 @@ class KafkaClient(object):
out = []
for resp in resps:
- # Check for errors
- if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception(
- "FetchRequest for %s failed with errorcode=%d" %
- (TopicAndPartition(resp.topic, resp.partition),
- resp.error))
-
- # Run the callback
+ if fail_on_error is True:
+ self._raise_on_response_error(resp)
+
if callback is not None:
out.append(callback(resp))
else:
@@ -308,9 +340,8 @@ class KafkaClient(object):
out = []
for resp in resps:
- if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception("OffsetRequest failed with errorcode=%s",
- resp.error)
+ if fail_on_error is True:
+ self._raise_on_response_error(resp)
if callback is not None:
out.append(callback(resp))
else:
@@ -326,9 +357,8 @@ class KafkaClient(object):
out = []
for resp in resps:
- if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception("OffsetCommitRequest failed with "
- "errorcode=%s", resp.error)
+ if fail_on_error is True:
+ self._raise_on_response_error(resp)
if callback is not None:
out.append(callback(resp))
@@ -346,9 +376,8 @@ class KafkaClient(object):
out = []
for resp in resps:
- if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception("OffsetCommitRequest failed with errorcode=%s",
- resp.error)
+ if fail_on_error is True:
+ self._raise_on_response_error(resp)
if callback is not None:
out.append(callback(resp))
else: