summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2013-04-29 16:45:19 +0000
committerRobert Gemmell <robbie@apache.org>2013-04-29 16:45:19 +0000
commitfe17c41b0b6b3e31ad575dfd2ff9a4b007650dbe (patch)
tree5b8ed6c3f9eed87a577968344e2fbfec35630db8 /java/broker
parentef5a0fae25e87b71c897f43e4a0a7ff3332b6959 (diff)
downloadqpid-python-fe17c41b0b6b3e31ad575dfd2ff9a4b007650dbe.tar.gz
QPID-4785: relax restrictions on editing/deleting active ports outwith management-mode
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1477190 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java96
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java36
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java3
4 files changed, 104 insertions, 42 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java
index 71f5397d2b..1fc22c736c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java
@@ -193,17 +193,6 @@ public class AmqpPortAdapter extends PortAdapter
return null;
}
- @Override
- protected void changeAttributes(Map<String, Object> attributes)
- {
- if (_transport != null)
- {
- throw new IllegalStateException("Port " + getAttribute(PORT)
- + " is already opened. Start broker in management mode to change a port");
- }
- super.changeAttributes(MapValueConverter.convert(attributes, ATTRIBUTE_TYPES));
- }
-
class ServerNetworkTransportConfiguration implements NetworkTransportConfiguration
{
private final InetSocketAddress _bindingSocketAddress;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index ad75aba132..3a94cf22f2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -160,7 +160,8 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat
private StatisticsAdapter _statistics;
private final Map<String, VirtualHost> _vhostAdapters = new HashMap<String, VirtualHost>();
- private final Map<Integer, Port> _portAdapters = new HashMap<Integer, Port>();
+ private final Map<UUID, Port> _portAdapters = new HashMap<UUID, Port>();
+ private final Map<Port, Integer> _stillInUsePortNumbers = new HashMap<Port, Integer>();
private final Map<UUID, AuthenticationProvider> _authenticationProviders = new HashMap<UUID, AuthenticationProvider>();
private final Map<String, GroupProvider> _groupProviders = new HashMap<String, GroupProvider>();
private final Map<UUID, ConfiguredObject> _plugins = new HashMap<UUID, ConfiguredObject>();
@@ -445,20 +446,6 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat
}
}
- private void addPort(Port port)
- {
- synchronized (_portAdapters)
- {
- int portNumber = port.getPort();
- if(_portAdapters.containsKey(portNumber))
- {
- throw new IllegalArgumentException("Cannot add port " + port + " because port number " + portNumber + " already configured");
- }
- _portAdapters.put(portNumber, port);
- }
- port.addChangeListener(this);
- }
-
/**
* Called when adding a new port via the management interface
*/
@@ -467,15 +454,49 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat
Port port = _portFactory.createPort(UUID.randomUUID(), this, attributes);
addPort(port);
- //AMQP ports are disable during ManagementMode, and the management
- //plugins can currently only start ports at broker startup and
- //not when they are newly created via the management interfaces.
- boolean quiesce = isManagementMode() || !(port instanceof AmqpPortAdapter);
+ //1. AMQP ports are disabled during ManagementMode.
+ //2. The management plugins can currently only start ports at broker startup and
+ // not when they are newly created via the management interfaces.
+ //3. When active ports are deleted, or their port numbers updated, the broker must be
+ // restarted for it to take effect so we can't reuse port numbers until it is.
+ boolean quiesce = isManagementMode() || !(port instanceof AmqpPortAdapter) || isPreviouslyUsedPortNumber(port);
+
port.setDesiredState(State.INITIALISING, quiesce ? State.QUIESCED : State.ACTIVE);
return port;
}
+ private void addPort(Port port)
+ {
+ synchronized (_portAdapters)
+ {
+ int portNumber = port.getPort();
+ String portName = port.getName();
+ UUID portId = port.getId();
+
+ for(Port p : _portAdapters.values())
+ {
+ if(portNumber == p.getPort())
+ {
+ throw new IllegalConfigurationException("Can't add port " + portName + " because port number " + portNumber + " is already configured for port " + p.getName());
+ }
+
+ if(portName == p.getName())
+ {
+ throw new IllegalConfigurationException("Can't add Port because one with name " + portName + " already exists");
+ }
+
+ if(portId == p.getId())
+ {
+ throw new IllegalConfigurationException("Can't add Port because one with id " + portId + " already exists");
+ }
+ }
+
+ _portAdapters.put(port.getId(), port);
+ }
+ port.addChangeListener(this);
+ }
+
private AccessControlProvider createAccessControlProvider(Map<String, Object> attributes)
{
AccessControlProvider accessControlProvider = null;
@@ -771,17 +792,24 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat
return super.getAttribute(name);
}
- private boolean deletePort(Port portAdapter)
+ private boolean deletePort(State oldState, Port portAdapter)
{
Port removedPort = null;
synchronized (_portAdapters)
{
- removedPort = _portAdapters.remove(portAdapter.getPort());
+ removedPort = _portAdapters.remove(portAdapter.getId());
}
if (removedPort != null)
{
removedPort.removeChangeListener(this);
+
+ if(oldState == State.ACTIVE)
+ {
+ //Record the originally used port numbers of previously-active ports being deleted, to ensure
+ //when creating new ports we don't try to re-bind a port number that we are currently still using
+ recordPreviouslyUsedPortNumberIfNecessary(removedPort, removedPort.getPort());
+ }
}
return removedPort != null;
@@ -907,7 +935,7 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat
}
else if(object instanceof Port)
{
- childDeleted = deletePort((Port)object);
+ childDeleted = deletePort(oldState, (Port)object);
}
else if(object instanceof VirtualHost)
{
@@ -948,7 +976,15 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat
@Override
public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue)
{
- // no-op
+ if(object instanceof Port)
+ {
+ //Record all the originally used port numbers of active ports, to ensure that when
+ //creating new ports we don't try to re-bind a port number that we are still using
+ if(attributeName == Port.PORT && object.getActualState() == State.ACTIVE)
+ {
+ recordPreviouslyUsedPortNumberIfNecessary((Port) object, (Integer)oldAttributeValue);
+ }
+ }
}
private void addPlugin(ConfiguredObject plugin)
@@ -1193,4 +1229,18 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat
return new ArrayList<AccessControlProvider>(_accessControlProviders.values());
}
}
+
+ private void recordPreviouslyUsedPortNumberIfNecessary(Port port, Integer portNumber)
+ {
+ //If we haven't previously recorded its original port number, record it now
+ if(!_stillInUsePortNumbers.containsKey(port))
+ {
+ _stillInUsePortNumbers.put(port, portNumber);
+ }
+ }
+
+ private boolean isPreviouslyUsedPortNumber(Port port)
+ {
+ return _stillInUsePortNumbers.containsValue(port.getPort());
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java
index af6b1421a7..388427678e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java
@@ -309,7 +309,7 @@ public class PortAdapter extends AbstractAdapter implements Port
State state = _state.get();
if (desiredState == State.DELETED)
{
- if (state == State.STOPPED || state == State.QUIESCED)
+ if (state == State.INITIALISING || state == State.ACTIVE || state == State.STOPPED || state == State.QUIESCED)
{
return _state.compareAndSet(state, State.DELETED);
}
@@ -322,7 +322,15 @@ public class PortAdapter extends AbstractAdapter implements Port
{
if ((state == State.INITIALISING || state == State.QUIESCED) && _state.compareAndSet(state, State.ACTIVE))
{
- onActivate();
+ try
+ {
+ onActivate();
+ }
+ catch(RuntimeException e)
+ {
+ _state.compareAndSet(State.ACTIVE, state);
+ throw e;
+ }
return true;
}
else
@@ -371,15 +379,27 @@ public class PortAdapter extends AbstractAdapter implements Port
@Override
protected void changeAttributes(Map<String, Object> attributes)
{
- if (getActualState() == State.ACTIVE && !_broker.isManagementMode())
+ Map<String, Object> converted = MapValueConverter.convert(attributes, ATTRIBUTE_TYPES);
+
+ Map<String, Object> merged = generateEffectiveAttributes(converted);
+
+ String newName = (String) merged.get(NAME);
+ if(!getName().equals(newName))
{
- throw new IllegalStateException("Cannot change attributes for an active port outside of Management Mode");
+ throw new IllegalConfigurationException("Changing the port name is not allowed");
}
- Map<String, Object> converted = MapValueConverter.convert(attributes, ATTRIBUTE_TYPES);
- Map<String, Object> merged = new HashMap<String, Object>(getDefaultAttributes());
- merged.putAll(getActualAttributes());
- merged.putAll(converted);
+ Integer newPort = (Integer) merged.get(PORT);
+ if(getPort() != newPort)
+ {
+ for(Port p : _broker.getPorts())
+ {
+ if(p.getPort() == newPort)
+ {
+ throw new IllegalConfigurationException("Port number " + newPort + " is already in use by port " + p.getName());
+ }
+ }
+ }
@SuppressWarnings("unchecked")
Collection<Transport> transports = (Collection<Transport>)merged.get(TRANSPORTS);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java
index 64b432f471..758eb62809 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java
@@ -47,6 +47,7 @@ import org.apache.qpid.server.model.KeyStore;
import org.apache.qpid.server.model.Plugin;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.TrustStore;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.adapter.AccessControlProviderFactory;
import org.apache.qpid.server.model.adapter.AuthenticationProviderFactory;
@@ -196,11 +197,13 @@ public class BrokerRecovererTest extends TestCase
//Add a couple ports
ConfigurationEntry portEntry1 = mock(ConfigurationEntry.class);
Port port1 = mock(Port.class);
+ when(port1.getId()).thenReturn(UUIDGenerator.generateRandomUUID());
when(port1.getName()).thenReturn("port1");
when(port1.getPort()).thenReturn(5671);
when(port1.getAttribute(Port.AUTHENTICATION_PROVIDER)).thenReturn("authenticationProvider1");
ConfigurationEntry portEntry2 = mock(ConfigurationEntry.class);
Port port2 = mock(Port.class);
+ when(port2.getId()).thenReturn(UUIDGenerator.generateRandomUUID());
when(port2.getName()).thenReturn("port2");
when(port2.getPort()).thenReturn(5672);
when(port2.getAttribute(Port.AUTHENTICATION_PROVIDER)).thenReturn("authenticationProvider2");