summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-0-10-protocol
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-10-21 21:11:31 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-10-21 21:11:31 +0000
commite6522969eb5eb6177d8a78c518062ac98ce480e4 (patch)
treeca977835692afff30301d692d0798bb0b271d826 /qpid/java/broker-plugins/amqp-0-10-protocol
parent87dd3772e327d206fc30d19e9ae98d4ae21977d4 (diff)
downloadqpid-python-e6522969eb5eb6177d8a78c518062ac98ce480e4.tar.gz
QPID-6175 : [Java Broker] allow maximum message size to be restricted
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1633466 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-10-protocol')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java11
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java11
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java7
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java34
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java21
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java59
6 files changed, 98 insertions, 45 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
index 869ac01c4e..cca376c54c 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.transport.Header;
-import java.nio.ByteBuffer;
-
public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTransferMessage, MessageMetaData_0_10>
{
@@ -51,13 +51,6 @@ public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTra
return getMetaData().getMessageHeader();
}
- public boolean isRedelivered()
- {
- // The *Message* is never redelivered, only queue entries are... this is here so that filters
- // can run against the message on entry to an exchange
- return false;
- }
-
public long getSize()
{
return getMetaData().getSize();
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
index 40c94075a1..30aecdb2d2 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
@@ -25,9 +25,9 @@ import java.net.SocketAddress;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
import org.apache.qpid.transport.ConnectionDelegate;
@@ -64,9 +64,9 @@ public class ProtocolEngineCreator_0_10 implements ProtocolEngineCreator
return AMQP_0_10_HEADER;
}
- public ServerProtocolEngine newProtocolEngine(Broker broker,
+ public ServerProtocolEngine newProtocolEngine(Broker<?> broker,
NetworkConnection network,
- Port port,
+ AmqpPort<?> port,
Transport transport,
long id)
{
@@ -80,12 +80,13 @@ public class ProtocolEngineCreator_0_10 implements ProtocolEngineCreator
fqdn, broker.getSubjectCreator(address, transport.isSecure())
);
- ServerConnection conn = new ServerConnection(id,broker);
+ ServerConnection conn = new ServerConnection(id,broker, port, transport);
conn.setConnectionDelegate(connDelegate);
conn.setRemoteAddress(network.getRemoteAddress());
conn.setLocalAddress(network.getLocalAddress());
- return new ProtocolEngine_0_10( conn, network, port, transport);
+
+ return new ProtocolEngine_0_10( conn, network);
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index dc60a37a7f..854cd388b9 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -32,7 +32,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Port;
-import org.apache.qpid.server.model.Transport;
import org.apache.qpid.transport.Constant;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.Assembler;
@@ -57,14 +56,10 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
private long _lastWriteTime;
public ProtocolEngine_0_10(ServerConnection conn,
- NetworkConnection network,
- Port port,
- Transport transport)
+ NetworkConnection network)
{
super(new Assembler(conn));
_connection = conn;
- _connection.setPort(port);
- _connection.setTransport(transport);
if(network != null)
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 9bccfa53a3..bc463ef59e 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -42,8 +42,8 @@ import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.SessionModelListener;
@@ -75,7 +75,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
private final long _connectionId;
private final Object _reference = new Object();
private VirtualHostImpl<?,?,?> _virtualHost;
- private Port<?> _port;
+ private AmqpPort<?> _port;
private AtomicLong _lastIoTime = new AtomicLong();
private boolean _blocking;
private Transport _transport;
@@ -88,13 +88,24 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
private volatile boolean _stopped;
private int _messageCompressionThreshold;
+ private int _maxMessageSize;
- public ServerConnection(final long connectionId, Broker broker)
+ public ServerConnection(final long connectionId,
+ Broker<?> broker,
+ final AmqpPort<?> port,
+ final Transport transport)
{
_connectionId = connectionId;
_authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this));
_broker = broker;
+ _port = port;
+ _transport = transport;
+
+ int maxMessageSize = port.getContextValue(Integer.class, AmqpPort.PORT_MAX_MESSAGE_SIZE);
+ _maxMessageSize = (maxMessageSize > 0) ? maxMessageSize : Integer.MAX_VALUE;
+
+
_messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId());
_dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId());
_messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId());
@@ -203,16 +214,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
}
@Override
- public Port<?> getPort()
+ public AmqpPort<?> getPort()
{
return _port;
}
- public void setPort(Port<?> port)
- {
- _port = port;
- }
-
@Override
public Transport getTransport()
{
@@ -231,11 +237,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
return _stopped;
}
- public void setTransport(Transport transport)
- {
- _transport = transport;
- }
-
public void onOpen(final Runnable task)
{
_onOpenTask = task;
@@ -658,4 +659,9 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
{
return _messageCompressionThreshold;
}
+
+ public int getMaxMessageSize()
+ {
+ return _maxMessageSize;
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 77dba71fac..969701987a 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -333,16 +333,22 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void messageTransfer(Session ssn, final MessageTransfer xfr)
{
- if(((ServerSession)ssn).blockingTimeoutExceeded())
+ ServerSession serverSession = (ServerSession) ssn;
+ if(serverSession.blockingTimeoutExceeded())
{
getVirtualHost(ssn).getEventLogger().message(ChannelMessages.FLOW_CONTROL_IGNORED());
- ((ServerSession) ssn).close(AMQConstant.MESSAGE_TOO_LARGE,
- "Session flow control was requested, but not enforced by sender");
+ serverSession.close(AMQConstant.MESSAGE_TOO_LARGE,
+ "Session flow control was requested, but not enforced by sender");
+ }
+ else if(xfr.getBodySize() > serverSession.getConnection().getMaxMessageSize())
+ {
+ exception(ssn, xfr, ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED,
+ "Message size of " + xfr.getBodySize() + " greater than allowed maximum of " + serverSession.getConnection().getMaxMessageSize());
}
else
{
- final MessageDestination exchange = getDestinationForMessage(ssn, xfr);
+ final MessageDestination destination = getDestinationForMessage(ssn, xfr);
final DeliveryProperties delvProps =
xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties();
@@ -359,7 +365,7 @@ public class ServerSessionDelegate extends SessionDelegate
virtualHost.getSecurityManager()
.authorisePublish(messageMetaData.isImmediate(),
messageMetaData.getRoutingKey(),
- exchange.getName(),
+ destination.getName(),
virtualHost.getName());
}
catch (AccessControlException e)
@@ -372,7 +378,6 @@ public class ServerSessionDelegate extends SessionDelegate
final MessageStore store = virtualHost.getMessageStore();
final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
- final ServerSession serverSession = (ServerSession) ssn;
final MessageTransferMessage message =
new MessageTransferMessage(storeMessage, serverSession.getReference());
MessageReference<MessageTransferMessage> reference = message.newReference();
@@ -400,7 +405,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
};
- int enqueues = serverSession.enqueue(message, instanceProperties, exchange);
+ int enqueues = serverSession.enqueue(message, instanceProperties, destination);
if (enqueues == 0)
{
@@ -414,7 +419,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(exchange.getName(),
+ virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(destination.getName(),
messageMetaData.getRoutingKey()));
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
index 58de3edb61..2fc76d4f02 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
@@ -22,11 +22,20 @@ import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.ExecutionErrorCode;
+import org.apache.qpid.transport.ExecutionException;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Method;
public class ServerSessionTest extends QpidTestCase
{
@@ -60,16 +69,19 @@ public class ServerSessionTest extends QpidTestCase
public void testCompareTo() throws Exception
{
- final Broker broker = mock(Broker.class);
+ final Broker<?> broker = mock(Broker.class);
when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l);
- ServerConnection connection = new ServerConnection(1, broker);
+ AmqpPort amqpPort = mock(AmqpPort.class);
+ when(amqpPort.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE);
+
+ ServerConnection connection = new ServerConnection(1, broker, amqpPort, Transport.TCP);
connection.setVirtualHost(_virtualHost);
ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(),
new Binary(getName().getBytes()), 0);
// create a session with the same name but on a different connection
- ServerConnection connection2 = new ServerConnection(2, broker);
+ ServerConnection connection2 = new ServerConnection(2, broker, amqpPort, Transport.TCP);
connection2.setVirtualHost(_virtualHost);
ServerSession session2 = new ServerSession(connection2, new ServerSessionDelegate(),
new Binary(getName().getBytes()), 0);
@@ -78,5 +90,46 @@ public class ServerSessionTest extends QpidTestCase
assertEquals("Unexpected compare result", 0, session1.compareTo(session1));
}
+ public void testOverlargeMessageTest() throws Exception
+ {
+ final Broker<?> broker = mock(Broker.class);
+ when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l);
+
+ AmqpPort port = mock(AmqpPort.class);
+ when(port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(1024);
+ ServerConnection connection = new ServerConnection(1, broker, port, Transport.TCP);
+ connection.setVirtualHost(_virtualHost);
+ final List<Method> invokedMethods = new ArrayList<>();
+ ServerSession session = new ServerSession(connection, new ServerSessionDelegate(),
+ new Binary(getName().getBytes()), 0)
+ {
+ @Override
+ public void invoke(final Method m)
+ {
+ invokedMethods.add(m);
+ }
+ };
+
+ ServerSessionDelegate delegate = new ServerSessionDelegate();
+
+ MessageTransfer xfr = new MessageTransfer();
+ xfr.setBody(new byte[2048]);
+ delegate.messageTransfer(session, xfr);
+
+ assertFalse("No methods invoked - expecting at least 1", invokedMethods.isEmpty());
+ Method firstInvoked = invokedMethods.get(0);
+ assertTrue("First invoked method not execution error", firstInvoked instanceof ExecutionException);
+ assertEquals(ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED, ((ExecutionException)firstInvoked).getErrorCode());
+
+ invokedMethods.clear();
+
+ // test the boundary condition
+
+ xfr.setBody(new byte[1024]);
+ delegate.messageTransfer(session, xfr);
+
+ assertTrue("Methods invoked when not expecting any", invokedMethods.isEmpty());
+ }
+
}