summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2013-04-01 14:56:59 -0400
committerDavid Arthur <mumrah@gmail.com>2013-04-02 20:19:30 -0400
commit0678a452ca7ad5fba8e947cbfcf8fcb0f87b902c (patch)
treed4011fc89717f1eb9884787ae333be5b525bacd4 /kafka/client.py
parentb6d98c07b418b16061ae92392947d5dd6958a708 (diff)
downloadkafka-python-0678a452ca7ad5fba8e947cbfcf8fcb0f87b902c.tar.gz
Refactoring a bit, cleanup for 0.8
Marking some stuff as not compatible for 0.8 (will be added in 0.8.1)
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py267
1 files changed, 136 insertions, 131 deletions
diff --git a/kafka/client.py b/kafka/client.py
index f7e39a9..862a30e 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -28,30 +28,35 @@ class KafkaClient(object):
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...]
- self.load_metadata_for_topics()
+ self._load_metadata_for_topics()
+
+ ##################
+ # Private API #
+ ##################
- def close(self):
- for conn in self.conns.values():
- conn.close()
- def get_conn_for_broker(self, broker):
+ def _get_conn_for_broker(self, broker):
"Get or create a connection to a broker"
if (broker.host, broker.port) not in self.conns:
self.conns[(broker.host, broker.port)] = KafkaConnection(broker.host, broker.port, self.bufsize)
return self.conns[(broker.host, broker.port)]
def _get_leader_for_partition(self, topic, partition):
    """Return the BrokerMetadata of the current leader for (topic, partition).

    On a cache miss the topic metadata is refreshed once; if the partition
    is still unknown after the refresh, an Exception is raised.
    """
    key = TopicAndPartition(topic, partition)
    if key not in self.topics_to_brokers:
        # Refresh metadata for just this topic and retry the lookup once
        self._load_metadata_for_topics(topic)
    if key not in self.topics_to_brokers:
        raise Exception("Partition does not exist: %s" % str(key))
    return self.topics_to_brokers[key]
- def load_metadata_for_topics(self, *topics):
+ def _load_metadata_for_topics(self, *topics):
"""
Discover brokers and metadata for a set of topics. This method will
recurse in the event of a retry.
"""
- requestId = self.next_id()
+ requestId = self._next_id()
request = KafkaProtocol.encode_metadata_request(KafkaClient.CLIENT_ID, requestId, topics)
- response = self.try_send_request(requestId, request)
+ response = self._send_broker_unaware_request(requestId, request)
if response is None:
raise Exception("All servers failed to process request")
(brokers, topics) = KafkaProtocol.decode_metadata_response(response)
@@ -64,69 +69,114 @@ class KafkaClient(object):
if meta.leader == -1:
log.info("Partition is unassigned, delay for 1s and retry")
time.sleep(1)
- self.load_metadata_for_topics(topic)
+ self._load_metadata_for_topics(topic)
else:
self.topics_to_brokers[TopicAndPartition(topic, partition)] = brokers[meta.leader]
self.topic_partitions[topic].append(partition)
def _next_id(self):
    """Generate a new correlation id."""
    # Use the next() builtin rather than the Python-2-only .next() method
    # so the client also runs under Python 3
    return next(KafkaClient.ID_GEN)
