summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-01-15 14:20:37 +0000
committerKim van der Riet <kpvdr@apache.org>2007-01-15 14:20:37 +0000
commit9ba2ca90c9127ea98372a9758e731dd9fe19c212 (patch)
treed49fa569caeb1f37ae6b9ecd8d003da1252f1857 /java/client
parent2fc9bb58d66e5490c1432ac4b8f8a96731f7a8f3 (diff)
downloadqpid-python-9ba2ca90c9127ea98372a9758e731dd9fe19c212.tar.gz
Merged the refactor to a common AMQMethodListener class on trunk, plus the race condition fix of Robert Godfrey. This opens the way for Request and Response managers to use a common event dispatch for both client and server.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496326 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java46
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java19
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java27
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java29
6 files changed, 66 insertions, 60 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 53291a3fd0..50596d4bfc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -89,7 +89,7 @@ public class FailoverHandler implements Runnable
// have a state waiter waiting until the connection is closed for some reason. Or in future we may have
// a slightly more complex state model therefore I felt it was worthwhile doing this.
AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager();
- _amqProtocolHandler.setStateManager(new AMQStateManager());
+ _amqProtocolHandler.setStateManager(new AMQStateManager(_amqProtocolHandler.getProtocolSession()));
if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null))
{
_amqProtocolHandler.setStateManager(existingStateManager);
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java
deleted file mode 100644
index 2cbd8f0e32..0000000000
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.protocol;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
-
-public interface AMQMethodListener
-{
- /**
- * Invoked when a method frame has been received
- * @param evt the event
- * @return true if the handler has processed the method frame, false otherwise. Note
- * that this does not prohibit the method event being delivered to subsequent listeners
- * but can be used to determine if nobody has dealt with an incoming method frame.
- * @throws AMQException if an error has occurred. This exception will be delivered
- * to all registered listeners using the error() method (see below) allowing them to
- * perform cleanup if necessary.
- */
- boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException;
-
- /**
- * Callback when an error has occurred. Allows listeners to clean up.
- * @param e
- */
- void error(Exception e);
-}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index a0399e1450..12ace3c705 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -46,6 +46,7 @@ import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.ssl.BogusSSLContextFactory;
@@ -148,7 +149,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
}
- _protocolSession = new AMQProtocolSession(this, session, _connection);
+ _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
_protocolSession.init();
}
@@ -284,7 +285,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
public void propagateExceptionToWaiters(Exception e)
{
- _stateManager.error(e);
+ getStateManager().error(e);
final Iterator it = _frameListeners.iterator();
while (it.hasNext())
{
@@ -321,11 +322,11 @@ public class AMQProtocolHandler extends IoHandlerAdapter
while (it.hasNext())
{
final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt, _protocolSession) || wasAnyoneInterested;
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
}
if (!wasAnyoneInterested)
{
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners);
}
}
catch (AMQException e)
@@ -383,7 +384,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void attainState(AMQState s) throws AMQException
{
- _stateManager.attainState(s);
+ getStateManager().attainState(s);
}
/**
@@ -471,7 +472,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void closeConnection() throws AMQException
{
- _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ getStateManager().changeState(AMQState.CONNECTION_CLOSING);
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -547,6 +548,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void setStateManager(AMQStateManager stateManager)
{
_stateManager = stateManager;
+ _protocolSession.setStateManager(stateManager);
+ }
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
}
FailoverState getFailoverState()
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 167c3e68db..440eef54a6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -31,6 +31,7 @@ import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.ConnectionTuneParameters;
import org.apache.qpid.client.message.UnexpectedBodyReceivedException;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -66,6 +67,8 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
protected static final String SASL_CLIENT = "SASLClient";
protected final IoSession _minaProtocolSession;
+
+ private AMQStateManager _stateManager;
protected WriteFuture _lastWriteFuture;
@@ -102,6 +105,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
{
_protocolHandler = null;
_minaProtocolSession = null;
+ _stateManager = new AMQStateManager(this);
}
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
@@ -110,6 +114,19 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
_minaProtocolSession = protocolSession;
// properties of the connection are made available to the event handlers
_minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
+ _stateManager = new AMQStateManager(this);
+ }
+
+ public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager)
+ {
+ _protocolHandler = protocolHandler;
+ _minaProtocolSession = protocolSession;
+ // properties of the connection are made available to the event handlers
+ _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
+
+ _stateManager = stateManager;
+ _stateManager.setProtocolSession(this);
+
}
public void init()
@@ -140,6 +157,16 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
{
getAMQConnection().setClientID(clientID);
}
+
+ public AMQStateManager getStateManager()
+ {
+ return _stateManager;
+ }
+
+ public void setStateManager(AMQStateManager stateManager)
+ {
+ _stateManager = stateManager;
+ }
public String getVirtualHost()
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index 4fff4fab00..c05e6faf53 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -23,6 +23,7 @@ package org.apache.qpid.client.protocol;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.client.protocol.AMQProtocolSession;
public abstract class BlockingMethodFrameListener implements AMQMethodListener
@@ -55,7 +56,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener
* @return true if the listener has dealt with this frame
* @throws AMQException
*/
- public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException
+ public boolean methodReceived(AMQMethodEvent evt) throws AMQException
{
AMQMethodBody method = evt.getMethod();
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index 0d9d70b244..5ace6dde69 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -22,8 +22,8 @@ package org.apache.qpid.client.state;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.client.handler.*;
-import org.apache.qpid.client.protocol.AMQMethodListener;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.framing.*;
import org.apache.log4j.Logger;
@@ -41,6 +41,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
public class AMQStateManager implements AMQMethodListener
{
private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
+ private AMQProtocolSession _protocolSession;
/**
* The current state
@@ -54,14 +55,20 @@ public class AMQStateManager implements AMQMethodListener
private final Map _state2HandlersMap = new HashMap();
private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
-
+
public AMQStateManager()
{
- this(AMQState.CONNECTION_NOT_STARTED, true);
+ this(null);
}
- protected AMQStateManager(AMQState state, boolean register)
+ public AMQStateManager(AMQProtocolSession protocolSession)
{
+ this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession);
+ }
+
+ protected AMQStateManager(AMQState state, boolean register, AMQProtocolSession protocolSession)
+ {
+ _protocolSession = protocolSession;
_currentState = state;
if(register)
{
@@ -154,12 +161,12 @@ public class AMQStateManager implements AMQMethodListener
}
}
- public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException
+ public boolean methodReceived(AMQMethodEvent evt) throws AMQException
{
StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
if (handler != null)
{
- handler.methodReceived(this, protocolSession, evt);
+ handler.methodReceived(this, _protocolSession, evt);
return true;
}
return false;
@@ -235,4 +242,14 @@ public class AMQStateManager implements AMQMethodListener
}
// at this point the state will have changed.
}
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
+ }
+
+ public void setProtocolSession(AMQProtocolSession session)
+ {
+ _protocolSession = session;
+ }
}