diff options
Diffstat (limited to 'java')
5 files changed, 247 insertions, 138 deletions
| diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java index 831cb90a4d..96ecb36952 100644 --- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java @@ -20,20 +20,12 @@   */  package org.apache.qpid.tools.messagestore.commands; -import org.apache.qpid.framing.AMQShortString;  import org.apache.qpid.server.queue.AMQQueue;  import org.apache.qpid.server.store.StoreContext;  import org.apache.qpid.tools.messagestore.MessageStoreTool; -public class Copy extends AbstractCommand +public class Copy extends Move  { - -    /** -     * Since the Coopy command is not associated with a real channel we can safely create our own store context -     * for use in the few methods that require one. -     */ -    private StoreContext _storeContext = new StoreContext(); -      public Copy(MessageStoreTool tool)      {          super(tool); @@ -56,94 +48,9 @@ public class Copy extends AbstractCommand          return "copy";      } -    public void execute(String... args) +    protected void doCommand(AMQQueue fromQueue, long start, long end, AMQQueue toQueue, StoreContext storeContext)      { -        AMQQueue toQueue = null; -        AMQQueue fromQueue = _tool.getState().getQueue(); -        java.util.List<Long> msgids = _tool.getState().getMessages(); - -        if (args.length >= 2) -        { -            for (String arg : args) -            { -                if (arg.startsWith("to=")) -                { -                    String queueName = arg.substring(arg.indexOf("=") + 1); -                    toQueue = _tool.getState().getVhost().getQueueRegistry().getQueue(new AMQShortString(queueName)); -                } - -                if (arg.startsWith("from=")) -                { -                    String queueName = arg.substring(arg.indexOf("=") + 1); -                    fromQueue = _tool.getState().getVhost().getQueueRegistry().getQueue(new AMQShortString(queueName)); -                } - -                if (arg.startsWith("msgids=")) -                { -                    String msgidStr = arg.substring(arg.indexOf("=") + 1); - -                    // Record the current message selection -                    java.util.List<Long> currentIDs = _tool.getState().getMessages(); - -                    // Use the ToolState class to perform the messasge parsing -                    _tool.getState().setMessages(msgidStr); -                    msgids = _tool.getState().getMessages(); - -                    // Reset the original selection of messages -                    _tool.getState().setMessages(currentIDs); -                } -            } -        } - -        if (toQueue == null) -        { -            _console.println("Queue to copy to not specifed."); -            _console.println(usage()); -            return; -        } - -        if (fromQueue == null) -        { -            _console.println("Queue to copy from not specifed."); -            _console.println(usage()); -            return; -        } - -        performCopy(fromQueue, toQueue, msgids); +        fromQueue.copyMessagesToAnotherQueue(start, end, toQueue.getName().toString(), storeContext);      } -    protected void performCopy(AMQQueue fromQueue, AMQQueue toQueue, java.util.List<Long> msgids) -    { -        Long previous = null; -        Long start = null; - -        for (long id : msgids) -        { -            if (previous != null) -            { -                if (id == previous + 1) -                { -                    if (start == null) -                    { -                        start = previous; -                    } -                } -                else -                { -                    if (start != null) -                    { -                        //move a range of ids -                        fromQueue.moveMessagesToAnotherQueue(start, id, toQueue.getName().toString(), _storeContext); -                    } -                    else -                    { -                        //move a single id -                        fromQueue.moveMessagesToAnotherQueue(id, id, toQueue.getName().toString(), _storeContext); -                    } -                } -            } - -            previous = id; -        } -    }  } diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java index 8a42718542..df8b59ec19 100644 --- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java @@ -20,7 +20,6 @@   */  package org.apache.qpid.tools.messagestore.commands; -import org.apache.qpid.AMQException;  import org.apache.qpid.framing.AMQShortString;  import org.apache.qpid.server.exchange.Exchange;  import org.apache.qpid.server.queue.AMQQueue; @@ -234,17 +233,9 @@ public class List extends AbstractCommand          {              if (exchange != null)              { -                try +                if (exchange.isBound(queue))                  { -                    if (exchange.isBound(queue)) -                    { -                        data.add(queue.toString()); -                    } -                } -                catch (AMQException e) -                { -                    // is never thrown by current impls forced to throw by interface. -                    commandError("Unable to check exchange bindings: " + e.getMessage(), null); +                    data.add(queue.toString());                  }              }              else @@ -299,17 +290,9 @@ public class List extends AbstractCommand          {              if (exchange != null)              { -                try -                { -                    if (exchange.isBound(queue)) -                    { -                        data.add(queue.getName().toString()); -                    } -                } -                catch (AMQException e) +                if (exchange.isBound(queue))                  { -                    // is never thrown by current impls forced to throw by interface. -                    commandError("Unable to check exchange bindings: " + e.getMessage(), null); +                    data.add(queue.getName().toString());                  }              }              else diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java new file mode 100644 index 0000000000..a9497fd23e --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java @@ -0,0 +1,166 @@ +/* + *  Licensed to the Apache Software Foundation (ASF) under one + *  or more contributor license agreements.  See the NOTICE file + *  distributed with this work for additional information + *  regarding copyright ownership.  The ASF licenses this file + *  to you under the Apache License, Version 2.0 (the + *  "License"); you may not use this file except in compliance + *  with the License.  You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + *  Unless required by applicable law or agreed to in writing, + *  software distributed under the License is distributed on an + *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + *  KIND, either express or implied.  See the License for the + *  specific language governing permissions and limitations + *  under the License.     + * + *  + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.tools.messagestore.MessageStoreTool; + +import java.util.List; + +public class Move extends AbstractCommand +{ + +    /** +     * Since the Coopy command is not associated with a real channel we can safely create our own store context +     * for use in the few methods that require one. +     */ +    private StoreContext _storeContext = new StoreContext(); + +    public Move(MessageStoreTool tool) +    { +        super(tool); +    } + +    public String help() +    { +        return "Move messages between queues.\n" + +               "The currently selected message set will be moved to the specifed queue.\n" + +               "Alternatively the values can be provided on the command line."; +    } + +    public String usage() +    { +        return "move to=<queue> [from=<queue>] [msgids=<msgids eg, 1,2,4-10>]"; +    } + +    public String getCommand() +    { +        return "move"; +    } + +    public void execute(String... args) +    { +        AMQQueue toQueue = null; +        AMQQueue fromQueue = _tool.getState().getQueue(); +        java.util.List<Long> msgids = _tool.getState().getMessages(); + +        if (args.length >= 2) +        { +            for (String arg : args) +            { +                if (arg.startsWith("to=")) +                { +                    String queueName = arg.substring(arg.indexOf("=") + 1); +                    toQueue = _tool.getState().getVhost().getQueueRegistry().getQueue(new AMQShortString(queueName)); +                } + +                if (arg.startsWith("from=")) +                { +                    String queueName = arg.substring(arg.indexOf("=") + 1); +                    fromQueue = _tool.getState().getVhost().getQueueRegistry().getQueue(new AMQShortString(queueName)); +                } + +                if (arg.startsWith("msgids=")) +                { +                    String msgidStr = arg.substring(arg.indexOf("=") + 1); + +                    // Record the current message selection +                    java.util.List<Long> currentIDs = _tool.getState().getMessages(); + +                    // Use the ToolState class to perform the messasge parsing +                    _tool.getState().setMessages(msgidStr); +                    msgids = _tool.getState().getMessages(); + +                    // Reset the original selection of messages +                    _tool.getState().setMessages(currentIDs); +                } +            } +        } + +        if (!checkRequirements(fromQueue, toQueue, msgids)) +        { +            return; +        } + +        processIDs(fromQueue, toQueue, msgids); +    } + +    private void processIDs(AMQQueue fromQueue, AMQQueue toQueue, java.util.List<Long> msgids) +    { +        Long previous = null; +        Long start = null; + +        for (long id : msgids) +        { +            if (previous != null) +            { +                if (id == previous + 1) +                { +                    if (start == null) +                    { +                        start = previous; +                    } +                } +                else +                { +                    if (start != null) +                    { +                        //move a range of ids +                        doCommand(fromQueue, start, id, toQueue, _storeContext); +                    } +                    else +                    { +                        //move a single id +                        doCommand(fromQueue, id, id, toQueue, _storeContext); +                    } +                } +            } + +            previous = id; +        } +    } + +    protected boolean checkRequirements(AMQQueue fromQueue, AMQQueue toQueue, List<Long> msgids) +    { +        if (toQueue == null) +        { +            _console.println("Destination queue not specifed."); +            _console.println(usage()); +            return false; +        } + +        if (fromQueue == null) +        { +            _console.println("Source queue not specifed."); +            _console.println(usage()); +            return false; +        } + +        return true; +    } + +    protected void doCommand(AMQQueue fromQueue, long start, long id, AMQQueue toQueue, StoreContext storeContext) +    { +        fromQueue.moveMessagesToAnotherQueue(start, id, toQueue.getName().toString(), _storeContext); +    } +} diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java new file mode 100644 index 0000000000..7154159b40 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java @@ -0,0 +1,68 @@ +/* + *  Licensed to the Apache Software Foundation (ASF) under one + *  or more contributor license agreements.  See the NOTICE file + *  distributed with this work for additional information + *  regarding copyright ownership.  The ASF licenses this file + *  to you under the Apache License, Version 2.0 (the + *  "License"); you may not use this file except in compliance + *  with the License.  You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + *  Unless required by applicable law or agreed to in writing, + *  software distributed under the License is distributed on an + *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + *  KIND, either express or implied.  See the License for the + *  specific language governing permissions and limitations + *  under the License.     + * + *  + */ +package org.apache.qpid.tools.messagestore.commands; + +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.tools.messagestore.MessageStoreTool; + +public class Purge extends Move +{ +    public Purge(MessageStoreTool tool) +    { +        super(tool); +    } + +    public String help() +    { +        return "Purge messages from a queue.\n" + +               "The currently selected message set will be purged from the specifed queue.\n" + +               "Alternatively the values can be provided on the command line."; +    } + +    public String usage() +    { +        return "purge from=<queue> [msgids=<msgids eg, 1,2,4-10>]"; +    } + +    public String getCommand() +    { +        return "purge"; +    } + + +    protected boolean checkRequirements(AMQQueue fromQueue, AMQQueue toQueue, java.util.List<Long> msgids) +    { +        if (fromQueue == null) +        { +            _console.println("Source queue not specifed."); +            _console.println(usage()); +            return false; +        } + +        return true; +    } + +    protected void doCommand(AMQQueue fromQueue, long start, long end, AMQQueue toQueue, StoreContext storeContext) +    { +        fromQueue.removeMessagesFromQueue(start, end, storeContext); +    } +} diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java index 39f35da912..5e9b7028e9 100644 --- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Select.java @@ -20,7 +20,6 @@   */  package org.apache.qpid.tools.messagestore.commands; -import org.apache.qpid.AMQException;  import org.apache.qpid.framing.AMQShortString;  import org.apache.qpid.server.exchange.Exchange;  import org.apache.qpid.server.queue.AMQQueue; @@ -132,16 +131,9 @@ public class Select extends AbstractCommand              if (_tool.getState().getQueue() != null)              { -                try +                if (!exchange.isBound(_tool.getState().getQueue()))                  { -                    if (!exchange.isBound(_tool.getState().getQueue())) -                    { -                        _tool.getState().setQueue(null); -                    } -                } -                catch (AMQException e) -                { -                    //ignore +                    _tool.getState().setQueue(null);                  }              }          } @@ -170,18 +162,11 @@ public class Select extends AbstractCommand                  {                      for (AMQShortString exchangeName : vhost.getExchangeRegistry().getExchangeNames())                      { -                        try -                        { -                            Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName); -                            if (exchange.isBound(queue)) -                            { -                                _tool.getState().setExchange(exchange); -                                break; -                            } -                        } -                        catch (AMQException e) +                        Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName); +                        if (exchange.isBound(queue))                          { -                            //ignore error +                            _tool.getState().setExchange(exchange); +                            break;                          }                      }                  } | 
