diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2008-02-27 05:15:20 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2008-02-27 05:15:20 +0000 |
| commit | e4218487f49a2685bc58da7994f1d8477ccae393 (patch) | |
| tree | d802452d6845e07e86469c510a66302fe40c5a88 | |
| parent | 4ea88f4335ca9d99952cb87baca80ca7b1d7b402 (diff) | |
| download | qpid-python-e4218487f49a2685bc58da7994f1d8477ccae393.tar.gz | |
This contains the ground work for QPID-803.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@631486 13f79535-47bb-0310-9956-ffa450edef68
10 files changed, 240 insertions, 83 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 0e36a09bbd..0cf1cb32a1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client; +import java.net.URISyntaxException; + import javax.jms.Destination; import javax.naming.NamingException; import javax.naming.Reference; @@ -31,7 +33,6 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLHelper; -import org.apache.qpid.url.URLSyntaxException; public abstract class AMQDestination implements Destination, Referenceable @@ -50,6 +51,8 @@ public abstract class AMQDestination implements Destination, Referenceable private AMQShortString _routingKey; + private AMQShortString[] _bindingKeys; + private String _url; private AMQShortString _urlAsShortString; @@ -64,7 +67,7 @@ public abstract class AMQDestination implements Destination, Referenceable public static final Integer TOPIC_TYPE = Integer.valueOf(2); public static final Integer UNKNOWN_TYPE = Integer.valueOf(3); - protected AMQDestination(String url) throws URLSyntaxException + protected AMQDestination(String url) throws URISyntaxException { this(new AMQBindingURL(url)); } @@ -79,26 +82,43 @@ public abstract class AMQDestination implements Destination, Referenceable _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE)); _queueName = binding.getQueueName() == null ? null : new AMQShortString(binding.getQueueName()); _routingKey = binding.getRoutingKey() == null ? null : new AMQShortString(binding.getRoutingKey()); + _bindingKeys = binding.getBindingKeys() == null || binding.getBindingKeys().length == 0 ? new AMQShortString[0] : binding.getBindingKeys(); } protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName) { - this(exchangeName, exchangeClass, routingKey, false, false, queueName); + this(exchangeName, exchangeClass, routingKey, false, false, queueName, null); + } + + protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, AMQShortString queueName, AMQShortString[] bindingKeys) + { + this(exchangeName, exchangeClass, routingKey, false, false, queueName,bindingKeys); } protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString destinationName) { - this(exchangeName, exchangeClass, destinationName, false, false, null); + this(exchangeName, exchangeClass, destinationName, false, false, null,null); + } + + protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, + boolean isAutoDelete, AMQShortString queueName) + { + this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false,null); } protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, - boolean isAutoDelete, AMQShortString queueName) + boolean isAutoDelete, AMQShortString queueName,AMQShortString[] bindingKeys) { - this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false); + this(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, false,bindingKeys); } protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, - boolean isAutoDelete, AMQShortString queueName, boolean isDurable) + boolean isAutoDelete, AMQShortString queueName, boolean isDurable){ + this (exchangeName, exchangeClass, routingKey, isExclusive,isAutoDelete,queueName,isDurable,null); + } + + protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, + boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys) { // If used with a fannout exchange, the routing key can be null if ( !ExchangeDefaults.FANOUT_EXCHANGE_CLASS.equals(exchangeClass) && routingKey == null) @@ -120,6 +140,7 @@ public abstract class AMQDestination implements Destination, Referenceable _isAutoDelete = isAutoDelete; _queueName = queueName; _isDurable = isDurable; + _bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new AMQShortString[0] : bindingKeys; } public AMQShortString getEncodedName() @@ -181,6 +202,20 @@ public abstract class AMQDestination implements Destination, Referenceable return _routingKey; } + public AMQShortString[] getBindingKeys() + { + if (_bindingKeys != null && _bindingKeys.length > 0) + { + return _bindingKeys; + } + else + { + // catering to the common use case where the + //routingKey is the same as the bindingKey. + return new AMQShortString[]{_routingKey}; + } + } + public boolean isExclusive() { return _isExclusive; @@ -239,6 +274,21 @@ public abstract class AMQDestination implements Destination, Referenceable sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); } + if (_bindingKeys != null && _bindingKeys.length>0) + { + sb.append(BindingURL.OPTION_BINDING_KEY); + sb.append("='"); + for (AMQShortString bindingKey:_bindingKeys) + { + + sb.append(bindingKey).append(","); + + } + sb.deleteCharAt(sb.length() - 1); + sb.append("'"); + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + if (_isDurable) { sb.append(BindingURL.OPTION_DURABLE); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java index 924b20e3ba..78b01add14 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java @@ -61,8 +61,14 @@ public class AMQQueue extends AMQDestination implements Queue public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName) { super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false, - false, queueName, false); } + false, queueName, false); + } + public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName,AMQShortString[] bindingKeys) + { + super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, false, + false, queueName, false,bindingKeys); + } /** * Create a reference to a non temporary queue. Note this does not actually imply the queue exists. @@ -126,11 +132,15 @@ public class AMQQueue extends AMQDestination implements Queue this(exchangeName, routingKey, queueName, exclusive, autoDelete, false); } - public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable) { + this(exchangeName,routingKey,queueName,exclusive,autoDelete,durable,null); + } + + public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean durable,AMQShortString[] bindingKeys) + { super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS, routingKey, exclusive, - autoDelete, queueName, durable); + autoDelete, queueName, durable, bindingKeys); } public AMQShortString getRoutingKey() diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b16c2682de..5a6d145295 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -22,6 +22,7 @@ package org.apache.qpid.client; import java.io.Serializable; +import java.net.URISyntaxException; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Iterator; @@ -29,6 +30,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import javax.jms.BytesMessage; import javax.jms.Destination; @@ -80,12 +82,9 @@ import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.AMQBindingURL; -import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.atomic.AtomicLong; - /** * * <p/><table id="crc"><caption>CRC Card</caption> @@ -407,14 +406,14 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * @todo Document the additional arguments that may be passed in the field table. Are these for headers exchanges? */ public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName) throws AMQException + final AMQShortString exchangeName,final AMQDestination destination) throws AMQException { /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() { public Object execute() throws AMQException, FailoverException { - sendQueueBind(queueName,routingKey,arguments,exchangeName); + sendQueueBind(queueName,routingKey,arguments,exchangeName,destination); return null; } }, _connection).execute(); @@ -425,12 +424,12 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { if( consumer.getQueuename() != null) { - bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName()); + bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(),amqd); } } public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName) throws AMQException, FailoverException; + final AMQShortString exchangeName,AMQDestination destination) throws AMQException, FailoverException; /** @@ -820,7 +819,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { return new AMQQueue(new AMQBindingURL(queueName)); } - catch (URLSyntaxException urlse) + catch (URISyntaxException urlse) { JMSException jmse = new JMSException(urlse.getReason()); jmse.setLinkedException(urlse); @@ -1031,7 +1030,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { return new AMQTopic(new AMQBindingURL(topicName)); } - catch (URLSyntaxException urlse) + catch (URISyntaxException urlse) { JMSException jmse = new JMSException(urlse.getReason()); jmse.setLinkedException(urlse); @@ -1165,7 +1164,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess AMQProtocolHandler protocolHandler = getProtocolHandler(); declareExchange(amqd, protocolHandler, false); AMQShortString queueName = declareQueue(amqd, protocolHandler); - bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName()); + bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(),amqd); } /** @@ -1556,6 +1555,9 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) throws JMSException; + + public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException; + /** * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover * when the client has veoted resubscription. <p/> The caller of this method must already hold the failover mutex. @@ -2111,7 +2113,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess consumer.setQueuename(queueName); // bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); - bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName()); + bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(),amqd); // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch if (!_immediatePrefetch) diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 407f1f3786..8039e3a163 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory; import javax.jms.*; import javax.jms.IllegalStateException; + import java.util.concurrent.ConcurrentLinkedQueue; import java.util.UUID; import java.util.Map; @@ -159,7 +160,7 @@ public class AMQSession_0_10 extends AMQSession AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); - + _subscriptions.put(name, subscriber); _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); @@ -213,7 +214,7 @@ public class AMQSession_0_10 extends AMQSession * @param arguments 0_8 specific */ public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, - final FieldTable arguments, final AMQShortString exchangeName) + final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination) throws AMQException, FailoverException { Map args = FiledTableSupport.convertToMap(arguments); @@ -222,7 +223,12 @@ public class AMQSession_0_10 extends AMQSession { args.put("x-match", "any"); } - getQpidSession().queueBind(queueName.toString(), exchangeName.toString(), routingKey.toString(), args); + + for (AMQShortString rk: destination.getBindingKeys()) + { + _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString()); + getQpidSession().queueBind(queueName.toString(), exchangeName.toString(), rk.toString(), args); + } // We need to sync so that we get notify of an error. getQpidSession().sync(); getCurrentException(); @@ -238,6 +244,7 @@ public class AMQSession_0_10 extends AMQSession */ public void sendClose(long timeout) throws AMQException, FailoverException { + getQpidSession().sync(); getQpidSession().sessionClose(); getCurrentException(); } @@ -350,19 +357,37 @@ public class AMQSession_0_10 extends AMQSession /** * Bind a queue with an exchange. */ - public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, - final AMQShortString routingKey) throws JMSException + + public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) + throws JMSException + { + return isQueueBound(exchangeName,queueName,routingKey,null); + } + + public boolean isQueueBound(final AMQDestination destination) throws JMSException + { + return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getRoutingKey(),destination.getBindingKeys()); + } + + public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys) + throws JMSException { String rk = ""; boolean res; - if (routingKey != null) + if (bindingKeys != null && bindingKeys.length>0) + { + rk = bindingKeys[0].toString(); + } + else if (routingKey != null) { rk = routingKey.toString(); } + Future<BindingQueryResult> result = - getQpidSession().bindingQuery(exchangeName.toString(), queueName.toString(), rk, null); + getQpidSession().bindingQuery(exchangeName.toString(),queueName.toString(), rk, null); BindingQueryResult bindingQueryResult = result.get(); - if (routingKey == null) + + if (rk == null) { res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound()); } @@ -577,7 +602,7 @@ public class AMQSession_0_10 extends AMQSession { // this is done so that we can produce to a temporary queue beofre we create a consumer sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive()); - sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName()); + sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName(),result); result.setQueueName(result.getRoutingKey()); } catch (Exception e) @@ -701,7 +726,7 @@ public class AMQSession_0_10 extends AMQSession AMQShortString topicName; if (topic instanceof AMQTopic) { - topicName=((AMQTopic) topic).getRoutingKey(); + topicName=((AMQTopic) topic).getBindingKeys()[0]; } else { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 0e5786da1e..a087f9e02a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -94,7 +94,7 @@ public class AMQSession_0_8 extends AMQSession } public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName) throws AMQException, FailoverException + final AMQShortString exchangeName, final AMQDestination dest) throws AMQException, FailoverException { AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments exchangeName, // exchange @@ -178,6 +178,11 @@ public class AMQSession_0_8 extends AMQSession } } + public boolean isQueueBound(final AMQDestination destination) throws JMSException + { + return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getAMQQueueName()); + } + public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) throws JMSException { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index 9e7721310e..fcc36cd3fd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -49,6 +49,11 @@ public class AMQTopic extends AMQDestination implements Topic super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false); } + public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName,AMQShortString[] bindingKeys) + { + super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false,bindingKeys); + } + public AMQTopic(AMQConnection conn, String routingKey) { this(conn.getDefaultTopicExchangeName(), new AMQShortString(routingKey)); @@ -77,6 +82,11 @@ public class AMQTopic extends AMQDestination implements Topic super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable ); } + protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, + boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys) + { + super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable,bindingKeys); + } public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection) throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index be7e2363f4..97a631aa18 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -121,8 +121,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } catch (AMQException e) { + _logger.error("Receivecd an Exception when receiving message",e); try { + getSession().getAMQConnection().getExceptionListener() .onException(new JMSAMQException("Error when receiving message", e)); } @@ -134,6 +136,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } if (messageOk) { + _logger.debug("messageOk, trying to notify"); super.notifyMessage(jmsMessage, channelId); } } @@ -331,6 +334,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _logger.debug("filterMessage - trying to acquire message"); } messageOk = acquireMessage(message); + _logger.debug("filterMessage - *************************************"); + _logger.debug("filterMessage - message acquire status : " + messageOk); + _logger.debug("filterMessage - *************************************"); } return messageOk; } @@ -392,13 +398,29 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By _0_10session.getQpidSession() .messageAcquire(ranges, org.apache.qpidity.nclient.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE); + + _logger.debug("acquireMessage, sent acquire message to broker"); + _0_10session.getQpidSession().sync(); + + _logger.debug("acquireMessage, returned from sync"); + RangeSet acquired = _0_10session.getQpidSession().getAccquiredMessages(); + + _logger.debug("acquireMessage, acquired range set " + acquired); + if (acquired != null && acquired.size() > 0) { result = true; } + + _logger.debug("acquireMessage, Trying to get current exception "); + _0_10session.getCurrentException(); + + _logger.debug("acquireMessage, returned from getting current exception "); + + _logger.debug("acquireMessage, acquired range set " + acquired + " now returning " ); } return result; } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 42c1d687cb..c6baf0b0fc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -17,22 +17,22 @@ */ package org.apache.qpid.client; -import org.apache.qpid.client.protocol.AMQProtocolHandler; +import java.io.IOException; +import java.net.URISyntaxException; + +import javax.jms.JMSException; +import javax.jms.Message; + import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.FiledTableSupport; -import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.url.AMQBindingURL; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpidity.njms.ExceptionHelper; import org.apache.qpidity.nclient.util.ByteBufferMessage; -import org.apache.qpidity.transport.ReplyTo; +import org.apache.qpidity.njms.ExceptionHelper; import org.apache.qpidity.transport.DeliveryProperties; - -import javax.jms.Message; -import javax.jms.JMSException; -import java.io.IOException; -import java.nio.ByteBuffer; +import org.apache.qpidity.transport.ReplyTo; /** * This is a 0_10 message producer. @@ -154,12 +154,20 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer String replyToURL = contentHeaderProperties.getReplyToAsString(); if (replyToURL != null) { + if(_logger.isDebugEnabled()) + { + StringBuffer b = new StringBuffer(); + b.append("\n=========================="); + b.append("\nReplyTo : " + replyToURL); + b.append("\n=========================="); + _logger.debug(b.toString()); + } AMQBindingURL dest; try { dest = new AMQBindingURL(replyToURL); } - catch (URLSyntaxException e) + catch (URISyntaxException e) { throw ExceptionHelper.convertQpidExceptionToJMSException(e); } @@ -198,8 +206,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer public boolean isBound(AMQDestination destination) throws JMSException { - return _session.isQueueBound(destination.getExchangeName(), destination.getAMQQueueName(), - destination.getRoutingKey()); + return _session.isQueueBound(destination); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index e3ca6b5de1..ddedb0e913 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -20,29 +20,34 @@ */ package org.apache.qpid.client.message; -import org.apache.commons.collections.map.ReferenceMap; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.Enumeration; +import java.util.Map; +import java.util.UUID; -import org.apache.mina.common.ByteBuffer; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageNotReadableException; +import javax.jms.MessageNotWriteableException; +import org.apache.commons.collections.map.ReferenceMap; +import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; -import org.apache.qpid.client.*; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.AMQUndefinedDestination; +import org.apache.qpid.client.BasicMessageConsumer; +import org.apache.qpid.client.CustomJMSXProperty; +import org.apache.qpid.client.JMSAMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; -import org.apache.qpid.url.URLSyntaxException; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageNotReadableException; -import javax.jms.MessageNotWriteableException; - -import java.util.Collections; -import java.util.Enumeration; -import java.util.Map; -import java.util.UUID; -import java.io.IOException; public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message { @@ -222,7 +227,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach BindingURL binding = new AMQBindingURL(replyToEncoding); dest = AMQDestination.createDestination(binding); } - catch (URLSyntaxException e) + catch (URISyntaxException e) { throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e); } diff --git a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java index 9bf9fa5e64..42a168bad8 100644 --- a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java +++ b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java @@ -20,6 +20,24 @@ */ package org.apache.qpid.jndi; +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Queue; +import javax.jms.Topic; +import javax.naming.Context; +import javax.naming.NamingException; +import javax.naming.spi.InitialContextFactory; + import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQHeadersExchange; @@ -30,27 +48,9 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLSyntaxException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.Queue; -import javax.jms.Topic; -import javax.naming.Context; -import javax.naming.NamingException; -import javax.naming.spi.InitialContextFactory; - -import java.io.BufferedInputStream; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Hashtable; -import java.util.Iterator; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; - public class PropertiesFileInitialContextFactory implements InitialContextFactory { protected final Logger _logger = LoggerFactory.getLogger(PropertiesFileInitialContextFactory.class); @@ -99,7 +99,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor _logger.warn("Unable to load property file specified in Provider_URL:" + environment.get(Context.PROVIDER_URL)); } - + createConnectionFactories(data, environment); @@ -185,6 +185,17 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor Topic t = createTopic(entry.getValue().toString()); if (t != null) { + if (_logger.isDebugEnabled()) + { + StringBuffer b = new StringBuffer(); + b.append("Creating the topic: " + jndiName + " with the following binding keys "); + for (AMQShortString binding:((AMQTopic)t).getBindingKeys()) + { + b.append(binding.toString()).append(","); + } + + _logger.debug(b.toString()); + } data.put(jndiName, t); } } @@ -218,9 +229,9 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor { binding = new AMQBindingURL(bindingURL); } - catch (URLSyntaxException urlse) + catch (URISyntaxException urlse) { - _logger.warn("Unable to destination:" + urlse); + _logger.warn("Unable to create destination:" + urlse, urlse); return null; } @@ -269,7 +280,17 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor } else if (value instanceof String) { - return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, new AMQShortString((String) value)); + String[] keys = ((String)value).split(","); + AMQShortString[] bindings = new AMQShortString[keys.length]; + int i = 0; + for (String key:keys) + { + bindings[i] = new AMQShortString(key); + i++; + } + // The Destination has a dual nature. If this was used for a producer the key is used + // for the routing key. If it was used for the consumer it becomes the bindingKey + return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME,bindings[0],null,bindings); } else if (value instanceof BindingURL) { |
