diff options
Diffstat (limited to 'kafka/producer.py')
-rw-r--r-- | kafka/producer.py | 15 |
1 files changed, 7 insertions, 8 deletions
diff --git a/kafka/producer.py b/kafka/producer.py index 5aead43..6ed22ee 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -1,17 +1,16 @@ from __future__ import absolute_import +import logging +import time + +from Queue import Empty from collections import defaultdict from itertools import cycle from multiprocessing import Queue, Process -from Queue import Empty -import logging -import sys -import time from kafka.common import ProduceRequest -from kafka.common import FailedPayloadsException -from kafka.protocol import create_message from kafka.partitioner import HashedPartitioner +from kafka.protocol import create_message log = logging.getLogger("kafka") @@ -188,7 +187,7 @@ class SimpleProducer(Producer): batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): self.topic = topic - client._load_metadata_for_topics(topic) + client.load_metadata_for_topics(topic) self.next_partition = cycle(client.topic_partitions[topic]) super(SimpleProducer, self).__init__(client, async, req_acks, @@ -225,7 +224,7 @@ class KeyedProducer(Producer): batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): self.topic = topic - client._load_metadata_for_topics(topic) + client.load_metadata_for_topics(topic) if not partitioner: partitioner = HashedPartitioner |