summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/pom.xml132
-rw-r--r--java/broker/src/main/java/log4j.properties (renamed from java/broker/src/log4j.properties)0
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Main.java87
3 files changed, 170 insertions, 49 deletions
diff --git a/java/broker/pom.xml b/java/broker/pom.xml
new file mode 100644
index 0000000000..c66ddd82e6
--- /dev/null
+++ b/java/broker/pom.xml
@@ -0,0 +1,132 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker</artifactId>
+ <packaging>jar</packaging>
+ <version>1.0-incubating-M2-SNAPSHOT</version>
+ <name>Qpid Broker</name>
+ <url>http://cwiki.apache.org/confluence/display/qpid</url>
+
+ <parent>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid</artifactId>
+ <version>1.0-incubating-M2-SNAPSHOT</version>
+ </parent>
+
+ <properties>
+ <topDirectoryLocation>..</topDirectoryLocation>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.mina</groupId>
+ <artifactId>mina-filter-ssl</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.mina</groupId>
+ <artifactId>mina-java5</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>backport-util-concurrent</groupId>
+ <artifactId>backport-util-concurrent</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>ant</groupId>
+ <artifactId>ant-junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymockclassextension</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>ant-test</id>
+ <phase>test</phase>
+ <configuration>
+ <tasks unless="${maven.test.skip}">
+ <taskdef name="junit"
+ classname="org.apache.tools.ant.taskdefs.optional.junit.JUnitTask">
+ <classpath>
+ <path refid="maven.test.classpath"/>
+ </classpath>
+ </taskdef>
+ <mkdir dir="${project.build.directory}/test-classes"/>
+ <junit fork="yes" showoutput="true" haltonfailure="yes">
+ <test name="org.apache.qpid.server.UnitTests"
+ todir="${project.build.directory}"/>
+ <formatter type="plain"/>
+ <classpath>
+ <path refid="maven.test.classpath"/>
+ </classpath>
+ </junit>
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/java/broker/src/log4j.properties b/java/broker/src/main/java/log4j.properties
index 87f04f4991..87f04f4991 100644
--- a/java/broker/src/log4j.properties
+++ b/java/broker/src/main/java/log4j.properties
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java
index d2dacb6140..a1dabcd964 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -46,6 +46,7 @@ import org.apache.log4j.xml.DOMConfigurator;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import javax.management.JMException;
@@ -64,6 +65,7 @@ import java.util.List;
/**
* Main entry point for AMQPD.
+ *
*/
public class Main implements ProtocolVersionList
{
@@ -120,10 +122,10 @@ public class Main implements ProtocolVersionList
Option bind = OptionBuilder.withArgName("bind").hasArg().withDescription("bind to the specified address. Overrides any value in the config file").
withLongOpt("bind").create("b");
Option logconfig = OptionBuilder.withArgName("logconfig").hasArg().withDescription("use the specified log4j xml configuration file. By " +
- "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME + " in the same directory as the configuration file").
+ "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME + " in the same directory as the configuration file").
withLongOpt("logconfig").create("l");
Option logwatchconfig = OptionBuilder.withArgName("logwatch").hasArg().withDescription("monitor the log file configuration file for changes. Units are seconds. " +
- "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
+ "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
options.addOption(help);
options.addOption(version);
@@ -147,12 +149,10 @@ public class Main implements ProtocolVersionList
{
String ver = "Qpid 0.9.0.0";
String protocol = "AMQP version(s) [major.minor]: ";
- for (int i = 0; i < pv.length; i++)
+ for (int i=0; i<pv.length; i++)
{
if (i > 0)
- {
protocol += ", ";
- }
protocol += pv[i][PROTOCOL_MAJOR] + "." + pv[i][PROTOCOL_MINOR];
}
System.out.println(ver + " (" + protocol + ")");
@@ -223,8 +223,7 @@ public class Main implements ProtocolVersionList
ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance().
getConfiguredObject(ConnectorConfiguration.class);
- // From old Mina
- //ByteBuffer.setUseDirectBuffers(connectorConfig.enableDirectBuffers);
+ ByteBuffer.setUseDirectBuffers(connectorConfig.enableDirectBuffers);
// the MINA default is currently to use the pooled allocator although this may change in future
// once more testing of the performance of the simple allocator has been done
@@ -259,12 +258,12 @@ public class Main implements ProtocolVersionList
int totalVHosts = ((Collection) virtualHosts).size();
for (int vhost = 0; vhost < totalVHosts; vhost++)
{
- setupVirtualHosts(configFile.getParent(), (String) ((List) virtualHosts).get(vhost));
+ setupVirtualHosts(configFile.getParent() , (String)((List)virtualHosts).get(vhost));
}
}
else
{
- setupVirtualHosts(configFile.getParent(), (String) virtualHosts);
+ setupVirtualHosts(configFile.getParent() , (String)virtualHosts);
}
}
bind(port, connectorConfig);
@@ -281,7 +280,7 @@ public class Main implements ProtocolVersionList
configFilePath = configFileParent + configFilePath.substring(configVar.length());
}
- if (configFilePath.indexOf(".xml") != -1)
+ if (configFilePath.indexOf(".xml") != -1 )
{
VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath);
vHostConfig.performBindings();
@@ -294,11 +293,11 @@ public class Main implements ProtocolVersionList
String[] fileNames = virtualHostDir.list();
- for (int each = 0; each < fileNames.length; each++)
+ for (int each=0; each < fileNames.length; each++)
{
if (fileNames[each].endsWith(".xml"))
{
- VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath + "/" + fileNames[each]);
+ VirtualHostConfiguration vHostConfig = new VirtualHostConfiguration(configFilePath+"/"+fileNames[each]);
vHostConfig.performBindings();
}
}
@@ -317,10 +316,8 @@ public class Main implements ProtocolVersionList
{
//IoAcceptor acceptor = new SocketAcceptor(connectorConfig.processors);
IoAcceptor acceptor = connectorConfig.createAcceptor();
-
- SocketSessionConfig sc;
-
- sc = (SocketSessionConfig) acceptor.getSessionConfig();
+ SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig();
+ SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
sc.setReceiveBufferSize(connectorConfig.socketReceiveBufferSize);
sc.setSendBufferSize(connectorConfig.socketWriteBuferSize);
@@ -330,7 +327,7 @@ public class Main implements ProtocolVersionList
// implementation provided by MINA
if (connectorConfig.enableExecutorPool)
{
- acceptor.setThreadModel(new ReadWriteThreadModel());
+ sconfig.setThreadModel(new ReadWriteThreadModel());
}
if (connectorConfig.enableNonSSL)
@@ -345,9 +342,7 @@ public class Main implements ProtocolVersionList
{
bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port);
}
- acceptor.setLocalAddress(bindAddress);
- acceptor.setHandler(handler);
- acceptor.bind();
+ acceptor.bind(bindAddress, handler, sconfig);
_logger.info("Qpid.AMQP listening on non-SSL address " + bindAddress);
}
@@ -357,9 +352,8 @@ public class Main implements ProtocolVersionList
handler.setUseSSL(true);
try
{
- acceptor.setLocalAddress(new InetSocketAddress(connectorConfig.sslPort));
- acceptor.setHandler(handler);
- acceptor.bind();
+ acceptor.bind(new InetSocketAddress(connectorConfig.sslPort),
+ handler, sconfig);
_logger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort);
}
catch (IOException e)
@@ -414,16 +408,15 @@ public class Main implements ProtocolVersionList
catch (NumberFormatException e)
{
System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be " +
- "a non-negative integer. Using default of zero (no watching configured");
+ "a non-negative integer. Using default of zero (no watching configured");
}
if (logConfigFile.exists() && logConfigFile.canRead())
{
System.out.println("Configuring logger using configuration file " + logConfigFile.getAbsolutePath());
-
if (logWatchTime > 0)
{
System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every " +
- logWatchTime + " seconds");
+ logWatchTime + " seconds");
// log4j expects the watch interval in milliseconds
DOMConfigurator.configureAndWatch(logConfigFile.getAbsolutePath(), logWatchTime * 1000);
}
@@ -448,7 +441,7 @@ public class Main implements ProtocolVersionList
}
catch (NotCompliantMBeanException ex)
{
- throw new AMQException("Exception occured in creating AMQBrokerManager MBean.");
+ throw new AMQException("Exception occured in creating AMQBrokerManager MBean.");
}
}
@@ -458,24 +451,24 @@ public class Main implements ProtocolVersionList
*/
@MBeanDescription("This MBean exposes the broker level management features")
private final class AMQBrokerManager extends AMQManagedObject
- implements ManagedBroker
+ implements ManagedBroker
{
- private final QueueRegistry _queueRegistry;
+ private final QueueRegistry _queueRegistry;
private final ExchangeRegistry _exchangeRegistry;
- private final ExchangeFactory _exchangeFactory;
- private final MessageStore _messageStore;
+ private final ExchangeFactory _exchangeFactory;
+ private final MessageStore _messageStore;
@MBeanConstructor("Creates the Broker Manager MBean")
- protected AMQBrokerManager() throws NotCompliantMBeanException
+ protected AMQBrokerManager() throws NotCompliantMBeanException
{
super(ManagedBroker.class, ManagedBroker.TYPE);
IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
- _queueRegistry = appRegistry.getQueueRegistry();
+ _queueRegistry = appRegistry.getQueueRegistry();
_exchangeRegistry = appRegistry.getExchangeRegistry();
- _exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory();
- _messageStore = ApplicationRegistry.getInstance().getMessageStore();
- }
+ _exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory();
+ _messageStore = ApplicationRegistry.getInstance().getMessageStore();
+ }
public String getObjectInstanceName()
{
@@ -484,7 +477,6 @@ public class Main implements ProtocolVersionList
/**
* Creates new exchange and registers it with the registry.
- *
* @param exchangeName
* @param type
* @param durable
@@ -495,7 +487,7 @@ public class Main implements ProtocolVersionList
String type,
boolean durable,
boolean autoDelete)
- throws JMException
+ throws JMException
{
try
{
@@ -506,10 +498,10 @@ public class Main implements ProtocolVersionList
if (exchange == null)
{
exchange = _exchangeFactory.createExchange(exchangeName,
- type, //eg direct
- durable,
- autoDelete,
- 0); //ticket no
+ type, //eg direct
+ durable,
+ autoDelete,
+ 0); //ticket no
_exchangeRegistry.registerExchange(exchange);
}
else
@@ -518,7 +510,7 @@ public class Main implements ProtocolVersionList
}
}
}
- catch (AMQException ex)
+ catch(AMQException ex)
{
_logger.error("Error in creating exchange " + exchangeName, ex);
throw new MBeanException(ex, ex.toString());
@@ -527,12 +519,11 @@ public class Main implements ProtocolVersionList
/**
* Unregisters the exchange from registry.
- *
* @param exchangeName
* @throws JMException
*/
public void unregisterExchange(String exchangeName)
- throws JMException
+ throws JMException
{
boolean inUse = false;
// TODO
@@ -543,7 +534,7 @@ public class Main implements ProtocolVersionList
{
_exchangeRegistry.unregisterExchange(exchangeName, false);
}
- catch (AMQException ex)
+ catch(AMQException ex)
{
_logger.error("Error in unregistering exchange " + exchangeName, ex);
throw new MBeanException(ex, ex.toString());
@@ -553,7 +544,6 @@ public class Main implements ProtocolVersionList
/**
* Creates a new queue and registers it with the registry and puts it
* in persistance storage if durable queue.
- *
* @param queueName
* @param durable
* @param owner
@@ -564,7 +554,7 @@ public class Main implements ProtocolVersionList
boolean durable,
String owner,
boolean autoDelete)
- throws JMException
+ throws JMException
{
AMQQueue queue = _queueRegistry.getQueue(queueName);
if (queue == null)
@@ -592,7 +582,6 @@ public class Main implements ProtocolVersionList
/**
* Deletes the queue from queue registry and persistant storage.
- *
* @param queueName
* @throws JMException
*/