diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2010-09-12 12:29:59 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2010-09-12 12:29:59 +0000 |
| commit | 8d12e2a70d026176edc6d717fac645b9571d89ec (patch) | |
| tree | b6d70de79915a5e43387a8464d18d961f697cb64 /java | |
| parent | 62c915a43558d6a4a0dc7a78d31f328c2c27fccc (diff) | |
| download | qpid-python-8d12e2a70d026176edc6d717fac645b9571d89ec.tar.gz | |
QPID-2854 : Correct implemention of 0-10 Queue Exclusivity in the Java Broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@996300 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java | 73 |
1 files changed, 30 insertions, 43 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 7b51b68e61..73eebec7bc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -29,11 +29,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQUnknownExchangeType; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeInUseException; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.exchange.HeadersExchange; +import org.apache.qpid.server.exchange.*; import org.apache.qpid.server.flow.FlowCreditManager_0_10; import org.apache.qpid.server.flow.WindowCreditManager; import org.apache.qpid.server.message.MessageMetaData_0_10; @@ -193,19 +189,40 @@ public class ServerSessionDelegate extends SessionDelegate QueueRegistry queueRegistry = getQueueRegistry(session); - AMQQueue queue = queueRegistry.getQueue(queueName); + final AMQQueue queue = queueRegistry.getQueue(queueName); if(queue == null) { exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found"); } - else if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session) + if(queue.getPrincipalHolder() != null && queue.getPrincipalHolder() != session) { exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session"); } else { + if(queue.isExclusive()) + { + if(queue.getPrincipalHolder() == null) + { + queue.setPrincipalHolder((ServerSession)session); + ((ServerSession) session).addSessionCloseTask(new ServerSession.Task() + { + + public void doTask(ServerSession session) + { + if(queue.getPrincipalHolder() == session) + { + queue.setPrincipalHolder(null); + } + } + }); + } + + + } + FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L); // TODO filters @@ -349,7 +366,12 @@ public class ServerSessionDelegate extends SessionDelegate } else { + AMQQueue queue = sub.getQueue(); ((ServerSession)session).unregister(sub); + if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0) + { + queue.setPrincipalHolder(null); + } } } @@ -850,8 +872,7 @@ public class ServerSessionDelegate extends SessionDelegate queue = createQueue(queueName, method, virtualHost, (ServerSession)session); if(method.getExclusive()) { - queue.setPrincipalHolder((ServerSession)session); - queue.setExclusiveOwningSession((AMQSessionModel) session); + queue.setExclusive(true); } else if(method.getAutoDelete()) { @@ -989,45 +1010,11 @@ public class ServerSessionDelegate extends SessionDelegate final ServerSession session) throws AMQException { - final QueueRegistry registry = virtualHost.getQueueRegistry(); - String owner = body.getExclusive() ? session.getClientID() : null; final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(), body.getExclusive(), virtualHost, body.getArguments()); - if (body.getExclusive() && !body.getDurable()) - { - final ServerSession.Task deleteQueueTask = - new ServerSession.Task() - { - public void doTask(ServerSession session) - { - if (registry.getQueue(queueName) == queue) - { - try - { - queue.delete(); - } - catch (AMQException e) - { - exception(session, body, e, "Cannot delete queue '" + body.getQueue()); - } - } - } - }; - - session.addSessionCloseTask(deleteQueueTask); - - queue.addQueueDeleteTask(new AMQQueue.Task() - { - public void doTask(AMQQueue queue) - { - session.removeSessionCloseTask(deleteQueueTask); - } - }); - }// if exclusive and not durable - return queue; } |
