diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2007-08-08 04:36:31 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2007-08-08 04:36:31 +0000 |
| commit | dff8b876caa9625dc0e03e03dadad22a504826d1 (patch) | |
| tree | 6afa9f8388c40603fc0423e96e2555a9455b83d6 /java/client | |
| parent | a45694048d1f26e0ed317f661b464bae862fb8fa (diff) | |
| download | qpid-python-dff8b876caa9625dc0e03e03dadad22a504826d1.tar.gz | |
implemented Session.sync()
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@563738 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
4 files changed, 31 insertions, 24 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java index 4903991d7d..33b5586409 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java @@ -24,7 +24,7 @@ import org.apache.qpidity.api.Message; import org.apache.qpidity.QpidException;
import org.apache.qpidity.Header;
import org.apache.qpidity.Option;
-import org.apache.qpidity.Range;
+import org.apache.qpidity.RangeSet;
/**
* <p>A session is associated with a connection.
@@ -290,7 +290,7 @@ public interface Session * @param range Range of acknowledged messages.
* @throws QpidException If the acknowledgement of the messages fails due to some error.
*/
- public void messageAcknowledge(Range<Long>... range) throws QpidException;
+ public void messageAcknowledge(RangeSet ranges) throws QpidException;
/**
* Reject ranges of acquired messages.
@@ -300,7 +300,7 @@ public interface Session * @param range Range of rejected messages.
* @throws QpidException If those messages cannot be rejected dus to some error
*/
- public void messageReject(Range<Long>... range) throws QpidException;
+ public void messageReject(RangeSet ranges) throws QpidException;
/**
* Try to acquire ranges of messages hence releasing them form the queue.
@@ -314,7 +314,7 @@ public interface Session * @return Ranges of explicitly acquired messages.
* @throws QpidException If this message cannot be acquired dus to some error
*/
- public Range<Long>[] messageAcquire(Range<Long>... range) throws QpidException;
+ public RangeSet messageAcquire(RangeSet range) throws QpidException;
/**
* Give up responsibility for processing ranges of messages.
@@ -323,7 +323,7 @@ public interface Session * @param range Ranges of messages to be released.
* @throws QpidException If this message cannot be released dus to some error.
*/
- public void messageRelease(Range<Long>... range) throws QpidException;
+ public void messageRelease(RangeSet range) throws QpidException;
// -----------------------------------------------
// Local transaction methods
diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java index bc02a7c18d..f6d0cfaefd 100644 --- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java @@ -113,25 +113,25 @@ public class ClientSession implements org.apache.qpidity.client.Session } - public void messageAcknowledge(Range<Long>... range) throws QpidException + public void messageAcknowledge(RangeSet ranges) throws QpidException { // TODO } - public void messageReject(Range<Long>... range) throws QpidException + public void messageReject(RangeSet ranges) throws QpidException { // TODO } - public Range<Long>[] messageAcquire(Range<Long>... range) throws QpidException + public RangeSet messageAcquire(RangeSet ranges) throws QpidException { // TODO return null; } - public void messageRelease(Range<Long>... range) throws QpidException + public void messageRelease(RangeSet ranges) throws QpidException { // TODO diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index d88a177001..7fab5adfea 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -19,7 +19,7 @@ package org.apache.qpidity.jms; import org.apache.qpidity.jms.message.QpidMessage; import org.apache.qpidity.impl.MessagePartListenerAdapter; -import org.apache.qpidity.Range; +import org.apache.qpidity.RangeSet; import org.apache.qpidity.QpidException; import org.apache.qpidity.Option; import org.apache.qpidity.filter.MessageFilter; @@ -568,8 +568,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { if (_preAcquire) { - Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); - getSession().getQpidSession().messageRelease(range); + RangeSet ranges = new RangeSet(); + ranges.add(message.getMessageID()); + getSession().getQpidSession().messageRelease(ranges); } } @@ -585,12 +586,13 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer boolean result = false; if (!_preAcquire) { - Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); + RangeSet ranges = new RangeSet(); + ranges.add(message.getMessageID()); - Range<Long>[] rangeResult = getSession().getQpidSession().messageAcquire(range); - if (rangeResult.length > 0) + RangeSet acquired = getSession().getQpidSession().messageAcquire(ranges); + if (acquired.size() > 0) { - result = rangeResult[0].getLower().compareTo(message.getMessageID()) == 0; + result = acquired.iterator().next().getLower() == message.getMessageID(); } } return result; @@ -606,8 +608,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { if (!_preAcquire) { - Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); - getSession().getQpidSession().messageAcknowledge(range); + RangeSet ranges = new RangeSet(); + ranges.add(message.getMessageID()); + getSession().getQpidSession().messageAcknowledge(ranges); } } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java index 5ab8482635..8b224500e9 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java @@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory; import org.apache.qpidity.jms.message.*; import org.apache.qpidity.QpidException; import org.apache.qpidity.Range; +import org.apache.qpidity.RangeSet; import javax.jms.*; import javax.jms.IllegalStateException; @@ -443,10 +444,11 @@ public class SessionImpl implements Session for (QpidMessage message : _unacknowledgedMessages) { // release this message - Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); + RangeSet ranges = new RangeSet(); + ranges.add(message.getMessageID()); try { - getQpidSession().messageRelease(range); + getQpidSession().messageRelease(ranges); } catch (QpidException e) { @@ -982,10 +984,11 @@ public class SessionImpl implements Session else { // acknowledge this message - Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); + RangeSet ranges = new RangeSet(); + ranges.add(message.getMessageID()); try { - getQpidSession().messageAcknowledge(range); + getQpidSession().messageAcknowledge(ranges); } catch (QpidException e) { @@ -1016,10 +1019,11 @@ public class SessionImpl implements Session for (QpidMessage message : _unacknowledgedMessages) { // acknowledge this message - Range<Long> range = new Range<Long>(message.getMessageID(), message.getMessageID()); + RangeSet ranges = new RangeSet(); + ranges.add(message.getMessageID()); try { - getQpidSession().messageAcknowledge(range); + getQpidSession().messageAcknowledge(ranges); } catch (QpidException e) { |
