diff options
author | Alan Conway <aconway@apache.org> | 2015-08-04 18:38:38 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2015-08-04 18:38:38 +0000 |
commit | a5bbf426a07a13c5a60acc274c08d7fd07a62b7c (patch) | |
tree | e0b24c6237b1fe5e1b6fd4186bafc40a0a7281bd | |
parent | 94b6a914234e49f30af88e740b87760d97219a0c (diff) | |
download | qpid-python-a5bbf426a07a13c5a60acc274c08d7fd07a62b7c.tar.gz |
QPID-6577: HA - backup broker messages are larger than primary messages.
Under the 0-10 protocol (used by HA) brokers add an "exchange" property to each
message for the exchange the message arrived on .This is different (and
sometimes longer) on the backup brokers from the primary since on the backups
the message arrives on a special replication exchange.
This fixes backup brokers to not modify the exchange property on messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1694094 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageBuilder.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageBuilder.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 5 |
4 files changed, 16 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp index f5e9332052..109c9b8757 100644 --- a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp @@ -37,7 +37,7 @@ namespace const std::string QPID_MANAGEMENT("qpid.management"); } -MessageBuilder::MessageBuilder() : state(DORMANT) {} +MessageBuilder::MessageBuilder() : state(DORMANT), copyExchange(true) {} void MessageBuilder::handle(AMQFrame& frame) { @@ -60,7 +60,10 @@ void MessageBuilder::handle(AMQFrame& frame) header.setEof(false); message->getFrames().append(header); } else if (type == HEADER_BODY) { - frame.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setExchange(exchange); + if (copyExchange) { + frame.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)-> + setExchange(exchange); + } } else { throw CommandInvalidException( QPID_MSG("Invalid frame sequence for message, expected header or content got " diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.h b/qpid/cpp/src/qpid/broker/MessageBuilder.h index 5673ed3b7f..e7a668e18d 100644 --- a/qpid/cpp/src/qpid/broker/MessageBuilder.h +++ b/qpid/cpp/src/qpid/broker/MessageBuilder.h @@ -43,11 +43,14 @@ namespace qpid { boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> getMessage(); QPID_BROKER_EXTERN void start(const framing::SequenceNumber& id); void end(); + void setCopyExchange(bool value) { copyExchange = value; } + private: enum State {DORMANT, METHOD, HEADER, CONTENT}; State state; boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> message; std::string exchange; + bool copyExchange; void checkType(uint8_t expected, uint8_t actual); }; diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index c71c520f9c..b1f18747f3 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -147,6 +147,9 @@ class SessionState : public qpid::SessionState, /** Send result and completion for a given command to the client. */ void completeCommand(SequenceNumber id, bool requiresAccept, bool requiresSync, const std::string& result); + + MessageBuilder& getMessageBuilder() { return msgBuilder; } + private: void handleCommand(framing::AMQMethodBody* method); void handleContent(framing::AMQFrame& frame); diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 7997bc6aa9..3045829ce8 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -35,6 +35,7 @@ #include "qpid/broker/QueueObserver.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/SessionHandler.h" +#include "qpid/broker/SessionState.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/FieldValue.h" #include "qpid/log/Statement.h" @@ -248,6 +249,10 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa Mutex::ScopedLock l(lock); if (!queue) return; // Already destroyed sessionHandler = &sessionHandler_; + if (sessionHandler->getSession()) { + // Don't overwrite the exchange property set on the primary. + sessionHandler->getSession()->getMessageBuilder().setCopyExchange(false); + } AMQP_ServerProxy peer(sessionHandler->out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); FieldTable arguments; |