diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-01-09 16:53:51 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-01-09 16:53:51 +0000 |
| commit | 30d213dc1e6d743f2f0abb44c8bc91868d5126b1 (patch) | |
| tree | f3d16257ed0a431f2f4c43166f4df84ccb877a6c /qpid/java/amqp-1-0-client/src | |
| parent | b165cf52a4ef16ac5a5ee181d4da2db351f7882d (diff) | |
| download | qpid-python-30d213dc1e6d743f2f0abb44c8bc91868d5126b1.tar.gz | |
QPID-5459 : Add WebSocket transport support to the Java Broker and AMQP 1-0 JMS client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1556873 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/amqp-1-0-client/src')
6 files changed, 417 insertions, 211 deletions
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java index fc0ff427a2..b2d86c4dbc 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java @@ -20,26 +20,15 @@ */ package org.apache.qpid.amqp_1_0.client; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.nio.ByteBuffer; import java.security.Principal; +import java.util.ServiceLoader; import java.util.concurrent.TimeoutException; -import java.util.logging.Level; -import java.util.logging.Logger; -import javax.net.ssl.SSLSocketFactory; - -import org.apache.qpid.amqp_1_0.framing.SocketExceptionHandler; +import org.apache.qpid.amqp_1_0.framing.ExceptionHandler; import org.apache.qpid.amqp_1_0.framing.ConnectionHandler; -import org.apache.qpid.amqp_1_0.transport.AMQPTransport; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; import org.apache.qpid.amqp_1_0.transport.Container; import org.apache.qpid.amqp_1_0.transport.Predicate; -import org.apache.qpid.amqp_1_0.transport.StateChangeListener; -import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.FrameBody; import org.apache.qpid.amqp_1_0.type.SaslFrameBody; import org.apache.qpid.amqp_1_0.type.UnsignedInteger; @@ -47,9 +36,8 @@ import org.apache.qpid.amqp_1_0.type.transport.AmqpError; import org.apache.qpid.amqp_1_0.type.transport.ConnectionError; import org.apache.qpid.amqp_1_0.type.transport.Error; -public class Connection implements SocketExceptionHandler +public class Connection implements ExceptionHandler { - private static final Logger RAW_LOGGER = Logger.getLogger("RAW"); private static final int MAX_FRAME_SIZE = 65536; private String _address; @@ -148,6 +136,20 @@ public class Connection implements SocketExceptionHandler } + public Connection(final String protocol, + final String address, + final int port, + final String username, + final String password, + final Container container, + final String remoteHost, + final boolean ssl, + final int channelMax) throws ConnectionException + { + this(protocol, address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl, + channelMax); + } + public Connection(final String address, final int port, final String username, @@ -158,141 +160,107 @@ public class Connection implements SocketExceptionHandler boolean ssl, int channelMax) throws ConnectionException { + this(ssl?"amqp":"amqps",address,port,username,password,maxFrameSize,container,remoteHostname,ssl,channelMax); + } - _address = address; + public Connection(final String protocol, + final String address, + final int port, + final String username, + final String password, + final int maxFrameSize, + final Container container, + final String remoteHostname, + boolean ssl, + int channelMax) throws ConnectionException + { - try - { - final Socket s; - if(ssl) - { - s = SSLSocketFactory.getDefault().createSocket(address, port); - } - else - { - s = new Socket(address, port); - } + _address = address; - Principal principal = username == null ? null : new Principal() - { + Principal principal = username == null ? null : new Principal() + { - public String getName() - { - return username; - } - }; - _conn = new ConnectionEndpoint(container, principal, password); - if(channelMax >= 0) + public String getName() { - _conn.setChannelMax((short)channelMax); + return username; } - _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize)); - _conn.setRemoteAddress(s.getRemoteSocketAddress()); - _conn.setRemoteHostname(remoteHostname); - - - - ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(_conn); - - - final OutputStream outputStream = s.getOutputStream(); - - ConnectionHandler.BytesSource src; + }; + _conn = new ConnectionEndpoint(container, principal, password); + if(channelMax >= 0) + { + _conn.setChannelMax((short)channelMax); + } + _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize)); + _conn.setRemoteHostname(remoteHostname); - if(_conn.requiresSASL()) - { - ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(_conn); - - src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte)3, - (byte)1, - (byte)0, - (byte)0), - new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,_conn.getDescribedTypeRegistry()), - new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte)0, - (byte)1, - (byte)0, - (byte)0), - new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry()) - ); - - _conn.setSaslFrameOutput(saslOut); - } - else - { - src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn,(byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte)0, - (byte)1, - (byte)0, - (byte)0), - new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry()) - ); - } + ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(_conn); + ConnectionHandler.BytesSource src; - ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn, this); - Thread outputThread = new Thread(outputHandler); - outputThread.setDaemon(true); - outputThread.start(); - _conn.setFrameOutputHandler(out); + if(_conn.requiresSASL()) + { + ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(_conn); + + src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte)3, + (byte)1, + (byte)0, + (byte)0), + new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,_conn.getDescribedTypeRegistry()), + new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte)0, + (byte)1, + (byte)0, + (byte)0), + new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry()) + ); + + _conn.setSaslFrameOutput(saslOut); + } + else + { + src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn,(byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte)0, + (byte)1, + (byte)0, + (byte)0), + new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry()) + ); + } + TransportProvider transportProvider = getTransportProvider(protocol); + transportProvider.connect(_conn,address,port,ssl, this); - final ConnectionHandler handler = new ConnectionHandler(_conn); - final InputStream inputStream = s.getInputStream(); - Thread inputThread = new Thread(new Runnable() - { + _conn.open(); - public void run() - { - try - { - doRead(handler, inputStream); - } - finally - { - if(_conn.closedForInput() && _conn.closedForOutput()) - { - try - { - synchronized (outputStream) - { - s.close(); - } - } - catch (IOException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - } - } - }); - - inputThread.setDaemon(true); - inputThread.start(); + } - _conn.open(); + private TransportProvider getTransportProvider(final String protocol) throws ConnectionException + { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + ServiceLoader<TransportProviderFactory> providerFactories = ServiceLoader.load(TransportProviderFactory.class, classLoader); - } - catch (IOException e) + for(TransportProviderFactory tpf : providerFactories) { - throw new ConnectionException(e); + if(tpf.getSupportedTransports().contains(protocol)) + { + return tpf.getProvider(protocol); + } } - + throw new ConnectionException("Unknown protocol: " + protocol); } private Connection(ConnectionEndpoint endpoint) @@ -301,45 +269,6 @@ public class Connection implements SocketExceptionHandler } - private void doRead(final AMQPTransport transport, final InputStream inputStream) - { - byte[] buf = new byte[2<<15]; - ByteBuffer bbuf = ByteBuffer.wrap(buf); - final Object lock = new Object(); - transport.setInputStateChangeListener(new StateChangeListener(){ - - public void onStateChange(final boolean active) - { - synchronized(lock) - { - lock.notifyAll(); - } - } - }); - - try - { - int read; - while((read = inputStream.read(buf)) != -1) - { - bbuf.position(0); - bbuf.limit(read); - - while(bbuf.hasRemaining() && transport.isOpenForInput()) - { - transport.processBytes(bbuf); - } - - - } - } - catch (IOException e) - { - e.printStackTrace(); - } - - } - public Session createSession() throws ConnectionException { checkNotClosed(); @@ -373,47 +302,6 @@ public class Connection implements SocketExceptionHandler } - private void doRead(final ConnectionHandler handler, final InputStream inputStream) - { - byte[] buf = new byte[2<<15]; - - - try - { - int read; - boolean done = false; - while(!handler.isDone() && (read = inputStream.read(buf)) != -1) - { - ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read); - Binary b = new Binary(buf,0,read); - - if(RAW_LOGGER.isLoggable(Level.FINE)) - { - RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : " + b.toString()); - } - while(bbuf.hasRemaining() && !handler.isDone()) - { - handler.parse(bbuf); - } - - - } - if(!handler.isDone()) - { - _conn.inputClosed(); - if(_conn.getConnectionEventListener() != null) - { - _conn.getConnectionEventListener().closeReceived(); - } - } - } - catch (IOException e) - { - _conn.inputClosed(); - e.printStackTrace(); - } - } - public void close() throws ConnectionErrorException { _conn.close(); @@ -465,7 +353,7 @@ public class Connection implements SocketExceptionHandler } @Override - public void processSocketException(Exception exception) + public void handleException(Exception exception) { Error socketError = new Error(); socketError.setDescription(exception.getClass() + ": " + exception.getMessage()); diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProviderFactory.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProviderFactory.java new file mode 100644 index 0000000000..2327a3860a --- /dev/null +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProviderFactory.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import java.util.Arrays; +import java.util.Collection; + +public class TCPTransportProviderFactory implements TransportProviderFactory +{ + @Override + public Collection<String> getSupportedTransports() + { + return Arrays.asList("amqp","amqps"); + } + + @Override + public TransportProvider getProvider(final String transport) + { + return new TCPTransportProvier(transport); + } +} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java new file mode 100644 index 0000000000..1c5eb0a34c --- /dev/null +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java @@ -0,0 +1,196 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.framing.ConnectionHandler; +import org.apache.qpid.amqp_1_0.framing.ExceptionHandler; +import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; +import org.apache.qpid.amqp_1_0.type.FrameBody; +import org.apache.qpid.amqp_1_0.type.SaslFrameBody; + +import javax.net.ssl.SSLSocketFactory; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.ByteBuffer; + +class TCPTransportProvier implements TransportProvider +{ + private final String _transport; + + public TCPTransportProvier(final String transport) + { + _transport = transport; + } + + @Override + public void connect(final ConnectionEndpoint conn, + final String address, + final int port, + final boolean ssl, + final ExceptionHandler exceptionHandler) throws ConnectionException + { + try + { + final Socket s; + if(ssl) + { + s = SSLSocketFactory.getDefault().createSocket(address, port); + } + else + { + s = new Socket(address, port); + } + + conn.setRemoteAddress(s.getRemoteSocketAddress()); + + + ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(conn); + + ConnectionHandler.BytesSource src; + + if(conn.requiresSASL()) + { + ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(conn); + + src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(conn, (byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte)3, + (byte)1, + (byte)0, + (byte)0), + new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,conn.getDescribedTypeRegistry()), + new ConnectionHandler.HeaderBytesSource(conn, (byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte)0, + (byte)1, + (byte)0, + (byte)0), + new ConnectionHandler.FrameToBytesSourceAdapter(out,conn.getDescribedTypeRegistry()) + ); + + conn.setSaslFrameOutput(saslOut); + } + else + { + src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(conn,(byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte)0, + (byte)1, + (byte)0, + (byte)0), + new ConnectionHandler.FrameToBytesSourceAdapter(out,conn.getDescribedTypeRegistry()) + ); + } + + + final OutputStream outputStream = s.getOutputStream(); + ConnectionHandler.BytesOutputHandler outputHandler = + new ConnectionHandler.BytesOutputHandler(outputStream, src, conn, exceptionHandler); + Thread outputThread = new Thread(outputHandler); + outputThread.setDaemon(true); + outputThread.start(); + conn.setFrameOutputHandler(out); + + + final ConnectionHandler handler = new ConnectionHandler(conn); + final InputStream inputStream = s.getInputStream(); + + Thread inputThread = new Thread(new Runnable() + { + + public void run() + { + try + { + doRead(conn, handler, inputStream); + } + finally + { + if(conn.closedForInput() && conn.closedForOutput()) + { + try + { + synchronized (outputStream) + { + s.close(); + } + } + catch (IOException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + } + }); + + inputThread.setDaemon(true); + inputThread.start(); + + } + catch (IOException e) + { + throw new ConnectionException(e); + } + } + private void doRead(final ConnectionEndpoint conn, final ConnectionHandler handler, final InputStream inputStream) + { + byte[] buf = new byte[2<<15]; + + + try + { + int read; + boolean done = false; + while(!handler.isDone() && (read = inputStream.read(buf)) != -1) + { + ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read); + while(bbuf.hasRemaining() && !handler.isDone()) + { + handler.parse(bbuf); + } + + + } + if(!handler.isDone()) + { + conn.inputClosed(); + if(conn.getConnectionEventListener() != null) + { + conn.getConnectionEventListener().closeReceived(); + } + } + } + catch (IOException e) + { + conn.inputClosed(); + e.printStackTrace(); + } + } +} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java new file mode 100644 index 0000000000..2430b0e14b --- /dev/null +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.framing.ConnectionHandler; +import org.apache.qpid.amqp_1_0.framing.ExceptionHandler; +import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; +import org.apache.qpid.amqp_1_0.type.FrameBody; + +public interface TransportProvider +{ + void connect(ConnectionEndpoint conn, + String address, + int port, + boolean ssl, + ExceptionHandler exceptionHandler) throws ConnectionException; +} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProviderFactory.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProviderFactory.java new file mode 100644 index 0000000000..82999c5ccc --- /dev/null +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProviderFactory.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import java.util.Collection; + +public interface TransportProviderFactory +{ + Collection<String> getSupportedTransports(); + TransportProvider getProvider(String transport); +} diff --git a/qpid/java/amqp-1-0-client/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory b/qpid/java/amqp-1-0-client/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory new file mode 100644 index 0000000000..ffde030b30 --- /dev/null +++ b/qpid/java/amqp-1-0-client/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.amqp_1_0.client.TCPTransportProviderFactory
\ No newline at end of file |
