diff options
Diffstat (limited to 'qpid/dotnet/Qpid.Client.Transport.Socket.Blocking')
6 files changed, 514 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketProcessor.cs b/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketProcessor.cs new file mode 100644 index 0000000000..b62b11a3db --- /dev/null +++ b/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketProcessor.cs @@ -0,0 +1,135 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Net; +using System.Net.Sockets; +using log4net; +using Apache.Qpid.Buffer; +using Apache.Qpid.Client.Protocol; + +namespace Apache.Qpid.Client.Transport.Socket.Blocking +{ + class BlockingSocketProcessor : IConnectionCloser + { + private static readonly ILog _log = LogManager.GetLogger(typeof(BlockingSocketProcessor)); + + string _host; + int _port; + System.Net.Sockets.Socket _socket; + private NetworkStream _networkStream; + IByteChannel _byteChannel; + IProtocolListener _protocolListener; + + public BlockingSocketProcessor(string host, int port, IProtocolListener protocolListener) + { + _host = host; + _port = port; + _protocolListener = protocolListener; + _byteChannel = new ByteChannel(this); + } + + /// <summary> + /// Synchronous blocking connect. + /// </summary> + public void Connect() + { + _socket = new System.Net.Sockets.Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + + /// For future note TCP Set NoDelay options may help, though with the blocking io not sure + /// The Don't linger may help with detecting disconnect but that hasn't been the case in testing. + /// _socket.SetSocketOption (SocketOptionLevel.Socket, SocketOptionName.NoDelay, 0); + /// _socket.SetSocketOption (SocketOptionLevel.Socket, SocketOptionName.DontLinger, 0); + + IPHostEntry ipHostInfo = Dns.Resolve(_host); // Note: don't fix this warning. We do this for .NET 1.1 compatibility. + IPAddress ipAddress = ipHostInfo.AddressList[0]; + + IPEndPoint ipe = new IPEndPoint(ipAddress, _port); + + _socket.Connect(ipe); + _networkStream = new NetworkStream(_socket, true); + } + + public string getLocalEndPoint() + { + return _socket.LocalEndPoint.ToString(); + } + + public void Write(ByteBuffer byteBuffer) + { + try + { + _networkStream.Write(byteBuffer.array(), byteBuffer.position(), byteBuffer.limit()); // FIXME + } + catch (Exception e) + { + _log.Error("Write caused exception", e); + _protocolListener.OnException(e); + // We should provide the error synchronously as we are doing blocking io. + throw e; + } + } + + public ByteBuffer Read() + { + const int bufferSize = 4 * 1024; // TODO: Prevent constant allocation of buffers. + byte[] bytes = new byte[bufferSize]; + + int numOctets = _networkStream.Read(bytes, 0, bytes.Length); + + /// Read only returns 0 if the socket has been gracefully shutdown. + /// http://msdn2.microsoft.com/en-us/library/system.net.sockets.networkstream.read(VS.71).aspx + /// We can use this to block Send so the next Read will force an exception forcing failover. + /// Otherwise we need to wait ~20 seconds for the NetworkStream/Socket code to realise that + /// the socket has been closed. + if (numOctets == 0) + { + _socket.Shutdown(SocketShutdown.Send); + _socket.Close(); + } + + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + byteBuffer.limit(numOctets); + + byteBuffer.flip(); + + return byteBuffer; + } + + public void Disconnect() + { + _networkStream.Flush(); + _networkStream.Close(); + _socket.Close(); + } + + public void Close() + { + Disconnect(); + } + + public IByteChannel ByteChannel + { + get { return _byteChannel; } + } + } +} + + diff --git a/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketTransport.cs b/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketTransport.cs new file mode 100644 index 0000000000..17f911fb6d --- /dev/null +++ b/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/BlockingSocketTransport.cs @@ -0,0 +1,121 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Threading; +using log4net; +using Apache.Qpid.Client.Protocol; +using Apache.Qpid.Framing; + +namespace Qpid.Client.Transport.Socket.Blocking +{ + public class BlockingSocketTransport : ITransport + { +// static readonly ILog _log = LogManager.GetLogger(typeof(BlockingSocketTransport)); + + // Configuration variables. + string _host; + int _port; + IProtocolListener _protocolListener; + + // Runtime variables. + private BlockingSocketProcessor _socketProcessor; + private AmqpChannel _amqpChannel; + + private ReaderRunner _readerRunner; + private Thread _readerThread; + + public BlockingSocketTransport(string host, int port, AMQConnection connection) + { + _host = host; + _port = port; + _protocolListener = connection.ProtocolListener; + } + + public void Open() + { + _socketProcessor = new BlockingSocketProcessor(_host, _port, _protocolListener); + _socketProcessor.Connect(); + _amqpChannel = new AmqpChannel(_socketProcessor.ByteChannel); + _readerRunner = new ReaderRunner(this); + _readerThread = new Thread(new ThreadStart(_readerRunner.Run)); + _readerThread.Start(); + } + + public string getLocalEndPoint() + { + return _socketProcessor.getLocalEndPoint(); + } + + public void Close() + { + StopReaderThread(); + _socketProcessor.Disconnect(); + } + + public IProtocolChannel ProtocolChannel { get { return _amqpChannel; } } + public IProtocolWriter ProtocolWriter { get { return _amqpChannel; } } + + public void StopReaderThread() + { + _readerRunner.Stop(); + } + + class ReaderRunner + { + BlockingSocketTransport _transport; + bool _running = true; + + public ReaderRunner(BlockingSocketTransport transport) + { + _transport = transport; + } + + public void Run() + { + try + { + while (_running) + { + Queue frames = _transport.ProtocolChannel.Read(); + + foreach (IDataBlock dataBlock in frames) + { + _transport._protocolListener.OnMessage(dataBlock); + } + } + } + catch (Exception e) + { + _transport._protocolListener.OnException(e); + } + } + + public void Stop() + { + // TODO: Check if this is thread safe. running is not volitile.... + _running = false; + } + } + } +} + + diff --git a/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/ByteChannel.cs b/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/ByteChannel.cs new file mode 100644 index 0000000000..5f67e99838 --- /dev/null +++ b/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/ByteChannel.cs @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using log4net; +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Client.Transport.Socket.Blocking +{ + class ByteChannel : IByteChannel + { + // Warning: don't use this log for regular logging. + private static readonly ILog _ioTraceLog = LogManager.GetLogger("TRACE.Qpid.Client.ByteChannel"); + + BlockingSocketProcessor processor; + + public ByteChannel(BlockingSocketProcessor processor) + { + this.processor = processor; + } + + public ByteBuffer Read() + { + ByteBuffer result = processor.Read(); + + // TODO: Move into decorator. + if (_ioTraceLog.IsDebugEnabled) + { + _ioTraceLog.Debug(String.Format("READ {0}", result)); + } + + return result; + } + + public void Write(ByteBuffer buffer) + { + // TODO: Move into decorator. + if (_ioTraceLog.IsDebugEnabled) + { + _ioTraceLog.Debug(String.Format("WRITE {0}", buffer)); + } + + processor.Write(buffer); + } + } +} diff --git a/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/Properties/AssemblyInfo.cs b/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/Properties/AssemblyInfo.cs new file mode 100644 index 0000000000..19599b0833 --- /dev/null +++ b/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/Properties/AssemblyInfo.cs @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Reflection; +using System.Runtime.InteropServices; +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Apache.Qpid.Transport.Blocking")] +[assembly: AssemblyDescription("Built from svn revision number: ")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("Apache Software Foundation")] +[assembly: AssemblyProduct("Apache.Qpid.Transport.Blocking")] +[assembly: AssemblyCopyright("Apache Software Foundation")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("ca23e89c-f5b9-4f7a-929a-4fae00ef055b")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Revision and Build Numbers +// by using the '*' as shown below: +[assembly: AssemblyVersion("0.5.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/Qpid.Client.Transport.Socket.Blocking.csproj b/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/Qpid.Client.Transport.Socket.Blocking.csproj new file mode 100644 index 0000000000..6a0b1cd8e6 --- /dev/null +++ b/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/Qpid.Client.Transport.Socket.Blocking.csproj @@ -0,0 +1,92 @@ +<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>8.0.50727</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{52AC4940-2077-4104-A753-29A9C8C16957}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Apache.Qpid.Client.Transport.Socket.Blocking</RootNamespace>
+ <AssemblyName>Apache.Qpid.Client.Transport.Socket.Blocking</AssemblyName>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="log4net, Version=1.2.0.30714, Culture=neutral, PublicKeyToken=500ffcafb14f92df">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\Qpid.Common\lib\log4net\log4net.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="BlockingSocketProcessor.cs" />
+ <Compile Include="BlockingSocketTransport.cs" />
+ <Compile Include="ByteChannel.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\Qpid.Buffer\Qpid.Buffer.csproj">
+ <Project>{44384DF2-B0A4-4580-BDBC-EE4BAA87D995}</Project>
+ <Name>Qpid.Buffer</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Client\Qpid.Client.csproj">
+ <Project>{68987C05-3768-452C-A6FC-6BA1D372852F}</Project>
+ <Name>Qpid.Client</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Common\Qpid.Common.csproj">
+ <Project>{77064C42-24D2-4CEB-9EA2-0EF481A43205}</Project>
+ <Name>Qpid.Common</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\Qpid.Messaging\Qpid.Messaging.csproj">
+ <Project>{6688F826-C58E-4C1B-AA1F-22AFAB4B7D07}</Project>
+ <Name>Qpid.Messaging</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
diff --git a/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/Qpid.Client.Transport.Socket.Blocking.mdp b/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/Qpid.Client.Transport.Socket.Blocking.mdp new file mode 100644 index 0000000000..54c3be76e5 --- /dev/null +++ b/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/Qpid.Client.Transport.Socket.Blocking.mdp @@ -0,0 +1,50 @@ +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<Project name="Qpid.Client.Transport.Socket.Blocking" fileversion="2.0" language="C#" clr-version="Net_1_1" ctype="DotNetProject"> + <Configurations active="Debug"> + <Configuration name="Debug" ctype="DotNetProjectConfiguration"> + <Output directory="./bin/Debug" assembly="Qpid.Client.Transport.Socket.Blocking" /> + <Build debugmode="True" target="Library" /> + <Execution runwithwarnings="True" consolepause="False" runtime="MsNet" clr-version="Net_1_1" /> + <CodeGeneration compiler="Csc" warninglevel="4" optimize="True" unsafecodeallowed="False" generateoverflowchecks="True" generatexmldocumentation="False" ctype="CSharpCompilerParameters" /> + </Configuration> + <Configuration name="Release" ctype="DotNetProjectConfiguration"> + <Output directory="./bin/Release" assembly="Qpid.Client.Transport.Socket.Blocking" /> + <Build debugmode="False" target="Library" /> + <Execution runwithwarnings="True" consolepause="False" runtime="MsNet" clr-version="Net_1_1" /> + <CodeGeneration compiler="Csc" warninglevel="4" optimize="True" unsafecodeallowed="False" generateoverflowchecks="True" generatexmldocumentation="False" ctype="CSharpCompilerParameters" /> + </Configuration> + </Configurations> + <DeployTargets /> + <Contents> + <File name="./Properties/AssemblyInfo.cs" subtype="Code" buildaction="Compile" /> + <File name="./BlockingSocketProcessor.cs" subtype="Code" buildaction="Compile" /> + <File name="./BlockingSocketTransport.cs" subtype="Code" buildaction="Compile" /> + <File name="./ByteChannel.cs" subtype="Code" buildaction="Compile" /> + </Contents> + <References> + <ProjectReference type="Gac" localcopy="True" refto="System, Version=1.0.5000.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" /> + <ProjectReference type="Assembly" localcopy="True" refto="../Qpid.Common/lib/log4net/log4net.dll" /> + <ProjectReference type="Project" localcopy="True" refto="Qpid.Buffer" /> + <ProjectReference type="Project" localcopy="True" refto="Qpid.Client" /> + <ProjectReference type="Project" localcopy="True" refto="Qpid.Common" /> + </References> +</Project>
\ No newline at end of file |
