summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2010-09-12 12:29:59 +0000
committerRobert Godfrey <rgodfrey@apache.org>2010-09-12 12:29:59 +0000
commit8d12e2a70d026176edc6d717fac645b9571d89ec (patch)
treeb6d70de79915a5e43387a8464d18d961f697cb64 /java
parent62c915a43558d6a4a0dc7a78d31f328c2c27fccc (diff)
downloadqpid-python-8d12e2a70d026176edc6d717fac645b9571d89ec.tar.gz
QPID-2854 : Correct implemention of 0-10 Queue Exclusivity in the Java Broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@996300 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java73
1 files changed, 30 insertions, 43 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 7b51b68e61..73eebec7bc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -29,11 +29,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnknownExchangeType;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeInUseException;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.HeadersExchange;
+import org.apache.qpid.server.exchange.*;
import org.apache.qpid.server.flow.FlowCreditManager_0_10;
import org.apache.qpid.server.flow.WindowCreditManager;
import org.apache.qpid.server.message.MessageMetaData_0_10;
@@ -193,19 +189,40 @@ public class ServerSessionDelegate extends SessionDelegate
QueueRegistry queueRegistry = getQueueRegistry(session);
- AMQQueue queue = queueRegistry.getQueue(queueName);
+ final AMQQueue queue = queueRegistry.getQueue(queueName);
if(queue == null)
{
exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
}
- else if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session)
+ if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session)
{
exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
}
else
{
+ if(queue.isExclusive())
+ {
+ if(queue.getPrincipalHolder() == null)
+ {
+ queue.setPrincipalHolder((ServerSession)session);
+ ((ServerSession) session).addSessionCloseTask(new ServerSession.Task()
+ {
+
+ public void doTask(ServerSession session)
+ {
+ if(queue.getPrincipalHolder() == session)
+ {
+ queue.setPrincipalHolder(null);
+ }
+ }
+ });
+ }
+
+
+ }
+
FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
// TODO filters
@@ -349,7 +366,12 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
+ AMQQueue queue = sub.getQueue();
((ServerSession)session).unregister(sub);
+ if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0)
+ {
+ queue.setPrincipalHolder(null);
+ }
}
}
@@ -850,8 +872,7 @@ public class ServerSessionDelegate extends SessionDelegate
queue = createQueue(queueName, method, virtualHost, (ServerSession)session);
if(method.getExclusive())
{
- queue.setPrincipalHolder((ServerSession)session);
- queue.setExclusiveOwningSession((AMQSessionModel) session);
+ queue.setExclusive(true);
}
else if(method.getAutoDelete())
{
@@ -989,45 +1010,11 @@ public class ServerSessionDelegate extends SessionDelegate
final ServerSession session)
throws AMQException
{
- final QueueRegistry registry = virtualHost.getQueueRegistry();
-
String owner = body.getExclusive() ? session.getClientID() : null;
final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(),
body.getExclusive(), virtualHost, body.getArguments());
- if (body.getExclusive() && !body.getDurable())
- {
- final ServerSession.Task deleteQueueTask =
- new ServerSession.Task()
- {
- public void doTask(ServerSession session)
- {
- if (registry.getQueue(queueName) == queue)
- {
- try
- {
- queue.delete();
- }
- catch (AMQException e)
- {
- exception(session, body, e, "Cannot delete queue '" + body.getQueue());
- }
- }
- }
- };
-
- session.addSessionCloseTask(deleteQueueTask);
-
- queue.addQueueDeleteTask(new AMQQueue.Task()
- {
- public void doTask(AMQQueue queue)
- {
- session.removeSessionCloseTask(deleteQueueTask);
- }
- });
- }// if exclusive and not durable
-
return queue;
}