diff options
Diffstat (limited to 'qpid/java')
| -rw-r--r-- | qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java | 119 |
1 files changed, 100 insertions, 19 deletions
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java index c7e6e1f745..ae8ba468f0 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java @@ -78,10 +78,14 @@ public class BDBStoreUpgradeTestPreparer public static final String QUEUE_WITH_DLQ_NAME="myQueueWithDLQ"; public static final String NONEXCLUSIVE_WITH_ERRONEOUS_OWNER = "nonexclusive-with-erroneous-owner"; public static final String MISUSED_OWNER = "misused-owner-as-description"; + private static final String VIRTUAL_HOST_NAME = "test"; + private static final String SORTED_QUEUE_NAME = "mySortedQueue"; + private static final String SORT_KEY = "mySortKey"; + private static final String TEST_EXCHANGE_NAME = "myCustomExchange"; + private static final String TEST_QUEUE_NAME = "myCustomQueue"; private static AMQConnectionFactory _connFac; - private static final String CONN_URL = - "amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'"; + private static final String CONN_URL = "amqp://guest:guest@clientid/" + VIRTUAL_HOST_NAME + "?brokerlist='tcp://localhost:5672'"; /** * Create a BDBStoreUpgradeTestPreparer instance @@ -145,7 +149,7 @@ public class BDBStoreUpgradeTestPreparer MessageConsumer messageConsumer = session.createConsumer(queue); messageConsumer.close(); - // Create a Message producer + // Create a Message priorityQueueProducer MessageProducer messageProducer = session.createProducer(queue); // Publish 5 persistent messages, 256k chars to ensure they are multi-frame @@ -164,7 +168,18 @@ public class BDBStoreUpgradeTestPreparer // Create a priority queue on broker final Map<String,Object> priorityQueueArguments = new HashMap<String, Object>(); priorityQueueArguments.put(QueueArgumentsConverter.X_QPID_PRIORITIES,10); - createAndBindQueueOnBroker(session, PRIORITY_QUEUE_NAME, priorityQueueArguments); + Queue priorityQueue = createAndBindQueueOnBroker(session, PRIORITY_QUEUE_NAME, priorityQueueArguments); + MessageProducer priorityQueueProducer = session.createProducer(priorityQueue); + + for (int msg = 0; msg < 5; msg++) + { + priorityQueueProducer.setPriority(msg % 10); + Message message = session.createTextMessage(generateString(256*1024)); + message.setIntProperty("ID", msg); + priorityQueueProducer.send(message); + } + session.commit(); + priorityQueueProducer.close(); // Create a queue that has a DLQ final Map<String,Object> queueWithDLQArguments = new HashMap<String, Object>(); @@ -180,33 +195,99 @@ public class BDBStoreUpgradeTestPreparer // Create a queue with JMX specifying an owner, so it can later be moved into description createAndBindQueueOnBrokerWithJMX(NONEXCLUSIVE_WITH_ERRONEOUS_OWNER, MISUSED_OWNER, priorityQueueArguments); + + createExchange(TEST_EXCHANGE_NAME, "direct"); + Queue customQueue = createAndBindQueueOnBroker(session, TEST_QUEUE_NAME, null, TEST_EXCHANGE_NAME, "direct"); + MessageProducer customQueueMessageProducer = session.createProducer(customQueue); + sendMessages(session, customQueueMessageProducer, customQueue, DeliveryMode.PERSISTENT, 1*1024, 1); + session.commit(); + customQueueMessageProducer.close(); + + prepareSortedQueue(session, SORTED_QUEUE_NAME, SORT_KEY); + session.close(); connection.close(); } - private void createAndBindQueueOnBroker(Session session, String queueName, final Map<String, Object> arguments) throws Exception + private Queue createAndBindQueueOnBroker(Session session, String queueName, final Map<String, Object> arguments) throws Exception + { + return createAndBindQueueOnBroker(session, queueName, arguments, "amq.direct", "direct"); + } + + private Queue createAndBindQueueOnBroker(Session session, String queueName, final Map<String, Object> arguments, String exchangeName, String exchangeType) throws Exception { ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), false, true, false, arguments); - Queue queue = (Queue) session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='true'"); + Queue queue = session.createQueue("BURL:" + exchangeType + "://" + exchangeName + "/" + queueName + "/" + queueName + "?durable='true'"); ((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue); + return queue; } private void createAndBindQueueOnBrokerWithJMX(String queueName, String owner, final Map<String, Object> arguments) throws Exception { - Map<String, Object> environment = new HashMap<String, Object>(); - environment.put(JMXConnector.CREDENTIALS, new String[] {"admin","admin"}); - JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:8999/jmxrmi"); - JMXConnector jmxConnector = JMXConnectorFactory.connect(url, environment); - MBeanServerConnection mbsc = jmxConnector.getMBeanServerConnection(); - ObjectName virtualHost = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"test\""); - - Object[] params = new Object[] {queueName, owner, true, arguments}; - String[] signature = new String[] {String.class.getName(), String.class.getName(), boolean.class.getName(), Map.class.getName()}; - mbsc.invoke(virtualHost, "createNewQueue", params, signature); - - ObjectName directExchange = new ObjectName("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=\"test\",name=\"amq.direct\",ExchangeType=direct"); - mbsc.invoke(directExchange, "createNewBinding", new Object[] {queueName, queueName}, new String[] {String.class.getName(), String.class.getName()}); + JMXConnector jmxConnector = createJMXConnector(); + try + { + MBeanServerConnection mbsc = jmxConnector.getMBeanServerConnection(); + ObjectName virtualHost = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"" + VIRTUAL_HOST_NAME + "\""); + + Object[] params = new Object[] {queueName, owner, true, arguments}; + String[] signature = new String[] {String.class.getName(), String.class.getName(), boolean.class.getName(), Map.class.getName()}; + mbsc.invoke(virtualHost, "createNewQueue", params, signature); + + ObjectName directExchange = new ObjectName("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=\"" + VIRTUAL_HOST_NAME + "\",name=\"amq.direct\",ExchangeType=direct"); + mbsc.invoke(directExchange, "createNewBinding", new Object[] {queueName, queueName}, new String[] {String.class.getName(), String.class.getName()}); + } + finally + { + jmxConnector.close(); + } + } + + private void createExchange(String exchangeName, String exchangeType) throws Exception + { + JMXConnector jmxConnector = createJMXConnector(); + try + { + MBeanServerConnection mbsc = jmxConnector.getMBeanServerConnection(); + ObjectName virtualHost = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"" + VIRTUAL_HOST_NAME + "\""); + + Object[] params = new Object[]{exchangeName, exchangeType, true}; + String[] signature = new String[]{String.class.getName(), String.class.getName(), boolean.class.getName()}; + mbsc.invoke(virtualHost, "createNewExchange", params, signature); + } + finally + { + jmxConnector.close(); + } + } + + private JMXConnector createJMXConnector() throws Exception + { + Map<String, Object> environment = new HashMap<>(); + environment.put(JMXConnector.CREDENTIALS, new String[] {"admin", "admin"}); + JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:8999/jmxrmi"); + return JMXConnectorFactory.connect(url, environment); } + + private void prepareSortedQueue(Session session, String queueName, String sortKey) throws Exception + { + final Map<String, Object> arguments = new HashMap<String, Object>(); + arguments.put("qpid.queue_sort_key", sortKey); + Queue sortedQueue = createAndBindQueueOnBroker(session, queueName, arguments); + + MessageProducer messageProducer2 = session.createProducer(sortedQueue); + + String[] sortKeys = {"c", "b", "e", "a", "d"}; + for (int i = 1; i <= sortKeys.length; i++) + { + Message message = session.createTextMessage(generateString(256*1024)); + message.setIntProperty("ID", i); + message.setStringProperty(sortKey, sortKeys[i - 1]); + messageProducer2.send(message); + } + session.commit(); + } + /** * Prepare a DurableSubscription backing queue for use in testing selector * recovery and queue exclusivity marking during the upgrade process. |
