summaryrefslogtreecommitdiff
path: root/dotnet/Qpid.Client
diff options
context:
space:
mode:
Diffstat (limited to 'dotnet/Qpid.Client')
-rw-r--r--dotnet/Qpid.Client/Client/AMQConnection.cs845
-rw-r--r--dotnet/Qpid.Client/Client/AMQConnectionException.cs31
-rw-r--r--dotnet/Qpid.Client/Client/AMQDestination.cs233
-rw-r--r--dotnet/Qpid.Client/Client/AmqBrokerInfo.cs313
-rw-r--r--dotnet/Qpid.Client/Client/AmqChannel.cs1071
-rw-r--r--dotnet/Qpid.Client/Client/BasicMessageConsumer.cs404
-rw-r--r--dotnet/Qpid.Client/Client/BasicMessageProducer.cs275
-rw-r--r--dotnet/Qpid.Client/Client/Closeable.cs71
-rw-r--r--dotnet/Qpid.Client/Client/Collections/LinkedHashtable.cs214
-rw-r--r--dotnet/Qpid.Client/Client/ConnectionTuneParameters.cs82
-rw-r--r--dotnet/Qpid.Client/Client/Failover/FailoverException.cs35
-rw-r--r--dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs173
-rw-r--r--dotnet/Qpid.Client/Client/Failover/FailoverState.cs31
-rw-r--r--dotnet/Qpid.Client/Client/Failover/FailoverSupport.cs55
-rw-r--r--dotnet/Qpid.Client/Client/Handler/BasicDeliverMethodHandler.cs42
-rw-r--r--dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs44
-rw-r--r--dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs57
-rw-r--r--dotnet/Qpid.Client/Client/Handler/ConnectionCloseMethodHandler.cs53
-rw-r--r--dotnet/Qpid.Client/Client/Handler/ConnectionCloseOkHandler.cs40
-rw-r--r--dotnet/Qpid.Client/Client/Handler/ConnectionOpenOkMethodHandler.cs34
-rw-r--r--dotnet/Qpid.Client/Client/Handler/ConnectionRedirectMethodHandler.cs67
-rw-r--r--dotnet/Qpid.Client/Client/Handler/ConnectionSecureMethodHandler.cs36
-rw-r--r--dotnet/Qpid.Client/Client/Handler/ConnectionStartMethodHandler.cs113
-rw-r--r--dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs65
-rw-r--r--dotnet/Qpid.Client/Client/Message/AMQMessage.cs52
-rw-r--r--dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs49
-rw-r--r--dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs800
-rw-r--r--dotnet/Qpid.Client/Client/Message/IMessageFactory.cs49
-rw-r--r--dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs117
-rw-r--r--dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs581
-rw-r--r--dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs60
-rw-r--r--dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs137
-rw-r--r--dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs60
-rw-r--r--dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs48
-rw-r--r--dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs56
-rw-r--r--dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs75
-rw-r--r--dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs289
-rw-r--r--dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs269
-rw-r--r--dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs27
-rw-r--r--dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs36
-rw-r--r--dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs109
-rw-r--r--dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs45
-rw-r--r--dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs41
-rw-r--r--dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs78
-rw-r--r--dotnet/Qpid.Client/Client/QpidConnectionInfo.cs142
-rw-r--r--dotnet/Qpid.Client/Client/State/AMQState.cs34
-rw-r--r--dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs52
-rw-r--r--dotnet/Qpid.Client/Client/State/AMQStateManager.cs225
-rw-r--r--dotnet/Qpid.Client/Client/State/IAMQStateListener.cs28
-rw-r--r--dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs30
-rw-r--r--dotnet/Qpid.Client/Client/State/IStateListener.cs32
-rw-r--r--dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs56
-rw-r--r--dotnet/Qpid.Client/Client/State/StateWaiter.cs99
-rw-r--r--dotnet/Qpid.Client/Client/Transport/AMQProtocolProvider.cs47
-rw-r--r--dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs94
-rw-r--r--dotnet/Qpid.Client/Client/Transport/IByteChannel.cs30
-rw-r--r--dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs29
-rw-r--r--dotnet/Qpid.Client/Client/Transport/IProtocolWriter.cs29
-rw-r--r--dotnet/Qpid.Client/Client/Transport/ITransport.cs31
-rw-r--r--dotnet/Qpid.Client/Client/Transport/SingleProtocolEncoderOutput.cs40
-rw-r--r--dotnet/Qpid.Client/Properties/AssemblyInfo.cs52
-rw-r--r--dotnet/Qpid.Client/Qpid.Client.csproj133
-rw-r--r--dotnet/Qpid.Client/default.build52
-rw-r--r--dotnet/Qpid.Client/qms/BrokerInfo.cs63
-rw-r--r--dotnet/Qpid.Client/qms/ConnectionInfo.cs76
-rw-r--r--dotnet/Qpid.Client/qms/FailoverPolicy.cs315
-rw-r--r--dotnet/Qpid.Client/qms/failover/FailoverMethod.cs79
-rw-r--r--dotnet/Qpid.Client/qms/failover/FailoverRoundRobin.cs255
-rw-r--r--dotnet/Qpid.Client/qms/failover/FailoverSingleServer.cs147
69 files changed, 9532 insertions, 0 deletions
diff --git a/dotnet/Qpid.Client/Client/AMQConnection.cs b/dotnet/Qpid.Client/Client/AMQConnection.cs
new file mode 100644
index 0000000000..12eb9f6a21
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/AMQConnection.cs
@@ -0,0 +1,845 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.IO;
+using System.Reflection;
+using System.Threading;
+using log4net;
+using Qpid.Client.Failover;
+using Qpid.Client.Protocol;
+using Qpid.Client.qms;
+using Qpid.Client.State;
+using Qpid.Client.Transport;
+using Qpid.Collections;
+using Qpid.Framing;
+using Qpid.Messaging;
+
+namespace Qpid.Client
+{
+ public class AMQConnection : Closeable, IConnection
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(AMQConnection));
+
+ ConnectionInfo _connectionInfo;
+ private int _nextChannelId = 0;
+
+ // _Connected should be refactored with a suitable wait object.
+ private bool _connected;
+
+ Thread _heartBeatThread;
+ HeartBeatThread _heartBeatRunner;
+
+ // The last error code that occured on the connection. Used to return the correct exception to the client
+ private AMQException _lastAMQException = null;
+
+ /**
+ * This is the "root" mutex that must be held when doing anything that could be impacted by failover.
+ * This must be held by any child objects of this connection such as the session, producers and consumers.
+ */
+ private readonly Object _failoverMutex = new Object();
+ public object FailoverMutex
+ {
+ get { return _failoverMutex; }
+ }
+
+ /**
+ * Policy dictating how to failover
+ */
+ private FailoverPolicy _failoverPolicy;
+
+ internal bool IsFailoverAllowed
+ {
+ get { return _failoverPolicy.FailoverAllowed(); }
+ }
+
+ /// <summary>
+ /// A channel is roughly analogous to a session. The server can negotiate the maximum number of channels
+ /// per session and we must prevent the client from opening too many. Zero means unlimited.
+ /// </summary>
+ private ushort _maximumChannelCount;
+
+ /// <summary>
+ /// The maximum size of frame supported by the server
+ /// </summary>
+ private uint _maximumFrameSize;
+
+ private AMQStateManager _stateManager;
+
+ private AMQProtocolSession _protocolSession;
+ public AMQProtocolSession ProtocolSession { get { return _protocolSession; } }
+
+ /// <summary>
+ /// Maps from session id (Integer) to AmqChannel instance
+ /// </summary>
+ private readonly LinkedHashtable _sessions = new LinkedHashtable();
+
+ private ExceptionListenerDelegate _exceptionListener;
+
+ private IConnectionListener _connectionListener;
+
+ private ITransport _transport;
+ public ITransport Transport { get { return _transport; } }
+
+ /// <summary>
+ /// Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for
+ /// message publication.
+ /// </summary>
+ private bool _started;
+
+ private AMQProtocolListener _protocolListener;
+ public AMQProtocolListener ProtocolListener { get { return _protocolListener; } }
+
+ public IProtocolWriter ProtocolWriter
+ {
+ get { return _transport.ProtocolWriter; }
+ }
+
+ ProtocolWriter _protocolWriter;
+
+ public ProtocolWriter ConvenientProtocolWriter
+ {
+ get { return _protocolWriter; }
+ }
+
+ public AMQConnection(ConnectionInfo connectionInfo)
+ {
+ if (connectionInfo == null)
+ {
+ throw new ArgumentException("ConnectionInfo must be specified");
+ }
+ _log.Info("ConnectionInfo: " + connectionInfo);
+ _connectionInfo = connectionInfo;
+ _log.Info("password = " + _connectionInfo.getPassword());
+ _failoverPolicy = new FailoverPolicy(connectionInfo);
+
+ // We are not currently connected.
+ _connected = false;
+
+ Exception lastException = null;
+ do
+ {
+ try
+ {
+ BrokerInfo brokerInfo = _failoverPolicy.GetNextBrokerInfo();
+ _log.Info("Connecting to " + brokerInfo);
+ MakeBrokerConnection(brokerInfo);
+ break;
+ }
+ catch (Exception e)
+ {
+ lastException = e;
+ _log.Error("Unable to connect to broker " + _failoverPolicy.GetCurrentBrokerInfo(), e);
+ // XXX: Should perhaps break out of the do/while here if not a SocketException...
+ }
+ } while (_failoverPolicy.FailoverAllowed());
+
+ _log.Debug("Are we connected:" + _connected);
+
+ if (!_failoverPolicy.FailoverAllowed())
+ {
+ throw new AMQConnectionException("Unable to connect", lastException);
+ }
+
+ // TODO: this needs to be redone so that we are not spinning.
+ // A suitable object should be set that is then waited on
+ // and only notified when a connection is made or when
+ // the AMQConnection gets closed.
+ while (!_connected && !Closed)
+ {
+ _log.Debug("Sleeping.");
+ Thread.Sleep(100);
+ }
+ if (!_failoverPolicy.FailoverAllowed() || _failoverPolicy.GetCurrentBrokerInfo() == null)
+ {
+ if (_lastAMQException != null)
+ {
+ throw _lastAMQException;
+ }
+ }
+ }
+
+ private ITransport LoadTransportFromAssembly(string host, int port, String assemblyName, String transportType)
+ {
+ Assembly assembly = Assembly.LoadFrom(assemblyName);
+ foreach (Type type in assembly.GetTypes())
+ {
+ _log.Info(String.Format("type = {0}", type));
+ }
+ Type transport = assembly.GetType(transportType);
+ if (transport == null)
+ {
+ throw new ArgumentException(
+ String.Format("Type is not found in assembly. Type={0} Assembly={1}", transportType, assemblyName));
+
+ }
+ _log.Info("transport = " + transport);
+ _log.Info("ctors = " + transport.GetConstructors());
+ ConstructorInfo info = transport.GetConstructors()[0];
+ ITransport result = (ITransport)info.Invoke(new object[] { host, port, this });
+
+ _log.Info("transport = " + result);
+ return result;
+ }
+
+ public void Disconnect()
+ {
+ _transport.Close();
+ }
+
+ #region IConnection Members
+
+ public string ClientID
+ {
+ get
+ {
+ CheckNotClosed();
+ return _connectionInfo.GetClientName();
+ }
+ set
+ {
+ CheckNotClosed();
+ _connectionInfo.SetClientName(value);
+ }
+ }
+
+ public override void Close()
+ {
+ lock (FailoverMutex)
+ {
+ // atomically set to closed and check the _previous value was NOT CLOSED
+ if (Interlocked.Exchange(ref _closed, CLOSED) == NOT_CLOSED)
+ {
+ try
+ {
+ CloseAllSessions(null);
+ CloseConnection();
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Error closing connection: " + e);
+ }
+ }
+ }
+ }
+
+ private void CloseConnection()
+ {
+ _stateManager.ChangeState(AMQState.CONNECTION_CLOSING);
+
+ AMQFrame frame = ConnectionCloseBody.CreateAMQFrame(
+ 0, 200, "Qpid.NET client is closing the connection.", 0, 0);
+
+ ProtocolWriter.Write(frame);
+
+ _log.Debug("Blocking for connection close ok frame");
+
+ _stateManager.AttainState(AMQState.CONNECTION_CLOSED);
+ Disconnect();
+ }
+
+ class CreateChannelFailoverSupport : FailoverSupport
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(CreateChannelFailoverSupport));
+
+ private bool _transacted;
+ private AcknowledgeMode _acknowledgeMode;
+ int _prefetch;
+ AMQConnection _connection;
+
+ public CreateChannelFailoverSupport(AMQConnection connection, bool transacted, AcknowledgeMode acknowledgementMode, int prefetch)
+ {
+ _connection = connection;
+ _transacted = transacted;
+ _acknowledgeMode = acknowledgementMode;
+ _prefetch = prefetch;
+ }
+
+ protected override object operation()
+ {
+ ushort channelId = _connection.NextChannelId();
+
+ if (_log.IsDebugEnabled)
+ {
+ _log.Debug("Write channel open frame for channel id " + channelId);
+ }
+
+ // We must create the channel and register it before actually sending the frame to the server to
+ // open it, so that there is no window where we could receive data on the channel and not be set
+ // up to handle it appropriately.
+ AmqChannel channel = new AmqChannel(_connection,
+ channelId, _transacted, _acknowledgeMode, _prefetch);
+ _connection.ProtocolSession.AddSessionByChannel(channelId, channel);
+ _connection.RegisterSession(channelId, channel);
+
+ bool success = false;
+ try
+ {
+ _connection.createChannelOverWire(channelId, (ushort)_prefetch, _transacted);
+ success = true;
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Error creating channel: " + e, e);
+ }
+ finally
+ {
+ if (!success) {
+ _connection.ProtocolSession.RemoveSessionByChannel(channelId);
+ _connection.DeregisterSession(channelId);
+ }
+ }
+
+ if (_connection._started)
+ {
+ channel.Start();
+ }
+ return channel;
+ }
+ }
+
+ internal ushort NextChannelId()
+ {
+ return (ushort) Interlocked.Increment(ref _nextChannelId);
+ }
+
+ public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode)
+ {
+ return CreateChannel(transacted, acknowledgeMode, AmqChannel.DEFAULT_PREFETCH);
+ }
+
+ public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetch)
+ {
+ CheckNotClosed();
+ if (ChannelLimitReached())
+ {
+ throw new ChannelLimitReachedException(_maximumChannelCount);
+ }
+ else
+ {
+ CreateChannelFailoverSupport operation =
+ new CreateChannelFailoverSupport(this, transacted, acknowledgeMode, prefetch);
+ return (IChannel)operation.execute(this);
+ }
+ }
+
+ public void CloseSession(AmqChannel channel)
+ {
+ _protocolSession.CloseSession(channel);
+
+ AMQFrame frame = ChannelCloseBody.CreateAMQFrame(
+ channel.ChannelId, 200, "JMS client closing channel", 0, 0);
+
+ _log.Debug("Blocking for channel close frame for channel " + channel.ChannelId);
+ _protocolWriter.SyncWrite(frame, typeof(ChannelCloseOkBody));
+ _log.Debug("Received channel close frame");
+ // When control resumes at this point, a reply will have been received that
+ // indicates the broker has closed the channel successfully
+ }
+
+ public ExceptionListenerDelegate ExceptionListener
+ {
+ get
+ {
+ CheckNotClosed();
+ return _exceptionListener;
+ }
+ set
+ {
+ CheckNotClosed();
+ _exceptionListener = value;
+ }
+ }
+
+ /// <summary>
+ /// Start the connection, i.e. start flowing messages. Note that this method must be called only from a single thread
+ /// and is not thread safe (which is legal according to the JMS specification).
+ /// @throws JMSException
+ /// </summary>
+ public void Start()
+ {
+ CheckNotClosed();
+ if (!_started)
+ {
+ foreach (DictionaryEntry lde in _sessions)
+ {
+ AmqChannel s = (AmqChannel)lde.Value;
+ s.Start();
+ }
+ _started = true;
+ }
+ }
+
+ public void Stop()
+ {
+ CheckNotClosed();
+ throw new NotImplementedException();
+ }
+
+ public IConnectionListener ConnectionListener
+ {
+ get { return _connectionListener; }
+ set { _connectionListener = value; }
+ }
+
+ #endregion
+
+ #region IDisposable Members
+
+ public void Dispose()
+ {
+ Close();
+ }
+
+ #endregion
+
+ private bool ChannelLimitReached()
+ {
+ return _maximumChannelCount != 0 && _sessions.Count == _maximumChannelCount;
+ }
+
+ /// <summary>
+ /// Close all the sessions, either due to normal connection closure or due to an error occurring.
+ /// @param cause if not null, the error that is causing this shutdown
+ /// </summary>
+ private void CloseAllSessions(Exception cause)
+ {
+ _log.Info("Closing all session in connection " + this);
+ ICollection sessions = new ArrayList(_sessions.Values);
+ foreach (AmqChannel channel in sessions)
+ {
+ _log.Info("Closing channel " + channel);
+ if (cause != null)
+ {
+ channel.Closed(cause);
+ }
+ else
+ {
+ try
+ {
+ channel.Close();
+ }
+ catch (QpidException e)
+ {
+ _log.Error("Error closing channel: " + e);
+ }
+ }
+ }
+ _log.Info("Done closing all sessions in connection " + this);
+ }
+
+ public int MaximumChannelCount
+ {
+ get
+ {
+ CheckNotClosed();
+ return _maximumChannelCount;
+ }
+ }
+
+ internal void SetMaximumChannelCount(ushort maximumChannelCount)
+ {
+ CheckNotClosed();
+ _maximumChannelCount = maximumChannelCount;
+ }
+
+ public uint MaximumFrameSize
+ {
+ get
+ {
+ return _maximumFrameSize;
+ }
+
+ set
+ {
+ _maximumFrameSize = value;
+ }
+ }
+
+ public IDictionary Sessions
+ {
+ get
+ {
+ return _sessions;
+ }
+ }
+
+ public string Host
+ {
+ get
+ {
+ return _failoverPolicy.GetCurrentBrokerInfo().getHost();
+ }
+ }
+
+ public int Port
+ {
+ get
+ {
+ return _failoverPolicy.GetCurrentBrokerInfo().getPort();
+ }
+ }
+
+ public string Username
+ {
+ get
+ {
+ return _connectionInfo.getUsername();
+ }
+ }
+
+ public string Password
+ {
+ get
+ {
+ return _connectionInfo.getPassword();
+ }
+ }
+
+ public string VirtualHost
+ {
+ get
+ {
+ return _connectionInfo.getVirtualHost();
+ }
+ }
+
+ /// <summary>
+ /// Invoked by the AMQProtocolSession when a protocol session exception has occurred.
+ /// This method sends the exception to a JMS exception listener, if configured, and
+ /// propagates the exception to sessions, which in turn will propagate to consumers.
+ /// This allows synchronous consumers to have exceptions thrown to them.
+ /// </summary>
+ /// <param name="cause">the exception</param>
+ public void ExceptionReceived(Exception cause)
+ {
+ if (_exceptionListener != null)
+ {
+ // Listener expects one of these...
+ QpidException xe;
+
+ if (cause is QpidException)
+ {
+ xe = (QpidException) cause;
+ }
+ else
+ {
+ xe = new QpidException("Exception thrown against " + ToString() + ": " + cause, cause);
+ }
+ // in the case of an IOException, MINA has closed the protocol session so we set _closed to true
+ // so that any generic client code that tries to close the connection will not mess up this error
+ // handling sequence
+ if (cause is IOException)
+ {
+ Interlocked.Exchange(ref _closed, CLOSED);
+ }
+ _exceptionListener.Invoke(xe);
+ }
+ else
+ {
+ _log.Error("Connection exception: " + cause);
+ }
+
+ // An undelivered is not fatal to the connections usability.
+ if (!(cause is AMQUndeliveredException))
+ {
+ Interlocked.Exchange(ref _closed, CLOSED);
+ CloseAllSessions(cause);
+ }
+ else
+ {
+ ;
+ }
+ }
+
+ internal void RegisterSession(int channelId, AmqChannel channel)
+ {
+ _sessions[channelId] = channel;
+ }
+
+ internal void DeregisterSession(int channelId)
+ {
+ _sessions.Remove(channelId);
+ }
+
+ /**
+ * Fire the preFailover event to the registered connection listener (if any)
+ *
+ * @param redirect true if this is the result of a redirect request rather than a connection error
+ * @return true if no listener or listener does not veto change
+ */
+ public bool FirePreFailover(bool redirect)
+ {
+ bool proceed = true;
+ if (_connectionListener != null)
+ {
+ proceed = _connectionListener.PreFailover(redirect);
+ }
+ return proceed;
+ }
+
+ /**
+ * Fire the preResubscribe event to the registered connection listener (if any). If the listener
+ * vetoes resubscription then all the sessions are closed.
+ *
+ * @return true if no listener or listener does not veto resubscription.
+ * @throws JMSException
+ */
+ public bool FirePreResubscribe()
+ {
+ if (_connectionListener != null)
+ {
+ bool resubscribe = _connectionListener.PreResubscribe();
+ if (!resubscribe)
+ {
+ MarkAllSessionsClosed();
+ }
+ return resubscribe;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ /**
+ * Marks all sessions and their children as closed without sending any protocol messages. Useful when
+ * you need to mark objects "visible" in userland as closed after failover or other significant event that
+ * impacts the connection.
+ * <p/>
+ * The caller must hold the failover mutex before calling this method.
+ */
+ private void MarkAllSessionsClosed()
+ {
+ //LinkedList sessionCopy = new LinkedList(_sessions.values());
+ ArrayList sessionCopy = new ArrayList(_sessions.Values);
+ foreach (AmqChannel session in sessionCopy)
+ {
+ session.MarkClosed();
+ }
+ _sessions.Clear();
+ }
+
+ /**
+ * Fires a failover complete event to the registered connection listener (if any).
+ */
+ public void FireFailoverComplete()
+ {
+ if (_connectionListener != null)
+ {
+ _connectionListener.FailoverComplete();
+ }
+ }
+
+ public bool AttemptReconnection(String host, int port, bool useSSL)
+ {
+ BrokerInfo bd = new AmqBrokerInfo("amqp", host, port, useSSL);
+
+ _failoverPolicy.setBroker(bd);
+
+ try
+ {
+ MakeBrokerConnection(bd);
+ return true;
+ }
+ catch (Exception e)
+ {
+ _log.Info("Unable to connect to broker at " + bd, e);
+ AttemptReconnection();
+ }
+ return false;
+ }
+
+ private void MakeBrokerConnection(BrokerInfo brokerDetail)
+ {
+ try
+ {
+ _stateManager = new AMQStateManager();
+ _protocolListener = new AMQProtocolListener(this, _stateManager);
+ _protocolListener.AddFrameListener(_stateManager);
+
+ // Currently there is only one transport option - BlockingSocket.
+ String assemblyName = "Qpid.Client.Transport.Socket.Blocking.dll";
+ String transportType = "Qpid.Client.Transport.Socket.Blocking.BlockingSocketTransport";
+
+ // Load the transport assembly dynamically.
+ _transport = LoadTransportFromAssembly(brokerDetail.getHost(), brokerDetail.getPort(), assemblyName, transportType);
+
+ // Connect.
+ _transport.Open();
+ _protocolWriter = new ProtocolWriter(_transport.ProtocolWriter, _protocolListener);
+ _protocolSession = new AMQProtocolSession(_transport.ProtocolWriter, _transport, this);
+ _protocolListener.ProtocolSession = _protocolSession;
+
+ // Now start the connection "handshake".
+ _transport.ProtocolWriter.Write(new ProtocolInitiation());
+
+ // Blocks until the connection has been opened.
+ _stateManager.AttainState(AMQState.CONNECTION_OPEN);
+
+ _failoverPolicy.attainedConnection();
+
+ // XXX: Again this should be changed to a suitable notify.
+ _connected = true;
+ }
+ catch (AMQException e)
+ {
+ _lastAMQException = e;
+ throw e;
+ }
+ }
+
+ public bool AttemptReconnection()
+ {
+ while (_failoverPolicy.FailoverAllowed())
+ {
+ try
+ {
+ MakeBrokerConnection(_failoverPolicy.GetNextBrokerInfo());
+ return true;
+ }
+ catch (Exception e)
+ {
+ if (!(e is AMQException))
+ {
+ _log.Info("Unable to connect to broker at " + _failoverPolicy.GetCurrentBrokerInfo(), e);
+ }
+ else
+ {
+ _log.Info(e.Message + ":Unable to connect to broker at " + _failoverPolicy.GetCurrentBrokerInfo());
+ }
+ }
+ }
+
+ // Connection unsuccessful.
+ return false;
+ }
+
+ /**
+ * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
+ * The caller must hold the failover mutex before calling this method.
+ */
+ public void ResubscribeSessions()
+ {
+ ArrayList sessions = new ArrayList(_sessions.Values);
+ _log.Info(String.Format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.Count));
+ foreach (AmqChannel s in sessions)
+ {
+ _protocolSession.AddSessionByChannel(s.ChannelId, s);
+ ReopenChannel(s.ChannelId, (ushort)s.DefaultPrefetch, s.Transacted);
+ s.Resubscribe();
+ }
+ }
+
+ private void ReopenChannel(ushort channelId, ushort prefetch, bool transacted)
+ {
+ try
+ {
+ createChannelOverWire(channelId, prefetch, transacted);
+ }
+ catch (AMQException e)
+ {
+ _protocolSession.RemoveSessionByChannel(channelId);
+ DeregisterSession(channelId);
+ throw new AMQException("Error reopening channel " + channelId + " after failover: " + e);
+ }
+ }
+
+ void createChannelOverWire(ushort channelId, ushort prefetch, bool transacted)
+ {
+ _protocolWriter.SyncWrite(ChannelOpenBody.CreateAMQFrame(channelId, null), typeof (ChannelOpenOkBody));
+
+ // Don't use the BasicQos frame if connecting to OpenAMQ (at it is not support). We
+ // know this when we connection using AMQP 0.7
+ if (ProtocolInitiation.CURRENT_PROTOCOL_VERSION_MAJOR != 7)
+ {
+ // Basic.Qos frame appears to not be supported by OpenAMQ 1.0d.
+ _protocolWriter.SyncWrite(
+ BasicQosBody.CreateAMQFrame(channelId, 0, prefetch, false),
+ typeof (BasicQosOkBody));
+ }
+
+
+ if (transacted)
+ {
+ if (_log.IsDebugEnabled)
+ {
+ _log.Debug("Issuing TxSelect for " + channelId);
+ }
+ _protocolWriter.SyncWrite(TxSelectBody.CreateAMQFrame(channelId), typeof(TxSelectOkBody));
+ }
+ }
+
+ public String toURL()
+ {
+ return _connectionInfo.asUrl();
+ }
+
+ class HeartBeatThread
+ {
+ int _heartbeatMillis;
+ IProtocolWriter _protocolWriter;
+ bool _run = true;
+
+ public HeartBeatThread(IProtocolWriter protocolWriter, int heartbeatMillis)
+ {
+ _protocolWriter = protocolWriter;
+ _heartbeatMillis = heartbeatMillis;
+ }
+
+ public void Run()
+ {
+ while (_run)
+ {
+ Thread.Sleep(_heartbeatMillis);
+ if (!_run) break;
+ _log.Debug("Sending heartbeat");
+ // TODO: Can we optimise this so that heartbeats are only written when we haven't sent anything recently to the broker?
+ _protocolWriter.Write(HeartbeatBody.FRAME);
+ }
+ _log.Info("Heatbeat thread stopped");
+ }
+
+ public void Stop()
+ {
+ _run = false;
+ }
+ }
+
+ public void StartHeartBeatThread(int heartbeatSeconds)
+ {
+ _log.Info("Starting new heartbeat thread");
+ _heartBeatRunner = new HeartBeatThread(ProtocolWriter, heartbeatSeconds * 1000);
+ _heartBeatThread = new Thread(new ThreadStart(_heartBeatRunner.Run));
+ _heartBeatThread.Name = "HeartBeat";
+ _heartBeatThread.Start();
+ }
+
+ public void StopHeartBeatThread()
+ {
+ if (_heartBeatRunner != null)
+ {
+ _log.Info("Stopping old heartbeat thread");
+ _heartBeatRunner.Stop();
+ }
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/AMQConnectionException.cs b/dotnet/Qpid.Client/Client/AMQConnectionException.cs
new file mode 100644
index 0000000000..603a7b2a1c
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/AMQConnectionException.cs
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+
+namespace Qpid.Client
+{
+ public class AMQConnectionException : AMQException
+ {
+ public AMQConnectionException(String message, Exception e) : base(message, e)
+ {
+ }
+ }
+} \ No newline at end of file
diff --git a/dotnet/Qpid.Client/Client/AMQDestination.cs b/dotnet/Qpid.Client/Client/AMQDestination.cs
new file mode 100644
index 0000000000..7ea6db4ee8
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/AMQDestination.cs
@@ -0,0 +1,233 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+
+namespace Qpid.Client
+{
+ public abstract class AMQDestination
+ {
+ protected readonly string _exchangeName;
+ protected readonly string _exchangeClass;
+ protected readonly string _destinationName;
+ protected readonly bool _isExclusive;
+ protected readonly bool _isAutoDelete;
+ protected bool _isDurable;
+
+ public bool IsDurable
+ {
+ get { return _isDurable; }
+ }
+
+ protected string _queueName;
+
+ protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, bool isExclusive,
+ bool isAutoDelete, String queueName)
+ {
+ // XXX: This is ugly - OnlyRequired because of ReplyToDestination.
+// if (destinationName == null)
+// {
+// throw new ArgumentNullException("destinationName");
+// }
+
+ // XXX: This is ugly - OnlyRequired because of SendingDestinationAdapter.
+// if (exchangeName == null)
+// {
+// throw new ArgumentNullException("exchangeName");
+// }
+
+ // XXX: This is ugly - OnlyRequired because of SendingDestinationAdapter.
+// if (exchangeClass == null)
+// {
+// throw new ArgumentNullException("exchangeClass");
+// }
+
+ _exchangeName = exchangeName;
+ _exchangeClass = exchangeClass;
+ _destinationName = destinationName;
+ _isExclusive = isExclusive;
+ _isAutoDelete = isAutoDelete;
+ _queueName = queueName;
+ }
+
+ public string Name
+ {
+ get
+ {
+ return _destinationName;
+ }
+ }
+
+ public abstract string RoutingKey
+ {
+ get;
+ }
+
+ public abstract string EncodedName
+ {
+ get;
+ }
+
+ public bool AutoDelete
+ {
+ get
+ {
+ return _isAutoDelete;
+ }
+ }
+
+ public string QueueName
+ {
+ get
+ {
+ return _queueName;
+ }
+ set
+ {
+ _queueName = value;
+ }
+ }
+
+ public string ExchangeName
+ {
+ get
+ {
+ return _exchangeName;
+ }
+ }
+
+ public string ExchangeClass
+ {
+ get
+ {
+ return _exchangeClass;
+ }
+ }
+
+ public bool IsExclusive
+ {
+ get
+ {
+ return _isExclusive;
+ }
+ }
+
+ public bool IsAutoDelete
+ {
+ get
+ {
+ return _isAutoDelete;
+ }
+ }
+
+ public override string ToString()
+ {
+ return "Destination: " + _destinationName + ", " +
+ "Queue Name: " + _queueName + ", Exchange: " + _exchangeName +
+ ", Exchange class: " + _exchangeClass + ", Exclusive: " + _isExclusive +
+ ", AutoDelete: " + _isAutoDelete; // +", Routing Key: " + RoutingKey;
+ }
+
+ public override bool Equals(object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || GetType() != o.GetType())
+ {
+ return false;
+ }
+
+ AMQDestination that = (AMQDestination) o;
+
+ if (!StringsNotEqualNullSafe(_destinationName, that._destinationName))
+ {
+ return false;
+ }
+ if (!StringsNotEqualNullSafe(_exchangeClass, that._exchangeClass))
+ {
+ return false;
+ }
+ if (!StringsNotEqualNullSafe(_exchangeName, that._exchangeName))
+ {
+ return false;
+ }
+ if (!StringsNotEqualNullSafe(_queueName, that._queueName))
+ {
+ return false;
+ }
+ if (_isExclusive != that._isExclusive)
+ {
+ return false;
+ }
+ if (_isAutoDelete != that._isAutoDelete)
+ {
+ return false;
+ }
+ return true;
+ }
+
+ private bool StringsNotEqualNullSafe(string one, string two)
+ {
+ if ((one == null && two != null) ||
+ (one != null && !one.Equals(two)))
+ {
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ public override int GetHashCode()
+ {
+ int result;
+ if (_exchangeName == null)
+ {
+ result = "".GetHashCode();
+ }
+ else
+ {
+ result = _exchangeName.GetHashCode();
+ }
+ if (_exchangeClass != null)
+ {
+ result = 29 * result + _exchangeClass.GetHashCode();
+ }
+ if (_destinationName != null)
+ {
+ result = 29 * result + _destinationName.GetHashCode();
+ }
+ if (_queueName != null)
+ {
+ result = 29 * result + _queueName.GetHashCode();
+ }
+ result = result * (_isExclusive ? 13 : 7);
+ result = result * (_isAutoDelete ? 13 : 7);
+
+ Console.WriteLine("FIXME HashCode for " + this + " = " + result);
+ return result;
+ }
+
+ public abstract bool IsNameRequired { get; }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs b/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs
new file mode 100644
index 0000000000..c00a427494
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs
@@ -0,0 +1,313 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text;
+using Qpid.Client.qms;
+
+namespace Qpid.Client
+{
+ public class AmqBrokerInfo : BrokerInfo
+ {
+ private string _host = "localhost";
+ private int _port = 5672;
+ private string _transport = "amqp";
+
+ public readonly string URL_FORMAT_EXAMPLE =
+ "<transport>://<hostname>[:<port Default=\""+BrokerDetailsConstants.DEFAULT_PORT+"\">][?<option>='<value>'[,<option>='<value>']]";
+
+ public const long DEFAULT_CONNECT_TIMEOUT = 30000L;
+
+ private Hashtable _options = new Hashtable();
+
+ public AmqBrokerInfo()
+ {
+ }
+
+ // TODO: port URL parsing.
+ public AmqBrokerInfo(string url)
+ {
+ throw new NotImplementedException();
+// this();
+// // URL should be of format tcp://host:port?option='value',option='value'
+// try
+// {
+// URI connection = new URI(url);
+//
+// string transport = connection.getScheme();
+//
+// // Handles some defaults to minimise changes to existing broker URLS e.g. localhost
+// if (transport != null)
+// {
+// //todo this list of valid transports should be enumerated somewhere
+// if ((!(transport.equalsIgnoreCase("vm") ||
+// transport.equalsIgnoreCase("tcp"))))
+// {
+// if (transport.equalsIgnoreCase("localhost"))
+// {
+// connection = new URI(DEFAULT_TRANSPORT + "://" + url);
+// transport = connection.getScheme();
+// }
+// else
+// {
+// if (url.charAt(transport.length()) == ':' && url.charAt(transport.length()+1) != '/' )
+// {
+// //Then most likely we have a host:port value
+// connection = new URI(DEFAULT_TRANSPORT + "://" + url);
+// transport = connection.getScheme();
+// }
+// else
+// {
+// URLHelper.parseError(0, transport.length(), "Unknown transport", url);
+// }
+// }
+// }
+// }
+// else
+// {
+// //Default the transport
+// connection = new URI(DEFAULT_TRANSPORT + "://" + url);
+// transport = connection.getScheme();
+// }
+//
+// if (transport == null)
+// {
+// URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" +
+// " In broker URL:'" + url + "' Format: " + URL_FORMAT_EXAMPLE, "");
+// }
+//
+// setTransport(transport);
+//
+// string host = connection.getHost();
+//
+// // Fix for Java 1.5
+// if (host == null)
+// {
+// host = "";
+// }
+//
+// setHost(host);
+//
+// int port = connection.getPort();
+//
+// if (port == -1)
+// {
+// // Another fix for Java 1.5 URI handling
+// string auth = connection.getAuthority();
+//
+// if (auth != null && auth.startsWith(":"))
+// {
+// setPort(Integer.parseInt(auth.substring(1)));
+// }
+// else
+// {
+// setPort(DEFAULT_PORT);
+// }
+// }
+// else
+// {
+// setPort(port);
+// }
+//
+// string querystring = connection.getQuery();
+//
+// URLHelper.parseOptions(_options, querystring);
+//
+// //Fragment is #string (not used)
+// }
+// catch (URISyntaxException uris)
+// {
+// if (uris instanceof URLSyntaxException)
+// {
+// throw (URLSyntaxException) uris;
+// }
+//
+// URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+// }
+ }
+
+ public AmqBrokerInfo(string transport, string host, int port, bool useSSL) : this()
+ {
+ _transport = transport;
+ _host = host;
+ _port = port;
+
+ if (useSSL)
+ {
+ setOption(BrokerDetailsConstants.OPTIONS_SSL, "true");
+ }
+ }
+
+ public string getHost()
+ {
+ return _host;
+ }
+
+ public void setHost(string _host)
+ {
+ this._host = _host;
+ }
+
+ public int getPort()
+ {
+ return _port;
+ }
+
+ public void setPort(int _port)
+ {
+ this._port = _port;
+ }
+
+ public string getTransport()
+ {
+ return _transport;
+ }
+
+ public void setTransport(string _transport)
+ {
+ this._transport = _transport;
+ }
+
+ public string getOption(string key)
+ {
+ return (string)_options[key];
+ }
+
+ public void setOption(string key, string value)
+ {
+ _options[key] = value;
+ }
+
+ public long getTimeout()
+ {
+ if (_options.ContainsKey(BrokerDetailsConstants.OPTIONS_CONNECT_TIMEOUT))
+ {
+ try
+ {
+ return long.Parse((string)_options[BrokerDetailsConstants.OPTIONS_CONNECT_TIMEOUT]);
+ }
+ catch (FormatException nfe)
+ {
+ //Do nothing as we will use the default below.
+ }
+ }
+
+ return BrokerDetailsConstants.DEFAULT_CONNECT_TIMEOUT;
+ }
+
+ public void setTimeout(long timeout)
+ {
+ setOption(BrokerDetailsConstants.OPTIONS_CONNECT_TIMEOUT, timeout.ToString());
+ }
+
+ public override string ToString()
+ {
+ StringBuilder sb = new StringBuilder();
+
+ sb.Append(_transport);
+ sb.Append("://");
+
+ if (!(_transport.ToLower().Equals("vm")))
+ {
+ sb.Append(_host);
+ }
+
+ sb.Append(':');
+ sb.Append(_port);
+
+ // XXX
+// sb.Append(printOptionsURL());
+
+ return sb.ToString();
+ }
+
+ public override bool Equals(object o)
+ {
+ if (!(o is BrokerInfo))
+ {
+ return false;
+ }
+
+ BrokerInfo bd = (BrokerInfo) o;
+
+ return StringEqualsIgnoreCase(_host, bd.getHost()) &&
+ (_port == bd.getPort()) &&
+ StringEqualsIgnoreCase(_transport, bd.getTransport()) &&
+ (useSSL() == bd.useSSL());
+
+ //todo do we need to compare all the options as well?
+ }
+
+ // TODO: move to util class.
+ private bool StringEqualsIgnoreCase(string one, string two)
+ {
+ return one.ToLower().Equals(two.ToLower());
+ }
+
+// private string printOptionsURL()
+// {
+// stringBuffer optionsURL = new stringBuffer();
+//
+// optionsURL.Append('?');
+//
+// if (!(_options.isEmpty()))
+// {
+//
+// for (string key : _options.keySet())
+// {
+// optionsURL.Append(key);
+//
+// optionsURL.Append("='");
+//
+// optionsURL.Append(_options.get(key));
+//
+// optionsURL.Append("'");
+//
+// optionsURL.Append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+// }
+// }
+//
+// //remove the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options
+// optionsURL.deleteCharAt(optionsURL.length() - 1);
+//
+// return optionsURL.tostring();
+// }
+
+ public bool useSSL()
+ {
+ // To be friendly to users we should be case insensitive.
+ // or simply force users to conform to OPTIONS_SSL
+ // todo make case insensitive by trying ssl Ssl sSl ssL SSl SsL sSL SSL
+
+ if (_options.ContainsKey(BrokerDetailsConstants.OPTIONS_SSL))
+ {
+ return StringEqualsIgnoreCase((string)_options[BrokerDetailsConstants.OPTIONS_SSL], "true");
+ }
+
+ return false;
+ }
+
+ public void useSSL(bool ssl)
+ {
+ setOption(BrokerDetailsConstants.OPTIONS_SSL, ssl.ToString());
+ }
+ }
+} \ No newline at end of file
diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs
new file mode 100644
index 0000000000..02818940dd
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/AmqChannel.cs
@@ -0,0 +1,1071 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text.RegularExpressions;
+using System.Threading;
+using log4net;
+using Qpid.Client.Message;
+using Qpid.Collections;
+using Qpid.Framing;
+using Qpid.Messaging;
+
+namespace Qpid.Client
+{
+ public class AmqChannel : Closeable, IChannel
+ {
+ private const int BASIC_CONTENT_TYPE = 60;
+
+ private static readonly ILog _logger = LogManager.GetLogger(typeof (AmqChannel));
+
+ private static int _nextSessionNumber = 0;
+
+ private int _sessionNumber;
+
+ // Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature.
+ private int _nextConsumerNumber = 1;
+
+ internal const int DEFAULT_PREFETCH = 5000;
+
+ private AMQConnection _connection;
+
+ private bool _transacted;
+
+ private AcknowledgeMode _acknowledgeMode;
+
+ private ushort _channelId;
+
+ private int _defaultPrefetch = DEFAULT_PREFETCH;
+
+ private BlockingQueue _queue = new LinkedBlockingQueue();
+
+ private Dispatcher _dispatcher;
+
+ private MessageFactoryRegistry _messageFactoryRegistry;
+
+ /// <summary>
+ /// Set of all producers created by this session
+ /// </summary>
+ private Hashtable _producers = Hashtable.Synchronized(new Hashtable());
+
+ /// <summary>
+ /// Maps from consumer tag to JMSMessageConsumer instance
+ /// </summary>
+ private Hashtable _consumers = Hashtable.Synchronized(new Hashtable());
+
+ /// <summary>
+ /// The counter of the _next producer id. This id is generated by the session and used only to allow the
+ /// producer to identify itself to the session when deregistering itself.
+ ///
+ /// Access to this id does not require to be synchronized since according to the JMS specification only one
+ /// thread of control is allowed to create producers for any given session instance.
+ /// </summary>
+ private long _nextProducerId;
+
+ /// <summary>
+ /// Responsible for decoding a message fragment and passing it to the appropriate message consumer.
+ /// </summary>
+ private class Dispatcher
+ {
+ private int _stopped = 0;
+
+ private AmqChannel _containingChannel;
+
+ public Dispatcher(AmqChannel containingChannel)
+ {
+ _containingChannel = containingChannel;
+ }
+
+ /// <summary>
+ /// Runs the dispatcher. This is intended to be Run in a separate thread.
+ /// </summary>
+ public void RunDispatcher()
+ {
+ UnprocessedMessage message;
+
+ while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.DequeueBlocking()) != null)
+ {
+ //_queue.size()
+ DispatchMessage(message);
+ }
+
+ _logger.Info("Dispatcher thread terminating for channel " + _containingChannel._channelId);
+ }
+
+// private void DispatchMessage(UnprocessedMessage message)
+ private void DispatchMessage(UnprocessedMessage message)
+ {
+ if (message.DeliverBody != null)
+ {
+ BasicMessageConsumer consumer = (BasicMessageConsumer) _containingChannel._consumers[message.DeliverBody.ConsumerTag];
+
+ if (consumer == null)
+ {
+ _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a handler - ignoring...");
+ }
+ else
+ {
+ consumer.NotifyMessage(message, _containingChannel.AcknowledgeMode, _containingChannel.ChannelId);
+ }
+ }
+ else
+ {
+ try
+ {
+ // Bounced message is processed here, away from the mina thread
+ AbstractQmsMessage bouncedMessage = _containingChannel._messageFactoryRegistry.
+ CreateMessage(0, false, message.ContentHeader, message.Bodies);
+
+ int errorCode = message.BounceBody.ReplyCode;
+ string reason = message.BounceBody.ReplyText;
+ _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")");
+
+ _containingChannel._connection.ExceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
+ }
+ catch (Exception e)
+ {
+ _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e);
+ }
+ }
+ }
+
+ public void StopDispatcher()
+ {
+ Interlocked.Exchange(ref _stopped, 1);
+ }
+ }
+
+ internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetch) :
+ this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.NewDefaultRegistry(), defaultPrefetch)
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AmqChannel"/> class.
+ /// </summary>
+ /// <param name="con">The con.</param>
+ /// <param name="channelId">The channel id.</param>
+ /// <param name="transacted">if set to <c>true</c> [transacted].</param>
+ /// <param name="acknowledgeMode">The acknowledge mode.</param>
+ /// <param name="messageFactoryRegistry">The message factory registry.</param>
+ internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode,
+ MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetch)
+ {
+ _sessionNumber = Interlocked.Increment(ref _nextSessionNumber);
+ _connection = con;
+ _transacted = transacted;
+ if (transacted)
+ {
+ _acknowledgeMode = AcknowledgeMode.SessionTransacted;
+ }
+ else
+ {
+ _acknowledgeMode = acknowledgeMode;
+ }
+ _channelId = channelId;
+ _messageFactoryRegistry = messageFactoryRegistry;
+ }
+
+ public IBytesMessage CreateBytesMessage()
+ {
+ lock (_connection.FailoverMutex)
+ {
+ CheckNotClosed();
+ try
+ {
+ return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Unable to create message: " + e);
+ }
+ }
+ }
+
+ public IMessage CreateMessage()
+ {
+ lock (_connection.FailoverMutex)
+ {
+ CheckNotClosed();
+ try
+ {
+ // TODO: this is supposed to create a message consisting only of message headers
+ return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Unable to create message: " + e);
+ }
+ }
+ }
+
+ public ITextMessage CreateTextMessage()
+ {
+ lock (_connection.FailoverMutex)
+ {
+ CheckNotClosed();
+
+ try
+ {
+ return (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain");
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Unable to create message: " + e);
+ }
+ }
+ }
+
+ public ITextMessage CreateTextMessage(string text)
+ {
+ lock (_connection.FailoverMutex)
+ {
+ CheckNotClosed();
+ try
+ {
+ ITextMessage msg = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain");
+ msg.Text = text;
+ return msg;
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Unable to create message: " + e);
+ }
+ }
+ }
+
+ public bool Transacted
+ {
+ get
+ {
+ CheckNotClosed();
+ return _transacted;
+ }
+ }
+
+ public AcknowledgeMode AcknowledgeMode
+ {
+ get
+ {
+ CheckNotClosed();
+ return _acknowledgeMode;
+ }
+ }
+
+ public void Commit()
+ {
+ CheckNotClosed();
+ CheckTransacted(); // throws IllegalOperationException if not a transacted session
+
+ /*Channel.Commit frame = new Channel.Commit();
+ frame.channelId = _channelId;
+ frame.confirmTag = 1;*/
+
+ // try
+ // {
+ // _connection.getProtocolHandler().writeCommandFrameAndWaitForReply(frame, new ChannelReplyListener(_channelId));
+ // }
+ // catch (AMQException e)
+ // {
+ // throw new JMSException("Error creating session: " + e);
+ // }
+ throw new NotImplementedException();
+ //_logger.Info("Transaction commited on channel " + _channelId);
+ }
+
+ public void Rollback()
+ {
+ CheckNotClosed();
+ CheckTransacted(); // throws IllegalOperationException if not a transacted session
+
+ /*Channel.Rollback frame = new Channel.Rollback();
+ frame.channelId = _channelId;
+ frame.confirmTag = 1;*/
+
+ // try
+ // {
+ // _connection.getProtocolHandler().writeCommandFrameAndWaitForReply(frame, new ChannelReplyListener(_channelId));
+ // }
+ // catch (AMQException e)
+ // {
+ // throw new JMSException("Error rolling back session: " + e);
+ // }
+ throw new NotImplementedException();
+ //_logger.Info("Transaction rolled back on channel " + _channelId);
+ }
+
+ public override void Close()
+ {
+ lock (_connection.FailoverMutex)
+ {
+ // We must close down all producers and consumers in an orderly fashion. This is the only method
+ // that can be called from a different thread of control from the one controlling the session
+
+ lock (_closingLock)
+ {
+ SetClosed();
+
+ // we pass null since this is not an error case
+ CloseProducersAndConsumers(null);
+
+ try
+ {
+ _connection.CloseSession(this);
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Error closing session: " + e);
+ }
+ finally
+ {
+ _connection.DeregisterSession(_channelId);
+ }
+ }
+ }
+ }
+
+ private void SetClosed()
+ {
+ Interlocked.Exchange(ref _closed, CLOSED);
+ }
+
+ /// <summary>
+ /// Close all producers or consumers. This is called either in the error case or when closing the session normally.
+ /// <param name="amqe">the exception, may be null to indicate no error has occurred</param>
+ ///
+ private void CloseProducersAndConsumers(AMQException amqe)
+ {
+ try
+ {
+ CloseProducers();
+ }
+ catch (QpidException e)
+ {
+ _logger.Error("Error closing session: " + e, e);
+ }
+ try
+ {
+ CloseConsumers(amqe);
+ }
+ catch (QpidException e)
+ {
+ _logger.Error("Error closing session: " + e, e);
+ }
+ }
+
+ /**
+ * Called when the server initiates the closure of the session
+ * unilaterally.
+ * @param e the exception that caused this session to be closed. Null causes the
+ */
+ public void Closed(Exception e)
+ {
+ lock (_connection.FailoverMutex)
+ {
+ // An AMQException has an error code and message already and will be passed in when closure occurs as a
+ // result of a channel close request
+ SetClosed();
+ AMQException amqe;
+ if (e is AMQException)
+ {
+ amqe = (AMQException) e;
+ }
+ else
+ {
+ amqe = new AMQException("Closing session forcibly", e);
+ }
+ _connection.DeregisterSession(_channelId);
+ CloseProducersAndConsumers(amqe);
+ }
+ }
+
+ /// <summary>
+ /// Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is
+ /// currently no way of propagating errors to message producers (this is a JMS limitation).
+ /// </summary>
+ private void CloseProducers()
+ {
+ _logger.Info("Closing producers on session " + this);
+ // we need to clone the list of producers since the close() method updates the _producers collection
+ // which would result in a concurrent modification exception
+ ArrayList clonedProducers = new ArrayList(_producers.Values);
+
+ foreach (BasicMessageProducer prod in clonedProducers)
+ {
+ _logger.Info("Closing producer " + prod);
+ prod.Close();
+ }
+ // at this point the _producers map is empty
+ }
+
+ /// <summary>
+ /// Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error.
+ /// <param name="error">not null if this is a result of an error occurring at the connection level</param>
+ ///
+ private void CloseConsumers(Exception error)
+ {
+ if (_dispatcher != null)
+ {
+ _dispatcher.StopDispatcher();
+ }
+ // we need to clone the list of consumers since the close() method updates the _consumers collection
+ // which would result in a concurrent modification exception
+ ArrayList clonedConsumers = new ArrayList(_consumers.Values);
+
+ foreach (BasicMessageConsumer con in clonedConsumers)
+ {
+ if (error != null)
+ {
+ con.NotifyError(error);
+ }
+ else
+ {
+ con.Close();
+ }
+ }
+ // at this point the _consumers map will be empty
+ }
+
+ public void Recover()
+ {
+ CheckNotClosed();
+ CheckNotTransacted(); // throws IllegalOperationException if not a transacted session
+
+ // TODO: This cannot be implemented using 0.8 semantics
+ throw new NotImplementedException();
+ }
+
+ public void Run()
+ {
+ throw new NotImplementedException();
+ }
+
+// public IMessagePublisher CreatePublisher(string exchangeName, string exchangeClass)
+// {
+// return CreatePublisher(exchangeClass, exchangeName, null);
+// }
+
+// public IMessagePublisher CreatePublisher(string exchangeName, string exchangeClass, string routingKey)
+// {
+// return CreatePublisherBuilder().withExchangeName(exchangeName)
+// .withRoutingKey(routingKey).Create();
+// }
+
+ public IMessagePublisher CreatePublisher(string exchangeName, string routingKey, DeliveryMode deliveryMode,
+ long timeToLive, bool immediate, bool mandatory, int priority)
+ {
+ _logger.Debug(string.Format("Using new CreatePublisher exchangeName={0}, exchangeClass={1} routingKey={2}",
+ exchangeName, "none", routingKey));
+ return CreateProducerImpl(exchangeName, routingKey, deliveryMode,
+ timeToLive, immediate, mandatory, priority);
+ }
+
+ // TODO: Create a producer that doesn't require an IDestination.
+// private IMessagePublisher CreateProducerImpl(IDestination destination)
+// {
+// lock (_closingLock)
+// {
+// CheckNotClosed();
+//
+// AMQDestination amqd = (AMQDestination)destination;
+//
+// try
+// {
+// return new BasicMessageProducer(amqd, _transacted, _channelId,
+// this, GetNextProducerId());
+// }
+// catch (AMQException e)
+// {
+// _logger.Error("Error creating message producer: " + e, e);
+// throw new QpidException("Error creating message producer", e);
+// }
+// }
+// }
+
+ public IMessagePublisher CreateProducerImpl(string exchangeName, string routingKey,
+ DeliveryMode deliveryMode,
+ long timeToLive, bool immediate, bool mandatory, int priority)
+ {
+ lock (_closingLock)
+ {
+ CheckNotClosed();
+
+ try
+ {
+ return new BasicMessageProducer(exchangeName, routingKey, _transacted, _channelId,
+ this, GetNextProducerId(),
+ deliveryMode, timeToLive, immediate, mandatory, priority);
+ }
+ catch (AMQException e)
+ {
+ _logger.Error("Error creating message producer: " + e, e);
+ throw new QpidException("Error creating message producer", e);
+ }
+ }
+ }
+
+ public IMessageConsumer CreateConsumer(string queueName,
+ int prefetch,
+ bool noLocal,
+ bool exclusive,
+ bool durable,
+ string subscriptionName)
+ {
+ _logger.Debug(String.Format("CreateConsumer queueName={0} prefetch={1} noLocal={2} exclusive={3} durable={4} subscriptionName={5}",
+ queueName, prefetch, noLocal, exclusive, durable, subscriptionName));
+ return CreateConsumerImpl(queueName, prefetch, noLocal, exclusive, durable, subscriptionName);
+ }
+
+ private IMessageConsumer CreateConsumerImpl(string queueName,
+ int prefetch,
+ bool noLocal,
+ bool exclusive,
+ bool durable,
+ string subscriptionName)
+ {
+
+ if (durable || subscriptionName != null)
+ {
+ throw new NotImplementedException(); // TODO: durable subscriptions.
+ }
+
+ lock (_closingLock)
+ {
+ CheckNotClosed();
+
+ BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, queueName, noLocal,
+ _messageFactoryRegistry, this);
+ try
+ {
+ RegisterConsumer(consumer);
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Error registering consumer: " + e, e);
+ }
+
+ return consumer;
+ }
+ }
+
+// public IDestination CreateQueue(string queueName)
+// {
+// return new AMQQueue(queueName);
+// }
+//
+// public IDestination CreateTopic(String topicName)
+// {
+// return new AMQTopic(topicName);
+// }
+
+ public IFieldTable CreateFieldTable()
+ {
+ return new FieldTable();
+ }
+
+// public IDestination CreateTemporaryQueue()
+// {
+// return new AMQQueue("TempQueue" + DateTime.Now.Ticks.ToString(), true);
+//
+//// return new AMQTemporaryQueue(); // XXX: port AMQTemporaryQueue and AMQQueue changes.
+// }
+
+// public IDestination CreateTemporaryTopic()
+// {
+// throw new NotImplementedException(); // FIXME
+// }
+
+ public void Unsubscribe(String name)
+ {
+ throw new NotImplementedException(); // FIXME
+ }
+
+ private void CheckTransacted()
+ {
+ if (!Transacted)
+ {
+ throw new InvalidOperationException("Channel is not transacted");
+ }
+ }
+
+ private void CheckNotTransacted()
+ {
+ if (Transacted)
+ {
+ throw new InvalidOperationException("Channel is transacted");
+ }
+ }
+
+ public void MessageReceived(UnprocessedMessage message)
+ {
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("Message received in session with channel id " + _channelId);
+ }
+ _queue.EnqueueBlocking(message);
+ }
+
+ public int DefaultPrefetch
+ {
+ get
+ {
+ return _defaultPrefetch;
+ }
+ set
+ {
+ _defaultPrefetch = value;
+ }
+ }
+
+ public ushort ChannelId
+ {
+ get
+ {
+ return _channelId;
+ }
+ }
+
+ public AMQConnection Connection
+ {
+ get
+ {
+ return _connection;
+ }
+ }
+
+ /// <summary>
+ /// Send an acknowledgement for all messages up to a specified number on this session.
+ /// <param name="messageNbr">the message number up to an including which all messages will be acknowledged.</param>
+ /// </summary>
+ public void SendAcknowledgement(ulong messageNbr)
+ {
+ /*if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("Channel Ack being sent for channel id " + _channelId + " and message number " + messageNbr);
+ }*/
+ /*Channel.Ack frame = new Channel.Ack();
+ frame.channelId = _channelId;
+ frame.messageNbr = messageNbr;
+ _connection.getProtocolHandler().writeFrame(frame);*/
+ }
+
+ internal void Start()
+ {
+ _dispatcher = new Dispatcher(this);
+ Thread dispatcherThread = new Thread(new ThreadStart(_dispatcher.RunDispatcher));
+ dispatcherThread.IsBackground = true;
+ dispatcherThread.Start();
+ }
+
+ internal void RegisterConsumer(string consumerTag, IMessageConsumer consumer)
+ {
+ _consumers[consumerTag] = consumer;
+ }
+
+ /// <summary>
+ /// Called by the MessageConsumer when closing, to deregister the consumer from the
+ /// map from consumerTag to consumer instance.
+ /// </summary>
+ /// <param name="consumerTag">the consumer tag, that was broker-generated</param>
+ internal void DeregisterConsumer(string consumerTag)
+ {
+ _consumers.Remove(consumerTag);
+ }
+
+ internal void RegisterProducer(long producerId, IMessagePublisher publisher)
+ {
+ _producers[producerId] = publisher;
+ }
+
+ internal void DeregisterProducer(long producerId)
+ {
+ _producers.Remove(producerId);
+ }
+
+ private long GetNextProducerId()
+ {
+ return ++_nextProducerId;
+ }
+
+ public void Dispose()
+ {
+ Close();
+ }
+
+ /**
+ * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after
+ * failover when the client has veoted resubscription.
+ *
+ * The caller of this method must already hold the failover mutex.
+ */
+ internal void MarkClosed()
+ {
+ SetClosed();
+ _connection.DeregisterSession(_channelId);
+ MarkClosedProducersAndConsumers();
+ }
+
+ private void MarkClosedProducersAndConsumers()
+ {
+ try
+ {
+ // no need for a markClosed* method in this case since there is no protocol traffic closing a producer
+ CloseProducers();
+ }
+ catch (QpidException e)
+ {
+ _logger.Error("Error closing session: " + e, e);
+ }
+ try
+ {
+ MarkClosedConsumers();
+ }
+ catch (QpidException e)
+ {
+ _logger.Error("Error closing session: " + e, e);
+ }
+ }
+
+ private void MarkClosedConsumers()
+ {
+ if (_dispatcher != null)
+ {
+ _dispatcher.StopDispatcher();
+ }
+ // we need to clone the list of consumers since the close() method updates the _consumers collection
+ // which would result in a concurrent modification exception
+ ArrayList clonedConsumers = new ArrayList(_consumers.Values);
+
+ foreach (BasicMessageConsumer consumer in clonedConsumers)
+ {
+ consumer.MarkClosed();
+ }
+ // at this point the _consumers map will be empty
+ }
+
+ /**
+ * Resubscribes all producers and consumers. This is called when performing failover.
+ * @throws AMQException
+ */
+ internal void Resubscribe()
+ {
+ ResubscribeProducers();
+ ResubscribeConsumers();
+ }
+
+ private void ResubscribeProducers()
+ {
+ // FIXME: This needs to Replay DeclareExchange method calls.
+
+// ArrayList producers = new ArrayList(_producers.Values);
+// _logger.Debug(String.Format("Resubscribing producers = {0} producers.size={1}", producers, producers.Count));
+// foreach (BasicMessageProducer producer in producers)
+// {
+// producer.Resubscribe();
+// }
+ }
+
+ private void ResubscribeConsumers()
+ {
+ ArrayList consumers = new ArrayList(_consumers.Values);
+ _consumers.Clear();
+
+ foreach (BasicMessageConsumer consumer in consumers)
+ {
+ RegisterConsumer(consumer);
+ }
+ }
+
+ /// <summary>
+ /// Callers must hold the failover mutex before calling this method.
+ /// </summary>
+ /// <param name="consumer"></param>
+ void RegisterConsumer(BasicMessageConsumer consumer)
+ {
+ String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.Prefetch, consumer.NoLocal,
+ consumer.Exclusive, consumer.AcknowledgeMode);
+
+ consumer.ConsumerTag = consumerTag;
+ _consumers.Add(consumerTag, consumer);
+ }
+
+ public void Bind(string queueName, string exchangeName, string routingKey, IFieldTable args)
+ {
+ DoBind(queueName, exchangeName, routingKey, (FieldTable)args);
+ }
+
+ public void Bind(string queueName, string exchangeName, string routingKey)
+ {
+ DoBind(queueName, exchangeName, routingKey, new FieldTable());
+ }
+
+ internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args)
+ {
+ _logger.Debug(string.Format("QueueBind queueName={0} exchangeName={1} routingKey={2}, arg={3}",
+ queueName, exchangeName, routingKey, args));
+
+ AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0,
+ queueName, exchangeName,
+ routingKey, true, args);
+ _connection.ProtocolWriter.Write(queueBind);
+ }
+
+
+// /**
+// * Declare the queue.
+// * @param amqd
+// * @param protocolHandler
+// * @return the queue name. This is useful where the broker is generating a queue name on behalf of the client.
+// * @throws AMQException
+// */
+// private String DeclareQueue(AMQDestination amqd)
+// {
+// // For queues (but not topics) we generate the name in the client rather than the
+// // server. This allows the name to be reused on failover if required. In general,
+// // the destination indicates whether it wants a name generated or not.
+// if (amqd.IsNameRequired)
+// {
+// amqd.QueueName = GenerateUniqueName();
+// }
+//
+// return DoDeclareQueue(amqd);
+// }
+
+ private String ConsumeFromQueue(String queueName, int prefetch,
+ bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode)
+ {
+ // Need to generate a consumer tag on the client so we can exploit the nowait flag.
+ String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
+
+ AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0,
+ queueName, tag, noLocal,
+ acknowledgeMode == AcknowledgeMode.NoAcknowledge,
+ exclusive, true);
+
+ _connection.ProtocolWriter.Write(basicConsume);
+ return tag;
+ }
+
+ public void DeleteExchange(string exchangeName)
+ {
+ throw new NotImplementedException(); // FIXME
+ }
+
+ public void DeleteQueue()
+ {
+ throw new NotImplementedException(); // FIXME
+ }
+
+ public MessageConsumerBuilder CreateConsumerBuilder(string queueName)
+ {
+ return new MessageConsumerBuilder(this, queueName);
+ }
+
+ public MessagePublisherBuilder CreatePublisherBuilder()
+ {
+ return new MessagePublisherBuilder(this);
+ }
+
+// public void Publish(string exchangeName, string routingKey, bool mandatory, bool immediate,
+// IMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive,
+// bool disableTimestamps)
+// {
+// lock (Connection.FailoverMutex)
+// {
+// DoBasicPublish(exchangeName, routingKey, mandatory, immediate, (AbstractQmsMessage)message, deliveryMode, timeToLive, priority, disableTimestamps);
+// }
+// }
+
+ internal void BasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate,
+ AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive,
+ bool disableTimestamps)
+ {
+ lock (Connection.FailoverMutex)
+ {
+ DoBasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, timeToLive, priority, disableTimestamps);
+ }
+ }
+
+ private void DoBasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate, AbstractQmsMessage message, DeliveryMode deliveryMode, uint timeToLive, int priority, bool disableTimestamps)
+ {
+ AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame(_channelId, 0, exchangeName,
+ routingKey, mandatory, immediate);
+
+ long currentTime = 0;
+ if (!disableTimestamps)
+ {
+ currentTime = DateTime.UtcNow.Ticks;
+ message.Timestamp = currentTime;
+ }
+ byte[] payload = message.Data;
+ BasicContentHeaderProperties contentHeaderProperties = message.ContentHeaderProperties;
+
+ if (timeToLive > 0)
+ {
+ if (!disableTimestamps)
+ {
+ contentHeaderProperties.Expiration = (uint)currentTime + timeToLive;
+ }
+ }
+ else
+ {
+ contentHeaderProperties.Expiration = 0;
+ }
+ contentHeaderProperties.SetDeliveryMode(deliveryMode);
+ contentHeaderProperties.Priority = (byte)priority;
+
+ ContentBody[] contentBodies = CreateContentBodies(payload);
+ AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length];
+ for (int i = 0; i < contentBodies.Length; i++)
+ {
+ frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]);
+ }
+ if (contentBodies.Length > 0 && _logger.IsDebugEnabled)
+ {
+ _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+ }
+
+ // weight argument of zero indicates no child content headers, just bodies
+ AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame(_channelId, BASIC_CONTENT_TYPE, 0, contentHeaderProperties,
+ (uint)payload.Length);
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug(string.Format("Sending content header frame to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey));
+ }
+
+ frames[0] = publishFrame;
+ frames[1] = contentHeaderFrame;
+ CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
+ Connection.ConvenientProtocolWriter.WriteFrame(compositeFrame);
+ }
+
+ /// <summary>
+ /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated
+ /// maximum frame size.
+ /// </summary>
+ /// <param name="payload"></param>
+ /// <returns>return the array of content bodies</returns>
+ private ContentBody[] CreateContentBodies(byte[] payload)
+ {
+ if (payload == null)
+ {
+ return null;
+ }
+ else if (payload.Length == 0)
+ {
+ return new ContentBody[0];
+ }
+ // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
+ // (0xCE byte).
+ long framePayloadMax = Connection.MaximumFrameSize - 1;
+ int lastFrame = (payload.Length % framePayloadMax) > 0 ? 1 : 0;
+ int frameCount = (int)(payload.Length / framePayloadMax) + lastFrame;
+ ContentBody[] bodies = new ContentBody[frameCount];
+
+ if (frameCount == 1)
+ {
+ bodies[0] = new ContentBody();
+ bodies[0].Payload = payload;
+ }
+ else
+ {
+ long remaining = payload.Length;
+ for (int i = 0; i < bodies.Length; i++)
+ {
+ bodies[i] = new ContentBody();
+ byte[] framePayload = new byte[(remaining >= framePayloadMax) ? (int)framePayloadMax : (int)remaining];
+ Array.Copy(payload, (int)framePayloadMax * i, framePayload, 0, framePayload.Length);
+ bodies[i].Payload = framePayload;
+ remaining -= framePayload.Length;
+ }
+ }
+ return bodies;
+ }
+
+ public string GenerateUniqueName()
+ {
+ string result = _connection.ProtocolSession.GenerateQueueName();
+ return Regex.Replace(result, "[^a-z0-9_]", "_");
+ }
+
+ public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete)
+ {
+ DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete);
+ }
+
+ private string DoDeclareQueue(AMQDestination amqd)
+ {
+ string queueName = amqd.QueueName;
+ bool isDurable = amqd.IsDurable;
+ bool isExclusive = amqd.IsExclusive;
+
+ DoQueueDeclare(queueName, isDurable, isExclusive, amqd.AutoDelete);
+
+ _logger.Debug("returning amqp.QueueName = " + amqd.QueueName);
+ return amqd.QueueName;
+ }
+
+ private void DoQueueDeclare(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete)
+ {
+ _logger.Debug(string.Format("DeclareQueue name={0} durable={1} exclusive={2}, auto-delete={3}",
+ queueName, isDurable, isExclusive, isAutoDelete));
+
+ AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName,
+ false, isDurable, isExclusive,
+ isAutoDelete, true, null);
+
+ _connection.ProtocolWriter.Write(queueDeclare);
+ }
+
+ public void DeclareExchange(String exchangeName, String exchangeClass)
+ {
+ _logger.Debug(string.Format("DeclareExchange vame={0} exchangeClass={1}", exchangeName, exchangeClass));
+
+ DeclareExchange(_channelId, 0, exchangeName, exchangeClass, false, false, false, false, true, null);
+ }
+
+ // AMQP-level method.
+ private void DeclareExchange(ushort channelId, ushort ticket, string exchangeName,
+ string exchangeClass, bool passive, bool durable,
+ bool autoDelete, bool xinternal, bool noWait, FieldTable args)
+ {
+ _logger.Debug(String.Format("DeclareExchange channelId={0} exchangeName={1} exchangeClass={2}",
+ _channelId, exchangeName, exchangeClass));
+
+ AMQFrame exchangeDeclareFrame = ExchangeDeclareBody.CreateAMQFrame(
+ channelId, ticket, exchangeName, exchangeClass, passive, durable, autoDelete, xinternal, noWait, args);
+
+// Console.WriteLine(string.Format("XXX AMQP:DeclareExchange frame=[{0}]", exchangeDeclareFrame));
+
+ // FIXME: Probably need to record the exchangeDeclareBody for later replay.
+ ExchangeDeclareBody exchangeDeclareBody = (ExchangeDeclareBody)exchangeDeclareFrame.BodyFrame;
+// Console.WriteLine(string.Format("XXX AMQP:DeclareExchangeBody=[{0}]", exchangeDeclareBody));
+ if (exchangeDeclareBody.Nowait)
+ {
+ _connection.ProtocolWriter.Write(exchangeDeclareFrame);
+ }
+ else
+ {
+ _connection.ConvenientProtocolWriter.SyncWrite(exchangeDeclareFrame, typeof (ExchangeDeclareOkBody));
+ }
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
new file mode 100644
index 0000000000..6d2f1c67b6
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
@@ -0,0 +1,404 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using log4net;
+using Qpid.Client.Message;
+using Qpid.Collections;
+using Qpid.Framing;
+using Qpid.Messaging;
+
+namespace Qpid.Client
+{
+ public class BasicMessageConsumer : Closeable, IMessageConsumer
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageConsumer));
+
+ private bool _noLocal;
+
+ /// We need to store the "raw" field table so that we can resubscribe in the event of failover being required.
+// private FieldTable _rawSelectorFieldTable;
+//
+// public FieldTable RawSelectorFieldTable
+// {
+// get { return _rawSelectorFieldTable; }
+// }
+
+ /**
+ * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
+ */
+ private bool _exclusive;
+
+ public bool Exclusive
+ {
+ get { return _exclusive; }
+ }
+
+ public bool NoLocal
+ {
+ get { return _noLocal; }
+ set { _noLocal = value; }
+ }
+
+ AcknowledgeMode _acknowledgeMode = AcknowledgeMode.NoAcknowledge;
+
+ public AcknowledgeMode AcknowledgeMode
+ {
+ get { return _acknowledgeMode; }
+ }
+
+ private MessageReceivedDelegate _messageListener;
+
+ /// <summary>
+ /// The consumer tag allows us to close the consumer by sending a jmsCancel method to the
+ /// broker
+ /// </summary>
+ private string _consumerTag;
+
+ /// <summary>
+ /// We need to know the channel id when constructing frames
+ /// </summary>
+ private ushort _channelId;
+
+ private readonly string _queueName;
+
+ /// <summary>
+ /// Protects the setting of a messageListener
+ /// </summary>
+ private readonly object _syncLock = new object();
+
+ /**
+ * We store the prefetch field in order to be able to reuse it when resubscribing in the event of failover
+ */
+ private int _prefetch;
+
+ /// <summary>
+ /// When true indicates that either a message listener is set or that
+ /// a blocking receive call is in progress
+ /// </summary>
+ private bool _receiving;
+
+ /// <summary>
+ /// Used in the blocking receive methods to receive a message from
+ /// the Channel thread. Argument true indicates we want strict FIFO semantics
+ /// </summary>
+ private readonly SynchronousQueue _synchronousQueue = new SynchronousQueue(true);
+
+ private MessageFactoryRegistry _messageFactory;
+
+ private AmqChannel _channel;
+
+ public BasicMessageConsumer(ushort channelId, string queueName, bool noLocal,
+ MessageFactoryRegistry messageFactory, AmqChannel channel)
+ {
+ _channelId = channelId;
+ _queueName = queueName;
+ _noLocal = noLocal;
+ _messageFactory = messageFactory;
+ _channel = channel;
+ }
+
+ #region IMessageConsumer Members
+
+ public MessageReceivedDelegate OnMessage
+ {
+ get
+ {
+ return _messageListener;
+ }
+ set
+ {
+ CheckNotClosed();
+
+ lock (_syncLock)
+ {
+ // If someone is already receiving
+ if (_messageListener != null && _receiving)
+ {
+ throw new InvalidOperationException("Another thread is already receiving...");
+ }
+
+ _messageListener = value;
+
+ _receiving = (_messageListener != null);
+
+ if (_receiving)
+ {
+ _logger.Debug("Message listener set for queue with name " + _queueName);
+ }
+ }
+ }
+ }
+
+ public IMessage Receive(long delay)
+ {
+ CheckNotClosed();
+
+ lock (_syncLock)
+ {
+ // If someone is already receiving
+ if (_receiving)
+ {
+ throw new InvalidOperationException("Another thread is already receiving (possibly asynchronously)...");
+ }
+
+ _receiving = true;
+ }
+
+ try
+ {
+ object o;
+ if (delay > 0)
+ {
+ //o = _synchronousQueue.Poll(l, TimeUnit.MILLISECONDS);
+ throw new NotImplementedException("Need to implement synchronousQueue.Poll(timeout");
+ }
+ else
+ {
+ o = _synchronousQueue.DequeueBlocking();
+ }
+ return ReturnMessageOrThrow(o);
+ }
+ finally
+ {
+ lock (_syncLock)
+ {
+ _receiving = false;
+ }
+ }
+ }
+
+ public IMessage Receive()
+ {
+ return Receive(0);
+ }
+
+ public IMessage ReceiveNoWait()
+ {
+ CheckNotClosed();
+
+ lock (_syncLock)
+ {
+ // If someone is already receiving
+ if (_receiving)
+ {
+ throw new InvalidOperationException("Another thread is already receiving (possibly asynchronously)...");
+ }
+
+ _receiving = true;
+ }
+
+ try
+ {
+ object o = _synchronousQueue.Dequeue();
+ return ReturnMessageOrThrow(o);
+ }
+ finally
+ {
+ lock (_syncLock)
+ {
+ _receiving = false;
+ }
+ }
+ }
+
+ #endregion
+
+ /// <summary>
+ /// We can get back either a Message or an exception from the queue. This method examines the argument and deals
+ /// with it by throwing it (if an exception) or returning it (in any other case).
+ /// </summary>
+ /// <param name="o">the object off the queue</param>
+ /// <returns> a message only if o is a Message</returns>
+ /// <exception>JMSException if the argument is a throwable. If it is a QpidMessagingException it is rethrown as is, but if not
+ /// a QpidMessagingException is created with the linked exception set appropriately</exception>
+ private IMessage ReturnMessageOrThrow(object o)
+ {
+ // errors are passed via the queue too since there is no way of interrupting the poll() via the API.
+ if (o is Exception)
+ {
+ Exception e = (Exception) o;
+ throw new QpidException("Message consumer forcibly closed due to error: " + e, e);
+ }
+ else
+ {
+ return (IMessage) o;
+ }
+ }
+
+ #region IDisposable Members
+
+ public void Dispose()
+ {
+ Close();
+ }
+
+ #endregion
+
+ public override void Close()
+ {
+ lock (_channel.Connection.FailoverMutex)
+ {
+ lock (_closingLock)
+ {
+ Interlocked.Exchange(ref _closed, CLOSED);
+
+ AMQFrame cancelFrame = BasicCancelBody.CreateAMQFrame(_channelId, _consumerTag, false);
+
+ try
+ {
+ _channel.Connection.ConvenientProtocolWriter.SyncWrite(
+ cancelFrame, typeof(BasicCancelOkBody));
+ }
+ catch (AMQException e)
+ {
+ _logger.Error("Error closing consumer: " + e, e);
+ throw new QpidException("Error closing consumer: " + e);
+ }
+ finally
+ {
+ DeregisterConsumer();
+ }
+ }
+ }
+ }
+
+ /// <summary>
+ /// Called from the AmqChannel when a message has arrived for this consumer. This methods handles both the case
+ /// of a message listener or a synchronous receive() caller.
+ /// </summary>
+ /// <param name="messageFrame">the raw unprocessed mesage</param>
+ /// <param name="acknowledgeMode">the acknowledge mode requested for this message</param>
+ /// <param name="channelId">channel on which this message was sent</param>
+ internal void NotifyMessage(UnprocessedMessage messageFrame, AcknowledgeMode acknowledgeMode, ushort channelId)
+ {
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag);
+ }
+ try
+ {
+ AbstractQmsMessage jmsMessage = _messageFactory.CreateMessage(messageFrame.DeliverBody.DeliveryTag,
+ messageFrame.DeliverBody.Redelivered,
+ messageFrame.ContentHeader,
+ messageFrame.Bodies);
+
+ /*if (acknowledgeMode == AcknowledgeMode.PreAcknowledge)
+ {
+ _channel.sendAcknowledgement(messageFrame.deliverBody.deliveryTag);
+ }*/
+ if (acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
+ {
+ // we set the session so that when the user calls acknowledge() it can call the method on session
+ // to send out the appropriate frame
+ jmsMessage.Channel = _channel;
+ }
+
+ lock (_syncLock)
+ {
+ if (_messageListener != null)
+ {
+ _messageListener.Invoke(jmsMessage);
+ }
+ else
+ {
+ _synchronousQueue.Enqueue(jmsMessage);
+ }
+ }
+ if (acknowledgeMode == AcknowledgeMode.AutoAcknowledge)
+ {
+ _channel.SendAcknowledgement(messageFrame.DeliverBody.DeliveryTag);
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.Error("Caught exception (dump follows) - ignoring...", e);
+ }
+ }
+
+ internal void NotifyError(Exception cause)
+ {
+ lock (_syncLock)
+ {
+ SetClosed();
+
+ // we have no way of propagating the exception to a message listener - a JMS limitation - so we
+ // deal with the case where we have a synchronous receive() waiting for a message to arrive
+ if (_messageListener == null)
+ {
+ // offer only succeeds if there is a thread waiting for an item from the queue
+ if (_synchronousQueue.EnqueueNoThrow(cause))
+ {
+ _logger.Debug("Passed exception to synchronous queue for propagation to receive()");
+ }
+ }
+ DeregisterConsumer();
+ }
+ }
+
+ private void SetClosed()
+ {
+ Interlocked.Exchange(ref _closed, CLOSED);
+ }
+
+ /// <summary>
+ /// Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean
+ /// case and in the case of an error occurring.
+ /// </summary>
+ internal void DeregisterConsumer()
+ {
+ _channel.DeregisterConsumer(_consumerTag);
+ }
+
+ public string ConsumerTag
+ {
+ get
+ {
+ return _consumerTag;
+ }
+ set
+ {
+ _consumerTag = value;
+ }
+ }
+
+ /**
+ * Called when you need to invalidate a consumer. Used for example when failover has occurred and the
+ * client has vetoed automatic resubscription.
+ * The caller must hold the failover mutex.
+ */
+ internal void MarkClosed()
+ {
+ SetClosed();
+ DeregisterConsumer();
+ }
+
+ public int Prefetch
+ {
+ get { return _prefetch; }
+ }
+
+ public string QueueName
+ {
+ get { return _queueName; }
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/BasicMessageProducer.cs b/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
new file mode 100644
index 0000000000..a93d2db675
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/BasicMessageProducer.cs
@@ -0,0 +1,275 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using log4net;
+using Qpid.Client.Message;
+using Qpid.Messaging;
+
+namespace Qpid.Client
+{
+ public class BasicMessageProducer : Closeable, IMessagePublisher
+ {
+ protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer));
+
+ /// <summary>
+ /// If true, messages will not get a timestamp.
+ /// </summary>
+ private bool _disableTimestamps;
+
+ /// <summary>
+ /// Priority of messages created by this producer.
+ /// </summary>
+ private int _messagePriority;
+
+ /// <summary>
+ /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
+ ///
+ private long _timeToLive;
+
+ /// <summary>
+ /// Delivery mode used for this producer.
+ /// </summary>
+ private DeliveryMode _deliveryMode;
+
+ private bool _immediate;
+ private bool _mandatory;
+
+ string _exchangeName;
+ string _routingKey;
+
+ /// <summary>
+ /// Default encoding used for messages produced by this producer.
+ /// </summary>
+ private string _encoding;
+
+ /// <summary>
+ /// Default encoding used for message produced by this producer.
+ /// </summary>
+ private string _mimeType;
+
+ /// <summary>
+ /// True if this producer was created from a transacted session
+ /// </summary>
+ private bool _transacted;
+
+ private ushort _channelId;
+
+ /// <summary>
+ /// This is an id generated by the session and is used to tie individual producers to the session. This means we
+ /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers
+ /// to the session so that when an error is propagated to the session it can close the producer (meaning that
+ /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
+ /// </summary>
+ private long _producerId;
+
+ /// <summary>
+ /// The session used to create this producer
+ /// </summary>
+ private AmqChannel _channel;
+
+ /// <summary>
+ /// Default value for immediate flag is false, i.e. a consumer does not need to be attached to a queue
+ /// </summary>
+ protected const bool DEFAULT_IMMEDIATE = false;
+
+ /// <summary>
+ /// Default value for mandatory flag is true, i.e. server will not silently drop messages where no queue is
+ /// connected to the exchange for the message
+ /// </summary>
+ protected const bool DEFAULT_MANDATORY = true;
+
+ public BasicMessageProducer(string exchangeName, string routingKey,
+ bool transacted,
+ ushort channelId,
+ AmqChannel channel,
+ long producerId,
+ DeliveryMode deliveryMode,
+ long timeToLive,
+ bool immediate,
+ bool mandatory,
+ int priority)
+ {
+ _exchangeName = exchangeName;
+ _routingKey = routingKey;
+ _transacted = transacted;
+ _channelId = channelId;
+ _channel = channel;
+ _producerId = producerId;
+ _deliveryMode = deliveryMode;
+ _timeToLive = timeToLive;
+ _immediate = immediate;
+ _mandatory = mandatory;
+ _messagePriority = priority;
+
+ _channel.RegisterProducer(producerId, this);
+ }
+
+
+ #region IMessagePublisher Members
+
+ public DeliveryMode DeliveryMode
+ {
+ get
+ {
+ CheckNotClosed();
+ return _deliveryMode;
+ }
+ set
+ {
+ CheckNotClosed();
+ _deliveryMode = value;
+ }
+ }
+
+ public string ExchangeName
+ {
+ get { return _exchangeName; }
+ }
+
+ public string RoutingKey
+ {
+ get { return _routingKey; }
+ }
+
+ public bool DisableMessageID
+ {
+ get
+ {
+ throw new Exception("The method or operation is not implemented.");
+ }
+ set
+ {
+ throw new Exception("The method or operation is not implemented.");
+ }
+ }
+
+ public bool DisableMessageTimestamp
+ {
+ get
+ {
+ CheckNotClosed();
+ return _disableTimestamps;
+ }
+ set
+ {
+ CheckNotClosed();
+ _disableTimestamps = value;
+ }
+ }
+
+ public int Priority
+ {
+ get
+ {
+ CheckNotClosed();
+ return _messagePriority;
+ }
+ set
+ {
+ CheckNotClosed();
+ if (value < 0 || value > 9)
+ {
+ throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9");
+ }
+ _messagePriority = value;
+ }
+ }
+
+ public override void Close()
+ {
+ _logger.Info("Closing producer " + this);
+ Interlocked.Exchange(ref _closed, CLOSED);
+ _channel.DeregisterProducer(_producerId);
+ }
+
+ public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive)
+ {
+ CheckNotClosed();
+ SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, deliveryMode, priority, (uint)timeToLive, DEFAULT_MANDATORY,
+ DEFAULT_IMMEDIATE);
+ }
+
+ public void Send(IMessage msg)
+ {
+ CheckNotClosed();
+ SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive,
+ DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
+ }
+
+ // This is a short-term hack (knowing that this code will be re-vamped sometime soon)
+ // to facilitate publishing messages to potentially non-existent recipients.
+ public void Send(IMessage msg, bool mandatory)
+ {
+ CheckNotClosed();
+ SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive,
+ mandatory, DEFAULT_IMMEDIATE);
+ }
+
+ public long TimeToLive
+ {
+ get
+ {
+ CheckNotClosed();
+ return _timeToLive;
+ }
+ set
+ {
+ CheckNotClosed();
+ if (value < 0)
+ {
+ throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value);
+ }
+ _timeToLive = value;
+ }
+ }
+
+ #endregion
+
+ private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate)
+ {
+ _channel.BasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, priority, timeToLive, _disableTimestamps);
+ }
+
+ public string MimeType
+ {
+ set
+ {
+ CheckNotClosed();
+ _mimeType = value;
+ }
+ }
+
+ public string Encoding
+ {
+ set
+ {
+ CheckNotClosed();
+ _encoding = value;
+ }
+ }
+
+ public void Dispose()
+ {
+ Close();
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Closeable.cs b/dotnet/Qpid.Client/Client/Closeable.cs
new file mode 100644
index 0000000000..159f71ac08
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Closeable.cs
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+
+namespace Qpid.Client
+{
+ public abstract class Closeable
+ {
+ /// <summary>
+ /// Used to ensure orderly closing of the object. The only method that is allowed to be called
+ /// from another thread of control is close().
+ /// </summary>
+ protected readonly object _closingLock = new object();
+
+ /// <summary>
+ /// All access to this field should be using the Inerlocked class, to make it atomic.
+ /// Hence it is an int since you cannot use a bool with the Interlocked class.
+ /// </summary>
+ protected int _closed = NOT_CLOSED;
+
+ protected const int CLOSED = 1;
+ protected const int NOT_CLOSED = 2;
+
+ /// <summary>
+ /// Checks the not closed.
+ /// </summary>
+ protected void CheckNotClosed()
+ {
+ if (_closed == CLOSED)
+ {
+ throw new InvalidOperationException("Object " + ToString() + " has been closed");
+ }
+ }
+
+ /// <summary>
+ /// Gets a value indicating whether this <see cref="Closeable"/> is closed.
+ /// </summary>
+ /// <value><c>true</c> if closed; otherwise, <c>false</c>.</value>
+ public bool Closed
+ {
+ get
+ {
+ return _closed == CLOSED;
+ }
+ }
+
+ /// <summary>
+ /// Close the resource
+ /// </summary>
+ /// <exception cref="QpidMessagingException">If something goes wrong</exception>
+ public abstract void Close();
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Collections/LinkedHashtable.cs b/dotnet/Qpid.Client/Client/Collections/LinkedHashtable.cs
new file mode 100644
index 0000000000..43e9a819e4
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Collections/LinkedHashtable.cs
@@ -0,0 +1,214 @@
+using System;
+using System.Collections;
+
+namespace Qpid.Collections
+{
+ public class LinkedHashtable : DictionaryBase
+ {
+ /// <summary>
+ /// Maps from key to LinkedDictionaryEntry
+ /// </summary>
+ private Hashtable _indexedValues = new Hashtable();
+
+ private LinkedDictionaryEntry _head;
+
+ private LinkedDictionaryEntry _tail;
+
+ public class LinkedDictionaryEntry
+ {
+ public LinkedDictionaryEntry previous;
+ public LinkedDictionaryEntry next;
+ public object key;
+ public object value;
+
+ public LinkedDictionaryEntry(object key, object value)
+ {
+ this.key = key;
+ this.value = value;
+ }
+ }
+
+ public object this[object index]
+ {
+ get
+ {
+ return ((LinkedDictionaryEntry)_indexedValues[index]).value;
+ }
+
+ set
+ {
+ Dictionary[index] = value;
+ }
+ }
+
+ protected override void OnInsertComplete(object key, object value)
+ {
+ LinkedDictionaryEntry de = new LinkedDictionaryEntry(key, value);
+ if (_head == null)
+ {
+ _head = de;
+ _tail = de;
+ }
+ else
+ {
+ _tail.next = de;
+ de.previous = _tail;
+ _tail = de;
+ }
+ _indexedValues[key] = de;
+ }
+
+ protected override void OnSetComplete(object key, object oldValue, object newValue)
+ {
+ if (oldValue == null)
+ {
+ OnInsertComplete(key, newValue);
+ }
+ }
+
+ protected override void OnRemoveComplete(object key, object value)
+ {
+ LinkedDictionaryEntry de = (LinkedDictionaryEntry)_indexedValues[key];
+ LinkedDictionaryEntry prev = de.previous;
+ if (prev == null)
+ {
+ _head = de.next;
+ }
+ else
+ {
+ prev.next = de.next;
+ }
+
+ LinkedDictionaryEntry next = de.next;
+ if (next == null)
+ {
+ _tail = de;
+ }
+ else
+ {
+ next.previous = de.previous;
+ }
+ }
+
+ public ICollection Values
+ {
+ get
+ {
+ return InnerHashtable.Values;
+ }
+ }
+
+ public bool Contains(object key)
+ {
+ return InnerHashtable.Contains(key);
+ }
+
+ public void Remove(object key)
+ {
+ Dictionary.Remove(key);
+ }
+
+ public LinkedDictionaryEntry Head
+ {
+ get
+ {
+ return _head;
+ }
+ }
+
+ public LinkedDictionaryEntry Tail
+ {
+ get
+ {
+ return _tail;
+ }
+ }
+
+ private class LHTEnumerator : IEnumerator
+ {
+ private LinkedHashtable _container;
+
+ private LinkedDictionaryEntry _current;
+
+ /// <summary>
+ /// Set once we have navigated off the end of the collection
+ /// </summary>
+ private bool _needsReset = false;
+
+ public LHTEnumerator(LinkedHashtable container)
+ {
+ _container = container;
+ }
+
+ public object Current
+ {
+ get
+ {
+ if (_current == null)
+ {
+ throw new Exception("Iterator before first element");
+ }
+ else
+ {
+ return _current;
+ }
+ }
+ }
+
+ public bool MoveNext()
+ {
+ if (_needsReset)
+ {
+ return false;
+ }
+ else if (_current == null)
+ {
+ _current = _container.Head;
+ }
+ else
+ {
+ _current = _current.next;
+ }
+ _needsReset = (_current == null);
+ return !_needsReset;
+ }
+
+ public void Reset()
+ {
+ _current = null;
+ _needsReset = false;
+ }
+ }
+
+ public new IEnumerator GetEnumerator()
+ {
+ return new LHTEnumerator(this);
+ }
+
+ public void MoveToHead(object key)
+ {
+ LinkedDictionaryEntry de = (LinkedDictionaryEntry)_indexedValues[key];
+ if (de == null)
+ {
+ throw new ArgumentException("Key " + key + " not found");
+ }
+ // if the head is the element then there is nothing to do
+ if (_head == de)
+ {
+ return;
+ }
+ de.previous.next = de.next;
+ if (de.next != null)
+ {
+ de.next.previous = de.previous;
+ }
+ else
+ {
+ _tail = de.previous;
+ }
+ de.next = _head;
+ _head = de;
+ de.previous = null;
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/ConnectionTuneParameters.cs b/dotnet/Qpid.Client/Client/ConnectionTuneParameters.cs
new file mode 100644
index 0000000000..20f158f0ea
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/ConnectionTuneParameters.cs
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace Qpid.Client
+{
+ public class ConnectionTuneParameters
+ {
+ private uint _frameMax;
+
+ private ushort _channelMax;
+
+ private uint _hearbeat;
+
+ private uint _txnLimit;
+
+ public uint FrameMax
+ {
+ get
+ {
+ return _frameMax;
+ }
+ set
+ {
+ _frameMax = value;
+ }
+ }
+
+ public ushort ChannelMax
+ {
+ get
+ {
+ return _channelMax;
+ }
+ set
+ {
+ _channelMax = value;
+ }
+ }
+
+ public uint Heartbeat
+ {
+ get
+ {
+ return _hearbeat;
+ }
+ set
+ {
+ _hearbeat = value;
+ }
+ }
+
+ public uint TxnLimit
+ {
+ get
+ {
+ return _txnLimit;
+ }
+ set
+ {
+ _txnLimit = value;
+ }
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Failover/FailoverException.cs b/dotnet/Qpid.Client/Client/Failover/FailoverException.cs
new file mode 100644
index 0000000000..b13b28a66b
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Failover/FailoverException.cs
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+
+namespace Qpid.Client.Failover
+{
+ /// <summary>
+ /// This exception is thrown when failover is taking place and we need to let other
+ /// parts of the client know about this.
+ /// </summary>
+ class FailoverException : Exception
+ {
+ public FailoverException(String message) : base(message)
+ {
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs b/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs
new file mode 100644
index 0000000000..89a9809253
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs
@@ -0,0 +1,173 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using log4net;
+using Qpid.Client.Protocol;
+using Qpid.Client.State;
+
+namespace Qpid.Client.Failover
+{
+ public class FailoverHandler
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverHandler));
+
+ private AMQConnection _connection;
+
+ /**
+ * Used where forcing the failover host
+ */
+ private String _host;
+
+ /**
+ * Used where forcing the failover port
+ */
+ private int _port;
+
+ public FailoverHandler(AMQConnection connection)
+ {
+ _connection = connection;
+ }
+
+ public void Run()
+ {
+ if (Thread.CurrentThread.IsBackground)
+ {
+ throw new InvalidOperationException("FailoverHandler must Run on a non-background thread.");
+ }
+
+ AMQProtocolListener pl = _connection.ProtocolListener;
+ pl.FailoverLatch = new ManualResetEvent(false);
+
+ // We wake up listeners. If they can handle failover, they will extend the
+ // FailoverSupport class and will in turn block on the latch until failover
+ // has completed before retrying the operation
+ _connection.ProtocolListener.PropagateExceptionToWaiters(new FailoverException("Failing over about to start"));
+
+ // Since failover impacts several structures we protect them all with a single mutex. These structures
+ // are also in child objects of the connection. This allows us to manipulate them without affecting
+ // client code which runs in a separate thread.
+ lock (_connection.FailoverMutex)
+ {
+ _log.Info("Starting failover process");
+
+ // We switch in a new state manager temporarily so that the interaction to get to the "connection open"
+ // state works, without us having to terminate any existing "state waiters". We could theoretically
+ // have a state waiter waiting until the connection is closed for some reason. Or in future we may have
+ // a slightly more complex state model therefore I felt it was worthwhile doing this.
+ AMQStateManager existingStateManager = _connection.ProtocolListener.StateManager;
+ _connection.ProtocolListener.StateManager = new AMQStateManager();
+ if (!_connection.FirePreFailover(_host != null))
+ {
+ _connection.ProtocolListener.StateManager = existingStateManager;
+ if (_host != null)
+ {
+ _connection.ExceptionReceived(new AMQDisconnectedException("Redirect was vetoed by client"));
+ }
+ else
+ {
+ _connection.ExceptionReceived(new AMQDisconnectedException("Failover was vetoed by client"));
+ }
+ pl.FailoverLatch.Set();
+ pl.FailoverLatch = null;
+ return;
+ }
+ bool failoverSucceeded;
+ // when host is non null we have a specified failover host otherwise we all the client to cycle through
+ // all specified hosts
+
+ // if _host has value then we are performing a redirect.
+ if (_host != null)
+ {
+ failoverSucceeded = _connection.AttemptReconnection(_host, _port, false);
+ }
+ else
+ {
+ failoverSucceeded = _connection.AttemptReconnection();
+ }
+
+ // XXX: at this point it appears that we are going to set StateManager to existingStateManager in
+ // XXX: both paths of control.
+ if (!failoverSucceeded)
+ {
+ _connection.ProtocolListener.StateManager = existingStateManager;
+ _connection.ExceptionReceived(
+ new AMQDisconnectedException("Server closed connection and no failover " +
+ "was successful"));
+ }
+ else
+ {
+ _connection.ProtocolListener.StateManager = existingStateManager;
+ try
+ {
+ if (_connection.FirePreResubscribe())
+ {
+ _log.Info("Resubscribing on new connection");
+ _connection.ResubscribeSessions();
+ }
+ else
+ {
+ _log.Info("Client vetoed automatic resubscription");
+ }
+ _connection.FireFailoverComplete();
+ _connection.ProtocolListener.FailoverState = FailoverState.NOT_STARTED;
+ _log.Info("Connection failover completed successfully");
+ }
+ catch (Exception e)
+ {
+ _log.Info("Failover process failed - exception being propagated by protocol handler");
+ _connection.ProtocolListener.FailoverState = FailoverState.FAILED;
+ try
+ {
+ _connection.ProtocolListener.OnException(e);
+ }
+ catch (Exception ex)
+ {
+ _log.Error("Error notifying protocol session of error: " + ex, ex);
+ }
+ }
+ }
+ }
+ pl.FailoverLatch.Set();
+ }
+
+ public String getHost()
+ {
+ return _host;
+ }
+
+ public void setHost(String host)
+ {
+ _host = host;
+ }
+
+ public int getPort()
+ {
+ return _port;
+ }
+
+ public void setPort(int port)
+ {
+ _port = port;
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Failover/FailoverState.cs b/dotnet/Qpid.Client/Client/Failover/FailoverState.cs
new file mode 100644
index 0000000000..04322eeed4
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Failover/FailoverState.cs
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace Qpid.Client.Failover
+{
+ /// <summary>
+ /// Enumeration of failover states. Used to handle failover from within AMQProtocolHandler where MINA events need to be
+ /// dealt with and can happen during failover.
+ /// </summary>
+ enum FailoverState
+ {
+ NOT_STARTED, IN_PROGRESS, FAILED
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Failover/FailoverSupport.cs b/dotnet/Qpid.Client/Client/Failover/FailoverSupport.cs
new file mode 100644
index 0000000000..591c0b1d4f
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Failover/FailoverSupport.cs
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+
+namespace Qpid.Client.Failover
+{
+ public abstract class FailoverSupport
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverSupport));
+
+ public object execute(AMQConnection con)
+ {
+ // We wait until we are not in the middle of failover before acquiring the mutex and then proceeding.
+ // Any method that can potentially block for any reason should use this class so that deadlock will not
+ // occur. The FailoverException is propagated by the AMQProtocolHandler to any listeners (e.g. frame listeners)
+ // that might be causing a block. When that happens, the exception is caught here and the mutex is released
+ // before waiting for the failover to complete (either successfully or unsuccessfully).
+ while (true)
+ {
+ con.ProtocolListener.BlockUntilNotFailingOver();
+ lock (con.FailoverMutex)
+ {
+ try
+ {
+ return operation();
+ }
+ catch (FailoverException e)
+ {
+ _log.Info("Failover exception caught during operation", e);
+ }
+ }
+ }
+ }
+
+ protected abstract object operation();
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Handler/BasicDeliverMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/BasicDeliverMethodHandler.cs
new file mode 100644
index 0000000000..d6e196c8dd
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Handler/BasicDeliverMethodHandler.cs
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+using Qpid.Client.Message;
+using Qpid.Client.Protocol;
+using Qpid.Client.State;
+using Qpid.Framing;
+
+namespace Qpid.Client.Handler
+{
+ public class BasicDeliverMethodHandler : IStateAwareMethodListener
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(BasicDeliverMethodHandler));
+
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ UnprocessedMessage msg = new UnprocessedMessage();
+ msg.DeliverBody = (BasicDeliverBody) evt.Method;
+ msg.ChannelId = evt.ChannelId;
+ _logger.Debug("New JmsDeliver method received");
+ evt.ProtocolSession.UnprocessedMessageReceived(msg);
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs
new file mode 100644
index 0000000000..78526f906f
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+using Qpid.Client.Message;
+using Qpid.Client.Protocol;
+using Qpid.Client.State;
+using Qpid.Framing;
+
+namespace Qpid.Client.Handler
+{
+ public class BasicReturnMethodHandler : IStateAwareMethodListener
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(BasicReturnMethodHandler));
+
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ _logger.Debug("New JmsBounce method received");
+ UnprocessedMessage msg = new UnprocessedMessage();
+ msg.DeliverBody = null;
+ msg.BounceBody = (BasicReturnBody) evt.Method;
+ msg.ChannelId = evt.ChannelId;
+
+ evt.ProtocolSession.UnprocessedMessageReceived(msg);
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs
new file mode 100644
index 0000000000..1031f804a6
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+using Qpid.Client.Protocol;
+using Qpid.Client.State;
+using Qpid.Framing;
+
+namespace Qpid.Client.Handler
+{
+ public class ChannelCloseMethodHandler : IStateAwareMethodListener
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(ChannelCloseMethodHandler));
+
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ _logger.Debug("ChannelClose method received");
+ ChannelCloseBody method = (ChannelCloseBody) evt.Method;
+
+ int errorCode = method.ReplyCode;
+ string reason = method.ReplyText;
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("Channel close reply code: " + errorCode + ", reason: " + reason);
+ }
+
+ AMQFrame frame = ChannelCloseOkBody.CreateAMQFrame(evt.ChannelId);
+ evt.ProtocolSession.WriteFrame(frame);
+ //if (errorCode != AMQConstant.REPLY_SUCCESS.getCode())
+ // HACK
+ if (errorCode != 200)
+ {
+ _logger.Debug("Channel close received with errorCode " + errorCode + ", throwing exception");
+ evt.ProtocolSession.AMQConnection.ExceptionReceived(new AMQChannelClosedException(errorCode, "Error: " + reason));
+ }
+ evt.ProtocolSession.ChannelClosed(evt.ChannelId, errorCode, reason);
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Handler/ConnectionCloseMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ConnectionCloseMethodHandler.cs
new file mode 100644
index 0000000000..c3acc0b098
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Handler/ConnectionCloseMethodHandler.cs
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using log4net;
+using Qpid.Client.Protocol;
+using Qpid.Client.State;
+using Qpid.Framing;
+
+namespace Qpid.Client.Handler
+{
+ public class ConnectionCloseMethodHandler : IStateAwareMethodListener
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(ConnectionCloseMethodHandler));
+
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ _logger.Debug("ConnectionClose frame received");
+ ConnectionCloseBody method = (ConnectionCloseBody) evt.Method;
+
+ int errorCode = method.ReplyCode;
+ String reason = method.ReplyText;
+
+ evt.ProtocolSession.WriteFrame(ConnectionCloseOkBody.CreateAMQFrame(evt.ChannelId));
+ stateManager.ChangeState(AMQState.CONNECTION_CLOSED);
+ if (errorCode != 200)
+ {
+ _logger.Debug("Connection close received with error code " + errorCode);
+ throw new AMQConnectionClosedException(errorCode, "Error: " + reason);
+ }
+
+ // this actually closes the connection in the case where it is not an error.
+ evt.ProtocolSession.CloseProtocolSession();
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Handler/ConnectionCloseOkHandler.cs b/dotnet/Qpid.Client/Client/Handler/ConnectionCloseOkHandler.cs
new file mode 100644
index 0000000000..0cd60457ea
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Handler/ConnectionCloseOkHandler.cs
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+using Qpid.Client.Protocol;
+using Qpid.Client.State;
+using Qpid.Framing;
+
+namespace Qpid.Client.Handler
+{
+ public class ConnectionCloseOkHandler : IStateAwareMethodListener
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(ConnectionCloseOkHandler));
+
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ _logger.Debug("ConnectionCloseOk frame received");
+ ConnectionCloseOkBody method = (ConnectionCloseOkBody)evt.Method;
+ stateManager.ChangeState(AMQState.CONNECTION_CLOSED);
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Handler/ConnectionOpenOkMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ConnectionOpenOkMethodHandler.cs
new file mode 100644
index 0000000000..b43e2700f6
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Handler/ConnectionOpenOkMethodHandler.cs
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using Qpid.Client.Protocol;
+using Qpid.Client.State;
+
+namespace Qpid.Client.Handler
+{
+ public class ConnectionOpenOkMethodHandler : IStateAwareMethodListener
+ {
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ stateManager.ChangeState(AMQState.CONNECTION_OPEN);
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Handler/ConnectionRedirectMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ConnectionRedirectMethodHandler.cs
new file mode 100644
index 0000000000..4437290f5c
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Handler/ConnectionRedirectMethodHandler.cs
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+using Qpid.Client.Protocol;
+using Qpid.Client.State;
+
+namespace Qpid.Client.Handler
+{
+ public class ConnectionRedirectMethodHandler : IStateAwareMethodListener
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(ConnectionRedirectMethodHandler));
+
+ private const int DEFAULT_REDIRECT_PORT = 5672;
+
+ private static ConnectionRedirectMethodHandler _handler = new ConnectionRedirectMethodHandler();
+
+ public static ConnectionRedirectMethodHandler GetInstance()
+ {
+ return _handler;
+ }
+
+ private ConnectionRedirectMethodHandler()
+ {
+ }
+
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ /*_logger.Info("ConnectionRedirect frame received");
+ ConnectionRedirectBody method = (ConnectionRedirectBody) evt.Method;
+
+ // the host is in the form hostname:port with the port being optional
+ int portIndex = method.Host.IndexOf(':');
+ String host;
+ int port;
+ if (portIndex == -1)
+ {
+ host = method.Host;
+ port = DEFAULT_REDIRECT_PORT;
+ }
+ else
+ {
+ host = method.Host.Substring(0, portIndex);
+ port = Int32.Parse(method.Host.Substring(portIndex + 1));
+ }
+ evt.ProtocolSession.Failover(host, port);*/
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Handler/ConnectionSecureMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ConnectionSecureMethodHandler.cs
new file mode 100644
index 0000000000..7c0fbd8f40
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Handler/ConnectionSecureMethodHandler.cs
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using Qpid.Client.Protocol;
+using Qpid.Client.State;
+using Qpid.Framing;
+
+namespace Qpid.Client.Handler
+{
+ public class ConnectionSecureMethodHandler : IStateAwareMethodListener
+ {
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ AMQFrame response = ConnectionSecureOkBody.CreateAMQFrame(evt.ChannelId, null);
+ evt.ProtocolSession.WriteFrame(response);
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Handler/ConnectionStartMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ConnectionStartMethodHandler.cs
new file mode 100644
index 0000000000..2bba8662bb
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Handler/ConnectionStartMethodHandler.cs
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+using log4net;
+using Qpid.Client.Protocol;
+using Qpid.Client.State;
+using Qpid.Framing;
+
+namespace Qpid.Client.Handler
+{
+ public class ConnectionStartMethodHandler : IStateAwareMethodListener
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(ConnectionStartMethodHandler));
+
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ ConnectionStartBody body = (ConnectionStartBody) evt.Method;
+ AMQProtocolSession ps = evt.ProtocolSession;
+ string username = ps.Username;
+ string password = ps.Password;
+
+ try
+ {
+ if (body.Mechanisms == null)
+ {
+ throw new AMQException("mechanism not specified in ConnectionStart method frame");
+ }
+ string allMechanisms = Encoding.ASCII.GetString(body.Mechanisms);
+ string[] mechanisms = allMechanisms.Split(' ');
+ string selectedMechanism = null;
+ foreach (string mechanism in mechanisms)
+ {
+ if (mechanism.Equals("PLAIN"))
+ {
+ selectedMechanism = mechanism;
+ break;
+ }
+ }
+
+ if (selectedMechanism == null)
+ {
+ throw new AMQException("No supported security mechanism found, passed: " + mechanisms);
+ }
+
+ // we always write out a null authzid which we don't currently use
+ byte[] plainData = new byte[1 + ps.Username.Length + 1 + ps.Password.Length];
+ Encoding.UTF8.GetBytes(username, 0, username.Length, plainData, 1);
+ Encoding.UTF8.GetBytes(password, 0, password.Length, plainData, username.Length + 2);
+ if (body.Locales == null)
+ {
+ throw new AMQException("Locales is not defined in Connection Start method");
+ }
+ string allLocales = Encoding.ASCII.GetString(body.Locales);
+ string[] locales = allLocales.Split(new char[] { ' ' });
+ string selectedLocale;
+ if (locales != null && locales.Length > 0)
+ {
+ selectedLocale = locales[0];
+ }
+ else
+ {
+ throw new AMQException("No locales sent from server, passed: " + locales);
+ }
+
+ stateManager.ChangeState(AMQState.CONNECTION_NOT_TUNED);
+ FieldTable clientProperties = new FieldTable();
+ clientProperties["product"] = "Qpid.NET";
+ clientProperties["version"] = "1.0";
+ clientProperties["platform"] = GetFullSystemInfo();
+ AMQFrame frame = ConnectionStartOkBody.CreateAMQFrame(evt.ChannelId, clientProperties, selectedMechanism,
+ plainData, selectedLocale);
+ ps.WriteFrame(frame);
+ }
+ catch (Exception e)
+ {
+ throw new AMQException(_log, "Unable to decode data: " + e, e);
+ }
+ }
+
+ private string GetFullSystemInfo()
+ {
+ /*StringBuffer fullSystemInfo = new StringBuffer();
+ fullSystemInfo.append(System.getProperty("java.runtime.name"));
+ fullSystemInfo.append(", " + System.getProperty("java.runtime.version"));
+ fullSystemInfo.append(", " + System.getProperty("java.vendor"));
+ fullSystemInfo.append(", " + System.getProperty("os.arch"));
+ fullSystemInfo.append(", " + System.getProperty("os.name"));
+ fullSystemInfo.append(", " + System.getProperty("os.version"));
+ fullSystemInfo.append(", " + System.getProperty("sun.os.patch.level"));*/
+ // TODO: add in details here
+ return ".NET 1.1 Client";
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs
new file mode 100644
index 0000000000..8b276c09e9
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using log4net;
+using Qpid.Client.Protocol;
+using Qpid.Client.State;
+using Qpid.Framing;
+
+namespace Qpid.Client.Handler
+{
+ public class ConnectionTuneMethodHandler : IStateAwareMethodListener
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(ConnectionTuneMethodHandler));
+
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ _logger.Debug("ConnectionTune frame received");
+ ConnectionTuneBody frame = (ConnectionTuneBody) evt.Method;
+ AMQProtocolSession session = evt.ProtocolSession;
+
+ ConnectionTuneParameters parameters = session.ConnectionTuneParameters;
+ if (parameters == null)
+ {
+ parameters = new ConnectionTuneParameters();
+ }
+
+ _logger.Info(String.Format("ConnectionTune.heartbeat = {0}.", frame.Heartbeat));
+
+ parameters.FrameMax = frame.FrameMax;
+ parameters.FrameMax = 65535;
+ //params.setChannelMax(frame.channelMax);
+ parameters.Heartbeat = frame.Heartbeat;
+ session.ConnectionTuneParameters = parameters;
+
+ stateManager.ChangeState(AMQState.CONNECTION_NOT_OPENED);
+ session.WriteFrame(ConnectionTuneOkBody.CreateAMQFrame(
+ evt.ChannelId, frame.ChannelMax, 65535, frame.Heartbeat));
+ session.WriteFrame(ConnectionOpenBody.CreateAMQFrame(
+ evt.ChannelId, session.AMQConnection.VirtualHost, null, true));
+
+ if (frame.Heartbeat > 0)
+ {
+ evt.ProtocolSession.AMQConnection.StartHeartBeatThread(frame.Heartbeat);
+ }
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Message/AMQMessage.cs b/dotnet/Qpid.Client/Client/Message/AMQMessage.cs
new file mode 100644
index 0000000000..eb34fa45db
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Message/AMQMessage.cs
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using Qpid.Framing;
+
+namespace Qpid.Client.Message
+{
+ public class AMQMessage
+ {
+ protected IContentHeaderProperties _contentHeaderProperties;
+
+ /// <summary>
+ /// If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required
+ /// </summary>
+ protected AmqChannel _channel;
+
+ public AMQMessage(IContentHeaderProperties properties)
+ {
+ _contentHeaderProperties = properties;
+ }
+
+ public AmqChannel Channel
+ {
+ get
+ {
+ return _channel;
+ }
+
+ set
+ {
+ _channel = value;
+ }
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs b/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
new file mode 100644
index 0000000000..e485bb6d34
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.Collections;
+using Qpid.Framing;
+
+namespace Qpid.Client.Message
+{
+ public abstract class AbstractQmsMessageFactory : IMessageFactory
+ {
+ public AbstractQmsMessage CreateMessage(ulong messageNbr, bool redelivered, ContentHeaderBody contentHeader, IList bodies)
+ {
+ AbstractQmsMessage msg = CreateMessageWithBody(messageNbr, contentHeader, bodies);
+ msg.Redelivered = redelivered;
+ return msg;
+ }
+
+ public abstract AbstractQmsMessage CreateMessage();
+
+ /// <summary>
+ ///
+ /// </summary>
+ /// <param name="messageNbr"></param>
+ /// <param name="contentHeader"></param>
+ /// <param name="bodies"></param>
+ /// <returns></returns>
+ /// <exception cref="AMQException"></exception>
+ protected abstract AbstractQmsMessage CreateMessageWithBody(ulong messageNbr,
+ ContentHeaderBody contentHeader,
+ IList bodies);
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs b/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
new file mode 100644
index 0000000000..c84e9de1b9
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
@@ -0,0 +1,800 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text;
+using log4net;
+using Qpid.Framing;
+using Qpid.Messaging;
+
+namespace Qpid.Client.Message
+{
+ public class SendOnlyDestination : AMQDestination
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(string));
+
+ public SendOnlyDestination(string exchangeName, string routingKey)
+ : base(exchangeName, null, null, false, false, routingKey)
+ {
+ _log.Debug(
+ string.Format("Creating SendOnlyDestination with exchangeName={0} and routingKey={1}",
+ exchangeName, routingKey));
+ }
+
+ public override string EncodedName
+ {
+ get { return ExchangeName + ":" + QueueName; }
+ }
+
+ public override string RoutingKey
+ {
+ get { return QueueName; }
+ }
+
+ public override bool IsNameRequired
+ {
+ get { throw new NotImplementedException(); }
+ }
+ }
+
+ public abstract class AbstractQmsMessage : AMQMessage, IMessage
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(AbstractQmsMessage));
+
+ protected ulong _messageNbr;
+
+ protected bool _redelivered;
+
+ protected AbstractQmsMessage() : base(new BasicContentHeaderProperties())
+ {
+ }
+
+ protected AbstractQmsMessage(ulong messageNbr, BasicContentHeaderProperties contentHeader)
+ : this(contentHeader)
+ {
+ _messageNbr = messageNbr;
+ }
+
+ protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader)
+ : base(contentHeader)
+ {
+ }
+
+ public string MessageId
+ {
+ get
+ {
+ if (ContentHeaderProperties.MessageId == null)
+ {
+ ContentHeaderProperties.MessageId = "ID:" + _messageNbr;
+ }
+ return ContentHeaderProperties.MessageId;
+ }
+ set
+ {
+ ContentHeaderProperties.MessageId = value;
+ }
+ }
+
+ public long Timestamp
+ {
+ get
+ {
+ // TODO: look at ulong/long choice
+ return (long) ContentHeaderProperties.Timestamp;
+ }
+ set
+ {
+ ContentHeaderProperties.Timestamp = (ulong) value;
+ }
+ }
+
+ public byte[] CorrelationIdAsBytes
+ {
+ get
+ {
+ return Encoding.Default.GetBytes(ContentHeaderProperties.CorrelationId);
+ }
+ set
+ {
+ ContentHeaderProperties.CorrelationId = Encoding.Default.GetString(value);
+ }
+ }
+
+ public string CorrelationId
+ {
+ get
+ {
+ return ContentHeaderProperties.CorrelationId;
+ }
+ set
+ {
+ ContentHeaderProperties.ContentType = value;
+ }
+ }
+
+ struct Dest
+ {
+ public string ExchangeName;
+ public string RoutingKey;
+
+ public Dest(string exchangeName, string routingKey)
+ {
+ ExchangeName = exchangeName;
+ RoutingKey = routingKey;
+ }
+ }
+
+ public string ReplyToExchangeName
+ {
+ get
+ {
+ Dest dest = ReadReplyToHeader();
+ return dest.ExchangeName;
+ }
+ set
+ {
+ Dest dest = ReadReplyToHeader();
+ dest.ExchangeName = value;
+ WriteReplyToHeader(dest);
+ }
+ }
+
+ public string ReplyToRoutingKey
+ {
+ get
+ {
+ Dest dest = ReadReplyToHeader();
+ return dest.RoutingKey;
+ }
+ set
+ {
+ Dest dest = ReadReplyToHeader();
+ dest.RoutingKey = value;
+ WriteReplyToHeader(dest);
+ }
+ }
+
+ private Dest ReadReplyToHeader()
+ {
+ string replyToEncoding = ContentHeaderProperties.ReplyTo;
+ if (replyToEncoding == null)
+ {
+ return new Dest();
+ }
+ else
+ {
+ string routingKey;
+ string exchangeName = GetExchangeName(replyToEncoding, out routingKey);
+ return new Dest(exchangeName, routingKey);
+ }
+ }
+
+ private void WriteReplyToHeader(Dest dest)
+ {
+ string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey);
+ ContentHeaderProperties.ReplyTo = encodedDestination;
+ }
+
+ private static string GetExchangeName(string replyToEncoding, out string routingKey)
+ {
+ string[] split = replyToEncoding.Split(new char[':']);
+ if (_log.IsDebugEnabled)
+ {
+ _log.Debug(string.Format("replyToEncoding = '{0}'", replyToEncoding));
+ _log.Debug(string.Format("split = {0}", split));
+ _log.Debug(string.Format("split.Length = {0}", split.Length));
+ }
+ if (split.Length == 1)
+ {
+ // Using an alternative split implementation here since it appears that string.Split
+ // is broken in .NET. It doesn't split when the first character is the delimiter.
+ // Here we check for the first character being the delimiter. This handles the case
+ // where ExchangeName is empty (i.e. sends will be to the default exchange).
+ if (replyToEncoding[0] == ':')
+ {
+ split = new string[2];
+ split[0] = null;
+ split[1] = replyToEncoding.Substring(1);
+ if (_log.IsDebugEnabled)
+ {
+ _log.Debug("Alternative split method...");
+ _log.Debug(string.Format("split = {0}", split));
+ _log.Debug(string.Format("split.Length = {0}", split.Length));
+ }
+ }
+ }
+ if (split.Length != 2)
+ {
+ throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding);
+ }
+
+ string exchangeName = split[0];
+ routingKey = split[1];
+ return exchangeName;
+ }
+
+ public DeliveryMode DeliveryMode
+ {
+ get
+ {
+ byte b = ContentHeaderProperties.DeliveryMode;
+ switch (b)
+ {
+ case 1:
+ return DeliveryMode.NonPersistent;
+ case 2:
+ return DeliveryMode.Persistent;
+ default:
+ throw new QpidException("Illegal value for delivery mode in content header properties");
+ }
+ }
+ set
+ {
+ ContentHeaderProperties.DeliveryMode = (byte)(value==DeliveryMode.NonPersistent?1:2);
+ }
+ }
+
+ public bool Redelivered
+ {
+ get
+ {
+ return _redelivered;
+ }
+ set
+ {
+ _redelivered = value;
+ }
+ }
+
+ public string Type
+ {
+ get
+ {
+ return MimeType;
+ }
+ set
+ {
+ //MimeType = value;
+ }
+ }
+
+ public long Expiration
+ {
+ get
+ {
+ return ContentHeaderProperties.Expiration;
+ }
+ set
+ {
+ ContentHeaderProperties.Expiration = (uint) value;
+ }
+ }
+
+ public int Priority
+ {
+ get
+ {
+ return ContentHeaderProperties.Priority;
+ }
+ set
+ {
+ ContentHeaderProperties.Priority = (byte) value;
+ }
+ }
+
+ // FIXME: implement
+ public string ContentType
+ {
+ get { throw new NotImplementedException(); }
+ set { throw new NotImplementedException(); }
+ }
+
+ // FIXME: implement
+ public string ContentEncoding
+ {
+ get { throw new NotImplementedException(); }
+ set { throw new NotImplementedException(); }
+ }
+
+ public void Acknowledge()
+ {
+ // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
+ // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
+ if (_channel != null)
+ {
+ _channel.SendAcknowledgement(_messageNbr);
+ }
+ }
+
+ public IHeaders Headers
+ {
+ get { return new QpidHeaders(this); }
+ }
+
+ public abstract void ClearBody();
+
+ /// <summary>
+ /// Get a String representation of the body of the message. Used in the
+ /// toString() method which outputs this before message properties.
+ /// </summary>
+ /// <exception cref="QpidException"></exception>
+ public abstract string ToBodyString();
+
+ /// <summary>
+ /// Return the raw byte array that is used to populate the frame when sending
+ /// the message.
+ /// </summary>
+ /// <value>a byte array of message data</value>
+ public abstract byte[] Data
+ {
+ get;
+ set;
+ }
+
+ public abstract string MimeType
+ {
+ get;
+ }
+
+ public override string ToString()
+ {
+ try
+ {
+ StringBuilder buf = new StringBuilder("Body:\n");
+ buf.Append(ToBodyString());
+ buf.Append("\nQmsTimestamp: ").Append(Timestamp);
+ buf.Append("\nQmsExpiration: ").Append(Expiration);
+ buf.Append("\nQmsPriority: ").Append(Priority);
+ buf.Append("\nQmsDeliveryMode: ").Append(DeliveryMode);
+ buf.Append("\nReplyToExchangeName: ").Append(ReplyToExchangeName);
+ buf.Append("\nReplyToRoutingKey: ").Append(ReplyToRoutingKey);
+ buf.Append("\nAMQ message number: ").Append(_messageNbr);
+ buf.Append("\nProperties:");
+ if (ContentHeaderProperties.Headers == null)
+ {
+ buf.Append("<NONE>");
+ }
+ else
+ {
+ buf.Append(Headers.ToString());
+ }
+ return buf.ToString();
+ }
+ catch (Exception e)
+ {
+ return e.ToString();
+ }
+ }
+
+ public IFieldTable UnderlyingMessagePropertiesMap
+ {
+ get
+ {
+ return ContentHeaderProperties.Headers;
+ }
+ set
+ {
+ ContentHeaderProperties.Headers = (FieldTable)value;
+ }
+ }
+
+ public FieldTable PopulateHeadersFromMessageProperties()
+ {
+ if (ContentHeaderProperties.Headers == null)
+ {
+ return null;
+ }
+ else
+ {
+ //
+ // We need to convert every property into a String representation
+ // Note that type information is preserved in the property name
+ //
+ FieldTable table = new FieldTable();
+ foreach (DictionaryEntry entry in ContentHeaderProperties.Headers)
+ {
+ string propertyName = (string) entry.Key;
+ if (propertyName == null)
+ {
+ continue;
+ }
+ else
+ {
+ table[propertyName] = entry.Value.ToString();
+ }
+ }
+ return table;
+ }
+ }
+
+ /// <summary>
+ /// Get the AMQ message number assigned to this message
+ /// </summary>
+ /// <returns>the message number</returns>
+ public ulong MessageNbr
+ {
+ get
+ {
+ return _messageNbr;
+ }
+ set
+ {
+ _messageNbr = value;
+ }
+ }
+
+ public BasicContentHeaderProperties ContentHeaderProperties
+ {
+ get
+ {
+ return (BasicContentHeaderProperties) _contentHeaderProperties;
+ }
+ }
+ }
+
+ internal class QpidHeaders : IHeaders
+ {
+ public const char BOOLEAN_PROPERTY_PREFIX = 'B';
+ public const char BYTE_PROPERTY_PREFIX = 'b';
+ public const char SHORT_PROPERTY_PREFIX = 's';
+ public const char INT_PROPERTY_PREFIX = 'i';
+ public const char LONG_PROPERTY_PREFIX = 'l';
+ public const char FLOAT_PROPERTY_PREFIX = 'f';
+ public const char DOUBLE_PROPERTY_PREFIX = 'd';
+ public const char STRING_PROPERTY_PREFIX = 'S';
+
+ AbstractQmsMessage _message;
+
+ public QpidHeaders(AbstractQmsMessage message)
+ {
+ _message = message;
+ }
+
+ public bool Contains(string name)
+ {
+ CheckPropertyName(name);
+ if (_message.ContentHeaderProperties.Headers == null)
+ {
+ return false;
+ }
+ else
+ {
+ // TODO: fix this
+ return _message.ContentHeaderProperties.Headers.Contains(STRING_PROPERTY_PREFIX + name);
+ }
+ }
+
+ public void Clear()
+ {
+ if (_message.ContentHeaderProperties.Headers != null)
+ {
+ _message.ContentHeaderProperties.Headers.Clear();
+ }
+ }
+
+ public string this[string name]
+ {
+ get
+ {
+ return GetString(name);
+ }
+ set
+ {
+ SetString(name, value);
+ }
+ }
+
+ public bool GetBoolean(string name)
+ {
+ CheckPropertyName(name);
+ if (_message.ContentHeaderProperties.Headers == null)
+ {
+ return false;
+ }
+ else
+ {
+ object b = _message.ContentHeaderProperties.Headers[BOOLEAN_PROPERTY_PREFIX + name];
+
+ if (b == null)
+ {
+ return false;
+ }
+ else
+ {
+ return (bool)b;
+ }
+ }
+ }
+
+ public void SetBoolean(string name, bool b)
+ {
+ CheckPropertyName(name);
+ _message.ContentHeaderProperties.Headers[BOOLEAN_PROPERTY_PREFIX + name] = b;
+ }
+
+ public byte GetByte(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ if (_message.ContentHeaderProperties.Headers == null)
+ {
+ return 0;
+ }
+ else
+ {
+ object b = _message.ContentHeaderProperties.Headers[BYTE_PROPERTY_PREFIX + propertyName];
+ if (b == null)
+ {
+ return 0;
+ }
+ else
+ {
+ return (byte)b;
+ }
+ }
+ }
+
+ public void SetByte(string propertyName, byte b)
+ {
+ CheckPropertyName(propertyName);
+ _message.ContentHeaderProperties.Headers[BYTE_PROPERTY_PREFIX + propertyName] = b;
+ }
+
+ public short GetShort(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ if (_message.ContentHeaderProperties.Headers == null)
+ {
+ return 0;
+ }
+ else
+ {
+ object s = _message.ContentHeaderProperties.Headers[SHORT_PROPERTY_PREFIX + propertyName];
+ if (s == null)
+ {
+ return 0;
+ }
+ else
+ {
+ return (short)s;
+ }
+ }
+ }
+
+ public void SetShort(string propertyName, short i)
+ {
+ CheckPropertyName(propertyName);
+ _message.ContentHeaderProperties.Headers[SHORT_PROPERTY_PREFIX + propertyName] = i;
+ }
+
+ public int GetInt(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ if (_message.ContentHeaderProperties.Headers == null)
+ {
+ return 0;
+ }
+ else
+ {
+ object i = _message.ContentHeaderProperties.Headers[INT_PROPERTY_PREFIX + propertyName];
+ if (i == null)
+ {
+ return 0;
+ }
+ else
+ {
+ return (int)i;
+ }
+ }
+ }
+
+ public void SetInt(string propertyName, int i)
+ {
+ CheckPropertyName(propertyName);
+ _message.ContentHeaderProperties.Headers[INT_PROPERTY_PREFIX + propertyName] = i;
+ }
+
+ public long GetLong(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ if (_message.ContentHeaderProperties.Headers == null)
+ {
+ return 0;
+ }
+ else
+ {
+ object l = _message.ContentHeaderProperties.Headers[LONG_PROPERTY_PREFIX + propertyName];
+ if (l == null)
+ {
+ // temp - the spec says do this but this throws a NumberFormatException
+ //return Long.valueOf(null).longValue();
+ return 0;
+ }
+ else
+ {
+ return (long)l;
+ }
+ }
+ }
+
+ public void SetLong(string propertyName, long l)
+ {
+ CheckPropertyName(propertyName);
+ _message.ContentHeaderProperties.Headers[LONG_PROPERTY_PREFIX + propertyName] = l;
+ }
+
+ public float GetFloat(String propertyName)
+ {
+ CheckPropertyName(propertyName);
+ if (_message.ContentHeaderProperties.Headers == null)
+ {
+ return 0;
+ }
+ else
+ {
+ object f = _message.ContentHeaderProperties.Headers[FLOAT_PROPERTY_PREFIX + propertyName];
+ if (f == null)
+ {
+ return 0;
+ }
+ else
+ {
+ return (float)f;
+ }
+ }
+ }
+
+ public void SetFloat(string propertyName, float f)
+ {
+ CheckPropertyName(propertyName);
+ _message.ContentHeaderProperties.Headers[FLOAT_PROPERTY_PREFIX + propertyName] = f;
+ }
+
+ public double GetDouble(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ if (_message.ContentHeaderProperties.Headers == null)
+ {
+ return 0;
+ }
+ else
+ {
+ object d = _message.ContentHeaderProperties.Headers[DOUBLE_PROPERTY_PREFIX + propertyName];
+ if (d == null)
+ {
+ return 0;
+ }
+ else
+ {
+ return (double)d;
+ }
+ }
+ }
+
+ public void SetDouble(string propertyName, double v)
+ {
+ CheckPropertyName(propertyName);
+ _message.ContentHeaderProperties.Headers[DOUBLE_PROPERTY_PREFIX + propertyName] = v;
+ }
+
+ public string GetString(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ if (_message.ContentHeaderProperties.Headers == null)
+ {
+ return null;
+ }
+ else
+ {
+ return (string)_message.ContentHeaderProperties.Headers[STRING_PROPERTY_PREFIX + propertyName];
+ }
+ }
+
+ public void SetString(string propertyName, string value)
+ {
+ CheckPropertyName(propertyName);
+ CreatePropertyMapIfRequired();
+ propertyName = STRING_PROPERTY_PREFIX + propertyName;
+ _message.ContentHeaderProperties.Headers[propertyName] = value;
+ }
+
+ private void CheckPropertyName(string propertyName)
+ {
+ if (propertyName == null)
+ {
+ throw new ArgumentException("Property name must not be null");
+ }
+ else if ("".Equals(propertyName))
+ {
+ throw new ArgumentException("Property name must not be the empty string");
+ }
+
+ if (_message.ContentHeaderProperties.Headers == null)
+ {
+ _message.ContentHeaderProperties.Headers = new FieldTable();
+ }
+ }
+
+ private void CreatePropertyMapIfRequired()
+ {
+ if (_message.ContentHeaderProperties.Headers == null)
+ {
+ _message.ContentHeaderProperties.Headers = new FieldTable();
+ }
+ }
+
+ public override string ToString()
+ {
+ StringBuilder buf = new StringBuilder("{");
+ int i = 0;
+ foreach (DictionaryEntry entry in _message.ContentHeaderProperties.Headers)
+ {
+ ++i;
+ if (i > 1)
+ {
+ buf.Append(", ");
+ }
+ string propertyName = (string)entry.Key;
+ if (propertyName == null)
+ {
+ buf.Append("\nInternal error: Property with NULL key defined");
+ }
+ else
+ {
+ buf.Append(propertyName.Substring(1));
+
+ buf.Append(" : ");
+
+ char typeIdentifier = propertyName[0];
+ buf.Append(typeIdentifierToName(typeIdentifier));
+ buf.Append(" = ").Append(entry.Value);
+ }
+ }
+ buf.Append("}");
+ return buf.ToString();
+ }
+
+ private static string typeIdentifierToName(char typeIdentifier)
+ {
+ switch (typeIdentifier)
+ {
+ case BOOLEAN_PROPERTY_PREFIX:
+ return "boolean";
+ case BYTE_PROPERTY_PREFIX:
+ return "byte";
+ case SHORT_PROPERTY_PREFIX:
+ return "short";
+ case INT_PROPERTY_PREFIX:
+ return "int";
+ case LONG_PROPERTY_PREFIX:
+ return "long";
+ case FLOAT_PROPERTY_PREFIX:
+ return "float";
+ case DOUBLE_PROPERTY_PREFIX:
+ return "double";
+ case STRING_PROPERTY_PREFIX:
+ return "string";
+ default:
+ return "unknown ( '" + typeIdentifier + "')";
+ }
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs b/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs
new file mode 100644
index 0000000000..2e71bfc948
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.Collections;
+using Qpid.Framing;
+
+namespace Qpid.Client.Message
+{
+ public interface IMessageFactory
+ {
+ /// <summary>
+ /// Create a message
+ /// </summary>
+ /// <param name="messageNbr"></param>
+ /// <param name="redelivered"></param>
+ /// <param name="contentHeader"></param>
+ /// <param name="bodies"></param>
+ /// <returns></returns>
+ /// <exception cref="QpidMessagingException">if the message cannot be created</exception>
+ AbstractQmsMessage CreateMessage(ulong messageNbr, bool redelivered,
+ ContentHeaderBody contentHeader,
+ IList bodies);
+
+ /// <summary>
+ /// Creates the message.
+ /// </summary>
+ /// <returns></returns>
+ /// <exception cref="QpidMessagingException">if the message cannot be created</exception>
+ AbstractQmsMessage CreateMessage();
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs b/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs
new file mode 100644
index 0000000000..3965d531bb
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using Qpid.Framing;
+using Qpid.Messaging;
+
+namespace Qpid.Client.Message
+{
+ public class MessageFactoryRegistry
+ {
+ private readonly Hashtable _mimeToFactoryMap = new Hashtable();
+
+ public void RegisterFactory(string mimeType, IMessageFactory mf)
+ {
+ if (mf == null)
+ {
+ throw new ArgumentNullException("Message factory");
+ }
+ if (mimeType == null)
+ {
+ throw new ArgumentNullException("mf");
+ }
+ _mimeToFactoryMap[mimeType] = mf;
+ }
+
+ public void DeregisterFactory(string mimeType)
+ {
+ _mimeToFactoryMap.Remove(mimeType);
+ }
+
+ /// <summary>
+ /// Create a message. This looks up the MIME type from the content header and instantiates the appropriate
+ /// concrete message type.
+ /// </summary>
+ /// <param name="messageNbr">the AMQ message id</param>
+ /// <param name="redelivered">true if redelivered</param>
+ /// <param name="contentHeader">the content header that was received</param>
+ /// <param name="bodies">a list of ContentBody instances</param>
+ /// <returns>the message.</returns>
+ /// <exception cref="AMQException"/>
+ /// <exception cref="QpidException"/>
+ public AbstractQmsMessage CreateMessage(ulong messageNbr, bool redelivered,
+ ContentHeaderBody contentHeader,
+ IList bodies)
+ {
+ BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.Properties;
+
+ if (properties.ContentType == null)
+ {
+ properties.ContentType = "";
+ }
+
+ IMessageFactory mf = (IMessageFactory) _mimeToFactoryMap[properties.ContentType];
+ if (mf == null)
+ {
+ throw new AMQException("Unsupport MIME type of " + properties.ContentType);
+ }
+ else
+ {
+ return mf.CreateMessage(messageNbr, redelivered, contentHeader, bodies);
+ }
+ }
+
+ public AbstractQmsMessage CreateMessage(string mimeType)
+ {
+ if (mimeType == null)
+ {
+ throw new ArgumentNullException("Mime type must not be null");
+ }
+ IMessageFactory mf = (IMessageFactory) _mimeToFactoryMap[mimeType];
+ if (mf == null)
+ {
+ throw new AMQException("Unsupport MIME type of " + mimeType);
+ }
+ else
+ {
+ return mf.CreateMessage();
+ }
+ }
+
+ /// <summary>
+ /// Construct a new registry with the default message factories registered
+ /// </summary>
+ /// <returns>a message factory registry</returns>
+ public static MessageFactoryRegistry NewDefaultRegistry()
+ {
+ MessageFactoryRegistry mf = new MessageFactoryRegistry();
+ mf.RegisterFactory("text/plain", new QpidTextMessageFactory());
+ mf.RegisterFactory("text/xml", new QpidTextMessageFactory());
+ mf.RegisterFactory("application/octet-stream", new QpidBytesMessageFactory());
+ // TODO: use bytes message for default message factory
+ // MJA - just added this bit back in...
+ mf.RegisterFactory("", new QpidBytesMessageFactory());
+ return mf;
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs b/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs
new file mode 100644
index 0000000000..b7911b44b9
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs
@@ -0,0 +1,581 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.IO;
+using System.Text;
+using Qpid.Framing;
+using Qpid.Messaging;
+
+namespace Qpid.Client.Message
+{
+ public class QpidBytesMessage : AbstractQmsMessage, IBytesMessage
+ {
+ private const string MIME_TYPE = "application/octet-stream";
+
+ /// <summary>
+ /// The backingstore for the data
+ /// </summary>
+ private MemoryStream _dataStream;
+
+ private int _bodyLength;
+
+ private BinaryReader _reader;
+
+ private BinaryWriter _writer;
+
+ public QpidBytesMessage() : this(null)
+ {
+ }
+
+ /// <summary>
+ /// Construct a bytes message with existing data.
+ /// </summary>
+ /// <param name="data">if data is not null, the message is immediately in read only mode. if data is null, it is in
+ /// write-only mode</param>
+ QpidBytesMessage(byte[] data) : base()
+ {
+ // superclass constructor has instantiated a content header at this point
+ ContentHeaderProperties.ContentType = MIME_TYPE;
+ if (data == null)
+ {
+ _dataStream = new MemoryStream();
+ _writer = new BinaryWriter(_dataStream);
+ }
+ else
+ {
+ _dataStream = new MemoryStream(data);
+ _bodyLength = data.Length;
+ _reader = new BinaryReader(_dataStream);
+ }
+ }
+
+ public QpidBytesMessage(ulong messageNbr, byte[] data, ContentHeaderBody contentHeader)
+ // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
+ : base(messageNbr, (BasicContentHeaderProperties) contentHeader.Properties)
+ {
+ ContentHeaderProperties.ContentType = MIME_TYPE;
+ _dataStream = new MemoryStream(data);
+ _bodyLength = data.Length;
+ _reader = new BinaryReader(_dataStream);
+ }
+
+ public override void ClearBody()
+ {
+ if (_reader != null)
+ {
+ _reader.Close();
+ _reader = null;
+ }
+ _dataStream = new MemoryStream();
+ _bodyLength = 0;
+
+ _writer = new BinaryWriter(_dataStream);
+ }
+
+ public override string ToBodyString()
+ {
+ CheckReadable();
+ try
+ {
+ return GetText();
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString());
+ }
+ }
+
+ private string GetText()
+ {
+ if (_dataStream != null)
+ {
+ // we cannot just read the underlying buffer since it may be larger than the amount of
+ // "filled" data. Length is not the same as Capacity.
+ byte[] data = new byte[_dataStream.Length];
+ _dataStream.Read(data, 0, (int)_dataStream.Length);
+ return Encoding.UTF8.GetString(data);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public override byte[] Data
+ {
+ get
+ {
+ if (_dataStream == null)
+ {
+ return null;
+ }
+ else
+ {
+ byte[] data = new byte[_dataStream.Length];
+ _dataStream.Position = 0;
+ _dataStream.Read(data, 0, (int) _dataStream.Length);
+ return data;
+ }
+ }
+ set
+ {
+ throw new NotSupportedException("Cannot set data payload except during construction");
+ }
+ }
+
+ public override string MimeType
+ {
+ get
+ {
+ return MIME_TYPE;
+ }
+ }
+
+ public long BodyLength
+ {
+ get
+ {
+ CheckReadable();
+ return _bodyLength;
+ }
+ }
+
+ /// <summary>
+ ///
+ /// </summary>
+ /// <exception cref="MessageNotReadableException">if the message is in write mode</exception>
+ private void CheckReadable()
+ {
+ if (_reader == null)
+ {
+ throw new MessageNotReadableException("You need to call reset() to make the message readable");
+ }
+ }
+
+ private void CheckWritable()
+ {
+ if (_reader != null)
+ {
+ throw new MessageNotWriteableException("You need to call clearBody() to make the message writable");
+ }
+ }
+
+ public bool ReadBoolean()
+ {
+ CheckReadable();
+ try
+ {
+ return _reader.ReadBoolean();
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public byte ReadByte()
+ {
+ CheckReadable();
+ try
+ {
+ return _reader.ReadByte();
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public short ReadSignedByte()
+ {
+ CheckReadable();
+ try
+ {
+ return _reader.ReadSByte();
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public short ReadShort()
+ {
+ CheckReadable();
+ try
+ {
+ return _reader.ReadInt16();
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public char ReadChar()
+ {
+ CheckReadable();
+ try
+ {
+ return _reader.ReadChar();
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public int ReadInt()
+ {
+ CheckReadable();
+ try
+ {
+ return _reader.ReadInt32();
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public long ReadLong()
+ {
+ CheckReadable();
+ try
+ {
+ return _reader.ReadInt64();
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public float ReadFloat()
+ {
+ CheckReadable();
+ try
+ {
+ return _reader.ReadSingle();
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public double ReadDouble()
+ {
+ CheckReadable();
+ try
+ {
+ return _reader.ReadDouble();
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public string ReadUTF()
+ {
+ CheckReadable();
+ try
+ {
+ byte[] data = _reader.ReadBytes((int)_dataStream.Length);
+ return Encoding.UTF8.GetString(data);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public int ReadBytes(byte[] bytes)
+ {
+ if (bytes == null)
+ {
+ throw new ArgumentNullException("bytes");
+ }
+ CheckReadable();
+ try
+ {
+ return _reader.Read(bytes, 0, bytes.Length);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public int ReadBytes(byte[] bytes, int count)
+ {
+ CheckReadable();
+ if (bytes == null)
+ {
+ throw new ArgumentNullException("bytes");
+ }
+ if (count < 0)
+ {
+ throw new ArgumentOutOfRangeException("count must be >= 0");
+ }
+ if (count > bytes.Length)
+ {
+ count = bytes.Length;
+ }
+
+ try
+ {
+ return _reader.Read(bytes, 0, count);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteBoolean(bool b)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(b);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteByte(byte b)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(b);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteShort(short i)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(i);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteChar(char c)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(c);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteSignedByte(short value)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(value);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteDouble(double value)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(value);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteFloat(float value)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(value);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteInt(int value)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(value);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteLong(long value)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(value);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void Write(int i)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(i);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void Write(long l)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(l);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void Write(float v)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(v);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void Write(double v)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(v);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteUTF(string value)
+ {
+ CheckWritable();
+ try
+ {
+ byte[] encodedData = Encoding.UTF8.GetBytes(value);
+ _writer.Write(encodedData);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteBytes(byte[] bytes)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(bytes);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void WriteBytes(byte[] bytes, int offset, int length)
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Write(bytes, offset, length);
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+
+ public void Reset()
+ {
+ CheckWritable();
+ try
+ {
+ _writer.Close();
+ _writer = null;
+ _reader = new BinaryReader(_dataStream);
+ _bodyLength = (int) _dataStream.Length;
+ }
+ catch (IOException e)
+ {
+ throw new QpidException(e.ToString(), e);
+ }
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs b/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs
new file mode 100644
index 0000000000..3f2a6c531f
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using Qpid.Framing;
+
+namespace Qpid.Client.Message
+{
+ public class QpidBytesMessageFactory : AbstractQmsMessageFactory
+ {
+ protected override AbstractQmsMessage CreateMessageWithBody(ulong messageNbr,
+ ContentHeaderBody contentHeader,
+ IList bodies)
+ {
+ byte[] data;
+
+ // we optimise the non-fragmented case to avoid copying
+ if (bodies != null && bodies.Count == 1)
+ {
+ data = ((ContentBody)bodies[0]).Payload;
+ }
+ else
+ {
+ data = new byte[(long)contentHeader.BodySize];
+ int currentPosition = 0;
+ foreach (ContentBody cb in bodies)
+ {
+ Array.Copy(cb.Payload, 0, data, currentPosition, cb.Payload.Length);
+ currentPosition += cb.Payload.Length;
+ }
+ }
+
+ return new QpidBytesMessage(messageNbr, data, contentHeader);
+ }
+
+ public override AbstractQmsMessage CreateMessage()
+ {
+ return new QpidBytesMessage();
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs b/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs
new file mode 100644
index 0000000000..4c16038d4b
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs
@@ -0,0 +1,137 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+using Qpid.Framing;
+using Qpid.Messaging;
+
+namespace Qpid.Client.Message
+{
+ public class QpidTextMessage : AbstractQmsMessage, ITextMessage
+ {
+ private const string MIME_TYPE = "text/plain";
+
+ private byte[] _data;
+
+ private string _decodedValue;
+
+ public QpidTextMessage() : this(null, null)
+ {
+ }
+
+ public QpidTextMessage(byte[] data, String encoding) : base()
+ {
+ // the superclass has instantied a content header at this point
+ ContentHeaderProperties.ContentType= MIME_TYPE;
+ _data = data;
+ ContentHeaderProperties.Encoding = encoding;
+ }
+
+ public QpidTextMessage(ulong messageNbr, byte[] data, BasicContentHeaderProperties contentHeader)
+ : base(messageNbr, contentHeader)
+ {
+ contentHeader.ContentType = MIME_TYPE;
+ _data = data;
+ }
+
+ public QpidTextMessage(byte[] data) : this(data, null)
+ {
+ }
+
+ public QpidTextMessage(string text)
+ {
+ Text = text;
+ }
+
+ public override void ClearBody()
+ {
+ _data = null;
+ _decodedValue = null;
+ }
+
+ public override string ToBodyString()
+ {
+ return Text;
+ }
+
+ public override byte[] Data
+ {
+ get
+ {
+ return _data;
+ }
+ set
+ {
+ _data = value;
+ }
+ }
+
+ public override string MimeType
+ {
+ get
+ {
+ return MIME_TYPE;
+ }
+ }
+
+ public string Text
+ {
+ get
+ {
+ if (_data == null && _decodedValue == null)
+ {
+ return null;
+ }
+ else if (_decodedValue != null)
+ {
+ return _decodedValue;
+ }
+ else
+ {
+ if (ContentHeaderProperties.Encoding != null)
+ {
+ // throw ArgumentException if the encoding is not supported
+ _decodedValue = Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetString(_data);
+ }
+ else
+ {
+ _decodedValue = Encoding.Default.GetString(_data);
+ }
+ return _decodedValue;
+ }
+ }
+
+ set
+ {
+ if (ContentHeaderProperties.Encoding == null)
+ {
+ _data = Encoding.Default.GetBytes(value);
+ }
+ else
+ {
+ // throw ArgumentException if the encoding is not supported
+ _data = Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetBytes(value);
+ }
+ _decodedValue = value;
+ }
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs b/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
new file mode 100644
index 0000000000..5457b2301e
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using Qpid.Framing;
+
+namespace Qpid.Client.Message
+{
+ public class QpidTextMessageFactory : AbstractQmsMessageFactory
+ {
+ protected override AbstractQmsMessage CreateMessageWithBody(ulong messageNbr, ContentHeaderBody contentHeader,
+ IList bodies)
+ {
+ byte[] data;
+
+ // we optimise the non-fragmented case to avoid copying
+ if (bodies != null && bodies.Count == 1)
+ {
+ data = ((ContentBody)bodies[0]).Payload;
+ }
+ else
+ {
+ data = new byte[(int)contentHeader.BodySize];
+ int currentPosition = 0;
+ foreach (ContentBody cb in bodies)
+ {
+ Array.Copy(cb.Payload, 0, data, currentPosition, cb.Payload.Length);
+ currentPosition += cb.Payload.Length;
+ }
+ }
+
+ return new QpidTextMessage(messageNbr, data, (BasicContentHeaderProperties)contentHeader.Properties);
+ }
+
+
+ public override AbstractQmsMessage CreateMessage()
+ {
+ return new QpidTextMessage();
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs b/dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs
new file mode 100644
index 0000000000..679114d105
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using log4net;
+
+namespace Qpid.Client.Message
+{
+ /// <summary>
+ /// Raised when a message body is received unexpectedly by the client. This typically occurs when the
+ /// length of bodies received does not match with the declared length in the content header.
+ /// </summary>
+ public class UnexpectedBodyReceivedException : AMQException
+ {
+ public UnexpectedBodyReceivedException(ILog logger, string msg, Exception t)
+ : base(logger, msg, t)
+ {
+ }
+
+ public UnexpectedBodyReceivedException(ILog logger, string msg)
+ : base(logger, msg)
+ {
+ }
+
+ public UnexpectedBodyReceivedException(ILog logger, int errorCode, string msg)
+ : base(logger, errorCode, msg)
+ {
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs b/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs
new file mode 100644
index 0000000000..cb4e64718b
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.Collections;
+using Qpid.Framing;
+
+namespace Qpid.Client.Message
+{
+ public class UnprocessedMessage
+ {
+ private ulong _bytesReceived = 0;
+
+ public BasicDeliverBody DeliverBody;
+ public BasicReturnBody BounceBody;
+ public ushort ChannelId;
+ public ContentHeaderBody ContentHeader;
+
+ /// <summary>
+ /// List of ContentBody instances. Due to fragmentation you don't know how big this will be in general
+ /// </summary>
+ /// TODO: write and use linked list class
+ public IList Bodies = new ArrayList();
+
+ public void ReceiveBody(ContentBody body)
+ {
+ Bodies.Add(body);
+ if (body.Payload != null)
+ {
+ _bytesReceived += (uint)body.Payload.Length;
+ }
+ }
+
+ public bool IsAllBodyDataReceived()
+ {
+ return _bytesReceived == ContentHeader.BodySize;
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs b/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs
new file mode 100644
index 0000000000..ab40a83b3e
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+using Qpid.Framing;
+
+namespace Qpid.Client.Protocol
+{
+ public class AMQMethodEvent
+ {
+ private AMQMethodBody _method;
+
+ private ushort _channelId;
+
+ private AMQProtocolSession _protocolSession;
+
+ public AMQMethodEvent(ushort channelId, AMQMethodBody method, AMQProtocolSession protocolSession)
+ {
+ _channelId = channelId;
+ _method = method;
+ _protocolSession = protocolSession;
+ }
+
+ public AMQMethodBody Method
+ {
+ get
+ {
+ return _method;
+ }
+ }
+
+ public ushort ChannelId
+ {
+ get
+ {
+ return _channelId;
+ }
+ }
+
+ public AMQProtocolSession ProtocolSession
+ {
+ get
+ {
+ return _protocolSession;
+ }
+ }
+
+ public override String ToString()
+ {
+ StringBuilder buf = new StringBuilder("Method event: ");
+ buf.Append("\nChannel id: ").Append(_channelId);
+ buf.Append("\nMethod: ").Append(_method);
+ return buf.ToString();
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs
new file mode 100644
index 0000000000..7256ab9250
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs
@@ -0,0 +1,289 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Threading;
+using log4net;
+using Qpid.Client.Failover;
+using Qpid.Client.Protocol.Listener;
+using Qpid.Client.State;
+using Qpid.Framing;
+
+namespace Qpid.Client.Protocol
+{
+ public class AMQProtocolListener : IProtocolListener
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(AMQProtocolListener));
+
+ /**
+ * We create the failover handler when the session is created since it needs a reference to the IoSession in order
+ * to be able to send errors during failover back to the client application. The session won't be available in the
+ * case where we failing over due to a Connection.Redirect message from the broker.
+ */
+ private FailoverHandler _failoverHandler;
+
+ /**
+ * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly
+ * attempting failover where it is failing.
+ */
+ internal FailoverState _failoverState = FailoverState.NOT_STARTED;
+
+ internal FailoverState FailoverState
+ {
+ get { return _failoverState; }
+ set { _failoverState = value; }
+ }
+
+ internal ManualResetEvent FailoverLatch;
+
+ AMQConnection _connection;
+ AMQStateManager _stateManager;
+
+ public AMQStateManager StateManager
+ {
+ get { return _stateManager; }
+ set { _stateManager = value; }
+ }
+
+ //private readonly CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
+ private readonly ArrayList _frameListeners = ArrayList.Synchronized(new ArrayList());
+
+ AMQProtocolSession _protocolSession = null; // FIXME
+ public AMQProtocolSession ProtocolSession { set { _protocolSession = value; } } // FIXME: can this be fixed?
+
+
+ private readonly Object _lock = new Object();
+
+ public AMQProtocolListener(AMQConnection connection, AMQStateManager stateManager)
+ {
+ _connection = connection;
+ _stateManager = stateManager;
+ _failoverHandler = new FailoverHandler(connection);
+ }
+
+ public void OnMessage(IDataBlock message)
+ {
+ // Handle incorrect protocol version.
+ if (message is ProtocolInitiation)
+ {
+ string error = String.Format("Protocol mismatch - {0}", message.ToString());
+ AMQException e = new AMQProtocolHeaderException(error);
+ _log.Error("Closing connection because of protocol mismatch", e);
+ //_protocolSession.CloseProtocolSession();
+ _stateManager.Error(e);
+ return;
+ }
+
+ AMQFrame frame = (AMQFrame)message;
+
+ if (frame.BodyFrame is AMQMethodBody)
+ {
+ if (_log.IsDebugEnabled)
+ {
+ _log.Debug("Method frame received: " + frame);
+ }
+ AMQMethodEvent evt = new AMQMethodEvent(frame.Channel, (AMQMethodBody)frame.BodyFrame, _protocolSession);
+ try
+ {
+ bool wasAnyoneInterested = false;
+ lock (_frameListeners.SyncRoot)
+ {
+ foreach (IAMQMethodListener listener in _frameListeners)
+ {
+ wasAnyoneInterested = listener.MethodReceived(evt) || wasAnyoneInterested;
+ }
+ }
+ if (!wasAnyoneInterested)
+ {
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
+ }
+ }
+ catch (Exception e)
+ {
+ foreach (IAMQMethodListener listener in _frameListeners)
+ {
+ listener.Error(e);
+ }
+ }
+ }
+ else if (frame.BodyFrame is ContentHeaderBody)
+ {
+ _protocolSession.MessageContentHeaderReceived(frame.Channel,
+ (ContentHeaderBody)frame.BodyFrame);
+ }
+ else if (frame.BodyFrame is ContentBody)
+ {
+ _protocolSession.MessageContentBodyReceived(frame.Channel,
+ (ContentBody)frame.BodyFrame);
+ }
+ else if (frame.BodyFrame is HeartbeatBody)
+ {
+ _log.Debug("HeartBeat received");
+ }
+ //_connection.BytesReceived(_protocolSession.Channel.ReadBytes); // XXX: is this really useful?
+ }
+
+ public void OnException(Exception cause)
+ {
+ _log.Warn("Protocol Listener received exception", cause);
+ lock (_lock)
+ {
+ if (_failoverState == FailoverState.NOT_STARTED)
+ {
+ if (!(cause is AMQUndeliveredException))
+ {
+ WhenClosed();
+ }
+ }
+ // We reach this point if failover was attempted and failed therefore we need to let the calling app
+ // know since we cannot recover the situation.
+ else if (_failoverState == FailoverState.FAILED)
+ {
+ // we notify the state manager of the error in case we have any clients waiting on a state
+ // change. Those "waiters" will be interrupted and can handle the exception
+ AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
+ PropagateExceptionToWaiters(amqe);
+ _connection.ExceptionReceived(cause);
+ }
+ }
+ }
+
+ /**
+ * When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by
+ * sessionClosed() depending on whether we were trying to send data at the time of failure.
+ *
+ * @param session
+ * @throws Exception
+ */
+ void WhenClosed()
+ {
+ _connection.StopHeartBeatThread();
+
+ // TODO: Server just closes session with no warning if auth fails.
+ if (_connection.Closed)
+ {
+ _log.Info("Channel closed called by client");
+ }
+ else
+ {
+ _log.Info("Channel closed called with failover state currently " + _failoverState);
+
+ // Reconnectablility was introduced here so as not to disturb the client as they have made their intentions
+ // known through the policy settings.
+
+ if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.IsFailoverAllowed)
+ {
+ _log.Info("FAILOVER STARTING");
+ if (_failoverState == FailoverState.NOT_STARTED)
+ {
+ _failoverState = FailoverState.IN_PROGRESS;
+ startFailoverThread();
+ }
+ else
+ {
+ _log.Info("Not starting failover as state currently " + _failoverState);
+ }
+ }
+ else
+ {
+ _log.Info("Failover not allowed by policy.");
+
+ if (_failoverState != FailoverState.IN_PROGRESS)
+ {
+ _log.Info("sessionClose() not allowed to failover");
+ _connection.ExceptionReceived(
+ new AMQDisconnectedException("Server closed connection and reconnection not permitted."));
+ }
+ else
+ {
+ _log.Info("sessionClose() failover in progress");
+ }
+ }
+ }
+
+ _log.Info("Protocol Channel [" + this + "] closed");
+ }
+
+ /// <summary>
+ /// There are two cases where we have other threads potentially blocking for events to be handled by this
+ /// class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a
+ /// particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can
+ /// react appropriately.
+ ///
+ /// <param name="e">the exception to propagate</param>
+ /// </summary>
+ public void PropagateExceptionToWaiters(Exception e)
+ {
+ // FIXME: not sure if required as StateManager is in _frameListeners. Probably something to do with fail-over.
+ _stateManager.Error(e);
+
+ foreach (IAMQMethodListener listener in _frameListeners)
+ {
+ listener.Error(e);
+ }
+ }
+
+ public void AddFrameListener(IAMQMethodListener listener)
+ {
+ _frameListeners.Add(listener);
+ }
+
+ public void RemoveFrameListener(IAMQMethodListener listener)
+ {
+ if (_log.IsDebugEnabled)
+ {
+ _log.Debug("Removing frame listener: " + listener.ToString());
+ }
+ _frameListeners.Remove(listener);
+ }
+
+ public void BlockUntilNotFailingOver()
+ {
+ if (FailoverLatch != null)
+ {
+ FailoverLatch.WaitOne();
+ }
+ }
+
+ /// <summary>
+ /// "Failover" for redirection.
+ /// </summary>
+ /// <param name="host"></param>
+ /// <param name="port"></param>
+ public void Failover(string host, int port)
+ {
+ _failoverHandler.setHost(host);
+ _failoverHandler.setPort(port);
+ // see javadoc for FailoverHandler to see rationale for separate thread
+ startFailoverThread();
+ }
+
+ private void startFailoverThread()
+ {
+ Thread failoverThread = new Thread(new ThreadStart(_failoverHandler.Run));
+ failoverThread.Name = "Failover";
+ // Do not inherit daemon-ness from current thread as this can be a daemon
+ // thread such as a AnonymousIoService thread.
+ failoverThread.IsBackground = false;
+ failoverThread.Start();
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
new file mode 100644
index 0000000000..65aca0d942
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
@@ -0,0 +1,269 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using log4net;
+using Qpid.Client.Message;
+using Qpid.Client.Transport;
+using Qpid.Framing;
+
+namespace Qpid.Client.Protocol
+{
+ public class AMQProtocolSession
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(AMQProtocolSession));
+
+ private readonly IProtocolWriter _protocolWriter;
+ private readonly IConnectionCloser _connectionCloser;
+
+ /**
+ * Counter to ensure unique queue names
+ */
+ private int _queueId = 1;
+ private readonly Object _queueIdLock = new Object();
+
+ /// <summary>
+ /// Maps from the channel id to the AmqChannel that it represents.
+ /// </summary>
+ //private ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap();
+ private Hashtable _channelId2SessionMap = Hashtable.Synchronized(new Hashtable());
+
+ //private ConcurrentMap _closingChannels = new ConcurrentHashMap();
+ private Hashtable _closingChannels = Hashtable.Synchronized(new Hashtable());
+
+ /// <summary>
+ /// Maps from a channel id to an unprocessed message. This is used to tie together the
+ /// JmsDeliverBody (which arrives first) with the subsequent content header and content bodies.
+ /// </summary>
+ //private ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap();
+ private Hashtable _channelId2UnprocessedMsgMap = Hashtable.Synchronized(new Hashtable());
+
+ private AMQConnection _connection;
+
+ public AMQProtocolSession(IProtocolWriter protocolWriter, IConnectionCloser connectionCloser, AMQConnection connection)
+ {
+ _protocolWriter = protocolWriter;
+ _connectionCloser = connectionCloser;
+ _connection = connection;
+ }
+
+ public void Init()
+ {
+ // start the process of setting up the connection. This is the first place that
+ // data is written to the server.
+ _protocolWriter.Write(new ProtocolInitiation());
+ }
+
+ public string Username
+ {
+ get
+ {
+ return AMQConnection.Username;
+ }
+ }
+
+ public string Password
+ {
+ get
+ {
+ return AMQConnection.Password;
+ }
+ }
+
+ ConnectionTuneParameters _connectionTuneParameters; // TODO: should be able to have this in the Java too.
+
+ public ConnectionTuneParameters ConnectionTuneParameters
+ {
+ get
+ {
+ return _connectionTuneParameters;
+ }
+ set
+ {
+ _connectionTuneParameters = value;
+ AMQConnection con = AMQConnection;
+ con.SetMaximumChannelCount(value.ChannelMax);
+ con.MaximumFrameSize = value.FrameMax;
+ }
+ }
+
+ /// <summary>
+ /// Callback invoked from the BasicDeliverMethodHandler when a message has been received.
+ /// This is invoked on the MINA dispatcher thread.
+ /// </summary>
+ /// <param name="message">the unprocessed message</param>
+ /// <exception cname="AMQException">if this was not expected</exception>
+ public void UnprocessedMessageReceived(UnprocessedMessage message)
+ {
+ _channelId2UnprocessedMsgMap[message.ChannelId] = message;
+ }
+
+ public void MessageContentHeaderReceived(ushort channelId, ContentHeaderBody contentHeader)
+ {
+ UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap[channelId];
+ if (msg == null)
+ {
+ throw new AMQException("Error: received content header without having received a JMSDeliver frame first");
+ }
+ if (msg.ContentHeader != null)
+ {
+ throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames");
+ }
+ msg.ContentHeader = contentHeader;
+ if (contentHeader.BodySize == 0)
+ {
+ DeliverMessageToAMQSession(channelId, msg);
+ }
+ }
+
+ public void MessageContentBodyReceived(ushort channelId, ContentBody contentBody)
+ {
+ UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap[channelId];
+ if (msg == null)
+ {
+ throw new AMQException("Error: received content body without having received a BasicDeliver frame first");
+ }
+ if (msg.ContentHeader == null)
+ {
+ _channelId2UnprocessedMsgMap.Remove(channelId);
+ throw new AMQException("Error: received content body without having received a ContentHeader frame first");
+ }
+ try
+ {
+ msg.ReceiveBody(contentBody);
+ }
+ catch (UnexpectedBodyReceivedException e)
+ {
+ _channelId2UnprocessedMsgMap.Remove(channelId);
+ throw e;
+ }
+ if (msg.IsAllBodyDataReceived())
+ {
+ DeliverMessageToAMQSession(channelId, msg);
+ }
+ }
+
+ /// <summary>
+ /// Deliver a message to the appropriate session, removing the unprocessed message
+ /// from our map
+ /// <param name="channelId">the channel id the message should be delivered to</param>
+ /// <param name="msg"> the message</param>
+ private void DeliverMessageToAMQSession(ushort channelId, UnprocessedMessage msg)
+ {
+ AmqChannel channel = (AmqChannel) _channelId2SessionMap[channelId];
+ channel.MessageReceived(msg);
+ _channelId2UnprocessedMsgMap.Remove(channelId);
+ }
+
+ /// <summary>
+ /// Convenience method that writes a frame to the protocol session. Equivalent
+ /// to calling getProtocolSession().write().
+ /// </summary>
+ /// <param name="frame">the frame to write</param>
+ public void WriteFrame(IDataBlock frame)
+ {
+ _protocolWriter.Write(frame);
+ }
+
+ public void AddSessionByChannel(ushort channelId, AmqChannel channel)
+ {
+ if (channel == null)
+ {
+ throw new ArgumentNullException("Attempt to register a null channel");
+ }
+ _logger.Debug("Add channel with channel id " + channelId);
+ _channelId2SessionMap[channelId] = channel;
+ }
+
+ public void RemoveSessionByChannel(ushort channelId)
+ {
+ _logger.Debug("Removing session with channelId " + channelId);
+ _channelId2SessionMap.Remove(channelId);
+ }
+
+ /// <summary>
+ /// Starts the process of closing a channel
+ /// </summary>
+ /// <param name="channel" the AmqChannel being closed</param>
+ public void CloseSession(AmqChannel channel)
+ {
+ _logger.Debug("closeSession called on protocol channel for channel " + channel.ChannelId);
+ ushort channelId = channel.ChannelId;
+
+ // we need to know when a channel is closing so that we can respond
+ // with a channel.close frame when we receive any other type of frame
+ // on that channel
+ _closingChannels[channelId] = channel;
+
+ }
+
+ /// <summary>
+ /// Called from the ChannelClose handler when a channel close frame is received.
+ /// This method decides whether this is a response or an initiation. The latter
+ /// case causes the AmqChannel to be closed and an exception to be thrown if
+ /// appropriate.
+ /// </summary>
+ /// <param name="channelId">the id of the channel (session)</param>
+ /// <returns>true if the client must respond to the server, i.e. if the server
+ /// initiated the channel close, false if the channel close is just the server
+ /// responding to the client's earlier request to close the channel.</returns>
+ public bool ChannelClosed(ushort channelId, int code, string text)
+ {
+ // if this is not a response to an earlier request to close the channel
+ if (!_closingChannels.ContainsKey(channelId))
+ {
+ _closingChannels.Remove(channelId);
+ AmqChannel channel = (AmqChannel) _channelId2SessionMap[channelId];
+ channel.Closed(new AMQException(_logger, code, text));
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public AMQConnection AMQConnection
+ {
+ get
+ {
+ return _connection;
+ }
+ }
+
+ public void CloseProtocolSession()
+ {
+ _logger.Debug("Closing protocol session");
+ _connectionCloser.Close();
+ }
+
+ internal string GenerateQueueName()
+ {
+ int id;
+ lock(_queueIdLock)
+ {
+ id = _queueId++;
+ }
+
+ return "tmp_" + _connection.Transport.getLocalEndPoint() + "_" + id;
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs b/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs
new file mode 100644
index 0000000000..be8a24a9f4
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace Qpid.Client.Protocol
+{
+ public interface IConnectionCloser
+ {
+ void Close();
+ }
+} \ No newline at end of file
diff --git a/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs b/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs
new file mode 100644
index 0000000000..6ac8a7537e
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using Qpid.Client.Protocol.Listener;
+using Qpid.Framing;
+
+namespace Qpid.Client.Protocol
+{
+ public interface IProtocolListener
+ {
+ void OnMessage(IDataBlock message);
+ void OnException(Exception e);
+
+ // XXX: .NET way of doing listeners?
+ void AddFrameListener(IAMQMethodListener listener);
+ void RemoveFrameListener(IAMQMethodListener listener);
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs b/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs
new file mode 100644
index 0000000000..99643fe59f
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using Qpid.Framing;
+
+namespace Qpid.Client.Protocol.Listener
+{
+ public abstract class BlockingMethodFrameListener : IAMQMethodListener
+ {
+ private ManualResetEvent _resetEvent;
+
+ public abstract bool ProcessMethod(ushort channelId, AMQMethodBody frame);
+
+ /// <summary>
+ /// This is set if there is an exception thrown from processCommandFrame and the
+ /// exception is rethrown to the caller of blockForFrame()
+ /// </summary>
+ private volatile Exception _error;
+
+ protected ushort _channelId;
+
+ protected AMQMethodEvent _doneEvt = null;
+
+ public BlockingMethodFrameListener(ushort channelId)
+ {
+ _channelId = channelId;
+ _resetEvent = new ManualResetEvent(false);
+ }
+
+ /// <summary>
+ /// This method is called by the MINA dispatching thread. Note that it could
+ /// be called before BlockForFrame() has been called.
+ /// </summary>
+ /// <param name="evt">the frame event</param>
+ /// <returns>true if the listener has dealt with this frame</returns>
+ /// <exception cref="AMQException"></exception>
+ public bool MethodReceived(AMQMethodEvent evt)
+ {
+ AMQMethodBody method = evt.Method;
+
+ try
+ {
+ bool ready = (evt.ChannelId == _channelId) && ProcessMethod(evt.ChannelId, method);
+ if (ready)
+ {
+ _doneEvt = evt;
+ _resetEvent.Set();
+ }
+
+ return ready;
+ }
+ catch (AMQException e)
+ {
+ Error(e);
+ // we rethrow the error here, and the code in the frame dispatcher will go round
+ // each listener informing them that an exception has been thrown
+ throw e;
+ }
+ }
+
+ /// <summary>
+ /// This method is called by the thread that wants to wait for a frame.
+ /// </summary>
+ public AMQMethodEvent BlockForFrame()
+ {
+ _resetEvent.WaitOne();
+ //at this point the event will have been signalled. The error field might or might not be set
+ // depending on whether an error occurred
+ if (_error != null)
+ {
+ throw _error;
+ }
+
+ return _doneEvt;
+ }
+
+ /// <summary>
+ /// This is a callback, called by the MINA dispatcher thread only. It is also called from within this
+ /// class to avoid code repetition but again is only called by the MINA dispatcher thread.
+ /// </summary>
+ /// <param name="e">the exception that caused the error</param>
+ public void Error(Exception e)
+ {
+ // set the error so that the thread that is blocking (in BlockForFrame())
+ // can pick up the exception and rethrow to the caller
+ _error = e;
+ _resetEvent.Set();
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs b/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs
new file mode 100644
index 0000000000..db82eb1013
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+
+namespace Qpid.Client.Protocol.Listener
+{
+ public interface IAMQMethodListener
+ {
+ /// <summary>
+ /// Invoked when a method frame has been received
+ /// <param name="evt">the event</param>
+ /// <returns>true if the handler has processed the method frame, false otherwise. Note
+ /// that this does not prohibit the method event being delivered to subsequent listeners
+ /// but can be used to determine if nobody has dealt with an incoming method frame.</param>
+ /// <exception cname="AMQException">if an error has occurred. This exception will be delivered
+ /// to all registered listeners using the error() method (see below) allowing them to
+ /// perform cleanup if necessary.</exception>
+ bool MethodReceived(AMQMethodEvent evt);
+
+ /// <summary>
+ /// Callback when an error has occurred. Allows listeners to clean up.
+ /// </summary>
+ /// <param name="e">the exception</param>
+ void Error(Exception e);
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs b/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs
new file mode 100644
index 0000000000..65460a0c2e
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using Qpid.Framing;
+
+namespace Qpid.Client.Protocol.Listener
+{
+ public class SpecificMethodFrameListener : BlockingMethodFrameListener
+ {
+ private readonly Type _expectedClass;
+
+ public SpecificMethodFrameListener(ushort channelId, Type expectedClass) : base(channelId)
+ {
+ _expectedClass = expectedClass;
+ }
+
+ public override bool ProcessMethod(ushort channelId, AMQMethodBody frame)
+ {
+ return _expectedClass.IsInstanceOfType(frame);
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs b/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs
new file mode 100644
index 0000000000..32847f9b9b
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using Qpid.Client.Protocol.Listener;
+using Qpid.Client.Transport;
+using Qpid.Framing;
+
+namespace Qpid.Client.Protocol
+{
+ /// <summary>
+ /// A convenient interface to writing protocol frames.
+ /// </summary>
+ public class ProtocolWriter
+ {
+ IProtocolWriter _protocolWriter;
+ IProtocolListener _protocolListener;
+
+ public ProtocolWriter(IProtocolWriter protocolWriter, IProtocolListener protocolListener)
+ {
+ _protocolWriter = protocolWriter;
+ _protocolListener = protocolListener;
+ }
+
+ public void WriteFrame(IDataBlock frame)
+ {
+ _protocolWriter.Write(frame);
+ }
+
+ /// <summary>
+ /// Convenience method that writes a frame to the protocol session and waits for
+ /// a particular response. Equivalent to calling getProtocolSession().write() then
+ /// waiting for the response.
+ /// </summary>
+ /// <param name="frame">the frame</param>
+ /// <param name="listener">the blocking listener. Note the calling thread will block.</param>
+ private AMQMethodEvent SyncWrite(AMQFrame frame, BlockingMethodFrameListener listener)
+ {
+ try
+ {
+ _protocolListener.AddFrameListener(listener);
+ _protocolWriter.Write(frame);
+ return listener.BlockForFrame();
+ }
+ finally
+ {
+ _protocolListener.RemoveFrameListener(listener);
+ }
+ // When control resumes before this line, a reply will have been received
+ // that matches the criteria defined in the blocking listener
+ }
+
+ public AMQMethodEvent SyncWrite(AMQFrame frame, Type responseType)
+ {
+ // TODO: If each frame knew it's response type, then the responseType argument would
+ // TODO: not be neccesary.
+ return SyncWrite(frame, new SpecificMethodFrameListener(frame.Channel, responseType));
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs b/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs
new file mode 100644
index 0000000000..f0c4c91db8
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs
@@ -0,0 +1,142 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Net;
+using log4net;
+using Qpid.Client.qms;
+
+namespace Qpid.Client
+{
+ public class QpidConnectionInfo : ConnectionInfo
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(QpidConnectionInfo));
+
+ string _username = "guest";
+ string _password = "guest";
+ string _virtualHost = "/default";
+
+ string _failoverMethod = null;
+ IDictionary _failoverOptions = new Hashtable();
+ IDictionary _options = new Hashtable();
+ IList _brokerInfos = new ArrayList(); // List<BrokerInfo>
+ string _clientName = String.Format("{0}{1:G}", Dns.GetHostName(), DateTime.Now.Ticks);
+
+ public string asUrl()
+ {
+ string result = "amqp://";
+ foreach (BrokerInfo info in _brokerInfos)
+ {
+ result += info.ToString();
+ }
+ return result;
+
+ }
+
+ public string getFailoverMethod()
+ {
+ return _failoverMethod;
+ }
+
+ public string getFailoverOption(string key)
+ {
+ return (string) _failoverOptions[key];
+ }
+
+ public int getBrokerCount()
+ {
+ return _brokerInfos.Count;
+ }
+
+ public BrokerInfo GetBrokerDetails(int index)
+ {
+ return (BrokerInfo)_brokerInfos[index];
+ }
+
+ public void AddBrokerInfo(BrokerInfo brokerInfo)
+ {
+ if (!_brokerInfos.Contains(brokerInfo))
+ {
+ _brokerInfos.Add(brokerInfo);
+ }
+ }
+
+ public IList GetAllBrokerInfos()
+ {
+ return _brokerInfos;
+ }
+
+ public string GetClientName()
+ {
+ return _clientName;
+ }
+
+ public void SetClientName(string clientName)
+ {
+ _clientName = clientName;
+ }
+
+ public string getUsername()
+ {
+ return _username;
+ }
+
+ public void setUsername(string username)
+ {
+ _username = username;
+ }
+
+ public string getPassword()
+ {
+ return _password;
+ }
+
+ public void setPassword(string password)
+ {
+ _password = password;
+ }
+
+ public string getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public void setVirtualHost(string virtualHost)
+ {
+ _virtualHost = virtualHost;
+ }
+
+ public string getOption(string key)
+ {
+ return (string) _options[key];
+ }
+
+ public void setOption(string key, string value)
+ {
+ _options[key] = value;
+ }
+
+ public override string ToString()
+ {
+ return asUrl();
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/State/AMQState.cs b/dotnet/Qpid.Client/Client/State/AMQState.cs
new file mode 100644
index 0000000000..fc71fe647c
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/State/AMQState.cs
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace Qpid.Client.State
+{
+ public enum AMQState
+ {
+ CONNECTION_NOT_STARTED,
+ CONNECTION_NOT_TUNED,
+ CONNECTION_NOT_OPENED,
+ CONNECTION_OPEN,
+ CONNECTION_CLOSING,
+ CONNECTION_CLOSED,
+ ALL // all is a special state used in the state manager. It is not valid to be "in" the state "all".
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs b/dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs
new file mode 100644
index 0000000000..60d44da824
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace Qpid.Client.State
+{
+ public class AMQStateChangedEvent
+ {
+ private readonly AMQState _oldState;
+
+ private readonly AMQState _newState;
+
+ public AMQStateChangedEvent(AMQState oldState, AMQState newState)
+ {
+ _oldState = oldState;
+ _newState = newState;
+ }
+
+ public AMQState OldState
+ {
+ get
+ {
+ return _oldState;
+ }
+ }
+
+ public AMQState NewState
+ {
+ get
+ {
+ return _newState;
+ }
+ }
+
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/State/AMQStateManager.cs b/dotnet/Qpid.Client/Client/State/AMQStateManager.cs
new file mode 100644
index 0000000000..05f673d520
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/State/AMQStateManager.cs
@@ -0,0 +1,225 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using log4net;
+using Qpid.Client.Handler;
+using Qpid.Client.Protocol;
+using Qpid.Client.Protocol.Listener;
+using Qpid.Framing;
+
+namespace Qpid.Client.State
+{
+ public class AMQStateManager : IAMQMethodListener
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(AMQStateManager));
+
+ const bool InfoLoggingHack = true;
+
+ /// <summary>
+ /// The current state
+ /// </summary>
+ private AMQState _currentState;
+
+ /// <summary>
+ /// Maps from an AMQState instance to a Map from Class to StateTransitionHandler.
+ /// The class must be a subclass of AMQFrame.
+ /// </summary>
+ private readonly IDictionary _state2HandlersMap = new Hashtable();
+
+ //private CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
+ private ArrayList _stateListeners = ArrayList.Synchronized(new ArrayList(5));
+
+ public AMQStateManager()
+ {
+ _currentState = AMQState.CONNECTION_NOT_STARTED;
+ RegisterListeners();
+ }
+
+ private void RegisterListeners()
+ {
+ IStateAwareMethodListener connectionStart = new ConnectionStartMethodHandler();
+ IStateAwareMethodListener connectionClose = new ConnectionCloseMethodHandler();
+ IStateAwareMethodListener connectionCloseOk = new ConnectionCloseOkHandler();
+ IStateAwareMethodListener connectionTune = new ConnectionTuneMethodHandler();
+ IStateAwareMethodListener connectionSecure = new ConnectionSecureMethodHandler();
+ IStateAwareMethodListener connectionOpenOk = new ConnectionOpenOkMethodHandler();
+ IStateAwareMethodListener channelClose = new ChannelCloseMethodHandler();
+ IStateAwareMethodListener basicDeliver = new BasicDeliverMethodHandler();
+ IStateAwareMethodListener basicReturn = new BasicReturnMethodHandler();
+
+ // We need to register a map for the null (i.e. all state) handlers otherwise you get
+ // a stack overflow in the handler searching code when you present it with a frame for which
+ // no handlers are registered.
+ _state2HandlersMap[AMQState.ALL] = new Hashtable();
+
+ {
+ Hashtable notStarted = new Hashtable();
+ notStarted[typeof(ConnectionStartBody)] = connectionStart;
+ notStarted[typeof(ConnectionCloseBody)] = connectionClose;
+ _state2HandlersMap[AMQState.CONNECTION_NOT_STARTED] = notStarted;
+ }
+ {
+ Hashtable notTuned = new Hashtable();
+ notTuned[typeof(ConnectionTuneBody)] = connectionTune;
+ notTuned[typeof(ConnectionSecureBody)] = connectionSecure;
+ notTuned[typeof(ConnectionCloseBody)] = connectionClose;
+ _state2HandlersMap[AMQState.CONNECTION_NOT_TUNED] = notTuned;
+ }
+ {
+ Hashtable notOpened = new Hashtable();
+ notOpened[typeof(ConnectionOpenOkBody)] = connectionOpenOk;
+ notOpened[typeof(ConnectionCloseBody)] = connectionClose;
+ _state2HandlersMap[AMQState.CONNECTION_NOT_OPENED] = notOpened;
+ }
+ {
+ Hashtable open = new Hashtable();
+ open[typeof(ChannelCloseBody)] = channelClose;
+ open[typeof(ConnectionCloseBody)] = connectionClose;
+ open[typeof(BasicDeliverBody)] = basicDeliver;
+ open[typeof(BasicReturnBody)] = basicReturn;
+ _state2HandlersMap[AMQState.CONNECTION_OPEN] = open;
+ }
+ {
+ Hashtable closing = new Hashtable();
+ closing[typeof(ConnectionCloseOkBody)] = connectionCloseOk;
+ _state2HandlersMap[AMQState.CONNECTION_CLOSING] = closing;
+ }
+ }
+
+ public AMQState CurrentState
+ {
+ get
+ {
+ return _currentState;
+ }
+ }
+
+ /// <summary>
+ /// Changes the state.
+ /// </summary>
+ /// <param name="newState">The new state.</param>
+ /// <exception cref="AMQException">if there is an error changing state</exception>
+ public void ChangeState(AMQState newState)
+ {
+ if (InfoLoggingHack)
+ {
+ _logger.Info("State changing to " + newState + " from old state " + _currentState);
+ }
+ _logger.Debug("State changing to " + newState + " from old state " + _currentState);
+ AMQState oldState = _currentState;
+ _currentState = newState;
+
+ foreach (IStateListener l in _stateListeners)
+ {
+ l.StateChanged(oldState, newState);
+ }
+ }
+
+ public void Error(Exception e)
+ {
+ _logger.Debug("State manager receive error notification: " + e);
+ foreach (IStateListener l in _stateListeners)
+ {
+ l.Error(e);
+ }
+ }
+
+ public bool MethodReceived(AMQMethodEvent evt)
+ {
+ _logger.Debug(String.Format("Finding method handler. currentState={0} type={1}", _currentState, evt.Method.GetType()));
+ IStateAwareMethodListener handler = FindStateTransitionHandler(_currentState, evt.Method);
+ if (handler != null)
+ {
+ handler.MethodReceived(this, evt);
+ return true;
+ }
+ return false;
+ }
+
+ /// <summary>
+ /// Finds the state transition handler.
+ /// </summary>
+ /// <param name="currentState">State of the current.</param>
+ /// <param name="frame">The frame.</param>
+ /// <returns></returns>
+ /// <exception cref="IllegalStateTransitionException">if the state transition if not allowed</exception>
+ private IStateAwareMethodListener FindStateTransitionHandler(AMQState currentState,
+ AMQMethodBody frame)
+ {
+ Type clazz = frame.GetType();
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("Looking for state transition handler for frame " + clazz);
+ }
+ IDictionary classToHandlerMap = (IDictionary) _state2HandlersMap[currentState];
+
+ if (classToHandlerMap == null)
+ {
+ // if no specialised per state handler is registered look for a
+ // handler registered for "all" states
+ return FindStateTransitionHandler(AMQState.ALL, frame);
+ }
+ IStateAwareMethodListener handler = (IStateAwareMethodListener) classToHandlerMap[clazz];
+ if (handler == null)
+ {
+ if (currentState == AMQState.ALL)
+ {
+ _logger.Debug("No state transition handler defined for receiving frame " + frame);
+ return null;
+ }
+ else
+ {
+ // if no specialised per state handler is registered look for a
+ // handler registered for "all" states
+ return FindStateTransitionHandler(AMQState.ALL, frame);
+ }
+ }
+ else
+ {
+ return handler;
+ }
+ }
+
+ public void AddStateListener(IStateListener listener)
+ {
+ _logger.Debug("Adding state listener");
+ _stateListeners.Add(listener);
+ }
+
+ public void RemoveStateListener(IStateListener listener)
+ {
+ _stateListeners.Remove(listener);
+ }
+
+ public void AttainState(AMQState s)
+ {
+ if (_currentState != s)
+ {
+ _logger.Debug("Adding state wait to reach state " + s);
+ StateWaiter sw = new StateWaiter(s);
+ AddStateListener(sw);
+ sw.WaituntilStateHasChanged();
+ // at this point the state will have changed.
+ }
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/State/IAMQStateListener.cs b/dotnet/Qpid.Client/Client/State/IAMQStateListener.cs
new file mode 100644
index 0000000000..ff27cd841e
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/State/IAMQStateListener.cs
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace Qpid.Client.State
+{
+ public interface IAMQStateListener
+ {
+ void StateChanged(AMQStateChangedEvent evt);
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs b/dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs
new file mode 100644
index 0000000000..256fe1c3f3
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using Qpid.Client.Protocol;
+
+namespace Qpid.Client.State
+{
+ public interface IStateAwareMethodListener
+ {
+ void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt);
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/State/IStateListener.cs b/dotnet/Qpid.Client/Client/State/IStateListener.cs
new file mode 100644
index 0000000000..6073b2bb0c
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/State/IStateListener.cs
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+
+namespace Qpid.Client.State
+{
+ public interface IStateListener
+ {
+ void StateChanged(AMQState oldState, AMQState newState);
+
+ void Error(Exception e);
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs b/dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs
new file mode 100644
index 0000000000..723ae04397
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+
+namespace Qpid.Client.State
+{
+ public class IllegalStateTransitionException : AMQException
+ {
+ private AMQState _originalState;
+
+ private Type _frame;
+
+ public IllegalStateTransitionException(AMQState originalState, Type frame)
+ : base("No valid state transition defined for receiving frame " + frame +
+ " from state " + originalState)
+ {
+ _originalState = originalState;
+ _frame = frame;
+ }
+
+ public AMQState OriginalState
+ {
+ get
+ {
+ return _originalState;
+ }
+ }
+
+ public Type FrameType
+ {
+ get
+ {
+ return _frame;
+ }
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/State/StateWaiter.cs b/dotnet/Qpid.Client/Client/State/StateWaiter.cs
new file mode 100644
index 0000000000..cb7f604499
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/State/StateWaiter.cs
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Threading;
+using log4net;
+
+namespace Qpid.Client.State
+{
+ public class StateWaiter : IStateListener
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(StateWaiter));
+
+ private readonly AMQState _state;
+
+ private volatile bool _newStateAchieved;
+
+ private volatile Exception _exception;
+
+ private ManualResetEvent _resetEvent = new ManualResetEvent(false);
+
+ public StateWaiter(AMQState state)
+ {
+ _state = state;
+ }
+
+ public void StateChanged(AMQState oldState, AMQState newState)
+ {
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("stateChanged called");
+ }
+ if (_state == newState)
+ {
+ _newStateAchieved = true;
+
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("New state reached so notifying monitor");
+ }
+ _resetEvent.Set();
+ }
+ }
+
+ public void Error(Exception e)
+ {
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("exceptionThrown called");
+ }
+
+ _exception = e;
+ _resetEvent.Set();
+ }
+
+ public void WaituntilStateHasChanged()
+ {
+ //
+ // The guard is required in case we are woken up by a spurious
+ // notify().
+ //
+ while (!_newStateAchieved && _exception == null)
+ {
+ _logger.Debug("State not achieved so waiting...");
+ _resetEvent.WaitOne();
+ }
+
+ if (_exception != null)
+ {
+ _logger.Debug("Throwable reached state waiter: " + _exception);
+ if (_exception is AMQException)
+ {
+ throw _exception;
+ }
+ else
+ {
+ throw new AMQException("Error: " + _exception, _exception);
+ }
+ }
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Transport/AMQProtocolProvider.cs b/dotnet/Qpid.Client/Client/Transport/AMQProtocolProvider.cs
new file mode 100644
index 0000000000..1024fa5575
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Transport/AMQProtocolProvider.cs
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using Qpid.Codec;
+using Qpid.Codec.Demux;
+using Qpid.Framing;
+
+namespace Qpid.Client.Transport
+{
+ public class AMQProtocolProvider
+ {
+ private DemuxingProtocolCodecFactory _factory;
+
+ public AMQProtocolProvider()
+ {
+ _factory = new DemuxingProtocolCodecFactory();
+ _factory.Register(new AMQDataBlockEncoder());
+ _factory.Register(new AMQDataBlockDecoder());
+ _factory.Register(new ProtocolInitiation.Decoder());
+ }
+
+ public IProtocolCodecFactory CodecFactory
+ {
+ get
+ {
+ return _factory;
+ }
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs b/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs
new file mode 100644
index 0000000000..d3add546fe
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using log4net;
+using Qpid.Buffer;
+using Qpid.Codec;
+using Qpid.Codec.Support;
+using Qpid.Framing;
+
+namespace Qpid.Client.Transport
+{
+ public class AmqpChannel : IProtocolChannel
+ {
+ // Warning: don't use this log for regular logging.
+ static readonly ILog _protocolTraceLog = LogManager.GetLogger("Qpid.Client.ProtocolChannel.Tracing");
+
+ IByteChannel byteChannel;
+ IProtocolEncoder encoder;
+ IProtocolDecoder decoder;
+
+ public AmqpChannel(IByteChannel byteChannel)
+ {
+ this.byteChannel = byteChannel;
+
+ AMQProtocolProvider protocolProvider = new AMQProtocolProvider();
+ IProtocolCodecFactory factory = protocolProvider.CodecFactory;
+ encoder = factory.Encoder;
+ decoder = factory.Decoder;
+ }
+
+ public Queue Read()
+ {
+ ByteBuffer buffer = byteChannel.Read();
+
+ Queue frames = Decode(buffer);
+
+ // TODO: Refactor to decorator.
+ if (_protocolTraceLog.IsDebugEnabled)
+ {
+ foreach (object o in frames)
+ {
+ _protocolTraceLog.Debug(String.Format("READ {0}", o));
+ }
+ }
+
+ return frames;
+ }
+
+ public void Write(IDataBlock o)
+ {
+ // TODO: Refactor to decorator.
+ if (_protocolTraceLog.IsDebugEnabled)
+ {
+ _protocolTraceLog.Debug(String.Format("WRITE {0}", o));
+ }
+
+ byteChannel.Write(Encode(o));
+ }
+
+ private ByteBuffer Encode(object o)
+ {
+ SingleProtocolEncoderOutput output = new SingleProtocolEncoderOutput();
+ encoder.Encode(o, output);
+ return output.buffer;
+ }
+
+ private Queue Decode(ByteBuffer byteBuffer)
+ {
+ SimpleProtocolDecoderOutput outx = new SimpleProtocolDecoderOutput();
+ decoder.Decode(byteBuffer, outx);
+ return outx.MessageQueue;
+ }
+ }
+}
+
diff --git a/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs b/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs
new file mode 100644
index 0000000000..4a3ff385a8
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using Qpid.Buffer;
+
+namespace Qpid.Client.Transport
+{
+ public interface IByteChannel
+ {
+ ByteBuffer Read();
+ void Write(ByteBuffer buffer);
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs b/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs
new file mode 100644
index 0000000000..4943a45d68
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.Collections;
+
+namespace Qpid.Client.Transport
+{
+ public interface IProtocolChannel : IProtocolWriter
+ {
+ Queue Read();
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Transport/IProtocolWriter.cs b/dotnet/Qpid.Client/Client/Transport/IProtocolWriter.cs
new file mode 100644
index 0000000000..ac19977927
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Transport/IProtocolWriter.cs
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using Qpid.Framing;
+
+namespace Qpid.Client.Transport
+{
+ public interface IProtocolWriter
+ {
+ void Write(IDataBlock o);
+ }
+} \ No newline at end of file
diff --git a/dotnet/Qpid.Client/Client/Transport/ITransport.cs b/dotnet/Qpid.Client/Client/Transport/ITransport.cs
new file mode 100644
index 0000000000..aebe58b439
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Transport/ITransport.cs
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using Qpid.Client.Protocol;
+
+namespace Qpid.Client.Transport
+{
+ public interface ITransport : IConnectionCloser
+ {
+ void Open();
+ string getLocalEndPoint();
+ IProtocolWriter ProtocolWriter { get; }
+ }
+}
diff --git a/dotnet/Qpid.Client/Client/Transport/SingleProtocolEncoderOutput.cs b/dotnet/Qpid.Client/Client/Transport/SingleProtocolEncoderOutput.cs
new file mode 100644
index 0000000000..5b5392769a
--- /dev/null
+++ b/dotnet/Qpid.Client/Client/Transport/SingleProtocolEncoderOutput.cs
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using Qpid.Buffer;
+using Qpid.Codec;
+
+namespace Qpid.Client.Transport
+{
+ public class SingleProtocolEncoderOutput : IProtocolEncoderOutput
+ {
+ public ByteBuffer buffer;
+
+ public void Write(ByteBuffer buf)
+ {
+ if (buffer != null)
+ {
+ throw new InvalidOperationException("{0} does not allow the writing of more than one buffer");
+ }
+ buffer = buf;
+ }
+ }
+} \ No newline at end of file
diff --git a/dotnet/Qpid.Client/Properties/AssemblyInfo.cs b/dotnet/Qpid.Client/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000000..0c7da88839
--- /dev/null
+++ b/dotnet/Qpid.Client/Properties/AssemblyInfo.cs
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.Reflection;
+using System.Runtime.InteropServices;
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Qpid.Client")]
+[assembly: AssemblyDescription("Qpid Client API implementation")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("Apache Qpid")]
+[assembly: AssemblyProduct("")]
+[assembly: AssemblyCopyright("Copyright (c) 2006 The Apache Software Foundation")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("380cb124-07a8-40c2-b67d-69a0d94cb620")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Revision and Build Numbers
+// by using the '*' as shown below:
+[assembly: AssemblyVersion("0.5.*")]
diff --git a/dotnet/Qpid.Client/Qpid.Client.csproj b/dotnet/Qpid.Client/Qpid.Client.csproj
new file mode 100644
index 0000000000..eb84402e1f
--- /dev/null
+++ b/dotnet/Qpid.Client/Qpid.Client.csproj
@@ -0,0 +1,133 @@
+<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>8.0.50727</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{68987C05-3768-452C-A6FC-6BA1D372852F}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Qpid.Client</RootNamespace>
+ <AssemblyName>Qpid.Client</AssemblyName>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="log4net, Version=1.2.0.30714, Culture=neutral, PublicKeyToken=500ffcafb14f92df">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\Qpid.Common\lib\log4net\log4net.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="Client\AmqBrokerInfo.cs" />
+ <Compile Include="Client\AMQConnection.cs" />
+ <Compile Include="Client\AMQConnectionException.cs" />
+ <Compile Include="Client\AMQDestination.cs" />
+ <Compile Include="Client\AmqChannel.cs" />
+ <Compile Include="Client\QpidConnectionInfo.cs" />
+ <Compile Include="Client\BasicMessageConsumer.cs" />
+ <Compile Include="Client\BasicMessageProducer.cs" />
+ <Compile Include="Client\Closeable.cs" />
+ <Compile Include="Client\ConnectionTuneParameters.cs" />
+ <Compile Include="Client\Failover\FailoverException.cs" />
+ <Compile Include="Client\Failover\FailoverHandler.cs" />
+ <Compile Include="Client\Failover\FailoverState.cs" />
+ <Compile Include="Client\Failover\FailoverSupport.cs" />
+ <Compile Include="Client\Handler\BasicDeliverMethodHandler.cs" />
+ <Compile Include="Client\Handler\BasicReturnMethodHandler.cs" />
+ <Compile Include="Client\Handler\ChannelCloseMethodHandler.cs" />
+ <Compile Include="Client\Handler\ConnectionCloseMethodHandler.cs" />
+ <Compile Include="Client\Handler\ConnectionCloseOkHandler.cs" />
+ <Compile Include="Client\Handler\ConnectionOpenOkMethodHandler.cs" />
+ <Compile Include="Client\Handler\ConnectionRedirectMethodHandler.cs" />
+ <Compile Include="Client\Handler\ConnectionSecureMethodHandler.cs" />
+ <Compile Include="Client\Handler\ConnectionStartMethodHandler.cs" />
+ <Compile Include="Client\Handler\ConnectionTuneMethodHandler.cs" />
+ <Compile Include="Client\Message\AbstractQmsMessage.cs" />
+ <Compile Include="Client\Message\AMQMessage.cs" />
+ <Compile Include="Client\Message\AMQMessageFactory.cs" />
+ <Compile Include="Client\Message\IMessageFactory.cs" />
+ <Compile Include="Client\Message\MessageFactoryRegistry.cs" />
+ <Compile Include="Client\Message\UnexpectedBodyReceivedException.cs" />
+ <Compile Include="Client\Message\UnprocessedMessage.cs" />
+ <Compile Include="Client\Message\QpidBytesMessage.cs" />
+ <Compile Include="Client\Message\QpidBytesMessageFactory.cs" />
+ <Compile Include="Client\Message\QpidTextMessage.cs" />
+ <Compile Include="Client\Message\QpidTextMessageFactory.cs" />
+ <Compile Include="Client\Protocol\AMQMethodEvent.cs" />
+ <Compile Include="Client\Protocol\AMQProtocolListener.cs" />
+ <Compile Include="Client\Protocol\AMQProtocolSession.cs" />
+ <Compile Include="Client\Protocol\Listener\BlockingMethodFrameListener.cs" />
+ <Compile Include="Client\Protocol\Listener\IAMQMethodListener.cs" />
+ <Compile Include="Client\Protocol\IConnectionCloser.cs" />
+ <Compile Include="Client\Protocol\ProtocolWriter.cs" />
+ <Compile Include="Client\Protocol\IProtocolListener.cs" />
+ <Compile Include="Client\State\AMQState.cs" />
+ <Compile Include="Client\State\AMQStateChangedEvent.cs" />
+ <Compile Include="Client\State\AMQStateManager.cs" />
+ <Compile Include="Client\State\IAMQStateListener.cs" />
+ <Compile Include="Client\State\IllegalStateTransitionException.cs" />
+ <Compile Include="Client\State\IStateAwareMethodListener.cs" />
+ <Compile Include="Client\State\IStateListener.cs" />
+ <Compile Include="Client\Protocol\Listener\SpecificMethodFrameListener.cs" />
+ <Compile Include="Client\State\StateWaiter.cs" />
+ <Compile Include="Client\Transport\AmqpChannel.cs" />
+ <Compile Include="Client\Transport\AMQProtocolProvider.cs" />
+ <Compile Include="Client\Transport\IByteChannel.cs" />
+ <Compile Include="Client\Transport\IProtocolChannel.cs" />
+ <Compile Include="Client\Transport\IProtocolWriter.cs" />
+ <Compile Include="Client\Transport\ITransport.cs" />
+ <Compile Include="Client\Transport\SingleProtocolEncoderOutput.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="qms\BrokerInfo.cs" />
+ <Compile Include="qms\ConnectionInfo.cs" />
+ <Compile Include="qms\FailoverPolicy.cs" />
+ <Compile Include="qms\failover\FailoverMethod.cs" />
+ <Compile Include="qms\failover\FailoverRoundRobin.cs" />
+ <Compile Include="qms\failover\FailoverSingleServer.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\Qpid.Codec\Qpid.Codec.csproj">
+ <Project>{22D0D0C2-77AF-4DE3-B456-7FF3893F9F88}</Project>
+ <Name>Qpid.Codec</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Buffer\Qpid.Buffer.csproj">
+ <Project>{44384DF2-B0A4-4580-BDBC-EE4BAA87D995}</Project>
+ <Name>Qpid.Buffer</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Messaging\Qpid.Messaging.csproj">
+ <Project>{6688F826-C58E-4C1B-AA1F-22AFAB4B7D07}</Project>
+ <Name>Qpid.Messaging</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Common\Qpid.Common.csproj">
+ <Project>{77064C42-24D2-4CEB-9EA2-0EF481A43205}</Project>
+ <Name>Qpid.Common</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project> \ No newline at end of file
diff --git a/dotnet/Qpid.Client/default.build b/dotnet/Qpid.Client/default.build
new file mode 100644
index 0000000000..00e2854326
--- /dev/null
+++ b/dotnet/Qpid.Client/default.build
@@ -0,0 +1,52 @@
+<?xml version="1.0"?>
+<project name="XMS.AMQ.Client" default="build">
+ <property name="nant.settings.currentframework" value="net-1.0" />
+ <property name="basename" value="XMSClient"/>
+ <property name="debug" value="true"/>
+ <property name="CommonCollectionsDir" value="../CommonCollections"/>
+ <property name="MINADir" value="../minadotnet"/>
+ <property name="XMSCommonDir" value="../xmscommon"/>
+
+ <if test="${debug}">
+ <property name="targetdir" value="bin/${nant.settings.currentframework}/Debug"/>
+ </if>
+ <ifnot test="${debug}">
+ <property name="targetdir" value="bin/${nant.settings.currentframework}/Release"/>
+ </ifnot>
+
+ <target name="clean">
+ <delete>
+ <fileset>
+ <include name="${targetdir}/${basename}.dll"/>
+ <include name="${targetdir}/${basename}.pdb"/>
+ </fileset>
+ </delete>
+ </target>
+
+ <target name="init">
+ <mkdir dir="${targetdir}"/>
+ </target>
+
+ <target name="build" depends="init">
+ <csc target="library" output="${targetdir}/${basename}.dll" debug="${debug}">
+ <sources>
+ <include name="**/*.cs"/>
+ <exclude name="Properties/Settings.Designer.cs" />
+ </sources>
+ <references>
+ <lib>
+ <include name="${CommonCollectionsDir}/${targetdir}" />
+ <include name="${MINADir}/${targetdir}" />
+ <include name="${XMSCommonDir}/${targetdir}" />
+ <include name="${XMSCommonDir}/lib/**" />
+ <include name="lib/**" />
+ </lib>
+ <include name="CommonCollections.dll" />
+ <include name="log4net.dll" />
+ <include name="MINA.dll" />
+ <include name="IBM.XMS.dll" />
+ <include name="XMSCommon.dll" />
+ </references>
+ </csc>
+ </target>
+</project> \ No newline at end of file
diff --git a/dotnet/Qpid.Client/qms/BrokerInfo.cs b/dotnet/Qpid.Client/qms/BrokerInfo.cs
new file mode 100644
index 0000000000..dd0504968e
--- /dev/null
+++ b/dotnet/Qpid.Client/qms/BrokerInfo.cs
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+
+namespace Qpid.Client.qms
+{
+ /// <summary>
+ /// Know URL option names.
+ /// <seealso cref="ConnectionInfo"/>
+ /// </summary>
+ public class BrokerDetailsConstants
+ {
+ public const String OPTIONS_RETRY = "retries";
+ public const String OPTIONS_SSL = ConnectionUrlConstants.OPTIONS_SSL;
+ public const String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
+ public const int DEFAULT_PORT = 5672;
+ public const String DEFAULT_TRANSPORT = "tcp";
+
+ public readonly string URL_FORMAT_EXAMPLE =
+ "<transport>://<hostname>[:<port Default=\"" + DEFAULT_PORT + "\">][?<option>='<value>'[,<option>='<value>']]";
+
+ public const long DEFAULT_CONNECT_TIMEOUT = 30000L;
+ }
+
+ public interface BrokerInfo
+ {
+ String getHost();
+ void setHost(string host);
+
+ int getPort();
+ void setPort(int port);
+
+ String getTransport();
+ void setTransport(string transport);
+
+ bool useSSL();
+ void useSSL(bool ssl);
+
+ String getOption(string key);
+ void setOption(string key, string value);
+
+ long getTimeout();
+ void setTimeout(long timeout);
+ }
+} \ No newline at end of file
diff --git a/dotnet/Qpid.Client/qms/ConnectionInfo.cs b/dotnet/Qpid.Client/qms/ConnectionInfo.cs
new file mode 100644
index 0000000000..1d099daa3e
--- /dev/null
+++ b/dotnet/Qpid.Client/qms/ConnectionInfo.cs
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System.Collections;
+
+namespace Qpid.Client.qms
+{
+ class ConnectionUrlConstants
+ {
+ public const string AMQ_PROTOCOL = "amqp";
+ public const string OPTIONS_BROKERLIST = "brokerlist";
+ public const string OPTIONS_FAILOVER = "failover";
+ public const string OPTIONS_FAILOVER_CYCLE = "cyclecount";
+ public const string OPTIONS_SSL = "ssl";
+ }
+
+ /**
+ Connection URL format
+ amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\'&option=\'value\';vm://:3/virtualpath?option=\'value\''&failover='method?option=\'value\'&option='value''"
+ Options are of course optional except for requiring a single broker in the broker list.
+ The option seperator is defined to be either '&' or ','
+ */
+ public interface ConnectionInfo
+ {
+ string asUrl();
+
+ string getFailoverMethod();
+
+ string getFailoverOption(string key);
+
+ int getBrokerCount();
+
+ BrokerInfo GetBrokerDetails(int index);
+
+ void AddBrokerInfo(BrokerInfo broker);
+
+ IList GetAllBrokerInfos();
+
+ string GetClientName();
+
+ void SetClientName(string clientName);
+
+ string getUsername();
+
+ void setUsername(string username);
+
+ string getPassword();
+
+ void setPassword(string password);
+
+ string getVirtualHost();
+
+ void setVirtualHost(string virtualHost);
+
+ string getOption(string key);
+
+ void setOption(string key, string value);
+ }
+}
diff --git a/dotnet/Qpid.Client/qms/FailoverPolicy.cs b/dotnet/Qpid.Client/qms/FailoverPolicy.cs
new file mode 100644
index 0000000000..15d52491df
--- /dev/null
+++ b/dotnet/Qpid.Client/qms/FailoverPolicy.cs
@@ -0,0 +1,315 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+using log4net;
+using Qpid.Client.qms.failover;
+
+namespace Qpid.Client.qms
+{
+ public class FailoverPolicy
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(FailoverPolicy));
+
+ private const long MINUTE = 60000L;
+
+ private const long DEFAULT_METHOD_TIMEOUT = 1 * MINUTE;
+ private const long DEFAULT_FAILOVER_TIMEOUT = 4 * MINUTE;
+
+ private FailoverMethod[] _methods = new FailoverMethod[1];
+
+ private int _currentMethod;
+
+ private int _methodsRetries;
+
+ private int _currentRetry;
+
+ private bool _timing;
+
+ private long _lastMethodTime;
+ private long _lastFailTime;
+
+ public FailoverPolicy(ConnectionInfo connectionInfo)
+ {
+ FailoverMethod method;
+
+ //todo This should be integrated in to the connection url when it supports
+ // multiple strategies.
+
+ _methodsRetries = 0;
+
+ if (connectionInfo.getFailoverMethod() == null)
+ {
+ if (connectionInfo.getBrokerCount() > 1)
+ {
+ method = new FailoverRoundRobin(connectionInfo);
+ }
+ else
+ {
+ method = new FailoverSingleServer(connectionInfo);
+ }
+ }
+ else
+ {
+ string failoverMethod = connectionInfo.getFailoverMethod();
+
+ /*
+ if (failoverMethod.equals(FailoverMethod.RANDOM))
+ {
+ //todo write a random connection Failover
+ }
+ */
+ if (failoverMethod.Equals(FailoverMethodConstants.ROUND_ROBIN))
+ {
+ method = new FailoverRoundRobin(connectionInfo);
+ }
+ else
+ {
+ throw new NotImplementedException("Dynamic loading of FailoverMethods not yet implemented.");
+// try
+// {
+// Type[] constructorSpec = {ConnectionInfo.class};
+// Object [] params = {connectionInfo};
+//
+// method = (FailoverMethod) ClassLoader.getSystemClassLoader().
+// loadClass(failoverMethod).
+// getConstructor(constructorSpec).newInstance(params);
+// }
+// catch (Exception cnfe)
+// {
+// throw new IllegalArgumentException("Unknown failover method:" + failoverMethod);
+// }
+ }
+ }
+
+ if (method == null)
+ {
+ throw new ArgumentException("Unknown failover method specified.");
+ }
+
+ reset();
+
+ _methods[_currentMethod] = method;
+ }
+
+ public FailoverPolicy(FailoverMethod method) : this(method, 0)
+ {
+ }
+
+ public FailoverPolicy(FailoverMethod method, int retries)
+ {
+ _methodsRetries = retries;
+
+ reset();
+
+ _methods[_currentMethod] = method;
+ }
+
+ private void reset()
+ {
+ _currentMethod = 0;
+ _currentRetry = 0;
+ _timing = false;
+
+ }
+
+ public bool FailoverAllowed()
+ {
+ bool failoverAllowed;
+
+ if (_timing)
+ {
+ long now = CurrentTimeMilliseconds();
+
+ if ((now - _lastMethodTime) >= DEFAULT_METHOD_TIMEOUT)
+ {
+ _logger.Info("Failover method timeout");
+ _lastMethodTime = now;
+
+ if (!nextMethod())
+ {
+ return false;
+ }
+
+
+ }
+ else if ((now - _lastFailTime) >= DEFAULT_FAILOVER_TIMEOUT)
+ {
+ _logger.Info("Failover timeout");
+ return false;
+ }
+ else
+ {
+ _lastMethodTime = now;
+ }
+ }
+ else
+ {
+ _timing = true;
+ _lastMethodTime = CurrentTimeMilliseconds();
+ _lastFailTime = _lastMethodTime;
+ }
+
+
+ if (_methods[_currentMethod].failoverAllowed())
+ {
+ failoverAllowed = true;
+ }
+ else
+ {
+ if (_currentMethod < (_methods.Length - 1))
+ {
+ nextMethod();
+ _logger.Info("Changing method to " + _methods[_currentMethod].methodName());
+ return FailoverAllowed();
+ }
+ else
+ {
+ return cycleMethods();
+ }
+ }
+
+ return failoverAllowed;
+ }
+
+ private static long CurrentTimeMilliseconds()
+ {
+ return DateTime.Now.Ticks / TimeSpan.TicksPerMillisecond;
+ }
+
+ private bool nextMethod()
+ {
+ if (_currentMethod < (_methods.Length - 1))
+ {
+ _currentMethod++;
+ _methods[_currentMethod].reset();
+ return true;
+ }
+ else
+ {
+ return cycleMethods();
+ }
+ }
+
+ private bool cycleMethods()
+ {
+ if (_currentRetry < _methodsRetries)
+ {
+ _currentRetry++;
+
+ _currentMethod = 0;
+
+ _logger.Info("Retrying methods starting with " + _methods[_currentMethod].methodName());
+ _methods[_currentMethod].reset();
+ return FailoverAllowed();
+ }
+ else
+ {
+ _logger.Debug("All failover methods exhausted");
+ return false;
+ }
+ }
+
+ /**
+ * Notification that connection was successful.
+ */
+ public void attainedConnection()
+ {
+ _currentRetry = 0;
+
+ _methods[_currentMethod].attainedConnection();
+
+ _timing = false;
+ }
+
+ public BrokerInfo GetCurrentBrokerInfo()
+ {
+ return _methods[_currentMethod].GetCurrentBrokerInfo();
+ }
+
+ public BrokerInfo GetNextBrokerInfo()
+ {
+ return _methods[_currentMethod].getNextBrokerDetails();
+ }
+
+ public void setBroker(BrokerInfo broker)
+ {
+ _methods[_currentMethod].setBroker(broker);
+ }
+
+ public void addMethod(FailoverMethod method)
+ {
+ int len = _methods.Length + 1;
+ FailoverMethod[] newMethods = new FailoverMethod[len];
+ _methods.CopyTo(newMethods, 0);
+// System.arraycopy(_methods, 0, newMethods, 0, _methods.length);
+ int index = len - 1;
+ newMethods[index] = method;
+ _methods = newMethods;
+ }
+
+ public void setMethodRetries(int retries)
+ {
+ _methodsRetries = retries;
+ }
+
+ public FailoverMethod getCurrentMethod()
+ {
+ if (_currentMethod >= 0 && _currentMethod < (_methods.Length - 1))
+ {
+ return _methods[_currentMethod];
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+
+ sb.Append("Failover Policy:\n");
+
+ if (FailoverAllowed())
+ {
+ sb.Append("Failover allowed\n");
+ }
+ else
+ {
+ sb.Append("Failover not allowed\n");
+ }
+
+ sb.Append("Failover policy methods\n");
+ for (int i = 0; i < _methods.Length; i++)
+ {
+
+ if (i == _currentMethod)
+ {
+ sb.Append(">");
+ }
+ sb.Append(_methods[i].ToString());
+ }
+
+ return sb.ToString();
+ }
+ }
+} \ No newline at end of file
diff --git a/dotnet/Qpid.Client/qms/failover/FailoverMethod.cs b/dotnet/Qpid.Client/qms/failover/FailoverMethod.cs
new file mode 100644
index 0000000000..7db9ef32fa
--- /dev/null
+++ b/dotnet/Qpid.Client/qms/failover/FailoverMethod.cs
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+
+namespace Qpid.Client.qms.failover
+{
+ public class FailoverMethodConstants
+ {
+ public const String ROUND_ROBIN = "roundrobin";
+ public const String RANDOM = "random";
+ }
+
+ public interface FailoverMethod
+ {
+ /**
+ * Reset the Failover to initial conditions
+ */
+ void reset();
+
+ /**
+ * Check if failover is possible for this method
+ *
+ * @return true if failover is allowed
+ */
+ bool failoverAllowed();
+
+ /**
+ * Notification to the Failover method that a connection has been attained.
+ */
+ void attainedConnection();
+
+ /**
+ * If there is no current BrokerInfo the null will be returned.
+ * @return The current BrokerDetail value to use
+ */
+ BrokerInfo GetCurrentBrokerInfo();
+
+ /**
+ * Move to the next BrokerInfo if one is available.
+ * @return the next BrokerDetail or null if there is none.
+ */
+ BrokerInfo getNextBrokerDetails();
+
+ /**
+ * Set the currently active broker to be the new value.
+ * @param broker The new BrokerDetail value
+ */
+ void setBroker(BrokerInfo broker);
+
+ /**
+ * Set the retries for this method
+ * @param maxRetries the maximum number of time to retry this Method
+ */
+ void setRetries(int maxRetries);
+
+ /**
+ * @return The name of this method for display purposes.
+ */
+ String methodName();
+ }
+}
diff --git a/dotnet/Qpid.Client/qms/failover/FailoverRoundRobin.cs b/dotnet/Qpid.Client/qms/failover/FailoverRoundRobin.cs
new file mode 100644
index 0000000000..c0e832ce21
--- /dev/null
+++ b/dotnet/Qpid.Client/qms/failover/FailoverRoundRobin.cs
@@ -0,0 +1,255 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Text;
+using log4net;
+
+namespace Qpid.Client.qms.failover
+{
+ public class FailoverRoundRobin : FailoverMethod
+ {
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(FailoverRoundRobin));
+
+ /** The default number of times to cycle through all servers */
+ public const int DEFAULT_CYCLE_RETRIES = 0;
+ /** The default number of times to retry each server */
+ public const int DEFAULT_SERVER_RETRIES = 0;
+
+ /**
+ * The index into the hostDetails array of the broker to which we are connected
+ */
+ private int _currentBrokerIndex = -1;
+
+ /**
+ * The number of times to retry connecting for each server
+ */
+ private int _serverRetries;
+
+ /**
+ * The current number of retry attempts made
+ */
+ private int _currentServerRetry;
+
+ /**
+ * The number of times to cycle through the servers
+ */
+ private int _cycleRetries;
+
+ /**
+ * The current number of cycles performed.
+ */
+ private int _currentCycleRetries;
+
+ /**
+ * Array of BrokerDetail used to make connections.
+ */
+ private ConnectionInfo _connectionDetails;
+
+ public FailoverRoundRobin(ConnectionInfo connectionDetails)
+ {
+ if (!(connectionDetails.getBrokerCount() > 0))
+ {
+ throw new ArgumentException("At least one broker details must be specified.");
+ }
+
+ _connectionDetails = connectionDetails;
+
+ //There is no current broker at startup so set it to -1.
+ _currentBrokerIndex = -1;
+
+ String cycleRetries = _connectionDetails.getFailoverOption(ConnectionUrlConstants.OPTIONS_FAILOVER_CYCLE);
+
+ if (cycleRetries != null)
+ {
+ try
+ {
+ _cycleRetries = int.Parse(cycleRetries);
+ }
+ catch (FormatException)
+ {
+ _cycleRetries = DEFAULT_CYCLE_RETRIES;
+ }
+ }
+
+ _currentCycleRetries = 0;
+
+ _serverRetries = 0;
+ _currentServerRetry = -1;
+ }
+
+ public void reset()
+ {
+ _currentBrokerIndex = 0;
+ _currentCycleRetries = 0;
+ _currentServerRetry = -1;
+ }
+
+ public bool failoverAllowed()
+ {
+ return ((_currentCycleRetries < _cycleRetries)
+ || (_currentServerRetry < _serverRetries)
+ || (_currentBrokerIndex < (_connectionDetails.getBrokerCount() - 1)));
+ }
+
+ public void attainedConnection()
+ {
+ _currentCycleRetries = 0;
+ _currentServerRetry = -1;
+ }
+
+ public BrokerInfo GetCurrentBrokerInfo()
+ {
+ if (_currentBrokerIndex == -1)
+ {
+ return null;
+ }
+
+ return _connectionDetails.GetBrokerDetails(_currentBrokerIndex);
+ }
+
+ public BrokerInfo getNextBrokerDetails()
+ {
+ if (_currentBrokerIndex == (_connectionDetails.getBrokerCount() - 1))
+ {
+ if (_currentServerRetry < _serverRetries)
+ {
+ if (_currentBrokerIndex == -1)
+ {
+ _currentBrokerIndex = 0;
+
+ setBroker(_connectionDetails.GetBrokerDetails(_currentBrokerIndex ));
+
+ _logger.Info("First Run using " + _connectionDetails.GetBrokerDetails(_currentBrokerIndex));
+ }
+ else
+ {
+ _logger.Info("Retrying " + _connectionDetails.GetBrokerDetails(_currentBrokerIndex));
+ }
+
+ _currentServerRetry++;
+ }
+ else
+ {
+ _currentCycleRetries++;
+ //failed to connect to first broker
+ _currentBrokerIndex = 0;
+
+ setBroker(_connectionDetails.GetBrokerDetails(_currentBrokerIndex ));
+
+ // This is zero rather than -1 as we are already retrieving the details.
+ _currentServerRetry = 0;
+ }
+ //else - should force client to stop as max retries has been reached.
+ }
+ else
+ {
+ if (_currentServerRetry < _serverRetries)
+ {
+ if (_currentBrokerIndex == -1)
+ {
+ _currentBrokerIndex = 0;
+
+ setBroker(_connectionDetails.GetBrokerDetails(_currentBrokerIndex ));
+
+ _logger.Info("First Run using " + _connectionDetails.GetBrokerDetails(_currentBrokerIndex));
+ }
+ else
+ {
+ _logger.Info("Retrying " + _connectionDetails.GetBrokerDetails(_currentBrokerIndex));
+ }
+ _currentServerRetry++;
+ }
+ else
+ {
+ _currentBrokerIndex++;
+
+ setBroker(_connectionDetails.GetBrokerDetails(_currentBrokerIndex ));
+ // This is zero rather than -1 as we are already retrieving the details.
+ _currentServerRetry = 0;
+ }
+ }
+
+ return _connectionDetails.GetBrokerDetails(_currentBrokerIndex);
+ }
+
+ public void setBroker(BrokerInfo broker)
+ {
+ _connectionDetails.AddBrokerInfo(broker);
+
+ int index = _connectionDetails.GetAllBrokerInfos().IndexOf(broker);
+
+ String serverRetries = broker.getOption(BrokerDetailsConstants.OPTIONS_RETRY);
+
+ if (serverRetries != null)
+ {
+ try
+ {
+ _serverRetries = int.Parse(serverRetries);
+ }
+ catch (FormatException)
+ {
+ _serverRetries = DEFAULT_SERVER_RETRIES;
+ }
+ }
+
+ _currentServerRetry = -1;
+ _currentBrokerIndex = index;
+ }
+
+ public void setRetries(int maxRetries)
+ {
+ _cycleRetries = maxRetries;
+ }
+
+ public String methodName()
+ {
+ return "Cycle Servers";
+ }
+
+ public override string ToString()
+ {
+ StringBuilder sb = new StringBuilder();
+
+ sb.Append(GetType().Name).Append("\n");
+
+ sb.Append("Broker count: ").Append(_connectionDetails.getBrokerCount());
+ sb.Append("\ncurrent broker index: ").Append(_currentBrokerIndex);
+
+ sb.Append("\nCycle Retries: ").Append(_cycleRetries);
+ sb.Append("\nCurrent Cycle:").Append(_currentCycleRetries);
+ sb.Append("\nServer Retries:").Append(_serverRetries);
+ sb.Append("\nCurrent Retry:").Append(_currentServerRetry);
+ sb.Append("\n");
+
+ for(int i=0; i < _connectionDetails.getBrokerCount() ; i++)
+ {
+ if (i == _currentBrokerIndex)
+ {
+ sb.Append(">");
+ }
+ sb.Append(_connectionDetails.GetBrokerDetails(i));
+ sb.Append("\n");
+ }
+
+ return sb.ToString();
+ }
+ }
+}
diff --git a/dotnet/Qpid.Client/qms/failover/FailoverSingleServer.cs b/dotnet/Qpid.Client/qms/failover/FailoverSingleServer.cs
new file mode 100644
index 0000000000..f077f75fdf
--- /dev/null
+++ b/dotnet/Qpid.Client/qms/failover/FailoverSingleServer.cs
@@ -0,0 +1,147 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+
+namespace Qpid.Client.qms.failover
+{
+ public class FailoverSingleServer : FailoverMethod
+ {
+ /** The default number of times to rety a conection to this server */
+ public const int DEFAULT_SERVER_RETRIES = 1;
+
+ /**
+ * The details of the Single Server
+ */
+ private BrokerInfo _brokerDetail;
+
+ /**
+ * The number of times to retry connecting to the sever
+ */
+ private int _retries;
+
+ /**
+ * The current number of attempts made to the server
+ */
+ private int _currentRetries;
+
+
+ public FailoverSingleServer(ConnectionInfo connectionDetails)
+ {
+ if (connectionDetails.getBrokerCount() > 0)
+ {
+ setBroker(connectionDetails.GetBrokerDetails(0));
+ }
+ else
+ {
+ throw new ArgumentException("BrokerInfo details required for connection.");
+ }
+ }
+
+ public FailoverSingleServer(BrokerInfo brokerDetail)
+ {
+ setBroker(brokerDetail);
+ }
+
+ public void reset()
+ {
+ _currentRetries = -1;
+ }
+
+ public bool failoverAllowed()
+ {
+ return _currentRetries < _retries;
+ }
+
+ public void attainedConnection()
+ {
+ reset();
+ }
+
+ public BrokerInfo GetCurrentBrokerInfo()
+ {
+ return _brokerDetail;
+ }
+
+ public BrokerInfo getNextBrokerDetails()
+ {
+ if (_currentRetries == _retries)
+ {
+ return null;
+ }
+ else
+ {
+ if (_currentRetries < _retries)
+ {
+ _currentRetries ++;
+ }
+
+ return _brokerDetail;
+ }
+ }
+
+ public void setBroker(BrokerInfo broker)
+ {
+ if (broker == null)
+ {
+ throw new ArgumentException("BrokerInfo details cannot be null");
+ }
+ _brokerDetail = broker;
+
+ String retries = broker.getOption(BrokerDetailsConstants.OPTIONS_RETRY);
+ if (retries != null)
+ {
+ try
+ {
+ _retries = int.Parse(retries);
+ }
+ catch (FormatException nfe)
+ {
+ _retries = DEFAULT_SERVER_RETRIES;
+ }
+ }
+ else
+ {
+ _retries = DEFAULT_SERVER_RETRIES;
+ }
+
+ reset();
+ }
+
+ public void setRetries(int retries)
+ {
+ _retries = retries;
+ }
+
+ public String methodName()
+ {
+ return "Single Server";
+ }
+
+ public String toString()
+ {
+ return "SingleServer:\n"+
+ "Max Retries:"+_retries+
+ "\nCurrent Retry:"+_currentRetries+
+ "\n"+_brokerDetail+"\n";
+ }
+
+ }
+} \ No newline at end of file