diff options
| author | Gordon Sim <gsim@apache.org> | 2007-09-10 08:41:05 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-09-10 08:41:05 +0000 |
| commit | a5c0fde5d0b96ae0b747f0cea21414753d6ee654 (patch) | |
| tree | 4a809a880691db3e04fa3c7374db500b767ca85b /cpp/src/qpid/broker | |
| parent | 783b718d0b270121cd2e597424d0c81adea77a38 (diff) | |
| download | qpid-python-a5c0fde5d0b96ae0b747f0cea21414753d6ee654.tar.gz | |
Client side support for message and delivery properties in header segments.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@574176 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/BrokerQueue.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Message.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MessageAdapter.cpp | 87 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MessageAdapter.h | 65 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MessageBuilder.cpp | 40 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Session.cpp | 4 |
8 files changed, 142 insertions, 72 deletions
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index a094c7a804..1a13a31a5e 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -68,7 +68,7 @@ void Queue::deliver(Message::shared_ptr& msg){ if (msg->isImmediate() && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg); - alternateExchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders()); } } else { @@ -358,7 +358,7 @@ void Queue::destroy() while(!messages.empty()){ DeliverableMessage msg(messages.front().payload); alternateExchange->route(msg, msg.getMessage().getRoutingKey(), - &(msg.getMessage().getApplicationHeaders())); + msg.getMessage().getApplicationHeaders()); pop(); } alternateExchange->decAlternateUsers(); diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 9b33fd5f10..a8a0745104 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -95,7 +95,7 @@ void DeliveryRecord::reject() Exchange::shared_ptr alternate = queue->getAlternateExchange(); if (alternate) { DeliverableMessage delivery(msg.payload); - alternate->route(delivery, msg.payload->getRoutingKey(), &(msg.payload->getApplicationHeaders())); + alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders()); QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to " << alternate->getName()); } else { diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index e5f92297b7..84d3478173 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -38,12 +38,12 @@ PublishAdapter Message::PUBLISH; Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), publisher(0), store(0), adapter(0) {} -const std::string& Message::getRoutingKey() const +std::string Message::getRoutingKey() const { return getAdapter().getRoutingKey(frames); } -const std::string& Message::getExchangeName() const +std::string Message::getExchangeName() const { return getAdapter().getExchange(frames); } @@ -61,7 +61,7 @@ bool Message::isImmediate() const return getAdapter().isImmediate(frames); } -const FieldTable& Message::getApplicationHeaders() const +const FieldTable* Message::getApplicationHeaders() const { return getAdapter().getApplicationHeaders(frames); } diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 95b3f38b55..26b31d73e5 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -59,11 +59,11 @@ public: uint64_t contentSize() const; - const std::string& getRoutingKey() const; + std::string getRoutingKey() const; const boost::shared_ptr<Exchange> getExchange(ExchangeRegistry&) const; - const std::string& getExchangeName() const; + std::string getExchangeName() const; bool isImmediate() const; - const framing::FieldTable& getApplicationHeaders() const; + const framing::FieldTable* getApplicationHeaders() const; bool isPersistent(); framing::FrameSet& getFrames() { return frames; } diff --git a/cpp/src/qpid/broker/MessageAdapter.cpp b/cpp/src/qpid/broker/MessageAdapter.cpp new file mode 100644 index 0000000000..764bf02cf4 --- /dev/null +++ b/cpp/src/qpid/broker/MessageAdapter.cpp @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "MessageAdapter.h" + +namespace { + const std::string empty; +} + +namespace qpid { +namespace broker{ + + std::string PublishAdapter::getRoutingKey(const framing::FrameSet& f) + { + return f.as<framing::BasicPublishBody>()->getRoutingKey(); + } + + std::string PublishAdapter::getExchange(const framing::FrameSet& f) + { + return f.as<framing::BasicPublishBody>()->getExchange(); + } + + bool PublishAdapter::isImmediate(const framing::FrameSet& f) + { + return f.as<framing::BasicPublishBody>()->getImmediate(); + } + + const framing::FieldTable* PublishAdapter::getApplicationHeaders(const framing::FrameSet& f) + { + const framing::BasicHeaderProperties* p = f.getHeaders()->get<framing::BasicHeaderProperties>(); + return p ? &(p->getHeaders()) : 0; + } + + bool PublishAdapter::isPersistent(const framing::FrameSet& f) + { + const framing::BasicHeaderProperties* p = f.getHeaders()->get<framing::BasicHeaderProperties>(); + return p && p->getDeliveryMode() == 2; + } + + std::string TransferAdapter::getRoutingKey(const framing::FrameSet& f) + { + const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>(); + return p ? p->getRoutingKey() : empty; + } + + std::string TransferAdapter::getExchange(const framing::FrameSet& f) + { + return f.as<framing::MessageTransferBody>()->getDestination(); + } + + bool TransferAdapter::isImmediate(const framing::FrameSet&) + { + //TODO: we seem to have lost the immediate flag + return false; + } + + const framing::FieldTable* TransferAdapter::getApplicationHeaders(const framing::FrameSet& f) + { + const framing::MessageProperties* p = f.getHeaders()->get<framing::MessageProperties>(); + return p ? &(p->getApplicationHeaders()) : 0; + } + + bool TransferAdapter::isPersistent(const framing::FrameSet& f) + { + const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>(); + return p && p->getDeliveryMode() == 2; + } + +}} diff --git a/cpp/src/qpid/broker/MessageAdapter.h b/cpp/src/qpid/broker/MessageAdapter.h index 0b2dc6307a..e8337ec649 100644 --- a/cpp/src/qpid/broker/MessageAdapter.h +++ b/cpp/src/qpid/broker/MessageAdapter.h @@ -38,68 +38,29 @@ struct MessageAdapter { virtual ~MessageAdapter() {} - virtual const std::string& getRoutingKey(const framing::FrameSet& f) = 0; - virtual const std::string& getExchange(const framing::FrameSet& f) = 0; + virtual std::string getRoutingKey(const framing::FrameSet& f) = 0; + virtual std::string getExchange(const framing::FrameSet& f) = 0; virtual bool isImmediate(const framing::FrameSet& f) = 0; - virtual const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f) = 0; + virtual const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f) = 0; virtual bool isPersistent(const framing::FrameSet& f) = 0; }; struct PublishAdapter : MessageAdapter { - const std::string& getRoutingKey(const framing::FrameSet& f) - { - return f.as<framing::BasicPublishBody>()->getRoutingKey(); - } - - const std::string& getExchange(const framing::FrameSet& f) - { - return f.as<framing::BasicPublishBody>()->getExchange(); - } - - bool isImmediate(const framing::FrameSet& f) - { - return f.as<framing::BasicPublishBody>()->getImmediate(); - } - - const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f) - { - return f.getHeaders()->get<framing::BasicHeaderProperties>()->getHeaders(); - } - - bool isPersistent(const framing::FrameSet& f) - { - return f.getHeaders()->get<framing::BasicHeaderProperties>()->getDeliveryMode() == 2; - } + std::string getRoutingKey(const framing::FrameSet& f); + std::string getExchange(const framing::FrameSet& f); + bool isImmediate(const framing::FrameSet& f); + const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f); + bool isPersistent(const framing::FrameSet& f); }; struct TransferAdapter : MessageAdapter { - const std::string& getRoutingKey(const framing::FrameSet& f) - { - return f.getHeaders()->get<framing::DeliveryProperties>()->getRoutingKey(); - } - - const std::string& getExchange(const framing::FrameSet& f) - { - return f.as<framing::MessageTransferBody>()->getDestination(); - } - - bool isImmediate(const framing::FrameSet&) - { - //TODO: we seem to have lost the immediate flag - return false; - } - - const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f) - { - return f.getHeaders()->get<framing::MessageProperties>()->getApplicationHeaders(); - } - - bool isPersistent(const framing::FrameSet& f) - { - return f.getHeaders()->get<framing::DeliveryProperties>()->getDeliveryMode() == 2; - } + std::string getRoutingKey(const framing::FrameSet& f); + std::string getExchange(const framing::FrameSet& f); + bool isImmediate(const framing::FrameSet&); + const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f); + bool isPersistent(const framing::FrameSet& f); }; }} diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp index 1a84aa9b65..84b3cbb2ac 100644 --- a/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/cpp/src/qpid/broker/MessageBuilder.cpp @@ -22,8 +22,8 @@ #include "Message.h" #include "MessageStore.h" -#include "qpid/Exception.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/reply_exceptions.h" using namespace qpid::broker; using namespace qpid::framing; @@ -46,7 +46,7 @@ void MessageBuilder::handle(AMQFrame& frame) checkType(CONTENT_BODY, frame.getBody()->type()); break; default: - throw ConnectionException(504, "Invalid frame sequence for message."); + throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (state=" << state << ")")); } if (staging) { store->appendContent(*message, frame.castBody<AMQContentBody>()->getData()); @@ -61,13 +61,6 @@ void MessageBuilder::handle(AMQFrame& frame) } } -void MessageBuilder::checkType(uint8_t expected, uint8_t actual) -{ - if (expected != actual) { - throw ConnectionException(504, "Invalid frame sequence for message."); - } -} - void MessageBuilder::end() { message.reset(); @@ -81,3 +74,32 @@ void MessageBuilder::start(const SequenceNumber& id) state = METHOD; staging = false; } + +namespace { + +const std::string HEADER_BODY_S = "HEADER"; +const std::string METHOD_BODY_S = "METHOD"; +const std::string CONTENT_BODY_S = "CONTENT"; +const std::string HEARTBEAT_BODY_S = "HEARTBEAT"; +const std::string UNKNOWN = "unknown"; + +std::string type_str(uint8_t type) +{ + switch(type) { + case METHOD_BODY: return METHOD_BODY_S; + case HEADER_BODY: return HEADER_BODY_S; + case CONTENT_BODY: return CONTENT_BODY_S; + case HEARTBEAT_BODY: return HEARTBEAT_BODY_S; + } + return UNKNOWN; +} + +} + +void MessageBuilder::checkType(uint8_t expected, uint8_t actual) +{ + if (expected != actual) { + throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (expected " + << type_str(expected) << " got " << type_str(actual) << ")")); + } +} diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp index a8b22cb12a..650182c807 100644 --- a/cpp/src/qpid/broker/Session.cpp +++ b/cpp/src/qpid/broker/Session.cpp @@ -336,13 +336,13 @@ void Session::route(Message::shared_ptr msg, Deliverable& strategy) { cacheExchange = getAdapter()->getConnection().broker.getExchanges().get(exchangeName); } - cacheExchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); if (!strategy.delivered) { //TODO:if reject-unroutable, then reject //else route to alternate exchange if (cacheExchange->getAlternate()) { - cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); } } |
