summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-09-10 08:41:05 +0000
committerGordon Sim <gsim@apache.org>2007-09-10 08:41:05 +0000
commita5c0fde5d0b96ae0b747f0cea21414753d6ee654 (patch)
tree4a809a880691db3e04fa3c7374db500b767ca85b /cpp/src/qpid/broker
parent783b718d0b270121cd2e597424d0c81adea77a38 (diff)
downloadqpid-python-a5c0fde5d0b96ae0b747f0cea21414753d6ee654.tar.gz
Client side support for message and delivery properties in header segments.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@574176 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.cpp4
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp2
-rw-r--r--cpp/src/qpid/broker/Message.cpp6
-rw-r--r--cpp/src/qpid/broker/Message.h6
-rw-r--r--cpp/src/qpid/broker/MessageAdapter.cpp87
-rw-r--r--cpp/src/qpid/broker/MessageAdapter.h65
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.cpp40
-rw-r--r--cpp/src/qpid/broker/Session.cpp4
8 files changed, 142 insertions, 72 deletions
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp
index a094c7a804..1a13a31a5e 100644
--- a/cpp/src/qpid/broker/BrokerQueue.cpp
+++ b/cpp/src/qpid/broker/BrokerQueue.cpp
@@ -68,7 +68,7 @@ void Queue::deliver(Message::shared_ptr& msg){
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg);
- alternateExchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders());
}
} else {
@@ -358,7 +358,7 @@ void Queue::destroy()
while(!messages.empty()){
DeliverableMessage msg(messages.front().payload);
alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
- &(msg.getMessage().getApplicationHeaders()));
+ msg.getMessage().getApplicationHeaders());
pop();
}
alternateExchange->decAlternateUsers();
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 9b33fd5f10..a8a0745104 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -95,7 +95,7 @@ void DeliveryRecord::reject()
Exchange::shared_ptr alternate = queue->getAlternateExchange();
if (alternate) {
DeliverableMessage delivery(msg.payload);
- alternate->route(delivery, msg.payload->getRoutingKey(), &(msg.payload->getApplicationHeaders()));
+ alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders());
QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to "
<< alternate->getName());
} else {
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index e5f92297b7..84d3478173 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -38,12 +38,12 @@ PublishAdapter Message::PUBLISH;
Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), publisher(0), store(0), adapter(0) {}
-const std::string& Message::getRoutingKey() const
+std::string Message::getRoutingKey() const
{
return getAdapter().getRoutingKey(frames);
}
-const std::string& Message::getExchangeName() const
+std::string Message::getExchangeName() const
{
return getAdapter().getExchange(frames);
}
@@ -61,7 +61,7 @@ bool Message::isImmediate() const
return getAdapter().isImmediate(frames);
}
-const FieldTable& Message::getApplicationHeaders() const
+const FieldTable* Message::getApplicationHeaders() const
{
return getAdapter().getApplicationHeaders(frames);
}
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 95b3f38b55..26b31d73e5 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -59,11 +59,11 @@ public:
uint64_t contentSize() const;
- const std::string& getRoutingKey() const;
+ std::string getRoutingKey() const;
const boost::shared_ptr<Exchange> getExchange(ExchangeRegistry&) const;
- const std::string& getExchangeName() const;
+ std::string getExchangeName() const;
bool isImmediate() const;
- const framing::FieldTable& getApplicationHeaders() const;
+ const framing::FieldTable* getApplicationHeaders() const;
bool isPersistent();
framing::FrameSet& getFrames() { return frames; }
diff --git a/cpp/src/qpid/broker/MessageAdapter.cpp b/cpp/src/qpid/broker/MessageAdapter.cpp
new file mode 100644
index 0000000000..764bf02cf4
--- /dev/null
+++ b/cpp/src/qpid/broker/MessageAdapter.cpp
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "MessageAdapter.h"
+
+namespace {
+ const std::string empty;
+}
+
+namespace qpid {
+namespace broker{
+
+ std::string PublishAdapter::getRoutingKey(const framing::FrameSet& f)
+ {
+ return f.as<framing::BasicPublishBody>()->getRoutingKey();
+ }
+
+ std::string PublishAdapter::getExchange(const framing::FrameSet& f)
+ {
+ return f.as<framing::BasicPublishBody>()->getExchange();
+ }
+
+ bool PublishAdapter::isImmediate(const framing::FrameSet& f)
+ {
+ return f.as<framing::BasicPublishBody>()->getImmediate();
+ }
+
+ const framing::FieldTable* PublishAdapter::getApplicationHeaders(const framing::FrameSet& f)
+ {
+ const framing::BasicHeaderProperties* p = f.getHeaders()->get<framing::BasicHeaderProperties>();
+ return p ? &(p->getHeaders()) : 0;
+ }
+
+ bool PublishAdapter::isPersistent(const framing::FrameSet& f)
+ {
+ const framing::BasicHeaderProperties* p = f.getHeaders()->get<framing::BasicHeaderProperties>();
+ return p && p->getDeliveryMode() == 2;
+ }
+
+ std::string TransferAdapter::getRoutingKey(const framing::FrameSet& f)
+ {
+ const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>();
+ return p ? p->getRoutingKey() : empty;
+ }
+
+ std::string TransferAdapter::getExchange(const framing::FrameSet& f)
+ {
+ return f.as<framing::MessageTransferBody>()->getDestination();
+ }
+
+ bool TransferAdapter::isImmediate(const framing::FrameSet&)
+ {
+ //TODO: we seem to have lost the immediate flag
+ return false;
+ }
+
+ const framing::FieldTable* TransferAdapter::getApplicationHeaders(const framing::FrameSet& f)
+ {
+ const framing::MessageProperties* p = f.getHeaders()->get<framing::MessageProperties>();
+ return p ? &(p->getApplicationHeaders()) : 0;
+ }
+
+ bool TransferAdapter::isPersistent(const framing::FrameSet& f)
+ {
+ const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>();
+ return p && p->getDeliveryMode() == 2;
+ }
+
+}}
diff --git a/cpp/src/qpid/broker/MessageAdapter.h b/cpp/src/qpid/broker/MessageAdapter.h
index 0b2dc6307a..e8337ec649 100644
--- a/cpp/src/qpid/broker/MessageAdapter.h
+++ b/cpp/src/qpid/broker/MessageAdapter.h
@@ -38,68 +38,29 @@ struct MessageAdapter
{
virtual ~MessageAdapter() {}
- virtual const std::string& getRoutingKey(const framing::FrameSet& f) = 0;
- virtual const std::string& getExchange(const framing::FrameSet& f) = 0;
+ virtual std::string getRoutingKey(const framing::FrameSet& f) = 0;
+ virtual std::string getExchange(const framing::FrameSet& f) = 0;
virtual bool isImmediate(const framing::FrameSet& f) = 0;
- virtual const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f) = 0;
+ virtual const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f) = 0;
virtual bool isPersistent(const framing::FrameSet& f) = 0;
};
struct PublishAdapter : MessageAdapter
{
- const std::string& getRoutingKey(const framing::FrameSet& f)
- {
- return f.as<framing::BasicPublishBody>()->getRoutingKey();
- }
-
- const std::string& getExchange(const framing::FrameSet& f)
- {
- return f.as<framing::BasicPublishBody>()->getExchange();
- }
-
- bool isImmediate(const framing::FrameSet& f)
- {
- return f.as<framing::BasicPublishBody>()->getImmediate();
- }
-
- const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f)
- {
- return f.getHeaders()->get<framing::BasicHeaderProperties>()->getHeaders();
- }
-
- bool isPersistent(const framing::FrameSet& f)
- {
- return f.getHeaders()->get<framing::BasicHeaderProperties>()->getDeliveryMode() == 2;
- }
+ std::string getRoutingKey(const framing::FrameSet& f);
+ std::string getExchange(const framing::FrameSet& f);
+ bool isImmediate(const framing::FrameSet& f);
+ const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f);
+ bool isPersistent(const framing::FrameSet& f);
};
struct TransferAdapter : MessageAdapter
{
- const std::string& getRoutingKey(const framing::FrameSet& f)
- {
- return f.getHeaders()->get<framing::DeliveryProperties>()->getRoutingKey();
- }
-
- const std::string& getExchange(const framing::FrameSet& f)
- {
- return f.as<framing::MessageTransferBody>()->getDestination();
- }
-
- bool isImmediate(const framing::FrameSet&)
- {
- //TODO: we seem to have lost the immediate flag
- return false;
- }
-
- const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f)
- {
- return f.getHeaders()->get<framing::MessageProperties>()->getApplicationHeaders();
- }
-
- bool isPersistent(const framing::FrameSet& f)
- {
- return f.getHeaders()->get<framing::DeliveryProperties>()->getDeliveryMode() == 2;
- }
+ std::string getRoutingKey(const framing::FrameSet& f);
+ std::string getExchange(const framing::FrameSet& f);
+ bool isImmediate(const framing::FrameSet&);
+ const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f);
+ bool isPersistent(const framing::FrameSet& f);
};
}}
diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp
index 1a84aa9b65..84b3cbb2ac 100644
--- a/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -22,8 +22,8 @@
#include "Message.h"
#include "MessageStore.h"
-#include "qpid/Exception.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/reply_exceptions.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -46,7 +46,7 @@ void MessageBuilder::handle(AMQFrame& frame)
checkType(CONTENT_BODY, frame.getBody()->type());
break;
default:
- throw ConnectionException(504, "Invalid frame sequence for message.");
+ throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (state=" << state << ")"));
}
if (staging) {
store->appendContent(*message, frame.castBody<AMQContentBody>()->getData());
@@ -61,13 +61,6 @@ void MessageBuilder::handle(AMQFrame& frame)
}
}
-void MessageBuilder::checkType(uint8_t expected, uint8_t actual)
-{
- if (expected != actual) {
- throw ConnectionException(504, "Invalid frame sequence for message.");
- }
-}
-
void MessageBuilder::end()
{
message.reset();
@@ -81,3 +74,32 @@ void MessageBuilder::start(const SequenceNumber& id)
state = METHOD;
staging = false;
}
+
+namespace {
+
+const std::string HEADER_BODY_S = "HEADER";
+const std::string METHOD_BODY_S = "METHOD";
+const std::string CONTENT_BODY_S = "CONTENT";
+const std::string HEARTBEAT_BODY_S = "HEARTBEAT";
+const std::string UNKNOWN = "unknown";
+
+std::string type_str(uint8_t type)
+{
+ switch(type) {
+ case METHOD_BODY: return METHOD_BODY_S;
+ case HEADER_BODY: return HEADER_BODY_S;
+ case CONTENT_BODY: return CONTENT_BODY_S;
+ case HEARTBEAT_BODY: return HEARTBEAT_BODY_S;
+ }
+ return UNKNOWN;
+}
+
+}
+
+void MessageBuilder::checkType(uint8_t expected, uint8_t actual)
+{
+ if (expected != actual) {
+ throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (expected "
+ << type_str(expected) << " got " << type_str(actual) << ")"));
+ }
+}
diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp
index a8b22cb12a..650182c807 100644
--- a/cpp/src/qpid/broker/Session.cpp
+++ b/cpp/src/qpid/broker/Session.cpp
@@ -336,13 +336,13 @@ void Session::route(Message::shared_ptr msg, Deliverable& strategy) {
cacheExchange = getAdapter()->getConnection().broker.getExchanges().get(exchangeName);
}
- cacheExchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
if (!strategy.delivered) {
//TODO:if reject-unroutable, then reject
//else route to alternate exchange
if (cacheExchange->getAlternate()) {
- cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
}
}