diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-01-15 14:20:37 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-15 14:20:37 +0000 |
| commit | 9ba2ca90c9127ea98372a9758e731dd9fe19c212 (patch) | |
| tree | d49fa569caeb1f37ae6b9ecd8d003da1252f1857 /java/client | |
| parent | 2fc9bb58d66e5490c1432ac4b8f8a96731f7a8f3 (diff) | |
| download | qpid-python-9ba2ca90c9127ea98372a9758e731dd9fe19c212.tar.gz | |
Merged the refactor to a common AMQMethodListener class on trunk, plus the race condition fix of Robert Godfrey. This opens the way for Request and Response managers to use a common event dispatch for both client and server.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496326 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
6 files changed, 66 insertions, 60 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 53291a3fd0..50596d4bfc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -89,7 +89,7 @@ public class FailoverHandler implements Runnable // have a state waiter waiting until the connection is closed for some reason. Or in future we may have // a slightly more complex state model therefore I felt it was worthwhile doing this. AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager(); - _amqProtocolHandler.setStateManager(new AMQStateManager()); + _amqProtocolHandler.setStateManager(new AMQStateManager(_amqProtocolHandler.getProtocolSession())); if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null)) { _amqProtocolHandler.setStateManager(existingStateManager); diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java deleted file mode 100644 index 2cbd8f0e32..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.protocol; - -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.client.protocol.AMQProtocolSession; - -public interface AMQMethodListener -{ - /** - * Invoked when a method frame has been received - * @param evt the event - * @return true if the handler has processed the method frame, false otherwise. Note - * that this does not prohibit the method event being delivered to subsequent listeners - * but can be used to determine if nobody has dealt with an incoming method frame. - * @throws AMQException if an error has occurred. This exception will be delivered - * to all registered listeners using the error() method (see below) allowing them to - * perform cleanup if necessary. - */ - boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException; - - /** - * Callback when an error has occurred. Allows listeners to clean up. - * @param e - */ - void error(Exception e); -} diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index a0399e1450..12ace3c705 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -46,6 +46,7 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.ssl.BogusSSLContextFactory; @@ -148,7 +149,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter); } - _protocolSession = new AMQProtocolSession(this, session, _connection); + _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager()); _protocolSession.init(); } @@ -284,7 +285,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ public void propagateExceptionToWaiters(Exception e) { - _stateManager.error(e); + getStateManager().error(e); final Iterator it = _frameListeners.iterator(); while (it.hasNext()) { @@ -321,11 +322,11 @@ public class AMQProtocolHandler extends IoHandlerAdapter while (it.hasNext()) { final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt, _protocolSession) || wasAnyoneInterested; + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; } if (!wasAnyoneInterested) { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener."); + throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + _frameListeners); } } catch (AMQException e) @@ -383,7 +384,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void attainState(AMQState s) throws AMQException { - _stateManager.attainState(s); + getStateManager().attainState(s); } /** @@ -471,7 +472,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void closeConnection() throws AMQException { - _stateManager.changeState(AMQState.CONNECTION_CLOSING); + getStateManager().changeState(AMQState.CONNECTION_CLOSING); // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. @@ -547,6 +548,12 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void setStateManager(AMQStateManager stateManager) { _stateManager = stateManager; + _protocolSession.setStateManager(stateManager); + } + + public AMQProtocolSession getProtocolSession() + { + return _protocolSession; } FailoverState getFailoverState() diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 167c3e68db..440eef54a6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -31,6 +31,7 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.ConnectionTuneParameters; import org.apache.qpid.client.message.UnexpectedBodyReceivedException; import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; @@ -66,6 +67,8 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis protected static final String SASL_CLIENT = "SASLClient"; protected final IoSession _minaProtocolSession; + + private AMQStateManager _stateManager; protected WriteFuture _lastWriteFuture; @@ -102,6 +105,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis { _protocolHandler = null; _minaProtocolSession = null; + _stateManager = new AMQStateManager(this); } public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) @@ -110,6 +114,19 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis _minaProtocolSession = protocolSession; // properties of the connection are made available to the event handlers _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection); + _stateManager = new AMQStateManager(this); + } + + public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager) + { + _protocolHandler = protocolHandler; + _minaProtocolSession = protocolSession; + // properties of the connection are made available to the event handlers + _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection); + + _stateManager = stateManager; + _stateManager.setProtocolSession(this); + } public void init() @@ -140,6 +157,16 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis { getAMQConnection().setClientID(clientID); } + + public AMQStateManager getStateManager() + { + return _stateManager; + } + + public void setStateManager(AMQStateManager stateManager) + { + _stateManager = stateManager; + } public String getVirtualHost() { diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 4fff4fab00..c05e6faf53 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -23,6 +23,7 @@ package org.apache.qpid.client.protocol; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.client.protocol.AMQProtocolSession; public abstract class BlockingMethodFrameListener implements AMQMethodListener @@ -55,7 +56,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener * @return true if the listener has dealt with this frame * @throws AMQException */ - public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException + public boolean methodReceived(AMQMethodEvent evt) throws AMQException { AMQMethodBody method = evt.getMethod(); diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index 0d9d70b244..5ace6dde69 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -22,8 +22,8 @@ package org.apache.qpid.client.state; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.client.handler.*; -import org.apache.qpid.client.protocol.AMQMethodListener; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.framing.*; import org.apache.log4j.Logger; @@ -41,6 +41,7 @@ import java.util.concurrent.CopyOnWriteArraySet; public class AMQStateManager implements AMQMethodListener { private static final Logger _logger = Logger.getLogger(AMQStateManager.class); + private AMQProtocolSession _protocolSession; /** * The current state @@ -54,14 +55,20 @@ public class AMQStateManager implements AMQMethodListener private final Map _state2HandlersMap = new HashMap(); private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet(); - + public AMQStateManager() { - this(AMQState.CONNECTION_NOT_STARTED, true); + this(null); } - protected AMQStateManager(AMQState state, boolean register) + public AMQStateManager(AMQProtocolSession protocolSession) { + this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession); + } + + protected AMQStateManager(AMQState state, boolean register, AMQProtocolSession protocolSession) + { + _protocolSession = protocolSession; _currentState = state; if(register) { @@ -154,12 +161,12 @@ public class AMQStateManager implements AMQMethodListener } } - public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException + public boolean methodReceived(AMQMethodEvent evt) throws AMQException { StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod()); if (handler != null) { - handler.methodReceived(this, protocolSession, evt); + handler.methodReceived(this, _protocolSession, evt); return true; } return false; @@ -235,4 +242,14 @@ public class AMQStateManager implements AMQMethodListener } // at this point the state will have changed. } + + public AMQProtocolSession getProtocolSession() + { + return _protocolSession; + } + + public void setProtocolSession(AMQProtocolSession session) + { + _protocolSession = session; + } } |
