diff options
| -rw-r--r-- | kafka/consumer/group.py | 58 | ||||
| -rw-r--r-- | kafka/producer/kafka.py | 26 | 
2 files changed, 49 insertions, 35 deletions
| diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 344e7e3..5063579 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -35,7 +35,8 @@ class KafkaConsumer(six.Iterator):      Arguments:          *topics (str): optional list of topics to subscribe to. If not set, -            call :meth:`.subscribe` or :meth:`.assign` before consuming records. +            call :meth:`~kafka.KafkaConsumer.subscribe` or +            :meth:`~kafka.KafkaConsumer.assign` before consuming records.      Keyword Arguments:          bootstrap_servers: 'host[:port]' string (or list of 'host[:port]' @@ -127,7 +128,7 @@ class KafkaConsumer(six.Iterator):          session_timeout_ms (int): The timeout used to detect failures when              using Kafka's group management facilities. Default: 30000          max_poll_records (int): The maximum number of records returned in a -            single call to :meth:`.poll`. Default: 500 +            single call to :meth:`~kafka.KafkaConsumer.poll`. Default: 500          receive_buffer_bytes (int): The size of the TCP receive buffer              (SO_RCVBUF) to use when reading data. Default: None (relies on              system defaults). The java client defaults to 32768. @@ -172,6 +173,7 @@ class KafkaConsumer(six.Iterator):          api_version (tuple): Specify which Kafka API version to use. If set to              None, the client will attempt to infer the broker version by probing              various APIs. Different versions enable different functionality. +              Examples:                  (0, 9) enables full group coordination features with automatic                      partition assignment and rebalancing, @@ -181,6 +183,7 @@ class KafkaConsumer(six.Iterator):                      partition assignment only,                  (0, 8, 0) enables basic functionality but requires manual                      partition assignment and offset management. +              For the full list of supported versions, see              KafkaClient.API_VERSIONS. Default: None          api_version_auto_timeout_ms (int): number of milliseconds to throw a @@ -336,11 +339,13 @@ class KafkaConsumer(six.Iterator):              partitions (list of TopicPartition): Assignment for this instance.          Raises: -            IllegalStateError: If consumer has already called :meth:`.subscribe`. +            IllegalStateError: If consumer has already called +            :meth:`~kafka.KafkaConsumer.subscribe`.          Warning:              It is not possible to use both manual partition assignment with -            :meth:`.assign` and group assignment with :meth:`.subscribe`. +            :meth:`~kafka.KafkaConsumer.assign` and group assignment with +            :meth:`~kafka.KafkaConsumer.subscribe`.          Note:              This interface does not support incremental assignment and will @@ -358,12 +363,13 @@ class KafkaConsumer(six.Iterator):      def assignment(self):          """Get the TopicPartitions currently assigned to this consumer. -        If partitions were directly assigned using :meth:`.assign`, then this -        will simply return the same partitions that were previously assigned. -        If topics were subscribed using :meth:`.subscribe`, then this will give -        the set of topic partitions currently assigned to the consumer (which -        may be None if the assignment hasn't happened yet, or if the partitions -        are in the process of being reassigned). +        If partitions were directly assigned using +        :meth:`~kafka.KafkaConsumer.assign`, then this will simply return the +        same partitions that were previously assigned.  If topics were +        subscribed using :meth:`~kafka.KafkaConsumer.subscribe`, then this will +        give the set of topic partitions currently assigned to the consumer +        (which may be None if the assignment hasn't happened yet, or if the +        partitions are in the process of being reassigned).          Returns:              set: {TopicPartition, ...} @@ -527,8 +533,8 @@ class KafkaConsumer(six.Iterator):                  with any records that are available currently in the buffer,                  else returns empty. Must not be negative. Default: 0              max_records (int, optional): The maximum number of records returned -                in a single call to :meth:`.poll`. Default: Inherit value from -                max_poll_records. +                in a single call to :meth:`~kafka.KafkaConsumer.poll`. +                Default: Inherit value from max_poll_records.          Returns:              dict: Topic to list of records since the last fetch for the @@ -639,10 +645,12 @@ class KafkaConsumer(six.Iterator):      def pause(self, *partitions):          """Suspend fetching from the requested partitions. -        Future calls to :meth:`.poll` will not return any records from these -        partitions until they have been resumed using :meth:`.resume`. Note that -        this method does not affect partition subscription. In particular, it -        does not cause a group rebalance when automatic assignment is used. +        Future calls to :meth:`~kafka.KafkaConsumer.poll` will not return any +        records from these partitions until they have been resumed using +        :meth:`~kafka.KafkaConsumer.resume`. + +        Note: This method does not affect partition subscription. In particular, +        it does not cause a group rebalance when automatic assignment is used.          Arguments:              *partitions (TopicPartition): Partitions to pause. @@ -654,7 +662,8 @@ class KafkaConsumer(six.Iterator):              self._subscription.pause(partition)      def paused(self): -        """Get the partitions that were previously paused using :meth:`.pause`. +        """Get the partitions that were previously paused using +        :meth:`~kafka.KafkaConsumer.pause`.          Returns:              set: {partition (TopicPartition), ...} @@ -677,10 +686,12 @@ class KafkaConsumer(six.Iterator):          """Manually specify the fetch offset for a TopicPartition.          Overrides the fetch offsets that the consumer will use on the next -        :meth:`.poll`. If this API is invoked for the same partition more than -        once, the latest offset will be used on the next :meth:`.poll`. Note -        that you may lose data if this API is arbitrarily used in the middle of -        consumption, to reset the fetch offsets. +        :meth:`~kafka.KafkaConsumer.poll`. If this API is invoked for the same +        partition more than once, the latest offset will be used on the next +        :meth:`~kafka.KafkaConsumer.poll`. + +        Note: You may lose data if this API is arbitrarily used in the middle of +        consumption to reset the fetch offsets.          Arguments:              partition (TopicPartition): Partition for seek operation @@ -752,7 +763,7 @@ class KafkaConsumer(six.Iterator):          Topic subscriptions are not incremental: this list will replace the          current assignment (if there is one). -        This method is incompatible with :meth:`.assign`. +        This method is incompatible with :meth:`~kafka.KafkaConsumer.assign`.          Arguments:              topics (list): List of topics for subscription. @@ -781,7 +792,8 @@ class KafkaConsumer(six.Iterator):                  through this interface are from topics subscribed in this call.          Raises: -            IllegalStateError: If called after previously calling :meth:`.assign`. +            IllegalStateError: If called after previously calling +                :meth:`~kafka.KafkaConsumer.assign`.              AssertionError: If neither topics or pattern is provided.              TypeError: If listener is not a ConsumerRebalanceListener.          """ diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index f137b4e..91e253b 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -35,9 +35,9 @@ class KafkaProducer(object):      thread that is responsible for turning these records into requests and      transmitting them to the cluster. -    :meth:`.send` is asynchronous. When called it adds the record to a buffer of -    pending record sends and immediately returns. This allows the producer to -    batch together individual records for efficiency. +    :meth:`~kafka.KafkaProducer.send` is asynchronous. When called it adds the +    record to a buffer of pending record sends and immediately returns. This +    allows the producer to batch together individual records for efficiency.      The 'acks' config controls the criteria under which requests are considered      complete. The "all" setting will result in blocking on the full commit of @@ -167,11 +167,12 @@ class KafkaProducer(object):              will block up to max_block_ms, raising an exception on timeout.              In the current implementation, this setting is an approximation.              Default: 33554432 (32MB) -        max_block_ms (int): Number of milliseconds to block during :meth:`.send` -            and :meth:`.partitions_for`. These methods can be blocked either -            because the buffer is full or metadata unavailable. Blocking in the -            user-supplied serializers or partitioner will not be counted against -            this timeout. Default: 60000. +        max_block_ms (int): Number of milliseconds to block during +            :meth:`~kafka.KafkaProducer.send` and +            :meth:`~kafka.KafkaProducer.partitions_for`. These methods can be +            blocked either because the buffer is full or metadata unavailable. +            Blocking in the user-supplied serializers or partitioner will not be +            counted against this timeout. Default: 60000.          max_request_size (int): The maximum size of a request. This is also              effectively a cap on the maximum record size. Note that the server              has its own cap on record size which may be different from this. @@ -541,10 +542,11 @@ class KafkaProducer(object):          Invoking this method makes all buffered records immediately available          to send (even if linger_ms is greater than 0) and blocks on the          completion of the requests associated with these records. The -        post-condition of :meth:`.flush` is that any previously sent record will -        have completed (e.g. Future.is_done() == True). A request is considered -        completed when either it is successfully acknowledged according to the -        'acks' configuration for the producer, or it results in an error. +        post-condition of :meth:`~kafka.KafkaProducer.flush` is that any +        previously sent record will have completed +        (e.g. Future.is_done() == True). A request is considered completed when +        either it is successfully acknowledged according to the 'acks' +        configuration for the producer, or it results in an error.          Other threads can continue sending messages while one thread is blocked          waiting for a flush call to complete; however, no guarantee is made | 
