diff options
author | Dana Powers <dana.powers@gmail.com> | 2014-09-07 12:17:54 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2014-09-07 12:17:54 -0700 |
commit | a99384f4c601d127ab1c4fe5b272ea5c07fd695d (patch) | |
tree | d559e3c3f650dab1ce9247aa7a89f41bdd410e46 /kafka | |
parent | 9856cc36d7742922133af0aa53767c8ed4731957 (diff) | |
parent | 1b282d21522d101f4129d5fc3e70e2b904d3b171 (diff) | |
download | kafka-python-a99384f4c601d127ab1c4fe5b272ea5c07fd695d.tar.gz |
Merge pull request #221 from dpkp/minor_cleanups
Minor cleanups
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/conn.py | 2 | ||||
-rw-r--r-- | kafka/consumer.py | 12 | ||||
-rw-r--r-- | kafka/producer.py | 10 |
3 files changed, 12 insertions, 12 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index a1b0a80..a577eba 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -1,8 +1,8 @@ import copy import logging +from random import shuffle import socket import struct -from random import shuffle from threading import local from kafka.common import ConnectionError diff --git a/kafka/consumer.py b/kafka/consumer.py index 0935dd2..928bbac 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -8,12 +8,12 @@ from threading import Lock from multiprocessing import Process, Queue as MPQueue, Event, Value from Queue import Empty, Queue -import kafka +import kafka.common from kafka.common import ( - FetchRequest, - OffsetRequest, OffsetCommitRequest, - OffsetFetchRequest, - ConsumerFetchSizeTooSmall, ConsumerNoMoreData + FetchRequest, OffsetRequest, + OffsetCommitRequest, OffsetFetchRequest, + ConsumerFetchSizeTooSmall, ConsumerNoMoreData, + UnknownTopicOrPartitionError ) from kafka.util import ReentrantTimer @@ -114,7 +114,7 @@ class Consumer(object): try: kafka.common.check_error(resp) return resp.offset - except kafka.common.UnknownTopicOrPartitionError: + except UnknownTopicOrPartitionError: return 0 for partition in partitions: diff --git a/kafka/producer.py b/kafka/producer.py index 8a6bff0..b28a424 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -156,11 +156,11 @@ class Producer(object): Helper method to send produce requests @param: topic, name of topic for produce request -- type str @param: partition, partition number for produce request -- type int - @param: *msg, one or more message payloads -- type str + @param: *msg, one or more message payloads -- type bytes @returns: ResponseRequest returned by server raises on error - Note that msg type *must* be encoded to str by user. + Note that msg type *must* be encoded to bytes by user. Passing unicode message will not work, for example you should encode before calling send_messages via something like `unicode_message.encode('utf-8')` @@ -172,9 +172,9 @@ class Producer(object): if not isinstance(msg, (list, tuple)): raise TypeError("msg is not a list or tuple!") - # Raise TypeError if any message is not encoded as a str - if any(not isinstance(m, str) for m in msg): - raise TypeError("all produce message payloads must be type str") + # Raise TypeError if any message is not encoded as bytes + if any(not isinstance(m, bytes) for m in msg): + raise TypeError("all produce message payloads must be type bytes") if self.async: for m in msg: |