summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/etc/config-systests-derby.xml3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java (renamed from java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessagesStoreLogSubject.java)4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java44
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java81
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java23
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java2
6 files changed, 104 insertions, 53 deletions
diff --git a/java/broker/etc/config-systests-derby.xml b/java/broker/etc/config-systests-derby.xml
index 2efc458e26..e9cfa04ab5 100644
--- a/java/broker/etc/config-systests-derby.xml
+++ b/java/broker/etc/config-systests-derby.xml
@@ -96,6 +96,7 @@
<localhost>
<store>
<class>org.apache.qpid.server.store.DerbyMessageStore</class>
+ <environment-path>${work}/derbyDB/localhost-store</environment-path>
</store>
<housekeeping>
@@ -110,6 +111,7 @@
<development>
<store>
<class>org.apache.qpid.server.store.DerbyMessageStore</class>
+ <environment-path>${work}/derbyDB/development-store</environment-path>
</store>
</development>
</virtualhost>
@@ -119,6 +121,7 @@
<test>
<store>
<class>org.apache.qpid.server.store.DerbyMessageStore</class>
+ <environment-path>${work}/derbyDB/test-store</environment-path>
</store>
</test>
</virtualhost>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessagesStoreLogSubject.java b/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java
index 28d64de74e..e11cbba4f4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessagesStoreLogSubject.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java
@@ -23,7 +23,7 @@ package org.apache.qpid.server.logging.subjects;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.store.MessageStore;
-public class MessagesStoreLogSubject extends AbstractLogSubject
+public class MessageStoreLogSubject extends AbstractLogSubject
{
/**
@@ -37,7 +37,7 @@ public class MessagesStoreLogSubject extends AbstractLogSubject
protected static String BINDING_FORMAT = "vh(/{0})/ms({1})";
/** Create an ExchangeLogSubject that Logs in the following format. */
- public MessagesStoreLogSubject(VirtualHost vhost, MessageStore store)
+ public MessageStoreLogSubject(VirtualHost vhost, MessageStore store)
{
setLogStringWithFormat(BINDING_FORMAT, vhost.getName(),
store.getClass().getSimpleName());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java
new file mode 100644
index 0000000000..fd2d09b777
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.logging.LogSubject;
+
+public abstract class AbstractMessageStore implements MessageStore
+{
+ protected LogSubject _logSubject;
+
+ public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration hostConfig) throws Exception
+ {
+ _logSubject = new MessageStoreLogSubject(virtualHost, this);
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1001(this.getClass().getName()));
+ }
+
+ public void close() throws Exception
+ {
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_1003());
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index 18269fa5e8..90de1aa8fa 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -20,49 +20,51 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
-
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
-import java.io.File;
import java.io.ByteArrayInputStream;
-import java.sql.DriverManager;
-import java.sql.Driver;
+import java.io.File;
+import java.sql.Blob;
import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.Driver;
+import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
-import java.sql.Blob;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.sql.Types;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.List;
import java.util.ArrayList;
-import java.util.Map;
import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
-public class DerbyMessageStore implements MessageStore
+public class DerbyMessageStore extends AbstractMessageStore
{
private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
@@ -145,6 +147,8 @@ public class DerbyMessageStore implements MessageStore
public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
{
+ super.configure(virtualHost,base,config);
+
stateTransition(State.INITIAL, State.CONFIGURING);
initialiseDriver();
@@ -167,12 +171,15 @@ public class DerbyMessageStore implements MessageStore
}
}
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1002(environmentPath.getAbsolutePath()));
+
createOrOpenDatabase(databasePath);
// this recovers durable queues and persistent messages
recover();
+
stateTransition(State.RECOVERING, State.STARTED);
}
@@ -187,6 +194,7 @@ public class DerbyMessageStore implements MessageStore
private void createOrOpenDatabase(final String environmentPath) throws SQLException
{
+ //fixme this the _vhost name should not be added here.
_connectionURL = "jdbc:derby:" + environmentPath + "/" + _virtualHost.getName() + ";create=true";
Connection conn = newConnection();
@@ -309,7 +317,8 @@ public class DerbyMessageStore implements MessageStore
{
stateTransition(State.CONFIGURING, State.RECOVERING);
- _logger.info("Recovering persistent state...");
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_1004(null, false));
+
StoreContext context = new StoreContext();
try
@@ -324,9 +333,10 @@ public class DerbyMessageStore implements MessageStore
beginTran(context);
deliverMessages(context, queues);
- _logger.info("Persistent state recovered successfully");
commitTran(context);
+ //Recovery Complete
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1006(null, false));
}
finally
{
@@ -335,6 +345,7 @@ public class DerbyMessageStore implements MessageStore
abortTran(context);
}
}
+
}
catch (SQLException e)
{
@@ -342,6 +353,7 @@ public class DerbyMessageStore implements MessageStore
throw new AMQException("Error recovering persistent state: " + e, e);
}
+
}
private Map<AMQShortString, AMQQueue> loadQueues() throws SQLException, AMQException
@@ -486,6 +498,8 @@ public class DerbyMessageStore implements MessageStore
public void close() throws Exception
{
_closed.getAndSet(true);
+
+ super.close();
}
public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
@@ -1353,7 +1367,6 @@ public class DerbyMessageStore implements MessageStore
public void process() throws AMQException
{
_queue.enqueue(_context, _message);
-
}
}
@@ -1371,7 +1384,6 @@ public class DerbyMessageStore implements MessageStore
Connection conn = null;
try
{
-
if(inLocaltran)
{
conn = getConnection(context);
@@ -1381,7 +1393,6 @@ public class DerbyMessageStore implements MessageStore
conn = newConnection();
}
-
MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
long maxId = 1;
@@ -1390,15 +1401,10 @@ public class DerbyMessageStore implements MessageStore
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
-
while (rs.next())
{
-
-
-
AMQShortString queueName = new AMQShortString(rs.getString(1));
-
AMQQueue queue = queues.get(queueName);
if (queue == null)
{
@@ -1406,6 +1412,9 @@ public class DerbyMessageStore implements MessageStore
_virtualHost.getQueueRegistry().registerQueue(queue);
queues.put(queueName, queue);
+
+ //Log Recovery Start
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1004(String.valueOf(queue.getName()), true));
}
long messageId = rs.getLong(2);
@@ -1436,11 +1445,9 @@ public class DerbyMessageStore implements MessageStore
}
queueRecoveries.put(queueName, ++count);
-
}
actions.add(new ProcessAction(queue, context, message));
-
}
for(ProcessAction action : actions)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 4691f02952..87ec66030d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -31,6 +31,10 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import java.util.ArrayList;
import java.util.Collections;
@@ -41,7 +45,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/** A simple message store that stores the messages in a threadsafe structure in memory. */
-public class MemoryMessageStore implements MessageStore
+public class MemoryMessageStore extends AbstractMessageStore
{
private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
@@ -55,27 +59,18 @@ public class MemoryMessageStore implements MessageStore
private final AtomicLong _messageId = new AtomicLong(1);
private AtomicBoolean _closed = new AtomicBoolean(false);
+ private LogSubject _logSubject;
- public void configure()
+ public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
{
- _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash tables");
- _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(DEFAULT_HASHTABLE_CAPACITY);
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(DEFAULT_HASHTABLE_CAPACITY);
- }
+ super.configure(virtualHost,base,config);
- public void configure(String base, VirtualHostConfiguration config)
- {
int hashtableCapacity = config.getStoreConfiguration().getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
_log.info("Using capacity " + hashtableCapacity + " for hash tables");
_metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity);
_contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity);
}
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
- {
- configure(base, config);
- }
-
public void close() throws Exception
{
_closed.getAndSet(true);
@@ -89,6 +84,8 @@ public class MemoryMessageStore implements MessageStore
_contentBodyMap.clear();
_contentBodyMap = null;
}
+
+ super.close();
}
public void removeMessage(StoreContext context, Long messageId) throws AMQException
diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java b/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java
index 624421f447..53edf98d79 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java
@@ -34,7 +34,7 @@ public class MessageStoreLogSubjectTest extends AbstractTestLogSubject
_testVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().
getVirtualHost("test");
- _subject = new MessagesStoreLogSubject(_testVhost, _testVhost.getMessageStore());
+ _subject = new MessageStoreLogSubject(_testVhost, _testVhost.getMessageStore());
}
/**