diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-02-21 19:38:11 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-02-21 19:38:11 +0000 |
| commit | 13388a4edc23b812a9a14c7a8d71568acf6f9787 (patch) | |
| tree | 07fbfbd94c17dd7950a3b15cbdd03c8c887092d0 /qpid/java | |
| parent | 244a77488e83190e190fd8733613d1bde8227e7d (diff) | |
| download | qpid-python-13388a4edc23b812a9a14c7a8d71568acf6f9787.tar.gz | |
QPID-3595 : Python Alternate Exchange tests fail against the Java Broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1291964 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
5 files changed, 41 insertions, 12 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index a957ef84bf..28d8cb2ec7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -231,23 +231,26 @@ public class ServerConnectionDelegate extends ServerDelegate @Override public void sessionDetach(Connection conn, SessionDetach dtc) { - // To ensure a clean detach, we unregister any remaining subscriptions. Unregister ensures - // that any in-progress delivery (SubFlushRunner/QueueRunner) is completed before the unregister + // To ensure a clean detach, we stop any remaining subscriptions. Stop ensures + // that any in-progress delivery (SubFlushRunner/QueueRunner) is completed before the stop // completes. - unregisterAllSubscriptions(conn, dtc); + stopAllSubscriptions(conn, dtc); + Session ssn = conn.getSession(dtc.getChannel()); + ((ServerSession)ssn).setClose(true); super.sessionDetach(conn, dtc); } - private void unregisterAllSubscriptions(Connection conn, SessionDetach dtc) + private void stopAllSubscriptions(Connection conn, SessionDetach dtc) { final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel()); final Collection<Subscription_0_10> subs = ssn.getSubscriptions(); for (Subscription_0_10 subscription_0_10 : subs) { - ssn.unregister(subscription_0_10); + subscription_0_10.stop(); } } + @Override public void sessionAttach(final Connection conn, final SessionAttach atc) { @@ -305,4 +308,4 @@ public class ServerConnectionDelegate extends ServerDelegate { return _clientProperties == null ? null : (String) _clientProperties.get(ConnectionStartProperties.VERSION_0_10); } -}
\ No newline at end of file +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index e4268ed2dc..c32f642eea 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -704,7 +704,7 @@ public class ServerSession extends Session { if(_blockingQueues.remove(queue) && _blockingQueues.isEmpty()) { - if(_blocking.compareAndSet(true,false)) + if(_blocking.compareAndSet(true,false) && !isClosing()) { _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); @@ -759,6 +759,16 @@ public class ServerSession extends Session } } + void stopSubscriptions() + { + final Collection<Subscription_0_10> subscriptions = getSubscriptions(); + for (Subscription_0_10 subscription_0_10 : subscriptions) + { + subscription_0_10.stop(); + } + } + + public void receivedComplete() { final Collection<Subscription_0_10> subscriptions = getSubscriptions(); @@ -900,6 +910,12 @@ public class ServerSession extends Session } } + + protected void setClose(boolean close) + { + super.setClose(close); + } + @Override public int compareTo(AMQSessionModel session) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index f16c790332..ce4153a8ff 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -533,7 +533,18 @@ public class ServerSessionDelegate extends SessionDelegate { if(!exchange.getTypeShortString().toString().equals(method.getType())) { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: " + exchangeName + " of type " + exchange.getTypeShortString() + " to " + method.getType() +"."); + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, + "Attempt to redeclare exchange: " + exchangeName + + " of type " + exchange.getTypeShortString() + + " to " + method.getType() +"."); + } + else if(method.hasAlternateExchange() + && !(method.getAlternateExchange().equals(exchange.getAlternateExchange().getName()))) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, + "Attempt to change alternate exchange of: " + exchangeName + + " from " + exchange.getAlternateExchange() + + " to " + method.getAlternateExchange() +"."); } } @@ -1302,8 +1313,9 @@ public class ServerSessionDelegate extends SessionDelegate ServerSession serverSession = (ServerSession)session; - serverSession.unregisterSubscriptions(); + serverSession.stopSubscriptions(); serverSession.onClose(); + serverSession.unregisterSubscriptions(); } @Override diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index d450746eaa..0cdffeee80 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -161,7 +161,7 @@ public class Session extends SessionInvoker this.expiry = expiry; } - void setClose(boolean close) + protected void setClose(boolean close) { this.closing = close; } diff --git a/qpid/java/test-profiles/python_tests/Java010PythonExcludes b/qpid/java/test-profiles/python_tests/Java010PythonExcludes index 37ae90395c..11ddc1cda9 100644 --- a/qpid/java/test-profiles/python_tests/Java010PythonExcludes +++ b/qpid/java/test-profiles/python_tests/Java010PythonExcludes @@ -64,8 +64,6 @@ qpid_tests.broker_0_10.message.MessageTests.test_window_flow_bytes qpid_tests.broker_0_10.message.MessageTests.test_no_local_awkward #QPID-3595 Alternate Exchanges support requires work to be spec compliant. -qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_modify_existing_exchange_alternate -qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_queue_autodelete qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_queue_delete_no_match qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_reject_no_match qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_add_alternate_to_exchange |
