summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/pom.xml68
-rwxr-xr-xqpid/java/broker/python-test.xml32
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java18
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java22
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java130
6 files changed, 252 insertions, 26 deletions
diff --git a/qpid/java/broker/pom.xml b/qpid/java/broker/pom.xml
index 2cf8a563f0..85ec5da3bf 100644
--- a/qpid/java/broker/pom.xml
+++ b/qpid/java/broker/pom.xml
@@ -15,7 +15,7 @@
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
- -->
+-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
@@ -44,6 +44,11 @@
</dependency>
<dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
@@ -58,7 +63,14 @@
<artifactId>commons-lang</artifactId>
</dependency>
- <!-- Test Dependencies -->
+ <!-- Test Dependencies -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.4.0</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -120,6 +132,7 @@
</configuration>
</plugin>
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
@@ -145,6 +158,57 @@
</testResource>
</testResources>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>${antrun.version}</version>
+ <dependencies>
+ <dependency>
+ <groupId>ant</groupId>
+ <artifactId>ant-nodeps</artifactId>
+ <version>1.6.5</version>
+ </dependency>
+ </dependencies>
+
+ <executions>
+ <execution>
+ <id>python_test</id>
+ <phase>test</phase>
+ <configuration>
+ <tasks>
+
+ <condition property="broker.dir"
+ value="${user.dir}${file.separator}..${file.separator}broker"
+ else="${user.dir}">
+ <equals arg1="${topDirectoryLocation}" arg2="." />
+ </condition>
+
+ <condition property="skip-python-tests" value="true">
+ <isset property="skip.python.tests"/>
+ </condition>
+
+ <property name="command"
+ value="python run-tests -v -I java_failing.txt"/>
+ <!--value="bash -c 'python run-tests -v -I java_failing.txt'"/>-->
+
+ <ant antfile="python-test.xml" inheritRefs="true">
+ <target name="run-tests" />
+ </ant>
+
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+
+
</build>
</project>
diff --git a/qpid/java/broker/python-test.xml b/qpid/java/broker/python-test.xml
new file mode 100755
index 0000000000..8ade392d1c
--- /dev/null
+++ b/qpid/java/broker/python-test.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!-- ====================================================================== -->
+<!-- Ant build file (http://ant.apache.org/) for Ant 1.6.2 or above. -->
+<!-- ====================================================================== -->
+
+<project basedir="." default="default">
+
+ <target name="default" >
+ <echo message="Used via maven to run python tests."/>
+ </target>
+
+ <property name="pythondir" value="../../python"/>
+
+ <target name="run-tests" unless="skip-python-tests">
+
+ <echo message="Starting Broker with command"/>
+
+ <java classname="org.apache.qpid.server.RunBrokerWithCommand"
+ fork="true"
+ dir="${pythondir}"
+ failonerror="true"
+ >
+ <arg value="${command}"/>
+
+ <classpath refid="maven.test.classpath"/>
+ <sysproperty key="QPID_HOME" value="${broker.dir}"/>
+ <sysproperty key="QPID_WORK" value="${broker.dir}${file.separator}target"/>
+ </java>
+
+ </target>
+</project>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
index 6f970730ec..29ea69caf7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -24,6 +24,7 @@ import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.BindException;
import java.util.Collection;
import java.util.List;
import org.apache.commons.cli.CommandLine;
@@ -171,7 +172,8 @@ public class Main
}
else if (commandLine.hasOption("v"))
{
- String ver = "Qpid 0.9.0.0";
+ String ver = QpidProperties.getVersionString();
+
StringBuilder protocol = new StringBuilder("AMQP version(s) [major.minor]: ");
boolean first = true;
@@ -357,7 +359,7 @@ public class Main
* @todo Partially implements top-level error handler. Better to let these errors fall through to a single
* top-level handler.
*/
- protected void bind(int port, ConnectorConfiguration connectorConfig)
+ protected void bind(int port, ConnectorConfiguration connectorConfig) throws BindException
{
String bindAddr = commandLine.getOptionValue("b");
if (bindAddr == null)
@@ -425,6 +427,8 @@ public class Main
catch (Exception e)
{
_logger.error("Unable to bind service to registry: " + e, e);
+ //fixme this need tidying up
+ throw new BindException(e.getMessage());
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
index 28b2489142..8462ed9557 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
@@ -182,10 +182,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
deliveryTag, pb.getExchange(), messageHandle.isRedelivered(),
pb.getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
- deliverFrame.writePayload(buf);
- buf.flip();
- return buf;
+
+ return deliverFrame.toByteBuffer();
}
private ByteBuffer createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
@@ -201,10 +199,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
queueSize,
messageHandle.isRedelivered(),
pb.getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
- getOkFrame.writePayload(buf);
- buf.flip();
- return buf;
+
+ return getOkFrame.toByteBuffer();
}
public byte getProtocolMinorVersion()
@@ -225,10 +221,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
message.getMessagePublishInfo().getExchange(),
replyCode, replyText,
message.getMessagePublishInfo().getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
- returnFrame.writePayload(buf);
- buf.flip();
- return buf;
+
+ return returnFrame.toByteBuffer();
}
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 0fb5e6d88a..2aa759b35d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -717,7 +717,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public void deliver(StoreContext context, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws AMQException
{
- if (_log.isDebugEnabled())
+
+ final boolean debugEnabled = _log.isDebugEnabled();
+ if (debugEnabled)
{
_log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + msg);
}
@@ -732,7 +734,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (s == null) //no-one can take the message right now.
{
- if (_log.isDebugEnabled())
+ if (debugEnabled)
{
_log.debug(debugIdentity() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus());
}
@@ -744,7 +746,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_lock.unlock();
//Pre Deliver to all subscriptions
- if (_log.isDebugEnabled())
+ if (debugEnabled)
{
_log.debug(debugIdentity() + "We have " + _subscriptions.getSubscriptions().size() +
" subscribers to give the message to:" + currentStatus());
@@ -755,7 +757,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
// stop if the message gets delivered whilst PreDelivering if we have a shared queue.
if (_queue.isShared() && msg.getDeliveredToConsumer())
{
- if (_log.isDebugEnabled())
+ if (debugEnabled)
{
_log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) +
") is already delivered.");
@@ -766,7 +768,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
// Only give the message to those that want them.
if (sub.hasInterest(msg))
{
- if (_log.isDebugEnabled())
+ if (debugEnabled)
{
_log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(msg) +
") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
@@ -795,9 +797,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
else
{
- if (_log.isInfoEnabled())
+ if (debugEnabled)
{
- _log.info(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " +
+ _log.debug(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " +
"suspended between nextSubscriber and send for message:" + msg.debugIdentity());
}
}
@@ -805,9 +807,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (!msg.isTaken(_queue))
{
- if (_log.isInfoEnabled())
+ if (debugEnabled)
{
- _log.info(debugIdentity() + " Message(" + msg.debugIdentity() + ") has not been taken so recursing!:" +
+ _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() + ") has not been taken so recursing!:" +
" Subscriber:" + System.identityHashCode(s));
}
@@ -815,7 +817,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
else
{
- if (_log.isDebugEnabled())
+ if (debugEnabled)
{
_log.debug(debugIdentity() + " Message(" + msg.toString() +
") has been taken so disregarding deliver request to Subscriber:" +
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java
new file mode 100644
index 0000000000..1ebecbacb6
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.Level;
+
+import java.io.InputStream;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.IOException;
+
+public class RunBrokerWithCommand
+{
+ public static void main(String[] args)
+ {
+ //Start broker
+
+ try
+ {
+
+ String[] fudge = new String[1];
+ fudge[0] = "-v";
+ new Main(fudge).startup();
+ }
+ catch (Exception e)
+ {
+ System.out.println("Unable to start broker due to: " + e.getMessage());
+
+ e.printStackTrace();
+ exit(1);
+ }
+
+ Logger.getRootLogger().setLevel(Level.ERROR);
+
+ //run command
+ try
+ {
+ Process task = Runtime.getRuntime().exec(args[0]);
+ System.out.println("Started Proccess: " + args[0]);
+
+ InputStream inputStream = task.getInputStream();
+
+ InputStream errorStream = task.getErrorStream();
+
+ Thread out = new Thread(new Outputter("[OUT]", new BufferedReader(new InputStreamReader(inputStream))));
+ Thread err = new Thread(new Outputter("[ERR]", new BufferedReader(new InputStreamReader(errorStream))));
+
+ out.start();
+ err.start();
+
+ out.join();
+ err.join();
+
+ System.out.println("Waiting for process to exit: " + args[0]);
+ task.waitFor();
+ System.out.println("Done Proccess: " + args[0]);
+
+ }
+ catch (IOException e)
+ {
+ System.out.println("Proccess had problems: " + e.getMessage());
+ exit(1);
+ }
+ catch (InterruptedException e)
+ {
+ System.out.println("Proccess had problems: " + e.getMessage());
+
+ exit(1);
+ }
+
+
+ exit(0);
+ }
+
+ private static void exit(int i)
+ {
+ Logger.getRootLogger().setLevel(Level.INFO);
+ System.exit(i);
+ }
+
+ static class Outputter implements Runnable
+ {
+
+ BufferedReader reader;
+ String prefix;
+
+ Outputter(String s, BufferedReader r)
+ {
+ prefix = s;
+ reader = r;
+ }
+
+ public void run()
+ {
+ String line;
+ try
+ {
+ while ((line = reader.readLine()) != null)
+ {
+ System.out.println(prefix + line);
+ }
+ }
+ catch (IOException e)
+ {
+ System.out.println("Error occured reading; " + e.getMessage());
+ }
+ }
+
+ }
+
+}