diff options
Diffstat (limited to 'java')
17 files changed, 251 insertions, 88 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 454b731e5f..573fa9d966 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -56,6 +56,8 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.AMQProtocolEngine; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.IncomingMessage; @@ -86,7 +88,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -public class AMQChannel implements SessionConfig +public class AMQChannel implements SessionConfig, AMQSessionModel { public static final int DEFAULT_PREFETCH = 5000; @@ -1058,6 +1060,16 @@ public class AMQChannel implements SessionConfig } + public Object getID() + { + return _channelId; + } + + public AMQConnectionModel getConnectionModel() + { + return _session; + } + private class MessageDeliveryAction implements ServerTransaction.Action { private IncomingMessage _incommingMessage; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index d806f9426a..50019090d9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -26,6 +26,7 @@ import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; @@ -48,14 +49,14 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic public void methodReceived(AMQStateManager stateManager, BasicConsumeBody body, int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); + AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = protocolConnection.getChannel(channelId); - VirtualHost vHost = session.getVirtualHost(); + VirtualHost vHost = protocolConnection.getVirtualHost(); if (channel == null) { @@ -96,16 +97,20 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic final AMQShortString consumerTagName; // Check authz - if (!vHost.getAccessManager().authoriseConsume(session, + if (!vHost.getAccessManager().authoriseConsume(protocolConnection, body.getExclusive(), body.getNoAck(), body.getNoLocal(), body.getNowait(), queue)) { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied"); } - else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session) + else if (queue.isExclusive() && !queue.isDurable()) { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection."); + AMQSessionModel session = queue.getExclusiveOwningSession(); + if (session == null || session.getConnectionModel() != protocolConnection) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection."); + } } if (body.getConsumerTag() != null) @@ -126,9 +131,9 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic body.getArguments(), body.getNoLocal(), body.getExclusive()); if (!body.getNowait()) { - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag); - session.writeFrame(responseBody.generateFrame(channelId)); + protocolConnection.writeFrame(responseBody.generateFrame(channelId)); } } @@ -136,12 +141,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic { AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.getConsumerTag() + "'"); - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode msg, // replytext body.getClazz(), body.getMethod()); - session.writeFrame(responseBody.generateFrame(0)); + protocolConnection.writeFrame(responseBody.generateFrame(0)); } } @@ -149,12 +154,12 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic { _logger.debug("Closing connection due to invalid selector"); - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.INVALID_ARGUMENT.getCode(), new AMQShortString(ise.getMessage()), body.getClazz(), body.getMethod()); - session.writeFrame(responseBody.generateFrame(channelId)); + protocolConnection.writeFrame(responseBody.generateFrame(channelId)); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index 2c4a9b310a..df5b8098ae 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -39,6 +39,7 @@ import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.state.AMQStateManager;
@@ -62,12 +63,12 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB public void methodReceived(AMQStateManager stateManager, BasicGetBody body, int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
+ AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
- VirtualHost vHost = session.getVirtualHost();
+ VirtualHost vHost = protocolConnection.getVirtualHost();
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = protocolConnection.getChannel(channelId);
if (channel == null)
{
throw body.getChannelNotFoundException(channelId);
@@ -93,24 +94,28 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB {
//Perform ACLs
- if (!vHost.getAccessManager().authoriseConsume(session, body.getNoAck(), queue))
+ if (!vHost.getAccessManager().authoriseConsume(protocolConnection, body.getNoAck(), queue))
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
}
- else if (queue.isExclusive() && queue.getExclusiveOwner() != session)
+ else if (queue.isExclusive())
{
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue is exclusive, but not created on this Connection.");
+ AMQSessionModel session = queue.getExclusiveOwningSession();
+ if (session == null || session.getConnectionModel() != protocolConnection)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue is exclusive, but not created on this Connection.");
+ }
}
- if (!performGet(queue,session, channel, !body.getNoAck()))
+ if (!performGet(queue,protocolConnection, channel, !body.getNoAck()))
{
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
// TODO - set clusterId
BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
- session.writeFrame(responseBody.generateFrame(channelId));
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index 0de3ded5a1..594edb090c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; @@ -59,8 +60,8 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> public void methodReceived(AMQStateManager stateManager, QueueBindBody body, int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - VirtualHost virtualHost = session.getVirtualHost(); + AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); + VirtualHost virtualHost = protocolConnection.getVirtualHost(); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); @@ -70,7 +71,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> if (body.getQueue() == null) { - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = protocolConnection.getChannel(channelId); if (channel == null) { @@ -114,15 +115,19 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> { //Perform ACLs - if (!virtualHost.getAccessManager().authoriseBind(session, exch, + if (!virtualHost.getAccessManager().authoriseBind(protocolConnection, exch, queue, routingKey)) { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied"); } - else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session) + else if (queue.isExclusive() && !queue.isDurable()) { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection."); + AMQSessionModel session = queue.getExclusiveOwningSession(); + if (session == null || session.getConnectionModel() != protocolConnection) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection."); + } } if (!exch.isBound(routingKey, body.getArguments(), queue)) @@ -153,9 +158,9 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> } if (!body.getNowait()) { - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody(); - session.writeFrame(responseBody.generateFrame(channelId)); + protocolConnection.writeFrame(responseBody.generateFrame(channelId)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 5d5bd761c7..961a165877 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueRegistry; @@ -61,8 +62,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar public void methodReceived(AMQStateManager stateManager, QueueDeclareBody body, int channelId) throws AMQException { - final AMQProtocolSession session = stateManager.getProtocolSession(); - VirtualHost virtualHost = session.getVirtualHost(); + final AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); + VirtualHost virtualHost = protocolConnection.getVirtualHost(); ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); @@ -71,7 +72,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar if (!body.getPassive()) { // Perform ACL if request is not passive - if (!virtualHost.getAccessManager().authoriseCreateQueue(session, body.getAutoDelete(), body.getDurable(), + if (!virtualHost.getAccessManager().authoriseCreateQueue(protocolConnection, body.getAutoDelete(), body.getDurable(), body.getExclusive(), body.getNowait(), body.getPassive(), body.getQueue())) { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied"); @@ -97,9 +98,16 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar synchronized (queueRegistry) { + queue = queueRegistry.getQueue(queueName); + AMQSessionModel session = null; - if (((queue = queueRegistry.getQueue(queueName)) == null)) + if (queue != null) + { + session = queue.getExclusiveOwningSession(); + } + + if (queue == null) { if (body.getPassive()) @@ -109,8 +117,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } else { - queue = createQueue(queueName, body, virtualHost, session); - queue.setPrincipalHolder(session); + queue = createQueue(queueName, body, virtualHost, protocolConnection); + queue.setPrincipalHolder(protocolConnection); if (queue.isDurable() && !queue.isAutoDelete()) { store.createQueue(queue, body.getArguments()); @@ -122,26 +130,25 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar queueRegistry.registerQueue(queue); if(body.getExclusive()) { - if(body.getDurable()) - { - queue.setExclusiveOwner(session.getPrincipal().getName()); - } - else + + queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId)); + queue.setPrincipalHolder(protocolConnection); + + if(!body.getDurable()) { final AMQQueue q = queue; - queue.setExclusiveOwner(session); final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task() { public void doTask(AMQProtocolSession session) throws AMQException { - q.setExclusiveOwner(null); + q.setExclusiveOwningSession(null); } }; - session.addSessionCloseTask(sessionCloseTask); + protocolConnection.addSessionCloseTask(sessionCloseTask); queue.addQueueDeleteTask(new AMQQueue.Task() { public void doTask(AMQQueue queue) throws AMQException { - session.removeSessionCloseTask(sessionCloseTask); + protocolConnection.removeSessionCloseTask(sessionCloseTask); } }); } @@ -156,7 +163,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } } } - else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session) + else if (queue.isExclusive() && !queue.isDurable() && (session == null || session.getConnectionModel() != protocolConnection)) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection."); @@ -168,12 +175,13 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar "Cannot re-declare queue '" + queue.getNameShortString() + "' with different exclusivity (was: " + queue.isExclusive() + " requested " + body.getExclusive() + ")"); } - else if (!body.getPassive() && body.getExclusive() && !queue.getExclusiveOwner().equals(queue.isDurable() ? session.getPrincipal().getName() : session)) + + else if (!body.getPassive() && body.getExclusive() && !(queue.isDurable() ? queue.getOwner().equals(protocolConnection.getPrincipal().getName()) : (session == null || session.getConnectionModel() != protocolConnection))) { - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "')," - + " as exclusive queue with same name " + throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'), " + + "as exclusive queue with same name " + "declared on another client ID('" - + queue.getPrincipalHolder().getPrincipal().getName() + "')"); + + queue.getOwner() + "')"); } else if(!body.getPassive() && queue.isAutoDelete() != body.getAutoDelete()) @@ -190,7 +198,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar } - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = protocolConnection.getChannel(channelId); if (channel == null) { @@ -203,12 +211,12 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar if (!body.getNowait()) { - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); QueueDeclareOkBody responseBody = methodRegistry.createQueueDeclareOkBody(queueName, queue.getMessageCount(), queue.getConsumerCount()); - session.writeFrame(responseBody.generateFrame(channelId)); + protocolConnection.writeFrame(responseBody.generateFrame(channelId)); _logger.info("Queue " + queueName + " declared successfully"); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java index 93acc94816..4a1940ee01 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java @@ -26,6 +26,7 @@ import org.apache.qpid.framing.QueueDeleteOkBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.state.AMQStateManager; @@ -58,15 +59,15 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB public void methodReceived(AMQStateManager stateManager, QueueDeleteBody body, int channelId) throws AMQException { - AMQProtocolSession session = stateManager.getProtocolSession(); - VirtualHost virtualHost = session.getVirtualHost(); + AMQProtocolSession protocolConnection = stateManager.getProtocolSession(); + VirtualHost virtualHost = protocolConnection.getVirtualHost(); QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); DurableConfigurationStore store = virtualHost.getDurableConfigurationStore(); AMQQueue queue; if (body.getQueue() == null) { - AMQChannel channel = session.getChannel(channelId); + AMQChannel channel = protocolConnection.getChannel(channelId); if (channel == null) { @@ -103,12 +104,13 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB else { + AMQSessionModel session = queue.getExclusiveOwningSession(); //Perform ACLs - if (!virtualHost.getAccessManager().authoriseDelete(session, queue)) + if (!virtualHost.getAccessManager().authoriseDelete(protocolConnection, queue)) { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied"); } - else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session) + else if (queue.isExclusive() && !queue.isDurable() && (session == null || session.getConnectionModel() != protocolConnection)) { throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection."); @@ -121,9 +123,9 @@ public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteB store.removeQueue(queue); } - MethodRegistry methodRegistry = session.getMethodRegistry(); + MethodRegistry methodRegistry = protocolConnection.getMethodRegistry(); QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged); - session.writeFrame(responseBody.generateFrame(channelId)); + protocolConnection.writeFrame(responseBody.generateFrame(channelId)); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java index b94ebb6538..9ae3a6e28a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java @@ -27,6 +27,7 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.state.AMQStateManager;
@@ -57,11 +58,11 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod public void methodReceived(AMQStateManager stateManager, QueuePurgeBody body, int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHost virtualHost = session.getVirtualHost();
+ AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
+ VirtualHost virtualHost = protocolConnection.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = protocolConnection.getChannel(channelId);
AMQQueue queue;
@@ -98,13 +99,14 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod }
else
{
+ AMQSessionModel session = queue.getExclusiveOwningSession();
//Perform ACLs
- if (!virtualHost.getAccessManager().authorisePurge(session, queue))
+ if (!virtualHost.getAccessManager().authorisePurge(protocolConnection, queue))
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
- }
- else if (queue.isExclusive() && queue.getExclusiveOwner() != session)
+ }
+ else if (queue.isExclusive() && (session == null || session.getConnectionModel() != protocolConnection))
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"Queue is exclusive, but not created on this Connection.");
@@ -116,10 +118,10 @@ public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBod if(!body.getNowait())
{
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
- session.writeFrame(responseBody.generateFrame(channelId));
-
+ protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java new file mode 100644 index 0000000000..448c8508a5 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.AMQException; + +public interface AMQConnectionModel +{ + + /** + * Close the given requested Session + * @param session + * @param cause + * @param message + * @throws org.apache.qpid.AMQException + */ + public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException; + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 3206d4fce7..7d70a3cdfc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -62,7 +62,6 @@ import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.Sender; import javax.management.JMException; -import javax.management.MBeanException; import javax.security.sasl.SaslServer; import java.io.IOException; import java.net.InetSocketAddress; @@ -502,7 +501,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol return channel; } - public AMQChannel getChannel(int channelId) throws AMQException + public AMQChannel getChannel(int channelId) { final AMQChannel channel = ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId); @@ -1231,4 +1230,20 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol } } } + + + public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException + { + + closeChannel((Integer)session.getID()); + + MethodRegistry methodRegistry = getMethodRegistry(); + ChannelCloseBody responseBody = + methodRegistry.createChannelCloseBody( + cause.getCode(), + new AMQShortString(message), + 0,0); + + writeFrame(responseBody.generateFrame((Integer)session.getID())); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 5ed270c80d..f48a214933 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -37,7 +37,7 @@ import java.security.Principal; import java.util.List; -public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder +public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder, AMQConnectionModel { long getSessionID(); @@ -109,7 +109,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Prin * * @return null if no channel exists, the channel otherwise */ - AMQChannel getChannel(int channelId) throws AMQException; + AMQChannel getChannel(int channelId); /** * Associate a channel with this session. diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java new file mode 100644 index 0000000000..29749798be --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +public interface AMQSessionModel +{ + Object getID(); + + AMQConnectionModel getConnectionModel(); +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 45c84d7603..c557c876f1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.QueueConfig; import org.apache.qpid.server.configuration.QueueConfiguration; @@ -68,8 +70,8 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer PrincipalHolder getPrincipalHolder(); void setPrincipalHolder(PrincipalHolder principalHolder); - void setExclusiveOwner(Object owner); - Object getExclusiveOwner(); + void setExclusiveOwningSession(AMQSessionModel owner); + AMQSessionModel getExclusiveOwningSession(); VirtualHost getVirtualHost(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index cf2f637697..b7f3f59c4b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -7,6 +7,8 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.pool.ReadWriteRunnable; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; @@ -83,7 +85,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private PrincipalHolder _prinicpalHolder; - private Object _exclusiveOwner; + private AMQSessionModel _exclusiveOwner; private final boolean _durable; @@ -2045,12 +2047,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return ids; } - public Object getExclusiveOwner() + public AMQSessionModel getExclusiveOwningSession() { return _exclusiveOwner; } - public void setExclusiveOwner(Object exclusiveOwner) + public void setExclusiveOwningSession(AMQSessionModel exclusiveOwner) { _exclusiveOwner = exclusiveOwner; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index 1aff1eec86..58dbc95224 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -22,10 +22,23 @@ package org.apache.qpid.server.transport; import org.apache.qpid.server.configuration.ConnectionConfig; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.ConnectionCloseCode; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionDetachCode; +import org.apache.qpid.transport.SessionDetach; +import org.apache.qpid.transport.Binary; +import org.apache.qpid.transport.SessionDetached; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.ExecutionException; +import org.apache.qpid.transport.ExecutionErrorCode; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.AMQException; -public class ServerConnection extends Connection +public class ServerConnection extends Connection implements AMQConnectionModel { private ConnectionConfig _config; private Runnable _onOpenTask; @@ -88,4 +101,15 @@ public class ServerConnection extends Connection { _onOpenTask = task; } + + public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException + { + ExecutionException ex = new ExecutionException(); + ex.setErrorCode(ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED); + ex.setDescription(message); + ((ServerSession)session).invoke(ex); + + ((ServerSession)session).close(); + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 2195cc4154..52b253c075 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -39,6 +39,8 @@ import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.transport.Binary; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.MessageTransfer; @@ -63,7 +65,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; -public class ServerSession extends Session implements PrincipalHolder, SessionConfig +public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel { private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); @@ -310,7 +312,7 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo } } - public void removeDispositionListener(Method method) + public void removeDispositionListener(Method method) { _messageDispositionListenerMap.remove(method.getId()); } @@ -552,4 +554,15 @@ public class ServerSession extends Session implements PrincipalHolder, SessionCo { close(); } + + public Object getID() + { + return getName(); + } + + public AMQConnectionModel getConnectionModel() + { + return (ServerConnection) getConnection(); + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 3b0f990377..7dcb268290 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -36,6 +36,7 @@ import org.apache.qpid.server.flow.*; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUnknownExchangeType; import org.apache.qpid.framing.*; @@ -813,7 +814,7 @@ public class ServerSessionDelegate extends SessionDelegate if(method.getExclusive()) { queue.setPrincipalHolder((ServerSession)session); - queue.setExclusiveOwner(session); + queue.setExclusiveOwningSession((AMQSessionModel) session); } else if(method.getAutoDelete()) { diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index dbd51af68c..f02b1f435f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -30,6 +30,7 @@ import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.security.PrincipalHolder; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.AMQException; @@ -47,7 +48,7 @@ public class MockAMQQueue implements AMQQueue private PrincipalHolder _principalHolder; - private Object _exclusiveOwner; + private AMQSessionModel _exclusiveOwner; public MockAMQQueue(String name) { @@ -527,12 +528,12 @@ public class MockAMQQueue implements AMQQueue _principalHolder = principalHolder; } - public Object getExclusiveOwner() + public AMQSessionModel getExclusiveOwningSession() { return _exclusiveOwner; } - public void setExclusiveOwner(Object exclusiveOwner) + public void setExclusiveOwningSession(AMQSessionModel exclusiveOwner) { _exclusiveOwner = exclusiveOwner; } |
