diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-10-17 19:41:37 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-10-17 19:41:37 +0000 |
| commit | 9006d5e2237751a43b8d531980e9b1499acb96d6 (patch) | |
| tree | cebe936d36a7a7d41112dc475e70daccb2c5a250 | |
| parent | 7d2471990e0271d4714cf493643be67165d03834 (diff) | |
| download | qpid-python-9006d5e2237751a43b8d531980e9b1499acb96d6.tar.gz | |
QPID-6165 : [Java Broker] Allow the number of open connections to be limited on a per port basis
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1632655 13f79535-47bb-0310-9956-ffa450edef68
14 files changed, 540 insertions, 41 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java new file mode 100644 index 0000000000..98708270ab --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/PortMessages.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging.messages; + +import static org.apache.qpid.server.logging.AbstractMessageLogger.DEFAULT_LOG_HIERARCHY_PREFIX; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.logging.LogMessage; + +import java.text.MessageFormat; +import java.util.Locale; +import java.util.ResourceBundle; + +/** + * DO NOT EDIT DIRECTLY, THIS FILE WAS GENERATED. + * + * Generated using GenerateLogMessages and LogMessages.vm + * This file is based on the content of Port_logmessages.properties + * + * To regenerate, edit the templates/properties and run the build with -Dgenerate=true + */ +public class PortMessages +{ + private static ResourceBundle _messages; + private static Locale _currentLocale = BrokerProperties.getLocale(); + + public static final String PORT_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port"; + public static final String OPEN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.open"; + public static final String CREATE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.create"; + public static final String CLOSE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.close"; + public static final String CONNECTION_REJECTED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.connection_rejected"; + public static final String CONNECTION_COUNT_WARN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "port.connection_count_warn"; + + static + { + Logger.getLogger(PORT_LOG_HIERARCHY); + Logger.getLogger(OPEN_LOG_HIERARCHY); + Logger.getLogger(CREATE_LOG_HIERARCHY); + Logger.getLogger(CLOSE_LOG_HIERARCHY); + Logger.getLogger(CONNECTION_REJECTED_LOG_HIERARCHY); + Logger.getLogger(CONNECTION_COUNT_WARN_LOG_HIERARCHY); + + _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Port_logmessages", _currentLocale); + } + + /** + * Log a Port message of the Format: + * <pre>PRT-1002 : Open</pre> + * Optional values are contained in [square brackets] and are numbered + * sequentially in the method call. + * + */ + public static LogMessage OPEN() + { + String rawMessage = _messages.getString("OPEN"); + + final String message = rawMessage; + + return new LogMessage() + { + public String toString() + { + return message; + } + + public String getLogHierarchy() + { + return OPEN_LOG_HIERARCHY; + } + }; + } + + /** + * Log a Port message of the Format: + * <pre>PRT-1001 : Create</pre> + * Optional values are contained in [square brackets] and are numbered + * sequentially in the method call. + * + */ + public static LogMessage CREATE() + { + String rawMessage = _messages.getString("CREATE"); + + final String message = rawMessage; + + return new LogMessage() + { + public String toString() + { + return message; + } + + public String getLogHierarchy() + { + return CREATE_LOG_HIERARCHY; + } + }; + } + + /** + * Log a Port message of the Format: + * <pre>PRT-1003 : Close</pre> + * Optional values are contained in [square brackets] and are numbered + * sequentially in the method call. + * + */ + public static LogMessage CLOSE() + { + String rawMessage = _messages.getString("CLOSE"); + + final String message = rawMessage; + + return new LogMessage() + { + public String toString() + { + return message; + } + + public String getLogHierarchy() + { + return CLOSE_LOG_HIERARCHY; + } + }; + } + + /** + * Log a Port message of the Format: + * <pre>PRT-1005 : Connection from {0} reject as connection limit reached</pre> + * Optional values are contained in [square brackets] and are numbered + * sequentially in the method call. + * + */ + public static LogMessage CONNECTION_REJECTED(String param1) + { + String rawMessage = _messages.getString("CONNECTION_REJECTED"); + + final Object[] messageArguments = {param1}; + // Create a new MessageFormat to ensure thread safety. + // Sharing a MessageFormat and using applyPattern is not thread safe + MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale); + + final String message = formatter.format(messageArguments); + + return new LogMessage() + { + public String toString() + { + return message; + } + + public String getLogHierarchy() + { + return CONNECTION_REJECTED_LOG_HIERARCHY; + } + }; + } + + /** + * Log a Port message of the Format: + * <pre>PRT-1004 : Connection count {0,number} within {1, number} % of maximum {2,number}</pre> + * Optional values are contained in [square brackets] and are numbered + * sequentially in the method call. + * + */ + public static LogMessage CONNECTION_COUNT_WARN(Number param1, Number param2, Number param3) + { + String rawMessage = _messages.getString("CONNECTION_COUNT_WARN"); + + final Object[] messageArguments = {param1, param2, param3}; + // Create a new MessageFormat to ensure thread safety. + // Sharing a MessageFormat and using applyPattern is not thread safe + MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale); + + final String message = formatter.format(messageArguments); + + return new LogMessage() + { + public String toString() + { + return message; + } + + public String getLogHierarchy() + { + return CONNECTION_COUNT_WARN_LOG_HIERARCHY; + } + }; + } + + + private PortMessages() + { + } + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties new file mode 100644 index 0000000000..6d86a29edc --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Port_logmessages.properties @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# Default File used for all non-defined locales. + +CREATE = PRT-1001 : Create +OPEN = PRT-1002 : Open +CLOSE = PRT-1003 : Close +# 0 - flow +CONNECTION_COUNT_WARN = PRT-1004 : Connection count {0,number} within {1, number} % of maximum {2,number} +CONNECTION_REJECTED = PRT-1005 : Connection from {0} rejected diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java index d59a09fce9..304ddb6a3d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/LogSubjectFormat.java @@ -116,4 +116,11 @@ public class LogSubjectFormat */ public static final String QUEUE_FORMAT = "vh(/{0})/qu({1})"; + + /** + * LOG FORMAT for the Port LogSubject, + * 0 - Port number + */ + public static final String PORT_FORMAT = "port({0})"; + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/PortLogSubject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/PortLogSubject.java new file mode 100644 index 0000000000..527838314b --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/PortLogSubject.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.PORT_FORMAT; + +import org.apache.qpid.server.model.Port; + +public class PortLogSubject extends AbstractLogSubject +{ + public PortLogSubject(Port<?> port) + { + /** + * LOG FORMAT used by the AMQPConnectorActor follows + * ChannelLogSubject.CHANNEL_FORMAT : con:{0}({1}@{2}/{3})/ch:{4}. + * + * Uses a MessageFormat call to insert the required values according to + * these indices: + * + * 0 - Connection ID + * 1 - User ID + * 2 - IP + * 3 - Virtualhost + * 4 - Channel ID + */ + setLogStringWithFormat(PORT_FORMAT, port.getPort()); + + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java index ade9055b5a..7acd4aa1fa 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.model.port; +import java.net.SocketAddress; import java.util.Map; import java.util.Set; @@ -27,6 +28,7 @@ import org.apache.qpid.server.model.AuthenticationProvider; import org.apache.qpid.server.model.ManagedAttribute; import org.apache.qpid.server.model.ManagedContextDefault; import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.ManagedStatistic; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.VirtualHostAlias; @@ -41,14 +43,28 @@ public interface AmqpPort<X extends AmqpPort<X>> extends ClientAuthCapablePort<X String DEFAULT_AMQP_NEED_CLIENT_AUTH = "false"; String DEFAULT_AMQP_WANT_CLIENT_AUTH = "false"; + String SEND_BUFFER_SIZE = "sendBufferSize"; String RECEIVE_BUFFER_SIZE = "receiveBufferSize"; + String MAX_OPEN_CONNECTIONS = "maxOpenConnections"; + String DEFAULT_AMQP_PROTOCOLS = "qpid.port.default_amqp_protocols"; @ManagedContextDefault(name = DEFAULT_AMQP_PROTOCOLS) String INSTALLED_PROTOCOLS = AmqpPortImpl.getInstalledProtocolsAsString(); + String PORT_MAX_OPEN_CONNECTIONS = "qpid.port.max_open_connections"; + + @ManagedContextDefault(name = PORT_MAX_OPEN_CONNECTIONS) + int DEFAULT_MAX_OPEN_CONNECTIONS = -1; + + String OPEN_CONNECTIONS_WARN_PERCENT = "qpid.port.open_connections_warn_percent"; + + @ManagedContextDefault(name = OPEN_CONNECTIONS_WARN_PERCENT) + int DEFAULT_OPEN_CONNECTIONS_WARN_PERCENT = 80; + + @ManagedAttribute(defaultValue = "*") String getBindingAddress(); @@ -79,7 +95,19 @@ public interface AmqpPort<X extends AmqpPort<X>> extends ClientAuthCapablePort<X @ManagedAttribute( defaultValue = "${" + DEFAULT_AMQP_PROTOCOLS + "}", validValues = {"org.apache.qpid.server.model.port.AmqpPortImpl#getAllAvailableProtocolCombinations()"} ) Set<Protocol> getProtocols(); + @ManagedAttribute( defaultValue = "${" + PORT_MAX_OPEN_CONNECTIONS + "}" ) + int getMaxOpenConnections(); + + @ManagedStatistic + int getConnectionCount(); + VirtualHostImpl getVirtualHost(String name); + boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress); + + int incrementConnectionCount(); + + int decrementConnectionCount(); + VirtualHostAlias createVirtualHostAlias(Map<String, Object> attributes); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java index fd8c402344..43cb5f0c62 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.model.port; import java.io.IOException; import java.io.StringWriter; +import java.net.SocketAddress; import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.Collection; @@ -30,6 +31,8 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.TreeSet; import javax.net.ssl.KeyManager; @@ -42,6 +45,8 @@ import org.codehaus.jackson.map.ObjectMapper; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.logging.messages.BrokerMessages; +import org.apache.qpid.server.logging.messages.PortMessages; +import org.apache.qpid.server.logging.subjects.PortLogSubject; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.DefaultVirtualHostAlias; import org.apache.qpid.server.model.HostNameAlias; @@ -105,6 +110,12 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< @ManagedAttributeField private String _bindingAddress; + @ManagedAttributeField + private int _maxOpenConnections; + + private final AtomicInteger _connectionCount = new AtomicInteger(); + private final AtomicBoolean _connectionCountWarningGiven = new AtomicBoolean(); + private final Broker<?> _broker; private AcceptingTransport _transport; @@ -141,6 +152,12 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< } @Override + public int getMaxOpenConnections() + { + return _maxOpenConnections; + } + + @Override protected void onCreate() { super.onCreate(); @@ -452,4 +469,54 @@ public class AmqpPortImpl extends AbstractClientAuthCapablePortWithAuthProvider< throw new ServerScopedRuntimeException(e); } } + + @Override + public int incrementConnectionCount() + { + int openConnections = _connectionCount.incrementAndGet(); + int maxOpenConnections = getMaxOpenConnections(); + if(maxOpenConnections > 0 + && openConnections > (maxOpenConnections * getContextValue(Integer.class, OPEN_CONNECTIONS_WARN_PERCENT)) / 100 + && _connectionCountWarningGiven.compareAndSet(false, true)) + { + _broker.getEventLogger().message(new PortLogSubject(this), + PortMessages.CONNECTION_COUNT_WARN(openConnections, + getContextValue(Integer.class, OPEN_CONNECTIONS_WARN_PERCENT), + maxOpenConnections)); + } + return openConnections; + } + + @Override + public int decrementConnectionCount() + { + + int openConnections = _connectionCount.decrementAndGet(); + int maxOpenConnections = getMaxOpenConnections(); + + if(maxOpenConnections > 0 + && openConnections < (maxOpenConnections * square(getContextValue(Integer.class, OPEN_CONNECTIONS_WARN_PERCENT))) / 10000) + { + _connectionCountWarningGiven.compareAndSet(true,false); + } + + return openConnections; + } + + private static int square(int val) + { + return val * val; + } + + @Override + public boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress) + { + return _maxOpenConnections < 0 || _connectionCount.get() < _maxOpenConnections; + } + + @Override + public int getConnectionCount() + { + return _connectionCount.get(); + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index ef45971858..dd5e01ebc5 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -33,12 +33,13 @@ import javax.net.ssl.SSLPeerUnverifiedException; import javax.security.auth.Subject; import org.apache.log4j.Logger; + import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.ProtocolEngineCreator; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; @@ -55,24 +56,31 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine private final SSLContext _sslContext; private final boolean _wantClientAuth; private final boolean _needClientAuth; - private final Port _port; + private final AmqpPort<?> _port; private final Transport _transport; private final ProtocolEngineCreator[] _creators; + private final Runnable _onCloseTask; private Set<Protocol> _supported; private String _fqdn; - private final Broker _broker; + private final Broker<?> _broker; private NetworkConnection _network; private Sender<ByteBuffer> _sender; private final Protocol _defaultSupportedReply; private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine(); - public MultiVersionProtocolEngine(final Broker broker, - SSLContext sslContext, boolean wantClientAuth, boolean needClientAuth, + public MultiVersionProtocolEngine(final Broker<?> broker, + SSLContext sslContext, + boolean wantClientAuth, + boolean needClientAuth, final Set<Protocol> supported, final Protocol defaultSupportedReply, - Port port, Transport transport, final long id, ProtocolEngineCreator[] creators) + AmqpPort<?> port, + Transport transport, + final long id, + ProtocolEngineCreator[] creators, + final Runnable onCloseTask) { if(defaultSupportedReply != null && !supported.contains(defaultSupportedReply)) { @@ -90,6 +98,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _port = port; _transport = transport; _creators = creators; + _onCloseTask = onCloseTask; } @@ -115,7 +124,17 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine public void closed() { - _delegate.closed(); + try + { + _delegate.closed(); + } + finally + { + if(_onCloseTask != null) + { + _onCloseTask.run(); + } + } } public void writerIdle() @@ -477,7 +496,8 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine { _decryptEngine = new MultiVersionProtocolEngine(_broker, null, false, false, _supported, - _defaultSupportedReply, _port, Transport.SSL, _id, _creators); + _defaultSupportedReply, _port, Transport.SSL, _id, _creators, + null); _engine = _sslContext.createSSLEngine(); _engine.setUseClientMode(false); @@ -485,11 +505,11 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine if(_needClientAuth) { - _engine.setNeedClientAuth(_needClientAuth); + _engine.setNeedClientAuth(true); } else if(_wantClientAuth) { - _engine.setWantClientAuth(_wantClientAuth); + _engine.setWantClientAuth(true); } SSLStatus sslStatus = new SSLStatus(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java index ac8bdc3fa4..5c704c5967 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol; +import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -30,10 +31,12 @@ import javax.net.ssl.SSLContext; import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.logging.messages.PortMessages; +import org.apache.qpid.server.logging.subjects.PortLogSubject; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.ProtocolEngineCreator; import org.apache.qpid.server.plugin.ProtocolEngineCreatorComparator; import org.apache.qpid.server.plugin.QpidServiceLoader; @@ -42,15 +45,17 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory { private static final AtomicLong ID_GENERATOR = new AtomicLong(0); - private final Broker _broker; + private final Broker<?> _broker; private final Set<Protocol> _supported; private final Protocol _defaultSupportedReply; private final SSLContext _sslContext; private final boolean _wantClientAuth; private final boolean _needClientAuth; - private final Port _port; + private final AmqpPort<?> _port; private final Transport _transport; private final ProtocolEngineCreator[] _creators; + private final ConnectionCountDecrementingTask + _connectionCountDecrementingTask = new ConnectionCountDecrementingTask(); public MultiVersionProtocolEngineFactory(Broker<?> broker, SSLContext sslContext, @@ -58,7 +63,7 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory boolean needClientAuth, final Set<Protocol> supportedVersions, final Protocol defaultSupportedReply, - Port port, + AmqpPort<?> port, Transport transport) { if(defaultSupportedReply != null && !supportedVersions.contains(defaultSupportedReply)) @@ -84,11 +89,31 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory _transport = transport; } - public ServerProtocolEngine newProtocolEngine() + public ServerProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress) { - return new MultiVersionProtocolEngine(_broker, _sslContext, _wantClientAuth, _needClientAuth, - _supported, _defaultSupportedReply, _port, _transport, - ID_GENERATOR.getAndIncrement(), - _creators); + if(_port.canAcceptNewConnection(remoteSocketAddress)) + { + _port.incrementConnectionCount(); + return new MultiVersionProtocolEngine(_broker, _sslContext, _wantClientAuth, _needClientAuth, + _supported, _defaultSupportedReply, _port, _transport, + ID_GENERATOR.getAndIncrement(), + _creators, _connectionCountDecrementingTask); + } + else + { + _broker.getEventLogger().message(new PortLogSubject(_port), + PortMessages.CONNECTION_REJECTED(remoteSocketAddress.toString())); + + return null; + } + } + + private class ConnectionCountDecrementingTask implements Runnable + { + @Override + public void run() + { + _port.decrementConnectionCount(); + } } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java index 1a993f8967..c153f31872 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java @@ -19,7 +19,11 @@ package org.apache.qpid.server.model.port; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; @@ -34,6 +38,8 @@ import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.model.AuthenticationProvider; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; @@ -120,4 +126,41 @@ public class AmqpPortImplTest extends QpidTestCase serverSocket.bind(new InetSocketAddress(findFreePort())); return serverSocket; } + + public void testConnectionCounting() + { + Map<String, Object> attributes = new HashMap<>(); + attributes.put(AmqpPort.PORT, 0); + attributes.put(AmqpPort.NAME, getTestName()); + attributes.put(AmqpPort.AUTHENTICATION_PROVIDER, AUTHENTICATION_PROVIDER_NAME); + attributes.put(AmqpPort.MAX_OPEN_CONNECTIONS, 10); + attributes.put(AmqpPort.CONTEXT, Collections.singletonMap(AmqpPort.OPEN_CONNECTIONS_WARN_PERCENT, "80")); + _port = new AmqpPortImpl(attributes, _broker); + _port.create(); + EventLogger mockLogger = mock(EventLogger.class); + + when(_broker.getEventLogger()).thenReturn(mockLogger); + + for(int i = 0; i < 8; i++) + { + assertTrue(_port.canAcceptNewConnection(new InetSocketAddress("example.org", 0))); + _port.incrementConnectionCount(); + assertEquals(i + 1, _port.getConnectionCount()); + verify(mockLogger, never()).message(any(LogSubject.class), any(LogMessage.class)); + } + + assertTrue(_port.canAcceptNewConnection(new InetSocketAddress("example.org", 0))); + _port.incrementConnectionCount(); + assertEquals(9, _port.getConnectionCount()); + verify(mockLogger, times(1)).message(any(LogSubject.class), any(LogMessage.class)); + + assertTrue(_port.canAcceptNewConnection(new InetSocketAddress("example.org", 0))); + _port.incrementConnectionCount(); + assertEquals(10, _port.getConnectionCount()); + verify(mockLogger, times(1)).message(any(LogSubject.class), any(LogMessage.class)); + + assertFalse(_port.canAcceptNewConnection(new InetSocketAddress("example.org", 0))); + + + } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java index d8471e5384..c49124f1a6 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java @@ -20,11 +20,13 @@ */ package org.apache.qpid.server.transport; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.ByteArrayInputStream; import java.net.InetAddress; +import java.net.SocketAddress; import java.security.KeyStore; import java.util.Arrays; import java.util.HashSet; @@ -106,6 +108,7 @@ public class TCPandSSLTransportTest extends QpidTestCase when(port.getPort()).thenReturn(0); when(port.getSendBufferSize()).thenReturn(64*1024); when(port.getReceiveBufferSize()).thenReturn(64*1024); + when(port.canAcceptNewConnection(any(SocketAddress.class))).thenReturn(true); TCPandSSLTransport transport = new TCPandSSLTransport(new HashSet<>(Arrays.asList(transports)), sslContext, @@ -136,6 +139,8 @@ public class TCPandSSLTransportTest extends QpidTestCase transport.close(); } + + // self signed cert keystore valid until Oct 2024 private static String keystoreString = "/u3+7QAAAAIAAAABAAAAAQAKc2VsZnNpZ25lZAAAAUkYmo+uAAAFATCCBP0wDgYKKwYBBAEqAhEB" + "AQUABIIE6bR+b7FHo2BRT/WG+zDIfO8zOXoGIbuNL2znNMnvEp9xwfMQOkhKxEbVtX8uJ7HSwi1V" diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java index 8e250ef669..5f025313e8 100644 --- a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java +++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java @@ -60,7 +60,7 @@ class WebSocketProvider implements AcceptingTransport public static final String AMQP_WEBSOCKET_SUBPROTOCOL = "AMQPWSB10"; private final Transport _transport; private final SSLContext _sslContext; - private final Port<?> _port; + private final AmqpPort<?> _port; private final Set<Protocol> _supported; private final Protocol _defaultSupportedProtocolReply; private final ProtocolEngineFactory _factory; @@ -68,7 +68,7 @@ class WebSocketProvider implements AcceptingTransport WebSocketProvider(final Transport transport, final SSLContext sslContext, - final Port<?> port, + final AmqpPort<?> port, final Set<Protocol> supported, final Protocol defaultSupportedProtocolReply) { @@ -207,7 +207,7 @@ class WebSocketProvider implements AcceptingTransport { _connection = connection; - _engine = _factory.newProtocolEngine(); + _engine = _factory.newProtocolEngine(_remoteAddress); final ConnectionWrapper connectionWrapper = new ConnectionWrapper(connection, _localAddress, _remoteAddress); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java index 885a6a975d..50448f1fe1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java @@ -20,10 +20,12 @@ */ package org.apache.qpid.protocol; +import java.net.SocketAddress; + public interface ProtocolEngineFactory { // Returns a new instance of a ProtocolEngine - ProtocolEngine newProtocolEngine(); + ProtocolEngine newProtocolEngine(final SocketAddress remoteSocketAddress); -}
\ No newline at end of file +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index 068e19fbc4..a2cc74ef78 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -243,28 +243,38 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet try { socket = _serverSocket.accept(); - socket.setTcpNoDelay(_config.getTcpNoDelay()); - socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT); - final Integer sendBufferSize = _config.getSendBufferSize(); - final Integer receiveBufferSize = _config.getReceiveBufferSize(); + ProtocolEngine engine = _factory.newProtocolEngine(socket.getRemoteSocketAddress()); - socket.setSendBufferSize(sendBufferSize); - socket.setReceiveBufferSize(receiveBufferSize); + if(engine != null) + { + socket.setTcpNoDelay(_config.getTcpNoDelay()); + socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT); + + final Integer sendBufferSize = _config.getSendBufferSize(); + final Integer receiveBufferSize = _config.getReceiveBufferSize(); + + socket.setSendBufferSize(sendBufferSize); + socket.setReceiveBufferSize(receiveBufferSize); - ProtocolEngine engine = _factory.newProtocolEngine(); - final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); - NetworkConnection connection = new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout, - ticker); + final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); + NetworkConnection connection = + new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout, + ticker); - connection.setMaxReadIdle(HANSHAKE_TIMEOUT); + connection.setMaxReadIdle(HANSHAKE_TIMEOUT); - ticker.setConnection(connection); + ticker.setConnection(connection); - engine.setNetworkConnection(connection, connection.getSender()); + engine.setNetworkConnection(connection, connection.getSender()); - connection.start(); + connection.start(); + } + else + { + socket.close(); + } } catch(RuntimeException e) { diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java index 71e563359f..e8aa2b3f34 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -36,6 +37,7 @@ import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.test.utils.QpidTestCase; @@ -153,14 +155,16 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase { Set<Protocol> protocols = getAllAMQPProtocols(); - Port<?> port = mock(Port.class); + AmqpPort<?> port = mock(AmqpPort.class); + when(port.canAcceptNewConnection(any(SocketAddress.class))).thenReturn(true); + when(port.getContextValue(eq(Long.class),eq(Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))).thenReturn(10000l); MultiVersionProtocolEngineFactory factory = new MultiVersionProtocolEngineFactory(_broker, null, false, false, protocols, null, port, org.apache.qpid.server.model.Transport.TCP); //create a dummy to retrieve the 'current' ID number - long previousId = factory.newProtocolEngine().getConnectionId(); + long previousId = factory.newProtocolEngine(mock(SocketAddress.class)).getConnectionId(); //create a protocol engine and send the AMQP header for all supported AMQP verisons, //ensuring the ID assigned increases as expected @@ -170,7 +174,7 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase byte[] header = getAmqpHeader(protocol); assertNotNull("protocol header should not be null", header); - ServerProtocolEngine engine = factory.newProtocolEngine(); + ServerProtocolEngine engine = factory.newProtocolEngine(null); TestNetworkConnection conn = new TestNetworkConnection(); engine.setNetworkConnection(conn, conn.getSender()); assertEquals("ID did not increment as expected", expectedID, engine.getConnectionId()); |
