summaryrefslogtreecommitdiff
path: root/java/common/src
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-02-19 15:09:26 +0000
committerRafael H. Schloming <rhs@apache.org>2009-02-19 15:09:26 +0000
commit0ad1331e60191b01e46194beebd93aac3df211d4 (patch)
tree304ad25858e1131831930e219144dcc70b093eea /java/common/src
parent2195a9a0986f9a52c3fce0c5765fe5cf454e31b2 (diff)
downloadqpid-python-0ad1331e60191b01e46194beebd93aac3df211d4.tar.gz
QPID-1665: add a timer to ensure message acking is never delayed more than 1000 ms by default, this is configurable by qpid.session.max_ack_delay
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@745892 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Method.java11
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java15
2 files changed, 25 insertions, 1 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Method.java b/java/common/src/main/java/org/apache/qpid/transport/Method.java
index 09cfd119be..611c742fb1 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Method.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Method.java
@@ -48,6 +48,7 @@ public abstract class Method extends Struct implements ProtocolEvent
private boolean idSet = false;
private boolean sync = false;
private boolean batch = false;
+ private boolean unreliable = false;
public final int getId()
{
@@ -90,6 +91,16 @@ public abstract class Method extends Struct implements ProtocolEvent
this.batch = value;
}
+ public final boolean isUnreliable()
+ {
+ return unreliable;
+ }
+
+ final void setUnreliable(boolean value)
+ {
+ this.unreliable = value;
+ }
+
public abstract boolean hasPayload();
public Header getHeader()
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index f94edcc655..4079097f96 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -384,7 +384,15 @@ public class Session extends SessionInvoker
{
copy = processed.copy();
}
- sessionCompleted(copy, options);
+
+ synchronized (commands)
+ {
+ if (state == DETACHED)
+ {
+ return;
+ }
+ sessionCompleted(copy, options);
+ }
}
void knownComplete(RangeSet kc)
@@ -484,6 +492,11 @@ public class Session extends SessionInvoker
synchronized (commands)
{
+ if (state == DETACHED && m.isUnreliable())
+ {
+ return;
+ }
+
if (state != OPEN && state != CLOSED)
{
Waiter w = new Waiter(commands, timeout);