diff options
Diffstat (limited to 'java')
5 files changed, 113 insertions, 29 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java index fa3bdf0934..438f8f0605 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java @@ -326,8 +326,7 @@ public interface Session /**
* Forces the broker to exhaust its credit supply.
* <p> The broker's credit will always be zero when
- * this method completes. This method does not complete until all the message transfers occur.
- * <p> This method returns the number of flushed messages.
+ * this method completes.
*
* @param destination The destination to call flush on.
*/
@@ -424,6 +423,16 @@ public interface Session */
public void messageRelease(RangeSet ranges);
+
+ /**
+ * Returns the number of message received for this session since
+ * {@link Session#messageFlow} has bee invoked.
+ *
+ * @return The number of message received for this session since
+ * {@link Session#messageFlow} has bee invoked.
+ */
+ public int messagesReceived();
+
// -----------------------------------------------
// Local transaction methods
// ----------------------------------------------
@@ -568,7 +577,7 @@ public interface Session * @param type Each exchange belongs to one of a set of exchange types implemented by the server. The
* exchange types define the functionality of the exchange - i.e. how messages are routed
* through it. It is not valid or meaningful to attempt to change the type of an existing
- * exchange. Default exchange types are: direct, topic, headers and fanout.
+ * exchange. Default exchange types are: direct, topic, headers and fanout.
* @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which
* the message will be sent.
* @param options Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE},
diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java index 8059633cab..9247925073 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java @@ -25,7 +25,14 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa private ExceptionListener _exceptionListner; private RangeSet _acquiredMessages; private RangeSet _rejectedMessages; - + + + public int messagesReceived() + { + // TODO + return 1; + } + @Override public void sessionClose() { super.sessionClose(); diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index 4cb649c71a..57b7b7e7ac 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -130,7 +130,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { // this is a queue we expect that this queue exists getSession().getQpidSession() - .messageSubscribe(destination.getQpidQueueName(), getMessageActorID(), + .messageSubscribe(destination.getQpidQueueName(), // queue + getMessageActorID(), // destination org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, // When the message selctor is set we do not acquire the messages _messageSelector != null ? org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, @@ -156,8 +157,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer else { // this is a non durable subscriber - // create a temporary queue - queueName = "topic-" + getMessageActorID(); + queueName = destination.getQpidQueueName(); getSession().getQpidSession() .queueDeclare(queueName, null, null, Option.AUTO_DELETE, Option.EXCLUSIVE); } @@ -169,8 +169,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer .messageSubscribe(queueName, getMessageActorID(), org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, // We always acquire the messages - org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null, - _noLocal ? Option.NO_LOCAL : Option.NO_OPTION, + org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, + messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION, // Request exclusive subscription access, meaning only this subscription // can access the queue. Option.EXCLUSIVE); @@ -179,6 +179,12 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // set the flow mode getSession().getQpidSession() .messageFlowMode(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_MODE_CREDIT); + getSession().getQpidSession().sync(); + // check for an exception + if (getSession().getCurrentException() != null) + { + throw getSession().getCurrentException(); + } } //----- Message consumer API @@ -353,11 +359,13 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { // if this consumer is stopped then this will be call when starting getSession().getQpidSession() - .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); + .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, + 1); getSession().getQpidSession().messageFlush(getMessageActorID()); - // received = getSession().getQpidSession(). + getSession().getQpidSession().sync(); + received = getSession().getQpidSession().messagesReceived(); } - if ( received == 0 && timeout < 0) + if (received == 0 && timeout < 0) { // this is a nowait and we havent received a message then we must immediatly return result = null; @@ -425,7 +433,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // there is a synch call waiting for a message to be delivered // so tell the broker to deliver a message getSession().getQpidSession() - .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); + .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, + 1); getSession().getQpidSession().messageFlush(getMessageActorID()); } } @@ -490,8 +499,10 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer getSession().getQpidSession() .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); - int received = 0; //getSession().getQpidSession().messageFlush(getMessageActorID()); - if ( received == 0 && _isNoWaitIsReceiving) + getSession().getQpidSession().messageFlush(getMessageActorID()); + getSession().getQpidSession().sync(); + int received = getSession().getQpidSession().messagesReceived(); + if (received == 0 && _isNoWaitIsReceiving) { // Right a message nowait is waiting for a message // but no one can be delivered it then need to return @@ -570,9 +581,10 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer if (_preAcquire) { RangeSet ranges = new RangeSet(); - // TODO: messageID is a string but range need a long??? - //ranges.add(message.getMessageID()); + ranges.add(message.getMessageTransferId()); getSession().getQpidSession().messageRelease(ranges); + getSession().getQpidSession().sync(); + testQpidException(); } } @@ -589,15 +601,17 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer if (!_preAcquire) { RangeSet ranges = new RangeSet(); - // TODO: messageID is a string but range need a long??? - // ranges.add(message.getMessageID()); + ranges.add(message.getMessageTransferId()); - getSession().getQpidSession().messageAcquire(ranges, org.apache.qpidity.client.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE); + getSession().getQpidSession() + .messageAcquire(ranges, org.apache.qpidity.client.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE); + getSession().getQpidSession().sync(); RangeSet acquired = getSession().getQpidSession().getAccquiredMessages(); if (acquired.size() > 0) { - result = true; // todo acquired.iterator().next().getLower() == message.getMessageID(); + result = true; } + testQpidException(); } return result; } @@ -613,9 +627,19 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer if (!_preAcquire) { RangeSet ranges = new RangeSet(); - // TODO: messageID is a string but range need a long??? - // ranges.add(message.getMessageID()); + ranges.add(message.getMessageTransferId()); getSession().getQpidSession().messageAcknowledge(ranges); + getSession().getQpidSession().sync(); + testQpidException(); + } + } + + private void testQpidException() throws QpidException + { + QpidException qe = getSession().getCurrentException(); + if (qe != null) + { + throw qe; } } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java index 90912f9c10..5994650f34 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java @@ -25,6 +25,8 @@ import org.apache.qpidity.RangeSet; import javax.jms.*; import javax.jms.IllegalStateException; +import javax.jms.MessageListener; +import javax.jms.Session; import java.io.Serializable; import java.util.LinkedList; import java.util.HashMap; @@ -108,6 +110,11 @@ public class SessionImpl implements Session private org.apache.qpidity.client.Session _qpidSession; /** + * The latest qpid Exception that has been reaised. + */ + private QpidException _currentException; + + /** * Indicates whether this session is recovering */ private boolean _inRecovery = false; @@ -142,6 +149,8 @@ public class SessionImpl implements Session // create the qpid session with an expiry <= 0 so that the session does not expire _qpidSession = _connection.getQpidConnection().createSession(0); + // set the exception listnere for this session + _qpidSession.setExceptionListener(new QpidSessionExceptionListener()); // set transacted if required if (_transacted && !isXA) { @@ -431,8 +440,7 @@ public class SessionImpl implements Session { // release this message RangeSet ranges = new RangeSet(); - // TODO: messageID is a string but range need a long??? - // ranges.add(message.getMessageID()); + ranges.add(message.getMessageTransferId()); getQpidSession().messageRelease(ranges); } } @@ -817,6 +825,17 @@ public class SessionImpl implements Session checkNotClosed(); } + /** + * Get the latest thrown exception. + * + * @return The latest thrown exception. + */ + public synchronized QpidException getCurrentException() + { + QpidException result = _currentException; + _currentException = null; + return result; + } //----- Protected methods /** @@ -1006,8 +1025,7 @@ public class SessionImpl implements Session { // acknowledge this message RangeSet ranges = new RangeSet(); - // TODO: messageID is a string but range need a long??? - // ranges.add(message.getMessageID()); + ranges.add(message.getMessageTransferId()); getQpidSession().messageAcknowledge(ranges); } //tobedone: Implement DUPS OK heuristic @@ -1035,8 +1053,7 @@ public class SessionImpl implements Session { // acknowledge this message RangeSet ranges = new RangeSet(); - // TODO: messageID is a string but range need a long??? - // ranges.add(message.getMessageID()); + ranges.add(message.getMessageTransferId()); getQpidSession().messageAcknowledge(ranges); } //empty the list of unack messages @@ -1094,6 +1111,22 @@ public class SessionImpl implements Session //------ Inner classes /** + * Lstener for qpid protocol exceptions + */ + private class QpidSessionExceptionListener implements org.apache.qpidity.client.ExceptionListener + { + public void onException(QpidException exception) + { + synchronized (this) + { + //todo check the error code for finding out if we need to notify the + // JMS connection exception listener + _currentException = exception; + } + } + } + + /** * Convenient class for storing incoming messages associated with a consumer ID. * <p> Those messages are enqueued in _incomingAsynchronousMessages */ diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java index 2b7316e847..67121f416a 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java @@ -369,6 +369,7 @@ public class QpidMessage /** * Get this message excahgne name + * * @return this message excahgne name */ public String getExchangeName() @@ -407,6 +408,16 @@ public class QpidMessage { return _qpidityMessage; } + + /** + * Get this message transfer ID. + * + * @return This message transfer ID. + */ + public long getMessageTransferId() + { + return _qpidityMessage.getMessageTransferId(); + } } |
