summaryrefslogtreecommitdiff
path: root/kafka/protocol/legacy.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-05 09:34:48 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-05 09:35:45 -0700
commit5a14bd8c947251d1a8f848175cc3cf2b07af3411 (patch)
treea251ddbc60c84405762365429de9b04727653e6c /kafka/protocol/legacy.py
parent221f56d8a05cdc2d37f85018e4af352b4b2a95c5 (diff)
downloadkafka-python-5a14bd8c947251d1a8f848175cc3cf2b07af3411.tar.gz
Update imports from kafka.common -> kafka.errors / kafka.structs
Diffstat (limited to 'kafka/protocol/legacy.py')
-rw-r--r--kafka/protocol/legacy.py33
1 files changed, 14 insertions, 19 deletions
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index 1835521..e4745f1 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -7,26 +7,21 @@ import six
from six.moves import xrange
-import kafka.common
import kafka.protocol.commit
import kafka.protocol.fetch
import kafka.protocol.message
import kafka.protocol.metadata
import kafka.protocol.offset
import kafka.protocol.produce
+import kafka.structs
from kafka.codec import (
- gzip_encode, gzip_decode, snappy_encode, snappy_decode
-)
-from kafka.common import (
- ProtocolError, ChecksumError,
- UnsupportedCodecError,
- ConsumerMetadataResponse
-)
+ gzip_encode, gzip_decode, snappy_encode, snappy_decode)
+from kafka.errors import ProtocolError, ChecksumError, UnsupportedCodecError
+from kafka.structs import ConsumerMetadataResponse
from kafka.util import (
crc32, read_short_string, read_int_string, relative_unpack,
- write_short_string, write_int_string, group_by_topic_and_partition
-)
+ write_short_string, write_int_string, group_by_topic_and_partition)
log = logging.getLogger(__name__)
@@ -166,7 +161,7 @@ class KafkaProtocol(object):
Return: list of ProduceResponsePayload
"""
return [
- kafka.common.ProduceResponsePayload(topic, partition, error, offset)
+ kafka.structs.ProduceResponsePayload(topic, partition, error, offset)
for topic, partitions in response.topics
for partition, error, offset in partitions
]
@@ -207,9 +202,9 @@ class KafkaProtocol(object):
response: FetchResponse
"""
return [
- kafka.common.FetchResponsePayload(
+ kafka.structs.FetchResponsePayload(
topic, partition, error, highwater_offset, [
- kafka.common.OffsetAndMessage(offset, message)
+ kafka.structs.OffsetAndMessage(offset, message)
for offset, _, message in messages])
for topic, partitions in response.topics
for partition, error, highwater_offset, messages in partitions
@@ -239,7 +234,7 @@ class KafkaProtocol(object):
Returns: list of OffsetResponsePayloads
"""
return [
- kafka.common.OffsetResponsePayload(topic, partition, error, tuple(offsets))
+ kafka.structs.OffsetResponsePayload(topic, partition, error, tuple(offsets))
for topic, partitions in response.topics
for partition, error, offsets in partitions
]
@@ -323,7 +318,7 @@ class KafkaProtocol(object):
response: OffsetCommitResponse
"""
return [
- kafka.common.OffsetCommitResponsePayload(topic, partition, error)
+ kafka.structs.OffsetCommitResponsePayload(topic, partition, error)
for topic, partitions in response.topics
for partition, error in partitions
]
@@ -362,7 +357,7 @@ class KafkaProtocol(object):
response: OffsetFetchResponse
"""
return [
- kafka.common.OffsetFetchResponsePayload(
+ kafka.structs.OffsetFetchResponsePayload(
topic, partition, offset, metadata, error
)
for topic, partitions in response.topics
@@ -379,7 +374,7 @@ def create_message(payload, key=None):
key: bytes, a key used for partition routing (optional)
"""
- return kafka.common.Message(0, 0, key, payload)
+ return kafka.structs.Message(0, 0, key, payload)
def create_gzip_message(payloads, key=None, compresslevel=None):
@@ -400,7 +395,7 @@ def create_gzip_message(payloads, key=None, compresslevel=None):
gzipped = gzip_encode(message_set, compresslevel=compresslevel)
codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
- return kafka.common.Message(0, 0x00 | codec, key, gzipped)
+ return kafka.structs.Message(0, 0x00 | codec, key, gzipped)
def create_snappy_message(payloads, key=None):
@@ -421,7 +416,7 @@ def create_snappy_message(payloads, key=None):
snapped = snappy_encode(message_set)
codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
- return kafka.common.Message(0, 0x00 | codec, key, snapped)
+ return kafka.structs.Message(0, 0x00 | codec, key, snapped)
def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None):