summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-21 15:47:17 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-21 15:47:17 +0000
commit5679739e02af18e04fb66ce531356c051373646b (patch)
tree584df7d32b2918954d3d5c315e8534db7d51292c /java/broker
parent033902ad87784d29855cb49f03fbe279fd84100b (diff)
downloadqpid-python-5679739e02af18e04fb66ce531356c051373646b.tar.gz
QPID-348 Problems related to prefetching of messages
Client caches are now cleared. Partially commented out code in AMQSession and BasicMessageConsumer pending broker fixes to ensure channel suspension is respected. Tests fail otherwise. Tests pass just now as they are not correct, JIRA raised for fix (QPID-386). Spec Changes Added recover-ok method to recover. But to maintain compatibility added a nowait bit to request the response. Java Changes AMQConnection added wrapping of AMQExceptions that can be thrown by the waiting suspend calls. AMQSession Added clean up code for rollback/recover to clean up Session._queue and BMC._syncQueue BasicMessageConsumer - added rollback method to clean up _syncQueue ChannelCloseMethodHandler - reduced logging level from error to debug for received methods. FlowControllingBlockingQueue - added code to return iterator so messages can be purged cleanly. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@510060 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java13
1 files changed, 11 insertions, 2 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
index 5f5b7ccad1..5a9b9b54af 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicRecoverBody;
+import org.apache.qpid.framing.BasicRecoverOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -43,16 +44,24 @@ public class BasicRecoverMethodHandler implements StateAwareMethodListener<Basic
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicRecoverBody> evt) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
-
+
_logger.debug("Recover received on protocol session " + session + " and channel " + evt.getChannelId());
AMQChannel channel = session.getChannel(evt.getChannelId());
BasicRecoverBody body = evt.getMethod();
-
+
if (channel == null)
{
throw body.getChannelNotFoundException(evt.getChannelId());
}
channel.resend(session, body.requeue);
+
+ if (!body.nowait)
+ {
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(BasicRecoverOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
+ }
}
}