summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2007-04-30 12:24:41 +0000
committerRobert Godfrey <rgodfrey@apache.org>2007-04-30 12:24:41 +0000
commit2cf2cb66be056289ad8ea35827b34f8666cfc481 (patch)
treeda3e52957cacc2416447042916d2ec35cef58a20 /java
parent1511b413c4aae97ed8e9b4fd2f3f52089f175e4a (diff)
downloadqpid-python-2cf2cb66be056289ad8ea35827b34f8666cfc481.tar.gz
QPID-476 : Remove duplicate map of channelId to session
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@533721 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java21
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java20
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java66
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java3
5 files changed, 28 insertions, 97 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 0e3d99eeba..347f5728e2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -96,7 +96,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private AMQProtocolHandler _protocolHandler;
/** Maps from session id (Integer) to AMQSession instance */
- private final Map _sessions = new LinkedHashMap(); // fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap
+ private final Map<Integer,AMQSession> _sessions = new LinkedHashMap<Integer,AMQSession>();
+
private String _clientName;
@@ -508,7 +509,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
AMQSession session =
new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
prefetchLow);
- _protocolHandler.addSessionByChannel(channelId, session);
+ //_protocolHandler.addSessionByChannel(channelId, session);
registerSession(channelId, session);
boolean success = false;
@@ -527,7 +528,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
if (!success)
{
- _protocolHandler.removeSessionByChannel(channelId);
deregisterSession(channelId);
}
}
@@ -589,7 +589,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
catch (AMQException e)
{
- _protocolHandler.removeSessionByChannel(channelId);
deregisterSession(channelId);
throw new AMQException("Error reopening channel " + channelId + " after failover: " + e, e);
}
@@ -1136,7 +1135,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
for (Iterator it = sessions.iterator(); it.hasNext();)
{
AMQSession s = (AMQSession) it.next();
- _protocolHandler.addSessionByChannel(s.getChannelId(), s);
+ //_protocolHandler.addSessionByChannel(s.getChannelId(), s);
reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
s.resubscribe();
}
@@ -1223,4 +1222,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
_taskPool.execute(task);
}
+
+
+ public AMQSession getSession(int channelId)
+ {
+ return _sessions.get(channelId);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 700222dabb..ceca43b785 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -429,17 +429,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
- MessageFactoryRegistry messageFactoryRegistry)
- {
- this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, DEFAULT_PREFETCH_HIGH_MARK, DEFAULT_PREFETCH_LOW_MARK);
- }
- AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
- MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetch)
- {
- this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetch, defaultPrefetch);
- }
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
@@ -493,15 +483,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode)
- {
- this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry());
- }
- AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetch)
- {
- this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetch);
- }
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
{
@@ -796,7 +778,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
amqe = new AMQException("Closing session forcibly", e);
}
_connection.deregisterSession(_channelId);
- closeProducersAndConsumers(amqe);
+ closeProducersAndConsumers(amqe);
}
}
@@ -809,6 +791,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_closed.set(true);
_connection.deregisterSession(_channelId);
markClosedProducersAndConsumers();
+
}
private void markClosedProducersAndConsumers()
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 5bc1555df7..addef94215 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -490,27 +490,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
new SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout);
}
- /**
- * Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol
- * handler will ensure that messages are delivered to the consumer(s) on that session.
- *
- * @param channelId the channel id of the session
- * @param session the session instance.
- */
- public void addSessionByChannel(int channelId, AMQSession session)
- {
- _protocolSession.addSessionByChannel(channelId, session);
- }
- /**
- * Convenience method to deregister an AMQSession with the protocol handler.
- *
- * @param channelId then channel id of the session
- */
- public void removeSessionByChannel(int channelId)
- {
- _protocolSession.removeSessionByChannel(channelId);
- }
public void closeSession(AMQSession session) throws AMQException
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 35aa69bd82..a557fc8027 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -85,7 +85,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
protected final AMQProtocolHandler _protocolHandler;
/** Maps from the channel id to the AMQSession that it represents. */
- protected ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap();
+ protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>();
protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
@@ -104,26 +104,13 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
private VersionSpecificRegistry _registry =
MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
- /**
- * No-arg constructor for use by test subclass - has to initialise final vars NOT intended for use other then for
- * test
- */
- public AMQProtocolSession()
- {
- _protocolHandler = null;
- _minaProtocolSession = null;
- _stateManager = new AMQStateManager(this);
- }
+ private final AMQConnection _connection;
+
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
{
- _protocolHandler = protocolHandler;
- _minaProtocolSession = protocolSession;
- // properties of the connection are made available to the event handlers
- _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
- // fixme - real value needed
- _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
- _stateManager = new AMQStateManager(this);
+ this(protocolHandler,protocolSession,connection, new AMQStateManager());
+
}
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection,
@@ -138,6 +125,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
_minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
_stateManager = stateManager;
_stateManager.setProtocolSession(this);
+ _connection = connection;
}
@@ -305,11 +293,16 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*/
private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg)
{
- AMQSession session = (AMQSession) _channelId2SessionMap.get(channelId);
+ AMQSession session = getSession(channelId);
session.messageReceived(msg);
_channelId2UnprocessedMsgMap.remove(channelId);
}
+ protected AMQSession getSession(int channelId)
+ {
+ return _connection.getSession(channelId);
+ }
+
/**
* Convenience method that writes a frame to the protocol session. Equivalent to calling
* getProtocolSession().write().
@@ -335,32 +328,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
}
- public void addSessionByChannel(int channelId, AMQSession session)
- {
- if (channelId <= 0)
- {
- throw new IllegalArgumentException("Attempt to register a session with a channel id <= zero");
- }
-
- if (session == null)
- {
- throw new IllegalArgumentException("Attempt to register a null session");
- }
-
- _logger.debug("Add session with channel id " + channelId);
- _channelId2SessionMap.put(channelId, session);
- }
-
- public void removeSessionByChannel(int channelId)
- {
- if (channelId <= 0)
- {
- throw new IllegalArgumentException("Attempt to deregister a session with a channel id <= zero");
- }
-
- _logger.debug("Removing session with channelId " + channelId);
- _channelId2SessionMap.remove(channelId);
- }
/**
* Starts the process of closing a session
@@ -393,11 +360,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*/
public boolean channelClosed(int channelId, AMQConstant code, String text) throws AMQException
{
- final Integer chId = channelId;
+
// if this is not a response to an earlier request to close the channel
- if (_closingChannels.remove(chId) == null)
+ if (_closingChannels.remove(channelId) == null)
{
- final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
+ final AMQSession session = getSession(channelId);
try
{
session.closed(new AMQException(code, text));
@@ -469,8 +436,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag)
{
- final Integer chId = channelId;
- final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
+ final AMQSession session = getSession(channelId);
session.confirmConsumerCancelled(consumerTag);
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
index 4374329fb0..3776ff767f 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
@@ -32,9 +32,6 @@ public class AMQProtocolSessionTest extends TestCase
{
private static class AMQProtSession extends AMQProtocolSession
{
- public AMQProtSession()
- {
- }
public AMQProtSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
{