diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2010-04-01 20:39:31 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2010-04-01 20:39:31 +0000 |
| commit | a807cbe0a5b732225f8a2f3a9a4357f0fe5a3ddd (patch) | |
| tree | 9bdf176facc416f683295850fe0faac6bda6aeaf /qpid/python | |
| parent | 4d3714d33c6e779f26d27b18eaaae53c69f7de87 (diff) | |
| download | qpid-python-a807cbe0a5b732225f8a2f3a9a4357f0fe5a3ddd.tar.gz | |
updated reconnect option names to match C++ API
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@930084 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
| -rwxr-xr-x | qpid/python/examples/api/drain | 6 | ||||
| -rwxr-xr-x | qpid/python/examples/api/server | 6 | ||||
| -rwxr-xr-x | qpid/python/examples/api/spout | 8 | ||||
| -rw-r--r-- | qpid/python/qpid/messaging/driver.py | 6 | ||||
| -rw-r--r-- | qpid/python/qpid/messaging/endpoints.py | 10 | ||||
| -rw-r--r-- | qpid/python/qpid/tests/messaging/endpoints.py | 75 |
6 files changed, 99 insertions, 12 deletions
diff --git a/qpid/python/examples/api/drain b/qpid/python/examples/api/drain index d7ac03afa6..372426c801 100755 --- a/qpid/python/examples/api/drain +++ b/qpid/python/examples/api/drain @@ -33,8 +33,8 @@ parser.add_option("-f", "--forever", action="store_true", help="ignore timeout and wait forever") parser.add_option("-r", "--reconnect", action="store_true", help="enable auto reconnect") -parser.add_option("-d", "--reconnect-delay", type=float, default=3, - help="delay between reconnect attempts") +parser.add_option("-i", "--reconnect-interval", type=float, default=3, + help="interval between reconnect attempts") parser.add_option("-l", "--reconnect-limit", type=int, help="maximum number of reconnect attempts") parser.add_option("-t", "--timeout", type=float, default=0, @@ -77,7 +77,7 @@ conn = Connection(url.host, url.port, username=url.user, password=url.password, reconnect=opts.reconnect, - reconnect_delay=opts.reconnect_delay, + reconnect_interval=opts.reconnect_interval, reconnect_limit=opts.reconnect_limit) try: conn.connect() diff --git a/qpid/python/examples/api/server b/qpid/python/examples/api/server index f0bf1c2a4b..3103dd795a 100755 --- a/qpid/python/examples/api/server +++ b/qpid/python/examples/api/server @@ -30,8 +30,8 @@ parser.add_option("-b", "--broker", default="localhost", help="connect to specified BROKER (default %default)") parser.add_option("-r", "--reconnect", action="store_true", help="enable auto reconnect") -parser.add_option("-d", "--reconnect-delay", type=float, default=3, - help="delay between reconnect attempts") +parser.add_option("-i", "--reconnect-interval", type=float, default=3, + help="interval between reconnect attempts") parser.add_option("-l", "--reconnect-limit", type=int, help="maximum number of reconnect attempts") parser.add_option("-v", dest="verbose", action="store_true", @@ -55,7 +55,7 @@ conn = Connection(url.host, url.port, username=url.user, password=url.password, reconnect=opts.reconnect, - reconnect_delay=opts.reconnect_delay, + reconnect_interval=opts.reconnect_interval, reconnect_limit=opts.reconnect_limit) def dispatch(msg): msg_type = msg.properties.get("type") diff --git a/qpid/python/examples/api/spout b/qpid/python/examples/api/spout index 0d37ede512..9e5759b2dd 100755 --- a/qpid/python/examples/api/spout +++ b/qpid/python/examples/api/spout @@ -39,15 +39,15 @@ parser.add_option("-b", "--broker", default="localhost", help="connect to specified BROKER (default %default)") parser.add_option("-r", "--reconnect", action="store_true", help="enable auto reconnect") -parser.add_option("-d", "--reconnect-delay", type=float, default=3, - help="delay between reconnect attempts") +parser.add_option("-i", "--reconnect-interval", type=float, default=3, + help="interval between reconnect attempts") parser.add_option("-l", "--reconnect-limit", type=int, help="maximum number of reconnect attempts") parser.add_option("-c", "--count", type=int, default=1, help="stop after count messages have been sent, zero disables (default %default)") parser.add_option("-t", "--timeout", type=float, default=None, help="exit after the specified time") -parser.add_option("-i", "--id", help="use the supplied id instead of generating one") +parser.add_option("-I", "--id", help="use the supplied id instead of generating one") parser.add_option("-S", "--subject", help="specify a subject") parser.add_option("-R", "--reply-to", help="specify reply-to address") parser.add_option("-P", "--property", dest="properties", action="append", default=[], @@ -97,7 +97,7 @@ conn = Connection(url.host, url.port, username=url.user, password=url.password, reconnect=opts.reconnect, - reconnect_delay=opts.reconnect_delay, + reconnect_interval=opts.reconnect_interval, reconnect_limit=opts.reconnect_limit) try: conn.connect() diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py index cd86542860..e9058d8154 100644 --- a/qpid/python/qpid/messaging/driver.py +++ b/qpid/python/qpid/messaging/driver.py @@ -323,6 +323,7 @@ class Driver: self._selector = Selector.default() self._attempts = 0 + self._delay = self.connection.reconnect_interval_min self._hosts = [(self.connection.host, self.connection.port)] + \ self.connection.backups self._host = 0 @@ -390,7 +391,9 @@ class Driver: if self._host > 0: delay = 0 else: - delay = self.connection.reconnect_delay + delay = self._delay + self._delay = min(2*self._delay, + self.connection.reconnect_interval_max) self._timeout = time.time() + delay log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e)) if delay > 0: @@ -951,6 +954,7 @@ class Engine: # XXX: we're ignoring acks that get lost when disconnected, # could we deal this via some message-id based purge? if m._transfer_id is None: + ssn.acked.remove(m) continue ids.add(m._transfer_id) disp = m._disposition or DEFAULT_DISPOSITION diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py index c3f76e4adc..fd5dc35a42 100644 --- a/qpid/python/qpid/messaging/endpoints.py +++ b/qpid/python/qpid/messaging/endpoints.py @@ -84,7 +84,15 @@ class Connection: self.mechanisms = options.get("mechanisms") self.heartbeat = options.get("heartbeat") self.reconnect = options.get("reconnect", False) - self.reconnect_delay = options.get("reconnect_delay", 3) + self.reconnect_timeout = options.get("reconnect_timeout") + if "reconnect_interval_min" in options: + self.reconnect_interval_min = options["reconnect_interval_min"] + else: + self.reconnect_interval_min = options.get("reconnect_interval", 1) + if "reconnect_interval_max" in options: + self.reconnect_interval_max = options["reconnect_interval_max"] + else: + self.reconnect_interval_max = options.get("reconnect_interval", 2*60) self.reconnect_limit = options.get("reconnect_limit") self.transport = options.get("transport", "plain") self.backups = options.get("backups", []) diff --git a/qpid/python/qpid/tests/messaging/endpoints.py b/qpid/python/qpid/tests/messaging/endpoints.py index aced76feac..a849957374 100644 --- a/qpid/python/qpid/tests/messaging/endpoints.py +++ b/qpid/python/qpid/tests/messaging/endpoints.py @@ -72,6 +72,81 @@ class SetupTests(Base): while fds: os.close(fds.pop()) + def testReconnect(self): + options = self.connection_options() + import socket + from qpid.messaging import transports + transport = options.get("transport", "plain") + real = getattr(transports, transport) + + class flaky: + + def __init__(self, host, port): + self.real = real(host, port) + self.sent_count = 0 + self.recv_count = 0 + + def fileno(self): + return self.real.fileno() + + def reading(self, reading): + return self.real.reading(reading) + + def writing(self, writing): + return self.real.writing(writing) + + def send(self, bytes): + if self.sent_count > 1024: + raise socket.error("fake error") + n = self.real.send(bytes) + self.sent_count += n + return n + + def recv(self, n): + if self.recv_count > 1024: + return "" + bytes = self.real.recv(n) + self.recv_count += len(bytes) + return bytes + + def close(self): + self.real.close() + + transports.flaky = flaky + + options["reconnect"] = True + options["reconnect_interval"] = 0 + options["transport"] = "flaky" + + self.conn = Connection.open(self.broker.host, self.broker.port, **options) + ssn = self.conn.session() + snd = ssn.sender("test-reconnect-queue; {create: always, delete: always}") + rcv = ssn.receiver(snd.target) + + msgs = [self.message("testReconnect", i) for i in range(10)] + for m in msgs: + snd.send(m) + + content = set() + drained = [] + duplicates = [] + try: + while True: + m = rcv.fetch(timeout=0) + if m.content not in content: + content.add(m.content) + drained.append(m) + else: + duplicates.append(m) + ssn.acknowledge(m) + except Empty: + pass + assert duplicates, "no duplicates" + redelivered = 3*[False] + 3*[True, False] + [True] + assert len(drained) == len(msgs) == len(redelivered) + for m, d, r in zip(msgs, drained, redelivered): + self.assertEcho(m, d, r) + class ConnectionTests(Base): def setup_connection(self): |
