summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-16 15:21:41 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-16 15:21:41 +0000
commitc0e454cf882c7af8292832d6233940c56cc6a881 (patch)
tree317fcecc377ef9899cbb3760dfbf600295d6beb7 /qpid/java/broker-core/src
parenta22fa634fe3a3f51d1a27078e17cba82e48fcf46 (diff)
downloadqpid-python-c0e454cf882c7af8292832d6233940c56cc6a881.tar.gz
QPID-6000 : [Java Broker] [Java Client] add the ability to configure automatic message compression
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1618375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java12
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java5
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java5
7 files changed, 30 insertions, 7 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index f8585344b0..b7be1bfd9b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -50,7 +50,7 @@ public interface ConsumerTarget
AMQSessionModel getSessionModel();
- void send(MessageInstance entry, boolean batch);
+ long send(MessageInstance entry, boolean batch);
void flushBatched();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 982ebb01c6..1a9390f210 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -128,6 +128,18 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
@ManagedAttribute( defaultValue = "false")
boolean getStatisticsReportingResetEnabled();
+ String BROKER_MESSAGE_COMPRESSION_ENABLED = "broker.messageCompressionEnabled";
+ @ManagedContextDefault(name = BROKER_MESSAGE_COMPRESSION_ENABLED)
+ boolean DEFAULT_MESSAGE_COMPRESSION_ENABLED = true;
+
+ @ManagedAttribute( defaultValue = "${"+ BROKER_MESSAGE_COMPRESSION_ENABLED +"}")
+ boolean isMessageCompressionEnabled();
+
+ String MESSAGE_COMPRESSION_THRESHOLD_SIZE = "connection.messageCompressionThresholdSize";
+ @ManagedContextDefault(name = MESSAGE_COMPRESSION_THRESHOLD_SIZE)
+ int DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE = 102400;
+
+
@DerivedAttribute( persist = true )
String getModelVersion();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
index 7a965c19d7..5b3965904e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
@@ -43,6 +43,7 @@ public interface Connection<X extends Connection<X>> extends ConfiguredObject<X>
String TRANSPORT = "transport";
String PORT = "port";
+
@DerivedAttribute
String getClientId();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index 67c713e9d9..af46bae1c4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -92,6 +92,8 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
private int _statisticsReportingPeriod;
@ManagedAttributeField
private boolean _statisticsReportingResetEnabled;
+ @ManagedAttributeField
+ private boolean _messageCompressionEnabled;
private State _state = State.UNINITIALIZED;
@@ -360,6 +362,12 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
}
@Override
+ public boolean isMessageCompressionEnabled()
+ {
+ return _messageCompressionEnabled;
+ }
+
+ @Override
public String getModelVersion()
{
return BrokerModel.MODEL_VERSION;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index d80aa92007..4044c938db 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -471,9 +471,8 @@ class QueueConsumerImpl
public final void send(final QueueEntry entry, final boolean batch)
{
_deliveredCount.incrementAndGet();
- ServerMessage message = entry.getMessage();
- _deliveredBytes.addAndGet(message.getSize());
- _target.send(entry, batch);
+ long size = _target.send(entry, batch);
+ _deliveredBytes.addAndGet(size);
}
@Override
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index 8d025c50dc..ad33ecadcf 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -167,13 +167,15 @@ public class MockConsumer implements ConsumerTarget
{
}
- public void send(MessageInstance entry, boolean batch)
+ public long send(MessageInstance entry, boolean batch)
{
+ long size = entry.getMessage().getSize();
if (messages.contains(entry))
{
entry.setRedelivered();
}
messages.add(entry);
+ return size;
}
public void flushBatched()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
index ce1c95e674..f13886d2b2 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
@@ -208,10 +208,11 @@ public class StandardQueueTest extends AbstractQueueTestBase
* @param entry
* @param batch
*/
- public void send(MessageInstance entry, boolean batch)
+ public long send(MessageInstance entry, boolean batch)
{
- super.send(entry, batch);
+ long size = super.send(entry, batch);
latch.countDown();
+ return size;
}
};