summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-10-21 21:11:31 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-10-21 21:11:31 +0000
commite6522969eb5eb6177d8a78c518062ac98ce480e4 (patch)
treeca977835692afff30301d692d0798bb0b271d826 /qpid/java
parent87dd3772e327d206fc30d19e9ae98d4ae21977d4 (diff)
downloadqpid-python-e6522969eb5eb6177d8a78c518062ac98ce480e4.tar.gz
QPID-6175 : [Java Broker] allow maximum message size to be restricted
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1633466 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java11
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java11
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java7
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java34
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java21
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java59
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java20
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java18
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java3
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java46
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java11
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java10
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java5
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java11
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java6
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java11
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java1
24 files changed, 230 insertions, 93 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
index 7acd4aa1fa..7a8fff113c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
@@ -59,6 +59,12 @@ public interface AmqpPort<X extends AmqpPort<X>> extends ClientAuthCapablePort<X
@ManagedContextDefault(name = PORT_MAX_OPEN_CONNECTIONS)
int DEFAULT_MAX_OPEN_CONNECTIONS = -1;
+
+ String PORT_MAX_MESSAGE_SIZE = "qpid.port.max_message_size";
+
+ @ManagedContextDefault(name = PORT_MAX_MESSAGE_SIZE)
+ int DEFAULT_MAX_MESSAGE_SIZE = 0x1f40000; // 500Mb
+
String OPEN_CONNECTIONS_WARN_PERCENT = "qpid.port.open_connections_warn_percent";
@ManagedContextDefault(name = OPEN_CONNECTIONS_WARN_PERCENT)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
index 4d15e982c4..6e1b6529d8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
@@ -21,18 +21,18 @@ package org.apache.qpid.server.plugin;/*
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.transport.network.NetworkConnection;
public interface ProtocolEngineCreator extends Pluggable
{
Protocol getVersion();
byte[] getHeaderIdentifier();
- ServerProtocolEngine newProtocolEngine(Broker broker,
+ ServerProtocolEngine newProtocolEngine(Broker<?> broker,
NetworkConnection network,
- Port port,
+ AmqpPort<?> port,
Transport transport,
long id);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index 5041e22104..26e8271d14 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -26,9 +26,9 @@ import java.util.List;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.util.Deletable;
@@ -89,7 +89,7 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends
long getLastIoTime();
- Port<?> getPort();
+ AmqpPort<?> getPort();
Transport getTransport();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index 1482576748..1c42d9b6fe 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -46,10 +46,10 @@ import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Consumer;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.ConsumerListener;
@@ -618,7 +618,7 @@ public class MockConsumer implements ConsumerTarget
}
@Override
- public Port<?> getPort()
+ public AmqpPort<?> getPort()
{
return null;
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
index 869ac01c4e..cca376c54c 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.transport.Header;
-import java.nio.ByteBuffer;
-
public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTransferMessage, MessageMetaData_0_10>
{
@@ -51,13 +51,6 @@ public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTra
return getMetaData().getMessageHeader();
}
- public boolean isRedelivered()
- {
- // The *Message* is never redelivered, only queue entries are... this is here so that filters
- // can run against the message on entry to an exchange
- return false;
- }
-
public long getSize()
{
return getMetaData().getSize();
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
index 40c94075a1..30aecdb2d2 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
@@ -25,9 +25,9 @@ import java.net.SocketAddress;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
import org.apache.qpid.transport.ConnectionDelegate;
@@ -64,9 +64,9 @@ public class ProtocolEngineCreator_0_10 implements ProtocolEngineCreator
return AMQP_0_10_HEADER;
}
- public ServerProtocolEngine newProtocolEngine(Broker broker,
+ public ServerProtocolEngine newProtocolEngine(Broker<?> broker,
NetworkConnection network,
- Port port,
+ AmqpPort<?> port,
Transport transport,
long id)
{
@@ -80,12 +80,13 @@ public class ProtocolEngineCreator_0_10 implements ProtocolEngineCreator
fqdn, broker.getSubjectCreator(address, transport.isSecure())
);
- ServerConnection conn = new ServerConnection(id,broker);
+ ServerConnection conn = new ServerConnection(id,broker, port, transport);
conn.setConnectionDelegate(connDelegate);
conn.setRemoteAddress(network.getRemoteAddress());
conn.setLocalAddress(network.getLocalAddress());
- return new ProtocolEngine_0_10( conn, network, port, transport);
+
+ return new ProtocolEngine_0_10( conn, network);
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index dc60a37a7f..854cd388b9 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -32,7 +32,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Port;
-import org.apache.qpid.server.model.Transport;
import org.apache.qpid.transport.Constant;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.Assembler;
@@ -57,14 +56,10 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
private long _lastWriteTime;
public ProtocolEngine_0_10(ServerConnection conn,
- NetworkConnection network,
- Port port,
- Transport transport)
+ NetworkConnection network)
{
super(new Assembler(conn));
_connection = conn;
- _connection.setPort(port);
- _connection.setTransport(transport);
if(network != null)
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 9bccfa53a3..bc463ef59e 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -42,8 +42,8 @@ import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.SessionModelListener;
@@ -75,7 +75,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
private final long _connectionId;
private final Object _reference = new Object();
private VirtualHostImpl<?,?,?> _virtualHost;
- private Port<?> _port;
+ private AmqpPort<?> _port;
private AtomicLong _lastIoTime = new AtomicLong();
private boolean _blocking;
private Transport _transport;
@@ -88,13 +88,24 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
private volatile boolean _stopped;
private int _messageCompressionThreshold;
+ private int _maxMessageSize;
- public ServerConnection(final long connectionId, Broker broker)
+ public ServerConnection(final long connectionId,
+ Broker<?> broker,
+ final AmqpPort<?> port,
+ final Transport transport)
{
_connectionId = connectionId;
_authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this));
_broker = broker;
+ _port = port;
+ _transport = transport;
+
+ int maxMessageSize = port.getContextValue(Integer.class, AmqpPort.PORT_MAX_MESSAGE_SIZE);
+ _maxMessageSize = (maxMessageSize > 0) ? maxMessageSize : Integer.MAX_VALUE;
+
+
_messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId());
_dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId());
_messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId());
@@ -203,16 +214,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
}
@Override
- public Port<?> getPort()
+ public AmqpPort<?> getPort()
{
return _port;
}
- public void setPort(Port<?> port)
- {
- _port = port;
- }
-
@Override
public Transport getTransport()
{
@@ -231,11 +237,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
return _stopped;
}
- public void setTransport(Transport transport)
- {
- _transport = transport;
- }
-
public void onOpen(final Runnable task)
{
_onOpenTask = task;
@@ -658,4 +659,9 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
{
return _messageCompressionThreshold;
}
+
+ public int getMaxMessageSize()
+ {
+ return _maxMessageSize;
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 77dba71fac..969701987a 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -333,16 +333,22 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void messageTransfer(Session ssn, final MessageTransfer xfr)
{
- if(((ServerSession)ssn).blockingTimeoutExceeded())
+ ServerSession serverSession = (ServerSession) ssn;
+ if(serverSession.blockingTimeoutExceeded())
{
getVirtualHost(ssn).getEventLogger().message(ChannelMessages.FLOW_CONTROL_IGNORED());
- ((ServerSession) ssn).close(AMQConstant.MESSAGE_TOO_LARGE,
- "Session flow control was requested, but not enforced by sender");
+ serverSession.close(AMQConstant.MESSAGE_TOO_LARGE,
+ "Session flow control was requested, but not enforced by sender");
+ }
+ else if(xfr.getBodySize() > serverSession.getConnection().getMaxMessageSize())
+ {
+ exception(ssn, xfr, ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED,
+ "Message size of " + xfr.getBodySize() + " greater than allowed maximum of " + serverSession.getConnection().getMaxMessageSize());
}
else
{
- final MessageDestination exchange = getDestinationForMessage(ssn, xfr);
+ final MessageDestination destination = getDestinationForMessage(ssn, xfr);
final DeliveryProperties delvProps =
xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties();
@@ -359,7 +365,7 @@ public class ServerSessionDelegate extends SessionDelegate
virtualHost.getSecurityManager()
.authorisePublish(messageMetaData.isImmediate(),
messageMetaData.getRoutingKey(),
- exchange.getName(),
+ destination.getName(),
virtualHost.getName());
}
catch (AccessControlException e)
@@ -372,7 +378,6 @@ public class ServerSessionDelegate extends SessionDelegate
final MessageStore store = virtualHost.getMessageStore();
final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
- final ServerSession serverSession = (ServerSession) ssn;
final MessageTransferMessage message =
new MessageTransferMessage(storeMessage, serverSession.getReference());
MessageReference<MessageTransferMessage> reference = message.newReference();
@@ -400,7 +405,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
};
- int enqueues = serverSession.enqueue(message, instanceProperties, exchange);
+ int enqueues = serverSession.enqueue(message, instanceProperties, destination);
if (enqueues == 0)
{
@@ -414,7 +419,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(exchange.getName(),
+ virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(destination.getName(),
messageMetaData.getRoutingKey()));
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
index 58de3edb61..2fc76d4f02 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
@@ -22,11 +22,20 @@ import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.ExecutionErrorCode;
+import org.apache.qpid.transport.ExecutionException;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Method;
public class ServerSessionTest extends QpidTestCase
{
@@ -60,16 +69,19 @@ public class ServerSessionTest extends QpidTestCase
public void testCompareTo() throws Exception
{
- final Broker broker = mock(Broker.class);
+ final Broker<?> broker = mock(Broker.class);
when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l);
- ServerConnection connection = new ServerConnection(1, broker);
+ AmqpPort amqpPort = mock(AmqpPort.class);
+ when(amqpPort.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE);
+
+ ServerConnection connection = new ServerConnection(1, broker, amqpPort, Transport.TCP);
connection.setVirtualHost(_virtualHost);
ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(),
new Binary(getName().getBytes()), 0);
// create a session with the same name but on a different connection
- ServerConnection connection2 = new ServerConnection(2, broker);
+ ServerConnection connection2 = new ServerConnection(2, broker, amqpPort, Transport.TCP);
connection2.setVirtualHost(_virtualHost);
ServerSession session2 = new ServerSession(connection2, new ServerSessionDelegate(),
new Binary(getName().getBytes()), 0);
@@ -78,5 +90,46 @@ public class ServerSessionTest extends QpidTestCase
assertEquals("Unexpected compare result", 0, session1.compareTo(session1));
}
+ public void testOverlargeMessageTest() throws Exception
+ {
+ final Broker<?> broker = mock(Broker.class);
+ when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l);
+
+ AmqpPort port = mock(AmqpPort.class);
+ when(port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(1024);
+ ServerConnection connection = new ServerConnection(1, broker, port, Transport.TCP);
+ connection.setVirtualHost(_virtualHost);
+ final List<Method> invokedMethods = new ArrayList<>();
+ ServerSession session = new ServerSession(connection, new ServerSessionDelegate(),
+ new Binary(getName().getBytes()), 0)
+ {
+ @Override
+ public void invoke(final Method m)
+ {
+ invokedMethods.add(m);
+ }
+ };
+
+ ServerSessionDelegate delegate = new ServerSessionDelegate();
+
+ MessageTransfer xfr = new MessageTransfer();
+ xfr.setBody(new byte[2048]);
+ delegate.messageTransfer(session, xfr);
+
+ assertFalse("No methods invoked - expecting at least 1", invokedMethods.isEmpty());
+ Method firstInvoked = invokedMethods.get(0);
+ assertTrue("First invoked method not execution error", firstInvoked instanceof ExecutionException);
+ assertEquals(ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED, ((ExecutionException)firstInvoked).getErrorCode());
+
+ invokedMethods.clear();
+
+ // test the boundary condition
+
+ xfr.setBody(new byte[1024]);
+ delegate.messageTransfer(session, xfr);
+
+ assertTrue("Methods invoked when not expecting any", invokedMethods.isEmpty());
+ }
+
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 77d856af7a..d14f185254 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -207,7 +207,6 @@ public class AMQChannel
private boolean _confirmOnPublish;
private long _confirmedMessageCounter;
-
public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
{
_connection = connection;
@@ -578,9 +577,17 @@ public class AMQChannel
try
{
- _currentMessage.addContentBodyFrame(contentBody);
-
- deliverCurrentMessageIfComplete();
+ long currentSize = _currentMessage.addContentBodyFrame(contentBody);
+ if(currentSize > _currentMessage.getSize())
+ {
+ _connection.closeConnection(AMQConstant.FRAME_ERROR,
+ "More message data received than content header defined",
+ _channelId);
+ }
+ else
+ {
+ deliverCurrentMessageIfComplete();
+ }
}
catch (RuntimeException e)
{
@@ -2385,6 +2392,11 @@ public class AMQChannel
if(hasCurrentMessage())
{
+ if(bodySize > _connection.getMaxMessageSize())
+ {
+ closeChannel(AMQConstant.MESSAGE_TOO_LARGE,
+ "Message size of " + bodySize + " greater than allowed maximum of " + _connection.getMaxMessageSize());
+ }
publishContentHeader(new ContentHeaderBody(properties, bodySize));
}
else
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 49db24be52..89da98431e 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -102,7 +102,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
public static final String BROKER_DEBUG_BINARY_DATA_LENGTH = "broker.debug.binaryDataLength";
public static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80;
- private final Port<?> _port;
+ private final AmqpPort<?> _port;
private final long _creationTime;
private AMQShortString _contextKey;
@@ -184,11 +184,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
private int _currentClassId;
private int _currentMethodId;
private int _binaryDataLimit;
+ private long _maxMessageSize;
- public AMQProtocolEngine(Broker broker,
+ public AMQProtocolEngine(Broker<?> broker,
final NetworkConnection network,
final long connectionId,
- Port port,
+ AmqpPort<?> port,
Transport transport)
{
_broker = broker;
@@ -202,6 +203,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
_binaryDataLimit = _broker.getContextKeys(false).contains(BROKER_DEBUG_BINARY_DATA_LENGTH)
? _broker.getContextValue(Integer.class, BROKER_DEBUG_BINARY_DATA_LENGTH)
: DEFAULT_DEBUG_BINARY_DATA_LENGTH;
+
+ int maxMessageSize = port.getContextValue(Integer.class, AmqpPort.PORT_MAX_MESSAGE_SIZE);
+ _maxMessageSize = (maxMessageSize > 0) ? (long) maxMessageSize : Long.MAX_VALUE;
+
_authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this));
runAsSubject(new PrivilegedAction<Void>()
{
@@ -1164,7 +1169,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
}
@Override
- public Port<?> getPort()
+ public AmqpPort<?> getPort()
{
return _port;
}
@@ -1753,6 +1758,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
return _binaryDataLimit;
}
+ public long getMaxMessageSize()
+ {
+ return _maxMessageSize;
+ }
+
public final class WriteDeliverMethod
implements ClientDeliveryMethod
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
index d966e9c9c6..6e622fb59a 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
@@ -57,10 +57,11 @@ public class IncomingMessage
return _messagePublishInfo;
}
- public void addContentBodyFrame(final ContentBody contentChunk)
+ public long addContentBodyFrame(final ContentBody contentChunk)
{
_bodyLengthReceived += contentChunk.getSize();
_contentChunks.add(contentChunk);
+ return _bodyLengthReceived;
}
public boolean allContentReceived()
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
index d88d8c3c2a..0058fe86a9 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
@@ -22,9 +22,9 @@ package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -58,9 +58,9 @@ public class ProtocolEngineCreator_0_8 implements ProtocolEngineCreator
return AMQP_0_8_HEADER;
}
- public ServerProtocolEngine newProtocolEngine(Broker broker,
+ public ServerProtocolEngine newProtocolEngine(Broker<?> broker,
NetworkConnection network,
- Port port,
+ AmqpPort<?> port,
Transport transport,
long id)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
index 4f9198a509..7253111114 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
@@ -22,9 +22,9 @@ package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -58,9 +58,9 @@ public class ProtocolEngineCreator_0_9 implements ProtocolEngineCreator
return AMQP_0_9_HEADER;
}
- public ServerProtocolEngine newProtocolEngine(Broker broker,
+ public ServerProtocolEngine newProtocolEngine(Broker<?> broker,
NetworkConnection network,
- Port port,
+ AmqpPort<?> port,
Transport transport,
long id)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
index 5251e760ff..e72cc4d058 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
@@ -22,9 +22,9 @@ package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -59,9 +59,9 @@ public class ProtocolEngineCreator_0_9_1 implements ProtocolEngineCreator
return AMQP_0_9_1_HEADER;
}
- public ServerProtocolEngine newProtocolEngine(Broker broker,
+ public ServerProtocolEngine newProtocolEngine(Broker<?> broker,
NetworkConnection network,
- Port port,
+ AmqpPort<?> port,
Transport transport,
long id)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
index 2c710bcf9f..a6725a6d58 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
@@ -20,20 +20,28 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.MessagePublishInfo;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -52,7 +60,10 @@ public class AMQChannelTest extends QpidTestCase
BrokerTestHelper.setUp();
_virtualHost = BrokerTestHelper.createVirtualHost(getTestName());
_broker = BrokerTestHelper.createBrokerMock();
- _protocolSession = new InternalTestProtocolSession(_virtualHost, _broker)
+ AmqpPort port = mock(AmqpPort.class);
+ when(port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE);
+
+ _protocolSession = new InternalTestProtocolSession(_virtualHost, _broker, port)
{
@Override
public void writeReturn(MessagePublishInfo messagePublishInfo,
@@ -86,8 +97,10 @@ public class AMQChannelTest extends QpidTestCase
{
AMQChannel channel1 = new AMQChannel(_protocolSession, 1, _virtualHost.getMessageStore());
+ AmqpPort port = mock(AmqpPort.class);
+ when(port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE);
// create a channel with the same channelId but on a different session
- AMQChannel channel2 = new AMQChannel(new InternalTestProtocolSession(_virtualHost, _broker), 1, _virtualHost.getMessageStore());
+ AMQChannel channel2 = new AMQChannel(new InternalTestProtocolSession(_virtualHost, _broker, port), 1, _virtualHost.getMessageStore());
assertFalse("Unexpected compare result", channel1.compareTo(channel2) == 0);
assertEquals("Unexpected compare result", 0, channel1.compareTo(channel1));
}
@@ -135,4 +148,33 @@ public class AMQChannelTest extends QpidTestCase
assertEquals("Unexpected number of replies", 0, _replies.size());
}
+ public void testOverlargeMessage() throws Exception
+ {
+
+ AmqpPort port = mock(AmqpPort.class);
+ when(port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(1024);
+ final List<AMQDataBlock> frames = new ArrayList<>();
+ _protocolSession = new InternalTestProtocolSession(_virtualHost, _broker, port)
+ {
+ @Override
+ public synchronized void writeFrame(final AMQDataBlock frame)
+ {
+ frames.add(frame);
+ }
+ };
+
+ AMQChannel channel = new AMQChannel(_protocolSession, 1, _virtualHost.getMessageStore());
+
+ channel.receiveBasicPublish(AMQShortString.EMPTY_STRING, AMQShortString.EMPTY_STRING, false, false);
+
+ final BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
+ channel.receiveMessageHeader(properties, 2048l);
+
+ frames.toString();
+
+ assertEquals(1, frames.size());
+ assertEquals(ChannelCloseBody.class, ((AMQFrame) frames.get(0)).getBodyFrame().getClass());
+ assertEquals(AMQConstant.MESSAGE_TOO_LARGE.getCode(), ((ChannelCloseBody)((AMQFrame)frames.get(0)).getBodyFrame()).getReplyCode());
+ }
+
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
index 59d7623011..a9eb2b1680 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -29,16 +30,16 @@ import java.util.Map;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.network.NetworkConnection;
public class AMQProtocolEngineTest extends QpidTestCase
{
- private Broker _broker;
- private Port _port;
+ private Broker<?> _broker;
+ private AmqpPort<?> _port;
private NetworkConnection _network;
private Transport _transport;
@@ -49,7 +50,9 @@ public class AMQProtocolEngineTest extends QpidTestCase
_broker = BrokerTestHelper.createBrokerMock();
when(_broker.getConnection_closeWhenNoRoute()).thenReturn(true);
- _port = mock(Port.class);
+ _port = mock(AmqpPort.class);
+ when(_port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE);
+
_network = mock(NetworkConnection.class);
_transport = Transport.TCP;
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
index d4c10c50c1..a0a3d60458 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
@@ -20,12 +20,17 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -58,7 +63,10 @@ public class BrokerTestHelper_0_8 extends BrokerTestHelper
public static InternalTestProtocolSession createProtocolSession(String hostName) throws Exception
{
VirtualHostImpl virtualHost = createVirtualHost(hostName);
- return new InternalTestProtocolSession(virtualHost, createBrokerMock());
+
+ AmqpPort port = mock(AmqpPort.class);
+ when(port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE);
+ return new InternalTestProtocolSession(virtualHost, createBrokerMock(), port);
}
public static void publishMessages(AMQChannel channel, int numberOfMessages, String queueName, String exchangeName)
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
index 895c8e687c..c01a349509 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
@@ -46,6 +46,7 @@ import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -60,9 +61,9 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
private AtomicInteger _deliveryCount = new AtomicInteger(0);
private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
- public InternalTestProtocolSession(VirtualHostImpl virtualHost, Broker broker) throws AMQException
+ public InternalTestProtocolSession(VirtualHostImpl virtualHost, Broker<?> broker, final AmqpPort<?> port) throws AMQException
{
- super(broker, new TestNetworkConnection(), ID_GENERATOR.getAndIncrement(), null, null);
+ super(broker, new TestNetworkConnection(), ID_GENERATOR.getAndIncrement(), port, null);
_channelDelivers = new HashMap<Integer, Map<String, LinkedList<DeliveryPair>>>();
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index 377eaab4cf..8e24d55da0 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -47,7 +47,6 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -62,8 +61,8 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class Connection_1_0 implements ConnectionEventListener, AMQConnectionModel<Connection_1_0,Session_1_0>
{
- private final Port<?> _port;
- private final Broker _broker;
+ private final AmqpPort<?> _port;
+ private final Broker<?> _broker;
private final SubjectCreator _subjectCreator;
private VirtualHostImpl _vhost;
private final Transport _transport;
@@ -102,10 +101,10 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
private boolean _closedOnOpen;
- public Connection_1_0(Broker broker,
+ public Connection_1_0(Broker<?> broker,
ConnectionEndpoint conn,
long connectionId,
- Port port,
+ AmqpPort<?> port,
Transport transport, final SubjectCreator subjectCreator)
{
_broker = broker;
@@ -359,7 +358,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
}
@Override
- public Port<?> getPort()
+ public AmqpPort<?> getPort()
{
return _port;
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
index 8eb89cade6..fa8134cb55 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
@@ -22,9 +22,9 @@ package org.apache.qpid.server.protocol.v1_0;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -58,9 +58,9 @@ public class ProtocolEngineCreator_1_0_0_SASL implements ProtocolEngineCreator
return AMQP_SASL_1_0_0_HEADER;
}
- public ServerProtocolEngine newProtocolEngine(Broker broker,
+ public ServerProtocolEngine newProtocolEngine(Broker<?> broker,
NetworkConnection network,
- Port port,
+ AmqpPort<?> port,
Transport transport,
long id)
{
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index 550355216e..740b01e459 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -35,6 +35,7 @@ import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
+
import org.apache.qpid.amqp_1_0.codec.FrameWriter;
import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
import org.apache.qpid.amqp_1_0.framing.AMQFrame;
@@ -53,8 +54,8 @@ import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -67,14 +68,14 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
private static final org.apache.log4j.Logger
_logger = org.apache.log4j.Logger.getLogger(ProtocolEngine_1_0_0_SASL.class);
- private final Port _port;
+ private final AmqpPort<?> _port;
private final Transport _transport;
private long _readBytes;
private long _writtenBytes;
private long _lastReadTime;
private long _lastWriteTime;
- private final Broker _broker;
+ private final Broker<?> _broker;
private long _createTime = System.currentTimeMillis();
private ConnectionEndpoint _endpoint;
private long _connectionId;
@@ -134,8 +135,8 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
private State _state = State.A;
- public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final Broker broker,
- long id, Port port, Transport transport)
+ public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final Broker<?> broker,
+ long id, AmqpPort<?> port, Transport transport)
{
_connectionId = id;
_broker = broker;
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
index e8aa2b3f34..007772e8be 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
@@ -157,6 +157,7 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase
AmqpPort<?> port = mock(AmqpPort.class);
when(port.canAcceptNewConnection(any(SocketAddress.class))).thenReturn(true);
+ when(port.getContextValue(eq(Integer.class), eq(AmqpPort.PORT_MAX_MESSAGE_SIZE))).thenReturn(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE);
when(port.getContextValue(eq(Long.class),eq(Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))).thenReturn(10000l);
MultiVersionProtocolEngineFactory factory =