diff options
| author | Gordon Sim <gsim@apache.org> | 2013-08-30 12:31:45 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-08-30 12:31:45 +0000 |
| commit | 72f706cac68f5e7146ca5facb0e72e9d35b4332b (patch) | |
| tree | 5e06aa2fbc333e958df734c8d3278ee196696957 /qpid/cpp | |
| parent | d256497d38f297019eaee1d4713d2c621aa71a6b (diff) | |
| download | qpid-python-72f706cac68f5e7146ca5facb0e72e9d35b4332b.tar.gz | |
QPID-5040: fix for string and symbol types on AmqpValue section (also clear message on fetch())
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1518955 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/examples/messaging/drain.cpp | 8 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/MessageImpl.cpp | 21 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/MessageImpl.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/Receiver.cpp | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/qpid-receive.cpp | 9 |
6 files changed, 41 insertions, 10 deletions
diff --git a/qpid/cpp/examples/messaging/drain.cpp b/qpid/cpp/examples/messaging/drain.cpp index fe4ea7950d..6ac1d3a236 100644 --- a/qpid/cpp/examples/messaging/drain.cpp +++ b/qpid/cpp/examples/messaging/drain.cpp @@ -92,11 +92,15 @@ int main(int argc, char** argv) int i = 0; while (receiver.fetch(message, timeout)) { - std::cout << "Message(properties=" << message.getProperties() << ", content='" ; + std::cout << "Message(properties=" << message.getProperties(); + if (!message.getSubject().empty()) { + std::cout << ", subject='" << message.getSubject() << "'"; + } + std::cout << ", content='"; if (message.getContentType() == "amqp/map") { std::cout << message.getContentObject().asMap(); } else { - std::cout << message.getContent(); + std::cout << message.getContentObject(); } std::cout << "')" << std::endl; session.acknowledge(); diff --git a/qpid/cpp/src/qpid/messaging/MessageImpl.cpp b/qpid/cpp/src/qpid/messaging/MessageImpl.cpp index 7b5854745e..e9232804d8 100644 --- a/qpid/cpp/src/qpid/messaging/MessageImpl.cpp +++ b/qpid/cpp/src/qpid/messaging/MessageImpl.cpp @@ -47,6 +47,27 @@ MessageImpl::MessageImpl(const char* chars, size_t count) : contentDecoded(false), internalId(0) {} +void MessageImpl::clear() +{ + replyTo = Address(); + subject = std::string(); + contentType = std::string(); + messageId = std::string(); + userId= std::string(); + correlationId = std::string(); + priority = 0; + ttl = 0; + durable = false; + redelivered = false; + headers = qpid::types::Variant::Map(); + + bytes = std::string(); + content = qpid::types::Variant(); + contentDecoded = false; + encoded = boost::shared_ptr<const qpid::messaging::amqp::EncodedMessage>(); + internalId = 0; +} + void MessageImpl::setReplyTo(const Address& d) { replyTo = d; diff --git a/qpid/cpp/src/qpid/messaging/MessageImpl.h b/qpid/cpp/src/qpid/messaging/MessageImpl.h index b63c8689af..647972de16 100644 --- a/qpid/cpp/src/qpid/messaging/MessageImpl.h +++ b/qpid/cpp/src/qpid/messaging/MessageImpl.h @@ -61,6 +61,7 @@ class MessageImpl MessageImpl(const std::string& c); MessageImpl(const char* chars, size_t count); + void clear(); void setReplyTo(const Address& d); QPID_MESSAGING_EXTERN const Address& getReplyTo() const; diff --git a/qpid/cpp/src/qpid/messaging/Receiver.cpp b/qpid/cpp/src/qpid/messaging/Receiver.cpp index c45ebd6760..f60e5f55b3 100644 --- a/qpid/cpp/src/qpid/messaging/Receiver.cpp +++ b/qpid/cpp/src/qpid/messaging/Receiver.cpp @@ -21,6 +21,7 @@ #include "qpid/messaging/Receiver.h" #include "qpid/messaging/Address.h" #include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageImpl.h" #include "qpid/messaging/ReceiverImpl.h" #include "qpid/messaging/Session.h" #include "qpid/messaging/PrivateImplRef.h" @@ -36,7 +37,11 @@ Receiver::~Receiver() { PI::dtor(*this); } Receiver& Receiver::operator=(const Receiver& s) { return PI::assign(*this, s); } bool Receiver::get(Message& message, Duration timeout) { return impl->get(message, timeout); } Message Receiver::get(Duration timeout) { return impl->get(timeout); } -bool Receiver::fetch(Message& message, Duration timeout) { return impl->fetch(message, timeout); } +bool Receiver::fetch(Message& message, Duration timeout) +{ + MessageImplAccess::get(message).clear(); + return impl->fetch(message, timeout); +} Message Receiver::fetch(Duration timeout) { return impl->fetch(timeout); } void Receiver::setCapacity(uint32_t c) { impl->setCapacity(c); } uint32_t Receiver::getCapacity() { return impl->getCapacity(); } diff --git a/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp index a0bf8dc575..09a5ea4904 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp @@ -185,15 +185,14 @@ void EncodedMessage::getCorrelationId(std::string& s) const } void EncodedMessage::getBody(std::string& raw, qpid::types::Variant& c) const { - //TODO: based on section type, populate content if (!content.isVoid()) { c = content;//integer types, floats, bool etc //TODO: populate raw data? } else { if (bodyType.empty() || bodyType == qpid::amqp::typecodes::BINARY_NAME - || bodyType == qpid::amqp::typecodes::STRING_NAME - || bodyType == qpid::amqp::typecodes::SYMBOL_NAME) + || bodyType == qpid::types::encodings::UTF8 + || bodyType == qpid::types::encodings::ASCII) { c = std::string(body.data, body.size); c.setEncoding(bodyType); diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index d00b828ddc..1849e11667 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -69,6 +69,7 @@ struct Options : public qpid::Options string readyAddress; uint receiveRate; std::string replyto; + bool noReplies; Options(const std::string& argv0=std::string()) : qpid::Options("Options"), @@ -91,7 +92,8 @@ struct Options : public qpid::Options reportTotal(false), reportEvery(0), reportHeader(true), - receiveRate(0) + receiveRate(0), + noReplies(false) { addOptions() ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") @@ -116,6 +118,7 @@ struct Options : public qpid::Options ("ready-address", qpid::optValue(readyAddress, "ADDRESS"), "send a message to this address when ready to receive") ("receive-rate", qpid::optValue(receiveRate,"N"), "Receive at rate of N messages/second. 0 means receive as fast as possible.") ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address on response messages") + ("ignore-reply-to", qpid::optValue(noReplies), "Do not send replies even if reply-to is set") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -248,7 +251,7 @@ int main(int argc, char ** argv) } else if (opts.ackFrequency && (count % opts.ackFrequency == 0)) { session.acknowledge(); } - if (msg.getReplyTo()) { // Echo message back to reply-to address. + if (msg.getReplyTo() && !opts.noReplies) { // Echo message back to reply-to address. Sender& s = replyTo[msg.getReplyTo().str()]; if (s.isNull()) { s = session.createSender(msg.getReplyTo()); @@ -263,8 +266,6 @@ int main(int argc, char ** argv) int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill); if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC); } - // Clear out message properties & content for next iteration. - msg = Message(); // TODO aconway 2010-12-01: should be done by fetch } if (opts.reportTotal) reporter.report(); if (opts.tx) { |
