summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-11-05 13:32:54 +0000
committerKeith Wall <kwall@apache.org>2014-11-05 13:32:54 +0000
commit3c2f41d29f26526565d302d5d1c01584eb0c0ab2 (patch)
treeaf5f8a7515ff671a4da82beaf7e56f5fd26a0bf7 /qpid/java/broker-core
parentd942858418286c716b0dac33b888a3f4b994ea41 (diff)
downloadqpid-python-3c2f41d29f26526565d302d5d1c01584eb0c0ab2.tar.gz
QPID-6214: [Java Broker] Change asynch recoverer to allow for task cancellation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1636871 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java14
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java71
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java6
4 files changed, 80 insertions, 17 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index a0a89d9f12..52d779f877 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -161,7 +161,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private MessageDestination _defaultDestination;
private MessageStore _messageStore;
-
+ private MessageStoreRecoverer _messageStoreRecoverer;
public AbstractVirtualHost(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
{
@@ -688,6 +688,11 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
{
try
{
+ if (_messageStoreRecoverer != null)
+ {
+ _messageStoreRecoverer.cancel();
+ }
+
getMessageStore().closeMessageStore();
}
catch (StoreException e)
@@ -1411,16 +1416,15 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
createDefaultExchanges();
}
- MessageStoreRecoverer messageStoreRecoverer;
if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY))
{
- messageStoreRecoverer = new AsynchronousMessageStoreRecoverer();
+ _messageStoreRecoverer = new AsynchronousMessageStoreRecoverer();
}
else
{
- messageStoreRecoverer = new SynchronousMessageStoreRecoverer();
+ _messageStoreRecoverer = new SynchronousMessageStoreRecoverer();
}
- messageStoreRecoverer.recover(this);
+ _messageStoreRecoverer.recover(this);
State finalState = State.ERRORED;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
index 899cfdcd6e..628b0ea08c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
@@ -27,6 +27,9 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
@@ -56,17 +59,28 @@ import org.apache.qpid.transport.util.Functions;
public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
{
private static final Logger _logger = Logger.getLogger(AsynchronousMessageStoreRecoverer.class);
+ private AsynchronousRecoverer _asynchronousRecoverer;
@Override
public void recover(final VirtualHostImpl virtualHost)
{
- AsynchronousRecoverer asynchronousRecoverer = new AsynchronousRecoverer(virtualHost);
+ _asynchronousRecoverer = new AsynchronousRecoverer(virtualHost);
- asynchronousRecoverer.recover();
+ _asynchronousRecoverer.recover();
+ }
+
+ @Override
+ public void cancel()
+ {
+ if (_asynchronousRecoverer != null)
+ {
+ _asynchronousRecoverer.cancel();
+ }
}
private static class AsynchronousRecoverer
{
+ public static final int THREAD_POOL_SHUTDOWN_TIMEOUT = 5000;
private final VirtualHostImpl<?, ?, ?> _virtualHost;
private final EventLogger _eventLogger;
private final MessageStore _store;
@@ -75,7 +89,8 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
private final Set<AMQQueue<?>> _recoveringQueues = new CopyOnWriteArraySet<>();
private final AtomicBoolean _recoveryComplete = new AtomicBoolean();
private final Map<Long, MessageReference<? extends ServerMessage<?>>> _recoveredMessages = new HashMap<>();
-
+ private final ExecutorService _queueRecoveryExecutor = Executors.newCachedThreadPool();
+ private AtomicBoolean _continueRecovery = new AtomicBoolean(true);
private AsynchronousRecoverer(final VirtualHostImpl<?, ?, ?> virtualHost)
{
@@ -95,8 +110,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
for(AMQQueue<?> queue : _recoveringQueues)
{
- Thread queueThread = new Thread(new QueueRecoveringTask(queue), "Queue Recoverer : " + queue.getName() + " (vh: " + getVirtualHost().getName() + ")");
- queueThread.start();
+ _queueRecoveryExecutor.submit(new QueueRecoveringTask(queue));
}
}
@@ -161,14 +175,18 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
{
messagesToDelete.add(storedMessage);
}
- return messageNumber <_maxMessageId-1;
+ return _continueRecovery.get() && messageNumber <_maxMessageId-1;
}
});
for(StoredMessage<?> storedMessage : messagesToDelete)
{
-
- _logger.info("Message id " + storedMessage.getMessageNumber() + " in store, but not in any queue - removing....");
- storedMessage.remove();
+ if (_continueRecovery.get())
+ {
+ _logger.info("Message id "
+ + storedMessage.getMessageNumber()
+ + " in store, but not in any queue - removing....");
+ storedMessage.remove();
+ }
}
messagesToDelete.clear();
@@ -198,6 +216,25 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
return ref == null ? null : ref.getMessage();
}
+ public void cancel()
+ {
+ _continueRecovery.set(false);
+ _queueRecoveryExecutor.shutdown();
+ try
+ {
+ boolean wasShutdown = _queueRecoveryExecutor.awaitTermination(THREAD_POOL_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+ if (!wasShutdown)
+ {
+ _logger.warn("Failed to gracefully shutdown queue recovery executor within permitted time period");
+ _queueRecoveryExecutor.shutdownNow();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+
private class DistributedTransactionVisitor implements DistributedTransactionHandler
{
@@ -335,7 +372,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
branch.setState(DtxBranch.State.PREPARED);
branch.prePrepareTransaction();
- return true;
+ return _continueRecovery.get();
}
private StringBuilder xidAsString(Xid id)
@@ -364,7 +401,17 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
@Override
public void run()
{
- recoverQueue(_queue);
+ String originalThreadName = Thread.currentThread().getName();
+ Thread.currentThread().setName("Queue Recoverer : " + _queue.getName() + " (vh: " + getVirtualHost().getName() + ")");
+
+ try
+ {
+ recoverQueue(_queue);
+ }
+ finally
+ {
+ Thread.currentThread().setName(originalThreadName);
+ }
}
}
@@ -408,7 +455,7 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
txn.dequeueMessage(_queue, new DummyMessage(messageId));
txn.commitTranAsync();
}
- return true;
+ return _continueRecovery.get();
}
else
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
index 9f4c7dd319..7500d4e6e0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
@@ -23,4 +23,10 @@ package org.apache.qpid.server.virtualhost;
public interface MessageStoreRecoverer
{
void recover(VirtualHostImpl virtualHost);
+
+ /**
+ * Cancels any in-progress message store recovery. If message store recovery has already
+ * completed, this method call has no effect.
+ */
+ void cancel();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
index becf4a073c..1cdafde945 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
@@ -113,6 +113,12 @@ public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer
}
+ @Override
+ public void cancel()
+ {
+ // No-op
+ }
+
private static class MessageVisitor implements MessageHandler
{