diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-03-17 21:19:05 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-03-17 21:19:05 +0000 |
| commit | f4c3e204c54e95a898ce6a56b589ace853579485 (patch) | |
| tree | 2b8540a45ce49432346d436d1c730ff67d0c52ea /qpid/java | |
| parent | e717efdc1f178792d05b3e7eedb509e950de35d9 (diff) | |
| download | qpid-python-f4c3e204c54e95a898ce6a56b589ace853579485.tar.gz | |
QPID-6457 : [Java Broker] Make asynchronous commits occur on executor threads
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1667409 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
3 files changed, 152 insertions, 4 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 93a7b466a3..96d2058c05 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -35,6 +35,17 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; @@ -109,11 +120,16 @@ public abstract class AbstractJDBCMessageStore implements MessageStore " WHERE format = ? and global_id = ? and branch_id = ?"; protected final EventManager _eventManager = new EventManager(); + private ConfiguredObject<?> _parent; protected abstract boolean isMessageStoreOpen(); protected abstract void checkMessageStoreOpen(); + private ScheduledThreadPoolExecutor _executor; + public AbstractJDBCMessageStore() + { + } protected void setMaximumMessageId() { @@ -269,6 +285,34 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } } + protected void initMessageStore(final ConfiguredObject<?> parent) + { + _parent = parent; + _executor = new ScheduledThreadPoolExecutor(4, new ThreadFactory() + { + private final AtomicInteger _count = new AtomicInteger(); + @Override + public Thread newThread(final Runnable r) + { + final Thread thread = Executors.defaultThreadFactory().newThread(r); + thread.setName(parent.getName() + "-store-"+_count.incrementAndGet()); + return thread; + } + }); + _executor.prestartAllCoreThreads(); + + } + + @Override + public void closeMessageStore() + { + if(_executor != null) + { + _executor.shutdown(); + } + + } + protected abstract Logger getLogger(); protected abstract String getSqlBlobType(); @@ -835,10 +879,105 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } } - private FutureResult commitTranAsync(ConnectionWrapper connWrapper) throws StoreException + private FutureResult commitTranAsync(final ConnectionWrapper connWrapper) throws StoreException { - commitTran(connWrapper); - return FutureResult.IMMEDIATE_FUTURE; + final Future<?> result = _executor.submit(new Runnable() + { + @Override + public void run() + { + commitTran(connWrapper); + } + }); + return new FutureResult() + { + @Override + public boolean isComplete() + { + boolean done = result.isDone(); + try + { + result.get(); + } + catch (InterruptedException e) + { + // this won't happen as we're actually already done; + } + catch (ExecutionException e) + { + if(e.getCause() instanceof RuntimeException) + { + throw (RuntimeException)e.getCause(); + } + else if(e.getCause() instanceof Error) + { + throw (Error)e.getCause(); + } + else + { + throw new StoreException(e); + } + } + return done; + } + + @Override + public void waitForCompletion() + { + try + { + result.get(); + } + catch (InterruptedException e) + { + throw new StoreException(e); + } + catch (ExecutionException e) + { + if(e.getCause() instanceof RuntimeException) + { + throw (RuntimeException)e.getCause(); + } + else if(e.getCause() instanceof Error) + { + throw (Error)e.getCause(); + } + else + { + throw new StoreException(e); + } + } + } + + @Override + public void waitForCompletion(final long timeout) throws TimeoutException + { + + try + { + result.get(timeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + throw new StoreException(e); + } + catch (ExecutionException e) + { + if(e.getCause() instanceof RuntimeException) + { + throw (RuntimeException)e.getCause(); + } + else if(e.getCause() instanceof Error) + { + throw (Error)e.getCause(); + } + else + { + throw new StoreException(e); + } + } + } + }; } private void abortTran(ConnectionWrapper connWrapper) throws StoreException diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java index 6a665a843e..df5eec025b 100644 --- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java @@ -50,6 +50,7 @@ public abstract class AbstractDerbyMessageStore extends AbstractJDBCMessageStore if (_messageStoreOpen.compareAndSet(false, true)) { _parent = parent; + initMessageStore(parent); DerbyUtils.loadDerbyDriver(); @@ -85,7 +86,14 @@ public abstract class AbstractDerbyMessageStore extends AbstractJDBCMessageStore { if (_messageStoreOpen.compareAndSet(true, false)) { - doClose(); + try + { + doClose(); + } + finally + { + super.closeMessageStore(); + } } } diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java index 63c60d7400..e062af45a7 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java @@ -76,6 +76,7 @@ public abstract class GenericAbstractJDBCMessageStore extends org.apache.qpid.se finally { doClose(); + super.closeMessageStore(); } } |
