summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java21
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java111
-rw-r--r--java/common/src/main/java/org/apache/qpid/AMQChannelException.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/RequestManager.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java8
5 files changed, 53 insertions, 101 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 7bdb5e1ecd..0f2f5ac94e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -23,14 +23,12 @@ package org.apache.qpid.server;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
-//import org.apache.qpid.framing.BasicPublishBody;
-//import org.apache.qpid.framing.ContentBody;
-//import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.Content;
import org.apache.qpid.framing.RequestManager;
import org.apache.qpid.framing.ResponseManager;
import org.apache.qpid.protocol.AMQProtocolWriter;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
@@ -121,7 +119,7 @@ public class AMQChannel
private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>();
private Set<Long> _browsedAcks = new HashSet<Long>();
- public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges, AMQProtocolWriter protocolSession)
+ public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges, AMQProtocolWriter protocolWriter, AMQMethodListener methodListener)
throws AMQException
{
_channelId = channelId;
@@ -129,8 +127,8 @@ public class AMQChannel
_prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
_messageStore = messageStore;
_exchanges = exchanges;
- _requestManager = new RequestManager(channelId, protocolSession);
- _responseManager = new ResponseManager(channelId, protocolSession);
+ _requestManager = new RequestManager(channelId, protocolWriter);
+ _responseManager = new ResponseManager(channelId, methodListener, protocolWriter);
_txnBuffer = new TxnBuffer(_messageStore);
}
@@ -279,8 +277,15 @@ public class AMQChannel
}
}
- public RequestManager getRequestManager() { return _requestManager; }
- public ResponseManager getResponseManager() { return _responseManager; }
+ public RequestManager getRequestManager()
+ {
+ return _requestManager;
+ }
+
+ public ResponseManager getResponseManager()
+ {
+ return _responseManager;
+ }
public long getNextDeliveryTag()
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index e163086c14..36895e065d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -36,7 +36,9 @@ import org.apache.qpid.framing.ProtocolVersionList;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQRequestBody;
import org.apache.qpid.framing.AMQResponseBody;
+import org.apache.qpid.framing.AMQResponseCallback;
import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.RequestResponseMappingException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -119,7 +121,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_codecFactory = codecFactory;
_managedObject = createMBean();
_managedObject.register();
-// this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
}
public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
@@ -209,11 +210,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
if (frame.bodyFrame instanceof AMQRequestBody)
{
- requestFrameReceived(frame);
+ requestFrameReceived(frame.channel, (AMQRequestBody)frame.bodyFrame);
}
else if (frame.bodyFrame instanceof AMQResponseBody)
{
- responseFrameReceived(frame);
+ responseFrameReceived(frame.channel, (AMQResponseBody)frame.bodyFrame);
}
else
{
@@ -222,97 +223,43 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
}
- private void requestFrameReceived(AMQFrame frame) throws AMQException
+ private void requestFrameReceived(int channel, AMQRequestBody requestBody) throws AMQException
{
if (_logger.isDebugEnabled())
{
_logger.debug("Request frame received: " + frame);
}
- AMQChannel channel = getChannel(frame.channel);
+ AMQChannel channel = getChannel(channel);
+ ResponseManager responseManager = channel.getResponseManager();
+ responseManager.requestReceived(requestBody);
}
- private void responseFrameReceived(AMQFrame frame) throws AMQException
+ private void responseFrameReceived(int channel, AMQResponseBody responseBody) throws AMQException
{
if (_logger.isDebugEnabled())
{
_logger.debug("Response frame received: " + frame);
}
- AMQChannel channel = getChannel(frame.channel);
- }
-
-// private void methodFrameReceived(AMQFrame frame)
-// {
-// if (_logger.isDebugEnabled())
-// {
-// _logger.debug("Method frame received: " + frame);
-// }
-// final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel,
-// (AMQMethodBody) frame.bodyFrame);
-// try
-// {
-// boolean wasAnyoneInterested = false;
-// for (AMQMethodListener listener : _frameListeners)
-// {
-// wasAnyoneInterested = listener.methodReceived(evt) ||
-// wasAnyoneInterested;
-// }
-// if (!wasAnyoneInterested)
-// {
-// throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
-// }
-// }
-// catch (AMQChannelException e)
-// {
-// _logger.error("Closing channel due to: " + e.getMessage());
-// writeFrame(e.getCloseFrame(frame.channel));
-// }
-// catch (Exception e)
-// {
-// for (AMQMethodListener listener : _frameListeners)
-// {
-// listener.error(e);
-// }
-// _minaProtocolSession.close();
-// }
-// }
-
-// private void contentFrameReceived(AMQFrame frame) throws AMQException
-// {
-// if (frame.bodyFrame instanceof ContentHeaderBody)
-// {
-// contentHeaderReceived(frame);
-// }
-// else if (frame.bodyFrame instanceof ContentBody)
-// {
-// contentBodyReceived(frame);
-// }
-// else if (frame.bodyFrame instanceof HeartbeatBody)
-// {
-// _logger.debug("Received heartbeat from client");
-// }
-// else
-// {
-// _logger.warn("Unrecognised frame " + frame.getClass().getName());
-// }
-// }
-
-// private void contentHeaderReceived(AMQFrame frame) throws AMQException
-// {
-// if (_logger.isDebugEnabled())
-// {
-// _logger.debug("Content header frame received: " + frame);
-// }
-// getChannel(frame.channel).publishContentHeader((ContentHeaderBody) frame.bodyFrame);
-// }
-
-// private void contentBodyReceived(AMQFrame frame) throws AMQException
-// {
-// if (_logger.isDebugEnabled())
-// {
-// _logger.debug("Content body frame received: " + frame);
-// }
-// getChannel(frame.channel).publishContentBody((ContentBody) frame.bodyFrame);
-// }
+ AMQChannel channel = getChannel(channel);
+ RequestManager requestManager = channel.getRequestManager();
+ requestManager.responseReceived(responseBody);
+ }
+
+ public long writeRequest(int channel, AMQMethodBody methodBody, AMQResponseCallback responseCallback)
+ throws RequestResponseMappingException
+ {
+ AMQChannel channel = getChannel(channel);
+ RequestManager requestManager = channel.getRequestManager();
+ return requestManager.sendRequest(methodBody, responseCallback);
+ }
+
+ public void writeResponse(int channel, long requestId, AMQMethodBody methodBody)
+ throws RequestResponseMappingException
+ {
+ AMQChannel channel = getChannel(channel);
+ ResponseManager responseManager = channel.getResponseManager();
+ responseManager(requestId, methodBody);
+ }
/**
* Convenience method that writes a frame to the protocol session. Equivalent
diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
index 2ead0a03e6..996ac23b09 100644
--- a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
+++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
@@ -21,7 +21,7 @@
package org.apache.qpid;
import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
public class AMQChannelException extends AMQException
{
@@ -49,8 +49,8 @@ public class AMQChannelException extends AMQException
this.minor = minor;
}
- public AMQFrame getCloseFrame(int channel)
+ public AMQMethodBody getCloseMethodBody()
{
- return ChannelCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode(), getMessage());
+ return ChannelCloseBody.createMethodBody(major, minor, _classId, _methodId, getErrorCode(), getMessage());
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
index da7479af8c..900d068c13 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
@@ -27,7 +27,7 @@ import org.apache.qpid.protocol.AMQProtocolWriter;
public class RequestManager
{
private int channel;
- AMQProtocolWriter protocolSession;
+ AMQProtocolWriter protocolWriter;
/**
* Request and response frames must have a requestID and responseID which
@@ -43,10 +43,10 @@ public class RequestManager
private Hashtable<Long, AMQResponseCallback> requestSentMap;
- public RequestManager(int channel, AMQProtocolWriter protocolSession)
+ public RequestManager(int channel, AMQProtocolWriter protocolWriter)
{
this.channel = channel;
- this.protocolSession = protocolSession;
+ this.protocolWriter = protocolWriter;
requestIdCount = 1L;
lastProcessedResponseId = 0L;
requestSentMap = new Hashtable<Long, AMQResponseCallback>();
@@ -60,7 +60,7 @@ public class RequestManager
long requestId = getNextRequestId(); // Get new request ID
AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId,
lastProcessedResponseId, requestMethodBody);
- protocolSession.writeFrame(requestFrame);
+ protocolWriter.writeFrame(requestFrame);
requestSentMap.put(requestId, responseCallback);
return requestId;
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
index 9174675ded..bfc0eb84de 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
@@ -32,7 +32,7 @@ public class ResponseManager
{
private int channel;
AMQMethodListener methodListener;
- AMQProtocolWriter protocolSession;
+ AMQProtocolWriter protocolWriter;
/**
* Determines the batch behaviour of the manager.
@@ -91,11 +91,11 @@ public class ResponseManager
private Hashtable<Long, ResponseStatus> responseMap;
public ResponseManager(int channel, AMQMethodListener methodListener,
- AMQProtocolWriter protocolSession)
+ AMQProtocolWriter protocolWriter)
{
this.channel = channel;
this.methodListener = methodListener;
- this.protocolSession = protocolSession;
+ this.protocolWriter = protocolWriter;
responseIdCount = 1L;
lastReceivedRequestId = 0L;
responseMap = new Hashtable<Long, ResponseStatus>();
@@ -221,6 +221,6 @@ public class ResponseManager
long responseId = getNextResponseId(); // Get new request ID
AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId,
firstRequestId, numAdditionalRequests, responseMethodBody);
- protocolSession.writeFrame(responseFrame);
+ protocolWriter.writeFrame(responseFrame);
}
}