diff options
Diffstat (limited to 'qpid/java/common/src')
4 files changed, 76 insertions, 18 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java index 5fa14cf103..36f9a1ae2d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java @@ -23,7 +23,7 @@ package org.apache.qpid.configuration; */ public class ClientProperties { - + /** * Currently with Qpid it is not possible to change the client ID. * If one is not specified upon connection construction, an id is generated automatically. @@ -68,38 +68,40 @@ public class ClientProperties * by the broker in TuneOK it will be used as the heartbeat interval. * If not a warning will be printed and the max value specified for * heartbeat in TuneOK will be used - * + * * The default idle timeout is set to 120 secs */ public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout"; public static final long DEFAULT_IDLE_TIMEOUT = 120000; - + public static final String HEARTBEAT = "qpid.heartbeat"; public static final int HEARTBEAT_DEFAULT = 120; - + /** * This value will be used to determine the default destination syntax type. * Currently the two types are Binding URL (java only) and the Addressing format (used by - * all clients). + * all clients). */ public static final String DEST_SYNTAX = "qpid.dest_syntax"; - + public static final String USE_LEGACY_MAP_MESSAGE_FORMAT = "qpid.use_legacy_map_message"; public static final String AMQP_VERSION = "qpid.amqp.version"; - + + public static final String QPID_VERIFY_CLIENT_ID = "qpid.verify_client_id"; + private static ClientProperties _instance = new ClientProperties(); - + /* - public static final QpidProperty<Boolean> IGNORE_SET_CLIENTID_PROP_NAME = + public static final QpidProperty<Boolean> IGNORE_SET_CLIENTID_PROP_NAME = QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID"); - + public static final QpidProperty<Boolean> SYNC_PERSISTENT_PROP_NAME = QpidProperty.booleanProperty(false,"qpid.sync_persistence","sync_persistence"); - - + + public static final QpidProperty<Integer> MAX_PREFETCH_PROP_NAME = QpidProperty.intProperty(500,"qpid.max_prefetch","max_prefetch"); */ - - + + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java index 88dd2d6afa..f183c1e241 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java @@ -95,6 +95,7 @@ public abstract class ConnectionDelegate Session ssn = conn.getSession(dtc.getChannel()); if (ssn != null) { + ssn.setDetachCode(dtc.getCode()); conn.unmap(ssn); ssn.closed(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index 862c37283b..e0c6cb29d3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -120,7 +120,9 @@ public class Session extends SessionInvoker private Thread resumer = null; private boolean transacted = false; - + private SessionDetachCode detachCode; + private final Object stateLock = new Object(); + protected Session(Connection connection, Binary name, long expiry) { this(connection, new SessionDelegate(), name, expiry); @@ -1045,13 +1047,54 @@ public class Session extends SessionInvoker { return String.format("ssn:%s", name); } - + public void setTransacted(boolean b) { this.transacted = b; } - + public boolean isTransacted(){ return transacted; } - + + public void setDetachCode(SessionDetachCode dtc) + { + this.detachCode = dtc; + } + + public SessionDetachCode getDetachCode() + { + return this.detachCode; + } + + public void awaitOpen() + { + switch (state) + { + case NEW: + synchronized(stateLock) + { + Waiter w = new Waiter(stateLock, timeout); + while (w.hasTime() && state == NEW) + { + w.await(); + } + } + + if (state != OPEN) + { + throw new SessionException("Timed out waiting for Session to open"); + } + case DETACHED: + case CLOSING: + case CLOSED: + throw new SessionException("Session closed"); + default : + break; + } + } + + public Object getStateLock() + { + return stateLock; + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java index 9a02961dc4..3341149e5f 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java @@ -76,6 +76,10 @@ public class SessionDelegate @Override public void sessionAttached(Session ssn, SessionAttached atc) { ssn.setState(Session.State.OPEN); + synchronized (ssn.getStateLock()) + { + ssn.getStateLock().notifyAll(); + } } @Override public void sessionTimeout(Session ssn, SessionTimeout t) @@ -203,10 +207,18 @@ public class SessionDelegate public void closed(Session session) { log.debug("CLOSED: [%s]", session); + synchronized (session.getStateLock()) + { + session.getStateLock().notifyAll(); + } } public void detached(Session session) { log.debug("DETACHED: [%s]", session); + synchronized (session.getStateLock()) + { + session.getStateLock().notifyAll(); + } } } |
