summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2008-06-03 15:25:36 +0000
committerArnaud Simon <arnaudsimon@apache.org>2008-06-03 15:25:36 +0000
commit244909ffb0d10ec1ac394c80d1d57d574503ca17 (patch)
tree78b05e83590aaf8ecded23dee5cd6b92b20da492 /java
parent888e7c269275207be3463fcf2ea109db80bf86e5 (diff)
downloadqpid-python-244909ffb0d10ec1ac394c80d1d57d574503ca17.tar.gz
QPID-1112: Update previous commit by re-using messageAcknowledge (added a flag specifying whether to send an messageAccept)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@662827 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java39
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/Session.java13
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java12
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java2
5 files changed, 22 insertions, 46 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index b9094db5bd..0753ee539a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -160,7 +160,7 @@ public class AMQSession_0_10 extends AMQSession
ranges.add((int) deliveryTag);
_unacknowledgedMessageTags.remove(deliveryTag);
}
- getQpidSession().messageAcknowledge(ranges);
+ getQpidSession().messageAcknowledge(ranges, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
}
/**
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index c47aee0410..ae597b1703 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -24,10 +24,12 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
+import org.apache.qpid.jms.*;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.transport.*;
+import org.apache.qpidity.transport.Session;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.filter.MessageFilter;
import org.apache.qpidity.filter.JMSSelectorFilter;
@@ -77,12 +79,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
*/
private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
- /**
- * Used for no-ack mode so to send session completion command
- */
- private int _numberReceivedMessages = 0;
- private int _firstMessageToComplete;
-
//--- constructor
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
@@ -165,25 +161,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
*/
public void onMessage(Message message)
{
- /**
- * For no-ack mode
- */
- if( _acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE )
- {
- _numberReceivedMessages++;
- if(_numberReceivedMessages == 1)
- {
- _firstMessageToComplete = message.getMessageTransferId();
- }
- if(_numberReceivedMessages >= getSession().getAMQConnection().getMaxPrefetch() )
- {
- RangeSet r = new RangeSet();
- r.add(_firstMessageToComplete, message.getMessageTransferId());
- _0_10session.getQpidSession().sessionCompleted(r, Option.TIMELY_REPLY);
- _numberReceivedMessages = 0;
- }
- }
-
int channelId = getSession().getChannelId();
long deliveryId = message.getMessageTransferId();
AMQShortString consumerTag = getConsumerTag();
@@ -383,7 +360,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
{
RangeSet ranges = new RangeSet();
ranges.add((int) message.getDeliveryTag());
- _0_10session.getQpidSession().messageAcknowledge(ranges);
+ _0_10session.getQpidSession().messageAcknowledge(ranges,
+ _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE );
_0_10session.getCurrentException();
}
}
@@ -499,4 +477,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
return o;
}
+
+ void postDeliver(AbstractJMSMessage msg) throws JMSException
+ {
+ super.postDeliver(msg);
+ if(_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
index 28218e01d6..833a26da87 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
@@ -65,16 +65,6 @@ public interface Session
public void sessionDetach(byte[] name);
- /**
- * This control is sent by the receiver of commands, and handled by the sender
- * of commands. It informs the sender of all commands completed by the receiver.
- * This excludes commands known by the receiver to be considered complete at the sender.
- *
- * @param commands completed commands.
- * @param options {@link Option#TIMELY_REPLY} If set, the sender is no longer free to delay the known-completed reply.
- */
- public void sessionCompleted(RangeSet commands, Option... options);
-
public void sessionRequestTimeout(long expiry);
public byte[] getName();
@@ -328,8 +318,9 @@ public interface Session
* pre-acquire mode or by explicitly acquiring them.
*
* @param ranges Range of messages to be acknowledged.
+ * @param accept pecify whether to send a message accept to the broker
*/
- public void messageAcknowledge(RangeSet ranges);
+ public void messageAcknowledge(RangeSet ranges, boolean accept);
/**
* Reject a range of acquired messages.
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
index 58ffffb12b..0c0341490a 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
@@ -59,14 +59,17 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen
super(name);
}
- public void messageAcknowledge(RangeSet ranges)
+ public void messageAcknowledge(RangeSet ranges, boolean accept)
{
for (Range range : ranges)
{
super.processed(range);
}
super.flushProcessed();
- messageAccept(ranges);
+ if( accept )
+ {
+ messageAccept(ranges);
+ }
}
public void messageSubscribe(String queue, String destination, short acceptMode, short acquireMode, MessagePartListener listener, Map<String, Object> filter, Option... options)
@@ -105,11 +108,6 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen
_currentDataSizeNotSynced = 0;
}
- public void sessionCompleted(RangeSet commands, Option ... options)
- {
- super.sessionCompleted(commands, options);
- }
-
/* -------------------------
* Data methods
* ------------------------*/
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java b/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
index 0a15189b48..e452091622 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
@@ -108,7 +108,7 @@ public class BasicInteropTest implements ClosedListener
System.out.println("--------/Message Received--------");
RangeSet ack = new RangeSet();
ack.add(message.getMessageTransferId(),message.getMessageTransferId());
- session.messageAcknowledge(ack);
+ session.messageAcknowledge(ack, true);
}
}),