diff options
Diffstat (limited to 'kafka/consumer')
-rw-r--r-- | kafka/consumer/base.py | 3 | ||||
-rw-r--r-- | kafka/consumer/multiprocess.py | 14 | ||||
-rw-r--r-- | kafka/consumer/simple.py | 18 |
3 files changed, 27 insertions, 8 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index f53217f..6365cfa 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -13,7 +13,8 @@ from kafka.common import ( from kafka.util import kafka_bytestring, ReentrantTimer -log = logging.getLogger("kafka") + +log = logging.getLogger('kafka.consumer') AUTO_COMMIT_MSG_COUNT = 100 AUTO_COMMIT_INTERVAL = 5000 diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index cfe0ef6..8cec92d 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -18,9 +18,11 @@ from .base import ( ) from .simple import Consumer, SimpleConsumer + +log = logging.getLogger(__name__) + Events = namedtuple("Events", ["start", "pause", "exit"]) -log = logging.getLogger("kafka") def _mp_consume(client, group, topic, queue, size, events, **consumer_options): """ @@ -98,6 +100,7 @@ class MultiProcessConsumer(Consumer): topic: the topic to consume Keyword Arguments: + partitions: An optional list of partitions to consume the data from auto_commit: default True. Whether or not to auto commit the offsets auto_commit_every_n: default 100. How many messages to consume before a commit @@ -114,16 +117,19 @@ class MultiProcessConsumer(Consumer): commit method on this class. A manual call to commit will also reset these triggers """ - def __init__(self, client, group, topic, auto_commit=True, + def __init__(self, client, group, topic, + partitions=None, + auto_commit=True, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL, - num_procs=1, partitions_per_proc=0, + num_procs=1, + partitions_per_proc=0, **simple_consumer_options): # Initiate the base consumer class super(MultiProcessConsumer, self).__init__( client, group, topic, - partitions=None, + partitions=partitions, auto_commit=auto_commit, auto_commit_every_n=auto_commit_every_n, auto_commit_every_t=auto_commit_every_t) diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index ae00dab..384fa8e 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -19,7 +19,7 @@ from kafka.common import ( FetchRequest, OffsetRequest, ConsumerFetchSizeTooSmall, ConsumerNoMoreData, UnknownTopicOrPartitionError, NotLeaderForPartitionError, - OffsetOutOfRangeError, check_error + OffsetOutOfRangeError, FailedPayloadsError, check_error ) from .base import ( Consumer, @@ -34,7 +34,9 @@ from .base import ( NO_MESSAGES_WAIT_TIME_SECONDS ) -log = logging.getLogger("kafka") + +log = logging.getLogger(__name__) + class FetchContext(object): """ @@ -342,9 +344,12 @@ class SimpleConsumer(Consumer): try: check_error(resp) - except (UnknownTopicOrPartitionError, NotLeaderForPartitionError): + except UnknownTopicOrPartitionError: self.client.reset_topic_metadata(resp.topic) raise + except NotLeaderForPartitionError: + self.client.reset_topic_metadata(resp.topic) + continue except OffsetOutOfRangeError: log.warning("OffsetOutOfRangeError for %s - %d. " "Resetting partition offset...", @@ -353,6 +358,13 @@ class SimpleConsumer(Consumer): # Retry this partition retry_partitions[resp.partition] = partitions[resp.partition] continue + except FailedPayloadsError as e: + log.warning("Failed payloads of %s" + "Resetting partition offset...", + e.payload) + # Retry this partition + retry_partitions[e.payload.partition] = partitions[e.payload.partition] + continue partition = resp.partition buffer_size = partitions[partition] |