-rw-r--r--  kafka/client08.py  194
-rw-r--r--  kafka/util.py        9
2 files changed, 138 insertions, 65 deletions
diff --git a/kafka/client08.py b/kafka/client08.py
index 11910d1..b048d68 100644
--- a/kafka/client08.py
+++ b/kafka/client08.py
@@ -14,6 +14,7 @@ from .codec import snappy_encode, snappy_decode
from .util import read_short_string, read_int_string
from .util import relative_unpack
from .util import write_short_string, write_int_string
+from .util import group_list_by_key
from .util import BufferUnderflowError, ChecksumError
log = logging.getLogger("kafka")
@@ -217,7 +218,7 @@ class KafkaProtocol(object):
@classmethod
- def encode_produce_request(self, client_id, correlation_id, payloads=[], acks=1, timeout=1000):
+ def encode_produce_request(cls, client_id, correlation_id, payloads=[], acks=1, timeout=1000):
"""
Encode some ProduceRequest structs
@@ -236,8 +237,7 @@ class KafkaProtocol(object):
payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.PRODUCE_KEY)
message += struct.pack('>hii', acks, timeout, len(payloads_by_topic))
- for topic, payload in payloads_by_topic:
- payloads = list(payloads)
+ for topic, payloads in payloads_by_topic.items():
message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(payloads))
for payload in payloads:
message_set = KafkaProtocol._encode_message_set(payload.messages)
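For reference, the '>h%dsi' pack above writes a big-endian short string-length, the raw topic bytes, and a big-endian int payload count. A quick interactive check (Python 2, matching this codebase):

    >>> import struct
    >>> struct.pack('>h3si', 3, "foo", 2)
    '\x00\x03foo\x00\x00\x00\x02'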
@@ -280,8 +280,7 @@ class KafkaProtocol(object):
payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.FETCH_KEY)
message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(payloads_by_topic)) # -1 is the replica id
- for topic, payload in payloads_by_topic:
- payloads = list(payloads)
+ for topic, payloads in payloads_by_topic.items():
message += write_short_string(topic)
message += struct.pack('>i', len(payloads))
for payload in payloads:
@@ -312,8 +311,7 @@ class KafkaProtocol(object):
payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_KEY)
message += struct.pack('>ii', -1, len(payloads_by_topic)) # -1 is the replica id
- for topic, payload in payloads_by_topic:
- payloads = list(payloads)
+ for topic, payloads in payloads_by_topic.items():
message += write_short_string(topic)
message += struct.pack('>i', len(payloads))
for payload in payloads:
@@ -406,8 +404,7 @@ class KafkaProtocol(object):
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_COMMIT_KEY)
message += write_short_string(group)
message += struct.pack('>i', len(payloads_by_topic))
- for topic, payload in payloads_by_topic:
- payloads = list(payloads)
+ for topic, payloads in payloads_by_topic.items():
message += write_short_string(topic)
message += struct.pack('>i', len(payloads))
for payload in payloads:
@@ -450,8 +447,7 @@ class KafkaProtocol(object):
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_FETCH_KEY)
message += write_short_string(group)
message += struct.pack('>i', len(payloads_by_topic))
- for topic, payload in payloads_by_topic:
- payloads = list(payloads)
+ for topic, payloads in payloads_by_topic.items():
message += write_short_string(topic)
message += struct.pack('>i', len(payloads))
for payload in payloads:
@@ -582,13 +578,14 @@ class KafkaClient(object):
def load_metadata_for_topics(self, *topics):
"""
- Discover brokers and metadata for a set of topics
+ Discover brokers and metadata for a set of topics. This method will
+ call itself recursively when it needs to retry (e.g. while a partition
+ is still unassigned).
"""
requestId = self.next_id()
request = KafkaProtocol.encode_metadata_request(KafkaClient.CLIENT_ID, requestId, topics)
- conn = self.conns.values()[0] # Just get the first one in the list
- conn.send(requestId, request)
- response = conn.recv(requestId)
+ response = self.try_send_request(requestId, request)
+ if response is None:
+ raise Exception("All servers failed to process request")
(brokers, topics) = KafkaProtocol.decode_metadata_response(response)
log.debug("Broker metadata: %s", brokers)
log.debug("Topic metadata: %s", topics)
@@ -600,7 +597,6 @@ class KafkaClient(object):
log.info("Partition is unassigned, delay for 1s and retry")
time.sleep(1)
self.load_metadata_for_topics(topic)
- return
else:
self.topics_to_brokers[TopicAndPartition(topic, partition)] = brokers[meta.leader]
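Once metadata is loaded, topics_to_brokers maps each TopicAndPartition to the leader's broker record returned by decode_metadata_response. A rough sketch of the resulting state (hypothetical topic and broker values, shown only as comments):

    client = KafkaClient("localhost", 9092)
    client.load_metadata_for_topics("foo8")
    # client.topics_to_brokers now resembles:
    #   {TopicAndPartition(topic="foo8", partition=0): <broker metadata for the leader>,
    #    TopicAndPartition(topic="foo8", partition=1): <broker metadata for the leader>}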
@@ -608,18 +604,44 @@ class KafkaClient(object):
key = TopicAndPartition(topic, partition)
if key not in self.topics_to_brokers:
self.load_metadata_for_topics(topic)
+ if key not in self.topics_to_brokers:
+ raise Exception("Partition does not exist: %s" % str(key))
return self.topics_to_brokers[key]
def send_produce_request(self, payloads=[], fail_on_error=True, callback=None):
+ """
+ Encode and send some ProduceRequests
+
+ ProduceRequests will be grouped by (topic, partition) and then sent to a specific
+ broker. Output is a list of responses in the same order as the list of payloads
+ specified
+
+ Params
+ ======
+ payloads: list of ProduceRequest
+ fail_on_error: boolean, should we raise an Exception if we encounter an API error?
+ callback: function, instead of returning the ProduceResponse, first pass it through this function
+
+ Return
+ ======
+ list of ProduceResponse or callback(ProduceResponse), in the order of input payloads
+ """
+ key_fn = lambda x: (x.topic, x.partition)
+
+ # Note the order of the incoming payloads
+ original_keys = [key_fn(payload) for payload in payloads]
+
# Group the produce requests by topic+partition
- payloads_by_topic_and_partition = group_list_by_key(payloads, key=lambda x: (x.topic, x.partition))
+ payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn)
# Group the produce requests by which broker they go to
payloads_by_broker = defaultdict(list)
- for (topic, partition), payload in payloads_by_topic_and_partition:
- payloads_by_broker[self.get_leader_for_partition(topic, partition)] += list(payload)
+ for (topic, partition), payloads in payloads_by_topic_and_partition.items():
+ payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads
+
+ # Accumulate the responses in a dictionary, keyed by key_fn
+ acc = {}
- out = []
# For each broker, send the list of request payloads
for broker, payloads in payloads_by_broker.items():
conn = self.get_conn_for_broker(broker)
@@ -635,10 +657,13 @@ class KafkaClient(object):
(TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error))
# Run the callback
if callback is not None:
- out.append(callback(produce_response))
+ acc[key_fn(produce_response)] = callback(produce_response)
else:
- out.append(produce_response)
- return out
+ acc[key_fn(produce_response)] = produce_response
+
+ # Order the accumulated responses by the original key order
+ return (acc[k] for k in original_keys)
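A minimal usage sketch of the re-ordering behavior documented above (hypothetical topic and messages; assumes conn is a connected KafkaClient as in the __main__ block below):

    reqs = [ProduceRequest("foo8", 1, [KafkaProtocol.create_message("a")]),
            ProduceRequest("foo8", 0, [KafkaProtocol.create_message("b")])]
    # Responses are accumulated by (topic, partition) and yielded back in
    # the order the payloads were supplied, so request/response pairs line up:
    for req, resp in zip(reqs, conn.send_produce_request(payloads=reqs)):
        assert (req.topic, req.partition) == (resp.topic, resp.partition)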
def send_fetch_request(self, payloads=[], fail_on_error=True, callback=None):
"""
@@ -647,15 +672,22 @@ class KafkaClient(object):
Payloads are grouped by topic and partition so they can be pipelined to the same
brokers.
"""
+ key_fn = lambda x: (x.topic, x.partition)
+
+ # Note the order of the incoming payloads
+ original_keys = [key_fn(payload) for payload in payloads]
+
# Group the fetch requests by topic+partition
- payloads_by_topic_and_partition = group_list_by_key(payloads, key=lambda x: (x.topic, x.partition))
+ payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn)
# Group the fetch requests by which broker they go to
payloads_by_broker = defaultdict(list)
- for (topic, partition), payload in payloads_by_topic_and_partition:
- payloads_by_broker[self.get_leader_for_partition(topic, partition)] += list(payload)
+ for (topic, partition), payloads in payloads_by_topic_and_partition.items():
+ payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads
+
+ # Accumulate the responses in a dictionary, keyed by key_fn
+ acc = {}
- out = []
# For each broker, send the list of request payloads
for broker, payloads in payloads_by_broker.items():
conn = self.get_conn_for_broker(broker)
@@ -667,21 +699,41 @@ class KafkaClient(object):
for fetch_response in KafkaProtocol.decode_fetch_response_iter(response):
# Check for errors
if fail_on_error == True and fetch_response.error != 0:
- raise Exception("FetchRequest %s failed with errorcode=%d",
+ raise Exception("FetchRequest %s failed with errorcode=%d" %
(TopicAndPartition(fetch_response.topic, fetch_response.partition), fetch_response.error))
# Run the callback
if callback is not None:
- out.append(callback(fetch_response))
+ acc[key_fn(fetch_response)] = callback(fetch_response)
else:
- out.append(fetch_response)
- return out
+ acc[key_fn(fetch_response)] = fetch_response
+
+ # Order the accumulated responses by the original key order
+ return (acc[k] for k in original_keys)
+
+ def try_send_request(self, requestId, request):
+ """
+ Attempt to send a broker-agnostic request to one of the available brokers.
+ Keep trying until you succeed.
+ """
+ for conn in self.conns.values():
+ try:
+ conn.send(requestId, request)
+ response = conn.recv(requestId)
+ return response
+ except Exception:
+ log.warning("Could not send request to server %s, trying next server", conn)
+ continue
+ return None
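try_send_request is now the shared failover primitive for broker-agnostic requests; it returns None when every connection fails, and callers must handle that case themselves, mirroring load_metadata_for_topics above. A sketch (hypothetical topic name; assumes conn is a KafkaClient):

    requestId = conn.next_id()
    request = KafkaProtocol.encode_metadata_request(KafkaClient.CLIENT_ID, requestId, ["foo8"])
    response = conn.try_send_request(requestId, request)
    if response is None:
        raise Exception("All servers failed to process request")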
def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None):
- conn = self.conns.values()[0] # Just get the first one in the list
requestId = self.next_id()
request = KafkaProtocol.encode_offset_commit_request(KafkaClient.CLIENT_ID, requestId, group, payloads)
- conn.send(requestId, request)
- response = conn.recv(requestId)
+ response = self.try_send_request(requestId, request)
+ if response is None:
+ if fail_on_error is True:
+ raise Exception("All servers failed to process request")
+ else:
+ return None
out = []
for offset_commit_response in KafkaProtocol.decode_offset_commit_response(response):
if fail_on_error == True and offset_commit_response.error != 0:
@@ -693,15 +745,19 @@ class KafkaClient(object):
return out
def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, callback=None):
- conn = self.conns.values()[0] # Just get the first one in the list
requestId = self.next_id()
request = KafkaProtocol.encode_offset_fetch_request(KafkaClient.CLIENT_ID, requestId, group, payloads)
- conn.send(requestId, request)
- response = conn.recv(requestId)
+ response = self.try_send_request(requestId, request)
+ if response is None:
+ if fail_on_error is True:
+ raise Exception("All servers failed to process request")
+ else:
+ return None
out = []
for offset_fetch_response in KafkaProtocol.decode_offset_fetch_response(response):
if fail_on_error == True and offset_fetch_response.error != 0:
- raise Exception("OffsetFetchRequest failed with errorcode=%s", offset_fetch_response.error)
+ raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % (
+ offset_fetch_response.topic, offset_fetch_response.partition, offset_fetch_response.error))
if callback is not None:
out.append(callback(offset_fetch_response))
else:
@@ -709,20 +765,22 @@ class KafkaClient(object):
return out
-
if __name__ == "__main__":
+ logging.basicConfig(level=logging.DEBUG)
+
+ topic = "foo8"
# Bootstrap connection
conn = KafkaClient("localhost", 9092)
# Create some Messages
- messages = (KafkaProtocol.create_gzip_message("GZIPPed"),
+ messages = (KafkaProtocol.create_gzip_message(["GZIPPed"]),
KafkaProtocol.create_message("not-gzipped"))
- # Create a ProduceRequest
- produce = ProduceRequest(topic="foo5", partition=0, messages=messages)
+ produce1 = ProduceRequest(topic=topic, partition=0, messages=messages)
+ produce2 = ProduceRequest(topic=topic, partition=1, messages=messages)
# Send the ProduceRequest
- produce_resp = conn.send_produce_request(payloads=[produce])
+ produce_resp = conn.send_produce_request(payloads=[produce1, produce2])
# Check for errors
for resp in produce_resp:
@@ -734,29 +792,41 @@ if __name__ == "__main__":
#conn.send_offset_commit_request(group="group", payloads=[OffsetCommitRequest("topic-1", 0, 42, "METADATA?")])
#conn.send_offset_fetch_request(group="group", payloads=[OffsetFetchRequest("topic-1", 0)])
- print conn.send_offset_fetch_request(group="group", payloads=[OffsetFetchRequest("foo5", 0)])
- offset = 0
- done = False
- while not done:
- print offset
- for resp in conn.send_fetch_request(payloads=[FetchRequest(topic="foo5", partition=0, offset=offset, max_bytes=4096)]):
+ def init_offsets(offset_response):
+ if offset_response.error not in (ErrorMapping.NO_ERROR, ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON):
+ raise Exception("OffsetFetch failed: %s" % (offset_response))
+ elif offset_response.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
+ return 0
+ else:
+ return offset_response.offset
+
+ # Load offsets
+ (offset1, offset2) = conn.send_offset_fetch_request(
+ group="group1",
+ payloads=[OffsetFetchRequest(topic, 0), OffsetFetchRequest(topic, 1)],
+ fail_on_error=False,
+ callback=init_offsets
+ )
+ print offset1, offset2
+
+ while True:
+ for resp in conn.send_fetch_request(payloads=[FetchRequest(topic=topic, partition=0, offset=offset1, max_bytes=4096)]):
i = 0
for msg in resp.messages:
- print conn.send_offset_commit_request(group="group", payloads=[OffsetCommitRequest("foo5", 0, offset, "")])
- print msg, offset
- offset = msg.offset+1
+ print msg
+ offset1 = msg.offset+1
+ print offset1, conn.send_offset_commit_request(group="group1", payloads=[OffsetCommitRequest(topic, 0, offset1, "")])
i += 1
if i == 0:
raise StopIteration("no more messages")
-class Consumer(object):
- def __init__(self, conn):
- self._conn = conn
-
-
-
-class Producer(object):
- pass
-
-
+ for resp in conn.send_fetch_request(payloads=[FetchRequest(topic=topic, partition=1, offset=offset2, max_bytes=4096)]):
+ i = 0
+ for msg in resp.messages:
+ print msg
+ offset2 = msg.offset+1
+ print offset2, conn.send_offset_commit_request(group="group1", payloads=[OffsetCommitRequest(topic, 1, offset2, "")])
+ i += 1
+ if i == 0:
+ raise StopIteration("no more messages")
diff --git a/kafka/util.py b/kafka/util.py
index 0623f35..cb8f7f5 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -44,9 +44,12 @@ def relative_unpack(fmt, data, cur):
out = struct.unpack(fmt, data[cur:cur+size])
return (out, cur+size)
-def group_list_by_key(l, key):
- sorted_l = sorted(l, key=key)
- return list(groupby(sorted_l, key=key))
+def group_list_by_key(it, key):
+ sorted_it = sorted(it, key=key)
+ out = {}
+ for k, group in groupby(sorted_it, key=key):
+ out[k] = list(group)
+ return out
class BufferUnderflowError(Exception):
pass
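The reworked group_list_by_key returns a dict of materialized lists rather than the raw groupby pairs. The old list(groupby(...)) form handed out lazy group iterators that share the underlying iterator and are invalidated as soon as the next key is consumed, which is what forced the awkward list(payloads) workaround at each call site. A quick check of the new behavior:

    from operator import itemgetter

    grouped = group_list_by_key([("a", 1), ("b", 2), ("a", 3)], key=itemgetter(0))
    # grouped == {"a": [("a", 1), ("a", 3)], "b": [("b", 2)]}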