summaryrefslogtreecommitdiff
path: root/qpid/java/common/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2013-01-25 21:32:42 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2013-01-25 21:32:42 +0000
commit1a3413543de501ccafde8a39884d460dea764f9e (patch)
treee436a92a75bec3ac0d83eae063a5f672835cd385 /qpid/java/common/src
parente7ad0e742e5bd214bb750484047fde9bf434e1eb (diff)
downloadqpid-python-1a3413543de501ccafde8a39884d460dea764f9e.tar.gz
QPID-4541 Added the ability to turn off replay at the time the session
is created. The XASessionImpl will use this feature to turn off replay on the underlying JMS session. This prevents messages being replayed outside the boundaries of the XA transaction. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1438725 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java25
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java15
2 files changed, 34 insertions, 6 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 5ae2f1ceb2..cdca726148 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -86,15 +86,15 @@ public class Connection extends ConnectionInvoker
public static interface SessionFactory
{
- Session newSession(Connection conn, Binary name, long expiry);
+ Session newSession(Connection conn, Binary name, long expiry, boolean isNoReplay);
}
private static final class DefaultSessionFactory implements SessionFactory
{
- public Session newSession(final Connection conn, final Binary name, final long expiry)
+ public Session newSession(final Connection conn, final Binary name, final long expiry, final boolean isNoReplay)
{
- return new Session(conn, name, expiry);
+ return new Session(conn, name, expiry, isNoReplay);
}
}
@@ -296,7 +296,12 @@ public class Connection extends ConnectionInvoker
public Session createSession(long expiry)
{
- return createSession(UUID.randomUUID().toString(), expiry);
+ return createSession(expiry, false);
+ }
+
+ public Session createSession(long expiry, boolean isNoReplay)
+ {
+ return createSession(UUID.randomUUID().toString(), expiry, isNoReplay);
}
public Session createSession(String name)
@@ -309,6 +314,11 @@ public class Connection extends ConnectionInvoker
return createSession(Strings.toUTF8(name), expiry);
}
+ public Session createSession(String name, long expiry,boolean isNoReplay)
+ {
+ return createSession(new Binary(Strings.toUTF8(name)), expiry, isNoReplay);
+ }
+
public Session createSession(byte[] name, long expiry)
{
return createSession(new Binary(name), expiry);
@@ -316,6 +326,11 @@ public class Connection extends ConnectionInvoker
public Session createSession(Binary name, long expiry)
{
+ return createSession(name, expiry, false);
+ }
+
+ public Session createSession(Binary name, long expiry, boolean isNoReplay)
+ {
synchronized (lock)
{
Waiter w = new Waiter(lock, timeout);
@@ -329,7 +344,7 @@ public class Connection extends ConnectionInvoker
throw new ConnectionException("Timed out waiting for connection to be ready. Current state is :" + state);
}
- Session ssn = _sessionFactory.newSession(this, name, expiry);
+ Session ssn = _sessionFactory.newSession(this, name, expiry, isNoReplay);
registerSession(ssn);
map(ssn);
ssn.attach();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index d66415c659..8b29d6e424 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -131,19 +131,31 @@ public class Session extends SessionInvoker
private final Object stateLock = new Object();
private final AtomicBoolean _failoverRequired = new AtomicBoolean(false);
+ private boolean _isNoReplay = false;
protected Session(Connection connection, Binary name, long expiry)
{
this(connection, new SessionDelegate(), name, expiry);
}
+ protected Session(Connection connection, Binary name, long expiry, boolean noReplay)
+ {
+ this(connection, new SessionDelegate(), name, expiry, noReplay);
+ }
+
protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
+ this(connection, delegate, name, expiry,false);
+ }
+
+ protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry, boolean noReplay)
+ {
this.connection = connection;
this.delegate = delegate;
this.name = name;
this.expiry = expiry;
this.closing = false;
+ this._isNoReplay = noReplay;
initReceiver();
}
@@ -281,6 +293,7 @@ public class Session extends SessionInvoker
void resume()
{
_failoverRequired.set(false);
+
synchronized (commandsLock)
{
attach();
@@ -739,7 +752,7 @@ public class Session extends SessionInvoker
sessionCommandPoint(0, 0);
}
- boolean replayTransfer = !closing && !transacted &&
+ boolean replayTransfer = !_isNoReplay && !closing && !transacted &&
m instanceof MessageTransfer &&
! m.isUnreliable();