summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-15 22:49:38 -0700
committerDana Powers <dana.powers@rd.io>2014-12-15 12:42:56 -0800
commit391b53201f200ab246b78e76c6e7945c8af6846e (patch)
treee4901cd1a5e3a92add02e588563afb3c8712f270 /kafka
parent742af4f7e0bad6159e63ed4b369e34426ab9f670 (diff)
downloadkafka-python-391b53201f200ab246b78e76c6e7945c8af6846e.tar.gz
Update docstrings w/ current interface / config defaults
Diffstat (limited to 'kafka')
-rw-r--r--kafka/consumer/new.py37
1 file changed, 21 insertions, 16 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py
index bad1f3d..e0884d3 100644
--- a/kafka/consumer/new.py
+++ b/kafka/consumer/new.py
@@ -54,9 +54,8 @@ class KafkaConsumer(object):
for m in kafka:
print m
- # Alternate interface: next()
- while True:
- print kafka.next()
+ # Alternate interface: next()
+ print kafka.next()
# Alternate interface: batch iteration
while True:
@@ -79,17 +78,18 @@ class KafkaConsumer(object):
kafka.task_done(m)
# Alternate interface: next()
- while True:
- m = kafka.next()
- process_message(m)
- kafka.task_done(m)
+ m = kafka.next()
+ process_message(m)
+ kafka.task_done(m)
+
+ # If auto_commit_enable is False, remember to commit() periodically
+ kafka.commit()
- # Batch process interface does not auto_commit!
+ # Batch process interface
while True:
for m in kafka.fetch_messages():
process_message(m)
kafka.task_done(m)
- kafka.commit()
```
messages (m) are namedtuples with attributes:
@@ -97,7 +97,7 @@ class KafkaConsumer(object):
m.partition: partition number (int)
m.offset: message offset on topic-partition log (int)
m.key: key (bytes - can be None)
- m.value: message (output of deserializer_class - default is event object)
+ m.value: message (output of deserializer_class - default is raw bytes)
Configuration settings can be passed to constructor,
otherwise defaults will be used:
@@ -110,12 +110,11 @@ class KafkaConsumer(object):
metadata_broker_list=None,
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
- deserializer_class=Event.from_bytes,
+ deserializer_class=lambda msg: msg,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
consumer_timeout_ms=-1
-
Configuration parameters are described in more detail at
http://kafka.apache.org/documentation.html#highlevelconsumerapi
"""
@@ -137,7 +136,7 @@ class KafkaConsumer(object):
metadata_broker_list=None,
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
- deserializer_class=Event.from_bytes,
+ deserializer_class=lambda msg: msg,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
auto_commit_interval_messages=None,
@@ -345,9 +344,9 @@ class KafkaConsumer(object):
# and send each group as a single FetchRequest to the correct broker
try:
responses = self._client.send_fetch_request(fetches,
- max_wait_time=max_wait_time,
- min_bytes=min_bytes,
- fail_on_error=False)
+ max_wait_time=max_wait_time,
+ min_bytes=min_bytes,
+ fail_on_error=False)
except FailedPayloadsError:
logger.warning('FailedPayloadsError attempting to fetch data from kafka')
self._refresh_metadata_on_error()
@@ -537,6 +536,7 @@ class KafkaConsumer(object):
#
# Topic/partition management private methods
#
+
def _consume_topic_partition(self, topic, partition):
if not isinstance(topic, six.string_types):
raise KafkaConfigurationError('Unknown topic type (%s) '
@@ -570,6 +570,7 @@ class KafkaConsumer(object):
#
# Offset-management private methods
#
+
def _get_commit_offsets(self):
logger.info("Consumer fetching stored offsets")
for topic_partition in self._topics:
@@ -632,6 +633,7 @@ class KafkaConsumer(object):
#
# Consumer Timeout private methods
#
+
def _set_consumer_timeout_start(self):
self._consumer_timeout = False
if self._config['consumer_timeout_ms'] >= 0:
@@ -644,6 +646,7 @@ class KafkaConsumer(object):
#
# Autocommit private methods
#
+
def _should_auto_commit(self):
if not self._config['auto_commit_enable']:
return False
@@ -670,6 +673,7 @@ class KafkaConsumer(object):
#
# Message iterator private methods
#
+
def __iter__(self):
return self
@@ -686,6 +690,7 @@ class KafkaConsumer(object):
#
# python private methods
#
+
def __repr__(self):
return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition
for topic_partition in