summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java156
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java208
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java86
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java39
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java135
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java70
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java145
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java193
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/StateListener.java30
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java149
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java348
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java6
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java35
35 files changed, 927 insertions, 812 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 0abcc8ef26..472eaef5b5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -26,7 +26,6 @@ import org.apache.qpid.AMQProtocolException;
import org.apache.qpid.AMQUnresolvedAddressException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.*;
@@ -76,11 +75,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
private int _size = 0;
private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
-
public AMQSession get(int channelId)
{
- if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
{
return _fastAccessSessions[channelId];
}
@@ -93,7 +91,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public AMQSession put(int channelId, AMQSession session)
{
AMQSession oldVal;
- if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
{
oldVal = _fastAccessSessions[channelId];
_fastAccessSessions[channelId] = session;
@@ -102,11 +100,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
oldVal = _slowAccessSessions.put(channelId, session);
}
- if((oldVal != null) && (session == null))
+ if ((oldVal != null) && (session == null))
{
_size--;
}
- else if((oldVal == null) && (session != null))
+ else if ((oldVal == null) && (session != null))
{
_size++;
}
@@ -115,13 +113,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
-
public AMQSession remove(int channelId)
{
AMQSession session;
- if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
{
- session = _fastAccessSessions[channelId];
+ session = _fastAccessSessions[channelId];
_fastAccessSessions[channelId] = null;
}
else
@@ -129,7 +126,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
session = _slowAccessSessions.remove(channelId);
}
- if(session != null)
+ if (session != null)
{
_size--;
}
@@ -141,9 +138,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
ArrayList<AMQSession> values = new ArrayList<AMQSession>(size());
- for(int i = 0; i < 16; i++)
+ for (int i = 0; i < 16; i++)
{
- if(_fastAccessSessions[i] != null)
+ if (_fastAccessSessions[i] != null)
{
values.add(_fastAccessSessions[i]);
}
@@ -162,14 +159,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_size = 0;
_slowAccessSessions.clear();
- for(int i = 0; i<16; i++)
+ for (int i = 0; i < 16; i++)
{
_fastAccessSessions[i] = null;
}
}
}
-
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
protected AtomicInteger _idFactory = new AtomicInteger(0);
@@ -211,7 +207,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/** The virtual path to connect to on the AMQ server */
private String _virtualHost;
-
protected ExceptionListener _exceptionListener;
@@ -252,15 +247,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private ProtocolVersion _protocolVersion = ProtocolVersion.v0_9; // FIXME TGM, shouldn't need this
protected AMQConnectionDelegate _delegate;
-
+
// this connection maximum number of prefetched messages
private long _maxPrefetch;
//Indicates whether persistent messages are synchronized
private boolean _syncPersistence;
-
- /** used to hold a list of all exceptions that have been thrown during connection construction. gross */
- final ArrayList<Exception> _exceptions = new ArrayList<Exception>();
/**
* @param broker brokerdetails
@@ -337,20 +329,20 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/**
* @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception
- * was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon.
+ * was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon.
*/
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
// set this connection maxPrefetch
if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null)
{
- _maxPrefetch = Long.parseLong( connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH));
+ _maxPrefetch = Long.parseLong(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH));
}
else
{
// use the defaul value set for all connections
_maxPrefetch = Long.valueOf(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
- ClientProperties.MAX_PREFETCH_DEFAULT));
+ ClientProperties.MAX_PREFETCH_DEFAULT));
}
if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null)
@@ -378,25 +370,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_delegate = new AMQConnectionDelegate_0_10(this);
}
-
- class Listener implements ExceptionListener
- {
- public void onException(JMSException e)
- {
- _exceptions.add(e);
- }
- }
-
- try
- {
- setExceptionListener(new Listener());
- }
- catch (JMSException e)
- {
- // Shouldn't happen
- throw new AMQException(null, null, e);
- }
-
if (_logger.isInfoEnabled())
{
_logger.info("Connection:" + connectionURL);
@@ -436,15 +409,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName();
}
-
- _protocolHandler = new AMQProtocolHandler(this);
+ _protocolHandler = new AMQProtocolHandler(this);
// We are not currently connected
_connected = false;
// TMG FIXME this seems... wrong...
boolean retryAllowed = true;
- while (!_connected && retryAllowed )
+ Exception connectionException = null;
+ while (!_connected && retryAllowed)
{
try
{
@@ -456,37 +429,29 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_logger.info(pe.getMessage());
_logger.info("Trying broker supported protocol version: " +
- TransportConstants.getVersionMajor() + "." +
- TransportConstants.getVersionMinor());
+ TransportConstants.getVersionMajor() + "." +
+ TransportConstants.getVersionMinor());
}
// we need to check whether we have a delegate for the supported protocol
getDelegate();
}
catch (Exception e)
{
- _exceptions.add(e);
if (_logger.isInfoEnabled())
{
- _logger.info("Unable to connect to broker at " +
- _failoverPolicy.getCurrentBrokerDetails(),
- e);
+ _logger.info("Unable to connect to broker at " +
+ _failoverPolicy.getCurrentBrokerDetails(),
+ e);
}
+ connectionException = e;
}
-
+
if (!_connected)
{
retryAllowed = _failoverPolicy.failoverAllowed();
brokerDetails = _failoverPolicy.getNextBrokerDetails();
}
}
- try
- {
- setExceptionListener(null);
- }
- catch (JMSException e1)
- {
- // Can't happen
- }
if (_logger.isDebugEnabled())
{
@@ -496,26 +461,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if (!_connected)
{
String message = null;
- try
- {
- Thread.sleep(150);
- }
- catch (InterruptedException e)
- {
- // Eat it, we've hopefully got all the exceptions if this happened
- }
-
- Exception lastException = null;
- if (_exceptions.size() > 0)
+
+ if (connectionException != null)
{
- lastException = _exceptions.get(_exceptions.size() - 1);
- if (lastException.getCause() != null)
+ if (connectionException.getCause() != null)
{
- message = lastException.getCause().getMessage();
+ message = connectionException.getCause().getMessage();
}
else
{
- message = lastException.getMessage();
+ message = connectionException.getMessage();
}
}
@@ -527,20 +482,20 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
else // can only be "" if getMessage() returned it therfore lastException != null
{
- message = "Unable to Connect:" + lastException.getClass();
+ message = "Unable to Connect:" + connectionException.getClass();
}
}
- AMQException e = new AMQConnectionFailureException(message, _exceptions);
-
- if (lastException != null)
+ AMQException e = new AMQConnectionFailureException(message, connectionException);
+
+ if (connectionException != null)
{
- if (lastException instanceof UnresolvedAddressException)
+ if (connectionException instanceof UnresolvedAddressException)
{
e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString(),
null);
}
-
+
}
throw e;
}
@@ -565,18 +520,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
try
{
Class c = Class.forName("org.apache.qpid.client.AMQConnectionDelegate_" +
- TransportConstants.getVersionMajor() + "_" +
- TransportConstants.getVersionMinor());
- Class partypes[] = new Class[1];
+ TransportConstants.getVersionMajor() + "_" +
+ TransportConstants.getVersionMinor());
+ Class partypes[] = new Class[1];
partypes[0] = AMQConnection.class;
_delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
}
catch (Exception e)
{
throw new AMQProtocolException(AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR,
- "Protocol: " + TransportConstants.getVersionMajor() + "."
- + TransportConstants.getVersionMinor() + " is rquired by the broker but is not " +
- "currently supported by this client library implementation", e);
+ "Protocol: " + TransportConstants.getVersionMajor() + "."
+ + TransportConstants.getVersionMinor() + " is rquired by the broker but is not " +
+ "currently supported by this client library implementation", e);
}
}
@@ -867,7 +822,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
-
}
}
@@ -892,14 +846,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
- public void close() throws JMSException
+ public void close() throws JMSException
{
close(DEFAULT_TIMEOUT);
}
public void close(long timeout) throws JMSException
{
- close(new ArrayList<AMQSession>(_sessions.values()),timeout);
+ close(new ArrayList<AMQSession>(_sessions.values()), timeout);
}
public void close(List<AMQSession> sessions, long timeout) throws JMSException
@@ -912,12 +866,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private void doClose(List<AMQSession> sessions, long timeout) throws JMSException
{
- synchronized(_sessionCreationLock)
+ synchronized (_sessionCreationLock)
{
- if(!sessions.isEmpty())
+ if (!sessions.isEmpty())
{
AMQSession session = sessions.remove(0);
- synchronized(session.getMessageDeliveryLock())
+ synchronized (session.getMessageDeliveryLock())
{
doClose(sessions, timeout);
}
@@ -1120,7 +1074,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _sessions;
}
-
+
public String getUsername()
{
return _username;
@@ -1297,6 +1251,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if (cause instanceof IOException)
{
closer = !_closed.getAndSet(true);
+
+ _protocolHandler.getProtocolSession().notifyError(je);
}
if (_exceptionListener != null)
@@ -1339,7 +1295,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
if (cause instanceof AMQException)
{
- return ((AMQException)cause).isHardError();
+ return ((AMQException) cause).isHardError();
}
return true;
@@ -1483,7 +1439,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
*/
public long getMaxPrefetch()
{
- return _maxPrefetch;
+ return _maxPrefetch;
}
/**
@@ -1495,14 +1451,4 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _syncPersistence;
}
-
- public Exception getLastException()
- {
- if (_exceptions.size() > 0)
- {
- return _exceptions.get(_exceptions.size() - 1);
- }
- return null;
- }
-
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 5074658070..aab094ca7d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -32,12 +32,12 @@ import java.util.Set;
import javax.jms.JMSException;
import javax.jms.XASession;
-import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.framing.BasicQosBody;
import org.apache.qpid.framing.BasicQosOkBody;
@@ -84,11 +84,15 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
final Set<AMQState> openOrClosedStates =
EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
+
+ StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates);
+
TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
- AMQState state = _conn._protocolHandler.attainState(openOrClosedStates);
+ AMQState state = waiter.await();
+
if(state == AMQState.CONNECTION_OPEN)
{
_conn._failoverPolicy.attainedConnection();
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 6755e4da5f..2f593ce0c3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -79,6 +79,8 @@ import org.apache.qpid.client.message.ReturnMessage;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
@@ -90,22 +92,20 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- *
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td>
* </table>
*
* @todo Different FailoverSupport implementation are needed on the same method call, in different situations. For
- * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second
- * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of
- * the fail-over process, the retry handler could be used to automatically retry the operation once the connection
- * has been reestablished. All fail-over protected operations should be placed in private methods, with
- * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the
- * fail-over process sets a nowait flag and uses an async method call instead.
- *
+ * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second
+ * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of
+ * the fail-over process, the retry handler could be used to automatically retry the operation once the connection
+ * has been reestablished. All fail-over protected operations should be placed in private methods, with
+ * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the
+ * fail-over process sets a nowait flag and uses an async method call instead.
* @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this,
- * after looking at worse bottlenecks first.
+ * after looking at worse bottlenecks first.
*/
public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
@@ -114,10 +114,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
private final ConcurrentHashMap<Integer, BasicMessageConsumer> _slowAccessConsumers = new ConcurrentHashMap<Integer, BasicMessageConsumer>();
-
public BasicMessageConsumer get(int id)
{
- if((id & 0xFFFFFFF0) == 0)
+ if ((id & 0xFFFFFFF0) == 0)
{
return _fastAccessConsumers[id];
}
@@ -130,7 +129,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public BasicMessageConsumer put(int id, BasicMessageConsumer consumer)
{
BasicMessageConsumer oldVal;
- if((id & 0xFFFFFFF0) == 0)
+ if ((id & 0xFFFFFFF0) == 0)
{
oldVal = _fastAccessConsumers[id];
_fastAccessConsumers[id] = consumer;
@@ -144,13 +143,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
-
public BasicMessageConsumer remove(int id)
{
BasicMessageConsumer consumer;
- if((id & 0xFFFFFFF0) == 0)
+ if ((id & 0xFFFFFFF0) == 0)
{
- consumer = _fastAccessConsumers[id];
+ consumer = _fastAccessConsumers[id];
_fastAccessConsumers[id] = null;
}
else
@@ -166,9 +164,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
ArrayList<BasicMessageConsumer> values = new ArrayList<BasicMessageConsumer>();
- for(int i = 0; i < 16; i++)
+ for (int i = 0; i < 16; i++)
{
- if(_fastAccessConsumers[i] != null)
+ if (_fastAccessConsumers[i] != null)
{
values.add(_fastAccessConsumers[i]);
}
@@ -178,11 +176,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
return values;
}
-
public void clear()
{
_slowAccessConsumers.clear();
- for(int i = 0; i<16; i++)
+ for (int i = 0; i < 16; i++)
{
_fastAccessConsumers[i] = null;
}
@@ -280,19 +277,13 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
*/
protected final FlowControllingBlockingQueue _queue;
- /**
- * Holds the highest received delivery tag.
- */
+ /** Holds the highest received delivery tag. */
private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
- /**
- * All the not yet acknowledged message tags
- */
+ /** All the not yet acknowledged message tags */
protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>();
- /**
- * All the delivered message tags
- */
+ /** All the delivered message tags */
protected ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>();
/** Holds the dispatcher thread for this session. */
@@ -315,9 +306,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* consumer.
*/
protected final IdToConsumerMap _consumers = new IdToConsumerMap();
-
- //Map<AMQShortString, BasicMessageConsumer> _consumers =
- //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
+
+ //Map<AMQShortString, BasicMessageConsumer> _consumers =
+ //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
/**
* Contains a list of consumers which have been removed but which might still have
@@ -380,15 +371,13 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
/** Has failover occured on this session */
private boolean _failedOver;
-
-
private static final class FlowControlIndicator
{
private volatile boolean _flowControl = true;
public synchronized void setFlowControl(boolean flowControl)
{
- _flowControl= flowControl;
+ _flowControl = flowControl;
notify();
}
@@ -450,8 +439,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public void aboveThreshold(int currentValue)
{
_logger.debug(
- "Above threshold(" + _defaultPrefetchHighMark
- + ") so suspending channel. Current value is " + currentValue);
+ "Above threshold(" + _defaultPrefetchHighMark
+ + ") so suspending channel. Current value is " + currentValue);
_suspendState.set(true);
new Thread(new SuspenderRunner(_suspendState)).start();
@@ -460,8 +449,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public void underThreshold(int currentValue)
{
_logger.debug(
- "Below threshold(" + _defaultPrefetchLowMark
- + ") so unsuspending channel. Current value is " + currentValue);
+ "Below threshold(" + _defaultPrefetchLowMark
+ + ") so unsuspending channel. Current value is " + currentValue);
_suspendState.set(false);
new Thread(new SuspenderRunner(_suspendState)).start();
@@ -503,6 +492,19 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
close(-1);
}
+ public void checkNotClosed() throws JMSException
+ {
+ // if the Connection has closed then we should throw any exception that has occured that we were not waiting for
+ AMQStateManager manager = _connection.getProtocolHandler().getStateManager();
+ if (isClosed() && manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && manager.getLastException() != null)
+ {
+ JMSException jmse = new IllegalStateException("Object " + toString() + " has been closed");
+ jmse.setLinkedException(manager.getLastException());
+ throw jmse;
+ }
+ super.checkNotClosed();
+ }
+
public BytesMessage createBytesMessage() throws JMSException
{
checkNotClosed();
@@ -519,7 +521,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
if (isClosed())
{
throw new IllegalStateException("Session is already closed");
- }
+ }
else if (hasFailedOver())
{
throw new IllegalStateException("has failed over");
@@ -564,39 +566,35 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* @param exchangeName The exchange to bind the queue on.
*
* @throws AMQException If the queue cannot be bound for any reason.
- *
* @todo Be aware of possible changes to parameter order as versions change.
- *
* @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges?
*/
public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName,final AMQDestination destination) throws AMQException
+ final AMQShortString exchangeName, final AMQDestination destination) throws AMQException
{
/*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
{
public Object execute() throws AMQException, FailoverException
{
- sendQueueBind(queueName,routingKey,arguments,exchangeName,destination);
+ sendQueueBind(queueName, routingKey, arguments, exchangeName, destination);
return null;
}
}, _connection).execute();
}
-
public void addBindingKey(BasicMessageConsumer consumer, AMQDestination amqd, String routingKey) throws AMQException
{
- if( consumer.getQueuename() != null)
+ if (consumer.getQueuename() != null)
{
- bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(),amqd);
+ bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(), amqd);
}
}
public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName,AMQDestination destination) throws AMQException, FailoverException;
+ final AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException;
/**
-
* Closes the session.
*
* <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close
@@ -606,14 +604,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker.
*
* @throws JMSException If the JMS provider fails to close the session due to some internal error.
- *
* @todo Be aware of possible changes to parameter order as versions change.
- *
* @todo Not certain about the logic of ignoring the failover exception, because the channel won't be
- * re-opened. May need to examine this more carefully.
- *
+ * re-opened. May need to examine this more carefully.
* @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover,
- * because the failover process sends the failover event before acquiring the mutex itself.
+ * because the failover process sends the failover event before acquiring the mutex itself.
*/
public void close(long timeout) throws JMSException
{
@@ -621,7 +616,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
_logger.info("Closing session: " + this); // + ":"
- // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
+ // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
}
// Ensure we only try and close an open session.
@@ -638,7 +633,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
try
{
- sendClose(timeout);
+ sendClose(timeout);
}
catch (AMQException e)
{
@@ -705,7 +700,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
amqe = new AMQException("Closing session forcibly", e);
}
-
_connection.deregisterSession(_channelId);
closeProducersAndConsumers(amqe);
}
@@ -723,12 +717,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does
* not mean that the commit is known to have failed, merely that it is not known whether it
* failed or not.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
public void commit() throws JMSException
{
- checkTransacted();
+ checkTransacted();
try
{
@@ -792,10 +785,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
// consumer.markClosed();
-
-
if (consumer.isAutoClose())
- {
+ {
// There is a small window where the message is between the two queues in the dispatcher.
if (consumer.isClosed())
{
@@ -863,7 +854,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
false, false);
}
-
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
checkValidDestination(destination);
@@ -881,7 +871,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
messageSelector, null, false, false);
}
-
public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
throws JMSException
{
@@ -891,7 +880,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
messageSelector, null, false, false);
}
-
public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
String selector) throws JMSException
{
@@ -928,7 +916,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public abstract TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException;
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
- throws JMSException
+ throws JMSException
{
checkNotClosed();
checkValidTopic(topic);
@@ -996,7 +984,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
checkNotClosed();
- return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic,false,false), topic);
+ return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic, false, false), topic);
}
public Queue createQueue(String queueName) throws JMSException
@@ -1022,7 +1010,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
-
/**
* Declares the named queue.
*
@@ -1034,7 +1021,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* @param exclusive Flag to indicate that the queue is exclusive to this client.
*
* @throws AMQException If the queue cannot be declared for any reason.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
@@ -1043,7 +1029,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
createQueue(name, autoDelete, durable, exclusive, null);
}
-
/**
* Declares the named queue.
*
@@ -1056,7 +1041,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* @param arguments Arguments used to set special properties of the queue
*
* @throws AMQException If the queue cannot be declared for any reason.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
@@ -1073,7 +1057,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
public abstract void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
- final boolean exclusive, final Map<String, Object> arguments)throws AMQException, FailoverException;
+ final boolean exclusive, final Map<String, Object> arguments) throws AMQException, FailoverException;
+
/**
* Creates a QueueReceiver
*
@@ -1191,7 +1176,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
}
-
/**
* Creates a non-durable subscriber with a message selector
*
@@ -1382,7 +1366,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
if (message instanceof ReturnMessage)
{
// Return of the bounced message.
- returnBouncedMessage((ReturnMessage)message);
+ returnBouncedMessage((ReturnMessage) message);
}
else
{
@@ -1398,7 +1382,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
AMQProtocolHandler protocolHandler = getProtocolHandler();
declareExchange(amqd, protocolHandler, false);
AMQShortString queueName = declareQueue(amqd, protocolHandler, false);
- bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(),amqd);
+ bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd);
}
/**
@@ -1413,7 +1397,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* <li>Stop message delivery.</li>
* <li>Mark all messages that might have been delivered but not acknowledged as "redelivered".
* <li>Restart the delivery sequence including all unacknowledged messages that had been previously delivered.
- * Redelivered messages do not have to be delivered in exactly their original delivery order.</li>
+ * Redelivered messages do not have to be delivered in exactly their original delivery order.</li>
* </ul>
*
* <p/>If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and
@@ -1503,7 +1487,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* @throws JMSException If the JMS provider fails to rollback the transaction due to some internal error. This does
* not mean that the rollback is known to have failed, merely that it is not known whether it
* failed or not.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
public void rollback() throws JMSException
@@ -1545,8 +1528,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public abstract void releaseForRollback();
- public abstract void sendRollback() throws AMQException, FailoverException ;
-
+ public abstract void sendRollback() throws AMQException, FailoverException;
public void run()
{
@@ -1673,8 +1655,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
ft.addAll(rawSelector);
}
- BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh,prefetchLow,
- noLocal,exclusive, messageSelector, ft, noConsume, autoClose);
+ BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
+ noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
if (_messageListener != null)
{
@@ -1718,8 +1700,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
- final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
- final boolean noConsume, final boolean autoClose) throws JMSException;
+ final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
+ final boolean noConsume, final boolean autoClose) throws JMSException;
/**
* Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer
@@ -1782,12 +1764,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* @return <tt>true</tt> if the queue is bound to the exchange and routing key, <tt>false</tt> if not.
*
* @throws JMSException If the query fails for any reason.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
throws JMSException;
-
+
public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException;
/**
@@ -1828,10 +1809,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* Starts the session, which ensures that it is not suspended and that its event dispatcher is running.
*
* @throws AMQException If the session cannot be started for any reason.
- *
* @todo This should be controlled by _stopped as it pairs with the stop method fixme or check the
- * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages
- * for each subsequent call to flow.. only need to do this if we have called stop.
+ * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages
+ * for each subsequent call to flow.. only need to do this if we have called stop.
*/
void start() throws AMQException
{
@@ -2032,7 +2012,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
// at this point the _consumers map will be empty
- if (_dispatcher != null)
+ if (_dispatcher != null)
{
_dispatcher.close();
_dispatcher = null;
@@ -2124,7 +2104,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
public abstract void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector,AMQShortString tag) throws AMQException, FailoverException;
+ AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, AMQShortString tag) throws AMQException, FailoverException;
private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
throws JMSException
@@ -2143,7 +2123,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
checkNotClosed();
long producerId = getNextProducerId();
BasicMessageProducer producer = createMessageProducer(destination, mandatory,
- immediate, waitUntilSent, producerId);
+ immediate, waitUntilSent, producerId);
registerProducer(producerId, producer);
return producer;
@@ -2152,20 +2132,19 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
public abstract BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, final boolean waitUntilSent, long producerId);
+ final boolean immediate, final boolean waitUntilSent, long producerId);
private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
{
declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait);
}
-
/**
* Returns the number of messages currently queued for the given destination.
*
* <p/>Note that this operation automatically retries in the event of fail-over.
*
- * @param amqd The destination to be checked
+ * @param amqd The destination to be checked
*
* @return the number of queued messages.
*
@@ -2198,7 +2177,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* @param nowait
*
* @throws AMQException If the exchange cannot be declared for any reason.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
private void declareExchange(final AMQShortString name, final AMQShortString type,
@@ -2215,8 +2193,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
- final boolean nowait) throws AMQException, FailoverException;
-
+ final boolean nowait) throws AMQException, FailoverException;
/**
* Declares a queue for a JMS destination.
@@ -2234,9 +2211,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* the client.
*
* @throws AMQException If the queue cannot be declared for any reason.
- *
* @todo Verify the destiation is valid or throw an exception.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
@@ -2262,7 +2237,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}, _connection).execute();
}
- public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)throws AMQException, FailoverException;
+ public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException;
/**
* Undeclares the specified queue.
@@ -2272,7 +2247,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* @param queueName The name of the queue to delete.
*
* @throws JMSException If the queue could not be deleted for any reason.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
protected void deleteQueue(final AMQShortString queueName) throws JMSException
@@ -2294,7 +2268,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
- public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException;
+ public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException;
private long getNextProducerId()
{
@@ -2384,7 +2358,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
consumer.setQueuename(queueName);
// bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(),amqd);
+ bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(), amqd);
// If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
if (!_immediatePrefetch)
@@ -2469,7 +2443,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
if (_logger.isDebugEnabled())
{
_logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:"
- + message.getDeliveryTag());
+ + message.getDeliveryTag());
}
messages.remove();
@@ -2519,10 +2493,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
// Bounced message is processed here, away from the mina thread
AbstractJMSMessage bouncedMessage =
_messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
- msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
- AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
- AMQShortString reason = msg.getReplyText();
- _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+ msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
+ AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
+ AMQShortString reason = msg.getReplyText();
+ _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
// @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
if (errorCode == AMQConstant.NO_CONSUMERS)
@@ -2557,7 +2531,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* should be unsuspended.
*
* @throws AMQException If the session cannot be suspended for any reason.
- *
* @todo Be aware of possible changes to parameter order as versions change.
*/
protected void suspendChannel(boolean suspend) throws AMQException // , FailoverException
@@ -2598,7 +2571,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
return getAMQConnection().getMaxPrefetch() > 0;
}
-
/** Signifies that the session has pending sends to commit. */
public void markDirty()
{
@@ -2642,12 +2614,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
_flowControl.setFlowControl(active);
}
-
public void checkFlowControl() throws InterruptedException
{
- synchronized(_flowControl)
+ synchronized (_flowControl)
{
- while(!_flowControl.getFlowControl())
+ while (!_flowControl.getFlowControl())
{
_flowControl.wait();
}
@@ -2655,7 +2626,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
-
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
class Dispatcher extends Thread
{
@@ -2856,7 +2826,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
//if (message.getDeliverBody() != null)
//{
final BasicMessageConsumer consumer =
- _consumers.get(message.getConsumerTag().toIntValue());
+ _consumers.get(message.getConsumerTag().toIntValue());
if ((consumer == null) || consumer.isClosed())
{
@@ -2865,14 +2835,14 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
if (consumer == null)
{
_dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "["
- + message.getDeliveryTag() + "] from queue "
- + message.getConsumerTag() + " )without a handler - rejecting(requeue)...");
+ + message.getDeliveryTag() + "] from queue "
+ + message.getConsumerTag() + " )without a handler - rejecting(requeue)...");
}
else
{
_dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
- + message.getDeliveryTag() + "] from queue " + " consumer("
- + message.getConsumerTag() + ") is closed rejecting(requeue)...");
+ + message.getDeliveryTag() + "] from queue " + " consumer("
+ + message.getConsumerTag() + ") is closed rejecting(requeue)...");
}
}
// Don't reject if we're already closing
@@ -2930,7 +2900,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
try
{
- synchronized(_suspensionLock)
+ synchronized (_suspensionLock)
{
suspendChannel(_suspend.get());
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index e0e319250e..82bff1dda7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -482,7 +482,7 @@ public final class AMQSession_0_8 extends AMQSession
false,
null).generateFrame(_channelId);
QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
- getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
+ getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
return okHandler._messageCount;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 76c899f565..927f660932 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -139,11 +139,15 @@ public class FailoverHandler implements Runnable
// have a state waiter waiting until the connection is closed for some reason. Or in future we may have
// a slightly more complex state model therefore I felt it was worthwhile doing this.
AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager();
- _amqProtocolHandler.setStateManager(new AMQStateManager(_amqProtocolHandler.getProtocolSession()));
+
+ _amqProtocolHandler.setStateManager(new AMQStateManager());
+
+
if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null))
{
_logger.info("Failover process veto-ed by client");
+ //Restore Existing State Manager
_amqProtocolHandler.setStateManager(existingStateManager);
//todo: ritchiem these exceptions are useless... Would be better to attempt to propogate exception that
@@ -181,13 +185,19 @@ public class FailoverHandler implements Runnable
if (!failoverSucceeded)
{
+ //Restore Existing State Manager
_amqProtocolHandler.setStateManager(existingStateManager);
+
_amqProtocolHandler.getConnection().exceptionReceived(
new AMQDisconnectedException("Server closed connection and no failover " +
"was successful", null));
}
else
{
+ // Set the new Protocol Session in the StateManager.
+ existingStateManager.setProtocolSession(_amqProtocolHandler.getProtocolSession());
+
+ //Restore Existing State Manager
_amqProtocolHandler.setStateManager(existingStateManager);
try
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java
index 5bd36aa88b..365fed6aa5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java
@@ -5,14 +5,8 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.framing.*;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.AMQNoConsumersException;
-import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.AMQChannelClosedException;
-import org.apache.qpid.protocol.AMQConstant;
public class AccessRequestOkMethodHandler implements StateAwareMethodListener<AccessRequestOkBody>
{
@@ -25,11 +19,10 @@ public class AccessRequestOkMethodHandler implements StateAwareMethodListener<Ac
return _handler;
}
- public void methodReceived(AMQStateManager stateManager, AccessRequestOkBody method, int channelId)
+ public void methodReceived(AMQProtocolSession session, AccessRequestOkBody method, int channelId)
throws AMQException
{
_logger.debug("AccessRequestOk method received");
- final AMQProtocolSession session = stateManager.getProtocolSession();
session.setTicket(method.getTicket(), channelId);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
index e3e08667d8..5cb9412d51 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
@@ -22,11 +22,8 @@ package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,13 +42,9 @@ public class BasicCancelOkMethodHandler implements StateAwareMethodListener<Basi
private BasicCancelOkMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, BasicCancelOkBody body, int channelId)
+ public void methodReceived(AMQProtocolSession session, BasicCancelOkBody body, int channelId)
throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
-
-
-
if (_logger.isInfoEnabled())
{
_logger.info("New BasicCancelOk method received for consumer:" + body.getConsumerTag());
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
index 4deaa314ec..6029e7c171 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
@@ -21,10 +21,8 @@
package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.message.UnprocessedMessage_0_8;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.BasicDeliverBody;
import org.slf4j.Logger;
@@ -41,10 +39,9 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener<Basic
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, BasicDeliverBody body, int channelId)
- throws AMQException
+ public void methodReceived(AMQProtocolSession session, BasicDeliverBody body, int channelId)
+ throws AMQException
{
- final AMQProtocolSession session = stateManager.getProtocolSession();
final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(
channelId,
body.getDeliveryTag(),
@@ -52,7 +49,7 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener<Basic
body.getExchange(),
body.getRoutingKey(),
body.getRedelivered());
- _logger.debug("New JmsDeliver method received");
+ _logger.debug("New JmsDeliver method received:" + session);
session.unprocessedMessageReceived(msg);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
index 682c3ac2c1..5731fb7473 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
@@ -22,13 +22,9 @@ package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.ReturnMessage;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.UnprocessedMessage_0_8;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,11 +41,10 @@ public class BasicReturnMethodHandler implements StateAwareMethodListener<BasicR
}
- public void methodReceived(AMQStateManager stateManager, BasicReturnBody body, int channelId)
+ public void methodReceived(AMQProtocolSession session, BasicReturnBody body, int channelId)
throws AMQException
{
_logger.debug("New JmsBounce method received");
- final AMQProtocolSession session = stateManager.getProtocolSession();
final ReturnMessage msg = new ReturnMessage(channelId,
body.getExchange(),
body.getRoutingKey(),
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
index ee4cf14d58..2b6745ebe4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
@@ -26,14 +26,12 @@ import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.client.AMQNoConsumersException;
import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,12 +47,10 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener<Chann
return _handler;
}
- public void methodReceived(AMQStateManager stateManager, ChannelCloseBody method, int channelId)
+ public void methodReceived(AMQProtocolSession session, ChannelCloseBody method, int channelId)
throws AMQException
{
_logger.debug("ChannelClose method received");
- final AMQProtocolSession session = stateManager.getProtocolSession();
-
AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
AMQShortString reason = method.getReplyText();
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
index 8d3277d4de..9a9a0b4e63 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
@@ -23,9 +23,7 @@ package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,12 +39,11 @@ public class ChannelCloseOkMethodHandler implements StateAwareMethodListener<Cha
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, ChannelCloseOkBody method, int channelId)
+ public void methodReceived(AMQProtocolSession session, ChannelCloseOkBody method, int channelId)
throws AMQException
{
_logger.info("Received channel-close-ok for channel-id " + channelId);
- final AMQProtocolSession session = stateManager.getProtocolSession();
// todo this should do the local closure
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java
index b47fe751d6..2153b9cc8c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java
@@ -2,7 +2,6 @@ package org.apache.qpid.client.handler;
import org.apache.qpid.framing.ChannelFlowBody;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
import org.slf4j.Logger;
@@ -42,11 +41,9 @@ public class ChannelFlowMethodHandler implements StateAwareMethodListener<Channe
private ChannelFlowMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, ChannelFlowBody body, int channelId)
+ public void methodReceived(AMQProtocolSession session, ChannelFlowBody body, int channelId)
throws AMQException
{
-
- final AMQProtocolSession session = stateManager.getProtocolSession();
session.setFlowControl(channelId, body.getActive());
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
index 96de8af54b..6f66a972d5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
@@ -22,10 +22,8 @@ package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +41,7 @@ public class ChannelFlowOkMethodHandler implements StateAwareMethodListener<Chan
private ChannelFlowOkMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, ChannelFlowOkBody body, int channelId)
+ public void methodReceived(AMQProtocolSession session, ChannelFlowOkBody body, int channelId)
throws AMQException
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
index afb7517a12..a0f3808b23 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
@@ -27,31 +27,33 @@ import org.apache.qpid.framing.*;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.AMQMethodNotImplementedException;
-
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ClientMethodDispatcherImpl implements MethodDispatcher
{
-
- private static final BasicCancelOkMethodHandler _basicCancelOkMethodHandler = BasicCancelOkMethodHandler.getInstance();
- private static final BasicDeliverMethodHandler _basicDeliverMethodHandler = BasicDeliverMethodHandler.getInstance();
- private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance();
- private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance();
- private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance();
- private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
- private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance();
+ private static final BasicCancelOkMethodHandler _basicCancelOkMethodHandler = BasicCancelOkMethodHandler.getInstance();
+ private static final BasicDeliverMethodHandler _basicDeliverMethodHandler = BasicDeliverMethodHandler.getInstance();
+ private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance();
+ private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance();
+ private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance();
+ private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
+ private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance();
private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance();
- private static final ConnectionSecureMethodHandler _connectionSecureMethodHandler = ConnectionSecureMethodHandler.getInstance();
- private static final ConnectionStartMethodHandler _connectionStartMethodHandler = ConnectionStartMethodHandler.getInstance();
- private static final ConnectionTuneMethodHandler _connectionTuneMethodHandler = ConnectionTuneMethodHandler.getInstance();
- private static final ExchangeBoundOkMethodHandler _exchangeBoundOkMethodHandler = ExchangeBoundOkMethodHandler.getInstance();
- private static final QueueDeleteOkMethodHandler _queueDeleteOkMethodHandler = QueueDeleteOkMethodHandler.getInstance();
+ private static final ConnectionSecureMethodHandler _connectionSecureMethodHandler = ConnectionSecureMethodHandler.getInstance();
+ private static final ConnectionStartMethodHandler _connectionStartMethodHandler = ConnectionStartMethodHandler.getInstance();
+ private static final ConnectionTuneMethodHandler _connectionTuneMethodHandler = ConnectionTuneMethodHandler.getInstance();
+ private static final ExchangeBoundOkMethodHandler _exchangeBoundOkMethodHandler = ExchangeBoundOkMethodHandler.getInstance();
+ private static final QueueDeleteOkMethodHandler _queueDeleteOkMethodHandler = QueueDeleteOkMethodHandler.getInstance();
+ private static final Logger _logger = LoggerFactory.getLogger(ClientMethodDispatcherImpl.class);
private static interface DispatcherFactory
{
- public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager);
+ public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session);
}
private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories =
@@ -62,44 +64,40 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
_dispatcherFactories.put(ProtocolVersion.v8_0,
new DispatcherFactory()
{
- public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager)
+ public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session)
{
- return new ClientMethodDispatcherImpl_8_0(stateManager);
+ return new ClientMethodDispatcherImpl_8_0(session);
}
});
_dispatcherFactories.put(ProtocolVersion.v0_9,
new DispatcherFactory()
{
- public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager)
+ public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session)
{
- return new ClientMethodDispatcherImpl_0_9(stateManager);
+ return new ClientMethodDispatcherImpl_0_9(session);
}
});
}
-
- public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQStateManager stateManager)
+ public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQProtocolSession session)
{
+ _logger.error("New Method Dispatcher:" + session);
DispatcherFactory factory = _dispatcherFactories.get(version);
- return factory.createMethodDispatcher(stateManager);
+ return factory.createMethodDispatcher(session);
}
-
-
+ AMQProtocolSession _session;
- private AMQStateManager _stateManager;
-
- public ClientMethodDispatcherImpl(AMQStateManager stateManager)
+ public ClientMethodDispatcherImpl(AMQProtocolSession session)
{
- _stateManager = stateManager;
+ _session = session;
}
-
public AMQStateManager getStateManager()
{
- return _stateManager;
+ return _session.getStateManager();
}
public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException
@@ -109,7 +107,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
{
- _basicCancelOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ _basicCancelOkMethodHandler.methodReceived(_session, body, channelId);
return true;
}
@@ -120,7 +118,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException
{
- _basicDeliverMethodHandler.methodReceived(_stateManager,body,channelId);
+ _basicDeliverMethodHandler.methodReceived(_session, body, channelId);
return true;
}
@@ -141,13 +139,13 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException
{
- _basicReturnMethodHandler.methodReceived(_stateManager,body,channelId);
+ _basicReturnMethodHandler.methodReceived(_session, body, channelId);
return true;
}
public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
{
- _channelCloseMethodHandler.methodReceived(_stateManager,body,channelId);
+ _channelCloseMethodHandler.methodReceived(_session, body, channelId);
return true;
}
@@ -163,7 +161,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException
{
- _channelFlowOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ _channelFlowOkMethodHandler.methodReceived(_session, body, channelId);
return true;
}
@@ -174,7 +172,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
{
- _connectionCloseMethodHandler.methodReceived(_stateManager,body,channelId);
+ _connectionCloseMethodHandler.methodReceived(_session, body, channelId);
return true;
}
@@ -185,37 +183,37 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException
{
- _connectionOpenOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ _connectionOpenOkMethodHandler.methodReceived(_session, body, channelId);
return true;
}
public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException
{
- _connectionRedirectMethodHandler.methodReceived(_stateManager,body,channelId);
+ _connectionRedirectMethodHandler.methodReceived(_session, body, channelId);
return true;
}
public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException
{
- _connectionSecureMethodHandler.methodReceived(_stateManager,body,channelId);
+ _connectionSecureMethodHandler.methodReceived(_session, body, channelId);
return true;
}
public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException
{
- _connectionStartMethodHandler.methodReceived(_stateManager,body,channelId);
+ _connectionStartMethodHandler.methodReceived(_session, body, channelId);
return true;
}
public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException
{
- _connectionTuneMethodHandler.methodReceived(_stateManager,body,channelId);
+ _connectionTuneMethodHandler.methodReceived(_session, body, channelId);
return true;
}
public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException
{
- _queueDeleteOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ _queueDeleteOkMethodHandler.methodReceived(_session, body, channelId);
return true;
}
@@ -431,7 +429,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException
{
- _exchangeBoundOkMethodHandler.methodReceived(_stateManager,body,channelId);
+ _exchangeBoundOkMethodHandler.methodReceived(_session, body, channelId);
return true;
}
@@ -522,7 +520,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException
{
- return false;
+ return false;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java
index e235368357..d3e9fba8ed 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java
@@ -26,16 +26,15 @@ import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.AMQMethodNotImplementedException;
-
+import org.apache.qpid.client.protocol.AMQProtocolSession;
public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_9
{
- public ClientMethodDispatcherImpl_0_9(AMQStateManager stateManager)
+ public ClientMethodDispatcherImpl_0_9(AMQProtocolSession session)
{
- super(stateManager);
+ super(session);
}
-
public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
{
return false;
@@ -148,8 +147,7 @@ public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl i
public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
{
- return false;
+ return false;
}
-
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java
index b0f003cd2d..19f758817d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java
@@ -24,13 +24,13 @@ import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher_8_0
{
- public ClientMethodDispatcherImpl_8_0(AMQStateManager stateManager)
+ public ClientMethodDispatcherImpl_8_0(AMQProtocolSession session)
{
- super(stateManager);
+ super(session);
}
public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
index 950a3288fc..bc82d6bc62 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
@@ -25,13 +25,11 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQAuthenticationException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,14 +46,13 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co
}
private ConnectionCloseMethodHandler()
- { }
+ {
+ }
- public void methodReceived(AMQStateManager stateManager, ConnectionCloseBody method, int channelId)
- throws AMQException
+ public void methodReceived(AMQProtocolSession session, ConnectionCloseBody method, int channelId)
+ throws AMQException
{
_logger.info("ConnectionClose frame received");
- final AMQProtocolSession session = stateManager.getProtocolSession();
-
// does it matter
// stateManager.changeState(AMQState.CONNECTION_CLOSING);
@@ -63,6 +60,8 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co
AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
AMQShortString reason = method.getReplyText();
+ AMQException error = null;
+
try
{
@@ -75,35 +74,33 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co
{
if (errorCode == AMQConstant.NOT_ALLOWED || (errorCode == AMQConstant.ACCESS_REFUSED))
{
- _logger.info("Error :" + errorCode +":"+ Thread.currentThread().getName());
-
- // todo ritchiem : Why do this here when it is going to be done in the finally block?
- session.closeProtocolSession();
+ _logger.info("Error :" + errorCode + ":" + Thread.currentThread().getName());
- // todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state.
- stateManager.changeState(AMQState.CONNECTION_NOT_STARTED);
-
- throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString(), null);
+ error = new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString(), null);
}
else
{
_logger.info("Connection close received with error code " + errorCode);
- throw new AMQConnectionClosedException(errorCode, "Error: " + reason, null);
+ error = new AMQConnectionClosedException(errorCode, "Error: " + reason, null);
}
}
}
finally
{
- // this actually closes the connection in the case where it is not an error.
+ if (error != null)
+ {
+ session.notifyError(error);
+ }
+
+ // Close the protocol Session, including any open TCP connections
session.closeProtocolSession();
- // ritchiem: Doing this though will cause any waiting connection start to be released without being able to
- // see what the cause was.
- stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ // Closing the session should not introduce a race condition as this thread will continue to propgate any
+ // exception in to the exceptionCaught method of the SessionHandler.
+ // Any sessionClosed event should occur after this.
}
}
-
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
index fd7acac84f..e639a33450 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
@@ -24,9 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ConnectionOpenOkBody;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.protocol.AMQMethodEvent;
public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener<ConnectionOpenOkBody>
{
@@ -41,10 +39,10 @@ public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener<C
{
}
- public void methodReceived(AMQStateManager stateManager, ConnectionOpenOkBody body, int channelId)
+ public void methodReceived(AMQProtocolSession session, ConnectionOpenOkBody body, int channelId)
throws AMQException
{
- stateManager.changeState(AMQState.CONNECTION_OPEN);
+ session.getStateManager().changeState(AMQState.CONNECTION_OPEN);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
index cac68c9467..472c471fd6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
@@ -22,10 +22,8 @@ package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.ConnectionRedirectBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,11 +44,10 @@ public class ConnectionRedirectMethodHandler implements StateAwareMethodListener
private ConnectionRedirectMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, ConnectionRedirectBody method, int channelId)
+ public void methodReceived(AMQProtocolSession session, ConnectionRedirectBody method, int channelId)
throws AMQException
{
_logger.info("ConnectionRedirect frame received");
- final AMQProtocolSession session = stateManager.getProtocolSession();
String host = method.getHost().toString();
// the host is in the form hostname:port with the port being optional
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
index 900aa2abac..9a9bee757b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
@@ -25,12 +25,9 @@ import javax.security.sasl.SaslException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ConnectionSecureBody;
import org.apache.qpid.framing.ConnectionSecureOkBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
public class ConnectionSecureMethodHandler implements StateAwareMethodListener<ConnectionSecureBody>
{
@@ -41,10 +38,9 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener<C
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, ConnectionSecureBody body, int channelId)
+ public void methodReceived(AMQProtocolSession session, ConnectionSecureBody body, int channelId)
throws AMQException
{
- final AMQProtocolSession session = stateManager.getProtocolSession();
SaslClient client = session.getSaslClient();
if (client == null)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
index d3746f137e..8857f1115a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
@@ -25,7 +25,6 @@ import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.security.AMQCallbackHandler;
import org.apache.qpid.client.security.CallbackHandlerRegistry;
import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.common.QpidProperties;
@@ -35,7 +34,6 @@ import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,15 +60,12 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co
private ConnectionStartMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, ConnectionStartBody body, int channelId)
+ public void methodReceived(AMQProtocolSession session, ConnectionStartBody body, int channelId)
throws AMQException
{
_log.debug("public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, "
+ "AMQMethodEvent evt): called");
- final AMQProtocolSession session = stateManager.getProtocolSession();
-
-
ProtocolVersion pv = new ProtocolVersion((byte) body.getVersionMajor(), (byte) body.getVersionMinor());
// For the purposes of interop, we can make the client accept the broker's version string.
@@ -145,7 +140,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co
throw new AMQException(null, "No locales sent from server, passed: " + locales, null);
}
- stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
+ session.getStateManager().changeState(AMQState.CONNECTION_NOT_TUNED);
FieldTable clientProperties = FieldTableFactory.newFieldTable();
clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()),
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
index fc0e40b745..e4e58c317d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
@@ -24,10 +24,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.ConnectionTuneParameters;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.*;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,11 +44,10 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener<Con
protected ConnectionTuneMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, ConnectionTuneBody frame, int channelId)
+ public void methodReceived(AMQProtocolSession session, ConnectionTuneBody frame, int channelId)
throws AMQException
{
_logger.debug("ConnectionTune frame received");
- final AMQProtocolSession session = stateManager.getProtocolSession();
final MethodRegistry methodRegistry = session.getMethodRegistry();
@@ -65,7 +62,7 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener<Con
params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.getHeartbeat()));
session.setConnectionTuneParameters(params);
- stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
+ session.getStateManager().changeState(AMQState.CONNECTION_NOT_OPENED);
ConnectionTuneOkBody tuneOkBody = methodRegistry.createConnectionTuneOkBody(params.getChannelMax(),
params.getFrameMax(),
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
index 8de40beb10..690d782b40 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
@@ -22,10 +22,8 @@ package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +44,7 @@ public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener<Ex
private ExchangeBoundOkMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, ExchangeBoundOkBody body, int channelId)
+ public void methodReceived(AMQProtocolSession session, ExchangeBoundOkBody body, int channelId)
throws AMQException
{
if (_logger.isDebugEnabled())
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
index 41225c0569..01d82c9b55 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
@@ -22,10 +22,8 @@ package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +44,7 @@ public class QueueDeleteOkMethodHandler implements StateAwareMethodListener<Queu
private QueueDeleteOkMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, QueueDeleteOkBody body, int channelId)
+ public void methodReceived(AMQProtocolSession session, QueueDeleteOkBody body, int channelId)
throws AMQException
{
if (_logger.isDebugEnabled())
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 1b75d6e829..8ac4f843de 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -43,6 +43,7 @@ import org.apache.qpid.client.failover.FailoverHandler;
import org.apache.qpid.client.failover.FailoverState;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.*;
@@ -100,23 +101,22 @@ import java.util.concurrent.CountDownLatch;
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Create the filter chain to filter this handlers events.
- * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
+ * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
*
* <tr><td> Maintain fail-over state.
* <tr><td>
* </table>
*
* @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the
- * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
- * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec
- * filter before it mean not doing the read/write asynchronously but in the main filter thread?
- *
+ * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
+ * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec
+ * filter before it mean not doing the read/write asynchronously but in the main filter thread?
* @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including
- * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
- * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
- * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data
- * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
- * that lifecycles of the fields match lifecycles of their containing objects.
+ * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
+ * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
+ * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data
+ * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
+ * that lifecycles of the fields match lifecycles of their containing objects.
*/
public class AMQProtocolHandler extends IoHandlerAdapter
{
@@ -136,7 +136,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
private AMQStateManager _stateManager = new AMQStateManager();
/** Holds the method listeners, */
- private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
+ private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
/**
* We create the failover handler when the session is created since it needs a reference to the IoSession in order
@@ -154,14 +154,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter
/** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */
private CountDownLatch _failoverLatch;
-
/** The last failover exception that occured */
private FailoverException _lastFailoverException;
/** Defines the default timeout to use for synchronous protocol commands. */
private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
-
/**
* Creates a new protocol handler, associated with the specified client connection instance.
*
@@ -245,7 +243,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
}
}
- _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
+ _protocolSession = new AMQProtocolSession(this, session, _connection);
+
+ _stateManager.setProtocolSession(_protocolSession);
+
_protocolSession.init();
}
@@ -263,7 +264,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* @param session The MINA session.
*
* @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
- * not otherwise? The above comment doesn't make that clear.
+ * not otherwise? The above comment doesn't make that clear.
*/
public void sessionClosed(IoSession session)
{
@@ -374,7 +375,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
"cause isn't AMQConnectionClosedException: " + cause, cause);
AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
- propagateExceptionToWaiters(amqe);
+ propagateExceptionToAllWaiters(amqe);
}
_connection.exceptionReceived(cause);
@@ -395,7 +396,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
// we notify the state manager of the error in case we have any clients waiting on a state
// change. Those "waiters" will be interrupted and can handle the exception
AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
- propagateExceptionToWaiters(amqe);
+ propagateExceptionToAllWaiters(amqe);
_connection.exceptionReceived(cause);
}
}
@@ -405,11 +406,33 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type
* of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately.
*
+ * This should be called only when the exception is fatal for the connection.
+ *
* @param e the exception to propagate
+ *
+ * @see #propagateExceptionToFrameListeners
+ * @see #propagateExceptionToStateWaiters
*/
- public void propagateExceptionToWaiters(Exception e)
+ public void propagateExceptionToAllWaiters(Exception e)
+ {
+ propagateExceptionToFrameListeners(e);
+ propagateExceptionToStateWaiters(e);
+ }
+
+ /**
+ * This caters for the case where we only need to propogate an exception to the the frame listeners to interupt any
+ * protocol level waits.
+ *
+ * This will would normally be used to notify all Frame Listeners that Failover is about to occur and they should
+ * stop waiting and relinquish the Failover lock {@see FailoverHandler}.
+ *
+ * Once the {@link FailoverHandler} has re-established the connection then the listeners will be able to re-attempt
+ * their protocol request and so listen again for the correct frame.
+ *
+ * @param e the exception to propagate
+ */
+ public void propagateExceptionToFrameListeners(Exception e)
{
-
if (!_frameListeners.isEmpty())
{
final Iterator it = _frameListeners.iterator();
@@ -421,6 +444,22 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
}
+ /**
+ * This caters for the case where we only need to propogate an exception to the the state manager to interupt any
+ * thing waiting for a state change.
+ *
+ * Currently (2008-07-15) the state manager is only used during 0-8/0-9 Connection establishement.
+ *
+ * Normally the state manager would not need to be notified without notifiying the frame listeners so in normal
+ * cases {@link #propagateExceptionToAllWaiters} would be the correct choice.
+ *
+ * @param e the exception to propagate
+ */
+ public void propagateExceptionToStateWaiters(Exception e)
+ {
+ getStateManager().error(e);
+ }
+
public void notifyFailoverStarting()
{
// Set the last exception in the sync block to ensure the ordering with add.
@@ -431,7 +470,9 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_lastFailoverException = new FailoverException("Failing over about to start");
}
- propagateExceptionToWaiters(_lastFailoverException);
+ //Only notify the Frame listeners that failover is going to occur as the State listeners shouldn't be
+ // interupted unless failover cannot restore the state.
+ propagateExceptionToFrameListeners(_lastFailoverException);
}
public void failoverInProgress()
@@ -443,7 +484,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void messageReceived(IoSession session, Object message) throws Exception
{
- if(message instanceof AMQFrame)
+ if (message instanceof AMQFrame)
{
final boolean debug = _logger.isDebugEnabled();
final long msgNumber = ++_messageReceivedCount;
@@ -459,7 +500,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
- bodyFrame.handle(frame.getChannel(),_protocolSession);
+ bodyFrame.handle(frame.getChannel(), _protocolSession);
_connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
}
@@ -508,20 +549,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter
if (!wasAnyoneInterested)
{
throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
- + _frameListeners, null);
+ + _frameListeners, null);
}
}
catch (AMQException e)
- {
- if (!_frameListeners.isEmpty())
- {
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
- {
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- listener.error(e);
- }
- }
+ {
+ propagateExceptionToFrameListeners(e);
exceptionCaught(session, e);
}
@@ -548,28 +581,11 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
}
- /*
- public void addFrameListener(AMQMethodListener listener)
- {
- _frameListeners.add(listener);
- }
-
- public void removeFrameListener(AMQMethodListener listener)
- {
- _frameListeners.remove(listener);
- }
- */
- public void attainState(AMQState s) throws Exception
- {
- getStateManager().attainState(s);
- }
-
- public AMQState attainState(Set<AMQState> states) throws AMQException
+ public StateWaiter createWaiter(Set<AMQState> states) throws AMQException
{
- return getStateManager().attainState(states);
+ return getStateManager().createWaiter(states);
}
-
/**
* Convenience method that writes a frame to the protocol session. Equivalent to calling
* getProtocolSession().write().
@@ -617,14 +633,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
throw _lastFailoverException;
}
-
+
_frameListeners.add(listener);
}
_protocolSession.writeFrame(frame);
- AMQMethodEvent e = listener.blockForFrame(timeout);
-
- return e;
+ return listener.blockForFrame(timeout);
// When control resumes before this line, a reply will have been received
// that matches the criteria defined in the blocking listener
}
@@ -669,8 +683,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
getStateManager().changeState(AMQState.CONNECTION_CLOSING);
ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client is closing the connection."),0,0);
-
+ new AMQShortString("JMS client is closing the connection."), 0, 0);
final AMQFrame frame = body.generateFrame(0);
@@ -745,10 +758,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void setStateManager(AMQStateManager stateManager)
{
_stateManager = stateManager;
- if (_protocolSession != null)
- {
- _protocolSession.setStateManager(stateManager);
- }
}
public AMQProtocolSession getProtocolSession()
@@ -778,7 +787,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public MethodRegistry getMethodRegistry()
{
- return getStateManager().getMethodRegistry();
+ return _protocolSession.getMethodRegistry();
}
public ProtocolVersion getProtocolVersion()
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 6e782e0bfc..6beec3c9ba 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import javax.security.sasl.SaslClient;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -38,10 +37,10 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.ConnectionTuneParameters;
-import org.apache.qpid.client.message.ReturnMessage;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.message.UnprocessedMessage_0_8;
import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
@@ -67,8 +66,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
protected final IoSession _minaProtocolSession;
- private AMQStateManager _stateManager;
-
protected WriteFuture _lastWriteFuture;
/**
@@ -86,7 +83,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
* Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
* first) with the subsequent content header and content bodies.
*/
- private final ConcurrentMap<Integer,UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer,UnprocessedMessage>();
+ private final ConcurrentMap<Integer, UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer, UnprocessedMessage>();
private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16];
/** Counter to ensure unique queue names */
@@ -97,26 +94,16 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
// private VersionSpecificRegistry _registry =
// MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
-
private MethodRegistry _methodRegistry =
MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion());
-
private MethodDispatcher _methodDispatcher;
-
private final AMQConnection _connection;
private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
{
- this(protocolHandler, protocolSession, connection, new AMQStateManager());
-
- }
-
- public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection,
- AMQStateManager stateManager)
- {
_protocolHandler = protocolHandler;
_minaProtocolSession = protocolSession;
_minaProtocolSession.setAttachment(this);
@@ -124,11 +111,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
_minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
// fixme - real value needed
_minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
- _stateManager = stateManager;
- _stateManager.setProtocolSession(this);
_protocolVersion = connection.getProtocolVersion();
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
- stateManager);
+ this);
_connection = connection;
}
@@ -161,14 +146,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public AMQStateManager getStateManager()
{
- return _stateManager;
- }
-
- public void setStateManager(AMQStateManager stateManager)
- {
- _stateManager = stateManager;
- _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(_protocolVersion,
- stateManager);
+ return _protocolHandler.getStateManager();
}
public String getVirtualHost()
@@ -238,9 +216,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
{
final int channelId = message.getChannelId();
- if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
{
- _channelId2UnprocessedMsgArray[channelId] = message;
+ _channelId2UnprocessedMsgArray[channelId] = message;
}
else
{
@@ -251,17 +229,16 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
{
final UnprocessedMessage msg = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
- : _channelId2UnprocessedMsgMap.get(channelId);
-
+ : _channelId2UnprocessedMsgMap.get(channelId);
if (msg == null)
{
- throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first", null);
+ throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first on session:" + this, null);
}
if (msg.getContentHeader() != null)
{
- throw new AMQException(null, "Error: received duplicate content header or did not receive correct number of content body frames", null);
+ throw new AMQException(null, "Error: received duplicate content header or did not receive correct number of content body frames on session:" + this, null);
}
msg.setContentHeader(contentHeader);
@@ -275,7 +252,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
UnprocessedMessage_0_8 msg;
final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0;
- if(fastAccess)
+ if (fastAccess)
{
msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgArray[channelId];
}
@@ -291,7 +268,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
if (msg.getContentHeader() == null)
{
- if(fastAccess)
+ if (fastAccess)
{
_channelId2UnprocessedMsgArray[channelId] = null;
}
@@ -333,7 +310,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
AMQSession session = getSession(channelId);
session.messageReceived(msg);
- if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
{
_channelId2UnprocessedMsgArray[channelId] = null;
}
@@ -431,12 +408,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION);
}
- public void closeProtocolSession()
+ public void closeProtocolSession() throws AMQException
{
closeProtocolSession(true);
}
- public void closeProtocolSession(boolean waitLast)
+ public void closeProtocolSession(boolean waitLast) throws AMQException
{
_logger.debug("Waiting for last write to join.");
if (waitLast && (_lastWriteFuture != null))
@@ -446,6 +423,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
_logger.debug("Closing protocol session");
final CloseFuture future = _minaProtocolSession.close();
+
+ // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED
+ // then wait for the connection to close.
+ // ritchiem: Could this release BlockingWaiters to early? The close has been done as much as possible so any
+ // error now shouldn't matter.
+
+ _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
+
future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
}
@@ -489,9 +474,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
_protocolVersion = pv;
_methodRegistry = MethodRegistry.getMethodRegistry(pv);
- _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, _stateManager);
+ _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this);
- // _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
+ // _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
}
public byte getProtocolMinorVersion()
@@ -524,12 +509,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
return _methodDispatcher;
}
-
public void setTicket(int ticket, int channelId)
{
final AMQSession session = getSession(channelId);
session.setTicket(ticket);
}
+
public void setMethodDispatcher(MethodDispatcher methodDispatcher)
{
_methodDispatcher = methodDispatcher;
@@ -545,4 +530,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
_protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession);
}
+
+ public void notifyError(Exception error)
+ {
+ _protocolHandler.propagateExceptionToAllWaiters(error);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index 0ab2e07340..2bc609ebf2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -20,9 +20,14 @@
*/
package org.apache.qpid.client.protocol;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.util.BlockingWaiter;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
@@ -54,38 +59,17 @@ import org.apache.qpid.protocol.AMQMethodListener;
* </table>
*
* @todo Might be neater if this method listener simply wrapped another that provided the method handling using a
- * methodRecevied method. The processMethod takes an additional channelId, however none of the implementations
- * seem to use it. So wrapping the listeners is possible.
- *
- * @todo What is to stop a blocking method listener, receiving a second method whilst it is registered as a listener,
- * overwriting the first one before the caller of the block method has had a chance to examine it? If one-shot
- * behaviour is to be intended it should be enforced, perhaps by always returning false once the blocked for
- * method has been received.
- *
- * @todo Interuption is caught but not handled. This could be allowed to fall through. This might actually be usefull
- * for fail-over where a thread is blocking when failure happens, it could be interrupted to abandon or retry
- * when this happens. At the very least, restore the interrupted status flag.
- *
+ * methodRecevied method. The processMethod takes an additional channelId, however none of the implementations
+ * seem to use it. So wrapping the listeners is possible.
* @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to
- * check that SynchronousQueue has a non-blocking put method available.
+ * check that SynchronousQueue has a non-blocking put method available.
*/
-public abstract class BlockingMethodFrameListener implements AMQMethodListener
+public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMethodEvent> implements AMQMethodListener
{
- /** This flag is used to indicate that the blocked for method has been received. */
- private volatile boolean _ready = false;
-
- /** Used to protect the shared event and ready flag between the producer and consumer. */
- private final Object _lock = new Object();
-
- /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */
- private volatile Exception _error;
/** Holds the channel id for the channel upon which this listener is waiting for a response. */
protected int _channelId;
- /** Holds the incoming method. */
- protected AMQMethodEvent _doneEvt = null;
-
/**
* Creates a new method listener, that filters incoming method to just those that match the specified channel id.
*
@@ -104,7 +88,14 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
*
* @return <tt>true</tt> if the method was handled, <tt>false</tt> otherwise.
*/
- public abstract boolean processMethod(int channelId, AMQMethodBody frame); // throws AMQException;
+ public abstract boolean processMethod(int channelId, AMQMethodBody frame);
+
+ public boolean process(AMQMethodEvent evt)
+ {
+ AMQMethodBody method = evt.getMethod();
+
+ return (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method);
+ }
/**
* Informs this listener that an AMQP method has been received.
@@ -113,37 +104,9 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
*
* @return <tt>true</tt> if this listener has handled the method, <tt>false</tt> otherwise.
*/
- public boolean methodReceived(AMQMethodEvent evt) // throws AMQException
+ public boolean methodReceived(AMQMethodEvent evt)
{
- AMQMethodBody method = evt.getMethod();
-
- /*try
- {*/
- boolean ready = (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method);
-
- if (ready)
- {
- // we only update the flag from inside the synchronized block
- // so that the blockForFrame method cannot "miss" an update - it
- // will only ever read the flag from within the synchronized block
- synchronized (_lock)
- {
- _doneEvt = evt;
- _ready = ready;
- _lock.notify();
- }
- }
-
- return ready;
-
- /*}
- catch (AMQException e)
- {
- error(e);
- // we rethrow the error here, and the code in the frame dispatcher will go round
- // each listener informing them that an exception has been thrown
- throw e;
- }*/
+ return received(evt);
}
/**
@@ -159,75 +122,15 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
*/
public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException
{
- synchronized (_lock)
+ try
{
- while (!_ready)
- {
- try
- {
- if (timeout == -1)
- {
- _lock.wait();
- }
- else
- {
-
- _lock.wait(timeout);
- if (!_ready)
- {
- _error = new AMQTimeoutException("Server did not respond in a timely fashion", null);
- _ready = true;
- }
- }
- }
- catch (InterruptedException e)
- {
- // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
- // if (!_ready && timeout != -1)
- // {
- // _error = new AMQException("Server did not respond timely");
- // _ready = true;
- // }
- }
- }
+ return (AMQMethodEvent) block(timeout);
}
-
- if (_error != null)
+ finally
{
- if (_error instanceof AMQException)
- {
- throw (AMQException) _error;
- }
- else if (_error instanceof FailoverException)
- {
- // This should ensure that FailoverException is not wrapped and can be caught.
- throw (FailoverException) _error; // needed to expose FailoverException.
- }
- else
- {
- throw new AMQException(null, "Woken up due to " + _error.getClass(), _error);
- }
+ //Prevent any more errors being notified to this waiter.
+ close();
}
-
- return _doneEvt;
}
- /**
- * This is a callback, called by the MINA dispatcher thread only. It is also called from within this
- * class to avoid code repetition but again is only called by the MINA dispatcher thread.
- *
- * @param e
- */
- public void error(Exception e)
- {
- // set the error so that the thread that is blocking (against blockForFrame())
- // can pick up the exception and rethrow to the caller
- _error = e;
-
- synchronized (_lock)
- {
- _ready = true;
- _lock.notify();
- }
- }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index 21f190bd7e..7a5d70ad15 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -28,15 +28,30 @@ import org.apache.qpid.protocol.AMQMethodListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Iterator;
import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
/**
- * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
- * there is a separate state manager.
+ * The state manager is responsible for managing the state of the protocol session. <p/>
+ * For each {@link org.apache.qpid.client.protocol.AMQProtocolHandler} there is a separate state manager.
+ *
+ * The AMQStateManager is now attached to the {@link org.apache.qpid.client.protocol.AMQProtocolHandler} and that is the sole point of reference so that
+ * As the {@link AMQProtocolSession} changes due to failover the AMQStateManager need not be copied around.
+ *
+ * The StateManager works by any component can wait for a state change to occur by using the following sequence.
+ *
+ * <li>StateWaiter waiter = stateManager.createWaiter(Set<AMQState> states);
+ * <li> // Perform action that will cause state change
+ * <li>waiter.await();
+ *
+ * The two step process is required as there is an inherit race condition between starting a process that will cause
+ * the state to change and then attempting to wait for that change. The interest in the change must be first set up so
+ * that any asynchrous errors that occur can be delivered to the correct waiters.
+ *
+ *
*/
-public class AMQStateManager
+public class AMQStateManager implements AMQMethodListener
{
private static final Logger _logger = LoggerFactory.getLogger(AMQStateManager.class);
@@ -45,16 +60,13 @@ public class AMQStateManager
/** The current state */
private AMQState _currentState;
-
- /**
- * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of
- * AMQFrame.
- */
-
-
private final Object _stateLock = new Object();
+
private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
+ protected final List<StateWaiter> _waiters = new CopyOnWriteArrayList<StateWaiter>();
+ private Exception _lastException;
+
public AMQStateManager()
{
this(null);
@@ -62,18 +74,15 @@ public class AMQStateManager
public AMQStateManager(AMQProtocolSession protocolSession)
{
- this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession);
+ this(AMQState.CONNECTION_NOT_STARTED, protocolSession);
}
- protected AMQStateManager(AMQState state, boolean register, AMQProtocolSession protocolSession)
+ protected AMQStateManager(AMQState state, AMQProtocolSession protocolSession)
{
_protocolSession = protocolSession;
_currentState = state;
-
}
-
-
public AMQState getCurrentState()
{
return _currentState;
@@ -86,117 +95,101 @@ public class AMQStateManager
synchronized (_stateLock)
{
_currentState = newState;
- _stateLock.notifyAll();
+
+ _logger.debug("Notififying State change to " + _waiters.size() + " : " + _waiters);
+
+ for (StateWaiter waiter : _waiters)
+ {
+ waiter.received(newState);
+ }
}
}
-
public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
{
-
B method = evt.getMethod();
-
+
// StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
method.execute(_protocolSession.getMethodDispatcher(), evt.getChannelId());
return true;
}
+ /**
+ * Setting of the ProtocolSession will be required when Failover has been successfuly compeleted.
+ *
+ * The new {@link AMQProtocolSession} that has been re-established needs to be provided as that is now the
+ * connection to the network.
+ *
+ * @param session The new protocol session
+ */
+ public void setProtocolSession(AMQProtocolSession session)
+ {
+ _logger.error("Setting ProtocolSession:" + session);
+ _protocolSession = session;
+ }
- public void attainState(final AMQState s) throws Exception
+ /**
+ * Propogate error to waiters
+ *
+ * @param error The error to propogate.
+ */
+ public void error(Exception error)
{
- synchronized (_stateLock)
+ if (_waiters.size() == 0)
{
- final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME;
- long waitTime = MAXIMUM_STATE_WAIT_TIME;
-
- while ((_currentState != s) && (waitTime > 0))
- {
- try
- {
- _stateLock.wait(MAXIMUM_STATE_WAIT_TIME);
- }
- catch (InterruptedException e)
- {
- _logger.warn("Thread interrupted");
- if (_protocolSession.getAMQConnection().getLastException() != null)
- {
- throw _protocolSession.getAMQConnection().getLastException();
- }
-
- }
-
- if (_currentState != s)
- {
- waitTime = waitUntilTime - System.currentTimeMillis();
- }
- }
-
- if (_currentState != s)
- {
- _logger.warn("State not achieved within permitted time. Current state " + _currentState
- + ", desired state: " + s);
- throw new AMQException(null, "State not achieved within permitted time. Current state " + _currentState
- + ", desired state: " + s, null);
- }
+ _logger.error("No Waiters for error saving as last error:" + error.getMessage());
+ _lastException = error;
+ }
+ for (StateWaiter waiter : _waiters)
+ {
+ _logger.error("Notifying Waiters(" + _waiters + ") for error:" + error.getMessage());
+ waiter.error(error);
}
-
- // at this point the state will have changed.
}
- public AMQProtocolSession getProtocolSession()
+ /**
+ * This provides a single place that the maximum time for state change to occur can be accessed.
+ * It is currently set via System property amqj.MaximumStateWait
+ *
+ * @return long Milliseconds value for a timeout
+ */
+ public long getWaitTimeout()
{
- return _protocolSession;
+ return MAXIMUM_STATE_WAIT_TIME;
}
- public void setProtocolSession(AMQProtocolSession session)
+ /**
+ * Create and add a new waiter to the notifcation list.
+ * @param states The waiter will attempt to wait for one of these desired set states to be achived.
+ * @return the created StateWaiter.
+ */
+ public StateWaiter createWaiter(Set<AMQState> states)
{
- _protocolSession = session;
- }
+ final StateWaiter waiter;
+ synchronized (_stateLock)
+ {
+ waiter = new StateWaiter(this, _currentState, states);
- public MethodRegistry getMethodRegistry()
- {
- return getProtocolSession().getMethodRegistry();
+ _waiters.add(waiter);
+ }
+
+ return waiter;
}
- public AMQState attainState(Set<AMQState> stateSet) throws AMQException
+ /**
+ * Remove the waiter from the notification list.
+ * @param waiter The waiter to remove.
+ */
+ public void removeWaiter(StateWaiter waiter)
{
synchronized (_stateLock)
{
- final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME;
- long waitTime = MAXIMUM_STATE_WAIT_TIME;
-
- while (!stateSet.contains(_currentState) && (waitTime > 0))
- {
- try
- {
- _stateLock.wait(MAXIMUM_STATE_WAIT_TIME);
- }
- catch (InterruptedException e)
- {
- _logger.warn("Thread interrupted");
- if (_protocolSession.getAMQConnection().getLastException() != null)
- {
- throw new AMQException(null, "Could not attain state due to exception",
- _protocolSession.getAMQConnection().getLastException());
- }
- }
-
- if (!stateSet.contains(_currentState))
- {
- waitTime = waitUntilTime - System.currentTimeMillis();
- }
- }
-
- if (!stateSet.contains(_currentState))
- {
- _logger.warn("State not achieved within permitted time. Current state " + _currentState
- + ", desired state: " + stateSet);
- throw new AMQException(null, "State not achieved within permitted time. Current state " + _currentState
- + ", desired state: " + stateSet, null);
- }
- return _currentState;
+ _waiters.remove(waiter);
}
+ }
-
+ public Exception getLastException()
+ {
+ return _lastException;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java b/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java
index 8c65f56af3..17d04f4fa3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java
@@ -33,6 +33,6 @@ import org.apache.qpid.protocol.AMQMethodEvent;
public interface StateAwareMethodListener<B extends AMQMethodBody>
{
- void methodReceived(AMQStateManager stateManager, B body, int channelId) throws AMQException;
+ void methodReceived(AMQProtocolSession session, B body, int channelId) throws AMQException;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateListener.java b/java/client/src/main/java/org/apache/qpid/client/state/StateListener.java
deleted file mode 100644
index df207a0a23..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/state/StateListener.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.state;
-
-import org.apache.qpid.AMQException;
-
-public interface StateListener
-{
- void stateChanged(AMQState oldState, AMQState newState) throws AMQException;
-
- void error(Throwable t);
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
index 8b8453a1b0..4695b195d5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
@@ -20,103 +20,110 @@
*/
package org.apache.qpid.client.state;
+import org.apache.qpid.client.util.BlockingWaiter;
+import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.AMQException;
-
-import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+import java.util.Set;
/**
- * Waits for a particular state to be reached.
+ * This is an implementation of the {@link BlockingWaiter} to provide error handing and a waiting mechanism for state
+ * changes.
+ *
+ * On construction the current state and a set of States to await for is provided.
+ *
+ * When await() is called the state at constuction is compared against the awaitStates. If the state at construction is
+ * a desired state then await() returns immediately.
+ *
+ * Otherwise it will block for the set timeout for a desired state to be achieved.
+ *
+ * The state changes are notified via the {@link #process} method.
+ *
+ * Any notified error is handled by the BlockingWaiter and thrown from the {@link #block} method.
+ *
*/
-public class StateWaiter implements StateListener
+public class StateWaiter extends BlockingWaiter<AMQState>
{
private static final Logger _logger = LoggerFactory.getLogger(StateWaiter.class);
- private final AMQState _state;
-
- private volatile boolean _newStateAchieved;
-
- private volatile Throwable _throwable;
-
- private final Object _monitor = new Object();
- private static final long TIME_OUT = 1000 * 60 * 2;
-
- public StateWaiter(AMQState state)
+ Set<AMQState> _awaitStates;
+ private AMQState _startState;
+ private AMQStateManager _stateManager;
+
+ /**
+ *
+ * @param stateManager The StateManager
+ * @param currentState
+ * @param awaitStates
+ */
+ public StateWaiter(AMQStateManager stateManager, AMQState currentState, Set<AMQState> awaitStates)
{
- _state = state;
+ _logger.info("New StateWaiter :" + currentState + ":" + awaitStates);
+ _stateManager = stateManager;
+ _awaitStates = awaitStates;
+ _startState = currentState;
}
- public void waituntilStateHasChanged() throws AMQException
+ /**
+ * When the state is changed this StateWaiter is notified to process the change.
+ *
+ * @param state The new state that has been achieved.
+ * @return
+ */
+ public boolean process(AMQState state)
{
- synchronized (_monitor)
- {
- //
- // The guard is required in case we are woken up by a spurious
- // notify().
- //
- while (!_newStateAchieved && (_throwable == null))
- {
- try
- {
- _logger.debug("State " + _state + " not achieved so waiting...");
- _monitor.wait(TIME_OUT);
- // fixme this won't cause the timeout to exit the loop. need to set _throwable
- }
- catch (InterruptedException e)
- {
- _logger.debug("Interrupted exception caught while waiting: " + e, e);
- }
- }
- }
+ return _awaitStates.contains(state);
+ }
- if (_throwable != null)
- {
- _logger.debug("Throwable reached state waiter: " + _throwable);
- if (_throwable instanceof AMQException)
- {
- throw (AMQException) _throwable;
- }
- else
- {
- throw new AMQException(null, "Error: " + _throwable, _throwable); // FIXME: this will wrap FailoverException in throwable which will prevent it being caught.
- }
- }
+ /**
+ * Await for the requried State to be achieved within the default timeout.
+ * @return The achieved state that was requested.
+ * @throws AMQException The exception that prevented the required state from being achived.
+ */
+ public AMQState await() throws AMQException
+ {
+ return await(_stateManager.getWaitTimeout());
}
- public void stateChanged(AMQState oldState, AMQState newState)
+ /**
+ * Await for the requried State to be achieved.
+ *
+ * <b>It is the responsibility of this class to remove the waiter from the StateManager
+ *
+ * @param timeout The time in milliseconds to wait for any of the states to be achived.
+ * @return The achieved state that was requested.
+ * @throws AMQException The exception that prevented the required state from being achived.
+ */
+ public AMQState await(long timeout) throws AMQException
{
- synchronized (_monitor)
+ try
{
- if (_logger.isDebugEnabled())
+ if (process(_startState))
{
- _logger.debug("stateChanged called changing from :" + oldState + " to :" + newState);
+ return _startState;
}
- if (_state == newState)
+ try
{
- _newStateAchieved = true;
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("New state reached so notifying monitor");
- }
+ return (AMQState) block(timeout);
+ }
+ catch (FailoverException e)
+ {
+ _logger.error("Failover occured whilst waiting for states:" + _awaitStates);
- _monitor.notifyAll();
+ e.printStackTrace();
+ return null;
}
}
- }
-
- public void error(Throwable t)
- {
- synchronized (_monitor)
+ finally
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("exceptionThrown called");
- }
+ //Prevent any more errors being notified to this waiter.
+ close();
- _throwable = t;
- _monitor.notifyAll();
+ //Remove the waiter from the notifcation list in the statee manager
+ _stateManager.removeWaiter(this);
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
index 623591e0b6..f0d7feb059 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.client.state.listener;
-import org.apache.qpid.AMQException;
+
import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
import org.apache.qpid.framing.AMQMethodBody;
@@ -34,7 +34,7 @@ public class SpecificMethodFrameListener extends BlockingMethodFrameListener
_expectedClass = expectedClass;
}
- public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException
+ public boolean processMethod(int channelId, AMQMethodBody frame)
{
return _expectedClass.isInstance(frame);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
new file mode 100644
index 0000000000..67cda957fb
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
@@ -0,0 +1,348 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.util;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
+
+/**
+ * BlockingWaiter is a 'rendezvous' which delegates handling of
+ * incoming Objects to a listener implemented as a sub-class of this and hands off the process or
+ * error to a consumer. The producer of the event does not have to wait for the consumer to take the event, so this
+ * differs from a 'rendezvous' in that sense.
+ *
+ * <p/>BlockingWaiters are used to coordinate when waiting for an an event that expect a response.
+ * They are always used in a 'one-shot' manner, that is, to recieve just one response. Usually the caller has to register
+ * them as method listeners with an event dispatcher and remember to de-register them (in a finally block) once they
+ * have been completed.
+ *
+ * <p/>The {@link #process} must return <tt>true</tt> on any incoming method that it handles. This indicates to
+ * this listeners that the object just processed ends the waiting process.
+ *
+ * <p/>Errors from the producer are rethrown to the consumer.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations </td>
+ * <tr><td> Accept generic objects as events for processing via {@link #process}. <td>
+ * <tr><td> Delegate handling and undserstanding of the object to a concrete implementation. <td>
+ * <tr><td> Block until {@link #process} determines that waiting is no longer required <td>
+ * <tr><td> Propagate the most recent exception to the consumer.<td>
+ * </table>
+ *
+ * @todo Interuption is caught but not handled. This could be allowed to fall through. This might actually be usefull
+ * for fail-over where a thread is blocking when failure happens, it could be interrupted to abandon or retry
+ * when this happens. At the very least, restore the interrupted status flag.
+ * @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to
+ * check that SynchronousQueue has a non-blocking put method available.
+ */
+public abstract class BlockingWaiter<T>
+{
+ /** This flag is used to indicate that the blocked for method has been received. */
+ private volatile boolean _ready = false;
+
+ /** This flag is used to indicate that the received error has been processed. */
+ private volatile boolean _errorAck = false;
+
+ /** Used to protect the shared event and ready flag between the producer and consumer. */
+ private final ReentrantLock _lock = new ReentrantLock();
+
+ /** Used to signal that a method has been received */
+ private final Condition _receivedCondition = _lock.newCondition();
+
+ /** Used to signal that a error has been processed */
+ private final Condition _errorConditionAck = _lock.newCondition();
+
+ /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */
+ private volatile Exception _error;
+
+ /** Holds the incomming Object. */
+ protected Object _doneObject = null;
+ private AtomicBoolean _waiting = new AtomicBoolean(false);
+ private boolean _closed = false;
+
+ /**
+ * Delegates processing of the incomming object to the handler.
+ *
+ * @param object The object to process.
+ *
+ * @return <tt>true</tt> if the waiting is complete, <tt>false</tt> if waiting should continue.
+ */
+ public abstract boolean process(T object);
+
+ /**
+ * An Object has been received and should be processed to see if our wait condition has been reached.
+ *
+ * @param object The object received.
+ *
+ * @return <tt>true</tt> if the waiting is complete, <tt>false</tt> if waiting should continue.
+ */
+ public boolean received(T object)
+ {
+
+ boolean ready = process(object);
+
+ if (ready)
+ {
+ // we only update the flag from inside the synchronized block
+ // so that the blockForFrame method cannot "miss" an update - it
+ // will only ever read the flag from within the synchronized block
+ _lock.lock();
+ try
+ {
+ _doneObject = object;
+ _ready = ready;
+ _receivedCondition.signal();
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ return ready;
+ }
+
+ /**
+ * Blocks until an object is received that is handled by process, or the specified timeout
+ * has passed.
+ *
+ * Once closed any attempt to wait will throw an exception.
+ *
+ * @param timeout The timeout in milliseconds.
+ *
+ * @return The object that resolved the blocking.
+ *
+ * @throws AMQException
+ * @throws FailoverException
+ */
+ public Object block(long timeout) throws AMQException, FailoverException
+ {
+ long nanoTimeout = TimeUnit.MILLISECONDS.toNanos(timeout);
+
+ _lock.lock();
+
+ try
+ {
+ if (_closed)
+ {
+ throw throwClosedException();
+ }
+
+ if (_error == null)
+ {
+ _waiting.set(true);
+
+ while (!_ready)
+ {
+ try
+ {
+ if (timeout == -1)
+ {
+ _receivedCondition.await();
+ }
+ else
+ {
+ nanoTimeout = _receivedCondition.awaitNanos(nanoTimeout);
+
+ if (nanoTimeout <= 0 && !_ready && _error == null)
+ {
+ _error = new AMQTimeoutException("Server did not respond in a timely fashion", null);
+ _ready = true;
+ }
+ }
+ }
+ catch (InterruptedException e)
+ {
+ System.err.println(e.getMessage());
+ // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
+ // if (!_ready && timeout != -1)
+ // {
+ // _error = new AMQException("Server did not respond timely");
+ // _ready = true;
+ // }
+ }
+ }
+ }
+
+ if (_error != null)
+ {
+ if (_error instanceof AMQException)
+ {
+ throw (AMQException) _error;
+ }
+ else if (_error instanceof FailoverException)
+ {
+ // This should ensure that FailoverException is not wrapped and can be caught.
+ throw (FailoverException) _error; // needed to expose FailoverException.
+ }
+ else
+ {
+ throw new AMQException("Woken up due to " + _error.getClass(), _error);
+ }
+ }
+
+ }
+ finally
+ {
+ _waiting.set(false);
+
+ //Release Error handling thread
+ if (_error != null)
+ {
+ _errorAck = true;
+ _errorConditionAck.signal();
+
+ _error = null;
+ }
+ _lock.unlock();
+ }
+
+ return _doneObject;
+ }
+
+ /**
+ * This is a callback, called when an error has occured that should interupt any waiter.
+ * It is also called from within this class to avoid code repetition but it should only be called by the MINA threads.
+ *
+ * Once closed any notification of an exception will be ignored.
+ *
+ * @param e The exception being propogated.
+ */
+ public void error(Exception e)
+ {
+ // set the error so that the thread that is blocking (against blockForFrame())
+ // can pick up the exception and rethrow to the caller
+
+ _lock.lock();
+
+ if (_closed)
+ {
+ return;
+ }
+
+ if (_error == null)
+ {
+ _error = e;
+ }
+ else
+ {
+ System.err.println("WARNING: new error arrived while old one not yet processed");
+ }
+
+ try
+ {
+ if (_waiting.get())
+ {
+
+ _ready = true;
+ _receivedCondition.signal();
+
+ while (!_errorAck)
+ {
+ try
+ {
+ _errorConditionAck.await();
+ }
+ catch (InterruptedException e1)
+ {
+ System.err.println(e.getMessage());
+ }
+ }
+ _errorAck = false;
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * Close this Waiter so that no more errors are processed.
+ * This is a preventative method to ensure that a second error thread does not get stuck in the error method after
+ * the await has returned. This has not happend but in practise but if two errors occur on the Connection at
+ * the same time then it is conceiveably possible for the second to get stuck if the first one is processed by a
+ * waiter.
+ *
+ * Once closed any attempt to wait will throw an exception.
+ * Any notification of an exception will be ignored.
+ */
+ public void close()
+ {
+ _lock.lock();
+ try
+ {
+ //if we have already closed then our job is done.
+ if (_closed)
+ {
+ return;
+ }
+
+ //Close Waiter so no more exceptions are processed
+ _closed = true;
+
+ //Wake up any await() threads
+
+ //If we are waiting then use the error() to wake them up.
+ if (_waiting.get())
+ {
+ error(throwClosedException());
+ }
+ //If they are not waiting then there is nothing to do.
+
+ // Wake up any error handling threads
+
+ if (!_errorAck)
+ {
+ _errorAck = true;
+ _errorConditionAck.signal();
+
+ _error = null;
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * Helper method to generate the a closed Exception.
+ *
+ * todo: This should be converted to something more friendly.
+ *
+ * @return AMQException to throw to waiters when the Waiter is closed.
+ */
+ private AMQException throwClosedException()
+ {
+ return new AMQException(null, "Waiter was closed.", null);
+ }
+
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
index b6776a1a44..66f220643c 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
@@ -27,12 +27,10 @@ import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.client.AMQNoConsumersException;
import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,12 +46,10 @@ public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListe
return _handler;
}
- public void methodReceived(AMQStateManager stateManager, ChannelCloseBody method, int channelId)
+ public void methodReceived(AMQProtocolSession session, ChannelCloseBody method, int channelId)
throws AMQException
{
_logger.debug("ChannelClose method received");
- final AMQProtocolSession session = stateManager.getProtocolSession();
-
AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
AMQShortString reason = method.getReplyText();
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
index 27adc4dd77..6f4c26945c 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
@@ -33,11 +33,12 @@ public class ConnectionURLTest extends TestCase
public void testFailoverURL() throws URLSyntaxException
{
- String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin'";
+ String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin?cyclecount='100''";
ConnectionURL connectionurl = new AMQConnectionURL(url);
assertTrue(connectionurl.getFailoverMethod().equals("roundrobin"));
+ assertEquals("100", connectionurl.getFailoverOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE));
assertTrue(connectionurl.getUsername().equals("ritchiem"));
assertTrue(connectionurl.getPassword().equals("bob"));
assertTrue(connectionurl.getVirtualHost().equals("/test"));
@@ -276,7 +277,7 @@ public class ConnectionURLTest extends TestCase
public void testSingleTransportMultiOptionURL() throws URLSyntaxException
{
- String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672',routingkey='jim',timeout='200',immediatedelivery='true'";
+ String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672?foo='jim'&bar='bob'&fred='jimmy'',routingkey='jim',timeout='200',immediatedelivery='true'";
ConnectionURL connectionurl = new AMQConnectionURL(url);
@@ -493,8 +494,38 @@ public class ConnectionURLTest extends TestCase
}
}
+ public void testSingleTransportMultiOptionOnBrokerURL() throws URLSyntaxException
+ {
+ String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672?foo='jim'&bar='bob'&fred='jimmy'',routingkey='jim',timeout='200',immediatedelivery='true'";
+
+ ConnectionURL connectionurl = new AMQConnectionURL(url);
+
+ assertTrue(connectionurl.getFailoverMethod() == null);
+ assertTrue(connectionurl.getUsername().equals("guest"));
+ assertTrue(connectionurl.getPassword().equals("guest"));
+ assertTrue(connectionurl.getVirtualHost().equals("/test"));
+
+ assertTrue(connectionurl.getBrokerCount() == 1);
+
+ BrokerDetails service = connectionurl.getBrokerDetails(0);
+
+ assertTrue(service.getTransport().equals("tcp"));
+
+
+ assertTrue(service.getHost().equals("localhost"));
+ assertTrue(service.getPort() == 5672);
+ assertEquals("jim",service.getProperty("foo"));
+ assertEquals("bob",service.getProperty("bar"));
+ assertEquals("jimmy",service.getProperty("fred"));
+
+ assertTrue(connectionurl.getOption("routingkey").equals("jim"));
+ assertTrue(connectionurl.getOption("timeout").equals("200"));
+ assertTrue(connectionurl.getOption("immediatedelivery").equals("true"));
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(ConnectionURLTest.class);
}
}
+