diff options
37 files changed, 4045 insertions, 74 deletions
diff --git a/java/broker/bin/msTool.sh b/java/broker/bin/msTool.sh new file mode 100755 index 0000000000..2f05e511c5 --- /dev/null +++ b/java/broker/bin/msTool.sh @@ -0,0 +1,34 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Set classpath to include Qpid jar with all required jars in manifest +QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar + +# Set other variables used by the qpid-run script before calling +export JAVA=java \ + JAVA_VM=-server \ + JAVA_OPTS=-Dlog4j.configuration=$QPID_TOOLS\etc\messagestoretool-log4j.xml \ + QPID_CLASSPATH=$QPID_LIBS + +if [ -z "$QPID_TOOLS" ]; then + die "QPID_HOME must be set" +fi + +. qpid-run org.apache.qpid.tools.messagestore.MessageStoreTool "$@" diff --git a/java/broker/bin/qpid-passwd b/java/broker/bin/qpid-passwd index 6e64af6e70..f046252522 100644 --- a/java/broker/bin/qpid-passwd +++ b/java/broker/bin/qpid-passwd @@ -27,4 +27,4 @@ export JAVA=java \ JAVA_MEM=-Xmx1024m \
QPID_CLASSPATH=$QPID_LIBS
-. qpid-run org.apache.qpid.server.security.Passwd "$@"
+. qpid-run org.apache.qpid.tools.security.Passwd "$@"
diff --git a/java/broker/bin/qpid-server b/java/broker/bin/qpid-server index 76d0ad786d..fabf7e941c 100644 --- a/java/broker/bin/qpid-server +++ b/java/broker/bin/qpid-server @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -25,7 +25,7 @@ QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar:$QPID_HOME/lib/bdbstore-launch.jar export JAVA=java \ JAVA_VM=-server \ JAVA_MEM=-Xmx1024m \ - JAVA_GC=-XX:-UseConcMarkSweepGC + JAVA_GC=-XX:-UseConcMarkSweepGC \ QPID_CLASSPATH=$QPID_LIBS . qpid-run org.apache.qpid.server.Main "$@" diff --git a/java/broker/etc/messagestoretool-log4j.xml b/java/broker/etc/messagestoretool-log4j.xml new file mode 100644 index 0000000000..0313de4f95 --- /dev/null +++ b/java/broker/etc/messagestoretool-log4j.xml @@ -0,0 +1,53 @@ +<?xml version="1.0"?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + + <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender"> + + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/> + </layout> + </appender> + + <category name="org.apache.qpid.tools"> + <priority value="info"/> + </category> + + <category name="org.apache.qpid"> + <priority value="info"/> + </category> + + <category name="org.apache.qpid.server.security"> + <priority value="info"/> + </category> + + <category name="org.apache.qpid.server.management"> + <priority value="error"/> + </category> + + + <root> + <priority value="info"/> + <appender-ref ref="STDOUT"/> + </root> +</log4j:configuration> diff --git a/java/broker/src/main/java/org/apache/log4j/QpidCompositeRollingAppender.java b/java/broker/src/main/java/org/apache/log4j/QpidCompositeRollingAppender.java index 931c15a664..7e0c4defe1 100644 --- a/java/broker/src/main/java/org/apache/log4j/QpidCompositeRollingAppender.java +++ b/java/broker/src/main/java/org/apache/log4j/QpidCompositeRollingAppender.java @@ -31,7 +31,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.GZIPOutputStream; import org.apache.log4j.helpers.CountingQuietWriter; @@ -39,8 +38,6 @@ import org.apache.log4j.helpers.LogLog; import org.apache.log4j.helpers.OptionConverter; import org.apache.log4j.spi.LoggingEvent; -import org.apache.qpid.framing.FieldTable; - /** * <p>CompositeRollingAppender combines RollingFileAppender and DailyRollingFileAppender<br> It can function as either * or do both at the same time (making size based rolling files like RollingFileAppender until a data/time boundary is diff --git a/java/broker/src/main/java/org/apache/qpid/configuration/Configuration.java b/java/broker/src/main/java/org/apache/qpid/configuration/Configuration.java new file mode 100644 index 0000000000..c17c27a497 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/configuration/Configuration.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.configuration; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; + +public class Configuration +{ + public static final String QPID_HOME = "QPID_HOME"; + + final String QPIDHOME = System.getProperty(QPID_HOME); + + private static Logger _devlog = LoggerFactory.getLogger(Configuration.class); + + public static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml"; + public static final String DEFAULT_CONFIG_FILE = "etc/config.xml"; + + protected final Options _options = new Options(); + protected CommandLine _commandLine; + protected File _configFile; + + + public Configuration() + { + + } + + public void processCommandline(String[] args) throws InitException + { + try + { + _commandLine = new PosixParser().parse(_options, args); + } + catch (ParseException e) + { + throw new InitException("Unable to parse commmandline", e); + } + + final File defaultConfigFile = new File(QPIDHOME, DEFAULT_CONFIG_FILE); + setConfig(new File(_commandLine.getOptionValue("c", defaultConfigFile.getPath()))); + } + + public void setConfig(File file) + { + _configFile = file; + } + + /** + * @param option The option to set. + */ + public void setOption(Option option) + { + _options.addOption(option); + } + + /** + * getOptionValue from the configuration + * @param option variable argument, first string is option to get, second if present is the default value. + * @return the String for the given option or null if not present (if default value not specified) + */ + public String getOptionValue(String... option) + { + if (option.length == 1) + { + return _commandLine.getOptionValue(option[0]); + } + else if (option.length == 2) + { + return _commandLine.getOptionValue(option[0], option[1]); + } + return null; + } + + public void loadConfig(File file) throws InitException + { + setConfig(file); + loadConfig(); + } + + private void loadConfig() throws InitException + { + if (!_configFile.exists()) + { + String error = "File " + _configFile + " could not be found. Check the file exists and is readable."; + + if (QPIDHOME == null) + { + error = error + "\nNote: " + QPID_HOME + " is not set."; + } + + throw new InitException(error, null); + } + else + { + _devlog.debug("Using configuration file " + _configFile.getAbsolutePath()); + } + +// String logConfig = _commandLine.getOptionValue("l"); +// String logWatchConfig = _commandLine.getOptionValue("w", "0"); +// if (logConfig != null) +// { +// File logConfigFile = new File(logConfig); +// configureLogging(logConfigFile, logWatchConfig); +// } +// else +// { +// File configFileDirectory = _configFile.getParentFile(); +// File logConfigFile = new File(configFileDirectory, DEFAULT_LOG_CONFIG_FILENAME); +// configureLogging(logConfigFile, logWatchConfig); +// } + } + + +// private void configureLogging(File logConfigFile, String logWatchConfig) +// { +// int logWatchTime = 0; +// try +// { +// logWatchTime = Integer.parseInt(logWatchConfig); +// } +// catch (NumberFormatException e) +// { +// _devlog.error("Log watch configuration value of " + logWatchConfig + " is invalid. Must be " +// + "a non-negative integer. Using default of zero (no watching configured"); +// } +// +// if (logConfigFile.exists() && logConfigFile.canRead()) +// { +// _devlog.info("Configuring logger using configuration file " + logConfigFile.getAbsolutePath()); +// if (logWatchTime > 0) +// { +// _devlog.info("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every " +// + logWatchTime + " seconds"); +// // log4j expects the watch interval in milliseconds +// DOMConfigurator.configureAndWatch(logConfigFile.getAbsolutePath(), logWatchTime * 1000); +// } +// else +// { +// DOMConfigurator.configure(logConfigFile.getAbsolutePath()); +// } +// } +// else +// { +// System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath()); +// System.err.println("Using basic log4j configuration"); +// BasicConfigurator.configure(); +// } +// } + + public File getConfigFile() + { + return _configFile; + } + + + public class InitException extends Exception + { + InitException(String msg, Throwable cause) + { + super(msg, cause); + } + } +}
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 868ac31a54..9ebb893362 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -38,9 +38,13 @@ import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.management.ManagedObjectRegistry; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.List; +import java.util.Map; + public abstract class AbstractExchange implements Exchange, Managable { private AMQShortString _name; @@ -189,6 +193,8 @@ public abstract class AbstractExchange implements Exchange, Managable } } + abstract public Map<AMQShortString, List<AMQQueue>> getBindings(); + public String toString() { return getClass().getName() + "[" + getName() +"]"; diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index 4774383642..98abf7977a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -20,16 +20,17 @@ */ package org.apache.qpid.server.exchange; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.protocol.ExchangeInitialiser; import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; public class DefaultExchangeRegistry implements ExchangeRegistry { @@ -63,7 +64,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public void registerExchange(Exchange exchange) throws AMQException { _exchangeMap.put(exchange.getName(), exchange); - if(exchange.isDurable()) + if (exchange.isDurable()) { getMessageStore().createExchange(exchange); } @@ -79,13 +80,18 @@ public class DefaultExchangeRegistry implements ExchangeRegistry return _defaultExchange; } + public Collection<AMQShortString> getExchangeNames() + { + return _exchangeMap.keySet(); + } + public void unregisterExchange(AMQShortString name, boolean inUse) throws AMQException { // TODO: check inUse argument Exchange e = _exchangeMap.remove(name); if (e != null) { - if(e.isDurable()) + if (e.isDurable()) { getMessageStore().removeExchange(e); } @@ -99,7 +105,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry public Exchange getExchange(AMQShortString name) { - if((name == null) || name.length() == 0) + if ((name == null) || name.length() == 0) { return getDefaultExchange(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index a3aa6e7f5d..a632720ebe 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -26,22 +26,16 @@ import java.util.Map; import javax.management.JMException; import javax.management.MBeanException; -import javax.management.openmbean.ArrayType; import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.CompositeType; import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.OpenType; -import javax.management.openmbean.SimpleType; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.management.MBeanConstructor; @@ -157,7 +151,7 @@ public class DestNameExchange extends AbstractExchange if (!_index.remove(routingKey, queue)) { throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + - " with routing key " + routingKey + ". No queue was registered with that routing key"); + " with routing key " + routingKey + ". No queue was registered with that _routing key"); } } @@ -222,4 +216,9 @@ public class DestNameExchange extends AbstractExchange { return !_index.getBindingsMap().isEmpty(); } + + public Map<AMQShortString, List<AMQQueue>> getBindings() + { + return _index.getBindingsMap(); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index 418cf64c56..d88dcb11f7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -21,11 +21,9 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; - import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.management.MBeanConstructor; @@ -35,17 +33,11 @@ import org.apache.qpid.server.queue.AMQQueue; import javax.management.JMException; import javax.management.MBeanException; -import javax.management.openmbean.ArrayType; import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.CompositeType; import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.OpenType; -import javax.management.openmbean.SimpleType; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; - import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -59,7 +51,7 @@ public class DestWildExchange extends AbstractExchange private static final Logger _logger = Logger.getLogger(DestWildExchange.class); private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = - new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); + new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>(); private static final String TOPIC_SEPARATOR = "."; private static final String AMQP_STAR = "*"; @@ -92,7 +84,7 @@ public class DestWildExchange extends AbstractExchange queueList.add(q.getName().toString()); } - Object[] bindingItemValues = { key.toString(), queueList.toArray(new String[0]) }; + Object[] bindingItemValues = {key.toString(), queueList.toArray(new String[0])}; CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues); _bindingList.put(bindingData); } @@ -281,7 +273,7 @@ public class DestWildExchange extends AbstractExchange if (queues == null) { throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() - + " with routing key " + routingKey + ". No queue was registered with that routing key"); + + " with routing key " + routingKey + ". No queue was registered with that _routing key"); } @@ -289,7 +281,7 @@ public class DestWildExchange extends AbstractExchange if (!removedQ) { throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() - + " with routing key " + routingKey); + + " with routing key " + routingKey); } if (queues.isEmpty()) @@ -311,6 +303,11 @@ public class DestWildExchange extends AbstractExchange } } + public Map<AMQShortString, List<AMQQueue>> getBindings() + { + return _routingKey2queues; + } + private List<AMQQueue> getMatchedQueues(AMQShortString routingKey) { List<AMQQueue> list = new LinkedList<AMQQueue>(); @@ -358,8 +355,8 @@ public class DestWildExchange extends AbstractExchange if (queueList.size() > (depth + queueskip)) { // a hash and it is the last entry matching = - queueList.get(depth + queueskip).equals(AMQP_HASH) - && (queueList.size() == (depth + queueskip + 1)); + queueList.get(depth + queueskip).equals(AMQP_HASH) + && (queueList.size() == (depth + queueskip + 1)); } } else if (routingkeyList.size() > (depth + routingskip)) diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index a5f77cc2a4..9fa7b56ba7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -27,9 +27,13 @@ import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.Map; +import java.util.List; + public interface Exchange { AMQShortString getName(); + AMQShortString getType(); void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException; @@ -61,7 +65,7 @@ public interface Exchange boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException; /** - * Determines whether a message is routing to any queue using a specific routing key + * Determines whether a message is routing to any queue using a specific _routing key * @param routingKey * @return * @throws AMQException @@ -76,4 +80,6 @@ public interface Exchange * @throws AMQException */ boolean hasBindings() throws AMQException; + + Map<AMQShortString, List<AMQQueue>> getBindings(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java index d3a466565f..fe3b19e74e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.exchange; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; +import java.util.Collection; + public interface ExchangeRegistry extends MessageRouter { @@ -43,5 +45,7 @@ public interface ExchangeRegistry extends MessageRouter Exchange getDefaultExchange(); + Collection<AMQShortString> getExchangeNames(); + void initialise() throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index 5a6301548b..a65b105809 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -34,17 +33,13 @@ import org.apache.qpid.server.queue.AMQQueue; import javax.management.JMException;
import javax.management.MBeanException;
-import javax.management.openmbean.ArrayType;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
public class FanoutExchange extends AbstractExchange
@@ -79,7 +74,7 @@ public class FanoutExchange extends AbstractExchange {
String queueName = queue.getName().toString();
- Object[] bindingItemValues = { queueName, new String[] { queueName } };
+ Object[] bindingItemValues = {queueName, new String[]{queueName}};
CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
_bindingList.put(bindingData);
}
@@ -120,6 +115,11 @@ public class FanoutExchange extends AbstractExchange }
}
+ public Map<AMQShortString, List<AMQQueue>> getBindings()
+ {
+ return null;
+ }
+
public AMQShortString getType()
{
return ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index b5c03a7291..bcbca8becf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -20,23 +20,6 @@ */ package org.apache.qpid.server.exchange; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -import javax.management.JMException; -import javax.management.openmbean.ArrayType; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.CompositeType; -import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.OpenType; -import javax.management.openmbean.SimpleType; -import javax.management.openmbean.TabularData; -import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; - import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; @@ -50,6 +33,23 @@ import org.apache.qpid.server.management.MBeanDescription; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; +import javax.management.JMException; +import javax.management.openmbean.ArrayType; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; + /** * An exchange that binds queues based on a set of required headers and header values * and routes messages to these queues by matching the headers of the message against @@ -91,13 +91,13 @@ public class HeadersExchange extends AbstractExchange private final class HeadersExchangeMBean extends ExchangeMBean { @MBeanConstructor("Creates an MBean for AMQ Headers exchange") - public HeadersExchangeMBean() throws JMException + public HeadersExchangeMBean() throws JMException { super(); _exchangeType = "headers"; init(); } - + /** * initialises the OpenType objects. */ @@ -113,7 +113,7 @@ public class HeadersExchange extends AbstractExchange _bindingDataType = new CompositeType("Exchange Binding", "Queue name and header bindings", _bindingItemNames, _bindingItemNames, _bindingItemTypes); _bindinglistDataType = new TabularType("Exchange Bindings", "List of exchange bindings for " + getName(), - _bindingDataType, _bindingItemIndexNames); + _bindingDataType, _bindingItemIndexNames); } public TabularData bindings() throws OpenDataException @@ -169,7 +169,7 @@ public class HeadersExchange extends AbstractExchange throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); } - String[] bindings = binding.split(","); + String[] bindings = binding.split(","); FieldTable bindingMap = new FieldTable(); for (int i = 0; i < bindings.length; i++) { @@ -288,6 +288,11 @@ public class HeadersExchange extends AbstractExchange } } + public Map<AMQShortString, List<AMQQueue>> getBindings() + { + return null; + } + private static class Registration { private final HeadersBinding binding; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java index c0f1e7f40c..cbe9246f09 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java @@ -20,13 +20,14 @@ */ package org.apache.qpid.server.queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + public class DefaultQueueRegistry implements QueueRegistry { private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>(); @@ -57,4 +58,14 @@ public class DefaultQueueRegistry implements QueueRegistry { return _queueMap.get(name); } + + public Collection<AMQShortString> getQueueNames() + { + return _queueMap.keySet(); + } + + public Collection<AMQQueue> getQueues() + { + return _queueMap.values(); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java index 3b1b5acf3c..1210f0e97c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.Collection; public interface QueueRegistry { @@ -34,4 +35,9 @@ public interface QueueRegistry void unregisterQueue(AMQShortString name) throws AMQException; AMQQueue getQueue(AMQShortString name); + + Collection<AMQShortString> getQueueNames(); + + Collection<AMQQueue> getQueues(); + } diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java new file mode 100644 index 0000000000..c81d1dbffa --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java @@ -0,0 +1,587 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore; + +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.configuration.Configuration; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.tools.messagestore.commands.Clear; +import org.apache.qpid.tools.messagestore.commands.Command; +import org.apache.qpid.tools.messagestore.commands.Dump; +import org.apache.qpid.tools.messagestore.commands.Help; +import org.apache.qpid.tools.messagestore.commands.List; +import org.apache.qpid.tools.messagestore.commands.Load; +import org.apache.qpid.tools.messagestore.commands.Quit; +import org.apache.qpid.tools.messagestore.commands.Select; +import org.apache.qpid.tools.messagestore.commands.Show; +import org.apache.qpid.tools.utils.CommandParser; +import org.apache.qpid.tools.utils.Console; +import org.apache.qpid.tools.utils.SimpleCommandParser; +import org.apache.qpid.tools.utils.SimpleConsole; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * MessageStoreTool. + */ +public class MessageStoreTool +{ + /** Text outputted at the start of each console.*/ + private static final String BOILER_PLATE = "MessageStoreTool - for examining Persistent Qpid Broker MessageStore instances"; + + /** I/O Wrapper. */ + protected Console _console; + + /** Batch mode flag. */ + protected boolean _batchMode; + + /** Internal State object. */ + private State _state = new State(); + + private HashMap<String, Command> _commands = new HashMap<String, Command>(); + + /** SLF4J Logger. */ + private static Logger _devlog = LoggerFactory.getLogger(MessageStoreTool.class); + + /** Loaded configuration file. */ + private Configuration _config; + + /** Control used for main run loop. */ + private boolean _running = true; + + //---------------------------------------------------------------------------------------------------/ + + public static void main(String[] args) throws Configuration.InitException + { + + MessageStoreTool tool = new MessageStoreTool(args); + + tool.start(); + } + + + public MessageStoreTool(String[] args) throws Configuration.InitException + { + this(args, System.in, System.out); + } + + public MessageStoreTool(String[] args, InputStream in, OutputStream out) throws Configuration.InitException + { + BufferedReader consoleReader = new BufferedReader(new InputStreamReader(in)); + BufferedWriter consoleWriter = new BufferedWriter(new OutputStreamWriter(out)); + + Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(this))); + _batchMode = false; + + _console = new SimpleConsole(consoleWriter, consoleReader); + + _config = new Configuration(); + + setOptions(); + _config.processCommandline(args); + } + + + private void setOptions() + { + Option help = new Option("h", "help", false, "print this message"); + Option version = new Option("v", "version", false, "print the version information and exit"); + Option configFile = + OptionBuilder.withArgName("file").hasArg() + .withDescription("use given configuration file By " + + "default looks for a file named " + + Configuration.DEFAULT_CONFIG_FILE + " in " + Configuration.QPID_HOME) + .withLongOpt("config") + .create("c"); + + _config.setOption(help); + _config.setOption(version); + _config.setOption(configFile); + } + + public State getState() + { + return _state; + } + + public Map<String, Command> getCommands() + { + return _commands; + } + + public void setConfigurationFile(String configfile) throws Configuration.InitException + { + _config.loadConfig(new File(configfile)); + setup(); + } + + public Console getConsole() + { + return _console; + } + + public void setConsole(Console console) + { + _console = console; + } + + /** + * Simple ShutdownHook to cleanly shutdown the databases + */ + class ShutdownHook implements Runnable + { + MessageStoreTool _tool; + + ShutdownHook(MessageStoreTool messageStoreTool) + { + _tool = messageStoreTool; + } + + public void run() + { + _tool.quit(); + } + } + + public void quit() + { + ApplicationRegistry.remove(1); + + _console.println("...exiting"); + + _console.close(); + + _running = false; + } + + public void setBatchMode(boolean batchmode) + { + _batchMode = batchmode; + } + + /** + * Main loop + */ + protected void start() + { + setup(); + + runCLI(); + } + + private void setup() + { + loadDefaultVirtualHosts(); + + loadCommands(); + + _console.println(BOILER_PLATE); + + _state.clearAll(); + } + + private void loadCommands() + { + _commands.clear(); + //todo Dynamically load the classes that exis in com.redhat.etp.qpid.commands + _commands.put("close", new Clear(this)); + _commands.put("dump", new Dump(this)); + _commands.put("help", new Help(this)); + _commands.put("list", new List(this)); + _commands.put("load", new Load(this)); + _commands.put("quit", new Quit(this)); + _commands.put("select", new Select(this)); + _commands.put("show", new Show(this)); + } + + private void loadDefaultVirtualHosts() + { + final File configFile = _config.getConfigFile(); + + loadVirtualHosts(configFile); + } + + private void loadVirtualHosts(File configFile) + { + + if (!configFile.exists()) + { + _devlog.error("Config file not found:" + configFile.getAbsolutePath()); + return; + } + else + { + _devlog.debug("using config file :" + configFile.getAbsolutePath()); + } + + try + { + ConfigurationFileApplicationRegistry registry = new ConfigurationFileApplicationRegistry(configFile); + + ApplicationRegistry.remove(1); + ApplicationRegistry.initialise(registry); + + checkMessageStores(); + } + catch (ConfigurationException e) + { + _console.println("Unable to load configuration due to configuration error: " + e.getMessage()); + e.printStackTrace(); + } + catch (Exception e) + { + _console.println("Unable to load configuration due to: " + e.getMessage()); + e.printStackTrace(); + } + + + } + + private void checkMessageStores() + { + Collection<VirtualHost> vhosts = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts(); + + boolean warning = false; + for (VirtualHost vhost : vhosts) + { + if (vhost.getMessageStore() instanceof MemoryMessageStore) + { + _console.println("WARNING: Virtualhost '" + vhost.getName() + "' is using a MemoryMessageStore. " + + "Changes will not persist."); + warning = true; + } + } + + if (warning) + { + _console.println(""); + _console.println("Please ensure you are using the correct config file currently using '" + + _config.getConfigFile().getAbsolutePath() + "'"); + _console.println("New config file can be specifed by 'load <config file>' or -c on the commandline."); + _console.println(""); + } + } + + private void runCLI() + { + while (_running) + { + if (!_batchMode) + { + printPrompt(); + } + + String[] args = _console.readCommand(); + + while (args != null) + { + exec(args); + + if (!_batchMode) + { + printPrompt(); + } + + args = _console.readCommand(); + } + } + } + + private void printPrompt() + { + _console.print(prompt()); + } + + + /** + * Execute a script (batch mode). + * + * @param script The file script + */ + protected void runScripts(String script) + { + //Store Current State + boolean oldBatch = _batchMode; + CommandParser oldParser = _console.getCommandParser(); + setBatchMode(true); + + try + { + _devlog.debug("Running script '" + script + "'"); + + _console.setCommandParser(new SimpleCommandParser(new BufferedReader(new FileReader(script)))); + + start(); + } + catch (java.io.FileNotFoundException e) + { + _devlog.error("Script not found: '" + script + "' due to:" + e.getMessage()); + } + + //Restore previous state + _console.setCommandParser(oldParser); + setBatchMode(oldBatch); + } + + public String prompt() + { + String state = _state.toString(); + if (state != null && state.length() != 0) + { + return state + ":bdb$ "; + } + else + { + return "bdb$ "; + } + } + + /** + * Execute the command. + * + * @param args [command, arg0, arg1...]. + */ + protected void exec(String[] args) + { + // Comment lines start with a # + if (args.length == 0 || args[0].startsWith("#")) + { + return; + } + + final String command = args[0]; + + Command cmd = _commands.get(command); + + if (cmd == null) + { + _console.println("Command not understood: " + command); + } + else + { + cmd.execute(args); + } + } + + + /** + * Displays usage info. + */ + protected static void help() + { + System.out.println(BOILER_PLATE); + System.out.println("Usage: java " + MessageStoreTool.class + " [Options]"); + System.out.println(" [-c <broker config file>] : Defaults to \"$QPID_HOME/etc/config.xml\""); + } + + + /** + * This class is used to store the current state of the tool. + * + * This is then interrogated by the various commands to augment their behaviour. + * + * + */ + public class State + { + private VirtualHost _vhost = null; + private AMQQueue _queue = null; + private Exchange _exchange = null; + private java.util.List<Long> _msgids = null; + + public State() + { + } + + public void setQueue(AMQQueue queue) + { + _queue = queue; + } + + public AMQQueue getQueue() + { + return _queue; + } + + public void setVhost(VirtualHost vhost) + { + _vhost = vhost; + } + + public VirtualHost getVhost() + { + return _vhost; + } + + public Exchange getExchange() + { + return _exchange; + } + + public void setExchange(Exchange exchange) + { + _exchange = exchange; + } + + public String toString() + { + StringBuilder status = new StringBuilder(); + + if (_vhost != null) + { + status.append(_vhost.getName()); + + if (_exchange != null) + { + status.append("["); + status.append(_exchange.getName()); + status.append("]"); + + if (_queue != null) + { + status.append("->'"); + status.append(_queue.getName()); + status.append("'"); + + if (_msgids != null) + { + status.append(printMessages()); + } + } + } + } + + return status.toString(); + } + + + public String printMessages() + { + StringBuilder sb = new StringBuilder(); + + Long previous = null; + + Long start = null; + for (Long id : _msgids) + { + if (previous != null) + { + if (id == previous + 1) + { + if (start == null) + { + start = previous; + } + } + else + { + if (start != null) + { + sb.append(","); + sb.append(start); + sb.append("-"); + sb.append(id); + start = null; + } + else + { + sb.append(","); + sb.append(previous); + } + } + } + + previous = id; + } + + if (start != null) + { + sb.append(","); + sb.append(start); + sb.append("-"); + sb.append(_msgids.get(_msgids.size() - 1)); + } + else + { + sb.append(","); + sb.append(previous); + } + + // surround list in () + sb.replace(0, 1, "("); + sb.append(")"); + return sb.toString(); + } + + public void clearAll() + { + _vhost = null; + clearExchange(); + } + + public void clearExchange() + { + _exchange = null; + clearQueue(); + } + + public void clearQueue() + { + _queue = null; + clearMessages(); + } + + public void clearMessages() + { + _msgids = null; + } + + public void setMessages(java.util.List<Long> msgids) + { + _msgids = msgids; + } + + public java.util.List<Long> getMessages() + { + return _msgids; + } + }//Class State + +}//Class MessageStoreTool diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java new file mode 100644 index 0000000000..5444197cb4 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/AbstractCommand.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.tools.messagestore.MessageStoreTool; +import org.apache.qpid.tools.utils.Console; + +public abstract class AbstractCommand implements Command +{ + protected Console _console; + protected MessageStoreTool _tool; + + public AbstractCommand(MessageStoreTool tool) + { + _console = tool.getConsole(); + _tool = tool; + } + + public void setOutput(Console out) + { + _console = out; + } + + protected void commandError(String message, String[] args) + { + _console.print(getCommand() + " : " + message); + + if (args != null) + { + for (int i = 1; i < args.length; i++) + { + _console.print(args[i]); + } + } + _console.println(""); + _console.println(help()); + } + + + public abstract String help(); + + public abstract String usage(); + + public abstract String getCommand(); + + + public abstract void execute(String... args); +} diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Clear.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Clear.java new file mode 100644 index 0000000000..b0006b3fe6 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Clear.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.tools.messagestore.MessageStoreTool; + +public class Clear extends AbstractCommand +{ + public Clear(MessageStoreTool tool) + { + super(tool); + } + + public String help() + { + return "Clears any selection."; + } + + public String usage() + { + return "clear [ all | virtualhost | exchange | queue | msgs ]"; + } + + public String getCommand() + { + return "clear"; + } + + public void execute(String... args) + { + assert args.length > 0; + assert args[0].equals(getCommand()); + + if (args.length < 1) + { + doClose("all"); + } + else + { + doClose(args[1]); + } + } + + private void doClose(String type) + { + if (type.equals("virtualhost") + || type.equals("all")) + { + _tool.getState().clearAll(); + } + + if (type.equals("exchange")) + { + _tool.getState().clearExchange(); + } + + if (type.equals("queue")) + { + _tool.getState().clearQueue(); + } + + if (type.equals("msgs")) + { + _tool.getState().clearMessages(); + } + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Command.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Command.java new file mode 100644 index 0000000000..bfa775a34a --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Command.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.tools.utils.Console; + +public interface Command +{ + public void setOutput(Console out); + + public String help(); + + public abstract String usage(); + + String getCommand(); + + public void execute(String... args); +} diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java new file mode 100644 index 0000000000..0dd6924d56 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.commons.codec.binary.Hex; +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.tools.messagestore.MessageStoreTool; +import org.apache.qpid.tools.utils.Console; + +import java.io.UnsupportedEncodingException; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +public class Dump extends Show +{ + private static final int LINE_SIZE = 8; + private static final String DEFAULT_ENCODING = "utf-8"; + private static final boolean SPACE_BYTES = true; + private static final String BYTE_SPACER = " "; + private static final String NON_PRINTING_ASCII_CHAR = "?"; + + protected boolean _content = true; + + public Dump(MessageStoreTool tool) + { + super(tool); + } + + public String help() + { + return "Dump selected message content. Default: show=content"; + } + + public String usage() + { + return getCommand() + " [show=[all],[msgheaders],[_amqHeaders],[routing],[content] id=<msgid e.g. 1,2,4-10>"; + } + + public String getCommand() + { + return "dump"; + } + + public void execute(String... args) + { + assert args.length > 0; + assert args[0].equals(getCommand()); + + + if (args.length >= 2) + { + for (String arg : args) + { + if (arg.startsWith("show=")) + { + _content = arg.contains("content") || arg.contains("all"); + } + } + + parseArgs(args); + } + + performShow(); + } + + + protected List<List> createMessageData(java.util.List<Long> msgids, List<AMQMessage> messages, boolean showHeaders, boolean showRouting, + boolean showMessageHeaders) + { + + List<List> display = new LinkedList<List>(); + + List<String> hex = new LinkedList<String>(); + List<String> ascii = new LinkedList<String>(); + display.add(hex); + display.add(ascii); + + for (AMQMessage msg : messages) + { + if (!includeMsg(msg, msgids)) + { + continue; + } + + //Add divider between messages + hex.add(Console.ROW_DIVIDER); + ascii.add(Console.ROW_DIVIDER); + + // Show general message information + hex.add(Show.Columns.ID.name()); + ascii.add(msg.getMessageId().toString()); + + hex.add(Console.ROW_DIVIDER); + ascii.add(Console.ROW_DIVIDER); + + if (showRouting) + { + addShowInformation(hex, ascii, msg, "Routing Details", true, false, false); + } + if (showHeaders) + { + addShowInformation(hex, ascii, msg, "Headers", false, true, false); + } + if (showMessageHeaders) + { + addShowInformation(hex, ascii, msg, null, false, false, true); + } + + // Add Content Body seciont + hex.add("Content Body"); + ascii.add(""); + hex.add(Console.ROW_DIVIDER); + ascii.add(Console.ROW_DIVIDER); + + Iterator bodies = msg.getContentBodyIterator(); + if (bodies.hasNext()) + { + + hex.add("Hex"); + hex.add(Console.ROW_DIVIDER); + + + ascii.add("ASCII"); + ascii.add(Console.ROW_DIVIDER); + + while (bodies.hasNext()) + { + ContentChunk chunk = (ContentChunk) bodies.next(); + + //Duplicate so we don't destroy original data :) + ByteBuffer hexBuffer = chunk.getData().duplicate(); + + ByteBuffer charBuffer = hexBuffer.duplicate(); + + Hex hexencoder = new Hex(); + + while (hexBuffer.hasRemaining()) + { + byte[] line = new byte[LINE_SIZE]; + + int bufsize = hexBuffer.remaining(); + if (bufsize < LINE_SIZE) + { + hexBuffer.get(line, 0, bufsize); + } + else + { + bufsize = line.length; + hexBuffer.get(line); + } + + byte[] encoded = hexencoder.encode(line); + + try + { + String encStr = new String(encoded, 0, bufsize * 2, DEFAULT_ENCODING); + String hexLine = ""; + + int strKength = encStr.length(); + for (int c = 0; c < strKength; c++) + { + hexLine += encStr.charAt(c); + + if (c % 2 == 1 && SPACE_BYTES) + { + hexLine += BYTE_SPACER; + } + } + + hex.add(hexLine); + } + catch (UnsupportedEncodingException e) + { + _console.println(e.getMessage()); + return null; + } + } + + while (charBuffer.hasRemaining()) + { + String asciiLine = ""; + + for (int pos = 0; pos < LINE_SIZE; pos++) + { + if (charBuffer.hasRemaining()) + { + byte ch = charBuffer.get(); + + if (isPrintable(ch)) + { + asciiLine += (char) ch; + } + else + { + asciiLine += NON_PRINTING_ASCII_CHAR; + } + + if (SPACE_BYTES) + { + asciiLine += BYTE_SPACER; + } + } + else + { + break; + } + } + + ascii.add(asciiLine); + } + } + } + else + { + List<String> result = new LinkedList<String>(); + + display.add(result); + result.add("No ContentBodies"); + } + } + + // if hex is empty then we have no data to display + if (hex.size() == 0) + { + return null; + } + + return display; + } + + private void addShowInformation(List<String> column1, List<String> column2, AMQMessage msg, + String title, boolean routing, boolean headers, boolean messageHeaders) + { + List<AMQMessage> single = new LinkedList<AMQMessage>(); + single.add(msg); + + List<List> routingData = super.createMessageData(null, single, headers, routing, messageHeaders); + + //Reformat data + if (title != null) + { + column1.add(title); + column2.add(""); + column1.add(Console.ROW_DIVIDER); + column2.add(Console.ROW_DIVIDER); + } + + // look at all columns in the routing Data + for (List item : routingData) + { + // the item should be: + // Title + // *divider + // value + // otherwise we can't reason about the correct value + if (item.size() == 3) + { + //Filter out the columns we are not interested in. + + String columnName = item.get(0).toString(); + + if (!(columnName.equals(Show.Columns.ID.name()) + || columnName.equals(Show.Columns.Size.name()))) + { + column1.add(columnName); + column2.add(item.get(2).toString()); + } + } + } + column1.add(Console.ROW_DIVIDER); + column2.add(Console.ROW_DIVIDER); + } + + private boolean isPrintable(byte c) + { + return c > 31 && c < 127; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Help.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Help.java new file mode 100644 index 0000000000..76967ff06f --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Help.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.tools.messagestore.MessageStoreTool; +import org.apache.qpid.tools.utils.Console; + +import java.util.LinkedList; +import java.util.Map; + +public class Help extends AbstractCommand +{ + public Help(MessageStoreTool tool) + { + super(tool); + } + + public String help() + { + return "Provides detailed help on commands. "; + } + + public String getCommand() + { + return "help"; + } + + public String usage() + { + return "help [<command>]"; + } + + public void execute(String... args) + { + assert args.length > 0; + assert args[0].equals(getCommand()); + + if (args.length > 1) + { + Command command = _tool.getCommands().get(args[1]); + if (command != null) + { + _console.println(command.help()); + } + else + { + commandError("Command not found: ", args); + } + } + else + { + java.util.List<java.util.List> data = new LinkedList<java.util.List>(); + + java.util.List<String> commandName = new LinkedList<String>(); + java.util.List<String> commandDescription = new LinkedList<String>(); + + data.add(commandName); + data.add(commandDescription); + + //Set up Headers + commandName.add("Command"); + commandDescription.add("Description"); + + commandName.add(Console.ROW_DIVIDER); + commandDescription.add(Console.ROW_DIVIDER); + + //Add current Commands with descriptions + Map<String, Command> commands = _tool.getCommands(); + + for (Command command : commands.values()) + { + commandName.add(command.getCommand()); + commandDescription.add(command.help()); + } + + _console.printMap("Available Commands", data); + } + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java new file mode 100644 index 0000000000..37d89c079f --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.tools.messagestore.MessageStoreTool; +import org.apache.qpid.tools.utils.Console; + +import java.util.Collection; +import java.util.LinkedList; + +public class List extends AbstractCommand +{ + + public List(MessageStoreTool tool) + { + super(tool); + } + + public void setOutput(Console out) + { + _console = out; + } + + public String help() + { + return "list availble items."; + } + + public String usage() + { + return "list queues [<exchange>] | exchanges| bindings [<exchange>] | all."; + } + + public String getCommand() + { + return "list"; + } + + public void execute(String... args) + { + assert args.length > 0; + assert args[0].equals(getCommand()); + + if (args.length > 1) + { + if ((args[1].equals("exchanges")) + || (args[1].equals("queues")) + || (args[1].equals("bindings")) + || (args[1].equals("all"))) + { + if (args.length == 2) + { + doList(args[1]); + } + else if (args.length == 3) + { + doList(args[1], args[2]); + } + } + else + { + commandError("Unknown options. ", args); + } + } + else if (args.length < 2) + { + doList("all"); + } + else + { + doList(args[1]); + } + } + + private void doList(String... listItem) + { + if (_tool.getState().getVhost() == null) + { + _console.println("No Virtualhost open. Open a Virtualhost first."); + listVirtualHosts(); + return; + } + + VirtualHost vhost = _tool.getState().getVhost(); + + java.util.List<String> data = null; + + if (listItem[0].equals("queues")) + { + if (listItem.length > 1) + { + data = listQueues(vhost, new AMQShortString(listItem[1])); + } + else + { + Exchange exchange = _tool.getState().getExchange(); + data = listQueues(vhost, exchange); + } + } + + if (listItem[0].equals("exchanges")) + { + data = listExchanges(vhost); + } + + if (listItem[0].equals("bindings")) + { + + if (listItem.length > 1) + { + data = listBindings(vhost, new AMQShortString(listItem[1])); + } + else + { + Exchange exchange = _tool.getState().getExchange(); + + data = listBindings(vhost, exchange); + } + } + + if (data != null) + { + if (data.size() == 1) + { + _console.println("No '" + listItem[0] + "' to display,"); + } + else + { + _console.displayList(true, data.toArray(new String[0])); + } + } + + + if (listItem[0].equals("all")) + { + + boolean displayed = false; + Exchange exchange = _tool.getState().getExchange(); + + //Do the display here for each one so that they are pretty printed + data = listQueues(vhost, exchange); + if (data != null) + { + displayed = true; + _console.displayList(true, data.toArray(new String[0])); + } + + if (exchange == null) + { + data = listExchanges(vhost); + if (data != null) + { + displayed = true; + _console.displayList(true, data.toArray(new String[0])); + } + } + + data = listBindings(vhost, exchange); + if (data != null) + { + displayed = true; + _console.displayList(true, data.toArray(new String[0])); + } + + if (!displayed) + { + _console.println("Nothing to list"); + } + } + } + + private void listVirtualHosts() + { + Collection<VirtualHost> vhosts = ApplicationRegistry.getInstance() + .getVirtualHostRegistry().getVirtualHosts(); + + String[] data = new String[vhosts.size() + 1]; + + data[0] = "Available VirtualHosts"; + + int index = 1; + for (VirtualHost vhost : vhosts) + { + data[index] = vhost.getName(); + index++; + } + + _console.displayList(true, data); + } + + private java.util.List<String> listBindings(VirtualHost vhost, AMQShortString exchangeName) + { + return listBindings(vhost, vhost.getExchangeRegistry().getExchange(exchangeName)); + } + + private java.util.List<String> listBindings(VirtualHost vhost, Exchange exchange) + { + Collection<AMQShortString> queues = vhost.getQueueRegistry().getQueueNames(); + + if (queues == null || queues.size() == 0) + { + return null; + } + + java.util.List<String> data = new LinkedList<String>(); + + data.add("Current Bindings"); + + for (AMQShortString queue : queues) + { + if (exchange != null) + { + try + { + if (exchange.isBound(queue)) + { + data.add(queue.toString()); + } + } + catch (AMQException e) + { + // is never thrown by current impls forced to throw by interface. + commandError("Unable to check exchange bindings: " + e.getMessage(), null); + } + } + else + { + data.add(queue.toString()); + } + } + + return data; + } + + private java.util.List<String> listExchanges(VirtualHost vhost) + { + Collection<AMQShortString> queues = vhost.getExchangeRegistry().getExchangeNames(); + + if (queues == null || queues.size() == 0) + { + return null; + } + + java.util.List<String> data = new LinkedList<String>(); + + data.add("Available Exchanges"); + + for (AMQShortString queue : queues) + { + data.add(queue.toString()); + } + + return data; + } + + private java.util.List<String> listQueues(VirtualHost vhost, AMQShortString exchangeName) + { + return listQueues(vhost, vhost.getExchangeRegistry().getExchange(exchangeName)); + } + + private java.util.List<String> listQueues(VirtualHost vhost, Exchange exchange) + { + Collection<AMQQueue> queues = vhost.getQueueRegistry().getQueues(); + + if (queues == null || queues.size() == 0) + { + return null; + } + + java.util.List<String> data = new LinkedList<String>(); + + data.add("Available Queues"); + + for (AMQQueue queue : queues) + { + if (exchange != null) + { + try + { + if (exchange.isBound(queue)) + { + data.add(queue.getName().toString()); + } + } + catch (AMQException e) + { + // is never thrown by current impls forced to throw by interface. + commandError("Unable to check exchange bindings: " + e.getMessage(), null); + } + } + else + { + data.add(queue.getName().toString()); + } + } + + if (exchange != null) + { + if (queues.size() == 1) + { + return null; + } + } + + return data; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Load.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Load.java new file mode 100644 index 0000000000..244a311c30 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Load.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.configuration.Configuration; +import org.apache.qpid.tools.messagestore.MessageStoreTool; + +public class Load extends AbstractCommand +{ + public Load(MessageStoreTool tool) + { + super(tool); + } + + public String help() + { + return "Loads specified broker configuration file."; + } + + public String usage() + { + return "load <configuration file>"; + } + + public String getCommand() + { + return "load"; + } + + public void execute(String... args) + { + assert args.length > 0; + assert args[0].equals(getCommand()); + + if (args.length > 2) + { + _console.print("load " + args[1] + ": additional options not understood:"); + for (int i = 2; i < args.length; i++) + { + _console.print(args[i] + " "); + } + _console.println(""); + } + else if (args.length < 2) + { + _console.println("Enter Configuration file."); + String input = _console.readln(); + if (input != null) + { + doLoad(input); + } + else + { + _console.println("Did not recognise config file."); + } + } + else + { + doLoad(args[1]); + } + } + + private void doLoad(String configfile) + { + _console.println("Loading Configuration:" + configfile); + + try + { + _tool.setConfigurationFile(configfile); + } + catch (Configuration.InitException e) + { + _console.println("Unable to open config file due to: '" + e.getMessage() + "'"); + } + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Quit.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Quit.java new file mode 100644 index 0000000000..a81bc07c38 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Quit.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.tools.messagestore.MessageStoreTool; + +public class Quit extends AbstractCommand +{ + public Quit(MessageStoreTool tool) + { + super(tool); + } + + public String help() + { + return "Quit the tool."; + } + + public String usage() + { + return "quit"; + } + + public String getCommand() + { + return "quit"; + } + + public void execute(String... args) + { + assert args.length > 0; + assert args[0].equals("quit"); + + _tool.quit(); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java new file mode 100644 index 0000000000..b9156f6889 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.tools.messagestore.MessageStoreTool; + +import java.util.LinkedList; +import java.util.StringTokenizer; + +public class Select extends AbstractCommand +{ + + public Select(MessageStoreTool tool) + { + super(tool); + } + + public String help() + { + return "Perform a selection"; + } + + public String usage() + { + return "select virtualhost <name> |exchange <name> |queue <name> | msgs id=< msgids eg. 1,2,4-10 >"; + } + + public String getCommand() + { + return "select"; + } + + public void execute(String... args) + { + assert args.length > 2; + assert args[0].equals("select"); + + if (args.length < 3) + { + if (args[1].equals("show")) + { + doSelect(args[1], null); + } + else + { + _console.print("select : unknown command:"); + _console.println(help()); + } + } + else + { + if (args[1].equals("virtualhost") + || args[1].equals("vhost") + || args[1].equals("exchange") + || args[1].equals("queue") + || args[1].equals("msg") + ) + { + doSelect(args[1], args[2]); + } + else + { + _console.println(help()); + } + } + } + + private void doSelect(String type, String item) + { + if (type.equals("virtualhost")) + { + + VirtualHost vhost = ApplicationRegistry.getInstance() + .getVirtualHostRegistry().getVirtualHost(item); + + if (vhost == null) + { + _console.println("Virtualhost '" + item + "' not found."); + } + else + { + _tool.getState().setVhost(vhost); + } + } + + if (type.equals("exchange")) + { + + VirtualHost vhost = _tool.getState().getVhost(); + + if (vhost == null) + { + _console.println("No Virtualhost open. Open a Virtualhost first."); + return; + } + + + Exchange exchange = vhost.getExchangeRegistry().getExchange(new AMQShortString(item)); + + if (exchange == null) + { + _console.println("Exchange '" + item + "' not found."); + } + else + { + _tool.getState().setExchange(exchange); + } + + if (_tool.getState().getQueue() != null) + { + try + { + if (!exchange.isBound(_tool.getState().getQueue())) + { + _tool.getState().setQueue(null); + } + } + catch (AMQException e) + { + //ignore + } + } + } + + if (type.equals("queue")) + { + VirtualHost vhost = _tool.getState().getVhost(); + + if (vhost == null) + { + _console.println("No Virtualhost open. Open a Virtualhost first."); + return; + } + + AMQQueue queue = vhost.getQueueRegistry().getQueue(new AMQShortString(item)); + + if (queue == null) + { + _console.println("Queue '" + item + "' not found."); + } + else + { + _tool.getState().setQueue(queue); + + if (_tool.getState().getExchange() == null) + { + for (AMQShortString exchangeName : vhost.getExchangeRegistry().getExchangeNames()) + { + try + { + Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName); + if (exchange.isBound(queue)) + { + _tool.getState().setExchange(exchange); + break; + } + } + catch (AMQException e) + { + //ignore error + } + } + } + + //remove the message selection + _tool.getState().setMessages(null); + } + } + + if (type.equals("msg")) + { + if (item.startsWith("id=")) + { + StringTokenizer tok = new StringTokenizer(item.substring(item.indexOf("=") + 1), ","); + + java.util.List<Long> msgids = null; + + if (tok.hasMoreTokens()) + { + msgids = new LinkedList<Long>(); + } + + while (tok.hasMoreTokens()) + { + String next = tok.nextToken(); + if (next.contains("-")) + { + Long start = Long.parseLong(next.substring(0, next.indexOf("-"))); + Long end = Long.parseLong(next.substring(next.indexOf("-") + 1)); + + if (end >= start) + { + for (long l = start; l <= end; l++) + { + msgids.add(l); + } + } + } + else + { + msgids.add(Long.parseLong(next)); + } + } + + _tool.getState().setMessages(msgids); + } + + } + + if (type.equals("show")) + { + _console.println(_tool.getState().toString()); + if (_tool.getState().getMessages() != null) + { + _console.print("Msgs:"); + for (Long l : _tool.getState().getMessages()) + { + _console.print(" " + l); + } + _console.println(""); + } + } + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java new file mode 100644 index 0000000000..7bff89f61f --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java @@ -0,0 +1,542 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.tools.messagestore.MessageStoreTool; +import org.apache.qpid.tools.utils.Console; + +import java.util.LinkedList; +import java.util.List; +import java.util.StringTokenizer; + +public class Show extends AbstractCommand +{ + protected boolean _amqHeaders = false; + protected boolean _routing = false; + protected boolean _msgHeaders = false; + + public Show(MessageStoreTool tool) + { + super(tool); + } + + public String help() + { + return "Shows the messages headers."; + } + + public String usage() + { + return getCommand() + " [show=[all],[msgheaders],[amqheaders],[routing] id=<msgid e.g. 1,2,4-10>"; + } + + public String getCommand() + { + return "show"; + } + + public void execute(String... args) + { + assert args.length > 0; + assert args[0].equals(getCommand()); + + if (args.length < 2) + { + parseArgs("all"); + } + else + { + parseArgs(args); + } + + performShow(); + } + + protected void parseArgs(String... args) + { + List<Long> msgids = null; + + if (args.length >= 2) + { + for (String arg : args) + { + if (arg.startsWith("show=")) + { + _msgHeaders = arg.contains("msgheaders") || arg.contains("all"); + _amqHeaders = arg.contains("amqheaders") || arg.contains("all"); + _routing = arg.contains("routing") || arg.contains("all"); + } + + if (arg.startsWith("id=")) + { + StringTokenizer tok = new StringTokenizer(arg.substring(arg.indexOf("=") + 1), ","); + + if (tok.hasMoreTokens()) + { + msgids = new LinkedList<Long>(); + } + + while (tok.hasMoreTokens()) + { + String next = tok.nextToken(); + if (next.contains("-")) + { + Long start = Long.parseLong(next.substring(0, next.indexOf("-"))); + Long end = Long.parseLong(next.substring(next.indexOf("-") + 1)); + + if (end >= start) + { + for (long l = start; l <= end; l++) + { + msgids.add(l); + } + } + } + else + { + msgids.add(Long.parseLong(next)); + } + } + + _tool.getState().setMessages(msgids); + } + }//for args + }// if args > 2 + } + + protected void performShow() + { + if (_tool.getState().getVhost() == null) + { + _console.println("No Virtualhost open. Open a Virtualhost first."); + return; + } + + AMQQueue _queue = _tool.getState().getQueue(); + + List<Long> msgids = _tool.getState().getMessages(); + + if (_queue != null) + { + List<AMQMessage> messages = _queue.getMessagesOnTheQueue(); + if (messages == null || messages.size() == 0) + { + _console.println("No messages on queue"); + return; + } + + List<List> data = createMessageData(msgids, messages, _amqHeaders, _routing, _msgHeaders); + if (data != null) + { + _console.printMap(null, data); + } + else + { + String message = "No data to display."; + if (msgids != null) + { + message += " Is message selection correct? " + _tool.getState().printMessages(); + } + _console.println(message); + } + + } + else + { + _console.println("No Queue specified to show."); + } + } + + /** + * Create the list data for display from the messages. + * + * @param msgids The list of message ids to display + * @param messages A list of messages to format and display. + * @param showHeaders should the header info be shown + * @param showRouting show the routing info be shown + * @param showMessageHeaders show the msg headers be shown + * @return the formated data lists for printing + */ + protected List<List> createMessageData(List<Long> msgids, List<AMQMessage> messages, boolean showHeaders, boolean showRouting, + boolean showMessageHeaders) + { + + // Currenly exposed message properties +// //Printing the content Body +// msg.getContentBodyIterator(); +// //Print the Headers +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getAppId(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getAppIdAsString(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getClusterId(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getContentType(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getCorrelationId(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getDeliveryMode(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getEncoding(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getExpiration(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getHeaders(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getMessageId(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getPriority(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getPropertyFlags(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getReplyTo(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getTimestamp(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getType(); +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getUserId(); +// +// //Print out all the property names +// ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getHeaders().getPropertyNames(); +// +// msg.getMessageId(); +// msg.getSize(); +// msg.getArrivalTime(); + +// msg.getDeliveredSubscription(); +// msg.getDeliveredToConsumer(); +// msg.getMessageHandle(); +// msg.getMessageId(); +// msg.getMessagePublishInfo(); +// msg.getPublisher(); + +// msg.getStoreContext(); +// msg.isAllContentReceived(); +// msg.isPersistent(); +// msg.isRedelivered(); +// msg.isRejectedBy(); +// msg.isTaken(); + + //Header setup + + List<List> data = new LinkedList<List>(); + + List<String> id = new LinkedList<String>(); + data.add(id); + id.add(Columns.ID.name()); + id.add(Console.ROW_DIVIDER); + + List<String> exchange = new LinkedList<String>(); + List<String> routingkey = new LinkedList<String>(); + List<String> immediate = new LinkedList<String>(); + List<String> mandatory = new LinkedList<String>(); + if (showRouting) + { + data.add(exchange); + exchange.add(Columns.Exchange.name()); + exchange.add(Console.ROW_DIVIDER); + + data.add(routingkey); + routingkey.add(Columns.RoutingKey.name()); + routingkey.add(Console.ROW_DIVIDER); + + data.add(immediate); + immediate.add(Columns.isImmediate.name()); + immediate.add(Console.ROW_DIVIDER); + + data.add(mandatory); + mandatory.add(Columns.isMandatory.name()); + mandatory.add(Console.ROW_DIVIDER); + } + + List<String> size = new LinkedList<String>(); + List<String> appid = new LinkedList<String>(); + List<String> clusterid = new LinkedList<String>(); + List<String> contenttype = new LinkedList<String>(); + List<String> correlationid = new LinkedList<String>(); + List<String> deliverymode = new LinkedList<String>(); + List<String> encoding = new LinkedList<String>(); + List<String> arrival = new LinkedList<String>(); + List<String> expiration = new LinkedList<String>(); + List<String> priority = new LinkedList<String>(); + List<String> propertyflag = new LinkedList<String>(); + List<String> replyto = new LinkedList<String>(); + List<String> timestamp = new LinkedList<String>(); + List<String> type = new LinkedList<String>(); + List<String> userid = new LinkedList<String>(); + List<String> ispersitent = new LinkedList<String>(); + List<String> isredelivered = new LinkedList<String>(); + List<String> isdelivered = new LinkedList<String>(); + + data.add(size); + size.add(Columns.Size.name()); + size.add(Console.ROW_DIVIDER); + + if (showHeaders) + { + data.add(ispersitent); + ispersitent.add(Columns.isPersistent.name()); + ispersitent.add(Console.ROW_DIVIDER); + + data.add(isredelivered); + isredelivered.add(Columns.isRedelivered.name()); + isredelivered.add(Console.ROW_DIVIDER); + + data.add(isdelivered); + isdelivered.add(Columns.isDelivered.name()); + isdelivered.add(Console.ROW_DIVIDER); + + data.add(appid); + appid.add(Columns.App_ID.name()); + appid.add(Console.ROW_DIVIDER); + + data.add(clusterid); + clusterid.add(Columns.Cluster_ID.name()); + clusterid.add(Console.ROW_DIVIDER); + + data.add(contenttype); + contenttype.add(Columns.Content_Type.name()); + contenttype.add(Console.ROW_DIVIDER); + + data.add(correlationid); + correlationid.add(Columns.Correlation_ID.name()); + correlationid.add(Console.ROW_DIVIDER); + + data.add(deliverymode); + deliverymode.add(Columns.Delivery_Mode.name()); + deliverymode.add(Console.ROW_DIVIDER); + + data.add(encoding); + encoding.add(Columns.Encoding.name()); + encoding.add(Console.ROW_DIVIDER); + + data.add(arrival); + expiration.add(Columns.Arrival.name()); + expiration.add(Console.ROW_DIVIDER); + + data.add(expiration); + expiration.add(Columns.Expiration.name()); + expiration.add(Console.ROW_DIVIDER); + + data.add(priority); + priority.add(Columns.Priority.name()); + priority.add(Console.ROW_DIVIDER); + + data.add(propertyflag); + propertyflag.add(Columns.Property_Flag.name()); + propertyflag.add(Console.ROW_DIVIDER); + + data.add(replyto); + replyto.add(Columns.ReplyTo.name()); + replyto.add(Console.ROW_DIVIDER); + + data.add(timestamp); + timestamp.add(Columns.Timestamp.name()); + timestamp.add(Console.ROW_DIVIDER); + + data.add(type); + type.add(Columns.Type.name()); + type.add(Console.ROW_DIVIDER); + + data.add(userid); + userid.add(Columns.UserID.name()); + userid.add(Console.ROW_DIVIDER); + } + + List<String> msgHeaders = new LinkedList<String>(); + if (showMessageHeaders) + { + data.add(msgHeaders); + msgHeaders.add(Columns.MsgHeaders.name()); + msgHeaders.add(Console.ROW_DIVIDER); + } + + //Add create the table of data + for (AMQMessage msg : messages) + { + if (!includeMsg(msg, msgids)) + { + continue; + } + + id.add(msg.getMessageId().toString()); + + size.add("" + msg.getSize()); + + arrival.add("" + msg.getArrivalTime()); + + try + { + ispersitent.add(msg.isPersistent() ? "true" : "false"); + } + catch (AMQException e) + { + ispersitent.add("n/a"); + } + + isredelivered.add(msg.isRedelivered() ? "true" : "false"); + + isdelivered.add(msg.getDeliveredToConsumer() ? "true" : "false"); + +// msg.getMessageHandle(); + + BasicContentHeaderProperties headers = null; + + try + { + headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties); + } + catch (AMQException e) + { + //ignore +// commandError("Unable to read properties for message: " + e.getMessage(), null); + } + + if (headers != null) + { + String appidS = headers.getAppIdAsString(); + appid.add(appidS == null ? "null" : appidS); + + String clusterS = headers.getClusterIdAsString(); + clusterid.add(clusterS == null ? "null" : clusterS); + + String contentS = headers.getContentTypeAsString(); + contenttype.add(contentS == null ? "null" : contentS); + + String correlationS = headers.getCorrelationIdAsString(); + correlationid.add(correlationS == null ? "null" : correlationS); + + deliverymode.add("" + headers.getDeliveryMode()); + + AMQShortString encodeSS = headers.getEncoding(); + encoding.add(encodeSS == null ? "null" : encodeSS.toString()); + + expiration.add("" + headers.getExpiration()); + + FieldTable headerFT = headers.getHeaders(); + msgHeaders.add(headerFT == null ? "none" : "" + headerFT.toString()); + + priority.add("" + headers.getPriority()); + propertyflag.add("" + headers.getPropertyFlags()); + + AMQShortString replytoSS = headers.getReplyTo(); + replyto.add(replytoSS == null ? "null" : replytoSS.toString()); + + timestamp.add("" + headers.getTimestamp()); + + AMQShortString typeSS = headers.getType(); + type.add(typeSS == null ? "null" : typeSS.toString()); + + AMQShortString useridSS = headers.getUserId(); + userid.add(useridSS == null ? "null" : useridSS.toString()); + + MessagePublishInfo info = null; + try + { + info = msg.getMessagePublishInfo(); + } + catch (AMQException e) + { + //ignore + } + + if (info != null) + { + AMQShortString exchangeSS = info.getExchange(); + exchange.add(exchangeSS == null ? "null" : exchangeSS.toString()); + + AMQShortString routingkeySS = info.getRoutingKey(); + routingkey.add(routingkeySS == null ? "null" : routingkeySS.toString()); + + immediate.add(info.isImmediate() ? "true" : "false"); + mandatory.add(info.isMandatory() ? "true" : "false"); + } + +// msg.getPublisher(); -- only used in clustering +// msg.getStoreContext(); +// msg.isAllContentReceived(); + + }// if headers!=null + +// need to access internal map and do lookups. +// msg.isTaken(); +// msg.getDeliveredSubscription(); +// msg.isRejectedBy(); + + } + + // if id only had the header and the divider in it then we have no data to display + if (id.size() == 2) + { + return null; + } + return data; + } + + protected boolean includeMsg(AMQMessage msg, List<Long> msgids) + { + if (msgids == null) + { + return true; + } + + Long msgid = msg.getMessageId(); + + boolean found = false; + + if (msgids != null) + { + //check msgid is in msgids + for (Long l : msgids) + { + if (l.equals(msgid)) + { + found = true; + break; + } + } + } + return found; + } + + public enum Columns + { + ID, + Size, + Exchange, + RoutingKey, + isImmediate, + isMandatory, + isPersistent, + isRedelivered, + isDelivered, + App_ID, + Cluster_ID, + Content_Type, + Correlation_ID, + Delivery_Mode, + Encoding, + Arrival, + Expiration, + Priority, + Property_Flag, + ReplyTo, + Timestamp, + Type, + UserID, + MsgHeaders + } +} + + diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/Passwd.java b/java/broker/src/main/java/org/apache/qpid/tools/security/Passwd.java index f9e093dba7..c27c52eb8e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/Passwd.java +++ b/java/broker/src/main/java/org/apache/qpid/tools/security/Passwd.java @@ -18,7 +18,7 @@ * * */ -package org.apache.qpid.server.security; +package org.apache.qpid.tools.security; import org.apache.commons.codec.binary.Base64; diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/CommandParser.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/CommandParser.java new file mode 100644 index 0000000000..986fea32cc --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/CommandParser.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.utils; + +public interface CommandParser +{ + /** + * If there is more than one command received on the last parse request. + * + * Subsequent calls to parse will utilise this input rather than reading new data from the input source + * @return boolean + */ + boolean more(); + + /** + * True if the currently parsed command has been requested as a background operation + * + * @return boolean + */ + boolean isBackground(); + + /** + * Parses user commands, and groups tokens in the + * String[] format that all Java main's love. + * + * If more than one command is provided in one input line then the more() method will return true. + * A subsequent call to parse() will continue to parse that input line before reading new input. + * + * @return <code>input</code> split in args[] format; null if eof. + * @throws java.io.IOException if there is a problem reading from the input stream + */ + String[] parse() throws java.io.IOException; +} diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/Console.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/Console.java new file mode 100644 index 0000000000..cf457d1ea5 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/Console.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.utils; + +import java.util.List; + +public interface Console +{ + public enum CellFormat + { + CENTRED, LEFT, RIGHT + } + + public static String ROW_DIVIDER = "*divider"; + + public void print(String... message); + + public void println(String... message); + + public String readln(); + + /** + * Reads and parses the command line. + * + * + * @return The next command or null + */ + public String[] readCommand(); + + public CommandParser getCommandParser(); + + public void setCommandParser(CommandParser parser); + + /** + * + * Prints the list of String nicely. + * + * +-------------+ + * | Heading | + * +-------------+ + * | Item 1 | + * | Item 2 | + * | Item 3 | + * +-------------+ + * + * @param hasTitle should list[0] be used as a heading + * @param list The list of Strings to display + */ + public void displayList(boolean hasTitle, String... list); + + /** + * + * Prints the list of String nicely. + * + * +----------------------------+ + * | Heading | + * +----------------------------+ + * | title | title | .. + * +----------------------------+ + * | Item 2 | value 2 | .. + * +----------------------------+ (*divider) + * | Item 3 | value 2 | .. + * +----------------------------+ + * + * @param title The title to display if any + * @param entries the entries to display in a map. + */ + void printMap(String title, List<List> entries); + + + public void close(); +} diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/SimpleCommandParser.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/SimpleCommandParser.java new file mode 100644 index 0000000000..ba272ef19d --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/SimpleCommandParser.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.utils; + +import java.io.BufferedReader; +import java.io.IOException; +import java.util.StringTokenizer; + +public class SimpleCommandParser implements CommandParser +{ + private static final String COMMAND_SEPERATOR = ";"; + + /** Input source of commands */ + protected BufferedReader _reader; + + /** The next list of commands from the command line */ + private StringBuilder _nextCommand = null; + + public SimpleCommandParser(BufferedReader reader) + { + _reader = reader; + } + + public boolean more() + { + return _nextCommand != null; + } + + public boolean isBackground() + { + return false; + } + + public String[] parse() throws IOException + { + String[] commands = null; + + String input = null; + + if (_nextCommand == null) + { + input = _reader.readLine(); + } + else + { + input = _nextCommand.toString(); + _nextCommand = null; + } + + StringTokenizer tok = new StringTokenizer(input, " "); + + int tokenCount = tok.countTokens(); + int index = 0; + + if (tokenCount > 0) + { + commands = new String[tokenCount]; + boolean commandComplete = false; + + while (tok.hasMoreTokens()) + { + String next = tok.nextToken(); + + if (next.equals(COMMAND_SEPERATOR)) + { + commandComplete = true; + _nextCommand = new StringBuilder(); + continue; + } + + if (commandComplete) + { + _nextCommand.append(next); + _nextCommand.append(" "); + } + else + { + commands[index] = next; + index++; + } + } + + } + + //Reduce the String[] if not all the tokens were used in this command. + // i.e. there is more than one command on the line. + if (index != tokenCount) + { + String[] shortCommands = new String[index]; + System.arraycopy(commands, 0, shortCommands, 0, index); + return shortCommands; + } + else + { + return commands; + } + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/SimpleConsole.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/SimpleConsole.java new file mode 100644 index 0000000000..ec080a4611 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/SimpleConsole.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.tools.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +public class SimpleConsole implements Console +{ + /** SLF4J Logger. */ + private static Logger _devlog = LoggerFactory.getLogger(SimpleConsole.class); + + /** Console Writer. */ + protected static BufferedWriter _consoleWriter; + + /** Console Reader. */ + protected static BufferedReader _consoleReader; + + /** Parser for command-line input. */ + protected CommandParser _parser; + + public SimpleConsole(BufferedWriter writer, BufferedReader reader) + { + _consoleWriter = writer; + _consoleReader = reader; + _parser = new SimpleCommandParser(_consoleReader); + } + + public void print(String... message) + { + try + { + for (String s : message) + { + _consoleWriter.write(s); + } + _consoleWriter.flush(); + } + catch (IOException e) + { + _devlog.error(e.getMessage() + ": Occured whilst trying to write:" + message); + } + + } + + public void println(String... message) + { + print(message); + print(System.getProperty("line.separator")); + } + + + public String readln() + { + try + { + return _consoleReader.readLine(); + } + catch (IOException e) + { + _devlog.debug("Unable to read input due to:" + e.getMessage()); + return null; + } + } + + public String[] readCommand() + { + try + { + return _parser.parse(); + } + catch (IOException e) + { + _devlog.error("Error reading command:" + e.getMessage()); + return new String[0]; + } + } + + public CommandParser getCommandParser() + { + return _parser; + } + + public void setCommandParser(CommandParser parser) + { + _parser = parser; + } + + public void displayList(boolean hasTitle, String... list) + { + java.util.List<java.util.List> data = new LinkedList<List>(); + + java.util.List<String> values = new LinkedList<String>(); + + data.add(values); + + for (String value : list) + { + values.add(value); + } + + if (hasTitle) + { + values.add(1, "*divider"); + } + + printMap(null, data); + } + + /** + * + * Prints the list of String nicely. + * + * +----------------------------+ + * | Heading | + * +----------------------------+ + * | title | title | .. + * +----------------------------+ + * | Item 2 | value 2 | .. + * | Item 3 | value 2 | .. + * +----------------------------+ + * + * @param title The title to display if any + * @param entries the entries to display in a map. + */ + public void printMap(String title, java.util.List<java.util.List> entries) + { + try + { + int columns = entries.size(); + + int[] columnWidth = new int[columns]; + + // calculate row count + int rowMax = 0; + + //the longest item + int itemMax = 0; + + for (int i = 0; i < columns; i++) + { + int columnIRowMax = entries.get(i).size(); + + if (columnIRowMax > rowMax) + { + rowMax = columnIRowMax; + } + for (Object values : entries.get(i)) + { + if (values.toString().equals(Console.ROW_DIVIDER)) + { + continue; + } + + int itemLength = values.toString().length(); + + //note for single width + if (itemLength > itemMax) + { + itemMax = itemLength; + } + + //note for mulit width + if (itemLength > columnWidth[i]) + { + columnWidth[i] = itemLength; + } + + } + } + + int tableWidth = 0; + + + for (int i = 0; i < columns; i++) + { + // plus 2 for the space padding + columnWidth[i] += 2; + } + for (int size : columnWidth) + { + tableWidth += size; + } + tableWidth += (columns - 1); + + if (title != null) + { + if (title.length() > tableWidth) + { + tableWidth = title.length(); + } + + printCellRow("+", "-", tableWidth); + + printCell(CellFormat.CENTRED, "|", tableWidth, " " + title + " ", 0); + _consoleWriter.newLine(); + + } + + //put top line | or bottom of title + printCellRow("+", "-", tableWidth); + + //print the table data + int row = 0; + + for (; row < rowMax; row++) + { + for (int i = 0; i < columns; i++) + { + java.util.List columnData = entries.get(i); + + String value; + // does this column have a value for this row + if (columnData.size() > row) + { + value = " " + columnData.get(row).toString() + " "; + } + else + { + value = " "; + } + + if (i == 0 && value.equals(" " + Console.ROW_DIVIDER + " ")) + { + printCellRow("+", "-", tableWidth); + //move on to the next row + break; + } + else + { + printCell(CellFormat.LEFT, "|", columnWidth[i], value, i); + } + + // if it is the last row then do a new line. + if (i == columns - 1) + { + _consoleWriter.newLine(); + } + } + } + + printCellRow("+", "-", tableWidth); + + } + catch (IOException e) + { + _devlog.error(e.getMessage() + ": Occured whilst trying to write."); + } + } + + public void close() + { + + try + { + _consoleReader.close(); + } + catch (IOException e) + { + _devlog.error(e.getMessage() + ": Occured whilst trying to close reader."); + } + + try + { + + _consoleWriter.close(); + } + catch (IOException e) + { + _devlog.error(e.getMessage() + ": Occured whilst trying to close writer."); + } + + } + + private void printCell(CellFormat format, String edge, int cellWidth, String cell, int column) throws IOException + { + int pad = cellWidth - cell.length(); + + if (column == 0) + { + _consoleWriter.write(edge); + } + + switch (format) + { + case CENTRED: + printPad(" ", pad / 2); + break; + case RIGHT: + printPad(" ", pad); + break; + } + + _consoleWriter.write(cell); + + + switch (format) + { + case CENTRED: + // if pad isn't even put the extra one on the right + if (pad % 2 == 0) + { + printPad(" ", pad / 2); + } + else + { + printPad(" ", (pad / 2) + 1); + } + break; + case LEFT: + printPad(" ", pad); + break; + } + + + _consoleWriter.write(edge); + + } + + private void printCellRow(String edge, String mid, int cellWidth) throws IOException + { + _consoleWriter.write(edge); + + printPad(mid, cellWidth); + + _consoleWriter.write(edge); + _consoleWriter.newLine(); + } + + private void printPad(String padChar, int count) throws IOException + { + for (int i = 0; i < count; i++) + { + _consoleWriter.write(padChar); + } + } + + +} diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/CommandParser.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/CommandParser.java new file mode 100644 index 0000000000..6a95529059 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/CommandParser.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package com.redhat.etp.qpid.utils; + +public interface CommandParser +{ + /** + * If there is more than one command received on the last parse request. + * + * Subsequent calls to parse will utilise this input rather than reading new data from the input source + * @return boolean + */ + boolean more(); + + /** + * True if the currently parsed command has been requested as a background operation + * + * @return boolean + */ + boolean isBackground(); + + /** + * Parses user commands, and groups tokens in the + * String[] format that all Java main's love. + * + * If more than one command is provided in one input line then the more() method will return true. + * A subsequent call to parse() will continue to parse that input line before reading new input. + * + * @return <code>input</code> split in args[] format; null if eof. + * @throws java.io.IOException if there is a problem reading from the input stream + */ + String[] parse() throws java.io.IOException; +} diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/Console.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/Console.java new file mode 100644 index 0000000000..892a48254c --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/Console.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package com.redhat.etp.qpid; + +import java.util.List; + +public interface Console +{ + public enum CellFormat + { + CENTRED, LEFT, RIGHT + } + + public static String ROW_DIVIDER = "*divider"; + + public void print(String... message); + + public void println(String... message); + + public String[] readln(); + + /** + * + * Prints the list of String nicely. + * + * +-------------+ + * | Heading | + * +-------------+ + * | Item 1 | + * | Item 2 | + * | Item 3 | + * +-------------+ + * + * @param hasTitle should list[0] be used as a heading + * @param list The list of Strings to display + */ + public void displayList(boolean hasTitle, String... list); + + /** + * + * Prints the list of String nicely. + * + * +----------------------------+ + * | Heading | + * +----------------------------+ + * | title | title | .. + * +----------------------------+ + * | Item 2 | value 2 | .. + * +----------------------------+ (*divider) + * | Item 3 | value 2 | .. + * +----------------------------+ + * + * @param title The title to display if any + * @param entries the entries to display in a map. + */ + void printMap(String title, List<List> entries); +} diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/RSHCommandParser.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/RSHCommandParser.java new file mode 100644 index 0000000000..ea4045c917 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/RSHCommandParser.java @@ -0,0 +1,352 @@ +/* + * The Java Shell: jsh core -- RELEASE: alpha3 + * (C)1999 Osvaldo Pinali Doederlein. + * + * LICENSE + * ======= + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * + * CHANGES + * ======= + * 1.0.2 - Added support for non-quoted '\' escape char (Not interpreted by the shell) + * 1.0.1 - Added support for arguments in aliases + * 1.0.0 - Initial release; split from Shell and enhanced a lot. + * + * LINKS + * ===== + * Contact: mailto@osvaldo.visionnaire.com.br, mailto@g.collin@appliweb.net + * Site #1: http://www.geocities.com/ResearchTriangle/Node/2005/ + * Site #2: http://www.appliweb.net/jsh + */ + +package com.redhat.etp.qpid.utils; + +import java.io.BufferedReader; +import java.util.Vector; + +/** + * The Java Shell. + * <p> + * Provides an environment for launching and controlling Java apps. + * <p> + * TODO: - Support for applets! + * - Support for variable replacement + * + * @author Osvaldo Pinali Doederlein. + */ +public class RSHCommandParser implements CommandParser +{ + /** Where commands come from. */ + protected BufferedReader reader; + /** Continuation for command line. */ + protected String contLine = null; + /** Run next command in background? */ + protected boolean background; + protected String[] env; // Commands passed in args. + + public RSHCommandParser(BufferedReader reader) + { + this(reader, null); + } + + public RSHCommandParser(BufferedReader reader, String env[]) + { + this.reader = reader; + this.env = env; + if (env == null) + { + this.env = new String[0]; + } + } + + + public boolean more() + { + return contLine != null; + } + + public boolean isBackground() + { + return background; + } + + /** + * Solves and expands aliases in an argument array. + * This expansion affects only the first (if any) argument; the + * typical thing to do, as we don't want to expand parameters. + * We recursively parse the result to be sure we expand everything. + * For example, an alias can be expanded to further aliases, or it can contains args. + * + * @param args Raw arguments. + * @return <code>args</code> resolved and expanded. + */ + public static String[] expand(String[] args) + { + if (args.length > 0) + { + String expanded = args[0];//Alias.resolve(args[0]); + + // Try to expand recursively the command line + if (expanded != args[0]) + { + RSHCommandParser recurse = new RSHCommandParser(new BufferedReader( + new java.io.StringReader(expanded))); + + try + { + String cmdLine[] = recurse.parse(); + cmdLine = recurse.expand(cmdLine); + + // do we need to handle new arguments and insert them in the command array ? + if (cmdLine.length > 1) + { + String[] newArgs = new String[cmdLine.length + args.length - 1]; + System.arraycopy(cmdLine, 0, newArgs, 0, cmdLine.length); + + if (args.length > 1) + { + System.arraycopy(args, 1, newArgs, cmdLine.length, args.length - 1); + } + + args = newArgs; + } + else if (cmdLine.length == 1) + { + args[0] = cmdLine[0]; + } + } + catch (java.io.IOException e) + { + } + } + } + + return args; + } + + public String[] parse() throws java.io.IOException + { + final int READ = 0, QUOTE = 1, SKIP = 2, ESCAPE = 3, NONQUOTEDESCAPE = 4, VARIABLE = 5; + final int EOF = 0xFFFF; + int mode = SKIP; + Vector<String> args = new Vector<String>(); + StringBuffer current = new StringBuffer(); + StringBuffer varName = null; + background = false; + String line; + + if (contLine == null) + { + line = reader.readLine(); + } + else + { + line = contLine; + contLine = null; + } + + if (line == null) + { + reader = null; + return null; + } + + for (int pos = 0; pos < line.length(); ++pos) + { + char c = line.charAt(pos); + + switch (mode) + { + case SKIP: + switch (c) + { + case' ': + case'\t': + break; + case'\"': + mode = QUOTE; + break; + case'&': + background = true; + case';': + contLine = line.substring(pos + 1); + pos = line.length(); + break; + default: + mode = READ; + --pos; + } + break; + + case READ: + switch (c) + { + case'\"': + mode = QUOTE; + break; + case';': + case'&': + --pos; + case' ': + case'\t': + mode = SKIP; + break; + case'\\': + mode = NONQUOTEDESCAPE; + break; + case'$': + mode = VARIABLE; + varName = new StringBuffer(); + break; + default: + current.append(c); + } + if ((mode != READ) && (mode != NONQUOTEDESCAPE)) + { + args.addElement(current.toString()); + current = new StringBuffer(); + } + break; + + case QUOTE: + switch (c) + { + case'\"': + mode = READ; + break; + case'\\': + mode = ESCAPE; + break; + default: + current.append(c); + } + break; + + case ESCAPE: + switch (c) + { + case'n': + c = '\n'; + break; + case'r': + c = '\r'; + break; + case't': + c = '\t'; + break; + case'b': + c = '\b'; + break; + case'f': + c = '\f'; + break; + default: + current.append('\\'); + break; + } + mode = QUOTE; + current.append(c); + break; + case NONQUOTEDESCAPE: + switch (c) + { + case';': + mode = READ; + current.append(c); + break; + default: // This is not a escaped char. + mode = READ; + current.append('\\'); + current.append(c); + break; + } + break; + case VARIABLE: + switch (c) + { + case'$': + { +// String val = Set.get(new String(varName)); +// if (val != null) +// { +// current.append(val); +// } + mode = READ; + break; + } + case'@': + { + StringBuffer val = new StringBuffer(); + int i; + for (i = 0; i < env.length; i++) + { + val.append(env[i]); + val.append(' '); + } + current.append(val); + mode = READ; + break; + } + case'0': + case'1': + case'2': + case'3': + case'4': + case'5': + case'6': + case'7': + case'8': + case'9': + { + if (varName.length() == 0) + { + int value = Integer.parseInt(new String(new char[]{c})); + if (env.length > value) + { + current.append(env[value]); + } + mode = READ; + break; + } + // else fall back + } + default: + varName.append(c); + break; + } + break; + } + } + + if (current.length() > 0) + { + args.addElement(current.toString()); + } + return expand(toArray(args)); + } + + private String[] toArray(Vector strings) + { + String[] arr = new String[strings.size()]; + + for (int i = 0; i < strings.size(); ++i) + { + arr[i] = (String) strings.elementAt(i); + } + + return arr; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/SimpleCommandParser.java b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/SimpleCommandParser.java new file mode 100644 index 0000000000..98e816554e --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/utils/utils/SimpleCommandParser.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package com.redhat.etp.qpid.utils; + +import java.io.BufferedReader; +import java.io.IOException; +import java.util.StringTokenizer; + +public class SimpleCommandParser implements CommandParser +{ + private static final String COMMAND_SEPERATOR = ";"; + + /** Input source of commands */ + protected BufferedReader _reader; + + /** The next list of commands from the command line */ + private StringBuilder _nextCommand = null; + + public SimpleCommandParser(BufferedReader reader) + { + _reader = reader; + } + + public boolean more() + { + return _nextCommand != null; + } + + public boolean isBackground() + { + return false; + } + + public String[] parse() throws IOException + { + String[] commands = null; + + String input = null; + + if (_nextCommand == null) + { + input = _reader.readLine(); + } + else + { + input = _nextCommand.toString(); + _nextCommand = null; + } + + StringTokenizer tok = new StringTokenizer(input, " "); + + int tokenCount = tok.countTokens(); + int index = 0; + + if (tokenCount > 0) + { + commands = new String[tokenCount]; + boolean commandComplete = false; + + while (tok.hasMoreTokens()) + { + String next = tok.nextToken(); + + if (next.equals(COMMAND_SEPERATOR)) + { + commandComplete = true; + _nextCommand = new StringBuilder(); + continue; + } + + if (commandComplete) + { + _nextCommand.append(next); + _nextCommand.append(" "); + } + else + { + commands[index] = next; + index++; + } + } + + } + + //Reduce the String[] if not all the tokens were used in this command. + // i.e. there is more than one command on the line. + if (index != tokenCount) + { + String[] shortCommands = new String[index]; + System.arraycopy(commands, 0, shortCommands, 0, index); + return shortCommands; + } + else + { + return commands; + } + } +} diff --git a/java/pom.xml b/java/pom.xml index cf39e43ecc..114abfab1b 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -391,7 +391,15 @@ under the License. <version>0.5</version> </plugin> - <plugin> + + + + </plugins> + </pluginManagement> + + + <!--plugins> + <plugin> <artifactId>maven-remote-resources-plugin</artifactId> <version>1.0-alpha-5</version> <executions> @@ -412,10 +420,7 @@ under the License. </execution> </executions> </plugin> - - - </plugins> - </pluginManagement> + </plugins--> <defaultGoal>install</defaultGoal> |
