From 58ff90ab8f48cd6ff3301811343a4b3bab569955 Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Wed, 21 Jan 2009 14:19:20 +0000 Subject: QPID-1605: added an assertion to catch acknowledgments of message-ids outside the range permitted on a session; added code to pause failover until messages from old sessions have been cleared out of the dispatcher queue git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@736316 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 5 + .../apache/qpid/client/AMQConnectionDelegate.java | 2 + .../qpid/client/AMQConnectionDelegate_0_10.java | 13 ++- .../qpid/client/AMQConnectionDelegate_8_0.java | 5 + .../java/org/apache/qpid/client/AMQSession.java | 129 ++++++++++++++------- .../org/apache/qpid/client/AMQSession_0_10.java | 6 + .../qpid/client/message/UnprocessedMessage.java | 7 +- 7 files changed, 122 insertions(+), 45 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 4e8fdc2370..0aaeafc442 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1198,6 +1198,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _failoverMutex; } + public void failoverPrep() + { + _delegate.failoverPrep(); + } + public void resubscribeSessions() throws JMSException, AMQException, FailoverException { _delegate.resubscribeSessions(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index b64147fe8f..5a4abcc9bb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -41,6 +41,8 @@ public interface AMQConnectionDelegate XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException; + void failoverPrep(); + void resubscribeSessions() throws JMSException, AMQException, FailoverException; void closeConnection(long timeout) throws JMSException, AMQException; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 30ea4dcf8d..a2e5ac9800 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -162,9 +162,15 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec return null; } - /** - * Not supported at this level. - */ + public void failoverPrep() + { + List sessions = new ArrayList(_conn.getSessions().values()); + for (AMQSession s : sessions) + { + s.failoverPrep(); + } + } + public void resubscribeSessions() throws JMSException, AMQException, FailoverException { List sessions = new ArrayList(_conn.getSessions().values()); @@ -218,6 +224,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { if (_conn.firePreFailover(false) && _conn.attemptReconnection()) { + _conn.failoverPrep(); _qpidConnection.resume(); if (_conn.firePreResubscribe()) diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 035e3830ca..806e4d67bc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -217,6 +217,11 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } } + public void failoverPrep() + { + // do nothing + } + /** * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling. * The caller must hold the failover mutex before calling this method. diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index af0ed3faa3..733bee2d81 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -1811,6 +1812,26 @@ public abstract class AMQSession