summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2007-08-08 04:36:31 +0000
committerRafael H. Schloming <rhs@apache.org>2007-08-08 04:36:31 +0000
commitdff8b876caa9625dc0e03e03dadad22a504826d1 (patch)
tree6afa9f8388c40603fc0423e96e2555a9455b83d6 /java/client
parenta45694048d1f26e0ed317f661b464bae862fb8fa (diff)
downloadqpid-python-dff8b876caa9625dc0e03e03dadad22a504826d1.tar.gz
implemented Session.sync()
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@563738 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Session.java10
-rw-r--r--java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java8
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java21
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java16
4 files changed, 31 insertions, 24 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java
index 4903991d7d..33b5586409 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Session.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java
@@ -24,7 +24,7 @@ import org.apache.qpidity.api.Message;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.Header;
import org.apache.qpidity.Option;
-import org.apache.qpidity.Range;
+import org.apache.qpidity.RangeSet;
/**
* <p>A session is associated with a connection.
@@ -290,7 +290,7 @@ public interface Session
* @param range Range of acknowledged messages.
* @throws QpidException If the acknowledgement of the messages fails due to some error.
*/
- public void messageAcknowledge(Range<Long>... range) throws QpidException;
+ public void messageAcknowledge(RangeSet ranges) throws QpidException;
/**
* Reject ranges of acquired messages.
@@ -300,7 +300,7 @@ public interface Session
* @param range Range of rejected messages.
* @throws QpidException If those messages cannot be rejected dus to some error
*/
- public void messageReject(Range<Long>... range) throws QpidException;
+ public void messageReject(RangeSet ranges) throws QpidException;
/**
* Try to acquire ranges of messages hence releasing them form the queue.
@@ -314,7 +314,7 @@ public interface Session
* @return Ranges of explicitly acquired messages.
* @throws QpidException If this message cannot be acquired dus to some error
*/
- public Range<Long>[] messageAcquire(Range<Long>... range) throws QpidException;
+ public RangeSet messageAcquire(RangeSet range) throws QpidException;
/**
* Give up responsibility for processing ranges of messages.
@@ -323,7 +323,7 @@ public interface Session
* @param range Ranges of messages to be released.
* @throws QpidException If this message cannot be released dus to some error.
*/
- public void messageRelease(Range<Long>... range) throws QpidException;
+ public void messageRelease(RangeSet range) throws QpidException;
// -----------------------------------------------
// Local transaction methods
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java
index bc02a7c18d..f6d0cfaefd 100644
--- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java
@@ -113,25 +113,25 @@ public class ClientSession implements org.apache.qpidity.client.Session
}
- public void messageAcknowledge(Range<Long>... range) throws QpidException
+ public void messageAcknowledge(RangeSet ranges) throws QpidException
{
// TODO
}
- public void messageReject(Range<Long>... range) throws QpidException
+ public void messageReject(RangeSet ranges) throws QpidException
{
// TODO
}
- public Range<Long>[] messageAcquire(Range<Long>... range) throws QpidException
+ public RangeSet messageAcquire(RangeSet ranges) throws QpidException
{
// TODO
return null;
}
- public void messageRelease(Range<Long>... range) throws QpidException
+ public void messageRelease(RangeSet ranges) throws QpidException
{
// TODO
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
index d88a177001..7fab5adfea 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
@@ -19,7 +19,7 @@ package org.apache.qpidity.jms;
import org.apache.qpidity.jms.message.QpidMessage;
import org.apache.qpidity.impl.MessagePartListenerAdapter;
-import org.apache.qpidity.Range;
+import org.apache.qpidity.RangeSet;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.Option;
import org.apache.qpidity.filter.MessageFilter;
@@ -568,8 +568,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
{
if (_preAcquire)
{
- Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
- getSession().getQpidSession().messageRelease(range);
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageID());
+ getSession().getQpidSession().messageRelease(ranges);
}
}
@@ -585,12 +586,13 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
boolean result = false;
if (!_preAcquire)
{
- Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageID());
- Range<Long>[] rangeResult = getSession().getQpidSession().messageAcquire(range);
- if (rangeResult.length > 0)
+ RangeSet acquired = getSession().getQpidSession().messageAcquire(ranges);
+ if (acquired.size() > 0)
{
- result = rangeResult[0].getLower().compareTo(message.getMessageID()) == 0;
+ result = acquired.iterator().next().getLower() == message.getMessageID();
}
}
return result;
@@ -606,8 +608,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
{
if (!_preAcquire)
{
- Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
- getSession().getQpidSession().messageAcknowledge(range);
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageID());
+ getSession().getQpidSession().messageAcknowledge(ranges);
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
index 5ab8482635..8b224500e9 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
@@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpidity.jms.message.*;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.Range;
+import org.apache.qpidity.RangeSet;
import javax.jms.*;
import javax.jms.IllegalStateException;
@@ -443,10 +444,11 @@ public class SessionImpl implements Session
for (QpidMessage message : _unacknowledgedMessages)
{
// release this message
- Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageID());
try
{
- getQpidSession().messageRelease(range);
+ getQpidSession().messageRelease(ranges);
}
catch (QpidException e)
{
@@ -982,10 +984,11 @@ public class SessionImpl implements Session
else
{
// acknowledge this message
- Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageID());
try
{
- getQpidSession().messageAcknowledge(range);
+ getQpidSession().messageAcknowledge(ranges);
}
catch (QpidException e)
{
@@ -1016,10 +1019,11 @@ public class SessionImpl implements Session
for (QpidMessage message : _unacknowledgedMessages)
{
// acknowledge this message
- Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID());
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageID());
try
{
- getQpidSession().messageAcknowledge(range);
+ getQpidSession().messageAcknowledge(ranges);
}
catch (QpidException e)
{