diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-11-26 15:57:46 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-11-26 15:57:46 +0000 |
| commit | a62268645e691b71a645ea19ca41e93df6c7cff9 (patch) | |
| tree | 88fd0d7c71d2a4e941085d96a9c3fde0a8d67fa3 /java/broker/src | |
| parent | c7ae06e49f2376853c0e77afefa0a59a7c9612ea (diff) | |
| download | qpid-python-a62268645e691b71a645ea19ca41e93df6c7cff9.tar.gz | |
QPID-92, QPID-564 : Upgraded Mina to 1.0.1 still not good enough but all future versions currently have a bug with the CumulativeProtocolDecoder. It compact()s the buffer which breaks slices. Added MultiThread Support which is some of the feature set of QPID-564
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@598324 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
3 files changed, 81 insertions, 26 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index a87c704cf8..ab9f40b31d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -436,7 +436,7 @@ public class Main } } - //fixme qpid.AMQP should be using qpidproperties to get value + //fixme qpid.AMQP should be using qpidproperties to get value _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion() + " build: " + QpidProperties.getBuildVersion()); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 06f2fbcfd7..fa9d83cd7e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,10 +23,15 @@ package org.apache.qpid.server.protocol; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; +import org.apache.mina.filter.ReadThrottleFilterBuilder; import org.apache.mina.filter.SSLFilter; +import org.apache.mina.filter.WriteBufferLimitFilterBuilder; import org.apache.mina.filter.codec.ProtocolCodecFilter; +import org.apache.mina.filter.executor.ExecutorFilter; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.mina.util.SessionUtil; import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; @@ -45,7 +50,6 @@ import java.net.InetSocketAddress; * * We delegate all frame (message) processing to the AMQProtocolSession which wraps * the state for the connection. - * */ public class AMQPFastProtocolHandler extends IoHandlerAdapter { @@ -109,11 +113,41 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter } } + + if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio", false)) + { + try + { +// //Add IO Protection Filters + IoFilterChain chain = protocolSession.getFilterChain(); + + int buf_size = 32768; + if (protocolSession.getConfig() instanceof SocketSessionConfig) + { + buf_size = ((SocketSessionConfig) protocolSession.getConfig()).getReceiveBufferSize(); + } + + protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); + + ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); + readfilter.setMaximumConnectionBufferSize(buf_size); + readfilter.attach(chain); + + WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); + writefilter.setMaximumConnectionBufferSize(buf_size * 2); + writefilter.attach(chain); + + protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); + _logger.info("Using IO Read/Write Filter Protection"); + } + catch (Exception e) + { + _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage()); + } + } } - /** - * Separated into its own, protected, method to allow easier reuse - */ + /** Separated into its own, protected, method to allow easier reuse */ protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec) throws AMQException { new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec); @@ -184,8 +218,10 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter /** * Invoked when a message is received on a particular protocol session. Note that a * protocol session is directly tied to a particular physical connection. + * * @param protocolSession the protocol session that received the message - * @param message the message itself (i.e. a decoded frame) + * @param message the message itself (i.e. a decoded frame) + * * @throws Exception if the message cannot be processed */ public void messageReceived(IoSession protocolSession, Object message) throws Exception @@ -195,7 +231,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter if (message instanceof AMQDataBlock) { amqProtocolSession.dataBlockReceived((AMQDataBlock) message); - + } else if (message instanceof ByteBuffer) { @@ -209,9 +245,11 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter /** * Called after a message has been sent out on a particular protocol session + * * @param protocolSession the protocol session (i.e. connection) on which this - * message was sent - * @param object the message (frame) that was encoded and sent + * message was sent + * @param object the message (frame) that was encoded and sent + * * @throws Exception if we want to indicate an error */ public void messageSent(IoSession protocolSession, Object object) throws Exception diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java index a4ed859fa7..1dcbb02d5c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,9 +23,12 @@ package org.apache.qpid.server.transport; import org.apache.mina.common.IoAcceptor; import org.apache.mina.util.NewThreadExecutor; import org.apache.qpid.configuration.Configured; +import org.apache.log4j.Logger; public class ConnectorConfiguration { + private static final Logger _logger = Logger.getLogger(ConnectorConfiguration.class); + public static final String DEFAULT_PORT = "5672"; public static final String SSL_PORT = "8672"; @@ -41,7 +44,7 @@ public class ConnectorConfiguration @Configured(path = "connector.bind", defaultValue = "wildcard") public String bindAddress; - + @Configured(path = "connector.socketReceiveBuffer", defaultValue = "32767") public int socketReceiveBufferSize; @@ -69,29 +72,43 @@ public class ConnectorConfiguration @Configured(path = "connector.ssl.enabled", defaultValue = "false") public boolean enableSSL; - + @Configured(path = "connector.ssl.sslOnly", - defaultValue = "true") + defaultValue = "true") public boolean sslOnly; - + @Configured(path = "connector.ssl.port", - defaultValue = SSL_PORT) - public int sslPort; - + defaultValue = SSL_PORT) + public int sslPort; + @Configured(path = "connector.ssl.keystorePath", - defaultValue = "none") + defaultValue = "none") public String keystorePath; - + @Configured(path = "connector.ssl.keystorePassword", - defaultValue = "none") + defaultValue = "none") public String keystorePassword; - + @Configured(path = "connector.ssl.certType", - defaultValue = "SunX509") + defaultValue = "SunX509") public String certType; + @Configured(path = "connector.qpidnio", + defaultValue = "true") + public boolean _multiThreadNIO; + + public IoAcceptor createAcceptor() { - return new org.apache.mina.transport.socket.nio.SocketAcceptor(processors, new NewThreadExecutor()); + if (_multiThreadNIO) + { + _logger.warn("Using Qpid Multithreaded IO Processing"); + return new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(processors, new NewThreadExecutor()); + } + else + { + _logger.warn("Using Mina IO Processing"); + return new org.apache.mina.transport.socket.nio.SocketAcceptor(processors, new NewThreadExecutor()); + } } } |
