diff options
Diffstat (limited to 'java/client/src')
35 files changed, 927 insertions, 812 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 0abcc8ef26..472eaef5b5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -26,7 +26,6 @@ import org.apache.qpid.AMQProtocolException; import org.apache.qpid.AMQUnresolvedAddressException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.*; @@ -76,11 +75,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>(); private int _size = 0; private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; - public AMQSession get(int channelId) { - if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) { return _fastAccessSessions[channelId]; } @@ -93,7 +91,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public AMQSession put(int channelId, AMQSession session) { AMQSession oldVal; - if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) { oldVal = _fastAccessSessions[channelId]; _fastAccessSessions[channelId] = session; @@ -102,11 +100,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { oldVal = _slowAccessSessions.put(channelId, session); } - if((oldVal != null) && (session == null)) + if ((oldVal != null) && (session == null)) { _size--; } - else if((oldVal == null) && (session != null)) + else if ((oldVal == null) && (session != null)) { _size++; } @@ -115,13 +113,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } - public AMQSession remove(int channelId) { AMQSession session; - if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) { - session = _fastAccessSessions[channelId]; + session = _fastAccessSessions[channelId]; _fastAccessSessions[channelId] = null; } else @@ -129,7 +126,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect session = _slowAccessSessions.remove(channelId); } - if(session != null) + if (session != null) { _size--; } @@ -141,9 +138,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { ArrayList<AMQSession> values = new ArrayList<AMQSession>(size()); - for(int i = 0; i < 16; i++) + for (int i = 0; i < 16; i++) { - if(_fastAccessSessions[i] != null) + if (_fastAccessSessions[i] != null) { values.add(_fastAccessSessions[i]); } @@ -162,14 +159,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _size = 0; _slowAccessSessions.clear(); - for(int i = 0; i<16; i++) + for (int i = 0; i < 16; i++) { _fastAccessSessions[i] = null; } } } - private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); protected AtomicInteger _idFactory = new AtomicInteger(0); @@ -211,7 +207,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** The virtual path to connect to on the AMQ server */ private String _virtualHost; - protected ExceptionListener _exceptionListener; @@ -252,15 +247,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private ProtocolVersion _protocolVersion = ProtocolVersion.v0_9; // FIXME TGM, shouldn't need this protected AMQConnectionDelegate _delegate; - + // this connection maximum number of prefetched messages private long _maxPrefetch; //Indicates whether persistent messages are synchronized private boolean _syncPersistence; - - /** used to hold a list of all exceptions that have been thrown during connection construction. gross */ - final ArrayList<Exception> _exceptions = new ArrayList<Exception>(); /** * @param broker brokerdetails @@ -337,20 +329,20 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** * @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception - * was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon. + * was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon. */ public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { // set this connection maxPrefetch if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null) { - _maxPrefetch = Long.parseLong( connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH)); + _maxPrefetch = Long.parseLong(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH)); } else { // use the defaul value set for all connections _maxPrefetch = Long.valueOf(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, - ClientProperties.MAX_PREFETCH_DEFAULT)); + ClientProperties.MAX_PREFETCH_DEFAULT)); } if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null) @@ -378,25 +370,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _delegate = new AMQConnectionDelegate_0_10(this); } - - class Listener implements ExceptionListener - { - public void onException(JMSException e) - { - _exceptions.add(e); - } - } - - try - { - setExceptionListener(new Listener()); - } - catch (JMSException e) - { - // Shouldn't happen - throw new AMQException(null, null, e); - } - if (_logger.isInfoEnabled()) { _logger.info("Connection:" + connectionURL); @@ -436,15 +409,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName(); } - - _protocolHandler = new AMQProtocolHandler(this); + _protocolHandler = new AMQProtocolHandler(this); // We are not currently connected _connected = false; // TMG FIXME this seems... wrong... boolean retryAllowed = true; - while (!_connected && retryAllowed ) + Exception connectionException = null; + while (!_connected && retryAllowed) { try { @@ -456,37 +429,29 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _logger.info(pe.getMessage()); _logger.info("Trying broker supported protocol version: " + - TransportConstants.getVersionMajor() + "." + - TransportConstants.getVersionMinor()); + TransportConstants.getVersionMajor() + "." + + TransportConstants.getVersionMinor()); } // we need to check whether we have a delegate for the supported protocol getDelegate(); } catch (Exception e) { - _exceptions.add(e); if (_logger.isInfoEnabled()) { - _logger.info("Unable to connect to broker at " + - _failoverPolicy.getCurrentBrokerDetails(), - e); + _logger.info("Unable to connect to broker at " + + _failoverPolicy.getCurrentBrokerDetails(), + e); } + connectionException = e; } - + if (!_connected) { retryAllowed = _failoverPolicy.failoverAllowed(); brokerDetails = _failoverPolicy.getNextBrokerDetails(); } } - try - { - setExceptionListener(null); - } - catch (JMSException e1) - { - // Can't happen - } if (_logger.isDebugEnabled()) { @@ -496,26 +461,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (!_connected) { String message = null; - try - { - Thread.sleep(150); - } - catch (InterruptedException e) - { - // Eat it, we've hopefully got all the exceptions if this happened - } - - Exception lastException = null; - if (_exceptions.size() > 0) + + if (connectionException != null) { - lastException = _exceptions.get(_exceptions.size() - 1); - if (lastException.getCause() != null) + if (connectionException.getCause() != null) { - message = lastException.getCause().getMessage(); + message = connectionException.getCause().getMessage(); } else { - message = lastException.getMessage(); + message = connectionException.getMessage(); } } @@ -527,20 +482,20 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else // can only be "" if getMessage() returned it therfore lastException != null { - message = "Unable to Connect:" + lastException.getClass(); + message = "Unable to Connect:" + connectionException.getClass(); } } - AMQException e = new AMQConnectionFailureException(message, _exceptions); - - if (lastException != null) + AMQException e = new AMQConnectionFailureException(message, connectionException); + + if (connectionException != null) { - if (lastException instanceof UnresolvedAddressException) + if (connectionException instanceof UnresolvedAddressException) { e = new AMQUnresolvedAddressException(message, _failoverPolicy.getCurrentBrokerDetails().toString(), null); } - + } throw e; } @@ -565,18 +520,18 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect try { Class c = Class.forName("org.apache.qpid.client.AMQConnectionDelegate_" + - TransportConstants.getVersionMajor() + "_" + - TransportConstants.getVersionMinor()); - Class partypes[] = new Class[1]; + TransportConstants.getVersionMajor() + "_" + + TransportConstants.getVersionMinor()); + Class partypes[] = new Class[1]; partypes[0] = AMQConnection.class; _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this); } catch (Exception e) { throw new AMQProtocolException(AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR, - "Protocol: " + TransportConstants.getVersionMajor() + "." - + TransportConstants.getVersionMinor() + " is rquired by the broker but is not " + - "currently supported by this client library implementation", e); + "Protocol: " + TransportConstants.getVersionMajor() + "." + + TransportConstants.getVersionMinor() + " is rquired by the broker but is not " + + "currently supported by this client library implementation", e); } } @@ -867,7 +822,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - } } @@ -892,14 +846,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - public void close() throws JMSException + public void close() throws JMSException { close(DEFAULT_TIMEOUT); } public void close(long timeout) throws JMSException { - close(new ArrayList<AMQSession>(_sessions.values()),timeout); + close(new ArrayList<AMQSession>(_sessions.values()), timeout); } public void close(List<AMQSession> sessions, long timeout) throws JMSException @@ -912,12 +866,12 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private void doClose(List<AMQSession> sessions, long timeout) throws JMSException { - synchronized(_sessionCreationLock) + synchronized (_sessionCreationLock) { - if(!sessions.isEmpty()) + if (!sessions.isEmpty()) { AMQSession session = sessions.remove(0); - synchronized(session.getMessageDeliveryLock()) + synchronized (session.getMessageDeliveryLock()) { doClose(sessions, timeout); } @@ -1120,7 +1074,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _sessions; } - + public String getUsername() { return _username; @@ -1297,6 +1251,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (cause instanceof IOException) { closer = !_closed.getAndSet(true); + + _protocolHandler.getProtocolSession().notifyError(je); } if (_exceptionListener != null) @@ -1339,7 +1295,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { if (cause instanceof AMQException) { - return ((AMQException)cause).isHardError(); + return ((AMQException) cause).isHardError(); } return true; @@ -1483,7 +1439,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect */ public long getMaxPrefetch() { - return _maxPrefetch; + return _maxPrefetch; } /** @@ -1495,14 +1451,4 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _syncPersistence; } - - public Exception getLastException() - { - if (_exceptions.size() > 0) - { - return _exceptions.get(_exceptions.size() - 1); - } - return null; - } - } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 5074658070..aab094ca7d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -32,12 +32,12 @@ import java.util.Set; import javax.jms.JMSException; import javax.jms.XASession; -import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.BasicQosOkBody; @@ -84,11 +84,15 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate final Set<AMQState> openOrClosedStates = EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED); + + StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates); + TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail); // this blocks until the connection has been set up or when an error // has prevented the connection being set up - AMQState state = _conn._protocolHandler.attainState(openOrClosedStates); + AMQState state = waiter.await(); + if(state == AMQState.CONNECTION_OPEN) { _conn._failoverPolicy.attainedConnection(); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 6755e4da5f..2f593ce0c3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -79,6 +79,8 @@ import org.apache.qpid.client.message.ReturnMessage; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.AMQState; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; @@ -90,22 +92,20 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * * <p/><table id="crc"><caption>CRC Card</caption> * <tr><th> Responsibilities <th> Collaborations * <tr><td> * </table> * * @todo Different FailoverSupport implementation are needed on the same method call, in different situations. For - * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second - * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of - * the fail-over process, the retry handler could be used to automatically retry the operation once the connection - * has been reestablished. All fail-over protected operations should be placed in private methods, with - * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the - * fail-over process sets a nowait flag and uses an async method call instead. - * + * example, when failing-over and reestablishing the bindings, the bind cannot be interrupted by a second + * fail-over, if it fails with an exception, the fail-over process should also fail. When binding outside of + * the fail-over process, the retry handler could be used to automatically retry the operation once the connection + * has been reestablished. All fail-over protected operations should be placed in private methods, with + * FailoverSupport passed in by the caller to provide the correct support for the calling context. Sometimes the + * fail-over process sets a nowait flag and uses an async method call instead. * @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this, - * after looking at worse bottlenecks first. + * after looking at worse bottlenecks first. */ public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession { @@ -114,10 +114,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; private final ConcurrentHashMap<Integer, BasicMessageConsumer> _slowAccessConsumers = new ConcurrentHashMap<Integer, BasicMessageConsumer>(); - public BasicMessageConsumer get(int id) { - if((id & 0xFFFFFFF0) == 0) + if ((id & 0xFFFFFFF0) == 0) { return _fastAccessConsumers[id]; } @@ -130,7 +129,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public BasicMessageConsumer put(int id, BasicMessageConsumer consumer) { BasicMessageConsumer oldVal; - if((id & 0xFFFFFFF0) == 0) + if ((id & 0xFFFFFFF0) == 0) { oldVal = _fastAccessConsumers[id]; _fastAccessConsumers[id] = consumer; @@ -144,13 +143,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } - public BasicMessageConsumer remove(int id) { BasicMessageConsumer consumer; - if((id & 0xFFFFFFF0) == 0) + if ((id & 0xFFFFFFF0) == 0) { - consumer = _fastAccessConsumers[id]; + consumer = _fastAccessConsumers[id]; _fastAccessConsumers[id] = null; } else @@ -166,9 +164,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { ArrayList<BasicMessageConsumer> values = new ArrayList<BasicMessageConsumer>(); - for(int i = 0; i < 16; i++) + for (int i = 0; i < 16; i++) { - if(_fastAccessConsumers[i] != null) + if (_fastAccessConsumers[i] != null) { values.add(_fastAccessConsumers[i]); } @@ -178,11 +176,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess return values; } - public void clear() { _slowAccessConsumers.clear(); - for(int i = 0; i<16; i++) + for (int i = 0; i < 16; i++) { _fastAccessConsumers[i] = null; } @@ -280,19 +277,13 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess */ protected final FlowControllingBlockingQueue _queue; - /** - * Holds the highest received delivery tag. - */ + /** Holds the highest received delivery tag. */ private final AtomicLong _highestDeliveryTag = new AtomicLong(-1); - /** - * All the not yet acknowledged message tags - */ + /** All the not yet acknowledged message tags */ protected ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new ConcurrentLinkedQueue<Long>(); - /** - * All the delivered message tags - */ + /** All the delivered message tags */ protected ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>(); /** Holds the dispatcher thread for this session. */ @@ -315,9 +306,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * consumer. */ protected final IdToConsumerMap _consumers = new IdToConsumerMap(); - - //Map<AMQShortString, BasicMessageConsumer> _consumers = - //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); + + //Map<AMQShortString, BasicMessageConsumer> _consumers = + //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); /** * Contains a list of consumers which have been removed but which might still have @@ -380,15 +371,13 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess /** Has failover occured on this session */ private boolean _failedOver; - - private static final class FlowControlIndicator { private volatile boolean _flowControl = true; public synchronized void setFlowControl(boolean flowControl) { - _flowControl= flowControl; + _flowControl = flowControl; notify(); } @@ -450,8 +439,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public void aboveThreshold(int currentValue) { _logger.debug( - "Above threshold(" + _defaultPrefetchHighMark - + ") so suspending channel. Current value is " + currentValue); + "Above threshold(" + _defaultPrefetchHighMark + + ") so suspending channel. Current value is " + currentValue); _suspendState.set(true); new Thread(new SuspenderRunner(_suspendState)).start(); @@ -460,8 +449,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public void underThreshold(int currentValue) { _logger.debug( - "Below threshold(" + _defaultPrefetchLowMark - + ") so unsuspending channel. Current value is " + currentValue); + "Below threshold(" + _defaultPrefetchLowMark + + ") so unsuspending channel. Current value is " + currentValue); _suspendState.set(false); new Thread(new SuspenderRunner(_suspendState)).start(); @@ -503,6 +492,19 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess close(-1); } + public void checkNotClosed() throws JMSException + { + // if the Connection has closed then we should throw any exception that has occured that we were not waiting for + AMQStateManager manager = _connection.getProtocolHandler().getStateManager(); + if (isClosed() && manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && manager.getLastException() != null) + { + JMSException jmse = new IllegalStateException("Object " + toString() + " has been closed"); + jmse.setLinkedException(manager.getLastException()); + throw jmse; + } + super.checkNotClosed(); + } + public BytesMessage createBytesMessage() throws JMSException { checkNotClosed(); @@ -519,7 +521,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess if (isClosed()) { throw new IllegalStateException("Session is already closed"); - } + } else if (hasFailedOver()) { throw new IllegalStateException("has failed over"); @@ -564,39 +566,35 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @param exchangeName The exchange to bind the queue on. * * @throws AMQException If the queue cannot be bound for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. - * * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges? */ public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName,final AMQDestination destination) throws AMQException + final AMQShortString exchangeName, final AMQDestination destination) throws AMQException { /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() { public Object execute() throws AMQException, FailoverException { - sendQueueBind(queueName,routingKey,arguments,exchangeName,destination); + sendQueueBind(queueName, routingKey, arguments, exchangeName, destination); return null; } }, _connection).execute(); } - public void addBindingKey(BasicMessageConsumer consumer, AMQDestination amqd, String routingKey) throws AMQException { - if( consumer.getQueuename() != null) + if (consumer.getQueuename() != null) { - bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(),amqd); + bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(), amqd); } } public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName,AMQDestination destination) throws AMQException, FailoverException; + final AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException; /** - * Closes the session. * * <p/>Note that this operation succeeds automatically if a fail-over interupts the sycnronous request to close @@ -606,14 +604,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @param timeout The timeout in milliseconds to wait for the session close acknoledgement from the broker. * * @throws JMSException If the JMS provider fails to close the session due to some internal error. - * * @todo Be aware of possible changes to parameter order as versions change. - * * @todo Not certain about the logic of ignoring the failover exception, because the channel won't be - * re-opened. May need to examine this more carefully. - * + * re-opened. May need to examine this more carefully. * @todo Note that taking the failover mutex doesn't prevent this operation being interrupted by a failover, - * because the failover process sends the failover event before acquiring the mutex itself. + * because the failover process sends the failover event before acquiring the mutex itself. */ public void close(long timeout) throws JMSException { @@ -621,7 +616,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); _logger.info("Closing session: " + this); // + ":" - // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); + // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); } // Ensure we only try and close an open session. @@ -638,7 +633,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess try { - sendClose(timeout); + sendClose(timeout); } catch (AMQException e) { @@ -705,7 +700,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess amqe = new AMQException("Closing session forcibly", e); } - _connection.deregisterSession(_channelId); closeProducersAndConsumers(amqe); } @@ -723,12 +717,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does * not mean that the commit is known to have failed, merely that it is not known whether it * failed or not. - * * @todo Be aware of possible changes to parameter order as versions change. */ public void commit() throws JMSException { - checkTransacted(); + checkTransacted(); try { @@ -792,10 +785,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess // consumer.markClosed(); - - if (consumer.isAutoClose()) - { + { // There is a small window where the message is between the two queues in the dispatcher. if (consumer.isClosed()) { @@ -863,7 +854,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess false, false); } - public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { checkValidDestination(destination); @@ -881,7 +871,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess messageSelector, null, false, false); } - public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { @@ -891,7 +880,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess messageSelector, null, false, false); } - public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, String selector) throws JMSException { @@ -928,7 +916,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public abstract TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException; public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) - throws JMSException + throws JMSException { checkNotClosed(); checkValidTopic(topic); @@ -996,7 +984,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { checkNotClosed(); - return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic,false,false), topic); + return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic, false, false), topic); } public Queue createQueue(String queueName) throws JMSException @@ -1022,7 +1010,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - /** * Declares the named queue. * @@ -1034,7 +1021,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @param exclusive Flag to indicate that the queue is exclusive to this client. * * @throws AMQException If the queue cannot be declared for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, @@ -1043,7 +1029,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess createQueue(name, autoDelete, durable, exclusive, null); } - /** * Declares the named queue. * @@ -1056,7 +1041,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @param arguments Arguments used to set special properties of the queue * * @throws AMQException If the queue cannot be declared for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, @@ -1073,7 +1057,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } public abstract void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, - final boolean exclusive, final Map<String, Object> arguments)throws AMQException, FailoverException; + final boolean exclusive, final Map<String, Object> arguments) throws AMQException, FailoverException; + /** * Creates a QueueReceiver * @@ -1191,7 +1176,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest)); } - /** * Creates a non-durable subscriber with a message selector * @@ -1382,7 +1366,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess if (message instanceof ReturnMessage) { // Return of the bounced message. - returnBouncedMessage((ReturnMessage)message); + returnBouncedMessage((ReturnMessage) message); } else { @@ -1398,7 +1382,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess AMQProtocolHandler protocolHandler = getProtocolHandler(); declareExchange(amqd, protocolHandler, false); AMQShortString queueName = declareQueue(amqd, protocolHandler, false); - bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(),amqd); + bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd); } /** @@ -1413,7 +1397,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * <li>Stop message delivery.</li> * <li>Mark all messages that might have been delivered but not acknowledged as "redelivered". * <li>Restart the delivery sequence including all unacknowledged messages that had been previously delivered. - * Redelivered messages do not have to be delivered in exactly their original delivery order.</li> + * Redelivered messages do not have to be delivered in exactly their original delivery order.</li> * </ul> * * <p/>If the recover operation is interrupted by a fail-over, between asking that the broker begin recovery and @@ -1503,7 +1487,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @throws JMSException If the JMS provider fails to rollback the transaction due to some internal error. This does * not mean that the rollback is known to have failed, merely that it is not known whether it * failed or not. - * * @todo Be aware of possible changes to parameter order as versions change. */ public void rollback() throws JMSException @@ -1545,8 +1528,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public abstract void releaseForRollback(); - public abstract void sendRollback() throws AMQException, FailoverException ; - + public abstract void sendRollback() throws AMQException, FailoverException; public void run() { @@ -1673,8 +1655,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess ft.addAll(rawSelector); } - BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh,prefetchLow, - noLocal,exclusive, messageSelector, ft, noConsume, autoClose); + BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow, + noLocal, exclusive, messageSelector, ft, noConsume, autoClose); if (_messageListener != null) { @@ -1718,8 +1700,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh, - final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, - final boolean noConsume, final boolean autoClose) throws JMSException; + final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, + final boolean noConsume, final boolean autoClose) throws JMSException; /** * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer @@ -1782,12 +1764,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @return <tt>true</tt> if the queue is bound to the exchange and routing key, <tt>false</tt> if not. * * @throws JMSException If the query fails for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) throws JMSException; - + public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException; /** @@ -1828,10 +1809,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * Starts the session, which ensures that it is not suspended and that its event dispatcher is running. * * @throws AMQException If the session cannot be started for any reason. - * * @todo This should be controlled by _stopped as it pairs with the stop method fixme or check the - * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages - * for each subsequent call to flow.. only need to do this if we have called stop. + * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages + * for each subsequent call to flow.. only need to do this if we have called stop. */ void start() throws AMQException { @@ -2032,7 +2012,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } // at this point the _consumers map will be empty - if (_dispatcher != null) + if (_dispatcher != null) { _dispatcher.close(); _dispatcher = null; @@ -2124,7 +2104,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } public abstract void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, - AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector,AMQShortString tag) throws AMQException, FailoverException; + AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, AMQShortString tag) throws AMQException, FailoverException; private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate) throws JMSException @@ -2143,7 +2123,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess checkNotClosed(); long producerId = getNextProducerId(); BasicMessageProducer producer = createMessageProducer(destination, mandatory, - immediate, waitUntilSent, producerId); + immediate, waitUntilSent, producerId); registerProducer(producerId, producer); return producer; @@ -2152,20 +2132,19 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } public abstract BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory, - final boolean immediate, final boolean waitUntilSent, long producerId); + final boolean immediate, final boolean waitUntilSent, long producerId); private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException { declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait); } - /** * Returns the number of messages currently queued for the given destination. * * <p/>Note that this operation automatically retries in the event of fail-over. * - * @param amqd The destination to be checked + * @param amqd The destination to be checked * * @return the number of queued messages. * @@ -2198,7 +2177,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @param nowait * * @throws AMQException If the exchange cannot be declared for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ private void declareExchange(final AMQShortString name, final AMQShortString type, @@ -2215,8 +2193,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler, - final boolean nowait) throws AMQException, FailoverException; - + final boolean nowait) throws AMQException, FailoverException; /** * Declares a queue for a JMS destination. @@ -2234,9 +2211,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * the client. * * @throws AMQException If the queue cannot be declared for any reason. - * * @todo Verify the destiation is valid or throw an exception. - * * @todo Be aware of possible changes to parameter order as versions change. */ protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, @@ -2262,7 +2237,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess }, _connection).execute(); } - public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)throws AMQException, FailoverException; + public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException; /** * Undeclares the specified queue. @@ -2272,7 +2247,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @param queueName The name of the queue to delete. * * @throws JMSException If the queue could not be deleted for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ protected void deleteQueue(final AMQShortString queueName) throws JMSException @@ -2294,7 +2268,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } - public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException; + public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException; private long getNextProducerId() { @@ -2384,7 +2358,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess consumer.setQueuename(queueName); // bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); - bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(),amqd); + bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(), amqd); // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch if (!_immediatePrefetch) @@ -2469,7 +2443,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess if (_logger.isDebugEnabled()) { _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:" - + message.getDeliveryTag()); + + message.getDeliveryTag()); } messages.remove(); @@ -2519,10 +2493,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess // Bounced message is processed here, away from the mina thread AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0, false, msg.getExchange(), - msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies()); - AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); - AMQShortString reason = msg.getReplyText(); - _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); + msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies()); + AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode()); + AMQShortString reason = msg.getReplyText(); + _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. if (errorCode == AMQConstant.NO_CONSUMERS) @@ -2557,7 +2531,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * should be unsuspended. * * @throws AMQException If the session cannot be suspended for any reason. - * * @todo Be aware of possible changes to parameter order as versions change. */ protected void suspendChannel(boolean suspend) throws AMQException // , FailoverException @@ -2598,7 +2571,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess return getAMQConnection().getMaxPrefetch() > 0; } - /** Signifies that the session has pending sends to commit. */ public void markDirty() { @@ -2642,12 +2614,11 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess _flowControl.setFlowControl(active); } - public void checkFlowControl() throws InterruptedException { - synchronized(_flowControl) + synchronized (_flowControl) { - while(!_flowControl.getFlowControl()) + while (!_flowControl.getFlowControl()) { _flowControl.wait(); } @@ -2655,7 +2626,6 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } - /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ class Dispatcher extends Thread { @@ -2856,7 +2826,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess //if (message.getDeliverBody() != null) //{ final BasicMessageConsumer consumer = - _consumers.get(message.getConsumerTag().toIntValue()); + _consumers.get(message.getConsumerTag().toIntValue()); if ((consumer == null) || consumer.isClosed()) { @@ -2865,14 +2835,14 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess if (consumer == null) { _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliveryTag() + "] from queue " - + message.getConsumerTag() + " )without a handler - rejecting(requeue)..."); + + message.getDeliveryTag() + "] from queue " + + message.getConsumerTag() + " )without a handler - rejecting(requeue)..."); } else { _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliveryTag() + "] from queue " + " consumer(" - + message.getConsumerTag() + ") is closed rejecting(requeue)..."); + + message.getDeliveryTag() + "] from queue " + " consumer(" + + message.getConsumerTag() + ") is closed rejecting(requeue)..."); } } // Don't reject if we're already closing @@ -2930,7 +2900,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { try { - synchronized(_suspensionLock) + synchronized (_suspensionLock) { suspendChannel(_suspend.get()); } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index e0e319250e..82bff1dda7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -482,7 +482,7 @@ public final class AMQSession_0_8 extends AMQSession false, null).generateFrame(_channelId); QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler(); - getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); + getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); return okHandler._messageCount; } diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 76c899f565..927f660932 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -139,11 +139,15 @@ public class FailoverHandler implements Runnable // have a state waiter waiting until the connection is closed for some reason. Or in future we may have // a slightly more complex state model therefore I felt it was worthwhile doing this. AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager(); - _amqProtocolHandler.setStateManager(new AMQStateManager(_amqProtocolHandler.getProtocolSession())); + + _amqProtocolHandler.setStateManager(new AMQStateManager()); + + if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null)) { _logger.info("Failover process veto-ed by client"); + //Restore Existing State Manager _amqProtocolHandler.setStateManager(existingStateManager); //todo: ritchiem these exceptions are useless... Would be better to attempt to propogate exception that @@ -181,13 +185,19 @@ public class FailoverHandler implements Runnable if (!failoverSucceeded) { + //Restore Existing State Manager _amqProtocolHandler.setStateManager(existingStateManager); + _amqProtocolHandler.getConnection().exceptionReceived( new AMQDisconnectedException("Server closed connection and no failover " + "was successful", null)); } else { + // Set the new Protocol Session in the StateManager. + existingStateManager.setProtocolSession(_amqProtocolHandler.getProtocolSession()); + + //Restore Existing State Manager _amqProtocolHandler.setStateManager(existingStateManager); try { diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java index 5bd36aa88b..365fed6aa5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java @@ -5,14 +5,8 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.framing.*; import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.AMQNoConsumersException; -import org.apache.qpid.client.AMQNoRouteException; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQInvalidRoutingKeyException; -import org.apache.qpid.AMQChannelClosedException; -import org.apache.qpid.protocol.AMQConstant; public class AccessRequestOkMethodHandler implements StateAwareMethodListener<AccessRequestOkBody> { @@ -25,11 +19,10 @@ public class AccessRequestOkMethodHandler implements StateAwareMethodListener<Ac return _handler; } - public void methodReceived(AMQStateManager stateManager, AccessRequestOkBody method, int channelId) + public void methodReceived(AMQProtocolSession session, AccessRequestOkBody method, int channelId) throws AMQException { _logger.debug("AccessRequestOk method received"); - final AMQProtocolSession session = stateManager.getProtocolSession(); session.setTicket(method.getTicket(), channelId); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java index e3e08667d8..5cb9412d51 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java @@ -22,11 +22,8 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.BasicCancelOkBody; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,13 +42,9 @@ public class BasicCancelOkMethodHandler implements StateAwareMethodListener<Basi private BasicCancelOkMethodHandler() { } - public void methodReceived(AMQStateManager stateManager, BasicCancelOkBody body, int channelId) + public void methodReceived(AMQProtocolSession session, BasicCancelOkBody body, int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - - - if (_logger.isInfoEnabled()) { _logger.info("New BasicCancelOk method received for consumer:" + body.getConsumerTag()); diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java index 4deaa314ec..6029e7c171 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java @@ -21,10 +21,8 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; -import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.message.UnprocessedMessage_0_8; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.BasicDeliverBody; import org.slf4j.Logger; @@ -41,10 +39,9 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener<Basic return _instance; } - public void methodReceived(AMQStateManager stateManager, BasicDeliverBody body, int channelId) - throws AMQException + public void methodReceived(AMQProtocolSession session, BasicDeliverBody body, int channelId) + throws AMQException { - final AMQProtocolSession session = stateManager.getProtocolSession(); final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8( channelId, body.getDeliveryTag(), @@ -52,7 +49,7 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener<Basic body.getExchange(), body.getRoutingKey(), body.getRedelivered()); - _logger.debug("New JmsDeliver method received"); + _logger.debug("New JmsDeliver method received:" + session); session.unprocessedMessageReceived(msg); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java index 682c3ac2c1..5731fb7473 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java @@ -22,13 +22,9 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; import org.apache.qpid.client.message.ReturnMessage; -import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.message.UnprocessedMessage_0_8; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.BasicReturnBody; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,11 +41,10 @@ public class BasicReturnMethodHandler implements StateAwareMethodListener<BasicR } - public void methodReceived(AMQStateManager stateManager, BasicReturnBody body, int channelId) + public void methodReceived(AMQProtocolSession session, BasicReturnBody body, int channelId) throws AMQException { _logger.debug("New JmsBounce method received"); - final AMQProtocolSession session = stateManager.getProtocolSession(); final ReturnMessage msg = new ReturnMessage(channelId, body.getExchange(), body.getRoutingKey(), diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java index ee4cf14d58..2b6745ebe4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java @@ -26,14 +26,12 @@ import org.apache.qpid.AMQInvalidRoutingKeyException; import org.apache.qpid.client.AMQNoConsumersException; import org.apache.qpid.client.AMQNoRouteException; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,12 +47,10 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener<Chann return _handler; } - public void methodReceived(AMQStateManager stateManager, ChannelCloseBody method, int channelId) + public void methodReceived(AMQProtocolSession session, ChannelCloseBody method, int channelId) throws AMQException { _logger.debug("ChannelClose method received"); - final AMQProtocolSession session = stateManager.getProtocolSession(); - AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode()); AMQShortString reason = method.getReplyText(); diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java index 8d3277d4de..9a9a0b4e63 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java @@ -23,9 +23,7 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,12 +39,11 @@ public class ChannelCloseOkMethodHandler implements StateAwareMethodListener<Cha return _instance; } - public void methodReceived(AMQStateManager stateManager, ChannelCloseOkBody method, int channelId) + public void methodReceived(AMQProtocolSession session, ChannelCloseOkBody method, int channelId) throws AMQException { _logger.info("Received channel-close-ok for channel-id " + channelId); - final AMQProtocolSession session = stateManager.getProtocolSession(); // todo this should do the local closure } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java index b47fe751d6..2153b9cc8c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java @@ -2,7 +2,6 @@ package org.apache.qpid.client.handler; import org.apache.qpid.framing.ChannelFlowBody; import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.AMQException; import org.slf4j.Logger; @@ -42,11 +41,9 @@ public class ChannelFlowMethodHandler implements StateAwareMethodListener<Channe private ChannelFlowMethodHandler() { } - public void methodReceived(AMQStateManager stateManager, ChannelFlowBody body, int channelId) + public void methodReceived(AMQProtocolSession session, ChannelFlowBody body, int channelId) throws AMQException { - - final AMQProtocolSession session = stateManager.getProtocolSession(); session.setFlowControl(channelId, body.getActive()); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java index 96de8af54b..6f66a972d5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java @@ -22,10 +22,8 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.ChannelFlowOkBody; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +41,7 @@ public class ChannelFlowOkMethodHandler implements StateAwareMethodListener<Chan private ChannelFlowOkMethodHandler() { } - public void methodReceived(AMQStateManager stateManager, ChannelFlowOkBody body, int channelId) + public void methodReceived(AMQProtocolSession session, ChannelFlowOkBody body, int channelId) throws AMQException { diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java index afb7517a12..a0f3808b23 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java @@ -27,31 +27,33 @@ import org.apache.qpid.framing.*; import org.apache.qpid.AMQException; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.AMQMethodNotImplementedException; - +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ClientMethodDispatcherImpl implements MethodDispatcher { - - private static final BasicCancelOkMethodHandler _basicCancelOkMethodHandler = BasicCancelOkMethodHandler.getInstance(); - private static final BasicDeliverMethodHandler _basicDeliverMethodHandler = BasicDeliverMethodHandler.getInstance(); - private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance(); - private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance(); - private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance(); - private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance(); - private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance(); + private static final BasicCancelOkMethodHandler _basicCancelOkMethodHandler = BasicCancelOkMethodHandler.getInstance(); + private static final BasicDeliverMethodHandler _basicDeliverMethodHandler = BasicDeliverMethodHandler.getInstance(); + private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance(); + private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance(); + private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance(); + private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance(); + private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance(); private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance(); - private static final ConnectionSecureMethodHandler _connectionSecureMethodHandler = ConnectionSecureMethodHandler.getInstance(); - private static final ConnectionStartMethodHandler _connectionStartMethodHandler = ConnectionStartMethodHandler.getInstance(); - private static final ConnectionTuneMethodHandler _connectionTuneMethodHandler = ConnectionTuneMethodHandler.getInstance(); - private static final ExchangeBoundOkMethodHandler _exchangeBoundOkMethodHandler = ExchangeBoundOkMethodHandler.getInstance(); - private static final QueueDeleteOkMethodHandler _queueDeleteOkMethodHandler = QueueDeleteOkMethodHandler.getInstance(); + private static final ConnectionSecureMethodHandler _connectionSecureMethodHandler = ConnectionSecureMethodHandler.getInstance(); + private static final ConnectionStartMethodHandler _connectionStartMethodHandler = ConnectionStartMethodHandler.getInstance(); + private static final ConnectionTuneMethodHandler _connectionTuneMethodHandler = ConnectionTuneMethodHandler.getInstance(); + private static final ExchangeBoundOkMethodHandler _exchangeBoundOkMethodHandler = ExchangeBoundOkMethodHandler.getInstance(); + private static final QueueDeleteOkMethodHandler _queueDeleteOkMethodHandler = QueueDeleteOkMethodHandler.getInstance(); + private static final Logger _logger = LoggerFactory.getLogger(ClientMethodDispatcherImpl.class); private static interface DispatcherFactory { - public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager); + public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session); } private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories = @@ -62,44 +64,40 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher _dispatcherFactories.put(ProtocolVersion.v8_0, new DispatcherFactory() { - public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager) + public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session) { - return new ClientMethodDispatcherImpl_8_0(stateManager); + return new ClientMethodDispatcherImpl_8_0(session); } }); _dispatcherFactories.put(ProtocolVersion.v0_9, new DispatcherFactory() { - public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager) + public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session) { - return new ClientMethodDispatcherImpl_0_9(stateManager); + return new ClientMethodDispatcherImpl_0_9(session); } }); } - - public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQStateManager stateManager) + public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQProtocolSession session) { + _logger.error("New Method Dispatcher:" + session); DispatcherFactory factory = _dispatcherFactories.get(version); - return factory.createMethodDispatcher(stateManager); + return factory.createMethodDispatcher(session); } - - + AMQProtocolSession _session; - private AMQStateManager _stateManager; - - public ClientMethodDispatcherImpl(AMQStateManager stateManager) + public ClientMethodDispatcherImpl(AMQProtocolSession session) { - _stateManager = stateManager; + _session = session; } - public AMQStateManager getStateManager() { - return _stateManager; + return _session.getStateManager(); } public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException @@ -109,7 +107,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException { - _basicCancelOkMethodHandler.methodReceived(_stateManager,body,channelId); + _basicCancelOkMethodHandler.methodReceived(_session, body, channelId); return true; } @@ -120,7 +118,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException { - _basicDeliverMethodHandler.methodReceived(_stateManager,body,channelId); + _basicDeliverMethodHandler.methodReceived(_session, body, channelId); return true; } @@ -141,13 +139,13 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException { - _basicReturnMethodHandler.methodReceived(_stateManager,body,channelId); + _basicReturnMethodHandler.methodReceived(_session, body, channelId); return true; } public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException { - _channelCloseMethodHandler.methodReceived(_stateManager,body,channelId); + _channelCloseMethodHandler.methodReceived(_session, body, channelId); return true; } @@ -163,7 +161,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException { - _channelFlowOkMethodHandler.methodReceived(_stateManager,body,channelId); + _channelFlowOkMethodHandler.methodReceived(_session, body, channelId); return true; } @@ -174,7 +172,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException { - _connectionCloseMethodHandler.methodReceived(_stateManager,body,channelId); + _connectionCloseMethodHandler.methodReceived(_session, body, channelId); return true; } @@ -185,37 +183,37 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException { - _connectionOpenOkMethodHandler.methodReceived(_stateManager,body,channelId); + _connectionOpenOkMethodHandler.methodReceived(_session, body, channelId); return true; } public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException { - _connectionRedirectMethodHandler.methodReceived(_stateManager,body,channelId); + _connectionRedirectMethodHandler.methodReceived(_session, body, channelId); return true; } public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException { - _connectionSecureMethodHandler.methodReceived(_stateManager,body,channelId); + _connectionSecureMethodHandler.methodReceived(_session, body, channelId); return true; } public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException { - _connectionStartMethodHandler.methodReceived(_stateManager,body,channelId); + _connectionStartMethodHandler.methodReceived(_session, body, channelId); return true; } public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException { - _connectionTuneMethodHandler.methodReceived(_stateManager,body,channelId); + _connectionTuneMethodHandler.methodReceived(_session, body, channelId); return true; } public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException { - _queueDeleteOkMethodHandler.methodReceived(_stateManager,body,channelId); + _queueDeleteOkMethodHandler.methodReceived(_session, body, channelId); return true; } @@ -431,7 +429,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException { - _exchangeBoundOkMethodHandler.methodReceived(_stateManager,body,channelId); + _exchangeBoundOkMethodHandler.methodReceived(_session, body, channelId); return true; } @@ -522,7 +520,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException { - return false; + return false; } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java index e235368357..d3e9fba8ed 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java @@ -26,16 +26,15 @@ import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9; import org.apache.qpid.AMQException; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.AMQMethodNotImplementedException; - +import org.apache.qpid.client.protocol.AMQProtocolSession; public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_9 { - public ClientMethodDispatcherImpl_0_9(AMQStateManager stateManager) + public ClientMethodDispatcherImpl_0_9(AMQProtocolSession session) { - super(stateManager); + super(session); } - public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException { return false; @@ -148,8 +147,7 @@ public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl i public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException { - return false; + return false; } - } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java index b0f003cd2d..19f758817d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java @@ -24,13 +24,13 @@ import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0; import org.apache.qpid.AMQException; -import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.protocol.AMQProtocolSession; public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher_8_0 { - public ClientMethodDispatcherImpl_8_0(AMQStateManager stateManager) + public ClientMethodDispatcherImpl_8_0(AMQProtocolSession session) { - super(stateManager); + super(session); } public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java index 950a3288fc..bc82d6bc62 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java @@ -25,13 +25,11 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQAuthenticationException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.ConnectionCloseOkBody; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,14 +46,13 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co } private ConnectionCloseMethodHandler() - { } + { + } - public void methodReceived(AMQStateManager stateManager, ConnectionCloseBody method, int channelId) - throws AMQException + public void methodReceived(AMQProtocolSession session, ConnectionCloseBody method, int channelId) + throws AMQException { _logger.info("ConnectionClose frame received"); - final AMQProtocolSession session = stateManager.getProtocolSession(); - // does it matter // stateManager.changeState(AMQState.CONNECTION_CLOSING); @@ -63,6 +60,8 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode()); AMQShortString reason = method.getReplyText(); + AMQException error = null; + try { @@ -75,35 +74,33 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co { if (errorCode == AMQConstant.NOT_ALLOWED || (errorCode == AMQConstant.ACCESS_REFUSED)) { - _logger.info("Error :" + errorCode +":"+ Thread.currentThread().getName()); - - // todo ritchiem : Why do this here when it is going to be done in the finally block? - session.closeProtocolSession(); + _logger.info("Error :" + errorCode + ":" + Thread.currentThread().getName()); - // todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state. - stateManager.changeState(AMQState.CONNECTION_NOT_STARTED); - - throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString(), null); + error = new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString(), null); } else { _logger.info("Connection close received with error code " + errorCode); - throw new AMQConnectionClosedException(errorCode, "Error: " + reason, null); + error = new AMQConnectionClosedException(errorCode, "Error: " + reason, null); } } } finally { - // this actually closes the connection in the case where it is not an error. + if (error != null) + { + session.notifyError(error); + } + + // Close the protocol Session, including any open TCP connections session.closeProtocolSession(); - // ritchiem: Doing this though will cause any waiting connection start to be released without being able to - // see what the cause was. - stateManager.changeState(AMQState.CONNECTION_CLOSED); + // Closing the session should not introduce a race condition as this thread will continue to propgate any + // exception in to the exceptionCaught method of the SessionHandler. + // Any sessionClosed event should occur after this. } } - } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java index fd7acac84f..e639a33450 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java @@ -24,9 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.ConnectionOpenOkBody; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.protocol.AMQMethodEvent; public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener<ConnectionOpenOkBody> { @@ -41,10 +39,10 @@ public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener<C { } - public void methodReceived(AMQStateManager stateManager, ConnectionOpenOkBody body, int channelId) + public void methodReceived(AMQProtocolSession session, ConnectionOpenOkBody body, int channelId) throws AMQException { - stateManager.changeState(AMQState.CONNECTION_OPEN); + session.getStateManager().changeState(AMQState.CONNECTION_OPEN); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java index cac68c9467..472c471fd6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java @@ -22,10 +22,8 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.ConnectionRedirectBody; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,11 +44,10 @@ public class ConnectionRedirectMethodHandler implements StateAwareMethodListener private ConnectionRedirectMethodHandler() { } - public void methodReceived(AMQStateManager stateManager, ConnectionRedirectBody method, int channelId) + public void methodReceived(AMQProtocolSession session, ConnectionRedirectBody method, int channelId) throws AMQException { _logger.info("ConnectionRedirect frame received"); - final AMQProtocolSession session = stateManager.getProtocolSession(); String host = method.getHost().toString(); // the host is in the form hostname:port with the port being optional diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java index 900aa2abac..9a9bee757b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java @@ -25,12 +25,9 @@ import javax.security.sasl.SaslException; import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ConnectionSecureBody; import org.apache.qpid.framing.ConnectionSecureOkBody; -import org.apache.qpid.protocol.AMQMethodEvent; public class ConnectionSecureMethodHandler implements StateAwareMethodListener<ConnectionSecureBody> { @@ -41,10 +38,9 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener<C return _instance; } - public void methodReceived(AMQStateManager stateManager, ConnectionSecureBody body, int channelId) + public void methodReceived(AMQProtocolSession session, ConnectionSecureBody body, int channelId) throws AMQException { - final AMQProtocolSession session = stateManager.getProtocolSession(); SaslClient client = session.getSaslClient(); if (client == null) { diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index d3746f137e..8857f1115a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -25,7 +25,6 @@ import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.security.AMQCallbackHandler; import org.apache.qpid.client.security.CallbackHandlerRegistry; import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.common.ClientProperties; import org.apache.qpid.common.QpidProperties; @@ -35,7 +34,6 @@ import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,15 +60,12 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co private ConnectionStartMethodHandler() { } - public void methodReceived(AMQStateManager stateManager, ConnectionStartBody body, int channelId) + public void methodReceived(AMQProtocolSession session, ConnectionStartBody body, int channelId) throws AMQException { _log.debug("public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, " + "AMQMethodEvent evt): called"); - final AMQProtocolSession session = stateManager.getProtocolSession(); - - ProtocolVersion pv = new ProtocolVersion((byte) body.getVersionMajor(), (byte) body.getVersionMinor()); // For the purposes of interop, we can make the client accept the broker's version string. @@ -145,7 +140,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener<Co throw new AMQException(null, "No locales sent from server, passed: " + locales, null); } - stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); + session.getStateManager().changeState(AMQState.CONNECTION_NOT_TUNED); FieldTable clientProperties = FieldTableFactory.newFieldTable(); clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()), diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java index fc0e40b745..e4e58c317d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java @@ -24,10 +24,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.ConnectionTuneParameters; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.*; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,11 +44,10 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener<Con protected ConnectionTuneMethodHandler() { } - public void methodReceived(AMQStateManager stateManager, ConnectionTuneBody frame, int channelId) + public void methodReceived(AMQProtocolSession session, ConnectionTuneBody frame, int channelId) throws AMQException { _logger.debug("ConnectionTune frame received"); - final AMQProtocolSession session = stateManager.getProtocolSession(); final MethodRegistry methodRegistry = session.getMethodRegistry(); @@ -65,7 +62,7 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener<Con params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.getHeartbeat())); session.setConnectionTuneParameters(params); - stateManager.changeState(AMQState.CONNECTION_NOT_OPENED); + session.getStateManager().changeState(AMQState.CONNECTION_NOT_OPENED); ConnectionTuneOkBody tuneOkBody = methodRegistry.createConnectionTuneOkBody(params.getChannelMax(), params.getFrameMax(), diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java index 8de40beb10..690d782b40 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java @@ -22,10 +22,8 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.ExchangeBoundOkBody; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +44,7 @@ public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener<Ex private ExchangeBoundOkMethodHandler() { } - public void methodReceived(AMQStateManager stateManager, ExchangeBoundOkBody body, int channelId) + public void methodReceived(AMQProtocolSession session, ExchangeBoundOkBody body, int channelId) throws AMQException { if (_logger.isDebugEnabled()) diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java index 41225c0569..01d82c9b55 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java @@ -22,10 +22,8 @@ package org.apache.qpid.client.handler; import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.QueueDeleteOkBody; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +44,7 @@ public class QueueDeleteOkMethodHandler implements StateAwareMethodListener<Queu private QueueDeleteOkMethodHandler() { } - public void methodReceived(AMQStateManager stateManager, QueueDeleteOkBody body, int channelId) + public void methodReceived(AMQProtocolSession session, QueueDeleteOkBody body, int channelId) throws AMQException { if (_logger.isDebugEnabled()) diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 1b75d6e829..8ac4f843de 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -43,6 +43,7 @@ import org.apache.qpid.client.failover.FailoverHandler; import org.apache.qpid.client.failover.FailoverState; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.framing.*; @@ -100,23 +101,22 @@ import java.util.concurrent.CountDownLatch; * <p/><table id="crc"><caption>CRC Card</caption> * <tr><th> Responsibilities <th> Collaborations * <tr><td> Create the filter chain to filter this handlers events. - * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}. + * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}. * * <tr><td> Maintain fail-over state. * <tr><td> * </table> * * @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the - * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing - * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec - * filter before it mean not doing the read/write asynchronously but in the main filter thread? - * + * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing + * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec + * filter before it mean not doing the read/write asynchronously but in the main filter thread? * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including - * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of - * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could - * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data - * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so - * that lifecycles of the fields match lifecycles of their containing objects. + * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of + * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could + * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data + * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so + * that lifecycles of the fields match lifecycles of their containing objects. */ public class AMQProtocolHandler extends IoHandlerAdapter { @@ -136,7 +136,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter private AMQStateManager _stateManager = new AMQStateManager(); /** Holds the method listeners, */ - private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet(); + private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>(); /** * We create the failover handler when the session is created since it needs a reference to the IoSession in order @@ -154,14 +154,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */ private CountDownLatch _failoverLatch; - /** The last failover exception that occured */ private FailoverException _lastFailoverException; /** Defines the default timeout to use for synchronous protocol commands. */ private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; - /** * Creates a new protocol handler, associated with the specified client connection instance. * @@ -245,7 +243,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage()); } } - _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager()); + _protocolSession = new AMQProtocolSession(this, session, _connection); + + _stateManager.setProtocolSession(_protocolSession); + _protocolSession.init(); } @@ -263,7 +264,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter * @param session The MINA session. * * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and - * not otherwise? The above comment doesn't make that clear. + * not otherwise? The above comment doesn't make that clear. */ public void sessionClosed(IoSession session) { @@ -374,7 +375,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter "cause isn't AMQConnectionClosedException: " + cause, cause); AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); - propagateExceptionToWaiters(amqe); + propagateExceptionToAllWaiters(amqe); } _connection.exceptionReceived(cause); @@ -395,7 +396,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter // we notify the state manager of the error in case we have any clients waiting on a state // change. Those "waiters" will be interrupted and can handle the exception AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); - propagateExceptionToWaiters(amqe); + propagateExceptionToAllWaiters(amqe); _connection.exceptionReceived(cause); } } @@ -405,11 +406,33 @@ public class AMQProtocolHandler extends IoHandlerAdapter * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately. * + * This should be called only when the exception is fatal for the connection. + * * @param e the exception to propagate + * + * @see #propagateExceptionToFrameListeners + * @see #propagateExceptionToStateWaiters */ - public void propagateExceptionToWaiters(Exception e) + public void propagateExceptionToAllWaiters(Exception e) + { + propagateExceptionToFrameListeners(e); + propagateExceptionToStateWaiters(e); + } + + /** + * This caters for the case where we only need to propogate an exception to the the frame listeners to interupt any + * protocol level waits. + * + * This will would normally be used to notify all Frame Listeners that Failover is about to occur and they should + * stop waiting and relinquish the Failover lock {@see FailoverHandler}. + * + * Once the {@link FailoverHandler} has re-established the connection then the listeners will be able to re-attempt + * their protocol request and so listen again for the correct frame. + * + * @param e the exception to propagate + */ + public void propagateExceptionToFrameListeners(Exception e) { - if (!_frameListeners.isEmpty()) { final Iterator it = _frameListeners.iterator(); @@ -421,6 +444,22 @@ public class AMQProtocolHandler extends IoHandlerAdapter } } + /** + * This caters for the case where we only need to propogate an exception to the the state manager to interupt any + * thing waiting for a state change. + * + * Currently (2008-07-15) the state manager is only used during 0-8/0-9 Connection establishement. + * + * Normally the state manager would not need to be notified without notifiying the frame listeners so in normal + * cases {@link #propagateExceptionToAllWaiters} would be the correct choice. + * + * @param e the exception to propagate + */ + public void propagateExceptionToStateWaiters(Exception e) + { + getStateManager().error(e); + } + public void notifyFailoverStarting() { // Set the last exception in the sync block to ensure the ordering with add. @@ -431,7 +470,9 @@ public class AMQProtocolHandler extends IoHandlerAdapter _lastFailoverException = new FailoverException("Failing over about to start"); } - propagateExceptionToWaiters(_lastFailoverException); + //Only notify the Frame listeners that failover is going to occur as the State listeners shouldn't be + // interupted unless failover cannot restore the state. + propagateExceptionToFrameListeners(_lastFailoverException); } public void failoverInProgress() @@ -443,7 +484,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void messageReceived(IoSession session, Object message) throws Exception { - if(message instanceof AMQFrame) + if (message instanceof AMQFrame) { final boolean debug = _logger.isDebugEnabled(); final long msgNumber = ++_messageReceivedCount; @@ -459,7 +500,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - bodyFrame.handle(frame.getChannel(),_protocolSession); + bodyFrame.handle(frame.getChannel(), _protocolSession); _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); } @@ -508,20 +549,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter if (!wasAnyoneInterested) { throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" - + _frameListeners, null); + + _frameListeners, null); } } catch (AMQException e) - { - if (!_frameListeners.isEmpty()) - { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - listener.error(e); - } - } + { + propagateExceptionToFrameListeners(e); exceptionCaught(session, e); } @@ -548,28 +581,11 @@ public class AMQProtocolHandler extends IoHandlerAdapter } } - /* - public void addFrameListener(AMQMethodListener listener) - { - _frameListeners.add(listener); - } - - public void removeFrameListener(AMQMethodListener listener) - { - _frameListeners.remove(listener); - } - */ - public void attainState(AMQState s) throws Exception - { - getStateManager().attainState(s); - } - - public AMQState attainState(Set<AMQState> states) throws AMQException + public StateWaiter createWaiter(Set<AMQState> states) throws AMQException { - return getStateManager().attainState(states); + return getStateManager().createWaiter(states); } - /** * Convenience method that writes a frame to the protocol session. Equivalent to calling * getProtocolSession().write(). @@ -617,14 +633,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter { throw _lastFailoverException; } - + _frameListeners.add(listener); } _protocolSession.writeFrame(frame); - AMQMethodEvent e = listener.blockForFrame(timeout); - - return e; + return listener.blockForFrame(timeout); // When control resumes before this line, a reply will have been received // that matches the criteria defined in the blocking listener } @@ -669,8 +683,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter getStateManager().changeState(AMQState.CONNECTION_CLOSING); ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client is closing the connection."),0,0); - + new AMQShortString("JMS client is closing the connection."), 0, 0); final AMQFrame frame = body.generateFrame(0); @@ -745,10 +758,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void setStateManager(AMQStateManager stateManager) { _stateManager = stateManager; - if (_protocolSession != null) - { - _protocolSession.setStateManager(stateManager); - } } public AMQProtocolSession getProtocolSession() @@ -778,7 +787,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter public MethodRegistry getMethodRegistry() { - return getStateManager().getMethodRegistry(); + return _protocolSession.getMethodRegistry(); } public ProtocolVersion getProtocolVersion() diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 6e782e0bfc..6beec3c9ba 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory; import javax.jms.JMSException; import javax.security.sasl.SaslClient; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -38,10 +37,10 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.ConnectionTuneParameters; -import org.apache.qpid.client.message.ReturnMessage; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.message.UnprocessedMessage_0_8; import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.AMQState; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; @@ -67,8 +66,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession protected final IoSession _minaProtocolSession; - private AMQStateManager _stateManager; - protected WriteFuture _lastWriteFuture; /** @@ -86,7 +83,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives * first) with the subsequent content header and content bodies. */ - private final ConcurrentMap<Integer,UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer,UnprocessedMessage>(); + private final ConcurrentMap<Integer, UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer, UnprocessedMessage>(); private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16]; /** Counter to ensure unique queue names */ @@ -97,26 +94,16 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession // private VersionSpecificRegistry _registry = // MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion()); - private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion()); - private MethodDispatcher _methodDispatcher; - private final AMQConnection _connection; private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) { - this(protocolHandler, protocolSession, connection, new AMQStateManager()); - - } - - public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, - AMQStateManager stateManager) - { _protocolHandler = protocolHandler; _minaProtocolSession = protocolSession; _minaProtocolSession.setAttachment(this); @@ -124,11 +111,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection); // fixme - real value needed _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT); - _stateManager = stateManager; - _stateManager.setProtocolSession(this); _protocolVersion = connection.getProtocolVersion(); _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), - stateManager); + this); _connection = connection; } @@ -161,14 +146,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public AMQStateManager getStateManager() { - return _stateManager; - } - - public void setStateManager(AMQStateManager stateManager) - { - _stateManager = stateManager; - _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(_protocolVersion, - stateManager); + return _protocolHandler.getStateManager(); } public String getVirtualHost() @@ -238,9 +216,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException { final int channelId = message.getChannelId(); - if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) { - _channelId2UnprocessedMsgArray[channelId] = message; + _channelId2UnprocessedMsgArray[channelId] = message; } else { @@ -251,17 +229,16 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException { final UnprocessedMessage msg = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId] - : _channelId2UnprocessedMsgMap.get(channelId); - + : _channelId2UnprocessedMsgMap.get(channelId); if (msg == null) { - throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first", null); + throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first on session:" + this, null); } if (msg.getContentHeader() != null) { - throw new AMQException(null, "Error: received duplicate content header or did not receive correct number of content body frames", null); + throw new AMQException(null, "Error: received duplicate content header or did not receive correct number of content body frames on session:" + this, null); } msg.setContentHeader(contentHeader); @@ -275,7 +252,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { UnprocessedMessage_0_8 msg; final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0; - if(fastAccess) + if (fastAccess) { msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgArray[channelId]; } @@ -291,7 +268,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession if (msg.getContentHeader() == null) { - if(fastAccess) + if (fastAccess) { _channelId2UnprocessedMsgArray[channelId] = null; } @@ -333,7 +310,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { AMQSession session = getSession(channelId); session.messageReceived(msg); - if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) { _channelId2UnprocessedMsgArray[channelId] = null; } @@ -431,12 +408,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION); } - public void closeProtocolSession() + public void closeProtocolSession() throws AMQException { closeProtocolSession(true); } - public void closeProtocolSession(boolean waitLast) + public void closeProtocolSession(boolean waitLast) throws AMQException { _logger.debug("Waiting for last write to join."); if (waitLast && (_lastWriteFuture != null)) @@ -446,6 +423,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession _logger.debug("Closing protocol session"); final CloseFuture future = _minaProtocolSession.close(); + + // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED + // then wait for the connection to close. + // ritchiem: Could this release BlockingWaiters to early? The close has been done as much as possible so any + // error now shouldn't matter. + + _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); + future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); } @@ -489,9 +474,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { _protocolVersion = pv; _methodRegistry = MethodRegistry.getMethodRegistry(pv); - _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, _stateManager); + _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this); - // _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor); + // _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor); } public byte getProtocolMinorVersion() @@ -524,12 +509,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession return _methodDispatcher; } - public void setTicket(int ticket, int channelId) { final AMQSession session = getSession(channelId); session.setTicket(ticket); } + public void setMethodDispatcher(MethodDispatcher methodDispatcher) { _methodDispatcher = methodDispatcher; @@ -545,4 +530,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession); } + + public void notifyError(Exception error) + { + _protocolHandler.propagateExceptionToAllWaiters(error); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 0ab2e07340..2bc609ebf2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -20,9 +20,14 @@ */ package org.apache.qpid.client.protocol; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.util.BlockingWaiter; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; @@ -54,38 +59,17 @@ import org.apache.qpid.protocol.AMQMethodListener; * </table> * * @todo Might be neater if this method listener simply wrapped another that provided the method handling using a - * methodRecevied method. The processMethod takes an additional channelId, however none of the implementations - * seem to use it. So wrapping the listeners is possible. - * - * @todo What is to stop a blocking method listener, receiving a second method whilst it is registered as a listener, - * overwriting the first one before the caller of the block method has had a chance to examine it? If one-shot - * behaviour is to be intended it should be enforced, perhaps by always returning false once the blocked for - * method has been received. - * - * @todo Interuption is caught but not handled. This could be allowed to fall through. This might actually be usefull - * for fail-over where a thread is blocking when failure happens, it could be interrupted to abandon or retry - * when this happens. At the very least, restore the interrupted status flag. - * + * methodRecevied method. The processMethod takes an additional channelId, however none of the implementations + * seem to use it. So wrapping the listeners is possible. * @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to - * check that SynchronousQueue has a non-blocking put method available. + * check that SynchronousQueue has a non-blocking put method available. */ -public abstract class BlockingMethodFrameListener implements AMQMethodListener +public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMethodEvent> implements AMQMethodListener { - /** This flag is used to indicate that the blocked for method has been received. */ - private volatile boolean _ready = false; - - /** Used to protect the shared event and ready flag between the producer and consumer. */ - private final Object _lock = new Object(); - - /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */ - private volatile Exception _error; /** Holds the channel id for the channel upon which this listener is waiting for a response. */ protected int _channelId; - /** Holds the incoming method. */ - protected AMQMethodEvent _doneEvt = null; - /** * Creates a new method listener, that filters incoming method to just those that match the specified channel id. * @@ -104,7 +88,14 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener * * @return <tt>true</tt> if the method was handled, <tt>false</tt> otherwise. */ - public abstract boolean processMethod(int channelId, AMQMethodBody frame); // throws AMQException; + public abstract boolean processMethod(int channelId, AMQMethodBody frame); + + public boolean process(AMQMethodEvent evt) + { + AMQMethodBody method = evt.getMethod(); + + return (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method); + } /** * Informs this listener that an AMQP method has been received. @@ -113,37 +104,9 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener * * @return <tt>true</tt> if this listener has handled the method, <tt>false</tt> otherwise. */ - public boolean methodReceived(AMQMethodEvent evt) // throws AMQException + public boolean methodReceived(AMQMethodEvent evt) { - AMQMethodBody method = evt.getMethod(); - - /*try - {*/ - boolean ready = (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method); - - if (ready) - { - // we only update the flag from inside the synchronized block - // so that the blockForFrame method cannot "miss" an update - it - // will only ever read the flag from within the synchronized block - synchronized (_lock) - { - _doneEvt = evt; - _ready = ready; - _lock.notify(); - } - } - - return ready; - - /*} - catch (AMQException e) - { - error(e); - // we rethrow the error here, and the code in the frame dispatcher will go round - // each listener informing them that an exception has been thrown - throw e; - }*/ + return received(evt); } /** @@ -159,75 +122,15 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener */ public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException { - synchronized (_lock) + try { - while (!_ready) - { - try - { - if (timeout == -1) - { - _lock.wait(); - } - else - { - - _lock.wait(timeout); - if (!_ready) - { - _error = new AMQTimeoutException("Server did not respond in a timely fashion", null); - _ready = true; - } - } - } - catch (InterruptedException e) - { - // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess - // if (!_ready && timeout != -1) - // { - // _error = new AMQException("Server did not respond timely"); - // _ready = true; - // } - } - } + return (AMQMethodEvent) block(timeout); } - - if (_error != null) + finally { - if (_error instanceof AMQException) - { - throw (AMQException) _error; - } - else if (_error instanceof FailoverException) - { - // This should ensure that FailoverException is not wrapped and can be caught. - throw (FailoverException) _error; // needed to expose FailoverException. - } - else - { - throw new AMQException(null, "Woken up due to " + _error.getClass(), _error); - } + //Prevent any more errors being notified to this waiter. + close(); } - - return _doneEvt; } - /** - * This is a callback, called by the MINA dispatcher thread only. It is also called from within this - * class to avoid code repetition but again is only called by the MINA dispatcher thread. - * - * @param e - */ - public void error(Exception e) - { - // set the error so that the thread that is blocking (against blockForFrame()) - // can pick up the exception and rethrow to the caller - _error = e; - - synchronized (_lock) - { - _ready = true; - _lock.notify(); - } - } } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index 21f190bd7e..7a5d70ad15 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -28,15 +28,30 @@ import org.apache.qpid.protocol.AMQMethodListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Iterator; import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; /** - * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler - * there is a separate state manager. + * The state manager is responsible for managing the state of the protocol session. <p/> + * For each {@link org.apache.qpid.client.protocol.AMQProtocolHandler} there is a separate state manager. + * + * The AMQStateManager is now attached to the {@link org.apache.qpid.client.protocol.AMQProtocolHandler} and that is the sole point of reference so that + * As the {@link AMQProtocolSession} changes due to failover the AMQStateManager need not be copied around. + * + * The StateManager works by any component can wait for a state change to occur by using the following sequence. + * + * <li>StateWaiter waiter = stateManager.createWaiter(Set<AMQState> states); + * <li> // Perform action that will cause state change + * <li>waiter.await(); + * + * The two step process is required as there is an inherit race condition between starting a process that will cause + * the state to change and then attempting to wait for that change. The interest in the change must be first set up so + * that any asynchrous errors that occur can be delivered to the correct waiters. + * + * */ -public class AMQStateManager +public class AMQStateManager implements AMQMethodListener { private static final Logger _logger = LoggerFactory.getLogger(AMQStateManager.class); @@ -45,16 +60,13 @@ public class AMQStateManager /** The current state */ private AMQState _currentState; - - /** - * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of - * AMQFrame. - */ - - private final Object _stateLock = new Object(); + private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000")); + protected final List<StateWaiter> _waiters = new CopyOnWriteArrayList<StateWaiter>(); + private Exception _lastException; + public AMQStateManager() { this(null); @@ -62,18 +74,15 @@ public class AMQStateManager public AMQStateManager(AMQProtocolSession protocolSession) { - this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession); + this(AMQState.CONNECTION_NOT_STARTED, protocolSession); } - protected AMQStateManager(AMQState state, boolean register, AMQProtocolSession protocolSession) + protected AMQStateManager(AMQState state, AMQProtocolSession protocolSession) { _protocolSession = protocolSession; _currentState = state; - } - - public AMQState getCurrentState() { return _currentState; @@ -86,117 +95,101 @@ public class AMQStateManager synchronized (_stateLock) { _currentState = newState; - _stateLock.notifyAll(); + + _logger.debug("Notififying State change to " + _waiters.size() + " : " + _waiters); + + for (StateWaiter waiter : _waiters) + { + waiter.received(newState); + } } } - public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException { - B method = evt.getMethod(); - + // StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod()); method.execute(_protocolSession.getMethodDispatcher(), evt.getChannelId()); return true; } + /** + * Setting of the ProtocolSession will be required when Failover has been successfuly compeleted. + * + * The new {@link AMQProtocolSession} that has been re-established needs to be provided as that is now the + * connection to the network. + * + * @param session The new protocol session + */ + public void setProtocolSession(AMQProtocolSession session) + { + _logger.error("Setting ProtocolSession:" + session); + _protocolSession = session; + } - public void attainState(final AMQState s) throws Exception + /** + * Propogate error to waiters + * + * @param error The error to propogate. + */ + public void error(Exception error) { - synchronized (_stateLock) + if (_waiters.size() == 0) { - final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME; - long waitTime = MAXIMUM_STATE_WAIT_TIME; - - while ((_currentState != s) && (waitTime > 0)) - { - try - { - _stateLock.wait(MAXIMUM_STATE_WAIT_TIME); - } - catch (InterruptedException e) - { - _logger.warn("Thread interrupted"); - if (_protocolSession.getAMQConnection().getLastException() != null) - { - throw _protocolSession.getAMQConnection().getLastException(); - } - - } - - if (_currentState != s) - { - waitTime = waitUntilTime - System.currentTimeMillis(); - } - } - - if (_currentState != s) - { - _logger.warn("State not achieved within permitted time. Current state " + _currentState - + ", desired state: " + s); - throw new AMQException(null, "State not achieved within permitted time. Current state " + _currentState - + ", desired state: " + s, null); - } + _logger.error("No Waiters for error saving as last error:" + error.getMessage()); + _lastException = error; + } + for (StateWaiter waiter : _waiters) + { + _logger.error("Notifying Waiters(" + _waiters + ") for error:" + error.getMessage()); + waiter.error(error); } - - // at this point the state will have changed. } - public AMQProtocolSession getProtocolSession() + /** + * This provides a single place that the maximum time for state change to occur can be accessed. + * It is currently set via System property amqj.MaximumStateWait + * + * @return long Milliseconds value for a timeout + */ + public long getWaitTimeout() { - return _protocolSession; + return MAXIMUM_STATE_WAIT_TIME; } - public void setProtocolSession(AMQProtocolSession session) + /** + * Create and add a new waiter to the notifcation list. + * @param states The waiter will attempt to wait for one of these desired set states to be achived. + * @return the created StateWaiter. + */ + public StateWaiter createWaiter(Set<AMQState> states) { - _protocolSession = session; - } + final StateWaiter waiter; + synchronized (_stateLock) + { + waiter = new StateWaiter(this, _currentState, states); - public MethodRegistry getMethodRegistry() - { - return getProtocolSession().getMethodRegistry(); + _waiters.add(waiter); + } + + return waiter; } - public AMQState attainState(Set<AMQState> stateSet) throws AMQException + /** + * Remove the waiter from the notification list. + * @param waiter The waiter to remove. + */ + public void removeWaiter(StateWaiter waiter) { synchronized (_stateLock) { - final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME; - long waitTime = MAXIMUM_STATE_WAIT_TIME; - - while (!stateSet.contains(_currentState) && (waitTime > 0)) - { - try - { - _stateLock.wait(MAXIMUM_STATE_WAIT_TIME); - } - catch (InterruptedException e) - { - _logger.warn("Thread interrupted"); - if (_protocolSession.getAMQConnection().getLastException() != null) - { - throw new AMQException(null, "Could not attain state due to exception", - _protocolSession.getAMQConnection().getLastException()); - } - } - - if (!stateSet.contains(_currentState)) - { - waitTime = waitUntilTime - System.currentTimeMillis(); - } - } - - if (!stateSet.contains(_currentState)) - { - _logger.warn("State not achieved within permitted time. Current state " + _currentState - + ", desired state: " + stateSet); - throw new AMQException(null, "State not achieved within permitted time. Current state " + _currentState - + ", desired state: " + stateSet, null); - } - return _currentState; + _waiters.remove(waiter); } + } - + public Exception getLastException() + { + return _lastException; } } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java b/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java index 8c65f56af3..17d04f4fa3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java @@ -33,6 +33,6 @@ import org.apache.qpid.protocol.AMQMethodEvent; public interface StateAwareMethodListener<B extends AMQMethodBody> { - void methodReceived(AMQStateManager stateManager, B body, int channelId) throws AMQException; + void methodReceived(AMQProtocolSession session, B body, int channelId) throws AMQException; } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateListener.java b/java/client/src/main/java/org/apache/qpid/client/state/StateListener.java deleted file mode 100644 index df207a0a23..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/state/StateListener.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.state; - -import org.apache.qpid.AMQException; - -public interface StateListener -{ - void stateChanged(AMQState oldState, AMQState newState) throws AMQException; - - void error(Throwable t); -} diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java index 8b8453a1b0..4695b195d5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java @@ -20,103 +20,110 @@ */ package org.apache.qpid.client.state; +import org.apache.qpid.client.util.BlockingWaiter; +import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.AMQException; - -import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + +import java.util.Set; /** - * Waits for a particular state to be reached. + * This is an implementation of the {@link BlockingWaiter} to provide error handing and a waiting mechanism for state + * changes. + * + * On construction the current state and a set of States to await for is provided. + * + * When await() is called the state at constuction is compared against the awaitStates. If the state at construction is + * a desired state then await() returns immediately. + * + * Otherwise it will block for the set timeout for a desired state to be achieved. + * + * The state changes are notified via the {@link #process} method. + * + * Any notified error is handled by the BlockingWaiter and thrown from the {@link #block} method. + * */ -public class StateWaiter implements StateListener +public class StateWaiter extends BlockingWaiter<AMQState> { private static final Logger _logger = LoggerFactory.getLogger(StateWaiter.class); - private final AMQState _state; - - private volatile boolean _newStateAchieved; - - private volatile Throwable _throwable; - - private final Object _monitor = new Object(); - private static final long TIME_OUT = 1000 * 60 * 2; - - public StateWaiter(AMQState state) + Set<AMQState> _awaitStates; + private AMQState _startState; + private AMQStateManager _stateManager; + + /** + * + * @param stateManager The StateManager + * @param currentState + * @param awaitStates + */ + public StateWaiter(AMQStateManager stateManager, AMQState currentState, Set<AMQState> awaitStates) { - _state = state; + _logger.info("New StateWaiter :" + currentState + ":" + awaitStates); + _stateManager = stateManager; + _awaitStates = awaitStates; + _startState = currentState; } - public void waituntilStateHasChanged() throws AMQException + /** + * When the state is changed this StateWaiter is notified to process the change. + * + * @param state The new state that has been achieved. + * @return + */ + public boolean process(AMQState state) { - synchronized (_monitor) - { - // - // The guard is required in case we are woken up by a spurious - // notify(). - // - while (!_newStateAchieved && (_throwable == null)) - { - try - { - _logger.debug("State " + _state + " not achieved so waiting..."); - _monitor.wait(TIME_OUT); - // fixme this won't cause the timeout to exit the loop. need to set _throwable - } - catch (InterruptedException e) - { - _logger.debug("Interrupted exception caught while waiting: " + e, e); - } - } - } + return _awaitStates.contains(state); + } - if (_throwable != null) - { - _logger.debug("Throwable reached state waiter: " + _throwable); - if (_throwable instanceof AMQException) - { - throw (AMQException) _throwable; - } - else - { - throw new AMQException(null, "Error: " + _throwable, _throwable); // FIXME: this will wrap FailoverException in throwable which will prevent it being caught. - } - } + /** + * Await for the requried State to be achieved within the default timeout. + * @return The achieved state that was requested. + * @throws AMQException The exception that prevented the required state from being achived. + */ + public AMQState await() throws AMQException + { + return await(_stateManager.getWaitTimeout()); } - public void stateChanged(AMQState oldState, AMQState newState) + /** + * Await for the requried State to be achieved. + * + * <b>It is the responsibility of this class to remove the waiter from the StateManager + * + * @param timeout The time in milliseconds to wait for any of the states to be achived. + * @return The achieved state that was requested. + * @throws AMQException The exception that prevented the required state from being achived. + */ + public AMQState await(long timeout) throws AMQException { - synchronized (_monitor) + try { - if (_logger.isDebugEnabled()) + if (process(_startState)) { - _logger.debug("stateChanged called changing from :" + oldState + " to :" + newState); + return _startState; } - if (_state == newState) + try { - _newStateAchieved = true; - - if (_logger.isDebugEnabled()) - { - _logger.debug("New state reached so notifying monitor"); - } + return (AMQState) block(timeout); + } + catch (FailoverException e) + { + _logger.error("Failover occured whilst waiting for states:" + _awaitStates); - _monitor.notifyAll(); + e.printStackTrace(); + return null; } } - } - - public void error(Throwable t) - { - synchronized (_monitor) + finally { - if (_logger.isDebugEnabled()) - { - _logger.debug("exceptionThrown called"); - } + //Prevent any more errors being notified to this waiter. + close(); - _throwable = t; - _monitor.notifyAll(); + //Remove the waiter from the notifcation list in the statee manager + _stateManager.removeWaiter(this); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java index 623591e0b6..f0d7feb059 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.client.state.listener; -import org.apache.qpid.AMQException; + import org.apache.qpid.client.protocol.BlockingMethodFrameListener; import org.apache.qpid.framing.AMQMethodBody; @@ -34,7 +34,7 @@ public class SpecificMethodFrameListener extends BlockingMethodFrameListener _expectedClass = expectedClass; } - public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException + public boolean processMethod(int channelId, AMQMethodBody frame) { return _expectedClass.isInstance(frame); } diff --git a/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java new file mode 100644 index 0000000000..67cda957fb --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java @@ -0,0 +1,348 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.util; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQTimeoutException; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; + +/** + * BlockingWaiter is a 'rendezvous' which delegates handling of + * incoming Objects to a listener implemented as a sub-class of this and hands off the process or + * error to a consumer. The producer of the event does not have to wait for the consumer to take the event, so this + * differs from a 'rendezvous' in that sense. + * + * <p/>BlockingWaiters are used to coordinate when waiting for an an event that expect a response. + * They are always used in a 'one-shot' manner, that is, to recieve just one response. Usually the caller has to register + * them as method listeners with an event dispatcher and remember to de-register them (in a finally block) once they + * have been completed. + * + * <p/>The {@link #process} must return <tt>true</tt> on any incoming method that it handles. This indicates to + * this listeners that the object just processed ends the waiting process. + * + * <p/>Errors from the producer are rethrown to the consumer. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations </td> + * <tr><td> Accept generic objects as events for processing via {@link #process}. <td> + * <tr><td> Delegate handling and undserstanding of the object to a concrete implementation. <td> + * <tr><td> Block until {@link #process} determines that waiting is no longer required <td> + * <tr><td> Propagate the most recent exception to the consumer.<td> + * </table> + * + * @todo Interuption is caught but not handled. This could be allowed to fall through. This might actually be usefull + * for fail-over where a thread is blocking when failure happens, it could be interrupted to abandon or retry + * when this happens. At the very least, restore the interrupted status flag. + * @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to + * check that SynchronousQueue has a non-blocking put method available. + */ +public abstract class BlockingWaiter<T> +{ + /** This flag is used to indicate that the blocked for method has been received. */ + private volatile boolean _ready = false; + + /** This flag is used to indicate that the received error has been processed. */ + private volatile boolean _errorAck = false; + + /** Used to protect the shared event and ready flag between the producer and consumer. */ + private final ReentrantLock _lock = new ReentrantLock(); + + /** Used to signal that a method has been received */ + private final Condition _receivedCondition = _lock.newCondition(); + + /** Used to signal that a error has been processed */ + private final Condition _errorConditionAck = _lock.newCondition(); + + /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */ + private volatile Exception _error; + + /** Holds the incomming Object. */ + protected Object _doneObject = null; + private AtomicBoolean _waiting = new AtomicBoolean(false); + private boolean _closed = false; + + /** + * Delegates processing of the incomming object to the handler. + * + * @param object The object to process. + * + * @return <tt>true</tt> if the waiting is complete, <tt>false</tt> if waiting should continue. + */ + public abstract boolean process(T object); + + /** + * An Object has been received and should be processed to see if our wait condition has been reached. + * + * @param object The object received. + * + * @return <tt>true</tt> if the waiting is complete, <tt>false</tt> if waiting should continue. + */ + public boolean received(T object) + { + + boolean ready = process(object); + + if (ready) + { + // we only update the flag from inside the synchronized block + // so that the blockForFrame method cannot "miss" an update - it + // will only ever read the flag from within the synchronized block + _lock.lock(); + try + { + _doneObject = object; + _ready = ready; + _receivedCondition.signal(); + } + finally + { + _lock.unlock(); + } + } + + return ready; + } + + /** + * Blocks until an object is received that is handled by process, or the specified timeout + * has passed. + * + * Once closed any attempt to wait will throw an exception. + * + * @param timeout The timeout in milliseconds. + * + * @return The object that resolved the blocking. + * + * @throws AMQException + * @throws FailoverException + */ + public Object block(long timeout) throws AMQException, FailoverException + { + long nanoTimeout = TimeUnit.MILLISECONDS.toNanos(timeout); + + _lock.lock(); + + try + { + if (_closed) + { + throw throwClosedException(); + } + + if (_error == null) + { + _waiting.set(true); + + while (!_ready) + { + try + { + if (timeout == -1) + { + _receivedCondition.await(); + } + else + { + nanoTimeout = _receivedCondition.awaitNanos(nanoTimeout); + + if (nanoTimeout <= 0 && !_ready && _error == null) + { + _error = new AMQTimeoutException("Server did not respond in a timely fashion", null); + _ready = true; + } + } + } + catch (InterruptedException e) + { + System.err.println(e.getMessage()); + // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess + // if (!_ready && timeout != -1) + // { + // _error = new AMQException("Server did not respond timely"); + // _ready = true; + // } + } + } + } + + if (_error != null) + { + if (_error instanceof AMQException) + { + throw (AMQException) _error; + } + else if (_error instanceof FailoverException) + { + // This should ensure that FailoverException is not wrapped and can be caught. + throw (FailoverException) _error; // needed to expose FailoverException. + } + else + { + throw new AMQException("Woken up due to " + _error.getClass(), _error); + } + } + + } + finally + { + _waiting.set(false); + + //Release Error handling thread + if (_error != null) + { + _errorAck = true; + _errorConditionAck.signal(); + + _error = null; + } + _lock.unlock(); + } + + return _doneObject; + } + + /** + * This is a callback, called when an error has occured that should interupt any waiter. + * It is also called from within this class to avoid code repetition but it should only be called by the MINA threads. + * + * Once closed any notification of an exception will be ignored. + * + * @param e The exception being propogated. + */ + public void error(Exception e) + { + // set the error so that the thread that is blocking (against blockForFrame()) + // can pick up the exception and rethrow to the caller + + _lock.lock(); + + if (_closed) + { + return; + } + + if (_error == null) + { + _error = e; + } + else + { + System.err.println("WARNING: new error arrived while old one not yet processed"); + } + + try + { + if (_waiting.get()) + { + + _ready = true; + _receivedCondition.signal(); + + while (!_errorAck) + { + try + { + _errorConditionAck.await(); + } + catch (InterruptedException e1) + { + System.err.println(e.getMessage()); + } + } + _errorAck = false; + } + } + finally + { + _lock.unlock(); + } + } + + /** + * Close this Waiter so that no more errors are processed. + * This is a preventative method to ensure that a second error thread does not get stuck in the error method after + * the await has returned. This has not happend but in practise but if two errors occur on the Connection at + * the same time then it is conceiveably possible for the second to get stuck if the first one is processed by a + * waiter. + * + * Once closed any attempt to wait will throw an exception. + * Any notification of an exception will be ignored. + */ + public void close() + { + _lock.lock(); + try + { + //if we have already closed then our job is done. + if (_closed) + { + return; + } + + //Close Waiter so no more exceptions are processed + _closed = true; + + //Wake up any await() threads + + //If we are waiting then use the error() to wake them up. + if (_waiting.get()) + { + error(throwClosedException()); + } + //If they are not waiting then there is nothing to do. + + // Wake up any error handling threads + + if (!_errorAck) + { + _errorAck = true; + _errorConditionAck.signal(); + + _error = null; + } + } + finally + { + _lock.unlock(); + } + } + + /** + * Helper method to generate the a closed Exception. + * + * todo: This should be converted to something more friendly. + * + * @return AMQException to throw to waiters when the Waiter is closed. + */ + private AMQException throwClosedException() + { + return new AMQException(null, "Waiter was closed.", null); + } + +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java index b6776a1a44..66f220643c 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java @@ -27,12 +27,10 @@ import org.apache.qpid.AMQInvalidRoutingKeyException; import org.apache.qpid.client.AMQNoConsumersException; import org.apache.qpid.client.AMQNoRouteException; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,12 +46,10 @@ public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListe return _handler; } - public void methodReceived(AMQStateManager stateManager, ChannelCloseBody method, int channelId) + public void methodReceived(AMQProtocolSession session, ChannelCloseBody method, int channelId) throws AMQException { _logger.debug("ChannelClose method received"); - final AMQProtocolSession session = stateManager.getProtocolSession(); - AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode()); AMQShortString reason = method.getReplyText(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java index 27adc4dd77..6f4c26945c 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java @@ -33,11 +33,12 @@ public class ConnectionURLTest extends TestCase public void testFailoverURL() throws URLSyntaxException { - String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin'"; + String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin?cyclecount='100''"; ConnectionURL connectionurl = new AMQConnectionURL(url); assertTrue(connectionurl.getFailoverMethod().equals("roundrobin")); + assertEquals("100", connectionurl.getFailoverOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE)); assertTrue(connectionurl.getUsername().equals("ritchiem")); assertTrue(connectionurl.getPassword().equals("bob")); assertTrue(connectionurl.getVirtualHost().equals("/test")); @@ -276,7 +277,7 @@ public class ConnectionURLTest extends TestCase public void testSingleTransportMultiOptionURL() throws URLSyntaxException { - String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672',routingkey='jim',timeout='200',immediatedelivery='true'"; + String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672?foo='jim'&bar='bob'&fred='jimmy'',routingkey='jim',timeout='200',immediatedelivery='true'"; ConnectionURL connectionurl = new AMQConnectionURL(url); @@ -493,8 +494,38 @@ public class ConnectionURLTest extends TestCase } } + public void testSingleTransportMultiOptionOnBrokerURL() throws URLSyntaxException + { + String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672?foo='jim'&bar='bob'&fred='jimmy'',routingkey='jim',timeout='200',immediatedelivery='true'"; + + ConnectionURL connectionurl = new AMQConnectionURL(url); + + assertTrue(connectionurl.getFailoverMethod() == null); + assertTrue(connectionurl.getUsername().equals("guest")); + assertTrue(connectionurl.getPassword().equals("guest")); + assertTrue(connectionurl.getVirtualHost().equals("/test")); + + assertTrue(connectionurl.getBrokerCount() == 1); + + BrokerDetails service = connectionurl.getBrokerDetails(0); + + assertTrue(service.getTransport().equals("tcp")); + + + assertTrue(service.getHost().equals("localhost")); + assertTrue(service.getPort() == 5672); + assertEquals("jim",service.getProperty("foo")); + assertEquals("bob",service.getProperty("bar")); + assertEquals("jimmy",service.getProperty("fred")); + + assertTrue(connectionurl.getOption("routingkey").equals("jim")); + assertTrue(connectionurl.getOption("timeout").equals("200")); + assertTrue(connectionurl.getOption("immediatedelivery").equals("true")); + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(ConnectionURLTest.class); } } + |
