From 41e967fa0309eca6743dd777436bbc130e7437c0 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 27 Mar 2013 20:42:18 +0000 Subject: QPID-4673 : [Java Broker AMQP 1.0] Remove potential for deadlock between connection and subscription in 1.0 codepath git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1461844 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/protocol/v1_0/Subscription_1_0.java | 48 ++++++++++++++-------- 1 file changed, 30 insertions(+), 18 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java index 8bde913149..f865837350 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java @@ -717,15 +717,18 @@ class Subscription_1_0 implements Subscription getEndpoint().detach(); } - public synchronized boolean wouldSuspend(final QueueEntry msg) + public boolean wouldSuspend(final QueueEntry msg) { - final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend(); - if(!hasCredit && getState() == State.ACTIVE) + synchronized (_link.getLock()) { - suspend(); - } + final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend(); + if(!hasCredit && getState() == State.ACTIVE) + { + suspend(); + } - return !hasCredit; + return !hasCredit; + } } public boolean trySendLock() @@ -733,11 +736,14 @@ class Subscription_1_0 implements Subscription return _stateChangeLock.tryLock(); } - public synchronized void suspend() + public void suspend() { - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) + synchronized(_link.getLock()) { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); + if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) + { + _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); + } } } @@ -808,26 +814,32 @@ class Subscription_1_0 implements Subscription return false; //TODO } - public synchronized void queueEmpty() + public void queueEmpty() { - if(_link.drained()) + synchronized(_link.getLock()) { - if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) + if(_link.drained()) { - _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); + if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED)) + { + _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED); + } } } } - public synchronized void flowStateChanged() + public void flowStateChanged() { - if(isSuspended() && getEndpoint() != null) + synchronized(_link.getLock()) { - if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) + if(isSuspended() && getEndpoint() != null) { - _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); + if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE)) + { + _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE); + } + _transactionId = _link.getTransactionId(); } - _transactionId = _link.getTransactionId(); } } -- cgit v1.2.1