summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-08-30 12:31:45 +0000
committerGordon Sim <gsim@apache.org>2013-08-30 12:31:45 +0000
commit72f706cac68f5e7146ca5facb0e72e9d35b4332b (patch)
tree5e06aa2fbc333e958df734c8d3278ee196696957 /qpid/cpp
parentd256497d38f297019eaee1d4713d2c621aa71a6b (diff)
downloadqpid-python-72f706cac68f5e7146ca5facb0e72e9d35b4332b.tar.gz
QPID-5040: fix for string and symbol types on AmqpValue section (also clear message on fetch())
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1518955 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/examples/messaging/drain.cpp8
-rw-r--r--qpid/cpp/src/qpid/messaging/MessageImpl.cpp21
-rw-r--r--qpid/cpp/src/qpid/messaging/MessageImpl.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/Receiver.cpp7
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp5
-rw-r--r--qpid/cpp/src/tests/qpid-receive.cpp9
6 files changed, 41 insertions, 10 deletions
diff --git a/qpid/cpp/examples/messaging/drain.cpp b/qpid/cpp/examples/messaging/drain.cpp
index fe4ea7950d..6ac1d3a236 100644
--- a/qpid/cpp/examples/messaging/drain.cpp
+++ b/qpid/cpp/examples/messaging/drain.cpp
@@ -92,11 +92,15 @@ int main(int argc, char** argv)
int i = 0;
while (receiver.fetch(message, timeout)) {
- std::cout << "Message(properties=" << message.getProperties() << ", content='" ;
+ std::cout << "Message(properties=" << message.getProperties();
+ if (!message.getSubject().empty()) {
+ std::cout << ", subject='" << message.getSubject() << "'";
+ }
+ std::cout << ", content='";
if (message.getContentType() == "amqp/map") {
std::cout << message.getContentObject().asMap();
} else {
- std::cout << message.getContent();
+ std::cout << message.getContentObject();
}
std::cout << "')" << std::endl;
session.acknowledge();
diff --git a/qpid/cpp/src/qpid/messaging/MessageImpl.cpp b/qpid/cpp/src/qpid/messaging/MessageImpl.cpp
index 7b5854745e..e9232804d8 100644
--- a/qpid/cpp/src/qpid/messaging/MessageImpl.cpp
+++ b/qpid/cpp/src/qpid/messaging/MessageImpl.cpp
@@ -47,6 +47,27 @@ MessageImpl::MessageImpl(const char* chars, size_t count) :
contentDecoded(false),
internalId(0) {}
+void MessageImpl::clear()
+{
+ replyTo = Address();
+ subject = std::string();
+ contentType = std::string();
+ messageId = std::string();
+ userId= std::string();
+ correlationId = std::string();
+ priority = 0;
+ ttl = 0;
+ durable = false;
+ redelivered = false;
+ headers = qpid::types::Variant::Map();
+
+ bytes = std::string();
+ content = qpid::types::Variant();
+ contentDecoded = false;
+ encoded = boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage>();
+ internalId = 0;
+}
+
void MessageImpl::setReplyTo(const Address& d)
{
replyTo = d;
diff --git a/qpid/cpp/src/qpid/messaging/MessageImpl.h b/qpid/cpp/src/qpid/messaging/MessageImpl.h
index b63c8689af..647972de16 100644
--- a/qpid/cpp/src/qpid/messaging/MessageImpl.h
+++ b/qpid/cpp/src/qpid/messaging/MessageImpl.h
@@ -61,6 +61,7 @@ class MessageImpl
MessageImpl(const std::string& c);
MessageImpl(const char* chars, size_t count);
+ void clear();
void setReplyTo(const Address& d);
QPID_MESSAGING_EXTERN const Address& getReplyTo() const;
diff --git a/qpid/cpp/src/qpid/messaging/Receiver.cpp b/qpid/cpp/src/qpid/messaging/Receiver.cpp
index c45ebd6760..f60e5f55b3 100644
--- a/qpid/cpp/src/qpid/messaging/Receiver.cpp
+++ b/qpid/cpp/src/qpid/messaging/Receiver.cpp
@@ -21,6 +21,7 @@
#include "qpid/messaging/Receiver.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Message.h"
+#include "qpid/messaging/MessageImpl.h"
#include "qpid/messaging/ReceiverImpl.h"
#include "qpid/messaging/Session.h"
#include "qpid/messaging/PrivateImplRef.h"
@@ -36,7 +37,11 @@ Receiver::~Receiver() { PI::dtor(*this); }
Receiver& Receiver::operator=(const Receiver& s) { return PI::assign(*this, s); }
bool Receiver::get(Message& message, Duration timeout) { return impl->get(message, timeout); }
Message Receiver::get(Duration timeout) { return impl->get(timeout); }
-bool Receiver::fetch(Message& message, Duration timeout) { return impl->fetch(message, timeout); }
+bool Receiver::fetch(Message& message, Duration timeout)
+{
+ MessageImplAccess::get(message).clear();
+ return impl->fetch(message, timeout);
+}
Message Receiver::fetch(Duration timeout) { return impl->fetch(timeout); }
void Receiver::setCapacity(uint32_t c) { impl->setCapacity(c); }
uint32_t Receiver::getCapacity() { return impl->getCapacity(); }
diff --git a/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp
index a0bf8dc575..09a5ea4904 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp
@@ -185,15 +185,14 @@ void EncodedMessage::getCorrelationId(std::string& s) const
}
void EncodedMessage::getBody(std::string& raw, qpid::types::Variant& c) const
{
- //TODO: based on section type, populate content
if (!content.isVoid()) {
c = content;//integer types, floats, bool etc
//TODO: populate raw data?
} else {
if (bodyType.empty()
|| bodyType == qpid::amqp::typecodes::BINARY_NAME
- || bodyType == qpid::amqp::typecodes::STRING_NAME
- || bodyType == qpid::amqp::typecodes::SYMBOL_NAME)
+ || bodyType == qpid::types::encodings::UTF8
+ || bodyType == qpid::types::encodings::ASCII)
{
c = std::string(body.data, body.size);
c.setEncoding(bodyType);
diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp
index d00b828ddc..1849e11667 100644
--- a/qpid/cpp/src/tests/qpid-receive.cpp
+++ b/qpid/cpp/src/tests/qpid-receive.cpp
@@ -69,6 +69,7 @@ struct Options : public qpid::Options
string readyAddress;
uint receiveRate;
std::string replyto;
+ bool noReplies;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
@@ -91,7 +92,8 @@ struct Options : public qpid::Options
reportTotal(false),
reportEvery(0),
reportHeader(true),
- receiveRate(0)
+ receiveRate(0),
+ noReplies(false)
{
addOptions()
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
@@ -116,6 +118,7 @@ struct Options : public qpid::Options
("ready-address", qpid::optValue(readyAddress, "ADDRESS"), "send a message to this address when ready to receive")
("receive-rate", qpid::optValue(receiveRate,"N"), "Receive at rate of N messages/second. 0 means receive as fast as possible.")
("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address on response messages")
+ ("ignore-reply-to", qpid::optValue(noReplies), "Do not send replies even if reply-to is set")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -248,7 +251,7 @@ int main(int argc, char ** argv)
} else if (opts.ackFrequency && (count % opts.ackFrequency == 0)) {
session.acknowledge();
}
- if (msg.getReplyTo()) { // Echo message back to reply-to address.
+ if (msg.getReplyTo() && !opts.noReplies) { // Echo message back to reply-to address.
Sender& s = replyTo[msg.getReplyTo().str()];
if (s.isNull()) {
s = session.createSender(msg.getReplyTo());
@@ -263,8 +266,6 @@ int main(int argc, char ** argv)
int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill);
if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
}
- // Clear out message properties & content for next iteration.
- msg = Message(); // TODO aconway 2010-12-01: should be done by fetch
}
if (opts.reportTotal) reporter.report();
if (opts.tx) {