Diffstat (limited to 'kafka/consumer')
-rw-r--r--  kafka/consumer/base.py          |  3
-rw-r--r--  kafka/consumer/multiprocess.py  | 14
-rw-r--r--  kafka/consumer/simple.py        | 18
3 files changed, 27 insertions(+), 8 deletions(-)
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index f53217f..6365cfa 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -13,7 +13,8 @@ from kafka.common import (
from kafka.util import kafka_bytestring, ReentrantTimer
-log = logging.getLogger("kafka")
+
+log = logging.getLogger('kafka.consumer')
AUTO_COMMIT_MSG_COUNT = 100
AUTO_COMMIT_INTERVAL = 5000
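
Note on the logger rename: moving from the package-wide "kafka" logger to per-module names ('kafka.consumer' here, __name__ in the files below) lets an application tune consumer logging separately from the rest of the package. A minimal sketch using only the standard logging module; the handler and level choices are assumptions, not part of this change:

    import logging

    # Route everything from the kafka package through one handler at WARNING...
    logging.getLogger('kafka').addHandler(logging.StreamHandler())
    logging.getLogger('kafka').setLevel(logging.WARNING)

    # ...but enable verbose output for the consumer modules only, which now
    # log under 'kafka.consumer', 'kafka.consumer.simple', etc.
    logging.getLogger('kafka.consumer').setLevel(logging.DEBUG)
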
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index cfe0ef6..8cec92d 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -18,9 +18,11 @@ from .base import (
)
from .simple import Consumer, SimpleConsumer
+
+log = logging.getLogger(__name__)
+
Events = namedtuple("Events", ["start", "pause", "exit"])
-log = logging.getLogger("kafka")
def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
"""
@@ -98,6 +100,7 @@ class MultiProcessConsumer(Consumer):
topic: the topic to consume
Keyword Arguments:
+ partitions: An optional list of partitions to consume the data from
auto_commit: default True. Whether or not to auto commit the offsets
auto_commit_every_n: default 100. How many messages to consume
before a commit
@@ -114,16 +117,19 @@ class MultiProcessConsumer(Consumer):
commit method on this class. A manual call to commit will also reset
these triggers
"""
- def __init__(self, client, group, topic, auto_commit=True,
+ def __init__(self, client, group, topic,
+ partitions=None,
+ auto_commit=True,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
- num_procs=1, partitions_per_proc=0,
+ num_procs=1,
+ partitions_per_proc=0,
**simple_consumer_options):
# Initiate the base consumer class
super(MultiProcessConsumer, self).__init__(
client, group, topic,
- partitions=None,
+ partitions=partitions,
auto_commit=auto_commit,
auto_commit_every_n=auto_commit_every_n,
auto_commit_every_t=auto_commit_every_t)
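
With the constructor change above, MultiProcessConsumer accepts an optional partitions list instead of always consuming every partition of the topic (previously the base class was hard-wired to partitions=None). A rough usage sketch; the broker address, group and topic names are made up, and the exact KafkaClient constructor may differ between versions:

    from kafka import KafkaClient
    from kafka.consumer import MultiProcessConsumer

    client = KafkaClient('localhost:9092')           # assumed broker address

    # Restrict this consumer group to partitions 0 and 1 of the topic.
    consumer = MultiProcessConsumer(client, 'my-group', 'my-topic',
                                    partitions=[0, 1],
                                    num_procs=2)

    for message in consumer:
        print(message)
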
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index ae00dab..384fa8e 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -19,7 +19,7 @@ from kafka.common import (
FetchRequest, OffsetRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
- OffsetOutOfRangeError, check_error
+ OffsetOutOfRangeError, FailedPayloadsError, check_error
)
from .base import (
Consumer,
@@ -34,7 +34,9 @@ from .base import (
NO_MESSAGES_WAIT_TIME_SECONDS
)
-log = logging.getLogger("kafka")
+
+log = logging.getLogger(__name__)
+
class FetchContext(object):
"""
@@ -342,9 +344,12 @@ class SimpleConsumer(Consumer):
try:
check_error(resp)
- except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
+ except UnknownTopicOrPartitionError:
self.client.reset_topic_metadata(resp.topic)
raise
+ except NotLeaderForPartitionError:
+ self.client.reset_topic_metadata(resp.topic)
+ continue
except OffsetOutOfRangeError:
log.warning("OffsetOutOfRangeError for %s - %d. "
"Resetting partition offset...",
@@ -353,6 +358,13 @@ class SimpleConsumer(Consumer):
# Retry this partition
retry_partitions[resp.partition] = partitions[resp.partition]
continue
+ except FailedPayloadsError as e:
+ log.warning("Failed payloads of %s"
+ "Resetting partition offset...",
+ e.payload)
+ # Retry this partition
+ retry_partitions[e.payload.partition] = partitions[e.payload.partition]
+ continue
partition = resp.partition
buffer_size = partitions[partition]
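
The reworked error handling above means NotLeaderForPartitionError and FailedPayloadsError are now absorbed inside the fetch loop (topic metadata is refreshed and the affected partition retried), while UnknownTopicOrPartitionError is still re-raised to the caller. A small sketch of what that buys application code; the broker address, group/topic names and handler are hypothetical:

    from kafka import KafkaClient, SimpleConsumer

    client = KafkaClient('localhost:9092')           # assumed broker address
    consumer = SimpleConsumer(client, 'my-group', 'my-topic')

    for message in consumer:
        # A leader election or a failed payload during the underlying fetch
        # no longer surfaces here; the consumer refreshes metadata and retries.
        handle(message)                              # hypothetical handler
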