diff options
| author | Gordon Sim <gsim@apache.org> | 2007-01-29 15:32:13 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-01-29 15:32:13 +0000 |
| commit | 2811d5cec86d2556d5136852612e59aacc06959b (patch) | |
| tree | bd9fd8e40dc51d5fdf1169b446ce1f9511b27eee /java | |
| parent | f1aff6512e53443b64d7c4cb877dac6de153afd9 (diff) | |
| download | qpid-python-2811d5cec86d2556d5136852612e59aacc06959b.tar.gz | |
Moved across auto deletion functionailty for exclusive, non-durable queues (aka private queues).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501081 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
5 files changed, 102 insertions, 6 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 85fc8290e0..c4a480530b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -144,11 +144,42 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar return MessageFormat.format("{0,number,0000000000000}", value); } - protected AMQQueue createQueue(QueueDeclareBody body, QueueRegistry registry, AMQProtocolSession session) + protected AMQQueue createQueue(QueueDeclareBody body, final QueueRegistry registry, final AMQProtocolSession session) throws AMQException { String owner = body.exclusive ? session.getContextKey() : null; if (owner != null) _log.info("Queue " + body.queue + " is owned by " + owner); - return new AMQQueue(body.queue, body.durable, owner, body.autoDelete, registry); + final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, registry); + final String queueName = queue.getName(); + + if(body.exclusive && !body.durable) + { + final AMQProtocolSession.Task deleteQueueTask = + new AMQProtocolSession.Task() + { + + public void doTask(AMQProtocolSession session) throws AMQException + { + if(registry.getQueue(queueName) == queue) + { + queue.delete(); + } + + } + }; + + session.addSessionCloseTask(deleteQueueTask); + + queue.addQueueDeleteTask(new AMQQueue.Task() + { + public void doTask(AMQQueue queue) + { + session.removeSessionCloseTask(deleteQueueTask); + } + }); + + + } + return queue; } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 1d4f67000e..868e0ba463 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -75,6 +75,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; public class AMQMinaProtocolSession implements AMQProtocolSession, @@ -121,6 +122,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private byte _minor; private FieldTable _clientProperties; + private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); + // Keeps a tally of connections for logging and debugging private static AtomicInteger _ConnectionId; static { _ConnectionId = new AtomicInteger(0); } @@ -550,6 +553,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _managedObject.unregister(); } + for(Task task : _taskList) + { + task.doTask(this); + } } } @@ -720,4 +727,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { return _ConnectionId.get(); } + + public void addSessionCloseTask(Task task) + { + _taskList.add(task); + } + + public void removeSessionCloseTask(Task task) + { + _taskList.remove(task); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 1e5df9b8a5..969eb30cb2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -36,6 +36,14 @@ import org.apache.qpid.protocol.AMQProtocolWriter; public interface AMQProtocolSession extends AMQProtocolWriter { + + + + public static interface Task + { + public void doTask(AMQProtocolSession session) throws AMQException; + } + /** * Called when a protocol data block is received * @param message the data block that has been received @@ -150,4 +158,8 @@ public interface AMQProtocolSession extends AMQProtocolWriter boolean versionEquals(byte major, byte minor); void checkMethodBodyVersion(AMQMethodBody methodBody); int getConnectionId(); + + void addSessionCloseTask(Task task); + + void removeSessionCloseTask(Task task); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index a3c4fb1820..cb7ec13c3e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -34,6 +34,7 @@ import javax.management.JMException; import java.text.MessageFormat; import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicBoolean; @@ -92,6 +93,12 @@ public class AMQQueue implements Managable, Comparable private final AtomicBoolean _isExclusive = new AtomicBoolean(); + private final AtomicBoolean _deleted = new AtomicBoolean(false); + + + + private List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>(); + /** * Manages message delivery. */ @@ -509,10 +516,19 @@ public class AMQQueue implements Managable, Comparable public void delete() throws AMQException { - _subscribers.queueDeleted(this); - _bindings.deregister(); - _queueRegistry.unregisterQueue(_name); - _managedObject.unregister(); + if(!_deleted.getAndSet(true)) + { + _subscribers.queueDeleted(this); + _bindings.deregister(); + _queueRegistry.unregisterQueue(_name); + _managedObject.unregister(); + for(Task task : _deleteTaskList) + { + task.doTask(this); + } + _deleteTaskList.clear(); + } + } protected void autodelete() throws AMQException @@ -669,4 +685,14 @@ public class AMQQueue implements Managable, Comparable } } + public static interface Task + { + public void doTask(AMQQueue queue) throws AMQException; + } + + public void addQueueDeleteTask(Task task) + { + _deleteTaskList.add(task); + } + } diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java index 292ce6a834..6ecb54300f 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -232,4 +232,14 @@ public class MockProtocolSession implements AMQProtocolSession { return _ConnectionId.get(); } + + public void addSessionCloseTask(Task task) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void removeSessionCloseTask(Task task) + { + //To change body of implemented methods use File | Settings | File Templates. + } } |
