summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-06-25 10:20:31 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-06-25 10:20:31 +0000
commit59b8d464a2a3b36f0985c10c057e14b284e3bc7c (patch)
tree113e07b9b6cb40181f74ae3e3fd032ea2815471a /qpid/java/broker/src
parente280e8fe6d8b5650f3e66e308047d8036ad941f7 (diff)
downloadqpid-python-59b8d464a2a3b36f0985c10c057e14b284e3bc7c.tar.gz
QPID-4946 : [Java Broker] closing the broker may result in same message being delivered to multipl competing consumers
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1496401 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java13
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java11
9 files changed, 64 insertions, 3 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index a539743081..b933d3f961 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -52,6 +52,13 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable
public void close(final String replyText)
{
+ synchronized(this)
+ {
+ for(AMQConnectionModel conn : _registry)
+ {
+ conn.stop();
+ }
+ }
if (_logger.isDebugEnabled())
{
_logger.debug("Closing connection registry :" + _registry.size() + " connections.");
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index b52da3039d..e757898b69 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -89,4 +89,7 @@ public interface AMQConnectionModel extends StatisticsGatherer
Transport getTransport();
+ void stop();
+
+ boolean isStopped();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 92d6683415..e9b0fd9f10 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -190,6 +190,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private final Transport _transport;
private volatile boolean _closeWhenNoRoute;
+ private volatile boolean _stopped;
public AMQProtocolEngine(Broker broker,
NetworkConnection network,
@@ -1304,6 +1305,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return _transport;
}
+ @Override
+ public void stop()
+ {
+ _stopped = true;
+ }
+
+ @Override
+ public boolean isStopped()
+ {
+ return _stopped;
+ }
+
public long getLastReceivedTime()
{
return _lastReceivedTime;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index 85b5177760..bf5f34e17a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -149,6 +149,8 @@ public class Connection_1_0 implements ConnectionEventListener
}
};
+ private volatile boolean _stopped;
+
@Override
public void close(AMQConstant cause, String message) throws AMQException
{
@@ -252,6 +254,18 @@ public class Connection_1_0 implements ConnectionEventListener
}
@Override
+ public void stop()
+ {
+ _stopped = true;
+ }
+
+ @Override
+ public boolean isStopped()
+ {
+ return _stopped;
+ }
+
+ @Override
public void initialiseStatistics()
{
_messageDeliveryStatistics = new StatisticsCounter("messages-delivered-" + getConnectionId());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
index 0559f2ed94..44ee945548 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
@@ -163,7 +163,7 @@ class Subscription_1_0 implements Subscription
public boolean isSuspended()
{
- return !isActive();// || !getEndpoint().hasCreditToSend();
+ return _link.getSession().getConnectionModel().isStopped() || !isActive();// || !getEndpoint().hasCreditToSend();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index f5b2dbbfec..29a8f4e27e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -463,7 +463,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
public boolean isSuspended()
{
- return !isActive() || _channel.isSuspended() || _deleted.get();
+ return !isActive() || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped();
}
/**
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index b48082aade..2f237f0f3a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -198,7 +198,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
public boolean isSuspended()
{
- return !isActive() || _deleted.get() || _session.isClosing(); // TODO check for Session suspension
+ return !isActive() || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension
}
public boolean hasInterest(QueueEntry entry)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index b49bd89266..cc28aba981 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -74,6 +74,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
private Principal _peerPrincipal;
private NetworkConnection _networkConnection;
private Transport _transport;
+ private volatile boolean _stopped;
public ServerConnection(final long connectionId)
{
@@ -169,6 +170,18 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
return _transport;
}
+ @Override
+ public void stop()
+ {
+ _stopped = true;
+ }
+
+ @Override
+ public boolean isStopped()
+ {
+ return _stopped;
+ }
+
public void setTransport(Transport transport)
{
_transport = transport;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 0bc20836f6..7f797afeda 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -553,5 +553,16 @@ public class MockSubscription implements Subscription
{
return null;
}
+
+ @Override
+ public void stop()
+ {
+ }
+
+ @Override
+ public boolean isStopped()
+ {
+ return false;
+ }
}
}