summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java31
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java23
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java54
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java38
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java28
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java26
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java3
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java7
17 files changed, 251 insertions, 88 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 454b731e5f..573fa9d966 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -56,6 +56,8 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQProtocolEngine;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.IncomingMessage;
@@ -86,7 +88,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-public class AMQChannel implements SessionConfig
+public class AMQChannel implements SessionConfig, AMQSessionModel
{
public static final int DEFAULT_PREFETCH = 5000;
@@ -1058,6 +1060,16 @@ public class AMQChannel implements SessionConfig
}
+ public Object getID()
+ {
+ return _channelId;
+ }
+
+ public AMQConnectionModel getConnectionModel()
+ {
+ return _session;
+ }
+
private class MessageDeliveryAction implements ServerTransaction.Action
{
private IncomingMessage _incommingMessage;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index d806f9426a..50019090d9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -26,6 +26,7 @@ import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -48,14 +49,14 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
public void methodReceived(AMQStateManager stateManager, BasicConsumeBody body, int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
+ AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = protocolConnection.getChannel(channelId);
- VirtualHost vHost = session.getVirtualHost();
+ VirtualHost vHost = protocolConnection.getVirtualHost();
if (channel == null)
{
@@ -96,16 +97,20 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
final AMQShortString consumerTagName;
// Check authz
- if (!vHost.getAccessManager().authoriseConsume(session,
+ if (!vHost.getAccessManager().authoriseConsume(protocolConnection,
body.getExclusive(), body.getNoAck(),
body.getNoLocal(), body.getNowait(), queue))
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
}
- else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session)
+ else if (queue.isExclusive() && !queue.isDurable())
{
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
+ AMQSessionModel session = queue.getExclusiveOwningSession();
+ if (session == null || session.getConnectionModel() != protocolConnection)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
+ }
}
if (body.getConsumerTag() != null)
@@ -126,9 +131,9 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
body.getArguments(), body.getNoLocal(), body.getExclusive());
if (!body.getNowait())
{
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag);
- session.writeFrame(responseBody.generateFrame(channelId));
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
}
}
@@ -136,12 +141,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
{
AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.getConsumerTag() + "'");
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
msg, // replytext
body.getClazz(),
body.getMethod());
- session.writeFrame(responseBody.generateFrame(0));
+ protocolConnection.writeFrame(responseBody.generateFrame(0));
}
}
@@ -149,12 +154,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
{
_logger.debug("Closing connection due to invalid selector");
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.INVALID_ARGUMENT.getCode(),
new AMQShortString(ise.getMessage()),
body.getClazz(),
body.getMethod());
- session.writeFrame(responseBody.generateFrame(channelId));
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
index 2c4a9b310a..df5b8098ae 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -39,6 +39,7 @@ import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.state.AMQStateManager;
@@ -62,12 +63,12 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
public void methodReceived(AMQStateManager stateManager, BasicGetBody body, int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
+ AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
- VirtualHost vHost = session.getVirtualHost();
+ VirtualHost vHost = protocolConnection.getVirtualHost();
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = protocolConnection.getChannel(channelId);
if (channel == null)
{
throw body.getChannelNotFoundException(channelId);
@@ -93,24 +94,28 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
{
//Perform ACLs
- if (!vHost.getAccessManager().authoriseConsume(session, body.getNoAck(), queue))
+ if (!vHost.getAccessManager().authoriseConsume(protocolConnection, body.getNoAck(), queue))
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
}
- else if (queue.isExclusive() && queue.getExclusiveOwner() != session)
+ else if (queue.isExclusive())
{
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue is exclusive, but not created on this Connection.");
+ AMQSessionModel session = queue.getExclusiveOwningSession();
+ if (session == null || session.getConnectionModel() != protocolConnection)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue is exclusive, but not created on this Connection.");
+ }
}
- if (!performGet(queue,session, channel, !body.getNoAck()))
+ if (!performGet(queue,protocolConnection, channel, !body.getNoAck()))
{
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
// TODO - set clusterId
BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
- session.writeFrame(responseBody.generateFrame(channelId));
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
index 0de3ded5a1..594edb090c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
@@ -59,8 +60,8 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
public void methodReceived(AMQStateManager stateManager, QueueBindBody body, int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHost virtualHost = session.getVirtualHost();
+ AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
+ VirtualHost virtualHost = protocolConnection.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
@@ -70,7 +71,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
if (body.getQueue() == null)
{
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = protocolConnection.getChannel(channelId);
if (channel == null)
{
@@ -114,15 +115,19 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
{
//Perform ACLs
- if (!virtualHost.getAccessManager().authoriseBind(session, exch,
+ if (!virtualHost.getAccessManager().authoriseBind(protocolConnection, exch,
queue, routingKey))
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
}
- else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session)
+ else if (queue.isExclusive() && !queue.isDurable())
{
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
+ AMQSessionModel session = queue.getExclusiveOwningSession();
+ if (session == null || session.getConnectionModel() != protocolConnection)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
+ }
}
if (!exch.isBound(routingKey, body.getArguments(), queue))
@@ -153,9 +158,9 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
}
if (!body.getNowait())
{
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
- session.writeFrame(responseBody.generateFrame(channelId));
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 5d5bd761c7..961a165877 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueRegistry;
@@ -61,8 +62,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
public void methodReceived(AMQStateManager stateManager, QueueDeclareBody body, int channelId) throws AMQException
{
- final AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHost virtualHost = session.getVirtualHost();
+ final AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
+ VirtualHost virtualHost = protocolConnection.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
@@ -71,7 +72,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
if (!body.getPassive())
{
// Perform ACL if request is not passive
- if (!virtualHost.getAccessManager().authoriseCreateQueue(session, body.getAutoDelete(), body.getDurable(),
+ if (!virtualHost.getAccessManager().authoriseCreateQueue(protocolConnection, body.getAutoDelete(), body.getDurable(),
body.getExclusive(), body.getNowait(), body.getPassive(), body.getQueue()))
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
@@ -97,9 +98,16 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
synchronized (queueRegistry)
{
+ queue = queueRegistry.getQueue(queueName);
+ AMQSessionModel session = null;
- if (((queue = queueRegistry.getQueue(queueName)) == null))
+ if (queue != null)
+ {
+ session = queue.getExclusiveOwningSession();
+ }
+
+ if (queue == null)
{
if (body.getPassive())
@@ -109,8 +117,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
else
{
- queue = createQueue(queueName, body, virtualHost, session);
- queue.setPrincipalHolder(session);
+ queue = createQueue(queueName, body, virtualHost, protocolConnection);
+ queue.setPrincipalHolder(protocolConnection);
if (queue.isDurable() && !queue.isAutoDelete())
{
store.createQueue(queue, body.getArguments());
@@ -122,26 +130,25 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
queueRegistry.registerQueue(queue);
if(body.getExclusive())
{
- if(body.getDurable())
- {
- queue.setExclusiveOwner(session.getPrincipal().getName());
- }
- else
+
+ queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId));
+ queue.setPrincipalHolder(protocolConnection);
+
+ if(!body.getDurable())
{
final AMQQueue q = queue;
- queue.setExclusiveOwner(session);
final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
{
public void doTask(AMQProtocolSession session) throws AMQException
{
- q.setExclusiveOwner(null);
+ q.setExclusiveOwningSession(null);
}
};
- session.addSessionCloseTask(sessionCloseTask);
+ protocolConnection.addSessionCloseTask(sessionCloseTask);
queue.addQueueDeleteTask(new AMQQueue.Task() {
public void doTask(AMQQueue queue) throws AMQException
{
- session.removeSessionCloseTask(sessionCloseTask);
+ protocolConnection.removeSessionCloseTask(sessionCloseTask);
}
});
}
@@ -156,7 +163,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
}
}
- else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session)
+ else if (queue.isExclusive() && !queue.isDurable() && (session == null || session.getConnectionModel() != protocolConnection))
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
@@ -168,12 +175,13 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
"Cannot re-declare queue '" + queue.getNameShortString() + "' with different exclusivity (was: "
+ queue.isExclusive() + " requested " + body.getExclusive() + ")");
}
- else if (!body.getPassive() && body.getExclusive() && !queue.getExclusiveOwner().equals(queue.isDurable() ? session.getPrincipal().getName() : session))
+
+ else if (!body.getPassive() && body.getExclusive() && !(queue.isDurable() ? queue.getOwner().equals(protocolConnection.getPrincipal().getName()) : (session == null || session.getConnectionModel() != protocolConnection)))
{
- throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'),"
- + " as exclusive queue with same name "
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'), "
+ + "as exclusive queue with same name "
+ "declared on another client ID('"
- + queue.getPrincipalHolder().getPrincipal().getName() + "')");
+ + queue.getOwner() + "')");
}
else if(!body.getPassive() && queue.isAutoDelete() != body.getAutoDelete())
@@ -190,7 +198,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
}
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = protocolConnection.getChannel(channelId);
if (channel == null)
{
@@ -203,12 +211,12 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
if (!body.getNowait())
{
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
QueueDeclareOkBody responseBody =
methodRegistry.createQueueDeclareOkBody(queueName,
queue.getMessageCount(),
queue.getConsumerCount());
- session.writeFrame(responseBody.generateFrame(channelId));
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
_logger.info("Queue " + queueName + " declared successfully");
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
index 93acc94816..4a1940ee01 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
@@ -26,6 +26,7 @@ import org.apache.qpid.framing.QueueDeleteOkBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.state.AMQStateManager;
@@ -58,15 +59,15 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
public void methodReceived(AMQStateManager stateManager, QueueDeleteBody body, int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHost virtualHost = session.getVirtualHost();
+ AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
+ VirtualHost virtualHost = protocolConnection.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
AMQQueue queue;
if (body.getQueue() == null)
{
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = protocolConnection.getChannel(channelId);
if (channel == null)
{
@@ -103,12 +104,13 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
else
{
+ AMQSessionModel session = queue.getExclusiveOwningSession();
//Perform ACLs
- if (!virtualHost.getAccessManager().authoriseDelete(session, queue))
+ if (!virtualHost.getAccessManager().authoriseDelete(protocolConnection, queue))
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
}
- else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session)
+ else if (queue.isExclusive() && !queue.isDurable() && (session == null || session.getConnectionModel() != protocolConnection))
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
@@ -121,9 +123,9 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB
store.removeQueue(queue);
}
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged);
- session.writeFrame(responseBody.generateFrame(channelId));
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
index b94ebb6538..9ae3a6e28a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.state.AMQStateManager;
@@ -57,11 +58,11 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
public void methodReceived(AMQStateManager stateManager, QueuePurgeBody body, int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHost virtualHost = session.getVirtualHost();
+ AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
+ VirtualHost virtualHost = protocolConnection.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = protocolConnection.getChannel(channelId);
AMQQueue queue;
@@ -98,13 +99,14 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
}
else
{
+ AMQSessionModel session = queue.getExclusiveOwningSession();
//Perform ACLs
- if (!virtualHost.getAccessManager().authorisePurge(session, queue))
+ if (!virtualHost.getAccessManager().authorisePurge(protocolConnection, queue))
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
- }
- else if (queue.isExclusive() && queue.getExclusiveOwner() != session)
+ }
+ else if (queue.isExclusive() && (session == null || session.getConnectionModel() != protocolConnection))
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"Queue is exclusive, but not created on this Connection.");
@@ -116,10 +118,10 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod
if(!body.getNowait())
{
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
- session.writeFrame(responseBody.generateFrame(channelId));
-
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
new file mode 100644
index 0000000000..448c8508a5
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.AMQException;
+
+public interface AMQConnectionModel
+{
+
+ /**
+ * Close the given requested Session
+ * @param session
+ * @param cause
+ * @param message
+ * @throws org.apache.qpid.AMQException
+ */
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException;
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 3206d4fce7..7d70a3cdfc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -62,7 +62,6 @@ import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.Sender;
import javax.management.JMException;
-import javax.management.MBeanException;
import javax.security.sasl.SaslServer;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -502,7 +501,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
return channel;
}
- public AMQChannel getChannel(int channelId) throws AMQException
+ public AMQChannel getChannel(int channelId)
{
final AMQChannel channel =
((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
@@ -1231,4 +1230,20 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
}
}
}
+
+
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+ {
+
+ closeChannel((Integer)session.getID());
+
+ MethodRegistry methodRegistry = getMethodRegistry();
+ ChannelCloseBody responseBody =
+ methodRegistry.createChannelCloseBody(
+ cause.getCode(),
+ new AMQShortString(message),
+ 0,0);
+
+ writeFrame(responseBody.generateFrame((Integer)session.getID()));
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 5ed270c80d..f48a214933 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -37,7 +37,7 @@ import java.security.Principal;
import java.util.List;
-public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder
+public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder, AMQConnectionModel
{
long getSessionID();
@@ -109,7 +109,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Prin
*
* @return null if no channel exists, the channel otherwise
*/
- AMQChannel getChannel(int channelId) throws AMQException;
+ AMQChannel getChannel(int channelId);
/**
* Associate a channel with this session.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
new file mode 100644
index 0000000000..29749798be
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+public interface AMQSessionModel
+{
+ Object getID();
+
+ AMQConnectionModel getConnectionModel();
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 45c84d7603..c557c876f1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.QueueConfig;
import org.apache.qpid.server.configuration.QueueConfiguration;
@@ -68,8 +70,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer
PrincipalHolder getPrincipalHolder();
void setPrincipalHolder(PrincipalHolder principalHolder);
- void setExclusiveOwner(Object owner);
- Object getExclusiveOwner();
+ void setExclusiveOwningSession(AMQSessionModel owner);
+ AMQSessionModel getExclusiveOwningSession();
VirtualHost getVirtualHost();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index cf2f637697..b7f3f59c4b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -7,6 +7,8 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.pool.ReadWriteRunnable;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -83,7 +85,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private PrincipalHolder _prinicpalHolder;
- private Object _exclusiveOwner;
+ private AMQSessionModel _exclusiveOwner;
private final boolean _durable;
@@ -2045,12 +2047,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return ids;
}
- public Object getExclusiveOwner()
+ public AMQSessionModel getExclusiveOwningSession()
{
return _exclusiveOwner;
}
- public void setExclusiveOwner(Object exclusiveOwner)
+ public void setExclusiveOwningSession(AMQSessionModel exclusiveOwner)
{
_exclusiveOwner = exclusiveOwner;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index 1aff1eec86..58dbc95224 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -22,10 +22,23 @@ package org.apache.qpid.server.transport;
import org.apache.qpid.server.configuration.ConnectionConfig;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.ConnectionCloseCode;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionDetachCode;
+import org.apache.qpid.transport.SessionDetach;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.SessionDetached;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.ExecutionException;
+import org.apache.qpid.transport.ExecutionErrorCode;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.AMQException;
-public class ServerConnection extends Connection
+public class ServerConnection extends Connection implements AMQConnectionModel
{
private ConnectionConfig _config;
private Runnable _onOpenTask;
@@ -88,4 +101,15 @@ public class ServerConnection extends Connection
{
_onOpenTask = task;
}
+
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+ {
+ ExecutionException ex = new ExecutionException();
+ ex.setErrorCode(ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED);
+ ex.setDescription(message);
+ ((ServerSession)session).invoke(ex);
+
+ ((ServerSession)session).close();
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 2195cc4154..52b253c075 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -39,6 +39,8 @@ import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.MessageTransfer;
@@ -63,7 +65,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
-public class ServerSession extends Session implements PrincipalHolder, SessionConfig
+public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel
{
private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
@@ -310,7 +312,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
}
}
- public void removeDispositionListener(Method method)
+ public void removeDispositionListener(Method method)
{
_messageDispositionListenerMap.remove(method.getId());
}
@@ -552,4 +554,15 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo
{
close();
}
+
+ public Object getID()
+ {
+ return getName();
+ }
+
+ public AMQConnectionModel getConnectionModel()
+ {
+ return (ServerConnection) getConnection();
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 3b0f990377..7dcb268290 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -36,6 +36,7 @@ import org.apache.qpid.server.flow.*;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnknownExchangeType;
import org.apache.qpid.framing.*;
@@ -813,7 +814,7 @@ public class ServerSessionDelegate extends SessionDelegate
if(method.getExclusive())
{
queue.setPrincipalHolder((ServerSession)session);
- queue.setExclusiveOwner(session);
+ queue.setExclusiveOwningSession((AMQSessionModel) session);
}
else if(method.getAutoDelete())
{
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index dbd51af68c..f02b1f435f 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -30,6 +30,7 @@ import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.AMQException;
@@ -47,7 +48,7 @@ public class MockAMQQueue implements AMQQueue
private PrincipalHolder _principalHolder;
- private Object _exclusiveOwner;
+ private AMQSessionModel _exclusiveOwner;
public MockAMQQueue(String name)
{
@@ -527,12 +528,12 @@ public class MockAMQQueue implements AMQQueue
_principalHolder = principalHolder;
}
- public Object getExclusiveOwner()
+ public AMQSessionModel getExclusiveOwningSession()
{
return _exclusiveOwner;
}
- public void setExclusiveOwner(Object exclusiveOwner)
+ public void setExclusiveOwningSession(AMQSessionModel exclusiveOwner)
{
_exclusiveOwner = exclusiveOwner;
}