summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-01-29 15:32:13 +0000
committerGordon Sim <gsim@apache.org>2007-01-29 15:32:13 +0000
commit2811d5cec86d2556d5136852612e59aacc06959b (patch)
treebd9fd8e40dc51d5fdf1169b446ce1f9511b27eee /java
parentf1aff6512e53443b64d7c4cb877dac6de153afd9 (diff)
downloadqpid-python-2811d5cec86d2556d5136852612e59aacc06959b.tar.gz
Moved across auto deletion functionailty for exclusive, non-durable queues (aka private queues).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501081 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java35
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java34
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java10
5 files changed, 102 insertions, 6 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 85fc8290e0..c4a480530b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -144,11 +144,42 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
return MessageFormat.format("{0,number,0000000000000}", value);
}
- protected AMQQueue createQueue(QueueDeclareBody body, QueueRegistry registry, AMQProtocolSession session)
+ protected AMQQueue createQueue(QueueDeclareBody body, final QueueRegistry registry, final AMQProtocolSession session)
throws AMQException
{
String owner = body.exclusive ? session.getContextKey() : null;
if (owner != null) _log.info("Queue " + body.queue + " is owned by " + owner);
- return new AMQQueue(body.queue, body.durable, owner, body.autoDelete, registry);
+ final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, registry);
+ final String queueName = queue.getName();
+
+ if(body.exclusive && !body.durable)
+ {
+ final AMQProtocolSession.Task deleteQueueTask =
+ new AMQProtocolSession.Task()
+ {
+
+ public void doTask(AMQProtocolSession session) throws AMQException
+ {
+ if(registry.getQueue(queueName) == queue)
+ {
+ queue.delete();
+ }
+
+ }
+ };
+
+ session.addSessionCloseTask(deleteQueueTask);
+
+ queue.addQueueDeleteTask(new AMQQueue.Task()
+ {
+ public void doTask(AMQQueue queue)
+ {
+ session.removeSessionCloseTask(deleteQueueTask);
+ }
+ });
+
+
+ }
+ return queue;
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 1d4f67000e..868e0ba463 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -75,6 +75,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
public class AMQMinaProtocolSession implements AMQProtocolSession,
@@ -121,6 +122,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private byte _minor;
private FieldTable _clientProperties;
+ private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+
// Keeps a tally of connections for logging and debugging
private static AtomicInteger _ConnectionId;
static { _ConnectionId = new AtomicInteger(0); }
@@ -550,6 +553,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_managedObject.unregister();
}
+ for(Task task : _taskList)
+ {
+ task.doTask(this);
+ }
}
}
@@ -720,4 +727,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
return _ConnectionId.get();
}
+
+ public void addSessionCloseTask(Task task)
+ {
+ _taskList.add(task);
+ }
+
+ public void removeSessionCloseTask(Task task)
+ {
+ _taskList.remove(task);
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 1e5df9b8a5..969eb30cb2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -36,6 +36,14 @@ import org.apache.qpid.protocol.AMQProtocolWriter;
public interface AMQProtocolSession extends AMQProtocolWriter
{
+
+
+
+ public static interface Task
+ {
+ public void doTask(AMQProtocolSession session) throws AMQException;
+ }
+
/**
* Called when a protocol data block is received
* @param message the data block that has been received
@@ -150,4 +158,8 @@ public interface AMQProtocolSession extends AMQProtocolWriter
boolean versionEquals(byte major, byte minor);
void checkMethodBodyVersion(AMQMethodBody methodBody);
int getConnectionId();
+
+ void addSessionCloseTask(Task task);
+
+ void removeSessionCloseTask(Task task);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index a3c4fb1820..cb7ec13c3e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -34,6 +34,7 @@ import javax.management.JMException;
import java.text.MessageFormat;
import java.util.List;
import java.util.concurrent.Executor;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -92,6 +93,12 @@ public class AMQQueue implements Managable, Comparable
private final AtomicBoolean _isExclusive = new AtomicBoolean();
+ private final AtomicBoolean _deleted = new AtomicBoolean(false);
+
+
+
+ private List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
+
/**
* Manages message delivery.
*/
@@ -509,10 +516,19 @@ public class AMQQueue implements Managable, Comparable
public void delete() throws AMQException
{
- _subscribers.queueDeleted(this);
- _bindings.deregister();
- _queueRegistry.unregisterQueue(_name);
- _managedObject.unregister();
+ if(!_deleted.getAndSet(true))
+ {
+ _subscribers.queueDeleted(this);
+ _bindings.deregister();
+ _queueRegistry.unregisterQueue(_name);
+ _managedObject.unregister();
+ for(Task task : _deleteTaskList)
+ {
+ task.doTask(this);
+ }
+ _deleteTaskList.clear();
+ }
+
}
protected void autodelete() throws AMQException
@@ -669,4 +685,14 @@ public class AMQQueue implements Managable, Comparable
}
}
+ public static interface Task
+ {
+ public void doTask(AMQQueue queue) throws AMQException;
+ }
+
+ public void addQueueDeleteTask(Task task)
+ {
+ _deleteTaskList.add(task);
+ }
+
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
index 292ce6a834..6ecb54300f 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -232,4 +232,14 @@ public class MockProtocolSession implements AMQProtocolSession
{
return _ConnectionId.get();
}
+
+ public void addSessionCloseTask(Task task)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void removeSessionCloseTask(Task task)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
}