summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java86
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java286
2 files changed, 232 insertions, 140 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index ceca43b785..10101976eb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -209,6 +209,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private final boolean _strictAMQP;
+ /** System property to enable strickt AMQP compliance */
+ public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
+ /** Strickt AMQP default */
+ public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
+
+ private final boolean _strictAMQPFATAL;
/** System property to enable immediate message prefetching */
public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
@@ -436,6 +442,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
_strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
+ _strictAMQPFATAL = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
_immediatePrefetch = Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
_connection = con;
@@ -924,7 +931,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
getProtocolMajorVersion(),
getProtocolMinorVersion(),
false)); // requeue
- _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
+ _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
}
else
{
@@ -1212,13 +1219,30 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
final int prefetchLow,
final boolean noLocal,
final boolean exclusive,
- final String selector,
+ String selector,
final FieldTable rawSelector,
final boolean noConsume,
final boolean autoClose) throws JMSException
{
checkTemporaryDestination(destination);
+ final String messageSelector;
+
+ if (_strictAMQP && !(selector == null || selector.equals("")))
+ {
+ if (_strictAMQPFATAL)
+ {
+ throw new UnsupportedOperationException("Selectors not currently supported by AMQP.");
+ }
+ else
+ {
+ messageSelector = null;
+ }
+ }
+ else
+ {
+ messageSelector = selector;
+ }
return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport()
{
@@ -1229,6 +1253,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
AMQDestination amqd = (AMQDestination) destination;
final AMQProtocolHandler protocolHandler = getProtocolHandler();
+ // TODO: Define selectors in AMQP
// TODO: construct the rawSelector from the selector string if rawSelector == null
final FieldTable ft = FieldTableFactory.newFieldTable();
//if (rawSelector != null)
@@ -1237,7 +1262,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
ft.addAll(rawSelector);
}
- BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal,
+
+ BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal,
_messageFactoryRegistry, AMQSession.this,
protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
_acknowledgeMode, noConsume, autoClose);
@@ -1630,6 +1656,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
{
+
+
checkNotClosed();
AMQTopic origTopic = checkValidTopic(topic);
AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
@@ -1657,13 +1685,31 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
topicName = new AMQShortString(topic.getTopicName());
}
- // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
- // says we must trash the subscription.
- if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) &&
- !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
+
+ if (_strictAMQP)
{
+ if (_strictAMQPFATAL)
+ {
+ throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
+ }
+ else
+ {
+ _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
+ + "for creation durableSubscriber. Requesting queue deletion regardless.");
+ }
+
deleteQueue(dest.getAMQQueueName());
}
+ else
+ {
+ // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
+ // says we must trash the subscription.
+ if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) &&
+ !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
+ {
+ deleteQueue(dest.getAMQQueueName());
+ }
+ }
}
subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
@@ -1761,13 +1807,31 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
else
{
- if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection)))
+ if (_strictAMQP)
{
+ if (_strictAMQPFATAL)
+ {
+ throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
+ }
+ else
+ {
+ _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe."
+ + " Requesting queue deletion regardless.");
+ }
+
deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
}
else
{
- throw new InvalidDestinationException("Unknown subscription exchange:" + name);
+
+ if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection)))
+ {
+ deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
+ }
+ else
+ {
+ throw new InvalidDestinationException("Unknown subscription exchange:" + name);
+ }
}
}
}
@@ -1779,10 +1843,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException
{
- if (isStrictAMQP())
- {
- throw new UnsupportedOperationException();
- }
// TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
diff --git a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
index e0c4b61333..a219f7d655 100644
--- a/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
@@ -9,119 +9,141 @@ import javax.jms.Queue;
import javax.jms.QueueSender;
import javax.jms.InvalidDestinationException;
-public class QueueSenderAdapter implements QueueSender {
-
- private BasicMessageProducer _delegate;
- private Queue _queue;
- private boolean closed = false;
-
- public QueueSenderAdapter(BasicMessageProducer msgProducer, Queue queue){
- _delegate = msgProducer;
- _queue = queue;
- }
-
- public Queue getQueue() throws JMSException {
- checkPreConditions();
- return _queue;
- }
-
- public void send(Message msg) throws JMSException {
- checkPreConditions();
- _delegate.send(msg);
- }
-
- public void send(Queue queue, Message msg) throws JMSException {
- checkPreConditions(queue);
- _delegate.send(queue, msg);
- }
-
- public void publish(Message msg, int deliveryMode, int priority, long timeToLive)
- throws JMSException {
- checkPreConditions();
- _delegate.send(msg, deliveryMode,priority,timeToLive);
- }
-
- public void send(Queue queue,Message msg, int deliveryMode, int priority, long timeToLive)
- throws JMSException {
- checkPreConditions(queue);
- _delegate.send(queue,msg, deliveryMode,priority,timeToLive);
- }
-
- public void close() throws JMSException {
- _delegate.close();
- closed = true;
- }
-
- public int getDeliveryMode() throws JMSException {
- checkPreConditions();
- return _delegate.getDeliveryMode();
- }
-
- public Destination getDestination() throws JMSException {
- checkPreConditions();
- return _delegate.getDestination();
- }
-
- public boolean getDisableMessageID() throws JMSException {
- checkPreConditions();
- return _delegate.getDisableMessageID();
- }
-
- public boolean getDisableMessageTimestamp() throws JMSException {
- checkPreConditions();
- return _delegate.getDisableMessageTimestamp();
- }
-
- public int getPriority() throws JMSException {
- checkPreConditions();
- return _delegate.getPriority();
- }
-
- public long getTimeToLive() throws JMSException {
- checkPreConditions();
- return _delegate.getTimeToLive();
- }
-
- public void send(Destination dest, Message msg) throws JMSException {
- checkPreConditions((Queue)dest);
- _delegate.send(dest,msg);
- }
-
- public void send(Message msg, int deliveryMode, int priority, long timeToLive)
- throws JMSException {
- checkPreConditions();
- _delegate.send(msg, deliveryMode,priority,timeToLive);
- }
-
- public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException {
- checkPreConditions((Queue)dest);
- _delegate.send(dest,msg, deliveryMode,priority,timeToLive);
- }
-
- public void setDeliveryMode(int deliveryMode) throws JMSException {
- checkPreConditions();
- _delegate.setDeliveryMode(deliveryMode);
- }
-
- public void setDisableMessageID(boolean disableMessageID) throws JMSException {
- checkPreConditions();
- _delegate.setDisableMessageID(disableMessageID);
- }
-
- public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException {
- checkPreConditions();
- _delegate.setDisableMessageTimestamp(disableMessageTimestamp);
- }
-
- public void setPriority(int priority) throws JMSException {
- checkPreConditions();
- _delegate.setPriority(priority);
- }
-
- public void setTimeToLive(long timeToLive) throws JMSException {
- checkPreConditions();
- _delegate.setTimeToLive(timeToLive);
- }
+public class QueueSenderAdapter implements QueueSender
+{
+
+ private BasicMessageProducer _delegate;
+ private Queue _queue;
+ private boolean closed = false;
+
+ public QueueSenderAdapter(BasicMessageProducer msgProducer, Queue queue)
+ {
+ _delegate = msgProducer;
+ _queue = queue;
+ }
+
+ public Queue getQueue() throws JMSException
+ {
+ checkPreConditions();
+ return _queue;
+ }
+
+ public void send(Message msg) throws JMSException
+ {
+ checkPreConditions();
+ _delegate.send(msg);
+ }
+
+ public void send(Queue queue, Message msg) throws JMSException
+ {
+ checkPreConditions(queue);
+ _delegate.send(queue, msg);
+ }
+
+ public void publish(Message msg, int deliveryMode, int priority, long timeToLive)
+ throws JMSException
+ {
+ checkPreConditions();
+ _delegate.send(msg, deliveryMode, priority, timeToLive);
+ }
+
+ public void send(Queue queue, Message msg, int deliveryMode, int priority, long timeToLive)
+ throws JMSException
+ {
+ checkPreConditions(queue);
+ _delegate.send(queue, msg, deliveryMode, priority, timeToLive);
+ }
+
+ public void close() throws JMSException
+ {
+ _delegate.close();
+ closed = true;
+ }
+
+ public int getDeliveryMode() throws JMSException
+ {
+ checkPreConditions();
+ return _delegate.getDeliveryMode();
+ }
+
+ public Destination getDestination() throws JMSException
+ {
+ checkPreConditions();
+ return _delegate.getDestination();
+ }
+
+ public boolean getDisableMessageID() throws JMSException
+ {
+ checkPreConditions();
+ return _delegate.getDisableMessageID();
+ }
+
+ public boolean getDisableMessageTimestamp() throws JMSException
+ {
+ checkPreConditions();
+ return _delegate.getDisableMessageTimestamp();
+ }
+
+ public int getPriority() throws JMSException
+ {
+ checkPreConditions();
+ return _delegate.getPriority();
+ }
+
+ public long getTimeToLive() throws JMSException
+ {
+ checkPreConditions();
+ return _delegate.getTimeToLive();
+ }
+
+ public void send(Destination dest, Message msg) throws JMSException
+ {
+ checkPreConditions((Queue) dest);
+ _delegate.send(dest, msg);
+ }
+
+ public void send(Message msg, int deliveryMode, int priority, long timeToLive)
+ throws JMSException
+ {
+ checkPreConditions();
+ _delegate.send(msg, deliveryMode, priority, timeToLive);
+ }
+
+ public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException
+ {
+ checkPreConditions((Queue) dest);
+ _delegate.send(dest, msg, deliveryMode, priority, timeToLive);
+ }
+
+ public void setDeliveryMode(int deliveryMode) throws JMSException
+ {
+ checkPreConditions();
+ _delegate.setDeliveryMode(deliveryMode);
+ }
+
+ public void setDisableMessageID(boolean disableMessageID) throws JMSException
+ {
+ checkPreConditions();
+ _delegate.setDisableMessageID(disableMessageID);
+ }
+
+ public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException
+ {
+ checkPreConditions();
+ _delegate.setDisableMessageTimestamp(disableMessageTimestamp);
+ }
+
+ public void setPriority(int priority) throws JMSException
+ {
+ checkPreConditions();
+ _delegate.setPriority(priority);
+ }
+
+ public void setTimeToLive(long timeToLive) throws JMSException
+ {
+ checkPreConditions();
+ _delegate.setTimeToLive(timeToLive);
+ }
private void checkPreConditions() throws JMSException
{
@@ -130,31 +152,41 @@ public class QueueSenderAdapter implements QueueSender {
private void checkPreConditions(Queue queue) throws JMSException
{
- if (closed){
- throw new javax.jms.IllegalStateException("Publisher is closed");
- }
-
- AMQSession session = ((BasicMessageProducer) _delegate).getSession();
-
- if(session == null || session.isClosed()){
- throw new javax.jms.IllegalStateException("Invalid Session");
- }
-
- if(!(queue instanceof AMQDestination))
+ if (closed)
+ {
+ throw new javax.jms.IllegalStateException("Publisher is closed");
+ }
+
+ AMQSession session = ((BasicMessageProducer) _delegate).getSession();
+
+ if (session == null || session.isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Invalid Session");
+ }
+
+ if (!(queue instanceof AMQDestination))
{
throw new InvalidDestinationException("Queue: " + queue + " is not a valid Qpid queue");
}
AMQDestination destination = (AMQDestination) queue;
- if(!destination.isValidated() && checkQueueBeforePublish())
+ if (!destination.isValidated() && checkQueueBeforePublish())
{
- if (_delegate.isBound(destination))
+ if (_delegate.getSession().isStrictAMQP())
{
+ _delegate._logger.warn("AMQP does not support destination validation before publish, ");
destination.setValidated(true);
}
else
{
- throw new InvalidDestinationException("Queue: " + queue + " is not a valid destination (no bindings on server");
+ if (_delegate.isBound(destination))
+ {
+ destination.setValidated(true);
+ }
+ else
+ {
+ throw new InvalidDestinationException("Queue: " + queue + " is not a valid destination (no bindings on server");
+ }
}
}
}