summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/conn.py2
-rw-r--r--kafka/consumer.py12
-rw-r--r--kafka/producer.py10
3 files changed, 12 insertions, 12 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index a1b0a80..a577eba 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -1,8 +1,8 @@
import copy
import logging
+from random import shuffle
import socket
import struct
-from random import shuffle
from threading import local
from kafka.common import ConnectionError
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 0935dd2..928bbac 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -8,12 +8,12 @@ from threading import Lock
from multiprocessing import Process, Queue as MPQueue, Event, Value
from Queue import Empty, Queue
-import kafka
+import kafka.common
from kafka.common import (
- FetchRequest,
- OffsetRequest, OffsetCommitRequest,
- OffsetFetchRequest,
- ConsumerFetchSizeTooSmall, ConsumerNoMoreData
+ FetchRequest, OffsetRequest,
+ OffsetCommitRequest, OffsetFetchRequest,
+ ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
+ UnknownTopicOrPartitionError
)
from kafka.util import ReentrantTimer
@@ -114,7 +114,7 @@ class Consumer(object):
try:
kafka.common.check_error(resp)
return resp.offset
- except kafka.common.UnknownTopicOrPartitionError:
+ except UnknownTopicOrPartitionError:
return 0
for partition in partitions:
diff --git a/kafka/producer.py b/kafka/producer.py
index 8a6bff0..b28a424 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -156,11 +156,11 @@ class Producer(object):
Helper method to send produce requests
@param: topic, name of topic for produce request -- type str
@param: partition, partition number for produce request -- type int
- @param: *msg, one or more message payloads -- type str
+ @param: *msg, one or more message payloads -- type bytes
@returns: ResponseRequest returned by server
raises on error
- Note that msg type *must* be encoded to str by user.
+ Note that msg type *must* be encoded to bytes by user.
Passing unicode message will not work, for example
you should encode before calling send_messages via
something like `unicode_message.encode('utf-8')`
@@ -172,9 +172,9 @@ class Producer(object):
if not isinstance(msg, (list, tuple)):
raise TypeError("msg is not a list or tuple!")
- # Raise TypeError if any message is not encoded as a str
- if any(not isinstance(m, str) for m in msg):
- raise TypeError("all produce message payloads must be type str")
+ # Raise TypeError if any message is not encoded as bytes
+ if any(not isinstance(m, bytes) for m in msg):
+ raise TypeError("all produce message payloads must be type bytes")
if self.async:
for m in msg: