diff options
Diffstat (limited to 'java/broker')
| -rw-r--r-- | java/broker/etc/config-systests-derby.xml | 3 | ||||
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java (renamed from java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessagesStoreLogSubject.java) | 4 | ||||
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java | 44 | ||||
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java | 81 | ||||
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java | 23 | ||||
| -rw-r--r-- | java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java | 2 |
6 files changed, 104 insertions, 53 deletions
diff --git a/java/broker/etc/config-systests-derby.xml b/java/broker/etc/config-systests-derby.xml index 2efc458e26..e9cfa04ab5 100644 --- a/java/broker/etc/config-systests-derby.xml +++ b/java/broker/etc/config-systests-derby.xml @@ -96,6 +96,7 @@ <localhost> <store> <class>org.apache.qpid.server.store.DerbyMessageStore</class> + <environment-path>${work}/derbyDB/localhost-store</environment-path> </store> <housekeeping> @@ -110,6 +111,7 @@ <development> <store> <class>org.apache.qpid.server.store.DerbyMessageStore</class> + <environment-path>${work}/derbyDB/development-store</environment-path> </store> </development> </virtualhost> @@ -119,6 +121,7 @@ <test> <store> <class>org.apache.qpid.server.store.DerbyMessageStore</class> + <environment-path>${work}/derbyDB/test-store</environment-path> </store> </test> </virtualhost> diff --git a/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessagesStoreLogSubject.java b/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java index 28d64de74e..e11cbba4f4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessagesStoreLogSubject.java +++ b/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java @@ -23,7 +23,7 @@ package org.apache.qpid.server.logging.subjects; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.store.MessageStore; -public class MessagesStoreLogSubject extends AbstractLogSubject +public class MessageStoreLogSubject extends AbstractLogSubject { /** @@ -37,7 +37,7 @@ public class MessagesStoreLogSubject extends AbstractLogSubject protected static String BINDING_FORMAT = "vh(/{0})/ms({1})"; /** Create an ExchangeLogSubject that Logs in the following format. */ - public MessagesStoreLogSubject(VirtualHost vhost, MessageStore store) + public MessageStoreLogSubject(VirtualHost vhost, MessageStore store) { setLogStringWithFormat(BINDING_FORMAT, vhost.getName(), store.getClass().getSimpleName()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java new file mode 100644 index 0000000000..fd2d09b777 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; +import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; +import org.apache.qpid.server.logging.LogSubject; + +public abstract class AbstractMessageStore implements MessageStore +{ + protected LogSubject _logSubject; + + public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration hostConfig) throws Exception + { + _logSubject = new MessageStoreLogSubject(virtualHost, this); + CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1001(this.getClass().getName())); + } + + public void close() throws Exception + { + CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_1003()); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 18269fa5e8..90de1aa8fa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -20,49 +20,51 @@ */ package org.apache.qpid.server.store; -import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.logging.actors.BrokerActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; +import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; +import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.MessageHandleFactory; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.queue.QueueRegistry; - -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; -import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.commons.configuration.Configuration; -import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.virtualhost.VirtualHost; -import java.io.File; import java.io.ByteArrayInputStream; -import java.sql.DriverManager; -import java.sql.Driver; +import java.io.File; +import java.sql.Blob; import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; +import java.sql.Driver; +import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.Blob; +import java.sql.SQLException; +import java.sql.Statement; import java.sql.Types; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.List; import java.util.ArrayList; -import java.util.Map; import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; -public class DerbyMessageStore implements MessageStore +public class DerbyMessageStore extends AbstractMessageStore { private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class); @@ -145,6 +147,8 @@ public class DerbyMessageStore implements MessageStore public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception { + super.configure(virtualHost,base,config); + stateTransition(State.INITIAL, State.CONFIGURING); initialiseDriver(); @@ -167,12 +171,15 @@ public class DerbyMessageStore implements MessageStore } } + CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1002(environmentPath.getAbsolutePath())); + createOrOpenDatabase(databasePath); // this recovers durable queues and persistent messages recover(); + stateTransition(State.RECOVERING, State.STARTED); } @@ -187,6 +194,7 @@ public class DerbyMessageStore implements MessageStore private void createOrOpenDatabase(final String environmentPath) throws SQLException { + //fixme this the _vhost name should not be added here. _connectionURL = "jdbc:derby:" + environmentPath + "/" + _virtualHost.getName() + ";create=true"; Connection conn = newConnection(); @@ -309,7 +317,8 @@ public class DerbyMessageStore implements MessageStore { stateTransition(State.CONFIGURING, State.RECOVERING); - _logger.info("Recovering persistent state..."); + CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_1004(null, false)); + StoreContext context = new StoreContext(); try @@ -324,9 +333,10 @@ public class DerbyMessageStore implements MessageStore beginTran(context); deliverMessages(context, queues); - _logger.info("Persistent state recovered successfully"); commitTran(context); + //Recovery Complete + CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1006(null, false)); } finally { @@ -335,6 +345,7 @@ public class DerbyMessageStore implements MessageStore abortTran(context); } } + } catch (SQLException e) { @@ -342,6 +353,7 @@ public class DerbyMessageStore implements MessageStore throw new AMQException("Error recovering persistent state: " + e, e); } + } private Map<AMQShortString, AMQQueue> loadQueues() throws SQLException, AMQException @@ -486,6 +498,8 @@ public class DerbyMessageStore implements MessageStore public void close() throws Exception { _closed.getAndSet(true); + + super.close(); } public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException @@ -1353,7 +1367,6 @@ public class DerbyMessageStore implements MessageStore public void process() throws AMQException { _queue.enqueue(_context, _message); - } } @@ -1371,7 +1384,6 @@ public class DerbyMessageStore implements MessageStore Connection conn = null; try { - if(inLocaltran) { conn = getConnection(context); @@ -1381,7 +1393,6 @@ public class DerbyMessageStore implements MessageStore conn = newConnection(); } - MessageHandleFactory messageHandleFactory = new MessageHandleFactory(); long maxId = 1; @@ -1390,15 +1401,10 @@ public class DerbyMessageStore implements MessageStore Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY); - while (rs.next()) { - - - AMQShortString queueName = new AMQShortString(rs.getString(1)); - AMQQueue queue = queues.get(queueName); if (queue == null) { @@ -1406,6 +1412,9 @@ public class DerbyMessageStore implements MessageStore _virtualHost.getQueueRegistry().registerQueue(queue); queues.put(queueName, queue); + + //Log Recovery Start + CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1004(String.valueOf(queue.getName()), true)); } long messageId = rs.getLong(2); @@ -1436,11 +1445,9 @@ public class DerbyMessageStore implements MessageStore } queueRecoveries.put(queueName, ++count); - } actions.add(new ProcessAction(queue, context, message)); - } for(ProcessAction action : actions) diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 4691f02952..87ec66030d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -31,6 +31,10 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; +import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import java.util.ArrayList; import java.util.Collections; @@ -41,7 +45,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; /** A simple message store that stores the messages in a threadsafe structure in memory. */ -public class MemoryMessageStore implements MessageStore +public class MemoryMessageStore extends AbstractMessageStore { private static final Logger _log = Logger.getLogger(MemoryMessageStore.class); @@ -55,27 +59,18 @@ public class MemoryMessageStore implements MessageStore private final AtomicLong _messageId = new AtomicLong(1); private AtomicBoolean _closed = new AtomicBoolean(false); + private LogSubject _logSubject; - public void configure() + public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception { - _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash tables"); - _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(DEFAULT_HASHTABLE_CAPACITY); - _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(DEFAULT_HASHTABLE_CAPACITY); - } + super.configure(virtualHost,base,config); - public void configure(String base, VirtualHostConfiguration config) - { int hashtableCapacity = config.getStoreConfiguration().getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY); _log.info("Using capacity " + hashtableCapacity + " for hash tables"); _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity); _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(hashtableCapacity); } - public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception - { - configure(base, config); - } - public void close() throws Exception { _closed.getAndSet(true); @@ -89,6 +84,8 @@ public class MemoryMessageStore implements MessageStore _contentBodyMap.clear(); _contentBodyMap = null; } + + super.close(); } public void removeMessage(StoreContext context, Long messageId) throws AMQException diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java b/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java index 624421f447..53edf98d79 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java @@ -34,7 +34,7 @@ public class MessageStoreLogSubjectTest extends AbstractTestLogSubject _testVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry(). getVirtualHost("test"); - _subject = new MessagesStoreLogSubject(_testVhost, _testVhost.getMessageStore()); + _subject = new MessageStoreLogSubject(_testVhost, _testVhost.getMessageStore()); } /** |
