diff options
author | Jeff Widman <jeff@jeffwidman.com> | 2018-11-13 11:57:45 -0800 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2018-11-17 16:38:01 -0800 |
commit | 8eb26b6420a358dc10af7e58d270fae690e07fdf (patch) | |
tree | 2d1b7f3e0744e80b7757182c0e978ccd20814d52 /kafka | |
parent | 7bd6b5da6d402565f25fce9e710be26b2d4cc125 (diff) | |
download | kafka-python-use-explicit-tuples-for-strings.tar.gz |
Be explicit with tuples for %s formattinguse-explicit-tuples-for-strings
Fix #1633
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/admin/kafka.py | 2 | ||||
-rw-r--r-- | kafka/client.py | 2 | ||||
-rw-r--r-- | kafka/client_async.py | 2 | ||||
-rw-r--r-- | kafka/consumer/fetcher.py | 14 | ||||
-rw-r--r-- | kafka/consumer/group.py | 2 | ||||
-rw-r--r-- | kafka/consumer/simple.py | 2 | ||||
-rw-r--r-- | kafka/consumer/subscription_state.py | 2 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 6 | ||||
-rw-r--r-- | kafka/metrics/metrics.py | 2 | ||||
-rw-r--r-- | kafka/metrics/stats/percentiles.py | 2 | ||||
-rw-r--r-- | kafka/metrics/stats/rate.py | 2 | ||||
-rw-r--r-- | kafka/metrics/stats/sensor.py | 2 | ||||
-rw-r--r-- | kafka/producer/base.py | 4 | ||||
-rw-r--r-- | kafka/producer/future.py | 2 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 10 | ||||
-rw-r--r-- | kafka/producer/keyed.py | 2 | ||||
-rw-r--r-- | kafka/producer/record_accumulator.py | 6 | ||||
-rw-r--r-- | kafka/producer/simple.py | 2 | ||||
-rw-r--r-- | kafka/protocol/legacy.py | 2 | ||||
-rw-r--r-- | kafka/protocol/message.py | 4 | ||||
-rw-r--r-- | kafka/protocol/parser.py | 2 | ||||
-rw-r--r-- | kafka/record/legacy_records.py | 2 |
22 files changed, 38 insertions, 38 deletions
diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py index fbbbcc2..98bac5d 100644 --- a/kafka/admin/kafka.py +++ b/kafka/admin/kafka.py @@ -166,7 +166,7 @@ class KafkaAdmin(object): log.debug("Starting Kafka administration interface") extra_configs = set(configs).difference(self.DEFAULT_CONFIG) if extra_configs: - raise KafkaConfigurationError("Unrecognized configs: %s" % extra_configs) + raise KafkaConfigurationError("Unrecognized configs: %s" % (extra_configs,)) self.config = copy.copy(self.DEFAULT_CONFIG) self.config.update(configs) diff --git a/kafka/client.py b/kafka/client.py index 789d4da..148cae0 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -174,7 +174,7 @@ class SimpleClient(object): return decoder_fn(future.value) - raise KafkaUnavailableError('All servers failed to process request: %s' % hosts) + raise KafkaUnavailableError('All servers failed to process request: %s' % (hosts,)) def _payloads_by_broker(self, payloads): payloads_by_broker = collections.defaultdict(list) diff --git a/kafka/client_async.py b/kafka/client_async.py index bf395c5..cf57ef9 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -355,7 +355,7 @@ class KafkaClient(object): conn = self._conns.get(node_id) if conn is None: - assert broker, 'Broker id %s not in current metadata' % node_id + assert broker, 'Broker id %s not in current metadata' % (node_id,) log.debug("Initiating connection to node %s at %s:%s", node_id, broker.host, broker.port) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 7d58b7c..3638831 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -298,7 +298,7 @@ class Fetcher(six.Iterator): remaining_ms = timeout_ms - elapsed_ms raise Errors.KafkaTimeoutError( - "Failed to get offsets by timestamps in %s ms" % timeout_ms) + "Failed to get offsets by timestamps in %s ms" % (timeout_ms,)) def fetched_records(self, max_records=None): """Returns previously fetched records and updates consumed offsets. @@ -911,7 +911,7 @@ class FetchResponseMetricAggregator(object): class FetchManagerMetrics(object): def __init__(self, metrics, prefix): self.metrics = metrics - self.group_name = '%s-fetch-manager-metrics' % prefix + self.group_name = '%s-fetch-manager-metrics' % (prefix,) self.bytes_fetched = metrics.sensor('bytes-fetched') self.bytes_fetched.add(metrics.metric_name('fetch-size-avg', self.group_name, @@ -955,15 +955,15 @@ class FetchManagerMetrics(object): bytes_fetched = self.metrics.sensor(name) bytes_fetched.add(self.metrics.metric_name('fetch-size-avg', self.group_name, - 'The average number of bytes fetched per request for topic %s' % topic, + 'The average number of bytes fetched per request for topic %s' % (topic,), metric_tags), Avg()) bytes_fetched.add(self.metrics.metric_name('fetch-size-max', self.group_name, - 'The maximum number of bytes fetched per request for topic %s' % topic, + 'The maximum number of bytes fetched per request for topic %s' % (topic,), metric_tags), Max()) bytes_fetched.add(self.metrics.metric_name('bytes-consumed-rate', self.group_name, - 'The average number of bytes consumed per second for topic %s' % topic, + 'The average number of bytes consumed per second for topic %s' % (topic,), metric_tags), Rate()) bytes_fetched.record(num_bytes) @@ -976,10 +976,10 @@ class FetchManagerMetrics(object): records_fetched = self.metrics.sensor(name) records_fetched.add(self.metrics.metric_name('records-per-request-avg', self.group_name, - 'The average number of records in each request for topic %s' % topic, + 'The average number of records in each request for topic %s' % (topic,), metric_tags), Avg()) records_fetched.add(self.metrics.metric_name('records-consumed-rate', self.group_name, - 'The average number of records consumed per second for topic %s' % topic, + 'The average number of records consumed per second for topic %s' % (topic,), metric_tags), Rate()) records_fetched.record(num_records) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 279cce0..8727de7 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -309,7 +309,7 @@ class KafkaConsumer(six.Iterator): # Only check for extra config keys in top-level class extra_configs = set(configs).difference(self.DEFAULT_CONFIG) if extra_configs: - raise KafkaConfigurationError("Unrecognized configs: %s" % extra_configs) + raise KafkaConfigurationError("Unrecognized configs: %s" % (extra_configs,)) self.config = copy.copy(self.DEFAULT_CONFIG) self.config.update(configs) diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index b60a586..a6a64a5 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -247,7 +247,7 @@ class SimpleConsumer(Consumer): self.offsets[resp.partition] = \ resp.offsets[0] + deltas[resp.partition] else: - raise ValueError('Unexpected value for `whence`, %d' % whence) + raise ValueError('Unexpected value for `whence`, %d' % (whence,)) # Reset queue and fetch offsets since they are invalid self.fetch_offsets = self.offsets.copy() diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 10d722e..4b0b275 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -247,7 +247,7 @@ class SubscriptionState(object): for tp in assignments: if tp.topic not in self.subscription: - raise ValueError("Assigned partition %s for non-subscribed topic." % str(tp)) + raise ValueError("Assigned partition %s for non-subscribed topic." % (tp,)) # after rebalancing, we always reinitialize the assignment state self.assignment.clear() diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 647a6b5..14eee0f 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -216,7 +216,7 @@ class ConsumerCoordinator(BaseCoordinator): self._assignment_snapshot = None assignor = self._lookup_assignor(protocol) - assert assignor, 'Coordinator selected invalid assignment protocol: %s' % protocol + assert assignor, 'Coordinator selected invalid assignment protocol: %s' % (protocol,) assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes) @@ -297,7 +297,7 @@ class ConsumerCoordinator(BaseCoordinator): def _perform_assignment(self, leader_id, assignment_strategy, members): assignor = self._lookup_assignor(assignment_strategy) - assert assignor, 'Invalid assignment protocol: %s' % assignment_strategy + assert assignor, 'Invalid assignment protocol: %s' % (assignment_strategy,) member_metadata = {} all_subscribed_topics = set() for member_id, metadata_bytes in members: @@ -804,7 +804,7 @@ class ConsumerCoordinator(BaseCoordinator): class ConsumerCoordinatorMetrics(object): def __init__(self, metrics, metric_group_prefix, subscription): self.metrics = metrics - self.metric_group_name = '%s-coordinator-metrics' % metric_group_prefix + self.metric_group_name = '%s-coordinator-metrics' % (metric_group_prefix,) self.commit_latency = metrics.sensor('commit-latency') self.commit_latency.add(metrics.metric_name( diff --git a/kafka/metrics/metrics.py b/kafka/metrics/metrics.py index f2e99ed..2c53488 100644 --- a/kafka/metrics/metrics.py +++ b/kafka/metrics/metrics.py @@ -225,7 +225,7 @@ class Metrics(object): with self._lock: if metric.metric_name in self.metrics: raise ValueError('A metric named "%s" already exists, cannot' - ' register another one.' % metric.metric_name) + ' register another one.' % (metric.metric_name,)) self.metrics[metric.metric_name] = metric for reporter in self._reporters: reporter.metric_change(metric) diff --git a/kafka/metrics/stats/percentiles.py b/kafka/metrics/stats/percentiles.py index b55c5ac..6d702e8 100644 --- a/kafka/metrics/stats/percentiles.py +++ b/kafka/metrics/stats/percentiles.py @@ -27,7 +27,7 @@ class Percentiles(AbstractSampledStat, AbstractCompoundStat): ' to be 0.0.') self.bin_scheme = Histogram.LinearBinScheme(self._buckets, max_val) else: - ValueError('Unknown bucket type: %s' % bucketing) + ValueError('Unknown bucket type: %s' % (bucketing,)) def stats(self): measurables = [] diff --git a/kafka/metrics/stats/rate.py b/kafka/metrics/stats/rate.py index 810c543..68393fb 100644 --- a/kafka/metrics/stats/rate.py +++ b/kafka/metrics/stats/rate.py @@ -101,7 +101,7 @@ class Rate(AbstractMeasurableStat): elif self._unit == TimeUnit.DAYS: return time_ms / (24.0 * 60.0 * 60.0 * 1000.0) else: - raise ValueError('Unknown unit: %s' % self._unit) + raise ValueError('Unknown unit: %s' % (self._unit,)) class SampledTotal(AbstractSampledStat): diff --git a/kafka/metrics/stats/sensor.py b/kafka/metrics/stats/sensor.py index 73a4665..571723f 100644 --- a/kafka/metrics/stats/sensor.py +++ b/kafka/metrics/stats/sensor.py @@ -35,7 +35,7 @@ class Sensor(object): """Validate that this sensor doesn't end up referencing itself.""" if self in sensors: raise ValueError('Circular dependency in sensors: %s is its own' - 'parent.' % self.name) + 'parent.' % (self.name,)) sensors.add(self) for parent in self._parents: parent._check_forest(sensors) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 1da74c8..b323966 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -316,7 +316,7 @@ class Producer(object): if codec is None: codec = CODEC_NONE elif codec not in ALL_CODECS: - raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec) + raise UnsupportedCodecError("Codec 0x%02x unsupported" % (codec,)) self.codec = codec self.codec_compresslevel = codec_compresslevel @@ -419,7 +419,7 @@ class Producer(object): raise AsyncProducerQueueFull( msg[idx:], 'Producer async queue overfilled. ' - 'Current queue size %d.' % self.queue.qsize()) + 'Current queue size %d.' % (self.queue.qsize(),)) resp = [] else: messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel) diff --git a/kafka/producer/future.py b/kafka/producer/future.py index 1c5d6d7..f67db09 100644 --- a/kafka/producer/future.py +++ b/kafka/producer/future.py @@ -59,7 +59,7 @@ class FutureRecordMetadata(Future): def get(self, timeout=None): if not self.is_done and not self._produce_future.wait(timeout): raise Errors.KafkaTimeoutError( - "Timeout after waiting for %s secs." % timeout) + "Timeout after waiting for %s secs." % (timeout,)) assert self.is_done if self.failed(): raise self.exception # pylint: disable-msg=raising-bad-type diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 45bb058..685c3f9 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -340,11 +340,11 @@ class KafkaProducer(object): self.config[key] = configs.pop(key) # Only check for extra config keys in top-level class - assert not configs, 'Unrecognized configs: %s' % configs + assert not configs, 'Unrecognized configs: %s' % (configs,) if self.config['client_id'] is None: self.config['client_id'] = 'kafka-python-producer-%s' % \ - PRODUCER_CLIENT_ID_SEQUENCE.increment() + (PRODUCER_CLIENT_ID_SEQUENCE.increment(),) if self.config['acks'] == 'all': self.config['acks'] = -1 @@ -633,12 +633,12 @@ class KafkaProducer(object): raise Errors.MessageSizeTooLargeError( "The message is %d bytes when serialized which is larger than" " the maximum request size you have configured with the" - " max_request_size configuration" % size) + " max_request_size configuration" % (size,)) if size > self.config['buffer_memory']: raise Errors.MessageSizeTooLargeError( "The message is %d bytes when serialized which is larger than" " the total memory buffer you have configured with the" - " buffer_memory configuration." % size) + " buffer_memory configuration." % (size,)) def _wait_on_metadata(self, topic, max_wait): """ @@ -679,7 +679,7 @@ class KafkaProducer(object): elapsed = time.time() - begin if not metadata_event.is_set(): raise Errors.KafkaTimeoutError( - "Failed to update metadata after %.1f secs." % max_wait) + "Failed to update metadata after %.1f secs." % (max_wait,)) elif topic in self._metadata.unauthorized_topics: raise Errors.TopicAuthorizationFailedError(topic) else: diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index 62bb733..3ba9216 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -46,4 +46,4 @@ class KeyedProducer(Producer): return self.send_messages(topic, key, msg) def __repr__(self): - return '<KeyedProducer batch=%s>' % self.async_send + return '<KeyedProducer batch=%s>' % (self.async_send,) diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 728bf18..eeb928d 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -102,11 +102,11 @@ class ProducerBatch(object): error = None if not self.in_retry() and is_full and timeout < since_append: - error = "%d seconds have passed since last append" % since_append + error = "%d seconds have passed since last append" % (since_append,) elif not self.in_retry() and timeout < since_ready: - error = "%d seconds have passed since batch creation plus linger time" % since_ready + error = "%d seconds have passed since batch creation plus linger time" % (since_ready,) elif self.in_retry() and timeout < since_backoff: - error = "%d seconds have passed since last attempt plus backoff time" % since_backoff + error = "%d seconds have passed since last attempt plus backoff time" % (since_backoff,) if error: self.records.close() diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index e06e659..f334a49 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -51,4 +51,4 @@ class SimpleProducer(Producer): ) def __repr__(self): - return '<SimpleProducer batch=%s>' % self.async_send + return '<SimpleProducer batch=%s>' % (self.async_send,) diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index 7dd2580..2e8f5bc 100644 --- a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -471,4 +471,4 @@ def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None) elif codec == CODEC_SNAPPY: return [create_snappy_message(messages, key)] else: - raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec) + raise UnsupportedCodecError("Codec 0x%02x unsupported" % (codec,)) diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index 19dcbd9..31527bf 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -77,7 +77,7 @@ class Message(Struct): elif version == 0: fields = (self.crc, self.magic, self.attributes, self.key, self.value) else: - raise ValueError('Unrecognized message version: %s' % version) + raise ValueError('Unrecognized message version: %s' % (version,)) message = Message.SCHEMAS[version].encode(fields) if not recalc_crc: return message @@ -143,7 +143,7 @@ class Message(Struct): class PartialMessage(bytes): def __repr__(self): - return 'PartialMessage(%s)' % self + return 'PartialMessage(%s)' % (self,) class MessageSet(AbstractType): diff --git a/kafka/protocol/parser.py b/kafka/protocol/parser.py index 4d77bb3..a99b3ae 100644 --- a/kafka/protocol/parser.py +++ b/kafka/protocol/parser.py @@ -136,7 +136,7 @@ class KafkaProtocol(object): raise Errors.CorrelationIdError( 'No in-flight-request found for server response' ' with correlation ID %d' - % recv_correlation_id) + % (recv_correlation_id,)) (correlation_id, request) = self.in_flight_requests.popleft() diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py index 1bdba81..bb6c21c 100644 --- a/kafka/record/legacy_records.py +++ b/kafka/record/legacy_records.py @@ -254,7 +254,7 @@ class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase): # There should only ever be a single layer of compression assert not attrs & self.CODEC_MASK, ( 'MessageSet at offset %d appears double-compressed. This ' - 'should not happen -- check your producers!' % offset) + 'should not happen -- check your producers!' % (offset,)) # When magic value is greater than 0, the timestamp # of a compressed message depends on the |