author     mrtheb <mrlabbe@gmail.com>    2014-03-17 15:43:00 -0400
committer  mrtheb <mrlabbe@gmail.com>    2014-03-17 15:43:00 -0400
commit     19646b1f1f9fae38c3075a9a56b8e7a9d395ff8e
tree       421d3e2d628e8b564eecde6a4efcd4edac31d1ff /kafka
parent     828133cff064f4f8fba753183ac21619355ac005
parent     32edabdaaff6746e4926cc897b4bba917a80cb54
Merge branch 'master' into develop
Conflicts:
test/test_unit.py
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/NOTES.md     |  2
-rw-r--r--  kafka/__init__.py  |  2
-rw-r--r--  kafka/client.py    | 29
-rw-r--r--  kafka/codec.py     | 98
-rw-r--r--  kafka/conn.py      | 28
5 files changed, 144 insertions, 15 deletions
diff --git a/kafka/NOTES.md b/kafka/NOTES.md
index 540cdad..8fb0f47 100644
--- a/kafka/NOTES.md
+++ b/kafka/NOTES.md
@@ -18,7 +18,7 @@ There are a few levels of abstraction:
 
 # Possible API
 
-    client = KafkaClient("localhost", 9092)
+    client = KafkaClient("localhost:9092")
     producer = KafkaProducer(client, "topic")
     producer.send_string("hello")
 
diff --git a/kafka/__init__.py b/kafka/__init__.py
index 73aa760..e446f58 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -1,5 +1,5 @@
 __title__ = 'kafka'
-__version__ = '0.2-alpha'
+__version__ = '0.9.0'
 __author__ = 'David Arthur'
 __license__ = 'Apache License 2.0'
 __copyright__ = 'Copyright 2012, David Arthur under Apache License, v2.0'
diff --git a/kafka/client.py b/kafka/client.py
index c3606e4..ab0eb8d 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -11,7 +11,7 @@ from kafka.common import (ErrorMapping, TopicAndPartition,
                           LeaderUnavailableError,
                           KafkaUnavailableError)
 
-from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
+from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
 from kafka.protocol import KafkaProtocol
 
 log = logging.getLogger("kafka")
@@ -25,14 +25,15 @@ class KafkaClient(object):
     # NOTE: The timeout given to the client should always be greater than the
     # one passed to SimpleConsumer.get_message(), otherwise you can get a
     # socket timeout.
-    def __init__(self, host, port, client_id=CLIENT_ID,
+    def __init__(self, hosts, client_id=CLIENT_ID,
                  timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
         # We need one connection to bootstrap
         self.client_id = client_id
         self.timeout = timeout
-        self.conns = {               # (host, port) -> KafkaConnection
-            (host, port): KafkaConnection(host, port, timeout=timeout)
-        }
+        self.hosts = collect_hosts(hosts)
+
+        # create connections only when we need them
+        self.conns = {}
         self.brokers = {}            # broker_id -> BrokerMetadata
         self.topics_to_brokers = {}  # topic_id -> broker_id
         self.topic_partitions = {}   # topic_id -> [0, 1, 2, ...]
@@ -42,6 +43,15 @@ class KafkaClient(object):
     #   Private API   #
     ##################
 
+    def _get_conn(self, host, port):
+        "Get or create a connection to a broker using host and port"
+
+        host_key = (host, port)
+        if host_key not in self.conns:
+            self.conns[host_key] = KafkaConnection(host, port)
+
+        return self.conns[host_key]
+
     def _get_conn_for_broker(self, broker):
         """
         Get or create a connection to a broker
@@ -50,7 +60,7 @@ class KafkaClient(object):
             self.conns[(broker.host, broker.port)] = \
                 KafkaConnection(broker.host, broker.port, timeout=self.timeout)
 
-        return self.conns[(broker.host, broker.port)]
+        return self._get_conn(broker.host, broker.port)
 
     def _get_leader_for_partition(self, topic, partition):
         """
@@ -83,14 +93,15 @@ class KafkaClient(object):
         Attempt to send a broker-agnostic request to one of the
         available brokers. Keep trying until you succeed.
""" - for conn in self.conns.values(): + for (host, port) in self.hosts: try: + conn = self._get_conn(host, port) conn.send(requestId, request) response = conn.recv(requestId) return response except Exception, e: - log.warning("Could not send request [%r] to server %s, " - "trying next server: %s" % (request, conn, e)) + log.warning("Could not send request [%r] to server %s:%i, " + "trying next server: %s" % (request, host, port, e)) continue raise KafkaUnavailableError("All servers failed to process request") diff --git a/kafka/codec.py b/kafka/codec.py index eb5d03c..206ddb4 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -1,5 +1,9 @@ from cStringIO import StringIO import gzip +import struct + +_XERIAL_V1_HEADER = (-126, 'S', 'N', 'A', 'P', 'P', 'Y', 0, 1, 1) +_XERIAL_V1_FORMAT = 'bccccccBii' try: import snappy @@ -36,13 +40,101 @@ def gzip_decode(payload): return result -def snappy_encode(payload): +def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024): + """Encodes the given data with snappy if xerial_compatible is set then the + stream is encoded in a fashion compatible with the xerial snappy library + + The block size (xerial_blocksize) controls how frequent the blocking occurs + 32k is the default in the xerial library. + + The format winds up being + +-------------+------------+--------------+------------+--------------+ + | Header | Block1 len | Block1 data | Blockn len | Blockn data | + |-------------+------------+--------------+------------+--------------| + | 16 bytes | BE int32 | snappy bytes | BE int32 | snappy bytes | + +-------------+------------+--------------+------------+--------------+ + + It is important to not that the blocksize is the amount of uncompressed + data presented to snappy at each block, whereas the blocklen is the + number of bytes that will be present in the stream, that is the + length will always be <= blocksize. + """ + if not _has_snappy: raise NotImplementedError("Snappy codec is not available") - return snappy.compress(payload) + + if xerial_compatible: + def _chunker(): + for i in xrange(0, len(payload), xerial_blocksize): + yield payload[i:i+xerial_blocksize] + + out = StringIO() + + header = ''.join([struct.pack('!' + fmt, dat) for fmt, dat + in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)]) + + out.write(header) + for chunk in _chunker(): + block = snappy.compress(chunk) + block_size = len(block) + out.write(struct.pack('!i', block_size)) + out.write(block) + + out.seek(0) + return out.read() + + else: + return snappy.compress(payload) + + +def _detect_xerial_stream(payload): + """Detects if the data given might have been encoded with the blocking mode + of the xerial snappy library. + + This mode writes a magic header of the format: + +--------+--------------+------------+---------+--------+ + | Marker | Magic String | Null / Pad | Version | Compat | + |--------+--------------+------------+---------+--------| + | byte | c-string | byte | int32 | int32 | + |--------+--------------+------------+---------+--------| + | -126 | 'SNAPPY' | \0 | | | + +--------+--------------+------------+---------+--------+ + + The pad appears to be to ensure that SNAPPY is a valid cstring + The version is the version of this format as written by xerial, + in the wild this is currently 1 as such we only support v1. + + Compat is there to claim the miniumum supported version that + can read a xerial block stream, presently in the wild this is + 1. + """ + + if len(payload) > 16: + header = header = struct.unpack('!' 
+        return header == _XERIAL_V1_HEADER
+    return False
 
 
 def snappy_decode(payload):
     if not _has_snappy:
         raise NotImplementedError("Snappy codec is not available")
-    return snappy.decompress(payload)
+
+    if _detect_xerial_stream(payload):
+        # TODO ? Should become a fileobj ?
+        out = StringIO()
+        byt = buffer(payload[16:])
+        length = len(byt)
+        cursor = 0
+
+        while cursor < length:
+            block_size = struct.unpack_from('!i', byt[cursor:])[0]
+            # Skip the block size
+            cursor += 4
+            end = cursor + block_size
+            out.write(snappy.decompress(byt[cursor:end]))
+            cursor = end
+
+        out.seek(0)
+        return out.read()
+    else:
+        return snappy.decompress(payload)
diff --git a/kafka/conn.py b/kafka/conn.py
index 2b8f1c2..7266ae8 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -2,6 +2,7 @@ import copy
 import logging
 import socket
 import struct
+from random import shuffle
 from threading import local
 
 from kafka.common import ConnectionError
@@ -9,6 +10,31 @@ from kafka.common import ConnectionError
 log = logging.getLogger("kafka")
 
 DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
+DEFAULT_KAFKA_PORT = 9092
+
+
+def collect_hosts(hosts, randomize=True):
+    """
+    Collects a comma-separated set of hosts (host:port) and optionally
+    randomizes the returned list.
+    """
+
+    if isinstance(hosts, str):
+        hosts = hosts.strip().split(',')
+
+    result = []
+    for host_port in hosts:
+
+        res = host_port.split(':')
+        host = res[0]
+        port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT
+        result.append((host.strip(), port))
+
+    if randomize:
+        shuffle(result)
+
+    return result
+
 
 class KafkaConnection(local):
     """
@@ -81,7 +107,7 @@ class KafkaConnection(local):
             sent = self._sock.sendall(payload)
             if sent is not None:
                 self._raise_connection_error()
-        except socket.error, e:
+        except socket.error:
             log.exception('Unable to send payload to Kafka')
             self._raise_connection_error()
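
A quick sketch of the API change this merge lands: KafkaClient is now bootstrapped from a single hosts string (or list) instead of a (host, port) pair, with collect_hosts normalizing the input and connections created lazily per broker. The broker addresses below are placeholders, and the client call assumes a reachable cluster:

    from kafka.client import KafkaClient
    from kafka.conn import collect_hosts

    # collect_hosts fills in the default port (9092) and strips whitespace;
    # randomize=False keeps input order for the sake of the example.
    print(collect_hosts('kafka01:9092, kafka02', randomize=False))
    # -> [('kafka01', 9092), ('kafka02', 9092)]

    # Old API: KafkaClient("localhost", 9092)
    client = KafkaClient('kafka01:9092,kafka02:9092')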
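
The 16-byte xerial header written by snappy_encode can be checked with the stdlib alone. A minimal sketch, written for Python 3 (so the 'c' fields take bytes values rather than the str values the Python 2 code above uses):

    import struct

    _XERIAL_V1_FORMAT = 'bccccccBii'
    _XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)

    # Pack each field big-endian, exactly as snappy_encode does: a marker
    # byte, the 'SNAPPY' magic, a NUL pad, then version=1 and compat=1.
    header = b''.join(struct.pack('!' + fmt, dat)
                      for fmt, dat in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER))

    assert len(header) == 16
    assert header == b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'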
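
And a round-trip sketch of the block framing itself; it assumes python-snappy is installed and kafka.codec is importable at this revision, and uses a payload a bit larger than one 32 KiB block so that two length-prefixed blocks are emitted:

    from kafka.codec import snappy_encode, snappy_decode

    payload = b'x' * (40 * 1024)  # 40 KiB -> one 32 KiB block + one 8 KiB block
    framed = snappy_encode(payload, xerial_compatible=True)

    assert framed[:8] == b'\x82SNAPPY\x00'   # marker byte + magic cstring
    assert snappy_decode(framed) == payload  # framing detected and stripped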