author    Dana Powers <dana.powers@rd.io>    2016-01-23 16:58:09 -0800
committer Dana Powers <dana.powers@rd.io>    2016-01-24 17:33:08 -0800
commit    44330f49cf15d2d22d7e382b52a0727deb246cd6 (patch)
tree      d7f2a68b12cd2ddd4a7858446b5f62ea46cd1689
parent    434d1abb18a130f54682662909a891edcdb98f5f (diff)
download  kafka-python-44330f49cf15d2d22d7e382b52a0727deb246cd6.tar.gz
Implement new KafkaProducer, mimicking java client interface / design
-rw-r--r--  kafka/producer/kafka.py  496
1 files changed, 496 insertions, 0 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
new file mode 100644
index 0000000..220528f
--- /dev/null
+++ b/kafka/producer/kafka.py
@@ -0,0 +1,496 @@
+from __future__ import absolute_import
+
+import atexit
+import copy
+import logging
+import signal
+import threading
+import time
+
+from ..client_async import KafkaClient
+from ..common import TopicPartition
+from ..partitioner.default import DefaultPartitioner
+from ..protocol.message import Message, MessageSet
+from .future import FutureRecordMetadata, FutureProduceResult
+from .record_accumulator import AtomicInteger, RecordAccumulator
+from .sender import Sender
+
+import kafka.common as Errors
+
+log = logging.getLogger(__name__)
+PRODUCER_CLIENT_ID_SEQUENCE = AtomicInteger()
+
+
+class KafkaProducer(object):
+ """A Kafka client that publishes records to the Kafka cluster.
+
+ The producer is thread safe and sharing a single producer instance across
+ threads will generally be faster than having multiple instances.
+
+ The producer consists of a pool of buffer space that holds records that
+ haven't yet been transmitted to the server as well as a background I/O
+ thread that is responsible for turning these records into requests and
+ transmitting them to the cluster.
+
+ The send() method is asynchronous. When called it adds the record to a
+ buffer of pending record sends and immediately returns. This allows the
+ producer to batch together individual records for efficiency.
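+
+    Given a producer instance, a minimal sketch of an asynchronous send (the
+    topic name, payload, and callback helpers below are illustrative only;
+    callbacks attach via the kafka.future.Future interface):
+
+        def on_success(record_metadata):
+            print(record_metadata)
+
+        def on_error(excp):
+            print('delivery failed: %s' % excp)
+
+        future = producer.send('my-topic', b'raw_bytes')
+        future.add_callback(on_success)
+        future.add_errback(on_error)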
+
+ The 'acks' config controls the criteria under which requests are considered
+ complete. The "all" setting will result in blocking on the full commit of
+ the record, the slowest but most durable setting.
+
+ If the request fails, the producer can automatically retry, unless
+ 'retries' is configured to 0. Enabling retries also opens up the
+ possibility of duplicates (see the documentation on message
+ delivery semantics for details:
+ http://kafka.apache.org/documentation.html#semantics
+ ).
+
+ The producer maintains buffers of unsent records for each partition. These
+ buffers are of a size specified by the 'batch_size' config. Making this
+ larger can result in more batching, but requires more memory (since we will
+ generally have one of these buffers for each active partition).
+
+ By default a buffer is available to send immediately even if there is
+ additional unused space in the buffer. However if you want to reduce the
+ number of requests you can set 'linger_ms' to something greater than 0.
+ This will instruct the producer to wait up to that number of milliseconds
+ before sending a request in hope that more records will arrive to fill up
+ the same batch. This is analogous to Nagle's algorithm in TCP. Note that
+ records that arrive close together in time will generally batch together
+    even with linger_ms=0, so under heavy load batching will occur regardless of
+ the linger configuration; however setting this to something larger than 0
+ can lead to fewer, more efficient requests when not under maximal load at
+ the cost of a small amount of latency.
+
+ The buffer_memory controls the total amount of memory available to the
+ producer for buffering. If records are sent faster than they can be
+ transmitted to the server then this buffer space will be exhausted. When
+ the buffer space is exhausted additional send calls will block.
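+
+    A configuration sketch (the values shown are placeholders, not tuning
+    recommendations):
+
+        producer = KafkaProducer(bootstrap_servers='localhost:9092',
+                                 acks='all',
+                                 retries=3,
+                                 batch_size=16384,
+                                 linger_ms=5,
+                                 buffer_memory=33554432)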
+
+ The key_serializer and value_serializer instruct how to turn the key and
+ value objects the user provides into bytes.
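+
+    For example, JSON values could be handled with a serializer like the
+    following (the json module and topic name are illustrative, not part of
+    this API):
+
+        import json
+        producer = KafkaProducer(
+            value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+        producer.send('my-topic', {'count': 42})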
+
+ Keyword Arguments:
+ bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
+ strings) that the producer should contact to bootstrap initial
+ cluster metadata. This does not have to be the full node list.
+ It just needs to have at least one broker that will respond to a
+ Metadata API Request. Default port is 9092. If no servers are
+ specified, will default to localhost:9092.
+ client_id (str): a name for this client. This string is passed in
+ each request to servers and can be used to identify specific
+ server-side log entries that correspond to this client.
+ Default: 'kafka-python-producer-#' (appended with a unique number
+ per instance)
+ key_serializer (callable): used to convert user-supplied keys to bytes
+ If not None, called as f(key), should return bytes. Default: None.
+ value_serializer (callable): used to convert user-supplied message
+ values to bytes. If not None, called as f(value), should return
+ bytes. Default: None.
+ acks (0, 1, 'all'): The number of acknowledgments the producer requires
+ the leader to have received before considering a request complete.
+ This controls the durability of records that are sent. The
+ following settings are common:
+ 0: Producer will not wait for any acknowledgment from the server
+ at all. The message will immediately be added to the socket
+ buffer and considered sent. No guarantee can be made that the
+ server has received the record in this case, and the retries
+ configuration will not take effect (as the client won't
+ generally know of any failures). The offset given back for each
+ record will always be set to -1.
+ 1: The broker leader will write the record to its local log but
+ will respond without awaiting full acknowledgement from all
+ followers. In this case should the leader fail immediately
+ after acknowledging the record but before the followers have
+ replicated it then the record will be lost.
+ all: The broker leader will wait for the full set of in-sync
+ replicas to acknowledge the record. This guarantees that the
+ record will not be lost as long as at least one in-sync replica
+ remains alive. This is the strongest available guarantee.
+ If unset, defaults to acks=1.
+ compression_type (str): The compression type for all data generated by
+ the producer. Valid values are 'gzip', 'snappy', or None.
+ Compression is of full batches of data, so the efficacy of batching
+ will also impact the compression ratio (more batching means better
+ compression). Default: None.
+ retries (int): Setting a value greater than zero will cause the client
+ to resend any record whose send fails with a potentially transient
+ error. Note that this retry is no different than if the client
+ resent the record upon receiving the error. Allowing retries will
+ potentially change the ordering of records because if two records
+ are sent to a single partition, and the first fails and is retried
+ but the second succeeds, then the second record may appear first.
+ Default: 0.
+ batch_size (int): Requests sent to brokers will contain multiple
+ batches, one for each partition with data available to be sent.
+ A small batch size will make batching less common and may reduce
+ throughput (a batch size of zero will disable batching entirely).
+ Default: 16384
+ linger_ms (int): The producer groups together any records that arrive
+ in between request transmissions into a single batched request.
+ Normally this occurs only under load when records arrive faster
+ than they can be sent out. However in some circumstances the client
+ may want to reduce the number of requests even under moderate load.
+ This setting accomplishes this by adding a small amount of
+ artificial delay; that is, rather than immediately sending out a
+ record the producer will wait for up to the given delay to allow
+ other records to be sent so that the sends can be batched together.
+ This can be thought of as analogous to Nagle's algorithm in TCP.
+ This setting gives the upper bound on the delay for batching: once
+ we get batch_size worth of records for a partition it will be sent
+ immediately regardless of this setting, however if we have fewer
+ than this many bytes accumulated for this partition we will
+ 'linger' for the specified time waiting for more records to show
+ up. This setting defaults to 0 (i.e. no delay). Setting linger_ms=5
+ would have the effect of reducing the number of requests sent but
+            would add up to 5ms of latency to records sent in the absence of
+ load. Default: 0.
+ partitioner (callable): Callable used to determine which partition
+ each message is assigned to. Called (after key serialization):
+ partitioner(key_bytes, all_partitions, available_partitions).
+ The default partitioner implementation hashes each non-None key
+ using the same murmur2 algorithm as the java client so that
+ messages with the same key are assigned to the same partition.
+ When a key is None, the message is delivered to a random partition
+ (filtered to partitions with available leaders only, if possible).
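+            A sketch of a custom partitioner appears after this argument
+            list.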
+ buffer_memory (int): The total bytes of memory the producer should use
+ to buffer records waiting to be sent to the server. If records are
+ sent faster than they can be delivered to the server the producer
+ will block up to max_block_ms, raising an exception on timeout.
+ In the current implementation, this setting is an approximation.
+ Default: 33554432 (32MB)
+ max_block_ms (int): Number of milliseconds to block during send()
+ when attempting to allocate additional memory before raising an
+ exception. Default: 60000.
+ max_request_size (int): The maximum size of a request. This is also
+ effectively a cap on the maximum record size. Note that the server
+ has its own cap on record size which may be different from this.
+ This setting will limit the number of record batches the producer
+ will send in a single request to avoid sending huge requests.
+ Default: 1048576.
+ metadata_max_age_ms (int): The period of time in milliseconds after
+ which we force a refresh of metadata even if we haven't seen any
+ partition leadership changes to proactively discover any new
+ brokers or partitions. Default: 300000
+ retry_backoff_ms (int): Milliseconds to backoff when retrying on
+ errors. Default: 100.
+ request_timeout_ms (int): Client request timeout in milliseconds.
+ Default: 30000.
+ receive_buffer_bytes (int): The size of the TCP receive buffer
+ (SO_RCVBUF) to use when reading data. Default: 32768
+ send_buffer_bytes (int): The size of the TCP send buffer
+ (SO_SNDBUF) to use when sending data. Default: 131072
+ reconnect_backoff_ms (int): The amount of time in milliseconds to
+ wait before attempting to reconnect to a given host.
+ Default: 50.
+ max_in_flight_requests_per_connection (int): Requests are pipelined
+ to kafka brokers up to this number of maximum requests per
+ broker connection. Default: 5.
+ api_version (str): specify which kafka API version to use.
+ If set to 'auto', will attempt to infer the broker version by
+ probing various APIs. Default: auto
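+
+    A custom partitioner (see the 'partitioner' argument above) is any
+    callable with that signature; an illustrative sketch (this hashing does
+    not match the default murmur2 behavior):
+
+        def my_partitioner(key_bytes, all_partitions, available):
+            if key_bytes is None:
+                # no key: fall back to the first usable partition
+                return (available or all_partitions)[0]
+            return all_partitions[hash(key_bytes) % len(all_partitions)]
+
+        producer = KafkaProducer(partitioner=my_partitioner)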
+
+ Note:
+ Configuration parameters are described in more detail at
+ https://kafka.apache.org/090/configuration.html#producerconfigs
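+
+    A minimal end-to-end sketch (host, topic, and messages are placeholders;
+    error handling is omitted):
+
+        producer = KafkaProducer(bootstrap_servers='localhost:9092')
+        for i in range(3):
+            producer.send('my-topic', ('message %d' % i).encode('utf-8'))
+        producer.flush()   # block until buffered records are sent or fail
+        producer.close()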
+ """
+ _DEFAULT_CONFIG = {
+ 'bootstrap_servers': 'localhost',
+ 'client_id': None,
+ 'key_serializer': None,
+ 'value_serializer': None,
+ 'acks': 1,
+ 'compression_type': None,
+ 'retries': 0,
+ 'batch_size': 16384,
+ 'linger_ms': 0,
+ 'partitioner': DefaultPartitioner(),
+ 'buffer_memory': 33554432,
+ 'connections_max_idle_ms': 600000, # not implemented yet
+ 'max_block_ms': 60000,
+ 'max_request_size': 1048576,
+ 'metadata_max_age_ms': 300000,
+ 'retry_backoff_ms': 100,
+ 'request_timeout_ms': 30000,
+ 'receive_buffer_bytes': 32768,
+ 'send_buffer_bytes': 131072,
+ 'reconnect_backoff_ms': 50,
+ 'max_in_flight_requests_per_connection': 5,
+ 'api_version': 'auto',
+ }
+
+ def __init__(self, **configs):
+ log.debug("Starting the Kafka producer") # trace
+ self.config = copy.copy(self._DEFAULT_CONFIG)
+ for key in self.config:
+ if key in configs:
+ self.config[key] = configs.pop(key)
+
+ # Only check for extra config keys in top-level class
+ assert not configs, 'Unrecognized configs: %s' % configs
+
+ if self.config['client_id'] is None:
+ self.config['client_id'] = 'kafka-python-producer-%s' % \
+ PRODUCER_CLIENT_ID_SEQUENCE.increment()
+
+ if self.config['acks'] == 'all':
+ self.config['acks'] = -1
+
+ client = KafkaClient(**self.config)
+
+ # Check Broker Version if not set explicitly
+ if self.config['api_version'] == 'auto':
+ self.config['api_version'] = client.check_version()
+ assert self.config['api_version'] in ('0.9', '0.8.2', '0.8.1', '0.8.0')
+
+ # Convert api_version config to tuple for easy comparisons
+ self.config['api_version'] = tuple(
+ map(int, self.config['api_version'].split('.')))
+
+ if self.config['compression_type'] == 'lz4':
+ assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
+
+ self._accumulator = RecordAccumulator(**self.config)
+ self._metadata = client.cluster
+ self._metadata_lock = threading.Condition()
+ self._sender = Sender(client, self._metadata, self._metadata_lock,
+ self._accumulator, **self.config)
+ self._sender.daemon = True
+ self._sender.start()
+ self._closed = False
+ atexit.register(self.close, timeout=0)
+ log.debug("Kafka producer started")
+
+ def __del__(self):
+ self.close(timeout=0)
+
+ def close(self, timeout=None):
+ """Close this producer."""
+ if self._closed:
+ log.info('Kafka producer closed')
+ return
+ if timeout is None:
+ timeout = 999999999
+ assert timeout >= 0
+
+ log.info("Closing the Kafka producer with %s secs timeout.", timeout)
+ #first_exception = AtomicReference() # this will keep track of the first encountered exception
+ invoked_from_callback = bool(threading.current_thread() is self._sender)
+ if timeout > 0:
+ if invoked_from_callback:
+ log.warning("Overriding close timeout %s secs to 0 in order to"
+ " prevent useless blocking due to self-join. This"
+ " means you have incorrectly invoked close with a"
+ " non-zero timeout from the producer call-back.",
+ timeout)
+ else:
+ # Try to close gracefully.
+ if self._sender is not None:
+ self._sender.initiate_close()
+ self._sender.join(timeout)
+
+ if self._sender is not None and self._sender.is_alive():
+
+ log.info("Proceeding to force close the producer since pending"
+ " requests could not be completed within timeout %s.",
+ timeout)
+ self._sender.force_close()
+ # Only join the sender thread when not calling from callback.
+ if not invoked_from_callback:
+ self._sender.join()
+
+ try:
+ self.config['key_serializer'].close()
+ except AttributeError:
+ pass
+ try:
+ self.config['value_serializer'].close()
+ except AttributeError:
+ pass
+ self._closed = True
+ log.debug("The Kafka producer has closed.")
+
+ def partitions_for(self, topic):
+ """Returns set of all known partitions for the topic."""
+ max_wait = self.config['max_block_ms'] / 1000.0
+ return self._wait_on_metadata(topic, max_wait)
+
+ def send(self, topic, value=None, key=None, partition=None):
+ """Publish a message to a topic.
+
+ Arguments:
+ topic (str): topic where the message will be published
+ value (optional): message value. Must be type bytes, or be
+ serializable to bytes via configured value_serializer. If value
+ is None, key is required and message acts as a 'delete'.
+ See kafka compaction documentation for more details:
+ http://kafka.apache.org/documentation.html#compaction
+ (compaction requires kafka >= 0.8.1)
+ partition (int, optional): optionally specify a partition. If not
+ set, the partition will be selected using the configured
+ 'partitioner'.
+ key (optional): a key to associate with the message. Can be used to
+ determine which partition to send the message to. If partition
+ is None (and producer's partitioner config is left as default),
+ then messages with the same key will be delivered to the same
+ partition (but if key is None, partition is chosen randomly).
+ Must be type bytes, or be serializable to bytes via configured
+ key_serializer.
+
+ Returns:
+ FutureRecordMetadata: resolves to RecordMetadata
+
+ Raises:
+ KafkaTimeoutError: if unable to fetch topic metadata, or unable
+ to obtain memory buffer prior to configured max_block_ms
+ """
+ assert value is not None or self.config['api_version'] >= (0, 8, 1), (
+ 'Null messages require kafka >= 0.8.1')
+ assert not (value is None and key is None), 'Need at least one: key or value'
+ try:
+ # first make sure the metadata for the topic is
+ # available
+ self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
+
+ key_bytes, value_bytes = self._serialize(topic, key, value)
+ partition = self._partition(topic, partition, key, value,
+ key_bytes, value_bytes)
+
+ message_size = MessageSet.HEADER_SIZE + Message.HEADER_SIZE
+ if key_bytes is not None:
+ message_size += len(key_bytes)
+ if value_bytes is not None:
+ message_size += len(value_bytes)
+ self._ensure_valid_record_size(message_size)
+
+ tp = TopicPartition(topic, partition)
+ log.debug("Sending (key=%s value=%s) to %s", key, value, tp)
+ result = self._accumulator.append(tp, key_bytes, value_bytes,
+ self.config['max_block_ms'])
+ future, batch_is_full, new_batch_created = result
+ if batch_is_full or new_batch_created:
+ log.debug("Waking up the sender since %s is either full or"
+ " getting a new batch", tp)
+ self._sender.wakeup()
+
+ return future
+ # handling exceptions and record the errors;
+ # for API exceptions return them in the future,
+ # for other exceptions raise directly
+ except Errors.KafkaTimeoutError:
+ raise
+ except AssertionError:
+ raise
+ except Exception as e:
+ log.debug("Exception occurred during message send: %s", e)
+ return FutureRecordMetadata(
+ FutureProduceResult(TopicPartition(topic, partition)),
+ -1).failure(e)
+
+ def flush(self):
+ """
+ Invoking this method makes all buffered records immediately available
+ to send (even if linger_ms is greater than 0) and blocks on the
+ completion of the requests associated with these records. The
+ post-condition of flush() is that any previously sent record will have
+        completed (e.g. Future.is_done == True). A request is considered
+ completed when either it is successfully acknowledged according to the
+ 'acks' configuration for the producer, or it results in an error.
+
+ Other threads can continue sending messages while one thread is blocked
+ waiting for a flush call to complete; however, no guarantee is made
+ about the completion of messages sent after the flush call begins.
+ """
+ log.debug("Flushing accumulated records in producer.") # trace
+ self._accumulator.begin_flush()
+ self._sender.wakeup()
+ self._accumulator.await_flush_completion()
+
+ def _ensure_valid_record_size(self, size):
+ """Validate that the record size isn't too large."""
+ if size > self.config['max_request_size']:
+ raise Errors.MessageSizeTooLargeError(
+ "The message is %d bytes when serialized which is larger than"
+ " the maximum request size you have configured with the"
+ " max_request_size configuration" % size)
+ if size > self.config['buffer_memory']:
+ raise Errors.MessageSizeTooLargeError(
+ "The message is %d bytes when serialized which is larger than"
+ " the total memory buffer you have configured with the"
+ " buffer_memory configuration." % size)
+
+ def _wait_on_metadata(self, topic, max_wait):
+ """
+ Wait for cluster metadata including partitions for the given topic to
+ be available.
+
+ Arguments:
+ topic (str): topic we want metadata for
+ max_wait (float): maximum time in secs for waiting on the metadata
+
+ Returns:
+ set: partition ids for the topic
+
+ Raises:
+            KafkaTimeoutError: if partitions for topic were not obtained before
+ specified max_wait timeout
+ """
+ # add topic to metadata topic list if it is not there already.
+ self._sender.add_topic(topic)
+ partitions = self._metadata.partitions_for_topic(topic)
+ if partitions:
+ return partitions
+
+        event = threading.Event()
+
+        def event_set(*args):
+            event.set()
+
+        def request_update():
+            event.clear()
+            log.debug("Requesting metadata update for topic %s.", topic)
+            f = self._metadata.request_update()
+            f.add_both(event_set)
+            return f
+
+        begin = time.time()
+        elapsed = 0.0
+        future = request_update()
+        while elapsed < max_wait:
+            self._sender.wakeup()
+            event.wait(max_wait - elapsed)
+            if future.failed():
+                # the previous update failed -- request another and keep waiting
+                future = request_update()
+            elif self._metadata.partitions_for_topic(topic):
+                # metadata now includes this topic; stop waiting
+                break
+            elapsed = time.time() - begin
+
+        partitions = self._metadata.partitions_for_topic(topic)
+        if partitions:
+            return partitions
+        else:
+            raise Errors.KafkaTimeoutError(
+                "Failed to update metadata after %.1f secs." % max_wait)
+
+ def _serialize(self, topic, key, value):
+ # pylint: disable-msg=not-callable
+ if self.config['key_serializer']:
+ serialized_key = self.config['key_serializer'](key)
+ else:
+ serialized_key = key
+ if self.config['value_serializer']:
+ serialized_value = self.config['value_serializer'](value)
+ else:
+ serialized_value = value
+ return serialized_key, serialized_value
+
+ def _partition(self, topic, partition, key, value,
+ serialized_key, serialized_value):
+ if partition is not None:
+ assert partition >= 0
+ assert partition in self._metadata.partitions_for_topic(topic), 'Unrecognized partition'
+ return partition
+
+ all_partitions = list(self._metadata.partitions_for_topic(topic))
+ available = list(self._metadata.available_partitions_for_topic(topic))
+ return self.config['partitioner'](serialized_key,
+ all_partitions,
+ available)