diff options
| author | Steven Shaw <steshaw@apache.org> | 2006-11-25 22:04:39 +0000 |
|---|---|---|
| committer | Steven Shaw <steshaw@apache.org> | 2006-11-25 22:04:39 +0000 |
| commit | 7c1f9158be7a5d1124a48f42f8d7dcfb6d5df2a6 (patch) | |
| tree | 3122525268281cd9df870e0a9cb309ee7410a424 /dotnet/Qpid.Client/Client/State | |
| parent | 8f32ca18d5281eaa5baafa769c99fa70c830b14f (diff) | |
| download | qpid-python-7c1f9158be7a5d1124a48f42f8d7dcfb6d5df2a6.tar.gz | |
QPID-128 Initial import of the C# sources.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@479211 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client/Client/State')
8 files changed, 556 insertions, 0 deletions
diff --git a/dotnet/Qpid.Client/Client/State/AMQState.cs b/dotnet/Qpid.Client/Client/State/AMQState.cs new file mode 100644 index 0000000000..fc71fe647c --- /dev/null +++ b/dotnet/Qpid.Client/Client/State/AMQState.cs @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Qpid.Client.State +{ + public enum AMQState + { + CONNECTION_NOT_STARTED, + CONNECTION_NOT_TUNED, + CONNECTION_NOT_OPENED, + CONNECTION_OPEN, + CONNECTION_CLOSING, + CONNECTION_CLOSED, + ALL // all is a special state used in the state manager. It is not valid to be "in" the state "all". + } +} + diff --git a/dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs b/dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs new file mode 100644 index 0000000000..60d44da824 --- /dev/null +++ b/dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Qpid.Client.State +{ + public class AMQStateChangedEvent + { + private readonly AMQState _oldState; + + private readonly AMQState _newState; + + public AMQStateChangedEvent(AMQState oldState, AMQState newState) + { + _oldState = oldState; + _newState = newState; + } + + public AMQState OldState + { + get + { + return _oldState; + } + } + + public AMQState NewState + { + get + { + return _newState; + } + } + + } +} diff --git a/dotnet/Qpid.Client/Client/State/AMQStateManager.cs b/dotnet/Qpid.Client/Client/State/AMQStateManager.cs new file mode 100644 index 0000000000..05f673d520 --- /dev/null +++ b/dotnet/Qpid.Client/Client/State/AMQStateManager.cs @@ -0,0 +1,225 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using log4net; +using Qpid.Client.Handler; +using Qpid.Client.Protocol; +using Qpid.Client.Protocol.Listener; +using Qpid.Framing; + +namespace Qpid.Client.State +{ + public class AMQStateManager : IAMQMethodListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(AMQStateManager)); + + const bool InfoLoggingHack = true; + + /// <summary> + /// The current state + /// </summary> + private AMQState _currentState; + + /// <summary> + /// Maps from an AMQState instance to a Map from Class to StateTransitionHandler. + /// The class must be a subclass of AMQFrame. + /// </summary> + private readonly IDictionary _state2HandlersMap = new Hashtable(); + + //private CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet(); + private ArrayList _stateListeners = ArrayList.Synchronized(new ArrayList(5)); + + public AMQStateManager() + { + _currentState = AMQState.CONNECTION_NOT_STARTED; + RegisterListeners(); + } + + private void RegisterListeners() + { + IStateAwareMethodListener connectionStart = new ConnectionStartMethodHandler(); + IStateAwareMethodListener connectionClose = new ConnectionCloseMethodHandler(); + IStateAwareMethodListener connectionCloseOk = new ConnectionCloseOkHandler(); + IStateAwareMethodListener connectionTune = new ConnectionTuneMethodHandler(); + IStateAwareMethodListener connectionSecure = new ConnectionSecureMethodHandler(); + IStateAwareMethodListener connectionOpenOk = new ConnectionOpenOkMethodHandler(); + IStateAwareMethodListener channelClose = new ChannelCloseMethodHandler(); + IStateAwareMethodListener basicDeliver = new BasicDeliverMethodHandler(); + IStateAwareMethodListener basicReturn = new BasicReturnMethodHandler(); + + // We need to register a map for the null (i.e. all state) handlers otherwise you get + // a stack overflow in the handler searching code when you present it with a frame for which + // no handlers are registered. + _state2HandlersMap[AMQState.ALL] = new Hashtable(); + + { + Hashtable notStarted = new Hashtable(); + notStarted[typeof(ConnectionStartBody)] = connectionStart; + notStarted[typeof(ConnectionCloseBody)] = connectionClose; + _state2HandlersMap[AMQState.CONNECTION_NOT_STARTED] = notStarted; + } + { + Hashtable notTuned = new Hashtable(); + notTuned[typeof(ConnectionTuneBody)] = connectionTune; + notTuned[typeof(ConnectionSecureBody)] = connectionSecure; + notTuned[typeof(ConnectionCloseBody)] = connectionClose; + _state2HandlersMap[AMQState.CONNECTION_NOT_TUNED] = notTuned; + } + { + Hashtable notOpened = new Hashtable(); + notOpened[typeof(ConnectionOpenOkBody)] = connectionOpenOk; + notOpened[typeof(ConnectionCloseBody)] = connectionClose; + _state2HandlersMap[AMQState.CONNECTION_NOT_OPENED] = notOpened; + } + { + Hashtable open = new Hashtable(); + open[typeof(ChannelCloseBody)] = channelClose; + open[typeof(ConnectionCloseBody)] = connectionClose; + open[typeof(BasicDeliverBody)] = basicDeliver; + open[typeof(BasicReturnBody)] = basicReturn; + _state2HandlersMap[AMQState.CONNECTION_OPEN] = open; + } + { + Hashtable closing = new Hashtable(); + closing[typeof(ConnectionCloseOkBody)] = connectionCloseOk; + _state2HandlersMap[AMQState.CONNECTION_CLOSING] = closing; + } + } + + public AMQState CurrentState + { + get + { + return _currentState; + } + } + + /// <summary> + /// Changes the state. + /// </summary> + /// <param name="newState">The new state.</param> + /// <exception cref="AMQException">if there is an error changing state</exception> + public void ChangeState(AMQState newState) + { + if (InfoLoggingHack) + { + _logger.Info("State changing to " + newState + " from old state " + _currentState); + } + _logger.Debug("State changing to " + newState + " from old state " + _currentState); + AMQState oldState = _currentState; + _currentState = newState; + + foreach (IStateListener l in _stateListeners) + { + l.StateChanged(oldState, newState); + } + } + + public void Error(Exception e) + { + _logger.Debug("State manager receive error notification: " + e); + foreach (IStateListener l in _stateListeners) + { + l.Error(e); + } + } + + public bool MethodReceived(AMQMethodEvent evt) + { + _logger.Debug(String.Format("Finding method handler. currentState={0} type={1}", _currentState, evt.Method.GetType())); + IStateAwareMethodListener handler = FindStateTransitionHandler(_currentState, evt.Method); + if (handler != null) + { + handler.MethodReceived(this, evt); + return true; + } + return false; + } + + /// <summary> + /// Finds the state transition handler. + /// </summary> + /// <param name="currentState">State of the current.</param> + /// <param name="frame">The frame.</param> + /// <returns></returns> + /// <exception cref="IllegalStateTransitionException">if the state transition if not allowed</exception> + private IStateAwareMethodListener FindStateTransitionHandler(AMQState currentState, + AMQMethodBody frame) + { + Type clazz = frame.GetType(); + if (_logger.IsDebugEnabled) + { + _logger.Debug("Looking for state transition handler for frame " + clazz); + } + IDictionary classToHandlerMap = (IDictionary) _state2HandlersMap[currentState]; + + if (classToHandlerMap == null) + { + // if no specialised per state handler is registered look for a + // handler registered for "all" states + return FindStateTransitionHandler(AMQState.ALL, frame); + } + IStateAwareMethodListener handler = (IStateAwareMethodListener) classToHandlerMap[clazz]; + if (handler == null) + { + if (currentState == AMQState.ALL) + { + _logger.Debug("No state transition handler defined for receiving frame " + frame); + return null; + } + else + { + // if no specialised per state handler is registered look for a + // handler registered for "all" states + return FindStateTransitionHandler(AMQState.ALL, frame); + } + } + else + { + return handler; + } + } + + public void AddStateListener(IStateListener listener) + { + _logger.Debug("Adding state listener"); + _stateListeners.Add(listener); + } + + public void RemoveStateListener(IStateListener listener) + { + _stateListeners.Remove(listener); + } + + public void AttainState(AMQState s) + { + if (_currentState != s) + { + _logger.Debug("Adding state wait to reach state " + s); + StateWaiter sw = new StateWaiter(s); + AddStateListener(sw); + sw.WaituntilStateHasChanged(); + // at this point the state will have changed. + } + } + } +} diff --git a/dotnet/Qpid.Client/Client/State/IAMQStateListener.cs b/dotnet/Qpid.Client/Client/State/IAMQStateListener.cs new file mode 100644 index 0000000000..ff27cd841e --- /dev/null +++ b/dotnet/Qpid.Client/Client/State/IAMQStateListener.cs @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Qpid.Client.State +{ + public interface IAMQStateListener + { + void StateChanged(AMQStateChangedEvent evt); + } +} + diff --git a/dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs b/dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs new file mode 100644 index 0000000000..256fe1c3f3 --- /dev/null +++ b/dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Qpid.Client.Protocol; + +namespace Qpid.Client.State +{ + public interface IStateAwareMethodListener + { + void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt); + } +} + diff --git a/dotnet/Qpid.Client/Client/State/IStateListener.cs b/dotnet/Qpid.Client/Client/State/IStateListener.cs new file mode 100644 index 0000000000..6073b2bb0c --- /dev/null +++ b/dotnet/Qpid.Client/Client/State/IStateListener.cs @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; + +namespace Qpid.Client.State +{ + public interface IStateListener + { + void StateChanged(AMQState oldState, AMQState newState); + + void Error(Exception e); + } +} + diff --git a/dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs b/dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs new file mode 100644 index 0000000000..723ae04397 --- /dev/null +++ b/dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; + +namespace Qpid.Client.State +{ + public class IllegalStateTransitionException : AMQException + { + private AMQState _originalState; + + private Type _frame; + + public IllegalStateTransitionException(AMQState originalState, Type frame) + : base("No valid state transition defined for receiving frame " + frame + + " from state " + originalState) + { + _originalState = originalState; + _frame = frame; + } + + public AMQState OriginalState + { + get + { + return _originalState; + } + } + + public Type FrameType + { + get + { + return _frame; + } + } + } +} + diff --git a/dotnet/Qpid.Client/Client/State/StateWaiter.cs b/dotnet/Qpid.Client/Client/State/StateWaiter.cs new file mode 100644 index 0000000000..cb7f604499 --- /dev/null +++ b/dotnet/Qpid.Client/Client/State/StateWaiter.cs @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; + +namespace Qpid.Client.State +{ + public class StateWaiter : IStateListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(StateWaiter)); + + private readonly AMQState _state; + + private volatile bool _newStateAchieved; + + private volatile Exception _exception; + + private ManualResetEvent _resetEvent = new ManualResetEvent(false); + + public StateWaiter(AMQState state) + { + _state = state; + } + + public void StateChanged(AMQState oldState, AMQState newState) + { + if (_logger.IsDebugEnabled) + { + _logger.Debug("stateChanged called"); + } + if (_state == newState) + { + _newStateAchieved = true; + + if (_logger.IsDebugEnabled) + { + _logger.Debug("New state reached so notifying monitor"); + } + _resetEvent.Set(); + } + } + + public void Error(Exception e) + { + if (_logger.IsDebugEnabled) + { + _logger.Debug("exceptionThrown called"); + } + + _exception = e; + _resetEvent.Set(); + } + + public void WaituntilStateHasChanged() + { + // + // The guard is required in case we are woken up by a spurious + // notify(). + // + while (!_newStateAchieved && _exception == null) + { + _logger.Debug("State not achieved so waiting..."); + _resetEvent.WaitOne(); + } + + if (_exception != null) + { + _logger.Debug("Throwable reached state waiter: " + _exception); + if (_exception is AMQException) + { + throw _exception; + } + else + { + throw new AMQException("Error: " + _exception, _exception); + } + } + } + } +} |
