diff options
Diffstat (limited to 'python/qpid')
| -rw-r--r-- | python/qpid/messaging/driver.py | 6 | ||||
| -rw-r--r-- | python/qpid/messaging/endpoints.py | 10 | ||||
| -rw-r--r-- | python/qpid/tests/messaging/endpoints.py | 75 |
3 files changed, 89 insertions, 2 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py index cd86542860..e9058d8154 100644 --- a/python/qpid/messaging/driver.py +++ b/python/qpid/messaging/driver.py @@ -323,6 +323,7 @@ class Driver: self._selector = Selector.default() self._attempts = 0 + self._delay = self.connection.reconnect_interval_min self._hosts = [(self.connection.host, self.connection.port)] + \ self.connection.backups self._host = 0 @@ -390,7 +391,9 @@ class Driver: if self._host > 0: delay = 0 else: - delay = self.connection.reconnect_delay + delay = self._delay + self._delay = min(2*self._delay, + self.connection.reconnect_interval_max) self._timeout = time.time() + delay log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e)) if delay > 0: @@ -951,6 +954,7 @@ class Engine: # XXX: we're ignoring acks that get lost when disconnected, # could we deal this via some message-id based purge? if m._transfer_id is None: + ssn.acked.remove(m) continue ids.add(m._transfer_id) disp = m._disposition or DEFAULT_DISPOSITION diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py index c3f76e4adc..fd5dc35a42 100644 --- a/python/qpid/messaging/endpoints.py +++ b/python/qpid/messaging/endpoints.py @@ -84,7 +84,15 @@ class Connection: self.mechanisms = options.get("mechanisms") self.heartbeat = options.get("heartbeat") self.reconnect = options.get("reconnect", False) - self.reconnect_delay = options.get("reconnect_delay", 3) + self.reconnect_timeout = options.get("reconnect_timeout") + if "reconnect_interval_min" in options: + self.reconnect_interval_min = options["reconnect_interval_min"] + else: + self.reconnect_interval_min = options.get("reconnect_interval", 1) + if "reconnect_interval_max" in options: + self.reconnect_interval_max = options["reconnect_interval_max"] + else: + self.reconnect_interval_max = options.get("reconnect_interval", 2*60) self.reconnect_limit = options.get("reconnect_limit") self.transport = options.get("transport", "plain") self.backups = options.get("backups", []) diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py index aced76feac..a849957374 100644 --- a/python/qpid/tests/messaging/endpoints.py +++ b/python/qpid/tests/messaging/endpoints.py @@ -72,6 +72,81 @@ class SetupTests(Base): while fds: os.close(fds.pop()) + def testReconnect(self): + options = self.connection_options() + import socket + from qpid.messaging import transports + transport = options.get("transport", "plain") + real = getattr(transports, transport) + + class flaky: + + def __init__(self, host, port): + self.real = real(host, port) + self.sent_count = 0 + self.recv_count = 0 + + def fileno(self): + return self.real.fileno() + + def reading(self, reading): + return self.real.reading(reading) + + def writing(self, writing): + return self.real.writing(writing) + + def send(self, bytes): + if self.sent_count > 1024: + raise socket.error("fake error") + n = self.real.send(bytes) + self.sent_count += n + return n + + def recv(self, n): + if self.recv_count > 1024: + return "" + bytes = self.real.recv(n) + self.recv_count += len(bytes) + return bytes + + def close(self): + self.real.close() + + transports.flaky = flaky + + options["reconnect"] = True + options["reconnect_interval"] = 0 + options["transport"] = "flaky" + + self.conn = Connection.open(self.broker.host, self.broker.port, **options) + ssn = self.conn.session() + snd = ssn.sender("test-reconnect-queue; {create: always, delete: always}") + rcv = ssn.receiver(snd.target) + + msgs = [self.message("testReconnect", i) for i in range(10)] + for m in msgs: + snd.send(m) + + content = set() + drained = [] + duplicates = [] + try: + while True: + m = rcv.fetch(timeout=0) + if m.content not in content: + content.add(m.content) + drained.append(m) + else: + duplicates.append(m) + ssn.acknowledge(m) + except Empty: + pass + assert duplicates, "no duplicates" + redelivered = 3*[False] + 3*[True, False] + [True] + assert len(drained) == len(msgs) == len(redelivered) + for m, d, r in zip(msgs, drained, redelivered): + self.assertEcho(m, d, r) + class ConnectionTests(Base): def setup_connection(self): |
