 m---------  kafka-src                         |   0
 -rw-r--r--  kafka/client08.py                 | 180
 -rw-r--r--  kafka/util.py                     |  16
 -rw-r--r--  test/integration.py               | 407
 -rw-r--r--  test/resources/log4j.properties   |   2
 -rw-r--r--  test/resources/server.properties  |  63
6 files changed, 396 insertions, 272 deletions
diff --git a/kafka-src b/kafka-src
-Subproject df3248b758340990010b6a83ebfced60b1339c4
+Subproject e7edb5e1e933f5535378d546bcf4d8b178d2e69
diff --git a/kafka/client08.py b/kafka/client08.py
index b048d68..49d786f 100644
--- a/kafka/client08.py
+++ b/kafka/client08.py
@@ -14,7 +14,7 @@ from .codec import snappy_encode, snappy_decode
 from .util import read_short_string, read_int_string
 from .util import relative_unpack
 from .util import write_short_string, write_int_string
-from .util import group_list_by_key
+from .util import group_by_topic_and_partition
 from .util import BufferUnderflowError, ChecksumError
 
 log = logging.getLogger("kafka")
@@ -33,7 +33,7 @@ OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"])
 # Response payloads
 ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"])
 FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error", "highwaterMark", "messages"])
-OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offset"])
+OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offsets"])
 OffsetCommitResponse = namedtuple("OffsetCommitResponse", ["topic", "partition", "error"])
 OffsetFetchResponse = namedtuple("OffsetFetchResponse", ["topic", "partition", "offset", "metadata", "error"])
 BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
@@ -74,6 +74,9 @@ class KafkaProtocol(object):
     OFFSET_FETCH_KEY = 7
 
     ATTRIBUTE_CODEC_MASK = 0x03
+    CODEC_NONE = 0x00
+    CODEC_GZIP = 0x01
+    CODEC_SNAPPY = 0x02
 
     ###################
     # Private API #
     ###################
@@ -171,13 +174,13 @@ class KafkaProtocol(object):
             (key, cur) = read_int_string(data, cur)
             (value, cur) = read_int_string(data, cur)
-            if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 0:
+            if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_NONE:
                 yield (offset, Message(magic, att, key, value))
-            elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 1:
+            elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_GZIP:
                 gz = gzip_decode(value)
                 for (offset, message) in KafkaProtocol._decode_message_set_iter(gz):
                     yield (offset, message)
-            elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 2:
+            elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_SNAPPY:
                 snp = snappy_decode(value)
                 for (offset, message) in KafkaProtocol._decode_message_set_iter(snp):
                     yield (offset, message)
@@ -214,8 +217,25 @@ class KafkaProtocol(object):
         message_set = KafkaProtocol._encode_message_set(
             [KafkaProtocol.create_message(payload) for payload in payloads])
         gzipped = gzip_encode(message_set)
-        return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & 0x01), key, gzipped)
+        return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP), key, gzipped)
+
+    @classmethod
+    def create_snappy_message(cls, payloads, key=None):
+        """
+        Construct a Snappy Message containing multiple Messages
+
+        The given payloads will be encoded, compressed, and sent as a single atomic
+        message to Kafka.
+
+        Params
+        ======
+        payloads: list(bytes), a list of payload to send be sent to Kafka
+        key: bytes, a key used for partition routing (optional)
+        """
+        message_set = KafkaProtocol._encode_message_set(
+            [KafkaProtocol.create_message(payload) for payload in payloads])
+        snapped = snappy_encode(message_set)
+        return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY), key, snapped)
 
     @classmethod
     def encode_produce_request(cls, client_id, correlation_id, payloads=[], acks=1, timeout=1000):
@@ -234,14 +254,14 @@
             -1: waits for all replicas to be in sync
         timeout: Maximum time the server will wait for acks from replicas. This is _not_ a socket timeout
         """
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.PRODUCE_KEY)
-        message += struct.pack('>hii', acks, timeout, len(payloads_by_topic))
-        for topic, payloads in payloads_by_topic.items():
-            message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(payloads))
-            for payload in payloads:
+        message += struct.pack('>hii', acks, timeout, len(grouped_payloads))
+        for topic, topic_payloads in grouped_payloads.items():
+            message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(topic_payloads))
+            for partition, payload in topic_payloads.items():
                 message_set = KafkaProtocol._encode_message_set(payload.messages)
-                message += struct.pack('>ii%ds' % len(message_set), payload.partition, len(message_set), message_set)
+                message += struct.pack('>ii%ds' % len(message_set), partition, len(message_set), message_set)
         return struct.pack('>i%ds' % len(message), len(message), message)
 
     @classmethod
@@ -276,15 +296,15 @@ class KafkaProtocol(object):
         max_wait_time: int, how long to block waiting on min_bytes of data
         min_bytes: int, the minimum number of bytes to accumulate before returning the response
         """
-
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.FETCH_KEY)
-        message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(payloads_by_topic)) # -1 is the replica id
-        for topic, payloads in payloads_by_topic.items():
+        message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(grouped_payloads)) # -1 is the replica id
+        for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
-            message += struct.pack('>i', len(payloads))
-            for payload in payloads:
-                message += struct.pack('>iqi', payload.partition, payload.offset, payload.max_bytes)
+            message += struct.pack('>i', len(topic_payloads))
+            for partition, payload in topic_payloads.items():
+                message += struct.pack('>iqi', partition, payload.offset, payload.max_bytes)
         return struct.pack('>i%ds' % len(message), len(message), message)
 
     @classmethod
@@ -308,14 +328,14 @@ class KafkaProtocol(object):
 
     @classmethod
     def encode_offset_request(cls, client_id, correlation_id, payloads=[]):
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        grouped_payloads = group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_KEY)
-        message += struct.pack('>ii', -1, len(payloads_by_topic)) # -1 is the replica id
-        for topic, payloads in payloads_by_topic.items():
+        message += struct.pack('>ii', -1, len(grouped_payloads)) # -1 is the replica id
+        for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
-            message += struct.pack('>i', len(payloads))
-            for payload in payloads:
-                message += struct.pack('>iqi', payload.partition, payload.time, payload.max_offsets)
+            message += struct.pack('>i', len(topic_payloads))
+            for partition, payload in topic_payloads.items():
+                message += struct.pack('>iqi', partition, payload.time, payload.max_offsets)
         return struct.pack('>i%ds' % len(message), len(message), message)
 
     @classmethod
@@ -332,8 +352,12 @@ class KafkaProtocol(object):
         (topic, cur) = read_short_string(data, cur)
         ((num_partitions,), cur) = relative_unpack('>i', data, cur)
         for i in range(num_partitions):
-            ((partition, error, offset), cur) = relative_unpack('>ihq', data, cur)
-            yield OffsetResponse(topic, partition, error, offset)
+            ((partition, error, num_offsets,), cur) = relative_unpack('>ihi', data, cur)
+            offsets = []
+            for j in range(num_offsets):
+                ((offset,), cur) = relative_unpack('>q', data, cur)
+                offsets.append(offset)
+            yield OffsetResponse(topic, partition, error, tuple(offsets))
 
     @classmethod
     def encode_metadata_request(cls, client_id, correlation_id, topics=[]):
@@ -400,15 +424,15 @@ class KafkaProtocol(object):
         group: string, the consumer group you are committing offsets for
         payloads: list of OffsetCommitRequest
         """
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        grouped_payloads= group_by_topic_and_partition(payloads)
         message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_COMMIT_KEY)
         message += write_short_string(group)
-        message += struct.pack('>i', len(payloads_by_topic))
-        for topic, payloads in payloads_by_topic.items():
+        message += struct.pack('>i', len(grouped_payloads))
+        for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
-            message += struct.pack('>i', len(payloads))
-            for payload in payloads:
-                message += struct.pack('>iq', payload.partition, payload.offset)
+            message += struct.pack('>i', len(topic_payloads))
+            for partition, payload in topic_payloads.items():
+                message += struct.pack('>iq', partition, payload.offset)
                 message += write_short_string(payload.metadata)
         return struct.pack('>i%ds' % len(message), len(message), message)
 
@@ -421,6 +445,7 @@ class KafkaProtocol(object):
         ======
         data: bytes to decode
         """
+        data = data[2:] # TODO remove me when versionId is removed
         ((correlation_id,), cur) = relative_unpack('>i', data, 0)
         (client_id, cur) = read_short_string(data, cur)
         ((num_topics,), cur) = relative_unpack('>i', data, cur)
@@ -443,15 +468,15 @@ class KafkaProtocol(object):
         group: string, the consumer group you are fetching offsets for
         payloads: list of OffsetFetchRequest
         """
-        payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic"))
+        grouped_payloads = group_by_topic_and_partition(payloads)
        message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_FETCH_KEY)
         message += write_short_string(group)
-        message += struct.pack('>i', len(payloads_by_topic))
-        for topic, payloads in payloads_by_topic.items():
+        message += struct.pack('>i', len(grouped_payloads))
+        for topic, topic_payloads in grouped_payloads.items():
             message += write_short_string(topic)
-            message += struct.pack('>i', len(payloads))
-            for payload in payloads:
-                message += struct.pack('>i', payload.partition)
+            message += struct.pack('>i', len(topic_payloads))
+            for partition, payload in topic_payloads.items():
+                message += struct.pack('>i', partition)
         return struct.pack('>i%ds' % len(message), len(message), message)
 
     @classmethod
@@ -493,6 +518,9 @@ class KafkaConnection(object):
         self._sock.connect((host, port))
         self._sock.settimeout(10)
 
+    def __str__(self):
+        return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
+
     ###################
     # Private API #
     ###################
@@ -536,6 +564,8 @@ class KafkaConnection(object):
     # Public API #
     ##################
 
+    # TODO multiplex socket communication to allow for multi-threaded clients
+
     def send(self, requestId, payload):
         "Send a request to Kafka"
         sent = self._sock.sendall(payload)
@@ -566,6 +596,10 @@ class KafkaClient(object):
         self.topics_to_brokers = {} # topic_id -> broker_id
         self.load_metadata_for_topics()
 
+    def close(self):
+        for conn in self.conns.values():
+            conn.close()
+
     def get_conn_for_broker(self, broker):
         "Get or create a connection to a broker"
         if (broker.host, broker.port) not in self.conns:
@@ -626,20 +660,14 @@ class KafkaClient(object):
         ======
         list of ProduceResponse or callback(ProduceResponse), in the order of input payloads
         """
-        key_fn = lambda x: (x.topic, x.partition)
-
-        # Note the order of the incoming payloads
-        original_keys = [key_fn(payload) for payload in payloads]
-
-        # Group the produce requests by topic+partition
-        payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn)
-
         # Group the produce requests by which broker they go to
+        original_keys = []
         payloads_by_broker = defaultdict(list)
-        for (topic, partition), payloads in payloads_by_topic_and_partition.items():
-            payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads
+        for payload in payloads:
+            payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)] += payloads
+            original_keys.append((payload.topic, payload.partition))
 
-        # Accumulate the responses in a dictionary, keyed by key_fn
+        # Accumulate the responses in a dictionary
         acc = {}
 
         # For each broker, send the list of request payloads
@@ -657,11 +685,10 @@ class KafkaClient(object):
                     (TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error))
                 # Run the callback
                 if callback is not None:
-                    acc[key_fn(produce_response)] = callback(produce_response)
+                    acc[(produce_response.topic, produce_response.partition)] = callback(produce_response)
                 else:
-                    acc[key_fn(produce_response)] = produce_response
+                    acc[(produce_response.topic, produce_response.partition)] = produce_response
 
-        print(acc)
         # Order the accumulated responses by the original key order
         return (acc[k] for k in original_keys)
 
@@ -672,20 +699,14 @@ class KafkaClient(object):
         Payloads are grouped by topic and partition so they can be pipelined to the same
         brokers.
         """
-        key_fn = lambda x: (x.topic, x.partition)
-
-        # Note the order of the incoming payloads
-        original_keys = [key_fn(payload) for payload in payloads]
-
-        # Group the produce requests by topic+partition
-        payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn)
-
         # Group the produce requests by which broker they go to
+        original_keys = []
         payloads_by_broker = defaultdict(list)
-        for (topic, partition), payloads in payloads_by_topic_and_partition.items():
-            payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads
+        for payload in payloads:
+            payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)].append(payload)
+            original_keys.append((payload.topic, payload.partition))
 
-        # Accumulate the responses in a dictionary, keyed by key_fn
+        # Accumulate the responses in a dictionary, keyed by topic+partition
         acc = {}
 
         # For each broker, send the list of request payloads
@@ -703,9 +724,9 @@ class KafkaClient(object):
                     (TopicAndPartition(fetch_response.topic, fetch_response.partition), fetch_response.error))
                 # Run the callback
                 if callback is not None:
-                    acc[key_fn(fetch_response)] = callback(fetch_response)
+                    acc[(fetch_response.topic, fetch_response.partition)] = callback(fetch_response)
                 else:
-                    acc[key_fn(fetch_response)] = fetch_response
+                    acc[(fetch_response.topic, fetch_response.partition)] = fetch_response
 
         # Order the accumulated responses by the original key order
         return (acc[k] for k in original_keys)
@@ -720,11 +741,30 @@ class KafkaClient(object):
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
                 return response
-            except Exception:
-                log.warning("Could not commit offset to server %s, trying next server", conn)
+            except Exception, e:
+                log.warning("Could not send request [%r] to server %s, trying next server: %s" % (request, conn, e))
                 continue
         return None
 
+    def send_offset_request(self, payloads=[], fail_on_error=True, callback=None):
+        requestId = self.next_id()
+        request = KafkaProtocol.encode_offset_request(KafkaClient.CLIENT_ID, requestId, payloads)
+        response = self.try_send_request(requestId, request)
+        if response is None:
+            if fail_on_error is True:
+                raise Exception("All servers failed to process request")
+            else:
+                return None
+        out = []
+        for offset_response in KafkaProtocol.decode_offset_response(response):
+            if fail_on_error == True and offset_response.error != 0:
+                raise Exception("OffsetRequest failed with errorcode=%s", offset_response.error)
+            if callback is not None:
+                out.append(callback(offset_response))
+            else:
+                out.append(offset_response)
+        return out
+
     def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None):
         requestId = self.next_id()
         request = KafkaProtocol.encode_offset_commit_request(KafkaClient.CLIENT_ID, requestId, group, payloads)
@@ -737,6 +777,7 @@ class KafkaClient(object):
         out = []
         for offset_commit_response in KafkaProtocol.decode_offset_commit_response(response):
             if fail_on_error == True and offset_commit_response.error != 0:
+                print(offset_commit_response)
                 raise Exception("OffsetCommitRequest failed with errorcode=%s", offset_commit_response.error)
             if callback is not None:
                 out.append(callback(offset_commit_response))
@@ -770,7 +811,7 @@ if __name__ == "__main__":
     topic = "foo8"
 
     # Bootstrap connection
-    conn = KafkaClient("localhost", 9092)
+    conn = KafkaClient("localhost", 49720)
 
     # Create some Messages
     messages = (KafkaProtocol.create_gzip_message(["GZIPPed"]),
@@ -799,7 +840,6 @@ if __name__ == "__main__":
             return 0
         else:
             return offset_response.offset
-
     # Load offsets
     (offset1, offset2) = conn.send_offset_fetch_request(
         group="group1",
diff --git a/kafka/util.py b/kafka/util.py
index cb8f7f5..509c5b8 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -1,15 +1,16 @@
+from collections import defaultdict
 from itertools import groupby
 import struct
 
 def write_int_string(s):
     if s is None:
-        return struct.pack('>i', -1)
+        return struct.pack('>i', 0) # TODO change this to -1 when KAFKA-771 is accepted
     else:
         return struct.pack('>i%ds' % len(s), len(s), s)
 
 def write_short_string(s):
     if s is None:
-        return struct.pack('>h', -1)
+        return struct.pack('>h', 0) # TODO change this to -1 when KAFKA-771 is accepted
     else:
         return struct.pack('>h%ds' % len(s), len(s), s)
 
@@ -44,12 +45,11 @@ def relative_unpack(fmt, data, cur):
     out = struct.unpack(fmt, data[cur:cur+size])
     return (out, cur+size)
 
-def group_list_by_key(it, key):
-    sorted_it = sorted(it, key=key)
-    out = {}
-    for k, group in groupby(sorted_it, key=key):
-        out[k] = list(group)
-    return out
+def group_by_topic_and_partition(tuples):
+    out = defaultdict(dict)
+    for t in tuples:
+        out[t.topic][t.partition] = t
+    return out
 
 class BufferUnderflowError(Exception):
     pass
diff --git a/test/integration.py b/test/integration.py
index 3971d3f..598b17a 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -11,8 +11,7 @@ from threading import Thread, Event
 import time
 import unittest
 
-from kafka.client import KafkaClient, ProduceRequest, FetchRequest, OffsetRequest
-from kafka.queue import KafkaQueue
+from kafka.client08 import *
 
 def get_open_port():
     sock = socket.socket()
@@ -27,12 +26,15 @@ def build_kafka_classpath():
     jars += glob.glob(os.path.join(baseDir, "project/boot/scala-2.8.0/lib/*.jar"))
     jars += glob.glob(os.path.join(baseDir, "core/target/scala_2.8.0/*.jar"))
     jars += glob.glob(os.path.join(baseDir, "core/lib/*.jar"))
-    jars += glob.glob(os.path.join(baseDir, "perf/target/scala_2.8.0/kafka*.jar"))
     jars += glob.glob(os.path.join(baseDir, "core/lib_managed/scala_2.8.0/compile/*.jar"))
-    return ":".join(["."] + [os.path.abspath(jar) for jar in jars])
+    jars += glob.glob(os.path.join(baseDir, "core/target/scala-2.8.0/kafka_2.8.0-*.jar"))
+    jars += glob.glob(os.path.join(baseDir, "/Users/mumrah/.ivy2/cache/org.slf4j/slf4j-api/jars/slf4j-api-1.6.4.jar"))
+    cp = ":".join(["."] + [os.path.abspath(jar) for jar in jars])
+    cp += ":" + os.path.abspath(os.path.join(baseDir, "conf/log4j.properties"))
+    return cp
 
 class KafkaFixture(Thread):
-    def __init__(self, port):
+    def __init__(self, host, port):
         Thread.__init__(self)
         self.port = port
         self.capture = ""
@@ -57,7 +59,7 @@ class KafkaFixture(Thread):
 
         # Start Kafka
         args = shlex.split("java -Xmx256M -server -Dlog4j.configuration=%s -cp %s kafka.Kafka %s" % (logConfig, build_kafka_classpath(), configFile))
-        proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env={"JMX_PORT":"%d" % get_open_port()})
+        proc = subprocess.Popen(args, bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env={"JMX_PORT":"%d" % get_open_port()})
 
         killed = False
         while True:
@@ -65,6 +67,7 @@ class KafkaFixture(Thread):
             if proc.stdout in rlist:
                 read = proc.stdout.readline()
                 stdout.write(read)
+                stdout.flush()
                 self.capture += read
 
             if self.shouldDie.is_set():
@@ -88,174 +91,254 @@ class KafkaFixture(Thread):
                 return True
             time.sleep(0.100)
 
+    def close(self):
+        self.shouldDie.set()
 
-class IntegrationTest(unittest.TestCase):
+class ExternalKafkaFixture(object):
+    def __init__(self, host, port):
+        print("Using already running Kafka at %s:%d" % (host, port))
+
+    def close(self):
+        pass
+
+
+class TestKafkaClient(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
-        port = get_open_port()
-        cls.server = KafkaFixture(port)
-        cls.server.start()
-        cls.server.wait_for("Kafka server started")
-        cls.kafka = KafkaClient("localhost", port)
+        if os.environ.has_key('KAFKA_URI'):
+            parse = urlparse(os.environ['KAFKA_URI'])
+            (host, port) = (parse.hostname, parse.port)
+            cls.server = ExternalKafkaFixture(host, port)
+            cls.client = KafkaClient(host, port)
+        else:
+            port = get_open_port()
+            cls.server = KafkaFixture("localhost", port)
+            cls.server.start()
+            cls.server.wait_for("Kafka server started")
+            cls.client = KafkaClient("localhost", port)
 
     @classmethod
     def tearDownClass(cls):
-        cls.kafka.close()
-        cls.server.shouldDie.set()
-
-    def test_send_simple(self):
-        self.kafka.send_messages_simple("test-send-simple", "test 1", "test 2", "test 3")
-        self.assertTrue(self.server.wait_for("Created log for 'test-send-simple'"))
-        self.assertTrue(self.server.wait_for("Flushing log 'test-send-simple"))
-
-    def test_produce(self):
-        # Produce a message, check that the log got created
-        req = ProduceRequest("test-produce", 0, [KafkaClient.create_message("testing")])
-        self.kafka.send_message_set(req)
-        self.assertTrue(self.server.wait_for("Created log for 'test-produce'-0"))
-
-        # Same thing, different partition
-        req = ProduceRequest("test-produce", 1, [KafkaClient.create_message("testing")])
-        self.kafka.send_message_set(req)
-        self.assertTrue(self.server.wait_for("Created log for 'test-produce'-1"))
-
-    def _test_produce_consume(self, topic, create_func):
-        # Send two messages and consume them
-        message1 = create_func("testing 1")
-        message2 = create_func("testing 2")
-        req = ProduceRequest(topic, 0, [message1, message2])
-        self.kafka.send_message_set(req)
-        self.assertTrue(self.server.wait_for("Created log for '%s'-0" % topic))
-        self.assertTrue(self.server.wait_for("Flushing log '%s-0'" % topic))
-        req = FetchRequest(topic, 0, 0, 1024)
-        (messages, req) = self.kafka.get_message_set(req)
-        self.assertEquals(len(messages), 2)
-        self.assertEquals(messages[0].payload, "testing 1")
-        self.assertEquals(messages[1].payload, "testing 2")
-
-        # Do the same, but for a different partition
-        message3 = create_func("testing 3")
-        message4 = create_func("testing 4")
-        req = ProduceRequest(topic, 1, [message3, message4])
-        self.kafka.send_message_set(req)
-        self.assertTrue(self.server.wait_for("Created log for '%s'-1" % topic))
-        self.assertTrue(self.server.wait_for("Flushing log '%s-1'" % topic))
-        req = FetchRequest(topic, 1, 0, 1024)
-        (messages, req) = self.kafka.get_message_set(req)
-        self.assertEquals(len(messages), 2)
-        self.assertEquals(messages[0].payload, "testing 3")
-        self.assertEquals(messages[1].payload, "testing 4")
+        cls.client.close()
+        cls.server.close()
+
+    #####################
+    # Produce Tests #
+    #####################
+
+    def test_produce_many_simple(self):
+        produce = ProduceRequest("test_produce_many_simple", 0, messages=[
+            KafkaProtocol.create_message("Test message %d" % i) for i in range(100)
+        ])
+
+        for resp in self.client.send_produce_request([produce]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)])
+        self.assertEquals(offset.offsets[0], 100)
+
+        for resp in self.client.send_produce_request([produce]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 100)
+
+        (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)])
+        self.assertEquals(offset.offsets[0], 200)
+
+        for resp in self.client.send_produce_request([produce]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 200)
+
+        (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)])
+        self.assertEquals(offset.offsets[0], 300)
+
+    def test_produce_10k_simple(self):
+        produce = ProduceRequest("test_produce_10k_simple", 0, messages=[
+            KafkaProtocol.create_message("Test message %d" % i) for i in range(10000)
+        ])
+
+        for resp in self.client.send_produce_request([produce]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_10k_simple", 0, -1, 1)])
+        self.assertEquals(offset.offsets[0], 10000)
+
+    def test_produce_many_gzip(self):
+        message1 = KafkaProtocol.create_gzip_message(["Gzipped 1 %d" % i for i in range(100)])
+        message2 = KafkaProtocol.create_gzip_message(["Gzipped 2 %d" % i for i in range(100)])
+
+        produce = ProduceRequest("test_produce_many_gzip", 0, messages=[message1, message2])
+
+        for resp in self.client.send_produce_request([produce]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_gzip", 0, -1, 1)])
+        self.assertEquals(offset.offsets[0], 200)
+
+    def test_produce_many_snappy(self):
+        message1 = KafkaProtocol.create_snappy_message(["Snappy 1 %d" % i for i in range(100)])
+        message2 = KafkaProtocol.create_snappy_message(["Snappy 2 %d" % i for i in range(100)])
+
+        produce = ProduceRequest("test_produce_many_snappy", 0, messages=[message1, message2])
+
+        for resp in self.client.send_produce_request([produce]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_snappy", 0, -1, 1)])
+        self.assertEquals(offset.offsets[0], 200)
+
+    def test_produce_mixed(self):
+        message1 = KafkaProtocol.create_message("Just a plain message")
+        message2 = KafkaProtocol.create_gzip_message(["Gzipped %d" % i for i in range(100)])
+        message3 = KafkaProtocol.create_snappy_message(["Snappy %d" % i for i in range(100)])
+
+        produce = ProduceRequest("test_produce_mixed", 0, messages=[message1, message2, message3])
+
+        for resp in self.client.send_produce_request([produce]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_mixed", 0, -1, 1)])
+        self.assertEquals(offset.offsets[0], 201)
+
+
+    def test_produce_100k_gzipped(self):
+        produce = ProduceRequest("test_produce_100k_gzipped", 0, messages=[
+            KafkaProtocol.create_gzip_message(["Gzipped %d" % i for i in range(100000)])
+        ])
+
+        for resp in self.client.send_produce_request([produce]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_100k_gzipped", 0, -1, 1)])
+        self.assertEquals(offset.offsets[0], 100000)
+
+    #####################
+    # Consume Tests #
+    #####################
+
+    def test_consume_none(self):
+        fetch = FetchRequest("test_consume_none", 0, 0, 1024)
+
+        fetch_resp = self.client.send_fetch_request([fetch]).next()
+        self.assertEquals(fetch_resp.error, 0)
+        self.assertEquals(fetch_resp.topic, "test_consume_none")
+        self.assertEquals(fetch_resp.partition, 0)
+
+        messages = list(fetch_resp.messages)
+        self.assertEquals(len(messages), 0)
 
     def test_produce_consume(self):
-        self._test_produce_consume("test-produce-consume", KafkaClient.create_message)
-
-    def test_produce_consume_snappy(self):
-        self._test_produce_consume("test-produce-consume-snappy", KafkaClient.create_snappy_message)
-
-    def test_produce_consume_gzip(self):
-        self._test_produce_consume("test-produce-consume-gzip", KafkaClient.create_gzip_message)
-
-    def test_check_offset(self):
-        # Produce/consume a message, check that the next offset looks correct
-        message1 = KafkaClient.create_message("testing 1")
-        req = ProduceRequest("test-check-offset", 0, [message1])
-        self.kafka.send_message_set(req)
-        self.assertTrue(self.server.wait_for("Created log for 'test-check-offset'-0"))
-        self.assertTrue(self.server.wait_for("Flushing log 'test-check-offset-0'"))
-        req = FetchRequest("test-check-offset", 0, 0, 1024)
-        (messages, nextReq) = self.kafka.get_message_set(req)
-        self.assertEquals(len(messages), 1)
-        self.assertEquals(messages[0], message1)
-        self.assertEquals(nextReq.offset, len(KafkaClient.encode_message(message1)))
-
-        # Produce another message, consume with the last offset
-        message2 = KafkaClient.create_message("test 2")
-        req = ProduceRequest("test-check-offset", 0, [message2])
-        self.kafka.send_message_set(req)
-        self.assertTrue(self.server.wait_for("Flushing log 'test-check-offset-0'"))
-
-        # Verify
-        (messages, nextReq) = self.kafka.get_message_set(nextReq)
-        self.assertEquals(len(messages), 1)
-        self.assertEquals(messages[0], message2)
-        self.assertEquals(nextReq.offset, len(KafkaClient.encode_message(message1)) + len(KafkaClient.encode_message(message2)))
-
-    def test_iterator(self):
-        # Produce 100 messages
-        messages = []
-        for i in range(100):
-            messages.append(KafkaClient.create_message("testing %d" % i))
-        req = ProduceRequest("test-iterator", 0, messages)
-        self.kafka.send_message_set(req)
-        self.assertTrue(self.server.wait_for("Created log for 'test-iterator'-0"))
-        self.assertTrue(self.server.wait_for("Flushing log 'test-iterator-0'"))
-
-        # Initialize an iterator of fetch size 64 bytes - big enough for one message
-        # but not enough for all 100 messages
-        cnt = 0
-        for i, msg in enumerate(self.kafka.iter_messages("test-iterator", 0, 0, 64)):
-            self.assertEquals(messages[i], msg)
-            cnt += 1
-        self.assertEquals(cnt, 100)
-
-        # Same thing, but don't auto paginate
-        cnt = 0
-        for i, msg in enumerate(self.kafka.iter_messages("test-iterator", 0, 0, 64, False)):
-            self.assertEquals(messages[i], msg)
-            cnt += 1
-        self.assertTrue(cnt < 100)
-
-    def test_offset_request(self):
-        # Produce a message to create the topic/partition
-        message1 = KafkaClient.create_message("testing 1")
-        req = ProduceRequest("test-offset-request", 0, [message1])
-        self.kafka.send_message_set(req)
-        self.assertTrue(self.server.wait_for("Created log for 'test-offset-request'-0"))
-        self.assertTrue(self.server.wait_for("Flushing log 'test-offset-request-0'"))
-
-        t1 = int(time.time()*1000) # now
-        t2 = t1 + 60000 # one minute from now
-        req = OffsetRequest("test-offset-request", 0, t1, 1024)
-        self.kafka.get_offsets(req)
-
-        req = OffsetRequest("test-offset-request", 0, t2, 1024)
-        self.kafka.get_offsets(req)
-
-    def test_10k_messages(self):
-        msg_tmpl = "this is a test message with a few bytes in it. this is message number %d"
-        # TODO 10k actually fails, why?
-        msg = KafkaClient.create_gzip_message(*[msg_tmpl % i for i in range(1000)])
-        req = ProduceRequest("test-10k", 0, [msg])
-        self.kafka.send_message_set(req)
-        self.assertTrue(self.server.wait_for("Created log for 'test-10k'-0"))
-        self.assertTrue(self.server.wait_for("Flushing log 'test-10k-0'"))
-        #self.assertTrue(self.server.wait_for("Created log for 'test-10k'-1"))
-        #self.assertTrue(self.server.wait_for("Flushing log 'test-10k-1'"))
-
-    def test_queue(self):
-        # Send 1000 messages
-        q = KafkaQueue(self.kafka, "test-queue", [0,1])
-        t1 = time.time()
-        for i in range(1000):
-            q.put("test %d" % i)
-        t2 = time.time()
+        produce = ProduceRequest("test_produce_consume", 0, messages=[
+            KafkaProtocol.create_message("Just a test message"),
+            KafkaProtocol.create_message("Message with a key", "foo"),
+        ])
 
-        # Wait for the producer to fully flush
-        time.sleep(2)
+        for resp in self.client.send_produce_request([produce]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
 
-        # Copy all the messages into a list
-        t1 = time.time()
-        consumed = []
-        for i in range(1000):
-            consumed.append(q.get())
-        t2 = time.time()
+        fetch = FetchRequest("test_produce_consume", 0, 0, 1024)
 
-        # Verify everything is there
-        for i in range(1000):
-            self.assertTrue("test %d" % i in consumed)
+        fetch_resp = self.client.send_fetch_request([fetch]).next()
+        self.assertEquals(fetch_resp.error, 0)
+
+        messages = list(fetch_resp.messages)
+        self.assertEquals(len(messages), 2)
+        self.assertEquals(messages[0].offset, 0)
+        self.assertEquals(messages[0].message.value, "Just a test message")
+        self.assertEquals(messages[0].message.key, None)
+        self.assertEquals(messages[1].offset, 1)
+        self.assertEquals(messages[1].message.value, "Message with a key")
+        self.assertEquals(messages[1].message.key, "foo")
+
+    def test_produce_consume_many(self):
+        produce = ProduceRequest("test_produce_consume_many", 0, messages=[
+            KafkaProtocol.create_message("Test message %d" % i) for i in range(100)
+        ])
+
+        for resp in self.client.send_produce_request([produce]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        # 1024 is not enough for 100 messages...
+        fetch1 = FetchRequest("test_produce_consume_many", 0, 0, 1024)
+
+        (fetch_resp1,) = self.client.send_fetch_request([fetch1])
+
+        self.assertEquals(fetch_resp1.error, 0)
+        self.assertEquals(fetch_resp1.highwaterMark, 100)
+        messages = list(fetch_resp1.messages)
+        self.assertTrue(len(messages) < 100)
+
+        # 10240 should be enough
+        fetch2 = FetchRequest("test_produce_consume_many", 0, 0, 10240)
+        (fetch_resp2,) = self.client.send_fetch_request([fetch2])
+
+        self.assertEquals(fetch_resp2.error, 0)
+        self.assertEquals(fetch_resp2.highwaterMark, 100)
+        messages = list(fetch_resp2.messages)
+        self.assertEquals(len(messages), 100)
+        for i, message in enumerate(messages):
+            self.assertEquals(message.offset, i)
+            self.assertEquals(message.message.value, "Test message %d" % i)
+            self.assertEquals(message.message.key, None)
+
+    def test_produce_consume_two_partitions(self):
+        produce1 = ProduceRequest("test_produce_consume_two_partitions", 0, messages=[
+            KafkaProtocol.create_message("Partition 0 %d" % i) for i in range(10)
+        ])
+        produce2 = ProduceRequest("test_produce_consume_two_partitions", 1, messages=[
+            KafkaProtocol.create_message("Partition 1 %d" % i) for i in range(10)
+        ])
+
+        for resp in self.client.send_produce_request([produce1, produce2]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+        return
+
+        fetch1 = FetchRequest("test_produce_consume_two_partitions", 0, 0, 1024)
+        fetch2 = FetchRequest("test_produce_consume_two_partitions", 1, 0, 1024)
+        fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2])
+        self.assertEquals(fetch_resp1.error, 0)
+        self.assertEquals(fetch_resp1.highwaterMark, 10)
+        messages = list(fetch_resp1.messages)
+        self.assertEquals(len(messages), 10)
+        for i, message in enumerate(messages):
+            self.assertEquals(message.offset, i)
+            self.assertEquals(message.message.value, "Partition 0 %d" % i)
+            self.assertEquals(message.message.key, None)
+        self.assertEquals(fetch_resp2.error, 0)
+        self.assertEquals(fetch_resp2.highwaterMark, 10)
+        messages = list(fetch_resp2.messages)
+        self.assertEquals(len(messages), 10)
+        for i, message in enumerate(messages):
+            self.assertEquals(message.offset, i)
+            self.assertEquals(message.message.value, "Partition 1 %d" % i)
+            self.assertEquals(message.message.key, None)
+
+    ####################
+    # Offset Tests #
+    ####################
+
+    def test_commit_fetch_offsets(self):
+        req = OffsetCommitRequest("test_commit_fetch_offsets", 0, 42, "metadata")
+        (resp,) = self.client.send_offset_commit_request("group", [req])
+        self.assertEquals(resp.error, 0)
+
+        req = OffsetFetchRequest("test_commit_fetch_offsets", 0)
+        (resp,) = self.client.send_offset_fetch_request("group", [req])
+        self.assertEquals(resp.error, 0)
+        self.assertEquals(resp.offset, 42)
+        self.assertEquals(resp.metadata, "metadata")
+
+
+
-        # Shutdown the queue
-        q.close()
 
 if __name__ == "__main__":
     logging.basicConfig(level=logging.INFO)
diff --git a/test/resources/log4j.properties b/test/resources/log4j.properties
index c4ecd2c..47a817a 100644
--- a/test/resources/log4j.properties
+++ b/test/resources/log4j.properties
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-log4j.rootLogger=DEBUG, stdout
+log4j.rootLogger=TRACE, stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
diff --git a/test/resources/server.properties b/test/resources/server.properties
index 2eefe3b..0d01fca 100644
--- a/test/resources/server.properties
+++ b/test/resources/server.properties
@@ -17,31 +17,32 @@
 ############################# Server Basics #############################
 
 # The id of the broker. This must be set to a unique integer for each broker.
-brokerid=0
-
-# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
-# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost
-# may not be what you want.
-#hostname=
-
+broker.id=0
 
 ############################# Socket Server Settings #############################
 
 # The port the socket server listens on
 port=%(kafka.port)d
 
-# The number of processor threads the socket server uses for receiving and answering requests.
-# Defaults to the number of cores on the machine
-num.threads=2
+# Hostname the broker will bind to and advertise to producers and consumers.
+# If not set, the server will bind to all interfaces and advertise the value returned from
+# from java.net.InetAddress.getCanonicalHostName().
+#host.name=localhost
+
+# The number of threads handling network requests
+num.network.threads=2
+
+# The number of threads doing disk I/O
+num.io.threads=2
 
 # The send buffer (SO_SNDBUF) used by the socket server
-socket.send.buffer=1048576
+socket.send.buffer.bytes=1048576
 
 # The receive buffer (SO_RCVBUF) used by the socket server
-socket.receive.buffer=1048576
+socket.receive.buffer.bytes=1048576
 
 # The maximum size of a request that the socket server will accept (protection against OOM)
-max.socket.request.bytes=104857600
+socket.request.max.bytes=104857600
 
 
 ############################# Log Basics #############################
@@ -53,9 +54,6 @@ log.dir=%(kafka.tmp.dir)s
 # for consumption, but also mean more files.
 num.partitions=%(kafka.partitions)d
 
-# Overrides for for the default given by num.partitions on a per-topic basis
-#topic.partition.count.map=topic1:3, topic2:4
-
 ############################# Log Flush Policy #############################
 
 # The following configurations control the flush of data to disk. This is the most
@@ -68,16 +66,13 @@ num.partitions=%(kafka.partitions)d
 # every N messages (or both). This can be done globally and overridden on a per-topic basis.
 
 # The number of messages to accept before forcing a flush of data to disk
-log.flush.interval=1
+log.flush.interval.messages=10000
 
 # The maximum amount of time a message can sit in a log before we force a flush
-log.default.flush.interval.ms=10000
+log.flush.interval.ms=1000
 
-# Per-topic overrides for log.default.flush.interval.ms
-#topic.flush.intervals.ms=topic1:1000, topic2:3000
-
-# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
-log.default.flush.scheduler.interval.ms=10000
+# Per-topic overrides for log.flush.interval.ms
+#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
 
 ############################# Log Retention Policy #############################
 
@@ -90,11 +85,11 @@ log.default.flush.scheduler.interval.ms=10000
 log.retention.hours=168
 
 # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
-# segments don't drop below log.retention.size.
-#log.retention.size=1073741824
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
 
 # The maximum size of a log segment file. When this size is reached a new log segment will be created.
-log.file.size=536870912
+log.segment.bytes=536870912
 
 # The interval at which log segments are checked to see if they can be deleted according
 # to the retention policies
@@ -102,15 +97,21 @@ log.cleanup.interval.mins=1
 
 ############################# Zookeeper #############################
 
-# Enable connecting to zookeeper
-enable.zookeeper=false
-
 # Zk connection string (see zk docs for details).
 # This is a comma separated host:port pairs, each corresponding to a zk
 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
 # You can also append an optional chroot string to the urls to specify the
 # root directory for all kafka znodes.
-zk.connect=localhost:2181
+zk.connect=localhost:2181/kafka-python
 
 # Timeout in ms for connecting to zookeeper
-zk.connectiontimeout.ms=1000000
+zk.connection.timeout.ms=1000000
+
+# metrics reporter properties
+kafka.metrics.polling.interval.secs=5
+kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
+kafka.csv.metrics.dir=/tmp/kafka_metrics
+# Disable csv reporting by default.
+kafka.csv.metrics.reporter.enabled=false
+
+log.cleanup.policy=delete
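
The central refactor above replaces util.group_list_by_key with group_by_topic_and_partition, which the encode_* methods then walk topic-by-topic and partition-by-partition. A minimal sketch of what the new helper produces, assuming a trimmed-down request tuple purely for illustration (the real request namedtuples live in kafka/client08.py):

from collections import defaultdict, namedtuple

# Illustrative request tuple only; kafka/client08.py defines the real ones.
FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "max_bytes"])

def group_by_topic_and_partition(tuples):
    # Same body as the helper added to kafka/util.py in this commit:
    # a nested dict {topic: {partition: payload}}, with no sorting required.
    out = defaultdict(dict)
    for t in tuples:
        out[t.topic][t.partition] = t
    return out

requests = [FetchRequest("foo", 0, 0, 1024),
            FetchRequest("foo", 1, 0, 1024),
            FetchRequest("bar", 0, 0, 1024)]

grouped = group_by_topic_and_partition(requests)
print(sorted(grouped.keys()))         # ['bar', 'foo']
print(sorted(grouped["foo"].keys()))  # [0, 1]

Because each (topic, partition) key maps to exactly one payload, the encoders can emit one partition block per payload, matching how the request bodies above are laid out.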
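OffsetResponse also changes shape in this commit: its single offset field becomes an offsets tuple, because decode_offset_response now reads num_offsets values per partition. A small hand-built example of what callers see, with values mirroring the test_produce_many_simple assertions (no broker involved; the response is constructed directly):

from collections import namedtuple

# Field list copied from the updated definition in kafka/client08.py.
OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offsets"])

# Roughly what send_offset_request() yields after 100 messages land in partition 0.
resp = OffsetResponse("test_produce_many_simple", 0, 0, (100,))

assert resp.error == 0
latest = resp.offsets[0]   # callers now index into the tuple instead of reading .offset
print(latest)              # 100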