From 4547988868ea17dd34064204de84e66206b16d5b Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Thu, 5 Apr 2007 11:47:50 +0000 Subject: Merged revisions 522994-523245 via svnmerge from https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r522994 | rgreig | 2007-03-27 17:48:23 +0100 (Tue, 27 Mar 2007) | 1 line Test added for durability of messages under broker failure. ........ r523245 | rgreig | 2007-03-28 10:30:49 +0100 (Wed, 28 Mar 2007) | 1 line Reversed accidental replacing of the word 'initialize' in comments to 'establishConnection' through a method refactoring. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@525800 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/util/ClasspathScanner.java | 10 +- .../qpid/management/ui/views/NavigationView.java | 600 +++++++------ .../main/java/org/apache/qpid/ping/PingClient.java | 56 +- .../org/apache/qpid/ping/PingDurableClient.java | 389 +++++++++ .../apache/qpid/requestreply/PingPongProducer.java | 930 +++++++++++---------- .../org/apache/qpid/ping/PingAsyncTestPerf.java | 8 +- .../org/apache/qpid/ping/PingLatencyTestPerf.java | 2 +- .../java/org/apache/qpid/ping/PingTestPerf.java | 67 +- .../apache/qpid/requestreply/PingPongTestPerf.java | 75 +- 9 files changed, 1282 insertions(+), 855 deletions(-) create mode 100644 java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java (limited to 'java') diff --git a/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java b/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java index cd8e0a80a1..bad49060ca 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java @@ -61,10 +61,10 @@ public class ClasspathScanner * @return All the classes that match this collector. */ public static Collection> getMatches(Class matchingClass, String matchingRegexp, - boolean beanOnly) + boolean beanOnly) { log.debug("public static Collection> getMatches(Class matchingClass = " + matchingClass - + ", String matchingRegexp = " + matchingRegexp + ", boolean beanOnly = " + beanOnly + "): called"); + + ", String matchingRegexp = " + matchingRegexp + ", boolean beanOnly = " + beanOnly + "): called"); // Build a compiled regular expression from the pattern to match. Pattern matchPattern = Pattern.compile(matchingRegexp); @@ -95,11 +95,11 @@ public class ClasspathScanner * iteration. */ private static void gatherFiles(File classRoot, String classFileName, Map> result, - Pattern matchPattern, Class matchClass) + Pattern matchPattern, Class matchClass) { log.debug("private static void gatherFiles(File classRoot = " + classRoot + ", String classFileName = " - + classFileName + ", Map> result, Pattern matchPattern = " + matchPattern - + ", Class matchClass = " + matchClass + "): called"); + + classFileName + ", Map> result, Pattern matchPattern = " + matchPattern + + ", Class matchClass = " + matchClass + "): called"); File thisRoot = new File(classRoot, classFileName); diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java index e9215a4876..a861405d30 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,8 +20,6 @@ */ package org.apache.qpid.management.ui.views; -import static org.apache.qpid.management.ui.Constants.*; - import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -29,12 +27,14 @@ import java.util.HashMap; import java.util.List; import org.apache.qpid.management.ui.ApplicationRegistry; +import static org.apache.qpid.management.ui.Constants.*; import org.apache.qpid.management.ui.ManagedBean; import org.apache.qpid.management.ui.ManagedServer; import org.apache.qpid.management.ui.ServerRegistry; import org.apache.qpid.management.ui.exceptions.InfoRequiredException; import org.apache.qpid.management.ui.jmx.JMXServerRegistry; import org.apache.qpid.management.ui.jmx.MBeanUtility; + import org.eclipse.jface.preference.PreferenceStore; import org.eclipse.jface.viewers.DoubleClickEvent; import org.eclipse.jface.viewers.IDoubleClickListener; @@ -48,6 +48,7 @@ import org.eclipse.jface.viewers.TreeExpansionEvent; import org.eclipse.jface.viewers.TreeViewer; import org.eclipse.jface.viewers.Viewer; import org.eclipse.jface.viewers.ViewerSorter; + import org.eclipse.swt.SWT; import org.eclipse.swt.graphics.Font; import org.eclipse.swt.graphics.Image; @@ -62,6 +63,7 @@ import org.eclipse.swt.widgets.MenuItem; import org.eclipse.swt.widgets.Shell; import org.eclipse.swt.widgets.Tree; import org.eclipse.swt.widgets.TreeItem; + import org.eclipse.ui.part.ViewPart; /** @@ -71,29 +73,29 @@ import org.eclipse.ui.part.ViewPart; */ public class NavigationView extends ViewPart { - public static final String ID = "org.apache.qpid.management.ui.navigationView"; + public static final String ID = "org.apache.qpid.management.ui.navigationView"; public static final String INI_FILENAME = System.getProperty("user.home") + File.separator + "qpidManagementConsole.ini"; - + private static final String INI_SERVERS = "Servers"; private static final String INI_QUEUES = QUEUE + "s"; private static final String INI_CONNECTIONS = CONNECTION + "s"; private static final String INI_EXCHANGES = EXCHANGE + "s"; - + private TreeViewer _treeViewer = null; private TreeObject _rootNode = null; private TreeObject _serversRootNode = null; - + private PreferenceStore _preferences; // Map of connected servers private HashMap _managedServerMap = new HashMap(); - + private void createTreeViewer(Composite parent) { _treeViewer = new TreeViewer(parent); _treeViewer.setContentProvider(new ContentProviderImpl()); - _treeViewer.setLabelProvider(new LabelProviderImpl()); + _treeViewer.setLabelProvider(new LabelProviderImpl()); _treeViewer.setSorter(new ViewerSorterImpl()); - + // layout the tree viewer below the label field, to cover the area GridData layoutData = new GridData(); layoutData = new GridData(); @@ -103,118 +105,124 @@ public class NavigationView extends ViewPart layoutData.verticalAlignment = GridData.FILL; _treeViewer.getControl().setLayoutData(layoutData); _treeViewer.setUseHashlookup(true); - + createListeners(); } - + /** * Creates listeners for the JFace treeviewer */ private void createListeners() { - _treeViewer.addDoubleClickListener(new IDoubleClickListener() { + _treeViewer.addDoubleClickListener(new IDoubleClickListener() + { public void doubleClick(DoubleClickEvent event) { - IStructuredSelection ss = (IStructuredSelection)event.getSelection(); - if (ss == null || ss.getFirstElement() == null) + IStructuredSelection ss = (IStructuredSelection) event.getSelection(); + if ((ss == null) || (ss.getFirstElement() == null)) { return; } + boolean state = _treeViewer.getExpandedState(ss.getFirstElement()); _treeViewer.setExpandedState(ss.getFirstElement(), !state); } }); - - _treeViewer.addTreeListener(new ITreeViewerListener() { - public void treeExpanded(TreeExpansionEvent event) - { - _treeViewer.setExpandedState(event.getElement(), true); - // Following will cause the selection event to be sent, so commented - //_treeViewer.setSelection(new StructuredSelection(event.getElement())); - _treeViewer.refresh(); - } - public void treeCollapsed(TreeExpansionEvent event) + _treeViewer.addTreeListener(new ITreeViewerListener() { - _treeViewer.setExpandedState(event.getElement(), false); - _treeViewer.refresh(); - } - }); - + public void treeExpanded(TreeExpansionEvent event) + { + _treeViewer.setExpandedState(event.getElement(), true); + // Following will cause the selection event to be sent, so commented + // _treeViewer.setSelection(new StructuredSelection(event.getElement())); + _treeViewer.refresh(); + } + + public void treeCollapsed(TreeExpansionEvent event) + { + _treeViewer.setExpandedState(event.getElement(), false); + _treeViewer.refresh(); + } + }); + // This listener is for popup menu, which pops up if a queue,exchange or connection is selected // with right click. - _treeViewer.getTree().addListener(SWT.MenuDetect, new Listener () { - Display display = getSite().getShell().getDisplay(); - final Shell shell = new Shell (display); - - public void handleEvent(Event event) + _treeViewer.getTree().addListener(SWT.MenuDetect, new Listener() { - Tree widget = (Tree)event.widget; - TreeItem[] items = widget.getSelection(); - if (items == null) return; - - // Get the selected node - final TreeObject selectedNode = (TreeObject)items[0].getData(); - final TreeObject parentNode = selectedNode.getParent(); - - // This popup is only for mbeans and only connection,exchange and queue types - if (parentNode == null || - !MBEAN.equals(selectedNode.getType()) || - !(CONNECTION.equals(parentNode.getName()) || - QUEUE.equals(parentNode.getName()) || - EXCHANGE.equals(parentNode.getName())) - ) + Display display = getSite().getShell().getDisplay(); + final Shell shell = new Shell(display); + + public void handleEvent(Event event) { - return; - } - - Menu menu = new Menu (shell, SWT.POP_UP); - MenuItem item = new MenuItem (menu, SWT.PUSH); - // Add the action item, which will remove the node from the tree if selected - item.setText(ACTION_REMOVE_MBEANNODE); - item.addListener (SWT.Selection, new Listener () { - public void handleEvent (Event e) + Tree widget = (Tree) event.widget; + TreeItem[] items = widget.getSelection(); + if (items == null) { - removeManagedObject(parentNode, (ManagedBean)selectedNode.getManagedObject()); - _treeViewer.refresh(); - // set the selection to the parent node - _treeViewer.setSelection(new StructuredSelection(parentNode)); + return; } - }); - menu.setLocation (event.x, event.y); - menu.setVisible (true); - while (!menu.isDisposed () && menu.isVisible ()) - { - if (!display.readAndDispatch ()) + + // Get the selected node + final TreeObject selectedNode = (TreeObject) items[0].getData(); + final TreeObject parentNode = selectedNode.getParent(); + + // This popup is only for mbeans and only connection,exchange and queue types + if ((parentNode == null) || !MBEAN.equals(selectedNode.getType()) + || !(CONNECTION.equals(parentNode.getName()) || QUEUE.equals(parentNode.getName()) + || EXCHANGE.equals(parentNode.getName()))) { - display.sleep (); + return; } + + Menu menu = new Menu(shell, SWT.POP_UP); + MenuItem item = new MenuItem(menu, SWT.PUSH); + // Add the action item, which will remove the node from the tree if selected + item.setText(ACTION_REMOVE_MBEANNODE); + item.addListener(SWT.Selection, new Listener() + { + public void handleEvent(Event e) + { + removeManagedObject(parentNode, (ManagedBean) selectedNode.getManagedObject()); + _treeViewer.refresh(); + // set the selection to the parent node + _treeViewer.setSelection(new StructuredSelection(parentNode)); + } + }); + menu.setLocation(event.x, event.y); + menu.setVisible(true); + while (!menu.isDisposed() && menu.isVisible()) + { + if (!display.readAndDispatch()) + { + display.sleep(); + } + } + + menu.dispose(); } - menu.dispose (); - } - }); - } - + }); + } + /** * Creates Qpid Server connection using JMX RMI protocol * @param server * @throws Exception */ private void createRMIServerConnection(ManagedServer server) throws Exception - { + { try { // Currently Qpid Management Console only supports JMX MBeanServer - ServerRegistry serverRegistry = new JMXServerRegistry(server); - ApplicationRegistry.addServer(server, serverRegistry); + ServerRegistry serverRegistry = new JMXServerRegistry(server); + ApplicationRegistry.addServer(server, serverRegistry); } - catch(Exception ex) + catch (Exception ex) { ex.printStackTrace(); throw new Exception("Error in connecting to Qpid broker at " + server.getUrl(), ex); } } - + /** * Adds a new server node in the navigation view if server connection is successful. * @param transportProtocol @@ -223,15 +231,15 @@ public class NavigationView extends ViewPart * @param domain * @throws Exception */ - public void addNewServer(String transportProtocol, String host, int port, - String domain, String user, String pwd) throws Exception + public void addNewServer(String transportProtocol, String host, int port, String domain, String user, String pwd) + throws Exception { String serverAddress = host + ":" + port; String url = null; ManagedServer managedServer = new ManagedServer(host, port, domain, user, pwd); - + if ("RMI".equals(transportProtocol)) - { + { url = managedServer.getUrl(); List list = _serversRootNode.getChildren(); for (TreeObject node : list) @@ -242,10 +250,11 @@ public class NavigationView extends ViewPart // Set the server node as selected and then connect it. _treeViewer.setSelection(new StructuredSelection(node)); reconnect(user, pwd); + return; } } - + // The server is not in the list of already added servers, so now connect and add it. managedServer.setName(serverAddress); createRMIServerConnection(managedServer); @@ -254,28 +263,28 @@ public class NavigationView extends ViewPart { throw new InfoRequiredException(transportProtocol + " transport is not supported"); } - + // Server connection is successful. Now add the server in the tree TreeObject serverNode = new TreeObject(serverAddress, NODE_TYPE_SERVER); serverNode.setUrl(url); serverNode.setManagedObject(managedServer); _serversRootNode.addChild(serverNode); - + // Add server in the connected server map _managedServerMap.put(managedServer, serverNode); - + // populate the server tree - populateServer(serverNode); - + populateServer(serverNode); + // Add the Queue/Exchanges/Connections from config file into the navigation tree addConfiguredItems(managedServer); - + _treeViewer.refresh(); - - // save server address in file + + // save server address in file addServerInConfigFile(serverAddress); } - + /** * Create the config file, if it doesn't already exist. * Exits the application if the file could not be created. @@ -290,29 +299,29 @@ public class NavigationView extends ViewPart file.createNewFile(); } } - catch(IOException ex) + catch (IOException ex) { System.out.println("Could not write to the file " + INI_FILENAME); System.out.println(ex); System.exit(1); } } - + /** * Server addresses are stored in a file. When user launches the application again, the * server addresses are picked up from the file and shown in the navigfation view. This method - * adds the server address in a file, when a new server is added in the navigation view. + * adds the server address in a file, when a new server is added in the navigation view. * @param serverAddress */ private void addServerInConfigFile(String serverAddress) { // Check if the address already exists List list = getServerListFromFile(); - if (list != null && list.contains(serverAddress)) + if ((list != null) && list.contains(serverAddress)) { return; } - + // Get the existing server list and add to that String servers = _preferences.getString(INI_SERVERS); String value = (servers.length() != 0) ? (servers + "," + serverAddress) : serverAddress; @@ -321,12 +330,12 @@ public class NavigationView extends ViewPart { _preferences.save(); } - catch(IOException ex) + catch (IOException ex) { System.err.println("Could not add " + serverAddress + " in " + INI_SERVERS + " (" + INI_FILENAME + ")"); System.out.println(ex); } - } + } /** * Adds the item (Queue/Exchange/Connection) to the config file @@ -337,20 +346,20 @@ public class NavigationView extends ViewPart */ private void addItemInConfigFile(TreeObject node) { - ManagedBean mbean = (ManagedBean)node.getManagedObject(); + ManagedBean mbean = (ManagedBean) node.getManagedObject(); String server = mbean.getServer().getName(); String virtualhost = mbean.getVirtualHostName(); String type = node.getParent().getName() + "s"; String name = node.getName(); String itemKey = server + "." + virtualhost + "." + type; - + // Check if the item already exists in the config file List list = getConfiguredItemsFromFile(itemKey); - if (list != null && list.contains(name)) + if ((list != null) && list.contains(name)) { return; } - + // Add this item to the existing list of items String items = _preferences.getString(itemKey); String value = (items.length() != 0) ? (items + "," + name) : name; @@ -359,21 +368,21 @@ public class NavigationView extends ViewPart { _preferences.save(); } - catch(IOException ex) + catch (IOException ex) { System.err.println("Could not add " + name + " in " + itemKey + " (" + INI_FILENAME + ")"); System.out.println(ex); } } - + private void removeItemFromConfigFile(TreeObject node) { - ManagedBean mbean = (ManagedBean)node.getManagedObject(); + ManagedBean mbean = (ManagedBean) node.getManagedObject(); String server = mbean.getServer().getName(); String vHost = mbean.getVirtualHostName(); String type = node.getParent().getName() + "s"; String itemKey = server + "." + vHost + "." + type; - + List list = getConfiguredItemsFromFile(itemKey); if (list.contains(node.getName())) { @@ -383,29 +392,30 @@ public class NavigationView extends ViewPart { value += item + ","; } + value = (value.lastIndexOf(",") != -1) ? value.substring(0, value.lastIndexOf(",")) : value; - + _preferences.putValue(itemKey, value); try { _preferences.save(); } - catch(IOException ex) + catch (IOException ex) { - System.err.println("Error in updating the config file "+ INI_FILENAME); + System.err.println("Error in updating the config file " + INI_FILENAME); System.out.println(ex); } } } /** - * Queries the qpid server for MBeans and populates the navigation view with all MBeans for + * Queries the qpid server for MBeans and populates the navigation view with all MBeans for * the given server node. * @param serverNode */ private void populateServer(TreeObject serverNode) { - ManagedServer server = (ManagedServer)serverNode.getManagedObject(); + ManagedServer server = (ManagedServer) serverNode.getManagedObject(); String domain = server.getDomain(); try { @@ -414,29 +424,30 @@ public class NavigationView extends ViewPart TreeObject domainNode = new TreeObject(domain, NODE_TYPE_DOMAIN); domainNode.setParent(serverNode); - populateDomain(domainNode); + populateDomain(domainNode); } else { List domainList = new ArrayList(); - List domains = MBeanUtility.getAllDomains(server);; + List domains = MBeanUtility.getAllDomains(server); + ; for (String domainName : domains) - { + { TreeObject domainNode = new TreeObject(domainName, NODE_TYPE_DOMAIN); domainNode.setParent(serverNode); domainList.add(domainNode); - populateDomain(domainNode); + populateDomain(domainNode); } } } - catch(Exception ex) + catch (Exception ex) { System.out.println("\nError in connecting to Qpid broker "); ex.printStackTrace(); } } - + /** * Queries the Qpid Server and populates the given domain node with all MBeans undser that domain. * @param domain @@ -446,19 +457,19 @@ public class NavigationView extends ViewPart @SuppressWarnings("unchecked") private void populateDomain(TreeObject domain) throws IOException, Exception { - ManagedServer server = (ManagedServer)domain.getParent().getManagedObject(); - + ManagedServer server = (ManagedServer) domain.getParent().getManagedObject(); + // Now populate the mbenas under those types List mbeans = MBeanUtility.getManagedObjectsForDomain(server, domain.getName()); for (ManagedBean mbean : mbeans) { mbean.setServer(server); ServerRegistry serverRegistry = ApplicationRegistry.getServerRegistry(server); - serverRegistry.addManagedObject(mbean); - + serverRegistry.addManagedObject(mbean); + // Add all mbeans other than Connections, Exchanges and Queues. Because these will be added - // manually by selecting from MBeanView - if (!(mbean.isConnection() || mbean.isExchange() || mbean.isQueue()) ) + // manually by selecting from MBeanView + if (!(mbean.isConnection() || mbean.isExchange() || mbean.isQueue())) { addManagedBean(domain, mbean); } @@ -471,10 +482,11 @@ public class NavigationView extends ViewPart { addDefaultNodes(domain); } + break; } } - + /** * Add these three types - Connection, Exchange, Queue * By adding these, these will always be available, even if there are no mbeans under thse types @@ -493,7 +505,7 @@ public class NavigationView extends ViewPart typeChild.setParent(parent); typeChild.setVirtualHost(parent.getVirtualHost()); } - + /** * Checks if a particular mbeantype is already there in the navigation view for a domain. * This is used while populating domain with mbeans. @@ -506,24 +518,30 @@ public class NavigationView extends ViewPart List childNodes = parent.getChildren(); for (TreeObject child : childNodes) { - if ((NODE_TYPE_MBEANTYPE.equals(child.getType()) || NODE_TYPE_TYPEINSTANCE.equals(child.getType())) && - typeName.equals(child.getName())) + if ((NODE_TYPE_MBEANTYPE.equals(child.getType()) || NODE_TYPE_TYPEINSTANCE.equals(child.getType())) + && typeName.equals(child.getName())) + { return child; + } } + return null; } - + private boolean doesMBeanNodeAlreadyExist(TreeObject typeNode, String mbeanName) { List childNodes = typeNode.getChildren(); for (TreeObject child : childNodes) { if (MBEAN.equals(child.getType()) && mbeanName.equals(child.getName())) + { return true; + } } + return false; } - + /** * Adds the given MBean to the given domain node. Creates Notification node for the MBean. * sample ObjectNames - @@ -533,18 +551,18 @@ public class NavigationView extends ViewPart * @param mbean * @throws Exception */ - private void addManagedBean(TreeObject domain, ManagedBean mbean)// throws Exception + private void addManagedBean(TreeObject domain, ManagedBean mbean) // throws Exception { String name = mbean.getName(); // Split the mbean type into array of Strings, to create hierarchy // eg. type=VirtualHost.VirtualHostManager,VirtualHost=localhost will be: - // localhost->VirtualHostManager + // localhost->VirtualHostManager // eg. type=org.apache.qpid:type=VirtualHost.Queue,VirtualHost=test,name=ping will be: - // test->Queue->ping + // test->Queue->ping String[] types = mbean.getType().split("\\."); TreeObject typeNode = null; TreeObject parentNode = domain; - + // Run this loop till all nodes(hierarchy) for this mbean are created. This loop only creates // all the required parent nodes for the mbean for (int i = 0; i < types.length; i++) @@ -554,34 +572,34 @@ public class NavigationView extends ViewPart // If value is not null, then there will be a parent node for this mbean // eg. for type=VirtualHost the value is "test" typeNode = getMBeanTypeNode(parentNode, type); - + // create the type node if not already created if (typeNode == null) { // If the ObjectName doesn't have name property, that means there will be only one instance // of this mbean for given "type". So there will be no type node created for this mbean. - if (name == null && (i == types.length -1)) + if ((name == null) && (i == (types.length - 1))) { break; } - + // create a node for "type" typeNode = createTypeNode(parentNode, type); typeNode.setVirtualHost(mbean.getVirtualHostName()); } - + // now type node create becomes the parent node for next node in hierarchy parentNode = typeNode; - - /* + + /* * Now create instances node for this type if value exists. */ - if (valueOftype == null) + if (valueOftype == null) { - // No instance node will be created when value is null (eg type=Queue) + // No instance node will be created when value is null (eg type=Queue) break; - } - + } + // For different virtual hosts, the nodes with given value will be created. // eg type=VirtualHost, value=test typeNode = getMBeanTypeNode(parentNode, valueOftype); @@ -589,55 +607,60 @@ public class NavigationView extends ViewPart { typeNode = createTypeInstanceNode(parentNode, valueOftype); typeNode.setVirtualHost(mbean.getVirtualHostName()); - + // Create default nodes for VHost instances if (type.equals(VIRTUAL_HOST)) { addDefaultNodes(typeNode); } } + parentNode = typeNode; } - + if (typeNode == null) { typeNode = parentNode; } - + // Check if an MBean is already added if (doesMBeanNodeAlreadyExist(typeNode, name)) + { return; - + } + // Add the mbean node now TreeObject mbeanNode = new TreeObject(mbean); mbeanNode.setParent(typeNode); - + // Add the mbean to the config file if (mbean.isQueue() || mbean.isExchange() || mbean.isConnection()) { addItemInConfigFile(mbeanNode); } - + // Add notification node // TODO: show this only if the mbean sends any notification TreeObject notificationNode = new TreeObject(NOTIFICATION, NOTIFICATION); notificationNode.setParent(mbeanNode); } - + private TreeObject createTypeNode(TreeObject parent, String name) { TreeObject typeNode = new TreeObject(name, NODE_TYPE_MBEANTYPE); typeNode.setParent(parent); + return typeNode; } - + private TreeObject createTypeInstanceNode(TreeObject parent, String name) { TreeObject typeNode = new TreeObject(name, NODE_TYPE_TYPEINSTANCE); typeNode.setParent(parent); + return typeNode; } - + /** * Removes all the child nodes of the given parent node. Used when closing a server. * @param parent @@ -649,7 +672,7 @@ public class NavigationView extends ViewPart { removeManagedObject(child); } - + list.clear(); } @@ -666,10 +689,11 @@ public class NavigationView extends ViewPart { if (MBEAN.equals(child.getType())) { - String name = mbean.getName() != null ? mbean.getName() : mbean.getType(); + String name = (mbean.getName() != null) ? mbean.getName() : mbean.getType(); if (child.getName().equals(name)) { objectToRemove = child; + break; } } @@ -678,35 +702,39 @@ public class NavigationView extends ViewPart removeManagedObject(child, mbean); } } - + if (objectToRemove != null) { list.remove(objectToRemove); removeItemFromConfigFile(objectToRemove); } - + } - + /** * Closes the Qpid server connection */ public void disconnect() throws Exception { - TreeObject selectedNode = getSelectedServerNode(); - ManagedServer managedServer = (ManagedServer)selectedNode.getManagedObject(); + TreeObject selectedNode = getSelectedServerNode(); + ManagedServer managedServer = (ManagedServer) selectedNode.getManagedObject(); if (!_managedServerMap.containsKey(managedServer)) + { return; + } // Close server connection ServerRegistry serverRegistry = ApplicationRegistry.getServerRegistry(managedServer); - if (serverRegistry == null) // server connection is already closed + if (serverRegistry == null) // server connection is already closed + { return; - + } + serverRegistry.closeServerConnection(); // Add server to the closed server list and the worker thread will remove the server from required places. ApplicationRegistry.serverConnectionClosed(managedServer); } - + /** * Connects the selected server node * @throws Exception @@ -714,27 +742,28 @@ public class NavigationView extends ViewPart public void reconnect(String user, String password) throws Exception { TreeObject selectedNode = getSelectedServerNode(); - ManagedServer managedServer = (ManagedServer)selectedNode.getManagedObject(); - if(_managedServerMap.containsKey(managedServer)) + ManagedServer managedServer = (ManagedServer) selectedNode.getManagedObject(); + if (_managedServerMap.containsKey(managedServer)) { throw new InfoRequiredException("Server " + managedServer.getName() + " is already connected"); - } + } + managedServer.setUser(user); managedServer.setPassword(password); createRMIServerConnection(managedServer); - + // put the server in the managed server map _managedServerMap.put(managedServer, selectedNode); - + // populate the server tree now populateServer(selectedNode); - + // Add the Queue/Exchanges/Connections from config file into the navigation tree addConfiguredItems(managedServer); - - _treeViewer.refresh(); + + _treeViewer.refresh(); } - + /** * Adds the items(queues/exchanges/connectins) from config file to the server tree * @param server @@ -750,13 +779,13 @@ public class NavigationView extends ViewPart List items = getConfiguredItemsFromFile(itemKey); List mbeans = serverRegistry.getQueues(virtualHost); addConfiguredItems(items, mbeans); - + // Add Exchanges itemKey = server.getName() + "." + virtualHost + "." + INI_EXCHANGES; items = getConfiguredItemsFromFile(itemKey); mbeans = serverRegistry.getExchanges(virtualHost); addConfiguredItems(items, mbeans); - + // Add Connections itemKey = server.getName() + "." + virtualHost + "." + INI_CONNECTIONS; items = getConfiguredItemsFromFile(itemKey); @@ -764,7 +793,7 @@ public class NavigationView extends ViewPart addConfiguredItems(items, mbeans); } } - + /** * Gets the mbeans corresponding to the items and adds those to the navigation tree * @param items @@ -772,11 +801,11 @@ public class NavigationView extends ViewPart */ private void addConfiguredItems(List items, List mbeans) { - if ((items == null) || items.isEmpty() | (mbeans == null) || mbeans.isEmpty()) + if ((items == null) || (items.isEmpty() | (mbeans == null)) || mbeans.isEmpty()) { return; } - + for (String item : items) { for (ManagedBean mbean : mbeans) @@ -784,12 +813,13 @@ public class NavigationView extends ViewPart if (item.equals(mbean.getName())) { addManagedBean(mbean); + break; } } } } - + /** * Closes the Qpid server connection if not already closed and removes the server node from the navigation view and * also from the ini file stored in the system. @@ -798,9 +828,9 @@ public class NavigationView extends ViewPart public void removeServer() throws Exception { disconnect(); - + // Remove from the Tree - String serverNodeName = getSelectedServerNode().getName(); + String serverNodeName = getSelectedServerNode().getName(); List list = _serversRootNode.getChildren(); TreeObject objectToRemove = null; for (TreeObject child : list) @@ -808,46 +838,48 @@ public class NavigationView extends ViewPart if (child.getName().equals(serverNodeName)) { objectToRemove = child; + break; } } - + if (objectToRemove != null) { list.remove(objectToRemove); } - + _treeViewer.refresh(); - + // Remove from the ini file removeServerFromConfigFile(serverNodeName); } - + private void removeServerFromConfigFile(String serverNodeName) { List serversList = getServerListFromFile(); serversList.remove(serverNodeName); - + String value = ""; for (String item : serversList) { value += item + ","; } + value = (value.lastIndexOf(",") != -1) ? value.substring(0, value.lastIndexOf(",")) : value; - + _preferences.putValue(INI_SERVERS, value); - + try { _preferences.save(); } - catch(IOException ex) + catch (IOException ex) { - System.err.println("Error in updating the config file "+ INI_FILENAME); + System.err.println("Error in updating the config file " + INI_FILENAME); System.out.println(ex); } } - + /** * @return the server addresses from the ini file * @throws Exception @@ -856,7 +888,7 @@ public class NavigationView extends ViewPart { return getConfiguredItemsFromFile(INI_SERVERS); } - + /** * Returns the list of items from the config file. * sample ini file: @@ -879,59 +911,60 @@ public class NavigationView extends ViewPart list.add(item); } } - + return list; } - + public TreeObject getSelectedServerNode() throws Exception { - IStructuredSelection ss = (IStructuredSelection)_treeViewer.getSelection(); - TreeObject selectedNode = (TreeObject)ss.getFirstElement(); - if (ss.isEmpty() || selectedNode == null || (!selectedNode.getType().equals(NODE_TYPE_SERVER))) + IStructuredSelection ss = (IStructuredSelection) _treeViewer.getSelection(); + TreeObject selectedNode = (TreeObject) ss.getFirstElement(); + if (ss.isEmpty() || (selectedNode == null) || (!selectedNode.getType().equals(NODE_TYPE_SERVER))) { throw new InfoRequiredException("Please select the server"); } return selectedNode; } - /** + + /** * This is a callback that will allow us to create the viewer and initialize * it. */ - public void createPartControl(Composite parent) + public void createPartControl(Composite parent) { Composite composite = new Composite(parent, SWT.NONE); GridLayout gridLayout = new GridLayout(); gridLayout.marginHeight = 2; gridLayout.marginWidth = 2; - gridLayout.horizontalSpacing = 0; - gridLayout.verticalSpacing = 2; + gridLayout.horizontalSpacing = 0; + gridLayout.verticalSpacing = 2; composite.setLayout(gridLayout); - + createTreeViewer(composite); _rootNode = new TreeObject("ROOT", "ROOT"); _serversRootNode = new TreeObject(NAVIGATION_ROOT, "ROOT"); _serversRootNode.setParent(_rootNode); - + _treeViewer.setInput(_rootNode); // set viewer as selection event provider for MBeanView - getSite().setSelectionProvider(_treeViewer); - + getSite().setSelectionProvider(_treeViewer); + // Start worker thread to refresh tree for added or removed objects - (new Thread(new Worker())).start(); - + (new Thread(new Worker())).start(); + createConfigFile(); _preferences = new PreferenceStore(INI_FILENAME); - + try { _preferences.load(); } - catch(IOException ex) + catch (IOException ex) { System.out.println(ex); } - + // load the list of servers already added from file List serversList = getServerListFromFile(); if (serversList != null) @@ -945,23 +978,22 @@ public class NavigationView extends ViewPart _serversRootNode.addChild(serverNode); } } + _treeViewer.refresh(); - - } - /** - * Passing the focus request to the viewer's control. - */ - public void setFocus() - { + } + + /** + * Passing the focus request to the viewer's control. + */ + public void setFocus() + { } - } - public void refresh() { _treeViewer.refresh(); } - + /** * Content provider class for the tree viewer */ @@ -971,36 +1003,39 @@ public class NavigationView extends ViewPart { return getChildren(parent); } - + public Object[] getChildren(final Object parentElement) { - final TreeObject node = (TreeObject)parentElement; + final TreeObject node = (TreeObject) parentElement; + return node.getChildren().toArray(new TreeObject[0]); } - + public Object getParent(final Object element) { - final TreeObject node = (TreeObject)element; + final TreeObject node = (TreeObject) element; + return node.getParent(); } - + public boolean hasChildren(final Object element) { final TreeObject node = (TreeObject) element; + return !node.getChildren().isEmpty(); } - + public void inputChanged(final Viewer viewer, final Object oldInput, final Object newInput) { // Do nothing } - + public void dispose() { // Do nothing } } - + /** * Label provider class for the tree viewer */ @@ -1008,28 +1043,32 @@ public class NavigationView extends ViewPart { public Image getImage(Object element) { - TreeObject node = (TreeObject)element; + TreeObject node = (TreeObject) element; if (node.getType().equals(NOTIFICATION)) { return ApplicationRegistry.getImage(NOTIFICATION_IMAGE); } else if (!node.getType().equals(MBEAN)) { - if (_treeViewer.getExpandedState(node)) - return ApplicationRegistry.getImage(OPEN_FOLDER_IMAGE); - else - return ApplicationRegistry.getImage(CLOSED_FOLDER_IMAGE); - + if (_treeViewer.getExpandedState(node)) + { + return ApplicationRegistry.getImage(OPEN_FOLDER_IMAGE); + } + else + { + return ApplicationRegistry.getImage(CLOSED_FOLDER_IMAGE); + } + } else { return ApplicationRegistry.getImage(MBEAN_IMAGE); } } - + public String getText(Object element) { - TreeObject node = (TreeObject)element; + TreeObject node = (TreeObject) element; if (node.getType().equals(NODE_TYPE_MBEANTYPE)) { return node.getName() + "s"; @@ -1039,35 +1078,42 @@ public class NavigationView extends ViewPart return node.getName(); } } - + public Font getFont(Object element) { - TreeObject node = (TreeObject)element; + TreeObject node = (TreeObject) element; if (node.getType().equals(NODE_TYPE_SERVER)) { if (node.getChildren().isEmpty()) + { return ApplicationRegistry.getFont(FONT_NORMAL); + } else + { return ApplicationRegistry.getFont(FONT_BOLD); + } } + return ApplicationRegistry.getFont(FONT_NORMAL); } } // End of LabelProviderImpl - - + private class ViewerSorterImpl extends ViewerSorter { public int category(Object element) { - TreeObject node = (TreeObject)element; + TreeObject node = (TreeObject) element; if (node.getType().equals(MBEAN)) + { return 1; + } + return 2; } } - + /** - * Worker thread, which keeps looking for new ManagedObjects to be added and + * Worker thread, which keeps looking for new ManagedObjects to be added and * unregistered objects to be removed from the tree. * @author Bhupendra Bhardwaj */ @@ -1075,33 +1121,31 @@ public class NavigationView extends ViewPart { public void run() { - while(true) + while (true) { if (!_managedServerMap.isEmpty()) { - refreshRemovedObjects(); + refreshRemovedObjects(); refreshClosedServerConnections(); } - + try { Thread.sleep(3000); } - catch(Exception ex) - { + catch (Exception ex) + { } + + } // end of while loop + } // end of run method. + } // end of Worker class - } - - }// end of while loop - }// end of run method. - }// end of Worker class - /** * Adds the mbean to the navigation tree * @param mbean * @throws Exception */ - public void addManagedBean(ManagedBean mbean)// throws Exception + public void addManagedBean(ManagedBean mbean) // throws Exception { TreeObject treeServerObject = _managedServerMap.get(mbean.getServer()); List domains = treeServerObject.getChildren(); @@ -1111,22 +1155,25 @@ public class NavigationView extends ViewPart if (child.getName().equals(mbean.getDomain())) { domain = child; + break; } } - + addManagedBean(domain, mbean); _treeViewer.refresh(); } - + private void refreshRemovedObjects() { for (ManagedServer server : _managedServerMap.keySet()) { final ServerRegistry serverRegistry = ApplicationRegistry.getServerRegistry(server); - if (serverRegistry == null) // server connection is closed + if (serverRegistry == null) // server connection is closed + { continue; - + } + final List removalList = serverRegistry.getObjectsToBeRemoved(); if (removalList != null) { @@ -1145,19 +1192,22 @@ public class NavigationView extends ViewPart if (child.getName().equals(mbean.getDomain())) { domain = child; + break; } } + removeManagedObject(domain, mbean); - //serverRegistry.removeManagedObject(mbean); + // serverRegistry.removeManagedObject(mbean); } + _treeViewer.refresh(); } }); } } } - + /** * Gets the list of closed server connection from the ApplicationRegistry and then removes * the closed server nodes from the navigation view @@ -1169,20 +1219,20 @@ public class NavigationView extends ViewPart { Display display = getSite().getShell().getDisplay(); display.syncExec(new Runnable() - { - public void run() { - for (ManagedServer server : closedServers) + public void run() { - removeManagedObject(_managedServerMap.get(server)); - _managedServerMap.remove(server); - ApplicationRegistry.removeServer(server); + for (ManagedServer server : closedServers) + { + removeManagedObject(_managedServerMap.get(server)); + _managedServerMap.remove(server); + ApplicationRegistry.removeServer(server); + } + + _treeViewer.refresh(); } - - _treeViewer.refresh(); - } - }); + }); } } - -} \ No newline at end of file + +} diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java index 013bda5927..e3b0249ed3 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java @@ -21,6 +21,7 @@ package org.apache.qpid.ping; import java.util.List; +import java.util.Properties; import javax.jms.Destination; @@ -31,54 +32,36 @@ import org.apache.qpid.requestreply.PingPongProducer; * to send replies to its pings. It simply listens to its own ping destinations, rather than seperate reply queues. * It is an all in one ping client, that produces and consumes its own pings. * + *

