author    David Arthur <mumrah@gmail.com>  2013-02-22 23:09:25 -0500
committer David Arthur <mumrah@gmail.com>  2013-04-02 20:19:30 -0400
commit    2a3d231aa61642c57537bc2128dd4f2bd30f35dd (patch)
tree      6bfdfa13b228481df9c79bcb926c2036b476b891 /kafka/client08.py
parent    e87c561723be25fcfa2564030367196231aa366e (diff)
download  kafka-python-2a3d231aa61642c57537bc2128dd4f2bd30f35dd.tar.gz
Protocol and low-level client done, adding tests
Diffstat (limited to 'kafka/client08.py')
-rw-r--r--  kafka/client08.py  180
1 file changed, 110 insertions(+), 70 deletions(-)
diff --git a/kafka/client08.py b/kafka/client08.py
index b048d68..49d786f 100644
--- a/kafka/client08.py
+++ b/kafka/client08.py
@@ -14,7 +14,7 @@ from .codec import snappy_encode, snappy_decode
from .util import read_short_string, read_int_string
from .util import relative_unpack
from .util import write_short_string, write_int_string
-from .util import group_list_by_key
+from .util import group_by_topic_and_partition
from .util import BufferUnderflowError, ChecksumError
log = logging.getLogger("kafka")
@@ -33,7 +33,7 @@ OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"])
# Response payloads
ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"])
FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error", "highwaterMark", "messages"])
-OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offset"])
+OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offsets"])
OffsetCommitResponse = namedtuple("OffsetCommitResponse", ["topic", "partition", "error"])
OffsetFetchResponse = namedtuple("OffsetFetchResponse", ["topic", "partition", "offset", "metadata", "error"])
BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
@@ -74,6 +74,9 @@ class KafkaProtocol(object):
OFFSET_FETCH_KEY = 7
ATTRIBUTE_CODEC_MASK = 0x03
+ CODEC_NONE = 0x00
+ CODEC_GZIP = 0x01
+ CODEC_SNAPPY = 0x02
###################
# Private API #
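
The low two bits of a Message's attribute byte select its codec; a minimal standalone sketch of the arithmetic the encode and decode paths now share (constants copied from the hunk above):

    ATTRIBUTE_CODEC_MASK = 0x03
    CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY = 0x00, 0x01, 0x02

    att = 0x00 | (ATTRIBUTE_CODEC_MASK & CODEC_GZIP)   # set on encode
    assert att & ATTRIBUTE_CODEC_MASK == CODEC_GZIP    # tested on decode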
@@ -171,13 +174,13 @@ class KafkaProtocol(object):
(key, cur) = read_int_string(data, cur)
(value, cur) = read_int_string(data, cur)
- if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 0:
+ if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_NONE:
yield (offset, Message(magic, att, key, value))
- elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 1:
+ elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_GZIP:
gz = gzip_decode(value)
for (offset, message) in KafkaProtocol._decode_message_set_iter(gz):
yield (offset, message)
- elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 2:
+ elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_SNAPPY:
snp = snappy_decode(value)
for (offset, message) in KafkaProtocol._decode_message_set_iter(snp):
yield (offset, message)
@@ -214,8 +217,25 @@ class KafkaProtocol(object):
message_set = KafkaProtocol._encode_message_set(
[KafkaProtocol.create_message(payload) for payload in payloads])
gzipped = gzip_encode(message_set)
- return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & 0x01), key, gzipped)
+ return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP), key, gzipped)
+ @classmethod
+ def create_snappy_message(cls, payloads, key=None):
+ """
+ Construct a Snappy Message containing multiple Messages
+
+ The given payloads will be encoded, compressed, and sent as a single atomic
+ message to Kafka.
+
+ Params
+ ======
+ payloads: list(bytes), a list of payloads to be sent to Kafka
+ key: bytes, a key used for partition routing (optional)
+ """
+ message_set = KafkaProtocol._encode_message_set(
+ [KafkaProtocol.create_message(payload) for payload in payloads])
+ snapped = snappy_encode(message_set)
+ return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY), key, snapped)
@classmethod
def encode_produce_request(cls, client_id, correlation_id, payloads=[], acks=1, timeout=1000):
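
A hedged usage sketch of the two compressed-message constructors, mirroring the demo in this file's __main__ block: each compressed Message wraps an entire encoded message set and is appended to the log as one atomic unit.

    messages = [
        KafkaProtocol.create_message("plain payload"),
        KafkaProtocol.create_gzip_message(["m1", "m2", "m3"]),
        KafkaProtocol.create_snappy_message(["m4", "m5"]),
    ]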
@@ -234,14 +254,14 @@ class KafkaProtocol(object):
-1: waits for all replicas to be in sync
timeout: Maximum time the server will wait for acks from replicas. This is _not_ a socket timeout
"""
- payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+ grouped_payloads = group_by_topic_and_partition(payloads)
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.PRODUCE_KEY)
- message += struct.pack('>hii', acks, timeout, len(payloads_by_topic))
- for topic, payloads in payloads_by_topic.items():
- message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(payloads))
- for payload in payloads:
+ message += struct.pack('>hii', acks, timeout, len(grouped_payloads))
+ for topic, topic_payloads in grouped_payloads.items():
+ message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(topic_payloads))
+ for partition, payload in topic_payloads.items():
message_set = KafkaProtocol._encode_message_set(payload.messages)
- message += struct.pack('>ii%ds' % len(message_set), payload.partition, len(message_set), message_set)
+ message += struct.pack('>ii%ds' % len(message_set), partition, len(message_set), message_set)
return struct.pack('>i%ds' % len(message), len(message), message)
@classmethod
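
group_by_topic_and_partition itself is not shown in this diff, but the two-level iteration above implies it returns a dict of dicts keyed by topic, then partition; an assumed sketch (topic names and payloads are placeholders):

    payload_p0 = payload_p1 = payload_bar = None  # placeholder request payloads
    # assumed shape: { topic: { partition: payload } }
    grouped = {
        "foo8": {0: payload_p0, 1: payload_p1},
        "bar8": {0: payload_bar},
    }
    for topic, topic_payloads in grouped.items():
        for partition, payload in topic_payloads.items():
            pass  # one (partition, message set) block encoded per payload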
@@ -276,15 +296,15 @@ class KafkaProtocol(object):
max_wait_time: int, how long to block waiting on min_bytes of data
min_bytes: int, the minimum number of bytes to accumulate before returning the response
"""
-
- payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+
+ grouped_payloads = group_by_topic_and_partition(payloads)
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.FETCH_KEY)
- message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(payloads_by_topic)) # -1 is the replica id
- for topic, payloads in payloads_by_topic.items():
+ message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(grouped_payloads)) # -1 is the replica id
+ for topic, topic_payloads in grouped_payloads.items():
message += write_short_string(topic)
- message += struct.pack('>i', len(payloads))
- for payload in payloads:
- message += struct.pack('>iqi', payload.partition, payload.offset, payload.max_bytes)
+ message += struct.pack('>i', len(topic_payloads))
+ for partition, payload in topic_payloads.items():
+ message += struct.pack('>iqi', partition, payload.offset, payload.max_bytes)
return struct.pack('>i%ds' % len(message), len(message), message)
@classmethod
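
A standalone sketch of the fetch-request header packed above: replica id (-1 for ordinary clients), max wait time, min bytes, and the topic count, each a big-endian 32-bit int in the 0.8 wire format.

    import struct
    header = struct.pack('>iiii', -1, 1000, 4096, 1)  # four ints -> 16 bytes
    assert len(header) == 16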
@@ -308,14 +328,14 @@ class KafkaProtocol(object):
@classmethod
def encode_offset_request(cls, client_id, correlation_id, payloads=[]):
- payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+ grouped_payloads = group_by_topic_and_partition(payloads)
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_KEY)
- message += struct.pack('>ii', -1, len(payloads_by_topic)) # -1 is the replica id
- for topic, payloads in payloads_by_topic.items():
+ message += struct.pack('>ii', -1, len(grouped_payloads)) # -1 is the replica id
+ for topic, topic_payloads in grouped_payloads.items():
message += write_short_string(topic)
- message += struct.pack('>i', len(payloads))
- for payload in payloads:
- message += struct.pack('>iqi', payload.partition, payload.time, payload.max_offsets)
+ message += struct.pack('>i', len(topic_payloads))
+ for partition, payload in topic_payloads.items():
+ message += struct.pack('>iqi', partition, payload.time, payload.max_offsets)
return struct.pack('>i%ds' % len(message), len(message), message)
@classmethod
@@ -332,8 +352,12 @@ class KafkaProtocol(object):
(topic, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
for i in range(num_partitions):
- ((partition, error, offset), cur) = relative_unpack('>ihq', data, cur)
- yield OffsetResponse(topic, partition, error, offset)
+ ((partition, error, num_offsets,), cur) = relative_unpack('>ihi', data, cur)
+ offsets = []
+ for j in range(num_offsets):
+ ((offset,), cur) = relative_unpack('>q', data, cur)
+ offsets.append(offset)
+ yield OffsetResponse(topic, partition, error, tuple(offsets))
@classmethod
def encode_metadata_request(cls, client_id, correlation_id, topics=[]):
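
With this change each OffsetResponse carries a tuple of offsets rather than a single value; a hedged consumer sketch, assuming data holds a broker response body (in the 0.8 offset API the broker returns up to max_offsets values, newest first):

    for resp in KafkaProtocol.decode_offset_response(data):
        if resp.error == 0:
            latest = resp.offsets[0]  # first entry is the most recent offset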
@@ -400,15 +424,15 @@ class KafkaProtocol(object):
group: string, the consumer group you are committing offsets for
payloads: list of OffsetCommitRequest
"""
- payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+ grouped_payloads = group_by_topic_and_partition(payloads)
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_COMMIT_KEY)
message += write_short_string(group)
- message += struct.pack('>i', len(payloads_by_topic))
- for topic, payloads in payloads_by_topic.items():
+ message += struct.pack('>i', len(grouped_payloads))
+ for topic, topic_payloads in grouped_payloads.items():
message += write_short_string(topic)
- message += struct.pack('>i', len(payloads))
- for payload in payloads:
- message += struct.pack('>iq', payload.partition, payload.offset)
+ message += struct.pack('>i', len(topic_payloads))
+ for partition, payload in topic_payloads.items():
+ message += struct.pack('>iq', partition, payload.offset)
message += write_short_string(payload.metadata)
return struct.pack('>i%ds' % len(message), len(message), message)
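
A hedged sketch of building payloads for this encoder; OffsetCommitRequest is defined earlier in the file (outside this diff) and, judging from the field access above, carries topic, partition, offset, and metadata:

    reqs = [
        OffsetCommitRequest("foo8", 0, 42, "committed by example"),
        OffsetCommitRequest("foo8", 1, 13, ""),
    ]
    data = KafkaProtocol.encode_offset_commit_request("my-client", 1, "group1", reqs)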
@@ -421,6 +445,7 @@ class KafkaProtocol(object):
======
data: bytes to decode
"""
+ data = data[2:] # TODO remove me when versionId is removed
((correlation_id,), cur) = relative_unpack('>i', data, 0)
(client_id, cur) = read_short_string(data, cur)
((num_topics,), cur) = relative_unpack('>i', data, cur)
@@ -443,15 +468,15 @@ class KafkaProtocol(object):
group: string, the consumer group you are fetching offsets for
payloads: list of OffsetFetchRequest
"""
- payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+ grouped_payloads = group_by_topic_and_partition(payloads)
message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_FETCH_KEY)
message += write_short_string(group)
- message += struct.pack('>i', len(payloads_by_topic))
- for topic, payloads in payloads_by_topic.items():
+ message += struct.pack('>i', len(grouped_payloads))
+ for topic, topic_payloads in grouped_payloads.items():
message += write_short_string(topic)
- message += struct.pack('>i', len(payloads))
- for payload in payloads:
- message += struct.pack('>i', payload.partition)
+ message += struct.pack('>i', len(topic_payloads))
+ for partition, payload in topic_payloads.items():
+ message += struct.pack('>i', partition)
return struct.pack('>i%ds' % len(message), len(message), message)
@classmethod
@@ -493,6 +518,9 @@ class KafkaConnection(object):
self._sock.connect((host, port))
self._sock.settimeout(10)
+ def __str__(self):
+ return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
+
###################
# Private API #
###################
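
The new __str__ lets a connection be interpolated straight into log messages, as try_send_request's retry warning does further down; a minimal sketch, assuming KafkaConnection takes host and port as its constructor does above (requires a reachable broker, since the constructor connects):

    import logging
    log = logging.getLogger("kafka")
    conn = KafkaConnection("localhost", 9092)
    log.warning("Could not send request to %s, trying next server", conn)
    # logs: Could not send request to <KafkaConnection host=localhost port=9092>, ...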
@@ -536,6 +564,8 @@ class KafkaConnection(object):
# Public API #
##################
+ # TODO multiplex socket communication to allow for multi-threaded clients
+
def send(self, requestId, payload):
"Send a request to Kafka"
sent = self._sock.sendall(payload)
@@ -566,6 +596,10 @@ class KafkaClient(object):
self.topics_to_brokers = {} # topic_id -> broker_id
self.load_metadata_for_topics()
+ def close(self):
+ for conn in self.conns.values():
+ conn.close()
+
def get_conn_for_broker(self, broker):
"Get or create a connection to a broker"
if (broker.host, broker.port) not in self.conns:
@@ -626,20 +660,14 @@ class KafkaClient(object):
======
list of ProduceResponse or callback(ProduceResponse), in the order of input payloads
"""
- key_fn = lambda x: (x.topic, x.partition)
-
- # Note the order of the incoming payloads
- original_keys = [key_fn(payload) for payload in payloads]
-
- # Group the produce requests by topic+partition
- payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn)
-
# Group the produce requests by which broker they go to
+ original_keys = []
payloads_by_broker = defaultdict(list)
- for (topic, partition), payloads in payloads_by_topic_and_partition.items():
- payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads
+ for payload in payloads:
+ payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)].append(payload)
+ original_keys.append((payload.topic, payload.partition))
- # Accumulate the responses in a dictionary, keyed by key_fn
+ # Accumulate the responses in a dictionary
acc = {}
# For each broker, send the list of request payloads
@@ -657,11 +685,10 @@ class KafkaClient(object):
(TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error))
# Run the callback
if callback is not None:
- acc[key_fn(produce_response)] = callback(produce_response)
+ acc[(produce_response.topic, produce_response.partition)] = callback(produce_response)
else:
- acc[key_fn(produce_response)] = produce_response
+ acc[(produce_response.topic, produce_response.partition)] = produce_response
- print(acc)
# Order the accumulated responses by the original key order
return (acc[k] for k in original_keys)
@@ -672,20 +699,14 @@ class KafkaClient(object):
Payloads are grouped by topic and partition so they can be pipelined to the same
brokers.
"""
- key_fn = lambda x: (x.topic, x.partition)
-
- # Note the order of the incoming payloads
- original_keys = [key_fn(payload) for payload in payloads]
-
- # Group the produce requests by topic+partition
- payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn)
-
# Group the produce requests by which broker they go to
+ original_keys = []
payloads_by_broker = defaultdict(list)
- for (topic, partition), payloads in payloads_by_topic_and_partition.items():
- payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads
+ for payload in payloads:
+ payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)].append(payload)
+ original_keys.append((payload.topic, payload.partition))
- # Accumulate the responses in a dictionary, keyed by key_fn
+ # Accumulate the responses in a dictionary, keyed by topic+partition
acc = {}
# For each broker, send the list of request payloads
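
Both send_produce_request and send_fetch_request now share this routing pattern; a condensed sketch using a hypothetical helper name (_group_by_leader is not in the file):

    from collections import defaultdict

    def _group_by_leader(client, payloads):
        # one request list per partition leader, plus the caller's
        # (topic, partition) order for re-sorting responses afterwards
        original_keys, by_broker = [], defaultdict(list)
        for p in payloads:
            by_broker[client.get_leader_for_partition(p.topic, p.partition)].append(p)
            original_keys.append((p.topic, p.partition))
        return original_keys, by_broker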
@@ -703,9 +724,9 @@ class KafkaClient(object):
(TopicAndPartition(fetch_response.topic, fetch_response.partition), fetch_response.error))
# Run the callback
if callback is not None:
- acc[key_fn(fetch_response)] = callback(fetch_response)
+ acc[(fetch_response.topic, fetch_response.partition)] = callback(fetch_response)
else:
- acc[key_fn(fetch_response)] = fetch_response
+ acc[(fetch_response.topic, fetch_response.partition)] = fetch_response
# Order the accumulated responses by the original key order
return (acc[k] for k in original_keys)
@@ -720,11 +741,30 @@ class KafkaClient(object):
conn.send(requestId, request)
response = conn.recv(requestId)
return response
- except Exception:
- log.warning("Could not commit offset to server %s, trying next server", conn)
+ except Exception, e:
+ log.warning("Could not send request [%r] to server %s, trying next server: %s" % (request, conn, e))
continue
return None
+ def send_offset_request(self, payloads=[], fail_on_error=True, callback=None):
+ requestId = self.next_id()
+ request = KafkaProtocol.encode_offset_request(KafkaClient.CLIENT_ID, requestId, payloads)
+ response = self.try_send_request(requestId, request)
+ if response is None:
+ if fail_on_error is True:
+ raise Exception("All servers failed to process request")
+ else:
+ return None
+ out = []
+ for offset_response in KafkaProtocol.decode_offset_response(response):
+ if fail_on_error == True and offset_response.error != 0:
+ raise Exception("OffsetRequest failed with errorcode=%s", offset_response.error)
+ if callback is not None:
+ out.append(callback(offset_response))
+ else:
+ out.append(offset_response)
+ return out
+
def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None):
requestId = self.next_id()
request = KafkaProtocol.encode_offset_commit_request(KafkaClient.CLIENT_ID, requestId, group, payloads)
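
A hedged usage sketch of the new send_offset_request, assuming the OffsetRequest namedtuple defined alongside the others carries (topic, partition, time, max_offsets) to match the encoder's field access; in the 0.8 API, time=-1 asks for the latest offset and time=-2 for the earliest:

    client = KafkaClient("localhost", 9092)
    (resp,) = client.send_offset_request([OffsetRequest("foo8", 0, -1, 1)])
    print(resp.offsets)  # e.g. (4,)
    client.close()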
@@ -737,6 +777,7 @@ class KafkaClient(object):
out = []
for offset_commit_response in KafkaProtocol.decode_offset_commit_response(response):
if fail_on_error == True and offset_commit_response.error != 0:
+ print(offset_commit_response)
raise Exception("OffsetCommitRequest failed with errorcode=%s", offset_commit_response.error)
if callback is not None:
out.append(callback(offset_commit_response))
@@ -770,7 +811,7 @@ if __name__ == "__main__":
topic = "foo8"
# Bootstrap connection
- conn = KafkaClient("localhost", 9092)
+ conn = KafkaClient("localhost", 49720)
# Create some Messages
messages = (KafkaProtocol.create_gzip_message(["GZIPPed"]),
@@ -799,7 +840,6 @@ if __name__ == "__main__":
return 0
else:
return offset_response.offset
-
# Load offsets
(offset1, offset2) = conn.send_offset_fetch_request(
group="group1",