summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-01-12 14:31:18 +0000
committerKim van der Riet <kpvdr@apache.org>2007-01-12 14:31:18 +0000
commit482ff88ac8fcaf14207db6e23d3b9981365dfd62 (patch)
tree7e4e929ba09510de4a129dd718b37a0830a91c75 /java/client/src
parent03f090715e8744bedc344495fbc03468f260a85a (diff)
downloadqpid-python-482ff88ac8fcaf14207db6e23d3b9981365dfd62.tar.gz
Refactored to create a common AMQMethodEvent class; Added clinet Method* handlers, removed old Basic* handlers.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@495581 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java35
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageCheckpointMethodHandler.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java (renamed from java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java)32
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageEmptyMethodHandler.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageOffsetMethodHandler.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageOkMethodHandler.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java (renamed from java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java)36
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageRejectMethodHandler.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageResumeMethodHandler.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodEvent.java62
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java20
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java6
29 files changed, 510 insertions, 187 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 8cde5e557f..b1f19bc191 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -63,7 +63,6 @@ import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
@@ -89,6 +88,7 @@ import org.apache.qpid.framing.TxCommitBody;
import org.apache.qpid.framing.TxCommitOkBody;
import org.apache.qpid.framing.TxRollbackBody;
import org.apache.qpid.framing.TxRollbackOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.handler.ExchangeBoundHandler;
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
deleted file mode 100644
index d855e97204..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.apache.qpid.client.handler;
-
-import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
-import org.apache.qpid.client.BasicMessageConsumer;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.log4j.Logger;
-
-/**
- * @author Apache Software Foundation
- */
-public class BasicCancelOkMethodHandler implements StateAwareMethodListener
-{
- private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class);
- private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler();
-
- public static BasicCancelOkMethodHandler getInstance()
- {
- return _instance;
- }
-
- private BasicCancelOkMethodHandler()
- {
- }
-
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
- {
- _logger.debug("New BasicCancelOk method received");
- BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod();
- evt.getProtocolSession().confirmConsumerCancelled(evt.getChannelId(), body.consumerTag);
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
index 5ad530a3ea..eb24f1fa74 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
@@ -26,13 +26,14 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.client.AMQNoConsumersException;
import org.apache.qpid.client.AMQNoRouteException;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQMethodEvent;
public class ChannelCloseMethodHandler implements StateAwareMethodListener
{
@@ -45,7 +46,7 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
return _handler;
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
_logger.debug("ChannelClose method received");
ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
@@ -61,7 +62,7 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)0, (byte)9);
- evt.getProtocolSession().writeFrame(frame);
+ protocolSession.writeFrame(frame);
if (errorCode != AMQConstant.REPLY_SUCCESS.getCode())
{
_logger.error("Channel close received with errorCode " + errorCode + ", and reason " + reason);
@@ -85,6 +86,6 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener
}
}
- evt.getProtocolSession().channelClosed(evt.getChannelId(), errorCode, reason);
+ protocolSession.channelClosed(evt.getChannelId(), errorCode, reason);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
index 383cebbcab..a99c963eb4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
@@ -21,7 +21,8 @@
package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.log4j.Logger;
@@ -37,7 +38,7 @@ public class ChannelCloseOkMethodHandler implements StateAwareMethodListener
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
_logger.info("Received channel-close-ok for channel-id " + evt.getChannelId());
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
index 9a4f7af849..7a13972d8f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
@@ -22,7 +22,8 @@ package org.apache.qpid.client.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.ChannelFlowOkBody;
@@ -41,7 +42,7 @@ public class ChannelFlowOkMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
ChannelFlowOkBody method = (ChannelFlowOkBody) evt.getMethod();
_logger.debug("Received Channel.Flow-Ok message, active = " + method.active);
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
index 5cec420920..36e9c947f3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
@@ -24,7 +24,8 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
@@ -47,7 +48,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
_logger.info("ConnectionClose frame received");
ConnectionCloseBody method = (ConnectionCloseBody) evt.getMethod();
@@ -62,7 +63,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- evt.getProtocolSession().writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, (byte)0, (byte)9));
+ protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short)0, (byte)0, (byte)9));
if (errorCode != 200)
{
@@ -70,7 +71,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener
{
_logger.info("Authentication Error:"+Thread.currentThread().getName());
- evt.getProtocolSession().closeProtocolSession();
+ protocolSession.closeProtocolSession();
//todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state.
stateManager.changeState(AMQState.CONNECTION_NOT_STARTED);
@@ -88,7 +89,7 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener
// this actually closes the connection in the case where it is not an error.
- evt.getProtocolSession().closeProtocolSession();
+ protocolSession.closeProtocolSession();
stateManager.changeState(AMQState.CONNECTION_CLOSED);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
index b5001a6e64..da903e7c1d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
@@ -21,7 +21,8 @@
package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
@@ -39,7 +40,7 @@ public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
stateManager.changeState(AMQState.CONNECTION_OPEN);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
index a658e3e787..699c8955bc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
@@ -22,7 +22,8 @@ package org.apache.qpid.client.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.ConnectionRedirectBody;
@@ -44,7 +45,7 @@ public class ConnectionRedirectMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
_logger.info("ConnectionRedirect frame received");
ConnectionRedirectBody method = (ConnectionRedirectBody) evt.getMethod();
@@ -63,6 +64,6 @@ public class ConnectionRedirectMethodHandler implements StateAwareMethodListener
host = method.host.substring(0, portIndex);
port = Integer.parseInt(method.host.substring(portIndex + 1));
}
- evt.getProtocolSession().failover(host, port);
+ protocolSession.failover(host, port);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
index 062cb268d8..87a8bbd529 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
@@ -24,9 +24,10 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ConnectionSecureOkBody;
import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
@@ -40,9 +41,9 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
- SaslClient client = evt.getProtocolSession().getSaslClient();
+ SaslClient client = protocolSession.getSaslClient();
if (client == null)
{
throw new AMQException("No SASL client set up - cannot proceed with authentication");
@@ -60,7 +61,7 @@ public class ConnectionSecureMethodHandler implements StateAwareMethodListener
AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(),
(byte)0, (byte)9, // AMQP version (major, minor)
response); // response
- evt.getProtocolSession().writeFrame(responseFrame);
+ protocolSession.writeFrame(responseFrame);
}
catch (SaslException e)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
index d1b0082d36..a60c298bd2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
@@ -24,7 +24,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.common.QpidProperties;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.security.AMQCallbackHandler;
import org.apache.qpid.client.security.CallbackHandlerRegistry;
@@ -35,6 +34,7 @@ import org.apache.qpid.framing.ConnectionStartBody;
import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.protocol.AMQMethodEvent;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
@@ -59,7 +59,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
ConnectionStartBody body = (ConnectionStartBody) evt.getMethod();
@@ -81,25 +81,24 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
throw new AMQException("No supported security mechanism found, passed: " + new String(body.mechanisms));
}
- final AMQProtocolSession ps = evt.getProtocolSession();
byte[] saslResponse;
try
{
SaslClient sc = Sasl.createSaslClient(new String[]{mechanism},
null, "AMQP", "localhost",
- null, createCallbackHandler(mechanism, ps));
+ null, createCallbackHandler(mechanism, protocolSession));
if (sc == null)
{
throw new AMQException("Client SASL configuration error: no SaslClient could be created for mechanism " +
mechanism + ". Please ensure all factories are registered. See DynamicSaslRegistrar for " +
" details of how to register non-standard SASL client providers.");
}
- ps.setSaslClient(sc);
+ protocolSession.setSaslClient(sc);
saslResponse = (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null);
}
catch (SaslException e)
{
- ps.setSaslClient(null);
+ protocolSession.setSaslClient(null);
throw new AMQException("Unable to create SASL client: " + e, e);
}
@@ -122,14 +121,14 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
FieldTable clientProperties = FieldTableFactory.newFieldTable();
- clientProperties.put(ClientProperties.instance.toString(), ps.getClientID());
+ clientProperties.put(ClientProperties.instance.toString(), protocolSession.getClientID());
clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName());
clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVersion());
clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo());
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
+ protocolSession.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
(byte)0, (byte)9, // AMQP version (major, minor)
clientProperties, // clientProperties
selectedLocale, // locale
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
index d6ff53c416..e4e74be684 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
@@ -23,7 +23,6 @@ package org.apache.qpid.client.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.ConnectionTuneParameters;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
@@ -32,6 +31,7 @@ import org.apache.qpid.framing.ConnectionOpenBody;
import org.apache.qpid.framing.ConnectionTuneBody;
import org.apache.qpid.framing.ConnectionTuneOkBody;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.protocol.AMQMethodEvent;
public class ConnectionTuneMethodHandler implements StateAwareMethodListener
{
@@ -48,13 +48,12 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
_logger.debug("ConnectionTune frame received");
ConnectionTuneBody frame = (ConnectionTuneBody) evt.getMethod();
- AMQProtocolSession session = evt.getProtocolSession();
- ConnectionTuneParameters params = session.getConnectionTuneParameters();
+ ConnectionTuneParameters params = protocolSession.getConnectionTuneParameters();
if (params == null)
{
params = new ConnectionTuneParameters();
@@ -63,11 +62,11 @@ public class ConnectionTuneMethodHandler implements StateAwareMethodListener
params.setFrameMax(frame.frameMax);
params.setChannelMax(frame.channelMax);
params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat));
- session.setConnectionTuneParameters(params);
+ protocolSession.setConnectionTuneParameters(params);
stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
- session.writeFrame(createTuneOkFrame(evt.getChannelId(), params));
- session.writeFrame(createConnectionOpenFrame(evt.getChannelId(), session.getAMQConnection().getVirtualHost(), null, true));
+ protocolSession.writeFrame(createTuneOkFrame(evt.getChannelId(), params));
+ protocolSession.writeFrame(createConnectionOpenFrame(evt.getChannelId(), session.getAMQConnection().getVirtualHost(), null, true));
}
protected AMQFrame createConnectionOpenFrame(int channel, String path, String capabilities, boolean insist)
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
index 858726745e..41e57113b0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
@@ -19,10 +19,11 @@ package org.apache.qpid.client.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
/**
* @author Apache Software Foundation
@@ -41,7 +42,7 @@ public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
if (_logger.isDebugEnabled())
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
new file mode 100644
index 0000000000..543ea0c7ad
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+
+public class MessageAppendMethodHandler implements StateAwareMethodListener
+{
+ private static MessageAppendMethodHandler _instance = new MessageAppendMethodHandler();
+
+ public static MessageAppendMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private MessageAppendMethodHandler() {}
+
+
+ public void methodReceived (AMQStateManager stateManager,
+ AMQProtocolSession protocolSession,
+ AMQMethodEvent<MessageAppendBody> evt)
+ throws AMQException
+ {
+ // TODO
+ }
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageCheckpointMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCheckpointMethodHandler.java
new file mode 100644
index 0000000000..f809dbc4c1
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCheckpointMethodHandler.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.MessageCheckpointBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+
+public class MessageCheckpointMethodHandler implements StateAwareMethodListener
+{
+ private static MessageCheckpointMethodHandler _instance = new MessageCheckpointMethodHandler();
+
+ public static MessageCheckpointMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private MessageCheckpointMethodHandler() {}
+
+
+ public void methodReceived (AMQStateManager stateManager,
+ AMQProtocolSession protocolSession,
+ AMQMethodEvent<MessageCheckpointBody> evt)
+ throws AMQException
+ {
+ // TODO
+ }
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
index 6e12899204..cd04e89f6e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
@@ -18,33 +18,33 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.server.handler;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
-import org.apache.qpid.client.message.UnprocessedMessage;
-public class BasicDeliverMethodHandler implements StateAwareMethodListener
+public class MessageCloseMethodHandler implements StateAwareMethodListener
{
- private static final Logger _logger = Logger.getLogger(BasicDeliverMethodHandler.class);
+ private static MessageCloseMethodHandler _instance = new MessageCloseMethodHandler();
- private static final BasicDeliverMethodHandler _instance = new BasicDeliverMethodHandler();
-
- public static BasicDeliverMethodHandler getInstance()
+ public static MessageCloseMethodHandler getInstance()
{
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ private MessageCloseMethodHandler() {}
+
+
+ public void methodReceived (AMQStateManager stateManager,
+ AMQProtocolSession protocolSession,
+ AMQMethodEvent<MessageCloseBody> evt)
+ throws AMQException
{
- final UnprocessedMessage msg = new UnprocessedMessage();
- msg.deliverBody = (BasicDeliverBody) evt.getMethod();
- msg.channelId = evt.getChannelId();
- _logger.debug("New JmsDeliver method received");
- evt.getProtocolSession().unprocessedMessageReceived(msg);
+ // TODO
}
}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageEmptyMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageEmptyMethodHandler.java
new file mode 100644
index 0000000000..76534b285f
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageEmptyMethodHandler.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.MessageEmptyBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+
+public class MessageEmptyMethodHandler implements StateAwareMethodListener
+{
+ private static MessageEmptyMethodHandler _instance = new MessageEmptyMethodHandler();
+
+ public static MessageEmptyMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private MessageEmptyMethodHandler() {}
+
+
+ public void methodReceived (AMQStateManager stateManager,
+ AMQProtocolSession protocolSession,
+ AMQMethodEvent<MessageEmptyBody> evt)
+ throws AMQException
+ {
+ // TODO
+ }
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageOffsetMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOffsetMethodHandler.java
new file mode 100644
index 0000000000..80e5412049
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOffsetMethodHandler.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.MessageOffsetBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+
+public class MessageOffsetMethodHandler implements StateAwareMethodListener
+{
+ private static MessageOffsetMethodHandler _instance = new MessageOffsetMethodHandler();
+
+ public static MessageOffsetMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private MessageOffsetMethodHandler() {}
+
+
+ public void methodReceived (AMQStateManager stateManager,
+ AMQProtocolSession protocolSession,
+ AMQMethodEvent<MessageOffsetBody> evt)
+ throws AMQException
+ {
+ // TODO
+ }
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOkMethodHandler.java
new file mode 100644
index 0000000000..6d1d94de67
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOkMethodHandler.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.MessageOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+
+public class MessageOkMethodHandler implements StateAwareMethodListener
+{
+ private static MessageOkMethodHandler _instance = new MessageOkMethodHandler();
+
+ public static MessageOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private MessageOkMethodHandler() {}
+
+
+ public void methodReceived (AMQStateManager stateManager,
+ AMQProtocolSession protocolSession,
+ AMQMethodEvent<MessageOkBody> evt)
+ throws AMQException
+ {
+ // TODO
+ }
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
index 8785a96396..1535b16563 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java
@@ -18,35 +18,33 @@
* under the License.
*
*/
-package org.apache.qpid.client.handler;
+package org.apache.qpid.server.handler;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.BasicReturnBody;
-public class BasicReturnMethodHandler implements StateAwareMethodListener
+public class MessageOpenMethodHandler implements StateAwareMethodListener
{
- private static final Logger _logger = Logger.getLogger(BasicReturnMethodHandler.class);
+ private static MessageOpenMethodHandler _instance = new MessageOpenMethodHandler();
- private static final BasicReturnMethodHandler _instance = new BasicReturnMethodHandler();
-
- public static BasicReturnMethodHandler getInstance()
+ public static MessageOpenMethodHandler getInstance()
{
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ private MessageOpenMethodHandler() {}
+
+
+ public void methodReceived (AMQStateManager stateManager,
+ AMQProtocolSession protocolSession,
+ AMQMethodEvent<MessageOpenBody> evt)
+ throws AMQException
{
- _logger.debug("New JmsBounce method received");
- final UnprocessedMessage msg = new UnprocessedMessage();
- msg.deliverBody = null;
- msg.bounceBody = (BasicReturnBody) evt.getMethod();
- msg.channelId = evt.getChannelId();
-
- evt.getProtocolSession().unprocessedMessageReceived(msg);
+ // TODO
}
-} \ No newline at end of file
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageRejectMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageRejectMethodHandler.java
new file mode 100644
index 0000000000..bbce2ae1ee
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageRejectMethodHandler.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.MessageRejectBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+
+public class MessageRejectMethodHandler implements StateAwareMethodListener
+{
+ private static MessageRejectMethodHandler _instance = new MessageRejectMethodHandler();
+
+ public static MessageRejectMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private MessageRejectMethodHandler() {}
+
+
+ public void methodReceived (AMQStateManager stateManager,
+ AMQProtocolSession protocolSession,
+ AMQMethodEvent<MessageRejectBody> evt)
+ throws AMQException
+ {
+ // TODO
+ }
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageResumeMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageResumeMethodHandler.java
new file mode 100644
index 0000000000..810ed25029
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageResumeMethodHandler.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+
+public class MessageResumeMethodHandler implements StateAwareMethodListener
+{
+ private static MessageResumeMethodHandler _instance = new MessageResumeMethodHandler();
+
+ public static MessageResumeMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private MessageResumeMethodHandler() {}
+
+
+ public void methodReceived (AMQStateManager stateManager,
+ AMQProtocolSession protocolSession,
+ AMQMethodEvent<MessageResumeBody> evt)
+ throws AMQException
+ {
+ // TODO
+ }
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
new file mode 100644
index 0000000000..6a21bed7b5
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+
+public class MessageTransferMethodHandler implements StateAwareMethodListener
+{
+ private static MessageTransferMethodHandler _instance = new MessageTransferMethodHandler();
+
+ public static MessageTransferMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private MessageTransferMethodHandler() {}
+
+
+ public void methodReceived (AMQStateManager stateManager,
+ AMQProtocolSession protocolSession,
+ AMQMethodEvent<MessageTransferBody> evt)
+ throws AMQException
+ {
+ // TODO
+ }
+}
+
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
index 3271a715a2..b8b75accb4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
@@ -17,11 +17,12 @@
*/
package org.apache.qpid.client.handler;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.log4j.Logger;
/**
@@ -41,7 +42,7 @@ public class QueueDeleteOkMethodHandler implements StateAwareMethodListener
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
{
if (_logger.isDebugEnabled())
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodEvent.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodEvent.java
deleted file mode 100644
index 403aa3486e..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodEvent.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.protocol;
-
-import org.apache.qpid.framing.AMQMethodBody;
-
-public class AMQMethodEvent
-{
- private AMQMethodBody _method;
-
- private int _channelId;
-
- private AMQProtocolSession _protocolSession;
-
- public AMQMethodEvent(int channelId, AMQMethodBody method, AMQProtocolSession protocolSession)
- {
- _channelId = channelId;
- _method = method;
- _protocolSession = protocolSession;
- }
-
- public AMQMethodBody getMethod()
- {
- return _method;
- }
-
- public int getChannelId()
- {
- return _channelId;
- }
-
- public AMQProtocolSession getProtocolSession()
- {
- return _protocolSession;
- }
-
- public String toString()
- {
- StringBuffer buf = new StringBuffer("Method event: ");
- buf.append("\nChannel id: ").append(_channelId);
- buf.append("\nMethod: ").append(_method);
- return buf.toString();
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java
index 68299033a5..2cbd8f0e32 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java
@@ -21,6 +21,8 @@
package org.apache.qpid.client.protocol;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
public interface AMQMethodListener
{
@@ -34,7 +36,7 @@ public interface AMQMethodListener
* to all registered listeners using the error() method (see below) allowing them to
* perform cleanup if necessary.
*/
- boolean methodReceived(AMQMethodEvent evt) throws AMQException;
+ boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException;
/**
* Callback when an error has occurred. Allows listeners to clean up.
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 68797edc77..a0399e1450 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -46,6 +46,7 @@ import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.ssl.BogusSSLContextFactory;
import java.util.Iterator;
@@ -313,14 +314,14 @@ public class AMQProtocolHandler extends IoHandlerAdapter
_logger.debug("Method frame received: " + frame);
}
- final AMQMethodEvent evt = new AMQMethodEvent(frame.channel, (AMQMethodBody) frame.bodyFrame, _protocolSession);
+ final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel, (AMQMethodBody) frame.bodyFrame);
try
{
boolean wasAnyoneInterested = false;
while (it.hasNext())
{
final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ wasAnyoneInterested = listener.methodReceived(evt, _protocolSession) || wasAnyoneInterested;
}
if (!wasAnyoneInterested)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index 21ae3fc71f..4fff4fab00 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -22,6 +22,8 @@ package org.apache.qpid.client.protocol;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
public abstract class BlockingMethodFrameListener implements AMQMethodListener
{
@@ -53,7 +55,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
* @return true if the listener has dealt with this frame
* @throws AMQException
*/
- public boolean methodReceived(AMQMethodEvent evt) throws AMQException
+ public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException
{
AMQMethodBody method = evt.getMethod();
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index 50bd1667f9..0d9d70b244 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -21,9 +21,10 @@
package org.apache.qpid.client.state;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.client.handler.*;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQMethodListener;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.framing.*;
import org.apache.log4j.Logger;
@@ -101,9 +102,16 @@ public class AMQStateManager implements AMQMethodListener
frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseMethodHandler.getInstance());
frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance());
frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
- frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
- frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance());
+ frame2handlerMap.put(MessageAppendBody.class, MessageAppendMethodHandler.getInstance());
+ frame2handlerMap.put(MessageCheckpointBody.class, MessageCheckpointMethodHandler.getInstance());
+ frame2handlerMap.put(MessageCloseBody.class, MessageCloseMethodHandler.getInstance());
+ frame2handlerMap.put(MessageEmptyBody.class, MessageEmptyMethodHandler.getInstance());
+ frame2handlerMap.put(MessageOffsetBody.class, MessageOffsetMethodHandler.getInstance());
+ frame2handlerMap.put(MessageOkBody.class, MessageOkMethodHandler.getInstance());
+ frame2handlerMap.put(MessageOpenBody.class, MessageOpenMethodHandler.getInstance());
+ frame2handlerMap.put(MessageRejectBody.class, MessageRejectMethodHandler.getInstance());
+ frame2handlerMap.put(MessageResumeBody.class, MessageResumeMethodHandler.getInstance());
+ frame2handlerMap.put(MessageTransferBody.class, MessageTransferMethodHandler.getInstance());
frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());
@@ -146,12 +154,12 @@ public class AMQStateManager implements AMQMethodListener
}
}
- public boolean methodReceived(AMQMethodEvent evt) throws AMQException
+ public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException
{
StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
if (handler != null)
{
- handler.methodReceived(this, evt);
+ handler.methodReceived(this, protocolSession, evt);
return true;
}
return false;
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java b/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java
index 37a0f9f376..f2c8916868 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java
@@ -20,7 +20,8 @@
*/
package org.apache.qpid.client.state;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
/**
@@ -30,5 +31,6 @@ import org.apache.qpid.AMQException;
*/
public interface StateAwareMethodListener
{
- void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException;
+ void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession,
+ AMQMethodEvent evt) throws AMQException;
}