summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-08-08 22:54:37 +0000
committerRobert Gemmell <robbie@apache.org>2011-08-08 22:54:37 +0000
commit12181a6cd1fde75d48d4c048d9dd704e2f901524 (patch)
treecc5e1f99d40c5703f59302fe357d4c9823c7ed21 /java
parent04ee94c3acc0f2240af0eb39f037d7bf096c019d (diff)
downloadqpid-python-12181a6cd1fde75d48d4c048d9dd704e2f901524.tar.gz
QPID-3385: assign IDs from a generator within the MultiVersionProtocolEngineFactory, which is shared across all protocol versions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1155136 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/Broker.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java49
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java53
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java22
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java6
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java146
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java29
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java7
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java22
-rw-r--r--java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java8
16 files changed, 280 insertions, 118 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Broker.java b/java/broker/src/main/java/org/apache/qpid/server/Broker.java
index ffc323a23b..3553a10c98 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/Broker.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/Broker.java
@@ -45,7 +45,6 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.management.LoggingManagementMBean;
import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.protocol.AMQProtocolEngineFactory;
import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.server.protocol.AmqpProtocolVersion;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -241,7 +240,7 @@ public class Broker
IncomingNetworkTransport transport = new MinaNetworkTransport();
- transport.accept(settings, new AMQProtocolEngineFactory(), sslFactory);
+ transport.accept(settings, new MultiVersionProtocolEngineFactory(), sslFactory);
ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, sslPort),
new QpidAcceptor(transport,"TCP"));
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index d6201f7cf6..c287333aff 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -70,7 +70,7 @@ import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -98,7 +98,7 @@ import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
-public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
+public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
{
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
@@ -151,8 +151,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
private MethodDispatcher _dispatcher;
private ProtocolSessionIdentifier _sessionIdentifier;
- // Create a simple ID that increments for ever new Session
- private final long _sessionID = idGenerator.getAndIncrement();
+ private final long _sessionID;
private AMQPConnectionActor _actor;
private LogSubject _logSubject;
@@ -184,7 +183,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
return _managedObject;
}
- public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network)
+ public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId)
{
_stateManager = new AMQStateManager(virtualHostRegistry, this);
_codecFactory = new AMQCodecFactory(true, this);
@@ -193,6 +192,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
_writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
_network = network;
_sender = _network.getSender();
+ _sessionID = connectionId;
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java
deleted file mode 100644
index 94870c98bd..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package org.apache.qpid.server.protocol;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.transport.network.NetworkConnection;
-
-public class AMQProtocolEngineFactory implements ProtocolEngineFactory
-{
- private VirtualHostRegistry _vhosts;
-
- public AMQProtocolEngineFactory()
- {
- this(1);
- }
-
- public AMQProtocolEngineFactory(Integer port)
- {
- _vhosts = ApplicationRegistry.getInstance().getVirtualHostRegistry();
- }
-
-
- public ProtocolEngine newProtocolEngine(NetworkConnection network)
- {
- return new AMQProtocolEngine(_vhosts, network);
- }
-}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index 01b12b44ce..bd1c19d30e 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -22,7 +22,7 @@ package org.apache.qpid.server.protocol;
import org.apache.log4j.Logger;
-import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.transport.ServerConnection;
import org.apache.qpid.transport.ConnectionDelegate;
@@ -33,23 +33,27 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Set;
-public class MultiVersionProtocolEngine implements ProtocolEngine
+public class MultiVersionProtocolEngine implements ServerProtocolEngine
{
private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
+ private final long _id;
+
private Set<AmqpProtocolVersion> _supported;
private String _fqdn;
private IApplicationRegistry _appRegistry;
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
- private volatile ProtocolEngine _delegate = new SelfDelegateProtocolEngine();
+ private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
String fqdn,
Set<AmqpProtocolVersion> supported,
- NetworkConnection network)
+ NetworkConnection network,
+ long id)
{
+ _id = id;
_appRegistry = appRegistry;
_fqdn = fqdn;
_supported = supported;
@@ -102,6 +106,11 @@ public class MultiVersionProtocolEngine implements ProtocolEngine
_delegate.exception(t);
}
+ public long getConnectionId()
+ {
+ return _delegate.getConnectionId();
+ }
+
private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8;
private static final byte[] AMQP_0_8_HEADER =
@@ -126,7 +135,7 @@ public class MultiVersionProtocolEngine implements ProtocolEngine
(byte) 9
};
-private static final byte[] AMQP_0_9_1_HEADER =
+ private static final byte[] AMQP_0_9_1_HEADER =
new byte[] { (byte) 'A',
(byte) 'M',
(byte) 'Q',
@@ -153,7 +162,7 @@ private static final byte[] AMQP_0_9_1_HEADER =
{
AmqpProtocolVersion getVersion();
byte[] getHeaderIdentifier();
- ProtocolEngine getProtocolEngine();
+ ServerProtocolEngine getProtocolEngine();
}
private DelegateCreator creator_0_8 = new DelegateCreator()
@@ -169,9 +178,9 @@ private static final byte[] AMQP_0_9_1_HEADER =
return AMQP_0_8_HEADER;
}
- public ProtocolEngine getProtocolEngine()
+ public ServerProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network);
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
}
};
@@ -189,9 +198,9 @@ private static final byte[] AMQP_0_9_1_HEADER =
return AMQP_0_9_HEADER;
}
- public ProtocolEngine getProtocolEngine()
+ public ServerProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network);
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
}
};
@@ -209,9 +218,9 @@ private static final byte[] AMQP_0_9_1_HEADER =
return AMQP_0_9_1_HEADER;
}
- public ProtocolEngine getProtocolEngine()
+ public ServerProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network);
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network, _id);
}
};
@@ -230,12 +239,12 @@ private static final byte[] AMQP_0_9_1_HEADER =
return AMQP_0_10_HEADER;
}
- public ProtocolEngine getProtocolEngine()
+ public ServerProtocolEngine getProtocolEngine()
{
final ConnectionDelegate connDelegate =
new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn);
- ServerConnection conn = new ServerConnection();
+ ServerConnection conn = new ServerConnection(_id);
conn.setConnectionDelegate(connDelegate);
return new ProtocolEngine_0_10( conn, _network, _appRegistry);
@@ -246,7 +255,7 @@ private static final byte[] AMQP_0_9_1_HEADER =
new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 };
- private class ClosedDelegateProtocolEngine implements ProtocolEngine
+ private class ClosedDelegateProtocolEngine implements ServerProtocolEngine
{
public SocketAddress getRemoteAddress()
{
@@ -292,9 +301,14 @@ private static final byte[] AMQP_0_9_1_HEADER =
{
}
+
+ public long getConnectionId()
+ {
+ return _id;
+ }
}
- private class SelfDelegateProtocolEngine implements ProtocolEngine
+ private class SelfDelegateProtocolEngine implements ServerProtocolEngine
{
private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
@@ -340,7 +354,7 @@ private static final byte[] AMQP_0_9_1_HEADER =
_header.get(headerBytes);
- ProtocolEngine newDelegate = null;
+ ServerProtocolEngine newDelegate = null;
byte[] newestSupported = null;
for(int i = 0; newDelegate == null && i < _creators.length; i++)
@@ -385,6 +399,11 @@ private static final byte[] AMQP_0_9_1_HEADER =
}
+ public long getConnectionId()
+ {
+ return _id;
+ }
+
public void exception(Throwable t)
{
_logger.error("Error establishing session", t);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
index 96d46353c6..460ea93509 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
@@ -22,9 +22,10 @@ package org.apache.qpid.server.protocol;
import java.util.EnumSet;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
-import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -32,38 +33,31 @@ import org.apache.qpid.transport.network.NetworkConnection;
public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
{
private static final Set<AmqpProtocolVersion> ALL_VERSIONS = EnumSet.allOf(AmqpProtocolVersion.class);
+ private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
private final IApplicationRegistry _appRegistry;
private final String _fqdn;
private final Set<AmqpProtocolVersion> _supported;
-
public MultiVersionProtocolEngineFactory()
{
- this(1, "localhost", ALL_VERSIONS);
- }
-
- public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> versions)
- {
- this(1, fqdn, versions);
+ this("localhost", ALL_VERSIONS);
}
-
public MultiVersionProtocolEngineFactory(String fqdn)
{
- this(1, fqdn, ALL_VERSIONS);
+ this(fqdn, ALL_VERSIONS);
}
- public MultiVersionProtocolEngineFactory(int instance, String fqdn, Set<AmqpProtocolVersion> supportedVersions)
+ public MultiVersionProtocolEngineFactory(String fqdn, Set<AmqpProtocolVersion> supportedVersions)
{
_appRegistry = ApplicationRegistry.getInstance();
_fqdn = fqdn;
_supported = supportedVersions;
}
-
- public ProtocolEngine newProtocolEngine(NetworkConnection network)
+ public ServerProtocolEngine newProtocolEngine(NetworkConnection network)
{
- return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network);
+ return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network, ID_GENERATOR.getAndIncrement());
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
index 8c62441d59..3ff650a480 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.protocol;
-import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
@@ -33,7 +33,7 @@ import org.apache.qpid.server.registry.IApplicationRegistry;
import java.net.SocketAddress;
import java.util.UUID;
-public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine, ConnectionConfig
+public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine, ConnectionConfig
{
public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
@@ -191,4 +191,9 @@ public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine
{
_connection.mgmtClose();
}
+
+ public long getConnectionId()
+ {
+ return _connection.getConnectionId();
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index 9b3673e8b7..eaa11d7acb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -64,10 +64,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
private Principal _authorizedPrincipal = null;
private boolean _statisticsEnabled = false;
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+ private final long _connectionId;
- public ServerConnection()
+ public ServerConnection(final long connectionId)
{
-
+ _connectionId = connectionId;
}
public UUID getId()
@@ -379,4 +380,9 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
{
return _authorizedPrincipal;
}
+
+ public long getConnectionId()
+ {
+ return _connectionId;
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
index 27e199291d..19c11080b8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
@@ -161,6 +161,26 @@ public class ServerConnectionDelegate extends ServerDelegate
}
}
+
+ @Override
+ public void connectionTuneOk(final Connection conn, final ConnectionTuneOk ok)
+ {
+ ServerConnection sconn = (ServerConnection) conn;
+ int okChannelMax = ok.getChannelMax();
+
+ if (okChannelMax > getChannelMax())
+ {
+ _logger.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
+ "client connectionTuneOk returned a channelMax (" + okChannelMax +
+ ") above the servers offered limit (" + getChannelMax() +")");
+
+ //Due to the error we must forcefully close the connection without negotiation
+ sconn.getSender().close();
+ return;
+ }
+
+ setConnectionTuneOkChannelMax(sconn, okChannelMax);
+ }
@Override
protected int getHeartbeatMax()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index e9168f71fb..0f4147ebb4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -670,7 +670,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
{
return "[" +
MessageFormat.format(CHANNEL_FORMAT,
- getConnection().getConnectionId(),
+ ((ServerConnection) getConnection()).getConnectionId(),
getClientID(),
((ProtocolEngine) _connectionConfig).getRemoteAddress().toString(),
getVirtualHost().getName(),
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index 3af665141c..c058d0b0d8 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.protocol;
-import java.security.Principal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -28,6 +27,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
@@ -43,7 +43,6 @@ import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
-import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TestNetworkConnection;
@@ -52,10 +51,11 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
// ChannelID(LIST) -> LinkedList<Pair>
final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers;
private AtomicInteger _deliveryCount = new AtomicInteger(0);
+ private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
public InternalTestProtocolSession(VirtualHost virtualHost) throws AMQException
{
- super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkConnection());
+ super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkConnection(), ID_GENERATOR.getAndIncrement());
_channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
new file mode 100644
index 0000000000..9d76d5efca
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
@@ -0,0 +1,146 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.protocol;
+
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+import java.util.Set;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.transport.TestNetworkConnection;
+
+public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase
+{
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ //the factory needs a registry instance
+ ApplicationRegistry.initialise(new TestApplicationRegistry(new ServerConfiguration(new XMLConfiguration())));
+ }
+
+ protected void tearDown()
+ {
+ //the factory opens a registry instance
+ ApplicationRegistry.remove();
+ }
+
+ private static final byte[] AMQP_0_8_HEADER =
+ new byte[] { (byte) 'A',
+ (byte) 'M',
+ (byte) 'Q',
+ (byte) 'P',
+ (byte) 1,
+ (byte) 1,
+ (byte) 8,
+ (byte) 0
+ };
+
+ private static final byte[] AMQP_0_9_HEADER =
+ new byte[] { (byte) 'A',
+ (byte) 'M',
+ (byte) 'Q',
+ (byte) 'P',
+ (byte) 1,
+ (byte) 1,
+ (byte) 0,
+ (byte) 9
+ };
+
+ private static final byte[] AMQP_0_9_1_HEADER =
+ new byte[] { (byte) 'A',
+ (byte) 'M',
+ (byte) 'Q',
+ (byte) 'P',
+ (byte) 0,
+ (byte) 0,
+ (byte) 9,
+ (byte) 1
+ };
+
+
+ private static final byte[] AMQP_0_10_HEADER =
+ new byte[] { (byte) 'A',
+ (byte) 'M',
+ (byte) 'Q',
+ (byte) 'P',
+ (byte) 1,
+ (byte) 1,
+ (byte) 0,
+ (byte) 10
+ };
+
+ private byte[] getAmqpHeader(final AmqpProtocolVersion version)
+ {
+ switch(version)
+ {
+ case v0_8:
+ return AMQP_0_8_HEADER;
+ case v0_9:
+ return AMQP_0_9_HEADER;
+ case v0_9_1:
+ return AMQP_0_9_1_HEADER;
+ case v0_10:
+ return AMQP_0_10_HEADER;
+ default:
+ fail("unknown AMQP version, appropriate header must be added for new protocol version");
+ return null;
+ }
+ }
+
+ /**
+ * Test to verify that connections established using a MultiVersionProtocolEngine are assigned
+ * IDs from a common sequence, independent of the protocol version under use.
+ */
+ public void testDifferentProtocolVersionsShareCommonIDNumberingSequence()
+ {
+ Set<AmqpProtocolVersion> versions = EnumSet.allOf(AmqpProtocolVersion.class);
+
+ MultiVersionProtocolEngineFactory factory =
+ new MultiVersionProtocolEngineFactory("localhost", versions);
+
+ //create a dummy to retrieve the 'current' ID number
+ long previousId = factory.newProtocolEngine(new TestNetworkConnection()).getConnectionId();
+
+ //create a protocol engine and send the AMQP header for all supported AMQP verisons,
+ //ensuring the ID assigned increases as expected
+ for(AmqpProtocolVersion version : versions)
+ {
+ long expectedID = previousId + 1;
+ byte[] header = getAmqpHeader(version);
+ assertNotNull("protocol header should not be null", header);
+
+ ServerProtocolEngine engine = factory.newProtocolEngine(new TestNetworkConnection());
+ assertEquals("ID did not increment as expected", expectedID, engine.getConnectionId());
+
+ //actually feed in the AMQP header for this protocol version, and ensure the ID remains consistent
+ engine.received(ByteBuffer.wrap(header));
+ assertEquals("ID was not as expected following receipt of the AMQP version header", expectedID, engine.getConnectionId());
+
+ previousId = expectedID;
+ }
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
index 8fa820ad95..3c6857e8a9 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
@@ -27,6 +27,7 @@ import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.logging.NullRootMessageLogger;
import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabase;
@@ -45,6 +46,7 @@ public class TestApplicationRegistry extends ApplicationRegistry
public void initialise() throws Exception
{
CurrentActor.setDefault(new BrokerActor(new NullRootMessageLogger()));
+ GenericActor.setDefaultMessageLogger(new NullRootMessageLogger());
super.initialise();
}
diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java b/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
new file mode 100644
index 0000000000..e8362f79f0
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+public interface ServerProtocolEngine extends ProtocolEngine
+{
+ /**
+ * Gets the connection ID associated with this ProtocolEngine
+ */
+ long getConnectionId();
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 7d17397b2d..eef6c047d3 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -126,8 +126,6 @@ public class Connection extends ConnectionInvoker
private SecurityLayer securityLayer;
private String _clientId;
- private static final AtomicLong idGenerator = new AtomicLong(0);
- private final long _connectionId = idGenerator.incrementAndGet();
private final AtomicBoolean connectionLost = new AtomicBoolean(false);
public Connection() {}
@@ -360,11 +358,6 @@ public class Connection extends ConnectionInvoker
_sessionFactory = sessionFactory;
}
- public long getConnectionId()
- {
- return _connectionId;
- }
-
public ConnectionDelegate getConnectionDelegate()
{
return delegate;
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
index 11af86f412..82fa6ca473 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
@@ -170,22 +170,7 @@ public class ServerDelegate extends ConnectionDelegate
@Override
public void connectionTuneOk(Connection conn, ConnectionTuneOk ok)
{
- int okChannelMax = ok.getChannelMax();
-
- if (okChannelMax > getChannelMax())
- {
- _logger.error("Connection '" + conn.getConnectionId() + "' being severed, " +
- "client connectionTuneOk returned a channelMax (" + okChannelMax +
- ") above the servers offered limit (" + getChannelMax() +")");
-
- //Due to the error we must forcefully close the connection without negotiation
- conn.getSender().close();
- return;
- }
- //0 means no implied limit, except available server resources
- //(or that forced by protocol limitations [0xFFFF])
- conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax);
}
@Override
@@ -215,4 +200,11 @@ public class ServerDelegate extends ConnectionDelegate
ssn.sessionAttached(atc.getName());
ssn.setState(Session.State.OPEN);
}
+
+ protected void setConnectionTuneOkChannelMax(final Connection conn, final int okChannelMax)
+ {
+ //0 means no implied limit, except available server resources
+ //(or that forced by protocol limitations [0xFFFF])
+ conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax);
+ }
}
diff --git a/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java b/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java
index 1a1b5af805..976b141fc0 100644
--- a/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java
+++ b/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.NetworkTransportConfiguration;
@@ -333,7 +334,7 @@ public class MinaNetworkHandlerTest extends QpidTestCase
}
}
- public class CountingProtocolEngine implements ProtocolEngine
+ public class CountingProtocolEngine implements ServerProtocolEngine
{
public ArrayList<ByteBuffer> _receivedBytes = new ArrayList<ByteBuffer>();
private int _readBytes;
@@ -447,6 +448,11 @@ public class MinaNetworkHandlerTest extends QpidTestCase
return _closed;
}
+ public long getConnectionId()
+ {
+ return -1;
+ }
+
}
private class EchoProtocolEngine extends CountingProtocolEngine