diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2013-01-25 21:32:42 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2013-01-25 21:32:42 +0000 |
| commit | 1a3413543de501ccafde8a39884d460dea764f9e (patch) | |
| tree | e436a92a75bec3ac0d83eae063a5f672835cd385 /qpid/java/common/src | |
| parent | e7ad0e742e5bd214bb750484047fde9bf434e1eb (diff) | |
| download | qpid-python-1a3413543de501ccafde8a39884d460dea764f9e.tar.gz | |
QPID-4541 Added the ability to turn off replay at the time the session
is created. The XASessionImpl will use this feature to turn off replay
on the underlying JMS session. This prevents messages being replayed
outside the boundaries of the XA transaction.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1438725 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src')
| -rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java | 25 | ||||
| -rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java | 15 |
2 files changed, 34 insertions, 6 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 5ae2f1ceb2..cdca726148 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -86,15 +86,15 @@ public class Connection extends ConnectionInvoker public static interface SessionFactory { - Session newSession(Connection conn, Binary name, long expiry); + Session newSession(Connection conn, Binary name, long expiry, boolean isNoReplay); } private static final class DefaultSessionFactory implements SessionFactory { - public Session newSession(final Connection conn, final Binary name, final long expiry) + public Session newSession(final Connection conn, final Binary name, final long expiry, final boolean isNoReplay) { - return new Session(conn, name, expiry); + return new Session(conn, name, expiry, isNoReplay); } } @@ -296,7 +296,12 @@ public class Connection extends ConnectionInvoker public Session createSession(long expiry) { - return createSession(UUID.randomUUID().toString(), expiry); + return createSession(expiry, false); + } + + public Session createSession(long expiry, boolean isNoReplay) + { + return createSession(UUID.randomUUID().toString(), expiry, isNoReplay); } public Session createSession(String name) @@ -309,6 +314,11 @@ public class Connection extends ConnectionInvoker return createSession(Strings.toUTF8(name), expiry); } + public Session createSession(String name, long expiry,boolean isNoReplay) + { + return createSession(new Binary(Strings.toUTF8(name)), expiry, isNoReplay); + } + public Session createSession(byte[] name, long expiry) { return createSession(new Binary(name), expiry); @@ -316,6 +326,11 @@ public class Connection extends ConnectionInvoker public Session createSession(Binary name, long expiry) { + return createSession(name, expiry, false); + } + + public Session createSession(Binary name, long expiry, boolean isNoReplay) + { synchronized (lock) { Waiter w = new Waiter(lock, timeout); @@ -329,7 +344,7 @@ public class Connection extends ConnectionInvoker throw new ConnectionException("Timed out waiting for connection to be ready. Current state is :" + state); } - Session ssn = _sessionFactory.newSession(this, name, expiry); + Session ssn = _sessionFactory.newSession(this, name, expiry, isNoReplay); registerSession(ssn); map(ssn); ssn.attach(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index d66415c659..8b29d6e424 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -131,19 +131,31 @@ public class Session extends SessionInvoker private final Object stateLock = new Object(); private final AtomicBoolean _failoverRequired = new AtomicBoolean(false); + private boolean _isNoReplay = false; protected Session(Connection connection, Binary name, long expiry) { this(connection, new SessionDelegate(), name, expiry); } + protected Session(Connection connection, Binary name, long expiry, boolean noReplay) + { + this(connection, new SessionDelegate(), name, expiry, noReplay); + } + protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry) { + this(connection, delegate, name, expiry,false); + } + + protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry, boolean noReplay) + { this.connection = connection; this.delegate = delegate; this.name = name; this.expiry = expiry; this.closing = false; + this._isNoReplay = noReplay; initReceiver(); } @@ -281,6 +293,7 @@ public class Session extends SessionInvoker void resume() { _failoverRequired.set(false); + synchronized (commandsLock) { attach(); @@ -739,7 +752,7 @@ public class Session extends SessionInvoker sessionCommandPoint(0, 0); } - boolean replayTransfer = !closing && !transacted && + boolean replayTransfer = !_isNoReplay && !closing && !transacted && m instanceof MessageTransfer && ! m.isUnreliable(); |
