summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-03-28 17:14:25 +0000
committerKeith Wall <kwall@apache.org>2014-03-28 17:14:25 +0000
commit59f63df7016f77288fd5434e9e09557cd551eefd (patch)
treea93b7059b600a5590f86552dd164ddfff296bb8a /qpid/java/broker-core/src
parente7b1fb49e4d5521c952f3ebed73384611c3ceb48 (diff)
downloadqpid-python-59f63df7016f77288fd5434e9e09557cd551eefd.tar.gz
NO-JIRA: Make the MessageStore and DurableConfigurationStore stateless. This changes
removes the StateManager and delegates the operational logging (open/close) messages to the vhost. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha2@1582835 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java12
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java467
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java40
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Event.java16
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java91
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/State.java47
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StateManager.java149
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java31
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java53
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java15
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/EventManagerTest.java36
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java185
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/StateManagerTest.java200
17 files changed, 314 insertions, 1038 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java
index ed989d764f..4165cd2fca 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.logging.subjects;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.STORE_FORMAT;
public class MessageStoreLogSubject extends AbstractLogSubject
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java
index 52208f7d7f..7c28ac7e1f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractConfiguredObject.java
@@ -39,7 +39,6 @@ import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
-import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import javax.security.auth.Subject;
@@ -565,10 +564,19 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
return getClass().getSimpleName() + " [id=" + _id + ", name=" + getName() + "]";
}
+
public ConfiguredObjectRecord asObjectRecord()
{
return new ConfiguredObjectRecord()
{
+
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + "[name=" + getName() + ", categoryClass=" + getCategoryClass() + ", type="
+ + getType() + ", id=" + getId() + "]";
+ }
+
@Override
public UUID getId()
{
@@ -617,9 +625,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
return parents;
}
+
};
}
+
@SuppressWarnings("unchecked")
@Override
public <C extends ConfiguredObject> C createChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 3098572e39..5006908cee 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -57,7 +57,7 @@ public class AMQQueueFactory implements QueueFactory
{
_virtualHost = virtualHost;
_queueRegistry = queueRegistry;
- }
+ }
@Override
public AMQQueue restoreQueue(Map<String, Object> attributes)
@@ -75,7 +75,7 @@ public class AMQQueueFactory implements QueueFactory
private AMQQueue createOrRestoreQueue(Map<String, Object> attributes, boolean createInStore)
{
String queueName = MapValueConverter.getStringAttribute(Queue.NAME,attributes);
- boolean createDLQ = shouldCreateDLQ(attributes, _virtualHost.getDefaultDeadLetterQueueEnabled());
+ boolean createDLQ = createInStore && shouldCreateDLQ(attributes, _virtualHost.getDefaultDeadLetterQueueEnabled());
if (createDLQ)
{
validateDLNames(queueName);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index ad3e685004..28ac79075e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -21,11 +21,7 @@
package org.apache.qpid.server.store;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
@@ -46,6 +42,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
@@ -53,7 +50,6 @@ import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.transport.ConnectionOpen;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonParseException;
@@ -180,24 +176,19 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
protected final EventManager _eventManager = new EventManager();
- protected final StateManager _messageStoreStateManager;
+ private final AtomicBoolean _messageStoreOpen = new AtomicBoolean();
+ private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean();
- private StateManager _configurationStoreStateManager;
private boolean _initialized;
- public AbstractJDBCMessageStore()
- {
- _messageStoreStateManager = new StateManager(_eventManager);
- _configurationStoreStateManager = new StateManager(new EventManager());
- }
-
@Override
public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings)
{
- _configurationStoreStateManager.attainState(State.INITIALISING);
- initialiseIfNecessary(virtualHostName, storeSettings);
- _configurationStoreStateManager.attainState(State.INITIALISED);
+ if (_configurationStoreOpen.compareAndSet(false, true))
+ {
+ initialiseIfNecessary(virtualHostName, storeSettings);
+ }
}
private void initialiseIfNecessary(String virtualHostName, Map<String, Object> storeSettings)
@@ -216,14 +207,15 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
throw new StoreException("Unexpected exception occured", e);
}
- _initialized =true;
+ _initialized = true;
}
}
@Override
public void recoverConfigurationStore(ConfiguredObject<?> parent, ConfigurationRecoveryHandler recoveryHandler)
{
- _configurationStoreStateManager.attainState(State.ACTIVATING);
+ checkConfigurationStoreOpen();
+
try
{
createOrOpenConfigurationStoreDatabase();
@@ -236,7 +228,22 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
{
throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
}
- _configurationStoreStateManager.attainState(State.ACTIVE);
+ }
+
+ private void checkConfigurationStoreOpen()
+ {
+ if (!_configurationStoreOpen.get())
+ {
+ throw new IllegalStateException("Configuration store is not open");
+ }
+ }
+
+ private void checkMessageStoreOpen()
+ {
+ if (!_messageStoreOpen.get())
+ {
+ throw new IllegalStateException("Message store is not open");
+ }
}
private void upgradeIfVersionTableExists(ConfiguredObject<?> parent)
@@ -261,15 +268,16 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings)
{
- _messageStoreStateManager.attainState(State.INITIALISING);
- initialiseIfNecessary(virtualHostName, messageStoreSettings);
- _messageStoreStateManager.attainState(State.INITIALISED);
+ if (_messageStoreOpen.compareAndSet(false, true))
+ {
+ initialiseIfNecessary(virtualHostName, messageStoreSettings);
+ }
}
@Override
public void recoverMessageStore(ConfiguredObject<?> parent, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
{
- _messageStoreStateManager.attainState(State.ACTIVATING);
+ checkMessageStoreOpen();
try
{
createOrOpenMessageStoreDatabase();
@@ -305,8 +313,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
-
- _messageStoreStateManager.attainState(State.ACTIVE);
}
protected void upgradeIfNecessary(ConfiguredObject<?> parent) throws SQLException
@@ -634,8 +640,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
-
-
private void createQueueEntryTable(final Connection conn) throws SQLException
{
if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
@@ -700,8 +704,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
-
-
private void createXidTable(final Connection conn) throws SQLException
{
if(!tableExists(XID_TABLE_NAME, conn))
@@ -836,35 +838,34 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public void closeMessageStore()
{
- _messageStoreStateManager.attainState(State.CLOSING);
-
- if (_configurationStoreStateManager.isInState(State.CLOSED) || _configurationStoreStateManager.isInState(State.INITIAL))
+ if (_messageStoreOpen.compareAndSet(true, false))
{
- doClose();
+ if (!_configurationStoreOpen.get())
+ {
+ doClose();
+ }
}
-
- _messageStoreStateManager.attainState(State.CLOSED);
}
@Override
public void closeConfigurationStore()
{
- _configurationStoreStateManager.attainState(State.CLOSING);
-
- if (_messageStoreStateManager.isInState(State.CLOSED) || _messageStoreStateManager.isInState(State.INITIAL))
+ if (_configurationStoreOpen.compareAndSet(true, false))
{
- doClose();
+ if (!_messageStoreOpen.get())
+ {
+ doClose();
+ }
}
-
- _configurationStoreStateManager.attainState(State.CLOSED);
}
-
protected abstract void doClose();
@Override
public StoredMessage addMessage(StorableMessageMetaData metaData)
{
+ checkMessageStoreOpen();
+
if(metaData.isPersistent())
{
return new StoredJDBCMessage(_messageId.incrementAndGet(), metaData);
@@ -875,12 +876,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- public StoredMessage getMessage(long messageNumber)
- {
- return null;
- }
-
- public void removeMessage(long messageId)
+ private void removeMessage(long messageId)
{
try
{
@@ -944,26 +940,24 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public void create(ConfiguredObjectRecord object) throws StoreException
{
- if (_configurationStoreStateManager.isInState(State.ACTIVE))
+ checkConfigurationStoreOpen();
+ try
{
+ Connection conn = newConnection();
try
{
- Connection conn = newConnection();
- try
- {
- insertConfiguredObject(object, conn);
- conn.commit();
- }
- finally
- {
- conn.close();
- }
+ insertConfiguredObject(object, conn);
+ conn.commit();
}
- catch (SQLException e)
+ finally
{
- throw new StoreException("Error creating ConfiguredObject " + object);
+ conn.close();
}
}
+ catch (SQLException e)
+ {
+ throw new StoreException("Error creating ConfiguredObject " + object);
+ }
}
/**
@@ -1021,46 +1015,15 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
protected abstract Connection getConnection() throws SQLException;
- private byte[] convertStringMapToBytes(final Map<String, String> arguments) throws StoreException
- {
- byte[] argumentBytes;
- if(arguments == null)
- {
- argumentBytes = new byte[0];
- }
- else
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
-
-
- try
- {
- dos.writeInt(arguments.size());
- for(Map.Entry<String,String> arg : arguments.entrySet())
- {
- dos.writeUTF(arg.getKey());
- dos.writeUTF(arg.getValue());
- }
- }
- catch (IOException e)
- {
- // This should never happen
- throw new StoreException(e.getMessage(), e);
- }
- argumentBytes = bos.toByteArray();
- }
- return argumentBytes;
- }
-
@Override
public Transaction newTransaction()
{
+ checkMessageStoreOpen();
+
return new JDBCTransaction();
}
- public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws
- StoreException
+ private void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws StoreException
{
Connection conn = connWrapper.getConnection();
@@ -1103,8 +1066,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- public void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws
- StoreException
+ private void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws StoreException
{
Connection conn = connWrapper.getConnection();
@@ -1284,7 +1246,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- public void commitTran(ConnectionWrapper connWrapper) throws StoreException
+ private void commitTran(ConnectionWrapper connWrapper) throws StoreException
{
try
@@ -1309,13 +1271,13 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- public StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws StoreException
+ private StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws StoreException
{
commitTran(connWrapper);
return StoreFuture.IMMEDIATE_FUTURE;
}
- public void abortTran(ConnectionWrapper connWrapper) throws StoreException
+ private void abortTran(ConnectionWrapper connWrapper) throws StoreException
{
if (connWrapper == null)
{
@@ -1340,11 +1302,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- public Long getNewMessageId()
- {
- return _messageId.incrementAndGet();
- }
-
private void storeMetaData(Connection conn, long messageId, StorableMessageMetaData metaData)
throws SQLException
{
@@ -1398,7 +1355,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- protected void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
+ private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
{
Connection conn = newAutoCommitConnection();
try
@@ -1455,7 +1412,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- protected TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
+ private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
{
Connection conn = newAutoCommitConnection();
try
@@ -1585,7 +1542,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- protected void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
+ private void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
{
Connection conn = newAutoCommitConnection();
try
@@ -1672,7 +1629,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- StorableMessageMetaData getMetaData(long messageId) throws SQLException
+ private StorableMessageMetaData getMetaData(long messageId) throws SQLException
{
Connection conn = newAutoCommitConnection();
@@ -1754,7 +1711,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- public int getContent(long messageId, int offset, ByteBuffer dst)
+ private int getContent(long messageId, int offset, ByteBuffer dst)
{
Connection conn = null;
PreparedStatement stmt = null;
@@ -1835,6 +1792,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
+ checkMessageStoreOpen();
+
final StoredMessage storedMessage = message.getStoredMessage();
if(storedMessage instanceof StoredJDBCMessage)
{
@@ -1855,12 +1814,16 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
+ checkMessageStoreOpen();
+
AbstractJDBCMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber());
}
@Override
public void commitTran()
{
+ checkMessageStoreOpen();
+
AbstractJDBCMessageStore.this.commitTran(_connWrapper);
storedSizeChange(_storeSizeIncrease);
}
@@ -1868,6 +1831,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public StoreFuture commitTranAsync()
{
+ checkMessageStoreOpen();
+
StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
storedSizeChange(_storeSizeIncrease);
return storeFuture;
@@ -1876,18 +1841,24 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public void abortTran()
{
+ checkMessageStoreOpen();
+
AbstractJDBCMessageStore.this.abortTran(_connWrapper);
}
@Override
public void removeXid(long format, byte[] globalId, byte[] branchId)
{
+ checkMessageStoreOpen();
+
AbstractJDBCMessageStore.this.removeXid(_connWrapper, format, globalId, branchId);
}
@Override
public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
{
+ checkMessageStoreOpen();
+
AbstractJDBCMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues);
}
}
@@ -1929,6 +1900,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
StorableMessageMetaData metaData = _metaData == null ? _metaDataRef.get() : _metaData;
if(metaData == null)
{
+ checkMessageStoreOpen();
try
{
metaData = AbstractJDBCMessageStore.this.getMetaData(_messageId);
@@ -1984,6 +1956,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
else
{
+ checkMessageStoreOpen();
return AbstractJDBCMessageStore.this.getContent(_messageId, offsetInMessage, dst);
}
}
@@ -2002,6 +1975,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public synchronized StoreFuture flushToStore()
{
+ checkMessageStoreOpen();
+
Connection conn = null;
try
{
@@ -2033,6 +2008,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public void remove()
{
+ checkMessageStoreOpen();
+
int delta = getMetaData().getContentSize();
AbstractJDBCMessageStore.this.removeMessage(_messageId);
storedSizeChange(-delta);
@@ -2105,87 +2082,85 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
private void insertConfiguredObject(ConfiguredObjectRecord configuredObject, final Connection conn) throws StoreException
{
- if (_configurationStoreStateManager.isInState(State.ACTIVE))
+ try
{
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
try
{
- PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ stmt.setString(1, configuredObject.getId().toString());
+ ResultSet rs = stmt.executeQuery();
+ boolean exists;
try
{
- stmt.setString(1, configuredObject.getId().toString());
- ResultSet rs = stmt.executeQuery();
- boolean exists;
+ exists = rs.next();
+
+ }
+ finally
+ {
+ rs.close();
+ }
+ // If we don't have any data in the result set then we can add this configured object
+ if (!exists)
+ {
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
try
{
- exists = rs.next();
-
- }
- finally
- {
- rs.close();
- }
- // If we don't have any data in the result set then we can add this configured object
- if (!exists)
- {
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
- try
+ insertStmt.setString(1, configuredObject.getId().toString());
+ insertStmt.setString(2, configuredObject.getType());
+ if(configuredObject.getAttributes() == null)
{
- insertStmt.setString(1, configuredObject.getId().toString());
- insertStmt.setString(2, configuredObject.getType());
- if(configuredObject.getAttributes() == null)
- {
- insertStmt.setNull(3, Types.BLOB);
- }
- else
- {
- final Map<String, Object> attributes = configuredObject.getAttributes();
- final ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.registerModule(_module);
- byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
-
- ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
- insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
- }
- insertStmt.execute();
+ insertStmt.setNull(3, Types.BLOB);
}
- finally
+ else
{
- insertStmt.close();
+ final Map<String, Object> attributes = configuredObject.getAttributes();
+ final ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(_module);
+ byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
}
-
- writeHierarchy(configuredObject, conn);
+ insertStmt.execute();
}
-
- }
- finally
- {
- stmt.close();
+ finally
+ {
+ insertStmt.close();
+ }
+
+ writeHierarchy(configuredObject, conn);
}
-
- }
- catch (JsonMappingException e)
- {
- throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
- }
- catch (JsonGenerationException e)
- {
- throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
- }
- catch (IOException e)
- {
- throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+
}
- catch (SQLException e)
+ finally
{
- throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ stmt.close();
}
- }
+ }
+ catch (JsonMappingException e)
+ {
+ throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ catch (JsonGenerationException e)
+ {
+ throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
}
@Override
public UUID[] remove(ConfiguredObjectRecord... objects) throws StoreException
{
+ checkConfigurationStoreOpen();
+
Collection<UUID> removed = new ArrayList<UUID>(objects.length);
try
{
@@ -2242,31 +2217,27 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
{
- if (_configurationStoreStateManager.isInState(State.ACTIVE) || _configurationStoreStateManager.isInState(State.ACTIVATING))
+ checkConfigurationStoreOpen();
+ try
{
+ Connection conn = newConnection();
try
{
- Connection conn = newConnection();
- try
- {
- for(ConfiguredObjectRecord record : records)
- {
- updateConfiguredObject(record, createIfNecessary, conn);
- }
- conn.commit();
- }
- finally
+ for(ConfiguredObjectRecord record : records)
{
- conn.close();
+ updateConfiguredObject(record, createIfNecessary, conn);
}
+ conn.commit();
}
- catch (SQLException e)
+ finally
{
- throw new StoreException("Error updating configured objects in database: " + e.getMessage(), e);
+ conn.close();
}
-
}
-
+ catch (SQLException e)
+ {
+ throw new StoreException("Error updating configured objects in database: " + e.getMessage(), e);
+ }
}
private void updateConfiguredObject(ConfiguredObjectRecord configuredObject,
@@ -2274,89 +2245,88 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
Connection conn)
throws SQLException, StoreException
{
- PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ try
+ {
+ stmt.setString(1, configuredObject.getId().toString());
+ ResultSet rs = stmt.executeQuery();
try
{
- stmt.setString(1, configuredObject.getId().toString());
- ResultSet rs = stmt.executeQuery();
- try
+ final ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(_module);
+ if (rs.next())
{
- final ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.registerModule(_module);
- if (rs.next())
+ PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
+ try
{
- PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
- try
+ stmt2.setString(1, configuredObject.getType());
+ if (configuredObject.getAttributes() != null)
{
- stmt2.setString(1, configuredObject.getType());
- if (configuredObject.getAttributes() != null)
- {
- byte[] attributesAsBytes = objectMapper.writeValueAsBytes(
- configuredObject.getAttributes());
- ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
- stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
- }
- else
- {
- stmt2.setNull(2, Types.BLOB);
- }
- stmt2.setString(3, configuredObject.getId().toString());
- stmt2.execute();
+ byte[] attributesAsBytes = objectMapper.writeValueAsBytes(
+ configuredObject.getAttributes());
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
}
- finally
+ else
{
- stmt2.close();
+ stmt2.setNull(2, Types.BLOB);
}
+ stmt2.setString(3, configuredObject.getId().toString());
+ stmt2.execute();
}
- else if(createIfNecessary)
+ finally
+ {
+ stmt2.close();
+ }
+ }
+ else if(createIfNecessary)
+ {
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
+ try
{
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
- try
+ insertStmt.setString(1, configuredObject.getId().toString());
+ insertStmt.setString(2, configuredObject.getType());
+ if(configuredObject.getAttributes() == null)
{
- insertStmt.setString(1, configuredObject.getId().toString());
- insertStmt.setString(2, configuredObject.getType());
- if(configuredObject.getAttributes() == null)
- {
- insertStmt.setNull(3, Types.BLOB);
- }
- else
- {
- final Map<String, Object> attributes = configuredObject.getAttributes();
- byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
- ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
- insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
- }
- insertStmt.execute();
+ insertStmt.setNull(3, Types.BLOB);
}
- finally
+ else
{
- insertStmt.close();
+ final Map<String, Object> attributes = configuredObject.getAttributes();
+ byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
}
- writeHierarchy(configuredObject, conn);
+ insertStmt.execute();
}
+ finally
+ {
+ insertStmt.close();
+ }
+ writeHierarchy(configuredObject, conn);
}
- finally
- {
- rs.close();
- }
- }
- catch (JsonMappingException e)
- {
- throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
- }
- catch (JsonGenerationException e)
- {
- throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
- }
- catch (IOException e)
- {
- throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
}
finally
{
- stmt.close();
+ rs.close();
}
-
+ }
+ catch (JsonMappingException e)
+ {
+ throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ catch (JsonGenerationException e)
+ {
+ throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ finally
+ {
+ stmt.close();
+ }
}
private void writeHierarchy(final ConfiguredObjectRecord configuredObject, final Connection conn) throws SQLException, StoreException
@@ -2483,6 +2453,7 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
@Override
public void onDelete()
{
+ // TODO should probably check we are closed
try
{
Connection conn = newAutoCommitConnection();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
index bf538e4592..a7e9ef2ab6 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
@@ -20,11 +20,9 @@
*/
package org.apache.qpid.server.store;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.model.ConfiguredObject;
/** A simple message store that stores the messages in a thread-safe structure in memory. */
abstract public class AbstractMemoryMessageStore extends NullMessageStore
@@ -70,39 +68,8 @@ abstract public class AbstractMemoryMessageStore extends NullMessageStore
}
};
- private final StateManager _stateManager;
private final EventManager _eventManager = new EventManager();
- public AbstractMemoryMessageStore()
- {
- _stateManager = new StateManager(_eventManager);
- }
-
- @Override
- public void openConfigurationStore(String virtualHostName, Map<String, Object> storeSettings)
- {
- }
-
- @Override
- public void recoverConfigurationStore(ConfiguredObject<?> parent, ConfigurationRecoveryHandler recoveryHandler)
- {
-
- }
-
- @Override
- public void openMessageStore(String virtualHostName, Map<String, Object> messageStoreSettings)
- {
- _stateManager.attainState(State.INITIALISING);
- _stateManager.attainState(State.INITIALISED);
- }
-
- @Override
- public void recoverMessageStore(ConfiguredObject<?> parent, MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
- {
- _stateManager.attainState(State.ACTIVATING);
-
- _stateManager.attainState(State.ACTIVE);
- }
@Override
public StoredMessage addMessage(StorableMessageMetaData metaData)
@@ -126,13 +93,6 @@ abstract public class AbstractMemoryMessageStore extends NullMessageStore
}
@Override
- public void closeMessageStore()
- {
- _stateManager.attainState(State.CLOSING);
- _stateManager.attainState(State.CLOSED);
- }
-
- @Override
public void addEventListener(EventListener eventListener, Event... events)
{
_eventManager.addEventListener(eventListener, events);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java
index f8d8ecdd7c..84f24df1cc 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java
@@ -27,10 +27,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
@@ -74,6 +76,7 @@ public class DurableConfigurationRecoverer implements ConfigurationRecoveryHandl
_store = store;
_upgrader = _upgraderProvider.getUpgrader(configVersion, this);
+ _eventLogger.message(_logSubject, ConfigStoreMessages.RECOVERY_START());
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Event.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Event.java
index c681126c11..a9a5ea8086 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Event.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Event.java
@@ -21,22 +21,6 @@ package org.apache.qpid.server.store;
public enum Event
{
- BEFORE_INIT,
- AFTER_INIT,
-
- BEFORE_ACTIVATE,
- AFTER_ACTIVATE,
-
- BEFORE_PASSIVATE,
- AFTER_PASSIVATE,
-
- BEFORE_CLOSE,
- AFTER_CLOSE,
-
- BEFORE_QUIESCE,
- AFTER_QUIESCE,
- BEFORE_RESTART,
-
PERSISTENT_MESSAGE_SIZE_OVERFULL,
PERSISTENT_MESSAGE_SIZE_UNDERFULL
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
index 7fb6c4df48..b6b65087a4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -72,6 +72,7 @@ public interface MessageStore
String getStoreLocation();
+ // TODO dead method - remove??
String getStoreType();
void onDelete();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java
deleted file mode 100644
index 43c75f75b1..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.logging.messages.TransactionLogMessages;
-
-public class OperationalLoggingListener implements EventListener
-{
- protected final LogSubject _logSubject;
- private MessageStore _store;
- private final EventLogger _eventLogger;
-
-
- private OperationalLoggingListener(final MessageStore store, LogSubject logSubject, final EventLogger eventLogger)
- {
- _logSubject = logSubject;
- _eventLogger = eventLogger;
- store.addEventListener(this,
- Event.BEFORE_INIT,
- Event.AFTER_INIT,
- Event.BEFORE_ACTIVATE,
- Event.AFTER_ACTIVATE,
- Event.AFTER_CLOSE,
- Event.PERSISTENT_MESSAGE_SIZE_OVERFULL,
- Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
- _store = store;
-
- }
-
- public void event(Event event)
- {
-
- switch(event)
- {
- case BEFORE_INIT:
- _eventLogger.message(_logSubject, ConfigStoreMessages.CREATED());
- break;
- case AFTER_INIT:
- _eventLogger.message(_logSubject, MessageStoreMessages.CREATED());
- _eventLogger.message(_logSubject, TransactionLogMessages.CREATED());
- String storeLocation = _store.getStoreLocation();
- if (storeLocation != null)
- {
- _eventLogger.message(_logSubject, MessageStoreMessages.STORE_LOCATION(storeLocation));
- }
- break;
- case BEFORE_ACTIVATE:
- _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_START());
- break;
- case AFTER_ACTIVATE:
- _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE());
- break;
- case AFTER_CLOSE:
- _eventLogger.message(_logSubject, MessageStoreMessages.CLOSED());
- break;
- case PERSISTENT_MESSAGE_SIZE_OVERFULL:
- _eventLogger.message(_logSubject, MessageStoreMessages.OVERFULL());
- break;
- case PERSISTENT_MESSAGE_SIZE_UNDERFULL:
- _eventLogger.message(_logSubject, MessageStoreMessages.UNDERFULL());
- break;
-
- }
- }
-
- public static void listen(final MessageStore store, LogSubject logSubject, final EventLogger eventLogger)
- {
- new OperationalLoggingListener(store, logSubject, eventLogger);
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/State.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/State.java
deleted file mode 100644
index 1d0936cec4..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/State.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-public enum State
-{
- /** The initial state of the store. In practice, the store immediately transitions to the subsequent states. */
- INITIAL,
-
- INITIALISING,
- /**
- * The initial set-up of the store has completed.
- * If the store is persistent, it has not yet loaded configuration from disk.
- *
- * From the point of view of the user, the store is essentially stopped.
- */
- INITIALISED,
-
- ACTIVATING,
- ACTIVE,
-
- CLOSING,
- CLOSED,
-
- QUIESCING,
- /** The virtual host (and implicitly also the store) has been manually paused by the user to allow configuration changes to take place */
- QUIESCED;
-
-} \ No newline at end of file
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StateManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StateManager.java
deleted file mode 100644
index 63612da455..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StateManager.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-
-import java.util.EnumMap;
-import java.util.Map;
-
-public class StateManager
-{
- private State _state = State.INITIAL;
- private EventListener _eventListener;
-
- private static final Map<State,Map<State, Transition>> _validTransitions = new EnumMap<State, Map<State, Transition>>(State.class);
-
-
- static class Transition
- {
- private final Event _event;
- private final State _endState;
- private final State _startState;
-
- public Transition(State startState, State endState, Event event)
- {
- _event = event;
- _startState = startState;
- _endState = endState;
-
- Map<State, Transition> stateTransitions = _validTransitions.get(startState);
- if(stateTransitions == null)
- {
- stateTransitions = new EnumMap<State, Transition>(State.class);
- _validTransitions.put(startState, stateTransitions);
- }
- stateTransitions.put(endState, this);
- }
-
- public Event getEvent()
- {
- return _event;
- }
-
- public State getStartState()
- {
- return _startState;
- }
-
- public State getEndState()
- {
- return _endState;
- }
-
- }
-
- public static final Transition INITIALISE = new Transition(State.INITIAL, State.INITIALISING, Event.BEFORE_INIT);
- public static final Transition INITIALISE_COMPLETE = new Transition(State.INITIALISING, State.INITIALISED, Event.AFTER_INIT);
-
- public static final Transition ACTIVATE = new Transition(State.INITIALISED, State.ACTIVATING, Event.BEFORE_ACTIVATE);
- public static final Transition ACTIVATE_COMPLETE = new Transition(State.ACTIVATING, State.ACTIVE, Event.AFTER_ACTIVATE);
-
- public static final Transition CLOSE_INITIALISED = new Transition(State.INITIALISED, State.CLOSING, Event.BEFORE_CLOSE);
- public static final Transition CLOSE_ACTIVATING = new Transition(State.ACTIVATING, State.CLOSING, Event.BEFORE_CLOSE);
- public static final Transition CLOSE_ACTIVE = new Transition(State.ACTIVE, State.CLOSING, Event.BEFORE_CLOSE);
- public static final Transition CLOSE_QUIESCED = new Transition(State.QUIESCED, State.CLOSING, Event.BEFORE_CLOSE);
- public static final Transition CLOSE_COMPLETE = new Transition(State.CLOSING, State.CLOSED, Event.AFTER_CLOSE);
-
- public static final Transition PASSIVATE = new Transition(State.ACTIVE, State.INITIALISED, Event.BEFORE_PASSIVATE);
-
- public static final Transition QUIESCE = new Transition(State.ACTIVE, State.QUIESCING, Event.BEFORE_QUIESCE);
- public static final Transition QUIESCE_COMPLETE = new Transition(State.QUIESCING, State.QUIESCED, Event.AFTER_QUIESCE);
-
- public static final Transition RESTART = new Transition(State.QUIESCED, State.ACTIVATING, Event.BEFORE_RESTART);
-
-
- public StateManager(final EventManager eventManager)
- {
- this(new EventListener()
- {
- @Override
- public void event(Event event)
- {
- eventManager.notifyEvent(event);
- }
- });
- }
-
-
- public StateManager(EventListener eventListener)
- {
- _eventListener = eventListener;
- }
-
- public synchronized State getState()
- {
- return _state;
- }
-
- public synchronized void attainState(State desired)
- {
- Transition transition = null;
- final Map<State, Transition> stateTransitionMap = _validTransitions.get(_state);
- if(stateTransitionMap != null)
- {
- transition = stateTransitionMap.get(desired);
- }
- if(transition == null)
- {
- throw new IllegalStateException("No valid transition from state " + _state + " to state " + desired);
- }
- _state = desired;
- _eventListener.event(transition.getEvent());
- }
-
- public synchronized boolean isInState(State testedState)
- {
- return _state.equals(testedState);
- }
-
- public synchronized boolean isNotInState(State testedState)
- {
- return !isInState(testedState);
- }
-
- public synchronized void checkInState(State checkedState)
- {
- if (isNotInState(checkedState))
- {
- throw new IllegalStateException("Unexpected state. Was : " + _state + " but expected : " + checkedState);
- }
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index e41ee051f3..dd7e82a100 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -32,7 +32,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
import org.apache.qpid.server.exchange.ExchangeImpl;
@@ -45,7 +44,10 @@ import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageNode;
import org.apache.qpid.server.message.MessageSource;
@@ -120,7 +122,6 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
private final EventLogger _eventLogger;
-
public AbstractVirtualHost(VirtualHostRegistry virtualHostRegistry,
StatisticsGatherer brokerStatisticsGatherer,
SecurityManager parentSecurityManager,
@@ -175,6 +176,8 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
abstract protected void initialiseStorage(org.apache.qpid.server.model.VirtualHost<?> virtualHost);
+ abstract protected MessageStoreLogSubject getMessageStoreLogSubject();
+
public IConnectionRegistry getConnectionRegistry()
{
return _connectionRegistry;
@@ -283,8 +286,6 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
protected void initialiseModel()
{
- _logger.debug("Loading configuration for virtualhost: " + _model.getName());
-
_exchangeRegistry.initialise(_exchangeFactory);
}
@@ -568,17 +569,15 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
_eventLogger.message(VirtualHostMessages.CLOSED(getName()));
}
- protected void closeStorage()
+ private void closeStorage()
{
- //Close MessageStore
if (getMessageStore() != null)
{
- // TODO Remove MessageStore Interface should not throw Exception
try
{
getMessageStore().closeMessageStore();
}
- catch (Exception e)
+ catch (StoreException e)
{
_logger.error("Failed to close message store", e);
}
@@ -588,22 +587,25 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
try
{
getDurableConfigurationStore().closeConfigurationStore();
+ MessageStoreLogSubject configurationStoreSubject = getConfigurationStoreLogSubject();
+ if (configurationStoreSubject != null)
+ {
+ getEventLogger().message(configurationStoreSubject, ConfigStoreMessages.CLOSE());
+ }
}
catch (StoreException e)
{
_logger.error("Failed to close configuration store", e);
}
}
+ getEventLogger().message(getMessageStoreLogSubject(), MessageStoreMessages.CLOSED());
}
-
- protected Logger getLogger()
+ protected MessageStoreLogSubject getConfigurationStoreLogSubject()
{
- return _logger;
+ return null;
}
-
-
public VirtualHostRegistry getVirtualHostRegistry()
{
return _virtualHostRegistry;
@@ -739,9 +741,11 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
{
case PERSISTENT_MESSAGE_SIZE_OVERFULL:
block();
+ _eventLogger.message(getMessageStoreLogSubject(), MessageStoreMessages.OVERFULL());
break;
case PERSISTENT_MESSAGE_SIZE_UNDERFULL:
unblock();
+ _eventLogger.message(getMessageStoreLogSubject(), MessageStoreMessages.UNDERFULL());
break;
}
}
@@ -952,4 +956,5 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
{
return _model;
}
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
index bb3f8fc012..6b75c39c49 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
@@ -21,6 +21,8 @@ package org.apache.qpid.server.virtualhost;/*
import java.util.Map;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.MessageStoreFactory;
@@ -29,7 +31,6 @@ import org.apache.qpid.server.store.DurableConfigurationRecoverer;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.DurableConfigurationStoreCreator;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.OperationalLoggingListener;
public class StandardVirtualHost extends AbstractVirtualHost
{
@@ -37,6 +38,10 @@ public class StandardVirtualHost extends AbstractVirtualHost
private DurableConfigurationStore _durableConfigurationStore;
+ private MessageStoreLogSubject _messageStoreLogSubject;
+
+ private MessageStoreLogSubject _configurationStoreLogSubject;
+
StandardVirtualHost(VirtualHostRegistry virtualHostRegistry,
StatisticsGatherer brokerStatisticsGatherer,
org.apache.qpid.server.security.SecurityManager parentSecurityManager,
@@ -45,19 +50,6 @@ public class StandardVirtualHost extends AbstractVirtualHost
super(virtualHostRegistry, brokerStatisticsGatherer, parentSecurityManager, virtualHost);
}
-
-
- private MessageStore initialiseMessageStore(String storeType)
- {
- MessageStore messageStore = MessageStoreFactory.FACTORY_LOADER.get(storeType).createMessageStore();
-
- MessageStoreLogSubject
- storeLogSubject = new MessageStoreLogSubject(getName(), messageStore.getClass().getSimpleName());
- OperationalLoggingListener.listen(messageStore, storeLogSubject, getEventLogger());
-
- return messageStore;
- }
-
private DurableConfigurationStore initialiseConfigurationStore(String storeType)
{
DurableConfigurationStore configurationStore;
@@ -78,30 +70,45 @@ public class StandardVirtualHost extends AbstractVirtualHost
return configurationStore;
}
-
+ @Override
protected void initialiseStorage(VirtualHost virtualHost)
{
Map<String, Object> messageStoreSettings = virtualHost.getMessageStoreSettings();
String storeType = (String) messageStoreSettings.get(MessageStore.STORE_TYPE);
- _messageStore = initialiseMessageStore(storeType);
+ _messageStore = MessageStoreFactory.FACTORY_LOADER.get(storeType).createMessageStore();
+ _messageStoreLogSubject = new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName());
+ getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.CREATED());
Map<String, Object> configurationStoreSettings = virtualHost.getConfigurationStoreSettings();
String configurationStoreType = configurationStoreSettings == null ? null : (String) configurationStoreSettings.get(DurableConfigurationStore.STORE_TYPE);
_durableConfigurationStore = initialiseConfigurationStore(configurationStoreType);
+ boolean combinedStores = _durableConfigurationStore == _messageStore;
+ if (!combinedStores)
+ {
+ _configurationStoreLogSubject = new MessageStoreLogSubject(getName(), _durableConfigurationStore.getClass().getSimpleName());
+ getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.CREATED());
+ }
DurableConfigurationRecoverer configRecoverer =
new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
new DefaultUpgraderProvider(this, getExchangeRegistry()), getEventLogger());
- _durableConfigurationStore.openConfigurationStore(virtualHost.getName(), _durableConfigurationStore == _messageStore ? messageStoreSettings: configurationStoreSettings);
+ _durableConfigurationStore.openConfigurationStore(virtualHost.getName(), combinedStores ? messageStoreSettings: configurationStoreSettings);
_messageStore.openMessageStore(virtualHost.getName(), virtualHost.getMessageStoreSettings());
+ getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.STORE_LOCATION(_messageStore.getStoreLocation()));
+
+ if (_configurationStoreLogSubject != null)
+ {
+ getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.STORE_LOCATION(configurationStoreSettings.toString()));
+ }
+
_durableConfigurationStore.recoverConfigurationStore(getModel(), configRecoverer);
// If store does not have entries for standard exchanges (amq.*), the following will create them.
initialiseModel();
- VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this);
+ VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getMessageStoreLogSubject());
_messageStore.recoverMessageStore(getModel(), recoveryHandler, recoveryHandler);
attainActivation();
@@ -119,5 +126,15 @@ public class StandardVirtualHost extends AbstractVirtualHost
return _durableConfigurationStore;
}
+ @Override
+ protected MessageStoreLogSubject getMessageStoreLogSubject()
+ {
+ return _messageStoreLogSubject;
+ }
+ @Override
+ protected MessageStoreLogSubject getConfigurationStoreLogSubject()
+ {
+ return _configurationStoreLogSubject;
+ }
} \ No newline at end of file
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index bc6739eef4..3216115967 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -27,6 +27,7 @@ import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.EnqueueableMessage;
@@ -62,18 +63,18 @@ public class VirtualHostConfigRecoveryHandler implements
private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
private final EventLogger _eventLogger;
- private MessageStoreLogSubject _logSubject;
+ private final MessageStoreLogSubject _logSubject;
private MessageStore _store;
- public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost)
+ public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost, MessageStoreLogSubject logSubject)
{
_virtualHost = virtualHost;
_eventLogger = virtualHost.getEventLogger();
+ _logSubject = logSubject;
}
public VirtualHostConfigRecoveryHandler begin(MessageStore store)
{
- _logSubject = new MessageStoreLogSubject(_virtualHost.getName(), store.getClass().getSimpleName());
_store = store;
_eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false));
return this;
@@ -81,6 +82,7 @@ public class VirtualHostConfigRecoveryHandler implements
public StoredMessageRecoveryHandler begin()
{
+ _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_START());
return this;
}
@@ -232,10 +234,9 @@ public class VirtualHostConfigRecoveryHandler implements
m.remove();
}
_eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
- }
- public void complete()
- {
+ _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERED(_recoveredMessages.size() - _unusedMessages.size()));
+ _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE());
}
public void queueEntry(final UUID queueId, long messageId)
@@ -314,8 +315,6 @@ public class VirtualHostConfigRecoveryHandler implements
_eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true));
}
-
-
return this;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/EventManagerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/EventManagerTest.java
index 702874fb88..a9b2e0d961 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/EventManagerTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/EventManagerTest.java
@@ -22,8 +22,8 @@ package org.apache.qpid.server.store;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.apache.qpid.server.store.Event.AFTER_ACTIVATE;
-import static org.apache.qpid.server.store.Event.BEFORE_ACTIVATE;
+import static org.apache.qpid.server.store.Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL;
+import static org.apache.qpid.server.store.Event.PERSISTENT_MESSAGE_SIZE_OVERFULL;
import junit.framework.TestCase;
public class EventManagerTest extends TestCase
@@ -33,28 +33,28 @@ public class EventManagerTest extends TestCase
public void testEventListenerFires()
{
- _eventManager.addEventListener(_mockListener, BEFORE_ACTIVATE);
- _eventManager.notifyEvent(BEFORE_ACTIVATE);
- verify(_mockListener).event(BEFORE_ACTIVATE);
+ _eventManager.addEventListener(_mockListener, PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ _eventManager.notifyEvent(PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ verify(_mockListener).event(PERSISTENT_MESSAGE_SIZE_OVERFULL);
}
public void testEventListenerDoesntFire()
{
- _eventManager.addEventListener(_mockListener, BEFORE_ACTIVATE);
- _eventManager.notifyEvent(AFTER_ACTIVATE);
+ _eventManager.addEventListener(_mockListener, PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
verifyZeroInteractions(_mockListener);
}
public void testEventListenerFiresMultipleTimes()
{
- _eventManager.addEventListener(_mockListener, BEFORE_ACTIVATE);
- _eventManager.addEventListener(_mockListener, AFTER_ACTIVATE);
+ _eventManager.addEventListener(_mockListener, PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ _eventManager.addEventListener(_mockListener, PERSISTENT_MESSAGE_SIZE_UNDERFULL);
- _eventManager.notifyEvent(BEFORE_ACTIVATE);
- verify(_mockListener).event(BEFORE_ACTIVATE);
+ _eventManager.notifyEvent(PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ verify(_mockListener).event(PERSISTENT_MESSAGE_SIZE_OVERFULL);
- _eventManager.notifyEvent(AFTER_ACTIVATE);
- verify(_mockListener).event(AFTER_ACTIVATE);
+ _eventManager.notifyEvent(PERSISTENT_MESSAGE_SIZE_UNDERFULL);
+ verify(_mockListener).event(PERSISTENT_MESSAGE_SIZE_UNDERFULL);
}
public void testMultipleListenersFireForSameEvent()
@@ -62,11 +62,11 @@ public class EventManagerTest extends TestCase
final EventListener mockListener1 = mock(EventListener.class);
final EventListener mockListener2 = mock(EventListener.class);
- _eventManager.addEventListener(mockListener1, BEFORE_ACTIVATE);
- _eventManager.addEventListener(mockListener2, BEFORE_ACTIVATE);
- _eventManager.notifyEvent(BEFORE_ACTIVATE);
+ _eventManager.addEventListener(mockListener1, PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ _eventManager.addEventListener(mockListener2, PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ _eventManager.notifyEvent(PERSISTENT_MESSAGE_SIZE_OVERFULL);
- verify(mockListener1).event(BEFORE_ACTIVATE);
- verify(mockListener2).event(BEFORE_ACTIVATE);
+ verify(mockListener1).event(PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ verify(mockListener2).event(PERSISTENT_MESSAGE_SIZE_OVERFULL);
}
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java
deleted file mode 100644
index aa9483a894..0000000000
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/OperationalLoggingListenerTest.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.server.store;
-
-import java.util.ArrayList;
-import java.util.List;
-import junit.framework.TestCase;
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.logging.LogMessage;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.MessageLogger;
-import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.logging.messages.TransactionLogMessages;
-
-import static org.mockito.Mockito.mock;
-
-public class OperationalLoggingListenerTest extends TestCase
-{
-
-
- public static final String STORE_LOCATION = "The moon!";
- private EventLogger _eventLogger;
-
- protected void setUp() throws Exception
- {
- super.setUp();
- _eventLogger = new EventLogger();
- }
-
- public void testOperationalLoggingWithStoreLocation() throws Exception
- {
- TestMessageStore messageStore = new TestMessageStore();
- LogSubject logSubject = LOG_SUBJECT;
-
- OperationalLoggingListener.listen(messageStore, logSubject, _eventLogger);
-
- performTests(messageStore, true);
-
- }
-
- public void testOperationalLogging() throws Exception
- {
- TestMessageStore messageStore = new TestMessageStore();
- LogSubject logSubject = LOG_SUBJECT;
-
- OperationalLoggingListener.listen(messageStore, logSubject, _eventLogger);
-
- performTests(messageStore, false);
- }
-
- private void performTests(TestMessageStore messageStore, boolean setStoreLocation)
- {
- final List<LogMessage> messages = new ArrayList<LogMessage>();
-
- _eventLogger.setMessageLogger(new TestLogger(messages));
-
- if(setStoreLocation)
- {
- messageStore.setStoreLocation(STORE_LOCATION);
- }
-
-
- messageStore.attainState(State.INITIALISING);
- assertEquals("Unexpected number of operational log messages on configuring", 1, messages.size());
- assertEquals(messages.remove(0).toString(), ConfigStoreMessages.CREATED().toString());
-
- messageStore.attainState(State.INITIALISED);
- assertEquals("Unexpected number of operational log messages on CONFIGURED", setStoreLocation ? 3 : 2, messages.size());
- assertEquals(messages.remove(0).toString(), MessageStoreMessages.CREATED().toString());
- assertEquals(messages.remove(0).toString(), TransactionLogMessages.CREATED().toString());
- if(setStoreLocation)
- {
- assertEquals(messages.remove(0).toString(), MessageStoreMessages.STORE_LOCATION(STORE_LOCATION).toString());
- }
-
- messageStore.attainState(State.ACTIVATING);
- assertEquals("Unexpected number of operational log messages on RECOVERING", 1, messages.size());
- assertEquals(messages.remove(0).toString(), MessageStoreMessages.RECOVERY_START().toString());
-
-
- messageStore.attainState(State.ACTIVE);
- assertEquals("Unexpected number of operational log messages on ACTIVE", 1, messages.size());
- assertEquals(messages.remove(0).toString(), MessageStoreMessages.RECOVERY_COMPLETE().toString());
-
- messageStore.attainState(State.CLOSING);
- assertEquals("Unexpected number of operational log messages on CLOSING", 0, messages.size());
-
- messageStore.attainState(State.CLOSED);
- assertEquals("Unexpected number of operational log messages on CLOSED", 1, messages.size());
- assertEquals(messages.remove(0).toString(), MessageStoreMessages.CLOSED().toString());
- }
-
- private static final LogSubject LOG_SUBJECT = new LogSubject()
- {
- public String toLogString()
- {
- return "";
- }
- };
-
- private static final class TestMessageStore extends NullMessageStore
- {
-
- private final EventManager _eventManager = new EventManager();
- private final StateManager _stateManager = new StateManager(_eventManager);
- private String _storeLocation;
-
- public void attainState(State state)
- {
- _stateManager.attainState(state);
- }
-
- @Override
- public String getStoreLocation()
- {
- return _storeLocation;
- }
-
- public void setStoreLocation(String storeLocation)
- {
- _storeLocation = storeLocation;
- }
-
- @Override
- public void addEventListener(EventListener eventListener, Event... events)
- {
- _eventManager.addEventListener(eventListener, events);
- }
-
- @Override
- public String getStoreType()
- {
- return "TEST";
- }
- }
-
- private static class TestLogger implements MessageLogger
- {
- private final List<LogMessage> _messages;
-
- private TestLogger(final List<LogMessage> messages)
- {
- _messages = messages;
- }
-
- public void message(LogSubject subject, LogMessage message)
- {
- _messages.add(message);
- }
-
- @Override
- public boolean isEnabled()
- {
- return true;
- }
-
- @Override
- public boolean isMessageEnabled(final String logHierarchy)
- {
- return true;
- }
-
- public void message(LogMessage message)
- {
- _messages.add(message);
- }
-
- }
-
-}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/StateManagerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/StateManagerTest.java
deleted file mode 100644
index 1996620950..0000000000
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/StateManagerTest.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-
-import java.util.EnumSet;
-
-import junit.framework.TestCase;
-
-public class StateManagerTest extends TestCase implements EventListener
-{
-
- private StateManager _manager;
- private Event _event;
-
- public void setUp() throws Exception
- {
- super.setUp();
- _manager = new StateManager(this);
- }
-
- public void testInitialState()
- {
- assertEquals(State.INITIAL, _manager.getState());
- }
-
- public void testStateTransitionAllowed()
- {
- assertEquals(State.INITIAL, _manager.getState());
-
- _manager.attainState(State.INITIALISING);
- assertEquals(State.INITIALISING, _manager.getState());
- }
-
- public void testStateTransitionDisallowed()
- {
- assertEquals(State.INITIAL, _manager.getState());
-
- try
- {
- _manager.attainState(State.CLOSING);
- fail("Exception not thrown");
- }
- catch (IllegalStateException e)
- {
- // PASS
- }
- assertEquals(State.INITIAL, _manager.getState());
- }
-
- public void testIsInState()
- {
- assertEquals(State.INITIAL, _manager.getState());
- assertFalse(_manager.isInState(State.ACTIVE));
- assertTrue(_manager.isInState(State.INITIAL));
- }
-
- public void testIsNotInState()
- {
- assertEquals(State.INITIAL, _manager.getState());
- assertTrue(_manager.isNotInState(State.ACTIVE));
- assertFalse(_manager.isNotInState(State.INITIAL));
- }
-
- public void testCheckInState()
- {
- assertEquals(State.INITIAL, _manager.getState());
-
- try
- {
- _manager.checkInState(State.ACTIVE);
- fail("Exception not thrown");
- }
- catch (IllegalStateException e)
- {
- // PASS
- }
- assertEquals(State.INITIAL, _manager.getState());
- }
-
- public void testValidStateTransitions()
- {
- assertEquals(State.INITIAL, _manager.getState());
- performValidTransition(StateManager.INITIALISE);
- performValidTransition(StateManager.INITIALISE_COMPLETE);
- performValidTransition(StateManager.ACTIVATE);
- performValidTransition(StateManager.ACTIVATE_COMPLETE);
- performValidTransition(StateManager.QUIESCE);
- performValidTransition(StateManager.QUIESCE_COMPLETE);
- performValidTransition(StateManager.RESTART);
- performValidTransition(StateManager.ACTIVATE_COMPLETE);
- performValidTransition(StateManager.CLOSE_ACTIVE);
- performValidTransition(StateManager.CLOSE_COMPLETE);
-
- _manager = new StateManager(this);
- assertEquals(State.INITIAL, _manager.getState());
- performValidTransition(StateManager.INITIALISE);
- performValidTransition(StateManager.INITIALISE_COMPLETE);
- performValidTransition(StateManager.CLOSE_INITIALISED);
- performValidTransition(StateManager.CLOSE_COMPLETE);
-
- _manager = new StateManager(this);
- performValidTransition(StateManager.INITIALISE);
- performValidTransition(StateManager.INITIALISE_COMPLETE);
- performValidTransition(StateManager.ACTIVATE);
- performValidTransition(StateManager.ACTIVATE_COMPLETE);
- performValidTransition(StateManager.QUIESCE);
- performValidTransition(StateManager.QUIESCE_COMPLETE);
- performValidTransition(StateManager.CLOSE_QUIESCED);
- performValidTransition(StateManager.CLOSE_COMPLETE);
- }
-
- private void performValidTransition(StateManager.Transition transition)
- {
- _manager.attainState(transition.getEndState());
- assertEquals("Unexpected end state", transition.getEndState(), _manager.getState());
- assertEquals("Unexpected event", transition.getEvent(), _event);
- _event = null;
- }
-
- public void testInvalidStateTransitions()
- {
- assertEquals(State.INITIAL, _manager.getState());
-
- performInvalidTransitions(StateManager.INITIALISE, State.INITIALISED);
- performInvalidTransitions(StateManager.INITIALISE_COMPLETE, State.ACTIVATING, State.CLOSING);
- performInvalidTransitions(StateManager.ACTIVATE, State.ACTIVE, State.CLOSING);
- performInvalidTransitions(StateManager.ACTIVATE_COMPLETE, State.QUIESCING, State.CLOSING, State.INITIALISED);
- performInvalidTransitions(StateManager.QUIESCE, State.QUIESCED);
- performInvalidTransitions(StateManager.QUIESCE_COMPLETE, State.ACTIVATING, State.CLOSING);
- performInvalidTransitions(StateManager.CLOSE_QUIESCED, State.CLOSED);
- performInvalidTransitions(StateManager.CLOSE_COMPLETE);
-
- }
-
- private void performInvalidTransitions(StateManager.Transition preTransition, State... validEndStates)
- {
- if(preTransition != null)
- {
- performValidTransition(preTransition);
- }
-
- EnumSet<State> endStates = EnumSet.allOf(State.class);
-
- if(validEndStates != null)
- {
- for(State state: validEndStates)
- {
- endStates.remove(state);
- }
- }
-
- for(State invalidEndState : endStates)
- {
- performInvalidStateTransition(invalidEndState);
- }
-
-
- }
-
- private void performInvalidStateTransition(State invalidEndState)
- {
- try
- {
- _event = null;
- State startState = _manager.getState();
- _manager.attainState(invalidEndState);
- fail("Invalid state transition performed: " + startState + " to " + invalidEndState);
- }
- catch(IllegalStateException e)
- {
- // pass
- }
- assertNull("No event should have be fired", _event);
- }
-
- @Override
- public void event(Event event)
- {
- _event = event;
- }
-}