Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 267
1 file changed, 136 insertions, 131 deletions
diff --git a/kafka/client.py b/kafka/client.py
index f7e39a9..862a30e 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -28,30 +28,35 @@ class KafkaClient(object):
         self.brokers = {}            # broker_id -> BrokerMetadata
         self.topics_to_brokers = {}  # topic_id -> broker_id
         self.topic_partitions = defaultdict(list)  # topic_id -> [0, 1, 2, ...]
-        self.load_metadata_for_topics()
+        self._load_metadata_for_topics()
+
+    ##################
+    #   Private API  #
+    ##################
 
-    def close(self):
-        for conn in self.conns.values():
-            conn.close()
 
-    def get_conn_for_broker(self, broker):
+    def _get_conn_for_broker(self, broker):
         "Get or create a connection to a broker"
         if (broker.host, broker.port) not in self.conns:
             self.conns[(broker.host, broker.port)] = KafkaConnection(broker.host, broker.port, self.bufsize)
         return self.conns[(broker.host, broker.port)]
 
-    def next_id(self):
-        "Generate a new correlation id"
-        return KafkaClient.ID_GEN.next()
+    def _get_leader_for_partition(self, topic, partition):
+        key = TopicAndPartition(topic, partition)
+        if key not in self.topics_to_brokers:
+            self._load_metadata_for_topics(topic)
+        if key not in self.topics_to_brokers:
+            raise Exception("Partition does not exist: %s" % str(key))
+        return self.topics_to_brokers[key]
 
-    def load_metadata_for_topics(self, *topics):
+    def _load_metadata_for_topics(self, *topics):
         """
         Discover brokers and metadata for a set of topics. This method will
         recurse in the event of a retry.
         """
-        requestId = self.next_id()
+        requestId = self._next_id()
         request = KafkaProtocol.encode_metadata_request(KafkaClient.CLIENT_ID, requestId, topics)
-        response = self.try_send_request(requestId, request)
+        response = self._send_broker_unaware_request(requestId, request)
         if response is None:
             raise Exception("All servers failed to process request")
         (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
@@ -64,69 +69,114 @@ class KafkaClient(object):
                 if meta.leader == -1:
                     log.info("Partition is unassigned, delay for 1s and retry")
                     time.sleep(1)
-                    self.load_metadata_for_topics(topic)
+                    self._load_metadata_for_topics(topic)
                 else:
                     self.topics_to_brokers[TopicAndPartition(topic, partition)] = brokers[meta.leader]
                     self.topic_partitions[topic].append(partition)
 
-    def get_leader_for_partition(self, topic, partition):
-        key = TopicAndPartition(topic, partition)
-        if key not in self.topics_to_brokers:
-            self.load_metadata_for_topics(topic)
-        if key not in self.topics_to_brokers:
-            raise Exception("Partition does not exist: %s" % str(key))
-        return self.topics_to_brokers[key]
+    def _next_id(self):
+        "Generate a new correlation id"
+        return KafkaClient.ID_GEN.next()
 
-    def send_produce_request(self, payloads=[], fail_on_error=True, callback=None):
+    def _send_broker_unaware_request(self, requestId, request):
         """
-        Encode and send some ProduceRequests
+        Attempt to send a broker-agnostic request to one of the available brokers.
+        Keep trying until you succeed.
+        """
+        for conn in self.conns.values():
+            try:
+                conn.send(requestId, request)
+                response = conn.recv(requestId)
+                return response
+            except Exception, e:
+                log.warning("Could not send request [%r] to server %s, trying next server: %s" % (request, conn, e))
+                continue
+        return None
 
-        ProduceRequests will be grouped by (topic, partition) and then sent to a specific
-        broker. Output is a list of responses in the same order as the list of payloads
-        specified
+    def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
+        """
+        Group a list of request payloads by topic+partition and send them to the
+        leader broker for that partition using the supplied encode/decode functions
 
         Params
         ======
-        payloads: list of ProduceRequest
-        fail_on_error: boolean, should we raise an Exception if we encounter an API error?
-        callback: function, instead of returning the ProduceResponse, first pass it through this function
+        payloads: list of object-like entities with a topic and partition attribute
+        encode_fn: a method to encode the list of payloads to a request body, must accept
+            client_id, correlation_id, and payloads as keyword arguments
+        decode_fn: a method to decode a response body into response objects. The response
+            objects must be object-like and have topic and partition attributes
 
         Return
         ======
-        list of ProduceResponse or callback(ProduceResponse), in the order of input payloads
+        List of response objects in the same order as the supplied payloads
         """
-        # Group the produce requests by which broker they go to
+        # Group the requests by topic+partition
         original_keys = []
         payloads_by_broker = defaultdict(list)
         for payload in payloads:
-            payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)] += payloads
+            payloads_by_broker[self._get_leader_for_partition(payload.topic, payload.partition)].append(payload)
             original_keys.append((payload.topic, payload.partition))
-
+
         # Accumulate the responses in a dictionary
         acc = {}
 
         # For each broker, send the list of request payloads
         for broker, payloads in payloads_by_broker.items():
-            conn = self.get_conn_for_broker(broker)
-            requestId = self.next_id()
-            request = KafkaProtocol.encode_produce_request(KafkaClient.CLIENT_ID, requestId, payloads)
-            # Send the request
+            conn = self._get_conn_for_broker(broker)
+            requestId = self._next_id()
+            request = encoder_fn(client_id=KafkaClient.CLIENT_ID, correlation_id=requestId, payloads=payloads)
+
+            # Send the request, recv the response
             conn.send(requestId, request)
             response = conn.recv(requestId)
-            for produce_response in KafkaProtocol.decode_produce_response(response):
-                # Check for errors
-                if fail_on_error == True and produce_response.error != ErrorMapping.NO_ERROR:
-                    raise Exception("ProduceRequest for %s failed with errorcode=%d" %
-                            (TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error))
-                # Run the callback
-                if callback is not None:
-                    acc[(produce_response.topic, produce_response.partition)] = callback(produce_response)
-                else:
-                    acc[(produce_response.topic, produce_response.partition)] = produce_response
+            for response in decoder_fn(response):
+                acc[(response.topic, response.partition)] = response
 
         # Order the accumulated responses by the original key order
         return (acc[k] for k in original_keys)
 
+    #################
+    #   Public API  #
+    #################
+
+    def close(self):
+        for conn in self.conns.values():
+            conn.close()
+
+    def send_produce_request(self, payloads=[], fail_on_error=True, callback=None):
+        """
+        Encode and send some ProduceRequests
+
+        ProduceRequests will be grouped by (topic, partition) and then sent to a specific
+        broker. Output is a list of responses in the same order as the list of payloads
+        specified
+
+        Params
+        ======
+        payloads: list of ProduceRequest
+        fail_on_error: boolean, should we raise an Exception if we encounter an API error?
+        callback: function, instead of returning the ProduceResponse, first pass it through this function
+
+        Return
+        ======
+        list of ProduceResponse or callback(ProduceResponse), in the order of input payloads
+        """
+        resps = self._send_broker_aware_request(payloads,
+                    KafkaProtocol.encode_produce_request,
+                    KafkaProtocol.decode_produce_response)
+        out = []
+        for resp in resps:
+            # Check for errors
+            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("ProduceRequest for %s failed with errorcode=%d" %
+                        (TopicAndPartition(resp.topic, resp.partition), resp.error))
+            # Run the callback
+            if callback is not None:
+                out.append(callback(resp))
+            else:
+                out.append(resp)
+        return out
+
     def send_fetch_request(self, payloads=[], fail_on_error=True, callback=None):
         """
         Encode and send a FetchRequest
@@ -134,108 +184,63 @@ class KafkaClient(object):
         Payloads are grouped by topic and partition so they can be pipelined to the same
         brokers.
         """
-        # Group the produce requests by which broker they go to
-        original_keys = []
-        payloads_by_broker = defaultdict(list)
-        for payload in payloads:
-            payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)].append(payload)
-            original_keys.append((payload.topic, payload.partition))
-
-        # Accumulate the responses in a dictionary, keyed by topic+partition
-        acc = {}
-
-        # For each broker, send the list of request payloads
-        for broker, payloads in payloads_by_broker.items():
-            conn = self.get_conn_for_broker(broker)
-            requestId = self.next_id()
-            request = KafkaProtocol.encode_fetch_request(KafkaClient.CLIENT_ID, requestId, payloads)
-            # Send the request
-            conn.send(requestId, request)
-            response = conn.recv(requestId)
-            for fetch_response in KafkaProtocol.decode_fetch_response_iter(response):
-                # Check for errors
-                if fail_on_error == True and fetch_response.error != ErrorMapping.NO_ERROR:
-                    raise Exception("FetchRequest %s failed with errorcode=%d" %
-                            (TopicAndPartition(fetch_response.topic, fetch_response.partition), fetch_response.error))
-                # Run the callback
-                if callback is not None:
-                    acc[(fetch_response.topic, fetch_response.partition)] = callback(fetch_response)
-                else:
-                    acc[(fetch_response.topic, fetch_response.partition)] = fetch_response
+        resps = self._send_broker_aware_request(payloads,
+                    KafkaProtocol.encode_fetch_request,
+                    KafkaProtocol.decode_fetch_response)
+        out = []
+        for resp in resps:
+            # Check for errors
+            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("FetchRequest for %s failed with errorcode=%d" %
+                        (TopicAndPartition(resp.topic, resp.partition), resp.error))
+            # Run the callback
+            if callback is not None:
+                out.append(callback(resp))
+            else:
+                out.append(resp)
+        return out
 
-        # Order the accumulated responses by the original key order
-        return (acc[k] for k in original_keys)
-
-    def try_send_request(self, requestId, request):
-        """
-        Attempt to send a broker-agnostic request to one of the available brokers.
-        Keep trying until you succeed.
-        """
-        for conn in self.conns.values():
-            try:
-                conn.send(requestId, request)
-                response = conn.recv(requestId)
-                return response
-            except Exception, e:
-                log.warning("Could not send request [%r] to server %s, trying next server: %s" % (request, conn, e))
-                continue
-        return None
 
     def send_offset_request(self, payloads=[], fail_on_error=True, callback=None):
-        requestId = self.next_id()
-        request = KafkaProtocol.encode_offset_request(KafkaClient.CLIENT_ID, requestId, payloads)
-        response = self.try_send_request(requestId, request)
-        if response is None:
-            if fail_on_error is True:
-                raise Exception("All servers failed to process request")
-            else:
-                return None
+        resps = self._send_broker_aware_request(payloads,
+                    KafkaProtocol.encode_offset_request,
+                    KafkaProtocol.decode_offset_response)
         out = []
-        for offset_response in KafkaProtocol.decode_offset_response(response):
-            if fail_on_error == True and offset_response.error != ErrorMapping.NO_ERROR:
-                raise Exception("OffsetRequest failed with errorcode=%s", offset_response.error)
+        for resp in resps:
+            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("OffsetRequest failed with errorcode=%s", resp.error)
             if callback is not None:
-                out.append(callback(offset_response))
+                out.append(callback(resp))
             else:
-                out.append(offset_response)
+                out.append(resp)
         return out
 
     def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None):
-        requestId = self.next_id()
-        request = KafkaProtocol.encode_offset_commit_request(KafkaClient.CLIENT_ID, requestId, group, payloads)
-        response = self.try_send_request(requestId, request)
-        if response is None:
-            if fail_on_error is True:
-                raise Exception("All servers failed to process request")
-            else:
-                return None
+        raise NotImplementedError("Broker-managed offsets not supported in 0.8")
+        resps = self._send_broker_aware_request(payloads,
+                    partial(KafkaProtocol.encode_offset_commit_request, group=group),
+                    KafkaProtocol.decode_offset_commit_response)
         out = []
-        for offset_commit_response in KafkaProtocol.decode_offset_commit_response(response):
-            log.debug(offset_commit_response)
-            if fail_on_error == True and offset_commit_response.error != ErrorMapping.NO_ERROR:
-                raise Exception("OffsetCommitRequest failed with errorcode=%s", offset_commit_response.error)
+        for resp in resps:
+            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("OffsetCommitRequest failed with errorcode=%s", resp.error)
             if callback is not None:
-                out.append(callback(offset_commit_response))
+                out.append(callback(resp))
            else:
-                out.append(offset_commit_response)
+                out.append(resp)
         return out
 
     def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, callback=None):
-        requestId = self.next_id()
-        request = KafkaProtocol.encode_offset_fetch_request(KafkaClient.CLIENT_ID, requestId, group, payloads)
-        response = self.try_send_request(requestId, request)
-        if response is None:
-            if fail_on_error is True:
-                raise Exception("All servers failed to process request")
-            else:
-                return None
+        raise NotImplementedError("Broker-managed offsets not supported in 0.8")
+        resps = self._send_broker_aware_request(payloads,
+                    partial(KafkaProtocol.encode_offset_commit_fetch, group=group),
+                    KafkaProtocol.decode_offset_fetch_response)
         out = []
-        for offset_fetch_response in KafkaProtocol.decode_offset_fetch_response(response):
-            if fail_on_error == True and offset_fetch_response.error != ErrorMapping.NO_ERROR:
-                raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % (
-                    offset_fetch_response.topic, offset_fetch_response.partition, offset_fetch_response.error))
+        for resp in resps:
+            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("OffsetCommitRequest failed with errorcode=%s", resp.error)
            if callback is not None:
-                out.append(callback(offset_fetch_response))
+                out.append(callback(resp))
            else:
-                out.append(offset_fetch_response)
+                out.append(resp)
        return out
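
The public surface is unchanged by this refactor; only the transport plumbing moved into _send_broker_aware_request. A minimal sketch of a caller at this revision (the constructor arguments and the ProduceRequest fields live outside this diff, so treat their exact shapes as assumptions):

    from kafka.client import KafkaClient

    # Assumed constructor shape: host and port (bufsize defaults elsewhere in this file)
    client = KafkaClient("localhost", 9092)

    # payloads: a list of ProduceRequest objects with topic/partition/messages;
    # building the message objects is kafka.protocol's job and is not shown here
    resps = client.send_produce_request(payloads,
                                        fail_on_error=True,  # raise on any API error
                                        callback=None)       # or a fn applied to each ProduceResponse
    client.close()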
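
_send_broker_aware_request is the heart of the change: it factors the group-by-leader / restore-original-order pattern out of the produce and fetch paths. The same pattern in isolation, with stand-ins for the leader lookup and the network round trip (every name below is illustrative, not part of the library):

    from collections import defaultdict, namedtuple

    Payload = namedtuple("Payload", ["topic", "partition"])

    def send_grouped(payloads, get_leader, roundtrip):
        # Remember the original (topic, partition) order of the inputs
        original_keys = []
        by_broker = defaultdict(list)
        for p in payloads:
            by_broker[get_leader(p.topic, p.partition)].append(p)
            original_keys.append((p.topic, p.partition))

        # One request per broker; responses keyed by topic+partition
        acc = {}
        for broker, group in by_broker.items():
            for resp in roundtrip(broker, group):
                acc[(resp.topic, resp.partition)] = resp

        # Hand the responses back in the caller's original order
        return [acc[k] for k in original_keys]

    # Toy run: the "leader" is just the partition number, and the broker echoes its payloads
    out = send_grouped([Payload("t", 1), Payload("t", 0)],
                       get_leader=lambda t, p: p,
                       roundtrip=lambda broker, group: group)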
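
The offset-commit and offset-fetch paths (dead code behind the NotImplementedError until 0.8 grows broker-managed offsets) also show how an encoder needing an extra argument is adapted to the encoder_fn contract: functools.partial pins group so that _send_broker_aware_request can keep calling encoder_fn(client_id=..., correlation_id=..., payloads=...). An illustrative stand-in:

    from functools import partial

    # Stand-in for KafkaProtocol.encode_offset_commit_request
    def encode_commit(client_id, correlation_id, payloads, group):
        return "%s/%d/%s: %d payloads" % (client_id, correlation_id, group, len(payloads))

    encoder_fn = partial(encode_commit, group="my-group")
    # Now matches the contract _send_broker_aware_request expects:
    print(encoder_fn(client_id="kafka-python", correlation_id=1, payloads=[]))

Note in passing that send_offset_fetch_request raises "OffsetCommitRequest failed ..." on error and encodes via encode_offset_commit_fetch; both look like copy-paste slips carried over from the commit method.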