- def send_produce_request(self, payloads=[], fail_on_error=True, callback=None):
+ def _send_broker_unaware_request(self, requestId, request):
"""
- Encode and send some ProduceRequests
+ Attempt to send a broker-agnostic request to one of the available brokers.
+ Keep trying until you succeed.
+ """
+ for conn in self.conns.values():
+ try:
+ conn.send(requestId, request)
+ response = conn.recv(requestId)
+ return response
+ except Exception, e:
+ log.warning("Could not send request [%r] to server %s, trying next server: %s" % (request, conn, e))
+ continue
+ return None
- ProduceRequests will be grouped by (topic, partition) and then sent to a specific
- broker. Output is a list of responses in the same order as the list of payloads
- specified
+ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
+ """
+ Group a list of request payloads by topic+partition and send them to the
+ leader broker for that partition using the supplied encode/decode functions
Params
======
- payloads: list of ProduceRequest
- fail_on_error: boolean, should we raise an Exception if we encounter an API error?
- callback: function, instead of returning the ProduceResponse, first pass it through this function
+ payloads: list of object-like entities with a topic and partition attribute
+ encode_fn: a method to encode the list of payloads to a request body, must accept
+ client_id, correlation_id, and payloads as keyword arguments
+ decode_fn: a method to decode a response body into response objects. The response
+ objects must be object-like and have topic and partition attributes
Return
======
- list of ProduceResponse or callback(ProduceResponse), in the order of input payloads
+ List of response objects in the same order as the supplied payloads
"""
- # Group the produce requests by which broker they go to
+ # Group the requests by topic+partition
original_keys = []
payloads_by_broker = defaultdict(list)
for payload in payloads:
- payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)] += payloads
+ payloads_by_broker[self._get_leader_for_partition(payload.topic, payload.partition)].append(payload)
original_keys.append((payload.topic, payload.partition))
-
+
# Accumulate the responses in a dictionary
acc = {}
# For each broker, send the list of request payloads
for broker, payloads in payloads_by_broker.items():
- conn = self.get_conn_for_broker(broker)
- requestId = self.next_id()
- request = KafkaProtocol.encode_produce_request(KafkaClient.CLIENT_ID, requestId, payloads)
- # Send the request
+ conn = self._get_conn_for_broker(broker)
+ requestId = self._next_id()
+ request = encoder_fn(client_id=KafkaClient.CLIENT_ID, correlation_id=requestId, payloads=payloads)
+
+ # Send the request, recv the response
conn.send(requestId, request)
response = conn.recv(requestId)
- for produce_response in KafkaProtocol.decode_produce_response(response):
- # Check for errors
- if fail_on_error == True and produce_response.error != ErrorMapping.NO_ERROR:
- raise Exception("ProduceRequest for %s failed with errorcode=%d" %
- (TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error))
- # Run the callback
- if callback is not None:
- acc[(produce_response.topic, produce_response.partition)] = callback(produce_response)
- else:
- acc[(produce_response.topic, produce_response.partition)] = produce_response
+ for response in decoder_fn(response):
+ acc[(response.topic, response.partition)] = response
# Order the accumulated responses by the original key order
return (acc[k] for k in original_keys)
#################
#   Public API  #
#################

def close(self):
    """Close every open broker connection held by this client."""
    for conn in self.conns.values():
        conn.close()
def send_produce_request(self, payloads=None, fail_on_error=True, callback=None):
    """
    Encode and send some ProduceRequests

    ProduceRequests will be grouped by (topic, partition) and then sent to a
    specific broker. Output is a list of responses in the same order as the
    list of payloads specified

    Params
    ======
    payloads: list of ProduceRequest
    fail_on_error: boolean, should we raise an Exception if we encounter an API error?
    callback: function, instead of returning the ProduceResponse, first pass it through this function

    Return
    ======
    list of ProduceResponse or callback(ProduceResponse), in the order of input payloads

    Raises
    ======
    Exception: if fail_on_error is True and a response reports a broker error
    """
    # Avoid the shared mutable-default-argument pitfall
    if payloads is None:
        payloads = []
    resps = self._send_broker_aware_request(payloads,
                                            KafkaProtocol.encode_produce_request,
                                            KafkaProtocol.decode_produce_response)
    out = []
    for resp in resps:
        # Check for errors
        if fail_on_error and resp.error != ErrorMapping.NO_ERROR:
            raise Exception("ProduceRequest for %s failed with errorcode=%d" %
                            (TopicAndPartition(resp.topic, resp.partition), resp.error))
        # Run the callback
        if callback is not None:
            out.append(callback(resp))
        else:
            out.append(resp)
    return out
