diff options
Diffstat (limited to 'qpid/java/broker')
6 files changed, 252 insertions, 26 deletions
diff --git a/qpid/java/broker/pom.xml b/qpid/java/broker/pom.xml index 2cf8a563f0..85ec5da3bf 100644 --- a/qpid/java/broker/pom.xml +++ b/qpid/java/broker/pom.xml @@ -15,7 +15,7 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - --> +--> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> @@ -44,6 +44,11 @@ </dependency> <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </dependency> + + <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> </dependency> @@ -58,7 +63,14 @@ <artifactId>commons-lang</artifactId> </dependency> - <!-- Test Dependencies --> + <!-- Test Dependencies --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>1.4.0</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> @@ -120,6 +132,7 @@ </configuration> </plugin> + <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> @@ -145,6 +158,57 @@ </testResource> </testResources> + <pluginManagement> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <version>${antrun.version}</version> + <dependencies> + <dependency> + <groupId>ant</groupId> + <artifactId>ant-nodeps</artifactId> + <version>1.6.5</version> + </dependency> + </dependencies> + + <executions> + <execution> + <id>python_test</id> + <phase>test</phase> + <configuration> + <tasks> + + <condition property="broker.dir" + value="${user.dir}${file.separator}..${file.separator}broker" + else="${user.dir}"> + <equals arg1="${topDirectoryLocation}" arg2="." /> + </condition> + + <condition property="skip-python-tests" value="true"> + <isset property="skip.python.tests"/> + </condition> + + <property name="command" + value="python run-tests -v -I java_failing.txt"/> + <!--value="bash -c 'python run-tests -v -I java_failing.txt'"/>--> + + <ant antfile="python-test.xml" inheritRefs="true"> + <target name="run-tests" /> + </ant> + + </tasks> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </pluginManagement> + + </build> </project> diff --git a/qpid/java/broker/python-test.xml b/qpid/java/broker/python-test.xml new file mode 100755 index 0000000000..8ade392d1c --- /dev/null +++ b/qpid/java/broker/python-test.xml @@ -0,0 +1,32 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- ====================================================================== --> +<!-- Ant build file (http://ant.apache.org/) for Ant 1.6.2 or above. --> +<!-- ====================================================================== --> + +<project basedir="." default="default"> + + <target name="default" > + <echo message="Used via maven to run python tests."/> + </target> + + <property name="pythondir" value="../../python"/> + + <target name="run-tests" unless="skip-python-tests"> + + <echo message="Starting Broker with command"/> + + <java classname="org.apache.qpid.server.RunBrokerWithCommand" + fork="true" + dir="${pythondir}" + failonerror="true" + > + <arg value="${command}"/> + + <classpath refid="maven.test.classpath"/> + <sysproperty key="QPID_HOME" value="${broker.dir}"/> + <sysproperty key="QPID_WORK" value="${broker.dir}${file.separator}target"/> + </java> + + </target> +</project> diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java index 6f970730ec..29ea69caf7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -24,6 +24,7 @@ import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.BindException; import java.util.Collection; import java.util.List; import org.apache.commons.cli.CommandLine; @@ -171,7 +172,8 @@ public class Main } else if (commandLine.hasOption("v")) { - String ver = "Qpid 0.9.0.0"; + String ver = QpidProperties.getVersionString(); + StringBuilder protocol = new StringBuilder("AMQP version(s) [major.minor]: "); boolean first = true; @@ -357,7 +359,7 @@ public class Main * @todo Partially implements top-level error handler. Better to let these errors fall through to a single * top-level handler. */ - protected void bind(int port, ConnectorConfiguration connectorConfig) + protected void bind(int port, ConnectorConfiguration connectorConfig) throws BindException { String bindAddr = commandLine.getOptionValue("b"); if (bindAddr == null) @@ -425,6 +427,8 @@ public class Main catch (Exception e) { _logger.error("Unable to bind service to registry: " + e, e); + //fixme this need tidying up + throw new BindException(e.getMessage()); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java index 28b2489142..8462ed9557 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java @@ -182,10 +182,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter deliveryTag, pb.getExchange(), messageHandle.isRedelivered(),
pb.getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
- deliverFrame.writePayload(buf);
- buf.flip();
- return buf;
+
+ return deliverFrame.toByteBuffer();
}
private ByteBuffer createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
@@ -201,10 +199,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter queueSize,
messageHandle.isRedelivered(),
pb.getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
- getOkFrame.writePayload(buf);
- buf.flip();
- return buf;
+
+ return getOkFrame.toByteBuffer();
}
public byte getProtocolMinorVersion()
@@ -225,10 +221,8 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter message.getMessagePublishInfo().getExchange(),
replyCode, replyText,
message.getMessagePublishInfo().getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
- returnFrame.writePayload(buf);
- buf.flip();
- return buf;
+
+ return returnFrame.toByteBuffer();
}
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 0fb5e6d88a..2aa759b35d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -717,7 +717,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void deliver(StoreContext context, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws AMQException { - if (_log.isDebugEnabled()) + + final boolean debugEnabled = _log.isDebugEnabled(); + if (debugEnabled) { _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") :" + msg); } @@ -732,7 +734,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (s == null) //no-one can take the message right now. { - if (_log.isDebugEnabled()) + if (debugEnabled) { _log.debug(debugIdentity() + "Testing Message(" + msg + ") for Queued Delivery:" + currentStatus()); } @@ -744,7 +746,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _lock.unlock(); //Pre Deliver to all subscriptions - if (_log.isDebugEnabled()) + if (debugEnabled) { _log.debug(debugIdentity() + "We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to:" + currentStatus()); @@ -755,7 +757,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager // stop if the message gets delivered whilst PreDelivering if we have a shared queue. if (_queue.isShared() && msg.getDeliveredToConsumer()) { - if (_log.isDebugEnabled()) + if (debugEnabled) { _log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) + ") is already delivered."); @@ -766,7 +768,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager // Only give the message to those that want them. if (sub.hasInterest(msg)) { - if (_log.isDebugEnabled()) + if (debugEnabled) { _log.debug(debugIdentity() + "Queuing message(" + System.identityHashCode(msg) + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")"); @@ -795,9 +797,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } else { - if (_log.isInfoEnabled()) + if (debugEnabled) { - _log.info(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " + + _log.debug(debugIdentity() + " Subscription(" + System.identityHashCode(s) + ") became " + "suspended between nextSubscriber and send for message:" + msg.debugIdentity()); } } @@ -805,9 +807,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (!msg.isTaken(_queue)) { - if (_log.isInfoEnabled()) + if (debugEnabled) { - _log.info(debugIdentity() + " Message(" + msg.debugIdentity() + ") has not been taken so recursing!:" + + _log.debug(debugIdentity() + " Message(" + msg.debugIdentity() + ") has not been taken so recursing!:" + " Subscriber:" + System.identityHashCode(s)); } @@ -815,7 +817,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } else { - if (_log.isDebugEnabled()) + if (debugEnabled) { _log.debug(debugIdentity() + " Message(" + msg.toString() + ") has been taken so disregarding deliver request to Subscriber:" + diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java new file mode 100644 index 0000000000..1ebecbacb6 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/RunBrokerWithCommand.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server; + +import org.apache.log4j.Logger; +import org.apache.log4j.Level; + +import java.io.InputStream; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.IOException; + +public class RunBrokerWithCommand +{ + public static void main(String[] args) + { + //Start broker + + try + { + + String[] fudge = new String[1]; + fudge[0] = "-v"; + new Main(fudge).startup(); + } + catch (Exception e) + { + System.out.println("Unable to start broker due to: " + e.getMessage()); + + e.printStackTrace(); + exit(1); + } + + Logger.getRootLogger().setLevel(Level.ERROR); + + //run command + try + { + Process task = Runtime.getRuntime().exec(args[0]); + System.out.println("Started Proccess: " + args[0]); + + InputStream inputStream = task.getInputStream(); + + InputStream errorStream = task.getErrorStream(); + + Thread out = new Thread(new Outputter("[OUT]", new BufferedReader(new InputStreamReader(inputStream)))); + Thread err = new Thread(new Outputter("[ERR]", new BufferedReader(new InputStreamReader(errorStream)))); + + out.start(); + err.start(); + + out.join(); + err.join(); + + System.out.println("Waiting for process to exit: " + args[0]); + task.waitFor(); + System.out.println("Done Proccess: " + args[0]); + + } + catch (IOException e) + { + System.out.println("Proccess had problems: " + e.getMessage()); + exit(1); + } + catch (InterruptedException e) + { + System.out.println("Proccess had problems: " + e.getMessage()); + + exit(1); + } + + + exit(0); + } + + private static void exit(int i) + { + Logger.getRootLogger().setLevel(Level.INFO); + System.exit(i); + } + + static class Outputter implements Runnable + { + + BufferedReader reader; + String prefix; + + Outputter(String s, BufferedReader r) + { + prefix = s; + reader = r; + } + + public void run() + { + String line; + try + { + while ((line = reader.readLine()) != null) + { + System.out.println(prefix + line); + } + } + catch (IOException e) + { + System.out.println("Error occured reading; " + e.getMessage()); + } + } + + } + +} |
