diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-16 15:21:41 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-16 15:21:41 +0000 |
| commit | c0e454cf882c7af8292832d6233940c56cc6a881 (patch) | |
| tree | 317fcecc377ef9899cbb3760dfbf600295d6beb7 /qpid/java/broker-core/src | |
| parent | a22fa634fe3a3f51d1a27078e17cba82e48fcf46 (diff) | |
| download | qpid-python-c0e454cf882c7af8292832d6233940c56cc6a881.tar.gz | |
QPID-6000 : [Java Broker] [Java Client] add the ability to configure automatic message compression
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1618375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
7 files changed, 30 insertions, 7 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java index f8585344b0..b7be1bfd9b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java @@ -50,7 +50,7 @@ public interface ConsumerTarget AMQSessionModel getSessionModel(); - void send(MessageInstance entry, boolean batch); + long send(MessageInstance entry, boolean batch); void flushBatched(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java index 982ebb01c6..1a9390f210 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java @@ -128,6 +128,18 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL @ManagedAttribute( defaultValue = "false") boolean getStatisticsReportingResetEnabled(); + String BROKER_MESSAGE_COMPRESSION_ENABLED = "broker.messageCompressionEnabled"; + @ManagedContextDefault(name = BROKER_MESSAGE_COMPRESSION_ENABLED) + boolean DEFAULT_MESSAGE_COMPRESSION_ENABLED = true; + + @ManagedAttribute( defaultValue = "${"+ BROKER_MESSAGE_COMPRESSION_ENABLED +"}") + boolean isMessageCompressionEnabled(); + + String MESSAGE_COMPRESSION_THRESHOLD_SIZE = "connection.messageCompressionThresholdSize"; + @ManagedContextDefault(name = MESSAGE_COMPRESSION_THRESHOLD_SIZE) + int DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE = 102400; + + @DerivedAttribute( persist = true ) String getModelVersion(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java index 7a965c19d7..5b3965904e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java @@ -43,6 +43,7 @@ public interface Connection<X extends Connection<X>> extends ConfiguredObject<X> String TRANSPORT = "transport"; String PORT = "port"; + @DerivedAttribute String getClientId(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index 67c713e9d9..af46bae1c4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -92,6 +92,8 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple private int _statisticsReportingPeriod; @ManagedAttributeField private boolean _statisticsReportingResetEnabled; + @ManagedAttributeField + private boolean _messageCompressionEnabled; private State _state = State.UNINITIALIZED; @@ -360,6 +362,12 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple } @Override + public boolean isMessageCompressionEnabled() + { + return _messageCompressionEnabled; + } + + @Override public String getModelVersion() { return BrokerModel.MODEL_VERSION; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index d80aa92007..4044c938db 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -471,9 +471,8 @@ class QueueConsumerImpl public final void send(final QueueEntry entry, final boolean batch) { _deliveredCount.incrementAndGet(); - ServerMessage message = entry.getMessage(); - _deliveredBytes.addAndGet(message.getSize()); - _target.send(entry, batch); + long size = _target.send(entry, batch); + _deliveredBytes.addAndGet(size); } @Override diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index 8d025c50dc..ad33ecadcf 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -167,13 +167,15 @@ public class MockConsumer implements ConsumerTarget { } - public void send(MessageInstance entry, boolean batch) + public long send(MessageInstance entry, boolean batch) { + long size = entry.getMessage().getSize(); if (messages.contains(entry)) { entry.setRedelivered(); } messages.add(entry); + return size; } public void flushBatched() diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java index ce1c95e674..f13886d2b2 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java @@ -208,10 +208,11 @@ public class StandardQueueTest extends AbstractQueueTestBase * @param entry * @param batch */ - public void send(MessageInstance entry, boolean batch) + public long send(MessageInstance entry, boolean batch) { - super.send(entry, batch); + long size = super.send(entry, batch); latch.countDown(); + return size; } }; |
