summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-08-20 12:50:19 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-08-20 12:50:19 +0000
commit4761feb30a16c6f28bb3b26fa9ad98bed6d4869e (patch)
tree87ae2d4b3fc2a2abe383f512d84997754692b1b4 /java/client
parentfb7e64d5985ee88e60316bb3d76e408dde0764a1 (diff)
downloadqpid-python-4761feb30a16c6f28bb3b26fa9ad98bed6d4869e.tar.gz
added sync
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@567674 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Session.java15
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java9
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java62
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java45
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java11
5 files changed, 113 insertions, 29 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java
index fa3bdf0934..438f8f0605 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Session.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java
@@ -326,8 +326,7 @@ public interface Session
/**
* Forces the broker to exhaust its credit supply.
* <p> The broker's credit will always be zero when
- * this method completes. This method does not complete until all the message transfers occur.
- * <p> This method returns the number of flushed messages.
+ * this method completes.
*
* @param destination The destination to call flush on.
*/
@@ -424,6 +423,16 @@ public interface Session
*/
public void messageRelease(RangeSet ranges);
+
+ /**
+ * Returns the number of message received for this session since
+ * {@link Session#messageFlow} has bee invoked.
+ *
+ * @return The number of message received for this session since
+ * {@link Session#messageFlow} has bee invoked.
+ */
+ public int messagesReceived();
+
// -----------------------------------------------
// Local transaction methods
// ----------------------------------------------
@@ -568,7 +577,7 @@ public interface Session
* @param type Each exchange belongs to one of a set of exchange types implemented by the server. The
* exchange types define the functionality of the exchange - i.e. how messages are routed
* through it. It is not valid or meaningful to attempt to change the type of an existing
- * exchange. Default exchange types are: direct, topic, headers and fanout.
+ * exchange. Default exchange types are: direct, topic, headers and fanout.
* @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which
* the message will be sent.
* @param options Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE},
diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
index 8059633cab..9247925073 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
@@ -25,7 +25,14 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa
private ExceptionListener _exceptionListner;
private RangeSet _acquiredMessages;
private RangeSet _rejectedMessages;
-
+
+
+ public int messagesReceived()
+ {
+ // TODO
+ return 1;
+ }
+
@Override public void sessionClose()
{
super.sessionClose();
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
index 4cb649c71a..57b7b7e7ac 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
@@ -130,7 +130,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
{
// this is a queue we expect that this queue exists
getSession().getQpidSession()
- .messageSubscribe(destination.getQpidQueueName(), getMessageActorID(),
+ .messageSubscribe(destination.getQpidQueueName(), // queue
+ getMessageActorID(), // destination
org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
// When the message selctor is set we do not acquire the messages
_messageSelector != null ? org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
@@ -156,8 +157,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
else
{
// this is a non durable subscriber
- // create a temporary queue
- queueName = "topic-" + getMessageActorID();
+ queueName = destination.getQpidQueueName();
getSession().getQpidSession()
.queueDeclare(queueName, null, null, Option.AUTO_DELETE, Option.EXCLUSIVE);
}
@@ -169,8 +169,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
.messageSubscribe(queueName, getMessageActorID(),
org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
// We always acquire the messages
- org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null,
- _noLocal ? Option.NO_LOCAL : Option.NO_OPTION,
+ org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
+ messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION,
// Request exclusive subscription access, meaning only this subscription
// can access the queue.
Option.EXCLUSIVE);
@@ -179,6 +179,12 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// set the flow mode
getSession().getQpidSession()
.messageFlowMode(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_MODE_CREDIT);
+ getSession().getQpidSession().sync();
+ // check for an exception
+ if (getSession().getCurrentException() != null)
+ {
+ throw getSession().getCurrentException();
+ }
}
//----- Message consumer API
@@ -353,11 +359,13 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
{
// if this consumer is stopped then this will be call when starting
getSession().getQpidSession()
- .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+ .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ 1);
getSession().getQpidSession().messageFlush(getMessageActorID());
- // received = getSession().getQpidSession().
+ getSession().getQpidSession().sync();
+ received = getSession().getQpidSession().messagesReceived();
}
- if ( received == 0 && timeout < 0)
+ if (received == 0 && timeout < 0)
{
// this is a nowait and we havent received a message then we must immediatly return
result = null;
@@ -425,7 +433,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// there is a synch call waiting for a message to be delivered
// so tell the broker to deliver a message
getSession().getQpidSession()
- .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+ .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ 1);
getSession().getQpidSession().messageFlush(getMessageActorID());
}
}
@@ -490,8 +499,10 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
getSession().getQpidSession()
.messageFlow(getMessageActorID(),
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
- int received = 0; //getSession().getQpidSession().messageFlush(getMessageActorID());
- if ( received == 0 && _isNoWaitIsReceiving)
+ getSession().getQpidSession().messageFlush(getMessageActorID());
+ getSession().getQpidSession().sync();
+ int received = getSession().getQpidSession().messagesReceived();
+ if (received == 0 && _isNoWaitIsReceiving)
{
// Right a message nowait is waiting for a message
// but no one can be delivered it then need to return
@@ -570,9 +581,10 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
if (_preAcquire)
{
RangeSet ranges = new RangeSet();
- // TODO: messageID is a string but range need a long???
- //ranges.add(message.getMessageID());
+ ranges.add(message.getMessageTransferId());
getSession().getQpidSession().messageRelease(ranges);
+ getSession().getQpidSession().sync();
+ testQpidException();
}
}
@@ -589,15 +601,17 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
if (!_preAcquire)
{
RangeSet ranges = new RangeSet();
- // TODO: messageID is a string but range need a long???
- // ranges.add(message.getMessageID());
+ ranges.add(message.getMessageTransferId());
- getSession().getQpidSession().messageAcquire(ranges, org.apache.qpidity.client.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE);
+ getSession().getQpidSession()
+ .messageAcquire(ranges, org.apache.qpidity.client.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE);
+ getSession().getQpidSession().sync();
RangeSet acquired = getSession().getQpidSession().getAccquiredMessages();
if (acquired.size() > 0)
{
- result = true; // todo acquired.iterator().next().getLower() == message.getMessageID();
+ result = true;
}
+ testQpidException();
}
return result;
}
@@ -613,9 +627,19 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
if (!_preAcquire)
{
RangeSet ranges = new RangeSet();
- // TODO: messageID is a string but range need a long???
- // ranges.add(message.getMessageID());
+ ranges.add(message.getMessageTransferId());
getSession().getQpidSession().messageAcknowledge(ranges);
+ getSession().getQpidSession().sync();
+ testQpidException();
+ }
+ }
+
+ private void testQpidException() throws QpidException
+ {
+ QpidException qe = getSession().getCurrentException();
+ if (qe != null)
+ {
+ throw qe;
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
index 90912f9c10..5994650f34 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
@@ -25,6 +25,8 @@ import org.apache.qpidity.RangeSet;
import javax.jms.*;
import javax.jms.IllegalStateException;
+import javax.jms.MessageListener;
+import javax.jms.Session;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.HashMap;
@@ -108,6 +110,11 @@ public class SessionImpl implements Session
private org.apache.qpidity.client.Session _qpidSession;
/**
+ * The latest qpid Exception that has been reaised.
+ */
+ private QpidException _currentException;
+
+ /**
* Indicates whether this session is recovering
*/
private boolean _inRecovery = false;
@@ -142,6 +149,8 @@ public class SessionImpl implements Session
// create the qpid session with an expiry <= 0 so that the session does not expire
_qpidSession = _connection.getQpidConnection().createSession(0);
+ // set the exception listnere for this session
+ _qpidSession.setExceptionListener(new QpidSessionExceptionListener());
// set transacted if required
if (_transacted && !isXA)
{
@@ -431,8 +440,7 @@ public class SessionImpl implements Session
{
// release this message
RangeSet ranges = new RangeSet();
- // TODO: messageID is a string but range need a long???
- // ranges.add(message.getMessageID());
+ ranges.add(message.getMessageTransferId());
getQpidSession().messageRelease(ranges);
}
}
@@ -817,6 +825,17 @@ public class SessionImpl implements Session
checkNotClosed();
}
+ /**
+ * Get the latest thrown exception.
+ *
+ * @return The latest thrown exception.
+ */
+ public synchronized QpidException getCurrentException()
+ {
+ QpidException result = _currentException;
+ _currentException = null;
+ return result;
+ }
//----- Protected methods
/**
@@ -1006,8 +1025,7 @@ public class SessionImpl implements Session
{
// acknowledge this message
RangeSet ranges = new RangeSet();
- // TODO: messageID is a string but range need a long???
- // ranges.add(message.getMessageID());
+ ranges.add(message.getMessageTransferId());
getQpidSession().messageAcknowledge(ranges);
}
//tobedone: Implement DUPS OK heuristic
@@ -1035,8 +1053,7 @@ public class SessionImpl implements Session
{
// acknowledge this message
RangeSet ranges = new RangeSet();
- // TODO: messageID is a string but range need a long???
- // ranges.add(message.getMessageID());
+ ranges.add(message.getMessageTransferId());
getQpidSession().messageAcknowledge(ranges);
}
//empty the list of unack messages
@@ -1094,6 +1111,22 @@ public class SessionImpl implements Session
//------ Inner classes
/**
+ * Lstener for qpid protocol exceptions
+ */
+ private class QpidSessionExceptionListener implements org.apache.qpidity.client.ExceptionListener
+ {
+ public void onException(QpidException exception)
+ {
+ synchronized (this)
+ {
+ //todo check the error code for finding out if we need to notify the
+ // JMS connection exception listener
+ _currentException = exception;
+ }
+ }
+ }
+
+ /**
* Convenient class for storing incoming messages associated with a consumer ID.
* <p> Those messages are enqueued in _incomingAsynchronousMessages
*/
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
index 2b7316e847..67121f416a 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
@@ -369,6 +369,7 @@ public class QpidMessage
/**
* Get this message excahgne name
+ *
* @return this message excahgne name
*/
public String getExchangeName()
@@ -407,6 +408,16 @@ public class QpidMessage
{
return _qpidityMessage;
}
+
+ /**
+ * Get this message transfer ID.
+ *
+ * @return This message transfer ID.
+ */
+ public long getMessageTransferId()
+ {
+ return _qpidityMessage.getMessageTransferId();
+ }
}