summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-02-19 15:09:26 +0000
committerRafael H. Schloming <rhs@apache.org>2009-02-19 15:09:26 +0000
commit0ad1331e60191b01e46194beebd93aac3df211d4 (patch)
tree304ad25858e1131831930e219144dcc70b093eea /java/client/src
parent2195a9a0986f9a52c3fce0c5765fe5cf454e31b2 (diff)
downloadqpid-python-0ad1331e60191b01e46194beebd93aac3df211d4.tar.gz
QPID-1665: add a timer to ensure message acking is never delayed more than 1000 ms by default, this is configurable by qpid.session.max_ack_delay
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@745892 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java60
1 files changed, 45 insertions, 15 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 45b74f317e..8ab8110dd4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -52,9 +52,12 @@ import static org.apache.qpid.transport.Option.*;
import javax.jms.*;
import javax.jms.IllegalStateException;
+import java.util.Date;
import java.util.HashMap;
import java.util.UUID;
import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
/**
* This is a 0.10 Session
@@ -68,6 +71,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
*/
private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class);
+ private static Timer timer = new Timer("ack-flusher", true);
+
/**
* The underlying QpidSession
@@ -83,6 +88,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
// a ref on the qpid connection
protected org.apache.qpid.transport.Connection _qpidConnection;
+ private TimerTask flushTask = new TimerTask()
+ {
+ public void run()
+ {
+ try
+ {
+ flushAcknowledgments();
+ }
+ catch (Throwable t)
+ {
+ _logger.error("error flushing acks", t);
+ }
+ }
+ };
private RangeSet unacked = new RangeSet();
private int unackedCount = 0;
@@ -119,6 +138,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
_qpidSession.txSelect();
}
+ timer.schedule(flushTask, new Date(), Long.getLong("qpid.session.max_ack_delay", 1000));
}
/**
@@ -142,14 +162,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
private void addUnacked(int id)
{
- unacked.add(id);
- unackedCount++;
+ synchronized (unacked)
+ {
+ unacked.add(id);
+ unackedCount++;
+ }
}
private void clearUnacked()
{
- unacked.clear();
- unackedCount = 0;
+ synchronized (unacked)
+ {
+ unacked.clear();
+ unackedCount = 0;
+ }
}
//------- overwritten methods of class AMQSession
@@ -196,19 +222,22 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
long prefetch = getAMQConnection().getMaxPrefetch();
- if (unackedCount >= prefetch/2 || _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
+ if (unackedCount >= prefetch/2)
{
flushAcknowledgments();
- }
+ }
}
void flushAcknowledgments()
{
- if (unackedCount > 0)
+ synchronized (unacked)
{
- messageAcknowledge
- (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
- clearUnacked();
+ if (unackedCount > 0)
+ {
+ messageAcknowledge
+ (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+ clearUnacked();
+ }
}
}
@@ -222,7 +251,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
ssn.flushProcessed(accept ? BATCH : NONE);
if (accept)
{
- ssn.messageAccept(ranges);
+ ssn.messageAccept(ranges, UNRELIABLE);
}
}
@@ -267,6 +296,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
*/
public void sendClose(long timeout) throws AMQException, FailoverException
{
+ flushTask.cancel();
flushAcknowledgments();
getQpidSession().sync();
getQpidSession().close();
@@ -692,7 +722,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
String binddingKey = "";
for(AMQShortString key : amqd.getBindingKeys())
{
- binddingKey = binddingKey + "_" + key.toString();
+ binddingKey = binddingKey + "_" + key.toString();
}
amqd.setQueueName(new AMQShortString( binddingKey + "@"
+ amqd.getExchangeName().toString() + "_" + UUID.randomUUID()));
@@ -722,7 +752,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
-
+
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
@@ -800,14 +830,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
/**
* Store non committed messages for this session
* With 0.10 messages are consumed with window mode, we must send a completion
- * before the window size is reached so credits don't dry up.
+ * before the window size is reached so credits don't dry up.
* @param id
*/
@Override protected void addDeliveredMessage(long id)
{
_txRangeSet.add((int) id);
_txSize++;
- // this is a heuristic, we may want to have that configurable
+ // this is a heuristic, we may want to have that configurable
if (_connection.getMaxPrefetch() == 1 ||
_connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0)
{