diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-06-27 15:34:57 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-06-27 15:34:57 +0000 |
| commit | f10117cd6464a107b086e0b7f7ea44a496b04c3d (patch) | |
| tree | 1101a6639f81979756473ddc1181c986e91f89ef /java/broker/src | |
| parent | d7c68d138a1151db2b0d133c94f8b1843850e867 (diff) | |
| download | qpid-python-f10117cd6464a107b086e0b7f7ea44a496b04c3d.tar.gz | |
Merged revisions 549530-550509 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r549530 | rupertlssmith | 2007-06-21 17:14:03 +0100 (Thu, 21 Jun 2007) | 1 line
Added minimal checkstyle to project reports. Fixed some problems with site generation.
........
r549849 | rupertlssmith | 2007-06-22 16:39:27 +0100 (Fri, 22 Jun 2007) | 1 line
Added Immediate and Mandatory message tests.
........
r550509 | ritchiem | 2007-06-25 15:16:30 +0100 (Mon, 25 Jun 2007) | 1 line
Update to provide a SustainedTestCase, this sends batches of messages to the broker. The rate of publication is regulated by the average consume rate advertised by all connected clients.
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@551199 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java | 51 | ||||
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java | 82 |
2 files changed, 73 insertions, 60 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java b/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java index 609a85c22f..988f589339 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java @@ -1,31 +1,35 @@ /* * - * Copyright (c) 2006 The Apache Software Foundation + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ package org.apache.qpid.server.txn; -import java.util.List; - import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.NoConsumersException; import org.apache.qpid.server.store.StoreContext; +import java.util.List; + /** * @author Apache Software Foundation */ @@ -44,33 +48,26 @@ public class CleanupMessageOperation implements TxnOp } public void prepare(StoreContext context) throws AMQException - { - } + { } public void undoPrepare() { - //don't need to do anything here, if the store's txn failed - //when processing prepare then the message was not stored - //or enqueued on any queues and can be discarded + // don't need to do anything here, if the store's txn failed + // when processing prepare then the message was not stored + // or enqueued on any queues and can be discarded } public void commit(StoreContext context) { - - try + // No-op can't be done here has this is before the message has been attempted to be delivered. + /*try { _msg.checkDeliveredToConsumer(); } catch (NoConsumersException e) { - //TODO: store this for delivery after the commit-ok _returns.add(e); - } - catch (AMQException e) - { - _log.error("On commiting transaction, unable to determine whether delivered to a consumer immediately: " + - e, e); - } + }*/ } public void rollback(StoreContext context) diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 93459beb45..4e684098d0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -1,26 +1,27 @@ /* * - * Copyright (c) 2006 The Apache Software Foundation + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ package org.apache.qpid.server.txn; -import java.util.LinkedList; -import java.util.List; - import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.ack.TxAck; @@ -28,9 +29,13 @@ import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.NoConsumersException; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; +import java.util.LinkedList; +import java.util.List; + /** A transactional context that only supports local transactions. */ public class LocalTransactionalContext implements TransactionalContext { @@ -54,6 +59,7 @@ public class LocalTransactionalContext implements TransactionalContext private boolean _inTran = false; + /** Are there messages to deliver. NOT Has the message been delivered */ private boolean _messageDelivered = false; private static class DeliveryDetails @@ -62,7 +68,6 @@ public class LocalTransactionalContext implements TransactionalContext public AMQQueue queue; private boolean deliverFirst; - public DeliveryDetails(AMQMessage message, AMQQueue queue, boolean deliverFirst) { this.message = message; @@ -72,15 +77,14 @@ public class LocalTransactionalContext implements TransactionalContext } public LocalTransactionalContext(MessageStore messageStore, StoreContext storeContext, - List<RequiredDeliveryException> returnMessages) + List<RequiredDeliveryException> returnMessages) { _messageStore = messageStore; _storeContext = storeContext; _returnMessages = returnMessages; - //_txnBuffer.enlist(new StoreMessageOperation(messageStore)); + // _txnBuffer.enlist(new StoreMessageOperation(messageStore)); } - public StoreContext getStoreContext() { return _storeContext; @@ -90,11 +94,12 @@ public class LocalTransactionalContext implements TransactionalContext { _txnBuffer.rollback(_storeContext); // Hack to deal with uncommitted non-transactional writes - if(_messageStore.inTran(_storeContext)) + if (_messageStore.inTran(_storeContext)) { _messageStore.abortTran(_storeContext); _inTran = false; } + _postCommitDeliveryList.clear(); } @@ -106,7 +111,7 @@ public class LocalTransactionalContext implements TransactionalContext // be added for every queue onto which the message is // enqueued. Finally a cleanup op will be added to decrement // the reference associated with the routing. -// message.incrementReference(); + // message.incrementReference(); _postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst)); _messageDelivered = true; _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages)); @@ -119,7 +124,7 @@ public class LocalTransactionalContext implements TransactionalContext message.incrementReference(); _messageDelivered = true; - */ + */ } private void checkAck(long deliveryTag, UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException @@ -131,16 +136,16 @@ public class LocalTransactionalContext implements TransactionalContext } public void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple, - UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException + UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException { - //check that the tag exists to give early failure - if (!multiple || deliveryTag > 0) + // check that the tag exists to give early failure + if (!multiple || (deliveryTag > 0)) { checkAck(deliveryTag, unacknowledgedMessageMap); } - //we use a single txn op for all acks and update this op - //as new acks come in. If this is the first ack in the txn - //we will need to create and enlist the op. + // we use a single txn op for all acks and update this op + // as new acks come in. If this is the first ack in the txn + // we will need to create and enlist the op. if (_ackOp == null) { beginTranIfNecessary(); @@ -148,7 +153,7 @@ public class LocalTransactionalContext implements TransactionalContext _txnBuffer.enlist(_ackOp); } // update the op to include this ack request - if (multiple && deliveryTag == 0) + if (multiple && (deliveryTag == 0)) { // if have signalled to ack all, that refers only // to all at this time @@ -178,6 +183,7 @@ public class LocalTransactionalContext implements TransactionalContext { _log.debug("Starting transaction on message store: " + this); } + _messageStore.beginTran(_storeContext); _inTran = true; } @@ -189,12 +195,13 @@ public class LocalTransactionalContext implements TransactionalContext { _log.debug("Committing transactional context: " + this); } + if (_ackOp != null) { _messageDelivered = true; _ackOp.consolidate(); - //already enlisted, after commit will reset regardless of outcome + // already enlisted, after commit will reset regardless of outcome _ackOp = null; } @@ -202,7 +209,7 @@ public class LocalTransactionalContext implements TransactionalContext { _txnBuffer.enlist(new StoreMessageOperation(_messageStore)); } - //fixme fail commit here ... QPID-440 + // fixme fail commit here ... QPID-440 try { _txnBuffer.commit(_storeContext); @@ -215,7 +222,7 @@ public class LocalTransactionalContext implements TransactionalContext try { - postCommitDelivery(); + postCommitDelivery(_returnMessages); } catch (AMQException e) { @@ -224,23 +231,32 @@ public class LocalTransactionalContext implements TransactionalContext } } - private void postCommitDelivery() throws AMQException + private void postCommitDelivery(List<RequiredDeliveryException> returnMessages) throws AMQException { if (_log.isDebugEnabled()) { _log.debug("Performing post commit delivery"); } + try { for (DeliveryDetails dd : _postCommitDeliveryList) { dd.queue.process(_storeContext, dd.message, dd.deliverFirst); + + try + { + dd.message.checkDeliveredToConsumer(); + } + catch (NoConsumersException nce) + { + returnMessages.add(nce); + } } } finally { _postCommitDeliveryList.clear(); } - } } |
