diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2007-11-26 01:41:31 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2007-11-26 01:41:31 +0000 |
| commit | a1391b5724829e4268faf4de60625b2d05e86288 (patch) | |
| tree | d124245b02b8ead3be3e37cef3fbac684c606e4e /java | |
| parent | 66f97f32c78e0cf5914a441ae8277ee3aa659ce9 (diff) | |
| download | qpid-python-a1391b5724829e4268faf4de60625b2d05e86288.tar.gz | |
QPID-567 : Add mutliversion support to Qpid/Java, fixed client support when server returns Protocol header.
Added QueueUnbind
Added ability to select protocol version in ConnectionURL or with -Dorg.apache.qpid.amqp_version
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@598105 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
15 files changed, 349 insertions, 91 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index 5edffc19ed..1b5bf4f343 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -34,6 +34,7 @@ import javax.management.openmbean.TabularDataSupport; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -150,7 +151,7 @@ public class DestNameExchange extends AbstractExchange if (!_index.remove(routingKey, queue)) { - throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + + throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() + " with routing key " + routingKey + ". No queue was registered with that _routing key"); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index b55dbcc792..3da152c528 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -277,7 +278,7 @@ public class DestWildExchange extends AbstractExchange List<AMQQueue> queues = _routingKey2queues.get(routingKey); if (queues == null) { - throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() + " with routing key " + routingKey + ". No queue was registered with that _routing key"); } @@ -285,7 +286,7 @@ public class DestWildExchange extends AbstractExchange boolean removedQ = queues.remove(queue); if (!removedQ) { - throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() + " with routing key " + routingKey); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index aa13f1d8ee..82d57770df 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -146,7 +147,7 @@ public class FanoutExchange extends AbstractExchange if (!_queues.remove(queue))
{
- throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + ". ");
+ throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() + ". ");
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 3544e9d1f8..31ae799877 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.AMQTypedValue; @@ -200,7 +201,11 @@ public class HeadersExchange extends AbstractExchange public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException { _logger.debug("Exchange " + getName() + ": Unbinding " + queue.getName()); - _bindings.remove(new Registration(new HeadersBinding(args), queue)); + if(!_bindings.remove(new Registration(new HeadersBinding(args), queue))) + { + throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() + + " with headers args " + args); + } } public void route(AMQMessage payload) throws AMQException diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java new file mode 100644 index 0000000000..b056fa6797 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java @@ -0,0 +1,110 @@ +package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.protocol.AMQConstant;
+
+public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindBody>
+{
+ private static final Logger _log = Logger.getLogger(QueueUnbindHandler.class);
+
+ private static final QueueUnbindHandler _instance = new QueueUnbindHandler();
+
+ public static QueueUnbindHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private QueueUnbindHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, QueueUnbindBody body, int channelId) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+
+ final AMQQueue queue;
+ final AMQShortString routingKey;
+
+ if (body.getQueue() == null)
+ {
+ AMQChannel channel = session.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId);
+ }
+
+ queue = channel.getDefaultQueue();
+
+ if (queue == null)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
+ }
+
+ routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
+
+ }
+ else
+ {
+ queue = queueRegistry.getQueue(body.getQueue());
+ routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
+ }
+
+ if (queue == null)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
+ }
+ final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
+ if (exch == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
+ }
+
+
+ try
+ {
+ queue.unBind(routingKey, body.getArguments(), exch);
+ }
+ catch (AMQInvalidRoutingKeyException rke)
+ {
+ throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString());
+ }
+ catch (AMQException e)
+ {
+ if(e.getErrorCode() == AMQConstant.NOT_FOUND)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_FOUND,e.getMessage(),e);
+ }
+ throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
+ }
+
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
+ }
+
+ MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createQueueUnbindOkBody();
+ session.writeFrame(responseBody.generateFrame(channelId));
+
+
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java index 97646a015f..8b1dca77ba 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java @@ -36,6 +36,9 @@ public class ServerMethodDispatcherImpl_0_9 private static final BasicRecoverSyncMethodHandler _basicRecoverSyncMethodHandler =
BasicRecoverSyncMethodHandler.getInstance();
+ private static final QueueUnbindHandler _queueUnbindHandler =
+ QueueUnbindHandler.getInstance();
+
public ServerMethodDispatcherImpl_0_9(AMQStateManager stateManager)
{
@@ -155,6 +158,7 @@ public class ServerMethodDispatcherImpl_0_9 public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
{
- return false;
+ _queueUnbindHandler.methodReceived(getStateManager(),body,channelId);
+ return true;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 85a5fbf996..38325a1e41 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -32,13 +32,7 @@ import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicQosBody; -import org.apache.qpid.framing.BasicQosOkBody; -import org.apache.qpid.framing.ChannelOpenBody; -import org.apache.qpid.framing.ChannelOpenOkBody; -import org.apache.qpid.framing.TxSelectBody; -import org.apache.qpid.framing.TxSelectOkBody; +import org.apache.qpid.framing.*; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; import org.apache.qpid.jms.Connection; @@ -161,6 +155,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** Thread Pool for executing connection level processes. Such as returning bounced messages. */ private final ExecutorService _taskPool = Executors.newCachedThreadPool(); private static final long DEFAULT_TIMEOUT = 1000 * 30; + private ProtocolVersion _protocolVersion; /** * @param broker brokerdetails @@ -253,6 +248,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _clientName = connectionURL.getClientName(); _username = connectionURL.getUsername(); _password = connectionURL.getPassword(); + + _protocolVersion = connectionURL.getProtocolVersion(); + setVirtualHost(connectionURL.getVirtualHost()); if (connectionURL.getDefaultQueueExchangeName() != null) @@ -393,16 +391,24 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException { + final Set<AMQState> openOrClosedStates = + EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED); try { TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, brokerDetail); // this blocks until the connection has been set up or when an error // has prevented the connection being set up - _protocolHandler.attainState(AMQState.CONNECTION_OPEN); - _failoverPolicy.attainedConnection(); - // Again this should be changed to a suitable notify - _connected = true; + //_protocolHandler.attainState(AMQState.CONNECTION_OPEN); + AMQState state = _protocolHandler.attainState(openOrClosedStates); + if(state == AMQState.CONNECTION_OPEN) + { + + _failoverPolicy.attainedConnection(); + + // Again this should be changed to a suitable notify + _connected = true; + } } catch (AMQException e) { @@ -1285,4 +1291,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _sessions.get(channelId); } + + public ProtocolVersion getProtocolVersion() + { + return _protocolVersion; + } + + public void setProtocolVersion(ProtocolVersion protocolVersion) + { + _protocolVersion = protocolVersion; + _protocolHandler.getProtocolSession().setProtocolVersion(protocolVersion); + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java index 24f5ead2d0..64dbabf222 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java @@ -21,6 +21,7 @@ package org.apache.qpid.client; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.url.URLHelper; @@ -52,6 +53,7 @@ public class AMQConnectionURL implements ConnectionURL private AMQShortString _defaultTopicExchangeName; private AMQShortString _temporaryTopicExchangeName; private AMQShortString _temporaryQueueExchangeName; + private ProtocolVersion _protocolVersion = ProtocolVersion.defaultProtocolVersion(); public AMQConnectionURL(String fullURL) throws URLSyntaxException { @@ -255,6 +257,15 @@ public class AMQConnectionURL implements ConnectionURL { _temporaryTopicExchangeName = new AMQShortString(_options.get(OPTIONS_TEMPORARY_TOPIC_EXCHANGE)); } + if(_options.containsKey(OPTIONS_PROTOCOL_VERSION)) + { + ProtocolVersion pv = ProtocolVersion.parse(_options.get(OPTIONS_PROTOCOL_VERSION)); + if(pv != null) + { + _protocolVersion = pv; + } + } + } public String getURL() @@ -377,6 +388,11 @@ public class AMQConnectionURL implements ConnectionURL return _temporaryTopicExchangeName; } + public ProtocolVersion getProtocolVersion() + { + return _protocolVersion; + } + public String toString() { StringBuffer sb = new StringBuffer(); diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 56efec4fa2..5ee3fa5407 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -52,6 +52,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Iterator; +import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; @@ -387,94 +388,109 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void messageReceived(IoSession session, Object message) throws Exception { - final boolean debug = _logger.isDebugEnabled(); - final long msgNumber = ++_messageReceivedCount; - - if (debug && ((msgNumber % 1000) == 0)) + if(message instanceof AMQFrame) { - _logger.debug("Received " + _messageReceivedCount + " protocol messages"); - } + final boolean debug = _logger.isDebugEnabled(); + final long msgNumber = ++_messageReceivedCount; - AMQFrame frame = (AMQFrame) message; + if (debug && ((msgNumber % 1000) == 0)) + { + _logger.debug("Received " + _messageReceivedCount + " protocol messages"); + } - final AMQBody bodyFrame = frame.getBodyFrame(); + AMQFrame frame = (AMQFrame) message; - HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); + final AMQBody bodyFrame = frame.getBodyFrame(); - switch (bodyFrame.getFrameType()) - { - case AMQMethodBody.TYPE: + HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - if (debug) - { - _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); - } + switch (bodyFrame.getFrameType()) + { + case AMQMethodBody.TYPE: - final AMQMethodEvent<AMQMethodBody> evt = - new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame); + if (debug) + { + _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); + } - try - { + final AMQMethodEvent<AMQMethodBody> evt = + new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame); - boolean wasAnyoneInterested = getStateManager().methodReceived(evt); - if (!_frameListeners.isEmpty()) + try { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) + + boolean wasAnyoneInterested = getStateManager().methodReceived(evt); + if (!_frameListeners.isEmpty()) { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener listener = (AMQMethodListener) it.next(); + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + } } - } - if (!wasAnyoneInterested) - { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" - + _frameListeners); + if (!wasAnyoneInterested) + { + throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + + _frameListeners); + } } - } - catch (AMQException e) - { - getStateManager().error(e); - if (!_frameListeners.isEmpty()) + catch (AMQException e) { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) + getStateManager().error(e); + if (!_frameListeners.isEmpty()) { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - listener.error(e); + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener listener = (AMQMethodListener) it.next(); + listener.error(e); + } } + + exceptionCaught(session, e); } - exceptionCaught(session, e); - } + break; - break; + case ContentHeaderBody.TYPE: - case ContentHeaderBody.TYPE: + _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame); + break; - _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame); - break; + case ContentBody.TYPE: - case ContentBody.TYPE: + _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame); + break; - _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame); - break; + case HeartbeatBody.TYPE: - case HeartbeatBody.TYPE: + if (debug) + { + _logger.debug("Received heartbeat"); + } - if (debug) - { - _logger.debug("Received heartbeat"); - } + break; - break; + default: - default: + } + _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); + } + else if (message instanceof ProtocolInitiation) + { + // We get here if the server sends a response to our initial protocol header + // suggesting an alternate ProtocolVersion; the server will then close the + // connection. + ProtocolInitiation protocolInit = (ProtocolInitiation) message; + ProtocolVersion pv = protocolInit.checkVersion(); + getConnection().setProtocolVersion(pv); + + // get round a bug in old versions of qpid whereby the connection is not closed + _stateManager.changeState(AMQState.CONNECTION_CLOSED); } - - _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); } private static int _messagesOut; @@ -515,6 +531,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter getStateManager().attainState(s); } + public AMQState attainState(Set<AMQState> states) throws AMQException + { + return getStateManager().attainState(states); + } + + /** * Convenience method that writes a frame to the protocol session. Equivalent to calling * getProtocolSession().write(). diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 18c1e85eaa..b48adbdb08 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -121,7 +121,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT); _stateManager = stateManager; _stateManager.setProtocolSession(this); - _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); + _protocolVersion = connection.getProtocolVersion(); _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), stateManager); _connection = connection; @@ -133,7 +133,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession // start the process of setting up the connection. This is the first place that // data is written to the server. - _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); + _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion())); } public String getClientID() diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java index 4996f59345..d32d10542f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java @@ -24,8 +24,22 @@ package org.apache.qpid.client.state; * States used in the AMQ protocol. Used by the finite state machine to determine * valid responses. */ -public class AMQState +public enum AMQState { + + CONNECTION_NOT_STARTED(1, "CONNECTION_NOT_STARTED"), + + CONNECTION_NOT_TUNED(2, "CONNECTION_NOT_TUNED"), + + CONNECTION_NOT_OPENED(3, "CONNECTION_NOT_OPENED"), + + CONNECTION_OPEN(4, "CONNECTION_OPEN"), + + CONNECTION_CLOSING(5, "CONNECTION_CLOSING"), + + CONNECTION_CLOSED(6, "CONNECTION_CLOSED"); + + private final int _id; private final String _name; @@ -41,16 +55,6 @@ public class AMQState return "AMQState: id = " + _id + " name: " + _name; } - public static final AMQState CONNECTION_NOT_STARTED = new AMQState(1, "CONNECTION_NOT_STARTED"); - - public static final AMQState CONNECTION_NOT_TUNED = new AMQState(2, "CONNECTION_NOT_TUNED"); - - public static final AMQState CONNECTION_NOT_OPENED = new AMQState(3, "CONNECTION_NOT_OPENED"); - public static final AMQState CONNECTION_OPEN = new AMQState(4, "CONNECTION_OPEN"); - public static final AMQState CONNECTION_CLOSING = new AMQState(5, "CONNECTION_CLOSING"); - - public static final AMQState CONNECTION_CLOSED = new AMQState(6, "CONNECTION_CLOSED"); - } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index a9473df08c..b6baefe1b0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -30,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Iterator; +import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; /** @@ -165,4 +166,41 @@ public class AMQStateManager implements AMQMethodListener { return getProtocolSession().getMethodRegistry(); } + + public AMQState attainState(Set<AMQState> stateSet) throws AMQException + { + synchronized (_stateLock) + { + final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME; + long waitTime = MAXIMUM_STATE_WAIT_TIME; + + while (!stateSet.contains(_currentState) && (waitTime > 0)) + { + try + { + _stateLock.wait(MAXIMUM_STATE_WAIT_TIME); + } + catch (InterruptedException e) + { + _logger.warn("Thread interrupted"); + } + + if (!stateSet.contains(_currentState)) + { + waitTime = waitUntilTime - System.currentTimeMillis(); + } + } + + if (!stateSet.contains(_currentState)) + { + _logger.warn("State not achieved within permitted time. Current state " + _currentState + + ", desired state: " + stateSet); + throw new AMQException("State not achieved within permitted time. Current state " + _currentState + + ", desired state: " + stateSet); + } + return _currentState; + } + + + } } diff --git a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index 2d91e290c4..098256c75f 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -21,6 +21,7 @@ package org.apache.qpid.jms; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ProtocolVersion; import java.util.List; @@ -41,6 +42,7 @@ public interface ConnectionURL public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange"; public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange"; public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange"; + public static final String OPTIONS_PROTOCOL_VERSION = "protocolVersion"; String getURL(); @@ -83,4 +85,6 @@ public interface ConnectionURL AMQShortString getTemporaryQueueExchangeName(); AMQShortString getTemporaryTopicExchangeName(); + + ProtocolVersion getProtocolVersion(); } diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index 02ae3cb089..ff0bc798da 100644 --- a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -56,6 +56,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder /** Flag to indicate whether this decoder needs to handle protocol initiation. */ private boolean _expectProtocolInitiation; + private boolean firstDecode = true; /** * Creates a new AMQP decoder. @@ -81,14 +82,24 @@ public class AMQDecoder extends CumulativeProtocolDecoder */ protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { - if (_expectProtocolInitiation) + + boolean decoded; + if (_expectProtocolInitiation + || (firstDecode + && (in.remaining() > 0) + && (in.get(in.position()) == (byte)'A'))) { - return doDecodePI(session, in, out); + decoded = doDecodePI(session, in, out); } else { - return doDecodeDataBlock(session, in, out); + decoded = doDecodeDataBlock(session, in, out); + } + if(firstDecode && decoded) + { + firstDecode = false; } + return decoded; } /** diff --git a/java/common/templates/model/ProtocolVersionListClass.vm b/java/common/templates/model/ProtocolVersionListClass.vm index d56d14e1ed..9ac6adfdf5 100644 --- a/java/common/templates/model/ProtocolVersionListClass.vm +++ b/java/common/templates/model/ProtocolVersionListClass.vm @@ -33,6 +33,8 @@ package org.apache.qpid.framing; import java.util.SortedSet; import java.util.Collections; import java.util.TreeSet; +import java.util.Map; +import java.util.HashMap; public class ProtocolVersion implements Comparable @@ -124,6 +126,9 @@ public class ProtocolVersion implements Comparable } private static final SortedSet<ProtocolVersion> _supportedVersions; + private static final Map<String, ProtocolVersion> _nameToVersionMap = + new HashMap<String, ProtocolVersion>(); + private static final ProtocolVersion _defaultVersion; #foreach( $version in $model.getVersionSet() ) @@ -138,8 +143,17 @@ public class ProtocolVersion implements Comparable #foreach( $version in $model.getVersionSet() ) #set( $versionId = "v$version.getMajor()_$version.getMinor()" ) versions.add($versionId); + _nameToVersionMap.put("${version.getMajor()}-${version.getMinor()}", $versionId); #end _supportedVersions = Collections.unmodifiableSortedSet(versions); + + + ProtocolVersion systemDefinedVersion = + _nameToVersionMap.get(System.getProperty("org.apache.qpid.amqp_version")); + + _defaultVersion = (systemDefinedVersion == null) + ? getLatestSupportedVersion() + : systemDefinedVersion; } @@ -149,7 +163,16 @@ public class ProtocolVersion implements Comparable } + + public static ProtocolVersion parse(String name) + { + return _nameToVersionMap.get(name); + } + public static ProtocolVersion defaultProtocolVersion() + { + return _defaultVersion; + } } |
