diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-01-29 13:05:07 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-01-29 13:05:07 +0000 |
| commit | 8c1167ac57b774b43c881f72659db19b648b4441 (patch) | |
| tree | becc40775c44d27743307904f4d468d572f2772d /qpid/java/bdbstore/src | |
| parent | ebb3caff09b7856990df34ef58ac1ddf08781170 (diff) | |
| download | qpid-python-8c1167ac57b774b43c881f72659db19b648b4441.tar.gz | |
QPID-3789 : [Java] code tidyups
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1237273 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
12 files changed, 331 insertions, 224 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java index 8431382d59..354dba559c 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java @@ -27,6 +27,10 @@ import org.apache.qpid.framing.AMQShortString; public class AMQShortStringEncoding { + private AMQShortStringEncoding() + { + } + public static AMQShortString readShortString(TupleInput tupleInput) { int length = (int) tupleInput.readShort(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 534d4b0a44..92dd592143 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -151,7 +151,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore * messageId (long) - bodySize (integer), metaData (MessageMetaData encoded as binary) * * Message (Content): - * messageId (long), byteOffset (integer) - dataLength(integer), data(binary); + * messageId (long), byteOffset (integer) - dataLength(integer), data(binary) */ private LogSubject _logSubject; @@ -429,9 +429,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore envConfig.setTransactional(true); envConfig.setConfigParam("je.lock.nLockTables", "7"); - // Restore 500,000 default timeout. - //envConfig.setLockTimeout(15000); - // Added to help diagnosis of Deadlock issue // http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23 if (Boolean.getBoolean("qpid.bdb.lock.debug")) @@ -903,7 +900,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore public void removeMessage(long messageId, boolean sync) throws AMQStoreException { - // _log.debug("public void removeMessage(Long messageId = " + messageId): called"); boolean complete = false; com.sleepycat.je.Transaction tx = null; @@ -1116,9 +1112,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore */ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { - // _log.debug("public void bindQueue(Exchange exchange = " + exchange + ", AMQShortString routingKey = " + routingKey - // + ", AMQQueue queue = " + queue + ", FieldTable args = " + args + "): called"); - if (_state != State.RECOVERING) { BindingKey bindingRecord = new BindingKey(exchange.getNameShortString(), @@ -1408,8 +1401,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue, long messageId) throws AMQStoreException { - // _log.debug("public void enqueueMessage(Transaction tx = " + tx + ", AMQShortString name = " + name + ", Long messageId): called"); - AMQShortString name = AMQShortString.valueOf(queue.getResourceName()); DatabaseEntry key = new DatabaseEntry(); @@ -1498,11 +1489,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore */ private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws AMQStoreException { - //if (_log.isDebugEnabled()) - //{ - // _log.debug("public void commitTranImpl() called with (Transaction=" + tx + ", syncCommit= "+ syncCommit + ")"); - //} - if (tx == null) { throw new AMQStoreException("Fatal internal error: transactional is null at commitTran"); @@ -1969,8 +1955,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException { - // _log.debug("void commit(Transaction tx = " + tx + ", sync = " + syncCommit + "): called"); - tx.commitNoSync(); BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); @@ -1986,8 +1970,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private static final class BDBCommitFuture implements StoreFuture { - // private static final Logger _log = Logger.getLogger(BDBCommitFuture.class); - private final CommitThread _commitThread; private final com.sleepycat.je.Transaction _tx; private DatabaseException _databaseException; @@ -1996,9 +1978,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore public BDBCommitFuture(CommitThread commitThread, com.sleepycat.je.Transaction tx, boolean syncCommit) { - // _log.debug("public Commit(CommitThread commitThread = " + commitThread + ", Transaction tx = " + tx - // + "): called"); - _commitThread = commitThread; _tx = tx; _syncCommit = syncCommit; @@ -2017,9 +1996,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore public synchronized void abort(DatabaseException databaseException) { - // _log.debug("public synchronized void abort(DatabaseException databaseException = " + databaseException - // + "): called"); - _complete = true; _databaseException = databaseException; @@ -2028,8 +2004,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore public void commit() throws DatabaseException { - //_log.debug("public void commit(): called"); - _commitThread.addJob(this, _syncCommit); if(!_syncCommit) @@ -2039,7 +2013,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } waitForCompletion(); - // _log.debug("Commit completed, _databaseException = " + _databaseException); if (_databaseException != null) { @@ -2081,8 +2054,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore */ private class CommitThread extends Thread { - // private final Logger _log = Logger.getLogger(CommitThread.class); - private final AtomicBoolean _stopped = new AtomicBoolean(false); private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>(); private final CheckpointConfig _config = new CheckpointConfig(); @@ -2119,7 +2090,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore } catch (InterruptedException e) { - // _log.info(getName() + " interrupted. "); } } } @@ -2129,8 +2099,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore private void processJobs() { - // _log.debug("private void processJobs(): called"); - int size = _jobQueue.size(); try diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java index 7e402e3320..f064079606 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java @@ -65,6 +65,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map.Entry; +import java.util.Set; /** * This is a simple BerkeleyDB Store upgrade tool that will upgrade a V4 Store to a V5 Store. @@ -407,87 +408,39 @@ public class BDBStoreUpgrade { _logger.info("Starting store upgrade from version 4"); - //Migrate _exchangeDb; + //Migrate _exchangeDb _logger.info("Exchanges"); moveContents(_oldMessageStore.getExchangesDb(), _newMessageStore.getExchangesDb(), "Exchange"); - final List<AMQShortString> topicExchanges = new ArrayList<AMQShortString>(); - final TupleBinding exchangeTB = new ExchangeTB(); - - DatabaseVisitor exchangeListVisitor = new DatabaseVisitor() - { - public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException - { - ExchangeRecord exchangeRec = (ExchangeRecord) exchangeTB.entryToObject(value); - AMQShortString type = exchangeRec.getType(); - if (ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(type)) - { - topicExchanges.add(exchangeRec.getNameShortString()); - } - } - }; + TopicExchangeDiscoverer exchangeListVisitor = new TopicExchangeDiscoverer(); _oldMessageStore.visitExchanges(exchangeListVisitor); - //Migrate _queueBindingsDb; + //Migrate _queueBindingsDb _logger.info("Queue Bindings"); moveContents(_oldMessageStore.getBindingsDb(), _newMessageStore.getBindingsDb(), "Queue Binding"); //Inspect the bindings to gather a list of queues which are probably durable subscriptions, i.e. those //which have a colon in their name and are bound to the Topic exchanges above - final List<AMQShortString> durableSubQueues = new ArrayList<AMQShortString>(); - final TupleBinding<BindingKey> bindingTB = _oldMessageStore.getBindingTupleBindingFactory().getInstance(); - - DatabaseVisitor durSubQueueListVisitor = new DatabaseVisitor() - { - public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException - { - BindingKey bindingRec = (BindingKey) bindingTB.entryToObject(key); - AMQShortString queueName = bindingRec.getQueueName(); - AMQShortString exchangeName = bindingRec.getExchangeName(); - - if (topicExchanges.contains(exchangeName) && queueName.asString().contains(":")) - { - durableSubQueues.add(queueName); - } - } - }; + DurableSubDiscoverer durSubQueueListVisitor = + new DurableSubDiscoverer(exchangeListVisitor.getTopicExchanges(), + _oldMessageStore.getBindingTupleBindingFactory().getInstance()); _oldMessageStore.visitBindings(durSubQueueListVisitor); + final List<AMQShortString> durableSubQueues = durSubQueueListVisitor.getDurableSubQueues(); - //Migrate _queueDb; + //Migrate _queueDb _logger.info("Queues"); // hold the list of existing queue names - final List<AMQShortString> existingQueues = new ArrayList<AMQShortString>(); final TupleBinding<QueueRecord> queueTupleBinding = _oldMessageStore.getQueueTupleBindingFactory().getInstance(); - DatabaseVisitor queueVisitor = new DatabaseVisitor() - { - public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQStoreException - { - QueueRecord queueRec = (QueueRecord) queueTupleBinding.entryToObject(value); - AMQShortString queueName = queueRec.getNameShortString(); - - //if the queue name is in the gathered list then set its exclusivity true - if (durableSubQueues.contains(queueName)) - { - _logger.info("Marking as possible DurableSubscription backing queue: " + queueName); - queueRec.setExclusive(true); - } - - //The simple call to createQueue with the QueueRecord object is sufficient for a v2->v3 upgrade as - //the extra 'exclusive' property in v3 will be defaulted to false in the record creation. - _newMessageStore.createQueue(queueRec); - - _count++; - existingQueues.add(queueName); - } - }; + QueueVisitor queueVisitor = new QueueVisitor(queueTupleBinding, durableSubQueues, _newMessageStore); _oldMessageStore.visitQueues(queueVisitor); + final List<AMQShortString> existingQueues = queueVisitor.getExistingQueues(); logCount(queueVisitor.getVisitedCount(), "Queue"); @@ -495,42 +448,15 @@ public class BDBStoreUpgrade // Look for persistent messages stored for non-durable queues _logger.info("Checking for messages previously sent to non-durable queues"); - // track all message delivery to existing queues - final HashSet<Long> queueMessages = new HashSet<Long>(); - - // hold all non existing queues and their messages IDs - final HashMap<String, HashSet<Long>> phantomMessageQueues = new HashMap<String, HashSet<Long>>(); - // delivery DB visitor to check message delivery and identify non existing queues final QueueEntryTB queueEntryTB = new QueueEntryTB(); - DatabaseVisitor messageDeliveryCheckVisitor = new DatabaseVisitor() - { - public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException - { - QueueEntryKey entryKey = (QueueEntryKey) queueEntryTB.entryToObject(key); - Long messageId = entryKey.getMessageId(); - AMQShortString queueName = entryKey.getQueueName(); - if (!existingQueues.contains(queueName)) - { - String name = queueName.asString(); - HashSet<Long> messages = phantomMessageQueues.get(name); - if (messages == null) - { - messages = new HashSet<Long>(); - phantomMessageQueues.put(name, messages); - } - messages.add(messageId); - _count++; - } - else - { - queueMessages.add(messageId); - } - } - }; + MessageDeliveryCheckVisitor messageDeliveryCheckVisitor = + new MessageDeliveryCheckVisitor(queueEntryTB, queueVisitor.getExistingQueues()); _oldMessageStore.visitDelivery(messageDeliveryCheckVisitor); - if (phantomMessageQueues.isEmpty()) + final Set<Long> queueMessages = messageDeliveryCheckVisitor.getQueueMessages(); + + if (messageDeliveryCheckVisitor.getPhantomMessageQueues().isEmpty()) { _logger.info("No such messages were found"); } @@ -538,7 +464,7 @@ public class BDBStoreUpgrade { _logger.info("Found " + messageDeliveryCheckVisitor.getVisitedCount()+ " such messages in total"); - for (Entry<String, HashSet<Long>> phantomQueue : phantomMessageQueues.entrySet()) + for (Entry<String, HashSet<Long>> phantomQueue : messageDeliveryCheckVisitor.getPhantomMessageQueues().entrySet()) { String queueName = phantomQueue.getKey(); HashSet<Long> messages = phantomQueue.getValue(); @@ -572,40 +498,20 @@ public class BDBStoreUpgrade } - //Migrate _messageMetaDataDb; + //Migrate _messageMetaDataDb _logger.info("Message MetaData"); final Database newMetaDataDB = _newMessageStore.getMetaDataDb(); - final TupleBinding<Object> oldMetaDataTupleBinding = _oldMessageStore.getMetaDataTupleBindingFactory().getInstance(); - final TupleBinding<Object> newMetaDataTupleBinding = _newMessageStore.getMetaDataTupleBindingFactory().getInstance(); - - DatabaseVisitor metaDataVisitor = new DatabaseVisitor() - { - public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException - { - _count++; - MessageMetaData metaData = (MessageMetaData) oldMetaDataTupleBinding.entryToObject(value); - - // get message id - Long messageId = TupleBinding.getPrimitiveBinding(Long.class).entryToObject(key); - // ONLY copy data if message is delivered to existing queue - if (!queueMessages.contains(messageId)) - { - return; - } - DatabaseEntry newValue = new DatabaseEntry(); - newMetaDataTupleBinding.objectToEntry(metaData, newValue); - - newMetaDataDB.put(null, key, newValue); - } - }; + MetaDataVisitor metaDataVisitor = new MetaDataVisitor(queueMessages, newMetaDataDB, + _oldMessageStore.getMetaDataTupleBindingFactory().getInstance(), + _newMessageStore.getMetaDataTupleBindingFactory().getInstance()); _oldMessageStore.visitMetaDataDb(metaDataVisitor); logCount(metaDataVisitor.getVisitedCount(), "Message MetaData"); - //Migrate _messageContentDb; + //Migrate _messageContentDb _logger.info("Message Contents"); final Database newContentDB = _newMessageStore.getContentDb(); @@ -613,74 +519,17 @@ public class BDBStoreUpgrade final TupleBinding<MessageContentKey> newContentKeyTupleBinding = new MessageContentKeyTB_5(); final TupleBinding contentTB = new ContentTB(); - DatabaseVisitor contentVisitor = new DatabaseVisitor() - { - private long _prevMsgId = -1; //Initialise to invalid value - private int _bytesSeenSoFar = 0; - - public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException - { - _count++; - - //determine the msgId of the current entry - MessageContentKey_4 contentKey = (MessageContentKey_4) oldContentKeyTupleBinding.entryToObject(key); - long msgId = contentKey.getMessageId(); - - // ONLY copy data if message is delivered to existing queue - if (!queueMessages.contains(msgId)) - { - return; - } - //if this is a new message, restart the byte offset count. - if(_prevMsgId != msgId) - { - _bytesSeenSoFar = 0; - } - - //determine the content size - ByteBuffer content = (ByteBuffer) contentTB.entryToObject(value); - int contentSize = content.limit(); - - //create the new key: id + previously seen data count - MessageContentKey_5 newKey = new MessageContentKey_5(msgId, _bytesSeenSoFar); - DatabaseEntry newKeyEntry = new DatabaseEntry(); - newContentKeyTupleBinding.objectToEntry(newKey, newKeyEntry); - - DatabaseEntry newValueEntry = new DatabaseEntry(); - contentTB.objectToEntry(content, newValueEntry); - - newContentDB.put(null, newKeyEntry, newValueEntry); - - _prevMsgId = msgId; - _bytesSeenSoFar += contentSize; - } - }; + DatabaseVisitor contentVisitor = new ContentVisitor(oldContentKeyTupleBinding, queueMessages, + contentTB, newContentKeyTupleBinding, newContentDB); _oldMessageStore.visitContentDb(contentVisitor); logCount(contentVisitor.getVisitedCount(), "Message Content"); - //Migrate _deliveryDb; + //Migrate _deliveryDb _logger.info("Delivery Records"); final Database deliveryDb =_newMessageStore.getDeliveryDb(); - DatabaseVisitor deliveryDbVisitor = new DatabaseVisitor() - { - - public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException - { - _count++; - - // get message id from entry key - QueueEntryKey entryKey = (QueueEntryKey) queueEntryTB.entryToObject(key); - AMQShortString queueName = entryKey.getQueueName(); - - // ONLY copy data if message queue exists - if (existingQueues.contains(queueName)) - { - deliveryDb.put(null, key, value); - } - } - }; + DatabaseVisitor deliveryDbVisitor = new DeliveryDbVisitor(queueEntryTB, existingQueues, deliveryDb); _oldMessageStore.visitDelivery(deliveryDbVisitor); logCount(contentVisitor.getVisitedCount(), "Delivery Record"); } @@ -711,7 +560,7 @@ public class BDBStoreUpgrade { public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException { - _count++; + incrementCount(); newDatabase.put(null, key, value); } }; @@ -1117,4 +966,280 @@ public class BDBStoreUpgrade System.exit(0); } + private static class TopicExchangeDiscoverer extends DatabaseVisitor + { + private final List<AMQShortString> topicExchanges = new ArrayList<AMQShortString>(); + private final TupleBinding exchangeTB = new ExchangeTB(); + + public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException + { + ExchangeRecord exchangeRec = (ExchangeRecord) exchangeTB.entryToObject(value); + AMQShortString type = exchangeRec.getType(); + + if (ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(type)) + { + topicExchanges.add(exchangeRec.getNameShortString()); + } + } + + public List<AMQShortString> getTopicExchanges() + { + return topicExchanges; + } + } + + private static class MessageDeliveryCheckVisitor extends DatabaseVisitor + { + private final QueueEntryTB _queueEntryTB; + private final List<AMQShortString> _existingQueues; + + // track all message delivery to existing queues + private final HashSet<Long> _queueMessages = new HashSet<Long>(); + + // hold all non existing queues and their messages IDs + private final HashMap<String, HashSet<Long>> _phantomMessageQueues = new HashMap<String, HashSet<Long>>(); + + + + public MessageDeliveryCheckVisitor(QueueEntryTB queueEntryTB, List<AMQShortString> existingQueues) + { + _queueEntryTB = queueEntryTB; + _existingQueues = existingQueues; + } + + public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException + { + QueueEntryKey entryKey = (QueueEntryKey) _queueEntryTB.entryToObject(key); + Long messageId = entryKey.getMessageId(); + AMQShortString queueName = entryKey.getQueueName(); + if (!_existingQueues.contains(queueName)) + { + String name = queueName.asString(); + HashSet<Long> messages = _phantomMessageQueues.get(name); + if (messages == null) + { + messages = new HashSet<Long>(); + _phantomMessageQueues.put(name, messages); + } + messages.add(messageId); + incrementCount(); + } + else + { + _queueMessages.add(messageId); + } + } + + public HashSet<Long> getQueueMessages() + { + return _queueMessages; + } + + public HashMap<String, HashSet<Long>> getPhantomMessageQueues() + { + return _phantomMessageQueues; + } + } + + private static class ContentVisitor extends DatabaseVisitor + { + private long _prevMsgId; //Initialise to invalid value + private int _bytesSeenSoFar; + private final TupleBinding<MessageContentKey> _oldContentKeyTupleBinding; + private final Set<Long> _queueMessages; + private final TupleBinding _contentTB; + private final TupleBinding<MessageContentKey> _newContentKeyTupleBinding; + private final Database _newContentDB; + + public ContentVisitor(TupleBinding<MessageContentKey> oldContentKeyTupleBinding, Set<Long> queueMessages, TupleBinding contentTB, TupleBinding<MessageContentKey> newContentKeyTupleBinding, Database newContentDB) + { + _oldContentKeyTupleBinding = oldContentKeyTupleBinding; + _queueMessages = queueMessages; + _contentTB = contentTB; + _newContentKeyTupleBinding = newContentKeyTupleBinding; + _newContentDB = newContentDB; + _prevMsgId = -1; + _bytesSeenSoFar = 0; + } + + public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException + { + incrementCount(); + + //determine the msgId of the current entry + MessageContentKey_4 contentKey = (MessageContentKey_4) _oldContentKeyTupleBinding.entryToObject(key); + long msgId = contentKey.getMessageId(); + + // ONLY copy data if message is delivered to existing queue + if (!_queueMessages.contains(msgId)) + { + return; + } + //if this is a new message, restart the byte offset count. + if(_prevMsgId != msgId) + { + _bytesSeenSoFar = 0; + } + + //determine the content size + ByteBuffer content = (ByteBuffer) _contentTB.entryToObject(value); + int contentSize = content.limit(); + + //create the new key: id + previously seen data count + MessageContentKey_5 newKey = new MessageContentKey_5(msgId, _bytesSeenSoFar); + DatabaseEntry newKeyEntry = new DatabaseEntry(); + _newContentKeyTupleBinding.objectToEntry(newKey, newKeyEntry); + + DatabaseEntry newValueEntry = new DatabaseEntry(); + _contentTB.objectToEntry(content, newValueEntry); + + _newContentDB.put(null, newKeyEntry, newValueEntry); + + _prevMsgId = msgId; + _bytesSeenSoFar += contentSize; + } + } + + private static class DeliveryDbVisitor extends DatabaseVisitor + { + + private final QueueEntryTB _queueEntryTB; + private final List<AMQShortString> _existingQueues; + private final Database _deliveryDb; + + public DeliveryDbVisitor(QueueEntryTB queueEntryTB, List<AMQShortString> existingQueues, Database deliveryDb) + { + _queueEntryTB = queueEntryTB; + _existingQueues = existingQueues; + _deliveryDb = deliveryDb; + } + + public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException + { + incrementCount(); + + // get message id from entry key + QueueEntryKey entryKey = (QueueEntryKey) _queueEntryTB.entryToObject(key); + AMQShortString queueName = entryKey.getQueueName(); + + // ONLY copy data if message queue exists + if (_existingQueues.contains(queueName)) + { + _deliveryDb.put(null, key, value); + } + } + } + + private class DurableSubDiscoverer extends DatabaseVisitor + { + private final List<AMQShortString> _durableSubQueues; + private final TupleBinding<BindingKey> _bindingTB; + private final List<AMQShortString> _topicExchanges; + + + public DurableSubDiscoverer(List<AMQShortString> topicExchanges, TupleBinding<BindingKey> bindingTB) + { + _durableSubQueues = new ArrayList<AMQShortString>(); + _bindingTB = bindingTB; + _topicExchanges = topicExchanges; + } + + public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException + { + BindingKey bindingRec = _bindingTB.entryToObject(key); + AMQShortString queueName = bindingRec.getQueueName(); + AMQShortString exchangeName = bindingRec.getExchangeName(); + + if (_topicExchanges.contains(exchangeName) && queueName.asString().contains(":")) + { + _durableSubQueues.add(queueName); + } + } + + public List<AMQShortString> getDurableSubQueues() + { + return _durableSubQueues; + } + } + + private static class QueueVisitor extends DatabaseVisitor + { + private final TupleBinding<QueueRecord> _queueTupleBinding; + private final List<AMQShortString> _durableSubQueues; + private final List<AMQShortString> _existingQueues = new ArrayList<AMQShortString>(); + private final BDBMessageStore _newMessageStore; + + public QueueVisitor(TupleBinding<QueueRecord> queueTupleBinding, + List<AMQShortString> durableSubQueues, + BDBMessageStore newMessageStore) + { + _queueTupleBinding = queueTupleBinding; + _durableSubQueues = durableSubQueues; + _newMessageStore = newMessageStore; + } + + public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQStoreException + { + QueueRecord queueRec = _queueTupleBinding.entryToObject(value); + AMQShortString queueName = queueRec.getNameShortString(); + + //if the queue name is in the gathered list then set its exclusivity true + if (_durableSubQueues.contains(queueName)) + { + _logger.info("Marking as possible DurableSubscription backing queue: " + queueName); + queueRec.setExclusive(true); + } + + //The simple call to createQueue with the QueueRecord object is sufficient for a v2->v3 upgrade as + //the extra 'exclusive' property in v3 will be defaulted to false in the record creation. + _newMessageStore.createQueue(queueRec); + + incrementCount(); + _existingQueues.add(queueName); + } + + public List<AMQShortString> getExistingQueues() + { + return _existingQueues; + } + } + + private static class MetaDataVisitor extends DatabaseVisitor + { + private final TupleBinding<Object> _oldMetaDataTupleBinding; + private final TupleBinding<Object> _newMetaDataTupleBinding; + private final Set<Long> _queueMessages; + private final Database _newMetaDataDB; + + public MetaDataVisitor(Set<Long> queueMessages, + Database newMetaDataDB, + TupleBinding<Object> oldMetaDataTupleBinding, + TupleBinding<Object> newMetaDataTupleBinding) + { + _queueMessages = queueMessages; + _newMetaDataDB = newMetaDataDB; + _oldMetaDataTupleBinding = oldMetaDataTupleBinding; + _newMetaDataTupleBinding = newMetaDataTupleBinding; + } + + + public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException + { + incrementCount(); + MessageMetaData metaData = (MessageMetaData) _oldMetaDataTupleBinding.entryToObject(value); + + // get message id + Long messageId = TupleBinding.getPrimitiveBinding(Long.class).entryToObject(key); + + // ONLY copy data if message is delivered to existing queue + if (!_queueMessages.contains(messageId)) + { + return; + } + DatabaseEntry newValue = new DatabaseEntry(); + _newMetaDataTupleBinding.objectToEntry(metaData, newValue); + + _newMetaDataDB.put(null, key, newValue); + } + } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java index 26a3b674ae..c6a1372d7e 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java @@ -28,15 +28,20 @@ import org.apache.qpid.AMQStoreException; /** Visitor Interface so that each DatabaseEntry for a database can easily be processed. */ public abstract class DatabaseVisitor { - protected int _count; + private int _count; abstract public void visit(DatabaseEntry entry, DatabaseEntry value) throws AMQStoreException, DatabaseException; - public int getVisitedCount() + public final int getVisitedCount() { return _count; } + protected final void incrementCount() + { + _count++; + } + public void resetVisitCount() { _count = 0; diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java index 371f0fed05..a876a056fc 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java @@ -32,6 +32,10 @@ import java.io.IOException; public class FieldTableEncoding { + private FieldTableEncoding() + { + } + public static FieldTable readFieldTable(TupleInput tupleInput) throws DatabaseException { long length = tupleInput.readLong(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java index c60f981011..09d43e6a08 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java @@ -33,7 +33,7 @@ public class BindingTupleBindingFactory extends TupleBindingFactory<BindingKey> public TupleBinding<BindingKey> getInstance() { - switch (_version) + switch (getVersion()) { default: case 5: diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java index 872ec87ad6..4a320f49c9 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java @@ -33,7 +33,7 @@ public class MessageContentKeyTupleBindingFactory extends TupleBindingFactory<Me public TupleBinding<MessageContentKey> getInstance() { - switch (_version) + switch (getVersion()) { default: case 5: diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java index 40153c13ea..cb742e76a1 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java @@ -31,7 +31,7 @@ public class MessageMetaDataTupleBindingFactory extends TupleBindingFactory<Obje public TupleBinding<Object> getInstance() { - switch (_version) + switch (getVersion()) { default: case 5: diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java index ccdf4492e6..a189786885 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java @@ -34,7 +34,7 @@ public class QueueTupleBindingFactory extends TupleBindingFactory<QueueRecord> public TupleBinding<QueueRecord> getInstance() { - switch (_version) + switch (getVersion()) { default: case 5: diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java index d57534e547..d2ba4dbbca 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java @@ -34,9 +34,7 @@ import org.apache.qpid.server.store.berkeleydb.records.QueueRecord; public class QueueTuple_4 extends TupleBinding<QueueRecord> implements QueueTuple { - protected static final Logger _logger = Logger.getLogger(QueueTuple_4.class); - - protected FieldTable _arguments; + private static final Logger _logger = Logger.getLogger(QueueTuple_4.class); public QueueTuple_4() { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java index 0a0163b239..c9094a132d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java @@ -33,9 +33,7 @@ import org.apache.qpid.server.store.berkeleydb.records.QueueRecord; public class QueueTuple_5 extends QueueTuple_4 { - protected static final Logger _logger = Logger.getLogger(QueueTuple_5.class); - - protected FieldTable _arguments; + private static final Logger _logger = Logger.getLogger(QueueTuple_5.class); public QueueTuple_5() { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java index 2adac1f9a3..97b1398e10 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java @@ -24,7 +24,7 @@ import com.sleepycat.bind.tuple.TupleBinding; public abstract class TupleBindingFactory<E> { - protected int _version; + private final int _version; public TupleBindingFactory(int version) { @@ -32,4 +32,9 @@ public abstract class TupleBindingFactory<E> } public abstract TupleBinding<E> getInstance(); + + public int getVersion() + { + return _version; + } } |
