summaryrefslogtreecommitdiff
path: root/kafka/producer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer.py')
-rw-r--r--kafka/producer.py15
1 files changed, 7 insertions, 8 deletions
diff --git a/kafka/producer.py b/kafka/producer.py
index 5aead43..6ed22ee 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -1,17 +1,16 @@
from __future__ import absolute_import
+import logging
+import time
+
+from Queue import Empty
from collections import defaultdict
from itertools import cycle
from multiprocessing import Queue, Process
-from Queue import Empty
-import logging
-import sys
-import time
from kafka.common import ProduceRequest
-from kafka.common import FailedPayloadsException
-from kafka.protocol import create_message
from kafka.partitioner import HashedPartitioner
+from kafka.protocol import create_message
log = logging.getLogger("kafka")
@@ -188,7 +187,7 @@ class SimpleProducer(Producer):
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
self.topic = topic
- client._load_metadata_for_topics(topic)
+ client.load_metadata_for_topics(topic)
self.next_partition = cycle(client.topic_partitions[topic])
super(SimpleProducer, self).__init__(client, async, req_acks,
@@ -225,7 +224,7 @@ class KeyedProducer(Producer):
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
self.topic = topic
- client._load_metadata_for_topics(topic)
+ client.load_metadata_for_topics(topic)
if not partitioner:
partitioner = HashedPartitioner