summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2013-12-28 12:01:11 -0800
committerDavid Arthur <mumrah@gmail.com>2013-12-28 12:01:11 -0800
commit8f076df94c0d06d67d4ab39c991d8f83995431b5 (patch)
tree4a6074cf8aa3ddd003355106d12947edbd185ce3
parentbec4dd3c162ed04b2888fff75ade60880ef5a8eb (diff)
parentcab017a9aa92fa63550eb72e1571508b431ce791 (diff)
downloadkafka-python-8f076df94c0d06d67d4ab39c991d8f83995431b5.tar.gz
Merge pull request #84 from nieksand/simpler_timeouts
Replace _send_upstream datetime logic with simpler time().
-rw-r--r--kafka/producer.py6
1 files changed, 3 insertions, 3 deletions
diff --git a/kafka/producer.py b/kafka/producer.py
index 7ef7896..1d47336 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -1,10 +1,10 @@
from collections import defaultdict
-from datetime import datetime, timedelta
from itertools import cycle
from multiprocessing import Queue, Process
from Queue import Empty
import logging
import sys
+import time
from kafka.common import ProduceRequest
from kafka.common import FailedPayloadsException
@@ -36,7 +36,7 @@ def _send_upstream(topic, queue, client, batch_time, batch_size,
while not stop:
timeout = batch_time
count = batch_size
- send_at = datetime.now() + timedelta(seconds=timeout)
+ send_at = time.time() + timeout
msgset = defaultdict(list)
# Keep fetching till we gather enough messages or a
@@ -54,7 +54,7 @@ def _send_upstream(topic, queue, client, batch_time, batch_size,
# Adjust the timeout to match the remaining period
count -= 1
- timeout = (send_at - datetime.now()).total_seconds()
+ timeout = send_at - time.time()
msgset[partition].append(msg)
# Send collected requests upstream