summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2015-08-04 18:38:38 +0000
committerAlan Conway <aconway@apache.org>2015-08-04 18:38:38 +0000
commita5bbf426a07a13c5a60acc274c08d7fd07a62b7c (patch)
treee0b24c6237b1fe5e1b6fd4186bafc40a0a7281bd
parent94b6a914234e49f30af88e740b87760d97219a0c (diff)
downloadqpid-python-a5bbf426a07a13c5a60acc274c08d7fd07a62b7c.tar.gz
QPID-6577: HA - backup broker messages are larger than primary messages.
Under the 0-10 protocol (used by HA) brokers add an "exchange" property to each message for the exchange the message arrived on .This is different (and sometimes longer) on the backup brokers from the primary since on the backups the message arrives on a special replication exchange. This fixes backup brokers to not modify the exchange property on messages. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1694094 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/MessageBuilder.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/MessageBuilder.h3
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h3
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp5
4 files changed, 16 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
index f5e9332052..109c9b8757 100644
--- a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -37,7 +37,7 @@ namespace
const std::string QPID_MANAGEMENT("qpid.management");
}
-MessageBuilder::MessageBuilder() : state(DORMANT) {}
+MessageBuilder::MessageBuilder() : state(DORMANT), copyExchange(true) {}
void MessageBuilder::handle(AMQFrame& frame)
{
@@ -60,7 +60,10 @@ void MessageBuilder::handle(AMQFrame& frame)
header.setEof(false);
message->getFrames().append(header);
} else if (type == HEADER_BODY) {
- frame.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setExchange(exchange);
+ if (copyExchange) {
+ frame.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->
+ setExchange(exchange);
+ }
} else {
throw CommandInvalidException(
QPID_MSG("Invalid frame sequence for message, expected header or content got "
diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.h b/qpid/cpp/src/qpid/broker/MessageBuilder.h
index 5673ed3b7f..e7a668e18d 100644
--- a/qpid/cpp/src/qpid/broker/MessageBuilder.h
+++ b/qpid/cpp/src/qpid/broker/MessageBuilder.h
@@ -43,11 +43,14 @@ namespace qpid {
boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> getMessage();
QPID_BROKER_EXTERN void start(const framing::SequenceNumber& id);
void end();
+ void setCopyExchange(bool value) { copyExchange = value; }
+
private:
enum State {DORMANT, METHOD, HEADER, CONTENT};
State state;
boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> message;
std::string exchange;
+ bool copyExchange;
void checkType(uint8_t expected, uint8_t actual);
};
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index c71c520f9c..b1f18747f3 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -147,6 +147,9 @@ class SessionState : public qpid::SessionState,
/** Send result and completion for a given command to the client. */
void completeCommand(SequenceNumber id, bool requiresAccept, bool requiresSync,
const std::string& result);
+
+ MessageBuilder& getMessageBuilder() { return msgBuilder; }
+
private:
void handleCommand(framing::AMQMethodBody* method);
void handleContent(framing::AMQFrame& frame);
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 7997bc6aa9..3045829ce8 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -35,6 +35,7 @@
#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/SessionHandler.h"
+#include "qpid/broker/SessionState.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
@@ -248,6 +249,10 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
Mutex::ScopedLock l(lock);
if (!queue) return; // Already destroyed
sessionHandler = &sessionHandler_;
+ if (sessionHandler->getSession()) {
+ // Don't overwrite the exchange property set on the primary.
+ sessionHandler->getSession()->getMessageBuilder().setCopyExchange(false);
+ }
AMQP_ServerProxy peer(sessionHandler->out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
FieldTable arguments;