diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-10-21 21:11:31 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-10-21 21:11:31 +0000 |
| commit | e6522969eb5eb6177d8a78c518062ac98ce480e4 (patch) | |
| tree | ca977835692afff30301d692d0798bb0b271d826 /qpid/java/broker-plugins/amqp-0-10-protocol | |
| parent | 87dd3772e327d206fc30d19e9ae98d4ae21977d4 (diff) | |
| download | qpid-python-e6522969eb5eb6177d8a78c518062ac98ce480e4.tar.gz | |
QPID-6175 : [Java Broker] allow maximum message size to be restricted
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1633466 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-10-protocol')
6 files changed, 98 insertions, 45 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java index 869ac01c4e..cca376c54c 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.nio.ByteBuffer; + import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.AbstractServerMessageImpl; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.transport.Header; -import java.nio.ByteBuffer; - public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTransferMessage, MessageMetaData_0_10> { @@ -51,13 +51,6 @@ public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTra return getMetaData().getMessageHeader(); } - public boolean isRedelivered() - { - // The *Message* is never redelivered, only queue entries are... this is here so that filters - // can run against the message on entry to an exchange - return false; - } - public long getSize() { return getMetaData().getSize(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java index 40c94075a1..30aecdb2d2 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java @@ -25,9 +25,9 @@ import java.net.SocketAddress; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.plugin.ProtocolEngineCreator; import org.apache.qpid.transport.ConnectionDelegate; @@ -64,9 +64,9 @@ public class ProtocolEngineCreator_0_10 implements ProtocolEngineCreator return AMQP_0_10_HEADER; } - public ServerProtocolEngine newProtocolEngine(Broker broker, + public ServerProtocolEngine newProtocolEngine(Broker<?> broker, NetworkConnection network, - Port port, + AmqpPort<?> port, Transport transport, long id) { @@ -80,12 +80,13 @@ public class ProtocolEngineCreator_0_10 implements ProtocolEngineCreator fqdn, broker.getSubjectCreator(address, transport.isSecure()) ); - ServerConnection conn = new ServerConnection(id,broker); + ServerConnection conn = new ServerConnection(id,broker, port, transport); conn.setConnectionDelegate(connDelegate); conn.setRemoteAddress(network.getRemoteAddress()); conn.setLocalAddress(network.getLocalAddress()); - return new ProtocolEngine_0_10( conn, network, port, transport); + + return new ProtocolEngine_0_10( conn, network); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index dc60a37a7f..854cd388b9 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -32,7 +32,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Port; -import org.apache.qpid.server.model.Transport; import org.apache.qpid.transport.Constant; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.Assembler; @@ -57,14 +56,10 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol private long _lastWriteTime; public ProtocolEngine_0_10(ServerConnection conn, - NetworkConnection network, - Port port, - Transport transport) + NetworkConnection network) { super(new Assembler(conn)); _connection = conn; - _connection.setPort(port); - _connection.setTransport(transport); if(network != null) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 9bccfa53a3..bc463ef59e 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -42,8 +42,8 @@ import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.SessionModelListener; @@ -75,7 +75,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S private final long _connectionId; private final Object _reference = new Object(); private VirtualHostImpl<?,?,?> _virtualHost; - private Port<?> _port; + private AmqpPort<?> _port; private AtomicLong _lastIoTime = new AtomicLong(); private boolean _blocking; private Transport _transport; @@ -88,13 +88,24 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S private volatile boolean _stopped; private int _messageCompressionThreshold; + private int _maxMessageSize; - public ServerConnection(final long connectionId, Broker broker) + public ServerConnection(final long connectionId, + Broker<?> broker, + final AmqpPort<?> port, + final Transport transport) { _connectionId = connectionId; _authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this)); _broker = broker; + _port = port; + _transport = transport; + + int maxMessageSize = port.getContextValue(Integer.class, AmqpPort.PORT_MAX_MESSAGE_SIZE); + _maxMessageSize = (maxMessageSize > 0) ? maxMessageSize : Integer.MAX_VALUE; + + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId()); _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId()); _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId()); @@ -203,16 +214,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S } @Override - public Port<?> getPort() + public AmqpPort<?> getPort() { return _port; } - public void setPort(Port<?> port) - { - _port = port; - } - @Override public Transport getTransport() { @@ -231,11 +237,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S return _stopped; } - public void setTransport(Transport transport) - { - _transport = transport; - } - public void onOpen(final Runnable task) { _onOpenTask = task; @@ -658,4 +659,9 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { return _messageCompressionThreshold; } + + public int getMaxMessageSize() + { + return _maxMessageSize; + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 77dba71fac..969701987a 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -333,16 +333,22 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void messageTransfer(Session ssn, final MessageTransfer xfr) { - if(((ServerSession)ssn).blockingTimeoutExceeded()) + ServerSession serverSession = (ServerSession) ssn; + if(serverSession.blockingTimeoutExceeded()) { getVirtualHost(ssn).getEventLogger().message(ChannelMessages.FLOW_CONTROL_IGNORED()); - ((ServerSession) ssn).close(AMQConstant.MESSAGE_TOO_LARGE, - "Session flow control was requested, but not enforced by sender"); + serverSession.close(AMQConstant.MESSAGE_TOO_LARGE, + "Session flow control was requested, but not enforced by sender"); + } + else if(xfr.getBodySize() > serverSession.getConnection().getMaxMessageSize()) + { + exception(ssn, xfr, ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED, + "Message size of " + xfr.getBodySize() + " greater than allowed maximum of " + serverSession.getConnection().getMaxMessageSize()); } else { - final MessageDestination exchange = getDestinationForMessage(ssn, xfr); + final MessageDestination destination = getDestinationForMessage(ssn, xfr); final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties(); @@ -359,7 +365,7 @@ public class ServerSessionDelegate extends SessionDelegate virtualHost.getSecurityManager() .authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), - exchange.getName(), + destination.getName(), virtualHost.getName()); } catch (AccessControlException e) @@ -372,7 +378,6 @@ public class ServerSessionDelegate extends SessionDelegate final MessageStore store = virtualHost.getMessageStore(); final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store); - final ServerSession serverSession = (ServerSession) ssn; final MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference()); MessageReference<MessageTransferMessage> reference = message.newReference(); @@ -400,7 +405,7 @@ public class ServerSessionDelegate extends SessionDelegate } }; - int enqueues = serverSession.enqueue(message, instanceProperties, exchange); + int enqueues = serverSession.enqueue(message, instanceProperties, destination); if (enqueues == 0) { @@ -414,7 +419,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(exchange.getName(), + virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(destination.getName(), messageMetaData.getRoutingKey())); } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java index 58de3edb61..2fc76d4f02 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java @@ -22,11 +22,20 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.ArrayList; +import java.util.List; + import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.transport.Binary; +import org.apache.qpid.transport.ExecutionErrorCode; +import org.apache.qpid.transport.ExecutionException; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Method; public class ServerSessionTest extends QpidTestCase { @@ -60,16 +69,19 @@ public class ServerSessionTest extends QpidTestCase public void testCompareTo() throws Exception { - final Broker broker = mock(Broker.class); + final Broker<?> broker = mock(Broker.class); when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l); - ServerConnection connection = new ServerConnection(1, broker); + AmqpPort amqpPort = mock(AmqpPort.class); + when(amqpPort.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE); + + ServerConnection connection = new ServerConnection(1, broker, amqpPort, Transport.TCP); connection.setVirtualHost(_virtualHost); ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(), new Binary(getName().getBytes()), 0); // create a session with the same name but on a different connection - ServerConnection connection2 = new ServerConnection(2, broker); + ServerConnection connection2 = new ServerConnection(2, broker, amqpPort, Transport.TCP); connection2.setVirtualHost(_virtualHost); ServerSession session2 = new ServerSession(connection2, new ServerSessionDelegate(), new Binary(getName().getBytes()), 0); @@ -78,5 +90,46 @@ public class ServerSessionTest extends QpidTestCase assertEquals("Unexpected compare result", 0, session1.compareTo(session1)); } + public void testOverlargeMessageTest() throws Exception + { + final Broker<?> broker = mock(Broker.class); + when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l); + + AmqpPort port = mock(AmqpPort.class); + when(port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(1024); + ServerConnection connection = new ServerConnection(1, broker, port, Transport.TCP); + connection.setVirtualHost(_virtualHost); + final List<Method> invokedMethods = new ArrayList<>(); + ServerSession session = new ServerSession(connection, new ServerSessionDelegate(), + new Binary(getName().getBytes()), 0) + { + @Override + public void invoke(final Method m) + { + invokedMethods.add(m); + } + }; + + ServerSessionDelegate delegate = new ServerSessionDelegate(); + + MessageTransfer xfr = new MessageTransfer(); + xfr.setBody(new byte[2048]); + delegate.messageTransfer(session, xfr); + + assertFalse("No methods invoked - expecting at least 1", invokedMethods.isEmpty()); + Method firstInvoked = invokedMethods.get(0); + assertTrue("First invoked method not execution error", firstInvoked instanceof ExecutionException); + assertEquals(ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED, ((ExecutionException)firstInvoked).getErrorCode()); + + invokedMethods.clear(); + + // test the boundary condition + + xfr.setBody(new byte[1024]); + delegate.messageTransfer(session, xfr); + + assertTrue("Methods invoked when not expecting any", invokedMethods.isEmpty()); + } + } |
