diff options
Diffstat (limited to 'dotnet/Qpid.Client')
69 files changed, 9532 insertions, 0 deletions
diff --git a/dotnet/Qpid.Client/Client/AMQConnection.cs b/dotnet/Qpid.Client/Client/AMQConnection.cs new file mode 100644 index 0000000000..12eb9f6a21 --- /dev/null +++ b/dotnet/Qpid.Client/Client/AMQConnection.cs @@ -0,0 +1,845 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.IO; +using System.Reflection; +using System.Threading; +using log4net; +using Qpid.Client.Failover; +using Qpid.Client.Protocol; +using Qpid.Client.qms; +using Qpid.Client.State; +using Qpid.Client.Transport; +using Qpid.Collections; +using Qpid.Framing; +using Qpid.Messaging; + +namespace Qpid.Client +{ + public class AMQConnection : Closeable, IConnection + { + private static readonly ILog _log = LogManager.GetLogger(typeof(AMQConnection)); + + ConnectionInfo _connectionInfo; + private int _nextChannelId = 0; + + // _Connected should be refactored with a suitable wait object. + private bool _connected; + + Thread _heartBeatThread; + HeartBeatThread _heartBeatRunner; + + // The last error code that occured on the connection. Used to return the correct exception to the client + private AMQException _lastAMQException = null; + + /** + * This is the "root" mutex that must be held when doing anything that could be impacted by failover. + * This must be held by any child objects of this connection such as the session, producers and consumers. + */ + private readonly Object _failoverMutex = new Object(); + public object FailoverMutex + { + get { return _failoverMutex; } + } + + /** + * Policy dictating how to failover + */ + private FailoverPolicy _failoverPolicy; + + internal bool IsFailoverAllowed + { + get { return _failoverPolicy.FailoverAllowed(); } + } + + /// <summary> + /// A channel is roughly analogous to a session. The server can negotiate the maximum number of channels + /// per session and we must prevent the client from opening too many. Zero means unlimited. + /// </summary> + private ushort _maximumChannelCount; + + /// <summary> + /// The maximum size of frame supported by the server + /// </summary> + private uint _maximumFrameSize; + + private AMQStateManager _stateManager; + + private AMQProtocolSession _protocolSession; + public AMQProtocolSession ProtocolSession { get { return _protocolSession; } } + + /// <summary> + /// Maps from session id (Integer) to AmqChannel instance + /// </summary> + private readonly LinkedHashtable _sessions = new LinkedHashtable(); + + private ExceptionListenerDelegate _exceptionListener; + + private IConnectionListener _connectionListener; + + private ITransport _transport; + public ITransport Transport { get { return _transport; } } + + /// <summary> + /// Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for + /// message publication. + /// </summary> + private bool _started; + + private AMQProtocolListener _protocolListener; + public AMQProtocolListener ProtocolListener { get { return _protocolListener; } } + + public IProtocolWriter ProtocolWriter + { + get { return _transport.ProtocolWriter; } + } + + ProtocolWriter _protocolWriter; + + public ProtocolWriter ConvenientProtocolWriter + { + get { return _protocolWriter; } + } + + public AMQConnection(ConnectionInfo connectionInfo) + { + if (connectionInfo == null) + { + throw new ArgumentException("ConnectionInfo must be specified"); + } + _log.Info("ConnectionInfo: " + connectionInfo); + _connectionInfo = connectionInfo; + _log.Info("password = " + _connectionInfo.getPassword()); + _failoverPolicy = new FailoverPolicy(connectionInfo); + + // We are not currently connected. + _connected = false; + + Exception lastException = null; + do + { + try + { + BrokerInfo brokerInfo = _failoverPolicy.GetNextBrokerInfo(); + _log.Info("Connecting to " + brokerInfo); + MakeBrokerConnection(brokerInfo); + break; + } + catch (Exception e) + { + lastException = e; + _log.Error("Unable to connect to broker " + _failoverPolicy.GetCurrentBrokerInfo(), e); + // XXX: Should perhaps break out of the do/while here if not a SocketException... + } + } while (_failoverPolicy.FailoverAllowed()); + + _log.Debug("Are we connected:" + _connected); + + if (!_failoverPolicy.FailoverAllowed()) + { + throw new AMQConnectionException("Unable to connect", lastException); + } + + // TODO: this needs to be redone so that we are not spinning. + // A suitable object should be set that is then waited on + // and only notified when a connection is made or when + // the AMQConnection gets closed. + while (!_connected && !Closed) + { + _log.Debug("Sleeping."); + Thread.Sleep(100); + } + if (!_failoverPolicy.FailoverAllowed() || _failoverPolicy.GetCurrentBrokerInfo() == null) + { + if (_lastAMQException != null) + { + throw _lastAMQException; + } + } + } + + private ITransport LoadTransportFromAssembly(string host, int port, String assemblyName, String transportType) + { + Assembly assembly = Assembly.LoadFrom(assemblyName); + foreach (Type type in assembly.GetTypes()) + { + _log.Info(String.Format("type = {0}", type)); + } + Type transport = assembly.GetType(transportType); + if (transport == null) + { + throw new ArgumentException( + String.Format("Type is not found in assembly. Type={0} Assembly={1}", transportType, assemblyName)); + + } + _log.Info("transport = " + transport); + _log.Info("ctors = " + transport.GetConstructors()); + ConstructorInfo info = transport.GetConstructors()[0]; + ITransport result = (ITransport)info.Invoke(new object[] { host, port, this }); + + _log.Info("transport = " + result); + return result; + } + + public void Disconnect() + { + _transport.Close(); + } + + #region IConnection Members + + public string ClientID + { + get + { + CheckNotClosed(); + return _connectionInfo.GetClientName(); + } + set + { + CheckNotClosed(); + _connectionInfo.SetClientName(value); + } + } + + public override void Close() + { + lock (FailoverMutex) + { + // atomically set to closed and check the _previous value was NOT CLOSED + if (Interlocked.Exchange(ref _closed, CLOSED) == NOT_CLOSED) + { + try + { + CloseAllSessions(null); + CloseConnection(); + } + catch (AMQException e) + { + throw new QpidException("Error closing connection: " + e); + } + } + } + } + + private void CloseConnection() + { + _stateManager.ChangeState(AMQState.CONNECTION_CLOSING); + + AMQFrame frame = ConnectionCloseBody.CreateAMQFrame( + 0, 200, "Qpid.NET client is closing the connection.", 0, 0); + + ProtocolWriter.Write(frame); + + _log.Debug("Blocking for connection close ok frame"); + + _stateManager.AttainState(AMQState.CONNECTION_CLOSED); + Disconnect(); + } + + class CreateChannelFailoverSupport : FailoverSupport + { + private static readonly ILog _log = LogManager.GetLogger(typeof(CreateChannelFailoverSupport)); + + private bool _transacted; + private AcknowledgeMode _acknowledgeMode; + int _prefetch; + AMQConnection _connection; + + public CreateChannelFailoverSupport(AMQConnection connection, bool transacted, AcknowledgeMode acknowledgementMode, int prefetch) + { + _connection = connection; + _transacted = transacted; + _acknowledgeMode = acknowledgementMode; + _prefetch = prefetch; + } + + protected override object operation() + { + ushort channelId = _connection.NextChannelId(); + + if (_log.IsDebugEnabled) + { + _log.Debug("Write channel open frame for channel id " + channelId); + } + + // We must create the channel and register it before actually sending the frame to the server to + // open it, so that there is no window where we could receive data on the channel and not be set + // up to handle it appropriately. + AmqChannel channel = new AmqChannel(_connection, + channelId, _transacted, _acknowledgeMode, _prefetch); + _connection.ProtocolSession.AddSessionByChannel(channelId, channel); + _connection.RegisterSession(channelId, channel); + + bool success = false; + try + { + _connection.createChannelOverWire(channelId, (ushort)_prefetch, _transacted); + success = true; + } + catch (AMQException e) + { + throw new QpidException("Error creating channel: " + e, e); + } + finally + { + if (!success) { + _connection.ProtocolSession.RemoveSessionByChannel(channelId); + _connection.DeregisterSession(channelId); + } + } + + if (_connection._started) + { + channel.Start(); + } + return channel; + } + } + + internal ushort NextChannelId() + { + return (ushort) Interlocked.Increment(ref _nextChannelId); + } + + public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode) + { + return CreateChannel(transacted, acknowledgeMode, AmqChannel.DEFAULT_PREFETCH); + } + + public IChannel CreateChannel(bool transacted, AcknowledgeMode acknowledgeMode, int prefetch) + { + CheckNotClosed(); + if (ChannelLimitReached()) + { + throw new ChannelLimitReachedException(_maximumChannelCount); + } + else + { + CreateChannelFailoverSupport operation = + new CreateChannelFailoverSupport(this, transacted, acknowledgeMode, prefetch); + return (IChannel)operation.execute(this); + } + } + + public void CloseSession(AmqChannel channel) + { + _protocolSession.CloseSession(channel); + + AMQFrame frame = ChannelCloseBody.CreateAMQFrame( + channel.ChannelId, 200, "JMS client closing channel", 0, 0); + + _log.Debug("Blocking for channel close frame for channel " + channel.ChannelId); + _protocolWriter.SyncWrite(frame, typeof(ChannelCloseOkBody)); + _log.Debug("Received channel close frame"); + // When control resumes at this point, a reply will have been received that + // indicates the broker has closed the channel successfully + } + + public ExceptionListenerDelegate ExceptionListener + { + get + { + CheckNotClosed(); + return _exceptionListener; + } + set + { + CheckNotClosed(); + _exceptionListener = value; + } + } + + /// <summary> + /// Start the connection, i.e. start flowing messages. Note that this method must be called only from a single thread + /// and is not thread safe (which is legal according to the JMS specification). + /// @throws JMSException + /// </summary> + public void Start() + { + CheckNotClosed(); + if (!_started) + { + foreach (DictionaryEntry lde in _sessions) + { + AmqChannel s = (AmqChannel)lde.Value; + s.Start(); + } + _started = true; + } + } + + public void Stop() + { + CheckNotClosed(); + throw new NotImplementedException(); + } + + public IConnectionListener ConnectionListener + { + get { return _connectionListener; } + set { _connectionListener = value; } + } + + #endregion + + #region IDisposable Members + + public void Dispose() + { + Close(); + } + + #endregion + + private bool ChannelLimitReached() + { + return _maximumChannelCount != 0 && _sessions.Count == _maximumChannelCount; + } + + /// <summary> + /// Close all the sessions, either due to normal connection closure or due to an error occurring. + /// @param cause if not null, the error that is causing this shutdown + /// </summary> + private void CloseAllSessions(Exception cause) + { + _log.Info("Closing all session in connection " + this); + ICollection sessions = new ArrayList(_sessions.Values); + foreach (AmqChannel channel in sessions) + { + _log.Info("Closing channel " + channel); + if (cause != null) + { + channel.Closed(cause); + } + else + { + try + { + channel.Close(); + } + catch (QpidException e) + { + _log.Error("Error closing channel: " + e); + } + } + } + _log.Info("Done closing all sessions in connection " + this); + } + + public int MaximumChannelCount + { + get + { + CheckNotClosed(); + return _maximumChannelCount; + } + } + + internal void SetMaximumChannelCount(ushort maximumChannelCount) + { + CheckNotClosed(); + _maximumChannelCount = maximumChannelCount; + } + + public uint MaximumFrameSize + { + get + { + return _maximumFrameSize; + } + + set + { + _maximumFrameSize = value; + } + } + + public IDictionary Sessions + { + get + { + return _sessions; + } + } + + public string Host + { + get + { + return _failoverPolicy.GetCurrentBrokerInfo().getHost(); + } + } + + public int Port + { + get + { + return _failoverPolicy.GetCurrentBrokerInfo().getPort(); + } + } + + public string Username + { + get + { + return _connectionInfo.getUsername(); + } + } + + public string Password + { + get + { + return _connectionInfo.getPassword(); + } + } + + public string VirtualHost + { + get + { + return _connectionInfo.getVirtualHost(); + } + } + + /// <summary> + /// Invoked by the AMQProtocolSession when a protocol session exception has occurred. + /// This method sends the exception to a JMS exception listener, if configured, and + /// propagates the exception to sessions, which in turn will propagate to consumers. + /// This allows synchronous consumers to have exceptions thrown to them. + /// </summary> + /// <param name="cause">the exception</param> + public void ExceptionReceived(Exception cause) + { + if (_exceptionListener != null) + { + // Listener expects one of these... + QpidException xe; + + if (cause is QpidException) + { + xe = (QpidException) cause; + } + else + { + xe = new QpidException("Exception thrown against " + ToString() + ": " + cause, cause); + } + // in the case of an IOException, MINA has closed the protocol session so we set _closed to true + // so that any generic client code that tries to close the connection will not mess up this error + // handling sequence + if (cause is IOException) + { + Interlocked.Exchange(ref _closed, CLOSED); + } + _exceptionListener.Invoke(xe); + } + else + { + _log.Error("Connection exception: " + cause); + } + + // An undelivered is not fatal to the connections usability. + if (!(cause is AMQUndeliveredException)) + { + Interlocked.Exchange(ref _closed, CLOSED); + CloseAllSessions(cause); + } + else + { + ; + } + } + + internal void RegisterSession(int channelId, AmqChannel channel) + { + _sessions[channelId] = channel; + } + + internal void DeregisterSession(int channelId) + { + _sessions.Remove(channelId); + } + + /** + * Fire the preFailover event to the registered connection listener (if any) + * + * @param redirect true if this is the result of a redirect request rather than a connection error + * @return true if no listener or listener does not veto change + */ + public bool FirePreFailover(bool redirect) + { + bool proceed = true; + if (_connectionListener != null) + { + proceed = _connectionListener.PreFailover(redirect); + } + return proceed; + } + + /** + * Fire the preResubscribe event to the registered connection listener (if any). If the listener + * vetoes resubscription then all the sessions are closed. + * + * @return true if no listener or listener does not veto resubscription. + * @throws JMSException + */ + public bool FirePreResubscribe() + { + if (_connectionListener != null) + { + bool resubscribe = _connectionListener.PreResubscribe(); + if (!resubscribe) + { + MarkAllSessionsClosed(); + } + return resubscribe; + } + else + { + return true; + } + } + + /** + * Marks all sessions and their children as closed without sending any protocol messages. Useful when + * you need to mark objects "visible" in userland as closed after failover or other significant event that + * impacts the connection. + * <p/> + * The caller must hold the failover mutex before calling this method. + */ + private void MarkAllSessionsClosed() + { + //LinkedList sessionCopy = new LinkedList(_sessions.values()); + ArrayList sessionCopy = new ArrayList(_sessions.Values); + foreach (AmqChannel session in sessionCopy) + { + session.MarkClosed(); + } + _sessions.Clear(); + } + + /** + * Fires a failover complete event to the registered connection listener (if any). + */ + public void FireFailoverComplete() + { + if (_connectionListener != null) + { + _connectionListener.FailoverComplete(); + } + } + + public bool AttemptReconnection(String host, int port, bool useSSL) + { + BrokerInfo bd = new AmqBrokerInfo("amqp", host, port, useSSL); + + _failoverPolicy.setBroker(bd); + + try + { + MakeBrokerConnection(bd); + return true; + } + catch (Exception e) + { + _log.Info("Unable to connect to broker at " + bd, e); + AttemptReconnection(); + } + return false; + } + + private void MakeBrokerConnection(BrokerInfo brokerDetail) + { + try + { + _stateManager = new AMQStateManager(); + _protocolListener = new AMQProtocolListener(this, _stateManager); + _protocolListener.AddFrameListener(_stateManager); + + // Currently there is only one transport option - BlockingSocket. + String assemblyName = "Qpid.Client.Transport.Socket.Blocking.dll"; + String transportType = "Qpid.Client.Transport.Socket.Blocking.BlockingSocketTransport"; + + // Load the transport assembly dynamically. + _transport = LoadTransportFromAssembly(brokerDetail.getHost(), brokerDetail.getPort(), assemblyName, transportType); + + // Connect. + _transport.Open(); + _protocolWriter = new ProtocolWriter(_transport.ProtocolWriter, _protocolListener); + _protocolSession = new AMQProtocolSession(_transport.ProtocolWriter, _transport, this); + _protocolListener.ProtocolSession = _protocolSession; + + // Now start the connection "handshake". + _transport.ProtocolWriter.Write(new ProtocolInitiation()); + + // Blocks until the connection has been opened. + _stateManager.AttainState(AMQState.CONNECTION_OPEN); + + _failoverPolicy.attainedConnection(); + + // XXX: Again this should be changed to a suitable notify. + _connected = true; + } + catch (AMQException e) + { + _lastAMQException = e; + throw e; + } + } + + public bool AttemptReconnection() + { + while (_failoverPolicy.FailoverAllowed()) + { + try + { + MakeBrokerConnection(_failoverPolicy.GetNextBrokerInfo()); + return true; + } + catch (Exception e) + { + if (!(e is AMQException)) + { + _log.Info("Unable to connect to broker at " + _failoverPolicy.GetCurrentBrokerInfo(), e); + } + else + { + _log.Info(e.Message + ":Unable to connect to broker at " + _failoverPolicy.GetCurrentBrokerInfo()); + } + } + } + + // Connection unsuccessful. + return false; + } + + /** + * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling. + * The caller must hold the failover mutex before calling this method. + */ + public void ResubscribeSessions() + { + ArrayList sessions = new ArrayList(_sessions.Values); + _log.Info(String.Format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.Count)); + foreach (AmqChannel s in sessions) + { + _protocolSession.AddSessionByChannel(s.ChannelId, s); + ReopenChannel(s.ChannelId, (ushort)s.DefaultPrefetch, s.Transacted); + s.Resubscribe(); + } + } + + private void ReopenChannel(ushort channelId, ushort prefetch, bool transacted) + { + try + { + createChannelOverWire(channelId, prefetch, transacted); + } + catch (AMQException e) + { + _protocolSession.RemoveSessionByChannel(channelId); + DeregisterSession(channelId); + throw new AMQException("Error reopening channel " + channelId + " after failover: " + e); + } + } + + void createChannelOverWire(ushort channelId, ushort prefetch, bool transacted) + { + _protocolWriter.SyncWrite(ChannelOpenBody.CreateAMQFrame(channelId, null), typeof (ChannelOpenOkBody)); + + // Don't use the BasicQos frame if connecting to OpenAMQ (at it is not support). We + // know this when we connection using AMQP 0.7 + if (ProtocolInitiation.CURRENT_PROTOCOL_VERSION_MAJOR != 7) + { + // Basic.Qos frame appears to not be supported by OpenAMQ 1.0d. + _protocolWriter.SyncWrite( + BasicQosBody.CreateAMQFrame(channelId, 0, prefetch, false), + typeof (BasicQosOkBody)); + } + + + if (transacted) + { + if (_log.IsDebugEnabled) + { + _log.Debug("Issuing TxSelect for " + channelId); + } + _protocolWriter.SyncWrite(TxSelectBody.CreateAMQFrame(channelId), typeof(TxSelectOkBody)); + } + } + + public String toURL() + { + return _connectionInfo.asUrl(); + } + + class HeartBeatThread + { + int _heartbeatMillis; + IProtocolWriter _protocolWriter; + bool _run = true; + + public HeartBeatThread(IProtocolWriter protocolWriter, int heartbeatMillis) + { + _protocolWriter = protocolWriter; + _heartbeatMillis = heartbeatMillis; + } + + public void Run() + { + while (_run) + { + Thread.Sleep(_heartbeatMillis); + if (!_run) break; + _log.Debug("Sending heartbeat"); + // TODO: Can we optimise this so that heartbeats are only written when we haven't sent anything recently to the broker? + _protocolWriter.Write(HeartbeatBody.FRAME); + } + _log.Info("Heatbeat thread stopped"); + } + + public void Stop() + { + _run = false; + } + } + + public void StartHeartBeatThread(int heartbeatSeconds) + { + _log.Info("Starting new heartbeat thread"); + _heartBeatRunner = new HeartBeatThread(ProtocolWriter, heartbeatSeconds * 1000); + _heartBeatThread = new Thread(new ThreadStart(_heartBeatRunner.Run)); + _heartBeatThread.Name = "HeartBeat"; + _heartBeatThread.Start(); + } + + public void StopHeartBeatThread() + { + if (_heartBeatRunner != null) + { + _log.Info("Stopping old heartbeat thread"); + _heartBeatRunner.Stop(); + } + } + } +} diff --git a/dotnet/Qpid.Client/Client/AMQConnectionException.cs b/dotnet/Qpid.Client/Client/AMQConnectionException.cs new file mode 100644 index 0000000000..603a7b2a1c --- /dev/null +++ b/dotnet/Qpid.Client/Client/AMQConnectionException.cs @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; + +namespace Qpid.Client +{ + public class AMQConnectionException : AMQException + { + public AMQConnectionException(String message, Exception e) : base(message, e) + { + } + } +}
\ No newline at end of file diff --git a/dotnet/Qpid.Client/Client/AMQDestination.cs b/dotnet/Qpid.Client/Client/AMQDestination.cs new file mode 100644 index 0000000000..7ea6db4ee8 --- /dev/null +++ b/dotnet/Qpid.Client/Client/AMQDestination.cs @@ -0,0 +1,233 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; + +namespace Qpid.Client +{ + public abstract class AMQDestination + { + protected readonly string _exchangeName; + protected readonly string _exchangeClass; + protected readonly string _destinationName; + protected readonly bool _isExclusive; + protected readonly bool _isAutoDelete; + protected bool _isDurable; + + public bool IsDurable + { + get { return _isDurable; } + } + + protected string _queueName; + + protected AMQDestination(String exchangeName, String exchangeClass, String destinationName, bool isExclusive, + bool isAutoDelete, String queueName) + { + // XXX: This is ugly - OnlyRequired because of ReplyToDestination. +// if (destinationName == null) +// { +// throw new ArgumentNullException("destinationName"); +// } + + // XXX: This is ugly - OnlyRequired because of SendingDestinationAdapter. +// if (exchangeName == null) +// { +// throw new ArgumentNullException("exchangeName"); +// } + + // XXX: This is ugly - OnlyRequired because of SendingDestinationAdapter. +// if (exchangeClass == null) +// { +// throw new ArgumentNullException("exchangeClass"); +// } + + _exchangeName = exchangeName; + _exchangeClass = exchangeClass; + _destinationName = destinationName; + _isExclusive = isExclusive; + _isAutoDelete = isAutoDelete; + _queueName = queueName; + } + + public string Name + { + get + { + return _destinationName; + } + } + + public abstract string RoutingKey + { + get; + } + + public abstract string EncodedName + { + get; + } + + public bool AutoDelete + { + get + { + return _isAutoDelete; + } + } + + public string QueueName + { + get + { + return _queueName; + } + set + { + _queueName = value; + } + } + + public string ExchangeName + { + get + { + return _exchangeName; + } + } + + public string ExchangeClass + { + get + { + return _exchangeClass; + } + } + + public bool IsExclusive + { + get + { + return _isExclusive; + } + } + + public bool IsAutoDelete + { + get + { + return _isAutoDelete; + } + } + + public override string ToString() + { + return "Destination: " + _destinationName + ", " + + "Queue Name: " + _queueName + ", Exchange: " + _exchangeName + + ", Exchange class: " + _exchangeClass + ", Exclusive: " + _isExclusive + + ", AutoDelete: " + _isAutoDelete; // +", Routing Key: " + RoutingKey; + } + + public override bool Equals(object o) + { + if (this == o) + { + return true; + } + if (o == null || GetType() != o.GetType()) + { + return false; + } + + AMQDestination that = (AMQDestination) o; + + if (!StringsNotEqualNullSafe(_destinationName, that._destinationName)) + { + return false; + } + if (!StringsNotEqualNullSafe(_exchangeClass, that._exchangeClass)) + { + return false; + } + if (!StringsNotEqualNullSafe(_exchangeName, that._exchangeName)) + { + return false; + } + if (!StringsNotEqualNullSafe(_queueName, that._queueName)) + { + return false; + } + if (_isExclusive != that._isExclusive) + { + return false; + } + if (_isAutoDelete != that._isAutoDelete) + { + return false; + } + return true; + } + + private bool StringsNotEqualNullSafe(string one, string two) + { + if ((one == null && two != null) || + (one != null && !one.Equals(two))) + { + return false; + } + else + { + return true; + } + } + + public override int GetHashCode() + { + int result; + if (_exchangeName == null) + { + result = "".GetHashCode(); + } + else + { + result = _exchangeName.GetHashCode(); + } + if (_exchangeClass != null) + { + result = 29 * result + _exchangeClass.GetHashCode(); + } + if (_destinationName != null) + { + result = 29 * result + _destinationName.GetHashCode(); + } + if (_queueName != null) + { + result = 29 * result + _queueName.GetHashCode(); + } + result = result * (_isExclusive ? 13 : 7); + result = result * (_isAutoDelete ? 13 : 7); + + Console.WriteLine("FIXME HashCode for " + this + " = " + result); + return result; + } + + public abstract bool IsNameRequired { get; } + } +} diff --git a/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs b/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs new file mode 100644 index 0000000000..c00a427494 --- /dev/null +++ b/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs @@ -0,0 +1,313 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Text; +using Qpid.Client.qms; + +namespace Qpid.Client +{ + public class AmqBrokerInfo : BrokerInfo + { + private string _host = "localhost"; + private int _port = 5672; + private string _transport = "amqp"; + + public readonly string URL_FORMAT_EXAMPLE = + "<transport>://<hostname>[:<port Default=\""+BrokerDetailsConstants.DEFAULT_PORT+"\">][?<option>='<value>'[,<option>='<value>']]"; + + public const long DEFAULT_CONNECT_TIMEOUT = 30000L; + + private Hashtable _options = new Hashtable(); + + public AmqBrokerInfo() + { + } + + // TODO: port URL parsing. + public AmqBrokerInfo(string url) + { + throw new NotImplementedException(); +// this(); +// // URL should be of format tcp://host:port?option='value',option='value' +// try +// { +// URI connection = new URI(url); +// +// string transport = connection.getScheme(); +// +// // Handles some defaults to minimise changes to existing broker URLS e.g. localhost +// if (transport != null) +// { +// //todo this list of valid transports should be enumerated somewhere +// if ((!(transport.equalsIgnoreCase("vm") || +// transport.equalsIgnoreCase("tcp")))) +// { +// if (transport.equalsIgnoreCase("localhost")) +// { +// connection = new URI(DEFAULT_TRANSPORT + "://" + url); +// transport = connection.getScheme(); +// } +// else +// { +// if (url.charAt(transport.length()) == ':' && url.charAt(transport.length()+1) != '/' ) +// { +// //Then most likely we have a host:port value +// connection = new URI(DEFAULT_TRANSPORT + "://" + url); +// transport = connection.getScheme(); +// } +// else +// { +// URLHelper.parseError(0, transport.length(), "Unknown transport", url); +// } +// } +// } +// } +// else +// { +// //Default the transport +// connection = new URI(DEFAULT_TRANSPORT + "://" + url); +// transport = connection.getScheme(); +// } +// +// if (transport == null) +// { +// URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" + +// " In broker URL:'" + url + "' Format: " + URL_FORMAT_EXAMPLE, ""); +// } +// +// setTransport(transport); +// +// string host = connection.getHost(); +// +// // Fix for Java 1.5 +// if (host == null) +// { +// host = ""; +// } +// +// setHost(host); +// +// int port = connection.getPort(); +// +// if (port == -1) +// { +// // Another fix for Java 1.5 URI handling +// string auth = connection.getAuthority(); +// +// if (auth != null && auth.startsWith(":")) +// { +// setPort(Integer.parseInt(auth.substring(1))); +// } +// else +// { +// setPort(DEFAULT_PORT); +// } +// } +// else +// { +// setPort(port); +// } +// +// string querystring = connection.getQuery(); +// +// URLHelper.parseOptions(_options, querystring); +// +// //Fragment is #string (not used) +// } +// catch (URISyntaxException uris) +// { +// if (uris instanceof URLSyntaxException) +// { +// throw (URLSyntaxException) uris; +// } +// +// URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); +// } + } + + public AmqBrokerInfo(string transport, string host, int port, bool useSSL) : this() + { + _transport = transport; + _host = host; + _port = port; + + if (useSSL) + { + setOption(BrokerDetailsConstants.OPTIONS_SSL, "true"); + } + } + + public string getHost() + { + return _host; + } + + public void setHost(string _host) + { + this._host = _host; + } + + public int getPort() + { + return _port; + } + + public void setPort(int _port) + { + this._port = _port; + } + + public string getTransport() + { + return _transport; + } + + public void setTransport(string _transport) + { + this._transport = _transport; + } + + public string getOption(string key) + { + return (string)_options[key]; + } + + public void setOption(string key, string value) + { + _options[key] = value; + } + + public long getTimeout() + { + if (_options.ContainsKey(BrokerDetailsConstants.OPTIONS_CONNECT_TIMEOUT)) + { + try + { + return long.Parse((string)_options[BrokerDetailsConstants.OPTIONS_CONNECT_TIMEOUT]); + } + catch (FormatException nfe) + { + //Do nothing as we will use the default below. + } + } + + return BrokerDetailsConstants.DEFAULT_CONNECT_TIMEOUT; + } + + public void setTimeout(long timeout) + { + setOption(BrokerDetailsConstants.OPTIONS_CONNECT_TIMEOUT, timeout.ToString()); + } + + public override string ToString() + { + StringBuilder sb = new StringBuilder(); + + sb.Append(_transport); + sb.Append("://"); + + if (!(_transport.ToLower().Equals("vm"))) + { + sb.Append(_host); + } + + sb.Append(':'); + sb.Append(_port); + + // XXX +// sb.Append(printOptionsURL()); + + return sb.ToString(); + } + + public override bool Equals(object o) + { + if (!(o is BrokerInfo)) + { + return false; + } + + BrokerInfo bd = (BrokerInfo) o; + + return StringEqualsIgnoreCase(_host, bd.getHost()) && + (_port == bd.getPort()) && + StringEqualsIgnoreCase(_transport, bd.getTransport()) && + (useSSL() == bd.useSSL()); + + //todo do we need to compare all the options as well? + } + + // TODO: move to util class. + private bool StringEqualsIgnoreCase(string one, string two) + { + return one.ToLower().Equals(two.ToLower()); + } + +// private string printOptionsURL() +// { +// stringBuffer optionsURL = new stringBuffer(); +// +// optionsURL.Append('?'); +// +// if (!(_options.isEmpty())) +// { +// +// for (string key : _options.keySet()) +// { +// optionsURL.Append(key); +// +// optionsURL.Append("='"); +// +// optionsURL.Append(_options.get(key)); +// +// optionsURL.Append("'"); +// +// optionsURL.Append(URLHelper.DEFAULT_OPTION_SEPERATOR); +// } +// } +// +// //remove the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options +// optionsURL.deleteCharAt(optionsURL.length() - 1); +// +// return optionsURL.tostring(); +// } + + public bool useSSL() + { + // To be friendly to users we should be case insensitive. + // or simply force users to conform to OPTIONS_SSL + // todo make case insensitive by trying ssl Ssl sSl ssL SSl SsL sSL SSL + + if (_options.ContainsKey(BrokerDetailsConstants.OPTIONS_SSL)) + { + return StringEqualsIgnoreCase((string)_options[BrokerDetailsConstants.OPTIONS_SSL], "true"); + } + + return false; + } + + public void useSSL(bool ssl) + { + setOption(BrokerDetailsConstants.OPTIONS_SSL, ssl.ToString()); + } + } +}
\ No newline at end of file diff --git a/dotnet/Qpid.Client/Client/AmqChannel.cs b/dotnet/Qpid.Client/Client/AmqChannel.cs new file mode 100644 index 0000000000..02818940dd --- /dev/null +++ b/dotnet/Qpid.Client/Client/AmqChannel.cs @@ -0,0 +1,1071 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Text.RegularExpressions; +using System.Threading; +using log4net; +using Qpid.Client.Message; +using Qpid.Collections; +using Qpid.Framing; +using Qpid.Messaging; + +namespace Qpid.Client +{ + public class AmqChannel : Closeable, IChannel + { + private const int BASIC_CONTENT_TYPE = 60; + + private static readonly ILog _logger = LogManager.GetLogger(typeof (AmqChannel)); + + private static int _nextSessionNumber = 0; + + private int _sessionNumber; + + // Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. + private int _nextConsumerNumber = 1; + + internal const int DEFAULT_PREFETCH = 5000; + + private AMQConnection _connection; + + private bool _transacted; + + private AcknowledgeMode _acknowledgeMode; + + private ushort _channelId; + + private int _defaultPrefetch = DEFAULT_PREFETCH; + + private BlockingQueue _queue = new LinkedBlockingQueue(); + + private Dispatcher _dispatcher; + + private MessageFactoryRegistry _messageFactoryRegistry; + + /// <summary> + /// Set of all producers created by this session + /// </summary> + private Hashtable _producers = Hashtable.Synchronized(new Hashtable()); + + /// <summary> + /// Maps from consumer tag to JMSMessageConsumer instance + /// </summary> + private Hashtable _consumers = Hashtable.Synchronized(new Hashtable()); + + /// <summary> + /// The counter of the _next producer id. This id is generated by the session and used only to allow the + /// producer to identify itself to the session when deregistering itself. + /// + /// Access to this id does not require to be synchronized since according to the JMS specification only one + /// thread of control is allowed to create producers for any given session instance. + /// </summary> + private long _nextProducerId; + + /// <summary> + /// Responsible for decoding a message fragment and passing it to the appropriate message consumer. + /// </summary> + private class Dispatcher + { + private int _stopped = 0; + + private AmqChannel _containingChannel; + + public Dispatcher(AmqChannel containingChannel) + { + _containingChannel = containingChannel; + } + + /// <summary> + /// Runs the dispatcher. This is intended to be Run in a separate thread. + /// </summary> + public void RunDispatcher() + { + UnprocessedMessage message; + + while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.DequeueBlocking()) != null) + { + //_queue.size() + DispatchMessage(message); + } + + _logger.Info("Dispatcher thread terminating for channel " + _containingChannel._channelId); + } + +// private void DispatchMessage(UnprocessedMessage message) + private void DispatchMessage(UnprocessedMessage message) + { + if (message.DeliverBody != null) + { + BasicMessageConsumer consumer = (BasicMessageConsumer) _containingChannel._consumers[message.DeliverBody.ConsumerTag]; + + if (consumer == null) + { + _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a handler - ignoring..."); + } + else + { + consumer.NotifyMessage(message, _containingChannel.AcknowledgeMode, _containingChannel.ChannelId); + } + } + else + { + try + { + // Bounced message is processed here, away from the mina thread + AbstractQmsMessage bouncedMessage = _containingChannel._messageFactoryRegistry. + CreateMessage(0, false, message.ContentHeader, message.Bodies); + + int errorCode = message.BounceBody.ReplyCode; + string reason = message.BounceBody.ReplyText; + _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); + + _containingChannel._connection.ExceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); + } + catch (Exception e) + { + _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); + } + } + } + + public void StopDispatcher() + { + Interlocked.Exchange(ref _stopped, 1); + } + } + + internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetch) : + this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.NewDefaultRegistry(), defaultPrefetch) + { + } + + /// <summary> + /// Initializes a new instance of the <see cref="AmqChannel"/> class. + /// </summary> + /// <param name="con">The con.</param> + /// <param name="channelId">The channel id.</param> + /// <param name="transacted">if set to <c>true</c> [transacted].</param> + /// <param name="acknowledgeMode">The acknowledge mode.</param> + /// <param name="messageFactoryRegistry">The message factory registry.</param> + internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, + MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetch) + { + _sessionNumber = Interlocked.Increment(ref _nextSessionNumber); + _connection = con; + _transacted = transacted; + if (transacted) + { + _acknowledgeMode = AcknowledgeMode.SessionTransacted; + } + else + { + _acknowledgeMode = acknowledgeMode; + } + _channelId = channelId; + _messageFactoryRegistry = messageFactoryRegistry; + } + + public IBytesMessage CreateBytesMessage() + { + lock (_connection.FailoverMutex) + { + CheckNotClosed(); + try + { + return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); + } + catch (AMQException e) + { + throw new QpidException("Unable to create message: " + e); + } + } + } + + public IMessage CreateMessage() + { + lock (_connection.FailoverMutex) + { + CheckNotClosed(); + try + { + // TODO: this is supposed to create a message consisting only of message headers + return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); + } + catch (AMQException e) + { + throw new QpidException("Unable to create message: " + e); + } + } + } + + public ITextMessage CreateTextMessage() + { + lock (_connection.FailoverMutex) + { + CheckNotClosed(); + + try + { + return (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain"); + } + catch (AMQException e) + { + throw new QpidException("Unable to create message: " + e); + } + } + } + + public ITextMessage CreateTextMessage(string text) + { + lock (_connection.FailoverMutex) + { + CheckNotClosed(); + try + { + ITextMessage msg = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain"); + msg.Text = text; + return msg; + } + catch (AMQException e) + { + throw new QpidException("Unable to create message: " + e); + } + } + } + + public bool Transacted + { + get + { + CheckNotClosed(); + return _transacted; + } + } + + public AcknowledgeMode AcknowledgeMode + { + get + { + CheckNotClosed(); + return _acknowledgeMode; + } + } + + public void Commit() + { + CheckNotClosed(); + CheckTransacted(); // throws IllegalOperationException if not a transacted session + + /*Channel.Commit frame = new Channel.Commit(); + frame.channelId = _channelId; + frame.confirmTag = 1;*/ + + // try + // { + // _connection.getProtocolHandler().writeCommandFrameAndWaitForReply(frame, new ChannelReplyListener(_channelId)); + // } + // catch (AMQException e) + // { + // throw new JMSException("Error creating session: " + e); + // } + throw new NotImplementedException(); + //_logger.Info("Transaction commited on channel " + _channelId); + } + + public void Rollback() + { + CheckNotClosed(); + CheckTransacted(); // throws IllegalOperationException if not a transacted session + + /*Channel.Rollback frame = new Channel.Rollback(); + frame.channelId = _channelId; + frame.confirmTag = 1;*/ + + // try + // { + // _connection.getProtocolHandler().writeCommandFrameAndWaitForReply(frame, new ChannelReplyListener(_channelId)); + // } + // catch (AMQException e) + // { + // throw new JMSException("Error rolling back session: " + e); + // } + throw new NotImplementedException(); + //_logger.Info("Transaction rolled back on channel " + _channelId); + } + + public override void Close() + { + lock (_connection.FailoverMutex) + { + // We must close down all producers and consumers in an orderly fashion. This is the only method + // that can be called from a different thread of control from the one controlling the session + + lock (_closingLock) + { + SetClosed(); + + // we pass null since this is not an error case + CloseProducersAndConsumers(null); + + try + { + _connection.CloseSession(this); + } + catch (AMQException e) + { + throw new QpidException("Error closing session: " + e); + } + finally + { + _connection.DeregisterSession(_channelId); + } + } + } + } + + private void SetClosed() + { + Interlocked.Exchange(ref _closed, CLOSED); + } + + /// <summary> + /// Close all producers or consumers. This is called either in the error case or when closing the session normally. + /// <param name="amqe">the exception, may be null to indicate no error has occurred</param> + /// + private void CloseProducersAndConsumers(AMQException amqe) + { + try + { + CloseProducers(); + } + catch (QpidException e) + { + _logger.Error("Error closing session: " + e, e); + } + try + { + CloseConsumers(amqe); + } + catch (QpidException e) + { + _logger.Error("Error closing session: " + e, e); + } + } + + /** + * Called when the server initiates the closure of the session + * unilaterally. + * @param e the exception that caused this session to be closed. Null causes the + */ + public void Closed(Exception e) + { + lock (_connection.FailoverMutex) + { + // An AMQException has an error code and message already and will be passed in when closure occurs as a + // result of a channel close request + SetClosed(); + AMQException amqe; + if (e is AMQException) + { + amqe = (AMQException) e; + } + else + { + amqe = new AMQException("Closing session forcibly", e); + } + _connection.DeregisterSession(_channelId); + CloseProducersAndConsumers(amqe); + } + } + + /// <summary> + /// Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is + /// currently no way of propagating errors to message producers (this is a JMS limitation). + /// </summary> + private void CloseProducers() + { + _logger.Info("Closing producers on session " + this); + // we need to clone the list of producers since the close() method updates the _producers collection + // which would result in a concurrent modification exception + ArrayList clonedProducers = new ArrayList(_producers.Values); + + foreach (BasicMessageProducer prod in clonedProducers) + { + _logger.Info("Closing producer " + prod); + prod.Close(); + } + // at this point the _producers map is empty + } + + /// <summary> + /// Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error. + /// <param name="error">not null if this is a result of an error occurring at the connection level</param> + /// + private void CloseConsumers(Exception error) + { + if (_dispatcher != null) + { + _dispatcher.StopDispatcher(); + } + // we need to clone the list of consumers since the close() method updates the _consumers collection + // which would result in a concurrent modification exception + ArrayList clonedConsumers = new ArrayList(_consumers.Values); + + foreach (BasicMessageConsumer con in clonedConsumers) + { + if (error != null) + { + con.NotifyError(error); + } + else + { + con.Close(); + } + } + // at this point the _consumers map will be empty + } + + public void Recover() + { + CheckNotClosed(); + CheckNotTransacted(); // throws IllegalOperationException if not a transacted session + + // TODO: This cannot be implemented using 0.8 semantics + throw new NotImplementedException(); + } + + public void Run() + { + throw new NotImplementedException(); + } + +// public IMessagePublisher CreatePublisher(string exchangeName, string exchangeClass) +// { +// return CreatePublisher(exchangeClass, exchangeName, null); +// } + +// public IMessagePublisher CreatePublisher(string exchangeName, string exchangeClass, string routingKey) +// { +// return CreatePublisherBuilder().withExchangeName(exchangeName) +// .withRoutingKey(routingKey).Create(); +// } + + public IMessagePublisher CreatePublisher(string exchangeName, string routingKey, DeliveryMode deliveryMode, + long timeToLive, bool immediate, bool mandatory, int priority) + { + _logger.Debug(string.Format("Using new CreatePublisher exchangeName={0}, exchangeClass={1} routingKey={2}", + exchangeName, "none", routingKey)); + return CreateProducerImpl(exchangeName, routingKey, deliveryMode, + timeToLive, immediate, mandatory, priority); + } + + // TODO: Create a producer that doesn't require an IDestination. +// private IMessagePublisher CreateProducerImpl(IDestination destination) +// { +// lock (_closingLock) +// { +// CheckNotClosed(); +// +// AMQDestination amqd = (AMQDestination)destination; +// +// try +// { +// return new BasicMessageProducer(amqd, _transacted, _channelId, +// this, GetNextProducerId()); +// } +// catch (AMQException e) +// { +// _logger.Error("Error creating message producer: " + e, e); +// throw new QpidException("Error creating message producer", e); +// } +// } +// } + + public IMessagePublisher CreateProducerImpl(string exchangeName, string routingKey, + DeliveryMode deliveryMode, + long timeToLive, bool immediate, bool mandatory, int priority) + { + lock (_closingLock) + { + CheckNotClosed(); + + try + { + return new BasicMessageProducer(exchangeName, routingKey, _transacted, _channelId, + this, GetNextProducerId(), + deliveryMode, timeToLive, immediate, mandatory, priority); + } + catch (AMQException e) + { + _logger.Error("Error creating message producer: " + e, e); + throw new QpidException("Error creating message producer", e); + } + } + } + + public IMessageConsumer CreateConsumer(string queueName, + int prefetch, + bool noLocal, + bool exclusive, + bool durable, + string subscriptionName) + { + _logger.Debug(String.Format("CreateConsumer queueName={0} prefetch={1} noLocal={2} exclusive={3} durable={4} subscriptionName={5}", + queueName, prefetch, noLocal, exclusive, durable, subscriptionName)); + return CreateConsumerImpl(queueName, prefetch, noLocal, exclusive, durable, subscriptionName); + } + + private IMessageConsumer CreateConsumerImpl(string queueName, + int prefetch, + bool noLocal, + bool exclusive, + bool durable, + string subscriptionName) + { + + if (durable || subscriptionName != null) + { + throw new NotImplementedException(); // TODO: durable subscriptions. + } + + lock (_closingLock) + { + CheckNotClosed(); + + BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, queueName, noLocal, + _messageFactoryRegistry, this); + try + { + RegisterConsumer(consumer); + } + catch (AMQException e) + { + throw new QpidException("Error registering consumer: " + e, e); + } + + return consumer; + } + } + +// public IDestination CreateQueue(string queueName) +// { +// return new AMQQueue(queueName); +// } +// +// public IDestination CreateTopic(String topicName) +// { +// return new AMQTopic(topicName); +// } + + public IFieldTable CreateFieldTable() + { + return new FieldTable(); + } + +// public IDestination CreateTemporaryQueue() +// { +// return new AMQQueue("TempQueue" + DateTime.Now.Ticks.ToString(), true); +// +//// return new AMQTemporaryQueue(); // XXX: port AMQTemporaryQueue and AMQQueue changes. +// } + +// public IDestination CreateTemporaryTopic() +// { +// throw new NotImplementedException(); // FIXME +// } + + public void Unsubscribe(String name) + { + throw new NotImplementedException(); // FIXME + } + + private void CheckTransacted() + { + if (!Transacted) + { + throw new InvalidOperationException("Channel is not transacted"); + } + } + + private void CheckNotTransacted() + { + if (Transacted) + { + throw new InvalidOperationException("Channel is transacted"); + } + } + + public void MessageReceived(UnprocessedMessage message) + { + if (_logger.IsDebugEnabled) + { + _logger.Debug("Message received in session with channel id " + _channelId); + } + _queue.EnqueueBlocking(message); + } + + public int DefaultPrefetch + { + get + { + return _defaultPrefetch; + } + set + { + _defaultPrefetch = value; + } + } + + public ushort ChannelId + { + get + { + return _channelId; + } + } + + public AMQConnection Connection + { + get + { + return _connection; + } + } + + /// <summary> + /// Send an acknowledgement for all messages up to a specified number on this session. + /// <param name="messageNbr">the message number up to an including which all messages will be acknowledged.</param> + /// </summary> + public void SendAcknowledgement(ulong messageNbr) + { + /*if (_logger.IsDebugEnabled) + { + _logger.Debug("Channel Ack being sent for channel id " + _channelId + " and message number " + messageNbr); + }*/ + /*Channel.Ack frame = new Channel.Ack(); + frame.channelId = _channelId; + frame.messageNbr = messageNbr; + _connection.getProtocolHandler().writeFrame(frame);*/ + } + + internal void Start() + { + _dispatcher = new Dispatcher(this); + Thread dispatcherThread = new Thread(new ThreadStart(_dispatcher.RunDispatcher)); + dispatcherThread.IsBackground = true; + dispatcherThread.Start(); + } + + internal void RegisterConsumer(string consumerTag, IMessageConsumer consumer) + { + _consumers[consumerTag] = consumer; + } + + /// <summary> + /// Called by the MessageConsumer when closing, to deregister the consumer from the + /// map from consumerTag to consumer instance. + /// </summary> + /// <param name="consumerTag">the consumer tag, that was broker-generated</param> + internal void DeregisterConsumer(string consumerTag) + { + _consumers.Remove(consumerTag); + } + + internal void RegisterProducer(long producerId, IMessagePublisher publisher) + { + _producers[producerId] = publisher; + } + + internal void DeregisterProducer(long producerId) + { + _producers.Remove(producerId); + } + + private long GetNextProducerId() + { + return ++_nextProducerId; + } + + public void Dispose() + { + Close(); + } + + /** + * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after + * failover when the client has veoted resubscription. + * + * The caller of this method must already hold the failover mutex. + */ + internal void MarkClosed() + { + SetClosed(); + _connection.DeregisterSession(_channelId); + MarkClosedProducersAndConsumers(); + } + + private void MarkClosedProducersAndConsumers() + { + try + { + // no need for a markClosed* method in this case since there is no protocol traffic closing a producer + CloseProducers(); + } + catch (QpidException e) + { + _logger.Error("Error closing session: " + e, e); + } + try + { + MarkClosedConsumers(); + } + catch (QpidException e) + { + _logger.Error("Error closing session: " + e, e); + } + } + + private void MarkClosedConsumers() + { + if (_dispatcher != null) + { + _dispatcher.StopDispatcher(); + } + // we need to clone the list of consumers since the close() method updates the _consumers collection + // which would result in a concurrent modification exception + ArrayList clonedConsumers = new ArrayList(_consumers.Values); + + foreach (BasicMessageConsumer consumer in clonedConsumers) + { + consumer.MarkClosed(); + } + // at this point the _consumers map will be empty + } + + /** + * Resubscribes all producers and consumers. This is called when performing failover. + * @throws AMQException + */ + internal void Resubscribe() + { + ResubscribeProducers(); + ResubscribeConsumers(); + } + + private void ResubscribeProducers() + { + // FIXME: This needs to Replay DeclareExchange method calls. + +// ArrayList producers = new ArrayList(_producers.Values); +// _logger.Debug(String.Format("Resubscribing producers = {0} producers.size={1}", producers, producers.Count)); +// foreach (BasicMessageProducer producer in producers) +// { +// producer.Resubscribe(); +// } + } + + private void ResubscribeConsumers() + { + ArrayList consumers = new ArrayList(_consumers.Values); + _consumers.Clear(); + + foreach (BasicMessageConsumer consumer in consumers) + { + RegisterConsumer(consumer); + } + } + + /// <summary> + /// Callers must hold the failover mutex before calling this method. + /// </summary> + /// <param name="consumer"></param> + void RegisterConsumer(BasicMessageConsumer consumer) + { + String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.Prefetch, consumer.NoLocal, + consumer.Exclusive, consumer.AcknowledgeMode); + + consumer.ConsumerTag = consumerTag; + _consumers.Add(consumerTag, consumer); + } + + public void Bind(string queueName, string exchangeName, string routingKey, IFieldTable args) + { + DoBind(queueName, exchangeName, routingKey, (FieldTable)args); + } + + public void Bind(string queueName, string exchangeName, string routingKey) + { + DoBind(queueName, exchangeName, routingKey, new FieldTable()); + } + + internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args) + { + _logger.Debug(string.Format("QueueBind queueName={0} exchangeName={1} routingKey={2}, arg={3}", + queueName, exchangeName, routingKey, args)); + + AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0, + queueName, exchangeName, + routingKey, true, args); + _connection.ProtocolWriter.Write(queueBind); + } + + +// /** +// * Declare the queue. +// * @param amqd +// * @param protocolHandler +// * @return the queue name. This is useful where the broker is generating a queue name on behalf of the client. +// * @throws AMQException +// */ +// private String DeclareQueue(AMQDestination amqd) +// { +// // For queues (but not topics) we generate the name in the client rather than the +// // server. This allows the name to be reused on failover if required. In general, +// // the destination indicates whether it wants a name generated or not. +// if (amqd.IsNameRequired) +// { +// amqd.QueueName = GenerateUniqueName(); +// } +// +// return DoDeclareQueue(amqd); +// } + + private String ConsumeFromQueue(String queueName, int prefetch, + bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode) + { + // Need to generate a consumer tag on the client so we can exploit the nowait flag. + String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++); + + AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0, + queueName, tag, noLocal, + acknowledgeMode == AcknowledgeMode.NoAcknowledge, + exclusive, true); + + _connection.ProtocolWriter.Write(basicConsume); + return tag; + } + + public void DeleteExchange(string exchangeName) + { + throw new NotImplementedException(); // FIXME + } + + public void DeleteQueue() + { + throw new NotImplementedException(); // FIXME + } + + public MessageConsumerBuilder CreateConsumerBuilder(string queueName) + { + return new MessageConsumerBuilder(this, queueName); + } + + public MessagePublisherBuilder CreatePublisherBuilder() + { + return new MessagePublisherBuilder(this); + } + +// public void Publish(string exchangeName, string routingKey, bool mandatory, bool immediate, +// IMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, +// bool disableTimestamps) +// { +// lock (Connection.FailoverMutex) +// { +// DoBasicPublish(exchangeName, routingKey, mandatory, immediate, (AbstractQmsMessage)message, deliveryMode, timeToLive, priority, disableTimestamps); +// } +// } + + internal void BasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate, + AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, + bool disableTimestamps) + { + lock (Connection.FailoverMutex) + { + DoBasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, timeToLive, priority, disableTimestamps); + } + } + + private void DoBasicPublish(string exchangeName, string routingKey, bool mandatory, bool immediate, AbstractQmsMessage message, DeliveryMode deliveryMode, uint timeToLive, int priority, bool disableTimestamps) + { + AMQFrame publishFrame = BasicPublishBody.CreateAMQFrame(_channelId, 0, exchangeName, + routingKey, mandatory, immediate); + + long currentTime = 0; + if (!disableTimestamps) + { + currentTime = DateTime.UtcNow.Ticks; + message.Timestamp = currentTime; + } + byte[] payload = message.Data; + BasicContentHeaderProperties contentHeaderProperties = message.ContentHeaderProperties; + + if (timeToLive > 0) + { + if (!disableTimestamps) + { + contentHeaderProperties.Expiration = (uint)currentTime + timeToLive; + } + } + else + { + contentHeaderProperties.Expiration = 0; + } + contentHeaderProperties.SetDeliveryMode(deliveryMode); + contentHeaderProperties.Priority = (byte)priority; + + ContentBody[] contentBodies = CreateContentBodies(payload); + AMQFrame[] frames = new AMQFrame[2 + contentBodies.Length]; + for (int i = 0; i < contentBodies.Length; i++) + { + frames[2 + i] = ContentBody.CreateAMQFrame(_channelId, contentBodies[i]); + } + if (contentBodies.Length > 0 && _logger.IsDebugEnabled) + { + _logger.Debug(string.Format("Sending content body frames to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey)); + } + + // weight argument of zero indicates no child content headers, just bodies + AMQFrame contentHeaderFrame = ContentHeaderBody.CreateAMQFrame(_channelId, BASIC_CONTENT_TYPE, 0, contentHeaderProperties, + (uint)payload.Length); + if (_logger.IsDebugEnabled) + { + _logger.Debug(string.Format("Sending content header frame to {{exchangeName={0} routingKey={1}}}", exchangeName, routingKey)); + } + + frames[0] = publishFrame; + frames[1] = contentHeaderFrame; + CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); + Connection.ConvenientProtocolWriter.WriteFrame(compositeFrame); + } + + /// <summary> + /// Create content bodies. This will split a large message into numerous bodies depending on the negotiated + /// maximum frame size. + /// </summary> + /// <param name="payload"></param> + /// <returns>return the array of content bodies</returns> + private ContentBody[] CreateContentBodies(byte[] payload) + { + if (payload == null) + { + return null; + } + else if (payload.Length == 0) + { + return new ContentBody[0]; + } + // we substract one from the total frame maximum size to account for the end of frame marker in a body frame + // (0xCE byte). + long framePayloadMax = Connection.MaximumFrameSize - 1; + int lastFrame = (payload.Length % framePayloadMax) > 0 ? 1 : 0; + int frameCount = (int)(payload.Length / framePayloadMax) + lastFrame; + ContentBody[] bodies = new ContentBody[frameCount]; + + if (frameCount == 1) + { + bodies[0] = new ContentBody(); + bodies[0].Payload = payload; + } + else + { + long remaining = payload.Length; + for (int i = 0; i < bodies.Length; i++) + { + bodies[i] = new ContentBody(); + byte[] framePayload = new byte[(remaining >= framePayloadMax) ? (int)framePayloadMax : (int)remaining]; + Array.Copy(payload, (int)framePayloadMax * i, framePayload, 0, framePayload.Length); + bodies[i].Payload = framePayload; + remaining -= framePayload.Length; + } + } + return bodies; + } + + public string GenerateUniqueName() + { + string result = _connection.ProtocolSession.GenerateQueueName(); + return Regex.Replace(result, "[^a-z0-9_]", "_"); + } + + public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete) + { + DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete); + } + + private string DoDeclareQueue(AMQDestination amqd) + { + string queueName = amqd.QueueName; + bool isDurable = amqd.IsDurable; + bool isExclusive = amqd.IsExclusive; + + DoQueueDeclare(queueName, isDurable, isExclusive, amqd.AutoDelete); + + _logger.Debug("returning amqp.QueueName = " + amqd.QueueName); + return amqd.QueueName; + } + + private void DoQueueDeclare(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete) + { + _logger.Debug(string.Format("DeclareQueue name={0} durable={1} exclusive={2}, auto-delete={3}", + queueName, isDurable, isExclusive, isAutoDelete)); + + AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, + false, isDurable, isExclusive, + isAutoDelete, true, null); + + _connection.ProtocolWriter.Write(queueDeclare); + } + + public void DeclareExchange(String exchangeName, String exchangeClass) + { + _logger.Debug(string.Format("DeclareExchange vame={0} exchangeClass={1}", exchangeName, exchangeClass)); + + DeclareExchange(_channelId, 0, exchangeName, exchangeClass, false, false, false, false, true, null); + } + + // AMQP-level method. + private void DeclareExchange(ushort channelId, ushort ticket, string exchangeName, + string exchangeClass, bool passive, bool durable, + bool autoDelete, bool xinternal, bool noWait, FieldTable args) + { + _logger.Debug(String.Format("DeclareExchange channelId={0} exchangeName={1} exchangeClass={2}", + _channelId, exchangeName, exchangeClass)); + + AMQFrame exchangeDeclareFrame = ExchangeDeclareBody.CreateAMQFrame( + channelId, ticket, exchangeName, exchangeClass, passive, durable, autoDelete, xinternal, noWait, args); + +// Console.WriteLine(string.Format("XXX AMQP:DeclareExchange frame=[{0}]", exchangeDeclareFrame)); + + // FIXME: Probably need to record the exchangeDeclareBody for later replay. + ExchangeDeclareBody exchangeDeclareBody = (ExchangeDeclareBody)exchangeDeclareFrame.BodyFrame; +// Console.WriteLine(string.Format("XXX AMQP:DeclareExchangeBody=[{0}]", exchangeDeclareBody)); + if (exchangeDeclareBody.Nowait) + { + _connection.ProtocolWriter.Write(exchangeDeclareFrame); + } + else + { + _connection.ConvenientProtocolWriter.SyncWrite(exchangeDeclareFrame, typeof (ExchangeDeclareOkBody)); + } + } + } +} diff --git a/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs new file mode 100644 index 0000000000..6d2f1c67b6 --- /dev/null +++ b/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs @@ -0,0 +1,404 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using Qpid.Client.Message; +using Qpid.Collections; +using Qpid.Framing; +using Qpid.Messaging; + +namespace Qpid.Client +{ + public class BasicMessageConsumer : Closeable, IMessageConsumer + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageConsumer)); + + private bool _noLocal; + + /// We need to store the "raw" field table so that we can resubscribe in the event of failover being required. +// private FieldTable _rawSelectorFieldTable; +// +// public FieldTable RawSelectorFieldTable +// { +// get { return _rawSelectorFieldTable; } +// } + + /** + * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover + */ + private bool _exclusive; + + public bool Exclusive + { + get { return _exclusive; } + } + + public bool NoLocal + { + get { return _noLocal; } + set { _noLocal = value; } + } + + AcknowledgeMode _acknowledgeMode = AcknowledgeMode.NoAcknowledge; + + public AcknowledgeMode AcknowledgeMode + { + get { return _acknowledgeMode; } + } + + private MessageReceivedDelegate _messageListener; + + /// <summary> + /// The consumer tag allows us to close the consumer by sending a jmsCancel method to the + /// broker + /// </summary> + private string _consumerTag; + + /// <summary> + /// We need to know the channel id when constructing frames + /// </summary> + private ushort _channelId; + + private readonly string _queueName; + + /// <summary> + /// Protects the setting of a messageListener + /// </summary> + private readonly object _syncLock = new object(); + + /** + * We store the prefetch field in order to be able to reuse it when resubscribing in the event of failover + */ + private int _prefetch; + + /// <summary> + /// When true indicates that either a message listener is set or that + /// a blocking receive call is in progress + /// </summary> + private bool _receiving; + + /// <summary> + /// Used in the blocking receive methods to receive a message from + /// the Channel thread. Argument true indicates we want strict FIFO semantics + /// </summary> + private readonly SynchronousQueue _synchronousQueue = new SynchronousQueue(true); + + private MessageFactoryRegistry _messageFactory; + + private AmqChannel _channel; + + public BasicMessageConsumer(ushort channelId, string queueName, bool noLocal, + MessageFactoryRegistry messageFactory, AmqChannel channel) + { + _channelId = channelId; + _queueName = queueName; + _noLocal = noLocal; + _messageFactory = messageFactory; + _channel = channel; + } + + #region IMessageConsumer Members + + public MessageReceivedDelegate OnMessage + { + get + { + return _messageListener; + } + set + { + CheckNotClosed(); + + lock (_syncLock) + { + // If someone is already receiving + if (_messageListener != null && _receiving) + { + throw new InvalidOperationException("Another thread is already receiving..."); + } + + _messageListener = value; + + _receiving = (_messageListener != null); + + if (_receiving) + { + _logger.Debug("Message listener set for queue with name " + _queueName); + } + } + } + } + + public IMessage Receive(long delay) + { + CheckNotClosed(); + + lock (_syncLock) + { + // If someone is already receiving + if (_receiving) + { + throw new InvalidOperationException("Another thread is already receiving (possibly asynchronously)..."); + } + + _receiving = true; + } + + try + { + object o; + if (delay > 0) + { + //o = _synchronousQueue.Poll(l, TimeUnit.MILLISECONDS); + throw new NotImplementedException("Need to implement synchronousQueue.Poll(timeout"); + } + else + { + o = _synchronousQueue.DequeueBlocking(); + } + return ReturnMessageOrThrow(o); + } + finally + { + lock (_syncLock) + { + _receiving = false; + } + } + } + + public IMessage Receive() + { + return Receive(0); + } + + public IMessage ReceiveNoWait() + { + CheckNotClosed(); + + lock (_syncLock) + { + // If someone is already receiving + if (_receiving) + { + throw new InvalidOperationException("Another thread is already receiving (possibly asynchronously)..."); + } + + _receiving = true; + } + + try + { + object o = _synchronousQueue.Dequeue(); + return ReturnMessageOrThrow(o); + } + finally + { + lock (_syncLock) + { + _receiving = false; + } + } + } + + #endregion + + /// <summary> + /// We can get back either a Message or an exception from the queue. This method examines the argument and deals + /// with it by throwing it (if an exception) or returning it (in any other case). + /// </summary> + /// <param name="o">the object off the queue</param> + /// <returns> a message only if o is a Message</returns> + /// <exception>JMSException if the argument is a throwable. If it is a QpidMessagingException it is rethrown as is, but if not + /// a QpidMessagingException is created with the linked exception set appropriately</exception> + private IMessage ReturnMessageOrThrow(object o) + { + // errors are passed via the queue too since there is no way of interrupting the poll() via the API. + if (o is Exception) + { + Exception e = (Exception) o; + throw new QpidException("Message consumer forcibly closed due to error: " + e, e); + } + else + { + return (IMessage) o; + } + } + + #region IDisposable Members + + public void Dispose() + { + Close(); + } + + #endregion + + public override void Close() + { + lock (_channel.Connection.FailoverMutex) + { + lock (_closingLock) + { + Interlocked.Exchange(ref _closed, CLOSED); + + AMQFrame cancelFrame = BasicCancelBody.CreateAMQFrame(_channelId, _consumerTag, false); + + try + { + _channel.Connection.ConvenientProtocolWriter.SyncWrite( + cancelFrame, typeof(BasicCancelOkBody)); + } + catch (AMQException e) + { + _logger.Error("Error closing consumer: " + e, e); + throw new QpidException("Error closing consumer: " + e); + } + finally + { + DeregisterConsumer(); + } + } + } + } + + /// <summary> + /// Called from the AmqChannel when a message has arrived for this consumer. This methods handles both the case + /// of a message listener or a synchronous receive() caller. + /// </summary> + /// <param name="messageFrame">the raw unprocessed mesage</param> + /// <param name="acknowledgeMode">the acknowledge mode requested for this message</param> + /// <param name="channelId">channel on which this message was sent</param> + internal void NotifyMessage(UnprocessedMessage messageFrame, AcknowledgeMode acknowledgeMode, ushort channelId) + { + if (_logger.IsDebugEnabled) + { + _logger.Debug("notifyMessage called with message number " + messageFrame.DeliverBody.DeliveryTag); + } + try + { + AbstractQmsMessage jmsMessage = _messageFactory.CreateMessage(messageFrame.DeliverBody.DeliveryTag, + messageFrame.DeliverBody.Redelivered, + messageFrame.ContentHeader, + messageFrame.Bodies); + + /*if (acknowledgeMode == AcknowledgeMode.PreAcknowledge) + { + _channel.sendAcknowledgement(messageFrame.deliverBody.deliveryTag); + }*/ + if (acknowledgeMode == AcknowledgeMode.ClientAcknowledge) + { + // we set the session so that when the user calls acknowledge() it can call the method on session + // to send out the appropriate frame + jmsMessage.Channel = _channel; + } + + lock (_syncLock) + { + if (_messageListener != null) + { + _messageListener.Invoke(jmsMessage); + } + else + { + _synchronousQueue.Enqueue(jmsMessage); + } + } + if (acknowledgeMode == AcknowledgeMode.AutoAcknowledge) + { + _channel.SendAcknowledgement(messageFrame.DeliverBody.DeliveryTag); + } + } + catch (Exception e) + { + _logger.Error("Caught exception (dump follows) - ignoring...", e); + } + } + + internal void NotifyError(Exception cause) + { + lock (_syncLock) + { + SetClosed(); + + // we have no way of propagating the exception to a message listener - a JMS limitation - so we + // deal with the case where we have a synchronous receive() waiting for a message to arrive + if (_messageListener == null) + { + // offer only succeeds if there is a thread waiting for an item from the queue + if (_synchronousQueue.EnqueueNoThrow(cause)) + { + _logger.Debug("Passed exception to synchronous queue for propagation to receive()"); + } + } + DeregisterConsumer(); + } + } + + private void SetClosed() + { + Interlocked.Exchange(ref _closed, CLOSED); + } + + /// <summary> + /// Perform cleanup to deregister this consumer. This occurs when closing the consumer in both the clean + /// case and in the case of an error occurring. + /// </summary> + internal void DeregisterConsumer() + { + _channel.DeregisterConsumer(_consumerTag); + } + + public string ConsumerTag + { + get + { + return _consumerTag; + } + set + { + _consumerTag = value; + } + } + + /** + * Called when you need to invalidate a consumer. Used for example when failover has occurred and the + * client has vetoed automatic resubscription. + * The caller must hold the failover mutex. + */ + internal void MarkClosed() + { + SetClosed(); + DeregisterConsumer(); + } + + public int Prefetch + { + get { return _prefetch; } + } + + public string QueueName + { + get { return _queueName; } + } + } +} diff --git a/dotnet/Qpid.Client/Client/BasicMessageProducer.cs b/dotnet/Qpid.Client/Client/BasicMessageProducer.cs new file mode 100644 index 0000000000..a93d2db675 --- /dev/null +++ b/dotnet/Qpid.Client/Client/BasicMessageProducer.cs @@ -0,0 +1,275 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using Qpid.Client.Message; +using Qpid.Messaging; + +namespace Qpid.Client +{ + public class BasicMessageProducer : Closeable, IMessagePublisher + { + protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer)); + + /// <summary> + /// If true, messages will not get a timestamp. + /// </summary> + private bool _disableTimestamps; + + /// <summary> + /// Priority of messages created by this producer. + /// </summary> + private int _messagePriority; + + /// <summary> + /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution. + /// + private long _timeToLive; + + /// <summary> + /// Delivery mode used for this producer. + /// </summary> + private DeliveryMode _deliveryMode; + + private bool _immediate; + private bool _mandatory; + + string _exchangeName; + string _routingKey; + + /// <summary> + /// Default encoding used for messages produced by this producer. + /// </summary> + private string _encoding; + + /// <summary> + /// Default encoding used for message produced by this producer. + /// </summary> + private string _mimeType; + + /// <summary> + /// True if this producer was created from a transacted session + /// </summary> + private bool _transacted; + + private ushort _channelId; + + /// <summary> + /// This is an id generated by the session and is used to tie individual producers to the session. This means we + /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers + /// to the session so that when an error is propagated to the session it can close the producer (meaning that + /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently). + /// </summary> + private long _producerId; + + /// <summary> + /// The session used to create this producer + /// </summary> + private AmqChannel _channel; + + /// <summary> + /// Default value for immediate flag is false, i.e. a consumer does not need to be attached to a queue + /// </summary> + protected const bool DEFAULT_IMMEDIATE = false; + + /// <summary> + /// Default value for mandatory flag is true, i.e. server will not silently drop messages where no queue is + /// connected to the exchange for the message + /// </summary> + protected const bool DEFAULT_MANDATORY = true; + + public BasicMessageProducer(string exchangeName, string routingKey, + bool transacted, + ushort channelId, + AmqChannel channel, + long producerId, + DeliveryMode deliveryMode, + long timeToLive, + bool immediate, + bool mandatory, + int priority) + { + _exchangeName = exchangeName; + _routingKey = routingKey; + _transacted = transacted; + _channelId = channelId; + _channel = channel; + _producerId = producerId; + _deliveryMode = deliveryMode; + _timeToLive = timeToLive; + _immediate = immediate; + _mandatory = mandatory; + _messagePriority = priority; + + _channel.RegisterProducer(producerId, this); + } + + + #region IMessagePublisher Members + + public DeliveryMode DeliveryMode + { + get + { + CheckNotClosed(); + return _deliveryMode; + } + set + { + CheckNotClosed(); + _deliveryMode = value; + } + } + + public string ExchangeName + { + get { return _exchangeName; } + } + + public string RoutingKey + { + get { return _routingKey; } + } + + public bool DisableMessageID + { + get + { + throw new Exception("The method or operation is not implemented."); + } + set + { + throw new Exception("The method or operation is not implemented."); + } + } + + public bool DisableMessageTimestamp + { + get + { + CheckNotClosed(); + return _disableTimestamps; + } + set + { + CheckNotClosed(); + _disableTimestamps = value; + } + } + + public int Priority + { + get + { + CheckNotClosed(); + return _messagePriority; + } + set + { + CheckNotClosed(); + if (value < 0 || value > 9) + { + throw new ArgumentOutOfRangeException("Priority of " + value + " is illegal. Value must be in range 0 to 9"); + } + _messagePriority = value; + } + } + + public override void Close() + { + _logger.Info("Closing producer " + this); + Interlocked.Exchange(ref _closed, CLOSED); + _channel.DeregisterProducer(_producerId); + } + + public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive) + { + CheckNotClosed(); + SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, deliveryMode, priority, (uint)timeToLive, DEFAULT_MANDATORY, + DEFAULT_IMMEDIATE); + } + + public void Send(IMessage msg) + { + CheckNotClosed(); + SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive, + DEFAULT_MANDATORY, DEFAULT_IMMEDIATE); + } + + // This is a short-term hack (knowing that this code will be re-vamped sometime soon) + // to facilitate publishing messages to potentially non-existent recipients. + public void Send(IMessage msg, bool mandatory) + { + CheckNotClosed(); + SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive, + mandatory, DEFAULT_IMMEDIATE); + } + + public long TimeToLive + { + get + { + CheckNotClosed(); + return _timeToLive; + } + set + { + CheckNotClosed(); + if (value < 0) + { + throw new ArgumentOutOfRangeException("Time to live must be non-negative - supplied value was " + value); + } + _timeToLive = value; + } + } + + #endregion + + private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate) + { + _channel.BasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, priority, timeToLive, _disableTimestamps); + } + + public string MimeType + { + set + { + CheckNotClosed(); + _mimeType = value; + } + } + + public string Encoding + { + set + { + CheckNotClosed(); + _encoding = value; + } + } + + public void Dispose() + { + Close(); + } + } +} diff --git a/dotnet/Qpid.Client/Client/Closeable.cs b/dotnet/Qpid.Client/Client/Closeable.cs new file mode 100644 index 0000000000..159f71ac08 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Closeable.cs @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; + +namespace Qpid.Client +{ + public abstract class Closeable + { + /// <summary> + /// Used to ensure orderly closing of the object. The only method that is allowed to be called + /// from another thread of control is close(). + /// </summary> + protected readonly object _closingLock = new object(); + + /// <summary> + /// All access to this field should be using the Inerlocked class, to make it atomic. + /// Hence it is an int since you cannot use a bool with the Interlocked class. + /// </summary> + protected int _closed = NOT_CLOSED; + + protected const int CLOSED = 1; + protected const int NOT_CLOSED = 2; + + /// <summary> + /// Checks the not closed. + /// </summary> + protected void CheckNotClosed() + { + if (_closed == CLOSED) + { + throw new InvalidOperationException("Object " + ToString() + " has been closed"); + } + } + + /// <summary> + /// Gets a value indicating whether this <see cref="Closeable"/> is closed. + /// </summary> + /// <value><c>true</c> if closed; otherwise, <c>false</c>.</value> + public bool Closed + { + get + { + return _closed == CLOSED; + } + } + + /// <summary> + /// Close the resource + /// </summary> + /// <exception cref="QpidMessagingException">If something goes wrong</exception> + public abstract void Close(); + } +} diff --git a/dotnet/Qpid.Client/Client/Collections/LinkedHashtable.cs b/dotnet/Qpid.Client/Client/Collections/LinkedHashtable.cs new file mode 100644 index 0000000000..43e9a819e4 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Collections/LinkedHashtable.cs @@ -0,0 +1,214 @@ +using System; +using System.Collections; + +namespace Qpid.Collections +{ + public class LinkedHashtable : DictionaryBase + { + /// <summary> + /// Maps from key to LinkedDictionaryEntry + /// </summary> + private Hashtable _indexedValues = new Hashtable(); + + private LinkedDictionaryEntry _head; + + private LinkedDictionaryEntry _tail; + + public class LinkedDictionaryEntry + { + public LinkedDictionaryEntry previous; + public LinkedDictionaryEntry next; + public object key; + public object value; + + public LinkedDictionaryEntry(object key, object value) + { + this.key = key; + this.value = value; + } + } + + public object this[object index] + { + get + { + return ((LinkedDictionaryEntry)_indexedValues[index]).value; + } + + set + { + Dictionary[index] = value; + } + } + + protected override void OnInsertComplete(object key, object value) + { + LinkedDictionaryEntry de = new LinkedDictionaryEntry(key, value); + if (_head == null) + { + _head = de; + _tail = de; + } + else + { + _tail.next = de; + de.previous = _tail; + _tail = de; + } + _indexedValues[key] = de; + } + + protected override void OnSetComplete(object key, object oldValue, object newValue) + { + if (oldValue == null) + { + OnInsertComplete(key, newValue); + } + } + + protected override void OnRemoveComplete(object key, object value) + { + LinkedDictionaryEntry de = (LinkedDictionaryEntry)_indexedValues[key]; + LinkedDictionaryEntry prev = de.previous; + if (prev == null) + { + _head = de.next; + } + else + { + prev.next = de.next; + } + + LinkedDictionaryEntry next = de.next; + if (next == null) + { + _tail = de; + } + else + { + next.previous = de.previous; + } + } + + public ICollection Values + { + get + { + return InnerHashtable.Values; + } + } + + public bool Contains(object key) + { + return InnerHashtable.Contains(key); + } + + public void Remove(object key) + { + Dictionary.Remove(key); + } + + public LinkedDictionaryEntry Head + { + get + { + return _head; + } + } + + public LinkedDictionaryEntry Tail + { + get + { + return _tail; + } + } + + private class LHTEnumerator : IEnumerator + { + private LinkedHashtable _container; + + private LinkedDictionaryEntry _current; + + /// <summary> + /// Set once we have navigated off the end of the collection + /// </summary> + private bool _needsReset = false; + + public LHTEnumerator(LinkedHashtable container) + { + _container = container; + } + + public object Current + { + get + { + if (_current == null) + { + throw new Exception("Iterator before first element"); + } + else + { + return _current; + } + } + } + + public bool MoveNext() + { + if (_needsReset) + { + return false; + } + else if (_current == null) + { + _current = _container.Head; + } + else + { + _current = _current.next; + } + _needsReset = (_current == null); + return !_needsReset; + } + + public void Reset() + { + _current = null; + _needsReset = false; + } + } + + public new IEnumerator GetEnumerator() + { + return new LHTEnumerator(this); + } + + public void MoveToHead(object key) + { + LinkedDictionaryEntry de = (LinkedDictionaryEntry)_indexedValues[key]; + if (de == null) + { + throw new ArgumentException("Key " + key + " not found"); + } + // if the head is the element then there is nothing to do + if (_head == de) + { + return; + } + de.previous.next = de.next; + if (de.next != null) + { + de.next.previous = de.previous; + } + else + { + _tail = de.previous; + } + de.next = _head; + _head = de; + de.previous = null; + } + } +} diff --git a/dotnet/Qpid.Client/Client/ConnectionTuneParameters.cs b/dotnet/Qpid.Client/Client/ConnectionTuneParameters.cs new file mode 100644 index 0000000000..20f158f0ea --- /dev/null +++ b/dotnet/Qpid.Client/Client/ConnectionTuneParameters.cs @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Qpid.Client +{ + public class ConnectionTuneParameters + { + private uint _frameMax; + + private ushort _channelMax; + + private uint _hearbeat; + + private uint _txnLimit; + + public uint FrameMax + { + get + { + return _frameMax; + } + set + { + _frameMax = value; + } + } + + public ushort ChannelMax + { + get + { + return _channelMax; + } + set + { + _channelMax = value; + } + } + + public uint Heartbeat + { + get + { + return _hearbeat; + } + set + { + _hearbeat = value; + } + } + + public uint TxnLimit + { + get + { + return _txnLimit; + } + set + { + _txnLimit = value; + } + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Failover/FailoverException.cs b/dotnet/Qpid.Client/Client/Failover/FailoverException.cs new file mode 100644 index 0000000000..b13b28a66b --- /dev/null +++ b/dotnet/Qpid.Client/Client/Failover/FailoverException.cs @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; + +namespace Qpid.Client.Failover +{ + /// <summary> + /// This exception is thrown when failover is taking place and we need to let other + /// parts of the client know about this. + /// </summary> + class FailoverException : Exception + { + public FailoverException(String message) : base(message) + { + } + } +} diff --git a/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs b/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs new file mode 100644 index 0000000000..89a9809253 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs @@ -0,0 +1,173 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; +using Qpid.Client.Protocol; +using Qpid.Client.State; + +namespace Qpid.Client.Failover +{ + public class FailoverHandler + { + private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverHandler)); + + private AMQConnection _connection; + + /** + * Used where forcing the failover host + */ + private String _host; + + /** + * Used where forcing the failover port + */ + private int _port; + + public FailoverHandler(AMQConnection connection) + { + _connection = connection; + } + + public void Run() + { + if (Thread.CurrentThread.IsBackground) + { + throw new InvalidOperationException("FailoverHandler must Run on a non-background thread."); + } + + AMQProtocolListener pl = _connection.ProtocolListener; + pl.FailoverLatch = new ManualResetEvent(false); + + // We wake up listeners. If they can handle failover, they will extend the + // FailoverSupport class and will in turn block on the latch until failover + // has completed before retrying the operation + _connection.ProtocolListener.PropagateExceptionToWaiters(new FailoverException("Failing over about to start")); + + // Since failover impacts several structures we protect them all with a single mutex. These structures + // are also in child objects of the connection. This allows us to manipulate them without affecting + // client code which runs in a separate thread. + lock (_connection.FailoverMutex) + { + _log.Info("Starting failover process"); + + // We switch in a new state manager temporarily so that the interaction to get to the "connection open" + // state works, without us having to terminate any existing "state waiters". We could theoretically + // have a state waiter waiting until the connection is closed for some reason. Or in future we may have + // a slightly more complex state model therefore I felt it was worthwhile doing this. + AMQStateManager existingStateManager = _connection.ProtocolListener.StateManager; + _connection.ProtocolListener.StateManager = new AMQStateManager(); + if (!_connection.FirePreFailover(_host != null)) + { + _connection.ProtocolListener.StateManager = existingStateManager; + if (_host != null) + { + _connection.ExceptionReceived(new AMQDisconnectedException("Redirect was vetoed by client")); + } + else + { + _connection.ExceptionReceived(new AMQDisconnectedException("Failover was vetoed by client")); + } + pl.FailoverLatch.Set(); + pl.FailoverLatch = null; + return; + } + bool failoverSucceeded; + // when host is non null we have a specified failover host otherwise we all the client to cycle through + // all specified hosts + + // if _host has value then we are performing a redirect. + if (_host != null) + { + failoverSucceeded = _connection.AttemptReconnection(_host, _port, false); + } + else + { + failoverSucceeded = _connection.AttemptReconnection(); + } + + // XXX: at this point it appears that we are going to set StateManager to existingStateManager in + // XXX: both paths of control. + if (!failoverSucceeded) + { + _connection.ProtocolListener.StateManager = existingStateManager; + _connection.ExceptionReceived( + new AMQDisconnectedException("Server closed connection and no failover " + + "was successful")); + } + else + { + _connection.ProtocolListener.StateManager = existingStateManager; + try + { + if (_connection.FirePreResubscribe()) + { + _log.Info("Resubscribing on new connection"); + _connection.ResubscribeSessions(); + } + else + { + _log.Info("Client vetoed automatic resubscription"); + } + _connection.FireFailoverComplete(); + _connection.ProtocolListener.FailoverState = FailoverState.NOT_STARTED; + _log.Info("Connection failover completed successfully"); + } + catch (Exception e) + { + _log.Info("Failover process failed - exception being propagated by protocol handler"); + _connection.ProtocolListener.FailoverState = FailoverState.FAILED; + try + { + _connection.ProtocolListener.OnException(e); + } + catch (Exception ex) + { + _log.Error("Error notifying protocol session of error: " + ex, ex); + } + } + } + } + pl.FailoverLatch.Set(); + } + + public String getHost() + { + return _host; + } + + public void setHost(String host) + { + _host = host; + } + + public int getPort() + { + return _port; + } + + public void setPort(int port) + { + _port = port; + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Failover/FailoverState.cs b/dotnet/Qpid.Client/Client/Failover/FailoverState.cs new file mode 100644 index 0000000000..04322eeed4 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Failover/FailoverState.cs @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Qpid.Client.Failover +{ + /// <summary> + /// Enumeration of failover states. Used to handle failover from within AMQProtocolHandler where MINA events need to be + /// dealt with and can happen during failover. + /// </summary> + enum FailoverState + { + NOT_STARTED, IN_PROGRESS, FAILED + } +} diff --git a/dotnet/Qpid.Client/Client/Failover/FailoverSupport.cs b/dotnet/Qpid.Client/Client/Failover/FailoverSupport.cs new file mode 100644 index 0000000000..591c0b1d4f --- /dev/null +++ b/dotnet/Qpid.Client/Client/Failover/FailoverSupport.cs @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; + +namespace Qpid.Client.Failover +{ + public abstract class FailoverSupport + { + private static readonly ILog _log = LogManager.GetLogger(typeof(FailoverSupport)); + + public object execute(AMQConnection con) + { + // We wait until we are not in the middle of failover before acquiring the mutex and then proceeding. + // Any method that can potentially block for any reason should use this class so that deadlock will not + // occur. The FailoverException is propagated by the AMQProtocolHandler to any listeners (e.g. frame listeners) + // that might be causing a block. When that happens, the exception is caught here and the mutex is released + // before waiting for the failover to complete (either successfully or unsuccessfully). + while (true) + { + con.ProtocolListener.BlockUntilNotFailingOver(); + lock (con.FailoverMutex) + { + try + { + return operation(); + } + catch (FailoverException e) + { + _log.Info("Failover exception caught during operation", e); + } + } + } + } + + protected abstract object operation(); + } +} diff --git a/dotnet/Qpid.Client/Client/Handler/BasicDeliverMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/BasicDeliverMethodHandler.cs new file mode 100644 index 0000000000..d6e196c8dd --- /dev/null +++ b/dotnet/Qpid.Client/Client/Handler/BasicDeliverMethodHandler.cs @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; +using Qpid.Client.Message; +using Qpid.Client.Protocol; +using Qpid.Client.State; +using Qpid.Framing; + +namespace Qpid.Client.Handler +{ + public class BasicDeliverMethodHandler : IStateAwareMethodListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(BasicDeliverMethodHandler)); + + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + UnprocessedMessage msg = new UnprocessedMessage(); + msg.DeliverBody = (BasicDeliverBody) evt.Method; + msg.ChannelId = evt.ChannelId; + _logger.Debug("New JmsDeliver method received"); + evt.ProtocolSession.UnprocessedMessageReceived(msg); + } + } +} diff --git a/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs new file mode 100644 index 0000000000..78526f906f --- /dev/null +++ b/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; +using Qpid.Client.Message; +using Qpid.Client.Protocol; +using Qpid.Client.State; +using Qpid.Framing; + +namespace Qpid.Client.Handler +{ + public class BasicReturnMethodHandler : IStateAwareMethodListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(BasicReturnMethodHandler)); + + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + _logger.Debug("New JmsBounce method received"); + UnprocessedMessage msg = new UnprocessedMessage(); + msg.DeliverBody = null; + msg.BounceBody = (BasicReturnBody) evt.Method; + msg.ChannelId = evt.ChannelId; + + evt.ProtocolSession.UnprocessedMessageReceived(msg); + } + } +} diff --git a/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs new file mode 100644 index 0000000000..1031f804a6 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; +using Qpid.Client.Protocol; +using Qpid.Client.State; +using Qpid.Framing; + +namespace Qpid.Client.Handler +{ + public class ChannelCloseMethodHandler : IStateAwareMethodListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(ChannelCloseMethodHandler)); + + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + _logger.Debug("ChannelClose method received"); + ChannelCloseBody method = (ChannelCloseBody) evt.Method; + + int errorCode = method.ReplyCode; + string reason = method.ReplyText; + if (_logger.IsDebugEnabled) + { + _logger.Debug("Channel close reply code: " + errorCode + ", reason: " + reason); + } + + AMQFrame frame = ChannelCloseOkBody.CreateAMQFrame(evt.ChannelId); + evt.ProtocolSession.WriteFrame(frame); + //if (errorCode != AMQConstant.REPLY_SUCCESS.getCode()) + // HACK + if (errorCode != 200) + { + _logger.Debug("Channel close received with errorCode " + errorCode + ", throwing exception"); + evt.ProtocolSession.AMQConnection.ExceptionReceived(new AMQChannelClosedException(errorCode, "Error: " + reason)); + } + evt.ProtocolSession.ChannelClosed(evt.ChannelId, errorCode, reason); + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Handler/ConnectionCloseMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ConnectionCloseMethodHandler.cs new file mode 100644 index 0000000000..c3acc0b098 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Handler/ConnectionCloseMethodHandler.cs @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using log4net; +using Qpid.Client.Protocol; +using Qpid.Client.State; +using Qpid.Framing; + +namespace Qpid.Client.Handler +{ + public class ConnectionCloseMethodHandler : IStateAwareMethodListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(ConnectionCloseMethodHandler)); + + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + _logger.Debug("ConnectionClose frame received"); + ConnectionCloseBody method = (ConnectionCloseBody) evt.Method; + + int errorCode = method.ReplyCode; + String reason = method.ReplyText; + + evt.ProtocolSession.WriteFrame(ConnectionCloseOkBody.CreateAMQFrame(evt.ChannelId)); + stateManager.ChangeState(AMQState.CONNECTION_CLOSED); + if (errorCode != 200) + { + _logger.Debug("Connection close received with error code " + errorCode); + throw new AMQConnectionClosedException(errorCode, "Error: " + reason); + } + + // this actually closes the connection in the case where it is not an error. + evt.ProtocolSession.CloseProtocolSession(); + } + } +} diff --git a/dotnet/Qpid.Client/Client/Handler/ConnectionCloseOkHandler.cs b/dotnet/Qpid.Client/Client/Handler/ConnectionCloseOkHandler.cs new file mode 100644 index 0000000000..0cd60457ea --- /dev/null +++ b/dotnet/Qpid.Client/Client/Handler/ConnectionCloseOkHandler.cs @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; +using Qpid.Client.Protocol; +using Qpid.Client.State; +using Qpid.Framing; + +namespace Qpid.Client.Handler +{ + public class ConnectionCloseOkHandler : IStateAwareMethodListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(ConnectionCloseOkHandler)); + + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + _logger.Debug("ConnectionCloseOk frame received"); + ConnectionCloseOkBody method = (ConnectionCloseOkBody)evt.Method; + stateManager.ChangeState(AMQState.CONNECTION_CLOSED); + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Handler/ConnectionOpenOkMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ConnectionOpenOkMethodHandler.cs new file mode 100644 index 0000000000..b43e2700f6 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Handler/ConnectionOpenOkMethodHandler.cs @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Qpid.Client.Protocol; +using Qpid.Client.State; + +namespace Qpid.Client.Handler +{ + public class ConnectionOpenOkMethodHandler : IStateAwareMethodListener + { + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + stateManager.ChangeState(AMQState.CONNECTION_OPEN); + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Handler/ConnectionRedirectMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ConnectionRedirectMethodHandler.cs new file mode 100644 index 0000000000..4437290f5c --- /dev/null +++ b/dotnet/Qpid.Client/Client/Handler/ConnectionRedirectMethodHandler.cs @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using log4net; +using Qpid.Client.Protocol; +using Qpid.Client.State; + +namespace Qpid.Client.Handler +{ + public class ConnectionRedirectMethodHandler : IStateAwareMethodListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(ConnectionRedirectMethodHandler)); + + private const int DEFAULT_REDIRECT_PORT = 5672; + + private static ConnectionRedirectMethodHandler _handler = new ConnectionRedirectMethodHandler(); + + public static ConnectionRedirectMethodHandler GetInstance() + { + return _handler; + } + + private ConnectionRedirectMethodHandler() + { + } + + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + /*_logger.Info("ConnectionRedirect frame received"); + ConnectionRedirectBody method = (ConnectionRedirectBody) evt.Method; + + // the host is in the form hostname:port with the port being optional + int portIndex = method.Host.IndexOf(':'); + String host; + int port; + if (portIndex == -1) + { + host = method.Host; + port = DEFAULT_REDIRECT_PORT; + } + else + { + host = method.Host.Substring(0, portIndex); + port = Int32.Parse(method.Host.Substring(portIndex + 1)); + } + evt.ProtocolSession.Failover(host, port);*/ + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Handler/ConnectionSecureMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ConnectionSecureMethodHandler.cs new file mode 100644 index 0000000000..7c0fbd8f40 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Handler/ConnectionSecureMethodHandler.cs @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Qpid.Client.Protocol; +using Qpid.Client.State; +using Qpid.Framing; + +namespace Qpid.Client.Handler +{ + public class ConnectionSecureMethodHandler : IStateAwareMethodListener + { + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + AMQFrame response = ConnectionSecureOkBody.CreateAMQFrame(evt.ChannelId, null); + evt.ProtocolSession.WriteFrame(response); + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Handler/ConnectionStartMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ConnectionStartMethodHandler.cs new file mode 100644 index 0000000000..2bba8662bb --- /dev/null +++ b/dotnet/Qpid.Client/Client/Handler/ConnectionStartMethodHandler.cs @@ -0,0 +1,113 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Text; +using log4net; +using Qpid.Client.Protocol; +using Qpid.Client.State; +using Qpid.Framing; + +namespace Qpid.Client.Handler +{ + public class ConnectionStartMethodHandler : IStateAwareMethodListener + { + private static readonly ILog _log = LogManager.GetLogger(typeof(ConnectionStartMethodHandler)); + + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + ConnectionStartBody body = (ConnectionStartBody) evt.Method; + AMQProtocolSession ps = evt.ProtocolSession; + string username = ps.Username; + string password = ps.Password; + + try + { + if (body.Mechanisms == null) + { + throw new AMQException("mechanism not specified in ConnectionStart method frame"); + } + string allMechanisms = Encoding.ASCII.GetString(body.Mechanisms); + string[] mechanisms = allMechanisms.Split(' '); + string selectedMechanism = null; + foreach (string mechanism in mechanisms) + { + if (mechanism.Equals("PLAIN")) + { + selectedMechanism = mechanism; + break; + } + } + + if (selectedMechanism == null) + { + throw new AMQException("No supported security mechanism found, passed: " + mechanisms); + } + + // we always write out a null authzid which we don't currently use + byte[] plainData = new byte[1 + ps.Username.Length + 1 + ps.Password.Length]; + Encoding.UTF8.GetBytes(username, 0, username.Length, plainData, 1); + Encoding.UTF8.GetBytes(password, 0, password.Length, plainData, username.Length + 2); + if (body.Locales == null) + { + throw new AMQException("Locales is not defined in Connection Start method"); + } + string allLocales = Encoding.ASCII.GetString(body.Locales); + string[] locales = allLocales.Split(new char[] { ' ' }); + string selectedLocale; + if (locales != null && locales.Length > 0) + { + selectedLocale = locales[0]; + } + else + { + throw new AMQException("No locales sent from server, passed: " + locales); + } + + stateManager.ChangeState(AMQState.CONNECTION_NOT_TUNED); + FieldTable clientProperties = new FieldTable(); + clientProperties["product"] = "Qpid.NET"; + clientProperties["version"] = "1.0"; + clientProperties["platform"] = GetFullSystemInfo(); + AMQFrame frame = ConnectionStartOkBody.CreateAMQFrame(evt.ChannelId, clientProperties, selectedMechanism, + plainData, selectedLocale); + ps.WriteFrame(frame); + } + catch (Exception e) + { + throw new AMQException(_log, "Unable to decode data: " + e, e); + } + } + + private string GetFullSystemInfo() + { + /*StringBuffer fullSystemInfo = new StringBuffer(); + fullSystemInfo.append(System.getProperty("java.runtime.name")); + fullSystemInfo.append(", " + System.getProperty("java.runtime.version")); + fullSystemInfo.append(", " + System.getProperty("java.vendor")); + fullSystemInfo.append(", " + System.getProperty("os.arch")); + fullSystemInfo.append(", " + System.getProperty("os.name")); + fullSystemInfo.append(", " + System.getProperty("os.version")); + fullSystemInfo.append(", " + System.getProperty("sun.os.patch.level"));*/ + // TODO: add in details here + return ".NET 1.1 Client"; + } + } +} diff --git a/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs b/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs new file mode 100644 index 0000000000..8b276c09e9 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Handler/ConnectionTuneMethodHandler.cs @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using log4net; +using Qpid.Client.Protocol; +using Qpid.Client.State; +using Qpid.Framing; + +namespace Qpid.Client.Handler +{ + public class ConnectionTuneMethodHandler : IStateAwareMethodListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(ConnectionTuneMethodHandler)); + + public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt) + { + _logger.Debug("ConnectionTune frame received"); + ConnectionTuneBody frame = (ConnectionTuneBody) evt.Method; + AMQProtocolSession session = evt.ProtocolSession; + + ConnectionTuneParameters parameters = session.ConnectionTuneParameters; + if (parameters == null) + { + parameters = new ConnectionTuneParameters(); + } + + _logger.Info(String.Format("ConnectionTune.heartbeat = {0}.", frame.Heartbeat)); + + parameters.FrameMax = frame.FrameMax; + parameters.FrameMax = 65535; + //params.setChannelMax(frame.channelMax); + parameters.Heartbeat = frame.Heartbeat; + session.ConnectionTuneParameters = parameters; + + stateManager.ChangeState(AMQState.CONNECTION_NOT_OPENED); + session.WriteFrame(ConnectionTuneOkBody.CreateAMQFrame( + evt.ChannelId, frame.ChannelMax, 65535, frame.Heartbeat)); + session.WriteFrame(ConnectionOpenBody.CreateAMQFrame( + evt.ChannelId, session.AMQConnection.VirtualHost, null, true)); + + if (frame.Heartbeat > 0) + { + evt.ProtocolSession.AMQConnection.StartHeartBeatThread(frame.Heartbeat); + } + } + } +} diff --git a/dotnet/Qpid.Client/Client/Message/AMQMessage.cs b/dotnet/Qpid.Client/Client/Message/AMQMessage.cs new file mode 100644 index 0000000000..eb34fa45db --- /dev/null +++ b/dotnet/Qpid.Client/Client/Message/AMQMessage.cs @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Qpid.Framing; + +namespace Qpid.Client.Message +{ + public class AMQMessage + { + protected IContentHeaderProperties _contentHeaderProperties; + + /// <summary> + /// If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required + /// </summary> + protected AmqChannel _channel; + + public AMQMessage(IContentHeaderProperties properties) + { + _contentHeaderProperties = properties; + } + + public AmqChannel Channel + { + get + { + return _channel; + } + + set + { + _channel = value; + } + } + } +} diff --git a/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs b/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs new file mode 100644 index 0000000000..e485bb6d34 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Collections; +using Qpid.Framing; + +namespace Qpid.Client.Message +{ + public abstract class AbstractQmsMessageFactory : IMessageFactory + { + public AbstractQmsMessage CreateMessage(ulong messageNbr, bool redelivered, ContentHeaderBody contentHeader, IList bodies) + { + AbstractQmsMessage msg = CreateMessageWithBody(messageNbr, contentHeader, bodies); + msg.Redelivered = redelivered; + return msg; + } + + public abstract AbstractQmsMessage CreateMessage(); + + /// <summary> + /// + /// </summary> + /// <param name="messageNbr"></param> + /// <param name="contentHeader"></param> + /// <param name="bodies"></param> + /// <returns></returns> + /// <exception cref="AMQException"></exception> + protected abstract AbstractQmsMessage CreateMessageWithBody(ulong messageNbr, + ContentHeaderBody contentHeader, + IList bodies); + } +} diff --git a/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs b/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs new file mode 100644 index 0000000000..c84e9de1b9 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs @@ -0,0 +1,800 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Text; +using log4net; +using Qpid.Framing; +using Qpid.Messaging; + +namespace Qpid.Client.Message +{ + public class SendOnlyDestination : AMQDestination + { + private static readonly ILog _log = LogManager.GetLogger(typeof(string)); + + public SendOnlyDestination(string exchangeName, string routingKey) + : base(exchangeName, null, null, false, false, routingKey) + { + _log.Debug( + string.Format("Creating SendOnlyDestination with exchangeName={0} and routingKey={1}", + exchangeName, routingKey)); + } + + public override string EncodedName + { + get { return ExchangeName + ":" + QueueName; } + } + + public override string RoutingKey + { + get { return QueueName; } + } + + public override bool IsNameRequired + { + get { throw new NotImplementedException(); } + } + } + + public abstract class AbstractQmsMessage : AMQMessage, IMessage + { + private static readonly ILog _log = LogManager.GetLogger(typeof(AbstractQmsMessage)); + + protected ulong _messageNbr; + + protected bool _redelivered; + + protected AbstractQmsMessage() : base(new BasicContentHeaderProperties()) + { + } + + protected AbstractQmsMessage(ulong messageNbr, BasicContentHeaderProperties contentHeader) + : this(contentHeader) + { + _messageNbr = messageNbr; + } + + protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader) + : base(contentHeader) + { + } + + public string MessageId + { + get + { + if (ContentHeaderProperties.MessageId == null) + { + ContentHeaderProperties.MessageId = "ID:" + _messageNbr; + } + return ContentHeaderProperties.MessageId; + } + set + { + ContentHeaderProperties.MessageId = value; + } + } + + public long Timestamp + { + get + { + // TODO: look at ulong/long choice + return (long) ContentHeaderProperties.Timestamp; + } + set + { + ContentHeaderProperties.Timestamp = (ulong) value; + } + } + + public byte[] CorrelationIdAsBytes + { + get + { + return Encoding.Default.GetBytes(ContentHeaderProperties.CorrelationId); + } + set + { + ContentHeaderProperties.CorrelationId = Encoding.Default.GetString(value); + } + } + + public string CorrelationId + { + get + { + return ContentHeaderProperties.CorrelationId; + } + set + { + ContentHeaderProperties.ContentType = value; + } + } + + struct Dest + { + public string ExchangeName; + public string RoutingKey; + + public Dest(string exchangeName, string routingKey) + { + ExchangeName = exchangeName; + RoutingKey = routingKey; + } + } + + public string ReplyToExchangeName + { + get + { + Dest dest = ReadReplyToHeader(); + return dest.ExchangeName; + } + set + { + Dest dest = ReadReplyToHeader(); + dest.ExchangeName = value; + WriteReplyToHeader(dest); + } + } + + public string ReplyToRoutingKey + { + get + { + Dest dest = ReadReplyToHeader(); + return dest.RoutingKey; + } + set + { + Dest dest = ReadReplyToHeader(); + dest.RoutingKey = value; + WriteReplyToHeader(dest); + } + } + + private Dest ReadReplyToHeader() + { + string replyToEncoding = ContentHeaderProperties.ReplyTo; + if (replyToEncoding == null) + { + return new Dest(); + } + else + { + string routingKey; + string exchangeName = GetExchangeName(replyToEncoding, out routingKey); + return new Dest(exchangeName, routingKey); + } + } + + private void WriteReplyToHeader(Dest dest) + { + string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey); + ContentHeaderProperties.ReplyTo = encodedDestination; + } + + private static string GetExchangeName(string replyToEncoding, out string routingKey) + { + string[] split = replyToEncoding.Split(new char[':']); + if (_log.IsDebugEnabled) + { + _log.Debug(string.Format("replyToEncoding = '{0}'", replyToEncoding)); + _log.Debug(string.Format("split = {0}", split)); + _log.Debug(string.Format("split.Length = {0}", split.Length)); + } + if (split.Length == 1) + { + // Using an alternative split implementation here since it appears that string.Split + // is broken in .NET. It doesn't split when the first character is the delimiter. + // Here we check for the first character being the delimiter. This handles the case + // where ExchangeName is empty (i.e. sends will be to the default exchange). + if (replyToEncoding[0] == ':') + { + split = new string[2]; + split[0] = null; + split[1] = replyToEncoding.Substring(1); + if (_log.IsDebugEnabled) + { + _log.Debug("Alternative split method..."); + _log.Debug(string.Format("split = {0}", split)); + _log.Debug(string.Format("split.Length = {0}", split.Length)); + } + } + } + if (split.Length != 2) + { + throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding); + } + + string exchangeName = split[0]; + routingKey = split[1]; + return exchangeName; + } + + public DeliveryMode DeliveryMode + { + get + { + byte b = ContentHeaderProperties.DeliveryMode; + switch (b) + { + case 1: + return DeliveryMode.NonPersistent; + case 2: + return DeliveryMode.Persistent; + default: + throw new QpidException("Illegal value for delivery mode in content header properties"); + } + } + set + { + ContentHeaderProperties.DeliveryMode = (byte)(value==DeliveryMode.NonPersistent?1:2); + } + } + + public bool Redelivered + { + get + { + return _redelivered; + } + set + { + _redelivered = value; + } + } + + public string Type + { + get + { + return MimeType; + } + set + { + //MimeType = value; + } + } + + public long Expiration + { + get + { + return ContentHeaderProperties.Expiration; + } + set + { + ContentHeaderProperties.Expiration = (uint) value; + } + } + + public int Priority + { + get + { + return ContentHeaderProperties.Priority; + } + set + { + ContentHeaderProperties.Priority = (byte) value; + } + } + + // FIXME: implement + public string ContentType + { + get { throw new NotImplementedException(); } + set { throw new NotImplementedException(); } + } + + // FIXME: implement + public string ContentEncoding + { + get { throw new NotImplementedException(); } + set { throw new NotImplementedException(); } + } + + public void Acknowledge() + { + // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge + // is not specified. In our case, we only set the session field where client acknowledge mode is specified. + if (_channel != null) + { + _channel.SendAcknowledgement(_messageNbr); + } + } + + public IHeaders Headers + { + get { return new QpidHeaders(this); } + } + + public abstract void ClearBody(); + + /// <summary> + /// Get a String representation of the body of the message. Used in the + /// toString() method which outputs this before message properties. + /// </summary> + /// <exception cref="QpidException"></exception> + public abstract string ToBodyString(); + + /// <summary> + /// Return the raw byte array that is used to populate the frame when sending + /// the message. + /// </summary> + /// <value>a byte array of message data</value> + public abstract byte[] Data + { + get; + set; + } + + public abstract string MimeType + { + get; + } + + public override string ToString() + { + try + { + StringBuilder buf = new StringBuilder("Body:\n"); + buf.Append(ToBodyString()); + buf.Append("\nQmsTimestamp: ").Append(Timestamp); + buf.Append("\nQmsExpiration: ").Append(Expiration); + buf.Append("\nQmsPriority: ").Append(Priority); + buf.Append("\nQmsDeliveryMode: ").Append(DeliveryMode); + buf.Append("\nReplyToExchangeName: ").Append(ReplyToExchangeName); + buf.Append("\nReplyToRoutingKey: ").Append(ReplyToRoutingKey); + buf.Append("\nAMQ message number: ").Append(_messageNbr); + buf.Append("\nProperties:"); + if (ContentHeaderProperties.Headers == null) + { + buf.Append("<NONE>"); + } + else + { + buf.Append(Headers.ToString()); + } + return buf.ToString(); + } + catch (Exception e) + { + return e.ToString(); + } + } + + public IFieldTable UnderlyingMessagePropertiesMap + { + get + { + return ContentHeaderProperties.Headers; + } + set + { + ContentHeaderProperties.Headers = (FieldTable)value; + } + } + + public FieldTable PopulateHeadersFromMessageProperties() + { + if (ContentHeaderProperties.Headers == null) + { + return null; + } + else + { + // + // We need to convert every property into a String representation + // Note that type information is preserved in the property name + // + FieldTable table = new FieldTable(); + foreach (DictionaryEntry entry in ContentHeaderProperties.Headers) + { + string propertyName = (string) entry.Key; + if (propertyName == null) + { + continue; + } + else + { + table[propertyName] = entry.Value.ToString(); + } + } + return table; + } + } + + /// <summary> + /// Get the AMQ message number assigned to this message + /// </summary> + /// <returns>the message number</returns> + public ulong MessageNbr + { + get + { + return _messageNbr; + } + set + { + _messageNbr = value; + } + } + + public BasicContentHeaderProperties ContentHeaderProperties + { + get + { + return (BasicContentHeaderProperties) _contentHeaderProperties; + } + } + } + + internal class QpidHeaders : IHeaders + { + public const char BOOLEAN_PROPERTY_PREFIX = 'B'; + public const char BYTE_PROPERTY_PREFIX = 'b'; + public const char SHORT_PROPERTY_PREFIX = 's'; + public const char INT_PROPERTY_PREFIX = 'i'; + public const char LONG_PROPERTY_PREFIX = 'l'; + public const char FLOAT_PROPERTY_PREFIX = 'f'; + public const char DOUBLE_PROPERTY_PREFIX = 'd'; + public const char STRING_PROPERTY_PREFIX = 'S'; + + AbstractQmsMessage _message; + + public QpidHeaders(AbstractQmsMessage message) + { + _message = message; + } + + public bool Contains(string name) + { + CheckPropertyName(name); + if (_message.ContentHeaderProperties.Headers == null) + { + return false; + } + else + { + // TODO: fix this + return _message.ContentHeaderProperties.Headers.Contains(STRING_PROPERTY_PREFIX + name); + } + } + + public void Clear() + { + if (_message.ContentHeaderProperties.Headers != null) + { + _message.ContentHeaderProperties.Headers.Clear(); + } + } + + public string this[string name] + { + get + { + return GetString(name); + } + set + { + SetString(name, value); + } + } + + public bool GetBoolean(string name) + { + CheckPropertyName(name); + if (_message.ContentHeaderProperties.Headers == null) + { + return false; + } + else + { + object b = _message.ContentHeaderProperties.Headers[BOOLEAN_PROPERTY_PREFIX + name]; + + if (b == null) + { + return false; + } + else + { + return (bool)b; + } + } + } + + public void SetBoolean(string name, bool b) + { + CheckPropertyName(name); + _message.ContentHeaderProperties.Headers[BOOLEAN_PROPERTY_PREFIX + name] = b; + } + + public byte GetByte(string propertyName) + { + CheckPropertyName(propertyName); + if (_message.ContentHeaderProperties.Headers == null) + { + return 0; + } + else + { + object b = _message.ContentHeaderProperties.Headers[BYTE_PROPERTY_PREFIX + propertyName]; + if (b == null) + { + return 0; + } + else + { + return (byte)b; + } + } + } + + public void SetByte(string propertyName, byte b) + { + CheckPropertyName(propertyName); + _message.ContentHeaderProperties.Headers[BYTE_PROPERTY_PREFIX + propertyName] = b; + } + + public short GetShort(string propertyName) + { + CheckPropertyName(propertyName); + if (_message.ContentHeaderProperties.Headers == null) + { + return 0; + } + else + { + object s = _message.ContentHeaderProperties.Headers[SHORT_PROPERTY_PREFIX + propertyName]; + if (s == null) + { + return 0; + } + else + { + return (short)s; + } + } + } + + public void SetShort(string propertyName, short i) + { + CheckPropertyName(propertyName); + _message.ContentHeaderProperties.Headers[SHORT_PROPERTY_PREFIX + propertyName] = i; + } + + public int GetInt(string propertyName) + { + CheckPropertyName(propertyName); + if (_message.ContentHeaderProperties.Headers == null) + { + return 0; + } + else + { + object i = _message.ContentHeaderProperties.Headers[INT_PROPERTY_PREFIX + propertyName]; + if (i == null) + { + return 0; + } + else + { + return (int)i; + } + } + } + + public void SetInt(string propertyName, int i) + { + CheckPropertyName(propertyName); + _message.ContentHeaderProperties.Headers[INT_PROPERTY_PREFIX + propertyName] = i; + } + + public long GetLong(string propertyName) + { + CheckPropertyName(propertyName); + if (_message.ContentHeaderProperties.Headers == null) + { + return 0; + } + else + { + object l = _message.ContentHeaderProperties.Headers[LONG_PROPERTY_PREFIX + propertyName]; + if (l == null) + { + // temp - the spec says do this but this throws a NumberFormatException + //return Long.valueOf(null).longValue(); + return 0; + } + else + { + return (long)l; + } + } + } + + public void SetLong(string propertyName, long l) + { + CheckPropertyName(propertyName); + _message.ContentHeaderProperties.Headers[LONG_PROPERTY_PREFIX + propertyName] = l; + } + + public float GetFloat(String propertyName) + { + CheckPropertyName(propertyName); + if (_message.ContentHeaderProperties.Headers == null) + { + return 0; + } + else + { + object f = _message.ContentHeaderProperties.Headers[FLOAT_PROPERTY_PREFIX + propertyName]; + if (f == null) + { + return 0; + } + else + { + return (float)f; + } + } + } + + public void SetFloat(string propertyName, float f) + { + CheckPropertyName(propertyName); + _message.ContentHeaderProperties.Headers[FLOAT_PROPERTY_PREFIX + propertyName] = f; + } + + public double GetDouble(string propertyName) + { + CheckPropertyName(propertyName); + if (_message.ContentHeaderProperties.Headers == null) + { + return 0; + } + else + { + object d = _message.ContentHeaderProperties.Headers[DOUBLE_PROPERTY_PREFIX + propertyName]; + if (d == null) + { + return 0; + } + else + { + return (double)d; + } + } + } + + public void SetDouble(string propertyName, double v) + { + CheckPropertyName(propertyName); + _message.ContentHeaderProperties.Headers[DOUBLE_PROPERTY_PREFIX + propertyName] = v; + } + + public string GetString(string propertyName) + { + CheckPropertyName(propertyName); + if (_message.ContentHeaderProperties.Headers == null) + { + return null; + } + else + { + return (string)_message.ContentHeaderProperties.Headers[STRING_PROPERTY_PREFIX + propertyName]; + } + } + + public void SetString(string propertyName, string value) + { + CheckPropertyName(propertyName); + CreatePropertyMapIfRequired(); + propertyName = STRING_PROPERTY_PREFIX + propertyName; + _message.ContentHeaderProperties.Headers[propertyName] = value; + } + + private void CheckPropertyName(string propertyName) + { + if (propertyName == null) + { + throw new ArgumentException("Property name must not be null"); + } + else if ("".Equals(propertyName)) + { + throw new ArgumentException("Property name must not be the empty string"); + } + + if (_message.ContentHeaderProperties.Headers == null) + { + _message.ContentHeaderProperties.Headers = new FieldTable(); + } + } + + private void CreatePropertyMapIfRequired() + { + if (_message.ContentHeaderProperties.Headers == null) + { + _message.ContentHeaderProperties.Headers = new FieldTable(); + } + } + + public override string ToString() + { + StringBuilder buf = new StringBuilder("{"); + int i = 0; + foreach (DictionaryEntry entry in _message.ContentHeaderProperties.Headers) + { + ++i; + if (i > 1) + { + buf.Append(", "); + } + string propertyName = (string)entry.Key; + if (propertyName == null) + { + buf.Append("\nInternal error: Property with NULL key defined"); + } + else + { + buf.Append(propertyName.Substring(1)); + + buf.Append(" : "); + + char typeIdentifier = propertyName[0]; + buf.Append(typeIdentifierToName(typeIdentifier)); + buf.Append(" = ").Append(entry.Value); + } + } + buf.Append("}"); + return buf.ToString(); + } + + private static string typeIdentifierToName(char typeIdentifier) + { + switch (typeIdentifier) + { + case BOOLEAN_PROPERTY_PREFIX: + return "boolean"; + case BYTE_PROPERTY_PREFIX: + return "byte"; + case SHORT_PROPERTY_PREFIX: + return "short"; + case INT_PROPERTY_PREFIX: + return "int"; + case LONG_PROPERTY_PREFIX: + return "long"; + case FLOAT_PROPERTY_PREFIX: + return "float"; + case DOUBLE_PROPERTY_PREFIX: + return "double"; + case STRING_PROPERTY_PREFIX: + return "string"; + default: + return "unknown ( '" + typeIdentifier + "')"; + } + } + } +} diff --git a/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs b/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs new file mode 100644 index 0000000000..2e71bfc948 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Collections; +using Qpid.Framing; + +namespace Qpid.Client.Message +{ + public interface IMessageFactory + { + /// <summary> + /// Create a message + /// </summary> + /// <param name="messageNbr"></param> + /// <param name="redelivered"></param> + /// <param name="contentHeader"></param> + /// <param name="bodies"></param> + /// <returns></returns> + /// <exception cref="QpidMessagingException">if the message cannot be created</exception> + AbstractQmsMessage CreateMessage(ulong messageNbr, bool redelivered, + ContentHeaderBody contentHeader, + IList bodies); + + /// <summary> + /// Creates the message. + /// </summary> + /// <returns></returns> + /// <exception cref="QpidMessagingException">if the message cannot be created</exception> + AbstractQmsMessage CreateMessage(); + } +} + diff --git a/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs b/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs new file mode 100644 index 0000000000..3965d531bb --- /dev/null +++ b/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs @@ -0,0 +1,117 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using Qpid.Framing; +using Qpid.Messaging; + +namespace Qpid.Client.Message +{ + public class MessageFactoryRegistry + { + private readonly Hashtable _mimeToFactoryMap = new Hashtable(); + + public void RegisterFactory(string mimeType, IMessageFactory mf) + { + if (mf == null) + { + throw new ArgumentNullException("Message factory"); + } + if (mimeType == null) + { + throw new ArgumentNullException("mf"); + } + _mimeToFactoryMap[mimeType] = mf; + } + + public void DeregisterFactory(string mimeType) + { + _mimeToFactoryMap.Remove(mimeType); + } + + /// <summary> + /// Create a message. This looks up the MIME type from the content header and instantiates the appropriate + /// concrete message type. + /// </summary> + /// <param name="messageNbr">the AMQ message id</param> + /// <param name="redelivered">true if redelivered</param> + /// <param name="contentHeader">the content header that was received</param> + /// <param name="bodies">a list of ContentBody instances</param> + /// <returns>the message.</returns> + /// <exception cref="AMQException"/> + /// <exception cref="QpidException"/> + public AbstractQmsMessage CreateMessage(ulong messageNbr, bool redelivered, + ContentHeaderBody contentHeader, + IList bodies) + { + BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.Properties; + + if (properties.ContentType == null) + { + properties.ContentType = ""; + } + + IMessageFactory mf = (IMessageFactory) _mimeToFactoryMap[properties.ContentType]; + if (mf == null) + { + throw new AMQException("Unsupport MIME type of " + properties.ContentType); + } + else + { + return mf.CreateMessage(messageNbr, redelivered, contentHeader, bodies); + } + } + + public AbstractQmsMessage CreateMessage(string mimeType) + { + if (mimeType == null) + { + throw new ArgumentNullException("Mime type must not be null"); + } + IMessageFactory mf = (IMessageFactory) _mimeToFactoryMap[mimeType]; + if (mf == null) + { + throw new AMQException("Unsupport MIME type of " + mimeType); + } + else + { + return mf.CreateMessage(); + } + } + + /// <summary> + /// Construct a new registry with the default message factories registered + /// </summary> + /// <returns>a message factory registry</returns> + public static MessageFactoryRegistry NewDefaultRegistry() + { + MessageFactoryRegistry mf = new MessageFactoryRegistry(); + mf.RegisterFactory("text/plain", new QpidTextMessageFactory()); + mf.RegisterFactory("text/xml", new QpidTextMessageFactory()); + mf.RegisterFactory("application/octet-stream", new QpidBytesMessageFactory()); + // TODO: use bytes message for default message factory + // MJA - just added this bit back in... + mf.RegisterFactory("", new QpidBytesMessageFactory()); + return mf; + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs b/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs new file mode 100644 index 0000000000..b7911b44b9 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs @@ -0,0 +1,581 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.IO; +using System.Text; +using Qpid.Framing; +using Qpid.Messaging; + +namespace Qpid.Client.Message +{ + public class QpidBytesMessage : AbstractQmsMessage, IBytesMessage + { + private const string MIME_TYPE = "application/octet-stream"; + + /// <summary> + /// The backingstore for the data + /// </summary> + private MemoryStream _dataStream; + + private int _bodyLength; + + private BinaryReader _reader; + + private BinaryWriter _writer; + + public QpidBytesMessage() : this(null) + { + } + + /// <summary> + /// Construct a bytes message with existing data. + /// </summary> + /// <param name="data">if data is not null, the message is immediately in read only mode. if data is null, it is in + /// write-only mode</param> + QpidBytesMessage(byte[] data) : base() + { + // superclass constructor has instantiated a content header at this point + ContentHeaderProperties.ContentType = MIME_TYPE; + if (data == null) + { + _dataStream = new MemoryStream(); + _writer = new BinaryWriter(_dataStream); + } + else + { + _dataStream = new MemoryStream(data); + _bodyLength = data.Length; + _reader = new BinaryReader(_dataStream); + } + } + + public QpidBytesMessage(ulong messageNbr, byte[] data, ContentHeaderBody contentHeader) + // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea + : base(messageNbr, (BasicContentHeaderProperties) contentHeader.Properties) + { + ContentHeaderProperties.ContentType = MIME_TYPE; + _dataStream = new MemoryStream(data); + _bodyLength = data.Length; + _reader = new BinaryReader(_dataStream); + } + + public override void ClearBody() + { + if (_reader != null) + { + _reader.Close(); + _reader = null; + } + _dataStream = new MemoryStream(); + _bodyLength = 0; + + _writer = new BinaryWriter(_dataStream); + } + + public override string ToBodyString() + { + CheckReadable(); + try + { + return GetText(); + } + catch (IOException e) + { + throw new QpidException(e.ToString()); + } + } + + private string GetText() + { + if (_dataStream != null) + { + // we cannot just read the underlying buffer since it may be larger than the amount of + // "filled" data. Length is not the same as Capacity. + byte[] data = new byte[_dataStream.Length]; + _dataStream.Read(data, 0, (int)_dataStream.Length); + return Encoding.UTF8.GetString(data); + } + else + { + return null; + } + } + + public override byte[] Data + { + get + { + if (_dataStream == null) + { + return null; + } + else + { + byte[] data = new byte[_dataStream.Length]; + _dataStream.Position = 0; + _dataStream.Read(data, 0, (int) _dataStream.Length); + return data; + } + } + set + { + throw new NotSupportedException("Cannot set data payload except during construction"); + } + } + + public override string MimeType + { + get + { + return MIME_TYPE; + } + } + + public long BodyLength + { + get + { + CheckReadable(); + return _bodyLength; + } + } + + /// <summary> + /// + /// </summary> + /// <exception cref="MessageNotReadableException">if the message is in write mode</exception> + private void CheckReadable() + { + if (_reader == null) + { + throw new MessageNotReadableException("You need to call reset() to make the message readable"); + } + } + + private void CheckWritable() + { + if (_reader != null) + { + throw new MessageNotWriteableException("You need to call clearBody() to make the message writable"); + } + } + + public bool ReadBoolean() + { + CheckReadable(); + try + { + return _reader.ReadBoolean(); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public byte ReadByte() + { + CheckReadable(); + try + { + return _reader.ReadByte(); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public short ReadSignedByte() + { + CheckReadable(); + try + { + return _reader.ReadSByte(); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public short ReadShort() + { + CheckReadable(); + try + { + return _reader.ReadInt16(); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public char ReadChar() + { + CheckReadable(); + try + { + return _reader.ReadChar(); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public int ReadInt() + { + CheckReadable(); + try + { + return _reader.ReadInt32(); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public long ReadLong() + { + CheckReadable(); + try + { + return _reader.ReadInt64(); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public float ReadFloat() + { + CheckReadable(); + try + { + return _reader.ReadSingle(); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public double ReadDouble() + { + CheckReadable(); + try + { + return _reader.ReadDouble(); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public string ReadUTF() + { + CheckReadable(); + try + { + byte[] data = _reader.ReadBytes((int)_dataStream.Length); + return Encoding.UTF8.GetString(data); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public int ReadBytes(byte[] bytes) + { + if (bytes == null) + { + throw new ArgumentNullException("bytes"); + } + CheckReadable(); + try + { + return _reader.Read(bytes, 0, bytes.Length); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public int ReadBytes(byte[] bytes, int count) + { + CheckReadable(); + if (bytes == null) + { + throw new ArgumentNullException("bytes"); + } + if (count < 0) + { + throw new ArgumentOutOfRangeException("count must be >= 0"); + } + if (count > bytes.Length) + { + count = bytes.Length; + } + + try + { + return _reader.Read(bytes, 0, count); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteBoolean(bool b) + { + CheckWritable(); + try + { + _writer.Write(b); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteByte(byte b) + { + CheckWritable(); + try + { + _writer.Write(b); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteShort(short i) + { + CheckWritable(); + try + { + _writer.Write(i); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteChar(char c) + { + CheckWritable(); + try + { + _writer.Write(c); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteSignedByte(short value) + { + CheckWritable(); + try + { + _writer.Write(value); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteDouble(double value) + { + CheckWritable(); + try + { + _writer.Write(value); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteFloat(float value) + { + CheckWritable(); + try + { + _writer.Write(value); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteInt(int value) + { + CheckWritable(); + try + { + _writer.Write(value); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteLong(long value) + { + CheckWritable(); + try + { + _writer.Write(value); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void Write(int i) + { + CheckWritable(); + try + { + _writer.Write(i); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void Write(long l) + { + CheckWritable(); + try + { + _writer.Write(l); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void Write(float v) + { + CheckWritable(); + try + { + _writer.Write(v); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void Write(double v) + { + CheckWritable(); + try + { + _writer.Write(v); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteUTF(string value) + { + CheckWritable(); + try + { + byte[] encodedData = Encoding.UTF8.GetBytes(value); + _writer.Write(encodedData); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteBytes(byte[] bytes) + { + CheckWritable(); + try + { + _writer.Write(bytes); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void WriteBytes(byte[] bytes, int offset, int length) + { + CheckWritable(); + try + { + _writer.Write(bytes, offset, length); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public void Reset() + { + CheckWritable(); + try + { + _writer.Close(); + _writer = null; + _reader = new BinaryReader(_dataStream); + _bodyLength = (int) _dataStream.Length; + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs b/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs new file mode 100644 index 0000000000..3f2a6c531f --- /dev/null +++ b/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using Qpid.Framing; + +namespace Qpid.Client.Message +{ + public class QpidBytesMessageFactory : AbstractQmsMessageFactory + { + protected override AbstractQmsMessage CreateMessageWithBody(ulong messageNbr, + ContentHeaderBody contentHeader, + IList bodies) + { + byte[] data; + + // we optimise the non-fragmented case to avoid copying + if (bodies != null && bodies.Count == 1) + { + data = ((ContentBody)bodies[0]).Payload; + } + else + { + data = new byte[(long)contentHeader.BodySize]; + int currentPosition = 0; + foreach (ContentBody cb in bodies) + { + Array.Copy(cb.Payload, 0, data, currentPosition, cb.Payload.Length); + currentPosition += cb.Payload.Length; + } + } + + return new QpidBytesMessage(messageNbr, data, contentHeader); + } + + public override AbstractQmsMessage CreateMessage() + { + return new QpidBytesMessage(); + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs b/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs new file mode 100644 index 0000000000..4c16038d4b --- /dev/null +++ b/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs @@ -0,0 +1,137 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Text; +using Qpid.Framing; +using Qpid.Messaging; + +namespace Qpid.Client.Message +{ + public class QpidTextMessage : AbstractQmsMessage, ITextMessage + { + private const string MIME_TYPE = "text/plain"; + + private byte[] _data; + + private string _decodedValue; + + public QpidTextMessage() : this(null, null) + { + } + + public QpidTextMessage(byte[] data, String encoding) : base() + { + // the superclass has instantied a content header at this point + ContentHeaderProperties.ContentType= MIME_TYPE; + _data = data; + ContentHeaderProperties.Encoding = encoding; + } + + public QpidTextMessage(ulong messageNbr, byte[] data, BasicContentHeaderProperties contentHeader) + : base(messageNbr, contentHeader) + { + contentHeader.ContentType = MIME_TYPE; + _data = data; + } + + public QpidTextMessage(byte[] data) : this(data, null) + { + } + + public QpidTextMessage(string text) + { + Text = text; + } + + public override void ClearBody() + { + _data = null; + _decodedValue = null; + } + + public override string ToBodyString() + { + return Text; + } + + public override byte[] Data + { + get + { + return _data; + } + set + { + _data = value; + } + } + + public override string MimeType + { + get + { + return MIME_TYPE; + } + } + + public string Text + { + get + { + if (_data == null && _decodedValue == null) + { + return null; + } + else if (_decodedValue != null) + { + return _decodedValue; + } + else + { + if (ContentHeaderProperties.Encoding != null) + { + // throw ArgumentException if the encoding is not supported + _decodedValue = Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetString(_data); + } + else + { + _decodedValue = Encoding.Default.GetString(_data); + } + return _decodedValue; + } + } + + set + { + if (ContentHeaderProperties.Encoding == null) + { + _data = Encoding.Default.GetBytes(value); + } + else + { + // throw ArgumentException if the encoding is not supported + _data = Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetBytes(value); + } + _decodedValue = value; + } + } + } +} diff --git a/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs b/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs new file mode 100644 index 0000000000..5457b2301e --- /dev/null +++ b/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using Qpid.Framing; + +namespace Qpid.Client.Message +{ + public class QpidTextMessageFactory : AbstractQmsMessageFactory + { + protected override AbstractQmsMessage CreateMessageWithBody(ulong messageNbr, ContentHeaderBody contentHeader, + IList bodies) + { + byte[] data; + + // we optimise the non-fragmented case to avoid copying + if (bodies != null && bodies.Count == 1) + { + data = ((ContentBody)bodies[0]).Payload; + } + else + { + data = new byte[(int)contentHeader.BodySize]; + int currentPosition = 0; + foreach (ContentBody cb in bodies) + { + Array.Copy(cb.Payload, 0, data, currentPosition, cb.Payload.Length); + currentPosition += cb.Payload.Length; + } + } + + return new QpidTextMessage(messageNbr, data, (BasicContentHeaderProperties)contentHeader.Properties); + } + + + public override AbstractQmsMessage CreateMessage() + { + return new QpidTextMessage(); + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs b/dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs new file mode 100644 index 0000000000..679114d105 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using log4net; + +namespace Qpid.Client.Message +{ + /// <summary> + /// Raised when a message body is received unexpectedly by the client. This typically occurs when the + /// length of bodies received does not match with the declared length in the content header. + /// </summary> + public class UnexpectedBodyReceivedException : AMQException + { + public UnexpectedBodyReceivedException(ILog logger, string msg, Exception t) + : base(logger, msg, t) + { + } + + public UnexpectedBodyReceivedException(ILog logger, string msg) + : base(logger, msg) + { + } + + public UnexpectedBodyReceivedException(ILog logger, int errorCode, string msg) + : base(logger, errorCode, msg) + { + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs b/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs new file mode 100644 index 0000000000..cb4e64718b --- /dev/null +++ b/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Collections; +using Qpid.Framing; + +namespace Qpid.Client.Message +{ + public class UnprocessedMessage + { + private ulong _bytesReceived = 0; + + public BasicDeliverBody DeliverBody; + public BasicReturnBody BounceBody; + public ushort ChannelId; + public ContentHeaderBody ContentHeader; + + /// <summary> + /// List of ContentBody instances. Due to fragmentation you don't know how big this will be in general + /// </summary> + /// TODO: write and use linked list class + public IList Bodies = new ArrayList(); + + public void ReceiveBody(ContentBody body) + { + Bodies.Add(body); + if (body.Payload != null) + { + _bytesReceived += (uint)body.Payload.Length; + } + } + + public bool IsAllBodyDataReceived() + { + return _bytesReceived == ContentHeader.BodySize; + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs b/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs new file mode 100644 index 0000000000..ab40a83b3e --- /dev/null +++ b/dotnet/Qpid.Client/Client/Protocol/AMQMethodEvent.cs @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Text; +using Qpid.Framing; + +namespace Qpid.Client.Protocol +{ + public class AMQMethodEvent + { + private AMQMethodBody _method; + + private ushort _channelId; + + private AMQProtocolSession _protocolSession; + + public AMQMethodEvent(ushort channelId, AMQMethodBody method, AMQProtocolSession protocolSession) + { + _channelId = channelId; + _method = method; + _protocolSession = protocolSession; + } + + public AMQMethodBody Method + { + get + { + return _method; + } + } + + public ushort ChannelId + { + get + { + return _channelId; + } + } + + public AMQProtocolSession ProtocolSession + { + get + { + return _protocolSession; + } + } + + public override String ToString() + { + StringBuilder buf = new StringBuilder("Method event: "); + buf.Append("\nChannel id: ").Append(_channelId); + buf.Append("\nMethod: ").Append(_method); + return buf.ToString(); + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs new file mode 100644 index 0000000000..7256ab9250 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolListener.cs @@ -0,0 +1,289 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Threading; +using log4net; +using Qpid.Client.Failover; +using Qpid.Client.Protocol.Listener; +using Qpid.Client.State; +using Qpid.Framing; + +namespace Qpid.Client.Protocol +{ + public class AMQProtocolListener : IProtocolListener + { + private static readonly ILog _log = LogManager.GetLogger(typeof(AMQProtocolListener)); + + /** + * We create the failover handler when the session is created since it needs a reference to the IoSession in order + * to be able to send errors during failover back to the client application. The session won't be available in the + * case where we failing over due to a Connection.Redirect message from the broker. + */ + private FailoverHandler _failoverHandler; + + /** + * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly + * attempting failover where it is failing. + */ + internal FailoverState _failoverState = FailoverState.NOT_STARTED; + + internal FailoverState FailoverState + { + get { return _failoverState; } + set { _failoverState = value; } + } + + internal ManualResetEvent FailoverLatch; + + AMQConnection _connection; + AMQStateManager _stateManager; + + public AMQStateManager StateManager + { + get { return _stateManager; } + set { _stateManager = value; } + } + + //private readonly CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet(); + private readonly ArrayList _frameListeners = ArrayList.Synchronized(new ArrayList()); + + AMQProtocolSession _protocolSession = null; // FIXME + public AMQProtocolSession ProtocolSession { set { _protocolSession = value; } } // FIXME: can this be fixed? + + + private readonly Object _lock = new Object(); + + public AMQProtocolListener(AMQConnection connection, AMQStateManager stateManager) + { + _connection = connection; + _stateManager = stateManager; + _failoverHandler = new FailoverHandler(connection); + } + + public void OnMessage(IDataBlock message) + { + // Handle incorrect protocol version. + if (message is ProtocolInitiation) + { + string error = String.Format("Protocol mismatch - {0}", message.ToString()); + AMQException e = new AMQProtocolHeaderException(error); + _log.Error("Closing connection because of protocol mismatch", e); + //_protocolSession.CloseProtocolSession(); + _stateManager.Error(e); + return; + } + + AMQFrame frame = (AMQFrame)message; + + if (frame.BodyFrame is AMQMethodBody) + { + if (_log.IsDebugEnabled) + { + _log.Debug("Method frame received: " + frame); + } + AMQMethodEvent evt = new AMQMethodEvent(frame.Channel, (AMQMethodBody)frame.BodyFrame, _protocolSession); + try + { + bool wasAnyoneInterested = false; + lock (_frameListeners.SyncRoot) + { + foreach (IAMQMethodListener listener in _frameListeners) + { + wasAnyoneInterested = listener.MethodReceived(evt) || wasAnyoneInterested; + } + } + if (!wasAnyoneInterested) + { + throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener."); + } + } + catch (Exception e) + { + foreach (IAMQMethodListener listener in _frameListeners) + { + listener.Error(e); + } + } + } + else if (frame.BodyFrame is ContentHeaderBody) + { + _protocolSession.MessageContentHeaderReceived(frame.Channel, + (ContentHeaderBody)frame.BodyFrame); + } + else if (frame.BodyFrame is ContentBody) + { + _protocolSession.MessageContentBodyReceived(frame.Channel, + (ContentBody)frame.BodyFrame); + } + else if (frame.BodyFrame is HeartbeatBody) + { + _log.Debug("HeartBeat received"); + } + //_connection.BytesReceived(_protocolSession.Channel.ReadBytes); // XXX: is this really useful? + } + + public void OnException(Exception cause) + { + _log.Warn("Protocol Listener received exception", cause); + lock (_lock) + { + if (_failoverState == FailoverState.NOT_STARTED) + { + if (!(cause is AMQUndeliveredException)) + { + WhenClosed(); + } + } + // We reach this point if failover was attempted and failed therefore we need to let the calling app + // know since we cannot recover the situation. + else if (_failoverState == FailoverState.FAILED) + { + // we notify the state manager of the error in case we have any clients waiting on a state + // change. Those "waiters" will be interrupted and can handle the exception + AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); + PropagateExceptionToWaiters(amqe); + _connection.ExceptionReceived(cause); + } + } + } + + /** + * When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by + * sessionClosed() depending on whether we were trying to send data at the time of failure. + * + * @param session + * @throws Exception + */ + void WhenClosed() + { + _connection.StopHeartBeatThread(); + + // TODO: Server just closes session with no warning if auth fails. + if (_connection.Closed) + { + _log.Info("Channel closed called by client"); + } + else + { + _log.Info("Channel closed called with failover state currently " + _failoverState); + + // Reconnectablility was introduced here so as not to disturb the client as they have made their intentions + // known through the policy settings. + + if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.IsFailoverAllowed) + { + _log.Info("FAILOVER STARTING"); + if (_failoverState == FailoverState.NOT_STARTED) + { + _failoverState = FailoverState.IN_PROGRESS; + startFailoverThread(); + } + else + { + _log.Info("Not starting failover as state currently " + _failoverState); + } + } + else + { + _log.Info("Failover not allowed by policy."); + + if (_failoverState != FailoverState.IN_PROGRESS) + { + _log.Info("sessionClose() not allowed to failover"); + _connection.ExceptionReceived( + new AMQDisconnectedException("Server closed connection and reconnection not permitted.")); + } + else + { + _log.Info("sessionClose() failover in progress"); + } + } + } + + _log.Info("Protocol Channel [" + this + "] closed"); + } + + /// <summary> + /// There are two cases where we have other threads potentially blocking for events to be handled by this + /// class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a + /// particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can + /// react appropriately. + /// + /// <param name="e">the exception to propagate</param> + /// </summary> + public void PropagateExceptionToWaiters(Exception e) + { + // FIXME: not sure if required as StateManager is in _frameListeners. Probably something to do with fail-over. + _stateManager.Error(e); + + foreach (IAMQMethodListener listener in _frameListeners) + { + listener.Error(e); + } + } + + public void AddFrameListener(IAMQMethodListener listener) + { + _frameListeners.Add(listener); + } + + public void RemoveFrameListener(IAMQMethodListener listener) + { + if (_log.IsDebugEnabled) + { + _log.Debug("Removing frame listener: " + listener.ToString()); + } + _frameListeners.Remove(listener); + } + + public void BlockUntilNotFailingOver() + { + if (FailoverLatch != null) + { + FailoverLatch.WaitOne(); + } + } + + /// <summary> + /// "Failover" for redirection. + /// </summary> + /// <param name="host"></param> + /// <param name="port"></param> + public void Failover(string host, int port) + { + _failoverHandler.setHost(host); + _failoverHandler.setPort(port); + // see javadoc for FailoverHandler to see rationale for separate thread + startFailoverThread(); + } + + private void startFailoverThread() + { + Thread failoverThread = new Thread(new ThreadStart(_failoverHandler.Run)); + failoverThread.Name = "Failover"; + // Do not inherit daemon-ness from current thread as this can be a daemon + // thread such as a AnonymousIoService thread. + failoverThread.IsBackground = false; + failoverThread.Start(); + } + } +} diff --git a/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs new file mode 100644 index 0000000000..65aca0d942 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs @@ -0,0 +1,269 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using log4net; +using Qpid.Client.Message; +using Qpid.Client.Transport; +using Qpid.Framing; + +namespace Qpid.Client.Protocol +{ + public class AMQProtocolSession + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(AMQProtocolSession)); + + private readonly IProtocolWriter _protocolWriter; + private readonly IConnectionCloser _connectionCloser; + + /** + * Counter to ensure unique queue names + */ + private int _queueId = 1; + private readonly Object _queueIdLock = new Object(); + + /// <summary> + /// Maps from the channel id to the AmqChannel that it represents. + /// </summary> + //private ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap(); + private Hashtable _channelId2SessionMap = Hashtable.Synchronized(new Hashtable()); + + //private ConcurrentMap _closingChannels = new ConcurrentHashMap(); + private Hashtable _closingChannels = Hashtable.Synchronized(new Hashtable()); + + /// <summary> + /// Maps from a channel id to an unprocessed message. This is used to tie together the + /// JmsDeliverBody (which arrives first) with the subsequent content header and content bodies. + /// </summary> + //private ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap(); + private Hashtable _channelId2UnprocessedMsgMap = Hashtable.Synchronized(new Hashtable()); + + private AMQConnection _connection; + + public AMQProtocolSession(IProtocolWriter protocolWriter, IConnectionCloser connectionCloser, AMQConnection connection) + { + _protocolWriter = protocolWriter; + _connectionCloser = connectionCloser; + _connection = connection; + } + + public void Init() + { + // start the process of setting up the connection. This is the first place that + // data is written to the server. + _protocolWriter.Write(new ProtocolInitiation()); + } + + public string Username + { + get + { + return AMQConnection.Username; + } + } + + public string Password + { + get + { + return AMQConnection.Password; + } + } + + ConnectionTuneParameters _connectionTuneParameters; // TODO: should be able to have this in the Java too. + + public ConnectionTuneParameters ConnectionTuneParameters + { + get + { + return _connectionTuneParameters; + } + set + { + _connectionTuneParameters = value; + AMQConnection con = AMQConnection; + con.SetMaximumChannelCount(value.ChannelMax); + con.MaximumFrameSize = value.FrameMax; + } + } + + /// <summary> + /// Callback invoked from the BasicDeliverMethodHandler when a message has been received. + /// This is invoked on the MINA dispatcher thread. + /// </summary> + /// <param name="message">the unprocessed message</param> + /// <exception cname="AMQException">if this was not expected</exception> + public void UnprocessedMessageReceived(UnprocessedMessage message) + { + _channelId2UnprocessedMsgMap[message.ChannelId] = message; + } + + public void MessageContentHeaderReceived(ushort channelId, ContentHeaderBody contentHeader) + { + UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap[channelId]; + if (msg == null) + { + throw new AMQException("Error: received content header without having received a JMSDeliver frame first"); + } + if (msg.ContentHeader != null) + { + throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames"); + } + msg.ContentHeader = contentHeader; + if (contentHeader.BodySize == 0) + { + DeliverMessageToAMQSession(channelId, msg); + } + } + + public void MessageContentBodyReceived(ushort channelId, ContentBody contentBody) + { + UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap[channelId]; + if (msg == null) + { + throw new AMQException("Error: received content body without having received a BasicDeliver frame first"); + } + if (msg.ContentHeader == null) + { + _channelId2UnprocessedMsgMap.Remove(channelId); + throw new AMQException("Error: received content body without having received a ContentHeader frame first"); + } + try + { + msg.ReceiveBody(contentBody); + } + catch (UnexpectedBodyReceivedException e) + { + _channelId2UnprocessedMsgMap.Remove(channelId); + throw e; + } + if (msg.IsAllBodyDataReceived()) + { + DeliverMessageToAMQSession(channelId, msg); + } + } + + /// <summary> + /// Deliver a message to the appropriate session, removing the unprocessed message + /// from our map + /// <param name="channelId">the channel id the message should be delivered to</param> + /// <param name="msg"> the message</param> + private void DeliverMessageToAMQSession(ushort channelId, UnprocessedMessage msg) + { + AmqChannel channel = (AmqChannel) _channelId2SessionMap[channelId]; + channel.MessageReceived(msg); + _channelId2UnprocessedMsgMap.Remove(channelId); + } + + /// <summary> + /// Convenience method that writes a frame to the protocol session. Equivalent + /// to calling getProtocolSession().write(). + /// </summary> + /// <param name="frame">the frame to write</param> + public void WriteFrame(IDataBlock frame) + { + _protocolWriter.Write(frame); + } + + public void AddSessionByChannel(ushort channelId, AmqChannel channel) + { + if (channel == null) + { + throw new ArgumentNullException("Attempt to register a null channel"); + } + _logger.Debug("Add channel with channel id " + channelId); + _channelId2SessionMap[channelId] = channel; + } + + public void RemoveSessionByChannel(ushort channelId) + { + _logger.Debug("Removing session with channelId " + channelId); + _channelId2SessionMap.Remove(channelId); + } + + /// <summary> + /// Starts the process of closing a channel + /// </summary> + /// <param name="channel" the AmqChannel being closed</param> + public void CloseSession(AmqChannel channel) + { + _logger.Debug("closeSession called on protocol channel for channel " + channel.ChannelId); + ushort channelId = channel.ChannelId; + + // we need to know when a channel is closing so that we can respond + // with a channel.close frame when we receive any other type of frame + // on that channel + _closingChannels[channelId] = channel; + + } + + /// <summary> + /// Called from the ChannelClose handler when a channel close frame is received. + /// This method decides whether this is a response or an initiation. The latter + /// case causes the AmqChannel to be closed and an exception to be thrown if + /// appropriate. + /// </summary> + /// <param name="channelId">the id of the channel (session)</param> + /// <returns>true if the client must respond to the server, i.e. if the server + /// initiated the channel close, false if the channel close is just the server + /// responding to the client's earlier request to close the channel.</returns> + public bool ChannelClosed(ushort channelId, int code, string text) + { + // if this is not a response to an earlier request to close the channel + if (!_closingChannels.ContainsKey(channelId)) + { + _closingChannels.Remove(channelId); + AmqChannel channel = (AmqChannel) _channelId2SessionMap[channelId]; + channel.Closed(new AMQException(_logger, code, text)); + return true; + } + else + { + return false; + } + } + + public AMQConnection AMQConnection + { + get + { + return _connection; + } + } + + public void CloseProtocolSession() + { + _logger.Debug("Closing protocol session"); + _connectionCloser.Close(); + } + + internal string GenerateQueueName() + { + int id; + lock(_queueIdLock) + { + id = _queueId++; + } + + return "tmp_" + _connection.Transport.getLocalEndPoint() + "_" + id; + } + } +} diff --git a/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs b/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs new file mode 100644 index 0000000000..be8a24a9f4 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Protocol/IConnectionCloser.cs @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Qpid.Client.Protocol +{ + public interface IConnectionCloser + { + void Close(); + } +}
\ No newline at end of file diff --git a/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs b/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs new file mode 100644 index 0000000000..6ac8a7537e --- /dev/null +++ b/dotnet/Qpid.Client/Client/Protocol/IProtocolListener.cs @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using Qpid.Client.Protocol.Listener; +using Qpid.Framing; + +namespace Qpid.Client.Protocol +{ + public interface IProtocolListener + { + void OnMessage(IDataBlock message); + void OnException(Exception e); + + // XXX: .NET way of doing listeners? + void AddFrameListener(IAMQMethodListener listener); + void RemoveFrameListener(IAMQMethodListener listener); + } +} diff --git a/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs b/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs new file mode 100644 index 0000000000..99643fe59f --- /dev/null +++ b/dotnet/Qpid.Client/Client/Protocol/Listener/BlockingMethodFrameListener.cs @@ -0,0 +1,109 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using Qpid.Framing; + +namespace Qpid.Client.Protocol.Listener +{ + public abstract class BlockingMethodFrameListener : IAMQMethodListener + { + private ManualResetEvent _resetEvent; + + public abstract bool ProcessMethod(ushort channelId, AMQMethodBody frame); + + /// <summary> + /// This is set if there is an exception thrown from processCommandFrame and the + /// exception is rethrown to the caller of blockForFrame() + /// </summary> + private volatile Exception _error; + + protected ushort _channelId; + + protected AMQMethodEvent _doneEvt = null; + + public BlockingMethodFrameListener(ushort channelId) + { + _channelId = channelId; + _resetEvent = new ManualResetEvent(false); + } + + /// <summary> + /// This method is called by the MINA dispatching thread. Note that it could + /// be called before BlockForFrame() has been called. + /// </summary> + /// <param name="evt">the frame event</param> + /// <returns>true if the listener has dealt with this frame</returns> + /// <exception cref="AMQException"></exception> + public bool MethodReceived(AMQMethodEvent evt) + { + AMQMethodBody method = evt.Method; + + try + { + bool ready = (evt.ChannelId == _channelId) && ProcessMethod(evt.ChannelId, method); + if (ready) + { + _doneEvt = evt; + _resetEvent.Set(); + } + + return ready; + } + catch (AMQException e) + { + Error(e); + // we rethrow the error here, and the code in the frame dispatcher will go round + // each listener informing them that an exception has been thrown + throw e; + } + } + + /// <summary> + /// This method is called by the thread that wants to wait for a frame. + /// </summary> + public AMQMethodEvent BlockForFrame() + { + _resetEvent.WaitOne(); + //at this point the event will have been signalled. The error field might or might not be set + // depending on whether an error occurred + if (_error != null) + { + throw _error; + } + + return _doneEvt; + } + + /// <summary> + /// This is a callback, called by the MINA dispatcher thread only. It is also called from within this + /// class to avoid code repetition but again is only called by the MINA dispatcher thread. + /// </summary> + /// <param name="e">the exception that caused the error</param> + public void Error(Exception e) + { + // set the error so that the thread that is blocking (in BlockForFrame()) + // can pick up the exception and rethrow to the caller + _error = e; + _resetEvent.Set(); + } + } +} diff --git a/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs b/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs new file mode 100644 index 0000000000..db82eb1013 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Protocol/Listener/IAMQMethodListener.cs @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; + +namespace Qpid.Client.Protocol.Listener +{ + public interface IAMQMethodListener + { + /// <summary> + /// Invoked when a method frame has been received + /// <param name="evt">the event</param> + /// <returns>true if the handler has processed the method frame, false otherwise. Note + /// that this does not prohibit the method event being delivered to subsequent listeners + /// but can be used to determine if nobody has dealt with an incoming method frame.</param> + /// <exception cname="AMQException">if an error has occurred. This exception will be delivered + /// to all registered listeners using the error() method (see below) allowing them to + /// perform cleanup if necessary.</exception> + bool MethodReceived(AMQMethodEvent evt); + + /// <summary> + /// Callback when an error has occurred. Allows listeners to clean up. + /// </summary> + /// <param name="e">the exception</param> + void Error(Exception e); + } +} + diff --git a/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs b/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs new file mode 100644 index 0000000000..65460a0c2e --- /dev/null +++ b/dotnet/Qpid.Client/Client/Protocol/Listener/SpecificMethodFrameListener.cs @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using Qpid.Framing; + +namespace Qpid.Client.Protocol.Listener +{ + public class SpecificMethodFrameListener : BlockingMethodFrameListener + { + private readonly Type _expectedClass; + + public SpecificMethodFrameListener(ushort channelId, Type expectedClass) : base(channelId) + { + _expectedClass = expectedClass; + } + + public override bool ProcessMethod(ushort channelId, AMQMethodBody frame) + { + return _expectedClass.IsInstanceOfType(frame); + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs b/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs new file mode 100644 index 0000000000..32847f9b9b --- /dev/null +++ b/dotnet/Qpid.Client/Client/Protocol/ProtocolWriter.cs @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using Qpid.Client.Protocol.Listener; +using Qpid.Client.Transport; +using Qpid.Framing; + +namespace Qpid.Client.Protocol +{ + /// <summary> + /// A convenient interface to writing protocol frames. + /// </summary> + public class ProtocolWriter + { + IProtocolWriter _protocolWriter; + IProtocolListener _protocolListener; + + public ProtocolWriter(IProtocolWriter protocolWriter, IProtocolListener protocolListener) + { + _protocolWriter = protocolWriter; + _protocolListener = protocolListener; + } + + public void WriteFrame(IDataBlock frame) + { + _protocolWriter.Write(frame); + } + + /// <summary> + /// Convenience method that writes a frame to the protocol session and waits for + /// a particular response. Equivalent to calling getProtocolSession().write() then + /// waiting for the response. + /// </summary> + /// <param name="frame">the frame</param> + /// <param name="listener">the blocking listener. Note the calling thread will block.</param> + private AMQMethodEvent SyncWrite(AMQFrame frame, BlockingMethodFrameListener listener) + { + try + { + _protocolListener.AddFrameListener(listener); + _protocolWriter.Write(frame); + return listener.BlockForFrame(); + } + finally + { + _protocolListener.RemoveFrameListener(listener); + } + // When control resumes before this line, a reply will have been received + // that matches the criteria defined in the blocking listener + } + + public AMQMethodEvent SyncWrite(AMQFrame frame, Type responseType) + { + // TODO: If each frame knew it's response type, then the responseType argument would + // TODO: not be neccesary. + return SyncWrite(frame, new SpecificMethodFrameListener(frame.Channel, responseType)); + } + } +} + diff --git a/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs b/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs new file mode 100644 index 0000000000..f0c4c91db8 --- /dev/null +++ b/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs @@ -0,0 +1,142 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Net; +using log4net; +using Qpid.Client.qms; + +namespace Qpid.Client +{ + public class QpidConnectionInfo : ConnectionInfo + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(QpidConnectionInfo)); + + string _username = "guest"; + string _password = "guest"; + string _virtualHost = "/default"; + + string _failoverMethod = null; + IDictionary _failoverOptions = new Hashtable(); + IDictionary _options = new Hashtable(); + IList _brokerInfos = new ArrayList(); // List<BrokerInfo> + string _clientName = String.Format("{0}{1:G}", Dns.GetHostName(), DateTime.Now.Ticks); + + public string asUrl() + { + string result = "amqp://"; + foreach (BrokerInfo info in _brokerInfos) + { + result += info.ToString(); + } + return result; + + } + + public string getFailoverMethod() + { + return _failoverMethod; + } + + public string getFailoverOption(string key) + { + return (string) _failoverOptions[key]; + } + + public int getBrokerCount() + { + return _brokerInfos.Count; + } + + public BrokerInfo GetBrokerDetails(int index) + { + return (BrokerInfo)_brokerInfos[index]; + } + + public void AddBrokerInfo(BrokerInfo brokerInfo) + { + if (!_brokerInfos.Contains(brokerInfo)) + { + _brokerInfos.Add(brokerInfo); + } + } + + public IList GetAllBrokerInfos() + { + return _brokerInfos; + } + + public string GetClientName() + { + return _clientName; + } + + public void SetClientName(string clientName) + { + _clientName = clientName; + } + + public string getUsername() + { + return _username; + } + + public void setUsername(string username) + { + _username = username; + } + + public string getPassword() + { + return _password; + } + + public void setPassword(string password) + { + _password = password; + } + + public string getVirtualHost() + { + return _virtualHost; + } + + public void setVirtualHost(string virtualHost) + { + _virtualHost = virtualHost; + } + + public string getOption(string key) + { + return (string) _options[key]; + } + + public void setOption(string key, string value) + { + _options[key] = value; + } + + public override string ToString() + { + return asUrl(); + } + } +} diff --git a/dotnet/Qpid.Client/Client/State/AMQState.cs b/dotnet/Qpid.Client/Client/State/AMQState.cs new file mode 100644 index 0000000000..fc71fe647c --- /dev/null +++ b/dotnet/Qpid.Client/Client/State/AMQState.cs @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Qpid.Client.State +{ + public enum AMQState + { + CONNECTION_NOT_STARTED, + CONNECTION_NOT_TUNED, + CONNECTION_NOT_OPENED, + CONNECTION_OPEN, + CONNECTION_CLOSING, + CONNECTION_CLOSED, + ALL // all is a special state used in the state manager. It is not valid to be "in" the state "all". + } +} + diff --git a/dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs b/dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs new file mode 100644 index 0000000000..60d44da824 --- /dev/null +++ b/dotnet/Qpid.Client/Client/State/AMQStateChangedEvent.cs @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Qpid.Client.State +{ + public class AMQStateChangedEvent + { + private readonly AMQState _oldState; + + private readonly AMQState _newState; + + public AMQStateChangedEvent(AMQState oldState, AMQState newState) + { + _oldState = oldState; + _newState = newState; + } + + public AMQState OldState + { + get + { + return _oldState; + } + } + + public AMQState NewState + { + get + { + return _newState; + } + } + + } +} diff --git a/dotnet/Qpid.Client/Client/State/AMQStateManager.cs b/dotnet/Qpid.Client/Client/State/AMQStateManager.cs new file mode 100644 index 0000000000..05f673d520 --- /dev/null +++ b/dotnet/Qpid.Client/Client/State/AMQStateManager.cs @@ -0,0 +1,225 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using log4net; +using Qpid.Client.Handler; +using Qpid.Client.Protocol; +using Qpid.Client.Protocol.Listener; +using Qpid.Framing; + +namespace Qpid.Client.State +{ + public class AMQStateManager : IAMQMethodListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(AMQStateManager)); + + const bool InfoLoggingHack = true; + + /// <summary> + /// The current state + /// </summary> + private AMQState _currentState; + + /// <summary> + /// Maps from an AMQState instance to a Map from Class to StateTransitionHandler. + /// The class must be a subclass of AMQFrame. + /// </summary> + private readonly IDictionary _state2HandlersMap = new Hashtable(); + + //private CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet(); + private ArrayList _stateListeners = ArrayList.Synchronized(new ArrayList(5)); + + public AMQStateManager() + { + _currentState = AMQState.CONNECTION_NOT_STARTED; + RegisterListeners(); + } + + private void RegisterListeners() + { + IStateAwareMethodListener connectionStart = new ConnectionStartMethodHandler(); + IStateAwareMethodListener connectionClose = new ConnectionCloseMethodHandler(); + IStateAwareMethodListener connectionCloseOk = new ConnectionCloseOkHandler(); + IStateAwareMethodListener connectionTune = new ConnectionTuneMethodHandler(); + IStateAwareMethodListener connectionSecure = new ConnectionSecureMethodHandler(); + IStateAwareMethodListener connectionOpenOk = new ConnectionOpenOkMethodHandler(); + IStateAwareMethodListener channelClose = new ChannelCloseMethodHandler(); + IStateAwareMethodListener basicDeliver = new BasicDeliverMethodHandler(); + IStateAwareMethodListener basicReturn = new BasicReturnMethodHandler(); + + // We need to register a map for the null (i.e. all state) handlers otherwise you get + // a stack overflow in the handler searching code when you present it with a frame for which + // no handlers are registered. + _state2HandlersMap[AMQState.ALL] = new Hashtable(); + + { + Hashtable notStarted = new Hashtable(); + notStarted[typeof(ConnectionStartBody)] = connectionStart; + notStarted[typeof(ConnectionCloseBody)] = connectionClose; + _state2HandlersMap[AMQState.CONNECTION_NOT_STARTED] = notStarted; + } + { + Hashtable notTuned = new Hashtable(); + notTuned[typeof(ConnectionTuneBody)] = connectionTune; + notTuned[typeof(ConnectionSecureBody)] = connectionSecure; + notTuned[typeof(ConnectionCloseBody)] = connectionClose; + _state2HandlersMap[AMQState.CONNECTION_NOT_TUNED] = notTuned; + } + { + Hashtable notOpened = new Hashtable(); + notOpened[typeof(ConnectionOpenOkBody)] = connectionOpenOk; + notOpened[typeof(ConnectionCloseBody)] = connectionClose; + _state2HandlersMap[AMQState.CONNECTION_NOT_OPENED] = notOpened; + } + { + Hashtable open = new Hashtable(); + open[typeof(ChannelCloseBody)] = channelClose; + open[typeof(ConnectionCloseBody)] = connectionClose; + open[typeof(BasicDeliverBody)] = basicDeliver; + open[typeof(BasicReturnBody)] = basicReturn; + _state2HandlersMap[AMQState.CONNECTION_OPEN] = open; + } + { + Hashtable closing = new Hashtable(); + closing[typeof(ConnectionCloseOkBody)] = connectionCloseOk; + _state2HandlersMap[AMQState.CONNECTION_CLOSING] = closing; + } + } + + public AMQState CurrentState + { + get + { + return _currentState; + } + } + + /// <summary> + /// Changes the state. + /// </summary> + /// <param name="newState">The new state.</param> + /// <exception cref="AMQException">if there is an error changing state</exception> + public void ChangeState(AMQState newState) + { + if (InfoLoggingHack) + { + _logger.Info("State changing to " + newState + " from old state " + _currentState); + } + _logger.Debug("State changing to " + newState + " from old state " + _currentState); + AMQState oldState = _currentState; + _currentState = newState; + + foreach (IStateListener l in _stateListeners) + { + l.StateChanged(oldState, newState); + } + } + + public void Error(Exception e) + { + _logger.Debug("State manager receive error notification: " + e); + foreach (IStateListener l in _stateListeners) + { + l.Error(e); + } + } + + public bool MethodReceived(AMQMethodEvent evt) + { + _logger.Debug(String.Format("Finding method handler. currentState={0} type={1}", _currentState, evt.Method.GetType())); + IStateAwareMethodListener handler = FindStateTransitionHandler(_currentState, evt.Method); + if (handler != null) + { + handler.MethodReceived(this, evt); + return true; + } + return false; + } + + /// <summary> + /// Finds the state transition handler. + /// </summary> + /// <param name="currentState">State of the current.</param> + /// <param name="frame">The frame.</param> + /// <returns></returns> + /// <exception cref="IllegalStateTransitionException">if the state transition if not allowed</exception> + private IStateAwareMethodListener FindStateTransitionHandler(AMQState currentState, + AMQMethodBody frame) + { + Type clazz = frame.GetType(); + if (_logger.IsDebugEnabled) + { + _logger.Debug("Looking for state transition handler for frame " + clazz); + } + IDictionary classToHandlerMap = (IDictionary) _state2HandlersMap[currentState]; + + if (classToHandlerMap == null) + { + // if no specialised per state handler is registered look for a + // handler registered for "all" states + return FindStateTransitionHandler(AMQState.ALL, frame); + } + IStateAwareMethodListener handler = (IStateAwareMethodListener) classToHandlerMap[clazz]; + if (handler == null) + { + if (currentState == AMQState.ALL) + { + _logger.Debug("No state transition handler defined for receiving frame " + frame); + return null; + } + else + { + // if no specialised per state handler is registered look for a + // handler registered for "all" states + return FindStateTransitionHandler(AMQState.ALL, frame); + } + } + else + { + return handler; + } + } + + public void AddStateListener(IStateListener listener) + { + _logger.Debug("Adding state listener"); + _stateListeners.Add(listener); + } + + public void RemoveStateListener(IStateListener listener) + { + _stateListeners.Remove(listener); + } + + public void AttainState(AMQState s) + { + if (_currentState != s) + { + _logger.Debug("Adding state wait to reach state " + s); + StateWaiter sw = new StateWaiter(s); + AddStateListener(sw); + sw.WaituntilStateHasChanged(); + // at this point the state will have changed. + } + } + } +} diff --git a/dotnet/Qpid.Client/Client/State/IAMQStateListener.cs b/dotnet/Qpid.Client/Client/State/IAMQStateListener.cs new file mode 100644 index 0000000000..ff27cd841e --- /dev/null +++ b/dotnet/Qpid.Client/Client/State/IAMQStateListener.cs @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace Qpid.Client.State +{ + public interface IAMQStateListener + { + void StateChanged(AMQStateChangedEvent evt); + } +} + diff --git a/dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs b/dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs new file mode 100644 index 0000000000..256fe1c3f3 --- /dev/null +++ b/dotnet/Qpid.Client/Client/State/IStateAwareMethodListener.cs @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Qpid.Client.Protocol; + +namespace Qpid.Client.State +{ + public interface IStateAwareMethodListener + { + void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt); + } +} + diff --git a/dotnet/Qpid.Client/Client/State/IStateListener.cs b/dotnet/Qpid.Client/Client/State/IStateListener.cs new file mode 100644 index 0000000000..6073b2bb0c --- /dev/null +++ b/dotnet/Qpid.Client/Client/State/IStateListener.cs @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; + +namespace Qpid.Client.State +{ + public interface IStateListener + { + void StateChanged(AMQState oldState, AMQState newState); + + void Error(Exception e); + } +} + diff --git a/dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs b/dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs new file mode 100644 index 0000000000..723ae04397 --- /dev/null +++ b/dotnet/Qpid.Client/Client/State/IllegalStateTransitionException.cs @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; + +namespace Qpid.Client.State +{ + public class IllegalStateTransitionException : AMQException + { + private AMQState _originalState; + + private Type _frame; + + public IllegalStateTransitionException(AMQState originalState, Type frame) + : base("No valid state transition defined for receiving frame " + frame + + " from state " + originalState) + { + _originalState = originalState; + _frame = frame; + } + + public AMQState OriginalState + { + get + { + return _originalState; + } + } + + public Type FrameType + { + get + { + return _frame; + } + } + } +} + diff --git a/dotnet/Qpid.Client/Client/State/StateWaiter.cs b/dotnet/Qpid.Client/Client/State/StateWaiter.cs new file mode 100644 index 0000000000..cb7f604499 --- /dev/null +++ b/dotnet/Qpid.Client/Client/State/StateWaiter.cs @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Threading; +using log4net; + +namespace Qpid.Client.State +{ + public class StateWaiter : IStateListener + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(StateWaiter)); + + private readonly AMQState _state; + + private volatile bool _newStateAchieved; + + private volatile Exception _exception; + + private ManualResetEvent _resetEvent = new ManualResetEvent(false); + + public StateWaiter(AMQState state) + { + _state = state; + } + + public void StateChanged(AMQState oldState, AMQState newState) + { + if (_logger.IsDebugEnabled) + { + _logger.Debug("stateChanged called"); + } + if (_state == newState) + { + _newStateAchieved = true; + + if (_logger.IsDebugEnabled) + { + _logger.Debug("New state reached so notifying monitor"); + } + _resetEvent.Set(); + } + } + + public void Error(Exception e) + { + if (_logger.IsDebugEnabled) + { + _logger.Debug("exceptionThrown called"); + } + + _exception = e; + _resetEvent.Set(); + } + + public void WaituntilStateHasChanged() + { + // + // The guard is required in case we are woken up by a spurious + // notify(). + // + while (!_newStateAchieved && _exception == null) + { + _logger.Debug("State not achieved so waiting..."); + _resetEvent.WaitOne(); + } + + if (_exception != null) + { + _logger.Debug("Throwable reached state waiter: " + _exception); + if (_exception is AMQException) + { + throw _exception; + } + else + { + throw new AMQException("Error: " + _exception, _exception); + } + } + } + } +} diff --git a/dotnet/Qpid.Client/Client/Transport/AMQProtocolProvider.cs b/dotnet/Qpid.Client/Client/Transport/AMQProtocolProvider.cs new file mode 100644 index 0000000000..1024fa5575 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Transport/AMQProtocolProvider.cs @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Qpid.Codec; +using Qpid.Codec.Demux; +using Qpid.Framing; + +namespace Qpid.Client.Transport +{ + public class AMQProtocolProvider + { + private DemuxingProtocolCodecFactory _factory; + + public AMQProtocolProvider() + { + _factory = new DemuxingProtocolCodecFactory(); + _factory.Register(new AMQDataBlockEncoder()); + _factory.Register(new AMQDataBlockDecoder()); + _factory.Register(new ProtocolInitiation.Decoder()); + } + + public IProtocolCodecFactory CodecFactory + { + get + { + return _factory; + } + } + } +} diff --git a/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs b/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs new file mode 100644 index 0000000000..d3add546fe --- /dev/null +++ b/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs @@ -0,0 +1,94 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using log4net; +using Qpid.Buffer; +using Qpid.Codec; +using Qpid.Codec.Support; +using Qpid.Framing; + +namespace Qpid.Client.Transport +{ + public class AmqpChannel : IProtocolChannel + { + // Warning: don't use this log for regular logging. + static readonly ILog _protocolTraceLog = LogManager.GetLogger("Qpid.Client.ProtocolChannel.Tracing"); + + IByteChannel byteChannel; + IProtocolEncoder encoder; + IProtocolDecoder decoder; + + public AmqpChannel(IByteChannel byteChannel) + { + this.byteChannel = byteChannel; + + AMQProtocolProvider protocolProvider = new AMQProtocolProvider(); + IProtocolCodecFactory factory = protocolProvider.CodecFactory; + encoder = factory.Encoder; + decoder = factory.Decoder; + } + + public Queue Read() + { + ByteBuffer buffer = byteChannel.Read(); + + Queue frames = Decode(buffer); + + // TODO: Refactor to decorator. + if (_protocolTraceLog.IsDebugEnabled) + { + foreach (object o in frames) + { + _protocolTraceLog.Debug(String.Format("READ {0}", o)); + } + } + + return frames; + } + + public void Write(IDataBlock o) + { + // TODO: Refactor to decorator. + if (_protocolTraceLog.IsDebugEnabled) + { + _protocolTraceLog.Debug(String.Format("WRITE {0}", o)); + } + + byteChannel.Write(Encode(o)); + } + + private ByteBuffer Encode(object o) + { + SingleProtocolEncoderOutput output = new SingleProtocolEncoderOutput(); + encoder.Encode(o, output); + return output.buffer; + } + + private Queue Decode(ByteBuffer byteBuffer) + { + SimpleProtocolDecoderOutput outx = new SimpleProtocolDecoderOutput(); + decoder.Decode(byteBuffer, outx); + return outx.MessageQueue; + } + } +} + diff --git a/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs b/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs new file mode 100644 index 0000000000..4a3ff385a8 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Qpid.Buffer; + +namespace Qpid.Client.Transport +{ + public interface IByteChannel + { + ByteBuffer Read(); + void Write(ByteBuffer buffer); + } +} diff --git a/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs b/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs new file mode 100644 index 0000000000..4943a45d68 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Collections; + +namespace Qpid.Client.Transport +{ + public interface IProtocolChannel : IProtocolWriter + { + Queue Read(); + } +} diff --git a/dotnet/Qpid.Client/Client/Transport/IProtocolWriter.cs b/dotnet/Qpid.Client/Client/Transport/IProtocolWriter.cs new file mode 100644 index 0000000000..ac19977927 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Transport/IProtocolWriter.cs @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Qpid.Framing; + +namespace Qpid.Client.Transport +{ + public interface IProtocolWriter + { + void Write(IDataBlock o); + } +}
\ No newline at end of file diff --git a/dotnet/Qpid.Client/Client/Transport/ITransport.cs b/dotnet/Qpid.Client/Client/Transport/ITransport.cs new file mode 100644 index 0000000000..aebe58b439 --- /dev/null +++ b/dotnet/Qpid.Client/Client/Transport/ITransport.cs @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Qpid.Client.Protocol; + +namespace Qpid.Client.Transport +{ + public interface ITransport : IConnectionCloser + { + void Open(); + string getLocalEndPoint(); + IProtocolWriter ProtocolWriter { get; } + } +} diff --git a/dotnet/Qpid.Client/Client/Transport/SingleProtocolEncoderOutput.cs b/dotnet/Qpid.Client/Client/Transport/SingleProtocolEncoderOutput.cs new file mode 100644 index 0000000000..5b5392769a --- /dev/null +++ b/dotnet/Qpid.Client/Client/Transport/SingleProtocolEncoderOutput.cs @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using Qpid.Buffer; +using Qpid.Codec; + +namespace Qpid.Client.Transport +{ + public class SingleProtocolEncoderOutput : IProtocolEncoderOutput + { + public ByteBuffer buffer; + + public void Write(ByteBuffer buf) + { + if (buffer != null) + { + throw new InvalidOperationException("{0} does not allow the writing of more than one buffer"); + } + buffer = buf; + } + } +}
\ No newline at end of file diff --git a/dotnet/Qpid.Client/Properties/AssemblyInfo.cs b/dotnet/Qpid.Client/Properties/AssemblyInfo.cs new file mode 100644 index 0000000000..0c7da88839 --- /dev/null +++ b/dotnet/Qpid.Client/Properties/AssemblyInfo.cs @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Reflection; +using System.Runtime.InteropServices; +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Qpid.Client")] +[assembly: AssemblyDescription("Qpid Client API implementation")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("Apache Qpid")] +[assembly: AssemblyProduct("")] +[assembly: AssemblyCopyright("Copyright (c) 2006 The Apache Software Foundation")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("380cb124-07a8-40c2-b67d-69a0d94cb620")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Revision and Build Numbers +// by using the '*' as shown below: +[assembly: AssemblyVersion("0.5.*")] diff --git a/dotnet/Qpid.Client/Qpid.Client.csproj b/dotnet/Qpid.Client/Qpid.Client.csproj new file mode 100644 index 0000000000..eb84402e1f --- /dev/null +++ b/dotnet/Qpid.Client/Qpid.Client.csproj @@ -0,0 +1,133 @@ +<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>8.0.50727</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{68987C05-3768-452C-A6FC-6BA1D372852F}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Qpid.Client</RootNamespace>
+ <AssemblyName>Qpid.Client</AssemblyName>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="log4net, Version=1.2.0.30714, Culture=neutral, PublicKeyToken=500ffcafb14f92df">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\Qpid.Common\lib\log4net\log4net.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="Client\AmqBrokerInfo.cs" />
+ <Compile Include="Client\AMQConnection.cs" />
+ <Compile Include="Client\AMQConnectionException.cs" />
+ <Compile Include="Client\AMQDestination.cs" />
+ <Compile Include="Client\AmqChannel.cs" />
+ <Compile Include="Client\QpidConnectionInfo.cs" />
+ <Compile Include="Client\BasicMessageConsumer.cs" />
+ <Compile Include="Client\BasicMessageProducer.cs" />
+ <Compile Include="Client\Closeable.cs" />
+ <Compile Include="Client\ConnectionTuneParameters.cs" />
+ <Compile Include="Client\Failover\FailoverException.cs" />
+ <Compile Include="Client\Failover\FailoverHandler.cs" />
+ <Compile Include="Client\Failover\FailoverState.cs" />
+ <Compile Include="Client\Failover\FailoverSupport.cs" />
+ <Compile Include="Client\Handler\BasicDeliverMethodHandler.cs" />
+ <Compile Include="Client\Handler\BasicReturnMethodHandler.cs" />
+ <Compile Include="Client\Handler\ChannelCloseMethodHandler.cs" />
+ <Compile Include="Client\Handler\ConnectionCloseMethodHandler.cs" />
+ <Compile Include="Client\Handler\ConnectionCloseOkHandler.cs" />
+ <Compile Include="Client\Handler\ConnectionOpenOkMethodHandler.cs" />
+ <Compile Include="Client\Handler\ConnectionRedirectMethodHandler.cs" />
+ <Compile Include="Client\Handler\ConnectionSecureMethodHandler.cs" />
+ <Compile Include="Client\Handler\ConnectionStartMethodHandler.cs" />
+ <Compile Include="Client\Handler\ConnectionTuneMethodHandler.cs" />
+ <Compile Include="Client\Message\AbstractQmsMessage.cs" />
+ <Compile Include="Client\Message\AMQMessage.cs" />
+ <Compile Include="Client\Message\AMQMessageFactory.cs" />
+ <Compile Include="Client\Message\IMessageFactory.cs" />
+ <Compile Include="Client\Message\MessageFactoryRegistry.cs" />
+ <Compile Include="Client\Message\UnexpectedBodyReceivedException.cs" />
+ <Compile Include="Client\Message\UnprocessedMessage.cs" />
+ <Compile Include="Client\Message\QpidBytesMessage.cs" />
+ <Compile Include="Client\Message\QpidBytesMessageFactory.cs" />
+ <Compile Include="Client\Message\QpidTextMessage.cs" />
+ <Compile Include="Client\Message\QpidTextMessageFactory.cs" />
+ <Compile Include="Client\Protocol\AMQMethodEvent.cs" />
+ <Compile Include="Client\Protocol\AMQProtocolListener.cs" />
+ <Compile Include="Client\Protocol\AMQProtocolSession.cs" />
+ <Compile Include="Client\Protocol\Listener\BlockingMethodFrameListener.cs" />
+ <Compile Include="Client\Protocol\Listener\IAMQMethodListener.cs" />
+ <Compile Include="Client\Protocol\IConnectionCloser.cs" />
+ <Compile Include="Client\Protocol\ProtocolWriter.cs" />
+ <Compile Include="Client\Protocol\IProtocolListener.cs" />
+ <Compile Include="Client\State\AMQState.cs" />
+ <Compile Include="Client\State\AMQStateChangedEvent.cs" />
+ <Compile Include="Client\State\AMQStateManager.cs" />
+ <Compile Include="Client\State\IAMQStateListener.cs" />
+ <Compile Include="Client\State\IllegalStateTransitionException.cs" />
+ <Compile Include="Client\State\IStateAwareMethodListener.cs" />
+ <Compile Include="Client\State\IStateListener.cs" />
+ <Compile Include="Client\Protocol\Listener\SpecificMethodFrameListener.cs" />
+ <Compile Include="Client\State\StateWaiter.cs" />
+ <Compile Include="Client\Transport\AmqpChannel.cs" />
+ <Compile Include="Client\Transport\AMQProtocolProvider.cs" />
+ <Compile Include="Client\Transport\IByteChannel.cs" />
+ <Compile Include="Client\Transport\IProtocolChannel.cs" />
+ <Compile Include="Client\Transport\IProtocolWriter.cs" />
+ <Compile Include="Client\Transport\ITransport.cs" />
+ <Compile Include="Client\Transport\SingleProtocolEncoderOutput.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="qms\BrokerInfo.cs" />
+ <Compile Include="qms\ConnectionInfo.cs" />
+ <Compile Include="qms\FailoverPolicy.cs" />
+ <Compile Include="qms\failover\FailoverMethod.cs" />
+ <Compile Include="qms\failover\FailoverRoundRobin.cs" />
+ <Compile Include="qms\failover\FailoverSingleServer.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\Qpid.Codec\Qpid.Codec.csproj">
+ <Project>{22D0D0C2-77AF-4DE3-B456-7FF3893F9F88}</Project>
+ <Name>Qpid.Codec</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Buffer\Qpid.Buffer.csproj">
+ <Project>{44384DF2-B0A4-4580-BDBC-EE4BAA87D995}</Project>
+ <Name>Qpid.Buffer</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Messaging\Qpid.Messaging.csproj">
+ <Project>{6688F826-C58E-4C1B-AA1F-22AFAB4B7D07}</Project>
+ <Name>Qpid.Messaging</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Common\Qpid.Common.csproj">
+ <Project>{77064C42-24D2-4CEB-9EA2-0EF481A43205}</Project>
+ <Name>Qpid.Common</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
\ No newline at end of file diff --git a/dotnet/Qpid.Client/default.build b/dotnet/Qpid.Client/default.build new file mode 100644 index 0000000000..00e2854326 --- /dev/null +++ b/dotnet/Qpid.Client/default.build @@ -0,0 +1,52 @@ +<?xml version="1.0"?> +<project name="XMS.AMQ.Client" default="build"> + <property name="nant.settings.currentframework" value="net-1.0" /> + <property name="basename" value="XMSClient"/> + <property name="debug" value="true"/> + <property name="CommonCollectionsDir" value="../CommonCollections"/> + <property name="MINADir" value="../minadotnet"/> + <property name="XMSCommonDir" value="../xmscommon"/> + + <if test="${debug}"> + <property name="targetdir" value="bin/${nant.settings.currentframework}/Debug"/> + </if> + <ifnot test="${debug}"> + <property name="targetdir" value="bin/${nant.settings.currentframework}/Release"/> + </ifnot> + + <target name="clean"> + <delete> + <fileset> + <include name="${targetdir}/${basename}.dll"/> + <include name="${targetdir}/${basename}.pdb"/> + </fileset> + </delete> + </target> + + <target name="init"> + <mkdir dir="${targetdir}"/> + </target> + + <target name="build" depends="init"> + <csc target="library" output="${targetdir}/${basename}.dll" debug="${debug}"> + <sources> + <include name="**/*.cs"/> + <exclude name="Properties/Settings.Designer.cs" /> + </sources> + <references> + <lib> + <include name="${CommonCollectionsDir}/${targetdir}" /> + <include name="${MINADir}/${targetdir}" /> + <include name="${XMSCommonDir}/${targetdir}" /> + <include name="${XMSCommonDir}/lib/**" /> + <include name="lib/**" /> + </lib> + <include name="CommonCollections.dll" /> + <include name="log4net.dll" /> + <include name="MINA.dll" /> + <include name="IBM.XMS.dll" /> + <include name="XMSCommon.dll" /> + </references> + </csc> + </target> +</project>
\ No newline at end of file diff --git a/dotnet/Qpid.Client/qms/BrokerInfo.cs b/dotnet/Qpid.Client/qms/BrokerInfo.cs new file mode 100644 index 0000000000..dd0504968e --- /dev/null +++ b/dotnet/Qpid.Client/qms/BrokerInfo.cs @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; + +namespace Qpid.Client.qms +{ + /// <summary> + /// Know URL option names. + /// <seealso cref="ConnectionInfo"/> + /// </summary> + public class BrokerDetailsConstants + { + public const String OPTIONS_RETRY = "retries"; + public const String OPTIONS_SSL = ConnectionUrlConstants.OPTIONS_SSL; + public const String OPTIONS_CONNECT_TIMEOUT = "connecttimeout"; + public const int DEFAULT_PORT = 5672; + public const String DEFAULT_TRANSPORT = "tcp"; + + public readonly string URL_FORMAT_EXAMPLE = + "<transport>://<hostname>[:<port Default=\"" + DEFAULT_PORT + "\">][?<option>='<value>'[,<option>='<value>']]"; + + public const long DEFAULT_CONNECT_TIMEOUT = 30000L; + } + + public interface BrokerInfo + { + String getHost(); + void setHost(string host); + + int getPort(); + void setPort(int port); + + String getTransport(); + void setTransport(string transport); + + bool useSSL(); + void useSSL(bool ssl); + + String getOption(string key); + void setOption(string key, string value); + + long getTimeout(); + void setTimeout(long timeout); + } +}
\ No newline at end of file diff --git a/dotnet/Qpid.Client/qms/ConnectionInfo.cs b/dotnet/Qpid.Client/qms/ConnectionInfo.cs new file mode 100644 index 0000000000..1d099daa3e --- /dev/null +++ b/dotnet/Qpid.Client/qms/ConnectionInfo.cs @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Collections; + +namespace Qpid.Client.qms +{ + class ConnectionUrlConstants + { + public const string AMQ_PROTOCOL = "amqp"; + public const string OPTIONS_BROKERLIST = "brokerlist"; + public const string OPTIONS_FAILOVER = "failover"; + public const string OPTIONS_FAILOVER_CYCLE = "cyclecount"; + public const string OPTIONS_SSL = "ssl"; + } + + /** + Connection URL format + amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\'&option=\'value\';vm://:3/virtualpath?option=\'value\''&failover='method?option=\'value\'&option='value''" + Options are of course optional except for requiring a single broker in the broker list. + The option seperator is defined to be either '&' or ',' + */ + public interface ConnectionInfo + { + string asUrl(); + + string getFailoverMethod(); + + string getFailoverOption(string key); + + int getBrokerCount(); + + BrokerInfo GetBrokerDetails(int index); + + void AddBrokerInfo(BrokerInfo broker); + + IList GetAllBrokerInfos(); + + string GetClientName(); + + void SetClientName(string clientName); + + string getUsername(); + + void setUsername(string username); + + string getPassword(); + + void setPassword(string password); + + string getVirtualHost(); + + void setVirtualHost(string virtualHost); + + string getOption(string key); + + void setOption(string key, string value); + } +} diff --git a/dotnet/Qpid.Client/qms/FailoverPolicy.cs b/dotnet/Qpid.Client/qms/FailoverPolicy.cs new file mode 100644 index 0000000000..15d52491df --- /dev/null +++ b/dotnet/Qpid.Client/qms/FailoverPolicy.cs @@ -0,0 +1,315 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Text; +using log4net; +using Qpid.Client.qms.failover; + +namespace Qpid.Client.qms +{ + public class FailoverPolicy + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(FailoverPolicy)); + + private const long MINUTE = 60000L; + + private const long DEFAULT_METHOD_TIMEOUT = 1 * MINUTE; + private const long DEFAULT_FAILOVER_TIMEOUT = 4 * MINUTE; + + private FailoverMethod[] _methods = new FailoverMethod[1]; + + private int _currentMethod; + + private int _methodsRetries; + + private int _currentRetry; + + private bool _timing; + + private long _lastMethodTime; + private long _lastFailTime; + + public FailoverPolicy(ConnectionInfo connectionInfo) + { + FailoverMethod method; + + //todo This should be integrated in to the connection url when it supports + // multiple strategies. + + _methodsRetries = 0; + + if (connectionInfo.getFailoverMethod() == null) + { + if (connectionInfo.getBrokerCount() > 1) + { + method = new FailoverRoundRobin(connectionInfo); + } + else + { + method = new FailoverSingleServer(connectionInfo); + } + } + else + { + string failoverMethod = connectionInfo.getFailoverMethod(); + + /* + if (failoverMethod.equals(FailoverMethod.RANDOM)) + { + //todo write a random connection Failover + } + */ + if (failoverMethod.Equals(FailoverMethodConstants.ROUND_ROBIN)) + { + method = new FailoverRoundRobin(connectionInfo); + } + else + { + throw new NotImplementedException("Dynamic loading of FailoverMethods not yet implemented."); +// try +// { +// Type[] constructorSpec = {ConnectionInfo.class}; +// Object [] params = {connectionInfo}; +// +// method = (FailoverMethod) ClassLoader.getSystemClassLoader(). +// loadClass(failoverMethod). +// getConstructor(constructorSpec).newInstance(params); +// } +// catch (Exception cnfe) +// { +// throw new IllegalArgumentException("Unknown failover method:" + failoverMethod); +// } + } + } + + if (method == null) + { + throw new ArgumentException("Unknown failover method specified."); + } + + reset(); + + _methods[_currentMethod] = method; + } + + public FailoverPolicy(FailoverMethod method) : this(method, 0) + { + } + + public FailoverPolicy(FailoverMethod method, int retries) + { + _methodsRetries = retries; + + reset(); + + _methods[_currentMethod] = method; + } + + private void reset() + { + _currentMethod = 0; + _currentRetry = 0; + _timing = false; + + } + + public bool FailoverAllowed() + { + bool failoverAllowed; + + if (_timing) + { + long now = CurrentTimeMilliseconds(); + + if ((now - _lastMethodTime) >= DEFAULT_METHOD_TIMEOUT) + { + _logger.Info("Failover method timeout"); + _lastMethodTime = now; + + if (!nextMethod()) + { + return false; + } + + + } + else if ((now - _lastFailTime) >= DEFAULT_FAILOVER_TIMEOUT) + { + _logger.Info("Failover timeout"); + return false; + } + else + { + _lastMethodTime = now; + } + } + else + { + _timing = true; + _lastMethodTime = CurrentTimeMilliseconds(); + _lastFailTime = _lastMethodTime; + } + + + if (_methods[_currentMethod].failoverAllowed()) + { + failoverAllowed = true; + } + else + { + if (_currentMethod < (_methods.Length - 1)) + { + nextMethod(); + _logger.Info("Changing method to " + _methods[_currentMethod].methodName()); + return FailoverAllowed(); + } + else + { + return cycleMethods(); + } + } + + return failoverAllowed; + } + + private static long CurrentTimeMilliseconds() + { + return DateTime.Now.Ticks / TimeSpan.TicksPerMillisecond; + } + + private bool nextMethod() + { + if (_currentMethod < (_methods.Length - 1)) + { + _currentMethod++; + _methods[_currentMethod].reset(); + return true; + } + else + { + return cycleMethods(); + } + } + + private bool cycleMethods() + { + if (_currentRetry < _methodsRetries) + { + _currentRetry++; + + _currentMethod = 0; + + _logger.Info("Retrying methods starting with " + _methods[_currentMethod].methodName()); + _methods[_currentMethod].reset(); + return FailoverAllowed(); + } + else + { + _logger.Debug("All failover methods exhausted"); + return false; + } + } + + /** + * Notification that connection was successful. + */ + public void attainedConnection() + { + _currentRetry = 0; + + _methods[_currentMethod].attainedConnection(); + + _timing = false; + } + + public BrokerInfo GetCurrentBrokerInfo() + { + return _methods[_currentMethod].GetCurrentBrokerInfo(); + } + + public BrokerInfo GetNextBrokerInfo() + { + return _methods[_currentMethod].getNextBrokerDetails(); + } + + public void setBroker(BrokerInfo broker) + { + _methods[_currentMethod].setBroker(broker); + } + + public void addMethod(FailoverMethod method) + { + int len = _methods.Length + 1; + FailoverMethod[] newMethods = new FailoverMethod[len]; + _methods.CopyTo(newMethods, 0); +// System.arraycopy(_methods, 0, newMethods, 0, _methods.length); + int index = len - 1; + newMethods[index] = method; + _methods = newMethods; + } + + public void setMethodRetries(int retries) + { + _methodsRetries = retries; + } + + public FailoverMethod getCurrentMethod() + { + if (_currentMethod >= 0 && _currentMethod < (_methods.Length - 1)) + { + return _methods[_currentMethod]; + } + else + { + return null; + } + } + + public String toString() + { + StringBuilder sb = new StringBuilder(); + + sb.Append("Failover Policy:\n"); + + if (FailoverAllowed()) + { + sb.Append("Failover allowed\n"); + } + else + { + sb.Append("Failover not allowed\n"); + } + + sb.Append("Failover policy methods\n"); + for (int i = 0; i < _methods.Length; i++) + { + + if (i == _currentMethod) + { + sb.Append(">"); + } + sb.Append(_methods[i].ToString()); + } + + return sb.ToString(); + } + } +}
\ No newline at end of file diff --git a/dotnet/Qpid.Client/qms/failover/FailoverMethod.cs b/dotnet/Qpid.Client/qms/failover/FailoverMethod.cs new file mode 100644 index 0000000000..7db9ef32fa --- /dev/null +++ b/dotnet/Qpid.Client/qms/failover/FailoverMethod.cs @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; + +namespace Qpid.Client.qms.failover +{ + public class FailoverMethodConstants + { + public const String ROUND_ROBIN = "roundrobin"; + public const String RANDOM = "random"; + } + + public interface FailoverMethod + { + /** + * Reset the Failover to initial conditions + */ + void reset(); + + /** + * Check if failover is possible for this method + * + * @return true if failover is allowed + */ + bool failoverAllowed(); + + /** + * Notification to the Failover method that a connection has been attained. + */ + void attainedConnection(); + + /** + * If there is no current BrokerInfo the null will be returned. + * @return The current BrokerDetail value to use + */ + BrokerInfo GetCurrentBrokerInfo(); + + /** + * Move to the next BrokerInfo if one is available. + * @return the next BrokerDetail or null if there is none. + */ + BrokerInfo getNextBrokerDetails(); + + /** + * Set the currently active broker to be the new value. + * @param broker The new BrokerDetail value + */ + void setBroker(BrokerInfo broker); + + /** + * Set the retries for this method + * @param maxRetries the maximum number of time to retry this Method + */ + void setRetries(int maxRetries); + + /** + * @return The name of this method for display purposes. + */ + String methodName(); + } +} diff --git a/dotnet/Qpid.Client/qms/failover/FailoverRoundRobin.cs b/dotnet/Qpid.Client/qms/failover/FailoverRoundRobin.cs new file mode 100644 index 0000000000..c0e832ce21 --- /dev/null +++ b/dotnet/Qpid.Client/qms/failover/FailoverRoundRobin.cs @@ -0,0 +1,255 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Text; +using log4net; + +namespace Qpid.Client.qms.failover +{ + public class FailoverRoundRobin : FailoverMethod + { + private static readonly ILog _logger = LogManager.GetLogger(typeof(FailoverRoundRobin)); + + /** The default number of times to cycle through all servers */ + public const int DEFAULT_CYCLE_RETRIES = 0; + /** The default number of times to retry each server */ + public const int DEFAULT_SERVER_RETRIES = 0; + + /** + * The index into the hostDetails array of the broker to which we are connected + */ + private int _currentBrokerIndex = -1; + + /** + * The number of times to retry connecting for each server + */ + private int _serverRetries; + + /** + * The current number of retry attempts made + */ + private int _currentServerRetry; + + /** + * The number of times to cycle through the servers + */ + private int _cycleRetries; + + /** + * The current number of cycles performed. + */ + private int _currentCycleRetries; + + /** + * Array of BrokerDetail used to make connections. + */ + private ConnectionInfo _connectionDetails; + + public FailoverRoundRobin(ConnectionInfo connectionDetails) + { + if (!(connectionDetails.getBrokerCount() > 0)) + { + throw new ArgumentException("At least one broker details must be specified."); + } + + _connectionDetails = connectionDetails; + + //There is no current broker at startup so set it to -1. + _currentBrokerIndex = -1; + + String cycleRetries = _connectionDetails.getFailoverOption(ConnectionUrlConstants.OPTIONS_FAILOVER_CYCLE); + + if (cycleRetries != null) + { + try + { + _cycleRetries = int.Parse(cycleRetries); + } + catch (FormatException) + { + _cycleRetries = DEFAULT_CYCLE_RETRIES; + } + } + + _currentCycleRetries = 0; + + _serverRetries = 0; + _currentServerRetry = -1; + } + + public void reset() + { + _currentBrokerIndex = 0; + _currentCycleRetries = 0; + _currentServerRetry = -1; + } + + public bool failoverAllowed() + { + return ((_currentCycleRetries < _cycleRetries) + || (_currentServerRetry < _serverRetries) + || (_currentBrokerIndex < (_connectionDetails.getBrokerCount() - 1))); + } + + public void attainedConnection() + { + _currentCycleRetries = 0; + _currentServerRetry = -1; + } + + public BrokerInfo GetCurrentBrokerInfo() + { + if (_currentBrokerIndex == -1) + { + return null; + } + + return _connectionDetails.GetBrokerDetails(_currentBrokerIndex); + } + + public BrokerInfo getNextBrokerDetails() + { + if (_currentBrokerIndex == (_connectionDetails.getBrokerCount() - 1)) + { + if (_currentServerRetry < _serverRetries) + { + if (_currentBrokerIndex == -1) + { + _currentBrokerIndex = 0; + + setBroker(_connectionDetails.GetBrokerDetails(_currentBrokerIndex )); + + _logger.Info("First Run using " + _connectionDetails.GetBrokerDetails(_currentBrokerIndex)); + } + else + { + _logger.Info("Retrying " + _connectionDetails.GetBrokerDetails(_currentBrokerIndex)); + } + + _currentServerRetry++; + } + else + { + _currentCycleRetries++; + //failed to connect to first broker + _currentBrokerIndex = 0; + + setBroker(_connectionDetails.GetBrokerDetails(_currentBrokerIndex )); + + // This is zero rather than -1 as we are already retrieving the details. + _currentServerRetry = 0; + } + //else - should force client to stop as max retries has been reached. + } + else + { + if (_currentServerRetry < _serverRetries) + { + if (_currentBrokerIndex == -1) + { + _currentBrokerIndex = 0; + + setBroker(_connectionDetails.GetBrokerDetails(_currentBrokerIndex )); + + _logger.Info("First Run using " + _connectionDetails.GetBrokerDetails(_currentBrokerIndex)); + } + else + { + _logger.Info("Retrying " + _connectionDetails.GetBrokerDetails(_currentBrokerIndex)); + } + _currentServerRetry++; + } + else + { + _currentBrokerIndex++; + + setBroker(_connectionDetails.GetBrokerDetails(_currentBrokerIndex )); + // This is zero rather than -1 as we are already retrieving the details. + _currentServerRetry = 0; + } + } + + return _connectionDetails.GetBrokerDetails(_currentBrokerIndex); + } + + public void setBroker(BrokerInfo broker) + { + _connectionDetails.AddBrokerInfo(broker); + + int index = _connectionDetails.GetAllBrokerInfos().IndexOf(broker); + + String serverRetries = broker.getOption(BrokerDetailsConstants.OPTIONS_RETRY); + + if (serverRetries != null) + { + try + { + _serverRetries = int.Parse(serverRetries); + } + catch (FormatException) + { + _serverRetries = DEFAULT_SERVER_RETRIES; + } + } + + _currentServerRetry = -1; + _currentBrokerIndex = index; + } + + public void setRetries(int maxRetries) + { + _cycleRetries = maxRetries; + } + + public String methodName() + { + return "Cycle Servers"; + } + + public override string ToString() + { + StringBuilder sb = new StringBuilder(); + + sb.Append(GetType().Name).Append("\n"); + + sb.Append("Broker count: ").Append(_connectionDetails.getBrokerCount()); + sb.Append("\ncurrent broker index: ").Append(_currentBrokerIndex); + + sb.Append("\nCycle Retries: ").Append(_cycleRetries); + sb.Append("\nCurrent Cycle:").Append(_currentCycleRetries); + sb.Append("\nServer Retries:").Append(_serverRetries); + sb.Append("\nCurrent Retry:").Append(_currentServerRetry); + sb.Append("\n"); + + for(int i=0; i < _connectionDetails.getBrokerCount() ; i++) + { + if (i == _currentBrokerIndex) + { + sb.Append(">"); + } + sb.Append(_connectionDetails.GetBrokerDetails(i)); + sb.Append("\n"); + } + + return sb.ToString(); + } + } +} diff --git a/dotnet/Qpid.Client/qms/failover/FailoverSingleServer.cs b/dotnet/Qpid.Client/qms/failover/FailoverSingleServer.cs new file mode 100644 index 0000000000..f077f75fdf --- /dev/null +++ b/dotnet/Qpid.Client/qms/failover/FailoverSingleServer.cs @@ -0,0 +1,147 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; + +namespace Qpid.Client.qms.failover +{ + public class FailoverSingleServer : FailoverMethod + { + /** The default number of times to rety a conection to this server */ + public const int DEFAULT_SERVER_RETRIES = 1; + + /** + * The details of the Single Server + */ + private BrokerInfo _brokerDetail; + + /** + * The number of times to retry connecting to the sever + */ + private int _retries; + + /** + * The current number of attempts made to the server + */ + private int _currentRetries; + + + public FailoverSingleServer(ConnectionInfo connectionDetails) + { + if (connectionDetails.getBrokerCount() > 0) + { + setBroker(connectionDetails.GetBrokerDetails(0)); + } + else + { + throw new ArgumentException("BrokerInfo details required for connection."); + } + } + + public FailoverSingleServer(BrokerInfo brokerDetail) + { + setBroker(brokerDetail); + } + + public void reset() + { + _currentRetries = -1; + } + + public bool failoverAllowed() + { + return _currentRetries < _retries; + } + + public void attainedConnection() + { + reset(); + } + + public BrokerInfo GetCurrentBrokerInfo() + { + return _brokerDetail; + } + + public BrokerInfo getNextBrokerDetails() + { + if (_currentRetries == _retries) + { + return null; + } + else + { + if (_currentRetries < _retries) + { + _currentRetries ++; + } + + return _brokerDetail; + } + } + + public void setBroker(BrokerInfo broker) + { + if (broker == null) + { + throw new ArgumentException("BrokerInfo details cannot be null"); + } + _brokerDetail = broker; + + String retries = broker.getOption(BrokerDetailsConstants.OPTIONS_RETRY); + if (retries != null) + { + try + { + _retries = int.Parse(retries); + } + catch (FormatException nfe) + { + _retries = DEFAULT_SERVER_RETRIES; + } + } + else + { + _retries = DEFAULT_SERVER_RETRIES; + } + + reset(); + } + + public void setRetries(int retries) + { + _retries = retries; + } + + public String methodName() + { + return "Single Server"; + } + + public String toString() + { + return "SingleServer:\n"+ + "Max Retries:"+_retries+ + "\nCurrent Retry:"+_currentRetries+ + "\n"+_brokerDetail+"\n"; + } + + } +}
\ No newline at end of file |
