diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2009-01-21 14:19:20 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2009-01-21 14:19:20 +0000 |
| commit | 19c49dee096d829c2e5cc38f42c358130a772e63 (patch) | |
| tree | 89d8df90b3b584a8bf93e30cc3ae398175718e01 | |
| parent | 9664d34900b53df734783c7d9fff5b8a2bfbc81f (diff) | |
| download | qpid-python-19c49dee096d829c2e5cc38f42c358130a772e63.tar.gz | |
QPID-1605: added an assertion to catch acknowledgments of message-ids outside the range permitted on a session; added code to pause failover until messages from old sessions have been cleared out of the dispatcher queue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@736316 13f79535-47bb-0310-9956-ffa450edef68
8 files changed, 129 insertions, 45 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 4e8fdc2370..0aaeafc442 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1198,6 +1198,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _failoverMutex; } + public void failoverPrep() + { + _delegate.failoverPrep(); + } + public void resubscribeSessions() throws JMSException, AMQException, FailoverException { _delegate.resubscribeSessions(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index b64147fe8f..5a4abcc9bb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -41,6 +41,8 @@ public interface AMQConnectionDelegate XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException; + void failoverPrep(); + void resubscribeSessions() throws JMSException, AMQException, FailoverException; void closeConnection(long timeout) throws JMSException, AMQException; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 30ea4dcf8d..a2e5ac9800 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -162,9 +162,15 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec return null; } - /** - * Not supported at this level. - */ + public void failoverPrep() + { + List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values()); + for (AMQSession s : sessions) + { + s.failoverPrep(); + } + } + public void resubscribeSessions() throws JMSException, AMQException, FailoverException { List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values()); @@ -218,6 +224,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { if (_conn.firePreFailover(false) && _conn.attemptReconnection()) { + _conn.failoverPrep(); _qpidConnection.resume(); if (_conn.firePreResubscribe()) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 035e3830ca..806e4d67bc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -217,6 +217,11 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate } } + public void failoverPrep() + { + // do nothing + } + /** * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling. * The caller must hold the failover mutex before calling this method. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index af0ed3faa3..733bee2d81 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -1811,6 +1812,26 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } + void failoverPrep() + { + startDispatcherIfNecessary(); + final CountDownLatch signal = new CountDownLatch(1); + _queue.add(new Dispatchable() { + public void dispatch(AMQSession ssn) + { + signal.countDown(); + } + }); + try + { + signal.await(); + } + catch (InterruptedException e) + { + // pass + } + } + /** * Resubscribes all producers and consumers. This is called when performing failover. * @@ -1822,7 +1843,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { _failedOverDirty = true; } - + _rollbackMark.set(-1); resubscribeProducers(); resubscribeConsumers(); @@ -2509,7 +2530,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _consumers.clear(); for (C consumer : consumers) - { + { consumer.failedOver(); registerConsumer(consumer, true); } @@ -2628,6 +2649,21 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } + public interface Dispatchable + { + void dispatch(AMQSession ssn); + } + + public void dispatch(UnprocessedMessage message) + { + if (_dispatcher == null) + { + throw new java.lang.IllegalStateException("dispatcher is not started"); + } + + _dispatcher.dispatchMessage(message); + } + /** Used for debugging in the dispatcher. */ private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher"); @@ -2750,37 +2786,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { - while (!_closed.get() && ((message = (UnprocessedMessage) _queue.take()) != null)) + Dispatchable disp; + while (!_closed.get() && ((disp = (Dispatchable) _queue.take()) != null)) { - long deliveryTag = message.getDeliveryTag(); - - synchronized (_lock) - { - - while (connectionStopped()) - { - _lock.wait(); - } - - if (!(message instanceof CloseConsumerMessage) - && tagLE(deliveryTag, _rollbackMark.get())) - { - rejectMessage(message, true); - } - else - { - synchronized (_messageDeliveryLock) - { - dispatchMessage(message); - } - } - } - - long current = _rollbackMark.get(); - if (updateRollbackMark(current, deliveryTag)) - { - _rollbackMark.compareAndSet(current, deliveryTag); - } + disp.dispatch(AMQSession.this); } } catch (InterruptedException e) @@ -2821,11 +2830,47 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private void dispatchMessage(UnprocessedMessage message) { - //This if block is not needed anymore as bounce messages are handled separately - //if (message.getDeliverBody() != null) - //{ - final C consumer = - _consumers.get(message.getConsumerTag()); + long deliveryTag = message.getDeliveryTag(); + + synchronized (_lock) + { + + try + { + while (connectionStopped()) + { + _lock.wait(); + } + } + catch (InterruptedException e) + { + // pass + } + + if (!(message instanceof CloseConsumerMessage) + && tagLE(deliveryTag, _rollbackMark.get())) + { + rejectMessage(message, true); + } + else + { + synchronized (_messageDeliveryLock) + { + notifyConsumer(message); + } + } + } + + long current = _rollbackMark.get(); + if (updateRollbackMark(current, deliveryTag)) + { + _rollbackMark.compareAndSet(current, deliveryTag); + } + } + + private void notifyConsumer(UnprocessedMessage message) + { + final C consumer = _consumers.get(message.getConsumerTag()); if ((consumer == null) || consumer.isClosed()) { @@ -2833,7 +2878,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { if (consumer == null) { - _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "[" + _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + + System.identityHashCode(message) + ")" + "[" + message.getDeliveryTag() + "] from queue " + message.getConsumerTag() + " )without a handler - rejecting(requeue)..."); } @@ -2841,7 +2887,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { if (consumer.isNoConsume()) { - _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" + _dispatcherLogger.info("Received a message(" + + System.identityHashCode(message) + ")" + "[" + message.getDeliveryTag() + "] from queue " + " consumer(" + message.getConsumerTag() + ") is closed and a browser so dropping..."); //DROP MESSAGE @@ -2850,7 +2897,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } else { - _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" + _dispatcherLogger.info("Received a message(" + + System.identityHashCode(message) + ")" + "[" + message.getDeliveryTag() + "] from queue " + " consumer(" + message.getConsumerTag() + ") is closed rejecting(requeue)..."); } @@ -2866,7 +2914,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { consumer.notifyMessage(message); } - } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index ab983aa842..82f56d9985 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -154,6 +154,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic //------- overwritten methods of class AMQSession + void failoverPrep() + { + super.failoverPrep(); + clearUnacked(); + } + /** * Acknowledge one or many messages. * diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java index 713c87260c..e2cb36a030 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.client.message; +import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.BasicMessageConsumer; @@ -30,7 +31,7 @@ import org.apache.qpid.client.BasicMessageConsumer; * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher * thread in order to minimise the amount of work done in the MINA dispatcher thread. */ -public abstract class UnprocessedMessage +public abstract class UnprocessedMessage implements AMQSession.Dispatchable { private final int _consumerTag; @@ -49,5 +50,9 @@ public abstract class UnprocessedMessage return _consumerTag; } + public void dispatch(AMQSession ssn) + { + ssn.dispatch(this); + } }
\ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index 1a44ed8973..32bb9ca612 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -292,6 +292,13 @@ public class Session extends SessionInvoker synchronized (processedLock) { log.debug("%s", processed); + + if (ge(range.getUpper(), commandsIn)) + { + throw new IllegalArgumentException + ("range exceeds max received command-id: " + range); + } + processed.add(range); Range first = processed.getFirst(); int lower = first.getLower(); |
