Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/__init__.py                    |  2
-rw-r--r-- | kafka/client.py                      | 14
-rw-r--r-- | kafka/client_async.py                |  3
-rw-r--r-- | kafka/cluster.py                     |  4
-rw-r--r-- | kafka/conn.py                        |  4
-rw-r--r-- | kafka/consumer/base.py               | 10
-rw-r--r-- | kafka/consumer/fetcher.py            |  4
-rw-r--r-- | kafka/consumer/group.py              |  2
-rw-r--r-- | kafka/consumer/subscription_state.py |  3
-rw-r--r-- | kafka/context.py                     |  3
-rw-r--r-- | kafka/coordinator/base.py            |  2
-rw-r--r-- | kafka/coordinator/consumer.py        |  4
-rw-r--r-- | kafka/coordinator/heartbeat.py       |  2
-rw-r--r-- | kafka/coordinator/protocol.py        |  2
-rw-r--r-- | kafka/future.py                      |  2
-rw-r--r-- | kafka/producer/base.py               |  9
-rw-r--r-- | kafka/producer/buffer.py             |  2
-rw-r--r-- | kafka/producer/future.py             |  3
-rw-r--r-- | kafka/producer/kafka.py              |  4
-rw-r--r-- | kafka/producer/record_accumulator.py |  4
-rw-r--r-- | kafka/producer/sender.py             |  4
-rw-r--r-- | kafka/protocol/legacy.py             | 33
-rw-r--r-- | kafka/util.py                        |  2
23 files changed, 57 insertions, 65 deletions
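Editor's note: this commit is a mechanical split of the old kafka.common module. Exception classes and error-code helpers move to kafka.errors, while the namedtuple-style payload and metadata types move to kafka.structs. For downstream code the migration is a per-name import change; a minimal sketch follows (the topic name is made up, and whether kafka.common keeps backward-compatible aliases is not shown by this diff):

    # Before this commit, both names were imported from kafka.common:
    # from kafka.common import TopicPartition, check_error

    # After this commit, value types and errors live in separate modules:
    from kafka.structs import TopicPartition
    from kafka.errors import check_error, KafkaError

    tp = TopicPartition('my-topic', 0)  # 'my-topic' is a hypothetical topic
    print(tp.topic, tp.partition)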
diff --git a/kafka/__init__.py b/kafka/__init__.py
index 3f0d8bd..6b2ba97 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -22,7 +22,7 @@ from kafka.conn import BrokerConnection
 from kafka.protocol import (
     create_message, create_gzip_message, create_snappy_message)
 from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner
-from kafka.common import TopicPartition
+from kafka.structs import TopicPartition
 
 # To be deprecated when KafkaProducer interface is released
 from kafka.client import SimpleClient
diff --git a/kafka/client.py b/kafka/client.py
index 99d6fec..2bd2324 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -7,12 +7,12 @@ import time
 
 import six
 
-import kafka.common
-from kafka.common import (TopicPartition, BrokerMetadata, UnknownError,
-                          ConnectionError, FailedPayloadsError,
+import kafka.errors
+from kafka.errors import (UnknownError, ConnectionError, FailedPayloadsError,
                           KafkaTimeoutError, KafkaUnavailableError,
                           LeaderNotAvailableError, UnknownTopicOrPartitionError,
                           NotLeaderForPartitionError, ReplicaNotAvailableError)
+from kafka.structs import TopicPartition, BrokerMetadata
 
 from kafka.conn import (
     collect_hosts, BrokerConnection,
     DEFAULT_SOCKET_TIMEOUT_SECONDS,
@@ -123,7 +123,7 @@ class SimpleClient(object):
 
             # If there's a problem with finding the coordinator, raise the
             # provided error
-            kafka.common.check_error(resp)
+            kafka.errors.check_error(resp)
 
             # Otherwise return the BrokerMetadata
             return BrokerMetadata(resp.nodeId, resp.host, resp.port)
@@ -389,7 +389,7 @@ class SimpleClient(object):
 
             # Or a server api error response
             try:
-                kafka.common.check_error(resp)
+                kafka.errors.check_error(resp)
             except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
                 self.reset_topic_metadata(resp.topic)
                 raise
@@ -509,7 +509,7 @@ class SimpleClient(object):
         for error, topic, partitions in resp.topics:
             # Errors expected for new topics
             if error:
-                error_type = kafka.common.kafka_errors.get(error, UnknownError)
+                error_type = kafka.errors.kafka_errors.get(error, UnknownError)
                 if error_type in (UnknownTopicOrPartitionError, LeaderNotAvailableError):
                     log.error('Error loading topic metadata for %s: %s (%s)',
                               topic, error_type, error)
@@ -530,7 +530,7 @@ class SimpleClient(object):
 
                 # Check for partition errors
                 if error:
-                    error_type = kafka.common.kafka_errors.get(error, UnknownError)
+                    error_type = kafka.errors.kafka_errors.get(error, UnknownError)
 
                     # If No Leader, topics_to_brokers topic_partition -> None
                     if error_type is LeaderNotAvailableError:
diff --git a/kafka/client_async.py b/kafka/client_async.py
index d70e4f2..b77ead5 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -11,10 +11,9 @@ import time
 
 import six
 
-import kafka.common as Errors # TODO: make Errors a separate class
-
 from .cluster import ClusterMetadata
 from .conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
+from . import errors as Errors
 from .future import Future
 from .protocol.metadata import MetadataRequest
 from .protocol.produce import ProduceRequest
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 9ab6e6e..f7940e6 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -9,9 +9,9 @@ import time
 
 import six
 
-import kafka.common as Errors
-from kafka.common import BrokerMetadata, PartitionMetadata, TopicPartition
+from . import errors as Errors
 from .future import Future
+from .structs import BrokerMetadata, PartitionMetadata, TopicPartition
 
 log = logging.getLogger(__name__)
diff --git a/kafka/conn.py b/kafka/conn.py
index ffc839e..dc7dd23 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -13,7 +13,7 @@ import warnings
 
 import six
 
-import kafka.common as Errors
+import kafka.errors as Errors
 from kafka.future import Future
 from kafka.protocol.api import RequestHeader
 from kafka.protocol.commit import GroupCoordinatorResponse
@@ -149,7 +149,7 @@ class BrokerConnection(object):
         Arguments:
             error (Exception, optional): pending in-flight-requests
                 will be failed with this exception.
-                Default: kafka.common.ConnectionError.
+                Default: kafka.errors.ConnectionError.
         """
         if self._sock:
             self._sock.close()
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 75c3ee1..d2d9e8d 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -6,12 +6,10 @@ import numbers
 from threading import Lock
 import warnings
 
-import kafka.common
-from kafka.common import (
-    OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
-    UnknownTopicOrPartitionError, check_error, KafkaError
-)
-
+from kafka.errors import (
+    UnknownTopicOrPartitionError, check_error, KafkaError)
+from kafka.structs import (
+    OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload)
 from kafka.util import ReentrantTimer
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 7112c7e..2c9c0b9 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -6,12 +6,12 @@ import logging
 
 import six
 
-import kafka.common as Errors
-from kafka.common import TopicPartition
+import kafka.errors as Errors
 from kafka.future import Future
 from kafka.protocol.fetch import FetchRequest
 from kafka.protocol.message import PartialMessage
 from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
+from kafka.structs import TopicPartition
 
 log = logging.getLogger(__name__)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 9172040..6c85c21 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -7,13 +7,13 @@ import time
 import six
 
 from kafka.client_async import KafkaClient
-from kafka.common import TopicPartition
 from kafka.consumer.fetcher import Fetcher
 from kafka.consumer.subscription_state import SubscriptionState
 from kafka.coordinator.consumer import ConsumerCoordinator
 from kafka.coordinator.assignors.range import RangePartitionAssignor
 from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
 from kafka.protocol.offset import OffsetResetStrategy
+from kafka.structs import TopicPartition
 from kafka.version import __version__
 
 log = logging.getLogger(__name__)
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 3d170ae..1c045aa 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -6,8 +6,9 @@ import re
 
 import six
 
-from kafka.common import IllegalStateError, OffsetAndMetadata
+from kafka.errors import IllegalStateError
 from kafka.protocol.offset import OffsetResetStrategy
+from kafka.structs import OffsetAndMetadata
 
 log = logging.getLogger(__name__)
diff --git a/kafka/context.py b/kafka/context.py
index 376fad1..d6c15fe 100644
--- a/kafka/context.py
+++ b/kafka/context.py
@@ -3,7 +3,8 @@ Context manager to commit/rollback consumer offsets.
 """
 from logging import getLogger
 
-from kafka.common import check_error, OffsetCommitRequestPayload, OffsetOutOfRangeError
+from kafka.errors import check_error, OffsetOutOfRangeError
+from kafka.structs import OffsetCommitRequestPayload
 
 
 class OffsetCommitContext(object):
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index b0a0981..fcf3901 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -6,7 +6,7 @@ import weakref
 
 import six
 
-import kafka.common as Errors
+import kafka.errors as Errors
 from kafka.future import Future
 from kafka.protocol.commit import (GroupCoordinatorRequest,
                                    OffsetCommitRequest_v2 as OffsetCommitRequest)
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index b2ef1ea..ae2344f 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -12,14 +12,14 @@ from .base import BaseCoordinator
 from .assignors.range import RangePartitionAssignor
 from .assignors.roundrobin import RoundRobinPartitionAssignor
 from .protocol import ConsumerProtocol
-from ..common import OffsetAndMetadata, TopicPartition
+from .. import errors as Errors
 from ..future import Future
 from ..protocol.commit import (
     OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0,
     OffsetFetchRequest_v0, OffsetFetchRequest_v1)
+from ..structs import OffsetAndMetadata, TopicPartition
 from ..util import WeakMethod
-import kafka.common as Errors
 
 log = logging.getLogger(__name__)
diff --git a/kafka/coordinator/heartbeat.py b/kafka/coordinator/heartbeat.py
index 4ddcf09..e73b3e5 100644
--- a/kafka/coordinator/heartbeat.py
+++ b/kafka/coordinator/heartbeat.py
@@ -1,7 +1,7 @@
 import copy
 import time
 
-import kafka.common as Errors
+import kafka.errors as Errors
 
 
 class Heartbeat(object):
diff --git a/kafka/coordinator/protocol.py b/kafka/coordinator/protocol.py
index 9e37397..56a3901 100644
--- a/kafka/coordinator/protocol.py
+++ b/kafka/coordinator/protocol.py
@@ -1,8 +1,8 @@
 from __future__ import absolute_import
 
-from kafka.common import TopicPartition
 from kafka.protocol.struct import Struct
 from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
+from kafka.structs import TopicPartition
 
 
 class ConsumerProtocolMemberMetadata(Struct):
diff --git a/kafka/future.py b/kafka/future.py
index c7e0b14..b379272 100644
--- a/kafka/future.py
+++ b/kafka/future.py
@@ -1,7 +1,7 @@
 import functools
 import logging
 
-import kafka.common as Errors
+import kafka.errors as Errors
 
 log = logging.getLogger(__name__)
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 2067c7e..07e61d5 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -14,13 +14,12 @@ from threading import Thread, Event
 
 import six
 
-from kafka.common import (
-    ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions,
+from kafka.structs import (
+    ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions)
+from kafka.errors import (
     kafka_errors, UnsupportedCodecError, FailedPayloadsError,
     RequestTimedOutError, AsyncProducerQueueFull, UnknownError,
-    RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES
-)
-
+    RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
 from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
 
 log = logging.getLogger('kafka.producer')
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index 8c83ffc..b2ac747 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -7,10 +7,10 @@ import time
 
 from ..codec import (has_gzip, has_snappy, has_lz4,
                      gzip_encode, snappy_encode, lz4_encode)
+from .. import errors as Errors
 from ..protocol.types import Int32, Int64
 from ..protocol.message import MessageSet, Message
 
-import kafka.common as Errors
 
 
 class MessageSetBuffer(object):
diff --git a/kafka/producer/future.py b/kafka/producer/future.py
index 5a7a9dc..35520d8 100644
--- a/kafka/producer/future.py
+++ b/kafka/producer/future.py
@@ -3,10 +3,9 @@ from __future__ import absolute_import
 import collections
 import threading
 
+from .. import errors as Errors
 from ..future import Future
 
-import kafka.common as Errors
-
 
 class FutureProduceResult(Future):
     def __init__(self, topic_partition):
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index e1a0374..dd8e71f 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -8,14 +8,14 @@ import threading
 import time
 
 from ..client_async import KafkaClient
-from ..common import TopicPartition
+from ..structs import TopicPartition
 from ..partitioner.default import DefaultPartitioner
 from ..protocol.message import Message, MessageSet
+from .. import errors as Errors
 from .future import FutureRecordMetadata, FutureProduceResult
 from .record_accumulator import AtomicInteger, RecordAccumulator
 from .sender import Sender
 
-import kafka.common as Errors
 
 log = logging.getLogger(__name__)
 PRODUCER_CLIENT_ID_SEQUENCE = AtomicInteger()
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 19dc199..b3abaa3 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -8,12 +8,12 @@ import time
 
 import six
 
-from ..common import TopicPartition
+from .. import errors as Errors
+from ..structs import TopicPartition
 from ..protocol.message import Message, MessageSet
 from .buffer import MessageSetBuffer, SimpleBufferPool
 from .future import FutureRecordMetadata, FutureProduceResult
 
-import kafka.common as Errors
 
 log = logging.getLogger(__name__)
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 9a86a16..3cafb26 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -7,11 +7,11 @@ import threading
 
 import six
 
-from ..common import TopicPartition
+from .. import errors as Errors
+from ..structs import TopicPartition
 from ..version import __version__
 from ..protocol.produce import ProduceRequest
 
-import kafka.common as Errors
 
 log = logging.getLogger(__name__)
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index 1835521..e4745f1 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -7,26 +7,21 @@ import six
 
 from six.moves import xrange
 
-import kafka.common
 import kafka.protocol.commit
 import kafka.protocol.fetch
 import kafka.protocol.message
 import kafka.protocol.metadata
 import kafka.protocol.offset
 import kafka.protocol.produce
+import kafka.structs
 
 from kafka.codec import (
-    gzip_encode, gzip_decode, snappy_encode, snappy_decode
-)
-from kafka.common import (
-    ProtocolError, ChecksumError,
-    UnsupportedCodecError,
-    ConsumerMetadataResponse
-)
+    gzip_encode, gzip_decode, snappy_encode, snappy_decode)
+from kafka.errors import ProtocolError, ChecksumError, UnsupportedCodecError
+from kafka.structs import ConsumerMetadataResponse
 from kafka.util import (
     crc32, read_short_string, read_int_string, relative_unpack,
-    write_short_string, write_int_string, group_by_topic_and_partition
-)
+    write_short_string, write_int_string, group_by_topic_and_partition)
 
 log = logging.getLogger(__name__)
@@ -166,7 +161,7 @@ class KafkaProtocol(object):
         Return: list of ProduceResponsePayload
         """
         return [
-            kafka.common.ProduceResponsePayload(topic, partition, error, offset)
+            kafka.structs.ProduceResponsePayload(topic, partition, error, offset)
             for topic, partitions in response.topics
             for partition, error, offset in partitions
         ]
@@ -207,9 +202,9 @@ class KafkaProtocol(object):
             response: FetchResponse
         """
         return [
-            kafka.common.FetchResponsePayload(
+            kafka.structs.FetchResponsePayload(
                 topic, partition, error, highwater_offset, [
-                    kafka.common.OffsetAndMessage(offset, message)
+                    kafka.structs.OffsetAndMessage(offset, message)
                     for offset, _, message in messages])
             for topic, partitions in response.topics
             for partition, error, highwater_offset, messages in partitions
@@ -239,7 +234,7 @@ class KafkaProtocol(object):
         Returns: list of OffsetResponsePayloads
         """
         return [
-            kafka.common.OffsetResponsePayload(topic, partition, error, tuple(offsets))
+            kafka.structs.OffsetResponsePayload(topic, partition, error, tuple(offsets))
             for topic, partitions in response.topics
             for partition, error, offsets in partitions
         ]
@@ -323,7 +318,7 @@ class KafkaProtocol(object):
             response: OffsetCommitResponse
         """
         return [
-            kafka.common.OffsetCommitResponsePayload(topic, partition, error)
+            kafka.structs.OffsetCommitResponsePayload(topic, partition, error)
            for topic, partitions in response.topics
            for partition, error in partitions
         ]
@@ -362,7 +357,7 @@ class KafkaProtocol(object):
            response: OffsetFetchResponse
        """
        return [
-            kafka.common.OffsetFetchResponsePayload(
+            kafka.structs.OffsetFetchResponsePayload(
                topic, partition, offset, metadata, error
            )
            for topic, partitions in response.topics
@@ -379,7 +374,7 @@ def create_message(payload, key=None):
         key: bytes, a key used for partition routing (optional)
 
     """
-    return kafka.common.Message(0, 0, key, payload)
+    return kafka.structs.Message(0, 0, key, payload)
 
 
 def create_gzip_message(payloads, key=None, compresslevel=None):
@@ -400,7 +395,7 @@ def create_gzip_message(payloads, key=None, compresslevel=None):
     gzipped = gzip_encode(message_set, compresslevel=compresslevel)
     codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
 
-    return kafka.common.Message(0, 0x00 | codec, key, gzipped)
+    return kafka.structs.Message(0, 0x00 | codec, key, gzipped)
 
 
 def create_snappy_message(payloads, key=None):
@@ -421,7 +416,7 @@ def create_snappy_message(payloads, key=None):
     snapped = snappy_encode(message_set)
     codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
 
-    return kafka.common.Message(0, 0x00 | codec, key, snapped)
+    return kafka.structs.Message(0, 0x00 | codec, key, snapped)
 
 
 def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None):
diff --git a/kafka/util.py b/kafka/util.py
index 7a11910..18c39a4 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -7,7 +7,7 @@ import weakref
 
 import six
 
-from kafka.common import BufferUnderflowError
+from kafka.errors import BufferUnderflowError
 
 
 def crc32(data):
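Editor's note: for orientation, here is a rough sketch of the shape of the two new modules, inferred only from the names this diff uses (kafka.errors.check_error, kafka.errors.kafka_errors, and the structs constructed in kafka/protocol/legacy.py). It is a hypothetical reconstruction, not a copy of the real files; the field names and the error code below are illustrative assumptions:

    # Hypothetical sketch of the post-split modules, not the actual source.
    from collections import namedtuple

    # kafka/structs.py: plain value types (fields inferred from call sites above)
    TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])
    BrokerMetadata = namedtuple('BrokerMetadata', ['nodeId', 'host', 'port'])
    OffsetAndMetadata = namedtuple('OffsetAndMetadata', ['offset', 'metadata'])

    # kafka/errors.py: exception hierarchy plus code-to-class lookup
    class KafkaError(RuntimeError):
        pass

    class UnknownError(KafkaError):
        errno = -1

    class UnknownTopicOrPartitionError(KafkaError):
        errno = 3  # assumed broker error code, for illustration only

    # kafka_errors maps broker error codes to exception classes; this is what
    # kafka.errors.kafka_errors.get(error, UnknownError) in client.py relies on.
    kafka_errors = {cls.errno: cls
                    for cls in (UnknownError, UnknownTopicOrPartitionError)}

    def check_error(response):
        # Raise the mapped exception when a response carries a nonzero error code.
        if getattr(response, 'error', 0):
            raise kafka_errors.get(response.error, UnknownError)(response)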