From dff8b876caa9625dc0e03e03dadad22a504826d1 Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Wed, 8 Aug 2007 04:36:31 +0000 Subject: implemented Session.sync() git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@563738 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpidity/client/Session.java | 10 +++++----- .../java/org/apache/qpidity/impl/ClientSession.java | 8 ++++---- .../org/apache/qpidity/jms/MessageConsumerImpl.java | 21 ++++++++++++--------- .../java/org/apache/qpidity/jms/SessionImpl.java | 16 ++++++++++------ 4 files changed, 31 insertions(+), 24 deletions(-) (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java index 4903991d7d..33b5586409 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java @@ -24,7 +24,7 @@ import org.apache.qpidity.api.Message; import org.apache.qpidity.QpidException; import org.apache.qpidity.Header; import org.apache.qpidity.Option; -import org.apache.qpidity.Range; +import org.apache.qpidity.RangeSet; /** *

A session is associated with a connection. @@ -290,7 +290,7 @@ public interface Session * @param range Range of acknowledged messages. * @throws QpidException If the acknowledgement of the messages fails due to some error. */ - public void messageAcknowledge(Range... range) throws QpidException; + public void messageAcknowledge(RangeSet ranges) throws QpidException; /** * Reject ranges of acquired messages. @@ -300,7 +300,7 @@ public interface Session * @param range Range of rejected messages. * @throws QpidException If those messages cannot be rejected dus to some error */ - public void messageReject(Range... range) throws QpidException; + public void messageReject(RangeSet ranges) throws QpidException; /** * Try to acquire ranges of messages hence releasing them form the queue. @@ -314,7 +314,7 @@ public interface Session * @return Ranges of explicitly acquired messages. * @throws QpidException If this message cannot be acquired dus to some error */ - public Range[] messageAcquire(Range... range) throws QpidException; + public RangeSet messageAcquire(RangeSet range) throws QpidException; /** * Give up responsibility for processing ranges of messages. @@ -323,7 +323,7 @@ public interface Session * @param range Ranges of messages to be released. * @throws QpidException If this message cannot be released dus to some error. */ - public void messageRelease(Range... range) throws QpidException; + public void messageRelease(RangeSet range) throws QpidException; // ----------------------------------------------- // Local transaction methods diff --git a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java index bc02a7c18d..f6d0cfaefd 100644 --- a/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/impl/ClientSession.java @@ -113,25 +113,25 @@ public class ClientSession implements org.apache.qpidity.client.Session } - public void messageAcknowledge(Range... range) throws QpidException + public void messageAcknowledge(RangeSet ranges) throws QpidException { // TODO } - public void messageReject(Range... range) throws QpidException + public void messageReject(RangeSet ranges) throws QpidException { // TODO } - public Range[] messageAcquire(Range... range) throws QpidException + public RangeSet messageAcquire(RangeSet ranges) throws QpidException { // TODO return null; } - public void messageRelease(Range... range) throws QpidException + public void messageRelease(RangeSet ranges) throws QpidException { // TODO diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index d88a177001..7fab5adfea 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -19,7 +19,7 @@ package org.apache.qpidity.jms; import org.apache.qpidity.jms.message.QpidMessage; import org.apache.qpidity.impl.MessagePartListenerAdapter; -import org.apache.qpidity.Range; +import org.apache.qpidity.RangeSet; import org.apache.qpidity.QpidException; import org.apache.qpidity.Option; import org.apache.qpidity.filter.MessageFilter; @@ -568,8 +568,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { if (_preAcquire) { - Range range = new Range(message.getMessageID(), message.getMessageID()); - getSession().getQpidSession().messageRelease(range); + RangeSet ranges = new RangeSet(); + ranges.add(message.getMessageID()); + getSession().getQpidSession().messageRelease(ranges); } } @@ -585,12 +586,13 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer boolean result = false; if (!_preAcquire) { - Range range = new Range(message.getMessageID(), message.getMessageID()); + RangeSet ranges = new RangeSet(); + ranges.add(message.getMessageID()); - Range[] rangeResult = getSession().getQpidSession().messageAcquire(range); - if (rangeResult.length > 0) + RangeSet acquired = getSession().getQpidSession().messageAcquire(ranges); + if (acquired.size() > 0) { - result = rangeResult[0].getLower().compareTo(message.getMessageID()) == 0; + result = acquired.iterator().next().getLower() == message.getMessageID(); } } return result; @@ -606,8 +608,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { if (!_preAcquire) { - Range range = new Range(message.getMessageID(), message.getMessageID()); - getSession().getQpidSession().messageAcknowledge(range); + RangeSet ranges = new RangeSet(); + ranges.add(message.getMessageID()); + getSession().getQpidSession().messageAcknowledge(ranges); } } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java index 5ab8482635..8b224500e9 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java @@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory; import org.apache.qpidity.jms.message.*; import org.apache.qpidity.QpidException; import org.apache.qpidity.Range; +import org.apache.qpidity.RangeSet; import javax.jms.*; import javax.jms.IllegalStateException; @@ -443,10 +444,11 @@ public class SessionImpl implements Session for (QpidMessage message : _unacknowledgedMessages) { // release this message - Range range = new Range(message.getMessageID(), message.getMessageID()); + RangeSet ranges = new RangeSet(); + ranges.add(message.getMessageID()); try { - getQpidSession().messageRelease(range); + getQpidSession().messageRelease(ranges); } catch (QpidException e) { @@ -982,10 +984,11 @@ public class SessionImpl implements Session else { // acknowledge this message - Range range = new Range(message.getMessageID(), message.getMessageID()); + RangeSet ranges = new RangeSet(); + ranges.add(message.getMessageID()); try { - getQpidSession().messageAcknowledge(range); + getQpidSession().messageAcknowledge(ranges); } catch (QpidException e) { @@ -1016,10 +1019,11 @@ public class SessionImpl implements Session for (QpidMessage message : _unacknowledgedMessages) { // acknowledge this message - Range range = new Range(message.getMessageID(), message.getMessageID()); + RangeSet ranges = new RangeSet(); + ranges.add(message.getMessageID()); try { - getQpidSession().messageAcknowledge(range); + getQpidSession().messageAcknowledge(ranges); } catch (QpidException e) { -- cgit v1.2.1