diff options
Diffstat (limited to 'java')
5 files changed, 53 insertions, 101 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 7bdb5e1ecd..0f2f5ac94e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -23,14 +23,12 @@ package org.apache.qpid.server; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; -//import org.apache.qpid.framing.BasicPublishBody; -//import org.apache.qpid.framing.ContentBody; -//import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.Content; import org.apache.qpid.framing.RequestManager; import org.apache.qpid.framing.ResponseManager; import org.apache.qpid.protocol.AMQProtocolWriter; +import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.ack.TxAck; import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; @@ -121,7 +119,7 @@ public class AMQChannel private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>(); private Set<Long> _browsedAcks = new HashSet<Long>(); - public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges, AMQProtocolWriter protocolSession) + public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges, AMQProtocolWriter protocolWriter, AMQMethodListener methodListener) throws AMQException { _channelId = channelId; @@ -129,8 +127,8 @@ public class AMQChannel _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2; _messageStore = messageStore; _exchanges = exchanges; - _requestManager = new RequestManager(channelId, protocolSession); - _responseManager = new ResponseManager(channelId, protocolSession); + _requestManager = new RequestManager(channelId, protocolWriter); + _responseManager = new ResponseManager(channelId, methodListener, protocolWriter); _txnBuffer = new TxnBuffer(_messageStore); } @@ -279,8 +277,15 @@ public class AMQChannel } } - public RequestManager getRequestManager() { return _requestManager; } - public ResponseManager getResponseManager() { return _responseManager; } + public RequestManager getRequestManager() + { + return _requestManager; + } + + public ResponseManager getResponseManager() + { + return _responseManager; + } public long getNextDeliveryTag() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index e163086c14..36895e065d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -36,7 +36,9 @@ import org.apache.qpid.framing.ProtocolVersionList; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQRequestBody; import org.apache.qpid.framing.AMQResponseBody; +import org.apache.qpid.framing.AMQResponseCallback; import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.RequestResponseMappingException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.protocol.AMQMethodEvent; @@ -119,7 +121,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _codecFactory = codecFactory; _managedObject = createMBean(); _managedObject.register(); -// this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager()); } public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, @@ -209,11 +210,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, if (frame.bodyFrame instanceof AMQRequestBody) { - requestFrameReceived(frame); + requestFrameReceived(frame.channel, (AMQRequestBody)frame.bodyFrame); } else if (frame.bodyFrame instanceof AMQResponseBody) { - responseFrameReceived(frame); + responseFrameReceived(frame.channel, (AMQResponseBody)frame.bodyFrame); } else { @@ -222,97 +223,43 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } } - private void requestFrameReceived(AMQFrame frame) throws AMQException + private void requestFrameReceived(int channel, AMQRequestBody requestBody) throws AMQException { if (_logger.isDebugEnabled()) { _logger.debug("Request frame received: " + frame); } - AMQChannel channel = getChannel(frame.channel); + AMQChannel channel = getChannel(channel); + ResponseManager responseManager = channel.getResponseManager(); + responseManager.requestReceived(requestBody); } - private void responseFrameReceived(AMQFrame frame) throws AMQException + private void responseFrameReceived(int channel, AMQResponseBody responseBody) throws AMQException { if (_logger.isDebugEnabled()) { _logger.debug("Response frame received: " + frame); } - AMQChannel channel = getChannel(frame.channel); - } - -// private void methodFrameReceived(AMQFrame frame) -// { -// if (_logger.isDebugEnabled()) -// { -// _logger.debug("Method frame received: " + frame); -// } -// final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel, -// (AMQMethodBody) frame.bodyFrame); -// try -// { -// boolean wasAnyoneInterested = false; -// for (AMQMethodListener listener : _frameListeners) -// { -// wasAnyoneInterested = listener.methodReceived(evt) || -// wasAnyoneInterested; -// } -// if (!wasAnyoneInterested) -// { -// throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener."); -// } -// } -// catch (AMQChannelException e) -// { -// _logger.error("Closing channel due to: " + e.getMessage()); -// writeFrame(e.getCloseFrame(frame.channel)); -// } -// catch (Exception e) -// { -// for (AMQMethodListener listener : _frameListeners) -// { -// listener.error(e); -// } -// _minaProtocolSession.close(); -// } -// } - -// private void contentFrameReceived(AMQFrame frame) throws AMQException -// { -// if (frame.bodyFrame instanceof ContentHeaderBody) -// { -// contentHeaderReceived(frame); -// } -// else if (frame.bodyFrame instanceof ContentBody) -// { -// contentBodyReceived(frame); -// } -// else if (frame.bodyFrame instanceof HeartbeatBody) -// { -// _logger.debug("Received heartbeat from client"); -// } -// else -// { -// _logger.warn("Unrecognised frame " + frame.getClass().getName()); -// } -// } - -// private void contentHeaderReceived(AMQFrame frame) throws AMQException -// { -// if (_logger.isDebugEnabled()) -// { -// _logger.debug("Content header frame received: " + frame); -// } -// getChannel(frame.channel).publishContentHeader((ContentHeaderBody) frame.bodyFrame); -// } - -// private void contentBodyReceived(AMQFrame frame) throws AMQException -// { -// if (_logger.isDebugEnabled()) -// { -// _logger.debug("Content body frame received: " + frame); -// } -// getChannel(frame.channel).publishContentBody((ContentBody) frame.bodyFrame); -// } + AMQChannel channel = getChannel(channel); + RequestManager requestManager = channel.getRequestManager(); + requestManager.responseReceived(responseBody); + } + + public long writeRequest(int channel, AMQMethodBody methodBody, AMQResponseCallback responseCallback) + throws RequestResponseMappingException + { + AMQChannel channel = getChannel(channel); + RequestManager requestManager = channel.getRequestManager(); + return requestManager.sendRequest(methodBody, responseCallback); + } + + public void writeResponse(int channel, long requestId, AMQMethodBody methodBody) + throws RequestResponseMappingException + { + AMQChannel channel = getChannel(channel); + ResponseManager responseManager = channel.getResponseManager(); + responseManager(requestId, methodBody); + } /** * Convenience method that writes a frame to the protocol session. Equivalent diff --git a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java index 2ead0a03e6..996ac23b09 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQChannelException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQChannelException.java @@ -21,7 +21,7 @@ package org.apache.qpid; import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; public class AMQChannelException extends AMQException { @@ -49,8 +49,8 @@ public class AMQChannelException extends AMQException this.minor = minor; } - public AMQFrame getCloseFrame(int channel) + public AMQMethodBody getCloseMethodBody() { - return ChannelCloseBody.createAMQFrame(channel, major, minor, _classId, _methodId, getErrorCode(), getMessage()); + return ChannelCloseBody.createMethodBody(major, minor, _classId, _methodId, getErrorCode(), getMessage()); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java index da7479af8c..900d068c13 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java @@ -27,7 +27,7 @@ import org.apache.qpid.protocol.AMQProtocolWriter; public class RequestManager { private int channel; - AMQProtocolWriter protocolSession; + AMQProtocolWriter protocolWriter; /** * Request and response frames must have a requestID and responseID which @@ -43,10 +43,10 @@ public class RequestManager private Hashtable<Long, AMQResponseCallback> requestSentMap; - public RequestManager(int channel, AMQProtocolWriter protocolSession) + public RequestManager(int channel, AMQProtocolWriter protocolWriter) { this.channel = channel; - this.protocolSession = protocolSession; + this.protocolWriter = protocolWriter; requestIdCount = 1L; lastProcessedResponseId = 0L; requestSentMap = new Hashtable<Long, AMQResponseCallback>(); @@ -60,7 +60,7 @@ public class RequestManager long requestId = getNextRequestId(); // Get new request ID AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId, lastProcessedResponseId, requestMethodBody); - protocolSession.writeFrame(requestFrame); + protocolWriter.writeFrame(requestFrame); requestSentMap.put(requestId, responseCallback); return requestId; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java index 9174675ded..bfc0eb84de 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java @@ -32,7 +32,7 @@ public class ResponseManager { private int channel; AMQMethodListener methodListener; - AMQProtocolWriter protocolSession; + AMQProtocolWriter protocolWriter; /** * Determines the batch behaviour of the manager. @@ -91,11 +91,11 @@ public class ResponseManager private Hashtable<Long, ResponseStatus> responseMap; public ResponseManager(int channel, AMQMethodListener methodListener, - AMQProtocolWriter protocolSession) + AMQProtocolWriter protocolWriter) { this.channel = channel; this.methodListener = methodListener; - this.protocolSession = protocolSession; + this.protocolWriter = protocolWriter; responseIdCount = 1L; lastReceivedRequestId = 0L; responseMap = new Hashtable<Long, ResponseStatus>(); @@ -221,6 +221,6 @@ public class ResponseManager long responseId = getNextResponseId(); // Get new request ID AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId, firstRequestId, numAdditionalRequests, responseMethodBody); - protocolSession.writeFrame(responseFrame); + protocolWriter.writeFrame(responseFrame); } } |
