From 76cab2fbbc9c47d254414d588b5bb98552f5ab8d Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Mon, 13 Feb 2012 17:26:37 +0000 Subject: QPID-3835: add the empty 'no selector' argument to bindings for DurableSubscription queues that dont yet have the selector argument at all git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1243616 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/store/berkeleydb/BDBMessageStore.java | 21 +++---- .../server/store/berkeleydb/BDBStoreUpgrade.java | 70 +++++++++++++++++++--- .../qpid/server/store/berkeleydb/BindingKey.java | 62 ------------------- .../store/berkeleydb/records/BindingRecord.java | 62 +++++++++++++++++++ .../tuples/BindingTupleBindingFactory.java | 6 +- .../store/berkeleydb/tuples/BindingTuple_4.java | 10 ++-- 6 files changed, 143 insertions(+), 88 deletions(-) delete mode 100644 qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java create mode 100644 qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/BindingRecord.java (limited to 'qpid/java') diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 92dd592143..29f2a2f2fb 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -68,6 +68,7 @@ import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5; +import org.apache.qpid.server.store.berkeleydb.records.BindingRecord; import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord; import org.apache.qpid.server.store.berkeleydb.records.QueueRecord; import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory; @@ -691,13 +692,13 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore cursor = _queueBindingsDb.openCursor(null, null); DatabaseEntry key = new DatabaseEntry(); DatabaseEntry value = new DatabaseEntry(); - TupleBinding binding = _bindingTupleBindingFactory.getInstance(); + TupleBinding binding = _bindingTupleBindingFactory.getInstance(); while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS) { //yes, this is retrieving all the useful information from the key only. //For table compatibility it shall currently be left as is - BindingKey bindingRecord = (BindingKey) binding.entryToObject(key); + BindingRecord bindingRecord = binding.entryToObject(key); String exchangeName = bindingRecord.getExchangeName() == null ? null : bindingRecord.getExchangeName().asString(); @@ -1105,18 +1106,18 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } - - /** * @see DurableConfigurationStore#bindQueue(Exchange, AMQShortString, AMQQueue, FieldTable) */ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException + { + bindQueue(new BindingRecord(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args)); + } + + protected void bindQueue(final BindingRecord bindingRecord) throws AMQStoreException { if (_state != State.RECOVERING) { - BindingKey bindingRecord = new BindingKey(exchange.getNameShortString(), - queue.getNameShortString(), routingKey, args); - DatabaseEntry key = new DatabaseEntry(); EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance(); @@ -1134,8 +1135,8 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } catch (DatabaseException e) { - throw new AMQStoreException("Error writing binding for AMQQueue with name " + queue.getName() + " to exchange " - + exchange.getName() + " to database: " + e.getMessage(), e); + throw new AMQStoreException("Error writing binding for AMQQueue with name " + bindingRecord.getQueueName() + " to exchange " + + bindingRecord.getExchangeName() + " to database: " + e.getMessage(), e); } } } @@ -1148,7 +1149,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore { DatabaseEntry key = new DatabaseEntry(); EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance(); - keyBinding.objectToEntry(new BindingKey(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args), key); + keyBinding.objectToEntry(new BindingRecord(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args), key); try { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java index f064079606..817ba2a5f5 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java @@ -37,14 +37,17 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.logging.NullRootMessageLogger; import org.apache.qpid.server.logging.actors.BrokerActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4; import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5; +import org.apache.qpid.server.store.berkeleydb.records.BindingRecord; import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord; import org.apache.qpid.server.store.berkeleydb.records.QueueRecord; import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_4; @@ -417,11 +420,6 @@ public class BDBStoreUpgrade TopicExchangeDiscoverer exchangeListVisitor = new TopicExchangeDiscoverer(); _oldMessageStore.visitExchanges(exchangeListVisitor); - - //Migrate _queueBindingsDb - _logger.info("Queue Bindings"); - moveContents(_oldMessageStore.getBindingsDb(), _newMessageStore.getBindingsDb(), "Queue Binding"); - //Inspect the bindings to gather a list of queues which are probably durable subscriptions, i.e. those //which have a colon in their name and are bound to the Topic exchanges above DurableSubDiscoverer durSubQueueListVisitor = @@ -431,6 +429,14 @@ public class BDBStoreUpgrade final List durableSubQueues = durSubQueueListVisitor.getDurableSubQueues(); + + //Migrate _queueBindingsDb + _logger.info("Queue Bindings"); + BindingsVisitor bindingsVisitor = new BindingsVisitor(durableSubQueues, + _oldMessageStore.getBindingTupleBindingFactory().getInstance(), _newMessageStore); + _oldMessageStore.visitBindings(bindingsVisitor); + logCount(bindingsVisitor.getVisitedCount(), "Queue Binding"); + //Migrate _queueDb _logger.info("Queues"); @@ -1133,11 +1139,11 @@ public class BDBStoreUpgrade private class DurableSubDiscoverer extends DatabaseVisitor { private final List _durableSubQueues; - private final TupleBinding _bindingTB; + private final TupleBinding _bindingTB; private final List _topicExchanges; - public DurableSubDiscoverer(List topicExchanges, TupleBinding bindingTB) + public DurableSubDiscoverer(List topicExchanges, TupleBinding bindingTB) { _durableSubQueues = new ArrayList(); _bindingTB = bindingTB; @@ -1146,7 +1152,7 @@ public class BDBStoreUpgrade public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException { - BindingKey bindingRec = _bindingTB.entryToObject(key); + BindingRecord bindingRec = _bindingTB.entryToObject(key); AMQShortString queueName = bindingRec.getQueueName(); AMQShortString exchangeName = bindingRec.getExchangeName(); @@ -1204,6 +1210,54 @@ public class BDBStoreUpgrade } } + private static class BindingsVisitor extends DatabaseVisitor + { + private final List _durableSubQueues; + private final BDBMessageStore _newMessageStore; + private final TupleBinding _oldBindingTB; + private AMQShortString _selectorFilterKey; + + public BindingsVisitor(List durableSubQueues, + TupleBinding oldBindingTB, + BDBMessageStore newMessageStore) + { + _oldBindingTB = oldBindingTB; + _durableSubQueues = durableSubQueues; + _newMessageStore = newMessageStore; + _selectorFilterKey = AMQPFilterTypes.JMS_SELECTOR.getValue(); + } + + public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQStoreException + { + //All the information required in binding entries is actually in the *key* not value. + BindingRecord oldBindingRec = _oldBindingTB.entryToObject(key); + + AMQShortString queueName = oldBindingRec.getQueueName(); + AMQShortString exchangeName = oldBindingRec.getExchangeName(); + AMQShortString routingKey = oldBindingRec.getRoutingKey(); + FieldTable arguments = oldBindingRec.getArguments(); + + //if the queue name is in the gathered list then inspect its binding arguments + if (_durableSubQueues.contains(queueName)) + { + if(arguments == null) + { + arguments = new FieldTable(); + } + + if(!arguments.containsKey(_selectorFilterKey)) + { + //add the empty string (i.e. 'no selector') value for the selector argument + arguments.put(_selectorFilterKey, ""); + } + } + + //create the binding in the new store + _newMessageStore.bindQueue( + new BindingRecord(exchangeName, queueName, routingKey, arguments)); + } + } + private static class MetaDataVisitor extends DatabaseVisitor { private final TupleBinding _oldMetaDataTupleBinding; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java deleted file mode 100644 index 396f0ed817..0000000000 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BindingKey.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store.berkeleydb; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; - -public class BindingKey extends Object -{ - private final AMQShortString _exchangeName; - private final AMQShortString _queueName; - private final AMQShortString _routingKey; - private final FieldTable _arguments; - - public BindingKey(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey, FieldTable arguments) - { - _exchangeName = exchangeName; - _queueName = queueName; - _routingKey = routingKey; - _arguments = arguments; - } - - - public AMQShortString getExchangeName() - { - return _exchangeName; - } - - public AMQShortString getQueueName() - { - return _queueName; - } - - public AMQShortString getRoutingKey() - { - return _routingKey; - } - - public FieldTable getArguments() - { - return _arguments; - } - -} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/BindingRecord.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/BindingRecord.java new file mode 100644 index 0000000000..394a6ea85c --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/BindingRecord.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.records; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; + +public class BindingRecord extends Object +{ + private final AMQShortString _exchangeName; + private final AMQShortString _queueName; + private final AMQShortString _routingKey; + private final FieldTable _arguments; + + public BindingRecord(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey, FieldTable arguments) + { + _exchangeName = exchangeName; + _queueName = queueName; + _routingKey = routingKey; + _arguments = arguments; + } + + + public AMQShortString getExchangeName() + { + return _exchangeName; + } + + public AMQShortString getQueueName() + { + return _queueName; + } + + public AMQShortString getRoutingKey() + { + return _routingKey; + } + + public FieldTable getArguments() + { + return _arguments; + } + +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java index 09d43e6a08..468096ccc5 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java @@ -22,16 +22,16 @@ package org.apache.qpid.server.store.berkeleydb.tuples; import com.sleepycat.bind.tuple.TupleBinding; -import org.apache.qpid.server.store.berkeleydb.BindingKey; +import org.apache.qpid.server.store.berkeleydb.records.BindingRecord; -public class BindingTupleBindingFactory extends TupleBindingFactory +public class BindingTupleBindingFactory extends TupleBindingFactory { public BindingTupleBindingFactory(int version) { super(version); } - public TupleBinding getInstance() + public TupleBinding getInstance() { switch (getVersion()) { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java index e00ea5aac2..c6a5e63bc8 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java @@ -29,10 +29,10 @@ import org.apache.log4j.Logger; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; -import org.apache.qpid.server.store.berkeleydb.BindingKey; import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding; +import org.apache.qpid.server.store.berkeleydb.records.BindingRecord; -public class BindingTuple_4 extends TupleBinding implements BindingTuple +public class BindingTuple_4 extends TupleBinding implements BindingTuple { protected static final Logger _log = Logger.getLogger(BindingTuple.class); @@ -41,7 +41,7 @@ public class BindingTuple_4 extends TupleBinding implements BindingT super(); } - public BindingKey entryToObject(TupleInput tupleInput) + public BindingRecord entryToObject(TupleInput tupleInput) { AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput); AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput); @@ -60,10 +60,10 @@ public class BindingTuple_4 extends TupleBinding implements BindingT return null; } - return new BindingKey(exchangeName, queueName, routingKey, arguments); + return new BindingRecord(exchangeName, queueName, routingKey, arguments); } - public void objectToEntry(BindingKey binding, TupleOutput tupleOutput) + public void objectToEntry(BindingRecord binding, TupleOutput tupleOutput) { AMQShortStringEncoding.writeShortString(binding.getExchangeName(), tupleOutput); AMQShortStringEncoding.writeShortString(binding.getQueueName(), tupleOutput); -- cgit v1.2.1