summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-06-27 15:34:57 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-06-27 15:34:57 +0000
commitf10117cd6464a107b086e0b7f7ea44a496b04c3d (patch)
tree1101a6639f81979756473ddc1181c986e91f89ef /java/broker/src
parentd7c68d138a1151db2b0d133c94f8b1843850e867 (diff)
downloadqpid-python-f10117cd6464a107b086e0b7f7ea44a496b04c3d.tar.gz
Merged revisions 549530-550509 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r549530 | rupertlssmith | 2007-06-21 17:14:03 +0100 (Thu, 21 Jun 2007) | 1 line Added minimal checkstyle to project reports. Fixed some problems with site generation. ........ r549849 | rupertlssmith | 2007-06-22 16:39:27 +0100 (Fri, 22 Jun 2007) | 1 line Added Immediate and Mandatory message tests. ........ r550509 | ritchiem | 2007-06-25 15:16:30 +0100 (Mon, 25 Jun 2007) | 1 line Update to provide a SustainedTestCase, this sends batches of messages to the broker. The rate of publication is regulated by the average consume rate advertised by all connected clients. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@551199 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java51
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java82
2 files changed, 73 insertions, 60 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java b/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java
index 609a85c22f..988f589339 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java
@@ -1,31 +1,35 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.server.txn;
-import java.util.List;
-
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.NoConsumersException;
import org.apache.qpid.server.store.StoreContext;
+import java.util.List;
+
/**
* @author Apache Software Foundation
*/
@@ -44,33 +48,26 @@ public class CleanupMessageOperation implements TxnOp
}
public void prepare(StoreContext context) throws AMQException
- {
- }
+ { }
public void undoPrepare()
{
- //don't need to do anything here, if the store's txn failed
- //when processing prepare then the message was not stored
- //or enqueued on any queues and can be discarded
+ // don't need to do anything here, if the store's txn failed
+ // when processing prepare then the message was not stored
+ // or enqueued on any queues and can be discarded
}
public void commit(StoreContext context)
{
-
- try
+ // No-op can't be done here has this is before the message has been attempted to be delivered.
+ /*try
{
_msg.checkDeliveredToConsumer();
}
catch (NoConsumersException e)
{
- //TODO: store this for delivery after the commit-ok
_returns.add(e);
- }
- catch (AMQException e)
- {
- _log.error("On commiting transaction, unable to determine whether delivered to a consumer immediately: " +
- e, e);
- }
+ }*/
}
public void rollback(StoreContext context)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 93459beb45..4e684098d0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -1,26 +1,27 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.server.txn;
-import java.util.LinkedList;
-import java.util.List;
-
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.ack.TxAck;
@@ -28,9 +29,13 @@ import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.NoConsumersException;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
+import java.util.LinkedList;
+import java.util.List;
+
/** A transactional context that only supports local transactions. */
public class LocalTransactionalContext implements TransactionalContext
{
@@ -54,6 +59,7 @@ public class LocalTransactionalContext implements TransactionalContext
private boolean _inTran = false;
+ /** Are there messages to deliver. NOT Has the message been delivered */
private boolean _messageDelivered = false;
private static class DeliveryDetails
@@ -62,7 +68,6 @@ public class LocalTransactionalContext implements TransactionalContext
public AMQQueue queue;
private boolean deliverFirst;
-
public DeliveryDetails(AMQMessage message, AMQQueue queue, boolean deliverFirst)
{
this.message = message;
@@ -72,15 +77,14 @@ public class LocalTransactionalContext implements TransactionalContext
}
public LocalTransactionalContext(MessageStore messageStore, StoreContext storeContext,
- List<RequiredDeliveryException> returnMessages)
+ List<RequiredDeliveryException> returnMessages)
{
_messageStore = messageStore;
_storeContext = storeContext;
_returnMessages = returnMessages;
- //_txnBuffer.enlist(new StoreMessageOperation(messageStore));
+ // _txnBuffer.enlist(new StoreMessageOperation(messageStore));
}
-
public StoreContext getStoreContext()
{
return _storeContext;
@@ -90,11 +94,12 @@ public class LocalTransactionalContext implements TransactionalContext
{
_txnBuffer.rollback(_storeContext);
// Hack to deal with uncommitted non-transactional writes
- if(_messageStore.inTran(_storeContext))
+ if (_messageStore.inTran(_storeContext))
{
_messageStore.abortTran(_storeContext);
_inTran = false;
}
+
_postCommitDeliveryList.clear();
}
@@ -106,7 +111,7 @@ public class LocalTransactionalContext implements TransactionalContext
// be added for every queue onto which the message is
// enqueued. Finally a cleanup op will be added to decrement
// the reference associated with the routing.
-// message.incrementReference();
+ // message.incrementReference();
_postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst));
_messageDelivered = true;
_txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages));
@@ -119,7 +124,7 @@ public class LocalTransactionalContext implements TransactionalContext
message.incrementReference();
_messageDelivered = true;
- */
+ */
}
private void checkAck(long deliveryTag, UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException
@@ -131,16 +136,16 @@ public class LocalTransactionalContext implements TransactionalContext
}
public void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple,
- UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException
+ UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException
{
- //check that the tag exists to give early failure
- if (!multiple || deliveryTag > 0)
+ // check that the tag exists to give early failure
+ if (!multiple || (deliveryTag > 0))
{
checkAck(deliveryTag, unacknowledgedMessageMap);
}
- //we use a single txn op for all acks and update this op
- //as new acks come in. If this is the first ack in the txn
- //we will need to create and enlist the op.
+ // we use a single txn op for all acks and update this op
+ // as new acks come in. If this is the first ack in the txn
+ // we will need to create and enlist the op.
if (_ackOp == null)
{
beginTranIfNecessary();
@@ -148,7 +153,7 @@ public class LocalTransactionalContext implements TransactionalContext
_txnBuffer.enlist(_ackOp);
}
// update the op to include this ack request
- if (multiple && deliveryTag == 0)
+ if (multiple && (deliveryTag == 0))
{
// if have signalled to ack all, that refers only
// to all at this time
@@ -178,6 +183,7 @@ public class LocalTransactionalContext implements TransactionalContext
{
_log.debug("Starting transaction on message store: " + this);
}
+
_messageStore.beginTran(_storeContext);
_inTran = true;
}
@@ -189,12 +195,13 @@ public class LocalTransactionalContext implements TransactionalContext
{
_log.debug("Committing transactional context: " + this);
}
+
if (_ackOp != null)
{
_messageDelivered = true;
_ackOp.consolidate();
- //already enlisted, after commit will reset regardless of outcome
+ // already enlisted, after commit will reset regardless of outcome
_ackOp = null;
}
@@ -202,7 +209,7 @@ public class LocalTransactionalContext implements TransactionalContext
{
_txnBuffer.enlist(new StoreMessageOperation(_messageStore));
}
- //fixme fail commit here ... QPID-440
+ // fixme fail commit here ... QPID-440
try
{
_txnBuffer.commit(_storeContext);
@@ -215,7 +222,7 @@ public class LocalTransactionalContext implements TransactionalContext
try
{
- postCommitDelivery();
+ postCommitDelivery(_returnMessages);
}
catch (AMQException e)
{
@@ -224,23 +231,32 @@ public class LocalTransactionalContext implements TransactionalContext
}
}
- private void postCommitDelivery() throws AMQException
+ private void postCommitDelivery(List<RequiredDeliveryException> returnMessages) throws AMQException
{
if (_log.isDebugEnabled())
{
_log.debug("Performing post commit delivery");
}
+
try
{
for (DeliveryDetails dd : _postCommitDeliveryList)
{
dd.queue.process(_storeContext, dd.message, dd.deliverFirst);
+
+ try
+ {
+ dd.message.checkDeliveredToConsumer();
+ }
+ catch (NoConsumersException nce)
+ {
+ returnMessages.add(nce);
+ }
}
}
finally
{
_postCommitDeliveryList.clear();
}
-
}
}