author     Will Daly <will@edx.org>             2015-01-04 12:23:10 -0500
committer  Will Daly <will.e.daly@gmail.com>    2015-01-15 18:01:40 -0500
commit     01f378328e5383d05d52428b815f992eb2c536cb (patch)
tree       2d3366ed91b9744efd40d935a460040150c6d4d8 /kafka
parent     02c2b469003e2ddcb051dbb4d95977137050c19f (diff)
download   kafka-python-01f378328e5383d05d52428b815f992eb2c536cb.tar.gz
Add Sphinx API docs
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/client.py                   48
-rw-r--r--  kafka/conn.py                     23
-rw-r--r--  kafka/consumer/base.py            10
-rw-r--r--  kafka/consumer/kafka.py          226
-rw-r--r--  kafka/consumer/multiprocess.py    39
-rw-r--r--  kafka/consumer/simple.py          67
-rw-r--r--  kafka/context.py                   5
-rw-r--r--  kafka/partitioner/base.py         10
-rw-r--r--  kafka/producer/base.py            20
-rw-r--r--  kafka/producer/keyed.py           22
-rw-r--r--  kafka/producer/simple.py          30
-rw-r--r--  kafka/protocol.py                125
-rw-r--r--  kafka/queue.py                    34
-rw-r--r--  kafka/util.py                     10
14 files changed, 357 insertions, 312 deletions
diff --git a/kafka/client.py b/kafka/client.py
index bc3d853..6d1b9f8 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -131,19 +131,21 @@ class KafkaClient(object):
the leader broker for that partition using the supplied encode/decode
functions
- Params
- ======
+ Arguments:
+
payloads: list of object-like entities with a topic (str) and
- partition (int) attribute
+ partition (int) attribute
+
encode_fn: a method to encode the list of payloads to a request body,
- must accept client_id, correlation_id, and payloads as
- keyword arguments
+ must accept client_id, correlation_id, and payloads as
+ keyword arguments
+
decode_fn: a method to decode a response body into response objects.
- The response objects must be object-like and have topic
- and partition attributes
+ The response objects must be object-like and have topic
+ and partition attributes
+
+ Returns:
- Return
- ======
List of response objects in the same order as the supplied payloads
"""
@@ -285,9 +287,9 @@ class KafkaClient(object):
This method should be called after receiving any error
- @param: *topics (optional)
- If a list of topics is provided, the metadata refresh will be limited
- to the specified topics only.
+ Arguments:
+ *topics (optional): If a list of topics is provided,
+ the metadata refresh will be limited to the specified topics only.
Exceptions:
----------
@@ -379,18 +381,16 @@ class KafkaClient(object):
sent to a specific broker. Output is a list of responses in the
same order as the list of payloads specified
- Params
- ======
- payloads: list of ProduceRequest
- fail_on_error: boolean, should we raise an Exception if we
- encounter an API error?
- callback: function, instead of returning the ProduceResponse,
- first pass it through this function
-
- Return
- ======
- list of ProduceResponse or callback(ProduceResponse), in the
- order of input payloads
+ Arguments:
+ payloads: list of ProduceRequest
+ fail_on_error: boolean, should we raise an Exception if we
+ encounter an API error?
+ callback: function, instead of returning the ProduceResponse,
+ first pass it through this function
+
+ Returns:
+ list of ProduceResponse or callback(ProduceResponse), in the
+ order of input payloads
"""
encoder = functools.partial(
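
A minimal sketch of the produce path documented above, assuming a broker on localhost:9092 and an existing topic named 'topic1' (both placeholders); it builds a ProduceRequest with create_message and reads the responses back in payload order:

.. code:: python

    from kafka import KafkaClient
    from kafka.common import ProduceRequest
    from kafka.protocol import create_message

    client = KafkaClient("localhost:9092")

    # one payload per topic/partition; responses come back in the same order
    reqs = [ProduceRequest("topic1", 0, [create_message(b"hello kafka")])]
    resps = client.send_produce_request(reqs, fail_on_error=True)

    for resp in resps:
        print(resp.offset)
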
diff --git a/kafka/conn.py b/kafka/conn.py
index ddfee8b..30debec 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -47,10 +47,11 @@ class KafkaConnection(local):
we can do something in here to facilitate multiplexed requests/responses
since the Kafka API includes a correlation id.
- host: the host name or IP address of a kafka broker
- port: the port number the kafka broker is listening on
- timeout: default 120. The socket timeout for sending and receiving data
- in seconds. None means no timeout, so a request can block forever.
+ Arguments:
+ host: the host name or IP address of a kafka broker
+ port: the port number the kafka broker is listening on
+ timeout: default 120. The socket timeout for sending and receiving data
+ in seconds. None means no timeout, so a request can block forever.
"""
def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
super(KafkaConnection, self).__init__()
@@ -116,8 +117,10 @@ class KafkaConnection(local):
def send(self, request_id, payload):
"""
Send a request to Kafka
- param: request_id -- can be any int (used only for debug logging...)
- param: payload -- an encoded kafka packet (see KafkaProtocol)
+
+ Arguments:
+ request_id (int): can be any int (used only for debug logging...)
+ payload: an encoded kafka packet (see KafkaProtocol)
"""
log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id))
@@ -135,8 +138,12 @@ class KafkaConnection(local):
def recv(self, request_id):
"""
Get a response packet from Kafka
- param: request_id -- can be any int (only used for debug logging...)
- returns encoded kafka packet response from server as type str
+
+ Arguments:
+ request_id: can be any int (only used for debug logging...)
+
+ Returns:
+ str: Encoded kafka packet response from server
"""
log.debug("Reading response %d from Kafka" % request_id)
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 2464aaf..9cdcf89 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -32,9 +32,11 @@ class Consumer(object):
Base class to be used by other consumers. Not to be used directly
This base class provides logic for
+
* initialization and fetching metadata of partitions
* Auto-commit logic
* APIs for fetching pending message count
+
"""
def __init__(self, client, group, topic, partitions=None, auto_commit=True,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
@@ -93,8 +95,9 @@ class Consumer(object):
"""
Commit offsets for this consumer
- partitions: list of partitions to commit, default is to commit
- all of them
+ Keyword Arguments:
+ partitions (list): list of partitions to commit, default is to commit
+ all of them
"""
# short circuit if nothing happened. This check is kept outside
@@ -148,7 +151,8 @@ class Consumer(object):
"""
Gets the pending message count
- partitions: list of partitions to check for, default is to check all
+ Keyword Arguments:
+ partitions (list): list of partitions to check for, default is to check all
"""
if not partitions:
partitions = self.offsets.keys()
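
A short sketch of the commit and pending-count APIs described above, using SimpleConsumer against an assumed local broker and topic:

.. code:: python

    from kafka import KafkaClient, SimpleConsumer

    client = KafkaClient("localhost:9092")
    consumer = SimpleConsumer(client, "my-group", "my-topic",
                              auto_commit=True,
                              auto_commit_every_n=100,
                              auto_commit_every_t=5000)

    print(consumer.pending())                # across all partitions
    print(consumer.pending(partitions=[0]))  # or a specific subset

    consumer.commit()                   # commit offsets for all partitions
    consumer.commit(partitions=[0, 1])  # or only for some of them
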
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index f16b526..ae0f0b9 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -54,72 +54,78 @@ class KafkaConsumer(object):
"""
A simpler kafka consumer
- ```
- # A very basic 'tail' consumer, with no stored offset management
- kafka = KafkaConsumer('topic1')
- for m in kafka:
- print m
-
- # Alternate interface: next()
- print kafka.next()
-
- # Alternate interface: batch iteration
- while True:
- for m in kafka.fetch_messages():
- print m
- print "Done with batch - let's do another!"
- ```
-
- ```
- # more advanced consumer -- multiple topics w/ auto commit offset management
- kafka = KafkaConsumer('topic1', 'topic2',
- group_id='my_consumer_group',
- auto_commit_enable=True,
- auto_commit_interval_ms=30 * 1000,
- auto_offset_reset='smallest')
-
- # Infinite iteration
- for m in kafka:
- process_message(m)
- kafka.task_done(m)
-
- # Alternate interface: next()
- m = kafka.next()
- process_message(m)
- kafka.task_done(m)
-
- # If auto_commit_enable is False, remember to commit() periodically
- kafka.commit()
-
- # Batch process interface
- while True:
- for m in kafka.fetch_messages():
+ .. code:: python
+
+ # A very basic 'tail' consumer, with no stored offset management
+ kafka = KafkaConsumer('topic1')
+ for m in kafka:
+ print m
+
+ # Alternate interface: next()
+ print kafka.next()
+
+ # Alternate interface: batch iteration
+ while True:
+ for m in kafka.fetch_messages():
+ print m
+ print "Done with batch - let's do another!"
+
+
+ .. code:: python
+
+ # more advanced consumer -- multiple topics w/ auto commit offset management
+ kafka = KafkaConsumer('topic1', 'topic2',
+ group_id='my_consumer_group',
+ auto_commit_enable=True,
+ auto_commit_interval_ms=30 * 1000,
+ auto_offset_reset='smallest')
+
+ # Infinite iteration
+ for m in kafka:
+ process_message(m)
+ kafka.task_done(m)
+
+ # Alternate interface: next()
+ m = kafka.next()
process_message(m)
kafka.task_done(m)
- ```
+
+ # If auto_commit_enable is False, remember to commit() periodically
+ kafka.commit()
+
+ # Batch process interface
+ while True:
+ for m in kafka.fetch_messages():
+ process_message(m)
+ kafka.task_done(m)
+
messages (m) are namedtuples with attributes:
- m.topic: topic name (str)
- m.partition: partition number (int)
- m.offset: message offset on topic-partition log (int)
- m.key: key (bytes - can be None)
- m.value: message (output of deserializer_class - default is raw bytes)
+
+ * `m.topic`: topic name (str)
+ * `m.partition`: partition number (int)
+ * `m.offset`: message offset on topic-partition log (int)
+ * `m.key`: key (bytes - can be None)
+ * `m.value`: message (output of deserializer_class - default is raw bytes)
Configuration settings can be passed to constructor,
otherwise defaults will be used:
- client_id='kafka.consumer.kafka',
- group_id=None,
- fetch_message_max_bytes=1024*1024,
- fetch_min_bytes=1,
- fetch_wait_max_ms=100,
- refresh_leader_backoff_ms=200,
- metadata_broker_list=None,
- socket_timeout_ms=30*1000,
- auto_offset_reset='largest',
- deserializer_class=lambda msg: msg,
- auto_commit_enable=False,
- auto_commit_interval_ms=60 * 1000,
- consumer_timeout_ms=-1
+
+ .. code:: python
+
+ client_id='kafka.consumer.kafka',
+ group_id=None,
+ fetch_message_max_bytes=1024*1024,
+ fetch_min_bytes=1,
+ fetch_wait_max_ms=100,
+ refresh_leader_backoff_ms=200,
+ metadata_broker_list=None,
+ socket_timeout_ms=30*1000,
+ auto_offset_reset='largest',
+ deserializer_class=lambda msg: msg,
+ auto_commit_enable=False,
+ auto_commit_interval_ms=60 * 1000,
+ consumer_timeout_ms=-1
Configuration parameters are described in more detail at
http://kafka.apache.org/documentation.html#highlevelconsumerapi
@@ -133,6 +139,9 @@ class KafkaConsumer(object):
"""
Configuration settings can be passed to constructor,
otherwise defaults will be used:
+
+ .. code:: python
+
client_id='kafka.consumer.kafka',
group_id=None,
fetch_message_max_bytes=1024*1024,
@@ -189,28 +198,35 @@ class KafkaConsumer(object):
Optionally specify offsets to start from
Accepts types:
- str (utf-8): topic name (will consume all available partitions)
- tuple: (topic, partition)
- dict: { topic: partition }
- { topic: [partition list] }
- { topic: (partition tuple,) }
+
+ * str (utf-8): topic name (will consume all available partitions)
+ * tuple: (topic, partition)
+ * dict:
+ - { topic: partition }
+ - { topic: [partition list] }
+ - { topic: (partition tuple,) }
Optionally, offsets can be specified directly:
- tuple: (topic, partition, offset)
- dict: { (topic, partition): offset, ... }
- Ex:
- kafka = KafkaConsumer()
+ * tuple: (topic, partition, offset)
+ * dict: { (topic, partition): offset, ... }
+
+ Example:
+
+ .. code:: python
+
+ kafka = KafkaConsumer()
- # Consume topic1-all; topic2-partition2; topic3-partition0
- kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})
+ # Consume topic1-all; topic2-partition2; topic3-partition0
+ kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})
- # Consume topic1-0 starting at offset 123, and topic2-1 at offset 456
- # using tuples --
- kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456))
+ # Consume topic1-0 starting at offset 123, and topic2-1 at offset 456
+ # using tuples --
+ kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456))
+
+ # using dict --
+ kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 })
- # using dict --
- kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 })
"""
self._topics = []
self._client.load_metadata_for_topics()
@@ -309,10 +325,12 @@ class KafkaConsumer(object):
Otherwise blocks indefinitely
Note that this is also the method called internally during iteration:
- ```
- for m in consumer:
- pass
- ```
+
+ .. code:: python
+
+ for m in consumer:
+ pass
+
"""
self._set_consumer_timeout_start()
while True:
@@ -336,11 +354,12 @@ class KafkaConsumer(object):
OffsetOutOfRange, per the configured `auto_offset_reset` policy
Key configuration parameters:
- `fetch_message_max_bytes`
- `fetch_max_wait_ms`
- `fetch_min_bytes`
- `deserializer_class`
- `auto_offset_reset`
+
+ * `fetch_message_max_bytes`
+ * `fetch_max_wait_ms`
+ * `fetch_min_bytes`
+ * `deserializer_class`
+ * `auto_offset_reset`
"""
max_bytes = self._config['fetch_message_max_bytes']
@@ -418,20 +437,18 @@ class KafkaConsumer(object):
"""
Request available fetch offsets for a single topic/partition
- @param topic (str)
- @param partition (int)
- @param request_time_ms (int) -- Used to ask for all messages before a
- certain time (ms). There are two special
- values. Specify -1 to receive the latest
- offset (i.e. the offset of the next coming
- message) and -2 to receive the earliest
- available offset. Note that because offsets
- are pulled in descending order, asking for
- the earliest offset will always return you
- a single element.
- @param max_num_offsets (int)
-
- @return offsets (list)
+ Arguments:
+ topic (str)
+ partition (int)
+ request_time_ms (int): Used to ask for all messages before a
+ certain time (ms). There are two special values. Specify -1 to receive the latest
+ offset (i.e. the offset of the next coming message) and -2 to receive the earliest
+ available offset. Note that because offsets are pulled in descending order, asking for
+ the earliest offset will always return you a single element.
+ max_num_offsets (int)
+
+ Returns:
+ offsets (list)
"""
reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)]
@@ -448,9 +465,12 @@ class KafkaConsumer(object):
def offsets(self, group=None):
"""
- Returns a copy of internal offsets struct
- optional param: group [fetch|commit|task_done|highwater]
- if no group specified, returns all groups
+ Keyword Arguments:
+ group: Either "fetch", "commit", "task_done", or "highwater".
+ If no group specified, returns all groups.
+
+ Returns:
+ A copy of internal offsets struct
"""
if not group:
return {
@@ -498,8 +518,8 @@ class KafkaConsumer(object):
Store consumed message offsets (marked via task_done())
to kafka cluster for this consumer_group.
- Note -- this functionality requires server version >=0.8.1.1
- see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
+ **Note**: this functionality requires server version >=0.8.1.1
+ See `this wiki page <https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI>`_.
"""
if not self._config['group_id']:
logger.warning('Cannot commit without a group_id!')
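
Complementing the iteration examples above, a hedged sketch of the offset helpers on KafkaConsumer (broker address and topic are placeholders; -2/-1 are the special request times for the earliest/latest offsets):

.. code:: python

    from kafka import KafkaConsumer

    consumer = KafkaConsumer('topic1',
                             group_id='my_consumer_group',
                             metadata_broker_list=['localhost:9092'])

    # earliest and latest offsets for topic1-partition0
    (earliest,) = consumer.get_partition_offsets('topic1', 0, -2, 1)
    (latest,) = consumer.get_partition_offsets('topic1', 0, -1, 1)

    # snapshot of the internal offset bookkeeping
    print(consumer.offsets('fetch'))   # one group
    print(consumer.offsets())          # all groups
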
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index 912e64b..4dc04dc 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -80,19 +80,21 @@ class MultiProcessConsumer(Consumer):
A consumer implementation that consumes partitions for a topic in
parallel using multiple processes
- client: a connected KafkaClient
- group: a name for this consumer, used for offset storage and must be unique
- topic: the topic to consume
-
- auto_commit: default True. Whether or not to auto commit the offsets
- auto_commit_every_n: default 100. How many messages to consume
- before a commit
- auto_commit_every_t: default 5000. How much time (in milliseconds) to
- wait before commit
- num_procs: Number of processes to start for consuming messages.
- The available partitions will be divided among these processes
- partitions_per_proc: Number of partitions to be allocated per process
- (overrides num_procs)
+ Arguments:
+ client: a connected KafkaClient
+ group: a name for this consumer, used for offset storage and must be unique
+ topic: the topic to consume
+
+ Keyword Arguments:
+ auto_commit: default True. Whether or not to auto commit the offsets
+ auto_commit_every_n: default 100. How many messages to consume
+ before a commit
+ auto_commit_every_t: default 5000. How much time (in milliseconds) to
+ wait before commit
+ num_procs: Number of processes to start for consuming messages.
+ The available partitions will be divided among these processes
+ partitions_per_proc: Number of partitions to be allocated per process
+ (overrides num_procs)
Auto commit details:
If both auto_commit_every_n and auto_commit_every_t are set, they will
@@ -198,11 +200,12 @@ class MultiProcessConsumer(Consumer):
"""
Fetch the specified number of messages
- count: Indicates the maximum number of messages to be fetched
- block: If True, the API will block till some messages are fetched.
- timeout: If block is True, the function will block for the specified
- time (in seconds) until count messages is fetched. If None,
- it will block forever.
+ Keyword Arguments:
+ count: Indicates the maximum number of messages to be fetched
+ block: If True, the API will block till some messages are fetched.
+ timeout: If block is True, the function will block for the specified
+ time (in seconds) until count messages are fetched. If None,
+ it will block forever.
"""
messages = []
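
A minimal usage sketch for MultiProcessConsumer using the arguments documented above (broker address, group, and topic are placeholders):

.. code:: python

    from kafka import KafkaClient, MultiProcessConsumer

    client = KafkaClient("localhost:9092")

    # available partitions are divided among num_procs worker processes
    consumer = MultiProcessConsumer(client, "my-group", "my-topic", num_procs=4)

    # bounded batch: up to 10 messages, waiting at most 5 seconds
    for message in consumer.get_messages(count=10, block=True, timeout=5):
        print(message)

    # or iterate indefinitely
    for message in consumer:
        print(message)
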
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index df975f4..000fcd9 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -67,24 +67,32 @@ class SimpleConsumer(Consumer):
A simple consumer implementation that consumes all/specified partitions
for a topic
- client: a connected KafkaClient
- group: a name for this consumer, used for offset storage and must be unique
- topic: the topic to consume
- partitions: An optional list of partitions to consume the data from
-
- auto_commit: default True. Whether or not to auto commit the offsets
- auto_commit_every_n: default 100. How many messages to consume
- before a commit
- auto_commit_every_t: default 5000. How much time (in milliseconds) to
- wait before commit
- fetch_size_bytes: number of bytes to request in a FetchRequest
- buffer_size: default 4K. Initial number of bytes to tell kafka we
- have available. This will double as needed.
- max_buffer_size: default 16K. Max number of bytes to tell kafka we have
- available. None means no limit.
- iter_timeout: default None. How much time (in seconds) to wait for a
- message in the iterator before exiting. None means no
- timeout, so it will wait forever.
+ Arguments:
+ client: a connected KafkaClient
+ group: a name for this consumer, used for offset storage and must be unique
+ topic: the topic to consume
+
+ Keyword Arguments:
+ partitions: An optional list of partitions to consume the data from
+
+ auto_commit: default True. Whether or not to auto commit the offsets
+
+ auto_commit_every_n: default 100. How many messages to consume
+ before a commit
+
+ auto_commit_every_t: default 5000. How much time (in milliseconds) to
+ wait before commit
+ fetch_size_bytes: number of bytes to request in a FetchRequest
+
+ buffer_size: default 4K. Initial number of bytes to tell kafka we
+ have available. This will double as needed.
+
+ max_buffer_size: default 16K. Max number of bytes to tell kafka we have
+ available. None means no limit.
+
+ iter_timeout: default None. How much time (in seconds) to wait for a
+ message in the iterator before exiting. None means no
+ timeout, so it will wait forever.
Auto commit details:
If both auto_commit_every_n and auto_commit_every_t are set, they will
@@ -133,11 +141,13 @@ class SimpleConsumer(Consumer):
"""
Alter the current offset in the consumer, similar to fseek
- offset: how much to modify the offset
- whence: where to modify it from
- 0 is relative to the earliest available offset (head)
- 1 is relative to the current offset
- 2 is relative to the latest known offset (tail)
+ Arguments:
+ offset: how much to modify the offset
+ whence: where to modify it from
+
+ * 0 is relative to the earliest available offset (head)
+ * 1 is relative to the current offset
+ * 2 is relative to the latest known offset (tail)
"""
if whence == 1: # relative to current position
@@ -180,11 +190,12 @@ class SimpleConsumer(Consumer):
"""
Fetch the specified number of messages
- count: Indicates the maximum number of messages to be fetched
- block: If True, the API will block till some messages are fetched.
- timeout: If block is True, the function will block for the specified
- time (in seconds) until count messages is fetched. If None,
- it will block forever.
+ Keyword Arguments:
+ count: Indicates the maximum number of messages to be fetched
+ block: If True, the API will block till some messages are fetched.
+ timeout: If block is True, the function will block for the specified
+ time (in seconds) until count messages are fetched. If None,
+ it will block forever.
"""
messages = []
if timeout is not None:
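
The seek whence values and get_messages keywords documented above, in a short sketch (broker, group, and topic are placeholders):

.. code:: python

    from kafka import KafkaClient, SimpleConsumer

    client = KafkaClient("localhost:9092")
    consumer = SimpleConsumer(client, "my-group", "my-topic",
                              max_buffer_size=None)  # no cap on the fetch buffer

    consumer.seek(0, 0)    # whence=0: earliest available offset (head)
    consumer.seek(-5, 2)   # whence=2: five messages before the latest offset (tail)

    for msg in consumer.get_messages(count=5, block=True, timeout=4):
        print(msg)
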
diff --git a/kafka/context.py b/kafka/context.py
index 98ed7b3..ade4db8 100644
--- a/kafka/context.py
+++ b/kafka/context.py
@@ -18,6 +18,8 @@ class OffsetCommitContext(object):
Example:
+ .. code:: python
+
consumer = SimpleConsumer(client, group, topic, auto_commit=False)
consumer.provide_partition_info()
consumer.fetch_last_known_offsets()
@@ -57,7 +59,10 @@ class OffsetCommitContext(object):
In order to know the current partition, it is helpful to initialize
the consumer to provide partition info via:
+ .. code:: python
+
consumer.provide_partition_info()
+
"""
max_offset = max(offset + 1, self.high_water_mark.get(partition, 0))
diff --git a/kafka/partitioner/base.py b/kafka/partitioner/base.py
index c62b7ed..0b1bb59 100644
--- a/kafka/partitioner/base.py
+++ b/kafka/partitioner/base.py
@@ -7,7 +7,8 @@ class Partitioner(object):
"""
Initialize the partitioner
- partitions - A list of available partitions (during startup)
+ Arguments:
+ partitions: A list of available partitions (during startup)
"""
self.partitions = partitions
@@ -16,8 +17,9 @@ class Partitioner(object):
Takes a string key and num_partitions as argument and returns
a partition to be used for the message
- partitions - The list of partitions is passed in every call. This
- may look like an overhead, but it will be useful
- (in future) when we handle cases like rebalancing
+ Arguments:
+ partitions: The list of partitions is passed in every call. This
+ may look like an overhead, but it will be useful
+ (in future) when we handle cases like rebalancing
"""
raise NotImplementedError('partition function has to be implemented')
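
As an illustration of the Partitioner contract above, a hypothetical subclass that routes on a hash of the key (the class name and hashing choice are illustrative only, not part of the library):

.. code:: python

    from kafka.partitioner.base import Partitioner

    class ModuloPartitioner(Partitioner):
        """Hypothetical partitioner: pick a partition by hashing the key."""

        def partition(self, key, partitions):
            # partitions is passed on every call, so a rebalance can change it
            return partitions[hash(key) % len(partitions)]
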
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 6e19b92..5b41bc9 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -85,20 +85,20 @@ class Producer(object):
"""
Base class to be used by producers
- Params:
- client - The Kafka client instance to use
- async - If set to true, the messages are sent asynchronously via another
+ Arguments:
+ client: The Kafka client instance to use
+ async: If set to true, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
WARNING!!! current implementation of async producer does not
guarantee message delivery. Use at your own risk! Or help us
improve with a PR!
- req_acks - A value indicating the acknowledgements that the server must
- receive before responding to the request
- ack_timeout - Value (in milliseconds) indicating a timeout for waiting
- for an acknowledgement
- batch_send - If True, messages are send in batches
- batch_send_every_n - If set, messages are send in batches of this size
- batch_send_every_t - If set, messages are send after this timeout
+ req_acks: A value indicating the acknowledgements that the server must
+ receive before responding to the request
+ ack_timeout: Value (in milliseconds) indicating a timeout for waiting
+ for an acknowledgement
+ batch_send: If True, messages are sent in batches
+ batch_send_every_n: If set, messages are sent in batches of this size
+ batch_send_every_t: If set, messages are sent after this timeout
"""
ACK_NOT_REQUIRED = 0 # No ack is required
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 68c70d9..fe5b056 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -15,17 +15,19 @@ class KeyedProducer(Producer):
"""
A producer which distributes messages to partitions based on the key
- Args:
- client - The kafka client instance
- partitioner - A partitioner class that will be used to get the partition
- to send the message to. Must be derived from Partitioner
- async - If True, the messages are sent asynchronously via another
+ Arguments:
+ client: The kafka client instance
+
+ Keyword Arguments:
+ partitioner: A partitioner class that will be used to get the partition
+ to send the message to. Must be derived from Partitioner
+ async: If True, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
- ack_timeout - Value (in milliseconds) indicating a timeout for waiting
- for an acknowledgement
- batch_send - If True, messages are send in batches
- batch_send_every_n - If set, messages are send in batches of this size
- batch_send_every_t - If set, messages are send after this timeout
+ ack_timeout: Value (in milliseconds) indicating a timeout for waiting
+ for an acknowledgement
+ batch_send: If True, messages are sent in batches
+ batch_send_every_n: If set, messages are sent in batches of this size
+ batch_send_every_t: If set, messages are sent after this timeout
"""
def __init__(self, client, partitioner=None, async=False,
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
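
A hedged sketch of KeyedProducer with an explicit partitioner; HashedPartitioner is assumed to be importable from kafka.partitioner, and send(topic, key, msg) follows this era of the library:

.. code:: python

    from kafka import KafkaClient, KeyedProducer
    from kafka.partitioner import HashedPartitioner

    client = KafkaClient("localhost:9092")
    producer = KeyedProducer(client, partitioner=HashedPartitioner)

    # messages with the same key are routed to the same partition
    producer.send("my-topic", "user-42", "first event for user-42")
    producer.send("my-topic", "user-42", "second event for user-42")
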
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index a10fa8c..019395c 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -19,21 +19,23 @@ class SimpleProducer(Producer):
"""
A simple, round-robin producer. Each message goes to exactly one partition
- Params:
- client - The Kafka client instance to use
- async - If True, the messages are sent asynchronously via another
+ Arguments:
+ client: The Kafka client instance to use
+
+ Keyword Arguments:
+ async: If True, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
- req_acks - A value indicating the acknowledgements that the server must
- receive before responding to the request
- ack_timeout - Value (in milliseconds) indicating a timeout for waiting
- for an acknowledgement
- batch_send - If True, messages are send in batches
- batch_send_every_n - If set, messages are send in batches of this size
- batch_send_every_t - If set, messages are send after this timeout
- random_start - If true, randomize the initial partition which the
- the first message block will be published to, otherwise
- if false, the first message block will always publish
- to partition 0 before cycling through each partition
+ req_acks: A value indicating the acknowledgements that the server must
+ receive before responding to the request
+ ack_timeout: Value (in milliseconds) indicating a timeout for waiting
+ for an acknowledgement
+ batch_send: If True, messages are sent in batches
+ batch_send_every_n: If set, messages are sent in batches of this size
+ batch_send_every_t: If set, messages are sent after this timeout
+ random_start: If true, randomize the initial partition to which
+ the first message block will be published, otherwise
+ if false, the first message block will always publish
+ to partition 0 before cycling through each partition
"""
def __init__(self, client, async=False,
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
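
A minimal SimpleProducer sketch using the constructor keywords listed above; the asynchronous, batched variant is shown second (per the warning above, async delivery is not guaranteed):

.. code:: python

    from kafka import KafkaClient, SimpleProducer

    client = KafkaClient("localhost:9092")

    # synchronous, round-robin producer with the default ack settings
    producer = SimpleProducer(client)
    producer.send_messages("my-topic", b"message one", b"message two")

    # asynchronous, batched variant
    batched = SimpleProducer(client, async=True,
                             batch_send=True,
                             batch_send_every_n=20,
                             batch_send_every_t=60)
    batched.send_messages("my-topic", b"buffered message")
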
diff --git a/kafka/protocol.py b/kafka/protocol.py
index a85c7eb..2a39de6 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -185,18 +185,18 @@ class KafkaProtocol(object):
"""
Encode some ProduceRequest structs
- Params
- ======
- client_id: string
- correlation_id: int
- payloads: list of ProduceRequest
- acks: How "acky" you want the request to be
- 0: immediate response
- 1: written to disk by the leader
- 2+: waits for this many number of replicas to sync
- -1: waits for all replicas to be in sync
- timeout: Maximum time the server will wait for acks from replicas.
- This is _not_ a socket timeout
+ Arguments:
+ client_id: string
+ correlation_id: int
+ payloads: list of ProduceRequest
+ acks: How "acky" you want the request to be
+ 0: immediate response
+ 1: written to disk by the leader
+ 2+: waits for this many number of replicas to sync
+ -1: waits for all replicas to be in sync
+ timeout: Maximum time the server will wait for acks from replicas.
+ This is _not_ a socket timeout
+
"""
payloads = [] if payloads is None else payloads
grouped_payloads = group_by_topic_and_partition(payloads)
@@ -225,9 +225,9 @@ class KafkaProtocol(object):
"""
Decode bytes to a ProduceResponse
- Params
- ======
- data: bytes to decode
+ Arguments:
+ data: bytes to decode
+
"""
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
@@ -248,14 +248,13 @@ class KafkaProtocol(object):
"""
Encodes some FetchRequest structs
- Params
- ======
- client_id: string
- correlation_id: int
- payloads: list of FetchRequest
- max_wait_time: int, how long to block waiting on min_bytes of data
- min_bytes: int, the minimum number of bytes to accumulate before
- returning the response
+ Arguments:
+ client_id: string
+ correlation_id: int
+ payloads: list of FetchRequest
+ max_wait_time: int, how long to block waiting on min_bytes of data
+ min_bytes: int, the minimum number of bytes to accumulate before
+ returning the response
"""
payloads = [] if payloads is None else payloads
@@ -284,9 +283,8 @@ class KafkaProtocol(object):
"""
Decode bytes to a FetchResponse
- Params
- ======
- data: bytes to decode
+ Arguments:
+ data: bytes to decode
"""
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
@@ -333,9 +331,8 @@ class KafkaProtocol(object):
"""
Decode bytes to an OffsetResponse
- Params
- ======
- data: bytes to decode
+ Arguments:
+ data: bytes to decode
"""
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
@@ -360,11 +357,10 @@ class KafkaProtocol(object):
"""
Encode a MetadataRequest
- Params
- ======
- client_id: string
- correlation_id: int
- topics: list of strings
+ Arguments:
+ client_id: string
+ correlation_id: int
+ topics: list of strings
"""
if payloads is None:
topics = [] if topics is None else topics
@@ -388,9 +384,8 @@ class KafkaProtocol(object):
"""
Decode bytes to a MetadataResponse
- Params
- ======
- data: bytes to decode
+ Arguments:
+ data: bytes to decode
"""
((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0)
@@ -439,12 +434,11 @@ class KafkaProtocol(object):
"""
Encode some OffsetCommitRequest structs
- Params
- ======
- client_id: string
- correlation_id: int
- group: string, the consumer group you are committing offsets for
- payloads: list of OffsetCommitRequest
+ Arguments:
+ client_id: string
+ correlation_id: int
+ group: string, the consumer group you are committing offsets for
+ payloads: list of OffsetCommitRequest
"""
grouped_payloads = group_by_topic_and_partition(payloads)
@@ -470,9 +464,8 @@ class KafkaProtocol(object):
"""
Decode bytes to an OffsetCommitResponse
- Params
- ======
- data: bytes to decode
+ Arguments:
+ data: bytes to decode
"""
((correlation_id,), cur) = relative_unpack('>i', data, 0)
((num_topics,), cur) = relative_unpack('>i', data, cur)
@@ -491,12 +484,11 @@ class KafkaProtocol(object):
"""
Encode some OffsetFetchRequest structs
- Params
- ======
- client_id: string
- correlation_id: int
- group: string, the consumer group you are fetching offsets for
- payloads: list of OffsetFetchRequest
+ Arguments:
+ client_id: string
+ correlation_id: int
+ group: string, the consumer group you are fetching offsets for
+ payloads: list of OffsetFetchRequest
"""
grouped_payloads = group_by_topic_and_partition(payloads)
@@ -522,9 +514,8 @@ class KafkaProtocol(object):
"""
Decode bytes to an OffsetFetchResponse
- Params
- ======
- data: bytes to decode
+ Arguments:
+ data: bytes to decode
"""
((correlation_id,), cur) = relative_unpack('>i', data, 0)
@@ -547,10 +538,10 @@ def create_message(payload, key=None):
"""
Construct a Message
- Params
- ======
- payload: bytes, the payload to send to Kafka
- key: bytes, a key used for partition routing (optional)
+ Arguments:
+ payload: bytes, the payload to send to Kafka
+ key: bytes, a key used for partition routing (optional)
+
"""
return Message(0, 0, key, payload)
@@ -562,10 +553,10 @@ def create_gzip_message(payloads, key=None):
The given payloads will be encoded, compressed, and sent as a single atomic
message to Kafka.
- Params
- ======
- payloads: list(bytes), a list of payload to send be sent to Kafka
- key: bytes, a key used for partition routing (optional)
+ Arguments:
+ payloads: list(bytes), a list of payloads to be sent to Kafka
+ key: bytes, a key used for partition routing (optional)
+
"""
message_set = KafkaProtocol._encode_message_set(
[create_message(payload, key) for payload in payloads])
@@ -583,10 +574,10 @@ def create_snappy_message(payloads, key=None):
The given payloads will be encoded, compressed, and sent as a single atomic
message to Kafka.
- Params
- ======
- payloads: list(bytes), a list of payload to send be sent to Kafka
- key: bytes, a key used for partition routing (optional)
+ Arguments:
+ payloads: list(bytes), a list of payloads to be sent to Kafka
+ key: bytes, a key used for partition routing (optional)
+
"""
message_set = KafkaProtocol._encode_message_set(
[create_message(payload, key) for payload in payloads])
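
The message-construction helpers documented above, in a brief sketch:

.. code:: python

    from kafka.protocol import create_message, create_gzip_message

    # a single uncompressed message with an optional routing key
    msg = create_message(b"payload bytes", key=b"routing-key")

    # several payloads compressed into one atomic gzip message
    batch = create_gzip_message([b"m1", b"m2", b"m3"], key=b"routing-key")
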
diff --git a/kafka/queue.py b/kafka/queue.py
index ada495f..26cafad 100644
--- a/kafka/queue.py
+++ b/kafka/queue.py
@@ -129,13 +129,12 @@ class KafkaQueue(object):
Messages are buffered in the producer thread until
producer_flush_timeout or producer_flush_buffer is reached.
- Params
- ======
- client: KafkaClient object
- topic: str, the topic name
- partitions: list of ints, the partions to consume from
- producer_config: dict, see below
- consumer_config: dict, see below
+ Arguments:
+ client: KafkaClient object
+ topic: str, the topic name
+ partitions: list of ints, the partitions to consume from
+ producer_config: dict, see below
+ consumer_config: dict, see below
Consumer Config
===============
@@ -184,14 +183,12 @@ class KafkaQueue(object):
"""
Consume a message from Kafka
- Params
- ======
- block: boolean, default True
- timeout: int, number of seconds to wait when blocking, default None
+ Arguments:
+ block: boolean, default True
+ timeout: int, number of seconds to wait when blocking, default None
- Returns
- =======
- msg: str, the payload from Kafka
+ Returns:
+ msg: str, the payload from Kafka
"""
return self.in_queue.get(block, timeout).payload
@@ -199,11 +196,10 @@ class KafkaQueue(object):
"""
Send a message to Kafka
- Params
- ======
- msg: std, the message to send
- block: boolean, default True
- timeout: int, number of seconds to wait when blocking, default None
+ Arguments:
+ msg: str, the message to send
+ block: boolean, default True
+ timeout: int, number of seconds to wait when blocking, default None
"""
self.out_queue.put(msg, block, timeout)
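
A hedged sketch of the KafkaQueue get/put API above; the close() call is an assumption about shutting down the worker threads, and the client address, topic, and partitions are placeholders:

.. code:: python

    from kafka import KafkaClient
    from kafka.queue import KafkaQueue

    client = KafkaClient("localhost:9092")
    q = KafkaQueue(client, "my-topic", partitions=[0, 1])

    q.put("a message")                       # buffered by the producer thread
    payload = q.get(block=True, timeout=5)   # str payload from Kafka
    q.close()                                # assumed worker-thread shutdown
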
diff --git a/kafka/util.py b/kafka/util.py
index 72ac521..622b1a7 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -103,10 +103,12 @@ class ReentrantTimer(object):
A timer that can be restarted, unlike threading.Timer
(although this uses threading.Timer)
- t: timer interval in milliseconds
- fn: a callable to invoke
- args: tuple of args to be passed to function
- kwargs: keyword arguments to be passed to function
+ Arguments:
+
+ t: timer interval in milliseconds
+ fn: a callable to invoke
+ args: tuple of args to be passed to function
+ kwargs: keyword arguments to be passed to function
"""
def __init__(self, t, fn, *args, **kwargs):
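
A brief sketch of ReentrantTimer as documented above; start() and stop() are assumed from the "can be restarted" description:

.. code:: python

    from kafka.util import ReentrantTimer

    def flush_offsets():
        print("commit offsets here")

    timer = ReentrantTimer(5000, flush_offsets)  # interval in milliseconds
    timer.start()
    # ... later, possibly after stop(), the timer can be started again
    timer.stop()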