diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-06-01 19:24:36 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-06-01 19:24:36 +0000 |
| commit | 3e4d1f2f56ef296ea5132511faaa8689867c499c (patch) | |
| tree | 6c990d7d04cdaf07bd6dace7c157b882f8370cbf /qpid/java/client | |
| parent | f6b68fa2e1ca27d46e6080a3568ef5d785eed548 (diff) | |
| download | qpid-python-3e4d1f2f56ef296ea5132511faaa8689867c499c.tar.gz | |
QPID-4897 : [Java Broker] Allow selectors on bindings fro non-topic exchanges
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1488561 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
5 files changed, 206 insertions, 139 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java index b9e9a33cd6..922cc1e2a7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java @@ -31,7 +31,7 @@ public class AMQHeadersExchange extends AMQDestination { public AMQHeadersExchange(BindingURL binding) { - this(binding.getExchangeName()); + super(binding); } public AMQHeadersExchange(String name) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index e784e903fa..018a1ec851 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -440,7 +440,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // If the session has been closed don't waste time creating a thread to do // flow control if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing())) - { + { // Only execute change if previous state // was False if (!_suspendState.getAndSet(true)) @@ -535,7 +535,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } public abstract AMQException getLastException(); - + public void checkNotClosed() throws JMSException { try @@ -553,7 +553,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic ssnClosed.setLinkedException(ex); ssnClosed.initCause(ex); throw ssnClosed; - } + } else { throw ise; @@ -987,13 +987,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // Delegate the work to the {@link #createDurableSubscriber(Topic, String, String, boolean)} method return createDurableSubscriber(topic, name, null, false); } - + public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException { checkNotClosed(); Topic origTopic = checkValidTopic(topic, true); - + AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); if (dest.getDestSyntax() == DestSyntax.ADDR && !dest.isAddressResolved()) @@ -1015,20 +1015,20 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic throw toJMSException("Error when verifying destination", e); } } - + String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector; - + _subscriberDetails.lock(); try { TopicSubscriberAdaptor<C> subscriber = _subscriptions.get(name); - + // Not subscribed to this name in the current session if (subscriber == null) { // After the address is resolved routing key will not be null. AMQShortString topicName = dest.getRoutingKey(); - + if (_strictAMQP) { if (_strictAMQPFATAL) @@ -1046,8 +1046,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic else { Map<String,Object> args = new HashMap<String,Object>(); - - // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a + + // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise // possible to determine when querying the broker whether there are no arguments or just a non-matching selector // argument, as specifying null for the arguments when querying means they should not be checked at all @@ -1060,16 +1060,28 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec // says we must trash the subscription. boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()); - boolean isQueueBoundForTopicAndSelector = + boolean isQueueBoundForTopicAndSelector = isQueueBound(dest.getExchangeName().asString(), dest.getAMQQueueName().asString(), topicName.asString(), args); if (isQueueBound && !isQueueBoundForTopicAndSelector) { deleteQueue(dest.getAMQQueueName()); } + else if(isQueueBound) // todo - this is a hack for 0-8/9/9-1 which cannot check if arguments on a binding match + { + try + { + bindQueue(dest.getAMQQueueName(), dest.getRoutingKey(), + FieldTable.convertToFieldTable(args), dest.getExchangeName(), dest, true); + } + catch(AMQException e) + { + throw toJMSException("Error when checking binding",e); + } + } } } - else + else { // Subscribed with the same topic and no current / previous or same selector if (subscriber.getTopic().equals(topic) @@ -1100,7 +1112,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { _subscriberAccess.unlock(); } - + return subscriber; } catch (TransportException e) @@ -1193,19 +1205,19 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (syntax == AMQDestination.DestSyntax.BURL) { // For testing we may want to use the prefix - return new AMQQueue(getDefaultQueueExchangeName(), + return new AMQQueue(getDefaultQueueExchangeName(), new AMQShortString(AMQDestination.stripSyntaxPrefix(queueName))); } else { AMQQueue queue = new AMQQueue(queueName); return queue; - + } } else { - return new AMQQueue(queueName); + return new AMQQueue(queueName); } } catch (URISyntaxException urlse) @@ -1341,7 +1353,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return new QueueReceiverAdaptor(dest, consumer); } - + private Queue validateQueue(Destination dest) throws InvalidDestinationException { if (dest instanceof AMQDestination && dest instanceof javax.jms.Queue) @@ -1497,9 +1509,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } else { - return new AMQTopic(topicName); + return new AMQTopic(topicName); } - + } catch (URISyntaxException urlse) { @@ -1646,16 +1658,24 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _logger.debug("Message[" + message.toString() + "] received in session"); } _highestDeliveryTag.set(message.getDeliveryTag()); - _queue.add(message); + _queue.add(message); } public void declareAndBind(AMQDestination amqd) throws AMQException { + declareAndBind(amqd, new FieldTable()); + } + + + public void declareAndBind(AMQDestination amqd, FieldTable arguments) + throws + AMQException + { declareExchange(amqd, false); AMQShortString queueName = declareQueue(amqd, false); - bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd); + bindQueue(queueName, amqd.getRoutingKey(), arguments, amqd.getExchangeName(), amqd); } /** @@ -1681,7 +1701,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * Not that this does not necessarily mean that the recovery has failed, but simply that it is * not possible to tell if it has or not. * @todo Be aware of possible changes to parameter order as versions change. - * + * * Strategy for handling recover. * Flush any acks not yet sent. * Stop the message flow. @@ -1730,7 +1750,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } sendRecover(); - + markClean(); if (!isSuspended) @@ -1755,7 +1775,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic protected abstract void sendRecover() throws AMQException, FailoverException; protected abstract void flushAcknowledgments(); - + public void rejectMessage(UnprocessedMessage message, boolean requeue) { @@ -1851,7 +1871,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void setMessageListener(MessageListener listener) throws JMSException { } - + /** * @see #unsubscribe(String, boolean) */ @@ -1866,20 +1886,20 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic throw toJMSException("Exception while unsubscribing:" + e.getMessage(), e); } } - + /** * Unsubscribe from a subscription. - * + * * @param name the name of the subscription to unsubscribe * @param safe allows safe unsubscribe operation that will not throw an {@link InvalidDestinationException} if the * queue is not bound, possibly due to the subscription being closed. - * @throws JMSException on + * @throws JMSException on * @throws InvalidDestinationException */ private void unsubscribe(String name, boolean safe) throws JMSException { TopicSubscriberAdaptor<C> subscriber; - + _subscriberDetails.lock(); try { @@ -1896,11 +1916,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { _subscriberDetails.unlock(); } - + if (subscriber != null) { subscriber.close(); - + // send a queue.delete for the subscription deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); } @@ -1917,7 +1937,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe." + " Requesting queue deletion regardless."); } - + deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); } else // Queue Browser @@ -1936,8 +1956,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } protected C createConsumerImpl(final Destination destination, final int prefetchHigh, - final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, - final boolean noConsume, final boolean autoClose) throws JMSException + final int prefetchLow, final boolean noLocal, + final boolean exclusive, String selector, final FieldTable rawSelector, + final boolean noConsume, final boolean autoClose) throws JMSException { checkTemporaryDestination(destination); @@ -2111,7 +2132,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic throws JMSException; public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException; - + public abstract boolean isQueueBound(String exchangeName, String queueName, String bindingKey, Map<String,Object> args) throws JMSException; /** @@ -2844,14 +2865,19 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { declareExchange(amqd, nowait); } - + if (_delareQueues || amqd.isNameRequired()) { declareQueue(amqd, consumer.isNoLocal(), nowait); } - bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait); + if(!isBound(amqd.getExchangeName(), amqd.getAMQQueueName(), amqd.getRoutingKey())) + { + bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), + amqd instanceof AMQTopic ? consumer.getArguments() : null, amqd.getExchangeName(), amqd, nowait); + } + } - + AMQShortString queueName = amqd.getAMQQueueName(); // store the consumer queue name @@ -2895,10 +2921,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + protected abstract boolean isBound(AMQShortString exchangeName, AMQShortString amqQueueName, AMQShortString routingKey) + throws AMQException; + public abstract void resolveAddress(AMQDestination dest, boolean isConsumer, boolean noLocal) throws AMQException; - + private void registerProducer(long producerId, MessageProducer producer) { _producers.put(new Long(producerId), producer); @@ -3189,7 +3218,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } - + public void run() { if (_dispatcherLogger.isDebugEnabled()) @@ -3304,7 +3333,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (updateRollbackMark(current, deliveryTag)) { _rollbackMark.compareAndSet(current, deliveryTag); - } + } } private void notifyConsumer(UnprocessedMessage message) @@ -3424,7 +3453,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { return super.isClosing() || _connection.isClosing(); } - + public boolean isDeclareExchanges() { return _declareExchanges; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index e2cfe0e27f..1baaff738b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -143,7 +143,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000); private TimerTask flushTask = null; private RangeSet unacked = RangeSetFactory.createRangeSet(); - private int unackedCount = 0; + private int unackedCount = 0; /** * Used to store the range of in tx messages @@ -292,7 +292,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { flushAcknowledgments(false); } - + void flushAcknowledgments(boolean setSyncBit) { synchronized (unacked) @@ -310,7 +310,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { messageAcknowledge(ranges,accept,false); } - + void messageAcknowledge(final RangeSet ranges, final boolean accept, final boolean setSyncBit) { final Session ssn = getQpidSession(); @@ -354,15 +354,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic if (destination.getDestSyntax() == DestSyntax.BURL) { Map args = FieldTableSupport.convertToMap(arguments); - + for (AMQShortString rk: destination.getBindingKeys()) { - _logger.debug("Binding queue : " + queueName.toString() + - " exchange: " + exchangeName.toString() + + _logger.debug("Binding queue : " + queueName.toString() + + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString()); - getQpidSession().exchangeBind(queueName.toString(), - exchangeName.toString(), - rk.toString(), + getQpidSession().exchangeBind(queueName.toString(), + exchangeName.toString(), + rk.toString(), args); } } @@ -371,10 +371,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic // Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected. List<Binding> bindings = new ArrayList<Binding>(); bindings.addAll(destination.getNode().getBindings()); - + String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ? destination.getAddressName(): "amq.topic"; - + for (Binding binding: bindings) { // Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link. @@ -386,22 +386,22 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } String queue = binding.getQueue() == null? queueName.asString(): binding.getQueue(); - - String exchange = binding.getExchange() == null ? + + String exchange = binding.getExchange() == null ? defaultExchange : binding.getExchange(); - - _logger.debug("Binding queue : " + queue + - " exchange: " + exchange + - " using binding key " + binding.getBindingKey() + + + _logger.debug("Binding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + " with args " + Strings.printMap(binding.getArgs())); - getQpidSession().exchangeBind(queue, + getQpidSession().exchangeBind(queue, exchange, binding.getBindingKey(), - binding.getArgs()); + binding.getArgs()); } } - + if (!nowait) { // We need to sync so that we get notify of an error. @@ -561,20 +561,18 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic */ public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) - throws JMSException { return isQueueBound(exchangeName,queueName,routingKey,null); } - public boolean isQueueBound(final AMQDestination destination) throws JMSException + public boolean isQueueBound(final AMQDestination destination) { return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getRoutingKey(),destination.getBindingKeys()); } public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys) - throws JMSException { - String rk = null; + String rk = null; if (bindingKeys != null && bindingKeys.length>0) { rk = bindingKeys[0].toString(); @@ -583,10 +581,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { rk = routingKey.toString(); } - + return isQueueBound(exchangeName == null ? null : exchangeName.toString(),queueName == null ? null : queueName.toString(),rk,null); } - + public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args) { boolean res; @@ -598,21 +596,27 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound()); } else - { + { if (args == null) { - res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult + res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult .getQueueNotMatched()); } else { - res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult + res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult .getQueueNotMatched() || bindingQueryResult.getArgsNotMatched()); } } return res; } + @Override + protected boolean isBound(AMQShortString exchangeName, AMQShortString amqQueueName, AMQShortString routingKey) + { + return isQueueBound(exchangeName, amqQueueName, routingKey); + } + /** * This method is invoked when a consumer is created * Registers the consumer with the broker @@ -730,7 +734,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } /** - * deletes an exchange + * deletes an exchange */ public void sendExchangeDelete(final String name, final boolean nowait) throws AMQException, FailoverException @@ -763,12 +767,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } if (amqd.getDestSyntax() == DestSyntax.BURL) - { + { Map<String,Object> arguments = new HashMap<String,Object>(); if (noLocal) - { + { arguments.put(AddressHelper.NO_LOCAL, true); - } + } getQpidSession().queueDeclare(queueName.toString(), "" , arguments, amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, @@ -790,7 +794,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic arguments, node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, node.isDurable() ? Option.DURABLE : Option.NONE, - node.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + node.isExclusive() ? Option.EXCLUSIVE : Option.NONE); } // passive --> false @@ -837,7 +841,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic try { long capacity = consumer.getCapacity(); - + if (capacity == 0) { if (consumer.getMessageListener() != null) @@ -1090,20 +1094,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { return AMQMessageDelegateFactory.FACTORY_0_10; } - + public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException { boolean match = true; ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get(); - match = !result.getNotFound(); + match = !result.getNotFound(); Node node = dest.getNode(); - + if (match) { if (assertNode) { - match = (result.getDurable() == node.isDurable()) && - (node.getExchangeType() != null && + match = (result.getDurable() == node.isDurable()) && + (node.getExchangeType() != null && node.getExchangeType().equals(result.getType())) && (matchProps(result.getArguments(),node.getDeclareArgs())); } @@ -1125,7 +1129,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return match; } - + public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException { boolean match = true; @@ -1137,7 +1141,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic if (match && assertNode) { - match = (result.getDurable() == node.isDurable()) && + match = (result.getDurable() == node.isDurable()) && (result.getAutoDelete() == node.isAutoDelete()) && (result.getExclusive() == node.isExclusive()) && (matchProps(result.getArguments(),node.getDeclareArgs())); @@ -1165,17 +1169,17 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } return match; } - + private boolean matchProps(Map<String,Object> target,Map<String,Object> source) { boolean match = true; for (String key: source.keySet()) { - match = target.containsKey(key) && + match = target.containsKey(key) && target.get(key).equals(source.get(key)); - - if (!match) - { + + if (!match) + { StringBuffer buf = new StringBuffer(); buf.append("Property given in address did not match with the args sent by the broker."); buf.append(" Expected { ").append(key).append(" : ").append(source.get(key)).append(" }, "); @@ -1184,22 +1188,22 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return match; } } - + return match; } /** * 1. Try to resolve the address type (queue or exchange) - * 2. if type == queue, + * 2. if type == queue, * 2.1 verify queue exists or create if create == true * 2.2 If not throw exception - * + * * 3. if type == exchange, * 3.1 verify exchange exists or create if create == true * 3.2 if not throw exception * 3.3 if exchange exists (or created) create subscription queue. */ - + @SuppressWarnings("deprecation") public void resolveAddress(AMQDestination dest, boolean isConsumer, @@ -1211,21 +1215,21 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } else { - boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) || + boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) || (isConsumer && dest.getAssert() == AddressOption.RECEIVER) || (!isConsumer && dest.getAssert() == AddressOption.SENDER); - + boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) || (isConsumer && dest.getCreate() == AddressOption.RECEIVER) || (!isConsumer && dest.getCreate() == AddressOption.SENDER); - - - + + + int type = resolveAddressType(dest); - + switch (type) { - case AMQDestination.QUEUE_TYPE: + case AMQDestination.QUEUE_TYPE: { if(createNode) { @@ -1239,24 +1243,24 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic break; } } - - case AMQDestination.TOPIC_TYPE: + + case AMQDestination.TOPIC_TYPE: { if(createNode) - { + { setLegacyFiledsForTopicType(dest); verifySubject(dest); handleExchangeNodeCreation(dest); break; } else if (isExchangeExist(dest,assertNode)) - { + { setLegacyFiledsForTopicType(dest); verifySubject(dest); break; } } - + default: throw new AMQException( "The name '" + dest.getAddressName() + @@ -1265,7 +1269,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic dest.setAddressResolved(System.currentTimeMillis()); } } - + public int resolveAddressType(AMQDestination dest) throws AMQException { int type = dest.getAddressType(); @@ -1292,14 +1296,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } dest.setAddressType(type); return type; - } + } } - + private void verifySubject(AMQDestination dest) throws AMQException { if (dest.getSubject() == null || dest.getSubject().trim().equals("")) { - + if ("topic".equals(dest.getExchangeClass().toString())) { dest.setRoutingKey(new AMQShortString("#")); @@ -1364,12 +1368,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic // legacy support dest.setExchangeName(new AMQShortString(dest.getAddressName())); Node node = dest.getNode(); - dest.setExchangeClass(node.getExchangeType() == null? + dest.setExchangeClass(node.getExchangeType() == null? ExchangeDefaults.TOPIC_EXCHANGE_CLASS: - new AMQShortString(node.getExchangeType())); + new AMQShortString(node.getExchangeType())); dest.setRoutingKey(new AMQShortString(dest.getSubject())); } - + protected void acknowledgeImpl() { RangeSet ranges = gatherRangeSet(getUnacknowledgedMessageTags()); @@ -1412,7 +1416,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); getPrefetchedMessageTags().addAll(tags); } - + RangeSet delivered = gatherRangeSet(getUnacknowledgedMessageTags()); RangeSet prefetched = gatherRangeSet(getPrefetchedMessageTags()); RangeSet all = RangeSetFactory.createRangeSet(delivered.size() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 3097b33da3..9a9da62f2a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -184,7 +184,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe // thread. // We can't close the session if we are already in the process of // closing/closed the connection. - + if (!(getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED) || getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSING))) { @@ -381,10 +381,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { public AMQMethodEvent execute() throws AMQException, FailoverException { - AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody - (exchangeName, routingKey, queueName).generateFrame(getChannelId()); - - return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); + return sendExchangeBound(exchangeName, routingKey, queueName); } }, getAMQConnection()).execute(); @@ -398,7 +395,38 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e); } - } + } + + @Override + protected boolean isBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) + throws AMQException + { + + AMQMethodEvent response = new FailoverNoopSupport<AMQMethodEvent, AMQException>( + new FailoverProtectedOperation<AMQMethodEvent, AMQException>() + { + public AMQMethodEvent execute() throws AMQException, FailoverException + { + return sendExchangeBound(exchangeName, routingKey, queueName); + + } + }, getAMQConnection()).execute(); + + // Extract and return the response code from the query. + ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); + + return (responseBody.getReplyCode() == 0); + } + + private AMQMethodEvent sendExchangeBound(AMQShortString exchangeName, + AMQShortString routingKey, + AMQShortString queueName) throws AMQException, FailoverException + { + AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody + (exchangeName, routingKey, queueName).generateFrame(getChannelId()); + + return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); + } @Override public void sendConsume(BasicMessageConsumer_0_8 consumer, @@ -527,7 +555,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe JMSException ex = new JMSException("Error creating producer"); ex.initCause(e); ex.setLinkedException(e); - + throw ex; } } @@ -609,7 +637,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe // todo send low water mark when protocol allows. // todo Be aware of possible changes to parameter order as versions change. getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class); - + return null; } }, getAMQConnection()).execute(); @@ -671,7 +699,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe false, null).generateFrame(getChannelId()); QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler(); - getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); + getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); return okHandler.getMessageCount(); } @@ -689,9 +717,9 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe { return AMQMessageDelegateFactory.FACTORY_0_8; } - + public void sync() throws AMQException - { + { declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false); } @@ -702,10 +730,10 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe throw new UnsupportedOperationException("The new addressing based syntax is " + "not supported for AMQP 0-8/0-9 versions"); } - + protected void flushAcknowledgments() { - + } @Override @@ -744,7 +772,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe // if the Connection has closed then we should throw any exception that // has occurred that we were not waiting for AMQStateManager manager = getProtocolHandler().getStateManager(); - + Exception e = manager.getLastException(); if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && e != null) @@ -752,15 +780,15 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe if (e instanceof AMQException) { return (AMQException) e; - } + } else { AMQException amqe = new AMQException(AMQConstant - .getConstant(AMQConstant.INTERNAL_ERROR.getCode()), + .getConstant(AMQConstant.INTERNAL_ERROR.getCode()), e.getMessage(), e.getCause()); return amqe; } - } + } else { return null; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 96cd209447..d78e725a5d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -42,7 +42,7 @@ public class AMQTopic extends AMQDestination implements Topic { super(address); } - + protected AMQTopic() { super(); @@ -89,6 +89,12 @@ public class AMQTopic extends AMQDestination implements Topic super(exchangeName, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, queueName, isDurable); } + + protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable) + { + super(exchangeName, exchangeClass, name, true, isAutoDelete, queueName, isDurable); + } + protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, boolean isAutoDelete, AMQShortString queueName, boolean isDurable) { @@ -114,10 +120,10 @@ public class AMQTopic extends AMQDestination implements Topic AMQTopic t = new AMQTopic(qpidTopic.getAddress()); AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection); // link is never null if dest was created using an address string. - t.getLink().setName(queueName.asString()); + t.getLink().setName(queueName.asString()); t.getLink().getSubscriptionQueue().setAutoDelete(false); t.getLink().setDurable(true); - + // The legacy fields are also populated just in case. t.setQueueName(queueName); t.setAutoDelete(false); @@ -134,7 +140,7 @@ public class AMQTopic extends AMQDestination implements Topic } else { - return new AMQTopic(qpidTopic.getExchangeName(), qpidTopic.getRoutingKey(), false, + return new AMQTopic(qpidTopic.getExchangeName(), qpidTopic.getExchangeClass(), qpidTopic.getRoutingKey(), false, getDurableTopicQueueName(subscriptionName, connection), true); } @@ -165,7 +171,7 @@ public class AMQTopic extends AMQDestination implements Topic return null; } } - + @Override public AMQShortString getExchangeName() { @@ -181,9 +187,9 @@ public class AMQTopic extends AMQDestination implements Topic public AMQShortString getRoutingKey() { - if (super.getRoutingKey() != null) + if (super.getRoutingKey() != null) { - return super.getRoutingKey(); + return super.getRoutingKey(); } else if (getSubject() != null) { |
