summaryrefslogtreecommitdiff
path: root/java/common
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-02-19 15:09:26 +0000
committerRafael H. Schloming <rhs@apache.org>2009-02-19 15:09:26 +0000
commit0ad1331e60191b01e46194beebd93aac3df211d4 (patch)
tree304ad25858e1131831930e219144dcc70b093eea /java/common
parent2195a9a0986f9a52c3fce0c5765fe5cf454e31b2 (diff)
downloadqpid-python-0ad1331e60191b01e46194beebd93aac3df211d4.tar.gz
QPID-1665: add a timer to ensure message acking is never delayed more than 1000 ms by default, this is configurable by qpid.session.max_ack_delay
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@745892 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
-rw-r--r--java/common/Composite.tpl1
-rw-r--r--java/common/Option.tpl1
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Method.java11
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java15
4 files changed, 27 insertions, 1 deletions
diff --git a/java/common/Composite.tpl b/java/common/Composite.tpl
index 17cf846d8c..c46d0a12cc 100644
--- a/java/common/Composite.tpl
+++ b/java/common/Composite.tpl
@@ -145,6 +145,7 @@ if options or base == "Method":
if base == "Method":
out(""" case SYNC: this.setSync(true); break;
case BATCH: this.setBatch(true); break;
+ case UNRELIABLE: this.setUnreliable(true); break;
""")
out(""" case NONE: break;
default: throw new IllegalArgumentException("invalid option: " + _options[i]);
diff --git a/java/common/Option.tpl b/java/common/Option.tpl
index 776b211ad5..c22b35b999 100644
--- a/java/common/Option.tpl
+++ b/java/common/Option.tpl
@@ -37,5 +37,6 @@ for c in composites:
options[option] = None
out(" $option,\n")}
BATCH,
+ UNRELIABLE,
NONE
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Method.java b/java/common/src/main/java/org/apache/qpid/transport/Method.java
index 09cfd119be..611c742fb1 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Method.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Method.java
@@ -48,6 +48,7 @@ public abstract class Method extends Struct implements ProtocolEvent
private boolean idSet = false;
private boolean sync = false;
private boolean batch = false;
+ private boolean unreliable = false;
public final int getId()
{
@@ -90,6 +91,16 @@ public abstract class Method extends Struct implements ProtocolEvent
this.batch = value;
}
+ public final boolean isUnreliable()
+ {
+ return unreliable;
+ }
+
+ final void setUnreliable(boolean value)
+ {
+ this.unreliable = value;
+ }
+
public abstract boolean hasPayload();
public Header getHeader()
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index f94edcc655..4079097f96 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -384,7 +384,15 @@ public class Session extends SessionInvoker
{
copy = processed.copy();
}
- sessionCompleted(copy, options);
+
+ synchronized (commands)
+ {
+ if (state == DETACHED)
+ {
+ return;
+ }
+ sessionCompleted(copy, options);
+ }
}
void knownComplete(RangeSet kc)
@@ -484,6 +492,11 @@ public class Session extends SessionInvoker
synchronized (commands)
{
+ if (state == DETACHED && m.isUnreliable())
+ {
+ return;
+ }
+
if (state != OPEN && state != CLOSED)
{
Waiter w = new Waiter(commands, timeout);