summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-01-29 13:05:07 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-01-29 13:05:07 +0000
commit8c1167ac57b774b43c881f72659db19b648b4441 (patch)
treebecc40775c44d27743307904f4d468d572f2772d /qpid/java/bdbstore/src
parentebb3caff09b7856990df34ef58ac1ddf08781170 (diff)
downloadqpid-python-8c1167ac57b774b43c881f72659db19b648b4441.tar.gz
QPID-3789 : [Java] code tidyups
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1237273 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java4
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java34
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java481
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java9
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java4
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java4
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java4
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java7
12 files changed, 331 insertions, 224 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java
index 8431382d59..354dba559c 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java
@@ -27,6 +27,10 @@ import org.apache.qpid.framing.AMQShortString;
public class AMQShortStringEncoding
{
+ private AMQShortStringEncoding()
+ {
+ }
+
public static AMQShortString readShortString(TupleInput tupleInput)
{
int length = (int) tupleInput.readShort();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 534d4b0a44..92dd592143 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -151,7 +151,7 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
* messageId (long) - bodySize (integer), metaData (MessageMetaData encoded as binary)
*
* Message (Content):
- * messageId (long), byteOffset (integer) - dataLength(integer), data(binary);
+ * messageId (long), byteOffset (integer) - dataLength(integer), data(binary)
*/
private LogSubject _logSubject;
@@ -429,9 +429,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
envConfig.setTransactional(true);
envConfig.setConfigParam("je.lock.nLockTables", "7");
- // Restore 500,000 default timeout.
- //envConfig.setLockTimeout(15000);
-
// Added to help diagnosis of Deadlock issue
// http://www.oracle.com/technology/products/berkeley-db/faq/je_faq.html#23
if (Boolean.getBoolean("qpid.bdb.lock.debug"))
@@ -903,7 +900,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
public void removeMessage(long messageId, boolean sync) throws AMQStoreException
{
- // _log.debug("public void removeMessage(Long messageId = " + messageId): called");
boolean complete = false;
com.sleepycat.je.Transaction tx = null;
@@ -1116,9 +1112,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
*/
public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
{
- // _log.debug("public void bindQueue(Exchange exchange = " + exchange + ", AMQShortString routingKey = " + routingKey
- // + ", AMQQueue queue = " + queue + ", FieldTable args = " + args + "): called");
-
if (_state != State.RECOVERING)
{
BindingKey bindingRecord = new BindingKey(exchange.getNameShortString(),
@@ -1408,8 +1401,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
public void enqueueMessage(final com.sleepycat.je.Transaction tx, final TransactionLogResource queue,
long messageId) throws AMQStoreException
{
- // _log.debug("public void enqueueMessage(Transaction tx = " + tx + ", AMQShortString name = " + name + ", Long messageId): called");
-
AMQShortString name = AMQShortString.valueOf(queue.getResourceName());
DatabaseEntry key = new DatabaseEntry();
@@ -1498,11 +1489,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
*/
private StoreFuture commitTranImpl(final com.sleepycat.je.Transaction tx, boolean syncCommit) throws AMQStoreException
{
- //if (_log.isDebugEnabled())
- //{
- // _log.debug("public void commitTranImpl() called with (Transaction=" + tx + ", syncCommit= "+ syncCommit + ")");
- //}
-
if (tx == null)
{
throw new AMQStoreException("Fatal internal error: transactional is null at commitTran");
@@ -1969,8 +1955,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException
{
- // _log.debug("void commit(Transaction tx = " + tx + ", sync = " + syncCommit + "): called");
-
tx.commitNoSync();
BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
@@ -1986,8 +1970,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private static final class BDBCommitFuture implements StoreFuture
{
- // private static final Logger _log = Logger.getLogger(BDBCommitFuture.class);
-
private final CommitThread _commitThread;
private final com.sleepycat.je.Transaction _tx;
private DatabaseException _databaseException;
@@ -1996,9 +1978,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
public BDBCommitFuture(CommitThread commitThread, com.sleepycat.je.Transaction tx, boolean syncCommit)
{
- // _log.debug("public Commit(CommitThread commitThread = " + commitThread + ", Transaction tx = " + tx
- // + "): called");
-
_commitThread = commitThread;
_tx = tx;
_syncCommit = syncCommit;
@@ -2017,9 +1996,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
public synchronized void abort(DatabaseException databaseException)
{
- // _log.debug("public synchronized void abort(DatabaseException databaseException = " + databaseException
- // + "): called");
-
_complete = true;
_databaseException = databaseException;
@@ -2028,8 +2004,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
public void commit() throws DatabaseException
{
- //_log.debug("public void commit(): called");
-
_commitThread.addJob(this, _syncCommit);
if(!_syncCommit)
@@ -2039,7 +2013,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
waitForCompletion();
- // _log.debug("Commit completed, _databaseException = " + _databaseException);
if (_databaseException != null)
{
@@ -2081,8 +2054,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
*/
private class CommitThread extends Thread
{
- // private final Logger _log = Logger.getLogger(CommitThread.class);
-
private final AtomicBoolean _stopped = new AtomicBoolean(false);
private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
private final CheckpointConfig _config = new CheckpointConfig();
@@ -2119,7 +2090,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
}
catch (InterruptedException e)
{
- // _log.info(getName() + " interrupted. ");
}
}
}
@@ -2129,8 +2099,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
private void processJobs()
{
- // _log.debug("private void processJobs(): called");
-
int size = _jobQueue.size();
try
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
index 7e402e3320..f064079606 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
@@ -65,6 +65,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
+import java.util.Set;
/**
* This is a simple BerkeleyDB Store upgrade tool that will upgrade a V4 Store to a V5 Store.
@@ -407,87 +408,39 @@ public class BDBStoreUpgrade
{
_logger.info("Starting store upgrade from version 4");
- //Migrate _exchangeDb;
+ //Migrate _exchangeDb
_logger.info("Exchanges");
moveContents(_oldMessageStore.getExchangesDb(), _newMessageStore.getExchangesDb(), "Exchange");
- final List<AMQShortString> topicExchanges = new ArrayList<AMQShortString>();
- final TupleBinding exchangeTB = new ExchangeTB();
-
- DatabaseVisitor exchangeListVisitor = new DatabaseVisitor()
- {
- public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
- {
- ExchangeRecord exchangeRec = (ExchangeRecord) exchangeTB.entryToObject(value);
- AMQShortString type = exchangeRec.getType();
- if (ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(type))
- {
- topicExchanges.add(exchangeRec.getNameShortString());
- }
- }
- };
+ TopicExchangeDiscoverer exchangeListVisitor = new TopicExchangeDiscoverer();
_oldMessageStore.visitExchanges(exchangeListVisitor);
- //Migrate _queueBindingsDb;
+ //Migrate _queueBindingsDb
_logger.info("Queue Bindings");
moveContents(_oldMessageStore.getBindingsDb(), _newMessageStore.getBindingsDb(), "Queue Binding");
//Inspect the bindings to gather a list of queues which are probably durable subscriptions, i.e. those
//which have a colon in their name and are bound to the Topic exchanges above
- final List<AMQShortString> durableSubQueues = new ArrayList<AMQShortString>();
- final TupleBinding<BindingKey> bindingTB = _oldMessageStore.getBindingTupleBindingFactory().getInstance();
-
- DatabaseVisitor durSubQueueListVisitor = new DatabaseVisitor()
- {
- public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
- {
- BindingKey bindingRec = (BindingKey) bindingTB.entryToObject(key);
- AMQShortString queueName = bindingRec.getQueueName();
- AMQShortString exchangeName = bindingRec.getExchangeName();
-
- if (topicExchanges.contains(exchangeName) && queueName.asString().contains(":"))
- {
- durableSubQueues.add(queueName);
- }
- }
- };
+ DurableSubDiscoverer durSubQueueListVisitor =
+ new DurableSubDiscoverer(exchangeListVisitor.getTopicExchanges(),
+ _oldMessageStore.getBindingTupleBindingFactory().getInstance());
_oldMessageStore.visitBindings(durSubQueueListVisitor);
+ final List<AMQShortString> durableSubQueues = durSubQueueListVisitor.getDurableSubQueues();
- //Migrate _queueDb;
+ //Migrate _queueDb
_logger.info("Queues");
// hold the list of existing queue names
- final List<AMQShortString> existingQueues = new ArrayList<AMQShortString>();
final TupleBinding<QueueRecord> queueTupleBinding = _oldMessageStore.getQueueTupleBindingFactory().getInstance();
- DatabaseVisitor queueVisitor = new DatabaseVisitor()
- {
- public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQStoreException
- {
- QueueRecord queueRec = (QueueRecord) queueTupleBinding.entryToObject(value);
- AMQShortString queueName = queueRec.getNameShortString();
-
- //if the queue name is in the gathered list then set its exclusivity true
- if (durableSubQueues.contains(queueName))
- {
- _logger.info("Marking as possible DurableSubscription backing queue: " + queueName);
- queueRec.setExclusive(true);
- }
-
- //The simple call to createQueue with the QueueRecord object is sufficient for a v2->v3 upgrade as
- //the extra 'exclusive' property in v3 will be defaulted to false in the record creation.
- _newMessageStore.createQueue(queueRec);
-
- _count++;
- existingQueues.add(queueName);
- }
- };
+ QueueVisitor queueVisitor = new QueueVisitor(queueTupleBinding, durableSubQueues, _newMessageStore);
_oldMessageStore.visitQueues(queueVisitor);
+ final List<AMQShortString> existingQueues = queueVisitor.getExistingQueues();
logCount(queueVisitor.getVisitedCount(), "Queue");
@@ -495,42 +448,15 @@ public class BDBStoreUpgrade
// Look for persistent messages stored for non-durable queues
_logger.info("Checking for messages previously sent to non-durable queues");
- // track all message delivery to existing queues
- final HashSet<Long> queueMessages = new HashSet<Long>();
-
- // hold all non existing queues and their messages IDs
- final HashMap<String, HashSet<Long>> phantomMessageQueues = new HashMap<String, HashSet<Long>>();
-
// delivery DB visitor to check message delivery and identify non existing queues
final QueueEntryTB queueEntryTB = new QueueEntryTB();
- DatabaseVisitor messageDeliveryCheckVisitor = new DatabaseVisitor()
- {
- public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
- {
- QueueEntryKey entryKey = (QueueEntryKey) queueEntryTB.entryToObject(key);
- Long messageId = entryKey.getMessageId();
- AMQShortString queueName = entryKey.getQueueName();
- if (!existingQueues.contains(queueName))
- {
- String name = queueName.asString();
- HashSet<Long> messages = phantomMessageQueues.get(name);
- if (messages == null)
- {
- messages = new HashSet<Long>();
- phantomMessageQueues.put(name, messages);
- }
- messages.add(messageId);
- _count++;
- }
- else
- {
- queueMessages.add(messageId);
- }
- }
- };
+ MessageDeliveryCheckVisitor messageDeliveryCheckVisitor =
+ new MessageDeliveryCheckVisitor(queueEntryTB, queueVisitor.getExistingQueues());
_oldMessageStore.visitDelivery(messageDeliveryCheckVisitor);
- if (phantomMessageQueues.isEmpty())
+ final Set<Long> queueMessages = messageDeliveryCheckVisitor.getQueueMessages();
+
+ if (messageDeliveryCheckVisitor.getPhantomMessageQueues().isEmpty())
{
_logger.info("No such messages were found");
}
@@ -538,7 +464,7 @@ public class BDBStoreUpgrade
{
_logger.info("Found " + messageDeliveryCheckVisitor.getVisitedCount()+ " such messages in total");
- for (Entry<String, HashSet<Long>> phantomQueue : phantomMessageQueues.entrySet())
+ for (Entry<String, HashSet<Long>> phantomQueue : messageDeliveryCheckVisitor.getPhantomMessageQueues().entrySet())
{
String queueName = phantomQueue.getKey();
HashSet<Long> messages = phantomQueue.getValue();
@@ -572,40 +498,20 @@ public class BDBStoreUpgrade
}
- //Migrate _messageMetaDataDb;
+ //Migrate _messageMetaDataDb
_logger.info("Message MetaData");
final Database newMetaDataDB = _newMessageStore.getMetaDataDb();
- final TupleBinding<Object> oldMetaDataTupleBinding = _oldMessageStore.getMetaDataTupleBindingFactory().getInstance();
- final TupleBinding<Object> newMetaDataTupleBinding = _newMessageStore.getMetaDataTupleBindingFactory().getInstance();
-
- DatabaseVisitor metaDataVisitor = new DatabaseVisitor()
- {
- public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
- {
- _count++;
- MessageMetaData metaData = (MessageMetaData) oldMetaDataTupleBinding.entryToObject(value);
-
- // get message id
- Long messageId = TupleBinding.getPrimitiveBinding(Long.class).entryToObject(key);
- // ONLY copy data if message is delivered to existing queue
- if (!queueMessages.contains(messageId))
- {
- return;
- }
- DatabaseEntry newValue = new DatabaseEntry();
- newMetaDataTupleBinding.objectToEntry(metaData, newValue);
-
- newMetaDataDB.put(null, key, newValue);
- }
- };
+ MetaDataVisitor metaDataVisitor = new MetaDataVisitor(queueMessages, newMetaDataDB,
+ _oldMessageStore.getMetaDataTupleBindingFactory().getInstance(),
+ _newMessageStore.getMetaDataTupleBindingFactory().getInstance());
_oldMessageStore.visitMetaDataDb(metaDataVisitor);
logCount(metaDataVisitor.getVisitedCount(), "Message MetaData");
- //Migrate _messageContentDb;
+ //Migrate _messageContentDb
_logger.info("Message Contents");
final Database newContentDB = _newMessageStore.getContentDb();
@@ -613,74 +519,17 @@ public class BDBStoreUpgrade
final TupleBinding<MessageContentKey> newContentKeyTupleBinding = new MessageContentKeyTB_5();
final TupleBinding contentTB = new ContentTB();
- DatabaseVisitor contentVisitor = new DatabaseVisitor()
- {
- private long _prevMsgId = -1; //Initialise to invalid value
- private int _bytesSeenSoFar = 0;
-
- public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
- {
- _count++;
-
- //determine the msgId of the current entry
- MessageContentKey_4 contentKey = (MessageContentKey_4) oldContentKeyTupleBinding.entryToObject(key);
- long msgId = contentKey.getMessageId();
-
- // ONLY copy data if message is delivered to existing queue
- if (!queueMessages.contains(msgId))
- {
- return;
- }
- //if this is a new message, restart the byte offset count.
- if(_prevMsgId != msgId)
- {
- _bytesSeenSoFar = 0;
- }
-
- //determine the content size
- ByteBuffer content = (ByteBuffer) contentTB.entryToObject(value);
- int contentSize = content.limit();
-
- //create the new key: id + previously seen data count
- MessageContentKey_5 newKey = new MessageContentKey_5(msgId, _bytesSeenSoFar);
- DatabaseEntry newKeyEntry = new DatabaseEntry();
- newContentKeyTupleBinding.objectToEntry(newKey, newKeyEntry);
-
- DatabaseEntry newValueEntry = new DatabaseEntry();
- contentTB.objectToEntry(content, newValueEntry);
-
- newContentDB.put(null, newKeyEntry, newValueEntry);
-
- _prevMsgId = msgId;
- _bytesSeenSoFar += contentSize;
- }
- };
+ DatabaseVisitor contentVisitor = new ContentVisitor(oldContentKeyTupleBinding, queueMessages,
+ contentTB, newContentKeyTupleBinding, newContentDB);
_oldMessageStore.visitContentDb(contentVisitor);
logCount(contentVisitor.getVisitedCount(), "Message Content");
- //Migrate _deliveryDb;
+ //Migrate _deliveryDb
_logger.info("Delivery Records");
final Database deliveryDb =_newMessageStore.getDeliveryDb();
- DatabaseVisitor deliveryDbVisitor = new DatabaseVisitor()
- {
-
- public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
- {
- _count++;
-
- // get message id from entry key
- QueueEntryKey entryKey = (QueueEntryKey) queueEntryTB.entryToObject(key);
- AMQShortString queueName = entryKey.getQueueName();
-
- // ONLY copy data if message queue exists
- if (existingQueues.contains(queueName))
- {
- deliveryDb.put(null, key, value);
- }
- }
- };
+ DatabaseVisitor deliveryDbVisitor = new DeliveryDbVisitor(queueEntryTB, existingQueues, deliveryDb);
_oldMessageStore.visitDelivery(deliveryDbVisitor);
logCount(contentVisitor.getVisitedCount(), "Delivery Record");
}
@@ -711,7 +560,7 @@ public class BDBStoreUpgrade
{
public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
{
- _count++;
+ incrementCount();
newDatabase.put(null, key, value);
}
};
@@ -1117,4 +966,280 @@ public class BDBStoreUpgrade
System.exit(0);
}
+ private static class TopicExchangeDiscoverer extends DatabaseVisitor
+ {
+ private final List<AMQShortString> topicExchanges = new ArrayList<AMQShortString>();
+ private final TupleBinding exchangeTB = new ExchangeTB();
+
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+ {
+ ExchangeRecord exchangeRec = (ExchangeRecord) exchangeTB.entryToObject(value);
+ AMQShortString type = exchangeRec.getType();
+
+ if (ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(type))
+ {
+ topicExchanges.add(exchangeRec.getNameShortString());
+ }
+ }
+
+ public List<AMQShortString> getTopicExchanges()
+ {
+ return topicExchanges;
+ }
+ }
+
+ private static class MessageDeliveryCheckVisitor extends DatabaseVisitor
+ {
+ private final QueueEntryTB _queueEntryTB;
+ private final List<AMQShortString> _existingQueues;
+
+ // track all message delivery to existing queues
+ private final HashSet<Long> _queueMessages = new HashSet<Long>();
+
+ // hold all non existing queues and their messages IDs
+ private final HashMap<String, HashSet<Long>> _phantomMessageQueues = new HashMap<String, HashSet<Long>>();
+
+
+
+ public MessageDeliveryCheckVisitor(QueueEntryTB queueEntryTB, List<AMQShortString> existingQueues)
+ {
+ _queueEntryTB = queueEntryTB;
+ _existingQueues = existingQueues;
+ }
+
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+ {
+ QueueEntryKey entryKey = (QueueEntryKey) _queueEntryTB.entryToObject(key);
+ Long messageId = entryKey.getMessageId();
+ AMQShortString queueName = entryKey.getQueueName();
+ if (!_existingQueues.contains(queueName))
+ {
+ String name = queueName.asString();
+ HashSet<Long> messages = _phantomMessageQueues.get(name);
+ if (messages == null)
+ {
+ messages = new HashSet<Long>();
+ _phantomMessageQueues.put(name, messages);
+ }
+ messages.add(messageId);
+ incrementCount();
+ }
+ else
+ {
+ _queueMessages.add(messageId);
+ }
+ }
+
+ public HashSet<Long> getQueueMessages()
+ {
+ return _queueMessages;
+ }
+
+ public HashMap<String, HashSet<Long>> getPhantomMessageQueues()
+ {
+ return _phantomMessageQueues;
+ }
+ }
+
+ private static class ContentVisitor extends DatabaseVisitor
+ {
+ private long _prevMsgId; //Initialise to invalid value
+ private int _bytesSeenSoFar;
+ private final TupleBinding<MessageContentKey> _oldContentKeyTupleBinding;
+ private final Set<Long> _queueMessages;
+ private final TupleBinding _contentTB;
+ private final TupleBinding<MessageContentKey> _newContentKeyTupleBinding;
+ private final Database _newContentDB;
+
+ public ContentVisitor(TupleBinding<MessageContentKey> oldContentKeyTupleBinding, Set<Long> queueMessages, TupleBinding contentTB, TupleBinding<MessageContentKey> newContentKeyTupleBinding, Database newContentDB)
+ {
+ _oldContentKeyTupleBinding = oldContentKeyTupleBinding;
+ _queueMessages = queueMessages;
+ _contentTB = contentTB;
+ _newContentKeyTupleBinding = newContentKeyTupleBinding;
+ _newContentDB = newContentDB;
+ _prevMsgId = -1;
+ _bytesSeenSoFar = 0;
+ }
+
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+ {
+ incrementCount();
+
+ //determine the msgId of the current entry
+ MessageContentKey_4 contentKey = (MessageContentKey_4) _oldContentKeyTupleBinding.entryToObject(key);
+ long msgId = contentKey.getMessageId();
+
+ // ONLY copy data if message is delivered to existing queue
+ if (!_queueMessages.contains(msgId))
+ {
+ return;
+ }
+ //if this is a new message, restart the byte offset count.
+ if(_prevMsgId != msgId)
+ {
+ _bytesSeenSoFar = 0;
+ }
+
+ //determine the content size
+ ByteBuffer content = (ByteBuffer) _contentTB.entryToObject(value);
+ int contentSize = content.limit();
+
+ //create the new key: id + previously seen data count
+ MessageContentKey_5 newKey = new MessageContentKey_5(msgId, _bytesSeenSoFar);
+ DatabaseEntry newKeyEntry = new DatabaseEntry();
+ _newContentKeyTupleBinding.objectToEntry(newKey, newKeyEntry);
+
+ DatabaseEntry newValueEntry = new DatabaseEntry();
+ _contentTB.objectToEntry(content, newValueEntry);
+
+ _newContentDB.put(null, newKeyEntry, newValueEntry);
+
+ _prevMsgId = msgId;
+ _bytesSeenSoFar += contentSize;
+ }
+ }
+
+ private static class DeliveryDbVisitor extends DatabaseVisitor
+ {
+
+ private final QueueEntryTB _queueEntryTB;
+ private final List<AMQShortString> _existingQueues;
+ private final Database _deliveryDb;
+
+ public DeliveryDbVisitor(QueueEntryTB queueEntryTB, List<AMQShortString> existingQueues, Database deliveryDb)
+ {
+ _queueEntryTB = queueEntryTB;
+ _existingQueues = existingQueues;
+ _deliveryDb = deliveryDb;
+ }
+
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+ {
+ incrementCount();
+
+ // get message id from entry key
+ QueueEntryKey entryKey = (QueueEntryKey) _queueEntryTB.entryToObject(key);
+ AMQShortString queueName = entryKey.getQueueName();
+
+ // ONLY copy data if message queue exists
+ if (_existingQueues.contains(queueName))
+ {
+ _deliveryDb.put(null, key, value);
+ }
+ }
+ }
+
+ private class DurableSubDiscoverer extends DatabaseVisitor
+ {
+ private final List<AMQShortString> _durableSubQueues;
+ private final TupleBinding<BindingKey> _bindingTB;
+ private final List<AMQShortString> _topicExchanges;
+
+
+ public DurableSubDiscoverer(List<AMQShortString> topicExchanges, TupleBinding<BindingKey> bindingTB)
+ {
+ _durableSubQueues = new ArrayList<AMQShortString>();
+ _bindingTB = bindingTB;
+ _topicExchanges = topicExchanges;
+ }
+
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+ {
+ BindingKey bindingRec = _bindingTB.entryToObject(key);
+ AMQShortString queueName = bindingRec.getQueueName();
+ AMQShortString exchangeName = bindingRec.getExchangeName();
+
+ if (_topicExchanges.contains(exchangeName) && queueName.asString().contains(":"))
+ {
+ _durableSubQueues.add(queueName);
+ }
+ }
+
+ public List<AMQShortString> getDurableSubQueues()
+ {
+ return _durableSubQueues;
+ }
+ }
+
+ private static class QueueVisitor extends DatabaseVisitor
+ {
+ private final TupleBinding<QueueRecord> _queueTupleBinding;
+ private final List<AMQShortString> _durableSubQueues;
+ private final List<AMQShortString> _existingQueues = new ArrayList<AMQShortString>();
+ private final BDBMessageStore _newMessageStore;
+
+ public QueueVisitor(TupleBinding<QueueRecord> queueTupleBinding,
+ List<AMQShortString> durableSubQueues,
+ BDBMessageStore newMessageStore)
+ {
+ _queueTupleBinding = queueTupleBinding;
+ _durableSubQueues = durableSubQueues;
+ _newMessageStore = newMessageStore;
+ }
+
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQStoreException
+ {
+ QueueRecord queueRec = _queueTupleBinding.entryToObject(value);
+ AMQShortString queueName = queueRec.getNameShortString();
+
+ //if the queue name is in the gathered list then set its exclusivity true
+ if (_durableSubQueues.contains(queueName))
+ {
+ _logger.info("Marking as possible DurableSubscription backing queue: " + queueName);
+ queueRec.setExclusive(true);
+ }
+
+ //The simple call to createQueue with the QueueRecord object is sufficient for a v2->v3 upgrade as
+ //the extra 'exclusive' property in v3 will be defaulted to false in the record creation.
+ _newMessageStore.createQueue(queueRec);
+
+ incrementCount();
+ _existingQueues.add(queueName);
+ }
+
+ public List<AMQShortString> getExistingQueues()
+ {
+ return _existingQueues;
+ }
+ }
+
+ private static class MetaDataVisitor extends DatabaseVisitor
+ {
+ private final TupleBinding<Object> _oldMetaDataTupleBinding;
+ private final TupleBinding<Object> _newMetaDataTupleBinding;
+ private final Set<Long> _queueMessages;
+ private final Database _newMetaDataDB;
+
+ public MetaDataVisitor(Set<Long> queueMessages,
+ Database newMetaDataDB,
+ TupleBinding<Object> oldMetaDataTupleBinding,
+ TupleBinding<Object> newMetaDataTupleBinding)
+ {
+ _queueMessages = queueMessages;
+ _newMetaDataDB = newMetaDataDB;
+ _oldMetaDataTupleBinding = oldMetaDataTupleBinding;
+ _newMetaDataTupleBinding = newMetaDataTupleBinding;
+ }
+
+
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+ {
+ incrementCount();
+ MessageMetaData metaData = (MessageMetaData) _oldMetaDataTupleBinding.entryToObject(value);
+
+ // get message id
+ Long messageId = TupleBinding.getPrimitiveBinding(Long.class).entryToObject(key);
+
+ // ONLY copy data if message is delivered to existing queue
+ if (!_queueMessages.contains(messageId))
+ {
+ return;
+ }
+ DatabaseEntry newValue = new DatabaseEntry();
+ _newMetaDataTupleBinding.objectToEntry(metaData, newValue);
+
+ _newMetaDataDB.put(null, key, newValue);
+ }
+ }
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java
index 26a3b674ae..c6a1372d7e 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java
@@ -28,15 +28,20 @@ import org.apache.qpid.AMQStoreException;
/** Visitor Interface so that each DatabaseEntry for a database can easily be processed. */
public abstract class DatabaseVisitor
{
- protected int _count;
+ private int _count;
abstract public void visit(DatabaseEntry entry, DatabaseEntry value) throws AMQStoreException, DatabaseException;
- public int getVisitedCount()
+ public final int getVisitedCount()
{
return _count;
}
+ protected final void incrementCount()
+ {
+ _count++;
+ }
+
public void resetVisitCount()
{
_count = 0;
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
index 371f0fed05..a876a056fc 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
@@ -32,6 +32,10 @@ import java.io.IOException;
public class FieldTableEncoding
{
+ private FieldTableEncoding()
+ {
+ }
+
public static FieldTable readFieldTable(TupleInput tupleInput) throws DatabaseException
{
long length = tupleInput.readLong();
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
index c60f981011..09d43e6a08 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
@@ -33,7 +33,7 @@ public class BindingTupleBindingFactory extends TupleBindingFactory<BindingKey>
public TupleBinding<BindingKey> getInstance()
{
- switch (_version)
+ switch (getVersion())
{
default:
case 5:
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java
index 872ec87ad6..4a320f49c9 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java
@@ -33,7 +33,7 @@ public class MessageContentKeyTupleBindingFactory extends TupleBindingFactory<Me
public TupleBinding<MessageContentKey> getInstance()
{
- switch (_version)
+ switch (getVersion())
{
default:
case 5:
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java
index 40153c13ea..cb742e76a1 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java
@@ -31,7 +31,7 @@ public class MessageMetaDataTupleBindingFactory extends TupleBindingFactory<Obje
public TupleBinding<Object> getInstance()
{
- switch (_version)
+ switch (getVersion())
{
default:
case 5:
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
index ccdf4492e6..a189786885 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
@@ -34,7 +34,7 @@ public class QueueTupleBindingFactory extends TupleBindingFactory<QueueRecord>
public TupleBinding<QueueRecord> getInstance()
{
- switch (_version)
+ switch (getVersion())
{
default:
case 5:
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java
index d57534e547..d2ba4dbbca 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java
@@ -34,9 +34,7 @@ import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
public class QueueTuple_4 extends TupleBinding<QueueRecord> implements QueueTuple
{
- protected static final Logger _logger = Logger.getLogger(QueueTuple_4.class);
-
- protected FieldTable _arguments;
+ private static final Logger _logger = Logger.getLogger(QueueTuple_4.class);
public QueueTuple_4()
{
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java
index 0a0163b239..c9094a132d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java
@@ -33,9 +33,7 @@ import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
public class QueueTuple_5 extends QueueTuple_4
{
- protected static final Logger _logger = Logger.getLogger(QueueTuple_5.class);
-
- protected FieldTable _arguments;
+ private static final Logger _logger = Logger.getLogger(QueueTuple_5.class);
public QueueTuple_5()
{
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
index 2adac1f9a3..97b1398e10 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
@@ -24,7 +24,7 @@ import com.sleepycat.bind.tuple.TupleBinding;
public abstract class TupleBindingFactory<E>
{
- protected int _version;
+ private final int _version;
public TupleBindingFactory(int version)
{
@@ -32,4 +32,9 @@ public abstract class TupleBindingFactory<E>
}
public abstract TupleBinding<E> getInstance();
+
+ public int getVersion()
+ {
+ return _version;
+ }
}