diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-01-12 14:31:18 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-12 14:31:18 +0000 |
| commit | 482ff88ac8fcaf14207db6e23d3b9981365dfd62 (patch) | |
| tree | 7e4e929ba09510de4a129dd718b37a0830a91c75 /java/broker/src | |
| parent | 03f090715e8744bedc344495fbc03468f260a85a (diff) | |
| download | qpid-python-482ff88ac8fcaf14207db6e23d3b9981365dfd62.tar.gz | |
Refactored to create a common AMQMethodEvent class; Added clinet Method* handlers, removed old Basic* handlers.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@495581 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
41 files changed, 241 insertions, 200 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 51be0d867b..7bdb5e1ecd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -23,10 +23,14 @@ package org.apache.qpid.server; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; +//import org.apache.qpid.framing.BasicPublishBody; +//import org.apache.qpid.framing.ContentBody; +//import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.Content; +import org.apache.qpid.framing.RequestManager; +import org.apache.qpid.framing.ResponseManager; +import org.apache.qpid.protocol.AMQProtocolWriter; import org.apache.qpid.server.ack.TxAck; import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; @@ -64,6 +68,9 @@ public class AMQChannel private long _prefetch_HighWaterMark; private long _prefetch_LowWaterMark; + + private RequestManager _requestManager; + private ResponseManager _responseManager; /** * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that @@ -114,7 +121,7 @@ public class AMQChannel private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>(); private Set<Long> _browsedAcks = new HashSet<Long>(); - public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges) + public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges, AMQProtocolWriter protocolSession) throws AMQException { _channelId = channelId; @@ -122,6 +129,8 @@ public class AMQChannel _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2; _messageStore = messageStore; _exchanges = exchanges; + _requestManager = new RequestManager(channelId, protocolSession); + _responseManager = new ResponseManager(channelId, protocolSession); _txnBuffer = new TxnBuffer(_messageStore); } @@ -170,11 +179,11 @@ public class AMQChannel _prefetch_HighWaterMark = prefetchCount; } - public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException - { - _currentMessage = new AMQMessage(_messageStore, publishBody); - _currentMessage.setPublisher(publisher); - } +// public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException +// { +// _currentMessage = new AMQMessage(_messageStore, publishBody); +// _currentMessage.setPublisher(publisher); +// } // public void publishContentHeader(ContentHeaderBody contentHeaderBody) // throws AMQException @@ -269,6 +278,9 @@ public class AMQChannel } } } + + public RequestManager getRequestManager() { return _requestManager; } + public ResponseManager getResponseManager() { return _responseManager; } public long getNextDeliveryTag() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index 2f1d6ada00..7461f93539 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -25,8 +25,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java index fd6714de3a..32622f8fed 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java @@ -22,8 +22,8 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java index 8f91d4ecf4..07ab0537d5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java @@ -21,12 +21,12 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ChannelFlowBody; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java index ee2ff9e5bb..459ccf40a8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java @@ -24,11 +24,11 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java index 94e54e8f1d..6e22d67b72 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java @@ -23,12 +23,12 @@ package org.apache.qpid.server.handler; import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ConnectionCloseOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.AMQException; import org.apache.log4j.Logger; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java index cc9277593b..69d50c9237 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java @@ -22,8 +22,8 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ConnectionCloseOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java index f1487b2844..c3b6560ee4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java @@ -24,8 +24,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ConnectionOpenBody; import org.apache.qpid.framing.ConnectionOpenOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQState; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java index 42dda66eab..9aea4a7b26 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java @@ -23,8 +23,8 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.framing.*; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.protocol.HeartbeatConfig; import org.apache.qpid.server.queue.QueueRegistry; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java index b2ddf6d0db..77fddf1ff5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java @@ -27,8 +27,8 @@ import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ConnectionSecureBody; import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.ConnectionTuneBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.protocol.HeartbeatConfig; import org.apache.qpid.server.queue.QueueRegistry; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java index f0b4e0a515..de38cff3e2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java @@ -23,8 +23,8 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQState; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java index 87e20faa81..67f77c72ef 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java @@ -21,9 +21,9 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ExchangeBoundBody; import org.apache.qpid.framing.ExchangeBoundOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index efcfffeaff..cdb3a503ae 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -25,10 +25,10 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.ExchangeDeclareOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java index 1c72f75dd7..79b4e07c90 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java @@ -24,9 +24,9 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ExchangeDeleteBody; import org.apache.qpid.framing.ExchangeDeleteOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeInUseException; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java index 91b5493f32..962bd2004f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageAppendBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageAppendHandler implements StateAwareMethodListener<MessageAppendBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java index a84cbfb88e..ad41910141 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageCancelHandler implements StateAwareMethodListener<MessageCancelBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java index 9dd9a6b18e..2123f9203c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageCheckpointBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageCheckpointHandler implements StateAwareMethodListener<MessageCheckpointBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java index 5e21c1ee6c..5efe24abfa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageCloseHandler implements StateAwareMethodListener<MessageCloseBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java index a2c5662703..c019a3b043 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageConsumeBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageConsumeHandler implements StateAwareMethodListener<MessageConsumeBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java index 37d39a517a..3d5b59c238 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageEmptyHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageEmptyBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageEmptyHandler implements StateAwareMethodListener<MessageEmptyBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java index 15d769e295..5ae0c677a4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageGetBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageGetHandler implements StateAwareMethodListener<MessageGetBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java index 7ac075c8d4..de6a886891 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOffsetHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageOffsetBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageOffsetHandler implements StateAwareMethodListener<MessageOffsetBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java index 4b2a1543cd..9fb38bf8b8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOkHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageOkHandler implements StateAwareMethodListener<MessageOkBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java index 79679713ba..714227bb7c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageOpenHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageOpenBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageOpenHandler implements StateAwareMethodListener<MessageOpenBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java index baa01df602..6bea5553d9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageQosBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageQosHandler implements StateAwareMethodListener<MessageQosBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java index e178c60b27..6da0b6ffcc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageRecoverBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageRecoverHandler implements StateAwareMethodListener<MessageRecoverBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java index 401b399fa0..e5a27c762a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRejectHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageRejectBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageRejectHandler implements StateAwareMethodListener<MessageRejectBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java index 429514cc5b..49b5ff8e08 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageResumeHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageResumeBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageResumeHandler implements StateAwareMethodListener<MessageResumeBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java index 18027fdc2b..4f6886e885 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java @@ -20,7 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageTransferHandler implements StateAwareMethodListener<MessageTransferBody> diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index 5e8c8a31ca..915bfa67a6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -22,13 +22,13 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.QueueBindBody; import org.apache.qpid.framing.QueueBindOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 0d2dc1550e..1a7b82829a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -26,9 +26,9 @@ import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.QueueDeclareBody; import org.apache.qpid.framing.QueueDeclareOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index d49baa97a4..b867d80fdb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.server.handler; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.framing.QueueDeleteBody; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java index 74902c33f8..983e6f7e56 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java @@ -23,9 +23,9 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.TxCommitBody; import org.apache.qpid.framing.TxCommitOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java index 32fb57db70..891dd69d4d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java @@ -23,8 +23,8 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.TxRollbackBody; import org.apache.qpid.framing.TxRollbackOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java index bb0f89f163..0c2a6ca210 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java @@ -23,8 +23,8 @@ package org.apache.qpid.server.handler; import org.apache.qpid.AMQException; import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxSelectOkBody; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodEvent.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodEvent.java deleted file mode 100644 index 3d12828900..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodEvent.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import org.apache.qpid.framing.AMQMethodBody; - -/** - * An event that is passed to AMQMethodListeners describing a particular method. - * It supplies the: - * <ul><li>channel id</li> - * <li>protocol method</li> - * to listeners. This means that listeners do not need to be stateful. - * - * In the StateAwareMethodListener, other useful objects such as the protocol session - * are made available. - * - */ -public class AMQMethodEvent<M extends AMQMethodBody> -{ - private final M _method; - - private final int _channelId; - - public AMQMethodEvent(int channelId, M method) - { - _channelId = channelId; - _method = method; - } - - public M getMethod() - { - return _method; - } - - public int getChannelId() - { - return _channelId; - } - - public String toString() - { - StringBuilder buf = new StringBuilder("Method event: "); - buf.append("\nChannel id: ").append(_channelId); - buf.append("\nMethod: ").append(_method); - return buf.toString(); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java index d2062d3c17..6596da1f8f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.framing.AMQMethodBody; diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 525978e348..558d5f4aa6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -30,14 +30,18 @@ import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ConnectionStartBody; import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.Content; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.ProtocolVersionList; import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.AMQRequestBody; +import org.apache.qpid.framing.AMQResponseBody; +//import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.framing.ContentHeaderBody; +//import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.codec.AMQDecoder; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; @@ -195,124 +199,130 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { AMQFrame frame = (AMQFrame) message; - if (frame.bodyFrame instanceof AMQRequest) + if (frame.bodyFrame instanceof AMQRequestBody) { requestFrameReceived(frame); } - else if (frame.bodyFrame instanceof AMQResponse) + else if (frame.bodyFrame instanceof AMQResponseBody) { responseFrameReceived(frame); } - else if (frame.bodyFrame instanceof AMQMethodBody) - { - methodFrameReceived(frame); - } else { - try - { - contentFrameReceived(frame); - } - catch (RequiredDeliveryException e) - { - //need to return the message: - _logger.info("Returning message to " + this + " channel " + frame.channel - + ": " + e.getMessage()); - writeFrame(e.getReturnMessage(frame.channel)); - } + _logger.error("Received invalid frame: " + frame.toString()); } +// else if (frame.bodyFrame instanceof AMQMethodBody) +// { +// methodFrameReceived(frame); +// } +// else +// { +// try +// { +// contentFrameReceived(frame); +// } +// catch (RequiredDeliveryException e) +// { +// //need to return the message: +// _logger.info("Returning message to " + this + " channel " + frame.channel +// + ": " + e.getMessage()); +// writeFrame(e.getReturnMessage(frame.channel)); +// } +// } } } - private void requestFrameReceived(AMQFrame frame) + private void requestFrameReceived(AMQFrame frame) throws AMQException { if (_logger.isDebugEnabled()) { _logger.debug("Request frame received: " + frame); } + AMQChannel channel = getChannel(frame.channel); } - private void responseFrameReceived(AMQFrame frame) + private void responseFrameReceived(AMQFrame frame) throws AMQException { if (_logger.isDebugEnabled()) { _logger.debug("Response frame received: " + frame); } - } - - private void methodFrameReceived(AMQFrame frame) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Method frame received: " + frame); - } - final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel, - (AMQMethodBody) frame.bodyFrame); - try - { - boolean wasAnyoneInterested = false; - for (AMQMethodListener listener : _frameListeners) - { - wasAnyoneInterested = listener.methodReceived(evt, this, _queueRegistry, _exchangeRegistry) || - wasAnyoneInterested; - } - if (!wasAnyoneInterested) - { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener."); - } - } - catch (AMQChannelException e) - { - _logger.error("Closing channel due to: " + e.getMessage()); - writeFrame(e.getCloseFrame(frame.channel)); - } - catch (AMQException e) - { - for (AMQMethodListener listener : _frameListeners) - { - listener.error(e); - } - _minaProtocolSession.close(); - } - } - - private void contentFrameReceived(AMQFrame frame) throws AMQException - { - if (frame.bodyFrame instanceof ContentHeaderBody) - { - contentHeaderReceived(frame); - } - else if (frame.bodyFrame instanceof ContentBody) - { - contentBodyReceived(frame); - } - else if (frame.bodyFrame instanceof HeartbeatBody) - { - _logger.debug("Received heartbeat from client"); - } - else - { - _logger.warn("Unrecognised frame " + frame.getClass().getName()); - } - } - - private void contentHeaderReceived(AMQFrame frame) throws AMQException - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Content header frame received: " + frame); - } - getChannel(frame.channel).publishContentHeader((ContentHeaderBody) frame.bodyFrame); - } - - private void contentBodyReceived(AMQFrame frame) throws AMQException - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Content body frame received: " + frame); - } - getChannel(frame.channel).publishContentBody((ContentBody) frame.bodyFrame); - } + AMQChannel channel = getChannel(frame.channel); + } + +// private void methodFrameReceived(AMQFrame frame) +// { +// if (_logger.isDebugEnabled()) +// { +// _logger.debug("Method frame received: " + frame); +// } +// final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel, +// (AMQMethodBody) frame.bodyFrame); +// try +// { +// boolean wasAnyoneInterested = false; +// for (AMQMethodListener listener : _frameListeners) +// { +// wasAnyoneInterested = listener.methodReceived(evt, this, _queueRegistry, _exchangeRegistry) || +// wasAnyoneInterested; +// } +// if (!wasAnyoneInterested) +// { +// throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener."); +// } +// } +// catch (AMQChannelException e) +// { +// _logger.error("Closing channel due to: " + e.getMessage()); +// writeFrame(e.getCloseFrame(frame.channel)); +// } +// catch (AMQException e) +// { +// for (AMQMethodListener listener : _frameListeners) +// { +// listener.error(e); +// } +// _minaProtocolSession.close(); +// } +// } + +// private void contentFrameReceived(AMQFrame frame) throws AMQException +// { +// if (frame.bodyFrame instanceof ContentHeaderBody) +// { +// contentHeaderReceived(frame); +// } +// else if (frame.bodyFrame instanceof ContentBody) +// { +// contentBodyReceived(frame); +// } +// else if (frame.bodyFrame instanceof HeartbeatBody) +// { +// _logger.debug("Received heartbeat from client"); +// } +// else +// { +// _logger.warn("Unrecognised frame " + frame.getClass().getName()); +// } +// } + +// private void contentHeaderReceived(AMQFrame frame) throws AMQException +// { +// if (_logger.isDebugEnabled()) +// { +// _logger.debug("Content header frame received: " + frame); +// } +// getChannel(frame.channel).publishContentHeader((ContentHeaderBody) frame.bodyFrame); +// } + +// private void contentBodyReceived(AMQFrame frame) throws AMQException +// { +// if (_logger.isDebugEnabled()) +// { +// _logger.debug("Content body frame received: " + frame); +// } +// getChannel(frame.channel).publishContentBody((ContentBody) frame.bodyFrame); +// } /** * Convenience method that writes a frame to the protocol session. Equivalent diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index b1ed999f72..00d22d839e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.AMQException; @@ -38,14 +39,6 @@ public interface AMQProtocolSession extends AMQProtocolWriter */ void dataBlockReceived(AMQDataBlock message) throws Exception; -// This is now a part of AMQProtocolWriter (inherited) to provide uniformity across both -// client and server. -// /** -// * Write a datablock, encoding where necessary (e.g. into a sequence of bytes) -// * @param frame the frame to be encoded and written -// */ -// void writeFrame(AMQDataBlock frame); - /** * Get the context key associated with this session. Context key is described * in the AMQ protocol specification (RFC 6). diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index 509d7761dd..18b8041e7c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -22,9 +22,9 @@ package org.apache.qpid.server.state; import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.handler.*; -import org.apache.qpid.server.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQMethodListener; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java b/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java index 7d58f242e5..56323258b7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/StateAwareMethodListener.java @@ -21,7 +21,7 @@ package org.apache.qpid.server.state; import org.apache.qpid.AMQException; -import org.apache.qpid.server.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; |
