diff options
| author | Rupert Smith <rupertlssmith@apache.org> | 2007-05-17 15:32:18 +0000 |
|---|---|---|
| committer | Rupert Smith <rupertlssmith@apache.org> | 2007-05-17 15:32:18 +0000 |
| commit | ee94c939a9d77f0de6ae0cb33c782a9015bb8452 (patch) | |
| tree | 05558bbb1d3bd6386d54da85895bdc239475f1de /java | |
| parent | c135260a07f847965dae618315b379f02b8a0b52 (diff) | |
| download | qpid-python-ee94c939a9d77f0de6ae0cb33c782a9015bb8452.tar.gz | |
Merged revisions 538084-538097,538099-538108,538110-538906,538908-538912 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r538084 | ritchiem | 2007-05-15 09:02:42 +0100 (Tue, 15 May 2007) | 1 line
QPID-466 Removed Unsupported exception from setIntProperty with STRICT_AMQP set
........
r538240 | ritchiem | 2007-05-15 17:19:01 +0100 (Tue, 15 May 2007) | 6 lines
QPID-3 Topic Matching with tests
A simple naive approach. Similar to C++ to be included for M2.
More elaborate pre-evaluated version will have to wait.
Once benchmarks have been performed we can evaluate performance advantages if any of that approach.
........
r538882 | ritchiem | 2007-05-17 13:12:34 +0100 (Thu, 17 May 2007) | 3 lines
Fix for broken CSDM message purging routine that was causing python test_get to fail.
Replaced long while control with a method call that is easier to understand and has more comments.
........
r538912 | ritchiem | 2007-05-17 14:26:25 +0100 (Thu, 17 May 2007) | 2 lines
Fixed failing python tests. The rather annoying way we unsubscribe subscribers by creating new ones was causing a problem as the closing channel had been closed before the unsubscribe call.
Java now passes all python tests
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@538968 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
6 files changed, 994 insertions, 221 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 1cd098a64f..1de4d16ad4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; @@ -42,12 +43,12 @@ import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; import org.apache.qpid.server.exchange.MessageRouter; import org.apache.qpid.server.exchange.NoRouteException; +import org.apache.qpid.server.messageStore.MessageStore; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageHandleFactory; import org.apache.qpid.server.queue.Subscription; -import org.apache.qpid.server.messageStore.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.*; @@ -59,7 +60,7 @@ public class AMQChannel private final int _channelId; - //private boolean _transactional; + // private boolean _transactional; private long _prefetch_HighWaterMark; @@ -119,14 +120,12 @@ public class AMQChannel private Set<Long> _browsedAcks = new HashSet<Long>(); - //Why do we need this reference ? - ritchiem + // Why do we need this reference ? - ritchiem private final AMQProtocolSession _session; private boolean _closing; - - public AMQChannel(AMQProtocolSession session, int channelId, TransactionManager transactionManager, MessageStore messageStore, MessageRouter exchanges) - throws - AMQException + public AMQChannel(AMQProtocolSession session, int channelId, TransactionManager transactionManager, + MessageStore messageStore, MessageRouter exchanges) throws AMQException { _session = session; _channelId = channelId; @@ -145,7 +144,8 @@ public class AMQChannel */ public void setLocalTransactional() { - _txnContext = new DistributedTransactionalContext(_transactionManager, _messageStore, _storeContext, _returnMessages); + _txnContext = + new DistributedTransactionalContext(_transactionManager, _messageStore, _storeContext, _returnMessages); } public boolean isTransactional() @@ -176,7 +176,6 @@ public class AMQChannel return _prefetchSize; } - public void setPrefetchSize(long prefetchSize) { _prefetchSize = prefetchSize; @@ -202,31 +201,26 @@ public class AMQChannel _prefetch_HighWaterMark = prefetchCount; } - - public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher) - throws - AMQException + public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher) throws AMQException { - - _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info, - _txnContext); + _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info, _txnContext); _currentMessage.setPublisher(publisher); } - public void publishContentHeader(ContentHeaderBody contentHeaderBody) - throws - AMQException + public void publishContentHeader(ContentHeaderBody contentHeaderBody) throws AMQException { if (_currentMessage == null) { throw new AMQException("Received content header without previously receiving a BasicPublish frame"); - } else + } + else { if (_log.isTraceEnabled()) { _log.trace(debugIdentity() + "Content header received on channel " + _channelId); } + _currentMessage.setContentHeaderBody(contentHeaderBody); _currentMessage.setExpiration(); @@ -241,9 +235,7 @@ public class AMQChannel } } - public void publishContentBody(ContentBody contentBody, AMQProtocolSession protocolSession) - throws - AMQException + public void publishContentBody(ContentBody contentBody, AMQProtocolSession protocolSession) throws AMQException { if (_currentMessage == null) { @@ -254,12 +246,15 @@ public class AMQChannel { _log.trace(debugIdentity() + "Content body received on channel " + _channelId); } + try { // returns true iff the message was delivered (i.e. if all data was // received - if (_currentMessage.addContentBodyFrame(_storeContext, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody))) + if (_currentMessage.addContentBodyFrame(_storeContext, + protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToContentChunk( + contentBody))) { // callback to allow the context to do any post message processing // primary use is to allow message return processing in the non-tx case @@ -276,9 +271,7 @@ public class AMQChannel } } - protected void routeCurrentMessage() - throws - AMQException + protected void routeCurrentMessage() throws AMQException { try { @@ -316,15 +309,13 @@ public class AMQChannel * @throws AMQException if something goes wrong */ public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks, - FieldTable filters, boolean noLocal, boolean exclusive) - throws - AMQException, - ConsumerTagNotUniqueException + FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException { if (tag == null) { tag = new AMQShortString("sgen_" + getNextConsumerTag()); } + if (_consumerTag2QueueMap.containsKey(tag)) { throw new ConsumerTagNotUniqueException(); @@ -332,13 +323,11 @@ public class AMQChannel queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive); _consumerTag2QueueMap.put(tag, queue); + return tag; } - - public void unsubscribeConsumer(AMQProtocolSession session, final AMQShortString consumerTag) - throws - AMQException + public void unsubscribeConsumer(AMQProtocolSession session, final AMQShortString consumerTag) throws AMQException { final AMQQueue q = _consumerTag2QueueMap.remove(consumerTag); if (q != null) @@ -353,38 +342,44 @@ public class AMQChannel * @param session The session to close * @throws AMQException if there is an error during closure */ - public void close(AMQProtocolSession session) - throws - AMQException + public void close(AMQProtocolSession session) throws AMQException { - _closing = true; _txnContext.rollback(); unsubscribeAllConsumers(session); requeue(); + + setClosing(true); + } + + private void setClosing(boolean closing) + { + _closing = closing; } - private void unsubscribeAllConsumers(AMQProtocolSession session) - throws - AMQException + private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException { if (_log.isInfoEnabled()) { if (!_consumerTag2QueueMap.isEmpty()) { _log.info("Unsubscribing all consumers on channel " + toString()); - } else + } + else { _log.info("No consumers to unsubscribe on channel " + toString()); } } + for (Map.Entry<AMQShortString, AMQQueue> me : _consumerTag2QueueMap.entrySet()) { if (_log.isInfoEnabled()) { _log.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString()); } + me.getValue().unregisterProtocolSession(session, _channelId, me.getKey()); } + _consumerTag2QueueMap.clear(); } @@ -404,12 +399,13 @@ public class AMQChannel if (queue == null) { _log.debug("Adding unacked message with a null queue:" + message.debugIdentity()); - } else + } + else { if (_log.isDebugEnabled()) { - _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag + - ") with a queue(" + queue + ") for " + consumerTag); + _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag + + ") with a queue(" + queue + ") for " + consumerTag); } } } @@ -434,9 +430,7 @@ public class AMQChannel * * @throws org.apache.qpid.AMQException if the requeue fails */ - public void requeue() - throws - AMQException + public void requeue() throws AMQException { // we must create a new map since all the messages will get a new delivery tag when they are redelivered Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); @@ -454,20 +448,20 @@ public class AMQChannel if (!(_txnContext instanceof NonTransactionalContext)) { -// if (_nonTransactedContext == null) + // if (_nonTransactedContext == null) { - _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, - _returnMessages, _browsedAcks); + _nonTransactedContext = + new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); } deliveryContext = _nonTransactedContext; - } else + } + else { deliveryContext = _txnContext; } } - for (UnacknowledgedMessage unacked : messagesToBeDelivered) { if (unacked.queue != null) @@ -483,7 +477,7 @@ public class AMQChannel // Should we allow access To the DM to directy deliver the message? // As we don't need to check for Consumers or worry about incrementing the message count? -// unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false); + // unacked.queue.getDeliveryManager().deliver(_storeContext, unacked.queue.getName(), unacked.message, false); } } @@ -495,9 +489,7 @@ public class AMQChannel * @param deliveryTag The message to requeue * @throws AMQException If something goes wrong. */ - public void requeue(long deliveryTag) - throws - AMQException + public void requeue(long deliveryTag) throws AMQException { UnacknowledgedMessage unacked = _unacknowledgedMessageMap.remove(deliveryTag); @@ -518,74 +510,71 @@ public class AMQChannel TransactionalContext deliveryContext; if (!(_txnContext instanceof NonTransactionalContext)) { -// if (_nonTransactedContext == null) + // if (_nonTransactedContext == null) { - _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, - _returnMessages, _browsedAcks); + _nonTransactedContext = + new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); } deliveryContext = _nonTransactedContext; - } else + } + else { deliveryContext = _txnContext; } if (unacked.queue != null) { - //Redeliver the messages to the front of the queue + // Redeliver the messages to the front of the queue deliveryContext.deliver(unacked.message, unacked.queue, true); - //Deliver increments the message count but we have already deliverted this once so don't increment it again + // Deliver increments the message count but we have already deliverted this once so don't increment it again // this was because deliver did an increment changed this. - } else - { - _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity() + "):" + deliveryTag + - " but no queue defined and no DeadLetter queue so DROPPING message."); -// _log.error("Requested requeue of message:" + deliveryTag + -// " but no queue defined using DeadLetter queue:" + getDeadLetterQueue()); -// -// deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false); -// } - } else + else + { + _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity() + + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); + // _log.error("Requested requeue of message:" + deliveryTag + + // " but no queue defined using DeadLetter queue:" + getDeadLetterQueue()); + // + // deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false); + // + } + } + else { - _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + _unacknowledgedMessageMap.size()); + _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + + _unacknowledgedMessageMap.size()); if (_log.isDebugEnabled()) { _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() - { - int count = 0; - - public boolean callback(UnacknowledgedMessage message) - throws - AMQException { - _log.debug((count++) + ": (" + message.message.debugIdentity() + ")" + - "[" + message.deliveryTag + "]"); - return false; // Continue - } + int count = 0; - public void visitComplete() - { + public boolean callback(UnacknowledgedMessage message) throws AMQException + { + _log.debug( + (count++) + ": (" + message.message.debugIdentity() + ")" + "[" + message.deliveryTag + "]"); - } - }); + return false; // Continue + } + + public void visitComplete() + { } + }); } } - } - /** * Called to resend all outstanding unacknowledged messages to this same channel. * * @param requeue Are the messages to be requeued or dropped. * @throws AMQException When something goes wrong. */ - public void resend(final boolean requeue) - throws - AMQException + public void resend(final boolean requeue) throws AMQException { final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>(); final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>(); @@ -599,52 +588,53 @@ public class AMQChannel // Marking messages who still have a consumer for to be resent // and those that don't to be requeued. _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() - { - public boolean callback(UnacknowledgedMessage message) - throws - AMQException { - AMQShortString consumerTag = message.consumerTag; - AMQMessage msg = message.message; - msg.setRedelivered(true); - if (consumerTag != null) + public boolean callback(UnacknowledgedMessage message) throws AMQException { - // Consumer exists - if (_consumerTag2QueueMap.containsKey(consumerTag)) - { - msgToResend.add(message); - } else // consumer has gone + AMQShortString consumerTag = message.consumerTag; + AMQMessage msg = message.message; + msg.setRedelivered(true); + if (consumerTag != null) { - msgToRequeue.add(message); + // Consumer exists + if (_consumerTag2QueueMap.containsKey(consumerTag)) + { + msgToResend.add(message); + } + else // consumer has gone + { + msgToRequeue.add(message); + } } - } else - { - // Message has no consumer tag, so was "delivered" to a GET - // or consumer no longer registered - // cannot resend, so re-queue. - if (message.queue != null) + else { - if (requeue) + // Message has no consumer tag, so was "delivered" to a GET + // or consumer no longer registered + // cannot resend, so re-queue. + if (message.queue != null) { - msgToRequeue.add(message); - } else + if (requeue) + { + msgToRequeue.add(message); + } + else + { + _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message); + } + } + else { - _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message); + _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message); } - } else - { - _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message); } - } - // false means continue processing - return false; - } + // false means continue processing + return false; + } - public void visitComplete() - { - } - }); + public void visitComplete() + { } + }); // Process Messages to Resend if (_log.isInfoEnabled()) @@ -652,11 +642,13 @@ public class AMQChannel if (!msgToResend.isEmpty()) { _log.info("Preparing (" + msgToResend.size() + ") message to resend."); - } else + } + else { _log.info("No message to resend."); } } + for (UnacknowledgedMessage message : msgToResend) { AMQMessage msg = message.message; @@ -665,22 +657,21 @@ public class AMQChannel // If the client has requested the messages be resent then it is // their responsibility to ensure that thay are capable of receiving them // i.e. The channel hasn't been server side suspended. -// if (isSuspended()) -// { -// _log.info("Channel is suspended so requeuing"); -// //move this message to requeue -// msgToRequeue.add(message); -// } -// else -// { - //release to allow it to be delivered + // if (isSuspended()) + // { + // _log.info("Channel is suspended so requeuing"); + // //move this message to requeue + // msgToRequeue.add(message); + // } + // else + // { + // release to allow it to be delivered msg.release(message.queue); // Without any details from the client about what has been processed we have to mark // all messages in the unacked map as redelivered. msg.setRedelivered(true); - Subscription sub = msg.getDeliveredSubscription(message.queue); if (sub != null) @@ -697,32 +688,38 @@ public class AMQChannel { if (_log.isDebugEnabled()) { - _log.debug("Subscription(" + System.identityHashCode(sub) + ") closed during resend so requeuing message"); + _log.debug("Subscription(" + System.identityHashCode(sub) + + ") closed during resend so requeuing message"); } - //move this message to requeue + // move this message to requeue msgToRequeue.add(message); - } else + } + else { if (_log.isDebugEnabled()) { - _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:" + System.identityHashCode(sub)); + _log.debug("Requeuing " + msg.debugIdentity() + " for resend via sub:" + + System.identityHashCode(sub)); } + sub.addToResendQueue(msg); _unacknowledgedMessageMap.remove(message.deliveryTag); } } // sync(sub.getSendLock) - } else + } + else { if (_log.isInfoEnabled()) { - _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() + ")to prevent loss"); + _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() + + ")to prevent loss"); } - //move this message to requeue + // move this message to requeue msgToRequeue.add(message); } } // for all messages -// } else !isSuspend + // } else !isSuspend if (_log.isInfoEnabled()) { @@ -739,12 +736,13 @@ public class AMQChannel { if (_nonTransactedContext == null) { - _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this, - _returnMessages, _browsedAcks); + _nonTransactedContext = + new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); } deliveryContext = _nonTransactedContext; - } else + } + else { deliveryContext = _txnContext; } @@ -769,36 +767,32 @@ public class AMQChannel * @param queue the queue that has been deleted * @throws org.apache.qpid.AMQException if there is an error processing the unacked messages */ - public void queueDeleted(final AMQQueue queue) - throws - AMQException + public void queueDeleted(final AMQQueue queue) throws AMQException { _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() - { - public boolean callback(UnacknowledgedMessage message) - throws - AMQException { - if (message.queue == queue) + public boolean callback(UnacknowledgedMessage message) throws AMQException { - try - { - message.discard(_storeContext); - message.queue = null; - } - catch (AMQException e) + if (message.queue == queue) { - _log.error("Error decrementing ref count on message " + message.message.getMessageId() + ": " + - e, e); + try + { + message.discard(_storeContext); + message.queue = null; + } + catch (AMQException e) + { + _log.error( + "Error decrementing ref count on message " + message.message.getMessageId() + ": " + e, e); + } } + + return false; } - return false; - } - public void visitComplete() - { - } - }); + public void visitComplete() + { } + }); } /** @@ -809,9 +803,7 @@ public class AMQChannel * acknowledges the single message specified by the delivery tag * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel */ - public void acknowledgeMessage(long deliveryTag, boolean multiple) - throws - AMQException + public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException { synchronized (_unacknowledgedMessageMap.getLock()) { @@ -828,6 +820,7 @@ public class AMQChannel } } + checkSuspension(); } @@ -845,8 +838,9 @@ public class AMQChannel { boolean suspend; - suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark) - || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes()); + suspend = + ((_prefetch_HighWaterMark != 0) && (_unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)) + || ((_prefetchSize != 0) && (_prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes())); setSuspended(suspend); } @@ -867,12 +861,13 @@ public class AMQChannel if (wasSuspended) { _log.debug("Unsuspending channel " + this); - //may need to deliver queued messages + // may need to deliver queued messages for (AMQQueue q : _consumerTag2QueueMap.values()) { q.deliverAsync(); } - } else + } + else { _log.debug("Suspending channel " + this); } @@ -884,20 +879,17 @@ public class AMQChannel return _suspended.get(); } - public void commit() - throws - AMQException + public void commit() throws AMQException { if (!isTransactional()) { throw new AMQException("Fatal error: commit called on non-transactional channel"); } - _txnContext.commit(); + + _txnContext.commit(); } - public void rollback() - throws - AMQException + public void rollback() throws AMQException { _txnContext.rollback(); } @@ -908,6 +900,7 @@ public class AMQChannel sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(isTransactional()); sb.append(", prefetch marks: ").append(_prefetch_LowWaterMark); sb.append("/").append(_prefetch_HighWaterMark); + return sb.toString(); } @@ -926,41 +919,40 @@ public class AMQChannel return _storeContext; } - public void processReturns(AMQProtocolSession session) - throws - AMQException + public void processReturns(AMQProtocolSession session) throws AMQException { for (RequiredDeliveryException bouncedMessage : _returnMessages) { AMQMessage message = bouncedMessage.getAMQMessage(); - session.getProtocolOutputConverter().writeReturn(message, _channelId, - bouncedMessage.getReplyCode().getCode(), - new AMQShortString(bouncedMessage.getMessage())); + session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(), + new AMQShortString(bouncedMessage.getMessage())); } + _returnMessages.clear(); } - public boolean wouldSuspend(AMQMessage msg) { if (isSuspended()) { return true; - } else + } + else { - boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark); + boolean willSuspend = + ((_prefetch_HighWaterMark != 0) && ((_unacknowledgedMessageMap.size() + 1) > _prefetch_HighWaterMark)); if (!willSuspend) { final long unackedSize = _unacknowledgedMessageMap.getUnacknowledgeBytes(); - willSuspend = (_prefetchSize != 0) && (unackedSize != 0) && (_prefetchSize < msg.getSize() + unackedSize); + willSuspend = (_prefetchSize != 0) && (unackedSize != 0) && (_prefetchSize < (msg.getSize() + unackedSize)); } - if (willSuspend) { setSuspended(true); } + return willSuspend; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index 386cfd2349..e9c5b0024c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.exchange; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.StringTokenizer; +import java.util.LinkedList; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -56,6 +58,10 @@ public class DestWildExchange extends AbstractExchange private static final Logger _logger = Logger.getLogger(DestWildExchange.class); private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); + // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>(); + private static final String TOPIC_SEPARATOR = "."; + private static final String AMQP_STAR = "*"; + private static final String AMQP_HASH = "#"; /** DestWildExchangeMBean class implements the management interface for the Topic exchanges. */ @MBeanDescription("Management Bean for Topic Exchange") @@ -78,7 +84,7 @@ public class DestWildExchange extends AbstractExchange AMQShortString key = entry.getKey(); List<String> queueList = new ArrayList<String>(); - List<AMQQueue> queues = entry.getValue(); + List<AMQQueue> queues = getMatchedQueues(key); for (AMQQueue q : queues) { queueList.add(q.getName().toString()); @@ -118,10 +124,13 @@ public class DestWildExchange extends AbstractExchange return ExchangeDefaults.TOPIC_EXCHANGE_CLASS; } - public synchronized void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + public synchronized void registerQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException { assert queue != null; - assert routingKey != null; + assert rKey != null; + + AMQShortString routingKey = normalize(rKey); + _logger.debug("Registering queue " + queue.getName() + " with routing key " + routingKey); // we need to use putIfAbsent, which is an atomic operation, to avoid a race condition List<AMQQueue> queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>()); @@ -142,15 +151,67 @@ public class DestWildExchange extends AbstractExchange } + private AMQShortString normalize(AMQShortString routingKey) + { + StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR); + List<String> _subscription = new ArrayList<String>(); + + while (routingTokens.hasMoreTokens()) + { + _subscription.add(routingTokens.nextToken()); + } + + int size = _subscription.size(); + + for (int index = 0; index < size; index++) + { + //if there are more levels + if (index + 1 < size) + { + if (_subscription.get(index).equals(AMQP_HASH)) + { + if (_subscription.get(index + 1).equals(AMQP_HASH)) + { + // we don't need #.# delete this one + _subscription.remove(index); + size--; + //redo this normalisation + index--; + } + + if (_subscription.get(index + 1).equals(AMQP_STAR)) + { + // we don't want #.* swap to *.# + // remove it and put it in at index + 1 + _subscription.add(index + 1, _subscription.remove(index)); + } + } + }//if we have more levels + } + + StringBuilder sb = new StringBuilder(); + + for (String s : _subscription) + { + sb.append(s); + sb.append(TOPIC_SEPARATOR); + } + + sb.deleteCharAt(sb.length() - 1); + + return new AMQShortString(sb.toString()); + } + public void route(AMQMessage payload) throws AMQException { MessagePublishInfo info = payload.getMessagePublishInfo(); - final AMQShortString routingKey = info.getRoutingKey(); - List<AMQQueue> queues = _routingKey2queues.get(routingKey); + final AMQShortString routingKey = normalize(info.getRoutingKey()); + + List<AMQQueue> queues = getMatchedQueues(routingKey); // if we have no registered queues we have nothing to do // TODO: add support for the immediate flag - if (queues == null) + if (queues == null || queues.size() == 0) { if (info.isMandatory()) { @@ -177,14 +238,14 @@ public class DestWildExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException { - List<AMQQueue> queues = _routingKey2queues.get(routingKey); + List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey)); return queues != null && queues.contains(queue); } public boolean isBound(AMQShortString routingKey) throws AMQException { - List<AMQQueue> queues = _routingKey2queues.get(routingKey); + List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey)); return queues != null && !queues.isEmpty(); } @@ -205,10 +266,12 @@ public class DestWildExchange extends AbstractExchange return !_routingKey2queues.isEmpty(); } - public synchronized void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + public synchronized void deregisterQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException { assert queue != null; - assert routingKey != null; + assert rKey != null; + + AMQShortString routingKey = normalize(rKey); List<AMQQueue> queues = _routingKey2queues.get(routingKey); if (queues == null) @@ -241,4 +304,110 @@ public class DestWildExchange extends AbstractExchange throw new AMQException("Exception occured in creating the topic exchenge mbean", ex); } } + + + private List<AMQQueue> getMatchedQueues(AMQShortString routingKey) + { + List<AMQQueue> list = new LinkedList<AMQQueue>(); + StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR); + + ArrayList<String> routingkeyList = new ArrayList<String>(); + + while (routingTokens.hasMoreTokens()) + { + String next = routingTokens.nextToken(); + if (next.equals(AMQP_HASH) && routingkeyList.get(routingkeyList.size() - 1).equals(AMQP_HASH)) + { + continue; + } + + routingkeyList.add(next); + } + + for (AMQShortString queue : _routingKey2queues.keySet()) + { + StringTokenizer queTok = new StringTokenizer(queue.toString(), TOPIC_SEPARATOR); + + ArrayList<String> queueList = new ArrayList<String>(); + + while (queTok.hasMoreTokens()) + { + queueList.add(queTok.nextToken()); + } + + + int depth = 0; + boolean matching = true; + boolean done = false; + int routingskip = 0; + int queueskip = 0; + + while (matching && !done) + { + if (queueList.size() == depth + queueskip || routingkeyList.size() == depth + routingskip) + { + done = true; + + // if it was the routing key that ran out of digits + if (routingkeyList.size() == depth + routingskip) + { + if (queueList.size() > (depth + queueskip)) + { // a hash and it is the last entry + matching = queueList.get(depth + queueskip).equals(AMQP_HASH) && queueList.size() == depth + queueskip + 1; + } + } + else if (routingkeyList.size() > depth + routingskip) + { + // There is still more routing key to check + matching = false; + } + + + continue; + } + + // if the values on the two topics don't match + if (!queueList.get(depth + queueskip).equals(routingkeyList.get(depth + routingskip))) + { + if (queueList.get(depth + queueskip).equals(AMQP_STAR)) + { + depth++; + + continue; + } + else if (queueList.get(depth + queueskip).equals(AMQP_HASH)) + { + // Is this a # at the end + if (queueList.size() == depth + queueskip + 1) + { + done = true; + continue; + } + + // otherwise # in the middle + while (routingkeyList.size() > depth + routingskip) + { + if (routingkeyList.get(depth + routingskip).equals(queueList.get(depth + queueskip + 1))) + { + queueskip++; + depth++; + break; + } + routingskip++; + } + continue; + } + matching = false; + } + depth++; + } + + if (matching) + { + list.addAll(_routingKey2queues.get(queue)); + } + } + + return list; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index bdc2189676..0fb5e6d88a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -451,13 +451,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager AMQMessage message = messages.peek(); //while (we have a message) && ((The subscriber is not a browser or message is taken ) or we are clearing) && (Check message is taken.) - while (message != null - && ( - ((sub != null && !sub.isBrowser()) || message.isTaken(_queue)) - || sub == null) - && (message.taken(_queue, sub) // Message not taken by another consumer ... unless it is expired - || (sub == null || message.expired(sub.getChannel().getStoreContext(), _queue))) // Message not expired - ) + while (purgeMessage(message, sub)) { //remove the already taken message or expired AMQMessage removed = messages.poll(); @@ -478,6 +472,54 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return message; } + /** + * + * @param message + * @param sub + * @return + * @throws AMQException + */ + private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException + { + //Original.. complicated while loop control +// (message != null +// && ( +// ((sub != null && !sub.isBrowser()) || message.isTaken(_queue)) +// || sub == null) +// && message.taken(_queue, sub)); + + boolean purge = false; + + // if the message is null then don't purge as we have no messagse. + if (message != null) + { + // if we have a subscriber perform message checks + if (sub != null) + { + // Check that the message hasn't expired. + if (message.expired(sub.getChannel().getStoreContext(), _queue)) + { + return true; + } + + // if we have a queue browser(we don't purge) so check mark the message as taken + purge = ((!sub.isBrowser() || message.isTaken(_queue))); + } + else + { + // if there is no subscription we are doing + // a get or purging so mark message as taken. + message.isTaken(_queue); + // and then ensure that it gets purged + purge = true; + } + } + + // if we are purging then ensure we mark this message taken for the current subscriber + // the current subscriber may be null in the case of a get or a purge but this is ok. + return purge && message.taken(_queue, sub); + } + public void sendNextMessage(Subscription sub, AMQQueue queue)//Queue<AMQMessage> messageQueue) { diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java new file mode 100644 index 0000000000..8391481aeb --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.exchange; + +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; + +import junit.framework.Assert; +import junit.framework.TestCase; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.messageStore.MemoryMessageStore; +import org.apache.qpid.server.messageStore.MessageStore; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MessageHandleFactory; +import org.apache.qpid.server.registry.ApplicationRegistry; +// import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.txn.NonTransactionalContext; +import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class DestWildExchangeTest extends TestCase +{ + + DestWildExchange _exchange; + + VirtualHost _vhost; + MessageStore _store; + StoreContext _context; + + public void setUp() throws AMQException + { + _exchange = new DestWildExchange(); + _vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next(); + // _store = new MemoryMessageStore(); + _store = new MemoryMessageStore(); + _context = new StoreContext(); + } + + public void testNoRoute() throws AMQException + { + AMQQueue queue = new AMQQueue(new AMQShortString("a*#b"), false, null, false, _vhost); + _exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null); + + MessagePublishInfo info = new PublishInfo(new AMQShortString("a.b")); + + AMQMessage message = new AMQMessage(0L, info, null); + + try + { + _exchange.route(message); + fail("Message has no route and shouldn't be routed"); + } + catch (NoRouteException nre) + { + // normal + } + + Assert.assertEquals(0, queue.getMessageCount()); + } + + public void testDirectMatch() throws AMQException + { + AMQQueue queue = new AMQQueue(new AMQShortString("ab"), false, null, false, _vhost); + _exchange.registerQueue(new AMQShortString("a.b"), queue, null); + + AMQMessage message = createMessage("a.b"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + message = createMessage("a.c"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + fail("Message has no route and should fail to be routed"); + } + catch (AMQException nre) + { } + + Assert.assertEquals(0, queue.getMessageCount()); + } + + public void testStarMatch() throws AMQException + { + AMQQueue queue = new AMQQueue(new AMQShortString("a*"), false, null, false, _vhost); + _exchange.registerQueue(new AMQShortString("a.*"), queue, null); + + AMQMessage message = createMessage("a.b"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + message = createMessage("a.c"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + message = createMessage("a"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + fail("Message has no route and should fail to be routed"); + } + catch (AMQException nre) + { } + + Assert.assertEquals(0, queue.getMessageCount()); + } + + public void testHashMatch() throws AMQException + { + AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost); + _exchange.registerQueue(new AMQShortString("a.#"), queue, null); + + AMQMessage message = createMessage("a.b.c"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + message = createMessage("a.b"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + message = createMessage("a.c"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + message = createMessage("a"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + message = createMessage("b"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + fail("Message has no route and should fail to be routed"); + } + catch (AMQException nre) + { } + + Assert.assertEquals(0, queue.getMessageCount()); + } + + public void testMidHash() throws AMQException + { + AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost); + _exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null); + + AMQMessage message = createMessage("a.c.d.b"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has no route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + message = createMessage("a.c.b"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has no route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + } + + public void testMatchafterHash() throws AMQException + { + AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost); + _exchange.registerQueue(new AMQShortString("a.*.#.b.c"), queue, null); + + AMQMessage message = createMessage("a.c.b.b"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + fail("Message has route and should not be routed"); + } + catch (AMQException nre) + { } + + Assert.assertEquals(0, queue.getMessageCount()); + + message = createMessage("a.a.b.c"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has no route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + message = createMessage("a.b.c.b"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + fail("Message has route and should not be routed"); + } + catch (AMQException nre) + { } + + Assert.assertEquals(0, queue.getMessageCount()); + + message = createMessage("a.b.c.b.c"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has no route and should be routed"); + + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + } + + public void testHashAfterHash() throws AMQException + { + AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost); + _exchange.registerQueue(new AMQShortString("a.*.#.b.c.#.d"), queue, null); + + AMQMessage message = createMessage("a.c.b.b.c"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + fail("Message has route and should not be routed"); + } + catch (AMQException nre) + { } + + Assert.assertEquals(0, queue.getMessageCount()); + + message = createMessage("a.a.b.c.d"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has no route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + } + + public void testHashHash() throws AMQException + { + AMQQueue queue = new AMQQueue(new AMQShortString("a#"), false, null, false, _vhost); + _exchange.registerQueue(new AMQShortString("a.#.*.#.d"), queue, null); + + AMQMessage message = createMessage("a.c.b.b.c"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + fail("Message has route and should not be routed"); + } + catch (AMQException nre) + { } + + Assert.assertEquals(0, queue.getMessageCount()); + + message = createMessage("a.a.b.c.d"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + } + catch (AMQException nre) + { + fail("Message has no route and should be routed"); + } + + Assert.assertEquals(1, queue.getMessageCount()); + + Assert.assertEquals("Wrong message recevied", message, queue.getMessagesOnTheQueue().get(0)); + + queue.deleteMessageFromTop(_context); + Assert.assertEquals(0, queue.getMessageCount()); + + } + + public void testSubMatchFails() throws AMQException + { + AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost); + _exchange.registerQueue(new AMQShortString("a.b.c.d"), queue, null); + + AMQMessage message = createMessage("a.b.c"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + fail("Message has route and should not be routed"); + } + catch (AMQException nre) + { } + + Assert.assertEquals(0, queue.getMessageCount()); + + } + + public void testMoreRouting() throws AMQException + { + AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost); + _exchange.registerQueue(new AMQShortString("a.b"), queue, null); + + AMQMessage message = createMessage("a.b.c"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + fail("Message has route and should not be routed"); + } + catch (AMQException nre) + { } + + Assert.assertEquals(0, queue.getMessageCount()); + + } + + public void testMoreQueue() throws AMQException + { + AMQQueue queue = new AMQQueue(new AMQShortString("a"), false, null, false, _vhost); + _exchange.registerQueue(new AMQShortString("a.b"), queue, null); + + AMQMessage message = createMessage("a"); + + try + { + _exchange.route(message); + message.routingComplete(_store, _context, new MessageHandleFactory()); + fail("Message has route and should not be routed"); + } + catch (AMQException nre) + { } + + Assert.assertEquals(0, queue.getMessageCount()); + + } + + private AMQMessage createMessage(String s) throws AMQException + { + MessagePublishInfo info = new PublishInfo(new AMQShortString(s)); + + TransactionalContext trancontext = + new NonTransactionalContext(_store, _context, null, new LinkedList<RequiredDeliveryException>(), + new HashSet<Long>()); + + AMQMessage message = new AMQMessage(0L, info, trancontext); + message.setContentHeaderBody(new ContentHeaderBody()); + + return message; + } + + class PublishInfo implements MessagePublishInfo + { + AMQShortString _routingkey; + + PublishInfo(AMQShortString routingkey) + { + _routingkey = routingkey; + } + + public AMQShortString getExchange() + { + return null; + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return true; + } + + public AMQShortString getRoutingKey() + { + return _routingkey; + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 11102e0925..13f544516a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -467,11 +467,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setIntProperty(String propertyName, int i) throws JMSException { - if (_strictAMQP) - { - throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); - } - checkWritableProperties(); JMSHeaderAdapter.checkPropertyName(propertyName); super.setIntProperty(new AMQShortString(propertyName), new Integer(i)); diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java index 27ac3474b9..4762b451c4 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java @@ -250,7 +250,7 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex ;
}
- assertTrue("Wrong number of messages bounced (expect 1): " + _bouncedMessageList.size(), _bouncedMessageList.size() == 1);
+ assertEquals("Wrong number of messages bounced: ", 1, _bouncedMessageList.size());
Message m = _bouncedMessageList.get(0);
assertTrue("Wrong message bounced: " + m.toString(), m.toString().contains("msg2"));
|
