From 7eeab801bf9055035d4d16a78d654fa874209bc7 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Tue, 10 Jul 2012 10:56:32 +0000 Subject: QPID-4121: AMQProtocolEngine now uses lock to prevent the thread-unsafe use of AMQChannel's transaction which caused it to throw a ConcurrentModificationException. Applied patch from Philip Harvey and Keith Wall . git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1359595 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 2 +- .../qpid/server/protocol/AMQProtocolEngine.java | 40 ++++++++++++++++------ .../qpid/server/protocol/AMQProtocolSession.java | 4 --- 3 files changed, 31 insertions(+), 15 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index abbde1d6b8..3f3269605f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -1532,7 +1532,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm _logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms"); } - // Close connection for idle or open transactions that have timed out + // Close session for idle or open transactions that have timed out if (idleClose > 0L && idleTime > idleClose) { getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index cec7ff9625..7ef5124cc4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -34,6 +34,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import javax.security.auth.Subject; import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; @@ -152,8 +155,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private long _lastReceivedTime; private boolean _blocking; + private final Lock _receivedLock; + public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId) { + _receivedLock = new ReentrantLock(); _stateManager = new AMQStateManager(virtualHostRegistry, this); _codecFactory = new AMQCodecFactory(true, this); @@ -225,6 +231,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi final long arrivalTime = System.currentTimeMillis(); _lastReceivedTime = arrivalTime; _lastIoTime = arrivalTime; + + _receivedLock.lock(); try { final ArrayList dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); @@ -249,6 +257,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _logger.error("Unexpected exception when processing datablock", e); closeProtocolSession(); } + finally + { + _receivedLock.unlock(); + } } private void receiveComplete() @@ -815,7 +827,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } - public void closeConnection(int channelId, AMQConnectionException e) throws AMQException + private void closeConnection(int channelId, AMQConnectionException e) throws AMQException { try { @@ -1308,17 +1320,25 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException { - int channelId = ((AMQChannel)session).getChannelId(); - closeChannel(channelId); + _receivedLock.lock(); + try + { + int channelId = ((AMQChannel)session).getChannelId(); + closeChannel(channelId); - MethodRegistry methodRegistry = getMethodRegistry(); - ChannelCloseBody responseBody = - methodRegistry.createChannelCloseBody( - cause.getCode(), - new AMQShortString(message), - 0,0); + MethodRegistry methodRegistry = getMethodRegistry(); + ChannelCloseBody responseBody = + methodRegistry.createChannelCloseBody( + cause.getCode(), + new AMQShortString(message), + 0,0); - writeFrame(responseBody.generateFrame(channelId)); + writeFrame(responseBody.generateFrame(channelId)); + } + finally + { + _receivedLock.unlock(); + } } public void close(AMQConstant cause, String message) throws AMQException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index e833069320..01666ca58b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -154,10 +154,6 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth void closeProtocolSession(); - /** This must be called to close the session in order to free up any resources managed by the session. */ - void closeConnection(int channelId, AMQConnectionException e) throws AMQException; - - /** @return a key that uniquely identifies this session */ Object getKey(); -- cgit v1.2.1