summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-01-29 18:53:27 +0000
committerGordon Sim <gsim@apache.org>2007-01-29 18:53:27 +0000
commit06fc36ba2e32bc5c17ba18c1338f602cb84d54bb (patch)
treef5f1cbfb1cdfc8f1d5dfa84784442566a52f0583 /java
parent55fd03d058a965422e2369817c6c3f323e51ce55 (diff)
downloadqpid-python-06fc36ba2e32bc5c17ba18c1338f602cb84d54bb.tar.gz
Moved across fixes from trunk for handling exclusive consumers and no_local consumption.
Fixed close process in AMQChannel (remove channel from map only after consumer cancellations have been processed). git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@501144 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java49
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java6
6 files changed, 65 insertions, 18 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
index 83121e7977..1d37456b8e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
@@ -105,6 +105,21 @@ public class MessageConsumeHandler implements StateAwareMethodListener<MessageCo
session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
"Non-unique consumer tag, '" + body.destination + "'", body.getClazz(), body.getMethod());
}
+ catch (AMQQueue.ExistingExclusiveSubscription e)
+ {
+ throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " as it already has an existing exclusive consumer");
+ }
+ catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
+ {
+ throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " exclusively as it already has a consumer");
+ }
+
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java
index 33a3b0735f..3914d6d793 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -61,6 +62,8 @@ public class MessageRecoverHandler implements StateAwareMethodListener<MessageRe
} else {
channel.resend(protocolSession);
}
+ MessageOkBody response = MessageOkBody.createMethodBody(protocolSession.getMajor(), protocolSession.getMinor());
+ protocolSession.writeResponse(evt, response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 868e0ba463..c30e8eaab4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -316,9 +316,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.error("Closing channel due to: " + e.getMessage());
writeRequest(channelNum, e.getCloseMethodBody());
- AMQChannel channel = _channelMap.remove(channelNum);
+ AMQChannel channel = _channelMap.get(channelNum);//can't remove it yet as close requires it
if (channel != null) {
channel.close(this);
+ _channelMap.remove(channelNum);
}
}
catch (AMQConnectionException e)
@@ -728,6 +729,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
return _ConnectionId.get();
}
+ public Object getClientIdentifier()
+ {
+ return _minaProtocolSession.getRemoteAddress();
+ }
+
public void addSessionCloseTask(Task task)
{
_taskList.add(task);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 969eb30cb2..6c99af1c70 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -159,6 +159,8 @@ public interface AMQProtocolSession extends AMQProtocolWriter
void checkMethodBodyVersion(AMQMethodBody methodBody);
int getConnectionId();
+ Object getClientIdentifier();
+
void addSessionCloseTask(Task task);
void removeSessionCloseTask(Task task);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 0b61f95efe..0a4397cefc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -72,6 +72,7 @@ public class SubscriptionImpl implements Subscription
private final boolean _isBrowser;
private final Boolean _autoClose;
private boolean _closed = false;
+ private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
public static class Factory implements SubscriptionFactory
{
@@ -331,35 +332,49 @@ public class SubscriptionImpl implements Subscription
{
if (_noLocal)
{
+ boolean isLocal;
// We don't want local messages so check to see if message is one we sent
- if (protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals(
- msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString())))
+ Object localInstance;
+ Object msgInstance;
+
+ if((protocolSession.getClientProperties() != null) &&
+ (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
- if (_logger.isTraceEnabled())
+ if((msg.getPublisher().getClientProperties() != null) &&
+ (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
- _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
- System.identityHashCode(msg) + ")");
+ if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
+ System.identityHashCode(msg) + ")");
+ }
+ return false;
+ }
}
- return false;
}
- else // if not then filter the message.
+ else
{
- if (_logger.isTraceEnabled())
+ localInstance = protocolSession.getClientIdentifier();
+ msgInstance = msg.getPublisher().getClientIdentifier();
+ if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
{
- _logger.trace("(" + System.identityHashCode(this) + ") local message(" + System.identityHashCode(msg) +
- ") but not ours so filtering");
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
+ System.identityHashCode(msg) + ")");
+ }
+ return false;
}
- return checkFilters(msg);
+
}
}
- else
+ if (_logger.isTraceEnabled())
{
- if (_logger.isTraceEnabled())
- {
- _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg));
- }
- return checkFilters(msg);
+ _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg));
}
+ return checkFilters(msg);
}
private boolean checkFilters(AMQMessage msg)
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
index 6ecb54300f..400b21c5b1 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -143,6 +143,12 @@ public class MockProtocolSession implements AMQProtocolSession
{
}
+
+ public Object getClientIdentifier()
+ {
+ return null;
+ }
+
public void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException {
// TODO Auto-generated method stub