Diffstat (limited to 'kafka')
-rw-r--r--   kafka/__init__.py                           |  2
-rw-r--r--   kafka/consumer/multiprocess.py              |  4
-rw-r--r--   kafka/consumer/simple.py                    |  6
-rw-r--r--   kafka/coordinator/assignors/roundrobin.py   |  2
-rw-r--r--   kafka/coordinator/consumer.py               |  2
-rw-r--r--   kafka/producer/base.py                      |  4
-rw-r--r--   kafka/producer/kafka.py                     |  8
-rw-r--r--   kafka/producer/record_accumulator.py        |  4
-rw-r--r--   kafka/protocol/legacy.py                    |  5
-rw-r--r--   kafka/structs.py                            |  4
10 files changed, 18 insertions, 23 deletions
diff --git a/kafka/__init__.py b/kafka/__init__.py
index f108eff..ff364d3 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -25,8 +25,8 @@ from kafka.conn import BrokerConnection
 from kafka.protocol import (
     create_message, create_gzip_message, create_snappy_message)
 from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner
-from kafka.structs import TopicPartition, OffsetAndMetadata
 from kafka.serializer import Serializer, Deserializer
+from kafka.structs import TopicPartition, OffsetAndMetadata
 
 # To be deprecated when KafkaProducer interface is released
 from kafka.client import SimpleClient
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index 1da4a33..758bb92 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -8,7 +8,7 @@ import warnings
 
 from kafka.vendor.six.moves import queue  # pylint: disable=import-error
 
-from kafka.common import KafkaError
+from kafka.errors import KafkaError
 from kafka.consumer.base import (
     Consumer,
     AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
@@ -92,7 +92,7 @@ def _mp_consume(client, group, topic, message_queue, size, events, **consumer_op
         except KafkaError as e:
             # Retry with exponential backoff
-            log.error("Problem communicating with Kafka (%s), retrying in %d seconds..." % (e, interval))
+            log.exception("Problem communicating with Kafka, retrying in %d seconds...", interval)
             time.sleep(interval)
             interval = interval*2 if interval*2 < MAX_BACKOFF_SECONDS else MAX_BACKOFF_SECONDS
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index c0c1b1e..b60a586 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -24,13 +24,13 @@ from kafka.consumer.base import (
     ITER_TIMEOUT_SECONDS,
     NO_MESSAGES_WAIT_TIME_SECONDS
 )
-from kafka.common import (
-    FetchRequestPayload, KafkaError, OffsetRequestPayload,
-    ConsumerFetchSizeTooSmall,
+from kafka.errors import (
+    KafkaError, ConsumerFetchSizeTooSmall,
     UnknownTopicOrPartitionError, NotLeaderForPartitionError,
     OffsetOutOfRangeError, FailedPayloadsError, check_error
 )
 from kafka.protocol.message import PartialMessage
+from kafka.structs import FetchRequestPayload, OffsetRequestPayload
 
 log = logging.getLogger(__name__)
diff --git a/kafka/coordinator/assignors/roundrobin.py b/kafka/coordinator/assignors/roundrobin.py
index a831033..2d24a5c 100644
--- a/kafka/coordinator/assignors/roundrobin.py
+++ b/kafka/coordinator/assignors/roundrobin.py
@@ -7,8 +7,8 @@ import logging
 
 from kafka.vendor import six
 
 from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
-from kafka.common import TopicPartition
 from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
+from kafka.structs import TopicPartition
 
 log = logging.getLogger(__name__)
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index cb1de0d..f90d182 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -11,7 +11,7 @@ from kafka.coordinator.base import BaseCoordinator, Generation
 from kafka.coordinator.assignors.range import RangePartitionAssignor
 from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
 from kafka.coordinator.protocol import ConsumerProtocol
-from kafka import errors as Errors
+import kafka.errors as Errors
 from kafka.future import Future
 from kafka.metrics import AnonMeasurable
 from kafka.metrics.stats import Avg, Count, Max, Rate
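A note on the kafka/consumer/multiprocess.py hunk above: log.exception logs at ERROR level and appends the active traceback, so the caught exception no longer has to be spliced into the message by hand, and passing interval as a format argument defers string interpolation until a handler actually emits the record. A minimal standalone sketch of the same pattern (the ConnectionError and the 5-second interval are illustrative, not taken from the patch):

    import logging

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger(__name__)

    interval = 5  # hypothetical backoff, in seconds
    try:
        raise ConnectionError("broker unreachable")  # stand-in for a KafkaError
    except ConnectionError:
        # Lazy %-style interpolation: the arguments are rendered only if the
        # record is emitted, and the traceback is attached automatically.
        log.exception("Problem communicating with Kafka, retrying in %d seconds...", interval)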
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index c9dd6c3..956cef6 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -14,13 +14,13 @@ from threading import Thread, Event
 
 from kafka.vendor import six
 
-from kafka.structs import (
-    ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions)
 from kafka.errors import (
     kafka_errors, UnsupportedCodecError, FailedPayloadsError,
     RequestTimedOutError, AsyncProducerQueueFull, UnknownError,
     RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
 from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
+from kafka.structs import (
+    ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions)
 
 log = logging.getLogger('kafka.producer')
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index f285ab4..7d52bdf 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -10,18 +10,18 @@ import weakref
 
 from kafka.vendor import six
 
-from kafka import errors as Errors
+import kafka.errors as Errors
 from kafka.client_async import KafkaClient, selectors
 from kafka.codec import has_gzip, has_snappy, has_lz4
 from kafka.metrics import MetricConfig, Metrics
 from kafka.partitioner.default import DefaultPartitioner
+from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
+from kafka.producer.record_accumulator import AtomicInteger, RecordAccumulator
+from kafka.producer.sender import Sender
 from kafka.record.default_records import DefaultRecordBatchBuilder
 from kafka.record.legacy_records import LegacyRecordBatchBuilder
 from kafka.serializer import Serializer
 from kafka.structs import TopicPartition
-from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
-from kafka.producer.record_accumulator import AtomicInteger, RecordAccumulator
-from kafka.producer.sender import Sender
 
 log = logging.getLogger(__name__)
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 61f1e0e..1cd5413 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -6,12 +6,12 @@ import logging
 import threading
 import time
 
-from kafka import errors as Errors
+import kafka.errors as Errors
 from kafka.producer.buffer import SimpleBufferPool
 from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
-from kafka.structs import TopicPartition
 from kafka.record.memory_records import MemoryRecordsBuilder
 from kafka.record.legacy_records import LegacyRecordBatchBuilder
+from kafka.structs import TopicPartition
 
 log = logging.getLogger(__name__)
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index b8f84e7..7dd2580 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -15,7 +15,6 @@ import kafka.structs
 
 from kafka.codec import gzip_encode, snappy_encode
 from kafka.errors import ProtocolError, UnsupportedCodecError
-from kafka.structs import ConsumerMetadataResponse
 from kafka.util import (
     crc32, read_short_string, relative_unpack,
     write_int_string, group_by_topic_and_partition)
@@ -322,7 +321,7 @@ class KafkaProtocol(object):
     @classmethod
     def decode_consumer_metadata_response(cls, data):
         """
-        Decode bytes to a ConsumerMetadataResponse
+        Decode bytes to a kafka.structs.ConsumerMetadataResponse
 
         Arguments:
             data: bytes to decode
@@ -331,7 +330,7 @@
         (host, cur) = read_short_string(data, cur)
         ((port,), cur) = relative_unpack('>i', data, cur)
-        return ConsumerMetadataResponse(error, nodeId, host, port)
+        return kafka.structs.ConsumerMetadataResponse(error, nodeId, host, port)
 
     @classmethod
     def encode_offset_commit_request(cls, group, payloads):
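The kafka/protocol/legacy.py hunks replace a from-import with attribute access on kafka.structs, which the file already imports as a module. A plausible motivation, though the patch does not say so, is import-cycle safety: a plain import only binds the module object and its attributes are resolved at call time, whereas a from-import requires the name to already exist when the import executes. A contrived two-file sketch (module names a/b and the helper function are hypothetical):

    # --- a.py ---
    import b  # binds the module object; b may still be partially initialized here

    def use_helper():
        # b.helper is looked up at call time, long after both imports finished.
        return b.helper()

    # --- b.py ---
    import a  # the back-reference that closes the cycle; harmless with plain "import"

    def helper():
        return "ok"

    # Usage: python -c "import a; print(a.use_helper())"  -> ok
    # Had a.py used "from b import helper" and b.py "from a import use_helper",
    # the import would raise ImportError: each from-import would demand a name
    # the half-initialized peer module has not defined yet.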
diff --git a/kafka/structs.py b/kafka/structs.py
index 62f36dd..e15e92e 100644
--- a/kafka/structs.py
+++ b/kafka/structs.py
@@ -93,7 +93,3 @@ KafkaMessage = namedtuple("KafkaMessage",
 # Limit value: int >= 0, 0 means no retries
 RetryOptions = namedtuple("RetryOptions",
     ["limit", "backoff_ms", "retry_on_timeouts"])
-
-
-# Support legacy imports from kafka.common
-from kafka.errors import *
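Finally, the kafka/structs.py hunk deletes the wildcard re-export that kept legacy kafka.common-style imports alive. After this patch each public name has a single home: error types in kafka.errors, payload and struct namedtuples in kafka.structs. A short sketch of what breaks and what the canonical imports look like (the surrounding usage code is illustrative, not from the patch):

    # Before this patch, the star re-export at the bottom of kafka/structs.py
    # made error classes reachable from the wrong module, e.g.:
    #     from kafka.structs import KafkaError   # worked only via the re-export
    #
    # After this patch, only the canonical paths work:
    from kafka.errors import KafkaError
    from kafka.structs import TopicPartition

    tp = TopicPartition(topic="example-topic", partition=0)  # hypothetical values
    try:
        raise KafkaError("illustrative failure")
    except KafkaError as err:
        print("would retry %s after: %s" % (tp, err))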