diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-05 09:34:48 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-05 09:35:45 -0700 |
commit | 5a14bd8c947251d1a8f848175cc3cf2b07af3411 (patch) | |
tree | a251ddbc60c84405762365429de9b04727653e6c /kafka/protocol/legacy.py | |
parent | 221f56d8a05cdc2d37f85018e4af352b4b2a95c5 (diff) | |
download | kafka-python-5a14bd8c947251d1a8f848175cc3cf2b07af3411.tar.gz |
Update imports from kafka.common -> kafka.errors / kafka.structs
Diffstat (limited to 'kafka/protocol/legacy.py')
-rw-r--r-- | kafka/protocol/legacy.py | 33 |
1 files changed, 14 insertions, 19 deletions
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py index 1835521..e4745f1 100644 --- a/kafka/protocol/legacy.py +++ b/kafka/protocol/legacy.py @@ -7,26 +7,21 @@ import six from six.moves import xrange -import kafka.common import kafka.protocol.commit import kafka.protocol.fetch import kafka.protocol.message import kafka.protocol.metadata import kafka.protocol.offset import kafka.protocol.produce +import kafka.structs from kafka.codec import ( - gzip_encode, gzip_decode, snappy_encode, snappy_decode -) -from kafka.common import ( - ProtocolError, ChecksumError, - UnsupportedCodecError, - ConsumerMetadataResponse -) + gzip_encode, gzip_decode, snappy_encode, snappy_decode) +from kafka.errors import ProtocolError, ChecksumError, UnsupportedCodecError +from kafka.structs import ConsumerMetadataResponse from kafka.util import ( crc32, read_short_string, read_int_string, relative_unpack, - write_short_string, write_int_string, group_by_topic_and_partition -) + write_short_string, write_int_string, group_by_topic_and_partition) log = logging.getLogger(__name__) @@ -166,7 +161,7 @@ class KafkaProtocol(object): Return: list of ProduceResponsePayload """ return [ - kafka.common.ProduceResponsePayload(topic, partition, error, offset) + kafka.structs.ProduceResponsePayload(topic, partition, error, offset) for topic, partitions in response.topics for partition, error, offset in partitions ] @@ -207,9 +202,9 @@ class KafkaProtocol(object): response: FetchResponse """ return [ - kafka.common.FetchResponsePayload( + kafka.structs.FetchResponsePayload( topic, partition, error, highwater_offset, [ - kafka.common.OffsetAndMessage(offset, message) + kafka.structs.OffsetAndMessage(offset, message) for offset, _, message in messages]) for topic, partitions in response.topics for partition, error, highwater_offset, messages in partitions @@ -239,7 +234,7 @@ class KafkaProtocol(object): Returns: list of OffsetResponsePayloads """ return [ - kafka.common.OffsetResponsePayload(topic, partition, error, tuple(offsets)) + kafka.structs.OffsetResponsePayload(topic, partition, error, tuple(offsets)) for topic, partitions in response.topics for partition, error, offsets in partitions ] @@ -323,7 +318,7 @@ class KafkaProtocol(object): response: OffsetCommitResponse """ return [ - kafka.common.OffsetCommitResponsePayload(topic, partition, error) + kafka.structs.OffsetCommitResponsePayload(topic, partition, error) for topic, partitions in response.topics for partition, error in partitions ] @@ -362,7 +357,7 @@ class KafkaProtocol(object): response: OffsetFetchResponse """ return [ - kafka.common.OffsetFetchResponsePayload( + kafka.structs.OffsetFetchResponsePayload( topic, partition, offset, metadata, error ) for topic, partitions in response.topics @@ -379,7 +374,7 @@ def create_message(payload, key=None): key: bytes, a key used for partition routing (optional) """ - return kafka.common.Message(0, 0, key, payload) + return kafka.structs.Message(0, 0, key, payload) def create_gzip_message(payloads, key=None, compresslevel=None): @@ -400,7 +395,7 @@ def create_gzip_message(payloads, key=None, compresslevel=None): gzipped = gzip_encode(message_set, compresslevel=compresslevel) codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP - return kafka.common.Message(0, 0x00 | codec, key, gzipped) + return kafka.structs.Message(0, 0x00 | codec, key, gzipped) def create_snappy_message(payloads, key=None): @@ -421,7 +416,7 @@ def create_snappy_message(payloads, key=None): snapped = snappy_encode(message_set) codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY - return kafka.common.Message(0, 0x00 | codec, key, snapped) + return kafka.structs.Message(0, 0x00 | codec, key, snapped) def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None): |