diff options
| author | Gordon Sim <gsim@apache.org> | 2007-01-29 18:53:27 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-01-29 18:53:27 +0000 |
| commit | 06fc36ba2e32bc5c17ba18c1338f602cb84d54bb (patch) | |
| tree | f5f1cbfb1cdfc8f1d5dfa84784442566a52f0583 /java | |
| parent | 55fd03d058a965422e2369817c6c3f323e51ce55 (diff) | |
| download | qpid-python-06fc36ba2e32bc5c17ba18c1338f602cb84d54bb.tar.gz | |
Moved across fixes from trunk for handling exclusive consumers and no_local consumption.
Fixed close process in AMQChannel (remove channel from map only after consumer cancellations have been processed).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501144 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
6 files changed, 65 insertions, 18 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java index 83121e7977..1d37456b8e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java @@ -105,6 +105,21 @@ public class MessageConsumeHandler implements StateAwareMethodListener<MessageCo session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(), "Non-unique consumer tag, '" + body.destination + "'", body.getClazz(), body.getMethod()); } + catch (AMQQueue.ExistingExclusiveSubscription e) + { + throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(), + "Cannot subscribe to queue " + + queue.getName() + + " as it already has an existing exclusive consumer"); + } + catch (AMQQueue.ExistingSubscriptionPreventsExclusive e) + { + throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(), + "Cannot subscribe to queue " + + queue.getName() + + " exclusively as it already has a consumer"); + } + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java index 33a3b0735f..3914d6d793 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java @@ -24,6 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.MessageRecoverBody; +import org.apache.qpid.framing.MessageOkBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.ExchangeRegistry; @@ -61,6 +62,8 @@ public class MessageRecoverHandler implements StateAwareMethodListener<MessageRe } else { channel.resend(protocolSession); } + MessageOkBody response = MessageOkBody.createMethodBody(protocolSession.getMajor(), protocolSession.getMinor()); + protocolSession.writeResponse(evt, response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 868e0ba463..c30e8eaab4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -316,9 +316,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.error("Closing channel due to: " + e.getMessage()); writeRequest(channelNum, e.getCloseMethodBody()); - AMQChannel channel = _channelMap.remove(channelNum); + AMQChannel channel = _channelMap.get(channelNum);//can't remove it yet as close requires it if (channel != null) { channel.close(this); + _channelMap.remove(channelNum); } } catch (AMQConnectionException e) @@ -728,6 +729,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return _ConnectionId.get(); } + public Object getClientIdentifier() + { + return _minaProtocolSession.getRemoteAddress(); + } + public void addSessionCloseTask(Task task) { _taskList.add(task); diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 969eb30cb2..6c99af1c70 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -159,6 +159,8 @@ public interface AMQProtocolSession extends AMQProtocolWriter void checkMethodBodyVersion(AMQMethodBody methodBody); int getConnectionId(); + Object getClientIdentifier(); + void addSessionCloseTask(Task task); void removeSessionCloseTask(Task task); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 0b61f95efe..0a4397cefc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -72,6 +72,7 @@ public class SubscriptionImpl implements Subscription private final boolean _isBrowser; private final Boolean _autoClose; private boolean _closed = false; + private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); public static class Factory implements SubscriptionFactory { @@ -331,35 +332,49 @@ public class SubscriptionImpl implements Subscription { if (_noLocal) { + boolean isLocal; // We don't want local messages so check to see if message is one we sent - if (protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals( - msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString()))) + Object localInstance; + Object msgInstance; + + if((protocolSession.getClientProperties() != null) && + (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { - if (_logger.isTraceEnabled()) + if((msg.getPublisher().getClientProperties() != null) && + (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { - _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" + - System.identityHashCode(msg) + ")"); + if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) + { + if (_logger.isTraceEnabled()) + { + _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" + + System.identityHashCode(msg) + ")"); + } + return false; + } } - return false; } - else // if not then filter the message. + else { - if (_logger.isTraceEnabled()) + localInstance = protocolSession.getClientIdentifier(); + msgInstance = msg.getPublisher().getClientIdentifier(); + if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) { - _logger.trace("(" + System.identityHashCode(this) + ") local message(" + System.identityHashCode(msg) + - ") but not ours so filtering"); + if (_logger.isTraceEnabled()) + { + _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" + + System.identityHashCode(msg) + ")"); + } + return false; } - return checkFilters(msg); + } } - else + if (_logger.isTraceEnabled()) { - if (_logger.isTraceEnabled()) - { - _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg)); - } - return checkFilters(msg); + _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg)); } + return checkFilters(msg); } private boolean checkFilters(AMQMessage msg) diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java index 6ecb54300f..400b21c5b1 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -143,6 +143,12 @@ public class MockProtocolSession implements AMQProtocolSession { } + + public Object getClientIdentifier() + { + return null; + } + public void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException { // TODO Auto-generated method stub |
