From 85ad542c1d5724cb5a1294f12e826a83972ef433 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Thu, 15 Oct 2009 01:06:23 +0000 Subject: Merge java-network-refactor branch git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@825362 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 1 - .../src/main/java/org/apache/qpid/server/Main.java | 199 ++-- .../server/configuration/ServerConfiguration.java | 56 +- .../qpid/server/connection/ConnectionRegistry.java | 9 +- .../server/connection/IConnectionRegistry.java | 1 - .../server/protocol/AMQMinaProtocolSession.java | 923 ----------------- .../server/protocol/AMQPFastProtocolHandler.java | 290 ------ .../qpid/server/protocol/AMQPProtocolProvider.java | 52 - .../qpid/server/protocol/AMQProtocolEngine.java | 1035 ++++++++++++++++++++ .../server/protocol/AMQProtocolEngineFactory.java | 29 + .../qpid/server/protocol/AMQProtocolSession.java | 17 + .../server/protocol/AMQProtocolSessionMBean.java | 42 +- .../qpid/server/registry/ApplicationRegistry.java | 2 +- .../access/plugins/network/FirewallPlugin.java | 23 +- .../apache/qpid/server/transport/QpidAcceptor.java | 12 +- .../qpid/server/virtualhost/VirtualHost.java | 8 + 16 files changed, 1243 insertions(+), 1456 deletions(-) delete mode 100644 java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java delete mode 100644 java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java delete mode 100644 java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java (limited to 'java/broker/src/main') diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index ea48bd7cc3..644a33db01 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -41,7 +41,6 @@ import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.queue.*; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index f8deb95628..c45e794145 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -20,6 +20,13 @@ */ package org.apache.qpid.server; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Properties; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; @@ -30,16 +37,9 @@ import org.apache.commons.cli.PosixParser; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; import org.apache.log4j.xml.QpidLog4JConfigurator; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.FixedSizeByteBufferAllocator; -import org.apache.mina.common.IoAcceptor; -import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.apache.mina.util.NewThreadExecutor; import org.apache.qpid.AMQException; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean; import org.apache.qpid.server.information.management.ServerInformationMBean; @@ -48,19 +48,13 @@ import org.apache.qpid.server.logging.actors.BrokerActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.management.LoggingManagementMBean; import org.apache.qpid.server.logging.messages.BrokerMessages; -import org.apache.qpid.server.protocol.AMQPFastProtocolHandler; -import org.apache.qpid.server.protocol.AMQPProtocolProvider; +import org.apache.qpid.server.protocol.AMQProtocolEngineFactory; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; import org.apache.qpid.server.transport.QpidAcceptor; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.net.BindException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.Properties; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.network.mina.MINANetworkDriver; /** * Main entry point for AMQPD. @@ -300,20 +294,6 @@ public class Main _brokerLogger.info("Starting Qpid Broker " + QpidProperties.getReleaseVersion() + " build: " + QpidProperties.getBuildVersion()); - ByteBuffer.setUseDirectBuffers(serverConfig.getEnableDirectBuffers()); - - // the MINA default is currently to use the pooled allocator although this may change in future - // once more testing of the performance of the simple allocator has been done - if (!serverConfig.getEnablePooledAllocator()) - { - ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator()); - } - - if (serverConfig.getUseBiasedWrites()) - { - System.setProperty("org.apache.qpid.use_write_biased_pool", "true"); - } - int port = serverConfig.getPort(); String portStr = commandLine.getOptionValue("p"); @@ -329,7 +309,54 @@ public class Main } } - bind(port, serverConfig); + String bindAddr = commandLine.getOptionValue("b"); + if (bindAddr == null) + { + bindAddr = serverConfig.getBind(); + } + InetAddress bindAddress = null; + if (bindAddr.equals("wildcard")) + { + bindAddress = new InetSocketAddress(port).getAddress(); + } + else + { + bindAddress = InetAddress.getByAddress(parseIP(bindAddr)); + } + + String keystorePath = serverConfig.getKeystorePath(); + String keystorePassword = serverConfig.getKeystorePassword(); + String certType = serverConfig.getCertType(); + SSLContextFactory sslFactory = null; + boolean isSsl = false; + + if (!serverConfig.getSSLOnly()) + { + NetworkDriver driver = new MINANetworkDriver(); + driver.bind(port, new InetAddress[]{bindAddress}, new AMQProtocolEngineFactory(), + serverConfig.getNetworkConfiguration(), null); + ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), + new QpidAcceptor(driver,"TCP")); + CurrentActor.get().message(BrokerMessages.BRK_1002("TCP", port)); + } + + if (serverConfig.getEnableSSL()) + { + sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); + NetworkDriver driver = new MINANetworkDriver(); + driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress}, + new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory); + ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), + new QpidAcceptor(driver,"TCP")); + CurrentActor.get().message(BrokerMessages.BRK_1002("TCP/SSL", serverConfig.getSSLPort())); + } + + //fixme qpid.AMQP should be using qpidproperties to get value + _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion() + + " build: " + QpidProperties.getBuildVersion()); + + CurrentActor.get().message(BrokerMessages.BRK_1004()); + } finally { @@ -358,114 +385,6 @@ public class Main } } - protected void bind(int port, ServerConfiguration config) throws BindException - { - String bindAddr = commandLine.getOptionValue("b"); - if (bindAddr == null) - { - bindAddr = config.getBind(); - } - - try - { - IoAcceptor acceptor; - - if (ApplicationRegistry.getInstance().getConfiguration().getQpidNIO()) - { - _logger.warn("Using Qpid Multithreaded IO Processing"); - acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(config.getProcessors(), new NewThreadExecutor()); - } - else - { - _logger.warn("Using Mina IO Processing"); - acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(config.getProcessors(), new NewThreadExecutor()); - } - - SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig(); - SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); - - sc.setReceiveBufferSize(config.getReceiveBufferSize()); - sc.setSendBufferSize(config.getWriteBufferSize()); - sc.setTcpNoDelay(config.getTcpNoDelay()); - - // if we do not use the executor pool threading model we get the default leader follower - // implementation provided by MINA - if (config.getEnableExecutorPool()) - { - sconfig.setThreadModel(ReadWriteThreadModel.getInstance()); - } - - if (!config.getEnableSSL() || !config.getSSLOnly()) - { - AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler(); - InetSocketAddress bindAddress; - if (bindAddr.equals("wildcard")) - { - bindAddress = new InetSocketAddress(port); - } - else - { - bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port); - } - - bind(new QpidAcceptor(acceptor,"TCP"), bindAddress, handler, sconfig); - - //fixme qpid.AMQP should be using qpidproperties to get value - _brokerLogger.info("Qpid.AMQP listening on non-SSL address " + bindAddress); - } - - if (config.getEnableSSL()) - { - AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler(); - try - { - - bind(new QpidAcceptor(acceptor, "TCP/SSL"), new InetSocketAddress(config.getSSLPort()), handler, sconfig); - - //fixme qpid.AMQP should be using qpidproperties to get value - _brokerLogger.info("Qpid.AMQP listening on SSL port " + config.getSSLPort()); - - } - catch (IOException e) - { - _brokerLogger.error("Unable to listen on SSL port: " + e, e); - } - } - - //fixme qpid.AMQP should be using qpidproperties to get value - _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion() - + " build: " + QpidProperties.getBuildVersion()); - - CurrentActor.get().message(BrokerMessages.BRK_1004()); - - } - catch (Exception e) - { - _logger.error("Unable to bind service to registry: " + e, e); - //fixme this need tidying up - throw new BindException(e.getMessage()); - } - } - - /** - * Ensure that any bound Acceptors are recorded in the registry so they can be closed later. - * - * @param acceptor - * @param bindAddress - * @param handler - * @param sconfig - * - * @throws IOException from the acceptor.bind command - */ - private void bind(QpidAcceptor acceptor, InetSocketAddress bindAddress, AMQPFastProtocolHandler handler, SocketAcceptorConfig sconfig) throws IOException - { - acceptor.getIoAcceptor().bind(bindAddress, handler, sconfig); - - CurrentActor.get().message(BrokerMessages.BRK_1002(acceptor.toString(), bindAddress.getPort())); - - ApplicationRegistry.getInstance().addAcceptor(bindAddress, acceptor); - } - public static void main(String[] args) { //if the -Dlog4j.configuration property has not been set, enable the init override diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index 01befbbfe8..641b44bb18 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.configuration.management.ConfigurationManagementMB import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.transport.NetworkDriverConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -570,7 +571,7 @@ public class ServerConfiguration implements SignalHandler public boolean getSSLOnly() { - return getConfig().getBoolean("connector.ssl.sslOnly", true); + return getConfig().getBoolean("connector.ssl.sslOnly", false); } public int getSSLPort() @@ -619,4 +620,57 @@ public class ServerConfiguration implements SignalHandler getConfig().getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD)); } + + public NetworkDriverConfiguration getNetworkConfiguration() + { + return new NetworkDriverConfiguration() + { + + public Integer getTrafficClass() + { + return null; + } + + public Boolean getTcpNoDelay() + { + // Can't call parent getTcpNoDelay since it just calls this one + return getConfig().getBoolean("connector.tcpNoDelay", true); + } + + public Integer getSoTimeout() + { + return null; + } + + public Integer getSoLinger() + { + return null; + } + + public Integer getSendBufferSize() + { + return getBufferWriteLimit(); + } + + public Boolean getReuseAddress() + { + return null; + } + + public Integer getReceiveBufferSize() + { + return getBufferReadLimit(); + } + + public Boolean getOOBInline() + { + return null; + } + + public Boolean getKeepAlive() + { + return null; + } + }; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index d287595e2d..7b50a2e3ad 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.connection; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.AMQException; import org.apache.qpid.AMQConnectionException; @@ -45,6 +44,14 @@ public class ConnectionRegistry implements IConnectionRegistry { } + + public void expireClosedChannels() + { + for (AMQProtocolSession connection : _registry) + { + connection.closeIfLingeringClosedChannels(); + } + } /** Close all of the currently open connections. */ public void close() throws AMQException diff --git a/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java index d64fde1c20..002269bbaa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.connection; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.AMQException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java deleted file mode 100644 index 7bc4365152..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ /dev/null @@ -1,923 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import org.apache.log4j.Logger; -import org.apache.mina.common.CloseFuture; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.common.IoSession; -import org.apache.mina.transport.vmpipe.VmPipeAddress; -import org.apache.qpid.AMQChannelException; -import org.apache.qpid.AMQConnectionException; -import org.apache.qpid.AMQException; -import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.codec.AMQDecoder; -import org.apache.qpid.common.ClientProperties; -import org.apache.qpid.framing.AMQBody; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.framing.MethodDispatcher; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.pool.ReadWriteThreadModel; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.protocol.AMQMethodListener; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.handler.ServerMethodDispatcherImpl; -import org.apache.qpid.server.logging.actors.AMQPConnectionActor; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.messages.ConnectionMessages; -import org.apache.qpid.server.management.Managable; -import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.output.ProtocolOutputConverter; -import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.state.AMQState; -import org.apache.qpid.server.state.AMQStateManager; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.transport.Sender; - -import javax.management.JMException; -import javax.security.sasl.SaslServer; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.security.Principal; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicLong; - -public class AMQMinaProtocolSession implements AMQProtocolSession, Managable -{ - private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); - - private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); - - private static final AtomicLong idGenerator = new AtomicLong(0); - - // to save boxing the channelId and looking up in a map... cache in an array the low numbered - // channels. This value must be of the form 2^x - 1. - private static final int CHANNEL_CACHE_SIZE = 0xff; - - private final IoSession _minaProtocolSession; - - private AMQShortString _contextKey; - - private AMQShortString _clientVersion = null; - - private VirtualHost _virtualHost; - - private final Map _channelMap = new HashMap(); - - private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; - - private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet(); - - private final AMQStateManager _stateManager; - - private AMQCodecFactory _codecFactory; - - private AMQProtocolSessionMBean _managedObject; - - private SaslServer _saslServer; - - private Object _lastReceived; - - private Object _lastSent; - - protected boolean _closed; - // maximum number of channels this session should have - private long _maxNoOfChannels = 1000; - - /* AMQP Version for this session */ - private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); - - private FieldTable _clientProperties; - private final List _taskList = new CopyOnWriteArrayList(); - - private List _closingChannelsList = new CopyOnWriteArrayList(); - private ProtocolOutputConverter _protocolOutputConverter; - private Principal _authorizedID; - private MethodDispatcher _dispatcher; - private ProtocolSessionIdentifier _sessionIdentifier; - - private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L; - private org.apache.mina.common.WriteFuture _lastWriteFuture; - - // Create a simple ID that increments for ever new Session - private final long _sessionID = idGenerator.getAndIncrement(); - - private AMQPConnectionActor _actor; - private LogSubject _logSubject; - - public ManagedObject getManagedObject() - { - return _managedObject; - } - - public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory) - throws AMQException - { - _stateManager = new AMQStateManager(virtualHostRegistry, this); - _minaProtocolSession = session; - session.setAttachment(this); - - _codecFactory = codecFactory; - - _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); - - _actor.message(ConnectionMessages.CON_1001(null, null, false, false)); - - try - { - IoServiceConfig config = session.getServiceConfig(); - ReadWriteThreadModel threadModel = (ReadWriteThreadModel) config.getThreadModel(); - threadModel.getAsynchronousReadFilter().createNewJobForSession(session); - threadModel.getAsynchronousWriteFilter().createNewJobForSession(session); - } - catch (RuntimeException e) - { - e.printStackTrace(); - throw e; - - } - } - - private AMQProtocolSessionMBean createMBean() throws AMQException - { - try - { - return new AMQProtocolSessionMBean(this); - } - catch (JMException ex) - { - _logger.error("AMQProtocolSession MBean creation has failed ", ex); - throw new AMQException("AMQProtocolSession MBean creation has failed ", ex); - } - } - - public IoSession getIOSession() - { - return _minaProtocolSession; - } - - public static AMQProtocolSession getAMQProtocolSession(IoSession minaProtocolSession) - { - return (AMQProtocolSession) minaProtocolSession.getAttachment(); - } - - public long getSessionID() - { - return _sessionID; - } - - public LogActor getLogActor() - { - return _actor; - } - - public void dataBlockReceived(AMQDataBlock message) throws Exception - { - _lastReceived = message; - if (message instanceof ProtocolInitiation) - { - protocolInitiationReceived((ProtocolInitiation) message); - - } - else if (message instanceof AMQFrame) - { - AMQFrame frame = (AMQFrame) message; - frameReceived(frame); - - } - else - { - throw new UnknnownMessageTypeException(message); - } - } - - private void frameReceived(AMQFrame frame) throws AMQException - { - int channelId = frame.getChannel(); - AMQBody body = frame.getBodyFrame(); - - //Look up the Channel's Actor and set that as the current actor - // If that is not available then we can use the ConnectionActor - // that is associated with this AMQMPSession. - LogActor channelActor = null; - if (_channelMap.get(channelId) != null) - { - channelActor = _channelMap.get(channelId).getLogActor(); - } - CurrentActor.set(channelActor == null ? _actor : channelActor); - - try - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Frame Received: " + frame); - } - - // Check that this channel is not closing - if (channelAwaitingClosure(channelId)) - { - if ((frame.getBodyFrame() instanceof ChannelCloseOkBody)) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); - } - } - else - { - if (_logger.isInfoEnabled()) - { - _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame); - } - - closeProtocolSession(); - return; - } - } - - try - { - body.handle(channelId, this); - } - catch (AMQException e) - { - closeChannel(channelId); - throw e; - } - } - finally - { - CurrentActor.remove(); - } - } - - private void protocolInitiationReceived(ProtocolInitiation pi) - { - // this ensures the codec never checks for a PI message again - ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false); - try - { - // Log incomming protocol negotiation request - _actor.message(ConnectionMessages.CON_1001(null, pi._protocolMajor + "-" + pi._protocolMinor, false, true)); - - ProtocolVersion pv = pi.checkVersion(); // Fails if not correct - - // This sets the protocol version (and hence framing classes) for this session. - setProtocolVersion(pv); - - String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms(); - - String locales = "en_US"; - - AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(), - (short) getProtocolMinorVersion(), - null, - mechanisms.getBytes(), - locales.getBytes()); - _minaProtocolSession.write(responseBody.generateFrame(0)); - - } - catch (AMQException e) - { - _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion()); - - _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); - - // TODO: Close connection (but how to wait until message is sent?) - // ritchiem 2006-12-04 will this not do? - // WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()])); - // future.join(); - // close connection - - } - } - - public void methodFrameReceived(int channelId, AMQMethodBody methodBody) - { - - final AMQMethodEvent evt = new AMQMethodEvent(channelId, methodBody); - - try - { - try - { - - boolean wasAnyoneInterested = _stateManager.methodReceived(evt); - - if (!_frameListeners.isEmpty()) - { - for (AMQMethodListener listener : _frameListeners) - { - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; - } - } - - if (!wasAnyoneInterested) - { - throw new AMQNoMethodHandlerException(evt); - } - } - catch (AMQChannelException e) - { - if (getChannel(channelId) != null) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Closing channel due to: " + e.getMessage()); - } - - writeFrame(e.getCloseFrame(channelId)); - closeChannel(channelId); - } - else - { - if (_logger.isDebugEnabled()) - { - _logger.debug("ChannelException occured on non-existent channel:" + e.getMessage()); - } - - if (_logger.isInfoEnabled()) - { - _logger.info("Closing connection due to: " + e.getMessage()); - } - - AMQConnectionException ce = - evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, - AMQConstant.CHANNEL_ERROR.getName().toString()); - - closeConnection(channelId, ce, false); - } - } - catch (AMQConnectionException e) - { - closeConnection(channelId, e, false); - } - } - catch (Exception e) - { - - for (AMQMethodListener listener : _frameListeners) - { - listener.error(e); - } - - _logger.error("Unexpected exception while processing frame. Closing connection.", e); - - closeProtocolSession(); - } - } - - public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException - { - - AMQChannel channel = getAndAssertChannel(channelId); - - channel.publishContentHeader(body); - - } - - public void contentBodyReceived(int channelId, ContentBody body) throws AMQException - { - AMQChannel channel = getAndAssertChannel(channelId); - - channel.publishContentBody(body); - } - - public void heartbeatBodyReceived(int channelId, HeartbeatBody body) - { - // NO - OP - } - - /** - * Convenience method that writes a frame to the protocol session. Equivalent to calling - * getProtocolSession().write(). - * - * @param frame the frame to write - */ - public void writeFrame(AMQDataBlock frame) - { - _lastSent = frame; - - _lastWriteFuture = _minaProtocolSession.write(frame); - } - - public AMQShortString getContextKey() - { - return _contextKey; - } - - public void setContextKey(AMQShortString contextKey) - { - _contextKey = contextKey; - } - - public List getChannels() - { - return new ArrayList(_channelMap.values()); - } - - public AMQChannel getAndAssertChannel(int channelId) throws AMQException - { - AMQChannel channel = getChannel(channelId); - if (channel == null) - { - throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId); - } - - return channel; - } - - public AMQChannel getChannel(int channelId) throws AMQException - { - final AMQChannel channel = - ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId); - if ((channel == null) || channel.isClosing()) - { - return null; - } - else - { - return channel; - } - } - - public boolean channelAwaitingClosure(int channelId) - { - return !_closingChannelsList.isEmpty() && _closingChannelsList.contains(channelId); - } - - public void addChannel(AMQChannel channel) throws AMQException - { - if (_closed) - { - throw new AMQException("Session is closed"); - } - - final int channelId = channel.getChannelId(); - - if (_closingChannelsList.contains(channelId)) - { - throw new AMQException("Session is marked awaiting channel close"); - } - - if (_channelMap.size() == _maxNoOfChannels) - { - String errorMessage = - toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels - + "); can't create channel"; - _logger.error(errorMessage); - throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage); - } - else - { - _channelMap.put(channel.getChannelId(), channel); - } - - if (((channelId & CHANNEL_CACHE_SIZE) == channelId)) - { - _cachedChannels[channelId] = channel; - } - - checkForNotification(); - } - - private void checkForNotification() - { - int channelsCount = _channelMap.size(); - if (channelsCount >= _maxNoOfChannels) - { - _managedObject.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value"); - } - } - - public Long getMaximumNumberOfChannels() - { - return _maxNoOfChannels; - } - - public void setMaximumNumberOfChannels(Long value) - { - _maxNoOfChannels = value; - } - - public void commitTransactions(AMQChannel channel) throws AMQException - { - if ((channel != null) && channel.isTransactional()) - { - channel.commit(); - } - } - - public void rollbackTransactions(AMQChannel channel) throws AMQException - { - if ((channel != null) && channel.isTransactional()) - { - channel.rollback(); - } - } - - /** - * Close a specific channel. This will remove any resources used by the channel, including:
  • any queue - * subscriptions (this may in turn remove queues if they are auto delete
- * - * @param channelId id of the channel to close - * - * @throws AMQException if an error occurs closing the channel - * @throws IllegalArgumentException if the channel id is not valid - */ - public void closeChannel(int channelId) throws AMQException - { - final AMQChannel channel = getChannel(channelId); - if (channel == null) - { - throw new IllegalArgumentException("Unknown channel id"); - } - else - { - try - { - channel.close(); - markChannelAwaitingCloseOk(channelId); - } - finally - { - removeChannel(channelId); - } - } - } - - public void closeChannelOk(int channelId) - { - // todo QPID-847 - This is called from two lcoations ChannelCloseHandler and ChannelCloseOkHandler. - // When it is the CC_OK_Handler then it makes sence to remove the channel else we will leak memory. - // We do it from the Close Handler as we are sending the OK back to the client. - // While this is AMQP spec compliant. The Java client in the event of an IllegalArgumentException - // will send a close-ok.. Where we should call removeChannel. - // However, due to the poor exception handling on the client. The client-user will be notified of the - // InvalidArgument and if they then decide to close the session/connection then the there will be time - // for that to occur i.e. a new close method be sent before the exeption handling can mark the session closed. - //removeChannel(channelId); - _closingChannelsList.remove(new Integer(channelId)); - } - - private void markChannelAwaitingCloseOk(int channelId) - { - _closingChannelsList.add(channelId); - } - - /** - * In our current implementation this is used by the clustering code. - * - * @param channelId The channel to remove - */ - public void removeChannel(int channelId) - { - _channelMap.remove(channelId); - if ((channelId & CHANNEL_CACHE_SIZE) == channelId) - { - _cachedChannels[channelId] = null; - } - } - - /** - * Initialise heartbeats on the session. - * - * @param delay delay in seconds (not ms) - */ - public void initHeartbeats(int delay) - { - if (delay > 0) - { - _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay); - _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, (int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay)); - } - } - - /** - * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel. - * - * @throws AMQException if an error occurs while closing any channel - */ - private void closeAllChannels() throws AMQException - { - for (AMQChannel channel : _channelMap.values()) - { - channel.close(); - } - - _channelMap.clear(); - for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++) - { - _cachedChannels[i] = null; - } - } - - /** This must be called when the session is _closed in order to free up any resources managed by the session. */ - public void closeSession() throws AMQException - { - if (!_closed) - { - if (_virtualHost != null) - { - _virtualHost.getConnectionRegistry().deregisterConnection(this); - } - - closeAllChannels(); - if (_managedObject != null) - { - _managedObject.unregister(); - } - - for (Task task : _taskList) - { - task.doTask(this); - } - - _closed = true; - - CurrentActor.get().message(_logSubject, ConnectionMessages.CON_1002()); - } - } - - public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException - { - if (_logger.isInfoEnabled()) - { - _logger.info("Closing connection due to: " + e.getMessage()); - } - - markChannelAwaitingCloseOk(channelId); - closeSession(); - _stateManager.changeState(AMQState.CONNECTION_CLOSING); - writeFrame(e.getCloseFrame(channelId)); - - if (closeProtocolSession) - { - closeProtocolSession(); - } - } - - public void closeProtocolSession() - { - closeProtocolSession(true); - } - - public void closeProtocolSession(boolean waitLast) - { - if (waitLast && (_lastWriteFuture != null)) - { - _logger.debug("Waiting for last write to join."); - _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); - } - - _logger.debug("REALLY Closing protocol session:" + _minaProtocolSession); - final CloseFuture future = _minaProtocolSession.close(); - future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); - - try - { - _stateManager.changeState(AMQState.CONNECTION_CLOSED); - } - catch (AMQException e) - { - _logger.info(e.getMessage()); - } - } - - public String toString() - { - return _minaProtocolSession.getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")"); - } - - public String dump() - { - return this + " last_sent=" + _lastSent + " last_received=" + _lastReceived; - } - - /** @return an object that can be used to identity */ - public Object getKey() - { - return _minaProtocolSession.getRemoteAddress(); - } - - /** - * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may - * be bound to multiple addresses this could vary depending on the acceptor this session was created from. - * - * @return a String FQDN - */ - public String getLocalFQDN() - { - SocketAddress address = _minaProtocolSession.getLocalAddress(); - // we use the vmpipe address in some tests hence the need for this rather ugly test. The host - // information is used by SASL primary. - if (address instanceof InetSocketAddress) - { - return ((InetSocketAddress) address).getHostName(); - } - else if (address instanceof VmPipeAddress) - { - return "vmpipe:" + ((VmPipeAddress) address).getPort(); - } - else - { - throw new IllegalArgumentException("Unsupported socket address class: " + address); - } - } - - public SaslServer getSaslServer() - { - return _saslServer; - } - - public void setSaslServer(SaslServer saslServer) - { - _saslServer = saslServer; - } - - public FieldTable getClientProperties() - { - return _clientProperties; - } - - public void setClientProperties(FieldTable clientProperties) - { - _clientProperties = clientProperties; - if (_clientProperties != null) - { - if (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null) - { - String clientID = _clientProperties.getString(CLIENT_PROPERTIES_INSTANCE); - setContextKey(new AMQShortString(clientID)); - - // Log the Opening of the connection for this client - _actor.message(ConnectionMessages.CON_1001(clientID, _protocolVersion.toString(), true, true)); - } - - if (_clientProperties.getString(ClientProperties.version.toString()) != null) - { - _clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString())); - } - } - _sessionIdentifier = new ProtocolSessionIdentifier(this); - } - - private void setProtocolVersion(ProtocolVersion pv) - { - _protocolVersion = pv; - - _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this); - _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion); - } - - public byte getProtocolMajorVersion() - { - return _protocolVersion.getMajorVersion(); - } - - public ProtocolVersion getProtocolVersion() - { - return _protocolVersion; - } - - public byte getProtocolMinorVersion() - { - return _protocolVersion.getMinorVersion(); - } - - public boolean isProtocolVersion(byte major, byte minor) - { - return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor); - } - - public MethodRegistry getRegistry() - { - return getMethodRegistry(); - } - - public Object getClientIdentifier() - { - return (_minaProtocolSession != null) ? _minaProtocolSession.getRemoteAddress() : null; - } - - public VirtualHost getVirtualHost() - { - return _virtualHost; - } - - public void setVirtualHost(VirtualHost virtualHost) throws AMQException - { - _virtualHost = virtualHost; - - _actor.virtualHostSelected(this); - _logSubject = new ConnectionLogSubject(this); - - _virtualHost.getConnectionRegistry().registerConnection(this); - - _managedObject = createMBean(); - _managedObject.register(); - } - - public void addSessionCloseTask(Task task) - { - _taskList.add(task); - } - - public void removeSessionCloseTask(Task task) - { - _taskList.remove(task); - } - - public ProtocolOutputConverter getProtocolOutputConverter() - { - return _protocolOutputConverter; - } - - public void setAuthorizedID(Principal authorizedID) - { - _authorizedID = authorizedID; - - // Let the actor know that this connection is now Authorized - _actor.connectionAuthorized(this); - } - - public Principal getAuthorizedID() - { - return _authorizedID; - } - - public SocketAddress getRemoteAddress() - { - return _minaProtocolSession.getRemoteAddress(); - } - - public MethodRegistry getMethodRegistry() - { - return MethodRegistry.getMethodRegistry(getProtocolVersion()); - } - - public MethodDispatcher getMethodDispatcher() - { - return _dispatcher; - } - - public ProtocolSessionIdentifier getSessionIdentifier() - { - return _sessionIdentifier; - } - - public String getClientVersion() - { - return (_clientVersion == null) ? null : _clientVersion.toString(); - } - - public void setSender(Sender sender) - { - // No-op, interface munging between this and AMQProtocolSession - } - - public void init() - { - // No-op, interface munging between this and AMQProtocolSession - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java deleted file mode 100644 index 16b85e67b3..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ /dev/null @@ -1,290 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import java.io.IOException; -import java.net.InetSocketAddress; - -import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoSession; -import org.apache.mina.filter.ReadThrottleFilterBuilder; -import org.apache.mina.filter.SSLFilter; -import org.apache.mina.filter.WriteBufferLimitFilterBuilder; -import org.apache.mina.filter.codec.QpidProtocolCodecFilter; -import org.apache.mina.filter.executor.ExecutorFilter; -import org.apache.mina.util.SessionUtil; -import org.apache.qpid.AMQException; -import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQProtocolHeaderException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.server.configuration.ServerConfiguration; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.ssl.SSLContextFactory; - -/** - * The protocol handler handles "protocol events" for all connections. The state - * associated with an individual connection is accessed through the protocol session. - * - * We delegate all frame (message) processing to the AMQProtocolSession which wraps - * the state for the connection. - */ -public class AMQPFastProtocolHandler extends IoHandlerAdapter -{ - private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class); - - private final IApplicationRegistry _applicationRegistry; - - private final int BUFFER_READ_LIMIT_SIZE; - private final int BUFFER_WRITE_LIMIT_SIZE; - - public AMQPFastProtocolHandler(Integer applicationRegistryInstance) - { - this(ApplicationRegistry.getInstance(applicationRegistryInstance)); - } - - public AMQPFastProtocolHandler(IApplicationRegistry applicationRegistry) - { - _applicationRegistry = applicationRegistry; - - // Read the configuration from the application registry - BUFFER_READ_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferReadLimit(); - BUFFER_WRITE_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferWriteLimit(); - - _logger.debug("AMQPFastProtocolHandler created"); - } - - protected AMQPFastProtocolHandler(AMQPFastProtocolHandler handler) - { - this(handler._applicationRegistry); - } - - public void sessionCreated(IoSession protocolSession) throws Exception - { - SessionUtil.initialize(protocolSession); - final AMQCodecFactory codecFactory = new AMQCodecFactory(true); - - createSession(protocolSession, _applicationRegistry, codecFactory); - _logger.info("Protocol session created for:" + protocolSession.getRemoteAddress()); - - final QpidProtocolCodecFilter pcf = new QpidProtocolCodecFilter(codecFactory); - final ServerConfiguration config = _applicationRegistry.getConfiguration(); - - String keystorePath = config.getKeystorePath(); - String keystorePassword = config.getKeystorePassword(); - String certType = config.getCertType(); - SSLContextFactory sslContextFactory = null; - boolean isSsl = false; - if (config.getEnableSSL() && isSSLClient(config, protocolSession)) - { - sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); - isSsl = true; - } - if (config.getEnableExecutorPool()) - { - if (isSsl) - { - protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter", - new SSLFilter(sslContextFactory.buildServerContext())); - } - protocolSession.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf); - } - else - { - protocolSession.getFilterChain().addLast("protocolFilter", pcf); - if (isSsl) - { - protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter", - new SSLFilter(sslContextFactory.buildServerContext())); - } - } - - if (ApplicationRegistry.getInstance().getConfiguration().getProtectIOEnabled()) - { - try - { -// //Add IO Protection Filters - IoFilterChain chain = protocolSession.getFilterChain(); - - - protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); - - ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(BUFFER_READ_LIMIT_SIZE); - readfilter.attach(chain); - - WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - writefilter.setMaximumConnectionBufferSize(BUFFER_WRITE_LIMIT_SIZE); - writefilter.attach(chain); - - protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); - _logger.info("Using IO Read/Write Filter Protection"); - } - catch (Exception e) - { - _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage()); - } - } - } - - /** Separated into its own, protected, method to allow easier reuse */ - protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec) throws AMQException - { - new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec); - } - - public void sessionOpened(IoSession protocolSession) throws Exception - { - _logger.info("Session opened for:" + protocolSession.getRemoteAddress()); - } - - public void sessionClosed(IoSession protocolSession) throws Exception - { - _logger.info("Protocol Session closed for:" + protocolSession.getRemoteAddress()); - final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession); - //fixme -- this can be null - if (amqProtocolSession != null) - { - try - { - CurrentActor.set(amqProtocolSession.getLogActor()); - amqProtocolSession.closeSession(); - } - catch (AMQException e) - { - _logger.error("Caught AMQException whilst closingSession:" + e); - } - finally - { - CurrentActor.remove(); - } - } - } - - public void sessionIdle(IoSession session, IdleStatus status) throws Exception - { - _logger.debug("Protocol Session [" + this + "] idle: " + status + " :for:" + session.getRemoteAddress()); - if (IdleStatus.WRITER_IDLE.equals(status)) - { - //write heartbeat frame: - session.write(HeartbeatBody.FRAME); - } - else if (IdleStatus.READER_IDLE.equals(status)) - { - //failover: - throw new IOException("Timed out while waiting for heartbeat from peer."); - } - - } - - public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception - { - AMQProtocolSession session = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession); - if (throwable instanceof AMQProtocolHeaderException) - { - - protocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); - - protocolSession.close(); - - _logger.error("Error in protocol initiation " + session + ":" + protocolSession.getRemoteAddress() + " :" + throwable.getMessage(), throwable); - } - else if (throwable instanceof IOException) - { - _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable); - } - else - { - _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable); - - - MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(session.getProtocolVersion()); - ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0); - - protocolSession.write(closeBody.generateFrame(0)); - - protocolSession.close(); - } - } - - /** - * Invoked when a message is received on a particular protocol session. Note that a - * protocol session is directly tied to a particular physical connection. - * - * @param protocolSession the protocol session that received the message - * @param message the message itself (i.e. a decoded frame) - * - * @throws Exception if the message cannot be processed - */ - public void messageReceived(IoSession protocolSession, Object message) throws Exception - { - final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession); - - if (message instanceof AMQDataBlock) - { - amqProtocolSession.dataBlockReceived((AMQDataBlock) message); - - } - else if (message instanceof ByteBuffer) - { - throw new IllegalStateException("Handed undecoded ByteBuffer buf = " + message); - } - else - { - throw new IllegalStateException("Handed unhandled message. message.class = " + message.getClass() + " message = " + message); - } - } - - /** - * Called after a message has been sent out on a particular protocol session - * - * @param protocolSession the protocol session (i.e. connection) on which this - * message was sent - * @param object the message (frame) that was encoded and sent - * - * @throws Exception if we want to indicate an error - */ - public void messageSent(IoSession protocolSession, Object object) throws Exception - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Message sent: " + object); - } - } - - protected boolean isSSLClient(ServerConfiguration connectionConfig, - IoSession protocolSession) - { - InetSocketAddress addr = (InetSocketAddress) protocolSession.getLocalAddress(); - return addr.getPort() == connectionConfig.getSSLPort(); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java deleted file mode 100644 index 07c153bfe8..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; - -/** - * The protocol provide's role is to encapsulate the initialisation of the protocol handler. - * - * The protocol handler (see AMQPFastProtocolHandler class) handles protocol events - * such as connection closing or a frame being received. It can either do this directly - * or pass off to the protocol session in the cases where state information is required to - * deal with the event. - * - */ -public class AMQPProtocolProvider -{ - /** - * Handler for protocol events - */ - private AMQPFastProtocolHandler _handler; - - public AMQPProtocolProvider() - { - IApplicationRegistry registry = ApplicationRegistry.getInstance(); - _handler = new AMQPFastProtocolHandler(registry); - } - - public AMQPFastProtocolHandler getHandler() - { - return _handler; - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java new file mode 100644 index 0000000000..3bcd102858 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -0,0 +1,1035 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.security.Principal; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicLong; + +import javax.management.JMException; +import javax.security.sasl.SaslServer; + +import org.apache.log4j.Logger; +import org.apache.mina.transport.vmpipe.VmPipeAddress; +import org.apache.qpid.AMQChannelException; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.codec.AMQDecoder; +import org.apache.qpid.common.ClientProperties; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQProtocolHeaderException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.MethodDispatcher; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.pool.Job; +import org.apache.qpid.pool.ReferenceCountingExecutorService; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.handler.ServerMethodDispatcherImpl; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.AMQPConnectionActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.ConnectionMessages; +import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; +import org.apache.qpid.server.management.Managable; +import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.state.AMQState; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.Sender; + +public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession +{ + private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class); + + private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); + + private static final AtomicLong idGenerator = new AtomicLong(0); + + // to save boxing the channelId and looking up in a map... cache in an array the low numbered + // channels. This value must be of the form 2^x - 1. + private static final int CHANNEL_CACHE_SIZE = 0xff; + + private AMQShortString _contextKey; + + private AMQShortString _clientVersion = null; + + private VirtualHost _virtualHost; + + private final Map _channelMap = new HashMap(); + + private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; + + private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet(); + + private final AMQStateManager _stateManager; + + private AMQCodecFactory _codecFactory; + + private AMQProtocolSessionMBean _managedObject; + + private SaslServer _saslServer; + + private Object _lastReceived; + + private Object _lastSent; + + protected boolean _closed; + // maximum number of channels this session should have + private long _maxNoOfChannels = 1000; + + /* AMQP Version for this session */ + private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); + + private FieldTable _clientProperties; + private final List _taskList = new CopyOnWriteArrayList(); + + private Map _closingChannelsList = new ConcurrentHashMap(); + private ProtocolOutputConverter _protocolOutputConverter; + private Principal _authorizedID; + private MethodDispatcher _dispatcher; + private ProtocolSessionIdentifier _sessionIdentifier; + + // Create a simple ID that increments for ever new Session + private final long _sessionID = idGenerator.getAndIncrement(); + + private AMQPConnectionActor _actor; + private LogSubject _logSubject; + + private NetworkDriver _networkDriver; + + private long _lastIoTime; + + private long _writtenBytes; + private long _readBytes; + + private Job _readJob; + private Job _writeJob; + + private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); + + public ManagedObject getManagedObject() + { + return _managedObject; + } + + public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkDriver driver) + { + _stateManager = new AMQStateManager(virtualHostRegistry, this); + _networkDriver = driver; + + _codecFactory = new AMQCodecFactory(true, this); + _poolReference.acquireExecutorService(); + _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true); + _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false); + + _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); + _actor.message(ConnectionMessages.CON_1001(null, null, false, false)); + + } + + private AMQProtocolSessionMBean createMBean() throws AMQException + { + try + { + return new AMQProtocolSessionMBean(this); + } + catch (JMException ex) + { + _logger.error("AMQProtocolSession MBean creation has failed ", ex); + throw new AMQException("AMQProtocolSession MBean creation has failed ", ex); + } + } + + public long getSessionID() + { + return _sessionID; + } + + public LogActor getLogActor() + { + return _actor; + } + + @Override + public void received(final ByteBuffer msg) + { + _lastIoTime = System.currentTimeMillis(); + try + { + final ArrayList dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); + Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable() + { + @Override + public void run() + { + // Decode buffer + + for (AMQDataBlock dataBlock : dataBlocks) + { + try + { + dataBlockReceived(dataBlock); + } + catch (Exception e) + { + _logger.error("Unexpected exception when processing datablock", e); + closeProtocolSession(); + } + } + } + }); + } + catch (Exception e) + { + _logger.error("Unexpected exception when processing datablock", e); + closeProtocolSession(); + } + } + + public void dataBlockReceived(AMQDataBlock message) throws Exception + { + _lastReceived = message; + if (message instanceof ProtocolInitiation) + { + protocolInitiationReceived((ProtocolInitiation) message); + + } + else if (message instanceof AMQFrame) + { + AMQFrame frame = (AMQFrame) message; + frameReceived(frame); + + } + else + { + throw new UnknnownMessageTypeException(message); + } + } + + private void frameReceived(AMQFrame frame) throws AMQException + { + int channelId = frame.getChannel(); + AMQBody body = frame.getBodyFrame(); + + //Look up the Channel's Actor and set that as the current actor + // If that is not available then we can use the ConnectionActor + // that is associated with this AMQMPSession. + LogActor channelActor = null; + if (_channelMap.get(channelId) != null) + { + channelActor = _channelMap.get(channelId).getLogActor(); + } + CurrentActor.set(channelActor == null ? _actor : channelActor); + + try + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Frame Received: " + frame); + } + + // Check that this channel is not closing + if (channelAwaitingClosure(channelId)) + { + if ((frame.getBodyFrame() instanceof ChannelCloseOkBody)) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); + } + } + else + { + // The channel has been told to close, we don't process any more frames until + // it's closed. + return; + } + } + + try + { + body.handle(channelId, this); + } + catch (AMQException e) + { + closeChannel(channelId); + throw e; + } + } + finally + { + CurrentActor.remove(); + } + } + + private void protocolInitiationReceived(ProtocolInitiation pi) + { + // this ensures the codec never checks for a PI message again + ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false); + try + { + // Log incomming protocol negotiation request + _actor.message(ConnectionMessages.CON_1001(null, pi._protocolMajor + "-" + pi._protocolMinor, false, true)); + + ProtocolVersion pv = pi.checkVersion(); // Fails if not correct + + // This sets the protocol version (and hence framing classes) for this session. + setProtocolVersion(pv); + + String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms(); + + String locales = "en_US"; + + AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(), + (short) getProtocolMinorVersion(), + null, + mechanisms.getBytes(), + locales.getBytes()); + _networkDriver.send(responseBody.generateFrame(0).toNioByteBuffer()); + + } + catch (AMQException e) + { + _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion()); + + _networkDriver.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer()); + } + } + + public void methodFrameReceived(int channelId, AMQMethodBody methodBody) + { + + final AMQMethodEvent evt = new AMQMethodEvent(channelId, methodBody); + + try + { + try + { + + boolean wasAnyoneInterested = _stateManager.methodReceived(evt); + + if (!_frameListeners.isEmpty()) + { + for (AMQMethodListener listener : _frameListeners) + { + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + } + } + + if (!wasAnyoneInterested) + { + throw new AMQNoMethodHandlerException(evt); + } + } + catch (AMQChannelException e) + { + if (getChannel(channelId) != null) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Closing channel due to: " + e.getMessage()); + } + + writeFrame(e.getCloseFrame(channelId)); + closeChannel(channelId); + } + else + { + if (_logger.isDebugEnabled()) + { + _logger.debug("ChannelException occured on non-existent channel:" + e.getMessage()); + } + + if (_logger.isInfoEnabled()) + { + _logger.info("Closing connection due to: " + e.getMessage()); + } + + AMQConnectionException ce = + evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, + AMQConstant.CHANNEL_ERROR.getName().toString()); + + closeConnection(channelId, ce, false); + } + } + catch (AMQConnectionException e) + { + closeConnection(channelId, e, false); + } + } + catch (Exception e) + { + + for (AMQMethodListener listener : _frameListeners) + { + listener.error(e); + } + + _logger.error("Unexpected exception while processing frame. Closing connection.", e); + + closeProtocolSession(); + } + } + + public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException + { + + AMQChannel channel = getAndAssertChannel(channelId); + + channel.publishContentHeader(body); + + } + + public void contentBodyReceived(int channelId, ContentBody body) throws AMQException + { + AMQChannel channel = getAndAssertChannel(channelId); + + channel.publishContentBody(body); + } + + public void heartbeatBodyReceived(int channelId, HeartbeatBody body) + { + // NO - OP + } + + /** + * Convenience method that writes a frame to the protocol session. Equivalent to calling + * getProtocolSession().write(). + * + * @param frame the frame to write + */ + public void writeFrame(AMQDataBlock frame) + { + _lastSent = frame; + final ByteBuffer buf = frame.toNioByteBuffer(); + _lastIoTime = System.currentTimeMillis(); + _writtenBytes += buf.remaining(); + Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable() + { + @Override + public void run() + { + _networkDriver.send(buf); + } + }); + } + + public AMQShortString getContextKey() + { + return _contextKey; + } + + public void setContextKey(AMQShortString contextKey) + { + _contextKey = contextKey; + } + + public List getChannels() + { + return new ArrayList(_channelMap.values()); + } + + public AMQChannel getAndAssertChannel(int channelId) throws AMQException + { + AMQChannel channel = getChannel(channelId); + if (channel == null) + { + throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId); + } + + return channel; + } + + public AMQChannel getChannel(int channelId) throws AMQException + { + final AMQChannel channel = + ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId); + if ((channel == null) || channel.isClosing()) + { + return null; + } + else + { + return channel; + } + } + + public boolean channelAwaitingClosure(int channelId) + { + return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId); + } + + public void addChannel(AMQChannel channel) throws AMQException + { + if (_closed) + { + throw new AMQException("Session is closed"); + } + + final int channelId = channel.getChannelId(); + + if (_closingChannelsList.containsKey(channelId)) + { + throw new AMQException("Session is marked awaiting channel close"); + } + + if (_channelMap.size() == _maxNoOfChannels) + { + String errorMessage = + toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels + + "); can't create channel"; + _logger.error(errorMessage); + throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage); + } + else + { + _channelMap.put(channel.getChannelId(), channel); + } + + if (((channelId & CHANNEL_CACHE_SIZE) == channelId)) + { + _cachedChannels[channelId] = channel; + } + + checkForNotification(); + } + + private void checkForNotification() + { + int channelsCount = _channelMap.size(); + if (channelsCount >= _maxNoOfChannels) + { + _managedObject.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value"); + } + } + + public Long getMaximumNumberOfChannels() + { + return _maxNoOfChannels; + } + + public void setMaximumNumberOfChannels(Long value) + { + _maxNoOfChannels = value; + } + + public void commitTransactions(AMQChannel channel) throws AMQException + { + if ((channel != null) && channel.isTransactional()) + { + channel.commit(); + } + } + + public void rollbackTransactions(AMQChannel channel) throws AMQException + { + if ((channel != null) && channel.isTransactional()) + { + channel.rollback(); + } + } + + /** + * Close a specific channel. This will remove any resources used by the channel, including:
  • any queue + * subscriptions (this may in turn remove queues if they are auto delete
+ * + * @param channelId id of the channel to close + * + * @throws AMQException if an error occurs closing the channel + * @throws IllegalArgumentException if the channel id is not valid + */ + public void closeChannel(int channelId) throws AMQException + { + final AMQChannel channel = getChannel(channelId); + if (channel == null) + { + throw new IllegalArgumentException("Unknown channel id"); + } + else + { + try + { + channel.close(); + markChannelAwaitingCloseOk(channelId); + } + finally + { + removeChannel(channelId); + } + } + } + + public void closeChannelOk(int channelId) + { + // todo QPID-847 - This is called from two lcoations ChannelCloseHandler and ChannelCloseOkHandler. + // When it is the CC_OK_Handler then it makes sence to remove the channel else we will leak memory. + // We do it from the Close Handler as we are sending the OK back to the client. + // While this is AMQP spec compliant. The Java client in the event of an IllegalArgumentException + // will send a close-ok.. Where we should call removeChannel. + // However, due to the poor exception handling on the client. The client-user will be notified of the + // InvalidArgument and if they then decide to close the session/connection then the there will be time + // for that to occur i.e. a new close method be sent before the exeption handling can mark the session closed. + //removeChannel(channelId); + _closingChannelsList.remove(new Integer(channelId)); + } + + private void markChannelAwaitingCloseOk(int channelId) + { + _closingChannelsList.put(channelId, System.currentTimeMillis()); + } + + /** + * In our current implementation this is used by the clustering code. + * + * @param channelId The channel to remove + */ + public void removeChannel(int channelId) + { + _channelMap.remove(channelId); + if ((channelId & CHANNEL_CACHE_SIZE) == channelId) + { + _cachedChannels[channelId] = null; + } + } + + /** + * Initialise heartbeats on the session. + * + * @param delay delay in seconds (not ms) + */ + public void initHeartbeats(int delay) + { + if (delay > 0) + { + _networkDriver.setMaxWriteIdle(delay); + _networkDriver.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay)); + } + } + + /** + * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel. + * + * @throws AMQException if an error occurs while closing any channel + */ + private void closeAllChannels() throws AMQException + { + for (AMQChannel channel : _channelMap.values()) + { + channel.close(); + } + + _channelMap.clear(); + for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++) + { + _cachedChannels[i] = null; + } + } + + /** This must be called when the session is _closed in order to free up any resources managed by the session. */ + public void closeSession() throws AMQException + { + if (CurrentActor.get() == null) + { + CurrentActor.set(_actor); + } + if (!_closed) + { + if (_virtualHost != null) + { + _virtualHost.getConnectionRegistry().deregisterConnection(this); + } + + closeAllChannels(); + if (_managedObject != null) + { + _managedObject.unregister(); + } + + for (Task task : _taskList) + { + task.doTask(this); + } + + _closed = true; + _poolReference.releaseExecutorService(); + CurrentActor.get().message(_logSubject, ConnectionMessages.CON_1002()); + } + } + + public void closeConnection(int channelId, AMQConnectionException e, boolean closeProtocolSession) throws AMQException + { + if (_logger.isInfoEnabled()) + { + _logger.info("Closing connection due to: " + e.getMessage()); + } + + markChannelAwaitingCloseOk(channelId); + closeSession(); + _stateManager.changeState(AMQState.CONNECTION_CLOSING); + writeFrame(e.getCloseFrame(channelId)); + + if (closeProtocolSession) + { + closeProtocolSession(); + } + } + + public void closeProtocolSession() + { + _networkDriver.close(); + try + { + _stateManager.changeState(AMQState.CONNECTION_CLOSED); + } + catch (AMQException e) + { + _logger.info(e.getMessage()); + } + } + + public String toString() + { + return getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")"); + } + + public String dump() + { + return this + " last_sent=" + _lastSent + " last_received=" + _lastReceived; + } + + /** @return an object that can be used to identity */ + public Object getKey() + { + return getRemoteAddress(); + } + + /** + * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may + * be bound to multiple addresses this could vary depending on the acceptor this session was created from. + * + * @return a String FQDN + */ + public String getLocalFQDN() + { + SocketAddress address = _networkDriver.getLocalAddress(); + // we use the vmpipe address in some tests hence the need for this rather ugly test. The host + // information is used by SASL primary. + if (address instanceof InetSocketAddress) + { + return ((InetSocketAddress) address).getHostName(); + } + else if (address instanceof VmPipeAddress) + { + return "vmpipe:" + ((VmPipeAddress) address).getPort(); + } + else + { + throw new IllegalArgumentException("Unsupported socket address class: " + address); + } + } + + public SaslServer getSaslServer() + { + return _saslServer; + } + + public void setSaslServer(SaslServer saslServer) + { + _saslServer = saslServer; + } + + public FieldTable getClientProperties() + { + return _clientProperties; + } + + public void setClientProperties(FieldTable clientProperties) + { + _clientProperties = clientProperties; + if (_clientProperties != null) + { + if (_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE) != null) + { + String clientID = _clientProperties.getString(CLIENT_PROPERTIES_INSTANCE); + setContextKey(new AMQShortString(clientID)); + + // Log the Opening of the connection for this client + _actor.message(ConnectionMessages.CON_1001(clientID, _protocolVersion.toString(), true, true)); + } + + if (_clientProperties.getString(ClientProperties.version.toString()) != null) + { + _clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString())); + } + } + _sessionIdentifier = new ProtocolSessionIdentifier(this); + } + + private void setProtocolVersion(ProtocolVersion pv) + { + _protocolVersion = pv; + + _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this); + _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion); + } + + public byte getProtocolMajorVersion() + { + return _protocolVersion.getMajorVersion(); + } + + public ProtocolVersion getProtocolVersion() + { + return _protocolVersion; + } + + public byte getProtocolMinorVersion() + { + return _protocolVersion.getMinorVersion(); + } + + public boolean isProtocolVersion(byte major, byte minor) + { + return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor); + } + + public MethodRegistry getRegistry() + { + return getMethodRegistry(); + } + + public Object getClientIdentifier() + { + return (_networkDriver != null) ? _networkDriver.getRemoteAddress() : null; + } + + public VirtualHost getVirtualHost() + { + return _virtualHost; + } + + public void setVirtualHost(VirtualHost virtualHost) throws AMQException + { + _virtualHost = virtualHost; + + _actor.virtualHostSelected(this); + _logSubject = new ConnectionLogSubject(this); + + _virtualHost.getConnectionRegistry().registerConnection(this); + + _managedObject = createMBean(); + _managedObject.register(); + } + + public void addSessionCloseTask(Task task) + { + _taskList.add(task); + } + + public void removeSessionCloseTask(Task task) + { + _taskList.remove(task); + } + + public ProtocolOutputConverter getProtocolOutputConverter() + { + return _protocolOutputConverter; + } + + public void setAuthorizedID(Principal authorizedID) + { + _authorizedID = authorizedID; + + // Let the actor know that this connection is now Authorized + _actor.connectionAuthorized(this); + } + + public Principal getAuthorizedID() + { + return _authorizedID; + } + + public SocketAddress getRemoteAddress() + { + return _networkDriver.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _networkDriver.getLocalAddress(); + } + + public MethodRegistry getMethodRegistry() + { + return MethodRegistry.getMethodRegistry(getProtocolVersion()); + } + + public MethodDispatcher getMethodDispatcher() + { + return _dispatcher; + } + + @Override + public void closed() + { + try + { + closeSession(); + } + catch (AMQException e) + { + _logger.error("Could not close protocol engine", e); + } + } + + @Override + public void readerIdle() + { + // Nothing + } + + @Override + public void setNetworkDriver(NetworkDriver driver) + { + _networkDriver = driver; + } + + @Override + public void writerIdle() + { + _networkDriver.send(HeartbeatBody.FRAME.toNioByteBuffer()); + } + + @Override + public void exception(Throwable throwable) + { + if (throwable instanceof AMQProtocolHeaderException) + { + + writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); + _networkDriver.close(); + + _logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable); + } + else if (throwable instanceof IOException) + { + _logger.error("IOException caught in" + this + ", session closed implictly: " + throwable); + } + else + { + _logger.error("Exception caught in" + this + ", closing session explictly: " + throwable, throwable); + + + MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion()); + ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0); + + writeFrame(closeBody.generateFrame(0)); + + _networkDriver.close(); + } + } + + @Override + public void init() + { + // Do nothing + } + + @Override + public void setSender(Sender sender) + { + // Do nothing + } + + @Override + public long getReadBytes() + { + return _readBytes; + } + + public long getWrittenBytes() + { + return _writtenBytes; + } + + public long getLastIoTime() + { + return _lastIoTime; + } + + public ProtocolSessionIdentifier getSessionIdentifier() + { + return _sessionIdentifier; + } + + public String getClientVersion() + { + return (_clientVersion == null) ? null : _clientVersion.toString(); + } + + @Override + public void closeIfLingeringClosedChannels() + { + for (Entryid : _closingChannelsList.entrySet()) + { + if (id.getValue() + 30000 > System.currentTimeMillis()) + { + // We have a channel that we closed 30 seconds ago. Client's dead, kill the connection + _logger.error("Closing connection as channel was closed more than 30 seconds ago and no ChannelCloseOk has been processed"); + closeProtocolSession(); + } + } + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java new file mode 100644 index 0000000000..ff0c007a60 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java @@ -0,0 +1,29 @@ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.transport.NetworkDriver; + +public class AMQProtocolEngineFactory implements ProtocolEngineFactory +{ + private VirtualHostRegistry _vhosts; + + public AMQProtocolEngineFactory() + { + this(1); + } + + public AMQProtocolEngineFactory(Integer port) + { + _vhosts = ApplicationRegistry.getInstance(port).getVirtualHostRegistry(); + } + + + public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver) + { + return new AMQProtocolEngine(_vhosts, networkDriver); + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index fff406bb3d..b16ed01c79 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.virtualhost.VirtualHost; import java.security.Principal; +import java.util.List; public interface AMQProtocolSession extends AMQVersionAwareProtocolSession @@ -210,5 +211,21 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession public MethodDispatcher getMethodDispatcher(); public ProtocolSessionIdentifier getSessionIdentifier(); + + String getClientVersion(); + + long getLastIoTime(); + + long getWrittenBytes(); + + Long getMaximumNumberOfChannels(); + + void setMaximumNumberOfChannels(Long value); + + void commitTransactions(AMQChannel channel) throws AMQException; + + List getChannels(); + + void closeIfLingeringClosedChannels(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java index 81dbeeded2..8497c95e26 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java @@ -79,7 +79,7 @@ import org.apache.qpid.server.management.ManagedObject; @MBeanDescription("Management Bean for an AMQ Broker Connection") public class AMQProtocolSessionMBean extends AMQManagedObject implements ManagedConnection { - private AMQMinaProtocolSession _session = null; + private AMQProtocolSession _protocolSession = null; private String _name = null; // openmbean data types for representing the channel attributes @@ -92,10 +92,10 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed new AMQShortString("Broker Management Console has closed the connection."); @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection") - public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws NotCompliantMBeanException, OpenDataException + public AMQProtocolSessionMBean(AMQProtocolSession amqProtocolSession) throws NotCompliantMBeanException, OpenDataException { super(ManagedConnection.class, ManagedConnection.TYPE, ManagedConnection.VERSION); - _session = session; + _protocolSession = amqProtocolSession; String remote = getRemoteAddress(); remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote; _name = jmxEncode(new StringBuffer(remote), 0).toString(); @@ -128,52 +128,52 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed public String getClientId() { - return (_session.getContextKey() == null) ? null : _session.getContextKey().toString(); + return (_protocolSession.getContextKey() == null) ? null : _protocolSession.getContextKey().toString(); } public String getAuthorizedId() { - return (_session.getAuthorizedID() != null ) ? _session.getAuthorizedID().getName() : null; + return (_protocolSession.getAuthorizedID() != null ) ? _protocolSession.getAuthorizedID().getName() : null; } public String getVersion() { - return (_session.getClientVersion() == null) ? null : _session.getClientVersion().toString(); + return (_protocolSession.getClientVersion() == null) ? null : _protocolSession.getClientVersion().toString(); } public Date getLastIoTime() { - return new Date(_session.getIOSession().getLastIoTime()); + return new Date(_protocolSession.getLastIoTime()); } public String getRemoteAddress() { - return _session.getIOSession().getRemoteAddress().toString(); + return _protocolSession.getRemoteAddress().toString(); } public ManagedObject getParentObject() { - return _session.getVirtualHost().getManagedObject(); + return _protocolSession.getVirtualHost().getManagedObject(); } public Long getWrittenBytes() { - return _session.getIOSession().getWrittenBytes(); + return _protocolSession.getWrittenBytes(); } public Long getReadBytes() { - return _session.getIOSession().getReadBytes(); + return _protocolSession.getWrittenBytes(); } public Long getMaximumNumberOfChannels() { - return _session.getMaximumNumberOfChannels(); + return _protocolSession.getMaximumNumberOfChannels(); } public void setMaximumNumberOfChannels(Long value) { - _session.setMaximumNumberOfChannels(value); + _protocolSession.setMaximumNumberOfChannels(value); } public String getObjectInstanceName() @@ -192,13 +192,13 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger())); try { - AMQChannel channel = _session.getChannel(channelId); + AMQChannel channel = _protocolSession.getChannel(channelId); if (channel == null) { throw new JMException("The channel (channel Id = " + channelId + ") does not exist"); } - _session.commitTransactions(channel); + _protocolSession.commitTransactions(channel); } catch (AMQException ex) { @@ -221,13 +221,13 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger())); try { - AMQChannel channel = _session.getChannel(channelId); + AMQChannel channel = _protocolSession.getChannel(channelId); if (channel == null) { throw new JMException("The channel (channel Id = " + channelId + ") does not exist"); } - _session.rollbackTransactions(channel); + _protocolSession.commitTransactions(channel); } catch (AMQException ex) { @@ -248,7 +248,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed public TabularData channels() throws OpenDataException { TabularDataSupport channelsList = new TabularDataSupport(_channelsType); - List list = _session.getChannels(); + List list = _protocolSession.getChannels(); for (AMQChannel channel : list) { @@ -274,7 +274,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed public void closeConnection() throws JMException { - MethodRegistry methodRegistry = _session.getMethodRegistry(); + MethodRegistry methodRegistry = _protocolSession.getMethodRegistry(); ConnectionCloseBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode @@ -301,12 +301,12 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed try { - _session.writeFrame(responseBody.generateFrame(0)); + _protocolSession.writeFrame(responseBody.generateFrame(0)); try { - _session.closeSession(); + _protocolSession.closeSession(); } catch (AMQException ex) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 7511b5c818..1affdd6590 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -258,7 +258,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry for (InetSocketAddress bindAddress : _acceptors.keySet()) { QpidAcceptor acceptor = _acceptors.get(bindAddress); - acceptor.getIoAcceptor().unbind(bindAddress); + acceptor.getNetworkDriver().close(); CurrentActor.get().message(BrokerMessages.BRK_1003(acceptor.toString(), bindAddress.getPort())); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java index 5b3c3659c4..7450322130 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.security.access.plugins.network; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; @@ -32,7 +31,6 @@ import org.apache.commons.configuration.CompositeConfiguration; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.security.access.ACLPluginFactory; @@ -180,13 +178,13 @@ public class FirewallPlugin extends AbstractACLPlugin @Override public AuthzResult authoriseConnect(AMQProtocolSession session, VirtualHost virtualHost) { - if (!(session instanceof AMQMinaProtocolSession)) + SocketAddress sockAddr = session.getRemoteAddress(); + if (!(sockAddr instanceof InetSocketAddress)) { - return AuthzResult.ABSTAIN; // We only deal with tcp sessions, which - // mean MINA right now + return AuthzResult.ABSTAIN; // We only deal with tcp sessions } - InetAddress addr = getInetAdressFromMinaSession((AMQMinaProtocolSession) session); + InetAddress addr = ((InetSocketAddress) sockAddr).getAddress(); if (addr == null) { @@ -213,19 +211,6 @@ public class FirewallPlugin extends AbstractACLPlugin } - private InetAddress getInetAdressFromMinaSession(AMQMinaProtocolSession session) - { - SocketAddress remote = session.getIOSession().getRemoteAddress(); - if (remote instanceof InetSocketAddress) - { - return ((InetSocketAddress) remote).getAddress(); - } - else - { - return null; - } - } - public void setConfiguration(Configuration config) throws ConfigurationException { // Get default action diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java b/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java index 61cc7cdeb6..3ca22b60c8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java @@ -20,21 +20,21 @@ */ package org.apache.qpid.server.transport; -import org.apache.mina.common.IoAcceptor; +import org.apache.qpid.transport.NetworkDriver; public class QpidAcceptor { - IoAcceptor _acceptor; + NetworkDriver _driver; String _protocol; - public QpidAcceptor(IoAcceptor acceptor, String protocol) + public QpidAcceptor(NetworkDriver driver, String protocol) { - _acceptor = acceptor; + _driver = driver; _protocol = protocol; } - public IoAcceptor getIoAcceptor() + public NetworkDriver getNetworkDriver() { - return _acceptor; + return _driver; } public String toString() diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 9ab3592628..3b776a62b4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -279,6 +279,14 @@ public class VirtualHost implements Accessable _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(), period / 2, period); + + class ForceChannelClosuresTask extends TimerTask + { + public void run() + { + _connectionRegistry.expireClosedChannels(); + } + } } } -- cgit v1.2.1