summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-03-17 21:19:05 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-03-17 21:19:05 +0000
commitf4c3e204c54e95a898ce6a56b589ace853579485 (patch)
tree2b8540a45ce49432346d436d1c730ff67d0c52ea /qpid/java
parente717efdc1f178792d05b3e7eedb509e950de35d9 (diff)
downloadqpid-python-f4c3e204c54e95a898ce6a56b589ace853579485.tar.gz
QPID-6457 : [Java Broker] Make asynchronous commits occur on executor threads
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1667409 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java145
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java10
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java1
3 files changed, 152 insertions, 4 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 93a7b466a3..96d2058c05 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -35,6 +35,17 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
@@ -109,11 +120,16 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
" WHERE format = ? and global_id = ? and branch_id = ?";
protected final EventManager _eventManager = new EventManager();
+ private ConfiguredObject<?> _parent;
protected abstract boolean isMessageStoreOpen();
protected abstract void checkMessageStoreOpen();
+ private ScheduledThreadPoolExecutor _executor;
+ public AbstractJDBCMessageStore()
+ {
+ }
protected void setMaximumMessageId()
{
@@ -269,6 +285,34 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
}
+ protected void initMessageStore(final ConfiguredObject<?> parent)
+ {
+ _parent = parent;
+ _executor = new ScheduledThreadPoolExecutor(4, new ThreadFactory()
+ {
+ private final AtomicInteger _count = new AtomicInteger();
+ @Override
+ public Thread newThread(final Runnable r)
+ {
+ final Thread thread = Executors.defaultThreadFactory().newThread(r);
+ thread.setName(parent.getName() + "-store-"+_count.incrementAndGet());
+ return thread;
+ }
+ });
+ _executor.prestartAllCoreThreads();
+
+ }
+
+ @Override
+ public void closeMessageStore()
+ {
+ if(_executor != null)
+ {
+ _executor.shutdown();
+ }
+
+ }
+
protected abstract Logger getLogger();
protected abstract String getSqlBlobType();
@@ -835,10 +879,105 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
}
- private FutureResult commitTranAsync(ConnectionWrapper connWrapper) throws StoreException
+ private FutureResult commitTranAsync(final ConnectionWrapper connWrapper) throws StoreException
{
- commitTran(connWrapper);
- return FutureResult.IMMEDIATE_FUTURE;
+ final Future<?> result = _executor.submit(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ commitTran(connWrapper);
+ }
+ });
+ return new FutureResult()
+ {
+ @Override
+ public boolean isComplete()
+ {
+ boolean done = result.isDone();
+ try
+ {
+ result.get();
+ }
+ catch (InterruptedException e)
+ {
+ // this won't happen as we're actually already done;
+ }
+ catch (ExecutionException e)
+ {
+ if(e.getCause() instanceof RuntimeException)
+ {
+ throw (RuntimeException)e.getCause();
+ }
+ else if(e.getCause() instanceof Error)
+ {
+ throw (Error)e.getCause();
+ }
+ else
+ {
+ throw new StoreException(e);
+ }
+ }
+ return done;
+ }
+
+ @Override
+ public void waitForCompletion()
+ {
+ try
+ {
+ result.get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new StoreException(e);
+ }
+ catch (ExecutionException e)
+ {
+ if(e.getCause() instanceof RuntimeException)
+ {
+ throw (RuntimeException)e.getCause();
+ }
+ else if(e.getCause() instanceof Error)
+ {
+ throw (Error)e.getCause();
+ }
+ else
+ {
+ throw new StoreException(e);
+ }
+ }
+ }
+
+ @Override
+ public void waitForCompletion(final long timeout) throws TimeoutException
+ {
+
+ try
+ {
+ result.get(timeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ throw new StoreException(e);
+ }
+ catch (ExecutionException e)
+ {
+ if(e.getCause() instanceof RuntimeException)
+ {
+ throw (RuntimeException)e.getCause();
+ }
+ else if(e.getCause() instanceof Error)
+ {
+ throw (Error)e.getCause();
+ }
+ else
+ {
+ throw new StoreException(e);
+ }
+ }
+ }
+ };
}
private void abortTran(ConnectionWrapper connWrapper) throws StoreException
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java
index 6a665a843e..df5eec025b 100644
--- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java
@@ -50,6 +50,7 @@ public abstract class AbstractDerbyMessageStore extends AbstractJDBCMessageStore
if (_messageStoreOpen.compareAndSet(false, true))
{
_parent = parent;
+ initMessageStore(parent);
DerbyUtils.loadDerbyDriver();
@@ -85,7 +86,14 @@ public abstract class AbstractDerbyMessageStore extends AbstractJDBCMessageStore
{
if (_messageStoreOpen.compareAndSet(true, false))
{
- doClose();
+ try
+ {
+ doClose();
+ }
+ finally
+ {
+ super.closeMessageStore();
+ }
}
}
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
index 63c60d7400..e062af45a7 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
@@ -76,6 +76,7 @@ public abstract class GenericAbstractJDBCMessageStore extends org.apache.qpid.se
finally
{
doClose();
+ super.closeMessageStore();
}
}