From 8ead4c97b75e508a877e8d446a5bef096e606d84 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Wed, 9 Feb 2011 22:52:14 +0000 Subject: QPID-3043 Added the extra state 'RESUMING', to ensure that any new session creation is delayed until the connection is able to reattach all existing sessions. If a connection is reconnecting then it will first go to RESUMING state (instead of OPEN) in connection-open-ok. Once the 'resume' method in Connection.java is completed the state will be set to OPEN. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1069159 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/transport/ClientDelegate.java | 10 +++++++++- .../main/java/org/apache/qpid/transport/Connection.java | 16 ++++++++++++++-- 2 files changed, 23 insertions(+), 3 deletions(-) (limited to 'java') diff --git a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index bce64075e5..0d9f8c0b28 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -28,6 +28,7 @@ import org.ietf.jgss.Oid; import org.apache.qpid.security.UsernamePasswordCallbackHandler; import static org.apache.qpid.transport.Connection.State.OPEN; +import static org.apache.qpid.transport.Connection.State.RESUMING; import org.apache.qpid.transport.util.Logger; import javax.security.sasl.Sasl; @@ -216,7 +217,14 @@ public class ClientDelegate extends ConnectionDelegate } } - conn.setState(OPEN); + if (conn.isConnectionResuming()) + { + conn.setState(RESUMING); + } + else + { + conn.setState(OPEN); + } } @Override diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 0f9f1445c8..929e5a56b5 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -25,6 +25,7 @@ import static org.apache.qpid.transport.Connection.State.CLOSING; import static org.apache.qpid.transport.Connection.State.NEW; import static org.apache.qpid.transport.Connection.State.OPEN; import static org.apache.qpid.transport.Connection.State.OPENING; +import static org.apache.qpid.transport.Connection.State.RESUMING; import java.util.ArrayList; import java.util.Collections; @@ -32,6 +33,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.security.sasl.SaslClient; @@ -63,7 +65,7 @@ public class Connection extends ConnectionInvoker public static final int MAX_CHANNEL_MAX = 0xFFFF; public static final int MIN_USABLE_CHANNEL_NUM = 0; - public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD } + public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING } static class DefaultConnectionListener implements ConnectionListener { @@ -119,7 +121,8 @@ public class Connection extends ConnectionInvoker private static final AtomicLong idGenerator = new AtomicLong(0); private final long _connectionId = idGenerator.incrementAndGet(); - + private static final AtomicBoolean connectionLost = new AtomicBoolean(false); + public Connection() {} public void setConnectionDelegate(ConnectionDelegate delegate) @@ -270,6 +273,8 @@ public class Connection extends ConnectionInvoker close(); throw new ConnectionException("connect() timed out"); case OPEN: + case RESUMING: + connectionLost.set(false); break; case CLOSED: throw new ConnectionException("connect() aborted"); @@ -475,11 +480,13 @@ public class Connection extends ConnectionInvoker ssn.resume(); } } + setState(OPEN); } } public void exception(ConnectionException e) { + connectionLost.set(true); synchronized (lock) { switch (state) @@ -682,5 +689,10 @@ public class Connection extends ConnectionInvoker { return securityLayer; } + + public boolean isConnectionResuming() + { + return connectionLost.get(); + } } -- cgit v1.2.1