def send_fetch_request(self, payloads=None, fail_on_error=True, callback=None):
    """
    Encode and send a FetchRequest

    Payloads are grouped by topic and partition so they can be pipelined to
    the same brokers.

    Params
    ======
    payloads: list of FetchRequest
    fail_on_error: boolean, should we raise an Exception if we encounter an API error?
    callback: function, instead of returning the FetchResponse, first pass it through this function

    Return
    ======
    list of FetchResponse or callback(FetchResponse), in the order of input payloads

    Raises
    ======
    Exception: if fail_on_error is True and a response reports a broker error
    """
    # Avoid the shared mutable-default-argument pitfall
    if payloads is None:
        payloads = []
    resps = self._send_broker_aware_request(payloads,
                                            KafkaProtocol.encode_fetch_request,
                                            KafkaProtocol.decode_fetch_response)
    out = []
    for resp in resps:
        # Check for errors
        if fail_on_error and resp.error != ErrorMapping.NO_ERROR:
            raise Exception("FetchRequest for %s failed with errorcode=%d" %
                            (TopicAndPartition(resp.topic, resp.partition), resp.error))
        # Run the callback
        if callback is not None:
            out.append(callback(resp))
        else:
            out.append(resp)
    return out
def send_offset_request(self, payloads=None, fail_on_error=True, callback=None):
    """
    Encode and send an OffsetRequest to the leader broker of each partition.

    Params
    ======
    payloads: list of OffsetRequest
    fail_on_error: boolean, should we raise an Exception if we encounter an API error?
    callback: function, instead of returning the OffsetResponse, first pass it through this function

    Return
    ======
    list of OffsetResponse or callback(OffsetResponse), in the order of input payloads
    """
    # Avoid the shared mutable-default-argument pitfall
    if payloads is None:
        payloads = []
    resps = self._send_broker_aware_request(payloads,
                                            KafkaProtocol.encode_offset_request,
                                            KafkaProtocol.decode_offset_response)
    out = []
    for resp in resps:
        if fail_on_error and resp.error != ErrorMapping.NO_ERROR:
            # Interpolate the error code into the message; the original passed
            # it as a second Exception argument, leaving the %s unformatted
            raise Exception("OffsetRequest failed with errorcode=%s" % resp.error)
        if callback is not None:
            out.append(callback(resp))
        else:
            out.append(resp)
    return out
def send_offset_commit_request(self, group, payloads=None, fail_on_error=True, callback=None):
    """
    Encode and send an OffsetCommitRequest for a consumer group.

    NOTE: broker-managed offsets are not supported by Kafka 0.8, so this
    method currently raises NotImplementedError unconditionally; the code
    after the raise is intentionally unreachable and is kept for 0.8.1.
    """
    raise NotImplementedError("Broker-managed offsets not supported in 0.8")
    # Avoid the shared mutable-default-argument pitfall
    if payloads is None:
        payloads = []
    resps = self._send_broker_aware_request(payloads,
                                            partial(KafkaProtocol.encode_offset_commit_request, group=group),
                                            KafkaProtocol.decode_offset_commit_response)
    out = []
    for resp in resps:
        if fail_on_error and resp.error != ErrorMapping.NO_ERROR:
            # Interpolate the error code; the original passed it as a second
            # Exception argument, leaving the %s unformatted
            raise Exception("OffsetCommitRequest failed with errorcode=%s" % resp.error)
        if callback is not None:
            out.append(callback(resp))
        else:
            out.append(resp)
    return out
def send_offset_fetch_request(self, group, payloads=None, fail_on_error=True, callback=None):
    """
    Encode and send an OffsetFetchRequest for a consumer group.

    NOTE: broker-managed offsets are not supported by Kafka 0.8, so this
    method currently raises NotImplementedError unconditionally; the code
    after the raise is intentionally unreachable and is kept for 0.8.1.
    """
    raise NotImplementedError("Broker-managed offsets not supported in 0.8")
    # Avoid the shared mutable-default-argument pitfall
    if payloads is None:
        payloads = []
    # Fixed: was KafkaProtocol.encode_offset_commit_fetch, which does not
    # exist; the fetch-side encoder is encode_offset_fetch_request
    resps = self._send_broker_aware_request(payloads,
                                            partial(KafkaProtocol.encode_offset_fetch_request, group=group),
                                            KafkaProtocol.decode_offset_fetch_response)
    out = []
    for resp in resps:
        if fail_on_error and resp.error != ErrorMapping.NO_ERROR:
            # Fixed: the message wrongly named "OffsetCommitRequest" and the
            # error code was not interpolated into the format string
            raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % (
                resp.topic, resp.partition, resp.error))
        if callback is not None:
            out.append(callback(resp))
        else:
            out.append(resp)
    return out