diff options
Diffstat (limited to 'qpid/java/common/src/main')
| -rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java | 25 | ||||
| -rw-r--r-- | qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java | 15 |
2 files changed, 34 insertions, 6 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 5ae2f1ceb2..cdca726148 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -86,15 +86,15 @@ public class Connection extends ConnectionInvoker public static interface SessionFactory { - Session newSession(Connection conn, Binary name, long expiry); + Session newSession(Connection conn, Binary name, long expiry, boolean isNoReplay); } private static final class DefaultSessionFactory implements SessionFactory { - public Session newSession(final Connection conn, final Binary name, final long expiry) + public Session newSession(final Connection conn, final Binary name, final long expiry, final boolean isNoReplay) { - return new Session(conn, name, expiry); + return new Session(conn, name, expiry, isNoReplay); } } @@ -296,7 +296,12 @@ public class Connection extends ConnectionInvoker public Session createSession(long expiry) { - return createSession(UUID.randomUUID().toString(), expiry); + return createSession(expiry, false); + } + + public Session createSession(long expiry, boolean isNoReplay) + { + return createSession(UUID.randomUUID().toString(), expiry, isNoReplay); } public Session createSession(String name) @@ -309,6 +314,11 @@ public class Connection extends ConnectionInvoker return createSession(Strings.toUTF8(name), expiry); } + public Session createSession(String name, long expiry,boolean isNoReplay) + { + return createSession(new Binary(Strings.toUTF8(name)), expiry, isNoReplay); + } + public Session createSession(byte[] name, long expiry) { return createSession(new Binary(name), expiry); @@ -316,6 +326,11 @@ public class Connection extends ConnectionInvoker public Session createSession(Binary name, long expiry) { + return createSession(name, expiry, false); + } + + public Session createSession(Binary name, long expiry, boolean isNoReplay) + { synchronized (lock) { Waiter w = new Waiter(lock, timeout); @@ -329,7 +344,7 @@ public class Connection extends ConnectionInvoker throw new ConnectionException("Timed out waiting for connection to be ready. Current state is :" + state); } - Session ssn = _sessionFactory.newSession(this, name, expiry); + Session ssn = _sessionFactory.newSession(this, name, expiry, isNoReplay); registerSession(ssn); map(ssn); ssn.attach(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index d66415c659..8b29d6e424 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -131,19 +131,31 @@ public class Session extends SessionInvoker private final Object stateLock = new Object(); private final AtomicBoolean _failoverRequired = new AtomicBoolean(false); + private boolean _isNoReplay = false; protected Session(Connection connection, Binary name, long expiry) { this(connection, new SessionDelegate(), name, expiry); } + protected Session(Connection connection, Binary name, long expiry, boolean noReplay) + { + this(connection, new SessionDelegate(), name, expiry, noReplay); + } + protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry) { + this(connection, delegate, name, expiry,false); + } + + protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry, boolean noReplay) + { this.connection = connection; this.delegate = delegate; this.name = name; this.expiry = expiry; this.closing = false; + this._isNoReplay = noReplay; initReceiver(); } @@ -281,6 +293,7 @@ public class Session extends SessionInvoker void resume() { _failoverRequired.set(false); + synchronized (commandsLock) { attach(); @@ -739,7 +752,7 @@ public class Session extends SessionInvoker sessionCommandPoint(0, 0); } - boolean replayTransfer = !closing && !transacted && + boolean replayTransfer = !_isNoReplay && !closing && !transacted && m instanceof MessageTransfer && ! m.isUnreliable(); |
