diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-06-25 10:20:31 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-06-25 10:20:31 +0000 |
| commit | 59b8d464a2a3b36f0985c10c057e14b284e3bc7c (patch) | |
| tree | 113e07b9b6cb40181f74ae3e3fd032ea2815471a /qpid/java | |
| parent | e280e8fe6d8b5650f3e66e308047d8036ad941f7 (diff) | |
| download | qpid-python-59b8d464a2a3b36f0985c10c057e14b284e3bc7c.tar.gz | |
QPID-4946 : [Java Broker] closing the broker may result in same message being delivered to multipl competing consumers
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1496401 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
10 files changed, 137 insertions, 4 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index a539743081..b933d3f961 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -52,6 +52,13 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable public void close(final String replyText) { + synchronized(this) + { + for(AMQConnectionModel conn : _registry) + { + conn.stop(); + } + } if (_logger.isDebugEnabled()) { _logger.debug("Closing connection registry :" + _registry.size() + " connections."); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index b52da3039d..e757898b69 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -89,4 +89,7 @@ public interface AMQConnectionModel extends StatisticsGatherer Transport getTransport(); + void stop(); + + boolean isStopped(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 92d6683415..e9b0fd9f10 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -190,6 +190,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final Transport _transport; private volatile boolean _closeWhenNoRoute; + private volatile boolean _stopped; public AMQProtocolEngine(Broker broker, NetworkConnection network, @@ -1304,6 +1305,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _transport; } + @Override + public void stop() + { + _stopped = true; + } + + @Override + public boolean isStopped() + { + return _stopped; + } + public long getLastReceivedTime() { return _lastReceivedTime; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 85b5177760..bf5f34e17a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -149,6 +149,8 @@ public class Connection_1_0 implements ConnectionEventListener } }; + private volatile boolean _stopped; + @Override public void close(AMQConstant cause, String message) throws AMQException { @@ -252,6 +254,18 @@ public class Connection_1_0 implements ConnectionEventListener } @Override + public void stop() + { + _stopped = true; + } + + @Override + public boolean isStopped() + { + return _stopped; + } + + @Override public void initialiseStatistics() { _messageDeliveryStatistics = new StatisticsCounter("messages-delivered-" + getConnectionId()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java index 0559f2ed94..44ee945548 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java @@ -163,7 +163,7 @@ class Subscription_1_0 implements Subscription public boolean isSuspended() { - return !isActive();// || !getEndpoint().hasCreditToSend(); + return _link.getSession().getConnectionModel().isStopped() || !isActive();// || !getEndpoint().hasCreditToSend(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index f5b2dbbfec..29a8f4e27e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -463,7 +463,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public boolean isSuspended() { - return !isActive() || _channel.isSuspended() || _deleted.get(); + return !isActive() || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped(); } /** diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index b48082aade..2f237f0f3a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -198,7 +198,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr public boolean isSuspended() { - return !isActive() || _deleted.get() || _session.isClosing(); // TODO check for Session suspension + return !isActive() || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension } public boolean hasInterest(QueueEntry entry) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index b49bd89266..cc28aba981 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -74,6 +74,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, private Principal _peerPrincipal; private NetworkConnection _networkConnection; private Transport _transport; + private volatile boolean _stopped; public ServerConnection(final long connectionId) { @@ -169,6 +170,18 @@ public class ServerConnection extends Connection implements AMQConnectionModel, return _transport; } + @Override + public void stop() + { + _stopped = true; + } + + @Override + public boolean isStopped() + { + return _stopped; + } + public void setTransport(Transport transport) { _transport = transport; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index 0bc20836f6..7f797afeda 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -553,5 +553,16 @@ public class MockSubscription implements Subscription { return null; } + + @Override + public void stop() + { + } + + @Override + public boolean isStopped() + { + return false; + } } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java index 2cd7520ae4..4a92728d82 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java @@ -23,6 +23,11 @@ package org.apache.qpid.test.unit.client.connection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.naming.NamingException; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.client.AMQConnection; @@ -45,6 +50,7 @@ public class BrokerClosesClientConnectionTest extends QpidBrokerTestCase private Connection _connection; private boolean _isExternalBroker; private final RecordingExceptionListener _recordingExceptionListener = new RecordingExceptionListener(); + private Session _session; @Override protected void setUp() throws Exception @@ -52,7 +58,7 @@ public class BrokerClosesClientConnectionTest extends QpidBrokerTestCase super.setUp(); _connection = getConnection(); - _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); _connection.setExceptionListener(_recordingExceptionListener); _isExternalBroker = isExternalBroker(); @@ -140,4 +146,70 @@ public class BrokerClosesClientConnectionTest extends QpidBrokerTestCase return _exception; } } + + + private class Listener implements MessageListener + { + int _messageCount; + + @Override + public synchronized void onMessage(Message message) + { + _messageCount++; + } + + public synchronized int getCount() + { + return _messageCount; + } + } + + public void testNoDeliveryAfterBrokerClose() throws JMSException, NamingException, InterruptedException + { + + Listener listener = new Listener(); + + Session session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer1 = session.createConsumer(getTestQueue()); + consumer1.setMessageListener(listener); + + MessageProducer producer = _session.createProducer(getTestQueue()); + producer.send(_session.createTextMessage("test message")); + + _connection.start(); + + + synchronized (listener) + { + long currentTime = System.currentTimeMillis(); + long until = currentTime + 2000l; + while(listener.getCount() == 0 && currentTime < until) + { + listener.wait(until - currentTime); + currentTime = System.currentTimeMillis(); + } + } + assertEquals(1, listener.getCount()); + + Connection connection2 = getConnection(); + Session session2 = connection2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer2 = session2.createConsumer(getTestQueue()); + consumer2.setMessageListener(listener); + connection2.start(); + + + Connection connection3 = getConnection(); + Session session3 = connection3.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer3 = session3.createConsumer(getTestQueue()); + consumer3.setMessageListener(listener); + connection3.start(); + + assertEquals(1, listener.getCount()); + + stopBroker(); + + assertEquals(1, listener.getCount()); + + + } } |
