From a62268645e691b71a645ea19ca41e93df6c7cff9 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 26 Nov 2007 15:57:46 +0000 Subject: QPID-92, QPID-564 : Upgraded Mina to 1.0.1 still not good enough but all future versions currently have a bug with the CumulativeProtocolDecoder. It compact()s the buffer which breaks slices. Added MultiThread Support which is some of the feature set of QPID-564 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@598324 13f79535-47bb-0310-9956-ffa450edef68 --- java/broker/etc/config.xml | 1 + java/broker/pom.xml | 20 ++++---- .../src/main/java/org/apache/qpid/server/Main.java | 2 +- .../server/protocol/AMQPFastProtocolHandler.java | 58 ++++++++++++++++++---- .../server/transport/ConnectorConfiguration.java | 47 ++++++++++++------ 5 files changed, 92 insertions(+), 36 deletions(-) (limited to 'java/broker') diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml index 2257a612b3..737c8d22c4 100644 --- a/java/broker/etc/config.xml +++ b/java/broker/etc/config.xml @@ -33,6 +33,7 @@ keystorepass --> true + true nio 5672 8672 diff --git a/java/broker/pom.xml b/java/broker/pom.xml index 9e38751d62..dbc3b2aea9 100644 --- a/java/broker/pom.xml +++ b/java/broker/pom.xml @@ -6,9 +6,9 @@ to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -49,16 +49,16 @@ log4j - + org.slf4j - slf4j-api - 1.4.0 + slf4j-api + 1.4.0 - - org.slf4j - slf4j-log4j12 - 1.4.0 + + org.slf4j + slf4j-log4j12 + 1.4.0 @@ -208,7 +208,7 @@ - diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index a87c704cf8..ab9f40b31d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -436,7 +436,7 @@ public class Main } } - //fixme qpid.AMQP should be using qpidproperties to get value + //fixme qpid.AMQP should be using qpidproperties to get value _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion() + " build: " + QpidProperties.getBuildVersion()); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 06f2fbcfd7..fa9d83cd7e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,10 +23,15 @@ package org.apache.qpid.server.protocol; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; +import org.apache.mina.filter.ReadThrottleFilterBuilder; import org.apache.mina.filter.SSLFilter; +import org.apache.mina.filter.WriteBufferLimitFilterBuilder; import org.apache.mina.filter.codec.ProtocolCodecFilter; +import org.apache.mina.filter.executor.ExecutorFilter; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.mina.util.SessionUtil; import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; @@ -45,7 +50,6 @@ import java.net.InetSocketAddress; * * We delegate all frame (message) processing to the AMQProtocolSession which wraps * the state for the connection. - * */ public class AMQPFastProtocolHandler extends IoHandlerAdapter { @@ -109,11 +113,41 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter } } + + if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio", false)) + { + try + { +// //Add IO Protection Filters + IoFilterChain chain = protocolSession.getFilterChain(); + + int buf_size = 32768; + if (protocolSession.getConfig() instanceof SocketSessionConfig) + { + buf_size = ((SocketSessionConfig) protocolSession.getConfig()).getReceiveBufferSize(); + } + + protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); + + ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); + readfilter.setMaximumConnectionBufferSize(buf_size); + readfilter.attach(chain); + + WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); + writefilter.setMaximumConnectionBufferSize(buf_size * 2); + writefilter.attach(chain); + + protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); + _logger.info("Using IO Read/Write Filter Protection"); + } + catch (Exception e) + { + _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage()); + } + } } - /** - * Separated into its own, protected, method to allow easier reuse - */ + /** Separated into its own, protected, method to allow easier reuse */ protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec) throws AMQException { new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec); @@ -184,8 +218,10 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter /** * Invoked when a message is received on a particular protocol session. Note that a * protocol session is directly tied to a particular physical connection. + * * @param protocolSession the protocol session that received the message - * @param message the message itself (i.e. a decoded frame) + * @param message the message itself (i.e. a decoded frame) + * * @throws Exception if the message cannot be processed */ public void messageReceived(IoSession protocolSession, Object message) throws Exception @@ -195,7 +231,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter if (message instanceof AMQDataBlock) { amqProtocolSession.dataBlockReceived((AMQDataBlock) message); - + } else if (message instanceof ByteBuffer) { @@ -209,9 +245,11 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter /** * Called after a message has been sent out on a particular protocol session + * * @param protocolSession the protocol session (i.e. connection) on which this - * message was sent - * @param object the message (frame) that was encoded and sent + * message was sent + * @param object the message (frame) that was encoded and sent + * * @throws Exception if we want to indicate an error */ public void messageSent(IoSession protocolSession, Object object) throws Exception diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java index a4ed859fa7..1dcbb02d5c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,9 +23,12 @@ package org.apache.qpid.server.transport; import org.apache.mina.common.IoAcceptor; import org.apache.mina.util.NewThreadExecutor; import org.apache.qpid.configuration.Configured; +import org.apache.log4j.Logger; public class ConnectorConfiguration { + private static final Logger _logger = Logger.getLogger(ConnectorConfiguration.class); + public static final String DEFAULT_PORT = "5672"; public static final String SSL_PORT = "8672"; @@ -41,7 +44,7 @@ public class ConnectorConfiguration @Configured(path = "connector.bind", defaultValue = "wildcard") public String bindAddress; - + @Configured(path = "connector.socketReceiveBuffer", defaultValue = "32767") public int socketReceiveBufferSize; @@ -69,29 +72,43 @@ public class ConnectorConfiguration @Configured(path = "connector.ssl.enabled", defaultValue = "false") public boolean enableSSL; - + @Configured(path = "connector.ssl.sslOnly", - defaultValue = "true") + defaultValue = "true") public boolean sslOnly; - + @Configured(path = "connector.ssl.port", - defaultValue = SSL_PORT) - public int sslPort; - + defaultValue = SSL_PORT) + public int sslPort; + @Configured(path = "connector.ssl.keystorePath", - defaultValue = "none") + defaultValue = "none") public String keystorePath; - + @Configured(path = "connector.ssl.keystorePassword", - defaultValue = "none") + defaultValue = "none") public String keystorePassword; - + @Configured(path = "connector.ssl.certType", - defaultValue = "SunX509") + defaultValue = "SunX509") public String certType; + @Configured(path = "connector.qpidnio", + defaultValue = "true") + public boolean _multiThreadNIO; + + public IoAcceptor createAcceptor() { - return new org.apache.mina.transport.socket.nio.SocketAcceptor(processors, new NewThreadExecutor()); + if (_multiThreadNIO) + { + _logger.warn("Using Qpid Multithreaded IO Processing"); + return new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(processors, new NewThreadExecutor()); + } + else + { + _logger.warn("Using Mina IO Processing"); + return new org.apache.mina.transport.socket.nio.SocketAcceptor(processors, new NewThreadExecutor()); + } } } -- cgit v1.2.1