summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-07-04 16:05:55 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-07-04 16:05:55 +0000
commit0b209ab4c684d63c86250e2a894c6a03efcc2ecf (patch)
treea25d65645cdfca4627a304318f6a2691110a02f3
parent5aedab1b101894dd21a45eacc1fb9b4c8becc7de (diff)
downloadqpid-python-0b209ab4c684d63c86250e2a894c6a03efcc2ecf.tar.gz
Addition of the MessageStore Tool.
Small changes to the Exchanges to allow the extraction of currently listed items. Extracted initial broker configuration mechanism to a reusable class. Have modified broker to use it. Move the Passwd.java to new tools package structure on the broker. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@553248 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xjava/broker/bin/msTool.sh34
-rw-r--r--java/broker/bin/qpid-passwd2
-rw-r--r--java/broker/bin/qpid-server6
-rw-r--r--java/broker/etc/messagestoretool-log4j.xml53
-rw-r--r--java/broker/src/main/java/org/apache/log4j/QpidCompositeRollingAppender.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/configuration/Configuration.java189
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java47
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java587
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java66
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Clear.java85
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Command.java36
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java299
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Help.java97
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java331
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Load.java94
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Quit.java54
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java248
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java542
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/security/Passwd.java (renamed from java/broker/src/main/java/org/apache/qpid/server/security/Passwd.java)2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/utils/CommandParser.java51
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/utils/Console.java90
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/utils/SimpleCommandParser.java116
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/utils/SimpleConsole.java363
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/utils/utils/CommandParser.java51
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/utils/utils/Console.java75
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/utils/utils/RSHCommandParser.java352
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/utils/utils/SimpleCommandParser.java116
-rw-r--r--java/pom.xml15
37 files changed, 4045 insertions, 74 deletions
diff --git a/java/broker/bin/msTool.sh b/java/broker/bin/msTool.sh
new file mode 100755
index 0000000000..2f05e511c5
--- /dev/null
+++ b/java/broker/bin/msTool.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set classpath to include Qpid jar with all required jars in manifest
+QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar
+
+# Set other variables used by the qpid-run script before calling
+export JAVA=java \
+ JAVA_VM=-server \
+ JAVA_OPTS=-Dlog4j.configuration=$QPID_TOOLS\etc\messagestoretool-log4j.xml \
+ QPID_CLASSPATH=$QPID_LIBS
+
+if [ -z "$QPID_TOOLS" ]; then
+ die "QPID_HOME must be set"
+fi
+
+. qpid-run org.apache.qpid.tools.messagestore.MessageStoreTool "$@"
diff --git a/java/broker/bin/qpid-passwd b/java/broker/bin/qpid-passwd
index 6e64af6e70..f046252522 100644
--- a/java/broker/bin/qpid-passwd
+++ b/java/broker/bin/qpid-passwd
@@ -27,4 +27,4 @@ export JAVA=java \
JAVA_MEM=-Xmx1024m \
QPID_CLASSPATH=$QPID_LIBS
-. qpid-run org.apache.qpid.server.security.Passwd "$@"
+. qpid-run org.apache.qpid.tools.security.Passwd "$@"
diff --git a/java/broker/bin/qpid-server b/java/broker/bin/qpid-server
index 76d0ad786d..fabf7e941c 100644
--- a/java/broker/bin/qpid-server
+++ b/java/broker/bin/qpid-server
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,7 +25,7 @@ QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar:$QPID_HOME/lib/bdbstore-launch.jar
export JAVA=java \
JAVA_VM=-server \
JAVA_MEM=-Xmx1024m \
- JAVA_GC=-XX:-UseConcMarkSweepGC
+ JAVA_GC=-XX:-UseConcMarkSweepGC \
QPID_CLASSPATH=$QPID_LIBS
. qpid-run org.apache.qpid.server.Main "$@"
diff --git a/java/broker/etc/messagestoretool-log4j.xml b/java/broker/etc/messagestoretool-log4j.xml
new file mode 100644
index 0000000000..0313de4f95
--- /dev/null
+++ b/java/broker/etc/messagestoretool-log4j.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+ <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender">
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
+ </layout>
+ </appender>
+
+ <category name="org.apache.qpid.tools">
+ <priority value="info"/>
+ </category>
+
+ <category name="org.apache.qpid">
+ <priority value="info"/>
+ </category>
+
+ <category name="org.apache.qpid.server.security">
+ <priority value="info"/>
+ </category>
+
+ <category name="org.apache.qpid.server.management">
+ <priority value="error"/>
+ </category>
+
+
+ <root>
+ <priority value="info"/>
+ <appender-ref ref="STDOUT"/>
+ </root>
+</log4j:configuration>
diff --git a/java/broker/src/main/java/org/apache/log4j/QpidCompositeRollingAppender.java b/java/broker/src/main/java/org/apache/log4j/QpidCompositeRollingAppender.java
index 931c15a664..7e0c4defe1 100644
--- a/java/broker/src/main/java/org/apache/log4j/QpidCompositeRollingAppender.java
+++ b/java/broker/src/main/java/org/apache/log4j/QpidCompositeRollingAppender.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPOutputStream;
import org.apache.log4j.helpers.CountingQuietWriter;
@@ -39,8 +38,6 @@ import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.helpers.OptionConverter;
import org.apache.log4j.spi.LoggingEvent;
-import org.apache.qpid.framing.FieldTable;
-
/**
* <p>CompositeRollingAppender combines RollingFileAppender and DailyRollingFileAppender<br> It can function as either
* or do both at the same time (making size based rolling files like RollingFileAppender until a data/time boundary is
diff --git a/java/broker/src/main/java/org/apache/qpid/configuration/Configuration.java b/java/broker/src/main/java/org/apache/qpid/configuration/Configuration.java
new file mode 100644
index 0000000000..c17c27a497
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/configuration/Configuration.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.configuration;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+public class Configuration
+{
+ public static final String QPID_HOME = "QPID_HOME";
+
+ final String QPIDHOME = System.getProperty(QPID_HOME);
+
+ private static Logger _devlog = LoggerFactory.getLogger(Configuration.class);
+
+ public static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
+ public static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
+
+ protected final Options _options = new Options();
+ protected CommandLine _commandLine;
+ protected File _configFile;
+
+
+ public Configuration()
+ {
+
+ }
+
+ public void processCommandline(String[] args) throws InitException
+ {
+ try
+ {
+ _commandLine = new PosixParser().parse(_options, args);
+ }
+ catch (ParseException e)
+ {
+ throw new InitException("Unable to parse commmandline", e);
+ }
+
+ final File defaultConfigFile = new File(QPIDHOME, DEFAULT_CONFIG_FILE);
+ setConfig(new File(_commandLine.getOptionValue("c", defaultConfigFile.getPath())));
+ }
+
+ public void setConfig(File file)
+ {
+ _configFile = file;
+ }
+
+ /**
+ * @param option The option to set.
+ */
+ public void setOption(Option option)
+ {
+ _options.addOption(option);
+ }
+
+ /**
+ * getOptionValue from the configuration
+ * @param option variable argument, first string is option to get, second if present is the default value.
+ * @return the String for the given option or null if not present (if default value not specified)
+ */
+ public String getOptionValue(String... option)
+ {
+ if (option.length == 1)
+ {
+ return _commandLine.getOptionValue(option[0]);
+ }
+ else if (option.length == 2)
+ {
+ return _commandLine.getOptionValue(option[0], option[1]);
+ }
+ return null;
+ }
+
+ public void loadConfig(File file) throws InitException
+ {
+ setConfig(file);
+ loadConfig();
+ }
+
+ private void loadConfig() throws InitException
+ {
+ if (!_configFile.exists())
+ {
+ String error = "File " + _configFile + " could not be found. Check the file exists and is readable.";
+
+ if (QPIDHOME == null)
+ {
+ error = error + "\nNote: " + QPID_HOME + " is not set.";
+ }
+
+ throw new InitException(error, null);
+ }
+ else
+ {
+ _devlog.debug("Using configuration file " + _configFile.getAbsolutePath());
+ }
+
+// String logConfig = _commandLine.getOptionValue("l");
+// String logWatchConfig = _commandLine.getOptionValue("w", "0");
+// if (logConfig != null)
+// {
+// File logConfigFile = new File(logConfig);
+// configureLogging(logConfigFile, logWatchConfig);
+// }
+// else
+// {
+// File configFileDirectory = _configFile.getParentFile();
+// File logConfigFile = new File(configFileDirectory, DEFAULT_LOG_CONFIG_FILENAME);
+// configureLogging(logConfigFile, logWatchConfig);
+// }
+ }
+
+
+// private void configureLogging(File logConfigFile, String logWatchConfig)
+// {
+// int logWatchTime = 0;
+// try
+// {
+// logWatchTime = Integer.parseInt(logWatchConfig);
+// }
+// catch (NumberFormatException e)
+// {
+// _devlog.error("Log watch configuration value of " + logWatchConfig + " is invalid. Must be "
+// + "a non-negative integer. Using default of zero (no watching configured");
+// }
+//
+// if (logConfigFile.exists() && logConfigFile.canRead())
+// {
+// _devlog.info("Configuring logger using configuration file " + logConfigFile.getAbsolutePath());
+// if (logWatchTime > 0)
+// {
+// _devlog.info("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every "
+// + logWatchTime + " seconds");
+// // log4j expects the watch interval in milliseconds
+// DOMConfigurator.configureAndWatch(logConfigFile.getAbsolutePath(), logWatchTime * 1000);
+// }
+// else
+// {
+// DOMConfigurator.configure(logConfigFile.getAbsolutePath());
+// }
+// }
+// else
+// {
+// System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath());
+// System.err.println("Using basic log4j configuration");
+// BasicConfigurator.configure();
+// }
+// }
+
+ public File getConfigFile()
+ {
+ return _configFile;
+ }
+
+
+ public class InitException extends Exception
+ {
+ InitException(String msg, Throwable cause)
+ {
+ super(msg, cause);
+ }
+ }
+} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 868ac31a54..9ebb893362 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -38,9 +38,13 @@ import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.List;
+import java.util.Map;
+
public abstract class AbstractExchange implements Exchange, Managable
{
private AMQShortString _name;
@@ -189,6 +193,8 @@ public abstract class AbstractExchange implements Exchange, Managable
}
}
+ abstract public Map<AMQShortString, List<AMQQueue>> getBindings();
+
public String toString()
{
return getClass().getName() + "[" + getName() +"]";
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index 4774383642..98abf7977a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -20,16 +20,17 @@
*/
package org.apache.qpid.server.exchange;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.protocol.ExchangeInitialiser;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
public class DefaultExchangeRegistry implements ExchangeRegistry
{
@@ -63,7 +64,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
public void registerExchange(Exchange exchange) throws AMQException
{
_exchangeMap.put(exchange.getName(), exchange);
- if(exchange.isDurable())
+ if (exchange.isDurable())
{
getMessageStore().createExchange(exchange);
}
@@ -79,13 +80,18 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
return _defaultExchange;
}
+ public Collection<AMQShortString> getExchangeNames()
+ {
+ return _exchangeMap.keySet();
+ }
+
public void unregisterExchange(AMQShortString name, boolean inUse) throws AMQException
{
// TODO: check inUse argument
Exchange e = _exchangeMap.remove(name);
if (e != null)
{
- if(e.isDurable())
+ if (e.isDurable())
{
getMessageStore().removeExchange(e);
}
@@ -99,7 +105,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
public Exchange getExchange(AMQShortString name)
{
- if((name == null) || name.length() == 0)
+ if ((name == null) || name.length() == 0)
{
return getDefaultExchange();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
index a3aa6e7f5d..a632720ebe 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -26,22 +26,16 @@ import java.util.Map;
import javax.management.JMException;
import javax.management.MBeanException;
-import javax.management.openmbean.ArrayType;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.management.MBeanConstructor;
@@ -157,7 +151,7 @@ public class DestNameExchange extends AbstractExchange
if (!_index.remove(routingKey, queue))
{
throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
- " with routing key " + routingKey + ". No queue was registered with that routing key");
+ " with routing key " + routingKey + ". No queue was registered with that _routing key");
}
}
@@ -222,4 +216,9 @@ public class DestNameExchange extends AbstractExchange
{
return !_index.getBindingsMap().isEmpty();
}
+
+ public Map<AMQShortString, List<AMQQueue>> getBindings()
+ {
+ return _index.getBindingsMap();
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
index 418cf64c56..d88dcb11f7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
@@ -21,11 +21,9 @@
package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.management.MBeanConstructor;
@@ -35,17 +33,11 @@ import org.apache.qpid.server.queue.AMQQueue;
import javax.management.JMException;
import javax.management.MBeanException;
-import javax.management.openmbean.ArrayType;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -59,7 +51,7 @@ public class DestWildExchange extends AbstractExchange
private static final Logger _logger = Logger.getLogger(DestWildExchange.class);
private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues =
- new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+ new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
// private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
private static final String TOPIC_SEPARATOR = ".";
private static final String AMQP_STAR = "*";
@@ -92,7 +84,7 @@ public class DestWildExchange extends AbstractExchange
queueList.add(q.getName().toString());
}
- Object[] bindingItemValues = { key.toString(), queueList.toArray(new String[0]) };
+ Object[] bindingItemValues = {key.toString(), queueList.toArray(new String[0])};
CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
_bindingList.put(bindingData);
}
@@ -281,7 +273,7 @@ public class DestWildExchange extends AbstractExchange
if (queues == null)
{
throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName()
- + " with routing key " + routingKey + ". No queue was registered with that routing key");
+ + " with routing key " + routingKey + ". No queue was registered with that _routing key");
}
@@ -289,7 +281,7 @@ public class DestWildExchange extends AbstractExchange
if (!removedQ)
{
throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName()
- + " with routing key " + routingKey);
+ + " with routing key " + routingKey);
}
if (queues.isEmpty())
@@ -311,6 +303,11 @@ public class DestWildExchange extends AbstractExchange
}
}
+ public Map<AMQShortString, List<AMQQueue>> getBindings()
+ {
+ return _routingKey2queues;
+ }
+
private List<AMQQueue> getMatchedQueues(AMQShortString routingKey)
{
List<AMQQueue> list = new LinkedList<AMQQueue>();
@@ -358,8 +355,8 @@ public class DestWildExchange extends AbstractExchange
if (queueList.size() > (depth + queueskip))
{ // a hash and it is the last entry
matching =
- queueList.get(depth + queueskip).equals(AMQP_HASH)
- && (queueList.size() == (depth + queueskip + 1));
+ queueList.get(depth + queueskip).equals(AMQP_HASH)
+ && (queueList.size() == (depth + queueskip + 1));
}
}
else if (routingkeyList.size() > (depth + routingskip))
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index a5f77cc2a4..9fa7b56ba7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -27,9 +27,13 @@ import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.Map;
+import java.util.List;
+
public interface Exchange
{
AMQShortString getName();
+
AMQShortString getType();
void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException;
@@ -61,7 +65,7 @@ public interface Exchange
boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException;
/**
- * Determines whether a message is routing to any queue using a specific routing key
+ * Determines whether a message is routing to any queue using a specific _routing key
* @param routingKey
* @return
* @throws AMQException
@@ -76,4 +80,6 @@ public interface Exchange
* @throws AMQException
*/
boolean hasBindings() throws AMQException;
+
+ Map<AMQShortString, List<AMQQueue>> getBindings();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
index d3a466565f..fe3b19e74e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.exchange;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import java.util.Collection;
+
public interface ExchangeRegistry extends MessageRouter
{
@@ -43,5 +45,7 @@ public interface ExchangeRegistry extends MessageRouter
Exchange getDefaultExchange();
+ Collection<AMQShortString> getExchangeNames();
+
void initialise() throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index 5a6301548b..a65b105809 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -34,17 +33,13 @@ import org.apache.qpid.server.queue.AMQQueue;
import javax.management.JMException;
import javax.management.MBeanException;
-import javax.management.openmbean.ArrayType;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
public class FanoutExchange extends AbstractExchange
@@ -79,7 +74,7 @@ public class FanoutExchange extends AbstractExchange
{
String queueName = queue.getName().toString();
- Object[] bindingItemValues = { queueName, new String[] { queueName } };
+ Object[] bindingItemValues = {queueName, new String[]{queueName}};
CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
_bindingList.put(bindingData);
}
@@ -120,6 +115,11 @@ public class FanoutExchange extends AbstractExchange
}
}
+ public Map<AMQShortString, List<AMQQueue>> getBindings()
+ {
+ return null;
+ }
+
public AMQShortString getType()
{
return ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index b5c03a7291..bcbca8becf 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -20,23 +20,6 @@
*/
package org.apache.qpid.server.exchange;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import javax.management.JMException;
-import javax.management.openmbean.ArrayType;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
-import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
-import javax.management.openmbean.TabularData;
-import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -50,6 +33,23 @@ import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import javax.management.JMException;
+import javax.management.openmbean.ArrayType;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+
/**
* An exchange that binds queues based on a set of required headers and header values
* and routes messages to these queues by matching the headers of the message against
@@ -91,13 +91,13 @@ public class HeadersExchange extends AbstractExchange
private final class HeadersExchangeMBean extends ExchangeMBean
{
@MBeanConstructor("Creates an MBean for AMQ Headers exchange")
- public HeadersExchangeMBean() throws JMException
+ public HeadersExchangeMBean() throws JMException
{
super();
_exchangeType = "headers";
init();
}
-
+
/**
* initialises the OpenType objects.
*/
@@ -113,7 +113,7 @@ public class HeadersExchange extends AbstractExchange
_bindingDataType = new CompositeType("Exchange Binding", "Queue name and header bindings",
_bindingItemNames, _bindingItemNames, _bindingItemTypes);
_bindinglistDataType = new TabularType("Exchange Bindings", "List of exchange bindings for " + getName(),
- _bindingDataType, _bindingItemIndexNames);
+ _bindingDataType, _bindingItemIndexNames);
}
public TabularData bindings() throws OpenDataException
@@ -169,7 +169,7 @@ public class HeadersExchange extends AbstractExchange
throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
}
- String[] bindings = binding.split(",");
+ String[] bindings = binding.split(",");
FieldTable bindingMap = new FieldTable();
for (int i = 0; i < bindings.length; i++)
{
@@ -288,6 +288,11 @@ public class HeadersExchange extends AbstractExchange
}
}
+ public Map<AMQShortString, List<AMQQueue>> getBindings()
+ {
+ return null;
+ }
+
private static class Registration
{
private final HeadersBinding binding;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
index c0f1e7f40c..cbe9246f09 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
@@ -20,13 +20,14 @@
*/
package org.apache.qpid.server.queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
public class DefaultQueueRegistry implements QueueRegistry
{
private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
@@ -57,4 +58,14 @@ public class DefaultQueueRegistry implements QueueRegistry
{
return _queueMap.get(name);
}
+
+ public Collection<AMQShortString> getQueueNames()
+ {
+ return _queueMap.keySet();
+ }
+
+ public Collection<AMQQueue> getQueues()
+ {
+ return _queueMap.values();
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
index 3b1b5acf3c..1210f0e97c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.Collection;
public interface QueueRegistry
{
@@ -34,4 +35,9 @@ public interface QueueRegistry
void unregisterQueue(AMQShortString name) throws AMQException;
AMQQueue getQueue(AMQShortString name);
+
+ Collection<AMQShortString> getQueueNames();
+
+ Collection<AMQQueue> getQueues();
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java
new file mode 100644
index 0000000000..c81d1dbffa
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java
@@ -0,0 +1,587 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.configuration.Configuration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.tools.messagestore.commands.Clear;
+import org.apache.qpid.tools.messagestore.commands.Command;
+import org.apache.qpid.tools.messagestore.commands.Dump;
+import org.apache.qpid.tools.messagestore.commands.Help;
+import org.apache.qpid.tools.messagestore.commands.List;
+import org.apache.qpid.tools.messagestore.commands.Load;
+import org.apache.qpid.tools.messagestore.commands.Quit;
+import org.apache.qpid.tools.messagestore.commands.Select;
+import org.apache.qpid.tools.messagestore.commands.Show;
+import org.apache.qpid.tools.utils.CommandParser;
+import org.apache.qpid.tools.utils.Console;
+import org.apache.qpid.tools.utils.SimpleCommandParser;
+import org.apache.qpid.tools.utils.SimpleConsole;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * MessageStoreTool.
+ */
+public class MessageStoreTool
+{
+ /** Text outputted at the start of each console.*/
+ private static final String BOILER_PLATE = "MessageStoreTool - for examining Persistent Qpid Broker MessageStore instances";
+
+ /** I/O Wrapper. */
+ protected Console _console;
+
+ /** Batch mode flag. */
+ protected boolean _batchMode;
+
+ /** Internal State object. */
+ private State _state = new State();
+
+ private HashMap<String, Command> _commands = new HashMap<String, Command>();
+
+ /** SLF4J Logger. */
+ private static Logger _devlog = LoggerFactory.getLogger(MessageStoreTool.class);
+
+ /** Loaded configuration file. */
+ private Configuration _config;
+
+ /** Control used for main run loop. */
+ private boolean _running = true;
+
+ //---------------------------------------------------------------------------------------------------/
+
+ public static void main(String[] args) throws Configuration.InitException
+ {
+
+ MessageStoreTool tool = new MessageStoreTool(args);
+
+ tool.start();
+ }
+
+
+ public MessageStoreTool(String[] args) throws Configuration.InitException
+ {
+ this(args, System.in, System.out);
+ }
+
+ public MessageStoreTool(String[] args, InputStream in, OutputStream out) throws Configuration.InitException
+ {
+ BufferedReader consoleReader = new BufferedReader(new InputStreamReader(in));
+ BufferedWriter consoleWriter = new BufferedWriter(new OutputStreamWriter(out));
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(this)));
+ _batchMode = false;
+
+ _console = new SimpleConsole(consoleWriter, consoleReader);
+
+ _config = new Configuration();
+
+ setOptions();
+ _config.processCommandline(args);
+ }
+
+
+ private void setOptions()
+ {
+ Option help = new Option("h", "help", false, "print this message");
+ Option version = new Option("v", "version", false, "print the version information and exit");
+ Option configFile =
+ OptionBuilder.withArgName("file").hasArg()
+ .withDescription("use given configuration file By "
+ + "default looks for a file named "
+ + Configuration.DEFAULT_CONFIG_FILE + " in " + Configuration.QPID_HOME)
+ .withLongOpt("config")
+ .create("c");
+
+ _config.setOption(help);
+ _config.setOption(version);
+ _config.setOption(configFile);
+ }
+
+ public State getState()
+ {
+ return _state;
+ }
+
+ public Map<String, Command> getCommands()
+ {
+ return _commands;
+ }
+
+ public void setConfigurationFile(String configfile) throws Configuration.InitException
+ {
+ _config.loadConfig(new File(configfile));
+ setup();
+ }
+
+ public Console getConsole()
+ {
+ return _console;
+ }
+
+ public void setConsole(Console console)
+ {
+ _console = console;
+ }
+
+ /**
+ * Simple ShutdownHook to cleanly shutdown the databases
+ */
+ class ShutdownHook implements Runnable
+ {
+ MessageStoreTool _tool;
+
+ ShutdownHook(MessageStoreTool messageStoreTool)
+ {
+ _tool = messageStoreTool;
+ }
+
+ public void run()
+ {
+ _tool.quit();
+ }
+ }
+
+ public void quit()
+ {
+ ApplicationRegistry.remove(1);
+
+ _console.println("...exiting");
+
+ _console.close();
+
+ _running = false;
+ }
+
+ public void setBatchMode(boolean batchmode)
+ {
+ _batchMode = batchmode;
+ }
+
+ /**
+ * Main loop
+ */
+ protected void start()
+ {
+ setup();
+
+ runCLI();
+ }
+
+ private void setup()
+ {
+ loadDefaultVirtualHosts();
+
+ loadCommands();
+
+ _console.println(BOILER_PLATE);
+
+ _state.clearAll();
+ }
+
+ private void loadCommands()
+ {
+ _commands.clear();
+ //todo Dynamically load the classes that exis in com.redhat.etp.qpid.commands
+ _commands.put("close", new Clear(this));
+ _commands.put("dump", new Dump(this));
+ _commands.put("help", new Help(this));
+ _commands.put("list", new List(this));
+ _commands.put("load", new Load(this));
+ _commands.put("quit", new Quit(this));
+ _commands.put("select", new Select(this));
+ _commands.put("show", new Show(this));
+ }
+
+ private void loadDefaultVirtualHosts()
+ {
+ final File configFile = _config.getConfigFile();
+
+ loadVirtualHosts(configFile);
+ }
+
+ private void loadVirtualHosts(File configFile)
+ {
+
+ if (!configFile.exists())
+ {
+ _devlog.error("Config file not found:" + configFile.getAbsolutePath());
+ return;
+ }
+ else
+ {
+ _devlog.debug("using config file :" + configFile.getAbsolutePath());
+ }
+
+ try
+ {
+ ConfigurationFileApplicationRegistry registry = new ConfigurationFileApplicationRegistry(configFile);
+
+ ApplicationRegistry.remove(1);
+ ApplicationRegistry.initialise(registry);
+
+ checkMessageStores();
+ }
+ catch (ConfigurationException e)
+ {
+ _console.println("Unable to load configuration due to configuration error: " + e.getMessage());
+ e.printStackTrace();
+ }
+ catch (Exception e)
+ {
+ _console.println("Unable to load configuration due to: " + e.getMessage());
+ e.printStackTrace();
+ }
+
+
+ }
+
+ private void checkMessageStores()
+ {
+ Collection<VirtualHost> vhosts = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts();
+
+ boolean warning = false;
+ for (VirtualHost vhost : vhosts)
+ {
+ if (vhost.getMessageStore() instanceof MemoryMessageStore)
+ {
+ _console.println("WARNING: Virtualhost '" + vhost.getName() + "' is using a MemoryMessageStore. "
+ + "Changes will not persist.");
+ warning = true;
+ }
+ }
+
+ if (warning)
+ {
+ _console.println("");
+ _console.println("Please ensure you are using the correct config file currently using '"
+ + _config.getConfigFile().getAbsolutePath() + "'");
+ _console.println("New config file can be specifed by 'load <config file>' or -c on the commandline.");
+ _console.println("");
+ }
+ }
+
+ private void runCLI()
+ {
+ while (_running)
+ {
+ if (!_batchMode)
+ {
+ printPrompt();
+ }
+
+ String[] args = _console.readCommand();
+
+ while (args != null)
+ {
+ exec(args);
+
+ if (!_batchMode)
+ {
+ printPrompt();
+ }
+
+ args = _console.readCommand();
+ }
+ }
+ }
+
+ private void printPrompt()
+ {
+ _console.print(prompt());
+ }
+
+
+ /**
+ * Execute a script (batch mode).
+ *
+ * @param script The file script
+ */
+ protected void runScripts(String script)
+ {
+ //Store Current State
+ boolean oldBatch = _batchMode;
+ CommandParser oldParser = _console.getCommandParser();
+ setBatchMode(true);
+
+ try
+ {
+ _devlog.debug("Running script '" + script + "'");
+
+ _console.setCommandParser(new SimpleCommandParser(new BufferedReader(new FileReader(script))));
+
+ start();
+ }
+ catch (java.io.FileNotFoundException e)
+ {
+ _devlog.error("Script not found: '" + script + "' due to:" + e.getMessage());
+ }
+
+ //Restore previous state
+ _console.setCommandParser(oldParser);
+ setBatchMode(oldBatch);
+ }
+
+ public String prompt()
+ {
+ String state = _state.toString();
+ if (state != null && state.length() != 0)
+ {
+ return state + ":bdb$ ";
+ }
+ else
+ {
+ return "bdb$ ";
+ }
+ }
+
+ /**
+ * Execute the command.
+ *
+ * @param args [command, arg0, arg1...].
+ */
+ protected void exec(String[] args)
+ {
+ // Comment lines start with a #
+ if (args.length == 0 || args[0].startsWith("#"))
+ {
+ return;
+ }
+
+ final String command = args[0];
+
+ Command cmd = _commands.get(command);
+
+ if (cmd == null)
+ {
+ _console.println("Command not understood: " + command);
+ }
+ else
+ {
+ cmd.execute(args);
+ }
+ }
+
+
+ /**
+ * Displays usage info.
+ */
+ protected static void help()
+ {
+ System.out.println(BOILER_PLATE);
+ System.out.println("Usage: java " + MessageStoreTool.class + " [Options]");
+ System.out.println(" [-c <broker config file>] : Defaults to \"$QPID_HOME/etc/config.xml\"");
+ }
+
+
+ /**
+ * This class is used to store the current state of the tool.
+ *
+ * This is then interrogated by the various commands to augment their behaviour.
+ *
+ *
+ */
+ public class State
+ {
+ private VirtualHost _vhost = null;
+ private AMQQueue _queue = null;
+ private Exchange _exchange = null;
+ private java.util.List<Long> _msgids = null;
+
+ public State()
+ {
+ }
+
+ public void setQueue(AMQQueue queue)
+ {
+ _queue = queue;
+ }
+
+ public AMQQueue getQueue()
+ {
+ return _queue;
+ }
+
+ public void setVhost(VirtualHost vhost)
+ {
+ _vhost = vhost;
+ }
+
+ public VirtualHost getVhost()
+ {
+ return _vhost;
+ }
+
+ public Exchange getExchange()
+ {
+ return _exchange;
+ }
+
+ public void setExchange(Exchange exchange)
+ {
+ _exchange = exchange;
+ }
+
+ public String toString()
+ {
+ StringBuilder status = new StringBuilder();
+
+ if (_vhost != null)
+ {
+ status.append(_vhost.getName());
+
+ if (_exchange != null)
+ {
+ status.append("[");
+ status.append(_exchange.getName());
+ status.append("]");
+
+ if (_queue != null)
+ {
+ status.append("->'");
+ status.append(_queue.getName());
+ status.append("'");
+
+ if (_msgids != null)
+ {
+ status.append(printMessages());
+ }
+ }
+ }
+ }
+
+ return status.toString();
+ }
+
+
+ public String printMessages()
+ {
+ StringBuilder sb = new StringBuilder();
+
+ Long previous = null;
+
+ Long start = null;
+ for (Long id : _msgids)
+ {
+ if (previous != null)
+ {
+ if (id == previous + 1)
+ {
+ if (start == null)
+ {
+ start = previous;
+ }
+ }
+ else
+ {
+ if (start != null)
+ {
+ sb.append(",");
+ sb.append(start);
+ sb.append("-");
+ sb.append(id);
+ start = null;
+ }
+ else
+ {
+ sb.append(",");
+ sb.append(previous);
+ }
+ }
+ }
+
+ previous = id;
+ }
+
+ if (start != null)
+ {
+ sb.append(",");
+ sb.append(start);
+ sb.append("-");
+ sb.append(_msgids.get(_msgids.size() - 1));
+ }
+ else
+ {
+ sb.append(",");
+ sb.append(previous);
+ }
+
+ // surround list in ()
+ sb.replace(0, 1, "(");
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void clearAll()
+ {
+ _vhost = null;
+ clearExchange();
+ }
+
+ public void clearExchange()
+ {
+ _exchange = null;
+ clearQueue();
+ }
+
+ public void clearQueue()
+ {
+ _queue = null;
+ clearMessages();
+ }
+
+ public void clearMessages()
+ {
+ _msgids = null;
+ }
+
+ public void setMessages(java.util.List<Long> msgids)
+ {
+ _msgids = msgids;
+ }
+
+ public java.util.List<Long> getMessages()
+ {
+ return _msgids;
+ }
+ }//Class State
+
+}//Class MessageStoreTool
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java
new file mode 100644
index 0000000000..5444197cb4
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+import org.apache.qpid.tools.utils.Console;
+
+public abstract class AbstractCommand implements Command
+{
+ protected Console _console;
+ protected MessageStoreTool _tool;
+
+ public AbstractCommand(MessageStoreTool tool)
+ {
+ _console = tool.getConsole();
+ _tool = tool;
+ }
+
+ public void setOutput(Console out)
+ {
+ _console = out;
+ }
+
+ protected void commandError(String message, String[] args)
+ {
+ _console.print(getCommand() + " : " + message);
+
+ if (args != null)
+ {
+ for (int i = 1; i < args.length; i++)
+ {
+ _console.print(args[i]);
+ }
+ }
+ _console.println("");
+ _console.println(help());
+ }
+
+
+ public abstract String help();
+
+ public abstract String usage();
+
+ public abstract String getCommand();
+
+
+ public abstract void execute(String... args);
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Clear.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Clear.java
new file mode 100644
index 0000000000..b0006b3fe6
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Clear.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+
+public class Clear extends AbstractCommand
+{
+ public Clear(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Clears any selection.";
+ }
+
+ public String usage()
+ {
+ return "clear [ all | virtualhost | exchange | queue | msgs ]";
+ }
+
+ public String getCommand()
+ {
+ return "clear";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 0;
+ assert args[0].equals(getCommand());
+
+ if (args.length < 1)
+ {
+ doClose("all");
+ }
+ else
+ {
+ doClose(args[1]);
+ }
+ }
+
+ private void doClose(String type)
+ {
+ if (type.equals("virtualhost")
+ || type.equals("all"))
+ {
+ _tool.getState().clearAll();
+ }
+
+ if (type.equals("exchange"))
+ {
+ _tool.getState().clearExchange();
+ }
+
+ if (type.equals("queue"))
+ {
+ _tool.getState().clearQueue();
+ }
+
+ if (type.equals("msgs"))
+ {
+ _tool.getState().clearMessages();
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Command.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Command.java
new file mode 100644
index 0000000000..bfa775a34a
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Command.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.tools.utils.Console;
+
+public interface Command
+{
+ public void setOutput(Console out);
+
+ public String help();
+
+ public abstract String usage();
+
+ String getCommand();
+
+ public void execute(String... args);
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
new file mode 100644
index 0000000000..0dd6924d56
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.commons.codec.binary.Hex;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+import org.apache.qpid.tools.utils.Console;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+public class Dump extends Show
+{
+ private static final int LINE_SIZE = 8;
+ private static final String DEFAULT_ENCODING = "utf-8";
+ private static final boolean SPACE_BYTES = true;
+ private static final String BYTE_SPACER = " ";
+ private static final String NON_PRINTING_ASCII_CHAR = "?";
+
+ protected boolean _content = true;
+
+ public Dump(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Dump selected message content. Default: show=content";
+ }
+
+ public String usage()
+ {
+ return getCommand() + " [show=[all],[msgheaders],[_amqHeaders],[routing],[content] id=<msgid e.g. 1,2,4-10>";
+ }
+
+ public String getCommand()
+ {
+ return "dump";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 0;
+ assert args[0].equals(getCommand());
+
+
+ if (args.length >= 2)
+ {
+ for (String arg : args)
+ {
+ if (arg.startsWith("show="))
+ {
+ _content = arg.contains("content") || arg.contains("all");
+ }
+ }
+
+ parseArgs(args);
+ }
+
+ performShow();
+ }
+
+
+ protected List<List> createMessageData(java.util.List<Long> msgids, List<AMQMessage> messages, boolean showHeaders, boolean showRouting,
+ boolean showMessageHeaders)
+ {
+
+ List<List> display = new LinkedList<List>();
+
+ List<String> hex = new LinkedList<String>();
+ List<String> ascii = new LinkedList<String>();
+ display.add(hex);
+ display.add(ascii);
+
+ for (AMQMessage msg : messages)
+ {
+ if (!includeMsg(msg, msgids))
+ {
+ continue;
+ }
+
+ //Add divider between messages
+ hex.add(Console.ROW_DIVIDER);
+ ascii.add(Console.ROW_DIVIDER);
+
+ // Show general message information
+ hex.add(Show.Columns.ID.name());
+ ascii.add(msg.getMessageId().toString());
+
+ hex.add(Console.ROW_DIVIDER);
+ ascii.add(Console.ROW_DIVIDER);
+
+ if (showRouting)
+ {
+ addShowInformation(hex, ascii, msg, "Routing Details", true, false, false);
+ }
+ if (showHeaders)
+ {
+ addShowInformation(hex, ascii, msg, "Headers", false, true, false);
+ }
+ if (showMessageHeaders)
+ {
+ addShowInformation(hex, ascii, msg, null, false, false, true);
+ }
+
+ // Add Content Body seciont
+ hex.add("Content Body");
+ ascii.add("");
+ hex.add(Console.ROW_DIVIDER);
+ ascii.add(Console.ROW_DIVIDER);
+
+ Iterator bodies = msg.getContentBodyIterator();
+ if (bodies.hasNext())
+ {
+
+ hex.add("Hex");
+ hex.add(Console.ROW_DIVIDER);
+
+
+ ascii.add("ASCII");
+ ascii.add(Console.ROW_DIVIDER);
+
+ while (bodies.hasNext())
+ {
+ ContentChunk chunk = (ContentChunk) bodies.next();
+
+ //Duplicate so we don't destroy original data :)
+ ByteBuffer hexBuffer = chunk.getData().duplicate();
+
+ ByteBuffer charBuffer = hexBuffer.duplicate();
+
+ Hex hexencoder = new Hex();
+
+ while (hexBuffer.hasRemaining())
+ {
+ byte[] line = new byte[LINE_SIZE];
+
+ int bufsize = hexBuffer.remaining();
+ if (bufsize < LINE_SIZE)
+ {
+ hexBuffer.get(line, 0, bufsize);
+ }
+ else
+ {
+ bufsize = line.length;
+ hexBuffer.get(line);
+ }
+
+ byte[] encoded = hexencoder.encode(line);
+
+ try
+ {
+ String encStr = new String(encoded, 0, bufsize * 2, DEFAULT_ENCODING);
+ String hexLine = "";
+
+ int strKength = encStr.length();
+ for (int c = 0; c < strKength; c++)
+ {
+ hexLine += encStr.charAt(c);
+
+ if (c % 2 == 1 && SPACE_BYTES)
+ {
+ hexLine += BYTE_SPACER;
+ }
+ }
+
+ hex.add(hexLine);
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ _console.println(e.getMessage());
+ return null;
+ }
+ }
+
+ while (charBuffer.hasRemaining())
+ {
+ String asciiLine = "";
+
+ for (int pos = 0; pos < LINE_SIZE; pos++)
+ {
+ if (charBuffer.hasRemaining())
+ {
+ byte ch = charBuffer.get();
+
+ if (isPrintable(ch))
+ {
+ asciiLine += (char) ch;
+ }
+ else
+ {
+ asciiLine += NON_PRINTING_ASCII_CHAR;
+ }
+
+ if (SPACE_BYTES)
+ {
+ asciiLine += BYTE_SPACER;
+ }
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ ascii.add(asciiLine);
+ }
+ }
+ }
+ else
+ {
+ List<String> result = new LinkedList<String>();
+
+ display.add(result);
+ result.add("No ContentBodies");
+ }
+ }
+
+ // if hex is empty then we have no data to display
+ if (hex.size() == 0)
+ {
+ return null;
+ }
+
+ return display;
+ }
+
+ private void addShowInformation(List<String> column1, List<String> column2, AMQMessage msg,
+ String title, boolean routing, boolean headers, boolean messageHeaders)
+ {
+ List<AMQMessage> single = new LinkedList<AMQMessage>();
+ single.add(msg);
+
+ List<List> routingData = super.createMessageData(null, single, headers, routing, messageHeaders);
+
+ //Reformat data
+ if (title != null)
+ {
+ column1.add(title);
+ column2.add("");
+ column1.add(Console.ROW_DIVIDER);
+ column2.add(Console.ROW_DIVIDER);
+ }
+
+ // look at all columns in the routing Data
+ for (List item : routingData)
+ {
+ // the item should be:
+ // Title
+ // *divider
+ // value
+ // otherwise we can't reason about the correct value
+ if (item.size() == 3)
+ {
+ //Filter out the columns we are not interested in.
+
+ String columnName = item.get(0).toString();
+
+ if (!(columnName.equals(Show.Columns.ID.name())
+ || columnName.equals(Show.Columns.Size.name())))
+ {
+ column1.add(columnName);
+ column2.add(item.get(2).toString());
+ }
+ }
+ }
+ column1.add(Console.ROW_DIVIDER);
+ column2.add(Console.ROW_DIVIDER);
+ }
+
+ private boolean isPrintable(byte c)
+ {
+ return c > 31 && c < 127;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Help.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Help.java
new file mode 100644
index 0000000000..76967ff06f
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Help.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+import org.apache.qpid.tools.utils.Console;
+
+import java.util.LinkedList;
+import java.util.Map;
+
+public class Help extends AbstractCommand
+{
+ public Help(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Provides detailed help on commands. ";
+ }
+
+ public String getCommand()
+ {
+ return "help";
+ }
+
+ public String usage()
+ {
+ return "help [<command>]";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 0;
+ assert args[0].equals(getCommand());
+
+ if (args.length > 1)
+ {
+ Command command = _tool.getCommands().get(args[1]);
+ if (command != null)
+ {
+ _console.println(command.help());
+ }
+ else
+ {
+ commandError("Command not found: ", args);
+ }
+ }
+ else
+ {
+ java.util.List<java.util.List> data = new LinkedList<java.util.List>();
+
+ java.util.List<String> commandName = new LinkedList<String>();
+ java.util.List<String> commandDescription = new LinkedList<String>();
+
+ data.add(commandName);
+ data.add(commandDescription);
+
+ //Set up Headers
+ commandName.add("Command");
+ commandDescription.add("Description");
+
+ commandName.add(Console.ROW_DIVIDER);
+ commandDescription.add(Console.ROW_DIVIDER);
+
+ //Add current Commands with descriptions
+ Map<String, Command> commands = _tool.getCommands();
+
+ for (Command command : commands.values())
+ {
+ commandName.add(command.getCommand());
+ commandDescription.add(command.help());
+ }
+
+ _console.printMap("Available Commands", data);
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java
new file mode 100644
index 0000000000..37d89c079f
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+import org.apache.qpid.tools.utils.Console;
+
+import java.util.Collection;
+import java.util.LinkedList;
+
+public class List extends AbstractCommand
+{
+
+ public List(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public void setOutput(Console out)
+ {
+ _console = out;
+ }
+
+ public String help()
+ {
+ return "list availble items.";
+ }
+
+ public String usage()
+ {
+ return "list queues [<exchange>] | exchanges| bindings [<exchange>] | all.";
+ }
+
+ public String getCommand()
+ {
+ return "list";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 0;
+ assert args[0].equals(getCommand());
+
+ if (args.length > 1)
+ {
+ if ((args[1].equals("exchanges"))
+ || (args[1].equals("queues"))
+ || (args[1].equals("bindings"))
+ || (args[1].equals("all")))
+ {
+ if (args.length == 2)
+ {
+ doList(args[1]);
+ }
+ else if (args.length == 3)
+ {
+ doList(args[1], args[2]);
+ }
+ }
+ else
+ {
+ commandError("Unknown options. ", args);
+ }
+ }
+ else if (args.length < 2)
+ {
+ doList("all");
+ }
+ else
+ {
+ doList(args[1]);
+ }
+ }
+
+ private void doList(String... listItem)
+ {
+ if (_tool.getState().getVhost() == null)
+ {
+ _console.println("No Virtualhost open. Open a Virtualhost first.");
+ listVirtualHosts();
+ return;
+ }
+
+ VirtualHost vhost = _tool.getState().getVhost();
+
+ java.util.List<String> data = null;
+
+ if (listItem[0].equals("queues"))
+ {
+ if (listItem.length > 1)
+ {
+ data = listQueues(vhost, new AMQShortString(listItem[1]));
+ }
+ else
+ {
+ Exchange exchange = _tool.getState().getExchange();
+ data = listQueues(vhost, exchange);
+ }
+ }
+
+ if (listItem[0].equals("exchanges"))
+ {
+ data = listExchanges(vhost);
+ }
+
+ if (listItem[0].equals("bindings"))
+ {
+
+ if (listItem.length > 1)
+ {
+ data = listBindings(vhost, new AMQShortString(listItem[1]));
+ }
+ else
+ {
+ Exchange exchange = _tool.getState().getExchange();
+
+ data = listBindings(vhost, exchange);
+ }
+ }
+
+ if (data != null)
+ {
+ if (data.size() == 1)
+ {
+ _console.println("No '" + listItem[0] + "' to display,");
+ }
+ else
+ {
+ _console.displayList(true, data.toArray(new String[0]));
+ }
+ }
+
+
+ if (listItem[0].equals("all"))
+ {
+
+ boolean displayed = false;
+ Exchange exchange = _tool.getState().getExchange();
+
+ //Do the display here for each one so that they are pretty printed
+ data = listQueues(vhost, exchange);
+ if (data != null)
+ {
+ displayed = true;
+ _console.displayList(true, data.toArray(new String[0]));
+ }
+
+ if (exchange == null)
+ {
+ data = listExchanges(vhost);
+ if (data != null)
+ {
+ displayed = true;
+ _console.displayList(true, data.toArray(new String[0]));
+ }
+ }
+
+ data = listBindings(vhost, exchange);
+ if (data != null)
+ {
+ displayed = true;
+ _console.displayList(true, data.toArray(new String[0]));
+ }
+
+ if (!displayed)
+ {
+ _console.println("Nothing to list");
+ }
+ }
+ }
+
+ private void listVirtualHosts()
+ {
+ Collection<VirtualHost> vhosts = ApplicationRegistry.getInstance()
+ .getVirtualHostRegistry().getVirtualHosts();
+
+ String[] data = new String[vhosts.size() + 1];
+
+ data[0] = "Available VirtualHosts";
+
+ int index = 1;
+ for (VirtualHost vhost : vhosts)
+ {
+ data[index] = vhost.getName();
+ index++;
+ }
+
+ _console.displayList(true, data);
+ }
+
+ private java.util.List<String> listBindings(VirtualHost vhost, AMQShortString exchangeName)
+ {
+ return listBindings(vhost, vhost.getExchangeRegistry().getExchange(exchangeName));
+ }
+
+ private java.util.List<String> listBindings(VirtualHost vhost, Exchange exchange)
+ {
+ Collection<AMQShortString> queues = vhost.getQueueRegistry().getQueueNames();
+
+ if (queues == null || queues.size() == 0)
+ {
+ return null;
+ }
+
+ java.util.List<String> data = new LinkedList<String>();
+
+ data.add("Current Bindings");
+
+ for (AMQShortString queue : queues)
+ {
+ if (exchange != null)
+ {
+ try
+ {
+ if (exchange.isBound(queue))
+ {
+ data.add(queue.toString());
+ }
+ }
+ catch (AMQException e)
+ {
+ // is never thrown by current impls forced to throw by interface.
+ commandError("Unable to check exchange bindings: " + e.getMessage(), null);
+ }
+ }
+ else
+ {
+ data.add(queue.toString());
+ }
+ }
+
+ return data;
+ }
+
+ private java.util.List<String> listExchanges(VirtualHost vhost)
+ {
+ Collection<AMQShortString> queues = vhost.getExchangeRegistry().getExchangeNames();
+
+ if (queues == null || queues.size() == 0)
+ {
+ return null;
+ }
+
+ java.util.List<String> data = new LinkedList<String>();
+
+ data.add("Available Exchanges");
+
+ for (AMQShortString queue : queues)
+ {
+ data.add(queue.toString());
+ }
+
+ return data;
+ }
+
+ private java.util.List<String> listQueues(VirtualHost vhost, AMQShortString exchangeName)
+ {
+ return listQueues(vhost, vhost.getExchangeRegistry().getExchange(exchangeName));
+ }
+
+ private java.util.List<String> listQueues(VirtualHost vhost, Exchange exchange)
+ {
+ Collection<AMQQueue> queues = vhost.getQueueRegistry().getQueues();
+
+ if (queues == null || queues.size() == 0)
+ {
+ return null;
+ }
+
+ java.util.List<String> data = new LinkedList<String>();
+
+ data.add("Available Queues");
+
+ for (AMQQueue queue : queues)
+ {
+ if (exchange != null)
+ {
+ try
+ {
+ if (exchange.isBound(queue))
+ {
+ data.add(queue.getName().toString());
+ }
+ }
+ catch (AMQException e)
+ {
+ // is never thrown by current impls forced to throw by interface.
+ commandError("Unable to check exchange bindings: " + e.getMessage(), null);
+ }
+ }
+ else
+ {
+ data.add(queue.getName().toString());
+ }
+ }
+
+ if (exchange != null)
+ {
+ if (queues.size() == 1)
+ {
+ return null;
+ }
+ }
+
+ return data;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Load.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Load.java
new file mode 100644
index 0000000000..244a311c30
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Load.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.configuration.Configuration;
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+
+public class Load extends AbstractCommand
+{
+ public Load(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Loads specified broker configuration file.";
+ }
+
+ public String usage()
+ {
+ return "load <configuration file>";
+ }
+
+ public String getCommand()
+ {
+ return "load";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 0;
+ assert args[0].equals(getCommand());
+
+ if (args.length > 2)
+ {
+ _console.print("load " + args[1] + ": additional options not understood:");
+ for (int i = 2; i < args.length; i++)
+ {
+ _console.print(args[i] + " ");
+ }
+ _console.println("");
+ }
+ else if (args.length < 2)
+ {
+ _console.println("Enter Configuration file.");
+ String input = _console.readln();
+ if (input != null)
+ {
+ doLoad(input);
+ }
+ else
+ {
+ _console.println("Did not recognise config file.");
+ }
+ }
+ else
+ {
+ doLoad(args[1]);
+ }
+ }
+
+ private void doLoad(String configfile)
+ {
+ _console.println("Loading Configuration:" + configfile);
+
+ try
+ {
+ _tool.setConfigurationFile(configfile);
+ }
+ catch (Configuration.InitException e)
+ {
+ _console.println("Unable to open config file due to: '" + e.getMessage() + "'");
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Quit.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Quit.java
new file mode 100644
index 0000000000..a81bc07c38
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Quit.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+
+public class Quit extends AbstractCommand
+{
+ public Quit(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Quit the tool.";
+ }
+
+ public String usage()
+ {
+ return "quit";
+ }
+
+ public String getCommand()
+ {
+ return "quit";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 0;
+ assert args[0].equals("quit");
+
+ _tool.quit();
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java
new file mode 100644
index 0000000000..b9156f6889
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+
+import java.util.LinkedList;
+import java.util.StringTokenizer;
+
+public class Select extends AbstractCommand
+{
+
+ public Select(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Perform a selection";
+ }
+
+ public String usage()
+ {
+ return "select virtualhost <name> |exchange <name> |queue <name> | msgs id=< msgids eg. 1,2,4-10 >";
+ }
+
+ public String getCommand()
+ {
+ return "select";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 2;
+ assert args[0].equals("select");
+
+ if (args.length < 3)
+ {
+ if (args[1].equals("show"))
+ {
+ doSelect(args[1], null);
+ }
+ else
+ {
+ _console.print("select : unknown command:");
+ _console.println(help());
+ }
+ }
+ else
+ {
+ if (args[1].equals("virtualhost")
+ || args[1].equals("vhost")
+ || args[1].equals("exchange")
+ || args[1].equals("queue")
+ || args[1].equals("msg")
+ )
+ {
+ doSelect(args[1], args[2]);
+ }
+ else
+ {
+ _console.println(help());
+ }
+ }
+ }
+
+ private void doSelect(String type, String item)
+ {
+ if (type.equals("virtualhost"))
+ {
+
+ VirtualHost vhost = ApplicationRegistry.getInstance()
+ .getVirtualHostRegistry().getVirtualHost(item);
+
+ if (vhost == null)
+ {
+ _console.println("Virtualhost '" + item + "' not found.");
+ }
+ else
+ {
+ _tool.getState().setVhost(vhost);
+ }
+ }
+
+ if (type.equals("exchange"))
+ {
+
+ VirtualHost vhost = _tool.getState().getVhost();
+
+ if (vhost == null)
+ {
+ _console.println("No Virtualhost open. Open a Virtualhost first.");
+ return;
+ }
+
+
+ Exchange exchange = vhost.getExchangeRegistry().getExchange(new AMQShortString(item));
+
+ if (exchange == null)
+ {
+ _console.println("Exchange '" + item + "' not found.");
+ }
+ else
+ {
+ _tool.getState().setExchange(exchange);
+ }
+
+ if (_tool.getState().getQueue() != null)
+ {
+ try
+ {
+ if (!exchange.isBound(_tool.getState().getQueue()))
+ {
+ _tool.getState().setQueue(null);
+ }
+ }
+ catch (AMQException e)
+ {
+ //ignore
+ }
+ }
+ }
+
+ if (type.equals("queue"))
+ {
+ VirtualHost vhost = _tool.getState().getVhost();
+
+ if (vhost == null)
+ {
+ _console.println("No Virtualhost open. Open a Virtualhost first.");
+ return;
+ }
+
+ AMQQueue queue = vhost.getQueueRegistry().getQueue(new AMQShortString(item));
+
+ if (queue == null)
+ {
+ _console.println("Queue '" + item + "' not found.");
+ }
+ else
+ {
+ _tool.getState().setQueue(queue);
+
+ if (_tool.getState().getExchange() == null)
+ {
+ for (AMQShortString exchangeName : vhost.getExchangeRegistry().getExchangeNames())
+ {
+ try
+ {
+ Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName);
+ if (exchange.isBound(queue))
+ {
+ _tool.getState().setExchange(exchange);
+ break;
+ }
+ }
+ catch (AMQException e)
+ {
+ //ignore error
+ }
+ }
+ }
+
+ //remove the message selection
+ _tool.getState().setMessages(null);
+ }
+ }
+
+ if (type.equals("msg"))
+ {
+ if (item.startsWith("id="))
+ {
+ StringTokenizer tok = new StringTokenizer(item.substring(item.indexOf("=") + 1), ",");
+
+ java.util.List<Long> msgids = null;
+
+ if (tok.hasMoreTokens())
+ {
+ msgids = new LinkedList<Long>();
+ }
+
+ while (tok.hasMoreTokens())
+ {
+ String next = tok.nextToken();
+ if (next.contains("-"))
+ {
+ Long start = Long.parseLong(next.substring(0, next.indexOf("-")));
+ Long end = Long.parseLong(next.substring(next.indexOf("-") + 1));
+
+ if (end >= start)
+ {
+ for (long l = start; l <= end; l++)
+ {
+ msgids.add(l);
+ }
+ }
+ }
+ else
+ {
+ msgids.add(Long.parseLong(next));
+ }
+ }
+
+ _tool.getState().setMessages(msgids);
+ }
+
+ }
+
+ if (type.equals("show"))
+ {
+ _console.println(_tool.getState().toString());
+ if (_tool.getState().getMessages() != null)
+ {
+ _console.print("Msgs:");
+ for (Long l : _tool.getState().getMessages())
+ {
+ _console.print(" " + l);
+ }
+ _console.println("");
+ }
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
new file mode 100644
index 0000000000..7bff89f61f
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.messagestore.commands;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.tools.messagestore.MessageStoreTool;
+import org.apache.qpid.tools.utils.Console;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.StringTokenizer;
+
+public class Show extends AbstractCommand
+{
+ protected boolean _amqHeaders = false;
+ protected boolean _routing = false;
+ protected boolean _msgHeaders = false;
+
+ public Show(MessageStoreTool tool)
+ {
+ super(tool);
+ }
+
+ public String help()
+ {
+ return "Shows the messages headers.";
+ }
+
+ public String usage()
+ {
+ return getCommand() + " [show=[all],[msgheaders],[amqheaders],[routing] id=<msgid e.g. 1,2,4-10>";
+ }
+
+ public String getCommand()
+ {
+ return "show";
+ }
+
+ public void execute(String... args)
+ {
+ assert args.length > 0;
+ assert args[0].equals(getCommand());
+
+ if (args.length < 2)
+ {
+ parseArgs("all");
+ }
+ else
+ {
+ parseArgs(args);
+ }
+
+ performShow();
+ }
+
+ protected void parseArgs(String... args)
+ {
+ List<Long> msgids = null;
+
+ if (args.length >= 2)
+ {
+ for (String arg : args)
+ {
+ if (arg.startsWith("show="))
+ {
+ _msgHeaders = arg.contains("msgheaders") || arg.contains("all");
+ _amqHeaders = arg.contains("amqheaders") || arg.contains("all");
+ _routing = arg.contains("routing") || arg.contains("all");
+ }
+
+ if (arg.startsWith("id="))
+ {
+ StringTokenizer tok = new StringTokenizer(arg.substring(arg.indexOf("=") + 1), ",");
+
+ if (tok.hasMoreTokens())
+ {
+ msgids = new LinkedList<Long>();
+ }
+
+ while (tok.hasMoreTokens())
+ {
+ String next = tok.nextToken();
+ if (next.contains("-"))
+ {
+ Long start = Long.parseLong(next.substring(0, next.indexOf("-")));
+ Long end = Long.parseLong(next.substring(next.indexOf("-") + 1));
+
+ if (end >= start)
+ {
+ for (long l = start; l <= end; l++)
+ {
+ msgids.add(l);
+ }
+ }
+ }
+ else
+ {
+ msgids.add(Long.parseLong(next));
+ }
+ }
+
+ _tool.getState().setMessages(msgids);
+ }
+ }//for args
+ }// if args > 2
+ }
+
+ protected void performShow()
+ {
+ if (_tool.getState().getVhost() == null)
+ {
+ _console.println("No Virtualhost open. Open a Virtualhost first.");
+ return;
+ }
+
+ AMQQueue _queue = _tool.getState().getQueue();
+
+ List<Long> msgids = _tool.getState().getMessages();
+
+ if (_queue != null)
+ {
+ List<AMQMessage> messages = _queue.getMessagesOnTheQueue();
+ if (messages == null || messages.size() == 0)
+ {
+ _console.println("No messages on queue");
+ return;
+ }
+
+ List<List> data = createMessageData(msgids, messages, _amqHeaders, _routing, _msgHeaders);
+ if (data != null)
+ {
+ _console.printMap(null, data);
+ }
+ else
+ {
+ String message = "No data to display.";
+ if (msgids != null)
+ {
+ message += " Is message selection correct? " + _tool.getState().printMessages();
+ }
+ _console.println(message);
+ }
+
+ }
+ else
+ {
+ _console.println("No Queue specified to show.");
+ }
+ }
+
+ /**
+ * Create the list data for display from the messages.
+ *
+ * @param msgids The list of message ids to display
+ * @param messages A list of messages to format and display.
+ * @param showHeaders should the header info be shown
+ * @param showRouting show the routing info be shown
+ * @param showMessageHeaders show the msg headers be shown
+ * @return the formated data lists for printing
+ */
+ protected List<List> createMessageData(List<Long> msgids, List<AMQMessage> messages, boolean showHeaders, boolean showRouting,
+ boolean showMessageHeaders)
+ {
+
+ // Currenly exposed message properties
+// //Printing the content Body
+// msg.getContentBodyIterator();
+// //Print the Headers
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getAppId();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getAppIdAsString();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getClusterId();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getContentType();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getCorrelationId();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getDeliveryMode();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getEncoding();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getExpiration();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getHeaders();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getMessageId();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getPriority();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getPropertyFlags();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getReplyTo();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getTimestamp();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getType();
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getUserId();
+//
+// //Print out all the property names
+// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getHeaders().getPropertyNames();
+//
+// msg.getMessageId();
+// msg.getSize();
+// msg.getArrivalTime();
+
+// msg.getDeliveredSubscription();
+// msg.getDeliveredToConsumer();
+// msg.getMessageHandle();
+// msg.getMessageId();
+// msg.getMessagePublishInfo();
+// msg.getPublisher();
+
+// msg.getStoreContext();
+// msg.isAllContentReceived();
+// msg.isPersistent();
+// msg.isRedelivered();
+// msg.isRejectedBy();
+// msg.isTaken();
+
+ //Header setup
+
+ List<List> data = new LinkedList<List>();
+
+ List<String> id = new LinkedList<String>();
+ data.add(id);
+ id.add(Columns.ID.name());
+ id.add(Console.ROW_DIVIDER);
+
+ List<String> exchange = new LinkedList<String>();
+ List<String> routingkey = new LinkedList<String>();
+ List<String> immediate = new LinkedList<String>();
+ List<String> mandatory = new LinkedList<String>();
+ if (showRouting)
+ {
+ data.add(exchange);
+ exchange.add(Columns.Exchange.name());
+ exchange.add(Console.ROW_DIVIDER);
+
+ data.add(routingkey);
+ routingkey.add(Columns.RoutingKey.name());
+ routingkey.add(Console.ROW_DIVIDER);
+
+ data.add(immediate);
+ immediate.add(Columns.isImmediate.name());
+ immediate.add(Console.ROW_DIVIDER);
+
+ data.add(mandatory);
+ mandatory.add(Columns.isMandatory.name());
+ mandatory.add(Console.ROW_DIVIDER);
+ }
+
+ List<String> size = new LinkedList<String>();
+ List<String> appid = new LinkedList<String>();
+ List<String> clusterid = new LinkedList<String>();
+ List<String> contenttype = new LinkedList<String>();
+ List<String> correlationid = new LinkedList<String>();
+ List<String> deliverymode = new LinkedList<String>();
+ List<String> encoding = new LinkedList<String>();
+ List<String> arrival = new LinkedList<String>();
+ List<String> expiration = new LinkedList<String>();
+ List<String> priority = new LinkedList<String>();
+ List<String> propertyflag = new LinkedList<String>();
+ List<String> replyto = new LinkedList<String>();
+ List<String> timestamp = new LinkedList<String>();
+ List<String> type = new LinkedList<String>();
+ List<String> userid = new LinkedList<String>();
+ List<String> ispersitent = new LinkedList<String>();
+ List<String> isredelivered = new LinkedList<String>();
+ List<String> isdelivered = new LinkedList<String>();
+
+ data.add(size);
+ size.add(Columns.Size.name());
+ size.add(Console.ROW_DIVIDER);
+
+ if (showHeaders)
+ {
+ data.add(ispersitent);
+ ispersitent.add(Columns.isPersistent.name());
+ ispersitent.add(Console.ROW_DIVIDER);
+
+ data.add(isredelivered);
+ isredelivered.add(Columns.isRedelivered.name());
+ isredelivered.add(Console.ROW_DIVIDER);
+
+ data.add(isdelivered);
+ isdelivered.add(Columns.isDelivered.name());
+ isdelivered.add(Console.ROW_DIVIDER);
+
+ data.add(appid);
+ appid.add(Columns.App_ID.name());
+ appid.add(Console.ROW_DIVIDER);
+
+ data.add(clusterid);
+ clusterid.add(Columns.Cluster_ID.name());
+ clusterid.add(Console.ROW_DIVIDER);
+
+ data.add(contenttype);
+ contenttype.add(Columns.Content_Type.name());
+ contenttype.add(Console.ROW_DIVIDER);
+
+ data.add(correlationid);
+ correlationid.add(Columns.Correlation_ID.name());
+ correlationid.add(Console.ROW_DIVIDER);
+
+ data.add(deliverymode);
+ deliverymode.add(Columns.Delivery_Mode.name());
+ deliverymode.add(Console.ROW_DIVIDER);
+
+ data.add(encoding);
+ encoding.add(Columns.Encoding.name());
+ encoding.add(Console.ROW_DIVIDER);
+
+ data.add(arrival);
+ expiration.add(Columns.Arrival.name());
+ expiration.add(Console.ROW_DIVIDER);
+
+ data.add(expiration);
+ expiration.add(Columns.Expiration.name());
+ expiration.add(Console.ROW_DIVIDER);
+
+ data.add(priority);
+ priority.add(Columns.Priority.name());
+ priority.add(Console.ROW_DIVIDER);
+
+ data.add(propertyflag);
+ propertyflag.add(Columns.Property_Flag.name());
+ propertyflag.add(Console.ROW_DIVIDER);
+
+ data.add(replyto);
+ replyto.add(Columns.ReplyTo.name());
+ replyto.add(Console.ROW_DIVIDER);
+
+ data.add(timestamp);
+ timestamp.add(Columns.Timestamp.name());
+ timestamp.add(Console.ROW_DIVIDER);
+
+ data.add(type);
+ type.add(Columns.Type.name());
+ type.add(Console.ROW_DIVIDER);
+
+ data.add(userid);
+ userid.add(Columns.UserID.name());
+ userid.add(Console.ROW_DIVIDER);
+ }
+
+ List<String> msgHeaders = new LinkedList<String>();
+ if (showMessageHeaders)
+ {
+ data.add(msgHeaders);
+ msgHeaders.add(Columns.MsgHeaders.name());
+ msgHeaders.add(Console.ROW_DIVIDER);
+ }
+
+ //Add create the table of data
+ for (AMQMessage msg : messages)
+ {
+ if (!includeMsg(msg, msgids))
+ {
+ continue;
+ }
+
+ id.add(msg.getMessageId().toString());
+
+ size.add("" + msg.getSize());
+
+ arrival.add("" + msg.getArrivalTime());
+
+ try
+ {
+ ispersitent.add(msg.isPersistent() ? "true" : "false");
+ }
+ catch (AMQException e)
+ {
+ ispersitent.add("n/a");
+ }
+
+ isredelivered.add(msg.isRedelivered() ? "true" : "false");
+
+ isdelivered.add(msg.getDeliveredToConsumer() ? "true" : "false");
+
+// msg.getMessageHandle();
+
+ BasicContentHeaderProperties headers = null;
+
+ try
+ {
+ headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties);
+ }
+ catch (AMQException e)
+ {
+ //ignore
+// commandError("Unable to read properties for message: " + e.getMessage(), null);
+ }
+
+ if (headers != null)
+ {
+ String appidS = headers.getAppIdAsString();
+ appid.add(appidS == null ? "null" : appidS);
+
+ String clusterS = headers.getClusterIdAsString();
+ clusterid.add(clusterS == null ? "null" : clusterS);
+
+ String contentS = headers.getContentTypeAsString();
+ contenttype.add(contentS == null ? "null" : contentS);
+
+ String correlationS = headers.getCorrelationIdAsString();
+ correlationid.add(correlationS == null ? "null" : correlationS);
+
+ deliverymode.add("" + headers.getDeliveryMode());
+
+ AMQShortString encodeSS = headers.getEncoding();
+ encoding.add(encodeSS == null ? "null" : encodeSS.toString());
+
+ expiration.add("" + headers.getExpiration());
+
+ FieldTable headerFT = headers.getHeaders();
+ msgHeaders.add(headerFT == null ? "none" : "" + headerFT.toString());
+
+ priority.add("" + headers.getPriority());
+ propertyflag.add("" + headers.getPropertyFlags());
+
+ AMQShortString replytoSS = headers.getReplyTo();
+ replyto.add(replytoSS == null ? "null" : replytoSS.toString());
+
+ timestamp.add("" + headers.getTimestamp());
+
+ AMQShortString typeSS = headers.getType();
+ type.add(typeSS == null ? "null" : typeSS.toString());
+
+ AMQShortString useridSS = headers.getUserId();
+ userid.add(useridSS == null ? "null" : useridSS.toString());
+
+ MessagePublishInfo info = null;
+ try
+ {
+ info = msg.getMessagePublishInfo();
+ }
+ catch (AMQException e)
+ {
+ //ignore
+ }
+
+ if (info != null)
+ {
+ AMQShortString exchangeSS = info.getExchange();
+ exchange.add(exchangeSS == null ? "null" : exchangeSS.toString());
+
+ AMQShortString routingkeySS = info.getRoutingKey();
+ routingkey.add(routingkeySS == null ? "null" : routingkeySS.toString());
+
+ immediate.add(info.isImmediate() ? "true" : "false");
+ mandatory.add(info.isMandatory() ? "true" : "false");
+ }
+
+// msg.getPublisher(); -- only used in clustering
+// msg.getStoreContext();
+// msg.isAllContentReceived();
+
+ }// if headers!=null
+
+// need to access internal map and do lookups.
+// msg.isTaken();
+// msg.getDeliveredSubscription();
+// msg.isRejectedBy();
+
+ }
+
+ // if id only had the header and the divider in it then we have no data to display
+ if (id.size() == 2)
+ {
+ return null;
+ }
+ return data;
+ }
+
+ protected boolean includeMsg(AMQMessage msg, List<Long> msgids)
+ {
+ if (msgids == null)
+ {
+ return true;
+ }
+
+ Long msgid = msg.getMessageId();
+
+ boolean found = false;
+
+ if (msgids != null)
+ {
+ //check msgid is in msgids
+ for (Long l : msgids)
+ {
+ if (l.equals(msgid))
+ {
+ found = true;
+ break;
+ }
+ }
+ }
+ return found;
+ }
+
+ public enum Columns
+ {
+ ID,
+ Size,
+ Exchange,
+ RoutingKey,
+ isImmediate,
+ isMandatory,
+ isPersistent,
+ isRedelivered,
+ isDelivered,
+ App_ID,
+ Cluster_ID,
+ Content_Type,
+ Correlation_ID,
+ Delivery_Mode,
+ Encoding,
+ Arrival,
+ Expiration,
+ Priority,
+ Property_Flag,
+ ReplyTo,
+ Timestamp,
+ Type,
+ UserID,
+ MsgHeaders
+ }
+}
+
+
diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/Passwd.java b/java/broker/src/main/java/org/apache/qpid/tools/security/Passwd.java
index f9e093dba7..c27c52eb8e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/security/Passwd.java
+++ b/java/broker/src/main/java/org/apache/qpid/tools/security/Passwd.java
@@ -18,7 +18,7 @@
*
*
*/
-package org.apache.qpid.server.security;
+package org.apache.qpid.tools.security;
import org.apache.commons.codec.binary.Base64;
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/CommandParser.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/CommandParser.java
new file mode 100644
index 0000000000..986fea32cc
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/CommandParser.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.utils;
+
+public interface CommandParser
+{
+ /**
+ * If there is more than one command received on the last parse request.
+ *
+ * Subsequent calls to parse will utilise this input rather than reading new data from the input source
+ * @return boolean
+ */
+ boolean more();
+
+ /**
+ * True if the currently parsed command has been requested as a background operation
+ *
+ * @return boolean
+ */
+ boolean isBackground();
+
+ /**
+ * Parses user commands, and groups tokens in the
+ * String[] format that all Java main's love.
+ *
+ * If more than one command is provided in one input line then the more() method will return true.
+ * A subsequent call to parse() will continue to parse that input line before reading new input.
+ *
+ * @return <code>input</code> split in args[] format; null if eof.
+ * @throws java.io.IOException if there is a problem reading from the input stream
+ */
+ String[] parse() throws java.io.IOException;
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/Console.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/Console.java
new file mode 100644
index 0000000000..cf457d1ea5
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/Console.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.utils;
+
+import java.util.List;
+
+public interface Console
+{
+ public enum CellFormat
+ {
+ CENTRED, LEFT, RIGHT
+ }
+
+ public static String ROW_DIVIDER = "*divider";
+
+ public void print(String... message);
+
+ public void println(String... message);
+
+ public String readln();
+
+ /**
+ * Reads and parses the command line.
+ *
+ *
+ * @return The next command or null
+ */
+ public String[] readCommand();
+
+ public CommandParser getCommandParser();
+
+ public void setCommandParser(CommandParser parser);
+
+ /**
+ *
+ * Prints the list of String nicely.
+ *
+ * +-------------+
+ * | Heading |
+ * +-------------+
+ * | Item 1 |
+ * | Item 2 |
+ * | Item 3 |
+ * +-------------+
+ *
+ * @param hasTitle should list[0] be used as a heading
+ * @param list The list of Strings to display
+ */
+ public void displayList(boolean hasTitle, String... list);
+
+ /**
+ *
+ * Prints the list of String nicely.
+ *
+ * +----------------------------+
+ * | Heading |
+ * +----------------------------+
+ * | title | title | ..
+ * +----------------------------+
+ * | Item 2 | value 2 | ..
+ * +----------------------------+ (*divider)
+ * | Item 3 | value 2 | ..
+ * +----------------------------+
+ *
+ * @param title The title to display if any
+ * @param entries the entries to display in a map.
+ */
+ void printMap(String title, List<List> entries);
+
+
+ public void close();
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/SimpleCommandParser.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/SimpleCommandParser.java
new file mode 100644
index 0000000000..ba272ef19d
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/SimpleCommandParser.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.utils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+public class SimpleCommandParser implements CommandParser
+{
+ private static final String COMMAND_SEPERATOR = ";";
+
+ /** Input source of commands */
+ protected BufferedReader _reader;
+
+ /** The next list of commands from the command line */
+ private StringBuilder _nextCommand = null;
+
+ public SimpleCommandParser(BufferedReader reader)
+ {
+ _reader = reader;
+ }
+
+ public boolean more()
+ {
+ return _nextCommand != null;
+ }
+
+ public boolean isBackground()
+ {
+ return false;
+ }
+
+ public String[] parse() throws IOException
+ {
+ String[] commands = null;
+
+ String input = null;
+
+ if (_nextCommand == null)
+ {
+ input = _reader.readLine();
+ }
+ else
+ {
+ input = _nextCommand.toString();
+ _nextCommand = null;
+ }
+
+ StringTokenizer tok = new StringTokenizer(input, " ");
+
+ int tokenCount = tok.countTokens();
+ int index = 0;
+
+ if (tokenCount > 0)
+ {
+ commands = new String[tokenCount];
+ boolean commandComplete = false;
+
+ while (tok.hasMoreTokens())
+ {
+ String next = tok.nextToken();
+
+ if (next.equals(COMMAND_SEPERATOR))
+ {
+ commandComplete = true;
+ _nextCommand = new StringBuilder();
+ continue;
+ }
+
+ if (commandComplete)
+ {
+ _nextCommand.append(next);
+ _nextCommand.append(" ");
+ }
+ else
+ {
+ commands[index] = next;
+ index++;
+ }
+ }
+
+ }
+
+ //Reduce the String[] if not all the tokens were used in this command.
+ // i.e. there is more than one command on the line.
+ if (index != tokenCount)
+ {
+ String[] shortCommands = new String[index];
+ System.arraycopy(commands, 0, shortCommands, 0, index);
+ return shortCommands;
+ }
+ else
+ {
+ return commands;
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/SimpleConsole.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/SimpleConsole.java
new file mode 100644
index 0000000000..ec080a4611
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/SimpleConsole.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.tools.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+public class SimpleConsole implements Console
+{
+ /** SLF4J Logger. */
+ private static Logger _devlog = LoggerFactory.getLogger(SimpleConsole.class);
+
+ /** Console Writer. */
+ protected static BufferedWriter _consoleWriter;
+
+ /** Console Reader. */
+ protected static BufferedReader _consoleReader;
+
+ /** Parser for command-line input. */
+ protected CommandParser _parser;
+
+ public SimpleConsole(BufferedWriter writer, BufferedReader reader)
+ {
+ _consoleWriter = writer;
+ _consoleReader = reader;
+ _parser = new SimpleCommandParser(_consoleReader);
+ }
+
+ public void print(String... message)
+ {
+ try
+ {
+ for (String s : message)
+ {
+ _consoleWriter.write(s);
+ }
+ _consoleWriter.flush();
+ }
+ catch (IOException e)
+ {
+ _devlog.error(e.getMessage() + ": Occured whilst trying to write:" + message);
+ }
+
+ }
+
+ public void println(String... message)
+ {
+ print(message);
+ print(System.getProperty("line.separator"));
+ }
+
+
+ public String readln()
+ {
+ try
+ {
+ return _consoleReader.readLine();
+ }
+ catch (IOException e)
+ {
+ _devlog.debug("Unable to read input due to:" + e.getMessage());
+ return null;
+ }
+ }
+
+ public String[] readCommand()
+ {
+ try
+ {
+ return _parser.parse();
+ }
+ catch (IOException e)
+ {
+ _devlog.error("Error reading command:" + e.getMessage());
+ return new String[0];
+ }
+ }
+
+ public CommandParser getCommandParser()
+ {
+ return _parser;
+ }
+
+ public void setCommandParser(CommandParser parser)
+ {
+ _parser = parser;
+ }
+
+ public void displayList(boolean hasTitle, String... list)
+ {
+ java.util.List<java.util.List> data = new LinkedList<List>();
+
+ java.util.List<String> values = new LinkedList<String>();
+
+ data.add(values);
+
+ for (String value : list)
+ {
+ values.add(value);
+ }
+
+ if (hasTitle)
+ {
+ values.add(1, "*divider");
+ }
+
+ printMap(null, data);
+ }
+
+ /**
+ *
+ * Prints the list of String nicely.
+ *
+ * +----------------------------+
+ * | Heading |
+ * +----------------------------+
+ * | title | title | ..
+ * +----------------------------+
+ * | Item 2 | value 2 | ..
+ * | Item 3 | value 2 | ..
+ * +----------------------------+
+ *
+ * @param title The title to display if any
+ * @param entries the entries to display in a map.
+ */
+ public void printMap(String title, java.util.List<java.util.List> entries)
+ {
+ try
+ {
+ int columns = entries.size();
+
+ int[] columnWidth = new int[columns];
+
+ // calculate row count
+ int rowMax = 0;
+
+ //the longest item
+ int itemMax = 0;
+
+ for (int i = 0; i < columns; i++)
+ {
+ int columnIRowMax = entries.get(i).size();
+
+ if (columnIRowMax > rowMax)
+ {
+ rowMax = columnIRowMax;
+ }
+ for (Object values : entries.get(i))
+ {
+ if (values.toString().equals(Console.ROW_DIVIDER))
+ {
+ continue;
+ }
+
+ int itemLength = values.toString().length();
+
+ //note for single width
+ if (itemLength > itemMax)
+ {
+ itemMax = itemLength;
+ }
+
+ //note for mulit width
+ if (itemLength > columnWidth[i])
+ {
+ columnWidth[i] = itemLength;
+ }
+
+ }
+ }
+
+ int tableWidth = 0;
+
+
+ for (int i = 0; i < columns; i++)
+ {
+ // plus 2 for the space padding
+ columnWidth[i] += 2;
+ }
+ for (int size : columnWidth)
+ {
+ tableWidth += size;
+ }
+ tableWidth += (columns - 1);
+
+ if (title != null)
+ {
+ if (title.length() > tableWidth)
+ {
+ tableWidth = title.length();
+ }
+
+ printCellRow("+", "-", tableWidth);
+
+ printCell(CellFormat.CENTRED, "|", tableWidth, " " + title + " ", 0);
+ _consoleWriter.newLine();
+
+ }
+
+ //put top line | or bottom of title
+ printCellRow("+", "-", tableWidth);
+
+ //print the table data
+ int row = 0;
+
+ for (; row < rowMax; row++)
+ {
+ for (int i = 0; i < columns; i++)
+ {
+ java.util.List columnData = entries.get(i);
+
+ String value;
+ // does this column have a value for this row
+ if (columnData.size() > row)
+ {
+ value = " " + columnData.get(row).toString() + " ";
+ }
+ else
+ {
+ value = " ";
+ }
+
+ if (i == 0 && value.equals(" " + Console.ROW_DIVIDER + " "))
+ {
+ printCellRow("+", "-", tableWidth);
+ //move on to the next row
+ break;
+ }
+ else
+ {
+ printCell(CellFormat.LEFT, "|", columnWidth[i], value, i);
+ }
+
+ // if it is the last row then do a new line.
+ if (i == columns - 1)
+ {
+ _consoleWriter.newLine();
+ }
+ }
+ }
+
+ printCellRow("+", "-", tableWidth);
+
+ }
+ catch (IOException e)
+ {
+ _devlog.error(e.getMessage() + ": Occured whilst trying to write.");
+ }
+ }
+
+ public void close()
+ {
+
+ try
+ {
+ _consoleReader.close();
+ }
+ catch (IOException e)
+ {
+ _devlog.error(e.getMessage() + ": Occured whilst trying to close reader.");
+ }
+
+ try
+ {
+
+ _consoleWriter.close();
+ }
+ catch (IOException e)
+ {
+ _devlog.error(e.getMessage() + ": Occured whilst trying to close writer.");
+ }
+
+ }
+
+ private void printCell(CellFormat format, String edge, int cellWidth, String cell, int column) throws IOException
+ {
+ int pad = cellWidth - cell.length();
+
+ if (column == 0)
+ {
+ _consoleWriter.write(edge);
+ }
+
+ switch (format)
+ {
+ case CENTRED:
+ printPad(" ", pad / 2);
+ break;
+ case RIGHT:
+ printPad(" ", pad);
+ break;
+ }
+
+ _consoleWriter.write(cell);
+
+
+ switch (format)
+ {
+ case CENTRED:
+ // if pad isn't even put the extra one on the right
+ if (pad % 2 == 0)
+ {
+ printPad(" ", pad / 2);
+ }
+ else
+ {
+ printPad(" ", (pad / 2) + 1);
+ }
+ break;
+ case LEFT:
+ printPad(" ", pad);
+ break;
+ }
+
+
+ _consoleWriter.write(edge);
+
+ }
+
+ private void printCellRow(String edge, String mid, int cellWidth) throws IOException
+ {
+ _consoleWriter.write(edge);
+
+ printPad(mid, cellWidth);
+
+ _consoleWriter.write(edge);
+ _consoleWriter.newLine();
+ }
+
+ private void printPad(String padChar, int count) throws IOException
+ {
+ for (int i = 0; i < count; i++)
+ {
+ _consoleWriter.write(padChar);
+ }
+ }
+
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/CommandParser.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/CommandParser.java
new file mode 100644
index 0000000000..6a95529059
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/CommandParser.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package com.redhat.etp.qpid.utils;
+
+public interface CommandParser
+{
+ /**
+ * If there is more than one command received on the last parse request.
+ *
+ * Subsequent calls to parse will utilise this input rather than reading new data from the input source
+ * @return boolean
+ */
+ boolean more();
+
+ /**
+ * True if the currently parsed command has been requested as a background operation
+ *
+ * @return boolean
+ */
+ boolean isBackground();
+
+ /**
+ * Parses user commands, and groups tokens in the
+ * String[] format that all Java main's love.
+ *
+ * If more than one command is provided in one input line then the more() method will return true.
+ * A subsequent call to parse() will continue to parse that input line before reading new input.
+ *
+ * @return <code>input</code> split in args[] format; null if eof.
+ * @throws java.io.IOException if there is a problem reading from the input stream
+ */
+ String[] parse() throws java.io.IOException;
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/Console.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/Console.java
new file mode 100644
index 0000000000..892a48254c
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/Console.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package com.redhat.etp.qpid;
+
+import java.util.List;
+
+public interface Console
+{
+ public enum CellFormat
+ {
+ CENTRED, LEFT, RIGHT
+ }
+
+ public static String ROW_DIVIDER = "*divider";
+
+ public void print(String... message);
+
+ public void println(String... message);
+
+ public String[] readln();
+
+ /**
+ *
+ * Prints the list of String nicely.
+ *
+ * +-------------+
+ * | Heading |
+ * +-------------+
+ * | Item 1 |
+ * | Item 2 |
+ * | Item 3 |
+ * +-------------+
+ *
+ * @param hasTitle should list[0] be used as a heading
+ * @param list The list of Strings to display
+ */
+ public void displayList(boolean hasTitle, String... list);
+
+ /**
+ *
+ * Prints the list of String nicely.
+ *
+ * +----------------------------+
+ * | Heading |
+ * +----------------------------+
+ * | title | title | ..
+ * +----------------------------+
+ * | Item 2 | value 2 | ..
+ * +----------------------------+ (*divider)
+ * | Item 3 | value 2 | ..
+ * +----------------------------+
+ *
+ * @param title The title to display if any
+ * @param entries the entries to display in a map.
+ */
+ void printMap(String title, List<List> entries);
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/RSHCommandParser.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/RSHCommandParser.java
new file mode 100644
index 0000000000..ea4045c917
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/RSHCommandParser.java
@@ -0,0 +1,352 @@
+/*
+ * The Java Shell: jsh core -- RELEASE: alpha3
+ * (C)1999 Osvaldo Pinali Doederlein.
+ *
+ * LICENSE
+ * =======
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * CHANGES
+ * =======
+ * 1.0.2 - Added support for non-quoted '\' escape char (Not interpreted by the shell)
+ * 1.0.1 - Added support for arguments in aliases
+ * 1.0.0 - Initial release; split from Shell and enhanced a lot.
+ *
+ * LINKS
+ * =====
+ * Contact: mailto@osvaldo.visionnaire.com.br, mailto@g.collin@appliweb.net
+ * Site #1: http://www.geocities.com/ResearchTriangle/Node/2005/
+ * Site #2: http://www.appliweb.net/jsh
+ */
+
+package com.redhat.etp.qpid.utils;
+
+import java.io.BufferedReader;
+import java.util.Vector;
+
+/**
+ * The Java Shell.
+ * <p>
+ * Provides an environment for launching and controlling Java apps.
+ * <p>
+ * TODO: - Support for applets!
+ * - Support for variable replacement
+ *
+ * @author Osvaldo Pinali Doederlein.
+ */
+public class RSHCommandParser implements CommandParser
+{
+ /** Where commands come from. */
+ protected BufferedReader reader;
+ /** Continuation for command line. */
+ protected String contLine = null;
+ /** Run next command in background? */
+ protected boolean background;
+ protected String[] env; // Commands passed in args.
+
+ public RSHCommandParser(BufferedReader reader)
+ {
+ this(reader, null);
+ }
+
+ public RSHCommandParser(BufferedReader reader, String env[])
+ {
+ this.reader = reader;
+ this.env = env;
+ if (env == null)
+ {
+ this.env = new String[0];
+ }
+ }
+
+
+ public boolean more()
+ {
+ return contLine != null;
+ }
+
+ public boolean isBackground()
+ {
+ return background;
+ }
+
+ /**
+ * Solves and expands aliases in an argument array.
+ * This expansion affects only the first (if any) argument; the
+ * typical thing to do, as we don't want to expand parameters.
+ * We recursively parse the result to be sure we expand everything.
+ * For example, an alias can be expanded to further aliases, or it can contains args.
+ *
+ * @param args Raw arguments.
+ * @return <code>args</code> resolved and expanded.
+ */
+ public static String[] expand(String[] args)
+ {
+ if (args.length > 0)
+ {
+ String expanded = args[0];//Alias.resolve(args[0]);
+
+ // Try to expand recursively the command line
+ if (expanded != args[0])
+ {
+ RSHCommandParser recurse = new RSHCommandParser(new BufferedReader(
+ new java.io.StringReader(expanded)));
+
+ try
+ {
+ String cmdLine[] = recurse.parse();
+ cmdLine = recurse.expand(cmdLine);
+
+ // do we need to handle new arguments and insert them in the command array ?
+ if (cmdLine.length > 1)
+ {
+ String[] newArgs = new String[cmdLine.length + args.length - 1];
+ System.arraycopy(cmdLine, 0, newArgs, 0, cmdLine.length);
+
+ if (args.length > 1)
+ {
+ System.arraycopy(args, 1, newArgs, cmdLine.length, args.length - 1);
+ }
+
+ args = newArgs;
+ }
+ else if (cmdLine.length == 1)
+ {
+ args[0] = cmdLine[0];
+ }
+ }
+ catch (java.io.IOException e)
+ {
+ }
+ }
+ }
+
+ return args;
+ }
+
+ public String[] parse() throws java.io.IOException
+ {
+ final int READ = 0, QUOTE = 1, SKIP = 2, ESCAPE = 3, NONQUOTEDESCAPE = 4, VARIABLE = 5;
+ final int EOF = 0xFFFF;
+ int mode = SKIP;
+ Vector<String> args = new Vector<String>();
+ StringBuffer current = new StringBuffer();
+ StringBuffer varName = null;
+ background = false;
+ String line;
+
+ if (contLine == null)
+ {
+ line = reader.readLine();
+ }
+ else
+ {
+ line = contLine;
+ contLine = null;
+ }
+
+ if (line == null)
+ {
+ reader = null;
+ return null;
+ }
+
+ for (int pos = 0; pos < line.length(); ++pos)
+ {
+ char c = line.charAt(pos);
+
+ switch (mode)
+ {
+ case SKIP:
+ switch (c)
+ {
+ case' ':
+ case'\t':
+ break;
+ case'\"':
+ mode = QUOTE;
+ break;
+ case'&':
+ background = true;
+ case';':
+ contLine = line.substring(pos + 1);
+ pos = line.length();
+ break;
+ default:
+ mode = READ;
+ --pos;
+ }
+ break;
+
+ case READ:
+ switch (c)
+ {
+ case'\"':
+ mode = QUOTE;
+ break;
+ case';':
+ case'&':
+ --pos;
+ case' ':
+ case'\t':
+ mode = SKIP;
+ break;
+ case'\\':
+ mode = NONQUOTEDESCAPE;
+ break;
+ case'$':
+ mode = VARIABLE;
+ varName = new StringBuffer();
+ break;
+ default:
+ current.append(c);
+ }
+ if ((mode != READ) && (mode != NONQUOTEDESCAPE))
+ {
+ args.addElement(current.toString());
+ current = new StringBuffer();
+ }
+ break;
+
+ case QUOTE:
+ switch (c)
+ {
+ case'\"':
+ mode = READ;
+ break;
+ case'\\':
+ mode = ESCAPE;
+ break;
+ default:
+ current.append(c);
+ }
+ break;
+
+ case ESCAPE:
+ switch (c)
+ {
+ case'n':
+ c = '\n';
+ break;
+ case'r':
+ c = '\r';
+ break;
+ case't':
+ c = '\t';
+ break;
+ case'b':
+ c = '\b';
+ break;
+ case'f':
+ c = '\f';
+ break;
+ default:
+ current.append('\\');
+ break;
+ }
+ mode = QUOTE;
+ current.append(c);
+ break;
+ case NONQUOTEDESCAPE:
+ switch (c)
+ {
+ case';':
+ mode = READ;
+ current.append(c);
+ break;
+ default: // This is not a escaped char.
+ mode = READ;
+ current.append('\\');
+ current.append(c);
+ break;
+ }
+ break;
+ case VARIABLE:
+ switch (c)
+ {
+ case'$':
+ {
+// String val = Set.get(new String(varName));
+// if (val != null)
+// {
+// current.append(val);
+// }
+ mode = READ;
+ break;
+ }
+ case'@':
+ {
+ StringBuffer val = new StringBuffer();
+ int i;
+ for (i = 0; i < env.length; i++)
+ {
+ val.append(env[i]);
+ val.append(' ');
+ }
+ current.append(val);
+ mode = READ;
+ break;
+ }
+ case'0':
+ case'1':
+ case'2':
+ case'3':
+ case'4':
+ case'5':
+ case'6':
+ case'7':
+ case'8':
+ case'9':
+ {
+ if (varName.length() == 0)
+ {
+ int value = Integer.parseInt(new String(new char[]{c}));
+ if (env.length > value)
+ {
+ current.append(env[value]);
+ }
+ mode = READ;
+ break;
+ }
+ // else fall back
+ }
+ default:
+ varName.append(c);
+ break;
+ }
+ break;
+ }
+ }
+
+ if (current.length() > 0)
+ {
+ args.addElement(current.toString());
+ }
+ return expand(toArray(args));
+ }
+
+ private String[] toArray(Vector strings)
+ {
+ String[] arr = new String[strings.size()];
+
+ for (int i = 0; i < strings.size(); ++i)
+ {
+ arr[i] = (String) strings.elementAt(i);
+ }
+
+ return arr;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/SimpleCommandParser.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/SimpleCommandParser.java
new file mode 100644
index 0000000000..98e816554e
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/SimpleCommandParser.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package com.redhat.etp.qpid.utils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+public class SimpleCommandParser implements CommandParser
+{
+ private static final String COMMAND_SEPERATOR = ";";
+
+ /** Input source of commands */
+ protected BufferedReader _reader;
+
+ /** The next list of commands from the command line */
+ private StringBuilder _nextCommand = null;
+
+ public SimpleCommandParser(BufferedReader reader)
+ {
+ _reader = reader;
+ }
+
+ public boolean more()
+ {
+ return _nextCommand != null;
+ }
+
+ public boolean isBackground()
+ {
+ return false;
+ }
+
+ public String[] parse() throws IOException
+ {
+ String[] commands = null;
+
+ String input = null;
+
+ if (_nextCommand == null)
+ {
+ input = _reader.readLine();
+ }
+ else
+ {
+ input = _nextCommand.toString();
+ _nextCommand = null;
+ }
+
+ StringTokenizer tok = new StringTokenizer(input, " ");
+
+ int tokenCount = tok.countTokens();
+ int index = 0;
+
+ if (tokenCount > 0)
+ {
+ commands = new String[tokenCount];
+ boolean commandComplete = false;
+
+ while (tok.hasMoreTokens())
+ {
+ String next = tok.nextToken();
+
+ if (next.equals(COMMAND_SEPERATOR))
+ {
+ commandComplete = true;
+ _nextCommand = new StringBuilder();
+ continue;
+ }
+
+ if (commandComplete)
+ {
+ _nextCommand.append(next);
+ _nextCommand.append(" ");
+ }
+ else
+ {
+ commands[index] = next;
+ index++;
+ }
+ }
+
+ }
+
+ //Reduce the String[] if not all the tokens were used in this command.
+ // i.e. there is more than one command on the line.
+ if (index != tokenCount)
+ {
+ String[] shortCommands = new String[index];
+ System.arraycopy(commands, 0, shortCommands, 0, index);
+ return shortCommands;
+ }
+ else
+ {
+ return commands;
+ }
+ }
+}
diff --git a/java/pom.xml b/java/pom.xml
index cf39e43ecc..114abfab1b 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -391,7 +391,15 @@ under the License.
<version>0.5</version>
</plugin>
- <plugin>
+
+
+
+ </plugins>
+ </pluginManagement>
+
+
+ <!--plugins>
+ <plugin>
<artifactId>maven-remote-resources-plugin</artifactId>
<version>1.0-alpha-5</version>
<executions>
@@ -412,10 +420,7 @@ under the License.
</execution>
</executions>
</plugin>
-
-
- </plugins>
- </pluginManagement>
+ </plugins-->
<defaultGoal>install</defaultGoal>