summaryrefslogtreecommitdiff
path: root/qpid/java/common/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/common/src/main')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java25
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java15
2 files changed, 34 insertions, 6 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 5ae2f1ceb2..cdca726148 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -86,15 +86,15 @@ public class Connection extends ConnectionInvoker
public static interface SessionFactory
{
- Session newSession(Connection conn, Binary name, long expiry);
+ Session newSession(Connection conn, Binary name, long expiry, boolean isNoReplay);
}
private static final class DefaultSessionFactory implements SessionFactory
{
- public Session newSession(final Connection conn, final Binary name, final long expiry)
+ public Session newSession(final Connection conn, final Binary name, final long expiry, final boolean isNoReplay)
{
- return new Session(conn, name, expiry);
+ return new Session(conn, name, expiry, isNoReplay);
}
}
@@ -296,7 +296,12 @@ public class Connection extends ConnectionInvoker
public Session createSession(long expiry)
{
- return createSession(UUID.randomUUID().toString(), expiry);
+ return createSession(expiry, false);
+ }
+
+ public Session createSession(long expiry, boolean isNoReplay)
+ {
+ return createSession(UUID.randomUUID().toString(), expiry, isNoReplay);
}
public Session createSession(String name)
@@ -309,6 +314,11 @@ public class Connection extends ConnectionInvoker
return createSession(Strings.toUTF8(name), expiry);
}
+ public Session createSession(String name, long expiry,boolean isNoReplay)
+ {
+ return createSession(new Binary(Strings.toUTF8(name)), expiry, isNoReplay);
+ }
+
public Session createSession(byte[] name, long expiry)
{
return createSession(new Binary(name), expiry);
@@ -316,6 +326,11 @@ public class Connection extends ConnectionInvoker
public Session createSession(Binary name, long expiry)
{
+ return createSession(name, expiry, false);
+ }
+
+ public Session createSession(Binary name, long expiry, boolean isNoReplay)
+ {
synchronized (lock)
{
Waiter w = new Waiter(lock, timeout);
@@ -329,7 +344,7 @@ public class Connection extends ConnectionInvoker
throw new ConnectionException("Timed out waiting for connection to be ready. Current state is :" + state);
}
- Session ssn = _sessionFactory.newSession(this, name, expiry);
+ Session ssn = _sessionFactory.newSession(this, name, expiry, isNoReplay);
registerSession(ssn);
map(ssn);
ssn.attach();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index d66415c659..8b29d6e424 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -131,19 +131,31 @@ public class Session extends SessionInvoker
private final Object stateLock = new Object();
private final AtomicBoolean _failoverRequired = new AtomicBoolean(false);
+ private boolean _isNoReplay = false;
protected Session(Connection connection, Binary name, long expiry)
{
this(connection, new SessionDelegate(), name, expiry);
}
+ protected Session(Connection connection, Binary name, long expiry, boolean noReplay)
+ {
+ this(connection, new SessionDelegate(), name, expiry, noReplay);
+ }
+
protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
+ this(connection, delegate, name, expiry,false);
+ }
+
+ protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry, boolean noReplay)
+ {
this.connection = connection;
this.delegate = delegate;
this.name = name;
this.expiry = expiry;
this.closing = false;
+ this._isNoReplay = noReplay;
initReceiver();
}
@@ -281,6 +293,7 @@ public class Session extends SessionInvoker
void resume()
{
_failoverRequired.set(false);
+
synchronized (commandsLock)
{
attach();
@@ -739,7 +752,7 @@ public class Session extends SessionInvoker
sessionCommandPoint(0, 0);
}
- boolean replayTransfer = !closing && !transacted &&
+ boolean replayTransfer = !_isNoReplay && !closing && !transacted &&
m instanceof MessageTransfer &&
! m.isUnreliable();