From 2cf2cb66be056289ad8ea35827b34f8666cfc481 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Mon, 30 Apr 2007 12:24:41 +0000 Subject: QPID-476 : Remove duplicate map of channelId to session git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@533721 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQConnection.java | 15 +++-- .../java/org/apache/qpid/client/AMQSession.java | 21 +------ .../qpid/client/protocol/AMQProtocolHandler.java | 20 ------- .../qpid/client/protocol/AMQProtocolSession.java | 66 ++++++---------------- .../client/protocol/AMQProtocolSessionTest.java | 3 - 5 files changed, 28 insertions(+), 97 deletions(-) (limited to 'java') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 0e3d99eeba..347f5728e2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -96,7 +96,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private AMQProtocolHandler _protocolHandler; /** Maps from session id (Integer) to AMQSession instance */ - private final Map _sessions = new LinkedHashMap(); // fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap + private final Map _sessions = new LinkedHashMap(); + private String _clientName; @@ -508,7 +509,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect AMQSession session = new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, prefetchLow); - _protocolHandler.addSessionByChannel(channelId, session); + //_protocolHandler.addSessionByChannel(channelId, session); registerSession(channelId, session); boolean success = false; @@ -527,7 +528,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { if (!success) { - _protocolHandler.removeSessionByChannel(channelId); deregisterSession(channelId); } } @@ -589,7 +589,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } catch (AMQException e) { - _protocolHandler.removeSessionByChannel(channelId); deregisterSession(channelId); throw new AMQException("Error reopening channel " + channelId + " after failover: " + e, e); } @@ -1136,7 +1135,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect for (Iterator it = sessions.iterator(); it.hasNext();) { AMQSession s = (AMQSession) it.next(); - _protocolHandler.addSessionByChannel(s.getChannelId(), s); + //_protocolHandler.addSessionByChannel(s.getChannelId(), s); reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted()); s.resubscribe(); } @@ -1223,4 +1222,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { _taskPool.execute(task); } + + + public AMQSession getSession(int channelId) + { + return _sessions.get(channelId); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 700222dabb..ceca43b785 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -429,17 +429,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, - MessageFactoryRegistry messageFactoryRegistry) - { - this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, DEFAULT_PREFETCH_HIGH_MARK, DEFAULT_PREFETCH_LOW_MARK); - } - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, - MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetch) - { - this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetch, defaultPrefetch); - } AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) @@ -493,15 +483,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode) - { - this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry()); - } - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetch) - { - this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetch); - } AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) { @@ -796,7 +778,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi amqe = new AMQException("Closing session forcibly", e); } _connection.deregisterSession(_channelId); - closeProducersAndConsumers(amqe); + closeProducersAndConsumers(amqe); } } @@ -809,6 +791,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _closed.set(true); _connection.deregisterSession(_channelId); markClosedProducersAndConsumers(); + } private void markClosedProducersAndConsumers() diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 5bc1555df7..addef94215 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -490,27 +490,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter new SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout); } - /** - * Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol - * handler will ensure that messages are delivered to the consumer(s) on that session. - * - * @param channelId the channel id of the session - * @param session the session instance. - */ - public void addSessionByChannel(int channelId, AMQSession session) - { - _protocolSession.addSessionByChannel(channelId, session); - } - /** - * Convenience method to deregister an AMQSession with the protocol handler. - * - * @param channelId then channel id of the session - */ - public void removeSessionByChannel(int channelId) - { - _protocolSession.removeSessionByChannel(channelId); - } public void closeSession(AMQSession session) throws AMQException { diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 35aa69bd82..a557fc8027 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -85,7 +85,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession protected final AMQProtocolHandler _protocolHandler; /** Maps from the channel id to the AMQSession that it represents. */ - protected ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap(); + protected ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap(); protected ConcurrentMap _closingChannels = new ConcurrentHashMap(); @@ -104,26 +104,13 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion()); - /** - * No-arg constructor for use by test subclass - has to initialise final vars NOT intended for use other then for - * test - */ - public AMQProtocolSession() - { - _protocolHandler = null; - _minaProtocolSession = null; - _stateManager = new AMQStateManager(this); - } + private final AMQConnection _connection; + public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) { - _protocolHandler = protocolHandler; - _minaProtocolSession = protocolSession; - // properties of the connection are made available to the event handlers - _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection); - // fixme - real value needed - _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT); - _stateManager = new AMQStateManager(this); + this(protocolHandler,protocolSession,connection, new AMQStateManager()); + } public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, @@ -138,6 +125,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT); _stateManager = stateManager; _stateManager.setProtocolSession(this); + _connection = connection; } @@ -305,11 +293,16 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession */ private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg) { - AMQSession session = (AMQSession) _channelId2SessionMap.get(channelId); + AMQSession session = getSession(channelId); session.messageReceived(msg); _channelId2UnprocessedMsgMap.remove(channelId); } + protected AMQSession getSession(int channelId) + { + return _connection.getSession(channelId); + } + /** * Convenience method that writes a frame to the protocol session. Equivalent to calling * getProtocolSession().write(). @@ -335,32 +328,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } } - public void addSessionByChannel(int channelId, AMQSession session) - { - if (channelId <= 0) - { - throw new IllegalArgumentException("Attempt to register a session with a channel id <= zero"); - } - - if (session == null) - { - throw new IllegalArgumentException("Attempt to register a null session"); - } - - _logger.debug("Add session with channel id " + channelId); - _channelId2SessionMap.put(channelId, session); - } - - public void removeSessionByChannel(int channelId) - { - if (channelId <= 0) - { - throw new IllegalArgumentException("Attempt to deregister a session with a channel id <= zero"); - } - - _logger.debug("Removing session with channelId " + channelId); - _channelId2SessionMap.remove(channelId); - } /** * Starts the process of closing a session @@ -393,11 +360,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession */ public boolean channelClosed(int channelId, AMQConstant code, String text) throws AMQException { - final Integer chId = channelId; + // if this is not a response to an earlier request to close the channel - if (_closingChannels.remove(chId) == null) + if (_closingChannels.remove(channelId) == null) { - final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId); + final AMQSession session = getSession(channelId); try { session.closed(new AMQException(code, text)); @@ -469,8 +436,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag) { - final Integer chId = channelId; - final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId); + final AMQSession session = getSession(channelId); session.confirmConsumerCancelled(consumerTag); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java index 4374329fb0..3776ff767f 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java @@ -32,9 +32,6 @@ public class AMQProtocolSessionTest extends TestCase { private static class AMQProtSession extends AMQProtocolSession { - public AMQProtSession() - { - } public AMQProtSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) { -- cgit v1.2.1