summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java33
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java30
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java1
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java51
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java12
10 files changed, 160 insertions, 44 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 5b6a986997..fd21b376ac 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -173,8 +173,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
//Indicates the sync publish options (persistent|all)
//By default it's async publish
private String _syncPublish = "";
-
- // Indicates whether to use the old map message format or the
+
+ // Indicates whether to use the old map message format or the
// new amqp-0-10 encoded format.
private boolean _useLegacyMapMessageFormat;
@@ -261,7 +261,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
throw new IllegalArgumentException("Connection must be specified");
}
-
+
// set this connection maxPrefetch
if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null)
{
@@ -311,7 +311,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// use the default value set for all connections
_syncPublish = System.getProperty((ClientProperties.SYNC_PUBLISH_PROP_NAME),_syncPublish);
}
-
+
if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT) != null)
{
_useLegacyMapMessageFormat = Boolean.parseBoolean(
@@ -322,16 +322,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// use the default value set for all connections
_useLegacyMapMessageFormat = Boolean.getBoolean(ClientProperties.USE_LEGACY_MAP_MESSAGE_FORMAT);
}
-
+
String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
_logger.debug("AMQP version " + amqpVersion);
-
+
_failoverPolicy = new FailoverPolicy(connectionURL, this);
BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails();
- if ("0-8".equals(amqpVersion))
+ if ("0-8".equals(amqpVersion))
{
_delegate = new AMQConnectionDelegate_8_0(this);
- }
+ }
else if ("0-9".equals(amqpVersion))
{
_delegate = new AMQConnectionDelegate_0_9(this);
@@ -418,6 +418,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
brokerDetails = _failoverPolicy.getNextBrokerDetails();
}
}
+ verifyClientID();
if (_logger.isDebugEnabled())
{
@@ -504,7 +505,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
Class partypes[] = new Class[1];
partypes[0] = AMQConnection.class;
_delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
- //Update our session to use this new protocol version
+ //Update our session to use this new protocol version
_protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion());
}
@@ -1074,7 +1075,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_username = id;
}
-
+
public String getPassword()
{
return _password;
@@ -1250,7 +1251,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
je.setLinkedException((Exception) cause);
}
-
+
je.initCause(cause);
}
@@ -1283,7 +1284,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_logger.info("Not a hard-error connection not closing: " + cause);
}
-
+
// deliver the exception if there is a listener
if (_exceptionListener != null)
{
@@ -1293,7 +1294,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_logger.error("Throwable Received but no listener set: " + cause);
}
-
+
// if we are closing the connection, close sessions first
if (closer)
{
@@ -1351,17 +1352,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
/**
- * Returns connection url.
+ * Returns connection url.
* @return connection url
*/
public ConnectionURL getConnectionURL()
{
return _connectionURL;
}
-
+
/**
* Returns stringified connection url. This url is suitable only for display
- * as {@link AMQConnectionURL#toString()} converts any password to asterisks.
+ * as {@link AMQConnectionURL#toString()} converts any password to asterisks.
* @return connection url
*/
public String toURL()
@@ -1477,9 +1478,24 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _sessions.getNextChannelId();
}
-
+
public boolean isUseLegacyMapMessageFormat()
{
return _useLegacyMapMessageFormat;
}
+
+ private void verifyClientID() throws AMQException
+ {
+ if (Boolean.getBoolean(ClientProperties.QPID_VERIFY_CLIENT_ID))
+ {
+ try
+ {
+ _delegate.verifyClientID();
+ }
+ catch(JMSException e)
+ {
+ throw new AMQException(AMQConstant.ALREADY_EXISTS,"ClientID must be unique",e);
+ }
+ }
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index 9560bd5c7c..5acdaaa185 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -57,10 +57,12 @@ public interface AMQConnectionDelegate
void closeConnection(long timeout) throws JMSException, AMQException;
<T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E;
-
+
int getMaxChannelID();
int getMinChannelID();
ProtocolVersion getProtocolVersion();
+
+ void verifyClientID() throws JMSException;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index d50c9e16fe..e1643f095d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -47,6 +47,7 @@ import org.apache.qpid.transport.ConnectionException;
import org.apache.qpid.transport.ConnectionListener;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.ProtocolVersionException;
+import org.apache.qpid.transport.SessionDetachCode;
import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,7 +87,14 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
/**
* create a Session and start it if required.
*/
+
public Session createSession(boolean transacted, int acknowledgeMode, int prefetchHigh, int prefetchLow)
+ throws JMSException
+ {
+ return createSession(transacted,acknowledgeMode,prefetchHigh,prefetchLow,null);
+ }
+
+ public Session createSession(boolean transacted, int acknowledgeMode, int prefetchHigh, int prefetchLow, String name)
throws JMSException
{
_conn.checkNotClosed();
@@ -101,7 +109,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
try
{
session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted, acknowledgeMode, prefetchHigh,
- prefetchLow);
+ prefetchLow,name);
_conn.registerSession(channelId, session);
if (_conn._started)
{
@@ -449,12 +457,31 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
else
{
heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT);
- }
+ }
return heartbeat;
}
-
+
protected org.apache.qpid.transport.Connection getQpidConnection()
{
return _qpidConnection;
}
+
+ public void verifyClientID() throws JMSException
+ {
+ int prefetch = (int)_conn.getMaxPrefetch();
+ AMQSession_0_10 ssn = (AMQSession_0_10)createSession(false, 1,prefetch,prefetch,_conn.getClientID());
+ org.apache.qpid.transport.Session ssn_0_10 = ssn.getQpidSession();
+ try
+ {
+ ssn_0_10.awaitOpen();
+ }
+ catch(Exception e)
+ {
+ if (ssn_0_10.getDetachCode() != null &&
+ ssn_0_10.getDetachCode() == SessionDetachCode.SESSION_BUSY)
+ {
+ throw new JMSException("ClientID must be unique");
+ }
+ }
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index a837975304..22d1936463 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -332,4 +332,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
return ProtocolVersion.v8_0;
}
+
+ public void verifyClientID() throws JMSException
+ {
+ // NOOP
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 1ea92c67f7..75d96d67af 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -159,13 +159,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
*/
AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry,
- int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+ int defaultPrefetchHighMark, int defaultPrefetchLowMark,String name)
{
super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark,
defaultPrefetchLowMark);
_qpidConnection = qpidConnection;
- _qpidSession = _qpidConnection.createSession(1);
+ if (name == null)
+ {
+ _qpidSession = _qpidConnection.createSession(1);
+ }
+ else
+ {
+ _qpidSession = _qpidConnection.createSession(name,1);
+ }
_qpidSession.setSessionListener(this);
if (_transacted)
{
@@ -192,11 +199,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* @param qpidConnection The connection
*/
AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
- boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
+ boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow,
+ String name)
{
this(qpidConnection, con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(),
- defaultPrefetchHigh, defaultPrefetchLow);
+ defaultPrefetchHigh, defaultPrefetchLow,name);
}
private void addUnacked(int id)
diff --git a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
index 354b67cd35..6b9121811d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
@@ -52,7 +52,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
{
super(qpidConnection, con, channelId, false, // this is not a transacted session
Session.AUTO_ACKNOWLEDGE, // the ack mode is transacted
- MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow);
+ MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow,null);
createSession();
_xaResource = new XAResourceImpl(this);
}
diff --git a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
index 5fa14cf103..36f9a1ae2d 100644
--- a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
@@ -23,7 +23,7 @@ package org.apache.qpid.configuration;
*/
public class ClientProperties
{
-
+
/**
* Currently with Qpid it is not possible to change the client ID.
* If one is not specified upon connection construction, an id is generated automatically.
@@ -68,38 +68,40 @@ public class ClientProperties
* by the broker in TuneOK it will be used as the heartbeat interval.
* If not a warning will be printed and the max value specified for
* heartbeat in TuneOK will be used
- *
+ *
* The default idle timeout is set to 120 secs
*/
public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout";
public static final long DEFAULT_IDLE_TIMEOUT = 120000;
-
+
public static final String HEARTBEAT = "qpid.heartbeat";
public static final int HEARTBEAT_DEFAULT = 120;
-
+
/**
* This value will be used to determine the default destination syntax type.
* Currently the two types are Binding URL (java only) and the Addressing format (used by
- * all clients).
+ * all clients).
*/
public static final String DEST_SYNTAX = "qpid.dest_syntax";
-
+
public static final String USE_LEGACY_MAP_MESSAGE_FORMAT = "qpid.use_legacy_map_message";
public static final String AMQP_VERSION = "qpid.amqp.version";
-
+
+ public static final String QPID_VERIFY_CLIENT_ID = "qpid.verify_client_id";
+
private static ClientProperties _instance = new ClientProperties();
-
+
/*
- public static final QpidProperty<Boolean> IGNORE_SET_CLIENTID_PROP_NAME =
+ public static final QpidProperty<Boolean> IGNORE_SET_CLIENTID_PROP_NAME =
QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID");
-
+
public static final QpidProperty<Boolean> SYNC_PERSISTENT_PROP_NAME =
QpidProperty.booleanProperty(false,"qpid.sync_persistence","sync_persistence");
-
-
+
+
public static final QpidProperty<Integer> MAX_PREFETCH_PROP_NAME =
QpidProperty.intProperty(500,"qpid.max_prefetch","max_prefetch"); */
-
-
+
+
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
index 88dd2d6afa..f183c1e241 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
@@ -95,6 +95,7 @@ public abstract class ConnectionDelegate
Session ssn = conn.getSession(dtc.getChannel());
if (ssn != null)
{
+ ssn.setDetachCode(dtc.getCode());
conn.unmap(ssn);
ssn.closed();
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 862c37283b..e0c6cb29d3 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -120,7 +120,9 @@ public class Session extends SessionInvoker
private Thread resumer = null;
private boolean transacted = false;
-
+ private SessionDetachCode detachCode;
+ private final Object stateLock = new Object();
+
protected Session(Connection connection, Binary name, long expiry)
{
this(connection, new SessionDelegate(), name, expiry);
@@ -1045,13 +1047,54 @@ public class Session extends SessionInvoker
{
return String.format("ssn:%s", name);
}
-
+
public void setTransacted(boolean b) {
this.transacted = b;
}
-
+
public boolean isTransacted(){
return transacted;
}
-
+
+ public void setDetachCode(SessionDetachCode dtc)
+ {
+ this.detachCode = dtc;
+ }
+
+ public SessionDetachCode getDetachCode()
+ {
+ return this.detachCode;
+ }
+
+ public void awaitOpen()
+ {
+ switch (state)
+ {
+ case NEW:
+ synchronized(stateLock)
+ {
+ Waiter w = new Waiter(stateLock, timeout);
+ while (w.hasTime() && state == NEW)
+ {
+ w.await();
+ }
+ }
+
+ if (state != OPEN)
+ {
+ throw new SessionException("Timed out waiting for Session to open");
+ }
+ case DETACHED:
+ case CLOSING:
+ case CLOSED:
+ throw new SessionException("Session closed");
+ default :
+ break;
+ }
+ }
+
+ public Object getStateLock()
+ {
+ return stateLock;
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
index 9a02961dc4..3341149e5f 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
@@ -76,6 +76,10 @@ public class SessionDelegate
@Override public void sessionAttached(Session ssn, SessionAttached atc)
{
ssn.setState(Session.State.OPEN);
+ synchronized (ssn.getStateLock())
+ {
+ ssn.getStateLock().notifyAll();
+ }
}
@Override public void sessionTimeout(Session ssn, SessionTimeout t)
@@ -203,10 +207,18 @@ public class SessionDelegate
public void closed(Session session)
{
log.debug("CLOSED: [%s]", session);
+ synchronized (session.getStateLock())
+ {
+ session.getStateLock().notifyAll();
+ }
}
public void detached(Session session)
{
log.debug("DETACHED: [%s]", session);
+ synchronized (session.getStateLock())
+ {
+ session.getStateLock().notifyAll();
+ }
}
}