diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-06 10:32:50 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-08-06 10:32:50 +0000 |
| commit | d5865c18ddfa24f32ad47a6628f16e1cb5028f8f (patch) | |
| tree | 556d8c2f728c4993a79c69730454ff4863d025c8 /java/client | |
| parent | 8e159b2051510728f64f31c6b06e322cb2d7974d (diff) | |
| download | qpid-python-d5865c18ddfa24f32ad47a6628f16e1cb5028f8f.tar.gz | |
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@563097 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
12 files changed, 600 insertions, 162 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Session.java b/java/client/src/main/java/org/apache/qpid/nclient/Session.java index f94d26b854..46cc2f340c 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/Session.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/Session.java @@ -77,23 +77,24 @@ public interface Session //------------------------------------------------------ /** * Transfer the given message to a specified exchange. - * <p> Following are the valid options for messageTransfer - * <ul> - * <li> CONFIRM - * <li> PRE_ACCQUIRE - * </ul> - * <p> In the absence of a particular option, the defaul value is: - * <ul> - * <li> CONFIRM = false - * <li> NO-ACCQUIRE - * </ul> * - * @param exchange The exchange the message is being sent. - * @param msg The Message to be sent - * @param options A list of valid options + * @param confirmMode <ul> </li>off (0): confirmation is not required, once a message has been transferred in pre-acquire + * mode (or once acquire has been sent in no-acquire mode) the message is considered + * transferred + * <p/> + * <li> on (1): an acquired message (whether acquisition was implicit as in pre-acquire mode or + * explicit as in no-acquire mode) is not considered transferred until the original + * transfer is complete (signaled via execution.complete) + * </ul> + * @param acquireMode <ul> <li> no-acquire (0): the message must be explicitly acquired + * <p/> + * <li> pre-acquire (1): the message is acquired when the transfer starts + * </ul> + * @param exchange The exchange the message is being sent. + * @param msg The Message to be sent * @throws QpidException If the session fails to send the message due to some error */ - public void messageTransfer(String exchange, Message msg, Option... options) throws QpidException; + public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) throws QpidException; /** * Declare the beginning of a message transfer operation. This operation must @@ -103,31 +104,31 @@ public interface Session * <p> In the interval [messageTransfer endData] any attempt to call a method other than * {@link Session#addMessageHeaders}, {@link Session#endData} ore {@link Session#close} * will result in an exception being thrown. - * <p> Following are the valid options for messageTransfer - * <ul> - * <li> CONFIRM - * <li> PRE_ACCQUIRE - * </ul> - * <p> In the absence of a particular option, the defaul value is: - * <ul> - * <li> CONFIRM = false - * <li> NO-ACCQUIRE - * </ul> * - * @param exchange The exchange the message is being sent. - * @param options Set of options. + * @param confirmMode <ul> </li>off (0): confirmation is not required, once a message has been transferred in pre-acquire + * mode (or once acquire has been sent in no-acquire mode) the message is considered + * transferred + * <p/> + * <li> on (1): an acquired message (whether acquisition was implicit as in pre-acquire mode or + * explicit as in no-acquire mode) is not considered transferred until the original + * transfer is complete (signaled via execution.complete) + * </ul> + * @param acquireMode <ul> <li> no-acquire (0): the message must be explicitly acquired + * <p/> + * <li> pre-acquire (1): the message is acquired when the transfer starts + * </ul> + * @param exchange The exchange the message is being sent. * @throws QpidException If the session fails to send the message due to some error. */ - public void messageTransfer(String exchange, Option... options) throws QpidException; + public void messageTransfer(String exchange, short confirmMode, short acquireMode) throws QpidException; /** * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} - * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being sent. + * or to the message being sent. * * @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code> * @throws QpidException If the session fails to execute the method due to some error * @see org.apache.qpidity.DeliveryProperties - * @see org.apache.qpidity.ApplicationProperties */ public void addMessageHeaders(Header... headers) throws QpidException; @@ -371,7 +372,12 @@ public interface Session * <p>In the absence of a particular option, the defaul value is false for each option * * @param queueName The name of the delcared queue. - * @param alternateExchange Alternate excahnge. + * @param alternateExchange If a message is rejected by a queue, then it is sent to the alternate-exchange. A message + * may be rejected by a queue for the following reasons: + * <oL> <li> The queue is deleted when it is not empty; + * <<li> Immediate delivery of a message is requested, but there are no consumers connected to + * the queue. </ol> + * @param arguments Used for backward compatibility * @param options Set of Options. * @throws QpidException If the session fails to declare the queue due to some error. * @see Option @@ -385,6 +391,7 @@ public interface Session * @param queueName The queue to be bound. * @param exchangeName The exchange name. * @param routingKey The routing key. + * @param arguments Used for backward compatibility * @throws QpidException If the session fails to bind the queue due to some error. */ public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) @@ -396,6 +403,7 @@ public interface Session * @param queueName The queue to be unbound. * @param exchangeName The exchange name. * @param routingKey The routing key. + * @param arguments Used for backward compatibility * @throws QpidException If the session fails to unbind the queue due to some error. */ public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) @@ -448,9 +456,12 @@ public interface Session * <p/> * <p>In the absence of a particular option, the defaul value is false for each option</p> * * - * @param exchangeName The exchange name. - * @param exchangeClass The fully qualified name of the exchange class. - * @param options Set of options. + * @param exchangeName The exchange name. + * @param exchangeClass The fully qualified name of the exchange class. + * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which + * the message will be sent. + * @param options Set of options. + * @param arguments Used for backward compatibility * @throws QpidException If the session fails to declare the exchange due to some error. * @see Option */ diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java index bbcc17aca5..e3883f462e 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java @@ -13,162 +13,192 @@ import org.apache.qpidity.*; public class ClientSession implements org.apache.qpid.nclient.Session { - Map<String,MessagePartListener> messagListeners = new HashMap<String,MessagePartListener>(); - + Map<String,MessagePartListener> messagListeners = new HashMap<String,MessagePartListener>(); + + //------------------------------------------------------ // Session housekeeping methods //------------------------------------------------------ public void close() throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void suspend() throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void resume() throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + }//------------------------------------------------------ // Messaging methods // Producer //------------------------------------------------------ - public void messageTransfer(String exchange, Message msg, Option... options) throws QpidException + public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } - public void messageTransfer(String exchange, Option... options) throws QpidException + public void messageTransfer(String exchange, short confirmMode, short acquireMode) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void addMessageHeaders(Header... headers) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void addData(byte[] data, int off, int len) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void endData() throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } - public void messageSubscribe(String queue, String destination, MessagePartListener listener, Map<String, ?> filter, - Option... options) throws QpidException + public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, + MessagePartListener listener, Map<String, ?> filter, Option... options) + throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void messageCancel(String destination) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } - public void messageAcknowledge(Range... range) throws QpidException + public void setMessageListener(String destination, MessagePartListener listener) { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } - public void messageReject(Range... range) throws QpidException + public void messageFlowMode(String destination, short mode) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } - public Range[] messageAcquire(Range... range) throws QpidException + public void messageFlow(String destination, short unit, long value) throws QpidException { - return new Range[0]; //To change body of implemented methods use File | Settings | File Templates. + // TODO + } - public void messageRelease(Range... range) throws QpidException + public boolean messageFlush(String destination) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + return false; } + public void messageStop(String destination) throws QpidException + { + // TODO - public void messageFlowMode(String destination, short mode) + } + + public void messageAcknowledge(Range<Long>... range) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } - public void messageFlow(String destination, short unit, long value) + public void messageReject(Range<Long>... range) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } - public boolean messageFlush(String destination) + public Range<Long>[] messageAcquire(Range<Long>... range) throws QpidException { - return false; //To change body of implemented methods use File | Settings | File Templates. + // TODO + return null; } - public void messageStop(String destination) + public void messageRelease(Range<Long>... range) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + }// ----------------------------------------------- // Local transaction methods // ---------------------------------------------- public void txSelect() throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void txCommit() throws QpidException, IllegalStateException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void txRollback() throws QpidException, IllegalStateException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } - public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, - Option... options) throws QpidException + public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options) + throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } - public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) throws - QpidException + public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) + throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } - public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) throws - QpidException + public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) + throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void queuePurge(String queueName) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void queueDelete(String queueName, Option... options) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange, Map<String, ?> arguments, Option... options) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. + // TODO + } public void exchangeDelete(String exchangeName, Option... options) throws QpidException { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void setMessageListener(String destination,MessagePartListener listener) - { - messagListeners.put(destination, listener); + // TODO + } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java index be3c9de194..f880d71ea3 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/DestinationImpl.java @@ -17,8 +17,11 @@ */ package org.apache.qpid.nclient.jms; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Option; +import org.apache.qpidity.url.BindingURL; + import javax.jms.Destination; -import javax.jms.JMSException; /** * Implementation of the JMS Destination interface @@ -35,24 +38,60 @@ public class DestinationImpl implements Destination */ protected SessionImpl _session; + /** + * The excahnge name + */ + protected String _exchangeName; + + /** + * The excahnge class + */ + protected String _exchangeClass; + + /** + * The queu name + */ + protected String _queueName; + //--- Constructor /** * Create a new DestinationImpl with a given name. * - * @param name The name of this destination. + * @param name The name of this destination. * @param session The session used to create this destination. - * @throws JMSException If the destiantion name is not valid + * @throws QpidException If the destiantion name is not valid */ - protected DestinationImpl(SessionImpl session, String name) throws JMSException + protected DestinationImpl(SessionImpl session, String name) throws QpidException { - // TODO validate that this destination name exists - //_session.getQpidSession() _session = session; _name = name; } + /** + * Create a destiantion from a binding URL + * + * @param session The session used to create this queue. + * @param binding The URL + * @throws QpidException If the URL is not valid + */ + protected DestinationImpl(SessionImpl session, BindingURL binding) throws QpidException + { + _session = session; + _exchangeName = binding.getExchangeName(); + _exchangeClass = binding.getExchangeClass(); + _name = binding.getDestinationName(); + // _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE)); + boolean isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE)); + boolean isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE)); + _queueName = binding.getQueueName(); + // create this exchange + _session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeClass, null, null, + isDurable ? Option.DURABLE : Option.NO_OPTION, + isAutoDelete ? Option.AUTO_DELETE : Option.NO_OPTION); + } + //---- Getters and Setters - + /** * Gets the name of this destination. * @@ -84,5 +123,20 @@ public class DestinationImpl implements Destination return _name; } + // getter methods + public String getQpidQueueName() + { + return _queueName; + } + + public String getExchangeName() + { + return _exchangeName; + } + + public String getExchangeClass() + { + return _exchangeClass; + } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java index 5641869e3e..973fb23332 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java @@ -17,8 +17,6 @@ */ package org.apache.qpid.nclient.jms; -//import org.apache.qpid.nclient.api.MessageReceiver; - import org.apache.qpid.nclient.jms.message.QpidMessage; import org.apache.qpid.nclient.jms.filter.JMSSelectorFilter; import org.apache.qpid.nclient.jms.filter.MessageFilter; @@ -27,6 +25,7 @@ import org.apache.qpid.nclient.MessagePartListener; import org.apache.qpidity.Range; import org.apache.qpidity.QpidException; import org.apache.qpidity.Option; +import org.apache.qpidity.exchange.ExchangeDefaults; import javax.jms.*; @@ -120,15 +119,16 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer _noLocal = noLocal; _subscriptionName = subscriptionName; _isStopped = getSession().isStopped(); + // let's create a message part assembler + /** + * A Qpid message listener that pushes messages to this consumer session when this consumer is + * asynchronous or directly to this consumer when it is synchronously accessed. + */ + MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidMessageListener(this)); + if (destination instanceof Queue) { // this is a queue we expect that this queue exists - // let's create a message part assembler - /** - * A Qpid message listener that pushes messages to this consumer session when this consumer is - * asynchronous or directly to this consumer when it is synchronously accessed. - */ - MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidMessageListener(this)); getSession().getQpidSession() .messageSubscribe(destination.getName(), getMessageActorID(), org.apache.qpid.nclient.Session.CONFIRM_MODE_NOT_REQUIRED, @@ -144,25 +144,44 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { // this is a topic we need to create a temporary queue for this consumer // unless this is a durable subscriber + String queueName; if (subscriptionName != null) { // this ia a durable subscriber // create a persistent queue for this subscriber - // getSession().getQpidSession().queueDeclare(destination.getName()); + queueName = "topic-" + subscriptionName; + getSession().getQpidSession() + .queueDeclare(queueName, null, null, Option.EXCLUSIVE, Option.DURABLE); } else { // this is a non durable subscriber // create a temporary queue - + queueName = "topic-" + getMessageActorID(); + getSession().getQpidSession() + .queueDeclare(queueName, null, null, Option.AUTO_DELETE, Option.EXCLUSIVE); } + // bind this queue with the topic exchange + getSession().getQpidSession() + .queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getName(), null); + // subscribe to this topic + getSession().getQpidSession() + .messageSubscribe(queueName, getMessageActorID(), + org.apache.qpid.nclient.Session.CONFIRM_MODE_NOT_REQUIRED, + // We always acquire the messages + org.apache.qpid.nclient.Session.ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null, + _noLocal ? Option.NO_LOCAL : Option.NO_OPTION, + // Request exclusive subscription access, meaning only this subscription + // can access the queue. + Option.EXCLUSIVE); + } // set the flow mode getSession().getQpidSession() .messageFlowMode(getMessageActorID(), org.apache.qpid.nclient.Session.MESSAGE_FLOW_MODE_CREDIT); } - //----- Message consumer API + //----- Message consumer API /** * Gets this MessageConsumer's message selector. * @@ -426,7 +445,14 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { messageOk = _filter.matches(message.getJMSMessage()); } - // right now we need to acquire this message if needed + if (!messageOk && _preAcquire) + { + // this is the case for topics + // We need to ack this message + acknowledgeMessage(message); + } + // now we need to acquire this message if needed + // this is the case of queue with a message selector set if (!_preAcquire && messageOk) { messageOk = acquireMessage(message); @@ -569,4 +595,19 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer } return result; } + + /** + * Acknowledge a message + * + * @param message The message to be acknowledged + * @throws QpidException If the message cannot be acquired due to some internal error. + */ + private void acknowledgeMessage(QpidMessage message) throws QpidException + { + if (!_preAcquire) + { + Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); + getSession().getQpidSession().messageAcknowledge(range); + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java index 454762b3e3..9eeadcac68 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java @@ -17,96 +17,300 @@ */ package org.apache.qpid.nclient.jms; -import javax.jms.MessageProducer; -import javax.jms.JMSException; -import javax.jms.Destination; -import javax.jms.Message; +import javax.jms.*; /** - * Implements MessageProducer + * Implements MessageProducer */ public class MessageProducerImpl extends MessageActor implements MessageProducer { + /** + * If true, messages will not get a timestamp. + */ + private boolean _disableTimestamps = false; + /** + * Priority of messages created by this producer. + */ + private int _messagePriority = Message.DEFAULT_PRIORITY; + + /** + * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution. + */ + private long _timeToLive; + + /** + * Delivery mode used for this producer. + */ + private int _deliveryMode = DeliveryMode.PERSISTENT; + + /** + * Speicify whether the messageID is disable + */ + private boolean _disableMessageId = false; + + //-- constructors public MessageProducerImpl(SessionImpl session, DestinationImpl destination) { super(session, destination); } - // Interface javax.jms.MessageProducer - - public void setDisableMessageID(boolean b) throws JMSException + //--- Interface javax.jms.MessageProducer + /** + * Sets whether message IDs are disabled. + * + * @param value Specify whether the MessageID must be disabled + * @throws JMSException If disabling messageID fails due to some internal error. + */ + public void setDisableMessageID(boolean value) throws JMSException { - //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + _disableMessageId = value; } + /** + * Gets an indication of whether message IDs are disabled. + * + * @return true is messageID is disabled, false otherwise + * @throws JMSException If getting whether messagID is disabled fails due to some internal error. + */ public boolean getDisableMessageID() throws JMSException { - return false; //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + return _disableMessageId; } - public void setDisableMessageTimestamp(boolean b) throws JMSException + /** + * Sets whether message timestamps are disabled. + * <P> JMS spec says: + * <p> Since timestamps take some effort to create and increase a + * message's size, some JMS providers may be able to optimize message + * overhead if they are given a hint that the timestamp is not used by an + * application.... + * these messages must have the timestamp set to zero; if the provider + * ignores the hint, the timestamp must be set to its normal value. + * <p>Message timestamps are enabled by default. + * + * @param value Indicates if message timestamps are disabled + * @throws JMSException if disabling the timestamps fails due to some internal error. + */ + public void setDisableMessageTimestamp(boolean value) throws JMSException { - //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + _disableTimestamps = value; } + /** + * Gets an indication of whether message timestamps are disabled. + * + * @return an indication of whether message timestamps are disabled + * @throws JMSException if getting whether timestamps are disabled fails due to some internal error. + */ public boolean getDisableMessageTimestamp() throws JMSException { - return false; //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + return _disableTimestamps; } - public void setDeliveryMode(int i) throws JMSException + /** + * Sets the producer's default delivery mode. + * <p> JMS specification says: + * <p>Delivery mode is set to {@link DeliveryMode#PERSISTENT} by default. + * + * @param deliveryMode The message delivery mode for this message producer; legal + * values are {@link DeliveryMode#NON_PERSISTENT} + * and {@link DeliveryMode#PERSISTENT}. + * @throws JMSException if setting the delivery mode fails due to some internal error. + */ + public void setDeliveryMode(int deliveryMode) throws JMSException { - //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + if ((deliveryMode != DeliveryMode.NON_PERSISTENT) && (deliveryMode != DeliveryMode.PERSISTENT)) + { + throw new JMSException( + "DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + deliveryMode + " is illegal"); + } + _deliveryMode = deliveryMode; } + /** + * Gets the producer's delivery mode. + * + * @return The message delivery mode for this message producer + * @throws JMSException If getting the delivery mode fails due to some internal error. + */ public int getDeliveryMode() throws JMSException { - return 0; //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + return _deliveryMode; } - public void setPriority(int i) throws JMSException + /** + * Sets the producer's message priority. + * <p> The jms spec says: + * <p> The JMS API defines ten levels of priority value, with 0 as the + * lowest priority and 9 as the highest. Clients should consider priorities + * 0-4 as gradations of normal priority and priorities 5-9 as gradations + * of expedited priority. + * <p> Priority is set to 4 by default. + * + * @param priority The message priority for this message producer; must be a value between 0 and 9 + * @throws JMSException if setting this producer priority fails due to some internal error. + */ + public void setPriority(int priority) throws JMSException { - //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + if ((priority < 0) || (priority > 9)) + { + throw new IllegalArgumentException( + "Priority of " + priority + " is illegal. Value must be in range 0 to 9"); + } + _messagePriority = priority; } + /** + * Gets the producer's message priority. + * + * @return The message priority for this message producer. + * @throws JMSException If getting this producer message priority fails due to some internal error. + */ public int getPriority() throws JMSException { - return 0; //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + return _messagePriority; } - public void setTimeToLive(long l) throws JMSException + /** + * Sets the default length of time in milliseconds from its dispatch time + * that a produced message should be retained by the message system. + * <p> The JMS spec says that time to live must be set to zero by default. + * + * @param timeToLive The message time to live in milliseconds; zero is unlimited + * @throws JMSException If setting the default time to live fails due to some internal error. + */ + public void setTimeToLive(long timeToLive) throws JMSException { - //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + if (timeToLive < 0) + { + throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + timeToLive); + } + _timeToLive = timeToLive; } + /** + * Gets the default length of time in milliseconds from its dispatch time + * that a produced message should be retained by the message system. + * + * @return The default message time to live in milliseconds; zero is unlimited + * @throws JMSException if getting the default time to live fails due to some internal error. + * @see javax.jms.MessageProducer#setTimeToLive + */ public long getTimeToLive() throws JMSException { - return 0; //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + return _timeToLive; } + /** + * Gets the destination associated with this producer. + * + * @return This producer's destination. + * @throws JMSException If getting the destination for this producer fails + * due to some internal error. + */ public Destination getDestination() throws JMSException { - return null; //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + return _destination; } + /** + * Sends a message using the producer's default delivery mode, priority, destination + * and time to live. + * + * @param message the message to be sent + * @throws JMSException If sending the message fails due to some internal error. + * @throws MessageFormatException If an invalid message is specified. + * @throws InvalidDestinationException If this producer destination is invalid. + * @throws java.lang.UnsupportedOperationException + * If a client uses this method with a producer that did + * not specify a destination at creation time. + */ public void send(Message message) throws JMSException { - //To change body of implemented methods use File | Settings | File Templates. + send(message, _deliveryMode, _messagePriority, _timeToLive); } - public void send(Message message, int i, int i1, long l) throws JMSException + /** + * Sends a message to this producer default destination, specifying delivery mode, + * priority, and time to live. + * + * @param message The message to send + * @param deliveryMode The delivery mode to use + * @param priority The priority for this message + * @param timeToLive The message's lifetime (in milliseconds) + * @throws JMSException If sending the message fails due to some internal error. + * @throws MessageFormatException If an invalid message is specified. + * @throws InvalidDestinationException If this producer's destination is invalid. + * @throws java.lang.UnsupportedOperationException + * If a client uses this method with a producer that did + * not specify a destination at creation time. + */ + public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { - //To change body of implemented methods use File | Settings | File Templates. + send(_destination, message, deliveryMode, priority, timeToLive); } + /** + * Sends a message to a specified destination using this producer's default + * delivery mode, priority and time to live. + * <p/> + * <P>Typically, a message producer is assigned a destination at creation + * time; however, the JMS API also supports unidentified message producers, + * which require that the destination be supplied every time a message is + * sent. + * + * @param destination The destination to send this message to + * @param message The message to send + * @throws JMSException If sending the message fails due to some internal error. + * @throws MessageFormatException If an invalid message is specified. + * @throws InvalidDestinationException If an invalid destination is specified. + */ public void send(Destination destination, Message message) throws JMSException { - //To change body of implemented methods use File | Settings | File Templates. + send(destination, message, _deliveryMode, _messagePriority, _timeToLive); } - public void send(Destination destination, Message message, int i, int i1, long l) throws JMSException + /** + * Sends a message to a destination specifying delivery mode, priority and time to live. + * + * @param destination The destination to send this message to. + * @param message The message to be sent. + * @param deliveryMode The delivery mode to use. + * @param priority The priority for this message. + * @param timeToLive The message's lifetime (in milliseconds) + * @throws JMSException If sending the message fails due to some internal error. + * @throws MessageFormatException If an invalid message is specified. + * @throws InvalidDestinationException If an invalid destination is specified. + */ + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) + throws JMSException { - //To change body of implemented methods use File | Settings | File Templates. + checkNotClosed(); + getSession().checkDestination(destination); + // Do not allow negative timeToLive values + if (timeToLive < 0) + { + throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + timeToLive); + } + // check that the message is not a foreign one + + // set the properties + + // + + // dispatch it + // todo getSession().getQpidSession().messageTransfer(((DestinationImpl) destination).getExchangeName(), message, Option); } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java index 9120173fd9..6dcdde1728 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueImpl.java @@ -17,6 +17,11 @@ */ package org.apache.qpid.nclient.jms; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Option; +import org.apache.qpidity.url.BindingURL; +import org.apache.qpidity.exchange.ExchangeDefaults; + import javax.jms.Queue; import javax.jms.JMSException; @@ -32,15 +37,32 @@ public class QueueImpl extends DestinationImpl implements Queue * * @param name The name of this queue. * @param session The session used to create this queue. - * @throws JMSException If the queue name is not valid + * @throws QpidException If the queue name is not valid */ - protected QueueImpl(SessionImpl session, String name) throws JMSException + protected QueueImpl(SessionImpl session, String name) throws QpidException { super(session, name); + _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + _exchangeClass = ExchangeDefaults.FANOUT_EXCHANGE_CLASS; + _queueName = name; + // check that this queue exist on the server + // As pasive is set the server will not create the queue. + session.getQpidSession().queueDeclare(name, null, null, Option.PASSIVE); } - //---- Interface javax.jms.Queue + /** + * Create a destiantion from a binding URL + * + * @param session The session used to create this queue. + * @param binding The URL + * @throws QpidException If the URL is not valid + */ + protected QueueImpl(SessionImpl session, BindingURL binding) throws QpidException + { + super(session, binding); + } + //---- Interface javax.jms.Queue /** * Gets the name of this queue. * diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java index 22136814a1..dc9ad144a7 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java @@ -569,7 +569,7 @@ public class SessionImpl implements Session { checkNotClosed(); checkDestination(destination); - MessageConsumerImpl consumer = null; + MessageConsumerImpl consumer; try { consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null); @@ -602,7 +602,16 @@ public class SessionImpl implements Session public Queue createQueue(String queueName) throws JMSException { checkNotClosed(); - return new QueueImpl(this, queueName); + Queue result; + try + { + result = new QueueImpl(this, queueName); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + return result; } /** @@ -624,7 +633,16 @@ public class SessionImpl implements Session public Topic createTopic(String topicName) throws JMSException { checkNotClosed(); - return new TopicImpl(this, topicName); + Topic result; + try + { + result = new TopicImpl(this, topicName); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + return result; } /** @@ -713,25 +731,43 @@ public class SessionImpl implements Session } /** - * Create a TemporaryQueue. Its lifetime will be tha of the Connection unless it is deleted earlier. + * Create a TemporaryQueue. Its lifetime will be the Connection unless it is deleted earlier. * * @return A temporary queue. * @throws JMSException If creating the temporary queue fails due to some internal error. */ public TemporaryQueue createTemporaryQueue() throws JMSException { - return new TemporaryQueueImpl(this); + TemporaryQueue result; + try + { + result = new TemporaryQueueImpl(this); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + return result; } /** - * Create a TemporaryTopic. Its lifetime will be tha of the Connection unless it is deleted earlier. + * Create a TemporaryTopic. Its lifetime will be the Connection unless it is deleted earlier. * * @return A temporary topic. * @throws JMSException If creating the temporary topic fails due to some internal error. */ public TemporaryTopic createTemporaryTopic() throws JMSException { - return new TemporaryTopicImpl(this); + TemporaryTopic result; + try + { + result = new TemporaryTopicImpl(this); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + return result; } /** diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java index b33ab0d990..50130cee55 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryQueueImpl.java @@ -17,13 +17,18 @@ */ package org.apache.qpid.nclient.jms; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Option; +import org.apache.qpidity.exchange.ExchangeDefaults; + import javax.jms.TemporaryQueue; import javax.jms.JMSException; +import java.util.UUID; /** * Implements TemporaryQueue */ -public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue, TemporaryDestination +public class TemporaryQueueImpl extends DestinationImpl implements TemporaryQueue, TemporaryDestination { /** * Indicates whether this temporary queue is deleted. @@ -32,16 +37,23 @@ public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue, Tem //--- constructor - /** + /** * Create a new TemporaryQueueImpl with a given name. * * @param session The session used to create this TemporaryQueueImpl. - * @throws JMSException If creating the TemporaryQueueImpl fails due to some error. + * @throws QpidException If creating the TemporaryQueueImpl fails due to some error. */ - public TemporaryQueueImpl(SessionImpl session) throws JMSException + protected TemporaryQueueImpl(SessionImpl session) throws QpidException { - // temporary destinations do not have names and are not registered in the JNDI namespace. + // temporary destinations do not have names super(session, "NAME_NOT_SET"); + _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + _exchangeClass = ExchangeDefaults.FANOUT_EXCHANGE_CLASS; + _queueName = "TempQueue-" + UUID.randomUUID(); + // check that this queue exist on the server + // As pasive is set the server will not create the queue. + session.getQpidSession().queueDeclare(_queueName, null, null, Option.AUTO_DELETE); + session.getQpidSession().queueBind(_queueName, _exchangeName, _queueName, null); } //-- TemporaryDestination Interface @@ -59,11 +71,22 @@ public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue, Tem /** * Delete this temporary destinaiton * - * @throws JMSException If deleting this temporary queue fails due to some error. + * @throws JMSException If deleting this temporary queue fails due to some error. */ public void delete() throws JMSException { // todo delete this temporary queue _isDeleted = true; } + + //---- Interface javax.jms.Queue + /** + * Gets the name of this queue. + * + * @return This queue's name. + */ + public String getQueueName() throws JMSException + { + return super.getName(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java index 11deba8361..1dfb082557 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/TemporaryTopicImpl.java @@ -17,6 +17,8 @@ */ package org.apache.qpid.nclient.jms; +import org.apache.qpidity.QpidException; + import javax.jms.TemporaryTopic; import javax.jms.JMSException; @@ -36,9 +38,9 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic, Tem * Create a new TemporaryTopicImpl with a given name. * * @param session The session used to create this TemporaryTopicImpl. - * @throws JMSException If creating the TemporaryTopicImpl fails due to some error. + * @throws QpidException If creating the TemporaryTopicImpl fails due to some error. */ - public TemporaryTopicImpl(SessionImpl session) throws JMSException + protected TemporaryTopicImpl(SessionImpl session) throws QpidException { // temporary destinations do not have names. super(session, "NAME_NOT_SET"); diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java index 25c2afa4e7..52875ab0d5 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicImpl.java @@ -17,8 +17,11 @@ */ package org.apache.qpid.nclient.jms; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.exchange.ExchangeDefaults; +import org.apache.qpidity.url.BindingURL; + import javax.jms.Topic; -import javax.jms.JMSException; /** * Implementation of the javax.jms.Topic interface. @@ -29,16 +32,30 @@ public class TopicImpl extends DestinationImpl implements Topic /** * Create a new TopicImpl with a given name. * - * @param name The name of this topic + * @param name The name of this topic * @param session The session used to create this queue. - * @throws JMSException If the topic name is not valid + * @throws QpidException If the topic name is not valid */ - public TopicImpl(SessionImpl session, String name) throws JMSException + public TopicImpl(SessionImpl session, String name) throws QpidException { super(session, name); + _exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; + _exchangeClass = ExchangeDefaults.TOPIC_EXCHANGE_CLASS; + } + + /** + * Create a TopicImpl from a binding URL + * + * @param session The session used to create this Topic. + * @param binding The URL + * @throws QpidException If the URL is not valid + */ + protected TopicImpl(SessionImpl session, BindingURL binding) throws QpidException + { + super(session, binding); } - //--- javax.jsm.Topic Interface + //--- javax.jsm.Topic Interface /** * Gets the name of this topic. * diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java index 5ea830b2cc..03caf16520 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/AbstractJMSMessage.java @@ -33,17 +33,14 @@ import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLSyntaxException; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageNotReadableException; -import javax.jms.MessageNotWriteableException; +import javax.jms.*; import java.util.Collections; import java.util.Enumeration; import java.util.Map; import java.util.UUID; -public abstract class AbstractJMSMessage extends QpidMessage implements org.apache.qpid.jms.Message +public abstract class AbstractJMSMessage extends QpidMessage implements Message { private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap()); diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java index 02a43b0414..9b8222cf7d 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java @@ -145,6 +145,7 @@ public class QpidMessage //todo return new Long(1); } + } |
