summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2010-05-07 15:09:42 +0000
committerMartin Ritchie <ritchiem@apache.org>2010-05-07 15:09:42 +0000
commitc4308e861f101a99e4a87048caf9cb7455a68ab3 (patch)
tree092b5f93c47fc7a6f0db231f7df41283eb004e93 /java
parent8a5d74d7c730e3d5ca372b0361024e4f4ba8fe11 (diff)
downloadqpid-python-c4308e861f101a99e4a87048caf9cb7455a68ab3.tar.gz
QPID-2575 : Create Connection and Session models to correctly expose the Owning Session. Addressed issue where getPrincipal was used in error to identify queue owner. Session model now allows access to this in a protocol independent way.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@942101 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java31
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java23
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java54
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java38
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java28
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java26
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java3
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java7
17 files changed, 251 insertions, 88 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 454b731e5f..573fa9d966 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -56,6 +56,8 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQProtocolEngine;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.IncomingMessage;
@@ -86,7 +88,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-public class AMQChannel implements SessionConfig
+public class AMQChannel implements SessionConfig, AMQSessionModel
{
public static final int DEFAULT_PREFETCH = 5000;
@@ -1058,6 +1060,16 @@ public class AMQChannel implements SessionConfig
}
+ public Object getID()
+ {
+ return _channelId;
+ }
+
+ public AMQConnectionModel getConnectionModel()
+ {
+ return _session;
+ }
+
private class MessageDeliveryAction implements ServerTransaction.Action
{
private IncomingMessage _incommingMessage;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index d806f9426a..50019090d9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -26,6 +26,7 @@ import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -48,14 +49,14 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
public void methodReceived(AMQStateManager stateManager, BasicConsumeBody body, int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
+ AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = protocolConnection.getChannel(channelId);
- VirtualHost vHost = session.getVirtualHost();
+ VirtualHost vHost = protocolConnection.getVirtualHost();
if (channel == null)
{
@@ -96,16 +97,20 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
final AMQShortString consumerTagName;
// Check authz
- if (!vHost.getAccessManager().authoriseConsume(session,
+ if (!vHost.getAccessManager().authoriseConsume(protocolConnection,
body.getExclusive(), body.getNoAck(),
body.getNoLocal(), body.getNowait(), queue))
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
}
- else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session)
+ else if (queue.isExclusive() && !queue.isDurable())
{
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
+ AMQSessionModel session = queue.getExclusiveOwningSession();
+ if (session == null || session.getConnectionModel() != protocolConnection)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
+ }
}
if (body.getConsumerTag() != null)
@@ -126,9 +131,9 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
body.getArguments(), body.getNoLocal(), body.getExclusive());
if (!body.getNowait())
{
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag);
- session.writeFrame(responseBody.generateFrame(channelId));
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
}
}
@@ -136,12 +141,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
{
AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.getConsumerTag() + "'");
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
msg, // replytext
body.getClazz(),
body.getMethod());
- session.writeFrame(responseBody.generateFrame(0));
+ protocolConnection.writeFrame(responseBody.generateFrame(0));
}
}
@@ -149,12 +154,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
{
_logger.debug("Closing connection due to invalid selector");
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.INVALID_ARGUMENT.getCode(),
new AMQShortString(ise.getMessage()),
body.getClazz(),
body.getMethod());
- session.writeFrame(responseBody.generateFrame(channelId));
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
index 2c4a9b310a..df5b8098ae 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -39,6 +39,7 @@ import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.state.AMQStateManager;
@@ -62,12 +63,12 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
public void methodReceived(AMQStateManager stateManager, BasicGetBody body, int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
+ AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
- VirtualHost vHost = session.getVirtualHost();
+ VirtualHost vHost = protocolConnection.getVirtualHost();
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = protocolConnection.getChannel(channelId);
if (channel == null)
{
throw body.getChannelNotFoundException(channelId);
@@ -93,24 +94,28 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
{
//Perform ACLs
- if (!vHost.getAccessManager().authoriseConsume(session, body.getNoAck(), queue))
+ if (!vHost.getAccessManager().authoriseConsume(protocolConnection, body.getNoAck(), queue))
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
}
- else if (queue.isExclusive() && queue.getExclusiveOwner() != session)
+ else if (queue.isExclusive())
{
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue is exclusive, but not created on this Connection.");
+ AMQSessionModel session = queue.getExclusiveOwningSession();
+ if (session == null || session.getConnectionModel() != protocolConnection)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue is exclusive, but not created on this Connection.");
+ }
}
- if (!performGet(queue,session, channel, !body.getNoAck()))
+ if (!performGet(queue,protocolConnection, channel, !body.getNoAck()))
{
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
// TODO - set clusterId
BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
- session.writeFrame(responseBody.generateFrame(channelId));
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
index 0de3ded5a1..594edb090c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
@@ -59,8 +60,8 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
public void methodReceived(AMQStateManager stateManager, QueueBindBody body, int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHost virtualHost = session.getVirtualHost();
+ AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
+ VirtualHost virtualHost = protocolConnection.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
@@ -70,7 +71,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
if (body.getQueue() == null)
{
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = protocolConnection.getChannel(channelId);
if (channel == null)
{
@@ -114,15 +115,19 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
//Perform ACLs
- if (!virtualHost.getAccessManager().authoriseBind(session, exch,
+ if (!virtualHost.getAccessManager().authoriseBind(protocolConnection, exch,
queue, routingKey))
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
}
- else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session)
+ else if (queue.isExclusive() && !queue.isDurable())
{
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
+ AMQSessionModel session = queue.getExclusiveOwningSession();
+ if (session == null || session.getConnectionModel() != protocolConnection)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
+ }
}
if (!exch.isBound(routingKey, body.getArguments(), queue))
@@ -153,9 +158,9 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
}
if (!body.getNowait())
{
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
- session.writeFrame(responseBody.generateFrame(channelId));
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 5d5bd761c7..961a165877 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueRegistry;
@@ -61,8 +62,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
public void methodReceived(AMQStateManager stateManager, QueueDeclareBody body, int channelId) throws AMQException
{
- final AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHost virtualHost = session.getVirtualHost();
+ final AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
+ VirtualHost virtualHost = protocolConnection.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
@@ -71,7 +72,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
if (!body.getPassive())
{
// Perform ACL if request is not passive
- if (!virtualHost.getAccessManager().authoriseCreateQueue(session, body.getAutoDelete(), body.getDurable(),
+ if (!virtualHost.getAccessManager().authoriseCreateQueue(protocolConnection, body.getAutoDelete(), body.getDurable(),
body.getExclusive(), body.getNowait(), body.getPassive(), body.getQueue()))
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
@@ -97,9 +98,16 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
synchronized (queueRegistry)
{
+ queue = queueRegistry.getQueue(queueName);
+ AMQSessionModel session = null;
- if (((queue = queueRegistry.getQueue(queueName)) == null))
+ if (queue != null)
+ {
+ session = queue.getExclusiveOwningSession();
+ }
+
+ if (queue == null)
{
if (body.getPassive())
@@ -109,8 +117,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
else
{
- queue = createQueue(queueName, body, virtualHost, session);
- queue.setPrincipalHolder(session);
+ queue = createQueue(queueName, body, virtualHost, protocolConnection);
+ queue.setPrincipalHolder(protocolConnection);
if (queue.isDurable() && !queue.isAutoDelete())
{
store.createQueue(queue, body.getArguments());
@@ -122,26 +130,25 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
queueRegistry.registerQueue(queue);
if(body.getExclusive())
{
- if(body.getDurable())
- {
- queue.setExclusiveOwner(session.getPrincipal().getName());
- }
- else
+
+ queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId));
+ queue.setPrincipalHolder(protocolConnection);
+
+ if(!body.getDurable())
{
final AMQQueue q = queue;
- queue.setExclusiveOwner(session);
final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
{
public void doTask(AMQProtocolSession session) throws AMQException
{
- q.setExclusiveOwner(null);
+ q.setExclusiveOwningSession(null);
}
};
- session.addSessionCloseTask(sessionCloseTask);
+ protocolConnection.addSessionCloseTask(sessionCloseTask);
queue.addQueueDeleteTask(new AMQQueue.Task() {
public void doTask(AMQQueue queue) throws AMQException
{
- session.removeSessionCloseTask(sessionCloseTask);
+ protocolConnection.removeSessionCloseTask(sessionCloseTask);
}
});
}
@@ -156,7 +163,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
}
}
- else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session)
+ else if (queue.isExclusive() && !queue.isDurable() && (session == null || session.getConnectionModel() != protocolConnection))
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
@@ -168,12 +175,13 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
"Cannot re-declare queue '" + queue.getNameShortString() + "' with different exclusivity (was: "
+ queue.isExclusive() + " requested " + body.getExclusive() + ")");
}
- else if (!body.getPassive() && body.getExclusive() && !queue.getExclusiveOwner().equals(queue.isDurable() ? session.getPrincipal().getName() : session))
+
+ else if (!body.getPassive() && body.getExclusive() && !(queue.isDurable() ? queue.getOwner().equals(protocolConnection.getPrincipal().getName()) : (session == null || session.getConnectionModel() != protocolConnection)))
{
- throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'),"
- + " as exclusive queue with same name "
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'), "
+ + "as exclusive queue with same name "
+ "declared on another client ID('"
- + queue.getPrincipalHolder().getPrincipal().getName() + "')");
+ + queue.getOwner() + "')");
}
else if(!body.getPassive() && queue.isAutoDelete() != body.getAutoDelete())
@@ -190,7 +198,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = protocolConnection.getChannel(channelId);
if (channel == null)
{
@@ -203,12 +211,12 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
if (!body.getNowait())
{
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
QueueDeclareOkBody responseBody =
methodRegistry.createQueueDeclareOkBody(queueName,
queue.getMessageCount(),
queue.getConsumerCount());
- session.writeFrame(responseBody.generateFrame(channelId));
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
_logger.info("Queue " + queueName + " declared successfully");
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index 93acc94816..4a1940ee01 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -26,6 +26,7 @@ import org.apache.qpid.framing.QueueDeleteOkBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.state.AMQStateManager;
@@ -58,15 +59,15 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
public void methodReceived(AMQStateManager stateManager, QueueDeleteBody body, int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHost virtualHost = session.getVirtualHost();
+ AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
+ VirtualHost virtualHost = protocolConnection.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
AMQQueue queue;
if (body.getQueue() == null)
{
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = protocolConnection.getChannel(channelId);
if (channel == null)
{
@@ -103,12 +104,13 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
else
{
+ AMQSessionModel session = queue.getExclusiveOwningSession();
//Perform ACLs
- if (!virtualHost.getAccessManager().authoriseDelete(session, queue))
+ if (!virtualHost.getAccessManager().authoriseDelete(protocolConnection, queue))
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
}
- else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session)
+ else if (queue.isExclusive() && !queue.isDurable() && (session == null || session.getConnectionModel() != protocolConnection))
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
@@ -121,9 +123,9 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
store.removeQueue(queue);
}
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged);
- session.writeFrame(responseBody.generateFrame(channelId));
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
index b94ebb6538..9ae3a6e28a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.state.AMQStateManager;
@@ -57,11 +58,11 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
public void methodReceived(AMQStateManager stateManager, QueuePurgeBody body, int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHost virtualHost = session.getVirtualHost();
+ AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
+ VirtualHost virtualHost = protocolConnection.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = protocolConnection.getChannel(channelId);
AMQQueue queue;
@@ -98,13 +99,14 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
}
else
{
+ AMQSessionModel session = queue.getExclusiveOwningSession();
//Perform ACLs
- if (!virtualHost.getAccessManager().authorisePurge(session, queue))
+ if (!virtualHost.getAccessManager().authorisePurge(protocolConnection, queue))
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
- }
- else if (queue.isExclusive() && queue.getExclusiveOwner() != session)
+ }
+ else if (queue.isExclusive() && (session == null || session.getConnectionModel() != protocolConnection))
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"Queue is exclusive, but not created on this Connection.");
@@ -116,10 +118,10 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
if(!body.getNowait())
{
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
- session.writeFrame(responseBody.generateFrame(channelId));
-
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
new file mode 100644
index 0000000000..448c8508a5
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.AMQException;
+
+public interface AMQConnectionModel
+{
+
+ /**
+ * Close the given requested Session
+ * @param session
+ * @param cause
+ * @param message
+ * @throws org.apache.qpid.AMQException
+ */
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException;
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 3206d4fce7..7d70a3cdfc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -62,7 +62,6 @@ import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.Sender;
import javax.management.JMException;
-import javax.management.MBeanException;
import javax.security.sasl.SaslServer;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -502,7 +501,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
return channel;
}
- public AMQChannel getChannel(int channelId) throws AMQException
+ public AMQChannel getChannel(int channelId)
{
final AMQChannel channel =
((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
@@ -1231,4 +1230,20 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
}
}
}
+
+
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+ {
+
+ closeChannel((Integer)session.getID());
+
+ MethodRegistry methodRegistry = getMethodRegistry();
+ ChannelCloseBody responseBody =
+ methodRegistry.createChannelCloseBody(
+ cause.getCode(),
+ new AMQShortString(message),
+ 0,0);
+
+ writeFrame(responseBody.generateFrame((Integer)session.getID()));
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 5ed270c80d..f48a214933 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -37,7 +37,7 @@ import java.security.Principal;
import java.util.List;
-public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder
+public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder, AMQConnectionModel
{
long getSessionID();
@@ -109,7 +109,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Prin
*
* @return null if no channel exists, the channel otherwise
*/
- AMQChannel getChannel(int channelId) throws AMQException;
+ AMQChannel getChannel(int channelId);
/**
* Associate a channel with this session.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
new file mode 100644
index 0000000000..29749798be
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+public interface AMQSessionModel
+{
+ Object getID();
+
+ AMQConnectionModel getConnectionModel();
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 45c84d7603..c557c876f1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.QueueConfig;
import org.apache.qpid.server.configuration.QueueConfiguration;
@@ -68,8 +70,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer
PrincipalHolder getPrincipalHolder();
void setPrincipalHolder(PrincipalHolder principalHolder);
- void setExclusiveOwner(Object owner);
- Object getExclusiveOwner();
+ void setExclusiveOwningSession(AMQSessionModel owner);
+ AMQSessionModel getExclusiveOwningSession();
VirtualHost getVirtualHost();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index cf2f637697..b7f3f59c4b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -7,6 +7,8 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.pool.ReadWriteRunnable;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -83,7 +85,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private PrincipalHolder _prinicpalHolder;
- private Object _exclusiveOwner;
+ private AMQSessionModel _exclusiveOwner;
private final boolean _durable;
@@ -2045,12 +2047,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return ids;
}
- public Object getExclusiveOwner()
+ public AMQSessionModel getExclusiveOwningSession()
{
return _exclusiveOwner;
}
- public void setExclusiveOwner(Object exclusiveOwner)
+ public void setExclusiveOwningSession(AMQSessionModel exclusiveOwner)
{
_exclusiveOwner = exclusiveOwner;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index 1aff1eec86..58dbc95224 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -22,10 +22,23 @@ package org.apache.qpid.server.transport;
import org.apache.qpid.server.configuration.ConnectionConfig;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.ConnectionCloseCode;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionDetachCode;
+import org.apache.qpid.transport.SessionDetach;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.SessionDetached;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.ExecutionException;
+import org.apache.qpid.transport.ExecutionErrorCode;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.AMQException;
-public class ServerConnection extends Connection
+public class ServerConnection extends Connection implements AMQConnectionModel
{
private ConnectionConfig _config;
private Runnable _onOpenTask;
@@ -88,4 +101,15 @@ public class ServerConnection extends Connection
{
_onOpenTask = task;
}
+
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+ {
+ ExecutionException ex = new ExecutionException();
+ ex.setErrorCode(ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED);
+ ex.setDescription(message);
+ ((ServerSession)session).invoke(ex);
+
+ ((ServerSession)session).close();
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 2195cc4154..52b253c075 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -39,6 +39,8 @@ import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.MessageTransfer;
@@ -63,7 +65,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
-public class ServerSession extends Session implements PrincipalHolder, SessionConfig
+public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel
{
private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
@@ -310,7 +312,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
}
}
- public void removeDispositionListener(Method method)
+ public void removeDispositionListener(Method method)
{
_messageDispositionListenerMap.remove(method.getId());
}
@@ -552,4 +554,15 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
{
close();
}
+
+ public Object getID()
+ {
+ return getName();
+ }
+
+ public AMQConnectionModel getConnectionModel()
+ {
+ return (ServerConnection) getConnection();
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 3b0f990377..7dcb268290 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -36,6 +36,7 @@ import org.apache.qpid.server.flow.*;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnknownExchangeType;
import org.apache.qpid.framing.*;
@@ -813,7 +814,7 @@ public class ServerSessionDelegate extends SessionDelegate
if(method.getExclusive())
{
queue.setPrincipalHolder((ServerSession)session);
- queue.setExclusiveOwner(session);
+ queue.setExclusiveOwningSession((AMQSessionModel) session);
}
else if(method.getAutoDelete())
{
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index dbd51af68c..f02b1f435f 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -30,6 +30,7 @@ import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.AMQException;
@@ -47,7 +48,7 @@ public class MockAMQQueue implements AMQQueue
private PrincipalHolder _principalHolder;
- private Object _exclusiveOwner;
+ private AMQSessionModel _exclusiveOwner;
public MockAMQQueue(String name)
{
@@ -527,12 +528,12 @@ public class MockAMQQueue implements AMQQueue
_principalHolder = principalHolder;
}
- public Object getExclusiveOwner()
+ public AMQSessionModel getExclusiveOwningSession()
{
return _exclusiveOwner;
}
- public void setExclusiveOwner(Object exclusiveOwner)
+ public void setExclusiveOwningSession(AMQSessionModel exclusiveOwner)
{
_exclusiveOwner = exclusiveOwner;
}