summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-12-19 16:07:12 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-12-19 16:07:12 +0000
commit3085b02021e10aefd23de281f1ae71189a3c2ac8 (patch)
tree69f9c2cd527d3dadb907ff9374d448d1d53269e9 /java/broker
parentee05152f188034e8b3cdc9e67c9e615febbf1d42 (diff)
downloadqpid-python-3085b02021e10aefd23de281f1ae71189a3c2ac8.tar.gz
QPID-216
BasicConsumeMethodHandler.java - Pulled the nolocal param from the method body and passed down channel to subscription. SubscriptionFactory.java / AMQQueue.java/AMQChannel.java - passed the nolocal parameter through to the Subscription ConnectionStartOkMethodHandler.java - Saved the client properties so the client identifier can be used in comparison with the publisher id to implement no_local AMQMinaProtocolSession.java - added _clientProperties to store the sent client properties. AMQProtocolSession.java - interface changes to get/set ClientProperties ConcurrentSelectorDeliveryManager.java - only need to do hasInterset as this will take care of the hasFilters optimisation check. SubscriptionImpl.java - Added code to do comparison of client ids to determin insterest in a given message. SubscriptionSet.java - tidied up code to use hasInterest as this is where the nolocal is implemented. ConnectionStartMethodHandler.java - Moved literal values to a ClientProperties.java enumeration and a QpidProperties.java values. QpidConnectionMetaData.java - updated to get values from QpidProperties.java MockProtocolSession.java - null implementation of new get/set methods git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@488712 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java66
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java39
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java22
10 files changed, 112 insertions, 68 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index d8485ef0f2..117231b36e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -286,12 +286,14 @@ public class AMQChannel
* @param tag the tag chosen by the client (if null, server will generate one)
* @param queue the queue to subscribe to
* @param session the protocol session of the subscriber
+ * @param noLocal
* @return the consumer tag. This is returned to the subscriber and used in
* subsequent unsubscribe requests
* @throws ConsumerTagNotUniqueException if the tag is not unique
* @throws AMQException if something goes wrong
*/
- public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks, FieldTable filters) throws AMQException, ConsumerTagNotUniqueException
+ public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
+ FieldTable filters, boolean noLocal) throws AMQException, ConsumerTagNotUniqueException
{
if (tag == null)
{
@@ -302,7 +304,7 @@ public class AMQChannel
throw new ConsumerTagNotUniqueException();
}
- queue.registerProtocolSession(session, _channelId, tag, acks, filters);
+ queue.registerProtocolSession(session, _channelId, tag, acks, filters,noLocal);
_consumerTag2QueueMap.put(tag, queue);
return tag;
}
@@ -499,7 +501,7 @@ public class AMQChannel
if (_log.isDebugEnabled())
{
_log.debug("Handling acknowledgement for channel " + _channelId + " with delivery tag " + deliveryTag +
- " and multiple " + multiple);
+ " and multiple " + multiple);
}
if (multiple)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
index bf282020ee..1e57c714ff 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
@@ -77,7 +77,8 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
try
{
- String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck, body.arguments);
+ String consumerTag = channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
+ body.arguments, body.noLocal);
if (!body.nowait)
{
session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
@@ -90,8 +91,8 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
{
_log.info("Closing connection due to invalid selector");
session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(),
- ise.getMessage(), BasicConsumeBody.CLASS_ID,
- BasicConsumeBody.METHOD_ID));
+ ise.getMessage(), BasicConsumeBody.CLASS_ID,
+ BasicConsumeBody.METHOD_ID));
}
catch (ConsumerTagNotUniqueException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
index 00ae547683..79b2e11bca 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
@@ -78,12 +78,19 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
+ //save clientProperties
+ if (protocolSession.getClientProperties() == null)
+ {
+ protocolSession.setClientProperties(body.clientProperties);
+ }
+
switch (authResult.status)
{
case ERROR:
throw new AMQException("Authentication failed");
case SUCCESS:
_logger.info("Connected as: " + ss.getAuthorizationID());
+
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE, getConfiguredFrameSize(),
HeartbeatConfig.getInstance().getDelay());
@@ -122,7 +129,7 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
static int getConfiguredFrameSize()
{
final Configuration config = ApplicationRegistry.getInstance().getConfiguration();
- final int framesize = config.getInt("advanced.framesize", DEFAULT_FRAME_SIZE);
+ final int framesize = config.getInt("advanced.framesize", DEFAULT_FRAME_SIZE);
_logger.info("Framesize set to " + framesize);
return framesize;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index e623d23a79..6ba78ba722 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -26,17 +26,19 @@ import org.apache.mina.common.IoSession;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ConnectionStartBody;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.ProtocolVersionList;
import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.ConnectionStartBody;
import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.codec.AMQDecoder;
+
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -89,10 +91,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private boolean _closed;
// maximum number of channels this session should have
private long _maxNoOfChannels = 1000;
-
+
/* AMQP Version for this session */
private byte _major;
private byte _minor;
+ private FieldTable _clientProperties;
public ManagedObject getManagedObject()
{
@@ -128,7 +131,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
return new AMQProtocolSessionMBean(this);
}
- catch(JMException ex)
+ catch (JMException ex)
{
_logger.error("AMQProtocolSession MBean creation has failed ", ex);
throw new AMQException("AMQProtocolSession MBean creation has failed ", ex);
@@ -153,18 +156,21 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
ProtocolInitiation pi = (ProtocolInitiation) message;
// this ensures the codec never checks for a PI message again
- ((AMQDecoder)_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
- try {
+ ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+ try
+ {
pi.checkVersion(this); // Fails if not correct
// This sets the protocol version (and hence framing classes) for this session.
_major = pi.protocolMajor;
_minor = pi.protocolMinor;
String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
String locales = "en_US";
- AMQFrame response = ConnectionStartBody.createAMQFrame((short)0, pi.protocolMajor, pi.protocolMinor, null,
+ AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, pi.protocolMajor, pi.protocolMinor, null,
mechanisms.getBytes(), locales.getBytes());
_minaProtocolSession.write(response);
- } catch (AMQException e) {
+ }
+ catch (AMQException e)
+ {
_logger.error("Received incorrect protocol initiation", e);
/* Find last protocol version in protocol version list. Make sure last protocol version
listed in the build file (build-module.xml) is the latest version which will be used
@@ -211,7 +217,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_logger.debug("Method frame received: " + frame);
}
final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel,
- (AMQMethodBody)frame.bodyFrame);
+ (AMQMethodBody) frame.bodyFrame);
try
{
boolean wasAnyoneInterested = false;
@@ -266,7 +272,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("Content header frame received: " + frame);
}
- getChannel(frame.channel).publishContentHeader((ContentHeaderBody)frame.bodyFrame);
+ getChannel(frame.channel).publishContentHeader((ContentHeaderBody) frame.bodyFrame);
}
private void contentBodyReceived(AMQFrame frame) throws AMQException
@@ -275,7 +281,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("Content body frame received: " + frame);
}
- getChannel(frame.channel).publishContentBody((ContentBody)frame.bodyFrame);
+ getChannel(frame.channel).publishContentBody((ContentBody) frame.bodyFrame);
}
/**
@@ -355,6 +361,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
* Close a specific channel. This will remove any resources used by the channel, including:
* <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li>
* </ul>
+ *
* @param channelId id of the channel to close
* @throws AMQException if an error occurs closing the channel
* @throws IllegalArgumentException if the channel id is not valid
@@ -381,6 +388,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
/**
* In our current implementation this is used by the clustering code.
+ *
* @param channelId
*/
public void removeChannel(int channelId)
@@ -390,11 +398,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
/**
* Initialise heartbeats on the session.
+ *
* @param delay delay in seconds (not ms)
*/
public void initHeartbeats(int delay)
{
- if(delay > 0)
+ if (delay > 0)
{
_minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
_minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.getInstance().getTimeout(delay));
@@ -404,6 +413,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
/**
* Closes all channels that were opened by this protocol session. This frees up all resources
* used by the channel.
+ *
* @throws AMQException if an error occurs while closing any channel
*/
private void closeAllChannels() throws AMQException
@@ -421,7 +431,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
*/
public void closeSession() throws AMQException
{
- if(!_closed)
+ if (!_closed)
{
_closed = true;
closeAllChannels();
@@ -463,11 +473,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
// information is used by SASL primary.
if (address instanceof InetSocketAddress)
{
- return ((InetSocketAddress)address).getHostName();
+ return ((InetSocketAddress) address).getHostName();
}
else if (address instanceof VmPipeAddress)
{
- return "vmpipe:" + ((VmPipeAddress)address).getPort();
+ return "vmpipe:" + ((VmPipeAddress) address).getPort();
}
else
{
@@ -484,22 +494,32 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_saslServer = saslServer;
}
-
+
+ public FieldTable getClientProperties()
+ {
+ return _clientProperties;
+ }
+
+ public void setClientProperties(FieldTable clientProperties)
+ {
+ _clientProperties = clientProperties;
+ }
+
/**
* Convenience methods for managing AMQP version.
* NOTE: Both major and minor will be set to 0 prior to protocol initiation.
*/
-
+
public byte getAmqpMajor()
{
return _major;
}
-
+
public byte getAmqpMinor()
{
return _minor;
}
-
+
public boolean amqpVersionEquals(byte major, byte minor)
{
return _major == major && _minor == minor;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index acaf6b0d9b..03d0c50dac 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol;
import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.AMQException;
@@ -122,4 +123,9 @@ public interface AMQProtocolSession
* @param saslServer
*/
void setSaslServer(SaslServer saslServer);
+
+
+ FieldTable getClientProperties();
+
+ void setClientProperties(FieldTable clientProperties);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index e64daef690..561b719b2e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -96,7 +96,7 @@ public class AMQQueue implements Managable, Comparable
* max allowed number of messages on a queue.
*/
private Integer _maxMessageCount = 10000;
-
+
/**
* max queue depth(KB) for the queue
*/
@@ -362,12 +362,17 @@ public class AMQQueue implements Managable, Comparable
_bindings.addBinding(routingKey, exchange);
}
- public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters)
+ public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters) throws AMQException
+ {
+ registerProtocolSession(ps, channel, consumerTag, acks, filters, false);
+ }
+
+ public void registerProtocolSession(AMQProtocolSession ps, int channel, String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
throws AMQException
{
debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
- Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters);
+ Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
_subscribers.addSubscriber(subscription);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index d8bb6e1948..8bdadcb493 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -281,7 +281,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
// Only give the message to those that want them.
- if (sub.hasFilters() && sub.hasInterest(msg))
+ if (sub.hasInterest(msg))
{
sub.enqueueForPreDelivery(msg);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
index f464384562..2bb77dc649 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
@@ -33,12 +33,10 @@ import org.apache.qpid.framing.FieldTable;
*/
public interface SubscriptionFactory
{
- Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters)
- throws AMQException;
+ Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks,
+ FieldTable filters, boolean noLocal) throws AMQException;
- Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
- throws AMQException;
- Subscription createSubscription(int channel, AMQProtocolSession protocolSession,String consumerTag)
- throws AMQException;
+ Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
+ throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 79b0593f69..fc00754cda 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
+import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
@@ -56,6 +57,7 @@ public class SubscriptionImpl implements Subscription
private Queue<AMQMessage> _messages;
+ private final boolean _noLocal;
/**
* True if messages need to be acknowledged
@@ -65,21 +67,15 @@ public class SubscriptionImpl implements Subscription
public static class Factory implements SubscriptionFactory
{
- public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters) throws AMQException
+ public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters);
- }
-
- public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
- throws AMQException
- {
- return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, null);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal);
}
public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false);
}
}
@@ -87,11 +83,11 @@ public class SubscriptionImpl implements Subscription
String consumerTag, boolean acks)
throws AMQException
{
- this(channelId, protocolSession, consumerTag, acks, null);
+ this(channelId, protocolSession, consumerTag, acks, null, false);
}
public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
- String consumerTag, boolean acks, FieldTable filters)
+ String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
throws AMQException
{
AMQChannel channel = protocolSession.getChannel(channelId);
@@ -105,6 +101,8 @@ public class SubscriptionImpl implements Subscription
this.consumerTag = consumerTag;
sessionKey = protocolSession.getKey();
_acks = acks;
+ _noLocal = noLocal;
+
_filters = FilterManagerFactory.createManager(filters);
if (_filters != null)
@@ -218,7 +216,22 @@ public class SubscriptionImpl implements Subscription
public boolean hasInterest(AMQMessage msg)
{
- return _filters.allAllow(msg);
+ if (_noLocal)
+ {
+ return !(protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals(
+ msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString())));
+ }
+ else
+ {
+ if (_filters != null)
+ {
+ return _filters.allAllow(msg);
+ }
+ else
+ {
+ return true;
+ }
+ }
}
public Queue<AMQMessage> getPreDeliveryQueue()
@@ -235,8 +248,6 @@ public class SubscriptionImpl implements Subscription
}
-
-
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
{
AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index a4afe18e4d..91e720ea54 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -139,22 +139,15 @@ class SubscriptionSet implements WeightedSubscriptionManager
if (!subscription.isSuspended())
{
- if (!subscription.hasFilters())
+ if (subscription.hasInterest(msg))
{
- return subscription;
- }
- else
- {
- if (subscription.hasInterest(msg))
+ // if the queue is not empty then this client is ready to receive a message.
+ //FIXME the queue could be full of sent messages.
+ // Either need to clean all PDQs after sending a message
+ // OR have a clean up thread that runs the PDQs expunging the messages.
+ if (!subscription.hasFilters() || subscription.getPreDeliveryQueue().isEmpty())
{
- // if the queue is not empty then this client is ready to receive a message.
- //FIXME the queue could be full of sent messages.
- // Either need to clean all PDQs after sending a message
- // OR have a clean up thread that runs the PDQs expunging the messages.
- if (subscription.getPreDeliveryQueue().isEmpty())
- {
- return subscription;
- }
+ return subscription;
}
}
}
@@ -208,6 +201,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
/**
* Notification that a queue has been deleted. This is called so that the subscription can inform the
* channel, which in turn can update its list of unacknowledged messages.
+ *
* @param queue
*/
public void queueDeleted(AMQQueue queue)