author    mrtheb <mrlabbe@gmail.com>  2014-03-17 15:43:00 -0400
committer mrtheb <mrlabbe@gmail.com>  2014-03-17 15:43:00 -0400
commit    19646b1f1f9fae38c3075a9a56b8e7a9d395ff8e (patch)
tree      421d3e2d628e8b564eecde6a4efcd4edac31d1ff /kafka
parent    828133cff064f4f8fba753183ac21619355ac005 (diff)
parent    32edabdaaff6746e4926cc897b4bba917a80cb54 (diff)
download  kafka-python-19646b1f1f9fae38c3075a9a56b8e7a9d395ff8e.tar.gz
Merge branch 'master' into develop
Conflicts:
	test/test_unit.py
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/NOTES.md    |  2
-rw-r--r--  kafka/__init__.py |  2
-rw-r--r--  kafka/client.py   | 29
-rw-r--r--  kafka/codec.py    | 98
-rw-r--r--  kafka/conn.py     | 28

5 files changed, 144 insertions(+), 15 deletions(-)
diff --git a/kafka/NOTES.md b/kafka/NOTES.md
index 540cdad..8fb0f47 100644
--- a/kafka/NOTES.md
+++ b/kafka/NOTES.md
@@ -18,7 +18,7 @@ There are a few levels of abstraction:
# Possible API
- client = KafkaClient("localhost", 9092)
+ client = KafkaClient("localhost:9092")
producer = KafkaProducer(client, "topic")
producer.send_string("hello")
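
This NOTES.md tweak tracks the API change in this merge: KafkaClient now takes a single hosts string instead of separate host and port arguments. A minimal sketch of the new call style (broker names are hypothetical):

    from kafka.client import KafkaClient

    # One or more "host:port" entries, comma-separated; a missing port
    # falls back to DEFAULT_KAFKA_PORT (9092, see kafka/conn.py below).
    client = KafkaClient("kafka1.example.com:9092,kafka2.example.com")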
diff --git a/kafka/__init__.py b/kafka/__init__.py
index 73aa760..e446f58 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -1,5 +1,5 @@
__title__ = 'kafka'
-__version__ = '0.2-alpha'
+__version__ = '0.9.0'
__author__ = 'David Arthur'
__license__ = 'Apache License 2.0'
__copyright__ = 'Copyright 2012, David Arthur under Apache License, v2.0'
diff --git a/kafka/client.py b/kafka/client.py
index c3606e4..ab0eb8d 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -11,7 +11,7 @@ from kafka.common import (ErrorMapping, TopicAndPartition,
LeaderUnavailableError,
KafkaUnavailableError)
-from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
+from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
log = logging.getLogger("kafka")
@@ -25,14 +25,15 @@ class KafkaClient(object):
# NOTE: The timeout given to the client should always be greater than the
# one passed to SimpleConsumer.get_message(), otherwise you can get a
# socket timeout.
- def __init__(self, host, port, client_id=CLIENT_ID,
+ def __init__(self, hosts, client_id=CLIENT_ID,
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
# We need one connection to bootstrap
self.client_id = client_id
self.timeout = timeout
- self.conns = { # (host, port) -> KafkaConnection
- (host, port): KafkaConnection(host, port, timeout=timeout)
- }
+ self.hosts = collect_hosts(hosts)
+
+ # create connections only when we need them
+ self.conns = {}
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
self.topic_partitions = {} # topic_id -> [0, 1, 2, ...]
@@ -42,6 +43,15 @@ class KafkaClient(object):
# Private API #
##################
+ def _get_conn(self, host, port):
+ "Get or create a connection to a broker using host and port"
+
+ host_key = (host, port)
+ if host_key not in self.conns:
+ self.conns[host_key] = KafkaConnection(host, port)
+
+ return self.conns[host_key]
+
def _get_conn_for_broker(self, broker):
"""
Get or create a connection to a broker
@@ -50,7 +60,7 @@ class KafkaClient(object):
self.conns[(broker.host, broker.port)] = \
KafkaConnection(broker.host, broker.port, timeout=self.timeout)
- return self.conns[(broker.host, broker.port)]
+ return self._get_conn(broker.host, broker.port)
def _get_leader_for_partition(self, topic, partition):
"""
@@ -83,14 +93,15 @@ class KafkaClient(object):
Attempt to send a broker-agnostic request to one of the available
brokers. Keep trying until you succeed.
"""
- for conn in self.conns.values():
+ for (host, port) in self.hosts:
try:
+ conn = self._get_conn(host, port)
conn.send(requestId, request)
response = conn.recv(requestId)
return response
except Exception, e:
- log.warning("Could not send request [%r] to server %s, "
- "trying next server: %s" % (request, conn, e))
+ log.warning("Could not send request [%r] to server %s:%i, "
+ "trying next server: %s" % (request, host, port, e))
continue
raise KafkaUnavailableError("All servers failed to process request")
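
The broker-unaware path now walks the bootstrap host list instead of the already-open connections, opening each connection lazily through _get_conn. Reduced to a standalone sketch (send_to_first_available and send_fn are placeholders for illustration, not library API):

    def send_to_first_available(hosts, send_fn):
        # hosts is a list of (host, port) tuples, e.g. from collect_hosts()
        for host, port in hosts:
            try:
                return send_fn(host, port)
            except Exception, e:
                log.warning("Could not reach %s:%i, trying next server: %s"
                            % (host, port, e))
        raise KafkaUnavailableError("All servers failed to process request")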
diff --git a/kafka/codec.py b/kafka/codec.py
index eb5d03c..206ddb4 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -1,5 +1,9 @@
from cStringIO import StringIO
import gzip
+import struct
+
+_XERIAL_V1_HEADER = (-126, 'S', 'N', 'A', 'P', 'P', 'Y', 0, 1, 1)
+_XERIAL_V1_FORMAT = 'bccccccBii'
try:
import snappy
@@ -36,13 +40,101 @@ def gzip_decode(payload):
return result
-def snappy_encode(payload):
+def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
+ """Encodes the given data with snappy if xerial_compatible is set then the
+ stream is encoded in a fashion compatible with the xerial snappy library
+
+ The block size (xerial_blocksize) controls how frequent the blocking occurs
+ 32k is the default in the xerial library.
+
+ The format winds up being
+ +-------------+------------+--------------+------------+--------------+
+ | Header | Block1 len | Block1 data | Blockn len | Blockn data |
+ |-------------+------------+--------------+------------+--------------|
+ | 16 bytes | BE int32 | snappy bytes | BE int32 | snappy bytes |
+ +-------------+------------+--------------+------------+--------------+
+
+ It is important to note that the blocksize is the amount of uncompressed
+ data presented to snappy at each block, whereas the blocklen is the
+ number of compressed bytes that will be present in the stream; that is,
+ the length will always be <= blocksize.
+ """
+
if not _has_snappy:
raise NotImplementedError("Snappy codec is not available")
- return snappy.compress(payload)
+
+ if xerial_compatible:
+ def _chunker():
+ for i in xrange(0, len(payload), xerial_blocksize):
+ yield payload[i:i+xerial_blocksize]
+
+ out = StringIO()
+
+ header = ''.join([struct.pack('!' + fmt, dat) for fmt, dat
+ in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
+
+ out.write(header)
+ for chunk in _chunker():
+ block = snappy.compress(chunk)
+ block_size = len(block)
+ out.write(struct.pack('!i', block_size))
+ out.write(block)
+
+ out.seek(0)
+ return out.read()
+
+ else:
+ return snappy.compress(payload)
+
+
+def _detect_xerial_stream(payload):
+ """Detects if the data given might have been encoded with the blocking mode
+ of the xerial snappy library.
+
+ This mode writes a magic header of the format:
+ +--------+--------------+------------+---------+--------+
+ | Marker | Magic String | Null / Pad | Version | Compat |
+ |--------+--------------+------------+---------+--------|
+ | byte | c-string | byte | int32 | int32 |
+ |--------+--------------+------------+---------+--------|
+ | -126 | 'SNAPPY' | \0 | | |
+ +--------+--------------+------------+---------+--------+
+
+ The pad appears to be there to ensure that SNAPPY is a valid cstring.
+ The version is the version of this format as written by xerial;
+ in the wild this is currently 1 and, as such, we only support v1.
+
+ Compat is there to claim the minimum supported version that
+ can read a xerial block stream; presently in the wild this is
+ 1.
+ """
+
+ if len(payload) > 16:
+ header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
+ return header == _XERIAL_V1_HEADER
+ return False
def snappy_decode(payload):
if not _has_snappy:
raise NotImplementedError("Snappy codec is not available")
- return snappy.decompress(payload)
+
+ if _detect_xerial_stream(payload):
+ # TODO ? Should become a fileobj ?
+ out = StringIO()
+ byt = buffer(payload[16:])
+ length = len(byt)
+ cursor = 0
+
+ while cursor < length:
+ block_size = struct.unpack_from('!i', byt[cursor:])[0]
+ # Skip the block size
+ cursor += 4
+ end = cursor + block_size
+ out.write(snappy.decompress(byt[cursor:end]))
+ cursor = end
+
+ out.seek(0)
+ return out.read()
+ else:
+ return snappy.decompress(payload)
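
Together, snappy_encode(..., xerial_compatible=True) and snappy_decode round-trip through the blocked format documented above. A quick sanity sketch (assumes the python-snappy package is installed; the payload is arbitrary):

    data = 'bytes worth compressing ' * 1024
    framed = snappy_encode(data, xerial_compatible=True)

    # 16-byte magic header: marker -126, 'SNAPPY', pad, version, compat
    assert framed.startswith('\x82SNAPPY\x00')
    assert snappy_decode(framed) == data                  # xerial stream detected
    assert snappy_decode(snappy.compress(data)) == data   # plain streams still work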
diff --git a/kafka/conn.py b/kafka/conn.py
index 2b8f1c2..7266ae8 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -2,6 +2,7 @@ import copy
import logging
import socket
import struct
+from random import shuffle
from threading import local
from kafka.common import ConnectionError
@@ -9,6 +10,31 @@ from kafka.common import ConnectionError
log = logging.getLogger("kafka")
DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
+DEFAULT_KAFKA_PORT = 9092
+
+
+def collect_hosts(hosts, randomize=True):
+ """
+ Collects a comma-separated set of hosts (host:port) and optionally
+ randomizes the returned list.
+ """
+
+ if isinstance(hosts, str):
+ hosts = hosts.strip().split(',')
+
+ result = []
+ for host_port in hosts:
+
+ res = host_port.split(':')
+ host = res[0]
+ port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT
+ result.append((host.strip(), port))
+
+ if randomize:
+ shuffle(result)
+
+ return result
+
class KafkaConnection(local):
"""
@@ -81,7 +107,7 @@ class KafkaConnection(local):
sent = self._sock.sendall(payload)
if sent is not None:
self._raise_connection_error()
- except socket.error, e:
+ except socket.error:
log.exception('Unable to send payload to Kafka')
self._raise_connection_error()
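
For reference, collect_hosts accepts either a comma-separated string or an iterable of "host:port" entries; whitespace is stripped and a missing port falls back to DEFAULT_KAFKA_PORT. With randomize=False the result is deterministic:

    >>> collect_hosts('kafka1:9093, kafka2', randomize=False)
    [('kafka1', 9093), ('kafka2', 9092)]
    >>> collect_hosts(['kafka1:9093', 'kafka2'], randomize=False)
    [('kafka1', 9093), ('kafka2', 9092)]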