diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-07-14 18:52:42 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-07-14 18:52:42 +0000 |
| commit | c4865a9355ff5362b525819090a486f075be042c (patch) | |
| tree | db4a8c011898b185f6e01206b182a9744da7bccf /qpid/java | |
| parent | 24e93c0cf64e830980959c17777e86849cf5df7a (diff) | |
| download | qpid-python-c4865a9355ff5362b525819090a486f075be042c.tar.gz | |
QPID-4659 : [Java Broker] make protocol engines pluggable
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1503024 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
18 files changed, 628 insertions, 240 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java new file mode 100644 index 0000000000..2fa9084b8b --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java @@ -0,0 +1,35 @@ +package org.apache.qpid.server.plugin;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.transport.network.NetworkConnection; + +public interface ProtocolEngineCreator extends Pluggable +{ + AmqpProtocolVersion getVersion(); + byte[] getHeaderIdentifier(); + ServerProtocolEngine newProtocolEngine(Broker broker, NetworkConnection network, Port port, Transport transport, long id); +} + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/LinkModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/LinkModel.java new file mode 100644 index 0000000000..16120a3523 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/LinkModel.java @@ -0,0 +1,25 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +public interface LinkModel +{ +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/LinkRegistry.java index 0bb9a15968..67d6e9f8d1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/LinkRegistry.java @@ -18,22 +18,22 @@ * under the License. * */ -package org.apache.qpid.server.protocol.v1_0; +package org.apache.qpid.server.protocol; import java.util.HashMap; import java.util.Map; public class LinkRegistry { - private final Map<String, SendingLink_1_0> _sendingLinks = new HashMap<String, SendingLink_1_0>(); - private final Map<String, ReceivingLink_1_0> _receivingLinks = new HashMap<String, ReceivingLink_1_0>(); + private final Map<String, LinkModel> _sendingLinks = new HashMap<String, LinkModel>(); + private final Map<String, LinkModel> _receivingLinks = new HashMap<String, LinkModel>(); - public synchronized SendingLink_1_0 getDurableSendingLink(String name) + public synchronized LinkModel getDurableSendingLink(String name) { return _sendingLinks.get(name); } - public synchronized boolean registerSendingLink(String name, SendingLink_1_0 link) + public synchronized boolean registerSendingLink(String name, LinkModel link) { if(_sendingLinks.containsKey(name)) { @@ -59,12 +59,12 @@ public class LinkRegistry } } - public synchronized ReceivingLink_1_0 getDurableReceivingLink(String name) + public synchronized LinkModel getDurableReceivingLink(String name) { return _receivingLinks.get(name); } - public synchronized boolean registerReceivingLink(String name, ReceivingLink_1_0 link) + public synchronized boolean registerReceivingLink(String name, LinkModel link) { if(_receivingLinks.containsKey(name)) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 5e920f1c4e..47b578c4ef 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -37,13 +37,7 @@ import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; -import org.apache.qpid.server.protocol.v0_10.ProtocolEngine_0_10; -import org.apache.qpid.server.protocol.v0_10.ServerConnection; -import org.apache.qpid.server.protocol.v0_10.ServerConnectionDelegate; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolEngine; -import org.apache.qpid.server.protocol.v1_0.ProtocolEngine_1_0_0; -import org.apache.qpid.server.protocol.v1_0.ProtocolEngine_1_0_0_SASL; -import org.apache.qpid.transport.ConnectionDelegate; +import org.apache.qpid.server.plugin.ProtocolEngineCreator; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.security.SSLStatus; @@ -60,6 +54,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine private final boolean _needClientAuth; private final Port _port; private final Transport _transport; + private final ProtocolEngineCreator[] _creators; private Set<AmqpProtocolVersion> _supported; private String _fqdn; @@ -74,7 +69,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine SSLContext sslContext, boolean wantClientAuth, boolean needClientAuth, final Set<AmqpProtocolVersion> supported, final AmqpProtocolVersion defaultSupportedReply, - Port port, Transport transport, final long id) + Port port, Transport transport, final long id, ProtocolEngineCreator[] creators) { if(defaultSupportedReply != null && !supported.contains(defaultSupportedReply)) { @@ -91,6 +86,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _needClientAuth = needClientAuth; _port = port; _transport = transport; + _creators = creators; } @@ -147,73 +143,6 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8; - private static final byte[] AMQP_0_8_HEADER = - new byte[] { (byte) 'A', - (byte) 'M', - (byte) 'Q', - (byte) 'P', - (byte) 1, - (byte) 1, - (byte) 8, - (byte) 0 - }; - - private static final byte[] AMQP_0_9_HEADER = - new byte[] { (byte) 'A', - (byte) 'M', - (byte) 'Q', - (byte) 'P', - (byte) 1, - (byte) 1, - (byte) 0, - (byte) 9 - }; - - private static final byte[] AMQP_0_9_1_HEADER = - new byte[] { (byte) 'A', - (byte) 'M', - (byte) 'Q', - (byte) 'P', - (byte) 0, - (byte) 0, - (byte) 9, - (byte) 1 - }; - - - private static final byte[] AMQP_0_10_HEADER = - new byte[] { (byte) 'A', - (byte) 'M', - (byte) 'Q', - (byte) 'P', - (byte) 1, - (byte) 1, - (byte) 0, - (byte) 10 - }; - - private static final byte[] AMQP_1_0_0_HEADER = - new byte[] { (byte) 'A', - (byte) 'M', - (byte) 'Q', - (byte) 'P', - (byte) 0, - (byte) 1, - (byte) 0, - (byte) 0 - }; - - private static final byte[] AMQP_SASL_1_0_0_HEADER = - new byte[] { (byte) 'A', - (byte) 'M', - (byte) 'Q', - (byte) 'P', - (byte) 3, - (byte) 1, - (byte) 0, - (byte) 0 - }; - public void setNetworkConnection(NetworkConnection networkConnection) { setNetworkConnection(networkConnection, networkConnection.getSender()); @@ -247,144 +176,6 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } - private static interface DelegateCreator - { - AmqpProtocolVersion getVersion(); - byte[] getHeaderIdentifier(); - ServerProtocolEngine getProtocolEngine(); - } - - private DelegateCreator creator_0_8 = new DelegateCreator() - { - - public AmqpProtocolVersion getVersion() - { - return AmqpProtocolVersion.v0_8; - } - - public byte[] getHeaderIdentifier() - { - return AMQP_0_8_HEADER; - } - - public ServerProtocolEngine getProtocolEngine() - { - return new AMQProtocolEngine(_broker, _network, _id, _port, _transport); - } - }; - - private DelegateCreator creator_0_9 = new DelegateCreator() - { - - public AmqpProtocolVersion getVersion() - { - return AmqpProtocolVersion.v0_9; - } - - - public byte[] getHeaderIdentifier() - { - return AMQP_0_9_HEADER; - } - - public ServerProtocolEngine getProtocolEngine() - { - return new AMQProtocolEngine(_broker, _network, _id, _port, _transport); - } - }; - - private DelegateCreator creator_0_9_1 = new DelegateCreator() - { - - public AmqpProtocolVersion getVersion() - { - return AmqpProtocolVersion.v0_9_1; - } - - - public byte[] getHeaderIdentifier() - { - return AMQP_0_9_1_HEADER; - } - - public ServerProtocolEngine getProtocolEngine() - { - return new AMQProtocolEngine(_broker, _network, _id, _port, _transport); - } - }; - - - private DelegateCreator creator_0_10 = new DelegateCreator() - { - - public AmqpProtocolVersion getVersion() - { - return AmqpProtocolVersion.v0_10; - } - - - public byte[] getHeaderIdentifier() - { - return AMQP_0_10_HEADER; - } - - public ServerProtocolEngine getProtocolEngine() - { - final ConnectionDelegate connDelegate = new ServerConnectionDelegate(_broker, - _fqdn, _broker.getSubjectCreator(getLocalAddress())); - - ServerConnection conn = new ServerConnection(_id); - - conn.setConnectionDelegate(connDelegate); - conn.setRemoteAddress(_network.getRemoteAddress()); - conn.setLocalAddress(_network.getLocalAddress()); - return new ProtocolEngine_0_10( conn, _network, _port, _transport); - } - }; - - private DelegateCreator creator_1_0_0 = new DelegateCreator() - { - - public AmqpProtocolVersion getVersion() - { - return AmqpProtocolVersion.v1_0_0; - } - - - public byte[] getHeaderIdentifier() - { - return AMQP_1_0_0_HEADER; - } - - public ServerProtocolEngine getProtocolEngine() - { - return new ProtocolEngine_1_0_0(_network, _broker, _id, _port, _transport); - } - }; - - private DelegateCreator creator_1_0_0_SASL = new DelegateCreator() - { - - public AmqpProtocolVersion getVersion() - { - return AmqpProtocolVersion.v1_0_0; - } - - - public byte[] getHeaderIdentifier() - { - return AMQP_SASL_1_0_0_HEADER; - } - - public ServerProtocolEngine getProtocolEngine() - { - return new ProtocolEngine_1_0_0_SASL(_network, _broker, _id, _port, _transport); - } - }; - - private final DelegateCreator[] _creators = - new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10, creator_1_0_0_SASL, creator_1_0_0 }; - private class ClosedDelegateProtocolEngine implements ServerProtocolEngine { @@ -526,7 +317,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } if(equal) { - newDelegate = _creators[i].getProtocolEngine(); + newDelegate = _creators[i].newProtocolEngine(_broker, _network, _port, _transport, _id); } } @@ -667,7 +458,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine { _decryptEngine = new MultiVersionProtocolEngine(_broker, null, false, false, _supported, - _defaultSupportedReply, _port, Transport.SSL, _id); + _defaultSupportedReply, _port, Transport.SSL, _id, _creators); _engine = _sslContext.createSSLEngine(); _engine.setUseClientMode(false); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java index 4b76546da1..3ce9383ee0 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.protocol; +import java.util.ArrayList; +import java.util.List; import javax.net.ssl.SSLContext; import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.protocol.ServerProtocolEngine; @@ -29,6 +31,8 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.plugin.ProtocolEngineCreator; +import org.apache.qpid.server.plugin.QpidServiceLoader; public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory { @@ -42,6 +46,7 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory private final boolean _needClientAuth; private final Port _port; private final Transport _transport; + private final ProtocolEngineCreator[] _creators; public MultiVersionProtocolEngineFactory(Broker broker, SSLContext sslContext, @@ -62,6 +67,12 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory _sslContext = sslContext; _supported = supportedVersions; _defaultSupportedReply = defaultSupportedReply; + List<ProtocolEngineCreator> creators = new ArrayList<ProtocolEngineCreator>(); + for(ProtocolEngineCreator c : new QpidServiceLoader<ProtocolEngineCreator>().instancesOf(ProtocolEngineCreator.class)) + { + creators.add(c); + } + _creators = creators.toArray(new ProtocolEngineCreator[creators.size()]); _wantClientAuth = wantClientAuth; _needClientAuth = needClientAuth; _port = port; @@ -72,7 +83,7 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory { return new MultiVersionProtocolEngine(_broker, _sslContext, _wantClientAuth, _needClientAuth, _supported, _defaultSupportedReply, _port, _transport, - ID_GENERATOR.getAndIncrement() - ); + ID_GENERATOR.getAndIncrement(), + _creators); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java new file mode 100644 index 0000000000..ad54bc148a --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java @@ -0,0 +1,100 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_10; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.plugin.ProtocolEngineCreator; +import org.apache.qpid.transport.ConnectionDelegate; +import org.apache.qpid.transport.network.NetworkConnection; + +public class ProtocolEngineCreator_0_10 implements ProtocolEngineCreator +{ + + private static final byte[] AMQP_0_10_HEADER = + new byte[] { (byte) 'A', + (byte) 'M', + (byte) 'Q', + (byte) 'P', + (byte) 1, + (byte) 1, + (byte) 0, + (byte) 10 + }; + + + public ProtocolEngineCreator_0_10() + { + } + + public AmqpProtocolVersion getVersion() + { + return AmqpProtocolVersion.v0_10; + } + + + public byte[] getHeaderIdentifier() + { + return AMQP_0_10_HEADER; + } + + public ServerProtocolEngine newProtocolEngine(Broker broker, + NetworkConnection network, + Port port, + Transport transport, + long id) + { + String fqdn = null; + SocketAddress address = network.getLocalAddress(); + if (address instanceof InetSocketAddress) + { + fqdn = ((InetSocketAddress) address).getHostName(); + } + final ConnectionDelegate connDelegate = new ServerConnectionDelegate(broker, + fqdn, broker.getSubjectCreator(address)); + + ServerConnection conn = new ServerConnection(id); + + conn.setConnectionDelegate(connDelegate); + conn.setRemoteAddress(network.getRemoteAddress()); + conn.setLocalAddress(network.getLocalAddress()); + return new ProtocolEngine_0_10( conn, network, port, transport); + } + + + private static ProtocolEngineCreator INSTANCE = new ProtocolEngineCreator_0_10(); + + public static ProtocolEngineCreator getInstance() + { + return INSTANCE; + } + + @Override + public String getType() + { + return getVersion().toString(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index d1919c30e2..73708d9841 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -49,7 +49,9 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol private long _lastWriteTime; public ProtocolEngine_0_10(ServerConnection conn, - NetworkConnection network, Port port, Transport transport) + NetworkConnection network, + Port port, + Transport transport) { super(new Assembler(conn)); _connection = conn; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java new file mode 100644 index 0000000000..5ee56508d7 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.plugin.ProtocolEngineCreator; +import org.apache.qpid.transport.network.NetworkConnection; + +public class ProtocolEngineCreator_0_8 implements ProtocolEngineCreator +{ + private static final byte[] AMQP_0_8_HEADER = + new byte[] { (byte) 'A', + (byte) 'M', + (byte) 'Q', + (byte) 'P', + (byte) 1, + (byte) 1, + (byte) 8, + (byte) 0 + }; + + + public ProtocolEngineCreator_0_8() + { + } + + public AmqpProtocolVersion getVersion() + { + return AmqpProtocolVersion.v0_8; + } + + public byte[] getHeaderIdentifier() + { + return AMQP_0_8_HEADER; + } + + public ServerProtocolEngine newProtocolEngine(Broker broker, + NetworkConnection network, + Port port, + Transport transport, + long id) + { + return new AMQProtocolEngine(broker, network, id, port, transport); + } + + private static ProtocolEngineCreator INSTANCE = new ProtocolEngineCreator_0_8(); + + public static ProtocolEngineCreator getInstance() + { + return INSTANCE; + } + + @Override + public String getType() + { + return getVersion().toString(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java new file mode 100644 index 0000000000..2a29348261 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.plugin.ProtocolEngineCreator; +import org.apache.qpid.transport.network.NetworkConnection; + +public class ProtocolEngineCreator_0_9 implements ProtocolEngineCreator +{ + private static final byte[] AMQP_0_9_HEADER = + new byte[] { (byte) 'A', + (byte) 'M', + (byte) 'Q', + (byte) 'P', + (byte) 1, + (byte) 1, + (byte) 0, + (byte) 9 + }; + + public ProtocolEngineCreator_0_9() + { + } + + public AmqpProtocolVersion getVersion() + { + return AmqpProtocolVersion.v0_9; + } + + + public byte[] getHeaderIdentifier() + { + return AMQP_0_9_HEADER; + } + + public ServerProtocolEngine newProtocolEngine(Broker broker, + NetworkConnection network, + Port port, + Transport transport, + long id) + { + return new AMQProtocolEngine(broker, network, id, port, transport); + } + + private static ProtocolEngineCreator INSTANCE = new ProtocolEngineCreator_0_9(); + + public static ProtocolEngineCreator getInstance() + { + return INSTANCE; + } + + @Override + public String getType() + { + return getVersion().toString(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java new file mode 100644 index 0000000000..dad6bef032 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.plugin.ProtocolEngineCreator; +import org.apache.qpid.transport.network.NetworkConnection; + +public class ProtocolEngineCreator_0_9_1 implements ProtocolEngineCreator +{ + + private static final byte[] AMQP_0_9_1_HEADER = + new byte[] { (byte) 'A', + (byte) 'M', + (byte) 'Q', + (byte) 'P', + (byte) 0, + (byte) 0, + (byte) 9, + (byte) 1 + }; + + public ProtocolEngineCreator_0_9_1() + { + } + + public AmqpProtocolVersion getVersion() + { + return AmqpProtocolVersion.v0_9_1; + } + + + public byte[] getHeaderIdentifier() + { + return AMQP_0_9_1_HEADER; + } + + public ServerProtocolEngine newProtocolEngine(Broker broker, + NetworkConnection network, + Port port, + Transport transport, + long id) + { + return new AMQProtocolEngine(broker, network, id, port, transport); + } + + + private static ProtocolEngineCreator INSTANCE = new ProtocolEngineCreator_0_9_1(); + + public static ProtocolEngineCreator getInstance() + { + return INSTANCE; + } + + @Override + public String getType() + { + return getVersion().toString(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java index db81d3b205..5ce24f406d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.server.protocol.v1_0; -public interface Link_1_0 +import org.apache.qpid.server.protocol.LinkModel; + +public interface Link_1_0 extends LinkModel { void start(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java new file mode 100644 index 0000000000..c06af603de --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.plugin.ProtocolEngineCreator; +import org.apache.qpid.transport.network.NetworkConnection; + +public class ProtocolEngineCreator_1_0_0 implements ProtocolEngineCreator +{ + private static final byte[] AMQP_1_0_0_HEADER = + new byte[] { (byte) 'A', + (byte) 'M', + (byte) 'Q', + (byte) 'P', + (byte) 0, + (byte) 1, + (byte) 0, + (byte) 0 + }; + + public ProtocolEngineCreator_1_0_0() + { + } + + public AmqpProtocolVersion getVersion() + { + return AmqpProtocolVersion.v1_0_0; + } + + + public byte[] getHeaderIdentifier() + { + return AMQP_1_0_0_HEADER; + } + + public ServerProtocolEngine newProtocolEngine(Broker broker, + NetworkConnection network, + Port port, + Transport transport, + long id) + { + return new ProtocolEngine_1_0_0(network, broker, id, port, transport); + } + + private static ProtocolEngineCreator INSTANCE = new ProtocolEngineCreator_1_0_0(); + + public static ProtocolEngineCreator getInstance() + { + return INSTANCE; + } + + @Override + public String getType() + { + return getVersion().toString() + "_NO_SASL"; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java new file mode 100644 index 0000000000..d3936782da --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.plugin.ProtocolEngineCreator; +import org.apache.qpid.transport.network.NetworkConnection; + +public class ProtocolEngineCreator_1_0_0_SASL implements ProtocolEngineCreator +{ + private static final byte[] AMQP_SASL_1_0_0_HEADER = + new byte[] { (byte) 'A', + (byte) 'M', + (byte) 'Q', + (byte) 'P', + (byte) 3, + (byte) 1, + (byte) 0, + (byte) 0 + }; + + public ProtocolEngineCreator_1_0_0_SASL() + { + } + + public AmqpProtocolVersion getVersion() + { + return AmqpProtocolVersion.v1_0_0; + } + + + public byte[] getHeaderIdentifier() + { + return AMQP_SASL_1_0_0_HEADER; + } + + public ServerProtocolEngine newProtocolEngine(Broker broker, + NetworkConnection network, + Port port, + Transport transport, + long id) + { + return new ProtocolEngine_1_0_0_SASL(network, broker, id, port, transport); + } + + private static ProtocolEngineCreator INSTANCE = new ProtocolEngineCreator_1_0_0_SASL(); + + public static ProtocolEngineCreator getInstance() + { + return INSTANCE; + } + + @Override + public String getType() + { + return getVersion().toString(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index ce6ef0b183..ed75a8c165 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -43,6 +43,7 @@ import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -81,13 +82,15 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu Link_1_0 link = null; Error error = null; - final LinkRegistry linkRegistry = _vhost.getLinkRegistry(endpoint.getSession().getConnection().getRemoteContainerId()); + final + LinkRegistry + linkRegistry = _vhost.getLinkRegistry(endpoint.getSession().getConnection().getRemoteContainerId()); if(endpoint.getRole() == Role.SENDER) { - SendingLink_1_0 previousLink = linkRegistry.getDurableSendingLink(endpoint.getName()); + SendingLink_1_0 previousLink = (SendingLink_1_0) linkRegistry.getDurableSendingLink(endpoint.getName()); if(previousLink == null) { @@ -227,7 +230,8 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu else { - ReceivingLink_1_0 previousLink = linkRegistry.getDurableReceivingLink(endpoint.getName()); + ReceivingLink_1_0 previousLink = + (ReceivingLink_1_0) linkRegistry.getDurableReceivingLink(endpoint.getName()); if(previousLink == null) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 15f5becec2..58f4ed48ca 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -43,7 +43,6 @@ import org.apache.qpid.server.exchange.DefaultExchangeFactory; import org.apache.qpid.server.exchange.DefaultExchangeRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeInUseException; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.VirtualHostMessages; @@ -51,7 +50,7 @@ import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.v1_0.LinkRegistry; +import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.DefaultQueueRegistry; @@ -59,7 +58,6 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.stats.StatisticsGatherer; -import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 2435854912..e06e785338 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -28,11 +28,8 @@ import org.apache.qpid.common.Closeable; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeInUseException; -import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.plugin.ExchangeType; -import org.apache.qpid.server.protocol.v1_0.LinkRegistry; +import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsGatherer; diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ProtocolEngineCreator b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ProtocolEngineCreator new file mode 100644 index 0000000000..50f59d8a55 --- /dev/null +++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ProtocolEngineCreator @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_8 +org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_9 +org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_9_1 +org.apache.qpid.server.protocol.v0_10.ProtocolEngineCreator_0_10 +org.apache.qpid.server.protocol.v1_0.ProtocolEngineCreator_1_0_0 +org.apache.qpid.server.protocol.v1_0.ProtocolEngineCreator_1_0_0_SASL diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index 78d55c42bf..6769c1c2fc 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -25,12 +25,9 @@ import java.util.concurrent.ScheduledFuture; import org.apache.qpid.AMQException; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; -import org.apache.qpid.server.exchange.AbstractExchange; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.plugin.ExchangeType; -import org.apache.qpid.server.protocol.v1_0.LinkRegistry; +import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; |
