diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2010-01-31 00:31:49 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2010-01-31 00:31:49 +0000 |
| commit | 2b8bb96fca71909d1dc185e1f62ee5fdaad02abd (patch) | |
| tree | 919f0119bd3d23d97b497c5fa486121d4b5e286d /java/client | |
| parent | f038a9ea62f563979678c2f1251d1eda82f1f20f (diff) | |
| download | qpid-python-2b8bb96fca71909d1dc185e1f62ee5fdaad02abd.tar.gz | |
QPID-2379 : Initial work on adding QMF and federation to the Java Broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@904934 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQDestination.java | 15 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 85 |
2 files changed, 46 insertions, 54 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 3f2c1af5c2..311ef1f486 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -47,6 +47,8 @@ public abstract class AMQDestination implements Destination, Referenceable protected final boolean _isAutoDelete; + private final boolean _browseOnly; + private AMQShortString _queueName; private AMQShortString _routingKey; @@ -82,6 +84,7 @@ public abstract class AMQDestination implements Destination, Referenceable _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE)); _isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE)); _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE)); + _browseOnly = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_BROWSE)); _queueName = binding.getQueueName() == null ? null : binding.getQueueName(); _routingKey = binding.getRoutingKey() == null ? null : binding.getRoutingKey(); _bindingKeys = binding.getBindingKeys() == null || binding.getBindingKeys().length == 0 ? new AMQShortString[0] : binding.getBindingKeys(); @@ -122,6 +125,12 @@ public abstract class AMQDestination implements Destination, Referenceable protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys) { + this (exchangeName, exchangeClass, routingKey, isExclusive,isAutoDelete,queueName,isDurable,bindingKeys, false); + } + + protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, + boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys, boolean browseOnly) + { if ( (ExchangeDefaults.DIRECT_EXCHANGE_CLASS.equals(exchangeClass) || ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(exchangeClass)) && routingKey == null) @@ -144,6 +153,7 @@ public abstract class AMQDestination implements Destination, Referenceable _queueName = queueName; _isDurable = isDurable; _bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new AMQShortString[0] : bindingKeys; + _browseOnly = browseOnly; } public AMQShortString getEncodedName() @@ -502,4 +512,9 @@ public abstract class AMQDestination implements Destination, Referenceable return new AMQAnyDestination(binding); } } + + public boolean isBrowseOnly() + { + return _browseOnly; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index a9142ff4e4..9f934d1055 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -20,52 +20,14 @@ */ package org.apache.qpid.client; -import java.io.Serializable; -import java.net.URISyntaxException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.IllegalStateException; -import javax.jms.InvalidDestinationException; -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.QueueReceiver; -import javax.jms.QueueSender; -import javax.jms.QueueSession; -import javax.jms.StreamMessage; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; -import javax.jms.TransactionRolledBackException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQInvalidRoutingKeyException; -import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; @@ -92,8 +54,23 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; import org.apache.qpid.thread.Threading; import org.apache.qpid.url.AMQBindingURL; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import javax.jms.*; +import javax.jms.IllegalStateException; +import java.io.Serializable; +import java.net.URISyntaxException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** * <p/><table id="crc"><caption>CRC Card</caption> @@ -734,7 +711,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } - //if we don't have an exception then we can perform closing operations + //if we don't have an exception then we can perform closing operations _closing.set(e == null); if (!_closed.getAndSet(true)) @@ -905,7 +882,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic checkValidDestination(destination); return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null, - false, false); + ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false); } public C createExclusiveConsumer(Destination destination) throws JMSException @@ -913,7 +890,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic checkValidDestination(destination); return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null, - false, false); + ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false); } public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException @@ -921,7 +898,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic checkValidDestination(destination); return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), - messageSelector, null, false, false); + messageSelector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) @@ -930,7 +907,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic checkValidDestination(destination); return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic), - messageSelector, null, false, false); + messageSelector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false); } public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal) @@ -947,7 +924,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, false, false); + return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false); } public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, @@ -955,7 +932,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false); + return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false); } public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, @@ -963,7 +940,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, false, false); + return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false); } public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, @@ -971,7 +948,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkValidDestination(destination); - return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, false, + return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false); } @@ -1526,7 +1503,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic sendRecover(); markClean(); - + if (!isSuspended) { suspendChannel(false); @@ -1599,7 +1576,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // should be rolled back(reject/release) _rollbackMark.set(_highestDeliveryTag.get()); - syncDispatchQueue(); + syncDispatchQueue(); _dispatcher.rollback(); |
