diff options
Diffstat (limited to 'kafka/consumer')
-rw-r--r-- | kafka/consumer/fetcher.py | 14 | ||||
-rw-r--r-- | kafka/consumer/group.py | 2 | ||||
-rw-r--r-- | kafka/consumer/simple.py | 2 | ||||
-rw-r--r-- | kafka/consumer/subscription_state.py | 2 |
4 files changed, 10 insertions, 10 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 7d58b7c..3638831 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -298,7 +298,7 @@ class Fetcher(six.Iterator): remaining_ms = timeout_ms - elapsed_ms raise Errors.KafkaTimeoutError( - "Failed to get offsets by timestamps in %s ms" % timeout_ms) + "Failed to get offsets by timestamps in %s ms" % (timeout_ms,)) def fetched_records(self, max_records=None): """Returns previously fetched records and updates consumed offsets. @@ -911,7 +911,7 @@ class FetchResponseMetricAggregator(object): class FetchManagerMetrics(object): def __init__(self, metrics, prefix): self.metrics = metrics - self.group_name = '%s-fetch-manager-metrics' % prefix + self.group_name = '%s-fetch-manager-metrics' % (prefix,) self.bytes_fetched = metrics.sensor('bytes-fetched') self.bytes_fetched.add(metrics.metric_name('fetch-size-avg', self.group_name, @@ -955,15 +955,15 @@ class FetchManagerMetrics(object): bytes_fetched = self.metrics.sensor(name) bytes_fetched.add(self.metrics.metric_name('fetch-size-avg', self.group_name, - 'The average number of bytes fetched per request for topic %s' % topic, + 'The average number of bytes fetched per request for topic %s' % (topic,), metric_tags), Avg()) bytes_fetched.add(self.metrics.metric_name('fetch-size-max', self.group_name, - 'The maximum number of bytes fetched per request for topic %s' % topic, + 'The maximum number of bytes fetched per request for topic %s' % (topic,), metric_tags), Max()) bytes_fetched.add(self.metrics.metric_name('bytes-consumed-rate', self.group_name, - 'The average number of bytes consumed per second for topic %s' % topic, + 'The average number of bytes consumed per second for topic %s' % (topic,), metric_tags), Rate()) bytes_fetched.record(num_bytes) @@ -976,10 +976,10 @@ class FetchManagerMetrics(object): records_fetched = self.metrics.sensor(name) records_fetched.add(self.metrics.metric_name('records-per-request-avg', self.group_name, - 'The average number of records in each request for topic %s' % topic, + 'The average number of records in each request for topic %s' % (topic,), metric_tags), Avg()) records_fetched.add(self.metrics.metric_name('records-consumed-rate', self.group_name, - 'The average number of records consumed per second for topic %s' % topic, + 'The average number of records consumed per second for topic %s' % (topic,), metric_tags), Rate()) records_fetched.record(num_records) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 279cce0..8727de7 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -309,7 +309,7 @@ class KafkaConsumer(six.Iterator): # Only check for extra config keys in top-level class extra_configs = set(configs).difference(self.DEFAULT_CONFIG) if extra_configs: - raise KafkaConfigurationError("Unrecognized configs: %s" % extra_configs) + raise KafkaConfigurationError("Unrecognized configs: %s" % (extra_configs,)) self.config = copy.copy(self.DEFAULT_CONFIG) self.config.update(configs) diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index b60a586..a6a64a5 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -247,7 +247,7 @@ class SimpleConsumer(Consumer): self.offsets[resp.partition] = \ resp.offsets[0] + deltas[resp.partition] else: - raise ValueError('Unexpected value for `whence`, %d' % whence) + raise ValueError('Unexpected value for `whence`, %d' % (whence,)) # Reset queue and fetch offsets since they are invalid self.fetch_offsets = self.offsets.copy() diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 10d722e..4b0b275 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -247,7 +247,7 @@ class SubscriptionState(object): for tp in assignments: if tp.topic not in self.subscription: - raise ValueError("Assigned partition %s for non-subscribed topic." % str(tp)) + raise ValueError("Assigned partition %s for non-subscribed topic." % (tp,)) # after rebalancing, we always reinitialize the assignment state self.assignment.clear() |