summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-05 09:34:48 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-05 09:35:45 -0700
commit5a14bd8c947251d1a8f848175cc3cf2b07af3411 (patch)
treea251ddbc60c84405762365429de9b04727653e6c /kafka
parent221f56d8a05cdc2d37f85018e4af352b4b2a95c5 (diff)
downloadkafka-python-5a14bd8c947251d1a8f848175cc3cf2b07af3411.tar.gz
Update imports from kafka.common -> kafka.errors / kafka.structs
Diffstat (limited to 'kafka')
-rw-r--r--kafka/__init__.py2
-rw-r--r--kafka/client.py14
-rw-r--r--kafka/client_async.py3
-rw-r--r--kafka/cluster.py4
-rw-r--r--kafka/conn.py4
-rw-r--r--kafka/consumer/base.py10
-rw-r--r--kafka/consumer/fetcher.py4
-rw-r--r--kafka/consumer/group.py2
-rw-r--r--kafka/consumer/subscription_state.py3
-rw-r--r--kafka/context.py3
-rw-r--r--kafka/coordinator/base.py2
-rw-r--r--kafka/coordinator/consumer.py4
-rw-r--r--kafka/coordinator/heartbeat.py2
-rw-r--r--kafka/coordinator/protocol.py2
-rw-r--r--kafka/future.py2
-rw-r--r--kafka/producer/base.py9
-rw-r--r--kafka/producer/buffer.py2
-rw-r--r--kafka/producer/future.py3
-rw-r--r--kafka/producer/kafka.py4
-rw-r--r--kafka/producer/record_accumulator.py4
-rw-r--r--kafka/producer/sender.py4
-rw-r--r--kafka/protocol/legacy.py33
-rw-r--r--kafka/util.py2
23 files changed, 57 insertions, 65 deletions
diff --git a/kafka/__init__.py b/kafka/__init__.py
index 3f0d8bd..6b2ba97 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -22,7 +22,7 @@ from kafka.conn import BrokerConnection
from kafka.protocol import (
create_message, create_gzip_message, create_snappy_message)
from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner
-from kafka.common import TopicPartition
+from kafka.structs import TopicPartition
# To be deprecated when KafkaProducer interface is released
from kafka.client import SimpleClient
diff --git a/kafka/client.py b/kafka/client.py
index 99d6fec..2bd2324 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -7,12 +7,12 @@ import time
import six
-import kafka.common
-from kafka.common import (TopicPartition, BrokerMetadata, UnknownError,
- ConnectionError, FailedPayloadsError,
+import kafka.errors
+from kafka.errors import (UnknownError, ConnectionError, FailedPayloadsError,
KafkaTimeoutError, KafkaUnavailableError,
LeaderNotAvailableError, UnknownTopicOrPartitionError,
NotLeaderForPartitionError, ReplicaNotAvailableError)
+from kafka.structs import TopicPartition, BrokerMetadata
from kafka.conn import (
collect_hosts, BrokerConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS,
@@ -123,7 +123,7 @@ class SimpleClient(object):
# If there's a problem with finding the coordinator, raise the
# provided error
- kafka.common.check_error(resp)
+ kafka.errors.check_error(resp)
# Otherwise return the BrokerMetadata
return BrokerMetadata(resp.nodeId, resp.host, resp.port)
@@ -389,7 +389,7 @@ class SimpleClient(object):
# Or a server api error response
try:
- kafka.common.check_error(resp)
+ kafka.errors.check_error(resp)
except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
self.reset_topic_metadata(resp.topic)
raise
@@ -509,7 +509,7 @@ class SimpleClient(object):
for error, topic, partitions in resp.topics:
# Errors expected for new topics
if error:
- error_type = kafka.common.kafka_errors.get(error, UnknownError)
+ error_type = kafka.errors.kafka_errors.get(error, UnknownError)
if error_type in (UnknownTopicOrPartitionError, LeaderNotAvailableError):
log.error('Error loading topic metadata for %s: %s (%s)',
topic, error_type, error)
@@ -530,7 +530,7 @@ class SimpleClient(object):
# Check for partition errors
if error:
- error_type = kafka.common.kafka_errors.get(error, UnknownError)
+ error_type = kafka.errors.kafka_errors.get(error, UnknownError)
# If No Leader, topics_to_brokers topic_partition -> None
if error_type is LeaderNotAvailableError:
diff --git a/kafka/client_async.py b/kafka/client_async.py
index d70e4f2..b77ead5 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -11,10 +11,9 @@ import time
import six
-import kafka.common as Errors # TODO: make Errors a separate class
-
from .cluster import ClusterMetadata
from .conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
+from . import errors as Errors
from .future import Future
from .protocol.metadata import MetadataRequest
from .protocol.produce import ProduceRequest
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 9ab6e6e..f7940e6 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -9,9 +9,9 @@ import time
import six
-import kafka.common as Errors
-from kafka.common import BrokerMetadata, PartitionMetadata, TopicPartition
+from . import errors as Errors
from .future import Future
+from .structs import BrokerMetadata, PartitionMetadata, TopicPartition
log = logging.getLogger(__name__)
diff --git a/kafka/conn.py b/kafka/conn.py
index ffc839e..dc7dd23 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -13,7 +13,7 @@ import warnings
import six
-import kafka.common as Errors
+import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.api import RequestHeader
from kafka.protocol.commit import GroupCoordinatorResponse
@@ -149,7 +149,7 @@ class BrokerConnection(object):
Arguments:
error (Exception, optional): pending in-flight-requests
will be failed with this exception.
- Default: kafka.common.ConnectionError.
+ Default: kafka.errors.ConnectionError.
"""
if self._sock:
self._sock.close()
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 75c3ee1..d2d9e8d 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -6,12 +6,10 @@ import numbers
from threading import Lock
import warnings
-import kafka.common
-from kafka.common import (
- OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
- UnknownTopicOrPartitionError, check_error, KafkaError
-)
-
+from kafka.errors import (
+ UnknownTopicOrPartitionError, check_error, KafkaError)
+from kafka.structs import (
+ OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload)
from kafka.util import ReentrantTimer
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 7112c7e..2c9c0b9 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -6,12 +6,12 @@ import logging
import six
-import kafka.common as Errors
-from kafka.common import TopicPartition
+import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.message import PartialMessage
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
+from kafka.structs import TopicPartition
log = logging.getLogger(__name__)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 9172040..6c85c21 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -7,13 +7,13 @@ import time
import six
from kafka.client_async import KafkaClient
-from kafka.common import TopicPartition
from kafka.consumer.fetcher import Fetcher
from kafka.consumer.subscription_state import SubscriptionState
from kafka.coordinator.consumer import ConsumerCoordinator
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.protocol.offset import OffsetResetStrategy
+from kafka.structs import TopicPartition
from kafka.version import __version__
log = logging.getLogger(__name__)
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 3d170ae..1c045aa 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -6,8 +6,9 @@ import re
import six
-from kafka.common import IllegalStateError, OffsetAndMetadata
+from kafka.errors import IllegalStateError
from kafka.protocol.offset import OffsetResetStrategy
+from kafka.structs import OffsetAndMetadata
log = logging.getLogger(__name__)
diff --git a/kafka/context.py b/kafka/context.py
index 376fad1..d6c15fe 100644
--- a/kafka/context.py
+++ b/kafka/context.py
@@ -3,7 +3,8 @@ Context manager to commit/rollback consumer offsets.
"""
from logging import getLogger
-from kafka.common import check_error, OffsetCommitRequestPayload, OffsetOutOfRangeError
+from kafka.errors import check_error, OffsetOutOfRangeError
+from kafka.structs import OffsetCommitRequestPayload
class OffsetCommitContext(object):
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index b0a0981..fcf3901 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -6,7 +6,7 @@ import weakref
import six
-import kafka.common as Errors
+import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.commit import (GroupCoordinatorRequest,
OffsetCommitRequest_v2 as OffsetCommitRequest)
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index b2ef1ea..ae2344f 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -12,14 +12,14 @@ from .base import BaseCoordinator
from .assignors.range import RangePartitionAssignor
from .assignors.roundrobin import RoundRobinPartitionAssignor
from .protocol import ConsumerProtocol
-from ..common import OffsetAndMetadata, TopicPartition
+from .. import errors as Errors
from ..future import Future
from ..protocol.commit import (
OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0,
OffsetFetchRequest_v0, OffsetFetchRequest_v1)
+from ..structs import OffsetAndMetadata, TopicPartition
from ..util import WeakMethod
-import kafka.common as Errors
log = logging.getLogger(__name__)
diff --git a/kafka/coordinator/heartbeat.py b/kafka/coordinator/heartbeat.py
index 4ddcf09..e73b3e5 100644
--- a/kafka/coordinator/heartbeat.py
+++ b/kafka/coordinator/heartbeat.py
@@ -1,7 +1,7 @@
import copy
import time
-import kafka.common as Errors
+import kafka.errors as Errors
class Heartbeat(object):
diff --git a/kafka/coordinator/protocol.py b/kafka/coordinator/protocol.py
index 9e37397..56a3901 100644
--- a/kafka/coordinator/protocol.py
+++ b/kafka/coordinator/protocol.py
@@ -1,8 +1,8 @@
from __future__ import absolute_import
-from kafka.common import TopicPartition
from kafka.protocol.struct import Struct
from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
+from kafka.structs import TopicPartition
class ConsumerProtocolMemberMetadata(Struct):
diff --git a/kafka/future.py b/kafka/future.py
index c7e0b14..b379272 100644
--- a/kafka/future.py
+++ b/kafka/future.py
@@ -1,7 +1,7 @@
import functools
import logging
-import kafka.common as Errors
+import kafka.errors as Errors
log = logging.getLogger(__name__)
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 2067c7e..07e61d5 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -14,13 +14,12 @@ from threading import Thread, Event
import six
-from kafka.common import (
- ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions,
+from kafka.structs import (
+ ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions)
+from kafka.errors import (
kafka_errors, UnsupportedCodecError, FailedPayloadsError,
RequestTimedOutError, AsyncProducerQueueFull, UnknownError,
- RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES
-)
-
+ RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
log = logging.getLogger('kafka.producer')
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index 8c83ffc..b2ac747 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -7,10 +7,10 @@ import time
from ..codec import (has_gzip, has_snappy, has_lz4,
gzip_encode, snappy_encode, lz4_encode)
+from .. import errors as Errors
from ..protocol.types import Int32, Int64
from ..protocol.message import MessageSet, Message
-import kafka.common as Errors
class MessageSetBuffer(object):
diff --git a/kafka/producer/future.py b/kafka/producer/future.py
index 5a7a9dc..35520d8 100644
--- a/kafka/producer/future.py
+++ b/kafka/producer/future.py
@@ -3,10 +3,9 @@ from __future__ import absolute_import
import collections
import threading
+from .. import errors as Errors
from ..future import Future
-import kafka.common as Errors
-
class FutureProduceResult(Future):
def __init__(self, topic_partition):
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index e1a0374..dd8e71f 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -8,14 +8,14 @@ import threading
import time
from ..client_async import KafkaClient
-from ..common import TopicPartition
+from ..structs import TopicPartition
from ..partitioner.default import DefaultPartitioner
from ..protocol.message import Message, MessageSet
+from .. import errors as Errors
from .future import FutureRecordMetadata, FutureProduceResult
from .record_accumulator import AtomicInteger, RecordAccumulator
from .sender import Sender
-import kafka.common as Errors
log = logging.getLogger(__name__)
PRODUCER_CLIENT_ID_SEQUENCE = AtomicInteger()
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 19dc199..b3abaa3 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -8,12 +8,12 @@ import time
import six
-from ..common import TopicPartition
+from .. import errors as Errors
+from ..structs import TopicPartition
from ..protocol.message import Message, MessageSet
from .buffer import MessageSetBuffer, SimpleBufferPool
from .future import FutureRecordMetadata, FutureProduceResult
-import kafka.common as Errors
log = logging.getLogger(__name__)
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 9a86a16..3cafb26 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -7,11 +7,11 @@ import threading
import six
-from ..common import TopicPartition
+from .. import errors as Errors
+from ..structs import TopicPartition
from ..version import __version__
from ..protocol.produce import ProduceRequest
-import kafka.common as Errors
log = logging.getLogger(__name__)
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index 1835521..e4745f1 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -7,26 +7,21 @@ import six
from six.moves import xrange
-import kafka.common
import kafka.protocol.commit
import kafka.protocol.fetch
import kafka.protocol.message
import kafka.protocol.metadata
import kafka.protocol.offset
import kafka.protocol.produce
+import kafka.structs
from kafka.codec import (
- gzip_encode, gzip_decode, snappy_encode, snappy_decode
-)
-from kafka.common import (
- ProtocolError, ChecksumError,
- UnsupportedCodecError,
- ConsumerMetadataResponse
-)
+ gzip_encode, gzip_decode, snappy_encode, snappy_decode)
+from kafka.errors import ProtocolError, ChecksumError, UnsupportedCodecError
+from kafka.structs import ConsumerMetadataResponse
from kafka.util import (
crc32, read_short_string, read_int_string, relative_unpack,
- write_short_string, write_int_string, group_by_topic_and_partition
-)
+ write_short_string, write_int_string, group_by_topic_and_partition)
log = logging.getLogger(__name__)
@@ -166,7 +161,7 @@ class KafkaProtocol(object):
Return: list of ProduceResponsePayload
"""
return [
- kafka.common.ProduceResponsePayload(topic, partition, error, offset)
+ kafka.structs.ProduceResponsePayload(topic, partition, error, offset)
for topic, partitions in response.topics
for partition, error, offset in partitions
]
@@ -207,9 +202,9 @@ class KafkaProtocol(object):
response: FetchResponse
"""
return [
- kafka.common.FetchResponsePayload(
+ kafka.structs.FetchResponsePayload(
topic, partition, error, highwater_offset, [
- kafka.common.OffsetAndMessage(offset, message)
+ kafka.structs.OffsetAndMessage(offset, message)
for offset, _, message in messages])
for topic, partitions in response.topics
for partition, error, highwater_offset, messages in partitions
@@ -239,7 +234,7 @@ class KafkaProtocol(object):
Returns: list of OffsetResponsePayloads
"""
return [
- kafka.common.OffsetResponsePayload(topic, partition, error, tuple(offsets))
+ kafka.structs.OffsetResponsePayload(topic, partition, error, tuple(offsets))
for topic, partitions in response.topics
for partition, error, offsets in partitions
]
@@ -323,7 +318,7 @@ class KafkaProtocol(object):
response: OffsetCommitResponse
"""
return [
- kafka.common.OffsetCommitResponsePayload(topic, partition, error)
+ kafka.structs.OffsetCommitResponsePayload(topic, partition, error)
for topic, partitions in response.topics
for partition, error in partitions
]
@@ -362,7 +357,7 @@ class KafkaProtocol(object):
response: OffsetFetchResponse
"""
return [
- kafka.common.OffsetFetchResponsePayload(
+ kafka.structs.OffsetFetchResponsePayload(
topic, partition, offset, metadata, error
)
for topic, partitions in response.topics
@@ -379,7 +374,7 @@ def create_message(payload, key=None):
key: bytes, a key used for partition routing (optional)
"""
- return kafka.common.Message(0, 0, key, payload)
+ return kafka.structs.Message(0, 0, key, payload)
def create_gzip_message(payloads, key=None, compresslevel=None):
@@ -400,7 +395,7 @@ def create_gzip_message(payloads, key=None, compresslevel=None):
gzipped = gzip_encode(message_set, compresslevel=compresslevel)
codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
- return kafka.common.Message(0, 0x00 | codec, key, gzipped)
+ return kafka.structs.Message(0, 0x00 | codec, key, gzipped)
def create_snappy_message(payloads, key=None):
@@ -421,7 +416,7 @@ def create_snappy_message(payloads, key=None):
snapped = snappy_encode(message_set)
codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
- return kafka.common.Message(0, 0x00 | codec, key, snapped)
+ return kafka.structs.Message(0, 0x00 | codec, key, snapped)
def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None):
diff --git a/kafka/util.py b/kafka/util.py
index 7a11910..18c39a4 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -7,7 +7,7 @@ import weakref
import six
-from kafka.common import BufferUnderflowError
+from kafka.errors import BufferUnderflowError
def crc32(data):