summaryrefslogtreecommitdiff
path: root/qpid/java/amqp-1-0-common
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-17 20:48:05 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-17 20:48:05 +0000
commit500491f93c105e109f5933b5d33a12b45bc4b272 (patch)
tree821657484821ee7aa97102d1ed643f8380d98ec1 /qpid/java/amqp-1-0-common
parentb13c1f50dc34854d0b348aed1fcd24c63e58c938 (diff)
downloadqpid-python-500491f93c105e109f5933b5d33a12b45bc4b272.tar.gz
QPID-5555 : [Java Broker] Modify implementation of Queues to provide better notions of exclusivity and lifetime
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1569109 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/amqp-1-0-common')
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java2
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java36
2 files changed, 33 insertions, 5 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
index 270abef88b..1556876681 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
@@ -563,7 +563,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
{
_receivingSessions[channel] = null;
- endpoint.end(end);
+ endpoint.receiveEnd(end);
}
else
{
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
index c37c52c6ea..c9212b1a1e 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
@@ -153,19 +153,47 @@ public class SessionEndpoint
public void end()
{
- end(null);
+ end(new End());
}
- public void end(final End end)
+ public void end(End end)
{
synchronized(getLock())
{
switch(_state)
{
case BEGIN_SENT:
- _connection.sendEnd(getSendingChannel(), new End(), false);
+ _connection.sendEnd(getSendingChannel(), end, false);
_state = SessionState.END_PIPE;
break;
+ case ACTIVE:
+ detachLinks();
+ short sendChannel = getSendingChannel();
+ _connection.sendEnd(sendChannel, end, true);
+ _state = SessionState.END_SENT;
+ break;
+ default:
+ sendChannel = getSendingChannel();
+ End reply = new End();
+ Error error = new Error();
+ error.setCondition(AmqpError.ILLEGAL_STATE);
+ error.setDescription("END called on Session which has not been opened");
+ reply.setError(error);
+ _connection.sendEnd(sendChannel, reply, true);
+ break;
+
+
+ }
+ getLock().notifyAll();
+ }
+ }
+
+ public void receiveEnd(final End end)
+ {
+ synchronized(getLock())
+ {
+ switch(_state)
+ {
case END_SENT:
_state = SessionState.ENDED;
break;
@@ -174,7 +202,7 @@ public class SessionEndpoint
_sessionEventListener.remoteEnd(end);
short sendChannel = getSendingChannel();
_connection.sendEnd(sendChannel, new End(), true);
- _state = end == null ? SessionState.END_SENT : SessionState.ENDED;
+ _state = SessionState.ENDED;
break;
default:
sendChannel = getSendingChannel();