The constructor increments a count of the number of ping clients created. It is assumed that where many + * are created they will all be run in parallel and be active in sending and consuming pings at the same time. + * If the unique destinations flag is not set and a pub/sub ping cycle is being run, this means that they will all hear + * pings sent by each other. The expected number of pings received will therefore be multiplied up by the number of + * active ping clients. The {@link #getConsumersPerTopic()} method is used to supply this multiplier under these + * conditions. + * *

*
CRC Card
Responsibilities Collaborations - *
Create a ping pong producer that listens to its own pings {@link PingPongProducer} + *
Create a ping producer that listens to its own pings {@link PingPongProducer} + *
Count the number of ping producers and produce multiplier for scaling up messages expected over topic pings. *
*/ public class PingClient extends PingPongProducer { + /** Used to count the number of ping clients created. */ private static int _pingClientCount; /** - * Creates a ping producer with the specified parameters, of which there are many. See their individual comments - * for details. This constructor creates ping pong producer but de-registers its reply-to destination message - * listener, and replaces it by listening to all of its ping destinations. + * Creates a ping producer with the specified parameters, of which there are many. See the class level comments + * for {@link PingPongProducer} for details. This constructor creates a connection to the broker and creates + * producer and consumer sessions on it, to send and recieve its pings and replies on. * - * @param brokerDetails The URL of the broker to send pings to. - * @param username The username to log onto the broker with. - * @param password The password to log onto the broker with. - * @param virtualpath The virtual host name to use on the broker. - * @param destinationName The name (or root where multiple destinations are used) of the desitination to send - * pings to. - * @param selector The selector to filter replies with. - * @param transacted Indicates whether or not pings are sent and received in transactions. - * @param persistent Indicates whether pings are sent using peristent delivery. - * @param messageSize Specifies the size of ping messages to send. - * @param verbose Indicates that information should be printed to the console on every ping. - * @param afterCommit Indicates that the user should be promted to terminate a broker after commits to test failover. - * @param beforeCommit Indicates that the user should be promted to terminate a broker before commits to test failover. - * @param afterSend Indicates that the user should be promted to terminate a broker after sends to test failover. - * @param beforeSend Indicates that the user should be promted to terminate a broker before sends to test failover. - * @param failOnce Indicates that the failover testing behaviour should only happen on the first commit, not all. - * @param txBatchSize Specifies the number of pings to send in each transaction. - * @param noOfDestinations The number of destinations to ping. Must be 1 or more. - * @param rate Specified the number of pings per second to send. Setting this to 0 means send as fast as - * possible, with no rate restriction. - * @param pubsub True to ping topics, false to ping queues. - * @param unique True to use unique destinations for each ping pong producer, false to share. + * @param overrides Properties containing any desired overrides to the defaults. * * @throws Exception Any exceptions are allowed to fall through. */ - public PingClient(String brokerDetails, String username, String password, String virtualpath, String destinationName, - String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose, - boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce, - int txBatchSize, int noOfDestinations, int rate, boolean pubsub, boolean unique, - int ackMode, long pausetime) throws Exception + public PingClient(Properties overrides) throws Exception { - super(brokerDetails, username, password, virtualpath, destinationName, selector, transacted, persistent, messageSize, - verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, txBatchSize, noOfDestinations, rate, - pubsub, unique, ackMode, pausetime); + super(overrides); _pingClientCount++; } @@ -94,6 +77,11 @@ public class PingClient extends PingPongProducer return _pingDestinations; } + /** + * Supplies the multiplier for the number of ping clients that will hear each ping when doing pub/sub pinging. + * + * @return The scaling up of the number of expected pub/sub pings. + */ public int getConsumersPerTopic() { if (_isUnique) diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java new file mode 100644 index 0000000000..77526141d6 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java @@ -0,0 +1,389 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; + +import org.apache.log4j.Logger; + +import org.apache.qpid.requestreply.PingPongProducer; + +import uk.co.thebadgerset.junit.extensions.util.MathUtils; +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +/** + * PingDurableClient is a variation of the {@link PingPongProducer} ping tool. Instead of sending its pings and + * receiving replies to them at the same time, this tool sends pings until it is signalled by some 'event' to stop + * sending. It then waits for another signal before it re-opens a fresh connection and attempts to receive all of the + * pings that it has succesfully sent. It is intended to be an interactive test that lets a user experiment with + * failure conditions when using durable messaging. + * + *

The events that can stop it from sending are input from the user on the console, failure of its connection to + * the broker, completion of sending a specified number of messages, or expiry of a specified duration. In all cases + * it will do its best to clean up and close the connection before opening a fresh connection to receive the pings + * with. + * + *

The event to re-connect and attempt to recieve the pings is input from the user on the console. + * + *

This ping client inherits the configuration properties of its parent class ({@link PingPongProducer}) and + * additionally accepts the following parameters: + * + *

+ *
Parameters
Parameter Default Comments + *
numMessages 100 The total number of messages to send. + *
duration 30S The length of time to ping for. (Format dDhHmMsS, for d days, h hours, + * m minutes and s seconds). + *
+ * + *

This ping client also overrides some of the defaults of its parent class, to provide a reasonable set up + * when no parameters are specified. + * + *

+ *
Parameters
Parameter Default Comments + *
uniqueDests false Prevents destination names being timestamped. + *
transacted true Only makes sense to test with transactions. + *
persistent true Only makes sense to test persistent. + *
commitBatchSize 10 + *
rate 20 Total default test time is 5 seconds. + *
+ * + *

When a number of messages or duration is specified, this ping client will ping until the first of those limits + * is reached. Reaching the limit will be interpreted as the first signal to stop sending, and the ping client will + * wait for the second signal before receiving its pings. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Send and receive pings. + *
Accept user input to signal stop sending. + *
Accept user input to signal start receiving. + *
Provide feedback on pings sent versus pings received. + *
+ */ +public class PingDurableClient extends PingPongProducer implements ExceptionListener +{ + private static final Logger log = Logger.getLogger(PingDurableClient.class); + + public static final String NUM_MESSAGES_PROPNAME = "numMessages"; + public static final String NUM_MESSAGES_DEFAULT = "100"; + public static final String DURATION_PROPNAME = "duration"; + public static final String DURATION_DEFAULT = "30S"; + + /** The maximum length of time to wait whilst receiving pings before assuming that no more are coming. */ + private static final long TIME_OUT = 3000; + + static + { + defaults.setProperty(NUM_MESSAGES_PROPNAME, NUM_MESSAGES_DEFAULT); + defaults.setProperty(DURATION_PROPNAME, DURATION_DEFAULT); + defaults.setProperty(UNIQUE_DESTS_PROPNAME, "false"); + defaults.setProperty(TRANSACTED_PROPNAME, "true"); + defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true"); + defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10"); + defaults.setProperty(RATE_PROPNAME, "20"); + } + + /** Specifies the number of pings to send, if larger than 0. 0 means send until told to stop. */ + private int numMessages; + + /** Sepcifies how long to ping for, if larger than 0. 0 means send until told to stop. */ + private long duration; + + /** Used to indciate that this application should terminate. Set by the shutdown hook. */ + private boolean terminate = false; + + /** + * @throws Exception Any exceptions are allowed to fall through. + */ + public PingDurableClient(Properties overrides) throws Exception + { + super(overrides); + log.debug("public PingDurableClient(Properties overrides = " + overrides + "): called"); + + // Extract the additional configuration parameters. + ParsedProperties properties = new ParsedProperties(defaults); + properties.putAll(overrides); + + numMessages = properties.getPropertyAsInteger(NUM_MESSAGES_PROPNAME); + String durationSpec = properties.getProperty(DURATION_PROPNAME); + + if (durationSpec != null) + { + duration = MathUtils.parseDuration(durationSpec) * 1000000; + } + } + + /** + * Starts the ping/wait/receive process. + * + * @param args The command line arguments. + */ + public static void main(String[] args) + { + try + { + // Create a ping producer overriding its defaults with all options passed on the command line. + Properties options = processCommandLine(args); + PingDurableClient pingProducer = new PingDurableClient(options); + + // Create a shutdown hook to terminate the ping-pong producer. + Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); + + // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. + // pingProducer.getConnection().setExceptionListener(pingProducer); + + // Run the test procedure. + int sent = pingProducer.send(); + pingProducer.waitForUser("Press return to begin receiving the pings."); + pingProducer.receive(sent); + + System.exit(0); + } + catch (Exception e) + { + System.err.println(e.getMessage()); + log.error("Top level handler caught execption.", e); + System.exit(1); + } + } + + /** + * Performs the main test procedure implemented by this ping client. See the class level comment for details. + */ + public int send() throws Exception + { + log.debug("public void sendWaitReceive(): called"); + + log.debug("duration = " + duration); + log.debug("numMessages = " + numMessages); + + if (duration > 0) + { + System.out.println("Sending for up to " + (duration / 1000000000f) + " seconds."); + } + + if (_rate > 0) + { + System.out.println("Sending at " + _rate + " messages per second."); + } + + if (numMessages > 0) + { + System.out.println("Sending up to " + numMessages + " messages."); + } + + // Establish the connection and the message producer. + establishConnection(true, false); + getConnection().start(); + + Message message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent); + + // Send pings until a terminating condition is received. + boolean endCondition = false; + int messagesSent = 0; + int messagesCommitted = 0; + int messagesNotCommitted = 0; + long start = System.nanoTime(); + + // Clear console in. + clearConsole(); + + while (!endCondition) + { + boolean committed = false; + + try + { + committed = sendMessage(messagesSent, message) && _transacted; + + messagesSent++; + messagesNotCommitted++; + + // Keep count of the number of messsages currently committed and pending commit. + if (committed) + { + log.debug("Adding " + messagesNotCommitted + " messages to the committed count."); + messagesCommitted += messagesNotCommitted; + messagesNotCommitted = 0; + + System.out.println("Commited: " + messagesCommitted); + } + } + catch (JMSException e) + { + log.debug("Got JMSException whilst sending."); + _publish = false; + } + + // Determine if the end condition has been met, based on the number of messages, time passed, errors on + // the connection or user input. + long now = System.nanoTime(); + + if ((duration != 0) && ((now - start) > duration)) + { + System.out.println("Send halted because duration expired."); + endCondition = true; + } + else if ((numMessages != 0) && (messagesSent >= numMessages)) + { + System.out.println("Send halted because # messages completed."); + endCondition = true; + } + else if (System.in.available() > 0) + { + System.out.println("Send halted by user input."); + endCondition = true; + + clearConsole(); + } + else if (!_publish) + { + System.out.println("Send halted by error on the connection."); + endCondition = true; + } + } + + log.debug("messagesSent = " + messagesSent); + log.debug("messagesCommitted = " + messagesCommitted); + log.debug("messagesNotCommitted = " + messagesNotCommitted); + + System.out.println("Messages sent: " + messagesSent + ", Messages Committed = " + messagesCommitted + + ", Messages not Committed = " + messagesNotCommitted); + + // Clean up the connection. + try + { + close(); + } + catch (JMSException e) + { + // Ignore as did best could manage to clean up. + } + + return messagesSent; + } + + private void receive(int messagesSent) throws Exception + { + // Re-establish the connection and the message consumer. + _queueJVMSequenceID = new AtomicInteger(); + _queueSharedID = new AtomicInteger(); + + establishConnection(false, true); + _consumer.setMessageListener(null); + _connection.start(); + + // Try to receive all of the pings that were successfully sent. + int messagesReceived = 0; + boolean endCondition = false; + + while (!endCondition) + { + // Message received = _consumer.receiveNoWait(); + Message received = _consumer.receive(TIME_OUT); + log.debug("received = " + received); + + if (received != null) + { + messagesReceived++; + } + + // Determine if the end condition has been met, based on the number of messages and time passed since last + // receiving a message. + if (received == null) + { + System.out.println("Timed out."); + endCondition = true; + } + else if (messagesReceived >= messagesSent) + { + System.out.println("Got all messages."); + endCondition = true; + } + } + + log.debug("messagesReceived = " + messagesReceived); + + System.out.println("Messages received: " + messagesReceived); + + // Clean up the connection. + close(); + } + + /** + * Clears any pending input from the console. + */ + private void clearConsole() + { + try + { + BufferedReader bis = new BufferedReader(new InputStreamReader(System.in)); + + // System.in.skip(System.in.available()); + while (bis.ready()) + { + bis.readLine(); + } + } + catch (IOException e) + { } + } + + /** + * Returns the ping destinations themselves as the reply destinations for this pinger to listen to. This has the + * effect of making this pinger listen to its own pings. + * + * @return The ping destinations. + */ + public List getReplyDestinations() + { + return _pingDestinations; + } + + /** + * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered with + * the runtime system as a shutdown hook. This shutdown hook sets an additional terminate flag, compared with the + * shutdown hook in {@link PingPongProducer}, because the publish flag is used to indicate that sending or receiving + * message should stop, not that the application should termiante. + * + * @return A shutdown hook for the ping loop. + */ + public Thread getShutdownHook() + { + return new Thread(new Runnable() + { + public void run() + { + stop(); + terminate = true; + } + }); + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index c6a69807a3..44f7083bb5 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -34,20 +34,22 @@ import javax.jms.*; import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQNoConsumersException; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.message.TestMessageFactory; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.jms.MessageProducer; import org.apache.qpid.jms.Session; -import org.apache.qpid.topic.Config; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.util.CommandLineParser; import uk.co.thebadgerset.junit.extensions.BatchedThrottle; import uk.co.thebadgerset.junit.extensions.Throttle; +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; /** * PingPongProducer is a client that sends test messages, and waits for replies to these messages. The replies may @@ -65,31 +67,37 @@ import uk.co.thebadgerset.junit.extensions.Throttle; * transactions; control the number of pings to send in each transaction; limit its sending rate; and perform failover * testing. A complete list of accepted parameters, default values and comments on their usage is provided here: * - *

- *
CRC Card
Parameter Default Comments - *
messageSize 0 Message size in bytes. Not including any headers. - *
destinationName ping The root name to use to generate destination names to ping. - *
persistent false Determines whether peristent delivery is used. - *
transacted false Determines whether messages are sent/received in transactions. + *

+ *
Parameters
Parameter Default Comments + *
messageSize 0 Message size in bytes. Not including any headers. + *
destinationName ping The root name to use to generate destination names to ping. + *
persistent false Determines whether peristent delivery is used. + *
transacted false Determines whether messages are sent/received in transactions. *
broker tcp://localhost:5672 Determines the broker to connect to. - *
virtualHost test Determines the virtual host to send all ping over. - *
rate 0 The maximum rate (in hertz) to send messages at. 0 means no limit. - *
verbose false The verbose flag for debugging. Prints to console on every message. - *
pubsub false Whether to ping topics or queues. Uses p2p by default. - *
failAfterCommit false Whether to prompt user to kill broker after a commit batch. - *
failBeforeCommit false Whether to prompt user to kill broker before a commit batch. - *
failAfterSend false Whether to prompt user to kill broker after a send. - *
failBeforeSend false Whether to prompt user to kill broker before a send. - *
failOnce true Whether to prompt for failover only once. - *
username guest The username to access the broker with. - *
password guest The password to access the broker with. - *
selector null Not used. Defines a message selector to filter pings with. - *
destinationCount 1 The number of receivers listening to the pings. - *
timeout 30000 In milliseconds. The timeout to stop waiting for replies. - *
commitBatchSize 1 The number of messages per transaction in transactional mode. - *
uniqueDests true Whether each receiver only listens to one ping destination or all. - *
ackMode NO_ACK The message acknowledgement mode. - *
pauseBatch 0 In milliseconds. A pause to insert between transaction batches. + *
virtualHost test Determines the virtual host to send all ping over. + *
rate 0 The maximum rate (in hertz) to send messages at. 0 means no limit. + *
verbose false The verbose flag for debugging. Prints to console on every message. + *
pubsub false Whether to ping topics or queues. Uses p2p by default. + *
failAfterCommit false Whether to prompt user to kill broker after a commit batch. + *
failBeforeCommit false Whether to prompt user to kill broker before a commit batch. + *
failAfterSend false Whether to prompt user to kill broker after a send. + *
failBeforeSend false Whether to prompt user to kill broker before a send. + *
failOnce true Whether to prompt for failover only once. + *
username guest The username to access the broker with. + *
password guest The password to access the broker with. + *
selector null Not used. Defines a message selector to filter pings with. + *
destinationCount 1 The number of receivers listening to the pings. + *
timeout 30000 In milliseconds. The timeout to stop waiting for replies. + *
commitBatchSize 1 The number of messages per transaction in transactional mode. + *
uniqueDests true Whether each receiver only listens to one ping destination or all. + *
ackMode AUTO_ACK The message acknowledgement mode. Possible values are: + * 0 - SESSION_TRANSACTED + * 1 - AUTO_ACKNOWLEDGE + * 2 - CLIENT_ACKNOWLEDGE + * 3 - DUPS_OK_ACKNOWLEDGE + * 257 - NO_ACKNOWLEDGE + * 258 - PRE_ACKNOWLEDGE + *
pauseBatch 0 In milliseconds. A pause to insert between transaction batches. *
* *

This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop @@ -121,7 +129,7 @@ import uk.co.thebadgerset.junit.extensions.Throttle; */ public class PingPongProducer implements Runnable, MessageListener, ExceptionListener { - private static final Logger _logger = Logger.getLogger(PingPongProducer.class); + private static final Logger log = Logger.getLogger(PingPongProducer.class); /** Holds the name of the property to get the test message size from. */ public static final String MESSAGE_SIZE_PROPNAME = "messageSize"; @@ -181,31 +189,31 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public static final String FAIL_AFTER_COMMIT_PROPNAME = "failAfterCommit"; /** Holds the default failover after commit test flag. */ - public static final String FAIL_AFTER_COMMIT_DEFAULT = "false"; + public static final boolean FAIL_AFTER_COMMIT_DEFAULT = false; /** Holds the name of the proeprty to get the fail before commit flag from. */ public static final String FAIL_BEFORE_COMMIT_PROPNAME = "failBeforeCommit"; /** Holds the default failover before commit test flag. */ - public static final String FAIL_BEFORE_COMMIT_DEFAULT = "false"; + public static final boolean FAIL_BEFORE_COMMIT_DEFAULT = false; /** Holds the name of the proeprty to get the fail after send flag from. */ public static final String FAIL_AFTER_SEND_PROPNAME = "failAfterSend"; /** Holds the default failover after send test flag. */ - public static final String FAIL_AFTER_SEND_DEFAULT = "false"; + public static final boolean FAIL_AFTER_SEND_DEFAULT = false; /** Holds the name of the property to get the fail before send flag from. */ public static final String FAIL_BEFORE_SEND_PROPNAME = "failBeforeSend"; /** Holds the default failover before send test flag. */ - public static final String FAIL_BEFORE_SEND_DEFAULT = "false"; + public static final boolean FAIL_BEFORE_SEND_DEFAULT = false; /** Holds the name of the property to get the fail once flag from. */ public static final String FAIL_ONCE_PROPNAME = "failOnce"; /** The default failover once flag, true means only do one failover, false means failover on every commit cycle. */ - public static final String FAIL_ONCE_DEFAULT = "true"; + public static final boolean FAIL_ONCE_DEFAULT = true; /** Holds the name of the property to get the broker access username from. */ public static final String USERNAME_PROPNAME = "username"; @@ -223,7 +231,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public static final String SELECTOR_PROPNAME = "selector"; /** Holds the default message selector. */ - public static final String SELECTOR_DEFAULT = null; + public static final String SELECTOR_DEFAULT = ""; /** Holds the name of the proeprty to get the destination count from. */ public static final String DESTINATION_COUNT_PROPNAME = "destinationCount"; @@ -253,7 +261,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public static final String ACK_MODE_PROPNAME = "ackMode"; /** Defines the default message acknowledgement mode. */ - public static final int ACK_MODE_DEFAULT = Session.NO_ACKNOWLEDGE; + public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; /** Holds the name of the property to get the pause between batches property from. */ public static final String PAUSE_AFTER_BATCH_PROPNAME = "pauseBatch"; @@ -273,96 +281,140 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** Holds the name of the property to store nanosecond timestamps in ping messages with. */ public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp"; - /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */ - private static AtomicLong _correlationIdGenerator = new AtomicLong(0L); - - /** - * Holds a map from message ids to latches on which threads wait for replies. This map is shared accross multiple - * ping producers on the same JVM. - */ - private static Map perCorrelationIds = - Collections.synchronizedMap(new HashMap()); + /** Holds the default configuration properties. */ + public static ParsedProperties defaults = new ParsedProperties(); - /** A convenient formatter to use when time stamping output. */ - protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); + static + { + defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT); + defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT); + defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT); + defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT); + defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT); + defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT); + defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT); + defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT); + defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT); + defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT); + defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT); + defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT); + defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT); + defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT); + defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT); + defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT); + defaults.setPropertyIfNull(FAIL_AFTER_SEND_PROPNAME, FAIL_AFTER_SEND_DEFAULT); + defaults.setPropertyIfNull(FAIL_ONCE_PROPNAME, FAIL_ONCE_DEFAULT); + defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT); + defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT); + defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT); + defaults.setPropertyIfNull(PAUSE_AFTER_BATCH_PROPNAME, PAUSE_AFTER_BATCH_DEFAULT); + defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT); + } - /** - * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when - * creating multiple ping producers in the same JVM. - */ - protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger(); - - /** Holds the destination where the response messages will arrive. */ - private Destination _replyDestination; + protected String _brokerDetails; + protected String _username; + protected String _password; + protected String _virtualpath; + protected String _destinationName; + protected String _selector; + protected boolean _transacted; /** Determines whether this producer sends persistent messages. */ protected boolean _persistent; /** Holds the acknowledgement mode used for sending and receiving messages. */ - private int _ackMode = Session.NO_ACKNOWLEDGE; + private int _ackMode; /** Determines what size of messages this producer sends. */ protected int _messageSize; /** Used to indicate that the ping loop should print out whenever it pings. */ - protected boolean _verbose = VERBOSE_DEFAULT; + protected boolean _verbose; - /** Holds the session on which ping replies are received. */ - protected Session _consumerSession; + /** Flag used to indicate if this is a point to point or pub/sub ping client. */ + protected boolean _isPubSub; - /** Used to restrict the sending rate to a specified limit. */ - private Throttle _rateLimiter = null; + /** Flag used to indicate if the destinations should be unique client. */ + protected boolean _isUnique; - /** Holds a message listener that this message listener chains all its messages to. */ - private ChainedMessageListener _chainedMessageListener = null; + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */ + protected boolean _failBeforeCommit; - /** Flag used to indicate if this is a point to point or pub/sub ping client. */ - protected boolean _isPubSub = PUBSUB_DEFAULT; + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */ + protected boolean _failAfterCommit; - /** Flag used to indicate if the destinations should be unique client. */ - protected static boolean _isUnique = UNIQUE_DESTS_DEFAULT; + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */ + protected boolean _failBeforeSend; + + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */ + protected boolean _failAfterSend; + + /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */ + protected boolean _failOnce; + + /** Holds the number of sends that should be performed in every transaction when using transactions. */ + protected int _txBatchSize; + + protected int _noOfDestinations; + protected int _rate; + + /** Holds the wait time to insert between every batch of messages committed. */ + private long _pauseBatch; + + /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */ + private static AtomicLong _correlationIdGenerator = new AtomicLong(0L); /** - * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers - * on the same JVM using this id generator will allow them to ping on the same queues. + * Holds a map from message ids to latches on which threads wait for replies. This map is shared accross multiple + * ping producers on the same JVM. */ - protected AtomicInteger _queueSharedId = new AtomicInteger(); + private static Map perCorrelationIds = + Collections.synchronizedMap(new HashMap()); - /** Used to tell the ping loop when to terminate, it only runs while this is true. */ - protected boolean _publish = true; + /** A convenient formatter to use when time stamping output. */ + protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); /** Holds the connection to the broker. */ - private Connection _connection; + protected Connection _connection; + + /** Holds the session on which ping replies are received. */ + protected Session _consumerSession; /** Holds the producer session, needed to create ping messages. */ - private Session _producerSession; + protected Session _producerSession; - /** Holds the set of destinations that this ping producer pings. */ - protected List _pingDestinations = new ArrayList(); + /** Holds the destination where the response messages will arrive. */ + protected Destination _replyDestination; - /** Holds the message producer to send the pings through. */ - protected MessageProducer _producer; + /** Holds the set of destinations that this ping producer pings. */ + protected List _pingDestinations; - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */ - protected boolean _failBeforeCommit = false; + /** Used to restrict the sending rate to a specified limit. */ + protected Throttle _rateLimiter; - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */ - protected boolean _failAfterCommit = false; + /** Holds a message listener that this message listener chains all its messages to. */ + protected ChainedMessageListener _chainedMessageListener = null; - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */ - protected boolean _failBeforeSend = false; + /** + * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when + * creating multiple ping producers in the same JVM. + */ + protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger(); - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */ - protected boolean _failAfterSend = false; + /** + * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers + * on the same JVM using this id generator will allow them to ping on the same queues. + */ + protected AtomicInteger _queueSharedID = new AtomicInteger(); - /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */ - protected boolean _failOnce = true; + /** Used to tell the ping loop when to terminate, it only runs while this is true. */ + protected boolean _publish = true; - /** Holds the number of sends that should be performed in every transaction when using transactions. */ - protected int _txBatchSize = TX_BATCH_SIZE_DEFAULT; + /** Holds the message producer to send the pings through. */ + protected MessageProducer _producer; - /** Holds the wait time to insert between every batch of messages committed. */ - private static long _pauseBatch = PAUSE_AFTER_BATCH_DEFAULT; + /** Holds the message consumer to receive the ping replies through. */ + protected MessageConsumer _consumer; /** * Holds the number of consumers that will be attached to each topic. Each pings will result in a reply from each of the @@ -370,202 +422,195 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */ static int _consumersPerTopic = 1; + /** The prompt to display when asking the user to kill the broker for failover testing. */ + private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return."; + /** - * Creates a ping producer with the specified parameters, of which there are many. See their individual comments for - * details. This constructor creates a connection to the broker and creates producer and consumer sessions on it, to send - * and recieve its pings and replies on. The other options are kept, and control how this pinger behaves. + * Creates a ping producer with the specified parameters, of which there are many. See the class level comments + * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on + * it, to send and recieve its pings and replies on. * - * @param brokerDetails The URL of the broker to send pings to. - * @param username The username to log onto the broker with. - * @param password The password to log onto the broker with. - * @param virtualpath The virtual host name to use on the broker. - * @param destinationName The name (or root where multiple destinations are used) of the desitination to send pings to. - * @param selector The selector to filter replies with. - * @param transacted Indicates whether or not pings are sent and received in transactions. - * @param persistent Indicates whether pings are sent using peristent delivery. - * @param messageSize Specifies the size of ping messages to send. - * @param verbose Indicates that information should be printed to the console on every ping. - * @param afterCommit Indicates that the user should be promted to terminate a broker after commits to test - * failover. - * @param beforeCommit Indicates that the user should be promted to terminate a broker before commits to test - * failover. - * @param afterSend Indicates that the user should be promted to terminate a broker after sends to test failover. - * @param beforeSend Indicates that the user should be promted to terminate a broker before sends to test - * failover. - * @param failOnce Indicates that the failover testing behaviour should only happen on the first commit, not - * all. - * @param txBatchSize Specifies the number of pings to send in each transaction. - * @param noOfDestinations The number of destinations to ping. Must be 1 or more. - * @param rate Specified the number of pings per second to send. Setting this to 0 means send as fast as - * possible, with no rate restriction. - * @param pubsub True to ping topics, false to ping queues. - * @param unique True to use unique destinations for each ping pong producer, false to share. + * @param overrides Properties containing any desired overrides to the defaults. * * @throws Exception Any exceptions are allowed to fall through. */ - public PingPongProducer(String brokerDetails, String username, String password, String virtualpath, - String destinationName, String selector, boolean transacted, boolean persistent, int messageSize, - boolean verbose, boolean afterCommit, boolean beforeCommit, boolean afterSend, - boolean beforeSend, boolean failOnce, int txBatchSize, int noOfDestinations, int rate, - boolean pubsub, boolean unique, int ackMode, long pause) throws Exception + public PingPongProducer(Properties overrides) throws Exception { - _logger.debug("public PingPongProducer(String brokerDetails = " + brokerDetails + ", String username = " + username - + ", String password = " + password + ", String virtualpath = " + virtualpath - + ", String destinationName = " + destinationName + ", String selector = " + selector - + ", boolean transacted = " + transacted + ", boolean persistent = " + persistent - + ", int messageSize = " + messageSize + ", boolean verbose = " + verbose + ", boolean afterCommit = " - + afterCommit + ", boolean beforeCommit = " + beforeCommit + ", boolean afterSend = " + afterSend - + ", boolean beforeSend = " + beforeSend + ", boolean failOnce = " + failOnce + ", int txBatchSize = " - + txBatchSize + ", int noOfDestinations = " + noOfDestinations + ", int rate = " + rate - + ", boolean pubsub = " + pubsub + ", boolean unique = " + unique + ", ackMode = " + ackMode - + "): called"); - - // Keep all the relevant options. - _persistent = persistent; - _messageSize = messageSize; - _verbose = verbose; - _failAfterCommit = afterCommit; - _failBeforeCommit = beforeCommit; - _failAfterSend = afterSend; - _failBeforeSend = beforeSend; - _failOnce = failOnce; - _txBatchSize = txBatchSize; - _isPubSub = pubsub; - _isUnique = unique; - _pauseBatch = pause; - - if (ackMode != 0) - { - _ackMode = ackMode; - } + log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called"); + + // Create a set of parsed properties from the defaults overriden by the passed in values. + ParsedProperties properties = new ParsedProperties(defaults); + properties.putAll(overrides); + + // Extract the configuration properties to set the pinger up with. + _brokerDetails = properties.getProperty(BROKER_PROPNAME); + _username = properties.getProperty(USERNAME_PROPNAME); + _password = properties.getProperty(PASSWORD_PROPNAME); + _virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME); + _destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME); + _selector = properties.getProperty(SELECTOR_PROPNAME); + _transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME); + _persistent = properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME); + _messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME); + _verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME); + _failAfterCommit = properties.getPropertyAsBoolean(FAIL_AFTER_COMMIT_PROPNAME); + _failBeforeCommit = properties.getPropertyAsBoolean(FAIL_BEFORE_COMMIT_PROPNAME); + _failAfterSend = properties.getPropertyAsBoolean(FAIL_AFTER_SEND_PROPNAME); + _failBeforeSend = properties.getPropertyAsBoolean(FAIL_BEFORE_SEND_PROPNAME); + _failOnce = properties.getPropertyAsBoolean(FAIL_ONCE_PROPNAME); + _txBatchSize = properties.getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME); + _noOfDestinations = properties.getPropertyAsInteger(DESTINATION_COUNT_PROPNAME); + _rate = properties.getPropertyAsInteger(RATE_PROPNAME); + _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME); + _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME); + _ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME); + _pauseBatch = properties.getPropertyAsLong(PAUSE_AFTER_BATCH_PROPNAME); // Check that one or more destinations were specified. - if (noOfDestinations < 1) + if (_noOfDestinations < 1) { throw new IllegalArgumentException("There must be at least one destination."); } - // Create a connection to the broker. + // Set up a throttle to control the send rate, if a rate > 0 is specified. + if (_rate > 0) + { + _rateLimiter = new BatchedThrottle(); + _rateLimiter.setRate(_rate); + } + + // Create the connection and message producers/consumers. + // establishConnection(true, true); + } + + /** + * Establishes a connection to the broker and creates message consumers and producers based on the parameters + * that this ping client was created with. + * + * @param producer Flag to indicate whether or not the producer should be set up. + * @param consumer Flag to indicate whether or not the consumers should be set up. + * + * @throws Exception Any exceptions are allowed to fall through. + */ + public void establishConnection(boolean producer, boolean consumer) throws Exception + { + log.debug("public void establishConnection(): called"); + + // Generate a unique identifying name for this client, based on it ip address and the current time. InetAddress address = InetAddress.getLocalHost(); String clientID = address.getHostName() + System.currentTimeMillis(); - _connection = new AMQConnection(brokerDetails, username, password, clientID, virtualpath); + // Create a connection to the broker. + createConnection(clientID); // Create transactional or non-transactional sessions, based on the command line arguments. - _producerSession = (Session) getConnection().createSession(transacted, _ackMode); - _consumerSession = (Session) getConnection().createSession(transacted, _ackMode); + _producerSession = (Session) getConnection().createSession(_transacted, _ackMode); + _consumerSession = (Session) getConnection().createSession(_transacted, _ackMode); - // Set up a throttle to control the send rate, if a rate > 0 is specified. - if (rate > 0) + // Create the destinations to send pings to and receive replies from. + _replyDestination = _consumerSession.createTemporaryQueue(); + createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique); + + // Create the message producer only if instructed to. + if (producer) { - _rateLimiter = new BatchedThrottle(); - _rateLimiter.setRate(rate); + createProducer(); } - // Create the temporary queue for replies. - _replyDestination = _consumerSession.createTemporaryQueue(); + // Create the message consumer only if instructed to. + if (consumer) + { + createReplyConsumers(getReplyDestinations(), _selector); + } + } - // Create the producer and the consumers for all reply destinations. - createProducer(); - createPingDestinations(noOfDestinations, selector, destinationName, unique); - createReplyConsumers(getReplyDestinations(), selector); + /** + * Establishes a connection to the broker, based on the configuration parameters that this ping client was + * created with. + * + * @param clientID The clients identifier. + * + * @throws AMQException Any underlying exceptions are allowed to fall through. + * @throws URLSyntaxException Any underlying exceptions are allowed to fall through. + */ + protected void createConnection(String clientID) throws AMQException, URLSyntaxException + { + _connection = new AMQConnection(_brokerDetails, _username, _password, clientID, _virtualpath); } /** - * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs to be - * started to bounce the pings back again. + * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs + * to be started to bounce the pings back again. * * @param args The command line arguments. - * - * @throws Exception When something went wrong with the test */ - public static void main(String[] args) throws Exception + public static void main(String[] args) { - // Extract the command line. - Config config = new Config(); - config.setOptions(args); - if (args.length == 0) + try { - _logger.info("Running test with default values..."); - // usage(); - // System.exit(0); - } + Properties options = processCommandLine(args); - String brokerDetails = config.getHost() + ":" + config.getPort(); - String virtualpath = VIRTUAL_HOST_DEFAULT; - String selector = (config.getSelector() == null) ? SELECTOR_DEFAULT : config.getSelector(); - boolean verbose = true; - boolean transacted = config.isTransacted(); - boolean persistent = config.usePersistentMessages(); - int messageSize = (config.getPayload() != 0) ? config.getPayload() : MESSAGE_SIZE_DEAFULT; - // int messageCount = config.getMessages(); - int destCount = (config.getDestinationsCount() != 0) ? config.getDestinationsCount() : DESTINATION_COUNT_DEFAULT; - int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : TX_BATCH_SIZE_DEFAULT; - int rate = (config.getRate() != 0) ? config.getRate() : RATE_DEFAULT; - boolean pubsub = config.isPubSub(); - - String destName = config.getDestination(); - if (destName == null) - { - destName = PING_QUEUE_NAME_DEFAULT; - } + // Create a ping producer overriding its defaults with all options passed on the command line. + PingPongProducer pingProducer = new PingPongProducer(options); + pingProducer.establishConnection(true, true); - boolean afterCommit = false; - boolean beforeCommit = false; - boolean afterSend = false; - boolean beforeSend = false; - boolean failOnce = false; + // Start the ping producers dispatch thread running. + pingProducer.getConnection().start(); - for (String arg : args) - { - if (arg.startsWith("failover:")) - { - // failover:: | failover:once - String[] parts = arg.split(":"); - if (parts.length == 3) - { - if (parts[2].equals("commit")) - { - afterCommit = parts[1].equals("after"); - beforeCommit = parts[1].equals("before"); - } + // Create a shutdown hook to terminate the ping-pong producer. + Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); - if (parts[2].equals("send")) - { - afterSend = parts[1].equals("after"); - beforeSend = parts[1].equals("before"); - } + // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. + pingProducer.getConnection().setExceptionListener(pingProducer); - if (parts[1].equals("once")) - { - failOnce = true; - } - } - else - { - System.out.println("Unrecognized failover request:" + arg); - } - } + // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception. + Thread pingThread = new Thread(pingProducer); + pingThread.run(); + pingThread.join(); } + catch (Exception e) + { + System.err.println(e.getMessage()); + log.error("Top level handler caught execption.", e); + System.exit(1); + } + } - // Create a ping producer to handle the request/wait/reply cycle. - PingPongProducer pingProducer = - new PingPongProducer(brokerDetails, USERNAME_DEFAULT, PASSWORD_DEFAULT, virtualpath, destName, selector, - transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend, - beforeSend, failOnce, batchSize, destCount, rate, pubsub, false, 0, 0); + /** + * Extracts all name=value pairs from the command line, sets them all as system properties and also returns + * a map of properties containing them. + * + * @param args The command line. + * + * @return A set of properties containing all name=value pairs from the command line. + * + * @todo This is a commonly used piece of code. Make it accept a command line definition and move it into the + * CommandLineParser class. + */ + protected static Properties processCommandLine(String[] args) + { + // Use the command line parser to evaluate the command line. + CommandLineParser commandLine = new CommandLineParser(new String[][] {}); - pingProducer.getConnection().start(); + // Capture the command line arguments or display errors and correct usage and then exit. + Properties options = null; - // Create a shutdown hook to terminate the ping-pong producer. - Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); + try + { + options = commandLine.parseCommandLine(args); - // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. - pingProducer.getConnection().setExceptionListener(pingProducer); + // Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up + // overridden values from there. + commandLine.addCommandLineToSysProperties(); + } + catch (IllegalArgumentException e) + { + System.out.println(commandLine.getErrors()); + System.out.println(commandLine.getUsage()); + System.exit(1); + } - // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception. - Thread pingThread = new Thread(pingProducer); - pingThread.run(); - pingThread.join(); + return options; } /** @@ -582,9 +627,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis Thread.sleep(sleepTime); } catch (InterruptedException ie) - { - // ignore - } + { } } } @@ -596,27 +639,30 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */ public List getReplyDestinations() { - _logger.debug("public List getReplyDestinations(): called"); + log.debug("public List getReplyDestinations(): called"); List replyDestinations = new ArrayList(); replyDestinations.add(_replyDestination); + log.debug("replyDestinations = " + replyDestinations); + return replyDestinations; } /** - * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery flag - * is set accoring the ping producer creation options. + * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery + * flag is set accoring the ping producer creation options. * * @throws JMSException Any JMSExceptions are allowed to fall through. */ public void createProducer() throws JMSException { - _logger.debug("public void createProducer(): called"); + log.debug("public void createProducer(): called"); _producer = (MessageProducer) _producerSession.createProducer(null); - // _producer.setDisableMessageTimestamp(true); _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + + log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages."); } /** @@ -632,13 +678,16 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @throws JMSException Any JMSExceptions are allowed to fall through. */ public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique) - throws JMSException + throws JMSException { - _logger.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations - + ", String selector = " + selector + ", String rootName = " + rootName + ", boolean unique = " - + unique + "): called"); + log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = " + + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + "): called"); + + _pingDestinations = new ArrayList(); // Create the desired number of ping destinations and consumers for them. + log.debug("Creating " + noOfDestinations + " destinations to ping."); + for (int i = 0; i < noOfDestinations; i++) { AMQDestination destination; @@ -648,26 +697,26 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag. if (unique) { - _logger.debug("Creating unique destinations."); + log.debug("Creating unique destinations."); id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID(); } else { - _logger.debug("Creating shared destinations."); - id = "_" + _queueSharedId.incrementAndGet(); + log.debug("Creating shared destinations."); + id = "_" + _queueSharedID.incrementAndGet(); } // Check if this is a pub/sub pinger, in which case create topics. if (_isPubSub) { destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id); - _logger.debug("Created topic " + destination); + log.debug("Created topic " + destination); } // Otherwise this is a p2p pinger, in which case create queues. else { destination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, rootName + id); - _logger.debug("Created queue " + destination); + log.debug("Created queue " + destination); } // Keep the destination. @@ -675,6 +724,29 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis } } + /** + * Creates consumers for the specified destinations and registers this pinger to listen to their messages. + * + * @param destinations The destinations to listen to. + * @param selector A selector to filter the messages with. + * + * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through. + */ + public void createReplyConsumers(Collection destinations, String selector) throws JMSException + { + log.debug("public void createReplyConsumers(Collection destinations = " + destinations + + ", String selector = " + selector + "): called"); + + for (Destination destination : destinations) + { + // Create a consumer for the destination and set this pinger to listen to its messages. + _consumer = + _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT, + selector); + _consumer.setMessageListener(this); + } + } + /** * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a correlating * reply may be waiting on. This is only done if the reply has a correlation id that is expected in the replies map. @@ -683,13 +755,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */ public void onMessage(Message message) { - _logger.debug("public void onMessage(Message message): called"); + log.debug("public void onMessage(Message message): called"); try { // Extract the messages correlation id. String correlationID = message.getJMSCorrelationID(); - _logger.debug("correlationID = " + correlationID); + log.debug("correlationID = " + correlationID); // Countdown on the traffic light if there is one for the matching correlation id. PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID); @@ -701,7 +773,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Restart the timeout timer on every message. perCorrelationId.timeOutStart = System.nanoTime(); - _logger.debug("Reply was expected, decrementing the latch for the id, " + correlationID); + log.debug("Reply was expected, decrementing the latch for the id, " + correlationID); // Decrement the countdown latch. Before this point, it is possible that two threads might enter this // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block @@ -716,20 +788,14 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis trueCount = trafficLight.getCount(); remainingCount = trueCount - 1; - _logger.debug("remainingCount = " + remainingCount); - _logger.debug("trueCount = " + trueCount); + log.debug("remainingCount = " + remainingCount); + log.debug("trueCount = " + trueCount); // Commit on transaction batch size boundaries. At this point in time the waiting producer remains // blocked, even on the last message. if ((remainingCount % _txBatchSize) == 0) { commitTx(_consumerSession); - if (!_consumerSession.getTransacted() && - _consumerSession.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) - { - // Acknowledge the messages when the session is not transacted but client_ack - ((AMQSession) _consumerSession).acknowledge(); - } } // Forward the message and remaining count to any interested chained message listener. @@ -748,7 +814,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis } else { - _logger.warn("Got unexpected message with correlationId: " + correlationID); + log.warn("Got unexpected message with correlationId: " + correlationID); } // Print out ping times for every message in verbose mode only. @@ -759,48 +825,16 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis if (timestamp != null) { long diff = System.nanoTime() - timestamp; - _logger.trace("Time for round trip (nanos): " + diff); + log.trace("Time for round trip (nanos): " + diff); } } } catch (JMSException e) { - _logger.warn("There was a JMSException: " + e.getMessage(), e); + log.warn("There was a JMSException: " + e.getMessage(), e); } - _logger.debug("public void onMessage(Message message): ending"); - } - - /** - * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out before a - * reply arrives, then a null reply is returned from this method. This method generates a new unqiue correlation id for - * the messages. - * - * @param message The message to send. - * @param numPings The number of ping messages to send. - * @param timeout The timeout in milliseconds. - * - * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait for - * all prematurely. - * - * @throws JMSException All underlying JMSExceptions are allowed to fall through. - * @throws InterruptedException When interrupted by a timeout. - */ - public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException - { - _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " - + timeout + "): called"); - - // Create a unique correlation id to put on the messages before sending them. - String messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet()); - - return pingAndWaitForReply(message, numPings, timeout, messageCorrelationId); - } - - public int pingAndWaitForReply(int numPings, long timeout, String messageCorrelationId) - throws JMSException, InterruptedException - { - return pingAndWaitForReply(null, numPings, timeout, messageCorrelationId); + log.debug("public void onMessage(Message message): ending"); } /** @@ -808,10 +842,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * reply arrives, then a null reply is returned from this method. This method allows the caller to specify the * correlation id. * - * @param message The message to send. + * @param message The message to send. If this is null, one is generated. * @param numPings The number of ping messages to send. * @param timeout The timeout in milliseconds. - * @param messageCorrelationId The message correlation id. + * @param messageCorrelationId The message correlation id. If this is null, one is generated. * * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait for * all prematurely. @@ -820,10 +854,16 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @throws InterruptedException When interrupted by a timeout */ public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId) - throws JMSException, InterruptedException + throws JMSException, InterruptedException { - _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " - + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called"); + log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " + + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called"); + + // Generate a unique correlation id to put on the messages before sending them, if one was not specified. + if (messageCorrelationId == null) + { + messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet()); + } try { @@ -858,31 +898,31 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis allMessagesReceived = numReplies == getExpectedNumPings(numPings); - _logger.debug("numReplies = " + numReplies); - _logger.debug("allMessagesReceived = " + allMessagesReceived); + log.debug("numReplies = " + numReplies); + log.debug("allMessagesReceived = " + allMessagesReceived); // Recheck the timeout condition. long now = System.nanoTime(); long lastMessageReceievedAt = perCorrelationId.timeOutStart; timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000); - _logger.debug("now = " + now); - _logger.debug("lastMessageReceievedAt = " + lastMessageReceievedAt); + log.debug("now = " + now); + log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt); } while (!timedOut && !allMessagesReceived); if ((numReplies < getExpectedNumPings(numPings)) && _verbose) { - _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId); + log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId); } else if (_verbose) { - _logger.info("Got all replies on id, " + messageCorrelationId); + log.info("Got all replies on id, " + messageCorrelationId); } commitTx(_consumerSession); - _logger.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending"); + log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending"); return numReplies; } @@ -905,8 +945,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */ public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException { - _logger.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings - + ", String messageCorrelationId = " + messageCorrelationId + "): called"); + log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings + + ", String messageCorrelationId = " + messageCorrelationId + "): called"); if (message == null) { @@ -923,40 +963,19 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Send all of the ping messages. for (int i = 0; i < numPings; i++) { - // Reset the committed flag to indicate that there are uncommitted messages. + // Reset the committed flag to indicate that there may be uncommitted messages. committed = false; // Re-timestamp the message. message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); - // Round robin the destinations as the messages are sent. - // return _destinationCount; - sendMessage(_pingDestinations.get(i % _pingDestinations.size()), message); - - // Apply message rate throttling if a rate limit has been set up. - if (_rateLimiter != null) - { - _rateLimiter.throttle(); - } - - // Call commit every time the commit batch size is reached. - if ((i % _txBatchSize) == 0) - { - commitTx(_producerSession); - committed = true; - - /* This pause is required for some cases. eg in load testing when sessions are non-transacted the - Mina IO layer can't clear the cache in time. So this pause gives enough time for mina to clear - the cache (without this mina throws OutOfMemoryError). pause() will check if time is != 0 - */ - pause(_pauseBatch); - } + // Send the message, passing in the message count. + committed = sendMessage(i, message); // Spew out per message timings on every message sonly in verbose mode. if (_verbose) { - _logger.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " - + messageCorrelationId); + log.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId); } } @@ -968,7 +987,70 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis } /** - * The ping loop implementation. This sends out pings waits for replies and inserts short pauses in between each. + * Sends the sepcified message, applies rate limiting and possibly commits the current transaction. The count of + * messages sent so far must be specified and is used to round robin the ping destinations (where there are more + * than one), and to determine if the transaction batch size has been reached and the sent messages should be + * committed. + * + * @param i The count of messages sent so far in a loop of multiple calls to this send method. + * @param message The message to send. + * + * @return true if the messages were committed, false otherwise. + * + * @throws JMSException All underlyiung JMSExceptions are allowed to fall through. + */ + protected boolean sendMessage(int i, Message message) throws JMSException + { + // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called"); + // log.debug("_txBatchSize = " + _txBatchSize); + + // Round robin the destinations as the messages are sent. + Destination destination = _pingDestinations.get(i % _pingDestinations.size()); + + // Prompt the user to kill the broker when doing failover testing. + if (_failBeforeSend) + { + if (_failOnce) + { + _failBeforeSend = false; + } + + log.trace("Failing Before Send"); + waitForUser(KILL_BROKER_PROMPT); + } + + // Send the message either to its round robin destination, or its default destination. + if (destination == null) + { + _producer.send(message); + } + else + { + _producer.send(destination, message); + } + + // Apply message rate throttling if a rate limit has been set up. + if (_rateLimiter != null) + { + _rateLimiter.throttle(); + } + + // Call commit every time the commit batch size is reached. + boolean committed = false; + + // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent. + if (((i + 1) % _txBatchSize) == 0) + { + committed = commitTx(_producerSession); + } + + return committed; + } + + /** + * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction + * batch size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared, + * which will terminate the pinger. */ public void pingLoop() { @@ -979,25 +1061,20 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); // Send the message and wait for a reply. - pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT); + pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null); } catch (JMSException e) { _publish = false; - _logger.debug("There was a JMSException: " + e.getMessage(), e); + log.debug("There was a JMSException: " + e.getMessage(), e); } catch (InterruptedException e) { _publish = false; - _logger.debug("There was an interruption: " + e.getMessage(), e); + log.debug("There was an interruption: " + e.getMessage(), e); } } - public Destination getReplyDestination() - { - return getReplyDestinations().get(0); - } - /** * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set here. * @@ -1038,8 +1115,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis } /** - * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this flag has - * been cleared. + * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this + * flag has been cleared. */ public void stop() { @@ -1066,8 +1143,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */ public void onException(JMSException e) { + log.debug("public void onException(JMSException e = " + e + "): called", e); _publish = false; - _logger.debug("There was a JMSException: " + e.getMessage(), e); } /** @@ -1079,12 +1156,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public Thread getShutdownHook() { return new Thread(new Runnable() - { - public void run() { - stop(); - } - }); + public void run() + { + stop(); + } + }); } /** @@ -1097,29 +1174,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis return _connection; } - /** - * Creates consumers for the specified destinations and registers this pinger to listen to their messages. - * - * @param destinations The destinations to listen to. - * @param selector A selector to filter the messages with. - * - * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through. - */ - public void createReplyConsumers(Collection destinations, String selector) throws JMSException - { - _logger.debug("public void createReplyConsumers(Collection destinations = " + destinations - + ", String selector = " + selector + "): called"); - - for (Destination destination : destinations) - { - // Create a consumer for the destination and set this pinger to listen to its messages. - MessageConsumer consumer = - _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT, - selector); - consumer.setMessageListener(this); - } - } - /** * Closes the pingers connection. * @@ -1127,11 +1181,24 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */ public void close() throws JMSException { - _logger.debug("public void close(): called"); + log.debug("public void close(): called"); - if (_connection != null) + try + { + if (_connection != null) + { + _connection.close(); + } + } + finally { - _connection.close(); + _connection = null; + _producerSession = null; + _consumerSession = null; + _producer = null; + _consumer = null; + _pingDestinations = null; + _replyDestination = null; } } @@ -1150,25 +1217,29 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * * @throws javax.jms.JMSException If the commit fails and then the rollback fails. * + * @return true if the session was committed, false if it was not. + * * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit * method, because commits only apply to transactional pingers, but fail after send applied to transactional and * non-transactional alike. */ - protected void commitTx(Session session) throws JMSException + protected boolean commitTx(Session session) throws JMSException { - _logger.debug("protected void commitTx(Session session): called"); + log.debug("protected void commitTx(Session session): called"); + + boolean committed = false; - _logger.trace("Batch time reached"); + log.trace("Batch time reached"); if (_failAfterSend) { - _logger.trace("Batch size reached"); + log.trace("Batch size reached"); if (_failOnce) { _failAfterSend = false; } - _logger.trace("Failing After Send"); - doFailover(); + log.trace("Failing After Send"); + waitForUser(KILL_BROKER_PROMPT); } if (session.getTransacted()) @@ -1182,13 +1253,14 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _failBeforeCommit = false; } - _logger.trace("Failing Before Commit"); - doFailover(); + log.trace("Failing Before Commit"); + waitForUser(KILL_BROKER_PROMPT); } - long l = System.currentTimeMillis(); + long l = System.nanoTime(); session.commit(); - _logger.debug("Time taken to commit :" + (System.currentTimeMillis() - l) + " ms"); + committed = true; + log.debug("Time taken to commit :" + ((System.nanoTime() - l) / 1000000f) + " ms"); if (_failAfterCommit) { @@ -1197,84 +1269,56 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _failAfterCommit = false; } - _logger.trace("Failing After Commit"); - doFailover(); + log.trace("Failing After Commit"); + waitForUser(KILL_BROKER_PROMPT); } - _logger.trace("Session Commited."); + log.trace("Session Commited."); } catch (JMSException e) { - _logger.trace("JMSException on commit:" + e.getMessage(), e); + log.trace("JMSException on commit:" + e.getMessage(), e); // Warn that the bounce back client is not available. if (e.getLinkedException() instanceof AMQNoConsumersException) { - _logger.debug("No consumers on queue."); + log.debug("No consumers on queue."); } try { session.rollback(); - _logger.trace("Message rolled back."); + log.trace("Message rolled back."); } catch (JMSException jmse) { - _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse); + log.trace("JMSE on rollback:" + jmse.getMessage(), jmse); // Both commit and rollback failed. Throw the rollback exception. throw jmse; } } } + + return committed; } /** - * Sends the message to the specified destination. If the destination is null, it gets sent to the default destination of - * the ping producer. If an explicit destination is set, this overrides the default. + * Outputs a prompt to the console and waits for the user to press return. * - * @param destination The destination to send to. - * @param message The message to send. - * - * @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through. + * @param prompt The prompt to display on the console. */ - protected void sendMessage(Destination destination, Message message) throws JMSException + protected void waitForUser(String prompt) { - if (_failBeforeSend) - { - if (_failOnce) - { - _failBeforeSend = false; - } - - _logger.trace("Failing Before Send"); - doFailover(); - } - - if (destination == null) - { - _producer.send(message); - } - else - { - _producer.send(destination, message); - } - } + System.out.println(prompt); - /** - * Prompts the user to terminate the broker, in order to test failover functionality. This method will block until the - * user supplied some input on the terminal. - */ - protected void doFailover() - { - System.out.println("Kill Broker now then press return"); try { System.in.read(); } catch (IOException e) { - // ignore + // Ignored. } System.out.println("Continuing."); diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java index a782058fd4..3b8e670f8f 100644 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -86,7 +86,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA // Sets up the test parameters with defaults. testParameters.setPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, - Integer.toString(TEST_RESULTS_BATCH_SIZE_DEFAULT)); + Integer.toString(TEST_RESULTS_BATCH_SIZE_DEFAULT)); } /** @@ -159,7 +159,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA // Send the requested number of messages, and wait until they have all been received. long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); - int numReplies = pingClient.pingAndWaitForReply(numPings, timeout, perThreadSetup._correlationId); + int numReplies = pingClient.pingAndWaitForReply(null, numPings, timeout, perThreadSetup._correlationId); // Check that all the replies were received and log a fail if they were not. if (numReplies < perCorrelationId._expectedCount) @@ -247,8 +247,8 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA String correlationId = message.getJMSCorrelationID(); _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount - + "): called on batch boundary for message id: " + correlationId + " with thread id: " - + Thread.currentThread().getId()); + + "): called on batch boundary for message id: " + correlationId + " with thread id: " + + Thread.currentThread().getId()); // Get the details for the correlation id and check that they are not null. They can become null // if a test times out. diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java index 620ddd13f7..b303e16d2c 100644 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java @@ -175,7 +175,7 @@ public class PingLatencyTestPerf extends PingTestPerf implements TimingControlle // Send the requested number of messages, and wait until they have all been received. long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); - int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout); + int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, null); // Check that all the replies were received and log a fail if they were not. if (numReplies < numPings) diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java index 6fb9d543a0..fd3bc3ff23 100644 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java @@ -66,19 +66,22 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware ThreadLocal threadSetup = new ThreadLocal(); /** Holds a property reader to extract the test parameters from. */ - protected ParsedProperties testParameters = new TestContextProperties(System.getProperties()); + protected ParsedProperties testParameters = + TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/); public PingTestPerf(String name) { super(name); + _logger.debug("testParameters = " + testParameters); + // Sets up the test parameters with defaults. - testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, PingPongProducer.TX_BATCH_SIZE_DEFAULT); + /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, PingPongProducer.TX_BATCH_SIZE_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME, PingPongProducer.MESSAGE_SIZE_DEAFULT); testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME, - PingPongProducer.PING_QUEUE_NAME_DEFAULT); + PingPongProducer.PING_QUEUE_NAME_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME, - PingPongProducer.PERSISTENT_MODE_DEFAULT); + PingPongProducer.PERSISTENT_MODE_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME, PingPongProducer.TRANSACTED_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT); @@ -90,20 +93,20 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, PingPongProducer.TX_BATCH_SIZE_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, PingPongProducer.TIMEOUT_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME, - PingPongProducer.DESTINATION_COUNT_DEFAULT); + PingPongProducer.DESTINATION_COUNT_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME, - PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT); + PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME, - PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT); + PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME, - PingPongProducer.FAIL_AFTER_SEND_DEFAULT); + PingPongProducer.FAIL_AFTER_SEND_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME, - PingPongProducer.FAIL_BEFORE_SEND_DEFAULT); + PingPongProducer.FAIL_BEFORE_SEND_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME, PingPongProducer.UNIQUE_DESTS_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME, PingPongProducer.ACK_MODE_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME, - PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT); + PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/ } /** @@ -139,20 +142,18 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware // Generate a sample message. This message is already time stamped and has its reply-to destination set. ObjectMessage msg = perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), - testParameters.getPropertyAsInteger( - PingPongProducer.MESSAGE_SIZE_PROPNAME), - testParameters.getPropertyAsBoolean( - PingPongProducer.PERSISTENT_MODE_PROPNAME)); + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); // start the test long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); - int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout); + int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout, null); // Fail the test if the timeout was exceeded. if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings)) { Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " - + numReplies); + + numReplies); } } @@ -167,43 +168,13 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware { PerThreadSetup perThreadSetup = new PerThreadSetup(); - // Extract the test set up paramaeters. - String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME); - String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME); - String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME); - String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME); - String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME); - boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME); - boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME); - String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME); - boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME); - int messageSize = testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME); - int rate = testParameters.getPropertyAsInteger(PingPongProducer.RATE_PROPNAME); - boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.PUBSUB_PROPNAME); - boolean failAfterCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME); - boolean failBeforeCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME); - boolean failAfterSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_SEND_PROPNAME); - boolean failBeforeSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME); - int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.TX_BATCH_SIZE_PROPNAME); - Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME); - boolean unique = testParameters.getPropertyAsBoolean(PingPongProducer.UNIQUE_DESTS_PROPNAME); - int ackMode = testParameters.getPropertyAsInteger(PingPongProducer.ACK_MODE_PROPNAME); - int pausetime = testParameters.getPropertyAsInteger(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME); - - // Extract the test set up paramaeters. - int destinationscount = - Integer.parseInt(testParameters.getProperty(PingPongProducer.DESTINATION_COUNT_PROPNAME)); - // This is synchronized because there is a race condition, which causes one connection to sleep if // all threads try to create connection concurrently. synchronized (this) { // Establish a client to ping a Destination and listen the reply back from same Destination - perThreadSetup._pingClient = new PingClient(brokerDetails, username, password, virtualPath, destinationName, - selector, transacted, persistent, messageSize, verbose, - failAfterCommit, failBeforeCommit, failAfterSend, failBeforeSend, - failOnce, batchSize, destinationscount, rate, pubsub, unique, - ackMode, pausetime); + perThreadSetup._pingClient = new PingClient(testParameters); + perThreadSetup._pingClient.establishConnection(true, true); } // Start the client connection perThreadSetup._pingClient.getConnection().start(); diff --git a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java index a75cbf7e19..a09324b568 100644 --- a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java @@ -69,52 +69,55 @@ public class PingPongTestPerf extends AsymptoticTestCase // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner // of the test parameters to log with the results. It also providers some basic type parsing convenience methods. // private Properties testParameters = System.getProperties(); - private ParsedProperties testParameters = new TestContextProperties(System.getProperties()); + private ParsedProperties testParameters = + TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/); public PingPongTestPerf(String name) { super(name); + _logger.debug(testParameters); + // Sets up the test parameters with defaults. - testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, - Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT)); + /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, + Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT)); testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME, - Integer.toString(PingPongProducer.MESSAGE_SIZE_DEAFULT)); + Integer.toString(PingPongProducer.MESSAGE_SIZE_DEAFULT)); testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME, - PingPongProducer.PING_QUEUE_NAME_DEFAULT); + PingPongProducer.PING_QUEUE_NAME_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME, - Boolean.toString(PingPongProducer.PERSISTENT_MODE_DEFAULT)); + Boolean.toString(PingPongProducer.PERSISTENT_MODE_DEFAULT)); testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME, - Boolean.toString(PingPongProducer.TRANSACTED_DEFAULT)); + Boolean.toString(PingPongProducer.TRANSACTED_DEFAULT)); testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.PASSWORD_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.VIRTUAL_HOST_PROPNAME, PingPongProducer.VIRTUAL_HOST_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.VERBOSE_PROPNAME, - Boolean.toString(PingPongProducer.VERBOSE_DEFAULT)); + Boolean.toString(PingPongProducer.VERBOSE_DEFAULT)); testParameters.setPropertyIfNull(PingPongProducer.RATE_PROPNAME, Integer.toString(PingPongProducer.RATE_DEFAULT)); testParameters.setPropertyIfNull(PingPongProducer.PUBSUB_PROPNAME, - Boolean.toString(PingPongProducer.PUBSUB_DEFAULT)); + Boolean.toString(PingPongProducer.PUBSUB_DEFAULT)); testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, - Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT)); + Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT)); testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, Long.toString(PingPongProducer.TIMEOUT_DEFAULT)); testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME, - Integer.toString(PingPongProducer.DESTINATION_COUNT_DEFAULT)); + Integer.toString(PingPongProducer.DESTINATION_COUNT_DEFAULT)); testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME, - PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT); + PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME, - PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT); + PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME, - PingPongProducer.FAIL_AFTER_SEND_DEFAULT); + PingPongProducer.FAIL_AFTER_SEND_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME, - PingPongProducer.FAIL_BEFORE_SEND_DEFAULT); + PingPongProducer.FAIL_BEFORE_SEND_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT); testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME, - Boolean.toString(PingPongProducer.UNIQUE_DESTS_DEFAULT)); + Boolean.toString(PingPongProducer.UNIQUE_DESTS_DEFAULT)); testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME, - Integer.toString(PingPongProducer.ACK_MODE_DEFAULT)); + Integer.toString(PingPongProducer.ACK_MODE_DEFAULT)); testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME, - PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT); + PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/ } /** @@ -147,14 +150,12 @@ public class PingPongTestPerf extends AsymptoticTestCase // Generate a sample message. This message is already time stamped and has its reply-to destination set. ObjectMessage msg = perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0), - testParameters.getPropertyAsInteger( - PingPongProducer.MESSAGE_SIZE_PROPNAME), - testParameters.getPropertyAsBoolean( - PingPongProducer.PERSISTENT_MODE_PROPNAME)); + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); // Send the message and wait for a reply. int numReplies = - perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.TIMEOUT_DEFAULT); + perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.TIMEOUT_DEFAULT, null); // Fail the test if the timeout was exceeded. if (numReplies != numPings) @@ -182,37 +183,21 @@ public class PingPongTestPerf extends AsymptoticTestCase boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME); String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME); boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME); - int messageSize = testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME); - int rate = testParameters.getPropertyAsInteger(PingPongProducer.RATE_PROPNAME); boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.PUBSUB_PROPNAME); - boolean failAfterCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME); - boolean failBeforeCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME); - boolean failAfterSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_SEND_PROPNAME); - boolean failBeforeSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME); - int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.TX_BATCH_SIZE_PROPNAME); - Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME); - boolean unique = testParameters.getPropertyAsBoolean(PingPongProducer.UNIQUE_DESTS_PROPNAME); - int ackMode = testParameters.getPropertyAsInteger(PingPongProducer.ACK_MODE_PROPNAME); - long pause = testParameters.getPropertyAsInteger(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME); synchronized (this) { // Establish a bounce back client on the ping queue to bounce back the pings. - perThreadSetup._testPingBouncer = new PingPongBouncer(brokerDetails, username, password, virtualPath, - destinationName, persistent, transacted, selector, - verbose, pubsub); + perThreadSetup._testPingBouncer = + new PingPongBouncer(brokerDetails, username, password, virtualPath, destinationName, persistent, + transacted, selector, verbose, pubsub); // Start the connections for client and producer running. perThreadSetup._testPingBouncer.getConnection().start(); - // Establish a ping-pong client on the ping queue to send the pings with. - - perThreadSetup._testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualPath, - destinationName, selector, transacted, persistent, - messageSize, verbose, failAfterCommit, - failBeforeCommit, failAfterSend, failBeforeSend, - failOnce, batchSize, 0, rate, pubsub, unique, - ackMode, pause); + // Establish a ping-pong client on the ping queue to send the pings and receive replies with. + perThreadSetup._testPingProducer = new PingPongProducer(testParameters); + perThreadSetup._testPingProducer.establishConnection(true, true); perThreadSetup._testPingProducer.getConnection().start(); } -- cgit v1.2.1