diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-17 20:48:05 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-17 20:48:05 +0000 |
| commit | 500491f93c105e109f5933b5d33a12b45bc4b272 (patch) | |
| tree | 821657484821ee7aa97102d1ed643f8380d98ec1 /qpid/java/amqp-1-0-common | |
| parent | b13c1f50dc34854d0b348aed1fcd24c63e58c938 (diff) | |
| download | qpid-python-500491f93c105e109f5933b5d33a12b45bc4b272.tar.gz | |
QPID-5555 : [Java Broker] Modify implementation of Queues to provide better notions of exclusivity and lifetime
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1569109 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/amqp-1-0-common')
2 files changed, 33 insertions, 5 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java index 270abef88b..1556876681 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java @@ -563,7 +563,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour { _receivingSessions[channel] = null; - endpoint.end(end); + endpoint.receiveEnd(end); } else { diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java index c37c52c6ea..c9212b1a1e 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java @@ -153,19 +153,47 @@ public class SessionEndpoint public void end() { - end(null); + end(new End()); } - public void end(final End end) + public void end(End end) { synchronized(getLock()) { switch(_state) { case BEGIN_SENT: - _connection.sendEnd(getSendingChannel(), new End(), false); + _connection.sendEnd(getSendingChannel(), end, false); _state = SessionState.END_PIPE; break; + case ACTIVE: + detachLinks(); + short sendChannel = getSendingChannel(); + _connection.sendEnd(sendChannel, end, true); + _state = SessionState.END_SENT; + break; + default: + sendChannel = getSendingChannel(); + End reply = new End(); + Error error = new Error(); + error.setCondition(AmqpError.ILLEGAL_STATE); + error.setDescription("END called on Session which has not been opened"); + reply.setError(error); + _connection.sendEnd(sendChannel, reply, true); + break; + + + } + getLock().notifyAll(); + } + } + + public void receiveEnd(final End end) + { + synchronized(getLock()) + { + switch(_state) + { case END_SENT: _state = SessionState.ENDED; break; @@ -174,7 +202,7 @@ public class SessionEndpoint _sessionEventListener.remoteEnd(end); short sendChannel = getSendingChannel(); _connection.sendEnd(sendChannel, new End(), true); - _state = end == null ? SessionState.END_SENT : SessionState.ENDED; + _state = SessionState.ENDED; break; default: sendChannel = getSendingChannel(); |
