diff options
Diffstat (limited to 'java/broker')
| -rw-r--r-- | java/broker/pom.xml | 132 | ||||
| -rw-r--r-- | java/broker/src/main/java/log4j.properties (renamed from java/broker/src/log4j.properties) | 0 | ||||
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/Main.java | 87 |
3 files changed, 170 insertions, 49 deletions
diff --git a/java/broker/pom.xml b/java/broker/pom.xml new file mode 100644 index 0000000000..c66ddd82e6 --- /dev/null +++ b/java/broker/pom.xml @@ -0,0 +1,132 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-broker</artifactId> + <packaging>jar</packaging> + <version>1.0-incubating-M2-SNAPSHOT</version> + <name>Qpid Broker</name> + <url>http://cwiki.apache.org/confluence/display/qpid</url> + + <parent> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid</artifactId> + <version>1.0-incubating-M2-SNAPSHOT</version> + </parent> + + <properties> + <topDirectoryLocation>..</topDirectoryLocation> + </properties> + + <dependencies> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </dependency> + <dependency> + <groupId>commons-configuration</groupId> + <artifactId>commons-configuration</artifactId> + </dependency> + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + <dependency> + <groupId>org.apache.mina</groupId> + <artifactId>mina-filter-ssl</artifactId> + </dependency> + <dependency> + <groupId>org.apache.mina</groupId> + <artifactId>mina-java5</artifactId> + </dependency> + <dependency> + <groupId>backport-util-concurrent</groupId> + <artifactId>backport-util-concurrent</artifactId> + </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-common</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>ant</groupId> + <artifactId>ant-junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.easymock</groupId> + <artifactId>easymockclassextension</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <id>ant-test</id> + <phase>test</phase> + <configuration> + <tasks unless="${maven.test.skip}"> + <taskdef name="junit" + classname="org.apache.tools.ant.taskdefs.optional.junit.JUnitTask"> + <classpath> + <path refid="maven.test.classpath"/> + </classpath> + </taskdef> + <mkdir dir="${project.build.directory}/test-classes"/> + <junit fork="yes" showoutput="true" haltonfailure="yes"> + <test name="org.apache.qpid.server.UnitTests" + todir="${project.build.directory}"/> + <formatter type="plain"/> + <classpath> + <path refid="maven.test.classpath"/> + </classpath> + </junit> + </tasks> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/java/broker/src/log4j.properties b/java/broker/src/main/java/log4j.properties index 87f04f4991..87f04f4991 100644 --- a/java/broker/src/log4j.properties +++ b/java/broker/src/main/java/log4j.properties diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index d2dacb6140..a1dabcd964 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -46,6 +46,7 @@ import org.apache.log4j.xml.DOMConfigurator; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoAcceptor; import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import javax.management.JMException; @@ -64,6 +65,7 @@ import java.util.List; /** * Main entry point for AMQPD. + * */ public class Main implements ProtocolVersionList { @@ -120,10 +122,10 @@ public class Main implements ProtocolVersionList Option bind = OptionBuilder.withArgName("bind").hasArg().withDescription("bind to the specified address. Overrides any value in the config file"). withLongOpt("bind").create("b"); Option logconfig = OptionBuilder.withArgName("logconfig").hasArg().withDescription("use the specified log4j xml configuration file. By " + - "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME + " in the same directory as the configuration file"). + "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME + " in the same directory as the configuration file"). withLongOpt("logconfig").create("l"); Option logwatchconfig = OptionBuilder.withArgName("logwatch").hasArg().withDescription("monitor the log file configuration file for changes. Units are seconds. " + - "Zero means do not check for changes.").withLongOpt("logwatch").create("w"); + "Zero means do not check for changes.").withLongOpt("logwatch").create("w"); options.addOption(help); options.addOption(version); @@ -147,12 +149,10 @@ public class Main implements ProtocolVersionList { String ver = "Qpid 0.9.0.0"; String protocol = "AMQP version(s) [major.minor]: "; - for (int i = 0; i < pv.length; i++) + for (int i=0; i<pv.length; i++) { if (i > 0) - { protocol += ", "; - } protocol += pv[i][PROTOCOL_MAJOR] + "." + pv[i][PROTOCOL_MINOR]; } System.out.println(ver + " (" + protocol + ")"); @@ -223,8 +223,7 @@ public class Main implements ProtocolVersionList ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance(). getConfiguredObject(ConnectorConfiguration.class); - // From old Mina - //ByteBuffer.setUseDirectBuffers(connectorConfig.enableDirectBuffers); + ByteBuffer.setUseDirectBuffers(connectorConfig.enableDirectBuffers); // the MINA default is currently to use the pooled allocator although this may change in future // once more testing of the performance of the simple allocator has been done @@ -259,12 +258,12 @@ public class Main implements ProtocolVersionList int totalVHosts = ((Collection) virtualHosts).size(); for (int vhost = 0; vhost < totalVHosts; vhost++) { - setupVirtualHosts(configFile.getParent(), (String) ((List) virtualHosts).get(vhost)); + setupVirtualHosts(configFile.getParent() , (String)((List)virtualHosts).get(vhost)); } } else { - setupVirtualHosts(configFile.getParent(), (String) virtualHosts); + setupVirtualHosts(configFile.getParent() , (String)virtualHosts); } } bind(port, connectorConfig); @@ -281,7 +280,7 @@ public class Main implements ProtocolVersionList configFilePath = configFileParent + configFilePath.substring(configVar.length()); } - if (configFilePath.indexOf(".xml") != -1) + if (configFilePath.indexOf(".xml") != -1 ) { VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath); vHostConfig.performBindings(); @@ -294,11 +293,11 @@ public class Main implements ProtocolVersionList String[] fileNames = virtualHostDir.list(); - for (int each = 0; each < fileNames.length; each++) + for (int each=0; each < fileNames.length; each++) { if (fileNames[each].endsWith(".xml")) { - VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath + "/" + fileNames[each]); + VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath+"/"+fileNames[each]); vHostConfig.performBindings(); } } @@ -317,10 +316,8 @@ public class Main implements ProtocolVersionList { //IoAcceptor acceptor = new SocketAcceptor(connectorConfig.processors); IoAcceptor acceptor = connectorConfig.createAcceptor(); - - SocketSessionConfig sc; - - sc = (SocketSessionConfig) acceptor.getSessionConfig(); + SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig(); + SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); sc.setReceiveBufferSize(connectorConfig.socketReceiveBufferSize); sc.setSendBufferSize(connectorConfig.socketWriteBuferSize); @@ -330,7 +327,7 @@ public class Main implements ProtocolVersionList // implementation provided by MINA if (connectorConfig.enableExecutorPool) { - acceptor.setThreadModel(new ReadWriteThreadModel()); + sconfig.setThreadModel(new ReadWriteThreadModel()); } if (connectorConfig.enableNonSSL) @@ -345,9 +342,7 @@ public class Main implements ProtocolVersionList { bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port); } - acceptor.setLocalAddress(bindAddress); - acceptor.setHandler(handler); - acceptor.bind(); + acceptor.bind(bindAddress, handler, sconfig); _logger.info("Qpid.AMQP listening on non-SSL address " + bindAddress); } @@ -357,9 +352,8 @@ public class Main implements ProtocolVersionList handler.setUseSSL(true); try { - acceptor.setLocalAddress(new InetSocketAddress(connectorConfig.sslPort)); - acceptor.setHandler(handler); - acceptor.bind(); + acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), + handler, sconfig); _logger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort); } catch (IOException e) @@ -414,16 +408,15 @@ public class Main implements ProtocolVersionList catch (NumberFormatException e) { System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be " + - "a non-negative integer. Using default of zero (no watching configured"); + "a non-negative integer. Using default of zero (no watching configured"); } if (logConfigFile.exists() && logConfigFile.canRead()) { System.out.println("Configuring logger using configuration file " + logConfigFile.getAbsolutePath()); - if (logWatchTime > 0) { System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every " + - logWatchTime + " seconds"); + logWatchTime + " seconds"); // log4j expects the watch interval in milliseconds DOMConfigurator.configureAndWatch(logConfigFile.getAbsolutePath(), logWatchTime * 1000); } @@ -448,7 +441,7 @@ public class Main implements ProtocolVersionList } catch (NotCompliantMBeanException ex) { - throw new AMQException("Exception occured in creating AMQBrokerManager MBean."); + throw new AMQException("Exception occured in creating AMQBrokerManager MBean."); } } @@ -458,24 +451,24 @@ public class Main implements ProtocolVersionList */ @MBeanDescription("This MBean exposes the broker level management features") private final class AMQBrokerManager extends AMQManagedObject - implements ManagedBroker + implements ManagedBroker { - private final QueueRegistry _queueRegistry; + private final QueueRegistry _queueRegistry; private final ExchangeRegistry _exchangeRegistry; - private final ExchangeFactory _exchangeFactory; - private final MessageStore _messageStore; + private final ExchangeFactory _exchangeFactory; + private final MessageStore _messageStore; @MBeanConstructor("Creates the Broker Manager MBean") - protected AMQBrokerManager() throws NotCompliantMBeanException + protected AMQBrokerManager() throws NotCompliantMBeanException { super(ManagedBroker.class, ManagedBroker.TYPE); IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); - _queueRegistry = appRegistry.getQueueRegistry(); + _queueRegistry = appRegistry.getQueueRegistry(); _exchangeRegistry = appRegistry.getExchangeRegistry(); - _exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory(); - _messageStore = ApplicationRegistry.getInstance().getMessageStore(); - } + _exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory(); + _messageStore = ApplicationRegistry.getInstance().getMessageStore(); + } public String getObjectInstanceName() { @@ -484,7 +477,6 @@ public class Main implements ProtocolVersionList /** * Creates new exchange and registers it with the registry. - * * @param exchangeName * @param type * @param durable @@ -495,7 +487,7 @@ public class Main implements ProtocolVersionList String type, boolean durable, boolean autoDelete) - throws JMException + throws JMException { try { @@ -506,10 +498,10 @@ public class Main implements ProtocolVersionList if (exchange == null) { exchange = _exchangeFactory.createExchange(exchangeName, - type, //eg direct - durable, - autoDelete, - 0); //ticket no + type, //eg direct + durable, + autoDelete, + 0); //ticket no _exchangeRegistry.registerExchange(exchange); } else @@ -518,7 +510,7 @@ public class Main implements ProtocolVersionList } } } - catch (AMQException ex) + catch(AMQException ex) { _logger.error("Error in creating exchange " + exchangeName, ex); throw new MBeanException(ex, ex.toString()); @@ -527,12 +519,11 @@ public class Main implements ProtocolVersionList /** * Unregisters the exchange from registry. - * * @param exchangeName * @throws JMException */ public void unregisterExchange(String exchangeName) - throws JMException + throws JMException { boolean inUse = false; // TODO @@ -543,7 +534,7 @@ public class Main implements ProtocolVersionList { _exchangeRegistry.unregisterExchange(exchangeName, false); } - catch (AMQException ex) + catch(AMQException ex) { _logger.error("Error in unregistering exchange " + exchangeName, ex); throw new MBeanException(ex, ex.toString()); @@ -553,7 +544,6 @@ public class Main implements ProtocolVersionList /** * Creates a new queue and registers it with the registry and puts it * in persistance storage if durable queue. - * * @param queueName * @param durable * @param owner @@ -564,7 +554,7 @@ public class Main implements ProtocolVersionList boolean durable, String owner, boolean autoDelete) - throws JMException + throws JMException { AMQQueue queue = _queueRegistry.getQueue(queueName); if (queue == null) @@ -592,7 +582,6 @@ public class Main implements ProtocolVersionList /** * Deletes the queue from queue registry and persistant storage. - * * @param queueName * @throws JMException */ |
