summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/__init__.py2
-rw-r--r--kafka/consumer/multiprocess.py4
-rw-r--r--kafka/consumer/simple.py6
-rw-r--r--kafka/coordinator/assignors/roundrobin.py2
-rw-r--r--kafka/coordinator/consumer.py2
-rw-r--r--kafka/producer/base.py4
-rw-r--r--kafka/producer/kafka.py8
-rw-r--r--kafka/producer/record_accumulator.py4
-rw-r--r--kafka/protocol/legacy.py5
-rw-r--r--kafka/structs.py4
10 files changed, 18 insertions, 23 deletions
diff --git a/kafka/__init__.py b/kafka/__init__.py
index f108eff..ff364d3 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -25,8 +25,8 @@ from kafka.conn import BrokerConnection
from kafka.protocol import (
create_message, create_gzip_message, create_snappy_message)
from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner
-from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.serializer import Serializer, Deserializer
+from kafka.structs import TopicPartition, OffsetAndMetadata
# To be deprecated when KafkaProducer interface is released
from kafka.client import SimpleClient
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index 1da4a33..758bb92 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -8,7 +8,7 @@ import warnings
from kafka.vendor.six.moves import queue # pylint: disable=import-error
-from kafka.common import KafkaError
+from kafka.errors import KafkaError
from kafka.consumer.base import (
Consumer,
AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
@@ -92,7 +92,7 @@ def _mp_consume(client, group, topic, message_queue, size, events, **consumer_op
except KafkaError as e:
# Retry with exponential backoff
- log.error("Problem communicating with Kafka (%s), retrying in %d seconds..." % (e, interval))
+ log.exception("Problem communicating with Kafka, retrying in %d seconds...", interval)
time.sleep(interval)
interval = interval*2 if interval*2 < MAX_BACKOFF_SECONDS else MAX_BACKOFF_SECONDS
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index c0c1b1e..b60a586 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -24,13 +24,13 @@ from kafka.consumer.base import (
ITER_TIMEOUT_SECONDS,
NO_MESSAGES_WAIT_TIME_SECONDS
)
-from kafka.common import (
- FetchRequestPayload, KafkaError, OffsetRequestPayload,
- ConsumerFetchSizeTooSmall,
+from kafka.errors import (
+ KafkaError, ConsumerFetchSizeTooSmall,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
OffsetOutOfRangeError, FailedPayloadsError, check_error
)
from kafka.protocol.message import PartialMessage
+from kafka.structs import FetchRequestPayload, OffsetRequestPayload
log = logging.getLogger(__name__)
diff --git a/kafka/coordinator/assignors/roundrobin.py b/kafka/coordinator/assignors/roundrobin.py
index a831033..2d24a5c 100644
--- a/kafka/coordinator/assignors/roundrobin.py
+++ b/kafka/coordinator/assignors/roundrobin.py
@@ -7,8 +7,8 @@ import logging
from kafka.vendor import six
from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
-from kafka.common import TopicPartition
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
+from kafka.structs import TopicPartition
log = logging.getLogger(__name__)
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index cb1de0d..f90d182 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -11,7 +11,7 @@ from kafka.coordinator.base import BaseCoordinator, Generation
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.coordinator.protocol import ConsumerProtocol
-from kafka import errors as Errors
+import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics import AnonMeasurable
from kafka.metrics.stats import Avg, Count, Max, Rate
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index c9dd6c3..956cef6 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -14,13 +14,13 @@ from threading import Thread, Event
from kafka.vendor import six
-from kafka.structs import (
- ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions)
from kafka.errors import (
kafka_errors, UnsupportedCodecError, FailedPayloadsError,
RequestTimedOutError, AsyncProducerQueueFull, UnknownError,
RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
+from kafka.structs import (
+ ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions)
log = logging.getLogger('kafka.producer')
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index f285ab4..7d52bdf 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -10,18 +10,18 @@ import weakref
from kafka.vendor import six
-from kafka import errors as Errors
+import kafka.errors as Errors
from kafka.client_async import KafkaClient, selectors
from kafka.codec import has_gzip, has_snappy, has_lz4
from kafka.metrics import MetricConfig, Metrics
from kafka.partitioner.default import DefaultPartitioner
+from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
+from kafka.producer.record_accumulator import AtomicInteger, RecordAccumulator
+from kafka.producer.sender import Sender
from kafka.record.default_records import DefaultRecordBatchBuilder
from kafka.record.legacy_records import LegacyRecordBatchBuilder
from kafka.serializer import Serializer
from kafka.structs import TopicPartition
-from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
-from kafka.producer.record_accumulator import AtomicInteger, RecordAccumulator
-from kafka.producer.sender import Sender
log = logging.getLogger(__name__)
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 61f1e0e..1cd5413 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -6,12 +6,12 @@ import logging
import threading
import time
-from kafka import errors as Errors
+import kafka.errors as Errors
from kafka.producer.buffer import SimpleBufferPool
from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
-from kafka.structs import TopicPartition
from kafka.record.memory_records import MemoryRecordsBuilder
from kafka.record.legacy_records import LegacyRecordBatchBuilder
+from kafka.structs import TopicPartition
log = logging.getLogger(__name__)
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index b8f84e7..7dd2580 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -15,7 +15,6 @@ import kafka.structs
from kafka.codec import gzip_encode, snappy_encode
from kafka.errors import ProtocolError, UnsupportedCodecError
-from kafka.structs import ConsumerMetadataResponse
from kafka.util import (
crc32, read_short_string, relative_unpack,
write_int_string, group_by_topic_and_partition)
@@ -322,7 +321,7 @@ class KafkaProtocol(object):
@classmethod
def decode_consumer_metadata_response(cls, data):
"""
- Decode bytes to a ConsumerMetadataResponse
+ Decode bytes to a kafka.structs.ConsumerMetadataResponse
Arguments:
data: bytes to decode
@@ -331,7 +330,7 @@ class KafkaProtocol(object):
(host, cur) = read_short_string(data, cur)
((port,), cur) = relative_unpack('>i', data, cur)
- return ConsumerMetadataResponse(error, nodeId, host, port)
+ return kafka.structs.ConsumerMetadataResponse(error, nodeId, host, port)
@classmethod
def encode_offset_commit_request(cls, group, payloads):
diff --git a/kafka/structs.py b/kafka/structs.py
index 62f36dd..e15e92e 100644
--- a/kafka/structs.py
+++ b/kafka/structs.py
@@ -93,7 +93,3 @@ KafkaMessage = namedtuple("KafkaMessage",
# Limit value: int >= 0, 0 means no retries
RetryOptions = namedtuple("RetryOptions",
["limit", "backoff_ms", "retry_on_timeouts"])
-
-
-# Support legacy imports from kafka.common
-from kafka.errors import *