summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-01-21 14:19:20 +0000
committerRafael H. Schloming <rhs@apache.org>2009-01-21 14:19:20 +0000
commit19c49dee096d829c2e5cc38f42c358130a772e63 (patch)
tree89d8df90b3b584a8bf93e30cc3ae398175718e01
parent9664d34900b53df734783c7d9fff5b8a2bfbc81f (diff)
downloadqpid-python-19c49dee096d829c2e5cc38f42c358130a772e63.tar.gz
QPID-1605: added an assertion to catch acknowledgments of message-ids outside the range permitted on a session; added code to pause failover until messages from old sessions have been cleared out of the dispatcher queue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@736316 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java13
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java129
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java7
8 files changed, 129 insertions, 45 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 4e8fdc2370..0aaeafc442 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -1198,6 +1198,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _failoverMutex;
}
+ public void failoverPrep()
+ {
+ _delegate.failoverPrep();
+ }
+
public void resubscribeSessions() throws JMSException, AMQException, FailoverException
{
_delegate.resubscribeSessions();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index b64147fe8f..5a4abcc9bb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -41,6 +41,8 @@ public interface AMQConnectionDelegate
XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException;
+ void failoverPrep();
+
void resubscribeSessions() throws JMSException, AMQException, FailoverException;
void closeConnection(long timeout) throws JMSException, AMQException;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 30ea4dcf8d..a2e5ac9800 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -162,9 +162,15 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
return null;
}
- /**
- * Not supported at this level.
- */
+ public void failoverPrep()
+ {
+ List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
+ for (AMQSession s : sessions)
+ {
+ s.failoverPrep();
+ }
+ }
+
public void resubscribeSessions() throws JMSException, AMQException, FailoverException
{
List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
@@ -218,6 +224,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
if (_conn.firePreFailover(false) && _conn.attemptReconnection())
{
+ _conn.failoverPrep();
_qpidConnection.resume();
if (_conn.firePreResubscribe())
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 035e3830ca..806e4d67bc 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -217,6 +217,11 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
}
}
+ public void failoverPrep()
+ {
+ // do nothing
+ }
+
/**
* For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
* The caller must hold the failover mutex before calling this method.
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index af0ed3faa3..733bee2d81 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -1811,6 +1812,26 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
+ void failoverPrep()
+ {
+ startDispatcherIfNecessary();
+ final CountDownLatch signal = new CountDownLatch(1);
+ _queue.add(new Dispatchable() {
+ public void dispatch(AMQSession ssn)
+ {
+ signal.countDown();
+ }
+ });
+ try
+ {
+ signal.await();
+ }
+ catch (InterruptedException e)
+ {
+ // pass
+ }
+ }
+
/**
* Resubscribes all producers and consumers. This is called when performing failover.
*
@@ -1822,7 +1843,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
_failedOverDirty = true;
}
-
+
_rollbackMark.set(-1);
resubscribeProducers();
resubscribeConsumers();
@@ -2509,7 +2530,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_consumers.clear();
for (C consumer : consumers)
- {
+ {
consumer.failedOver();
registerConsumer(consumer, true);
}
@@ -2628,6 +2649,21 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
+ public interface Dispatchable
+ {
+ void dispatch(AMQSession ssn);
+ }
+
+ public void dispatch(UnprocessedMessage message)
+ {
+ if (_dispatcher == null)
+ {
+ throw new java.lang.IllegalStateException("dispatcher is not started");
+ }
+
+ _dispatcher.dispatchMessage(message);
+ }
+
/** Used for debugging in the dispatcher. */
private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher");
@@ -2750,37 +2786,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
- while (!_closed.get() && ((message = (UnprocessedMessage) _queue.take()) != null))
+ Dispatchable disp;
+ while (!_closed.get() && ((disp = (Dispatchable) _queue.take()) != null))
{
- long deliveryTag = message.getDeliveryTag();
-
- synchronized (_lock)
- {
-
- while (connectionStopped())
- {
- _lock.wait();
- }
-
- if (!(message instanceof CloseConsumerMessage)
- && tagLE(deliveryTag, _rollbackMark.get()))
- {
- rejectMessage(message, true);
- }
- else
- {
- synchronized (_messageDeliveryLock)
- {
- dispatchMessage(message);
- }
- }
- }
-
- long current = _rollbackMark.get();
- if (updateRollbackMark(current, deliveryTag))
- {
- _rollbackMark.compareAndSet(current, deliveryTag);
- }
+ disp.dispatch(AMQSession.this);
}
}
catch (InterruptedException e)
@@ -2821,11 +2830,47 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private void dispatchMessage(UnprocessedMessage message)
{
- //This if block is not needed anymore as bounce messages are handled separately
- //if (message.getDeliverBody() != null)
- //{
- final C consumer =
- _consumers.get(message.getConsumerTag());
+ long deliveryTag = message.getDeliveryTag();
+
+ synchronized (_lock)
+ {
+
+ try
+ {
+ while (connectionStopped())
+ {
+ _lock.wait();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // pass
+ }
+
+ if (!(message instanceof CloseConsumerMessage)
+ && tagLE(deliveryTag, _rollbackMark.get()))
+ {
+ rejectMessage(message, true);
+ }
+ else
+ {
+ synchronized (_messageDeliveryLock)
+ {
+ notifyConsumer(message);
+ }
+ }
+ }
+
+ long current = _rollbackMark.get();
+ if (updateRollbackMark(current, deliveryTag))
+ {
+ _rollbackMark.compareAndSet(current, deliveryTag);
+ }
+ }
+
+ private void notifyConsumer(UnprocessedMessage message)
+ {
+ final C consumer = _consumers.get(message.getConsumerTag());
if ((consumer == null) || consumer.isClosed())
{
@@ -2833,7 +2878,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
if (consumer == null)
{
- _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "["
+ _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message("
+ + System.identityHashCode(message) + ")" + "["
+ message.getDeliveryTag() + "] from queue "
+ message.getConsumerTag() + " )without a handler - rejecting(requeue)...");
}
@@ -2841,7 +2887,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
if (consumer.isNoConsume())
{
- _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
+ _dispatcherLogger.info("Received a message("
+ + System.identityHashCode(message) + ")" + "["
+ message.getDeliveryTag() + "] from queue " + " consumer("
+ message.getConsumerTag() + ") is closed and a browser so dropping...");
//DROP MESSAGE
@@ -2850,7 +2897,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
else
{
- _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
+ _dispatcherLogger.info("Received a message("
+ + System.identityHashCode(message) + ")" + "["
+ message.getDeliveryTag() + "] from queue " + " consumer("
+ message.getConsumerTag() + ") is closed rejecting(requeue)...");
}
@@ -2866,7 +2914,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
consumer.notifyMessage(message);
}
-
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index ab983aa842..82f56d9985 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -154,6 +154,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
//------- overwritten methods of class AMQSession
+ void failoverPrep()
+ {
+ super.failoverPrep();
+ clearUnacked();
+ }
+
/**
* Acknowledge one or many messages.
*
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
index 713c87260c..e2cb36a030 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.client.message;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.BasicMessageConsumer;
@@ -30,7 +31,7 @@ import org.apache.qpid.client.BasicMessageConsumer;
* Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
* thread in order to minimise the amount of work done in the MINA dispatcher thread.
*/
-public abstract class UnprocessedMessage
+public abstract class UnprocessedMessage implements AMQSession.Dispatchable
{
private final int _consumerTag;
@@ -49,5 +50,9 @@ public abstract class UnprocessedMessage
return _consumerTag;
}
+ public void dispatch(AMQSession ssn)
+ {
+ ssn.dispatch(this);
+ }
} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 1a44ed8973..32bb9ca612 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -292,6 +292,13 @@ public class Session extends SessionInvoker
synchronized (processedLock)
{
log.debug("%s", processed);
+
+ if (ge(range.getUpper(), commandsIn))
+ {
+ throw new IllegalArgumentException
+ ("range exceeds max received command-id: " + range);
+ }
+
processed.add(range);
Range first = processed.getFirst();
int lower = first.getLower();