From 3fb9be28593263e12623ce09084a230b59b81f4f Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 22 Mar 2007 13:14:42 +0000 Subject: made a copy git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/java.multi_version@521253 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 58 +- .../java/org/apache/qpid/client/AMQSession.java | 359 ++++-------- .../apache/qpid/client/BasicMessageConsumer.java | 33 +- .../apache/qpid/client/BasicMessageProducer.java | 159 ++---- .../qpid/client/failover/FailoverHandler.java | 6 +- .../client/handler/BasicCancelOkMethodHandler.java | 54 -- .../client/handler/BasicDeliverMethodHandler.java | 49 -- .../client/handler/BasicReturnMethodHandler.java | 50 -- .../client/handler/ChannelCloseMethodHandler.java | 99 ---- .../handler/ChannelCloseOkMethodHandler.java | 47 -- .../client/handler/ChannelFlowOkMethodHandler.java | 50 -- .../handler/ConnectionCloseMethodHandler.java | 100 ---- .../handler/ConnectionOpenOkMethodHandler.java | 48 -- .../handler/ConnectionRedirectMethodHandler.java | 70 --- .../handler/ConnectionSecureMethodHandler.java | 73 --- .../handler/ConnectionStartMethodHandler.java | 261 --------- .../handler/ConnectionTuneMethodHandler.java | 97 ---- .../handler/ExchangeBoundOkMethodHandler.java | 56 -- .../client/handler/QueueDeleteOkMethodHandler.java | 55 -- .../amqp_0_9/ChannelCloseMethodHandler.java | 24 + .../amqp_0_9/ConnectionCloseMethodHandler.java | 29 + .../amqp_0_9/ConnectionSecureMethodHandler.java | 28 + .../amqp_0_9/ConnectionTuneMethodHandler.java | 41 ++ .../amqp_8_0/BasicCancelOkMethodHandler.java | 56 ++ .../amqp_8_0/BasicDeliverMethodHandler.java | 50 ++ .../handler/amqp_8_0/BasicReturnMethodHandler.java | 51 ++ .../amqp_8_0/ChannelCloseMethodHandler.java | 106 ++++ .../amqp_8_0/ChannelCloseOkMethodHandler.java | 46 ++ .../amqp_8_0/ChannelFlowOkMethodHandler.java | 49 ++ .../amqp_8_0/ConnectionCloseMethodHandler.java | 112 ++++ .../amqp_8_0/ConnectionOpenOkMethodHandler.java | 47 ++ .../amqp_8_0/ConnectionRedirectMethodHandler.java | 71 +++ .../amqp_8_0/ConnectionSecureMethodHandler.java | 75 +++ .../amqp_8_0/ConnectionStartMethodHandler.java | 243 ++++++++ .../amqp_8_0/ConnectionTuneMethodHandler.java | 99 ++++ .../amqp_8_0/ExchangeBoundOkMethodHandler.java | 55 ++ .../amqp_8_0/QueueDeleteOkMethodHandler.java | 54 ++ .../qpid/client/message/AbstractJMSMessage.java | 12 +- .../apache/qpid/client/message/JMSTextMessage.java | 6 +- .../client/message/MessageFactoryRegistry.java | 4 +- .../qpid/client/protocol/AMQProtocolHandler.java | 636 +-------------------- .../client/protocol/AMQProtocolHandlerImpl.java | 537 +++++++++++++++++ .../qpid/client/protocol/AMQProtocolSession.java | 48 +- .../protocol/BlockingMethodFrameListener.java | 7 +- .../client/protocol/ProtocolOutputHandler.java | 58 ++ .../protocol/ProtocolOutputHandlerFactory.java | 50 ++ .../amqp_8_0/ProtocolOutputHandler_8_0.java | 278 +++++++++ .../apache/qpid/client/state/AMQStateManager.java | 44 +- .../client/state/StateAwareMethodListener.java | 4 +- .../listener/SpecificMethodFrameListener.java | 5 +- .../client/transport/ITransportConnection.java | 4 +- .../transport/SocketTransportConnection.java | 4 +- .../transport/VmPipeTransportConnection.java | 4 +- .../org/apache/qpid/codec/BasicDeliverTest.java | 14 +- .../org/apache/qpid/framing/FieldTableTest.java | 4 +- .../ChannelCloseMethodHandlerNoCloseOk.java | 7 +- .../unit/client/channelclose/ChannelCloseTest.java | 39 +- .../client/channelclose/NoCloseOKStateManager.java | 20 +- .../client/protocol/AMQProtocolSessionTest.java | 4 +- 59 files changed, 2487 insertions(+), 2262 deletions(-) delete mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java delete mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java delete mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java delete mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java delete mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java delete mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java delete mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java delete mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java delete mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java delete mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java delete mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java delete mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java delete mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java delete mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ChannelCloseMethodHandler.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionCloseMethodHandler.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionSecureMethodHandler.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionTuneMethodHandler.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicCancelOkMethodHandler.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicDeliverMethodHandler.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicReturnMethodHandler.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseMethodHandler.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseOkMethodHandler.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelFlowOkMethodHandler.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionCloseMethodHandler.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionOpenOkMethodHandler.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionRedirectMethodHandler.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionSecureMethodHandler.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionStartMethodHandler.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionTuneMethodHandler.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ExchangeBoundOkMethodHandler.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/QueueDeleteOkMethodHandler.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandler.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandlerFactory.java create mode 100644 java/client/src/main/java/org/apache/qpid/client/protocol/amqp_8_0/ProtocolOutputHandler_8_0.java (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 413524b6d8..1b4ae02399 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -25,7 +25,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.AMQUnresolvedAddressException; import org.apache.qpid.client.failover.FailoverSupport; -import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl; +import org.apache.qpid.client.protocol.ProtocolOutputHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.exchange.ExchangeDefaults; @@ -36,6 +37,8 @@ import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxSelectOkBody; +import org.apache.qpid.framing.AMQMethodFactory; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; import org.apache.qpid.jms.Connection; @@ -92,7 +95,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate * handler. */ - private AMQProtocolHandler _protocolHandler; + private AMQProtocolHandlerImpl _protocolHandler; /** Maps from session id (Integer) to AMQSession instance */ private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap @@ -273,7 +276,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _failoverPolicy = new FailoverPolicy(connectionURL); - _protocolHandler = new AMQProtocolHandler(this); + _protocolHandler = new AMQProtocolHandlerImpl(this); // We are not currently connected _connected = false; @@ -550,26 +553,15 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException { + // define this here, should be poassed in + final int prefetchSize = 0; - // TODO: Be aware of possible changes to parameter order as versions change. + AMQMethodFactory methodFactory = getAMQMethodFactory(); - _protocolHandler.syncWrite( - ChannelOpenBody.createAMQFrame(channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), - null), // outOfBand - ChannelOpenOkBody.class); - - //todo send low water mark when protocol allows. - //todo Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite( - BasicQosBody.createAMQFrame(channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), - false, // global - prefetchHigh, // prefetchCount - 0), // prefetchSize - BasicQosOkBody.class); + ChannelOpenBody openBody = methodFactory.createChannelOpen(); + sendCommandReceiveResponse(channelId, openBody); + AMQMethodBody qosBody = methodFactory.createMessageQos(prefetchHigh, prefetchSize); + sendCommandReceiveResponse(channelId, qosBody); if (transacted) { @@ -578,14 +570,21 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _logger.debug("Issuing TxSelect for " + channelId); } - // TODO: Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion()), - TxSelectOkBody.class); + TxSelectBody txSelectBody = methodFactory.createTxSelect(); + sendCommandReceiveResponse(channelId, txSelectBody); } } + private AMQMethodBody sendCommandReceiveResponse(int channelId, AMQMethodBody command) throws AMQException + { + return getProtocolOutputHandler().sendCommandReceiveResponse(channelId, command); + } + + private AMQMethodFactory getAMQMethodFactory() + { + return getProtocolOutputHandler().getAMQMethodFactory(); + } + private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException { try @@ -934,7 +933,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _virtualHost; } - public AMQProtocolHandler getProtocolHandler() + public AMQProtocolHandlerImpl getProtocolHandler() { return _protocolHandler; } @@ -1218,4 +1217,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _taskPool.execute(task); } + + public ProtocolOutputHandler getProtocolOutputHandler() + { + return _protocolHandler.getOutputHandler(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 89f596e541..41101ff374 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -59,6 +59,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.AMQInvalidRoutingKeyException; import org.apache.qpid.AMQInvalidArgumentException; +import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.JMSBytesMessage; @@ -68,41 +69,12 @@ import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.protocol.BlockingMethodFrameListener; +import org.apache.qpid.client.protocol.ProtocolOutputHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.AccessRequestBody; -import org.apache.qpid.framing.AccessRequestOkBody; -import org.apache.qpid.framing.BasicAckBody; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.BasicConsumeOkBody; -import org.apache.qpid.framing.BasicRecoverBody; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ChannelFlowBody; -import org.apache.qpid.framing.ExchangeBoundBody; -import org.apache.qpid.framing.ExchangeBoundOkBody; -import org.apache.qpid.framing.ExchangeDeclareBody; -import org.apache.qpid.framing.ExchangeDeclareOkBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; -import org.apache.qpid.framing.QueueBindBody; -import org.apache.qpid.framing.QueueDeclareBody; -import org.apache.qpid.framing.QueueDeleteBody; -import org.apache.qpid.framing.QueueDeleteOkBody; -import org.apache.qpid.framing.TxCommitBody; -import org.apache.qpid.framing.TxCommitOkBody; -import org.apache.qpid.framing.TxRollbackBody; -import org.apache.qpid.framing.TxRollbackOkBody; -import org.apache.qpid.framing.QueueBindOkBody; -import org.apache.qpid.framing.QueueDeclareOkBody; -import org.apache.qpid.framing.ChannelFlowOkBody; -import org.apache.qpid.framing.BasicRecoverOkBody; -import org.apache.qpid.framing.BasicRejectBody; +import org.apache.qpid.framing.*; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; @@ -197,6 +169,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private boolean _suspended; private final Object _suspensionLock = new Object(); + private static final AMQShortString CHANNEL_CLOSE_REPLY_TEXT = new AMQShortString("JMS client closing channel"); /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ @@ -271,11 +244,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (message.getDeliverBody() != null) { - final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag); + final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().getConsumerTag()); if (consumer == null) { - _logger.warn("Received a message from queue " + message.getDeliverBody().consumerTag + " without a handler - ignoring..."); + _logger.warn("Received a message from queue " + message.getDeliverBody().getConsumerTag() + " without a handler - ignoring..."); _logger.warn("Consumers that exist: " + _consumers); _logger.warn("Session hashcode: " + System.identityHashCode(this)); } @@ -513,14 +486,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi i.next().acknowledgeLastDelivered(); } - // Commits outstanding messages sent and outstanding acknowledgements. - // TODO: Be aware of possible changes to parameter order as versions change. - final AMQProtocolHandler handler = getProtocolHandler(); + TxCommitBody commitBody = getAMQMethodFactory().createTxCommit(); + sendCommandReceiveResponse(commitBody); + + - handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion()), - TxCommitOkBody.class); } catch (AMQException e) { @@ -549,8 +519,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi suspendChannel(true); } - _connection.getProtocolHandler().syncWrite( - TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class); + TxRollbackBody rollbackBody = getAMQMethodFactory().createTxRollback(); + sendCommandReceiveResponse(rollbackBody); if (_dispatcher != null) { @@ -590,15 +560,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { getProtocolHandler().closeSession(this); - // TODO: Be aware of possible changes to parameter order as versions change. - final AMQFrame frame = ChannelCloseBody.createAMQFrame(getChannelId(), - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client closing channel")); // replyText - - getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout); + ChannelCloseBody closeBody = getAMQMethodFactory().createChannelClose(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode + CHANNEL_CLOSE_REPLY_TEXT); + sendCommandReceiveResponse(closeBody, timeout); + + + // When control resumes at this point, a reply will have been received that // indicates the broker has closed the channel successfully @@ -617,21 +584,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - private AMQProtocolHandler getProtocolHandler() + private AMQProtocolHandlerImpl getProtocolHandler() { return _connection.getProtocolHandler(); } - - private byte getProtocolMinorVersion() + public ProtocolOutputHandler getProtocolOutputHandler() { - return getProtocolHandler().getProtocolMinorVersion(); + return _connection.getProtocolOutputHandler(); } - private byte getProtocolMajorVersion() - { - return getProtocolHandler().getProtocolMajorVersion(); - } + /** @@ -835,14 +798,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi consumer.clearUnackedMessages(); } - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - false) // requeue - , BasicRecoverOkBody.class); + final boolean requeue = false; + AMQMethodBody recoverBody = getAMQMethodFactory().createRecover(requeue); + sendCommandReceiveResponse(recoverBody); + + + if (_dispatcher != null) { @@ -1226,136 +1187,60 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void declareExchange(AMQShortString name, AMQShortString type) throws AMQException { - declareExchange(name, type, getProtocolHandler()); + ExchangeDeclareBody exchangeDeclare = getAMQMethodFactory().createExchangeDeclare(name,type,getTicket()); + sendCommandReceiveResponse(exchangeDeclare); } public void declareExchangeSynch(AMQShortString name, AMQShortString type) throws AMQException { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - null, // arguments - false, // autoDelete - false, // durable - name, // exchange - false, // internal - false, // nowait - false, // passive - getTicket(), // ticket - type); // type - getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class); - } + ExchangeDeclareBody exchangeDeclare = getAMQMethodFactory().createExchangeDeclare(name,type,getTicket()); + sendCommandReceiveResponse(exchangeDeclare); - private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException - { - declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler); - } - - private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler) throws AMQException - { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - null, // arguments - false, // autoDelete - false, // durable - name, // exchange - false, // internal - false, // nowait - false, // passive - getTicket(), // ticket - type); // type - - protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); } - public void createQueue(AMQShortString name, boolean autoDelete, boolean durable, boolean exclusive) throws AMQException { - AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - null, // arguments - autoDelete, // autoDelete - durable, // durable - exclusive, // exclusive - false, // nowait - false, // passive - name, // queue - getTicket()); // ticket - - getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); - + QueueDeclareBody queueDeclare = getAMQMethodFactory().createQueueDeclare(name, null, autoDelete, durable, exclusive, false ,getTicket()); + sendCommandReceiveResponse(queueDeclare); } public void bindQueue(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName) throws AMQException { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - arguments, // arguments - exchangeName, // exchange - false, // nowait - queueName, // queue - routingKey, // routingKey - getTicket()); // ticket - - - getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class); + QueueBindBody queueBind = getAMQMethodFactory().createQueueBind(queueName,exchangeName,routingKey,arguments,getTicket()); + sendCommandReceiveResponse(queueBind); } /** * Declare the queue. * * @param amqd - * @param protocolHandler * * @return the queue name. This is useful where the broker is generating a queue name on behalf of the client. * * @throws AMQException */ - private AMQShortString declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException + private AMQShortString declareQueue(AMQDestination amqd) throws AMQException { // For queues (but not topics) we generate the name in the client rather than the // server. This allows the name to be reused on failover if required. In general, // the destination indicates whether it wants a name generated or not. if (amqd.isNameRequired()) { - amqd.setQueueName(protocolHandler.generateQueueName()); + amqd.setQueueName(getProtocolHandler().generateQueueName()); } //TODO verify the destiation is valid. else throw - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - null, // arguments - amqd.isAutoDelete(), // autoDelete - amqd.isDurable(), // durable - amqd.isExclusive(), // exclusive - false, // nowait - false, // passive - amqd.getAMQQueueName(), // queue - getTicket()); // ticket - - protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class); + createQueue(amqd.getAMQQueueName(),amqd.isAutoDelete(),amqd.isDurable(),amqd.isExclusive()); + + return amqd.getAMQQueueName(); } - private void bindQueue(AMQDestination amqd, AMQShortString queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException + private void bindQueue(AMQDestination amqd, AMQShortString queueName, FieldTable ft) throws AMQException { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - ft, // arguments - amqd.getExchangeName(), // exchange - false, // nowait - queueName, // queue - amqd.getRoutingKey(), // routingKey - getTicket()); // ticket - - - protocolHandler.syncWrite(queueBind, QueueBindOkBody.class); + bindQueue(queueName,amqd.getRoutingKey(),ft,amqd.getExchangeName()); } /** @@ -1365,8 +1250,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @return the consumer tag generated by the broker */ - private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, - boolean nowait, String messageSelector) throws AMQException + private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, boolean nowait, String messageSelector) throws AMQException { //fixme prefetch values are not used here. Do we need to have them as parametsrs? //need to generate a consumer tag on the client so we can exploit the nowait flag @@ -1392,25 +1276,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi try { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - arguments, // arguments - tag, // consumerTag - consumer.isExclusive(), // exclusive - consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck - consumer.isNoLocal(), // noLocal - nowait, // nowait - queueName, // queue - getTicket()); // ticket - if (nowait) + final boolean noAck = consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE; + + AMQMethodBody consumeBody = getAMQMethodFactory().createConsumer(tag, + queueName, + arguments, + noAck, + consumer.isExclusive(), + consumer.isNoLocal(), + getTicket()); + if(nowait) { - protocolHandler.writeFrame(jmsConsume); + sendCommand(consumeBody); } else { - protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class); + sendCommandReceiveResponse(consumeBody); } + } catch (AMQException e) { @@ -1606,15 +1489,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { try { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - false, // ifEmpty - false, // ifUnused - true, // nowait - queueName, // queue - getTicket()); // ticket - getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); + + QueueDeleteBody deleteBody = getAMQMethodFactory().createQueueDelete(queueName, false, false, getTicket()); + sendCommandReceiveResponse(deleteBody); } catch (AMQException e) { @@ -1697,23 +1574,23 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException { - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - exchangeName, // exchange - queueName, // queue - routingKey); // routingKey AMQMethodEvent response = null; try { - response = getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); + ExchangeBoundBody exchangeBoundBody = + getAMQMethodFactory().createExchangeBound(exchangeName, // exchange + queueName, // queue + routingKey); // routingKey + + + ExchangeBoundOkBody responseBody = sendCommandReceiveResponse(exchangeBoundBody, ExchangeBoundOkBody.class); + return (responseBody.getReplyCode() == 0); } catch (AMQException e) { throw new JMSAMQException(e); } - ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); - return (responseBody.replyCode == 0); //ExchangeBoundHandler.OK); Remove Broker compile dependency + } private void checkTransacted() throws JMSException @@ -1770,13 +1647,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // Bounced message is processed here, away from the mina thread AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0, false, - message.getBounceBody().exchange, - message.getBounceBody().routingKey, + message.getBounceBody().getExchange(), + message.getBounceBody().getRoutingKey(), message.getContentHeader(), message.getBodies()); - AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().replyCode); - AMQShortString reason = message.getBounceBody().replyText; + AMQConstant errorCode = AMQConstant.getConstant(message.getBounceBody().getReplyCode()); + AMQShortString reason = message.getBounceBody().getReplyText(); _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. @@ -1812,16 +1689,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public void acknowledgeMessage(long deliveryTag, boolean multiple) { - // TODO: Be aware of possible changes to parameter order as versions change. - final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - deliveryTag, // deliveryTag - multiple); // multiple + AMQMethodBody ackBody = getAMQMethodFactory().createAcknowledge(deliveryTag, multiple); if (_logger.isDebugEnabled()) { _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); } - getProtocolHandler().writeFrame(ackFrame); + sendCommand(ackBody); } public int getDefaultPrefetch() @@ -1908,17 +1781,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { AMQDestination amqd = consumer.getDestination(); - AMQProtocolHandler protocolHandler = getProtocolHandler(); - - declareExchange(amqd, protocolHandler); + declareExchange(amqd.getExchangeName(), amqd.getExchangeClass()); - AMQShortString queueName = declareQueue(amqd, protocolHandler); + AMQShortString queueName = declareQueue(amqd); - bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); + bindQueue(amqd, queueName, consumer.getRawSelectorFieldTable()); try { - consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector()); + consumeFromQueue(consumer, queueName, nowait, consumer.getMessageSelector()); } catch (JMSException e) //thrown by getMessageSelector { @@ -2019,14 +1890,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _suspended = suspend; - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - !suspend); // active + AMQMethodBody flowBody = getAMQMethodFactory().createChannelFlow(!suspend); + sendCommandReceiveResponse(flowBody); - _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class); } } @@ -2118,32 +1984,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, boolean read) throws AMQException { - getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(), - getProtocolMajorVersion(), - getProtocolMinorVersion(), - active, - exclusive, - passive, - read, - realm, - write), - new BlockingMethodFrameListener(_channelId) - { - public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException - { - if (frame instanceof AccessRequestOkBody) - { - setTicket(((AccessRequestOkBody) frame).getTicket()); - return true; - } - else - { - return false; - } - } - }); + AccessRequestBody accessRequest = getAMQMethodFactory().createAccessRequest(active,exclusive,passive,read,realm,write); + AccessRequestOkBody okBody = getProtocolOutputHandler().sendCommandReceiveResponse(_channelId,accessRequest, AccessRequestOkBody.class ); + setTicket(okBody.getTicket()); + } + AMQMethodFactory getAMQMethodFactory() + { + return getProtocolOutputHandler().getAMQMethodFactory(); } private class SuspenderRunner implements Runnable @@ -2187,7 +2036,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { UnprocessedMessage message = (UnprocessedMessage) messages.next(); - if (consumerTag == null || message.getDeliverBody().consumerTag.equals(consumerTag)) + if (consumerTag == null || message.getDeliverBody().getConsumerTag().equals(consumerTag)) { if (_logger.isTraceEnabled()) { @@ -2196,7 +2045,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi messages.remove(); - rejectMessage(message.getDeliverBody().deliveryTag, requeue); + rejectMessage(message.getDeliverBody().getDeliveryTag(), requeue); _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag); } @@ -2209,13 +2058,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void rejectMessage(long deliveryTag, boolean requeue) { - AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId, - getProtocolMajorVersion(), - getProtocolMinorVersion(), - deliveryTag, - requeue); + AMQMethodBody rejectBody = getAMQMethodFactory().createRejectBody(deliveryTag, requeue); + sendCommand(rejectBody); + + } - _connection.getProtocolHandler().writeFrame(basicRejectBody); + T sendCommandReceiveResponse(AMQMethodBody command, Class responseClass) throws AMQException + { + return getProtocolOutputHandler().sendCommandReceiveResponse(_channelId, command, responseClass); + } + AMQMethodBody sendCommandReceiveResponse(AMQMethodBody command) throws AMQException + { + return getProtocolOutputHandler().sendCommandReceiveResponse(_channelId, command); + } + AMQMethodBody sendCommandReceiveResponse(AMQMethodBody command, long timeout) throws AMQException + { + return getProtocolOutputHandler().sendCommandReceiveResponse(_channelId, command, timeout); } + + void sendCommand(AMQMethodBody command) + { + getProtocolOutputHandler().sendCommand(_channelId, command); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index e9b914425a..f0bea6cc90 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -33,6 +33,7 @@ import javax.jms.MessageListener; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; @@ -42,6 +43,8 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicCancelBody; import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQMethodFactory; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; @@ -438,16 +441,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { if (sendClose) { - // TODO: Be aware of possible changes to parameter order as versions change. - final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, - _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), - _consumerTag, // consumerTag - false); // nowait - try { - _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class); + AMQMethodBody cancelBody = getAMQMethodFactory().createConsumerCancel(_consumerTag); + sendCommandReceiveResponse(cancelBody); } catch (AMQException e) { @@ -467,6 +464,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } + private AMQMethodBody sendCommandReceiveResponse(AMQMethodBody cancelBody) throws AMQException + { + return getSession().sendCommandReceiveResponse(cancelBody); + } + + private AMQMethodFactory getAMQMethodFactory() + { + return getSession().getAMQMethodFactory(); + } + /** * Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has * vetoed automatic resubscription. The caller must hold the failover mutex. @@ -490,14 +497,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (debug) { - _logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().deliveryTag); + _logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().getDeliveryTag()); } try { - AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag, - messageFrame.getDeliverBody().redelivered, - messageFrame.getDeliverBody().exchange, - messageFrame.getDeliverBody().routingKey, + AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.getDeliverBody().getDeliveryTag(), + messageFrame.getDeliverBody().getRedelivered(), + messageFrame.getDeliverBody().getExchange(), + messageFrame.getDeliverBody().getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies()); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index b01e087ce1..43b9f3569a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -38,17 +38,11 @@ import javax.jms.Topic; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageConverter; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.CompositeAMQDataBlock; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl; +import org.apache.qpid.framing.*; public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { @@ -91,7 +85,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j */ private String _mimeType; - private AMQProtocolHandler _protocolHandler; + private AMQProtocolHandlerImpl _protocolHandler; /** * True if this producer was created from a transacted session @@ -121,7 +115,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0]; protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, - AMQSession session, AMQProtocolHandler protocolHandler, long producerId, + AMQSession session, AMQProtocolHandlerImpl protocolHandler, long producerId, boolean immediate, boolean mandatory, boolean waitUntilSent) { _connection = connection; @@ -152,20 +146,28 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private void declareDestination(AMQDestination destination) { // Declare the exchange - // Note that the durable and internal arguments are ignored since passive is set to false - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame declare = - ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion(), null, // arguments - false, // autoDelete - false, // durable - destination.getExchangeName(), // exchange - false, // internal - true, // nowait - false, // passive - _session.getTicket(), // ticket - destination.getExchangeClass()); // type - _protocolHandler.writeFrame(declare); + + ExchangeDeclareBody exchangeDeclareBody = + getAMQMethodFactory().createExchangeDeclare(destination.getExchangeName(), + destination.getExchangeClass(), + _session.getTicket()); + sendCommand(exchangeDeclareBody); + + } + + private void sendCommand(AMQMethodBody command) + { + getSession().sendCommand(command); + } + + private AMQMethodBody sendCommandReceiveResponse(AMQMethodBody command) throws AMQException + { + return getSession().sendCommandReceiveResponse(command); + } + + private AMQMethodFactory getAMQMethodFactory() + { + return getSession().getAMQMethodFactory(); } public void setDisableMessageID(boolean b) throws JMSException @@ -467,21 +469,9 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame publishFrame = - BasicPublishBody.createAMQFrame( - _channelId, _protocolHandler.getProtocolMajorVersion(), _protocolHandler.getProtocolMinorVersion(), - destination.getExchangeName(), // exchange - immediate, // immediate - mandatory, // mandatory - destination.getRoutingKey(), // routingKey - _session.getTicket()); // ticket - message.prepareForSending(); ByteBuffer payload = message.getData(); - BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties(); + CommonContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties(); if (!_disableTimestamps) { @@ -501,37 +491,15 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j contentHeaderProperties.setDeliveryMode((byte) deliveryMode); contentHeaderProperties.setPriority((byte) priority); - final int size = (payload != null) ? payload.limit() : 0; - final int contentBodyFrameCount = calculateContentBodyFrameCount(payload); - final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount]; + getSession().getProtocolOutputHandler().publishMessage(getSession().getChannelId(), + destination.getExchangeName(), + destination.getRoutingKey(), + immediate, + mandatory, + payload, + contentHeaderProperties, + getSession().getTicket()); - if (payload != null) - { - createContentBodies(payload, frames, 2, _channelId); - } - - if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled()) - { - _logger.debug("Sending content body frames to " + destination); - } - - // weight argument of zero indicates no child content headers, just bodies - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - AMQFrame contentHeaderFrame = - ContentHeaderBody.createAMQFrame(_channelId, - BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(), - _protocolHandler.getProtocolMinorVersion()), 0, - contentHeaderProperties, size); - if (_logger.isDebugEnabled()) - { - _logger.debug("Sending content header frame to " + destination); - } - - frames[0] = publishFrame; - frames[1] = contentHeaderFrame; - CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); - _protocolHandler.writeFrame(compositeFrame, wait); if (message != origMessage) { @@ -544,6 +512,8 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } } + + private void checkTemporaryDestination(AMQDestination destination) throws JMSException { if (destination instanceof TemporaryDestination) @@ -564,59 +534,6 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } } - /** - * Create content bodies. This will split a large message into numerous bodies depending on the negotiated - * maximum frame size. - * - * @param payload - * @param frames - * @param offset - * @param channelId @return the array of content bodies - */ - private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId) - { - - if (frames.length == (offset + 1)) - { - frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload)); - } - else - { - - final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; - long remaining = payload.remaining(); - for (int i = offset; i < frames.length; i++) - { - payload.position((int) framePayloadMax * (i - offset)); - int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining; - payload.limit(payload.position() + length); - frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice())); - - remaining -= length; - } - } - - } - - private int calculateContentBodyFrameCount(ByteBuffer payload) - { - // we substract one from the total frame maximum size to account for the end of frame marker in a body frame - // (0xCE byte). - int frameCount; - if ((payload == null) || (payload.remaining() == 0)) - { - frameCount = 0; - } - else - { - int dataLength = payload.remaining(); - final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; - int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0; - frameCount = (int) (dataLength / framePayloadMax) + lastFrame; - } - - return frameCount; - } public void setMimeType(String mimeType) throws JMSException { diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 844ecbe743..268456290e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -25,7 +25,7 @@ import java.util.concurrent.CountDownLatch; import org.apache.log4j.Logger; import org.apache.mina.common.IoSession; import org.apache.qpid.AMQDisconnectedException; -import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl; import org.apache.qpid.client.state.AMQStateManager; /** @@ -42,7 +42,7 @@ public class FailoverHandler implements Runnable private static final Logger _logger = Logger.getLogger(FailoverHandler.class); private final IoSession _session; - private AMQProtocolHandler _amqProtocolHandler; + private AMQProtocolHandlerImpl _amqProtocolHandler; /** * Used where forcing the failover host @@ -54,7 +54,7 @@ public class FailoverHandler implements Runnable */ private int _port; - public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session) + public FailoverHandler(AMQProtocolHandlerImpl amqProtocolHandler, IoSession session) { _amqProtocolHandler = amqProtocolHandler; _session = session; diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java deleted file mode 100644 index 9bd0205977..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.handler; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.framing.BasicCancelOkBody; -import org.apache.qpid.protocol.AMQMethodEvent; - -/** - * @author Apache Software Foundation - */ -public class BasicCancelOkMethodHandler implements StateAwareMethodListener -{ - private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class); - private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler(); - - public static BasicCancelOkMethodHandler getInstance() - { - return _instance; - } - - private BasicCancelOkMethodHandler() - { - } - - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException - { - _logger.debug("New BasicCancelOk method received"); - BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod(); - protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.consumerTag); - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java deleted file mode 100644 index d34d6688c1..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.handler; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.framing.BasicDeliverBody; -import org.apache.qpid.protocol.AMQMethodEvent; - -public class BasicDeliverMethodHandler implements StateAwareMethodListener -{ - private static final Logger _logger = Logger.getLogger(BasicDeliverMethodHandler.class); - - private static final BasicDeliverMethodHandler _instance = new BasicDeliverMethodHandler(); - - public static BasicDeliverMethodHandler getInstance() - { - return _instance; - } - - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException - { - final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), (BasicDeliverBody) evt.getMethod()); - _logger.debug("New JmsDeliver method received"); - protocolSession.unprocessedMessageReceived(msg); - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java deleted file mode 100644 index 02573c5d00..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.handler; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.framing.BasicReturnBody; -import org.apache.qpid.protocol.AMQMethodEvent; - -public class BasicReturnMethodHandler implements StateAwareMethodListener -{ - private static final Logger _logger = Logger.getLogger(BasicReturnMethodHandler.class); - - private static final BasicReturnMethodHandler _instance = new BasicReturnMethodHandler(); - - public static BasicReturnMethodHandler getInstance() - { - return _instance; - } - - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException - { - _logger.debug("New JmsBounce method received"); - final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(),(BasicReturnBody)evt.getMethod()); - - protocolSession.unprocessedMessageReceived(msg); - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java deleted file mode 100644 index e2b101ab79..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.handler; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQChannelClosedException; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQInvalidRoutingKeyException; -import org.apache.qpid.client.AMQNoConsumersException; -import org.apache.qpid.client.AMQNoRouteException; -import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; - -public class ChannelCloseMethodHandler implements StateAwareMethodListener -{ - private static final Logger _logger = Logger.getLogger(ChannelCloseMethodHandler.class); - - private static ChannelCloseMethodHandler _handler = new ChannelCloseMethodHandler(); - - public static ChannelCloseMethodHandler getInstance() - { - return _handler; - } - - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException - { - _logger.debug("ChannelClose method received"); - ChannelCloseBody method = (ChannelCloseBody) evt.getMethod(); - - AMQConstant errorCode = AMQConstant.getConstant(method.replyCode); - AMQShortString reason = method.replyText; - if (_logger.isDebugEnabled()) - { - _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason); - } - - // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), method.getMajor(), method.getMinor()); - protocolSession.writeFrame(frame); - if (errorCode != AMQConstant.REPLY_SUCCESS) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Channel close received with errorCode " + errorCode + ", and reason " + reason); - } - if (errorCode == AMQConstant.NO_CONSUMERS) - { - throw new AMQNoConsumersException("Error: " + reason, null); - } - else if (errorCode == AMQConstant.NO_ROUTE) - { - throw new AMQNoRouteException("Error: " + reason, null); - } - else if (errorCode == AMQConstant.INVALID_ARGUMENT) - { - _logger.debug("Broker responded with Invalid Argument."); - - throw new org.apache.qpid.AMQInvalidArgumentException(String.valueOf(reason)); - } - else if (errorCode == AMQConstant.INVALID_ROUTING_KEY) - { - _logger.debug("Broker responded with Invalid Routing Key."); - - throw new AMQInvalidRoutingKeyException(String.valueOf(reason)); - } - else - { - throw new AMQChannelClosedException(errorCode, "Error: " + reason); - } - - } - protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason)); - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java deleted file mode 100644 index 794071cc34..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.handler; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.protocol.AMQMethodEvent; - -public class ChannelCloseOkMethodHandler implements StateAwareMethodListener -{ - private static final Logger _logger = Logger.getLogger(ChannelCloseOkMethodHandler.class); - - private static final ChannelCloseOkMethodHandler _instance = new ChannelCloseOkMethodHandler(); - - public static ChannelCloseOkMethodHandler getInstance() - { - return _instance; - } - - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException - { - _logger.info("Received channel-close-ok for channel-id " + evt.getChannelId()); - - //todo this should do the closure - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java deleted file mode 100644 index 1f003649c0..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.handler; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.framing.ChannelFlowOkBody; -import org.apache.qpid.protocol.AMQMethodEvent; - -public class ChannelFlowOkMethodHandler implements StateAwareMethodListener -{ - private static final Logger _logger = Logger.getLogger(ChannelFlowOkMethodHandler.class); - private static final ChannelFlowOkMethodHandler _instance = new ChannelFlowOkMethodHandler(); - - public static ChannelFlowOkMethodHandler getInstance() - { - return _instance; - } - - private ChannelFlowOkMethodHandler() - { - } - - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException - { - ChannelFlowOkBody method = (ChannelFlowOkBody) evt.getMethod(); - _logger.debug("Received Channel.Flow-Ok message, active = " + method.active); - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java deleted file mode 100644 index 9c8e9188ec..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.handler; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQConnectionClosedException; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQAuthenticationException; -import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ConnectionCloseOkBody; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; - -public class ConnectionCloseMethodHandler implements StateAwareMethodListener -{ - private static final Logger _logger = Logger.getLogger(ConnectionCloseMethodHandler.class); - - private static ConnectionCloseMethodHandler _handler = new ConnectionCloseMethodHandler(); - - public static ConnectionCloseMethodHandler getInstance() - { - return _handler; - } - - private ConnectionCloseMethodHandler() - { - } - - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException - { - _logger.info("ConnectionClose frame received"); - ConnectionCloseBody method = (ConnectionCloseBody) evt.getMethod(); - - // does it matter - //stateManager.changeState(AMQState.CONNECTION_CLOSING); - - AMQConstant errorCode = AMQConstant.getConstant(method.replyCode); - AMQShortString reason = method.replyText; - - try - { - // TODO: check whether channel id of zero is appropriate - // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short) 0, method.getMajor(), method.getMinor())); - - if (errorCode != AMQConstant.REPLY_SUCCESS) - { - if (errorCode == AMQConstant.NOT_ALLOWED) - { - _logger.info("Authentication Error:" + Thread.currentThread().getName()); - - protocolSession.closeProtocolSession(); - - //todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state. - stateManager.changeState(AMQState.CONNECTION_NOT_STARTED); - - throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString()); - } - else - { - _logger.info("Connection close received with error code " + errorCode); - - - throw new AMQConnectionClosedException(errorCode, "Error: " + reason); - } - } - } - finally - { - // this actually closes the connection in the case where it is not an error. - - protocolSession.closeProtocolSession(); - - stateManager.changeState(AMQState.CONNECTION_CLOSED); - } - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java deleted file mode 100644 index 2e0f273c32..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.handler; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.protocol.AMQMethodEvent; - -public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener -{ - private static final ConnectionOpenOkMethodHandler _instance = new ConnectionOpenOkMethodHandler(); - - public static ConnectionOpenOkMethodHandler getInstance() - { - return _instance; - } - - private ConnectionOpenOkMethodHandler() - { - } - - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException - { - stateManager.changeState(AMQState.CONNECTION_OPEN); - } - -} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java deleted file mode 100644 index 866f65b384..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.handler; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.framing.ConnectionRedirectBody; -import org.apache.qpid.protocol.AMQMethodEvent; - -public class ConnectionRedirectMethodHandler implements StateAwareMethodListener -{ - private static final Logger _logger = Logger.getLogger(ConnectionRedirectMethodHandler.class); - - private static final int DEFAULT_REDIRECT_PORT = 5672; - - private static ConnectionRedirectMethodHandler _handler = new ConnectionRedirectMethodHandler(); - - public static ConnectionRedirectMethodHandler getInstance() - { - return _handler; - } - - private ConnectionRedirectMethodHandler() - { - } - - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException - { - _logger.info("ConnectionRedirect frame received"); - ConnectionRedirectBody method = (ConnectionRedirectBody) evt.getMethod(); - - String host = method.host.toString(); - // the host is in the form hostname:port with the port being optional - int portIndex = host.indexOf(':'); - - int port; - if (portIndex == -1) - { - port = DEFAULT_REDIRECT_PORT; - } - else - { - port = Integer.parseInt(host.substring(portIndex + 1)); - host = host.substring(0, portIndex); - - } - protocolSession.failover(host, port); - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java deleted file mode 100644 index ab6acffeaf..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.handler; - -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.ConnectionSecureBody; -import org.apache.qpid.framing.ConnectionSecureOkBody; -import org.apache.qpid.protocol.AMQMethodEvent; - -public class ConnectionSecureMethodHandler implements StateAwareMethodListener -{ - private static final ConnectionSecureMethodHandler _instance = new ConnectionSecureMethodHandler(); - - public static ConnectionSecureMethodHandler getInstance() - { - return _instance; - } - - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException - { - SaslClient client = protocolSession.getSaslClient(); - if (client == null) - { - throw new AMQException("No SASL client set up - cannot proceed with authentication"); - } - - ConnectionSecureBody body = (ConnectionSecureBody) evt.getMethod(); - - try - { - // Evaluate server challenge - byte[] response = client.evaluateChallenge(body.challenge); - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(), - body.getMajor(), body.getMinor(), - response); // response - protocolSession.writeFrame(responseFrame); - } - catch (SaslException e) - { - throw new AMQException("Error processing SASL challenge: " + e, e); - } - - - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java deleted file mode 100644 index 2aa2c1872b..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ /dev/null @@ -1,261 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.handler; - -import java.io.UnsupportedEncodingException; -import java.util.HashSet; -import java.util.StringTokenizer; - -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.security.AMQCallbackHandler; -import org.apache.qpid.client.security.CallbackHandlerRegistry; -import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.common.ClientProperties; -import org.apache.qpid.common.QpidProperties; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ConnectionStartBody; -import org.apache.qpid.framing.ConnectionStartOkBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; -import org.apache.qpid.framing.ProtocolVersionList; -import org.apache.qpid.protocol.AMQMethodEvent; - -public class ConnectionStartMethodHandler implements StateAwareMethodListener -{ - private static final Logger _log = Logger.getLogger(ConnectionStartMethodHandler.class); - - private static final ConnectionStartMethodHandler _instance = new ConnectionStartMethodHandler(); - - public static ConnectionStartMethodHandler getInstance() - { - return _instance; - } - - private ConnectionStartMethodHandler() - { } - - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) - throws AMQException - { - _log.debug("public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, " - + "AMQMethodEvent evt): called"); - - ConnectionStartBody body = (ConnectionStartBody) evt.getMethod(); - - byte major = (byte) body.versionMajor; - byte minor = (byte) body.versionMinor; - boolean versionOk = false; - - // For the purposes of interop, we can make the client accept the broker's version string. - // If it does, it then internally records the version as being the latest one that it understands. - // It needs to do this since frame lookup is done by version. - if (Boolean.getBoolean("qpid.accept.broker.version")) - { - versionOk = true; - int lastIndex = ProtocolVersionList.pv.length - 1; - major = ProtocolVersionList.pv[lastIndex][ProtocolVersionList.PROTOCOL_MAJOR]; - minor = ProtocolVersionList.pv[lastIndex][ProtocolVersionList.PROTOCOL_MINOR]; - } - else - { - versionOk = checkVersionOK(major, minor); - } - - if (versionOk) - { - protocolSession.setProtocolVersion(major, minor); - - try - { - // Used to hold the SASL mechanism to authenticate with. - String mechanism; - - if (body.mechanisms == null) - { - throw new AMQException("mechanism not specified in ConnectionStart method frame"); - } - else - { - mechanism = chooseMechanism(body.mechanisms); - _log.debug("mechanism = " + mechanism); - } - - if (mechanism == null) - { - throw new AMQException("No supported security mechanism found, passed: " + new String(body.mechanisms)); - } - - byte[] saslResponse; - try - { - SaslClient sc = - Sasl.createSaslClient(new String[] { mechanism }, null, "AMQP", "localhost", null, - createCallbackHandler(mechanism, protocolSession)); - if (sc == null) - { - throw new AMQException( - "Client SASL configuration error: no SaslClient could be created for mechanism " + mechanism - + ". Please ensure all factories are registered. See DynamicSaslRegistrar for " - + " details of how to register non-standard SASL client providers."); - } - - protocolSession.setSaslClient(sc); - saslResponse = (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null); - } - catch (SaslException e) - { - protocolSession.setSaslClient(null); - throw new AMQException("Unable to create SASL client: " + e, e); - } - - if (body.locales == null) - { - throw new AMQException("Locales is not defined in Connection Start method"); - } - - final String locales = new String(body.locales, "utf8"); - final StringTokenizer tokenizer = new StringTokenizer(locales, " "); - String selectedLocale = null; - if (tokenizer.hasMoreTokens()) - { - selectedLocale = tokenizer.nextToken(); - } - else - { - throw new AMQException("No locales sent from server, passed: " + locales); - } - - stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); - FieldTable clientProperties = FieldTableFactory.newFieldTable(); - - clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()), - protocolSession.getClientID()); - clientProperties.setString(new AMQShortString(ClientProperties.product.toString()), - QpidProperties.getProductName()); - clientProperties.setString(new AMQShortString(ClientProperties.version.toString()), - QpidProperties.getReleaseVersion()); - clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo()); - - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - protocolSession.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), - protocolSession.getProtocolMajorVersion(), - protocolSession.getProtocolMinorVersion(), - clientProperties, // clientProperties - new AMQShortString(selectedLocale), // locale - new AMQShortString(mechanism), // mechanism - saslResponse)); // response - - } - catch (UnsupportedEncodingException e) - { - throw new AMQException(_log, "Unable to decode data: " + e, e); - } - } - else - { - _log.error("Broker requested Protocol [" + body.versionMajor + "-" + body.versionMinor - + "] which is not supported by this version of the client library"); - - protocolSession.closeProtocolSession(); - } - } - - private boolean checkVersionOK(byte versionMajor, byte versionMinor) - { - byte[][] supportedVersions = ProtocolVersionList.pv; - boolean supported = false; - int i = supportedVersions.length; - while ((i-- != 0) && !supported) - { - supported = (supportedVersions[i][ProtocolVersionList.PROTOCOL_MAJOR] == versionMajor) - && (supportedVersions[i][ProtocolVersionList.PROTOCOL_MINOR] == versionMinor); - } - - return supported; - } - - private String getFullSystemInfo() - { - StringBuffer fullSystemInfo = new StringBuffer(); - fullSystemInfo.append(System.getProperty("java.runtime.name")); - fullSystemInfo.append(", " + System.getProperty("java.runtime.version")); - fullSystemInfo.append(", " + System.getProperty("java.vendor")); - fullSystemInfo.append(", " + System.getProperty("os.arch")); - fullSystemInfo.append(", " + System.getProperty("os.name")); - fullSystemInfo.append(", " + System.getProperty("os.version")); - fullSystemInfo.append(", " + System.getProperty("sun.os.patch.level")); - - return fullSystemInfo.toString(); - } - - private String chooseMechanism(byte[] availableMechanisms) throws UnsupportedEncodingException - { - final String mechanisms = new String(availableMechanisms, "utf8"); - StringTokenizer tokenizer = new StringTokenizer(mechanisms, " "); - HashSet mechanismSet = new HashSet(); - while (tokenizer.hasMoreTokens()) - { - mechanismSet.add(tokenizer.nextToken()); - } - - String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms(); - StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " "); - while (prefTokenizer.hasMoreTokens()) - { - String mech = prefTokenizer.nextToken(); - if (mechanismSet.contains(mech)) - { - return mech; - } - } - - return null; - } - - private AMQCallbackHandler createCallbackHandler(String mechanism, AMQProtocolSession protocolSession) - throws AMQException - { - Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism); - try - { - Object instance = mechanismClass.newInstance(); - AMQCallbackHandler cbh = (AMQCallbackHandler) instance; - cbh.initialise(protocolSession); - - return cbh; - } - catch (Exception e) - { - throw new AMQException("Unable to create callback handler: " + e, e); - } - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java deleted file mode 100644 index 67f1a6519f..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.handler; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.ConnectionTuneParameters; -import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ConnectionOpenBody; -import org.apache.qpid.framing.ConnectionTuneBody; -import org.apache.qpid.framing.ConnectionTuneOkBody; -import org.apache.qpid.protocol.AMQMethodEvent; - -public class ConnectionTuneMethodHandler implements StateAwareMethodListener -{ - private static final Logger _logger = Logger.getLogger(ConnectionTuneMethodHandler.class); - - private static final ConnectionTuneMethodHandler _instance = new ConnectionTuneMethodHandler(); - - public static ConnectionTuneMethodHandler getInstance() - { - return _instance; - } - - protected ConnectionTuneMethodHandler() - { - } - - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException - { - _logger.debug("ConnectionTune frame received"); - ConnectionTuneBody frame = (ConnectionTuneBody) evt.getMethod(); - - ConnectionTuneParameters params = protocolSession.getConnectionTuneParameters(); - if (params == null) - { - params = new ConnectionTuneParameters(); - } - - params.setFrameMax(frame.frameMax); - params.setChannelMax(frame.channelMax); - params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat)); - protocolSession.setConnectionTuneParameters(params); - - stateManager.changeState(AMQState.CONNECTION_NOT_OPENED); - protocolSession.writeFrame(createTuneOkFrame(evt.getChannelId(), params,frame.getMajor(), frame.getMinor())); - - String host = protocolSession.getAMQConnection().getVirtualHost(); - AMQShortString virtualHost = new AMQShortString("/" + host); - - - protocolSession.writeFrame(createConnectionOpenFrame(evt.getChannelId(), virtualHost, null, true,frame.getMajor(), frame.getMinor())); - } - - protected AMQFrame createConnectionOpenFrame(int channel, AMQShortString path, AMQShortString capabilities, boolean insist, byte major, byte minor) - { - // Be aware of possible changes to parameter order as versions change. - return ConnectionOpenBody.createAMQFrame(channel, - major, minor, // AMQP version (major, minor) - capabilities, // capabilities - insist, // insist - path); // virtualHost - } - - protected AMQFrame createTuneOkFrame(int channel, ConnectionTuneParameters params, byte major, byte minor) - { - // Be aware of possible changes to parameter order as versions change. - return ConnectionTuneOkBody.createAMQFrame(channel, - major, minor, - params.getChannelMax(), // channelMax - params.getFrameMax(), // frameMax - params.getHeartbeat()); // heartbeat - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java deleted file mode 100644 index 146c705c00..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.qpid.client.handler; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.framing.ExchangeBoundOkBody; -import org.apache.qpid.protocol.AMQMethodEvent; - -/** - * @author Apache Software Foundation - */ -public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener -{ - private static final Logger _logger = Logger.getLogger(ExchangeBoundOkMethodHandler.class); - private static final ExchangeBoundOkMethodHandler _instance = new ExchangeBoundOkMethodHandler(); - - public static ExchangeBoundOkMethodHandler getInstance() - { - return _instance; - } - - private ExchangeBoundOkMethodHandler() - { - } - - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException - { - if (_logger.isDebugEnabled()) - { - ExchangeBoundOkBody body = (ExchangeBoundOkBody) evt.getMethod(); - _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.replyCode + " text: " + - body.replyText); - } - } -} - - diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java deleted file mode 100644 index eaf4721445..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.qpid.client.handler; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.framing.QueueDeleteOkBody; -import org.apache.qpid.protocol.AMQMethodEvent; - -/** - * @author Apache Software Foundation - */ -public class QueueDeleteOkMethodHandler implements StateAwareMethodListener -{ - private static final Logger _logger = Logger.getLogger(QueueDeleteOkMethodHandler.class); - private static final QueueDeleteOkMethodHandler _instance = new QueueDeleteOkMethodHandler(); - - public static QueueDeleteOkMethodHandler getInstance() - { - return _instance; - } - - private QueueDeleteOkMethodHandler() - { - } - - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException - { - if (_logger.isDebugEnabled()) - { - QueueDeleteOkBody body = (QueueDeleteOkBody) evt.getMethod(); - _logger.debug("Received Queue.Delete-Ok message, message count: " + body.messageCount); - } - } -} - - diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ChannelCloseMethodHandler.java new file mode 100644 index 0000000000..994e58ed03 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ChannelCloseMethodHandler.java @@ -0,0 +1,24 @@ +package org.apache.qpid.client.handler.amqp_0_9; + +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.amqp_0_9.ChannelCloseOkBodyImpl; + +import org.apache.log4j.Logger; + +public class ChannelCloseMethodHandler extends org.apache.qpid.client.handler.amqp_8_0.ChannelCloseMethodHandler +{ + private static final Logger _logger = Logger.getLogger(ChannelCloseMethodHandler.class); + + private static ChannelCloseMethodHandler _handler = new ChannelCloseMethodHandler(); + + public static ChannelCloseMethodHandler getInstance() + { + return _handler; + } + + + protected ChannelCloseOkBody createChannelCloseOkBody() + { + return new ChannelCloseOkBodyImpl(); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionCloseMethodHandler.java new file mode 100644 index 0000000000..b9b7074fd2 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionCloseMethodHandler.java @@ -0,0 +1,29 @@ +package org.apache.qpid.client.handler.amqp_0_9; + +import org.apache.qpid.framing.ConnectionCloseOkBody; +import org.apache.qpid.framing.amqp_0_9.ConnectionCloseOkBodyImpl; + +import org.apache.log4j.Logger; + +public class ConnectionCloseMethodHandler extends org.apache.qpid.client.handler.amqp_8_0.ConnectionCloseMethodHandler +{ + private static final Logger _logger = Logger.getLogger(ConnectionCloseMethodHandler.class); + + private static ConnectionCloseMethodHandler _handler = new ConnectionCloseMethodHandler(); + + public static ConnectionCloseMethodHandler getInstance() + { + return _handler; + } + + protected ConnectionCloseMethodHandler() + { + } + + + protected ConnectionCloseOkBody createConnectionCloseOkBody() + { + return new ConnectionCloseOkBodyImpl(); + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionSecureMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionSecureMethodHandler.java new file mode 100644 index 0000000000..859fd2b63b --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionSecureMethodHandler.java @@ -0,0 +1,28 @@ +package org.apache.qpid.client.handler.amqp_0_9; + +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ConnectionSecureBody; +import org.apache.qpid.framing.ConnectionSecureOkBody; +import org.apache.qpid.framing.amqp_8_0.ConnectionSecureOkBodyImpl; + +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; + +public class ConnectionSecureMethodHandler extends org.apache.qpid.client.handler.amqp_8_0.ConnectionSecureMethodHandler +{ + private static final ConnectionSecureMethodHandler _instance = new ConnectionSecureMethodHandler(); + + public static ConnectionSecureMethodHandler getInstance() + { + return _instance; + } + + protected ConnectionSecureOkBody createConnectionSecureOkBody(byte[] response) + { + return new ConnectionSecureOkBodyImpl(response); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionTuneMethodHandler.java new file mode 100644 index 0000000000..e28619b378 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionTuneMethodHandler.java @@ -0,0 +1,41 @@ +package org.apache.qpid.client.handler.amqp_0_9; + +import org.apache.qpid.client.ConnectionTuneParameters; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ConnectionOpenBody; +import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.framing.amqp_0_9.ConnectionOpenBodyImpl; +import org.apache.qpid.framing.amqp_0_9.ConnectionTuneOkBodyImpl; + +import org.apache.log4j.Logger; + +public class ConnectionTuneMethodHandler extends org.apache.qpid.client.handler.amqp_8_0.ConnectionTuneMethodHandler +{ + private static final Logger _logger = Logger.getLogger(ConnectionTuneMethodHandler.class); + + private static final ConnectionTuneMethodHandler _instance = new ConnectionTuneMethodHandler(); + + public static ConnectionTuneMethodHandler getInstance() + { + return _instance; + } + + + protected ConnectionOpenBody createConnectionOpenBody(AMQShortString path, AMQShortString capabilities, boolean insist) + { + + return new ConnectionOpenBodyImpl(path,// virtualHost + capabilities, // capabilities + insist); // insist + + } + + protected ConnectionTuneOkBody createTuneOkBody(ConnectionTuneParameters params) + { + // Be aware of possible changes to parameter order as versions change. + return new ConnectionTuneOkBodyImpl( + params.getChannelMax(), // channelMax + params.getFrameMax(), // frameMax + params.getHeartbeat()); // heartbeat + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicCancelOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicCancelOkMethodHandler.java new file mode 100644 index 0000000000..2acf3005f1 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicCancelOkMethodHandler.java @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.handler.amqp_8_0; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.BasicCancelOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; + +/** + * @author Apache Software Foundation + */ +public class BasicCancelOkMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class); + private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler(); + + public static BasicCancelOkMethodHandler getInstance() + { + return _instance; + } + + private BasicCancelOkMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + + _logger.debug("New BasicCancelOk method received"); + final AMQProtocolSession protocolSession = stateManager.getProtocolSession(); + BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod(); + protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.getConsumerTag()); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicDeliverMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicDeliverMethodHandler.java new file mode 100644 index 0000000000..47586db2f2 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicDeliverMethodHandler.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.handler.amqp_8_0; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.BasicDeliverBody; +import org.apache.qpid.protocol.AMQMethodEvent; + +public class BasicDeliverMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(BasicDeliverMethodHandler.class); + + private static final BasicDeliverMethodHandler _instance = new BasicDeliverMethodHandler(); + + public static BasicDeliverMethodHandler getInstance() + { + return _instance; + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + final AMQProtocolSession protocolSession = stateManager.getProtocolSession(); + final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), (BasicDeliverBody) evt.getMethod()); + _logger.debug("New JmsDeliver method received"); + protocolSession.unprocessedMessageReceived(msg); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicReturnMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicReturnMethodHandler.java new file mode 100644 index 0000000000..ad5a1331b0 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicReturnMethodHandler.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.handler.amqp_8_0; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.BasicReturnBody; +import org.apache.qpid.protocol.AMQMethodEvent; + +public class BasicReturnMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(BasicReturnMethodHandler.class); + + private static final BasicReturnMethodHandler _instance = new BasicReturnMethodHandler(); + + public static BasicReturnMethodHandler getInstance() + { + return _instance; + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + _logger.debug("New JmsBounce method received"); + final AMQProtocolSession protocolSession = stateManager.getProtocolSession(); + final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(),(BasicReturnBody)evt.getMethod()); + + protocolSession.unprocessedMessageReceived(msg); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseMethodHandler.java new file mode 100644 index 0000000000..edfd8e5110 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseMethodHandler.java @@ -0,0 +1,106 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.handler.amqp_8_0; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQChannelClosedException; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInvalidRoutingKeyException; +import org.apache.qpid.client.AMQNoConsumersException; +import org.apache.qpid.client.AMQNoRouteException; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.amqp_8_0.ChannelCloseOkBodyImpl; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.AMQMethodEvent; + +public class ChannelCloseMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(ChannelCloseMethodHandler.class); + + private static ChannelCloseMethodHandler _handler = new ChannelCloseMethodHandler(); + + public static ChannelCloseMethodHandler getInstance() + { + return _handler; + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + _logger.debug("ChannelClose method received"); + final AMQProtocolSession protocolSession = stateManager.getProtocolSession(); + ChannelCloseBody method = (ChannelCloseBody) evt.getMethod(); + + AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode()); + AMQShortString reason = method.getReplyText(); + if (_logger.isDebugEnabled()) + { + _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason); + } + + protocolSession.getOutputHandler().sendCommand(evt.getChannelId(), createChannelCloseOkBody()); + + + + if (errorCode != AMQConstant.REPLY_SUCCESS) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Channel close received with errorCode " + errorCode + ", and reason " + reason); + } + if (errorCode == AMQConstant.NO_CONSUMERS) + { + throw new AMQNoConsumersException("Error: " + reason, null); + } + else if (errorCode == AMQConstant.NO_ROUTE) + { + throw new AMQNoRouteException("Error: " + reason, null); + } + else if (errorCode == AMQConstant.INVALID_ARGUMENT) + { + _logger.debug("Broker responded with Invalid Argument."); + + throw new org.apache.qpid.AMQInvalidArgumentException(String.valueOf(reason)); + } + else if (errorCode == AMQConstant.INVALID_ROUTING_KEY) + { + _logger.debug("Broker responded with Invalid Routing Key."); + + throw new AMQInvalidRoutingKeyException(String.valueOf(reason)); + } + else + { + throw new AMQChannelClosedException(errorCode, "Error: " + reason); + } + + } + protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason)); + } + + protected ChannelCloseOkBody createChannelCloseOkBody() + { + return new ChannelCloseOkBodyImpl(); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseOkMethodHandler.java new file mode 100644 index 0000000000..d51f1d9c41 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseOkMethodHandler.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.handler.amqp_8_0; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.protocol.AMQMethodEvent; + +public class ChannelCloseOkMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(ChannelCloseOkMethodHandler.class); + + private static final ChannelCloseOkMethodHandler _instance = new ChannelCloseOkMethodHandler(); + + public static ChannelCloseOkMethodHandler getInstance() + { + return _instance; + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + _logger.info("Received channel-close-ok for channel-id " + evt.getChannelId()); + + //todo this should do the closure + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelFlowOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelFlowOkMethodHandler.java new file mode 100644 index 0000000000..baa9cb4319 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelFlowOkMethodHandler.java @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.handler.amqp_8_0; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.ChannelFlowOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; + +public class ChannelFlowOkMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(ChannelFlowOkMethodHandler.class); + private static final ChannelFlowOkMethodHandler _instance = new ChannelFlowOkMethodHandler(); + + public static ChannelFlowOkMethodHandler getInstance() + { + return _instance; + } + + private ChannelFlowOkMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + ChannelFlowOkBody method = (ChannelFlowOkBody) evt.getMethod(); + _logger.debug("Received Channel.Flow-Ok message, active = " + method.getActive()); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionCloseMethodHandler.java new file mode 100644 index 0000000000..57eaaff531 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionCloseMethodHandler.java @@ -0,0 +1,112 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.handler.amqp_8_0; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQConnectionClosedException; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQAuthenticationException; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionCloseOkBody; +import org.apache.qpid.framing.amqp_8_0.ConnectionCloseOkBodyImpl; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.AMQMethodEvent; + +public class ConnectionCloseMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(ConnectionCloseMethodHandler.class); + + private static ConnectionCloseMethodHandler _handler = new ConnectionCloseMethodHandler(); + + public static ConnectionCloseMethodHandler getInstance() + { + return _handler; + } + + protected ConnectionCloseMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + _logger.info("ConnectionClose frame received"); + final AMQProtocolSession protocolSession = stateManager.getProtocolSession(); + ConnectionCloseBody method = (ConnectionCloseBody) evt.getMethod(); + + // does it matter + //stateManager.changeState(AMQState.CONNECTION_CLOSING); + + AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode()); + AMQShortString reason = method.getReplyText(); + + try + { + + + + ConnectionCloseOkBody closeOkBody = createConnectionCloseOkBody(); + protocolSession.getOutputHandler().sendCommand(0, closeOkBody); + + + + if (errorCode != AMQConstant.REPLY_SUCCESS) + { + if (errorCode == AMQConstant.NOT_ALLOWED) + { + _logger.info("Authentication Error:" + Thread.currentThread().getName()); + + protocolSession.closeProtocolSession(); + + //todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state. + stateManager.changeState(AMQState.CONNECTION_NOT_STARTED); + + throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString()); + } + else + { + _logger.info("Connection close received with error code " + errorCode); + + + throw new AMQConnectionClosedException(errorCode, "Error: " + reason); + } + } + } + finally + { + // this actually closes the connection in the case where it is not an error. + + protocolSession.closeProtocolSession(); + + stateManager.changeState(AMQState.CONNECTION_CLOSED); + } + } + + protected ConnectionCloseOkBody createConnectionCloseOkBody() + { + return new ConnectionCloseOkBodyImpl(); + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionOpenOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionOpenOkMethodHandler.java new file mode 100644 index 0000000000..f4327ac748 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionOpenOkMethodHandler.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.handler.amqp_8_0; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.protocol.AMQMethodEvent; + +public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener +{ + private static final ConnectionOpenOkMethodHandler _instance = new ConnectionOpenOkMethodHandler(); + + public static ConnectionOpenOkMethodHandler getInstance() + { + return _instance; + } + + private ConnectionOpenOkMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + stateManager.changeState(AMQState.CONNECTION_OPEN); + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionRedirectMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionRedirectMethodHandler.java new file mode 100644 index 0000000000..8291b62596 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionRedirectMethodHandler.java @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.handler.amqp_8_0; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.ConnectionRedirectBody; +import org.apache.qpid.protocol.AMQMethodEvent; + +public class ConnectionRedirectMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(ConnectionRedirectMethodHandler.class); + + private static final int DEFAULT_REDIRECT_PORT = 5672; + + private static ConnectionRedirectMethodHandler _handler = new ConnectionRedirectMethodHandler(); + + public static ConnectionRedirectMethodHandler getInstance() + { + return _handler; + } + + private ConnectionRedirectMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + _logger.info("ConnectionRedirect frame received"); + final AMQProtocolSession protocolSession = stateManager.getProtocolSession(); + ConnectionRedirectBody method = (ConnectionRedirectBody) evt.getMethod(); + + String host = method.getHost().toString(); + // the host is in the form hostname:port with the port being optional + int portIndex = host.indexOf(':'); + + int port; + if (portIndex == -1) + { + port = DEFAULT_REDIRECT_PORT; + } + else + { + port = Integer.parseInt(host.substring(portIndex + 1)); + host = host.substring(0, portIndex); + + } + protocolSession.failover(host, port); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionSecureMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionSecureMethodHandler.java new file mode 100644 index 0000000000..f5d1840f45 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionSecureMethodHandler.java @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.handler.amqp_8_0; + +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.ConnectionSecureBody; +import org.apache.qpid.framing.ConnectionSecureOkBody; +import org.apache.qpid.framing.amqp_8_0.ConnectionSecureOkBodyImpl; +import org.apache.qpid.protocol.AMQMethodEvent; + +public class ConnectionSecureMethodHandler implements StateAwareMethodListener +{ + private static final ConnectionSecureMethodHandler _instance = new ConnectionSecureMethodHandler(); + + public static ConnectionSecureMethodHandler getInstance() + { + return _instance; + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + final AMQProtocolSession protocolSession = stateManager.getProtocolSession(); + SaslClient client = protocolSession.getSaslClient(); + if (client == null) + { + throw new AMQException("No SASL client set up - cannot proceed with authentication"); + } + + ConnectionSecureBody body = (ConnectionSecureBody) evt.getMethod(); + + try + { + // Evaluate server challenge + byte[] response = client.evaluateChallenge(body.getChallenge()); + + ConnectionSecureOkBody secureOkBody = createConnectionSecureOkBody(response); + protocolSession.getOutputHandler().sendCommand(evt.getChannelId(),secureOkBody); + } + catch (SaslException e) + { + throw new AMQException("Error processing SASL challenge: " + e, e); + } + + + } + + protected ConnectionSecureOkBody createConnectionSecureOkBody(byte[] response) + { + return new ConnectionSecureOkBodyImpl(response); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionStartMethodHandler.java new file mode 100644 index 0000000000..10473b9751 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionStartMethodHandler.java @@ -0,0 +1,243 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.handler.amqp_8_0; + +import java.io.UnsupportedEncodingException; +import java.util.HashSet; +import java.util.StringTokenizer; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; + +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.security.AMQCallbackHandler; +import org.apache.qpid.client.security.CallbackHandlerRegistry; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.common.ClientProperties; +import org.apache.qpid.common.QpidProperties; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ConnectionStartBody; +import org.apache.qpid.framing.ConnectionStartOkBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.framing.amqp_8_0.ConnectionStartOkBodyImpl; +import org.apache.qpid.protocol.AMQMethodEvent; + +public class ConnectionStartMethodHandler implements StateAwareMethodListener +{ + private static final Logger _log = Logger.getLogger(ConnectionStartMethodHandler.class); + + private static final ConnectionStartMethodHandler _instance = new ConnectionStartMethodHandler(); + + public static ConnectionStartMethodHandler getInstance() + { + return _instance; + } + + private ConnectionStartMethodHandler() + { } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + throws AMQException + { + _log.debug("public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, " + + "AMQMethodEvent evt): called"); + + final AMQProtocolSession protocolSession = stateManager.getProtocolSession(); + ConnectionStartBody body = (ConnectionStartBody) evt.getMethod(); + + ProtocolVersion pv = new ProtocolVersion((byte) body.getVersionMajor(),(byte) body.getVersionMinor()); + + + // For the purposes of interop, we can make the client accept the broker's version string. + // If it does, it then internally records the version as being the latest one that it understands. + // It needs to do this since frame lookup is done by version. + if (Boolean.getBoolean("qpid.accept.broker.version") && !pv.isSupported()) + { + + pv = ProtocolVersion.getLatestSupportedVersion(); + } + + if (pv.isSupported()) + { + protocolSession.setProtocolVersion(pv); + + try + { + // Used to hold the SASL mechanism to authenticate with. + String mechanism; + + if (body.getMechanisms() == null) + { + throw new AMQException("mechanism not specified in ConnectionStart method frame"); + } + else + { + mechanism = chooseMechanism(body.getMechanisms()); + _log.debug("mechanism = " + mechanism); + } + + if (mechanism == null) + { + throw new AMQException("No supported security mechanism found, passed: " + new String(body.getMechanisms())); + } + + byte[] saslResponse; + try + { + SaslClient sc = + Sasl.createSaslClient(new String[] { mechanism }, null, "AMQP", "localhost", null, + createCallbackHandler(mechanism, protocolSession)); + if (sc == null) + { + throw new AMQException( + "Client SASL configuration error: no SaslClient could be created for mechanism " + mechanism + + ". Please ensure all factories are registered. See DynamicSaslRegistrar for " + + " details of how to register non-standard SASL client providers."); + } + + protocolSession.setSaslClient(sc); + saslResponse = (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null); + } + catch (SaslException e) + { + protocolSession.setSaslClient(null); + throw new AMQException("Unable to create SASL client: " + e, e); + } + + if (body.getLocales() == null) + { + throw new AMQException("Locales is not defined in Connection Start method"); + } + + final String locales = new String(body.getLocales(), "utf8"); + final StringTokenizer tokenizer = new StringTokenizer(locales, " "); + String selectedLocale = null; + if (tokenizer.hasMoreTokens()) + { + selectedLocale = tokenizer.nextToken(); + } + else + { + throw new AMQException("No locales sent from server, passed: " + locales); + } + + stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); + FieldTable clientProperties = FieldTableFactory.newFieldTable(); + + clientProperties.setString(ClientProperties.instance.getName(), + protocolSession.getClientID()); + clientProperties.setString(ClientProperties.product.getName(), + QpidProperties.getProductName()); + clientProperties.setString(ClientProperties.version.getName(), + QpidProperties.getReleaseVersion()); + clientProperties.setString(ClientProperties.platform.getName(), getFullSystemInfo()); + + ConnectionStartOkBody startOkBody = createConnectionStartOkBody(clientProperties, // clientProperties + new AMQShortString(selectedLocale), // locale + new AMQShortString(mechanism), // mechanism + saslResponse); // response + protocolSession.getOutputHandler().sendCommand(0, startOkBody); + + } + catch (UnsupportedEncodingException e) + { + throw new AMQException(_log, "Unable to decode data: " + e, e); + } + } + else + { + _log.error("Broker requested Protocol [" + body.getVersionMajor() + "-" + body.getVersionMinor() + + "] which is not supported by this version of the client library"); + + protocolSession.closeProtocolSession(); + } + } + + private ConnectionStartOkBody createConnectionStartOkBody(FieldTable clientProperties, AMQShortString locale, AMQShortString mechanism, byte[] saslResponse) + { + return new ConnectionStartOkBodyImpl(clientProperties,mechanism,saslResponse,locale); + } + + + private String getFullSystemInfo() + { + StringBuffer fullSystemInfo = new StringBuffer(); + fullSystemInfo.append(System.getProperty("java.runtime.name")); + fullSystemInfo.append(", " + System.getProperty("java.runtime.version")); + fullSystemInfo.append(", " + System.getProperty("java.vendor")); + fullSystemInfo.append(", " + System.getProperty("os.arch")); + fullSystemInfo.append(", " + System.getProperty("os.name")); + fullSystemInfo.append(", " + System.getProperty("os.version")); + fullSystemInfo.append(", " + System.getProperty("sun.os.patch.level")); + + return fullSystemInfo.toString(); + } + + private String chooseMechanism(byte[] availableMechanisms) throws UnsupportedEncodingException + { + final String mechanisms = new String(availableMechanisms, "utf8"); + StringTokenizer tokenizer = new StringTokenizer(mechanisms, " "); + HashSet mechanismSet = new HashSet(); + while (tokenizer.hasMoreTokens()) + { + mechanismSet.add(tokenizer.nextToken()); + } + + String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms(); + StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " "); + while (prefTokenizer.hasMoreTokens()) + { + String mech = prefTokenizer.nextToken(); + if (mechanismSet.contains(mech)) + { + return mech; + } + } + + return null; + } + + private AMQCallbackHandler createCallbackHandler(String mechanism, AMQProtocolSession protocolSession) + throws AMQException + { + Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism); + try + { + Object instance = mechanismClass.newInstance(); + AMQCallbackHandler cbh = (AMQCallbackHandler) instance; + cbh.initialise(protocolSession); + + return cbh; + } + catch (Exception e) + { + throw new AMQException("Unable to create callback handler: " + e, e); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionTuneMethodHandler.java new file mode 100644 index 0000000000..3eb3401e37 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionTuneMethodHandler.java @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.handler.amqp_8_0; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.ConnectionTuneParameters; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ConnectionOpenBody; +import org.apache.qpid.framing.ConnectionTuneBody; +import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.framing.amqp_8_0.ConnectionOpenBodyImpl; +import org.apache.qpid.framing.amqp_8_0.ConnectionTuneOkBodyImpl; +import org.apache.qpid.protocol.AMQMethodEvent; + +public class ConnectionTuneMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(ConnectionTuneMethodHandler.class); + + private static final ConnectionTuneMethodHandler _instance = new ConnectionTuneMethodHandler(); + + public static ConnectionTuneMethodHandler getInstance() + { + return _instance; + } + + protected ConnectionTuneMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + _logger.debug("ConnectionTune frame received"); + final AMQProtocolSession protocolSession = stateManager.getProtocolSession(); + ConnectionTuneBody frame = (ConnectionTuneBody) evt.getMethod(); + + ConnectionTuneParameters params = protocolSession.getConnectionTuneParameters(); + if (params == null) + { + params = new ConnectionTuneParameters(); + } + + params.setFrameMax(frame.getFrameMax()); + params.setChannelMax(frame.getChannelMax()); + params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.getHeartbeat())); + protocolSession.setConnectionTuneParameters(params); + + stateManager.changeState(AMQState.CONNECTION_NOT_OPENED); + protocolSession.getOutputHandler().sendCommand(evt.getChannelId(), + createTuneOkBody(params)); + + String host = protocolSession.getAMQConnection().getVirtualHost(); + AMQShortString virtualHost = new AMQShortString("/" + host); + + + protocolSession.getOutputHandler().sendCommand(evt.getChannelId(), + createConnectionOpenBody( virtualHost, null, true)); + } + + protected ConnectionOpenBody createConnectionOpenBody(AMQShortString path, AMQShortString capabilities, boolean insist) + { + + return new ConnectionOpenBodyImpl(path,// virtualHost + capabilities, // capabilities + insist); // insist + + } + + protected ConnectionTuneOkBody createTuneOkBody(ConnectionTuneParameters params) + { + // Be aware of possible changes to parameter order as versions change. + return new ConnectionTuneOkBodyImpl( + params.getChannelMax(), // channelMax + params.getFrameMax(), // frameMax + params.getHeartbeat()); // heartbeat + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ExchangeBoundOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ExchangeBoundOkMethodHandler.java new file mode 100644 index 0000000000..0752b38aaf --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ExchangeBoundOkMethodHandler.java @@ -0,0 +1,55 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client.handler.amqp_8_0; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.ExchangeBoundOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; + +/** + * @author Apache Software Foundation + */ +public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(ExchangeBoundOkMethodHandler.class); + private static final ExchangeBoundOkMethodHandler _instance = new ExchangeBoundOkMethodHandler(); + + public static ExchangeBoundOkMethodHandler getInstance() + { + return _instance; + } + + private ExchangeBoundOkMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + if (_logger.isDebugEnabled()) + { + ExchangeBoundOkBody body = (ExchangeBoundOkBody) evt.getMethod(); + _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.getReplyCode() + " text: " + + body.getReplyText()); + } + } +} + + diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/QueueDeleteOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/QueueDeleteOkMethodHandler.java new file mode 100644 index 0000000000..50bf03fe76 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/QueueDeleteOkMethodHandler.java @@ -0,0 +1,54 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.client.handler.amqp_8_0; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; + +/** + * @author Apache Software Foundation + */ +public class QueueDeleteOkMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = Logger.getLogger(QueueDeleteOkMethodHandler.class); + private static final QueueDeleteOkMethodHandler _instance = new QueueDeleteOkMethodHandler(); + + public static QueueDeleteOkMethodHandler getInstance() + { + return _instance; + } + + private QueueDeleteOkMethodHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException + { + if (_logger.isDebugEnabled()) + { + QueueDeleteOkBody body = (QueueDeleteOkBody) evt.getMethod(); + _logger.debug("Received Queue.Delete-Ok message, message count: " + body.getMessageCount()); + } + } +} + + diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 36dd4d400c..66524edce3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -121,12 +121,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public String getJMSMessageID() throws JMSException { - if (getContentHeaderProperties().getMessageId() == null) + if (getContentHeaderProperties().getMessageIdAsString() == null) { getContentHeaderProperties().setMessageId("ID:" + _deliveryTag); } - return getContentHeaderProperties().getMessageId(); + return getContentHeaderProperties().getMessageIdAsString(); } public void setJMSMessageID(String messageId) throws JMSException @@ -146,7 +146,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public byte[] getJMSCorrelationIDAsBytes() throws JMSException { - return getContentHeaderProperties().getCorrelationId().getBytes(); + return getContentHeaderProperties().getCorrelationIdAsString().getBytes(); } public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException @@ -161,12 +161,12 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public String getJMSCorrelationID() throws JMSException { - return getContentHeaderProperties().getCorrelationId(); + return getContentHeaderProperties().getCorrelationIdAsString(); } public Destination getJMSReplyTo() throws JMSException { - String replyToEncoding = getContentHeaderProperties().getReplyTo(); + String replyToEncoding = getContentHeaderProperties().getReplyToAsString(); if (replyToEncoding == null) { return null; @@ -250,7 +250,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public String getJMSType() throws JMSException { - return getContentHeaderProperties().getType(); + return getContentHeaderProperties().getTypeAsString(); } public void setJMSType(String string) throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java index c05667902f..763af312f4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java @@ -116,7 +116,7 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text _data.limit(text.length()) ; //_data.sweep(); _data.setAutoExpand(true); - final String encoding = getContentHeaderProperties().getEncoding(); + final String encoding = getContentHeaderProperties().getEncodingAsString(); if (encoding == null) { _data.put(text.getBytes()); @@ -155,11 +155,11 @@ public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.Text { return null; } - if (getContentHeaderProperties().getEncoding() != null) + if (getContentHeaderProperties().getEncodingAsString() != null) { try { - _decodedValue = _data.getString(Charset.forName(getContentHeaderProperties().getEncoding()).newDecoder()); + _decodedValue = _data.getString(Charset.forName(getContentHeaderProperties().getEncodingAsString()).newDecoder()); } catch (CharacterCodingException e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index e02771d8f5..c2015f9e7c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -92,14 +92,14 @@ public class MessageFactoryRegistry // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over // AMQP. When the type is null, it can only be assumed that the message is a byte message. - AMQShortString contentTypeShortString = properties.getContentTypeShortString(); + AMQShortString contentTypeShortString = properties.getContentType(); contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(JMSBytesMessage.MIME_TYPE) : contentTypeShortString; MessageFactory mf = _mimeShortStringToFactoryMap.get(contentTypeShortString); if (mf == null) { - throw new AMQException("Unsupport MIME type of " + properties.getContentType()); + throw new AMQException("Unsupport MIME type of " + properties.getContentTypeAsString()); } else { diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index d0cc52271a..93c0cb5c12 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -1,630 +1,36 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ package org.apache.qpid.client.protocol; -import java.util.Iterator; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; - -import org.apache.log4j.Logger; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoSession; -import org.apache.mina.filter.SSLFilter; -import org.apache.mina.filter.codec.ProtocolCodecFilter; -import org.apache.qpid.AMQConnectionClosedException; -import org.apache.qpid.AMQDisconnectedException; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQTimeoutException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.SSLConfiguration; -import org.apache.qpid.client.failover.FailoverHandler; -import org.apache.qpid.client.failover.FailoverState; -import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; -import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ConnectionCloseOkBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.pool.ReadWriteThreadModel; -import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.protocol.AMQMethodListener; -import org.apache.qpid.ssl.SSLContextFactory; - +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.state.AMQStateManager; -public class AMQProtocolHandler extends IoHandlerAdapter +/** + * Created by IntelliJ IDEA. + * User: U146758 + * Date: 07-Mar-2007 + * Time: 19:40:08 + * To change this template use File | Settings | File Templates. + */ +public interface AMQProtocolHandler { - private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class); - - /** - * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances - * and protocol handler instances. - */ - private AMQConnection _connection; - - /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */ - private volatile AMQProtocolSession _protocolSession; - - private AMQStateManager _stateManager = new AMQStateManager(); - - private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet(); - - /** - * We create the failover handler when the session is created since it needs a reference to the IoSession in order - * to be able to send errors during failover back to the client application. The session won't be available in the - * case where we failing over due to a Connection.Redirect message from the broker. - */ - private FailoverHandler _failoverHandler; - - /** - * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly - * attempting failover where it is failing. - */ - private FailoverState _failoverState = FailoverState.NOT_STARTED; - - private CountDownLatch _failoverLatch; - - private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; - - public AMQProtocolHandler(AMQConnection con) - { - _connection = con; - } - - public void sessionCreated(IoSession session) throws Exception - { - _logger.debug("Protocol session created for session " + System.identityHashCode(session)); - _failoverHandler = new FailoverHandler(this, session); - - final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false)); - - if (Boolean.getBoolean("amqj.shared_read_write_pool")) - { - session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf); - } - else - { - session.getFilterChain().addLast("protocolFilter", pcf); - } - // we only add the SSL filter where we have an SSL connection - if (_connection.getSSLConfiguration() != null) - { - SSLConfiguration sslConfig = _connection.getSSLConfiguration(); - SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); - SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext()); - sslFilter.setUseClientMode(true); - session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter); - } - - - try - { - - ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance(); - threadModel.getAsynchronousReadFilter().createNewJobForSession(session); - threadModel.getAsynchronousWriteFilter().createNewJobForSession(session); - } - catch (RuntimeException e) - { - e.printStackTrace(); - } - - _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager()); - _protocolSession.init(); - } - - public void sessionOpened(IoSession session) throws Exception - { - //System.setProperty("foo", "bar"); - } - - /** - * When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by - * sessionClosed() depending on whether we were trying to send data at the time of failure. - * - * @param session - * - * @throws Exception - */ - public void sessionClosed(IoSession session) throws Exception - { - if (_connection.isClosed()) - { - _logger.info("Session closed called by client"); - } - else - { - _logger.info("Session closed called with failover state currently " + _failoverState); - - //reconnetablility was introduced here so as not to disturb the client as they have made their intentions - // known through the policy settings. - - if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed()) - { - _logger.info("FAILOVER STARTING"); - if (_failoverState == FailoverState.NOT_STARTED) - { - _failoverState = FailoverState.IN_PROGRESS; - startFailoverThread(); - } - else - { - _logger.info("Not starting failover as state currently " + _failoverState); - } - } - else - { - _logger.info("Failover not allowed by policy."); - - if (_logger.isDebugEnabled()) - { - _logger.debug(_connection.getFailoverPolicy().toString()); - } - - if (_failoverState != FailoverState.IN_PROGRESS) - { - _logger.info("sessionClose() not allowed to failover"); - _connection.exceptionReceived( - new AMQDisconnectedException("Server closed connection and reconnection " + - "not permitted.")); - } - else - { - _logger.info("sessionClose() failover in progress"); - } - } - } - - _logger.info("Protocol Session [" + this + "] closed"); - } - - /** See {@link FailoverHandler} to see rationale for separate thread. */ - private void startFailoverThread() - { - Thread failoverThread = new Thread(_failoverHandler); - failoverThread.setName("Failover"); - // Do not inherit daemon-ness from current thread as this can be a daemon - // thread such as a AnonymousIoService thread. - failoverThread.setDaemon(false); - failoverThread.start(); - } - - public void sessionIdle(IoSession session, IdleStatus status) throws Exception - { - _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status); - if (IdleStatus.WRITER_IDLE.equals(status)) - { - //write heartbeat frame: - _logger.debug("Sent heartbeat"); - session.write(HeartbeatBody.FRAME); - HeartbeatDiagnostics.sent(); - } - else if (IdleStatus.READER_IDLE.equals(status)) - { - //failover: - HeartbeatDiagnostics.timeout(); - _logger.warn("Timed out while waiting for heartbeat from peer."); - session.close(); - } - } - - public void exceptionCaught(IoSession session, Throwable cause) throws Exception - { - if (_failoverState == FailoverState.NOT_STARTED) - { - //if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) - if (cause instanceof AMQConnectionClosedException) - { - _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); - // this will attemp failover - - sessionClosed(session); - } - } - // we reach this point if failover was attempted and failed therefore we need to let the calling app - // know since we cannot recover the situation - else if (_failoverState == FailoverState.FAILED) - { - _logger.error("Exception caught by protocol handler: " + cause, cause); - // we notify the state manager of the error in case we have any clients waiting on a state - // change. Those "waiters" will be interrupted and can handle the exception - AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); - propagateExceptionToWaiters(amqe); - _connection.exceptionReceived(cause); - } - } - - /** - * There are two cases where we have other threads potentially blocking for events to be handled by this class. - * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type - * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately. - * - * @param e the exception to propagate - */ - public void propagateExceptionToWaiters(Exception e) - { - getStateManager().error(e); - if (!_frameListeners.isEmpty()) - { - final Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener ml = (AMQMethodListener) it.next(); - ml.error(e); - } - } - } - - private static int _messageReceivedCount; - - public void messageReceived(IoSession session, Object message) throws Exception - { - final boolean debug = _logger.isDebugEnabled(); - final long msgNumber = ++_messageReceivedCount; - - if (debug && (msgNumber % 1000 == 0)) - { - _logger.debug("Received " + _messageReceivedCount + " protocol messages"); - } - - AMQFrame frame = (AMQFrame) message; - - final AMQBody bodyFrame = frame.getBodyFrame(); - - HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - - switch (bodyFrame.getFrameType()) - { - case AMQMethodBody.TYPE: - - if (debug) - { - _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); - } - - final AMQMethodEvent evt = new AMQMethodEvent(frame.getChannel(), (AMQMethodBody) bodyFrame); - - try - { - - boolean wasAnyoneInterested = getStateManager().methodReceived(evt); - if (!_frameListeners.isEmpty()) - { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; - } - } - if (!wasAnyoneInterested) - { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners); - } - } - catch (AMQException e) - { - getStateManager().error(e); - if (!_frameListeners.isEmpty()) - { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - listener.error(e); - } - } - exceptionCaught(session, e); - } - break; - - case ContentHeaderBody.TYPE: - - _protocolSession.messageContentHeaderReceived(frame.getChannel(), - (ContentHeaderBody) bodyFrame); - break; - - case ContentBody.TYPE: - - _protocolSession.messageContentBodyReceived(frame.getChannel(), - (ContentBody) bodyFrame); - break; - - case HeartbeatBody.TYPE: - - if (debug) - { - _logger.debug("Received heartbeat"); - } - break; - - default: - - } - _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); - } - - private static int _messagesOut; - - public void messageSent(IoSession session, Object message) throws Exception - { - final long sentMessages = _messagesOut++; - - final boolean debug = _logger.isDebugEnabled(); - - if (debug && (sentMessages % 1000 == 0)) - { - _logger.debug("Sent " + _messagesOut + " protocol messages"); - } - _connection.bytesSent(session.getWrittenBytes()); - if (debug) - { - _logger.debug("Sent frame " + message); - } - } - - /* - public void addFrameListener(AMQMethodListener listener) - { - _frameListeners.add(listener); - } - - public void removeFrameListener(AMQMethodListener listener) - { - _frameListeners.remove(listener); - } - */ - public void attainState(AMQState s) throws AMQException - { - getStateManager().attainState(s); - } - - /** - * Convenience method that writes a frame to the protocol session. Equivalent to calling - * getProtocolSession().write(). - * - * @param frame the frame to write - */ - public void writeFrame(AMQDataBlock frame) - { - _protocolSession.writeFrame(frame); - } - - public void writeFrame(AMQDataBlock frame, boolean wait) - { - _protocolSession.writeFrame(frame, wait); - } - - /** - * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to - * calling getProtocolSession().write() then waiting for the response. - * - * @param frame - * @param listener the blocking listener. Note the calling thread will block. - */ - public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, - BlockingMethodFrameListener listener) - throws AMQException - { - return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT); - } - - /** - * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to - * calling getProtocolSession().write() then waiting for the response. - * - * @param frame - * @param listener the blocking listener. Note the calling thread will block. - */ - public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, - BlockingMethodFrameListener listener, long timeout) - throws AMQException - { - try - { - _frameListeners.add(listener); - _protocolSession.writeFrame(frame); - - AMQMethodEvent e = listener.blockForFrame(timeout); - return e; - // When control resumes before this line, a reply will have been received - // that matches the criteria defined in the blocking listener - } - catch (AMQException e) - { - throw e; - } - finally - { - // If we don't removeKey the listener then no-one will - _frameListeners.remove(listener); - } - - } - - /** More convenient method to write a frame and wait for it's response. */ - public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException - { - return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT); - } - - /** More convenient method to write a frame and wait for it's response. */ - public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException - { - return writeCommandFrameAndWaitForReply(frame, - new SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout); - } - - /** - * Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol - * handler will ensure that messages are delivered to the consumer(s) on that session. - * - * @param channelId the channel id of the session - * @param session the session instance. - */ - public void addSessionByChannel(int channelId, AMQSession session) - { - _protocolSession.addSessionByChannel(channelId, session); - } - - /** - * Convenience method to deregister an AMQSession with the protocol handler. - * - * @param channelId then channel id of the session - */ - public void removeSessionByChannel(int channelId) - { - _protocolSession.removeSessionByChannel(channelId); - } - - public void closeSession(AMQSession session) throws AMQException - { - _protocolSession.closeSession(session); - } - - public void closeConnection() throws AMQException - { - closeConnection(-1); - } - - public void closeConnection(long timeout) throws AMQException - { - getStateManager().changeState(AMQState.CONNECTION_CLOSING); - - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0, - _protocolSession.getProtocolMajorVersion(), - _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor) - 0, // classId - 0, // methodId - AMQConstant.REPLY_SUCCESS.getCode(), // replyCode - new AMQShortString("JMS client is closing the connection.")); // replyText - - try - { - syncWrite(frame, ConnectionCloseOkBody.class, timeout); - _protocolSession.closeProtocolSession(); - } - catch (AMQTimeoutException e) - { - _protocolSession.closeProtocolSession(false); - } - - - } - - /** @return the number of bytes read from this protocol session */ - public long getReadBytes() - { - return _protocolSession.getIoSession().getReadBytes(); - } - - /** @return the number of bytes written to this protocol session */ - public long getWrittenBytes() - { - return _protocolSession.getIoSession().getWrittenBytes(); - } - - public void failover(String host, int port) - { - _failoverHandler.setHost(host); - _failoverHandler.setPort(port); - // see javadoc for FailoverHandler to see rationale for separate thread - startFailoverThread(); - } - - public void blockUntilNotFailingOver() throws InterruptedException - { - if (_failoverLatch != null) - { - _failoverLatch.await(); - } - } - - public AMQShortString generateQueueName() - { - return _protocolSession.generateQueueName(); - } - - public CountDownLatch getFailoverLatch() - { - return _failoverLatch; - } - - public void setFailoverLatch(CountDownLatch failoverLatch) - { - _failoverLatch = failoverLatch; - } - - public AMQConnection getConnection() - { - return _connection; - } - - public AMQStateManager getStateManager() - { - return _stateManager; - } + void writeFrame(AMQDataBlock frame); - public void setStateManager(AMQStateManager stateManager) - { - _stateManager = stateManager; - _protocolSession.setStateManager(stateManager); - } + void closeSession(AMQSession session) throws AMQException; - public AMQProtocolSession getProtocolSession() - { - return _protocolSession; - } + void closeConnection() throws AMQException; - FailoverState getFailoverState() - { - return _failoverState; - } + AMQConnection getConnection(); - public void setFailoverState(FailoverState failoverState) - { - _failoverState = failoverState; - } + AMQStateManager getStateManager(); - public byte getProtocolMajorVersion() - { - return _protocolSession.getProtocolMajorVersion(); - } + AMQProtocolSession getProtocolSession(); + ProtocolOutputHandler getOutputHandler(); - public byte getProtocolMinorVersion() - { - return _protocolSession.getProtocolMinorVersion(); - } + ProtocolVersion getProtocolVersion(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java new file mode 100644 index 0000000000..738531d5a5 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java @@ -0,0 +1,537 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.protocol; + +import java.util.Iterator; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; + +import org.apache.log4j.Logger; +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoSession; +import org.apache.mina.filter.SSLFilter; +import org.apache.mina.filter.codec.ProtocolCodecFilter; +import org.apache.qpid.AMQConnectionClosedException; +import org.apache.qpid.AMQDisconnectedException; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQTimeoutException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.SSLConfiguration; +import org.apache.qpid.client.failover.FailoverHandler; +import org.apache.qpid.client.failover.FailoverState; +import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.framing.*; +import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; +import org.apache.qpid.ssl.SSLContextFactory; + + +public class AMQProtocolHandlerImpl extends IoHandlerAdapter implements AMQProtocolHandler +{ + private static final Logger _logger = Logger.getLogger(AMQProtocolHandlerImpl.class); + + /** + * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances + * and protocol handler instances. + */ + private AMQConnection _connection; + + /** Our wrapper for a protocol session that provides access to session values in a typesafe manner. */ + private volatile AMQProtocolSession _protocolSession; + + private AMQStateManager _stateManager = new AMQStateManager(); + + + + /** + * We create the failover handler when the session is created since it needs a reference to the IoSession in order + * to be able to send errors during failover back to the client application. The session won't be available in the + * case where we failing over due to a Connection.Redirect message from the broker. + */ + private FailoverHandler _failoverHandler; + + /** + * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly + * attempting failover where it is failing. + */ + private FailoverState _failoverState = FailoverState.NOT_STARTED; + + private CountDownLatch _failoverLatch; + + private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; + private static final int CONTROL_CHANNEL = 0; + + public AMQProtocolHandlerImpl(AMQConnection con) + { + _connection = con; + } + + public void sessionCreated(IoSession session) throws Exception + { + _logger.debug("Protocol session created for session " + System.identityHashCode(session)); + _failoverHandler = new FailoverHandler(this, session); + + final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false)); + + if (Boolean.getBoolean("amqj.shared_read_write_pool")) + { + session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf); + } + else + { + session.getFilterChain().addLast("protocolFilter", pcf); + } + // we only add the SSL filter where we have an SSL connection + if (_connection.getSSLConfiguration() != null) + { + SSLConfiguration sslConfig = _connection.getSSLConfiguration(); + SSLContextFactory sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); + SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext()); + sslFilter.setUseClientMode(true); + session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter); + } + + + + ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance(); + threadModel.getAsynchronousReadFilter().createNewJobForSession(session); + threadModel.getAsynchronousWriteFilter().createNewJobForSession(session); + + + _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager()); + + // This starts the AMQP initiation by sending the AMQP Header + _protocolSession.init(); + } + + public void sessionOpened(IoSession session) throws Exception + { + + } + + /** + * When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by + * sessionClosed() depending on whether we were trying to send data at the time of failure. + * + * @param session + * + * @throws Exception + */ + public void sessionClosed(IoSession session) throws Exception + { + if (_connection.isClosed()) + { + _logger.info("Session closed called by client"); + } + else + { + _logger.info("Session closed called with failover state currently " + _failoverState); + + //reconnetablility was introduced here so as not to disturb the client as they have made their intentions + // known through the policy settings. + + if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed()) + { + _logger.info("FAILOVER STARTING"); + if (_failoverState == FailoverState.NOT_STARTED) + { + _failoverState = FailoverState.IN_PROGRESS; + startFailoverThread(); + } + else + { + _logger.info("Not starting failover as state currently " + _failoverState); + } + } + else + { + _logger.info("Failover not allowed by policy."); + + if (_logger.isDebugEnabled()) + { + _logger.debug(_connection.getFailoverPolicy().toString()); + } + + if (_failoverState != FailoverState.IN_PROGRESS) + { + _logger.info("sessionClose() not allowed to failover"); + _connection.exceptionReceived( + new AMQDisconnectedException("Server closed connection and reconnection " + + "not permitted.")); + } + else + { + _logger.info("sessionClose() failover in progress"); + } + } + } + + _logger.info("Protocol Session [" + this + "] closed"); + } + + /** See {@link FailoverHandler} to see rationale for separate thread. */ + private void startFailoverThread() + { + Thread failoverThread = new Thread(_failoverHandler); + failoverThread.setName("Failover"); + // Do not inherit daemon-ness from current thread as this can be a daemon + // thread such as a AnonymousIoService thread. + failoverThread.setDaemon(false); + failoverThread.start(); + } + + public void sessionIdle(IoSession session, IdleStatus status) throws Exception + { + _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status); + if (IdleStatus.WRITER_IDLE.equals(status)) + { + //write heartbeat frame: + _logger.debug("Sent heartbeat"); + session.write(HeartbeatBody.FRAME); + HeartbeatDiagnostics.sent(); + } + else if (IdleStatus.READER_IDLE.equals(status)) + { + //failover: + HeartbeatDiagnostics.timeout(); + _logger.warn("Timed out while waiting for heartbeat from peer."); + session.close(); + } + } + + public void exceptionCaught(IoSession session, Throwable cause) throws Exception + { + if (_failoverState == FailoverState.NOT_STARTED) + { + //if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) + if (cause instanceof AMQConnectionClosedException) + { + _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); + // this will attemp failover + + sessionClosed(session); + } + } + // we reach this point if failover was attempted and failed therefore we need to let the calling app + // know since we cannot recover the situation + else if (_failoverState == FailoverState.FAILED) + { + _logger.error("Exception caught by protocol handler: " + cause, cause); + // we notify the state manager of the error in case we have any clients waiting on a state + // change. Those "waiters" will be interrupted and can handle the exception + AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); + propagateExceptionToWaiters(amqe); + _connection.exceptionReceived(cause); + } + } + + /** + * There are two cases where we have other threads potentially blocking for events to be handled by this class. + * These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type + * of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately. + * + * @param e the exception to propagate + */ + public void propagateExceptionToWaiters(Exception e) + { + getStateManager().error(e); + getProtocolSession().getOutputHandler().error(e); + + } + + private static int _messageReceivedCount; + + public void messageReceived(IoSession session, Object message) throws Exception + { + final boolean debug = _logger.isDebugEnabled(); + final long msgNumber = ++_messageReceivedCount; + + if (debug && (msgNumber % 1000 == CONTROL_CHANNEL)) + { + _logger.debug("Received " + _messageReceivedCount + " protocol messages"); + } + + AMQFrame frame = (AMQFrame) message; + + final AMQBody bodyFrame = frame.getBodyFrame(); + + HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); + + switch (bodyFrame.getFrameType()) + { + case AMQMethodBodyImpl.TYPE: + + if (debug) + { + _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); + } + + final AMQMethodEvent evt = new AMQMethodEvent(frame.getChannel(), (AMQMethodBody) bodyFrame); + + try + { + + boolean wasAnyoneInterested = getStateManager().methodReceived(evt); + wasAnyoneInterested = getProtocolSession().getOutputHandler().methodReceived(evt) || wasAnyoneInterested; + + if (!wasAnyoneInterested) + { + throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener."); + } + } + catch (AMQException e) + { + getStateManager().error(e); + getProtocolSession().getOutputHandler().error(e); + exceptionCaught(session, e); + } + break; + + case ContentHeaderBody.TYPE: + + _protocolSession.messageContentHeaderReceived(frame.getChannel(), + (ContentHeaderBody) bodyFrame); + break; + + case ContentBody.TYPE: + + _protocolSession.messageContentBodyReceived(frame.getChannel(), + (ContentBody) bodyFrame); + break; + + case HeartbeatBody.TYPE: + + if (debug) + { + _logger.debug("Received heartbeat"); + } + break; + + default: + + } + _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); + } + + private static int _messagesOut; + + public void messageSent(IoSession session, Object message) throws Exception + { + final long sentMessages = _messagesOut++; + + final boolean debug = _logger.isDebugEnabled(); + + if (debug && (sentMessages % 1000 == CONTROL_CHANNEL)) + { + _logger.debug("Sent " + _messagesOut + " protocol messages"); + } + _connection.bytesSent(session.getWrittenBytes()); + if (debug) + { + _logger.debug("Sent frame " + message); + } + } + + /* + public void addFrameListener(AMQMethodListener listener) + { + _frameListeners.add(listener); + } + + public void removeFrameListener(AMQMethodListener listener) + { + _frameListeners.remove(listener); + } + */ + public void attainState(AMQState s) throws AMQException + { + getStateManager().attainState(s); + } + + /** + * Convenience method that writes a frame to the protocol session. Equivalent to calling + * getProtocolSession().write(). + * + * @param frame the frame to write + */ + public void writeFrame(AMQDataBlock frame) + { + _protocolSession.writeFrame(frame); + } + + + + /** + * Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol + * handler will ensure that messages are delivered to the consumer(s) on that session. + * + * @param channelId the channel id of the session + * @param session the session instance. + */ + public void addSessionByChannel(int channelId, AMQSession session) + { + _protocolSession.addSessionByChannel(channelId, session); + } + + /** + * Convenience method to deregister an AMQSession with the protocol handler. + * + * @param channelId then channel id of the session + */ + public void removeSessionByChannel(int channelId) + { + _protocolSession.removeSessionByChannel(channelId); + } + + public void closeSession(AMQSession session) throws AMQException + { + _protocolSession.closeSession(session); + } + + public void closeConnection() throws AMQException + { + closeConnection(-1); + } + + public void closeConnection(long timeout) throws AMQException + { + getStateManager().changeState(AMQState.CONNECTION_CLOSING); + + + try + { + ConnectionCloseBody closeBody = getAMQMethodFactory().createConnectionClose(); + sendCommandReceiveResponse(CONTROL_CHANNEL,closeBody); + + + _protocolSession.closeProtocolSession(); + } + catch (AMQTimeoutException e) + { + _protocolSession.closeProtocolSession(false); + } + + + } + + private void sendCommandReceiveResponse(int channelId, AMQMethodBody command) throws AMQException + { + getOutputHandler().sendCommandReceiveResponse(channelId, command); + } + + private AMQMethodFactory getAMQMethodFactory() + { + return getOutputHandler().getAMQMethodFactory(); + } + + /** @return the number of bytes read from this protocol session */ + public long getReadBytes() + { + return _protocolSession.getIoSession().getReadBytes(); + } + + /** @return the number of bytes written to this protocol session */ + public long getWrittenBytes() + { + return _protocolSession.getIoSession().getWrittenBytes(); + } + + public void failover(String host, int port) + { + _failoverHandler.setHost(host); + _failoverHandler.setPort(port); + // see javadoc for FailoverHandler to see rationale for separate thread + startFailoverThread(); + } + + public void blockUntilNotFailingOver() throws InterruptedException + { + if (_failoverLatch != null) + { + _failoverLatch.await(); + } + } + + public AMQShortString generateQueueName() + { + return _protocolSession.generateQueueName(); + } + + public CountDownLatch getFailoverLatch() + { + return _failoverLatch; + } + + public void setFailoverLatch(CountDownLatch failoverLatch) + { + _failoverLatch = failoverLatch; + } + + public AMQConnection getConnection() + { + return _connection; + } + + public AMQStateManager getStateManager() + { + return _stateManager; + } + + public void setStateManager(AMQStateManager stateManager) + { + _stateManager = stateManager; + _protocolSession.setStateManager(stateManager); + } + + public AMQProtocolSession getProtocolSession() + { + return _protocolSession; + } + + public ProtocolOutputHandler getOutputHandler() + { + return getProtocolSession().getOutputHandler(); + } + + FailoverState getFailoverState() + { + return _failoverState; + } + + public void setFailoverState(FailoverState failoverState) + { + _failoverState = failoverState; + } + + public ProtocolVersion getProtocolVersion() + { + return _protocolSession.getProtocolVersion(); + } + + + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 055109d3be..3c158415e0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -43,10 +43,11 @@ import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.MainRegistry; import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersionList; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.framing.VersionSpecificRegistry; +import org.apache.qpid.framing.MainRegistry; +import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.protocol.AMQConstant; @@ -56,7 +57,7 @@ import org.apache.qpid.protocol.AMQConstant; * The underlying protocol session is still available but clients should not * use it to obtain session attributes. */ -public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareProtocolSession +public class AMQProtocolSession implements AMQVersionAwareProtocolSession { protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2; @@ -81,7 +82,7 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP * The handler from which this session was created and which is used to handle protocol events. * We send failover events to the handler. */ - protected final AMQProtocolHandler _protocolHandler; + protected final AMQProtocolHandlerImpl _protocolHandler; /** * Maps from the channel id to the AMQSession that it represents. @@ -102,9 +103,10 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP protected int _queueId = 1; protected final Object _queueIdLock = new Object(); - private byte _protocolMinorVersion; - private byte _protocolMajorVersion; - private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]); + private MethodRegistry _registry = MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion()); + private ProtocolVersion _protocolVersion; + private ProtocolOutputHandler _outputHandler = ProtocolOutputHandlerFactory.createOutputHandler(ProtocolVersion.getLatestSupportedVersion(), this); + ; /** @@ -118,7 +120,7 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP _stateManager = new AMQStateManager(this); } - public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) + public AMQProtocolSession(AMQProtocolHandlerImpl protocolHandler, IoSession protocolSession, AMQConnection connection) { _protocolHandler = protocolHandler; _minaProtocolSession = protocolSession; @@ -129,7 +131,7 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP _stateManager = new AMQStateManager(this); } - public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager) + public AMQProtocolSession(AMQProtocolHandlerImpl protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager) { _protocolHandler = protocolHandler; _minaProtocolSession = protocolSession; @@ -147,11 +149,8 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP { // start the process of setting up the connection. This is the first place that // data is written to the server. - /* Find last protocol version in protocol version list. Make sure last protocol version - listed in the build file (build-module.xml) is the latest version which will be used - here. */ - int i = pv.length - 1; - _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR])); + + _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); } public String getClientID() @@ -474,26 +473,27 @@ public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareP session.confirmConsumerCancelled(consumerTag); } - public void setProtocolVersion(final byte versionMajor, final byte versionMinor) + public void setProtocolVersion(ProtocolVersion pv) { - _protocolMajorVersion = versionMajor; - _protocolMinorVersion = versionMinor; - _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor); + _protocolVersion = pv; + + _registry = MethodRegistry.getMethodRegistry(pv); } - public byte getProtocolMinorVersion() + public ProtocolVersion getProtocolVersion() { - return _protocolMinorVersion; + return _protocolVersion; } - public byte getProtocolMajorVersion() + public MethodRegistry getRegistry() { - return _protocolMajorVersion; + return _registry; } - public VersionSpecificRegistry getRegistry() + public ProtocolOutputHandler getOutputHandler() { - return _registry; + return _outputHandler; } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 85f98eab69..8f1f410a15 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -23,11 +23,12 @@ package org.apache.qpid.client.protocol; import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; -public abstract class BlockingMethodFrameListener implements AMQMethodListener +public abstract class BlockingMethodFrameListener implements AMQMethodListener { private volatile boolean _ready = false; @@ -43,7 +44,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener protected int _channelId; - protected AMQMethodEvent _doneEvt = null; + protected AMQMethodEvent _doneEvt = null; public BlockingMethodFrameListener(int channelId) { @@ -91,7 +92,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener /** * This method is called by the thread that wants to wait for a frame. */ - public AMQMethodEvent blockForFrame(long timeout) throws AMQException + public AMQMethodEvent blockForFrame(long timeout) throws AMQException { synchronized (_lock) { diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandler.java new file mode 100644 index 0000000000..3db6232d41 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandler.java @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.client.protocol; + +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQMethodFactory; +import org.apache.qpid.framing.CommonContentHeaderProperties; +import org.apache.qpid.framing.AMQMethodBodyImpl; +import org.apache.qpid.AMQTimeoutException; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQMethodEvent; + +import org.apache.mina.common.ByteBuffer; + +import java.util.Map; +import java.util.HashMap; + + +public interface ProtocolOutputHandler +{ + + void sendCommand(int channelId, AMQMethodBody command); + + AMQMethodBody sendCommandReceiveResponse(int channelId, AMQMethodBody command) throws AMQException; + AMQMethodBody sendCommandReceiveResponse(int channelId, AMQMethodBody command, long timeout) throws AMQException; + T sendCommandReceiveResponse(int channelId, AMQMethodBody command, Class responseClass, long timeout) throws AMQException; + T sendCommandReceiveResponse(int channelId, AMQMethodBody command, Class responseClass) throws AMQException; + + AMQMethodFactory getAMQMethodFactory(); + + void publishMessage(int channelId, AMQShortString exchangeName, AMQShortString routingKey, boolean immediate, boolean mandatory, ByteBuffer payload, CommonContentHeaderProperties contentHeaderProperties, int ticket); + + boolean methodReceived(AMQMethodEvent evt) throws Exception; + + void error(Exception e); +} diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandlerFactory.java b/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandlerFactory.java new file mode 100644 index 0000000000..6cc0d8d3b5 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/ProtocolOutputHandlerFactory.java @@ -0,0 +1,50 @@ +package org.apache.qpid.client.protocol; + +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.client.protocol.amqp_8_0.ProtocolOutputHandler_8_0; + +import java.util.Map; +import java.util.HashMap; + +public abstract class ProtocolOutputHandlerFactory +{ + private static final Map _handlers = + new HashMap(); + + public ProtocolOutputHandlerFactory(ProtocolVersion pv) + { + _handlers.put(pv,this); + } + + public abstract ProtocolOutputHandler newInstance(AMQProtocolSession amqProtocolSession); + + public static ProtocolOutputHandler createOutputHandler(ProtocolVersion version, AMQProtocolSession amqProtocolSession) + { + return _handlers.get(version).newInstance(amqProtocolSession); + } + + private static final ProtocolOutputHandlerFactory VERSION_8_0 = + new ProtocolOutputHandlerFactory(new ProtocolVersion((byte)8,(byte)0)) + { + + public ProtocolOutputHandler newInstance(AMQProtocolSession amqProtocolSession) + { + return new ProtocolOutputHandler_8_0(amqProtocolSession); + } + }; + + // TODO - HACK + + private static final ProtocolOutputHandlerFactory VERSION_0_9 = + new ProtocolOutputHandlerFactory(new ProtocolVersion((byte)0,(byte)9)) + { + + public ProtocolOutputHandler newInstance(AMQProtocolSession amqProtocolSession) + { + return new ProtocolOutputHandler_8_0(amqProtocolSession); + } + }; + + + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/amqp_8_0/ProtocolOutputHandler_8_0.java b/java/client/src/main/java/org/apache/qpid/client/protocol/amqp_8_0/ProtocolOutputHandler_8_0.java new file mode 100644 index 0000000000..3654f46c1a --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/amqp_8_0/ProtocolOutputHandler_8_0.java @@ -0,0 +1,278 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.protocol.amqp_8_0; + +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.amqp_8_0.*; +import org.apache.qpid.client.protocol.ProtocolOutputHandler; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; +import org.apache.qpid.AMQTimeoutException; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; + +import org.apache.mina.common.ByteBuffer; + +import java.util.Map; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Collection; +import java.util.concurrent.CopyOnWriteArraySet; + +public class ProtocolOutputHandler_8_0 implements ProtocolOutputHandler +{ + private static final AMQMethodFactory METHOD_FACTORY = new AMQMethodFactory_8_0() ; + + + private static final Map, Class> REQUSET_RESPONSE_METHODBODY_MAP = + new HashMap, Class>(); + + static + { + // Basic Class + REQUSET_RESPONSE_METHODBODY_MAP.put(BasicCancelBody.class, BasicCancelOkBody.class); + REQUSET_RESPONSE_METHODBODY_MAP.put(BasicConsumeBody.class, BasicConsumeOkBody.class); + REQUSET_RESPONSE_METHODBODY_MAP.put(BasicQosBody.class, BasicQosOkBody.class); + // GET ??? + REQUSET_RESPONSE_METHODBODY_MAP.put(BasicRecoverBody.class, BasicRecoverOkBody.class); + + // Channel Class + REQUSET_RESPONSE_METHODBODY_MAP.put(ChannelCloseBody.class, ChannelCloseOkBody.class); + REQUSET_RESPONSE_METHODBODY_MAP.put(ChannelFlowBody.class, ChannelFlowOkBody.class); + REQUSET_RESPONSE_METHODBODY_MAP.put(ChannelOpenBody.class, ChannelOpenOkBody.class); + + // Connection Class + REQUSET_RESPONSE_METHODBODY_MAP.put(ConnectionOpenBody.class, ConnectionOpenOkBody.class); + REQUSET_RESPONSE_METHODBODY_MAP.put(ConnectionSecureBody.class, ConnectionSecureOkBody.class); + REQUSET_RESPONSE_METHODBODY_MAP.put(ConnectionStartBody.class, ConnectionStartOkBody.class); + REQUSET_RESPONSE_METHODBODY_MAP.put(ConnectionTuneBody.class, ConnectionTuneOkBody.class); + + // Exchange Class + REQUSET_RESPONSE_METHODBODY_MAP.put(ExchangeBoundBody.class, ExchangeBoundOkBody.class); + REQUSET_RESPONSE_METHODBODY_MAP.put(ExchangeDeclareBody.class, ExchangeDeclareOkBody.class); + REQUSET_RESPONSE_METHODBODY_MAP.put(ExchangeDeleteBody.class, ExchangeDeleteOkBody.class); + + // Queue Class + REQUSET_RESPONSE_METHODBODY_MAP.put(QueueBindBody.class, QueueBindOkBody.class); + REQUSET_RESPONSE_METHODBODY_MAP.put(QueueDeclareBody.class, QueueDeclareOkBody.class); + REQUSET_RESPONSE_METHODBODY_MAP.put(QueueDeleteBody.class, QueueDeleteOkBody.class); + REQUSET_RESPONSE_METHODBODY_MAP.put(QueuePurgeBody.class, QueuePurgeOkBody.class); + + // Tx Class + REQUSET_RESPONSE_METHODBODY_MAP.put(TxCommitBody.class, TxCommitOkBody.class); + REQUSET_RESPONSE_METHODBODY_MAP.put(TxRollbackBody.class, TxRollbackOkBody.class); + REQUSET_RESPONSE_METHODBODY_MAP.put(TxSelectBody.class, TxSelectOkBody.class); + + } + + + + + + private final AMQProtocolSession _session; + private static final long DEFAULT_TIMEOUT = 30000; + private final CopyOnWriteArraySet _frameListeners = + new CopyOnWriteArraySet(); + + public ProtocolOutputHandler_8_0(AMQProtocolSession amqProtocolSession) + { + _session = amqProtocolSession; + } + + + + + private void writeFrame(AMQDataBlock frame) + { + _session.writeFrame(frame); + } + + public void sendCommand(int channelId, AMQMethodBody command) + { + _session.writeFrame(new AMQFrame(channelId,command)); + } + + public AMQMethodBody sendCommandReceiveResponse(int channelId, AMQMethodBody command) throws AMQException + { + return sendCommandReceiveResponse(channelId, command, REQUSET_RESPONSE_METHODBODY_MAP.get(command.getClass())); + } + + public AMQMethodBody sendCommandReceiveResponse(int channelId, AMQMethodBody command, long timeout) throws AMQException + { + return sendCommandReceiveResponse(channelId, command, REQUSET_RESPONSE_METHODBODY_MAP.get(command.getClass()), timeout); + } + + public T sendCommandReceiveResponse(int channelId, AMQMethodBody command, Class responseClass, long timeout) throws AMQException + { + AMQFrame frame = new AMQFrame(channelId,command); + return writeCommandFrameAndWaitForReply(frame, + new SpecificMethodFrameListener(channelId, responseClass), timeout); + } + + private T writeCommandFrameAndWaitForReply(AMQFrame frame, SpecificMethodFrameListener listener, long timeout) throws AMQException + { + try + { + _frameListeners.add(listener); + _session.writeFrame(frame); + + AMQMethodEvent e = listener.blockForFrame(timeout); + return e.getMethod(); + // When control resumes before this line, a reply will have been received + // that matches the criteria defined in the blocking listener + } + finally + { + // If we don't removeKey the listener then no-one will + _frameListeners.remove(listener); + } + + + } + + public T sendCommandReceiveResponse(int channelId, AMQMethodBody command, Class responseClass) throws AMQException + { + return sendCommandReceiveResponse(channelId, command, responseClass, DEFAULT_TIMEOUT); + } + + public AMQMethodFactory getAMQMethodFactory() + { + return METHOD_FACTORY; + } + + + public void publishMessage(int channelId, AMQShortString exchangeName, AMQShortString routingKey, boolean immediate, boolean mandatory, ByteBuffer payload, CommonContentHeaderProperties contentHeaderProperties, int ticket) + { + final int size = (payload != null) ? payload.limit() : 0; + BasicPublishBodyImpl publishBody = new BasicPublishBodyImpl(ticket, exchangeName, routingKey, mandatory, immediate); + + + final int contentBodyFrameCount = calculateContentBodyFrameCount(payload); + final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount]; + + if (payload != null) + { + createContentBodies(payload, frames, 2, channelId); + } + + + + AMQFrame contentHeaderFrame = + ContentHeaderBody.createAMQFrame(channelId, + publishBody.CLASS_ID, + 0, // weight + contentHeaderProperties, + size); + + frames[0] = new AMQFrame(channelId,publishBody); + frames[1] = contentHeaderFrame; + CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); + writeFrame(compositeFrame); + } + + public boolean methodReceived(AMQMethodEvent evt) throws AMQException + { + boolean wasAnyoneInterested = false; + if (!_frameListeners.isEmpty()) + { + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final SpecificMethodFrameListener listener = it.next(); + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + } + } + + return wasAnyoneInterested; + } + + public void error(Exception e) + { + if (!_frameListeners.isEmpty()) + { + final Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final SpecificMethodFrameListener ml = it.next(); + ml.error(e); + } + } + } + + + /** + * Create content bodies. This will split a large message into numerous bodies depending on the negotiated + * maximum frame size. + * + * @param payload + * @param frames + * @param offset + * @param channelId @return the array of content bodies + */ + private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId) + { + + if (frames.length == (offset + 1)) + { + frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload)); + } + else + { + + final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; + long remaining = payload.remaining(); + for (int i = offset; i < frames.length; i++) + { + payload.position((int) framePayloadMax * (i - offset)); + int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining; + payload.limit(payload.position() + length); + frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice())); + + remaining -= length; + } + } + + } + + private int calculateContentBodyFrameCount(ByteBuffer payload) + { + // we substract one from the total frame maximum size to account for the end of frame marker in a body frame + // (0xCE byte). + int frameCount; + if ((payload == null) || (payload.remaining() == 0)) + { + frameCount = 0; + } + else + { + int dataLength = payload.remaining(); + final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; + int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0; + frameCount = (int) (dataLength / framePayloadMax) + lastFrame; + } + + return frameCount; + } + + + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index 825baf95d1..bba1c2701c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -27,34 +27,20 @@ import java.util.concurrent.CopyOnWriteArraySet; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.client.handler.BasicCancelOkMethodHandler; -import org.apache.qpid.client.handler.BasicDeliverMethodHandler; -import org.apache.qpid.client.handler.BasicReturnMethodHandler; -import org.apache.qpid.client.handler.ChannelCloseMethodHandler; -import org.apache.qpid.client.handler.ChannelCloseOkMethodHandler; -import org.apache.qpid.client.handler.ChannelFlowOkMethodHandler; -import org.apache.qpid.client.handler.ConnectionCloseMethodHandler; -import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler; -import org.apache.qpid.client.handler.ConnectionSecureMethodHandler; -import org.apache.qpid.client.handler.ConnectionStartMethodHandler; -import org.apache.qpid.client.handler.ConnectionTuneMethodHandler; -import org.apache.qpid.client.handler.ExchangeBoundOkMethodHandler; -import org.apache.qpid.client.handler.QueueDeleteOkMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.BasicCancelOkMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.BasicDeliverMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ChannelCloseOkMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ChannelFlowOkMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ConnectionOpenOkMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ConnectionTuneMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ExchangeBoundOkMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.QueueDeleteOkMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ConnectionCloseMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ConnectionSecureMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.BasicReturnMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.*; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.BasicCancelOkBody; -import org.apache.qpid.framing.BasicDeliverBody; -import org.apache.qpid.framing.BasicReturnBody; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ChannelFlowOkBody; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ConnectionOpenOkBody; -import org.apache.qpid.framing.ConnectionSecureBody; -import org.apache.qpid.framing.ConnectionStartBody; -import org.apache.qpid.framing.ConnectionTuneBody; -import org.apache.qpid.framing.ExchangeBoundOkBody; -import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; @@ -62,7 +48,7 @@ import org.apache.qpid.protocol.AMQMethodListener; * The state manager is responsible for managing the state of the protocol session.

For each AMQProtocolHandler * there is a separate state manager. */ -public class AMQStateManager implements AMQMethodListener +public class AMQStateManager { private static final Logger _logger = Logger.getLogger(AMQStateManager.class); private AMQProtocolSession _protocolSession; @@ -178,7 +164,7 @@ public class AMQStateManager implements AMQMethodListener StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod()); if (handler != null) { - handler.methodReceived(this, _protocolSession, evt); + handler.methodReceived(this, evt); return true; } return false; diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java b/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java index b3932533ce..9ddc50941b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java @@ -31,6 +31,6 @@ import org.apache.qpid.protocol.AMQMethodEvent; */ public interface StateAwareMethodListener { - void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, - AMQMethodEvent evt) throws AMQException; + void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException; + } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java index 1c70ded62a..8a19a77776 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java @@ -22,13 +22,14 @@ package org.apache.qpid.client.state.listener; import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.BlockingMethodFrameListener; +import org.apache.qpid.framing.AMQMethodBodyImpl; import org.apache.qpid.framing.AMQMethodBody; -public class SpecificMethodFrameListener extends BlockingMethodFrameListener +public class SpecificMethodFrameListener extends BlockingMethodFrameListener { private final Class _expectedClass; - public SpecificMethodFrameListener(int channelId, Class expectedClass) + public SpecificMethodFrameListener(int channelId, Class expectedClass) { super(channelId); _expectedClass = expectedClass; diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java index 7a24d6e15a..9877cd3c37 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/ITransportConnection.java @@ -22,11 +22,11 @@ package org.apache.qpid.client.transport; import java.io.IOException; -import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl; import org.apache.qpid.jms.BrokerDetails; public interface ITransportConnection { - void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) + void connect(AMQProtocolHandlerImpl protocolHandler, BrokerDetails brokerDetail) throws IOException; } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index 04e7e40564..25d5a2cc1c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -30,7 +30,7 @@ import org.apache.mina.common.IoConnector; import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.transport.socket.nio.SocketConnectorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; @@ -50,7 +50,7 @@ public class SocketTransportConnection implements ITransportConnection _socketConnectorFactory = socketConnectorFactory; } - public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) + public void connect(AMQProtocolHandlerImpl protocolHandler, BrokerDetails brokerDetail) throws IOException { ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java index 104c4b43d0..4f8126f070 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java @@ -27,7 +27,7 @@ import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoServiceConfig; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.mina.transport.vmpipe.VmPipeConnector; -import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.PoolingFilter; import org.apache.qpid.pool.ReferenceCountingExecutorService; @@ -43,7 +43,7 @@ public class VmPipeTransportConnection implements ITransportConnection _port = port; } - public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException + public void connect(AMQProtocolHandlerImpl protocolHandler, BrokerDetails brokerDetail) throws IOException { final VmPipeConnector ioConnector = new VmPipeConnector(); final IoServiceConfig cfg = ioConnector.getDefaultConfig(); diff --git a/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java b/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java index 1db7e200bd..b3b26b660a 100644 --- a/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java +++ b/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java @@ -21,6 +21,8 @@ package org.apache.qpid.codec; import org.apache.qpid.framing.*; +import org.apache.qpid.framing.amqp_8_0.BasicDeliverBodyImpl; + import org.apache.mina.common.*; import org.apache.mina.common.support.BaseIoSession; import org.apache.mina.filter.codec.ProtocolDecoderOutput; @@ -238,7 +240,7 @@ public class BasicDeliverTest return new CompositeAMQDataBlock(frames); } - static AMQFrame wrapBody(AMQBody body) + static AMQFrame wrapBody(AMQBodyImpl body) { AMQFrame frame = new AMQFrame(1, body); return frame; @@ -264,13 +266,11 @@ public class BasicDeliverTest return body; } - static BasicDeliverBody createBasicDeliverBody() + static BasicDeliverBodyImpl createBasicDeliverBody() { - BasicDeliverBody body = new BasicDeliverBody((byte) 8, (byte) 0, - BasicDeliverBody.getClazz((byte) 8, (byte) 0), - BasicDeliverBody.getMethod((byte) 8, (byte) 0), - new AMQShortString("myConsumerTag"), 1, - new AMQShortString("myExchange"), false, + BasicDeliverBodyImpl body = new BasicDeliverBodyImpl( + new AMQShortString("myConsumerTag"), 1,false, + new AMQShortString("myExchange"), new AMQShortString("myRoutingKey")); return body; } diff --git a/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java b/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java index 955f82fab5..1358623bf8 100644 --- a/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java +++ b/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java @@ -118,8 +118,8 @@ public class FieldTableTest extends TestCase //decode buffer.flip(); - header = new ContentHeaderBody(); - header.populateFromBuffer(buffer, size); + header = new ContentHeaderBody(buffer, size); + return ((BasicContentHeaderProperties) header.properties).getHeaders(); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java index 5e45d1d537..a0c67b4e74 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java @@ -46,13 +46,14 @@ public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListe return _handler; } - public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { _logger.debug("ChannelClose method received"); + final AMQProtocolSession protocolSession = stateManager.getProtocolSession(); ChannelCloseBody method = (ChannelCloseBody) evt.getMethod(); - AMQConstant errorCode = AMQConstant.getConstant(method.replyCode); - AMQShortString reason = method.replyText; + AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode()); + AMQShortString reason = method.getReplyText(); if (_logger.isDebugEnabled()) { _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java index 3431c56783..d6a4818ff7 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java @@ -44,6 +44,9 @@ import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.ExchangeDeclareOkBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.amqp_8_0.ChannelCloseOkBodyImpl; +import org.apache.qpid.framing.amqp_8_0.ExchangeDeclareBodyImpl; +import org.apache.qpid.framing.amqp_8_0.ChannelOpenBodyImpl; import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.url.URLSyntaxException; @@ -270,9 +273,7 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con private void sendClose(int channel) { - AMQFrame frame = ChannelCloseOkBody.createAMQFrame(channel, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion()); + AMQFrame frame = new AMQFrame(channel, new ChannelCloseOkBodyImpl()); ((AMQConnection) _connection).getProtocolHandler().writeFrame(frame); } @@ -332,37 +333,29 @@ public class ChannelCloseTest extends TestCase implements ExceptionListener, Con private void declareExchange(int channelId, String _type, String _name, boolean nowait) throws AMQException { - AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(channelId, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), - null, // arguments - false, // autoDelete - false, // durable - new AMQShortString(_name), // exchange - false, // internal - nowait, // nowait - true, // passive - 0, // ticket - new AMQShortString(_type)); // type + ExchangeDeclareBody exchangeDeclareBody = + ((AMQConnection) _connection).getProtocolOutputHandler().getAMQMethodFactory().createExchangeDeclare(new AMQShortString(_name),new AMQShortString(_type),0); +// new ExchangeDeclareBodyImpl(0,new AMQShortString(_name),new AMQShortString(_type),true,false,false,false,nowait,null); + + //AMQFrame exchangeDeclare = new AMQFrame(channelId, exchangeDeclareBody); if (nowait) { - ((AMQConnection) _connection).getProtocolHandler().writeFrame(exchangeDeclare); + ((AMQConnection) _connection).getProtocolOutputHandler().sendCommand(channelId, exchangeDeclareBody); + } else { - ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, SYNC_TIMEOUT); + ((AMQConnection) _connection).getProtocolOutputHandler().sendCommandReceiveResponse(channelId, exchangeDeclareBody, SYNC_TIMEOUT); } } private void createChannel(int channelId) throws AMQException { - ((AMQConnection) _connection).getProtocolHandler().syncWrite( - ChannelOpenBody.createAMQFrame(channelId, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), - null), // outOfBand - ChannelOpenOkBody.class); + + ChannelOpenBody openBody = + ((AMQConnection) _connection).getProtocolOutputHandler().getAMQMethodFactory().createChannelOpen(); + ((AMQConnection) _connection).getProtocolOutputHandler().sendCommandReceiveResponse(channelId, openBody); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java index d128f30727..29a8ecf5c1 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java @@ -22,18 +22,14 @@ package org.apache.qpid.test.unit.client.channelclose; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.AMQState; -import org.apache.qpid.client.handler.ConnectionStartMethodHandler; -import org.apache.qpid.client.handler.ConnectionCloseMethodHandler; -import org.apache.qpid.client.handler.ConnectionTuneMethodHandler; -import org.apache.qpid.client.handler.ConnectionSecureMethodHandler; -import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler; -import org.apache.qpid.client.handler.ChannelCloseOkMethodHandler; -import org.apache.qpid.client.handler.BasicDeliverMethodHandler; -import org.apache.qpid.client.handler.BasicReturnMethodHandler; -import org.apache.qpid.client.handler.BasicCancelOkMethodHandler; -import org.apache.qpid.client.handler.ChannelFlowOkMethodHandler; -import org.apache.qpid.client.handler.QueueDeleteOkMethodHandler; -import org.apache.qpid.client.handler.ExchangeBoundOkMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ConnectionTuneMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ChannelCloseOkMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.BasicDeliverMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ChannelFlowOkMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ExchangeBoundOkMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.BasicReturnMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.ConnectionStartMethodHandler; +import org.apache.qpid.client.handler.amqp_8_0.*; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.framing.ConnectionStartBody; import org.apache.qpid.framing.ConnectionCloseBody; diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java index 4374329fb0..bf7ea4f50b 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java @@ -24,7 +24,7 @@ import junit.framework.TestCase; import org.apache.mina.common.IoSession; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.framing.AMQShortString; @@ -36,7 +36,7 @@ public class AMQProtocolSessionTest extends TestCase { } - public AMQProtSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) + public AMQProtSession(AMQProtocolHandlerImpl protocolHandler, IoSession protocolSession, AMQConnection connection) { super(protocolHandler,protocolSession,connection); } -- cgit v1.2.1