diff options
Diffstat (limited to 'qpid/java/broker')
| -rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java | 33 |
1 files changed, 23 insertions, 10 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 90de1aa8fa..c014739324 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -95,6 +95,7 @@ public class DerbyMessageStore extends AbstractMessageStore private String _connectionURL; + Map<AMQShortString, Integer> _queueRecoveries = new TreeMap<AMQShortString, Integer>(); private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )"; @@ -380,6 +381,11 @@ public class DerbyMessageStore extends AbstractMessageStore } queueMap.put(queueNameShortString,q); + + CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_1004(String.valueOf(q.getName()), true)); + + //Record that we have a queue for recovery + _queueRecoveries.put(new AMQShortString(queueName), 0); } return queueMap; @@ -1378,7 +1384,6 @@ public class DerbyMessageStore extends AbstractMessageStore Map<Long, AMQMessage> msgMap = new HashMap<Long,AMQMessage>(); List<ProcessAction> actions = new ArrayList<ProcessAction>(); - Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>(); final boolean inLocaltran = inTran(context); Connection conn = null; @@ -1436,17 +1441,14 @@ public class DerbyMessageStore extends AbstractMessageStore _logger.debug("On recovery, delivering " + message.getMessageId() + " to " + queue.getName()); } - if (_logger.isInfoEnabled()) + Integer count = _queueRecoveries.get(queueName); + if (count == null) { - Integer count = queueRecoveries.get(queueName); - if (count == null) - { - count = 0; - } - - queueRecoveries.put(queueName, ++count); + count = 0; } + _queueRecoveries.put(queueName, ++count); + actions.add(new ProcessAction(queue, context, message)); } @@ -1472,8 +1474,19 @@ public class DerbyMessageStore extends AbstractMessageStore if (_logger.isInfoEnabled()) { - _logger.info("Recovered message counts: " + queueRecoveries); + _logger.info("Recovered message counts: " + _queueRecoveries); } + + for(Map.Entry<AMQShortString,Integer> entry : _queueRecoveries.entrySet()) + { + CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1005(entry.getValue(), String.valueOf(entry.getKey()))); + + CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1006(String.valueOf(entry.getKey()), true)); + } + + // Free the memory + _queueRecoveries = null; + } private Connection getConnection(final StoreContext context) |
