diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-10-21 21:11:31 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-10-21 21:11:31 +0000 |
| commit | e6522969eb5eb6177d8a78c518062ac98ce480e4 (patch) | |
| tree | ca977835692afff30301d692d0798bb0b271d826 /qpid/java | |
| parent | 87dd3772e327d206fc30d19e9ae98d4ae21977d4 (diff) | |
| download | qpid-python-e6522969eb5eb6177d8a78c518062ac98ce480e4.tar.gz | |
QPID-6175 : [Java Broker] allow maximum message size to be restricted
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1633466 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
24 files changed, 230 insertions, 93 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java index 7acd4aa1fa..7a8fff113c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java @@ -59,6 +59,12 @@ public interface AmqpPort<X extends AmqpPort<X>> extends ClientAuthCapablePort<X @ManagedContextDefault(name = PORT_MAX_OPEN_CONNECTIONS) int DEFAULT_MAX_OPEN_CONNECTIONS = -1; + + String PORT_MAX_MESSAGE_SIZE = "qpid.port.max_message_size"; + + @ManagedContextDefault(name = PORT_MAX_MESSAGE_SIZE) + int DEFAULT_MAX_MESSAGE_SIZE = 0x1f40000; // 500Mb + String OPEN_CONNECTIONS_WARN_PERCENT = "qpid.port.open_connections_warn_percent"; @ManagedContextDefault(name = OPEN_CONNECTIONS_WARN_PERCENT) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java index 4d15e982c4..6e1b6529d8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java @@ -21,18 +21,18 @@ package org.apache.qpid.server.plugin;/* import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.transport.network.NetworkConnection; public interface ProtocolEngineCreator extends Pluggable { Protocol getVersion(); byte[] getHeaderIdentifier(); - ServerProtocolEngine newProtocolEngine(Broker broker, + ServerProtocolEngine newProtocolEngine(Broker<?> broker, NetworkConnection network, - Port port, + AmqpPort<?> port, Transport transport, long id); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index 5041e22104..26e8271d14 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -26,9 +26,9 @@ import java.util.List; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.util.Deletable; @@ -89,7 +89,7 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends long getLastIoTime(); - Port<?> getPort(); + AmqpPort<?> getPort(); Transport getTransport(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index 1482576748..1c42d9b6fe 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -46,10 +46,10 @@ import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Consumer; -import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Session; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.ConsumerListener; @@ -618,7 +618,7 @@ public class MockConsumer implements ConsumerTarget } @Override - public Port<?> getPort() + public AmqpPort<?> getPort() { return null; } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java index 869ac01c4e..cca376c54c 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.nio.ByteBuffer; + import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.AbstractServerMessageImpl; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.transport.Header; -import java.nio.ByteBuffer; - public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTransferMessage, MessageMetaData_0_10> { @@ -51,13 +51,6 @@ public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTra return getMetaData().getMessageHeader(); } - public boolean isRedelivered() - { - // The *Message* is never redelivered, only queue entries are... this is here so that filters - // can run against the message on entry to an exchange - return false; - } - public long getSize() { return getMetaData().getSize(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java index 40c94075a1..30aecdb2d2 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java @@ -25,9 +25,9 @@ import java.net.SocketAddress; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.plugin.ProtocolEngineCreator; import org.apache.qpid.transport.ConnectionDelegate; @@ -64,9 +64,9 @@ public class ProtocolEngineCreator_0_10 implements ProtocolEngineCreator return AMQP_0_10_HEADER; } - public ServerProtocolEngine newProtocolEngine(Broker broker, + public ServerProtocolEngine newProtocolEngine(Broker<?> broker, NetworkConnection network, - Port port, + AmqpPort<?> port, Transport transport, long id) { @@ -80,12 +80,13 @@ public class ProtocolEngineCreator_0_10 implements ProtocolEngineCreator fqdn, broker.getSubjectCreator(address, transport.isSecure()) ); - ServerConnection conn = new ServerConnection(id,broker); + ServerConnection conn = new ServerConnection(id,broker, port, transport); conn.setConnectionDelegate(connDelegate); conn.setRemoteAddress(network.getRemoteAddress()); conn.setLocalAddress(network.getLocalAddress()); - return new ProtocolEngine_0_10( conn, network, port, transport); + + return new ProtocolEngine_0_10( conn, network); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index dc60a37a7f..854cd388b9 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -32,7 +32,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Port; -import org.apache.qpid.server.model.Transport; import org.apache.qpid.transport.Constant; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.Assembler; @@ -57,14 +56,10 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol private long _lastWriteTime; public ProtocolEngine_0_10(ServerConnection conn, - NetworkConnection network, - Port port, - Transport transport) + NetworkConnection network) { super(new Assembler(conn)); _connection = conn; - _connection.setPort(port); - _connection.setTransport(transport); if(network != null) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 9bccfa53a3..bc463ef59e 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -42,8 +42,8 @@ import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.SessionModelListener; @@ -75,7 +75,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S private final long _connectionId; private final Object _reference = new Object(); private VirtualHostImpl<?,?,?> _virtualHost; - private Port<?> _port; + private AmqpPort<?> _port; private AtomicLong _lastIoTime = new AtomicLong(); private boolean _blocking; private Transport _transport; @@ -88,13 +88,24 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S private volatile boolean _stopped; private int _messageCompressionThreshold; + private int _maxMessageSize; - public ServerConnection(final long connectionId, Broker broker) + public ServerConnection(final long connectionId, + Broker<?> broker, + final AmqpPort<?> port, + final Transport transport) { _connectionId = connectionId; _authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this)); _broker = broker; + _port = port; + _transport = transport; + + int maxMessageSize = port.getContextValue(Integer.class, AmqpPort.PORT_MAX_MESSAGE_SIZE); + _maxMessageSize = (maxMessageSize > 0) ? maxMessageSize : Integer.MAX_VALUE; + + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId()); _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId()); _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId()); @@ -203,16 +214,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S } @Override - public Port<?> getPort() + public AmqpPort<?> getPort() { return _port; } - public void setPort(Port<?> port) - { - _port = port; - } - @Override public Transport getTransport() { @@ -231,11 +237,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S return _stopped; } - public void setTransport(Transport transport) - { - _transport = transport; - } - public void onOpen(final Runnable task) { _onOpenTask = task; @@ -658,4 +659,9 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { return _messageCompressionThreshold; } + + public int getMaxMessageSize() + { + return _maxMessageSize; + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 77dba71fac..969701987a 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -333,16 +333,22 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void messageTransfer(Session ssn, final MessageTransfer xfr) { - if(((ServerSession)ssn).blockingTimeoutExceeded()) + ServerSession serverSession = (ServerSession) ssn; + if(serverSession.blockingTimeoutExceeded()) { getVirtualHost(ssn).getEventLogger().message(ChannelMessages.FLOW_CONTROL_IGNORED()); - ((ServerSession) ssn).close(AMQConstant.MESSAGE_TOO_LARGE, - "Session flow control was requested, but not enforced by sender"); + serverSession.close(AMQConstant.MESSAGE_TOO_LARGE, + "Session flow control was requested, but not enforced by sender"); + } + else if(xfr.getBodySize() > serverSession.getConnection().getMaxMessageSize()) + { + exception(ssn, xfr, ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED, + "Message size of " + xfr.getBodySize() + " greater than allowed maximum of " + serverSession.getConnection().getMaxMessageSize()); } else { - final MessageDestination exchange = getDestinationForMessage(ssn, xfr); + final MessageDestination destination = getDestinationForMessage(ssn, xfr); final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties(); @@ -359,7 +365,7 @@ public class ServerSessionDelegate extends SessionDelegate virtualHost.getSecurityManager() .authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), - exchange.getName(), + destination.getName(), virtualHost.getName()); } catch (AccessControlException e) @@ -372,7 +378,6 @@ public class ServerSessionDelegate extends SessionDelegate final MessageStore store = virtualHost.getMessageStore(); final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store); - final ServerSession serverSession = (ServerSession) ssn; final MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference()); MessageReference<MessageTransferMessage> reference = message.newReference(); @@ -400,7 +405,7 @@ public class ServerSessionDelegate extends SessionDelegate } }; - int enqueues = serverSession.enqueue(message, instanceProperties, exchange); + int enqueues = serverSession.enqueue(message, instanceProperties, destination); if (enqueues == 0) { @@ -414,7 +419,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(exchange.getName(), + virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(destination.getName(), messageMetaData.getRoutingKey())); } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java index 58de3edb61..2fc76d4f02 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java @@ -22,11 +22,20 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.ArrayList; +import java.util.List; + import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.transport.Binary; +import org.apache.qpid.transport.ExecutionErrorCode; +import org.apache.qpid.transport.ExecutionException; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Method; public class ServerSessionTest extends QpidTestCase { @@ -60,16 +69,19 @@ public class ServerSessionTest extends QpidTestCase public void testCompareTo() throws Exception { - final Broker broker = mock(Broker.class); + final Broker<?> broker = mock(Broker.class); when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l); - ServerConnection connection = new ServerConnection(1, broker); + AmqpPort amqpPort = mock(AmqpPort.class); + when(amqpPort.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE); + + ServerConnection connection = new ServerConnection(1, broker, amqpPort, Transport.TCP); connection.setVirtualHost(_virtualHost); ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(), new Binary(getName().getBytes()), 0); // create a session with the same name but on a different connection - ServerConnection connection2 = new ServerConnection(2, broker); + ServerConnection connection2 = new ServerConnection(2, broker, amqpPort, Transport.TCP); connection2.setVirtualHost(_virtualHost); ServerSession session2 = new ServerSession(connection2, new ServerSessionDelegate(), new Binary(getName().getBytes()), 0); @@ -78,5 +90,46 @@ public class ServerSessionTest extends QpidTestCase assertEquals("Unexpected compare result", 0, session1.compareTo(session1)); } + public void testOverlargeMessageTest() throws Exception + { + final Broker<?> broker = mock(Broker.class); + when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l); + + AmqpPort port = mock(AmqpPort.class); + when(port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(1024); + ServerConnection connection = new ServerConnection(1, broker, port, Transport.TCP); + connection.setVirtualHost(_virtualHost); + final List<Method> invokedMethods = new ArrayList<>(); + ServerSession session = new ServerSession(connection, new ServerSessionDelegate(), + new Binary(getName().getBytes()), 0) + { + @Override + public void invoke(final Method m) + { + invokedMethods.add(m); + } + }; + + ServerSessionDelegate delegate = new ServerSessionDelegate(); + + MessageTransfer xfr = new MessageTransfer(); + xfr.setBody(new byte[2048]); + delegate.messageTransfer(session, xfr); + + assertFalse("No methods invoked - expecting at least 1", invokedMethods.isEmpty()); + Method firstInvoked = invokedMethods.get(0); + assertTrue("First invoked method not execution error", firstInvoked instanceof ExecutionException); + assertEquals(ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED, ((ExecutionException)firstInvoked).getErrorCode()); + + invokedMethods.clear(); + + // test the boundary condition + + xfr.setBody(new byte[1024]); + delegate.messageTransfer(session, xfr); + + assertTrue("Methods invoked when not expecting any", invokedMethods.isEmpty()); + } + } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 77d856af7a..d14f185254 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -207,7 +207,6 @@ public class AMQChannel private boolean _confirmOnPublish; private long _confirmedMessageCounter; - public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore) { _connection = connection; @@ -578,9 +577,17 @@ public class AMQChannel try { - _currentMessage.addContentBodyFrame(contentBody); - - deliverCurrentMessageIfComplete(); + long currentSize = _currentMessage.addContentBodyFrame(contentBody); + if(currentSize > _currentMessage.getSize()) + { + _connection.closeConnection(AMQConstant.FRAME_ERROR, + "More message data received than content header defined", + _channelId); + } + else + { + deliverCurrentMessageIfComplete(); + } } catch (RuntimeException e) { @@ -2385,6 +2392,11 @@ public class AMQChannel if(hasCurrentMessage()) { + if(bodySize > _connection.getMaxMessageSize()) + { + closeChannel(AMQConstant.MESSAGE_TOO_LARGE, + "Message size of " + bodySize + " greater than allowed maximum of " + _connection.getMaxMessageSize()); + } publishContentHeader(new ContentHeaderBody(properties, bodySize)); } else diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 49db24be52..89da98431e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -102,7 +102,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024; public static final String BROKER_DEBUG_BINARY_DATA_LENGTH = "broker.debug.binaryDataLength"; public static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80; - private final Port<?> _port; + private final AmqpPort<?> _port; private final long _creationTime; private AMQShortString _contextKey; @@ -184,11 +184,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private int _currentClassId; private int _currentMethodId; private int _binaryDataLimit; + private long _maxMessageSize; - public AMQProtocolEngine(Broker broker, + public AMQProtocolEngine(Broker<?> broker, final NetworkConnection network, final long connectionId, - Port port, + AmqpPort<?> port, Transport transport) { _broker = broker; @@ -202,6 +203,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _binaryDataLimit = _broker.getContextKeys(false).contains(BROKER_DEBUG_BINARY_DATA_LENGTH) ? _broker.getContextValue(Integer.class, BROKER_DEBUG_BINARY_DATA_LENGTH) : DEFAULT_DEBUG_BINARY_DATA_LENGTH; + + int maxMessageSize = port.getContextValue(Integer.class, AmqpPort.PORT_MAX_MESSAGE_SIZE); + _maxMessageSize = (maxMessageSize > 0) ? (long) maxMessageSize : Long.MAX_VALUE; + _authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this)); runAsSubject(new PrivilegedAction<Void>() { @@ -1164,7 +1169,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } @Override - public Port<?> getPort() + public AmqpPort<?> getPort() { return _port; } @@ -1753,6 +1758,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return _binaryDataLimit; } + public long getMaxMessageSize() + { + return _maxMessageSize; + } + public final class WriteDeliverMethod implements ClientDeliveryMethod { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java index d966e9c9c6..6e622fb59a 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java @@ -57,10 +57,11 @@ public class IncomingMessage return _messagePublishInfo; } - public void addContentBodyFrame(final ContentBody contentChunk) + public long addContentBodyFrame(final ContentBody contentChunk) { _bodyLengthReceived += contentChunk.getSize(); _contentChunks.add(contentChunk); + return _bodyLengthReceived; } public boolean allContentReceived() diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java index d88d8c3c2a..0058fe86a9 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java @@ -22,9 +22,9 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.plugin.ProtocolEngineCreator; import org.apache.qpid.transport.network.NetworkConnection; @@ -58,9 +58,9 @@ public class ProtocolEngineCreator_0_8 implements ProtocolEngineCreator return AMQP_0_8_HEADER; } - public ServerProtocolEngine newProtocolEngine(Broker broker, + public ServerProtocolEngine newProtocolEngine(Broker<?> broker, NetworkConnection network, - Port port, + AmqpPort<?> port, Transport transport, long id) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java index 4f9198a509..7253111114 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java @@ -22,9 +22,9 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.plugin.ProtocolEngineCreator; import org.apache.qpid.transport.network.NetworkConnection; @@ -58,9 +58,9 @@ public class ProtocolEngineCreator_0_9 implements ProtocolEngineCreator return AMQP_0_9_HEADER; } - public ServerProtocolEngine newProtocolEngine(Broker broker, + public ServerProtocolEngine newProtocolEngine(Broker<?> broker, NetworkConnection network, - Port port, + AmqpPort<?> port, Transport transport, long id) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java index 5251e760ff..e72cc4d058 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java @@ -22,9 +22,9 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.plugin.ProtocolEngineCreator; import org.apache.qpid.transport.network.NetworkConnection; @@ -59,9 +59,9 @@ public class ProtocolEngineCreator_0_9_1 implements ProtocolEngineCreator return AMQP_0_9_1_HEADER; } - public ServerProtocolEngine newProtocolEngine(Broker broker, + public ServerProtocolEngine newProtocolEngine(Broker<?> broker, NetworkConnection network, - Port port, + AmqpPort<?> port, Transport transport, long id) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java index 2c710bcf9f..a6725a6d58 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java @@ -20,20 +20,28 @@ */ package org.apache.qpid.server.protocol.v0_8; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.MessagePublishInfo; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.test.utils.QpidTestCase; @@ -52,7 +60,10 @@ public class AMQChannelTest extends QpidTestCase BrokerTestHelper.setUp(); _virtualHost = BrokerTestHelper.createVirtualHost(getTestName()); _broker = BrokerTestHelper.createBrokerMock(); - _protocolSession = new InternalTestProtocolSession(_virtualHost, _broker) + AmqpPort port = mock(AmqpPort.class); + when(port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE); + + _protocolSession = new InternalTestProtocolSession(_virtualHost, _broker, port) { @Override public void writeReturn(MessagePublishInfo messagePublishInfo, @@ -86,8 +97,10 @@ public class AMQChannelTest extends QpidTestCase { AMQChannel channel1 = new AMQChannel(_protocolSession, 1, _virtualHost.getMessageStore()); + AmqpPort port = mock(AmqpPort.class); + when(port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE); // create a channel with the same channelId but on a different session - AMQChannel channel2 = new AMQChannel(new InternalTestProtocolSession(_virtualHost, _broker), 1, _virtualHost.getMessageStore()); + AMQChannel channel2 = new AMQChannel(new InternalTestProtocolSession(_virtualHost, _broker, port), 1, _virtualHost.getMessageStore()); assertFalse("Unexpected compare result", channel1.compareTo(channel2) == 0); assertEquals("Unexpected compare result", 0, channel1.compareTo(channel1)); } @@ -135,4 +148,33 @@ public class AMQChannelTest extends QpidTestCase assertEquals("Unexpected number of replies", 0, _replies.size()); } + public void testOverlargeMessage() throws Exception + { + + AmqpPort port = mock(AmqpPort.class); + when(port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(1024); + final List<AMQDataBlock> frames = new ArrayList<>(); + _protocolSession = new InternalTestProtocolSession(_virtualHost, _broker, port) + { + @Override + public synchronized void writeFrame(final AMQDataBlock frame) + { + frames.add(frame); + } + }; + + AMQChannel channel = new AMQChannel(_protocolSession, 1, _virtualHost.getMessageStore()); + + channel.receiveBasicPublish(AMQShortString.EMPTY_STRING, AMQShortString.EMPTY_STRING, false, false); + + final BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); + channel.receiveMessageHeader(properties, 2048l); + + frames.toString(); + + assertEquals(1, frames.size()); + assertEquals(ChannelCloseBody.class, ((AMQFrame) frames.get(0)).getBodyFrame().getClass()); + assertEquals(AMQConstant.MESSAGE_TOO_LARGE.getCode(), ((ChannelCloseBody)((AMQFrame)frames.get(0)).getBodyFrame()).getReplyCode()); + } + } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java index 59d7623011..a9eb2b1680 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_8; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -29,16 +30,16 @@ import java.util.Map; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.transport.network.NetworkConnection; public class AMQProtocolEngineTest extends QpidTestCase { - private Broker _broker; - private Port _port; + private Broker<?> _broker; + private AmqpPort<?> _port; private NetworkConnection _network; private Transport _transport; @@ -49,7 +50,9 @@ public class AMQProtocolEngineTest extends QpidTestCase _broker = BrokerTestHelper.createBrokerMock(); when(_broker.getConnection_closeWhenNoRoute()).thenReturn(true); - _port = mock(Port.class); + _port = mock(AmqpPort.class); + when(_port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE); + _network = mock(NetworkConnection.class); _transport = Transport.TCP; } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java index d4c10c50c1..a0a3d60458 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java @@ -20,12 +20,17 @@ */ package org.apache.qpid.server.protocol.v0_8; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -58,7 +63,10 @@ public class BrokerTestHelper_0_8 extends BrokerTestHelper public static InternalTestProtocolSession createProtocolSession(String hostName) throws Exception { VirtualHostImpl virtualHost = createVirtualHost(hostName); - return new InternalTestProtocolSession(virtualHost, createBrokerMock()); + + AmqpPort port = mock(AmqpPort.class); + when(port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE); + return new InternalTestProtocolSession(virtualHost, createBrokerMock(), port); } public static void publishMessages(AMQChannel channel, int numberOfMessages, String queueName, String exchangeName) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index 895c8e687c..c01a349509 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -46,6 +46,7 @@ import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.security.auth.UsernamePrincipal; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -60,9 +61,9 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr private AtomicInteger _deliveryCount = new AtomicInteger(0); private static final AtomicLong ID_GENERATOR = new AtomicLong(0); - public InternalTestProtocolSession(VirtualHostImpl virtualHost, Broker broker) throws AMQException + public InternalTestProtocolSession(VirtualHostImpl virtualHost, Broker<?> broker, final AmqpPort<?> port) throws AMQException { - super(broker, new TestNetworkConnection(), ID_GENERATOR.getAndIncrement(), null, null); + super(broker, new TestNetworkConnection(), ID_GENERATOR.getAndIncrement(), port, null); _channelDelivers = new HashMap<Integer, Map<String, LinkedList<DeliveryPair>>>(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 377eaab4cf..8e24d55da0 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -47,7 +47,6 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.AMQConnectionModel; @@ -62,8 +61,8 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class Connection_1_0 implements ConnectionEventListener, AMQConnectionModel<Connection_1_0,Session_1_0> { - private final Port<?> _port; - private final Broker _broker; + private final AmqpPort<?> _port; + private final Broker<?> _broker; private final SubjectCreator _subjectCreator; private VirtualHostImpl _vhost; private final Transport _transport; @@ -102,10 +101,10 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private boolean _closedOnOpen; - public Connection_1_0(Broker broker, + public Connection_1_0(Broker<?> broker, ConnectionEndpoint conn, long connectionId, - Port port, + AmqpPort<?> port, Transport transport, final SubjectCreator subjectCreator) { _broker = broker; @@ -359,7 +358,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod } @Override - public Port<?> getPort() + public AmqpPort<?> getPort() { return _port; } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java index 8eb89cade6..fa8134cb55 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java @@ -22,9 +22,9 @@ package org.apache.qpid.server.protocol.v1_0; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.plugin.ProtocolEngineCreator; import org.apache.qpid.transport.network.NetworkConnection; @@ -58,9 +58,9 @@ public class ProtocolEngineCreator_1_0_0_SASL implements ProtocolEngineCreator return AMQP_SASL_1_0_0_HEADER; } - public ServerProtocolEngine newProtocolEngine(Broker broker, + public ServerProtocolEngine newProtocolEngine(Broker<?> broker, NetworkConnection network, - Port port, + AmqpPort<?> port, Transport transport, long id) { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index 550355216e..740b01e459 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -35,6 +35,7 @@ import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; + import org.apache.qpid.amqp_1_0.codec.FrameWriter; import org.apache.qpid.amqp_1_0.codec.ProtocolHandler; import org.apache.qpid.amqp_1_0.framing.AMQFrame; @@ -53,8 +54,8 @@ import org.apache.qpid.common.QpidProperties; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.UsernamePrincipal; import org.apache.qpid.server.util.ServerScopedRuntimeException; @@ -67,14 +68,14 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private static final org.apache.log4j.Logger _logger = org.apache.log4j.Logger.getLogger(ProtocolEngine_1_0_0_SASL.class); - private final Port _port; + private final AmqpPort<?> _port; private final Transport _transport; private long _readBytes; private long _writtenBytes; private long _lastReadTime; private long _lastWriteTime; - private final Broker _broker; + private final Broker<?> _broker; private long _createTime = System.currentTimeMillis(); private ConnectionEndpoint _endpoint; private long _connectionId; @@ -134,8 +135,8 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private State _state = State.A; - public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final Broker broker, - long id, Port port, Transport transport) + public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final Broker<?> broker, + long id, AmqpPort<?> port, Transport transport) { _connectionId = id; _broker = broker; diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java index e8aa2b3f34..007772e8be 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java @@ -157,6 +157,7 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase AmqpPort<?> port = mock(AmqpPort.class); when(port.canAcceptNewConnection(any(SocketAddress.class))).thenReturn(true); + when(port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE); when(port.getContextValue(eq(Long.class),eq(Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))).thenReturn(10000l); MultiVersionProtocolEngineFactory factory = |
