From 2811d5cec86d2556d5136852612e59aacc06959b Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Mon, 29 Jan 2007 15:32:13 +0000 Subject: Moved across auto deletion functionailty for exclusive, non-durable queues (aka private queues). git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501081 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/handler/QueueDeclareHandler.java | 35 ++++++++++++++++++++-- .../server/protocol/AMQMinaProtocolSession.java | 17 +++++++++++ .../qpid/server/protocol/AMQProtocolSession.java | 12 ++++++++ .../org/apache/qpid/server/queue/AMQQueue.java | 34 ++++++++++++++++++--- .../qpid/server/queue/MockProtocolSession.java | 10 +++++++ 5 files changed, 102 insertions(+), 6 deletions(-) (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 85fc8290e0..c4a480530b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -144,11 +144,42 @@ public class QueueDeclareHandler implements StateAwareMethodListener _taskList = new CopyOnWriteArrayList(); + // Keeps a tally of connections for logging and debugging private static AtomicInteger _ConnectionId; static { _ConnectionId = new AtomicInteger(0); } @@ -550,6 +553,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _managedObject.unregister(); } + for(Task task : _taskList) + { + task.doTask(this); + } } } @@ -720,4 +727,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { return _ConnectionId.get(); } + + public void addSessionCloseTask(Task task) + { + _taskList.add(task); + } + + public void removeSessionCloseTask(Task task) + { + _taskList.remove(task); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 1e5df9b8a5..969eb30cb2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -36,6 +36,14 @@ import org.apache.qpid.protocol.AMQProtocolWriter; public interface AMQProtocolSession extends AMQProtocolWriter { + + + + public static interface Task + { + public void doTask(AMQProtocolSession session) throws AMQException; + } + /** * Called when a protocol data block is received * @param message the data block that has been received @@ -150,4 +158,8 @@ public interface AMQProtocolSession extends AMQProtocolWriter boolean versionEquals(byte major, byte minor); void checkMethodBodyVersion(AMQMethodBody methodBody); int getConnectionId(); + + void addSessionCloseTask(Task task); + + void removeSessionCloseTask(Task task); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index a3c4fb1820..cb7ec13c3e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -34,6 +34,7 @@ import javax.management.JMException; import java.text.MessageFormat; import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicBoolean; @@ -92,6 +93,12 @@ public class AMQQueue implements Managable, Comparable private final AtomicBoolean _isExclusive = new AtomicBoolean(); + private final AtomicBoolean _deleted = new AtomicBoolean(false); + + + + private List _deleteTaskList = new CopyOnWriteArrayList(); + /** * Manages message delivery. */ @@ -509,10 +516,19 @@ public class AMQQueue implements Managable, Comparable public void delete() throws AMQException { - _subscribers.queueDeleted(this); - _bindings.deregister(); - _queueRegistry.unregisterQueue(_name); - _managedObject.unregister(); + if(!_deleted.getAndSet(true)) + { + _subscribers.queueDeleted(this); + _bindings.deregister(); + _queueRegistry.unregisterQueue(_name); + _managedObject.unregister(); + for(Task task : _deleteTaskList) + { + task.doTask(this); + } + _deleteTaskList.clear(); + } + } protected void autodelete() throws AMQException @@ -669,4 +685,14 @@ public class AMQQueue implements Managable, Comparable } } + public static interface Task + { + public void doTask(AMQQueue queue) throws AMQException; + } + + public void addQueueDeleteTask(Task task) + { + _deleteTaskList.add(task); + } + } diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java index 292ce6a834..6ecb54300f 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -232,4 +232,14 @@ public class MockProtocolSession implements AMQProtocolSession { return _ConnectionId.get(); } + + public void addSessionCloseTask(Task task) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void removeSessionCloseTask(Task task) + { + //To change body of implemented methods use File | Settings | File Templates. + } } -- cgit v1.2.1