summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client/Client/State
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-06-28 08:09:20 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-06-28 08:09:20 +0000
commit79cd6c772da003ddc917eff362f9adaa99e28b49 (patch)
treebbb1e4b46add9a52f4eb15afe83fb16b5ff6af66 /dotnet/Qpid.Client/Client/State
parente1de334597e23b55c9e91c1f853f52e8313ba103 (diff)
downloadqpid-python-79cd6c772da003ddc917eff362f9adaa99e28b49.tar.gz
Merged revisions 539783-539788 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r539783 | tomasr | 2007-05-19 18:40:32 +0100 (Sat, 19 May 2007) | 8 lines * QPID-495 (Contributed by Carlos Medina) Implement default timeouts for AttainState and SyncWrite * Fix method signatures * Remove SSL test with client-side certificates (requires extra setup) * Add locks AMSQtateManager and AMQProtocolListener to prevent modification of listener collections while processing notifications * Add library/runtime information to ConnectionStartMethodHandler * Fix some compiler warnings * Added XML documentation for some api interfaces ........ r539788 | tomasr | 2007-05-19 19:55:33 +0100 (Sat, 19 May 2007) | 1 line * Excluded failover tests from nant builds and SSL tests on mono ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@551497 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'dotnet/Qpid.Client/Client/State')
-rw-r--r--dotnet/Qpid.Client/Client/State/AMQStateManager.cs52
-rw-r--r--dotnet/Qpid.Client/Client/State/StateWaiter.cs38
2 files changed, 67 insertions, 23 deletions
diff --git a/dotnet/Qpid.Client/Client/State/AMQStateManager.cs b/dotnet/Qpid.Client/Client/State/AMQStateManager.cs
index 1233f9d836..9ce6d3c76a 100644
--- a/dotnet/Qpid.Client/Client/State/AMQStateManager.cs
+++ b/dotnet/Qpid.Client/Client/State/AMQStateManager.cs
@@ -43,13 +43,15 @@ namespace Qpid.Client.State
/// Maps from an AMQState instance to a Map from Class to StateTransitionHandler.
/// The class must be a subclass of AMQFrame.
/// </summary>
- private readonly IDictionary _state2HandlersMap = new Hashtable();
-
- //private CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
- private ArrayList _stateListeners = ArrayList.Synchronized(new ArrayList(5));
+ private readonly IDictionary _state2HandlersMap;
+ private ArrayList _stateListeners;
+ private object _syncLock;
public AMQStateManager()
{
+ _syncLock = new object();
+ _state2HandlersMap = new Hashtable();
+ _stateListeners = ArrayList.Synchronized(new ArrayList(5));
_currentState = AMQState.CONNECTION_NOT_STARTED;
RegisterListeners();
}
@@ -132,18 +134,24 @@ namespace Qpid.Client.State
AMQState oldState = _currentState;
_currentState = newState;
- foreach (IStateListener l in _stateListeners)
+ lock ( _syncLock )
{
- l.StateChanged(oldState, newState);
+ foreach ( IStateListener l in _stateListeners )
+ {
+ l.StateChanged(oldState, newState);
+ }
}
}
public void Error(Exception e)
{
_logger.Debug("State manager receive error notification: " + e);
- foreach (IStateListener l in _stateListeners)
+ lock ( _syncLock )
{
- l.Error(e);
+ foreach ( IStateListener l in _stateListeners )
+ {
+ l.Error(e);
+ }
}
}
@@ -206,23 +214,37 @@ namespace Qpid.Client.State
public void AddStateListener(IStateListener listener)
{
_logger.Debug("Adding state listener");
- _stateListeners.Add(listener);
+ lock ( _syncLock )
+ {
+ _stateListeners.Add(listener);
+ }
}
public void RemoveStateListener(IStateListener listener)
{
- _stateListeners.Remove(listener);
+ lock ( _syncLock )
+ {
+ _stateListeners.Remove(listener);
+ }
}
public void AttainState(AMQState s)
{
if (_currentState != s)
{
- _logger.Debug("Adding state wait to reach state " + s);
- StateWaiter sw = new StateWaiter(s);
- AddStateListener(sw);
- sw.WaituntilStateHasChanged();
- // at this point the state will have changed.
+ StateWaiter sw = null;
+ try
+ {
+ _logger.Debug("Adding state wait to reach state " + s);
+ sw = new StateWaiter(s);
+ AddStateListener(sw);
+ sw.WaituntilStateHasChanged();
+ // at this point the state will have changed.
+ }
+ finally
+ {
+ RemoveStateListener(sw);
+ }
}
}
}
diff --git a/dotnet/Qpid.Client/Client/State/StateWaiter.cs b/dotnet/Qpid.Client/Client/State/StateWaiter.cs
index cb7f604499..34667da744 100644
--- a/dotnet/Qpid.Client/Client/State/StateWaiter.cs
+++ b/dotnet/Qpid.Client/Client/State/StateWaiter.cs
@@ -20,6 +20,7 @@
*/
using System;
using System.Threading;
+using Qpid.Client.Protocol;
using log4net;
namespace Qpid.Client.State
@@ -29,6 +30,7 @@ namespace Qpid.Client.State
private static readonly ILog _logger = LogManager.GetLogger(typeof(StateWaiter));
private readonly AMQState _state;
+ private AMQState _newState;
private volatile bool _newStateAchieved;
@@ -42,7 +44,8 @@ namespace Qpid.Client.State
}
public void StateChanged(AMQState oldState, AMQState newState)
- {
+ {
+ _newState = newState;
if (_logger.IsDebugEnabled)
{
_logger.Debug("stateChanged called");
@@ -76,23 +79,42 @@ namespace Qpid.Client.State
// The guard is required in case we are woken up by a spurious
// notify().
//
- while (!_newStateAchieved && _exception == null)
- {
+
+ TimeSpan waitTime = TimeSpan.FromMilliseconds(DefaultTimeouts.MaxWaitForState);
+ DateTime waitUntilTime = DateTime.Now + waitTime;
+
+ while ( !_newStateAchieved
+ && _exception == null
+ && waitTime.TotalMilliseconds > 0 )
+ {
_logger.Debug("State not achieved so waiting...");
- _resetEvent.WaitOne();
+ try
+ {
+ _resetEvent.WaitOne(waitTime, true);
+ }
+ finally
+ {
+ if (!_newStateAchieved)
+ {
+ waitTime = waitUntilTime - DateTime.Now;
+ }
+ }
}
if (_exception != null)
{
_logger.Debug("Throwable reached state waiter: " + _exception);
if (_exception is AMQException)
- {
throw _exception;
- }
else
- {
throw new AMQException("Error: " + _exception, _exception);
- }
+ }
+
+ if (!_newStateAchieved)
+ {
+ string error = string.Format("State not achieved within permitted time. Current state: {0}, desired state: {1}", _state, _newState);
+ _logger.Warn(error);
+ throw new AMQException(error);
}
}
}