summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2010-01-31 00:31:49 +0000
committerRobert Godfrey <rgodfrey@apache.org>2010-01-31 00:31:49 +0000
commit2b8bb96fca71909d1dc185e1f62ee5fdaad02abd (patch)
tree919f0119bd3d23d97b497c5fa486121d4b5e286d /java/client
parentf038a9ea62f563979678c2f1251d1eda82f1f20f (diff)
downloadqpid-python-2b8bb96fca71909d1dc185e1f62ee5fdaad02abd.tar.gz
QPID-2379 : Initial work on adding QMF and federation to the Java Broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@904934 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQDestination.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java85
2 files changed, 46 insertions, 54 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index 3f2c1af5c2..311ef1f486 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -47,6 +47,8 @@ public abstract class AMQDestination implements Destination, Referenceable
protected final boolean _isAutoDelete;
+ private final boolean _browseOnly;
+
private AMQShortString _queueName;
private AMQShortString _routingKey;
@@ -82,6 +84,7 @@ public abstract class AMQDestination implements Destination, Referenceable
_isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE));
_isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
_isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
+ _browseOnly = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_BROWSE));
_queueName = binding.getQueueName() == null ? null : binding.getQueueName();
_routingKey = binding.getRoutingKey() == null ? null : binding.getRoutingKey();
_bindingKeys = binding.getBindingKeys() == null || binding.getBindingKeys().length == 0 ? new AMQShortString[0] : binding.getBindingKeys();
@@ -122,6 +125,12 @@ public abstract class AMQDestination implements Destination, Referenceable
protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys)
{
+ this (exchangeName, exchangeClass, routingKey, isExclusive,isAutoDelete,queueName,isDurable,bindingKeys, false);
+ }
+
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
+ boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys, boolean browseOnly)
+ {
if ( (ExchangeDefaults.DIRECT_EXCHANGE_CLASS.equals(exchangeClass) ||
ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(exchangeClass))
&& routingKey == null)
@@ -144,6 +153,7 @@ public abstract class AMQDestination implements Destination, Referenceable
_queueName = queueName;
_isDurable = isDurable;
_bindingKeys = bindingKeys == null || bindingKeys.length == 0 ? new AMQShortString[0] : bindingKeys;
+ _browseOnly = browseOnly;
}
public AMQShortString getEncodedName()
@@ -502,4 +512,9 @@ public abstract class AMQDestination implements Destination, Referenceable
return new AMQAnyDestination(binding);
}
}
+
+ public boolean isBrowseOnly()
+ {
+ return _browseOnly;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index a9142ff4e4..9f934d1055 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -20,52 +20,14 @@
*/
package org.apache.qpid.client;
-import java.io.Serializable;
-import java.net.URISyntaxException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-import javax.jms.TransactionRolledBackException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -92,8 +54,23 @@ import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.url.AMQBindingURL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import java.io.Serializable;
+import java.net.URISyntaxException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
/**
* <p/><table id="crc"><caption>CRC Card</caption>
@@ -734,7 +711,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
- //if we don't have an exception then we can perform closing operations
+ //if we don't have an exception then we can perform closing operations
_closing.set(e == null);
if (!_closed.getAndSet(true))
@@ -905,7 +882,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic), null, null,
- false, false);
+ ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
}
public C createExclusiveConsumer(Destination destination) throws JMSException
@@ -913,7 +890,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, true, null, null,
- false, false);
+ ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
@@ -921,7 +898,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, false, (destination instanceof Topic),
- messageSelector, null, false, false);
+ messageSelector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -930,7 +907,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkValidDestination(destination);
return createConsumerImpl(destination, _prefetchHighMark, _prefetchLowMark, noLocal, (destination instanceof Topic),
- messageSelector, null, false, false);
+ messageSelector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
}
public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -947,7 +924,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, false, false);
+ return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
}
public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
@@ -955,7 +932,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false);
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
}
public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
@@ -963,7 +940,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, false, false);
+ return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()), false);
}
public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
@@ -971,7 +948,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, false,
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, rawSelector, ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly()),
false);
}
@@ -1526,7 +1503,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
sendRecover();
markClean();
-
+
if (!isSuspended)
{
suspendChannel(false);
@@ -1599,7 +1576,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// should be rolled back(reject/release)
_rollbackMark.set(_highestDeliveryTag.get());
- syncDispatchQueue();
+ syncDispatchQueue();
_dispatcher.rollback();