diff options
author | David Arthur <mumrah@gmail.com> | 2013-12-28 12:01:11 -0800 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2013-12-28 12:01:11 -0800 |
commit | 8f076df94c0d06d67d4ab39c991d8f83995431b5 (patch) | |
tree | 4a6074cf8aa3ddd003355106d12947edbd185ce3 | |
parent | bec4dd3c162ed04b2888fff75ade60880ef5a8eb (diff) | |
parent | cab017a9aa92fa63550eb72e1571508b431ce791 (diff) | |
download | kafka-python-8f076df94c0d06d67d4ab39c991d8f83995431b5.tar.gz |
Merge pull request #84 from nieksand/simpler_timeouts
Replace _send_upstream datetime logic with simpler time().
-rw-r--r-- | kafka/producer.py | 6 |
1 files changed, 3 insertions, 3 deletions
diff --git a/kafka/producer.py b/kafka/producer.py index 7ef7896..1d47336 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -1,10 +1,10 @@ from collections import defaultdict -from datetime import datetime, timedelta from itertools import cycle from multiprocessing import Queue, Process from Queue import Empty import logging import sys +import time from kafka.common import ProduceRequest from kafka.common import FailedPayloadsException @@ -36,7 +36,7 @@ def _send_upstream(topic, queue, client, batch_time, batch_size, while not stop: timeout = batch_time count = batch_size - send_at = datetime.now() + timedelta(seconds=timeout) + send_at = time.time() + timeout msgset = defaultdict(list) # Keep fetching till we gather enough messages or a @@ -54,7 +54,7 @@ def _send_upstream(topic, queue, client, batch_time, batch_size, # Adjust the timeout to match the remaining period count -= 1 - timeout = (send_at - datetime.now()).total_seconds() + timeout = send_at - time.time() msgset[partition].append(msg) # Send collected requests upstream |