author     Mark Roberts <wizzat@gmail.com>  2014-05-06 21:24:15 -0700
committer  Mark Roberts <wizzat@gmail.com>  2014-05-06 21:24:15 -0700
commit     efcf58b84214aeda6cf79319f182407cde7833a6 (patch)
tree       94cbb3cc886432dc77bf09f7101982d871db0365 /kafka
parent     99320fbd8c33f3b831557c507deeaf5e650ab813 (diff)
download   kafka-python-efcf58b84214aeda6cf79319f182407cde7833a6.tar.gz
Attempt to fix Travis build. Decrease complexity of service.py in favor of in-memory logging. Address code review concerns.
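The in-memory logging mentioned in the message lives in the test fixture service.py, which sits outside the kafka/ tree and therefore does not appear in this diff. As a rough sketch only (class and logger names here are hypothetical, not taken from the patch), a capturing handler along these lines keeps recent log output in a bounded buffer instead of writing it to files:

import collections
import logging

class MemoryLogCapture(logging.Handler):
    """Keep the most recent formatted log lines in memory instead of on disk."""
    def __init__(self, maxlen=1000):
        logging.Handler.__init__(self)
        self.lines = collections.deque(maxlen=maxlen)  # bounded buffer

    def emit(self, record):
        self.lines.append(self.format(record))

# Typical wiring: attach the handler to whatever logger the test service writes to.
capture = MemoryLogCapture()
logging.getLogger("service").addHandler(capture)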
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/client.py    2
-rw-r--r--  kafka/common.py   10
-rw-r--r--  kafka/consumer.py  4
-rw-r--r--  kafka/util.py      4
4 files changed, 12 insertions, 8 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 4870ab9..d0e07d0 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -6,8 +6,6 @@ import kafka.common
from functools import partial
from itertools import count
-from kafka.common import *
-
from kafka.common import (TopicAndPartition,
ConnectionError, FailedPayloadsError,
PartitionUnavailableError,
diff --git a/kafka/common.py b/kafka/common.py
index d288b89..d515532 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -121,11 +121,16 @@ class StaleControllerEpochError(BrokerResponseError):
message = 'STALE_CONTROLLER_EPOCH'
-class OffsetMetadataTooLarge(BrokerResponseError):
+class OffsetMetadataTooLargeError(BrokerResponseError):
errno = 12
message = 'OFFSET_METADATA_TOO_LARGE'
+class StaleLeaderEpochCodeError(BrokerResponseError):
+ errno = 13
+ message = 'STALE_LEADER_EPOCH_CODE'
+
+
class KafkaUnavailableError(KafkaError):
pass
@@ -178,7 +183,8 @@ kafka_errors = {
9 : ReplicaNotAvailableError,
10 : MessageSizeTooLargeError,
11 : StaleControllerEpochError,
- 12 : OffsetMetadataTooLarge,
+ 12 : OffsetMetadataTooLargeError,
+ 13 : StaleLeaderEpochCodeError,
}
def check_error(response):
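For context on the hunk above: kafka_errors is the lookup table that check_error() consults when a broker response carries a nonzero error code. The function body is not part of this diff; the dispatch amounts to something like the following sketch:

def check_error(response):
    # Map the broker's numeric error code to its registered exception class;
    # code 0 (no error) has no entry, so nothing is raised in that case.
    error_class = kafka_errors.get(response.error)
    if error_class:
        raise error_class(response)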
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 085f5e8..ef8fbda 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -430,12 +430,12 @@ class SimpleConsumer(Consumer):
# Put the message in our queue
self.queue.put((partition, message))
self.fetch_offsets[partition] = message.offset + 1
- except ConsumerFetchSizeTooSmall as e:
+ except ConsumerFetchSizeTooSmall:
if (self.max_buffer_size is not None and
self.buffer_size == self.max_buffer_size):
log.error("Max fetch size %d too small",
self.max_buffer_size)
- raise e
+ raise
if self.max_buffer_size is None:
self.buffer_size *= 2
else:
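The switch from "raise e" to a bare "raise" above is not only cosmetic: inside an except block, a bare raise re-raises the active exception with its original traceback, whereas "raise e" restarts the traceback at the raise site under Python 2, hiding where the fetch actually failed. A minimal standalone illustration (the exception class is a stand-in, not the real ConsumerFetchSizeTooSmall):

class FetchSizeTooSmall(Exception):
    """Stand-in for kafka.common.ConsumerFetchSizeTooSmall."""
    pass

def fetch():
    raise FetchSizeTooSmall("buffer too small")  # original failure site

try:
    fetch()
except FetchSizeTooSmall:
    # ... log the problem and grow the buffer, then give up ...
    raise  # bare raise keeps the traceback pointing at fetch()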
diff --git a/kafka/util.py b/kafka/util.py
index 0577a88..a918234 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -1,6 +1,6 @@
-import sys
-import struct
import collections
+import struct
+import sys
from threading import Thread, Event
from kafka.common import BufferUnderflowError