| author | Dana Powers <dana.powers@rd.io> | 2014-09-15 22:49:38 -0700 |
|---|---|---|
| committer | Dana Powers <dana.powers@rd.io> | 2014-12-15 12:42:56 -0800 |
| commit | 391b53201f200ab246b78e76c6e7945c8af6846e (patch) | |
| tree | e4901cd1a5e3a92add02e588563afb3c8712f270 /kafka | |
| parent | 742af4f7e0bad6159e63ed4b369e34426ab9f670 (diff) | |
| download | kafka-python-391b53201f200ab246b78e76c6e7945c8af6846e.tar.gz | |
Update docstrings w/ current interface / config defaults
Diffstat (limited to 'kafka')
| -rw-r--r-- | kafka/consumer/new.py | 37 |
1 file changed, 21 insertions, 16 deletions
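The headline change in the diff below is that `deserializer_class` now defaults to an identity function (`lambda msg: msg`), so `m.value` arrives as raw message bytes instead of an `Event` object. A minimal sketch of what that means for callers, with the topic name, broker address, and JSON deserializer chosen purely for illustration (the import path is also an assumption, since this commit only touches `kafka/consumer/new.py`):

```python
import json

from kafka import KafkaConsumer  # import location is an assumption at this revision

# Post-commit default: deserializer_class=lambda msg: msg, so each m.value is
# the raw message bytes exactly as fetched from the broker.
raw_consumer = KafkaConsumer('topic1',                                 # illustrative topic
                             metadata_broker_list=['localhost:9092'])  # illustrative broker

# Structured values are opt-in again: pass a deserializer explicitly.
json_consumer = KafkaConsumer('topic1',
                              metadata_broker_list=['localhost:9092'],
                              deserializer_class=lambda msg: json.loads(msg))
```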
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py
index bad1f3d..e0884d3 100644
--- a/kafka/consumer/new.py
+++ b/kafka/consumer/new.py
@@ -54,9 +54,8 @@ class KafkaConsumer(object):
       for m in kafka:
         print m
 
-      # Alternate interface: next()
-      while True:
-        print kafka.next()
+      # Alternate interface: next()
+      print kafka.next()
 
       # Alternate interface: batch iteration
       while True:
@@ -79,17 +78,18 @@ class KafkaConsumer(object):
         kafka.task_done(m)
 
       # Alternate interface: next()
-      while True:
-        m = kafka.next()
-        process_message(m)
-        kafka.task_done(m)
+      m = kafka.next()
+      process_message(m)
+      kafka.task_done(m)
+
+      # If auto_commit_enable is False, remember to commit() periodically
+      kafka.commit()
 
-      # Batch process interface does not auto_commit!
+      # Batch process interface
       while True:
         for m in kafka.fetch_messages():
           process_message(m)
           kafka.task_done(m)
-        kafka.commit()
     ```
 
     messages (m) are namedtuples with attributes:
@@ -97,7 +97,7 @@ class KafkaConsumer(object):
       m.partition: partition number (int)
       m.offset: message offset on topic-partition log (int)
       m.key: key (bytes - can be None)
-      m.value: message (output of deserializer_class - default is event object)
+      m.value: message (output of deserializer_class - default is raw bytes)
 
     Configuration settings can be passed to constructor,
     otherwise defaults will be used:
@@ -110,12 +110,11 @@ class KafkaConsumer(object):
       metadata_broker_list=None,
       socket_timeout_ms=30*1000,
       auto_offset_reset='largest',
-      deserializer_class=Event.from_bytes,
+      deserializer_class=lambda msg: msg,
       auto_commit_enable=False,
       auto_commit_interval_ms=60 * 1000,
       consumer_timeout_ms=-1
 
-
     Configuration parameters are described in more detail at
     http://kafka.apache.org/documentation.html#highlevelconsumerapi
     """
@@ -137,7 +136,7 @@ class KafkaConsumer(object):
                 metadata_broker_list=None,
                 socket_timeout_ms=30*1000,
                 auto_offset_reset='largest',
-                deserializer_class=Event.from_bytes,
+                deserializer_class=lambda msg: msg,
                 auto_commit_enable=False,
                 auto_commit_interval_ms=60 * 1000,
                 auto_commit_interval_messages=None,
@@ -345,9 +344,9 @@ class KafkaConsumer(object):
         # and send each group as a single FetchRequest to the correct broker
         try:
             responses = self._client.send_fetch_request(fetches,
-                    max_wait_time=max_wait_time,
-                    min_bytes=min_bytes,
-                    fail_on_error=False)
+                                                         max_wait_time=max_wait_time,
+                                                         min_bytes=min_bytes,
+                                                         fail_on_error=False)
         except FailedPayloadsError:
             logger.warning('FailedPayloadsError attempting to fetch data from kafka')
             self._refresh_metadata_on_error()
@@ -537,6 +536,7 @@ class KafkaConsumer(object):
     #
     # Topic/partition management private methods
     #
+
     def _consume_topic_partition(self, topic, partition):
         if not isinstance(topic, six.string_types):
             raise KafkaConfigurationError('Unknown topic type (%s) '
@@ -570,6 +570,7 @@ class KafkaConsumer(object):
     #
     # Offset-managment private methods
     #
+
     def _get_commit_offsets(self):
         logger.info("Consumer fetching stored offsets")
         for topic_partition in self._topics:
@@ -632,6 +633,7 @@ class KafkaConsumer(object):
     #
     # Consumer Timeout private methods
     #
+
     def _set_consumer_timeout_start(self):
         self._consumer_timeout = False
         if self._config['consumer_timeout_ms'] >= 0:
@@ -644,6 +646,7 @@ class KafkaConsumer(object):
     #
     # Autocommit private methods
     #
+
     def _should_auto_commit(self):
         if not self._config['auto_commit_enable']:
             return False
@@ -670,6 +673,7 @@ class KafkaConsumer(object):
     #
     # Message iterator private methods
     #
+
     def __iter__(self):
         return self
 
@@ -686,6 +690,7 @@ class KafkaConsumer(object):
     #
     # python private methods
     #
+
     def __repr__(self):
         return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition
                                                           for topic_partition in
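Taken together, the updated docstring describes a manual-commit workflow: iterate, mark each message with `task_done()`, and call `commit()` yourself because `auto_commit_enable` defaults to `False`. A minimal sketch built from that docstring (group id, topic, and broker address are illustrative placeholders; the import path is assumed):

```python
from kafka import KafkaConsumer  # import location is an assumption at this revision


def process_message(value):
    """Stand-in for application-specific handling."""
    print(repr(value))


# auto_commit_enable defaults to False, so offsets are only stored when
# commit() is called; deserializer_class defaults to the identity function,
# so m.value below is raw bytes.
kafka = KafkaConsumer('topic1',                                 # illustrative topic
                      group_id='my_consumer_group',             # illustrative group
                      metadata_broker_list=['localhost:9092'])  # illustrative broker

for m in kafka:              # m is a namedtuple: topic, partition, offset, key, value
    process_message(m.value)
    kafka.task_done(m)       # mark this message as processed
    kafka.commit()           # committing every message keeps the sketch simple;
                             # the docstring suggests committing periodically
```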