summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/client
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/client')
-rw-r--r--qpid/cpp/src/client/BasicMessageChannel.cpp10
-rw-r--r--qpid/cpp/src/client/ClientChannel.cpp35
-rw-r--r--qpid/cpp/src/client/ClientChannel.h11
-rw-r--r--qpid/cpp/src/client/ClientConnection.cpp4
-rw-r--r--qpid/cpp/src/client/ClientMessage.h2
-rw-r--r--qpid/cpp/src/client/MessageMessageChannel.cpp103
6 files changed, 118 insertions, 47 deletions
diff --git a/qpid/cpp/src/client/BasicMessageChannel.cpp b/qpid/cpp/src/client/BasicMessageChannel.cpp
index 9e3d184673..c577c0a305 100644
--- a/qpid/cpp/src/client/BasicMessageChannel.cpp
+++ b/qpid/cpp/src/client/BasicMessageChannel.cpp
@@ -81,10 +81,10 @@ void BasicMessageChannel::consume(
BasicConsumeOkBody::shared_ptr ok =
channel.sendAndReceiveSync<BasicConsumeOkBody>(
synch,
- new BasicConsumeBody(
+ make_shared_ptr(new BasicConsumeBody(
channel.version, 0, queue.getName(), tag, noLocal,
ackMode == NO_ACK, false, !synch,
- fields ? *fields : FieldTable()));
+ fields ? *fields : FieldTable())));
tag = ok->getConsumerTag();
}
@@ -102,7 +102,7 @@ void BasicMessageChannel::cancel(const std::string& tag, bool synch) {
if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
channel.sendAndReceiveSync<BasicCancelOkBody>(
- synch, new BasicCancelBody(channel.version, tag, !synch));
+ synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch)));
}
void BasicMessageChannel::close(){
@@ -337,9 +337,9 @@ void BasicMessageChannel::setReturnedMessageHandler(ReturnedMessageHandler* hand
void BasicMessageChannel::setQos(){
channel.sendAndReceive<BasicQosOkBody>(
- new BasicQosBody(channel.version, 0, channel.getPrefetch(), false));
+ make_shared_ptr(new BasicQosBody(channel.version, 0, channel.getPrefetch(), false)));
if(channel.isTransactional())
- channel.sendAndReceive<TxSelectOkBody>(new TxSelectBody(channel.version));
+ channel.sendAndReceive<TxSelectOkBody>(make_shared_ptr(new TxSelectBody(channel.version)));
}
}} // namespace qpid::client
diff --git a/qpid/cpp/src/client/ClientChannel.cpp b/qpid/cpp/src/client/ClientChannel.cpp
index 99eece46bc..533b590010 100644
--- a/qpid/cpp/src/client/ClientChannel.cpp
+++ b/qpid/cpp/src/client/ClientChannel.cpp
@@ -60,7 +60,7 @@ void Channel::open(ChannelId id, Connection& con)
init(id, con, con.getVersion()); // ChannelAdapter initialization.
string oob;
if (id != 0)
- sendAndReceive<ChannelOpenOkBody>(new ChannelOpenBody(version, oob));
+ sendAndReceive<ChannelOpenOkBody>(make_shared_ptr(new ChannelOpenBody(version, oob)));
}
void Channel::protocolInit(
@@ -77,10 +77,10 @@ void Channel::protocolInit(
string locale("en_US");
ConnectionTuneBody::shared_ptr proposal =
sendAndReceive<ConnectionTuneBody>(
- new ConnectionStartOkBody(
+ make_shared_ptr(new ConnectionStartOkBody(
version, connectionStart->getRequestId(),
props, mechanism,
- response, locale));
+ response, locale)));
/**
* Assume for now that further challenges will not be required
@@ -136,15 +136,15 @@ void Channel::declareExchange(Exchange& exchange, bool synch){
FieldTable args;
sendAndReceiveSync<ExchangeDeclareOkBody>(
synch,
- new ExchangeDeclareBody(
- version, 0, name, type, false, false, false, false, !synch, args));
+ make_shared_ptr(new ExchangeDeclareBody(
+ version, 0, name, type, false, false, false, false, !synch, args)));
}
void Channel::deleteExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
sendAndReceiveSync<ExchangeDeleteOkBody>(
synch,
- new ExchangeDeleteBody(version, 0, name, false, !synch));
+ make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false, !synch)));
}
void Channel::declareQueue(Queue& queue, bool synch){
@@ -153,9 +153,9 @@ void Channel::declareQueue(Queue& queue, bool synch){
QueueDeclareOkBody::shared_ptr response =
sendAndReceiveSync<QueueDeclareOkBody>(
synch,
- new QueueDeclareBody(
+ make_shared_ptr(new QueueDeclareBody(
version, 0, name, false/*passive*/, queue.isDurable(),
- queue.isExclusive(), queue.isAutoDelete(), !synch, args));
+ queue.isExclusive(), queue.isAutoDelete(), !synch, args)));
if(synch) {
if(queue.getName().length() == 0)
queue.setName(response->getQueue());
@@ -167,7 +167,7 @@ void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch)
string name = queue.getName();
sendAndReceiveSync<QueueDeleteOkBody>(
synch,
- new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
+ make_shared_ptr(new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)));
}
void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
@@ -175,15 +175,15 @@ void Channel::bind(const Exchange& exchange, const Queue& queue, const std::stri
string q = queue.getName();
sendAndReceiveSync<QueueBindOkBody>(
synch,
- new QueueBindBody(version, 0, q, e, key,!synch, args));
+ make_shared_ptr(new QueueBindBody(version, 0, q, e, key,!synch, args)));
}
void Channel::commit(){
- sendAndReceive<TxCommitOkBody>(new TxCommitBody(version));
+ sendAndReceive<TxCommitOkBody>(make_shared_ptr(new TxCommitBody(version)));
}
void Channel::rollback(){
- sendAndReceive<TxRollbackOkBody>(new TxRollbackBody(version));
+ sendAndReceive<TxRollbackOkBody>(make_shared_ptr(new TxRollbackBody(version)));
}
void Channel::handleMethodInContext(
@@ -203,7 +203,8 @@ void Channel::handleMethodInContext(
}
try {
switch (method->amqpClassId()) {
- case BasicDeliverBody::CLASS_ID: messaging->handle(method); break;
+ case MessageOkBody::CLASS_ID:
+ case BasicGetOkBody::CLASS_ID: messaging->handle(method); break;
case ChannelCloseBody::CLASS_ID: handleChannel(method); break;
case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
default: throw UnknownMethod();
@@ -261,8 +262,8 @@ void Channel::close(
try {
if (getId() != 0) {
sendAndReceive<ChannelCloseOkBody>(
- new ChannelCloseBody(
- version, code, text, classId, methodId));
+ make_shared_ptr(new ChannelCloseBody(
+ version, code, text, classId, methodId)));
}
static_cast<ConnectionForChannel*>(connection)->erase(getId());
closeInternal();
@@ -292,7 +293,7 @@ void Channel::closeInternal() {
}
AMQMethodBody::shared_ptr Channel::sendAndReceive(
- AMQMethodBody* toSend, ClassId c, MethodId m)
+ AMQMethodBody::shared_ptr toSend, ClassId c, MethodId m)
{
responses.expect();
send(toSend);
@@ -300,7 +301,7 @@ AMQMethodBody::shared_ptr Channel::sendAndReceive(
}
AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
- bool sync, AMQMethodBody* body, ClassId c, MethodId m)
+ bool sync, AMQMethodBody::shared_ptr body, ClassId c, MethodId m)
{
if(sync)
return sendAndReceive(body, c, m);
diff --git a/qpid/cpp/src/client/ClientChannel.h b/qpid/cpp/src/client/ClientChannel.h
index cf2ea1dbe5..328fc23f68 100644
--- a/qpid/cpp/src/client/ClientChannel.h
+++ b/qpid/cpp/src/client/ClientChannel.h
@@ -56,6 +56,7 @@ class Channel : public framing::ChannelAdapter
{
private:
struct UnknownMethod {};
+ typedef shared_ptr<framing::AMQMethodBody> MethodPtr;
sys::Mutex lock;
boost::scoped_ptr<MessageChannel> messaging;
@@ -82,21 +83,23 @@ class Channel : public framing::ChannelAdapter
const std::string& vhost);
framing::AMQMethodBody::shared_ptr sendAndReceive(
- framing::AMQMethodBody*, framing::ClassId, framing::MethodId);
+ framing::AMQMethodBody::shared_ptr,
+ framing::ClassId, framing::MethodId);
framing::AMQMethodBody::shared_ptr sendAndReceiveSync(
bool sync,
- framing::AMQMethodBody*, framing::ClassId, framing::MethodId);
+ framing::AMQMethodBody::shared_ptr,
+ framing::ClassId, framing::MethodId);
template <class BodyType>
- boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody* body) {
+ boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody::shared_ptr body) {
return boost::shared_polymorphic_downcast<BodyType>(
sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID));
}
template <class BodyType>
boost::shared_ptr<BodyType> sendAndReceiveSync(
- bool sync, framing::AMQMethodBody* body) {
+ bool sync, framing::AMQMethodBody::shared_ptr body) {
return boost::shared_polymorphic_downcast<BodyType>(
sendAndReceiveSync(
sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID));
diff --git a/qpid/cpp/src/client/ClientConnection.cpp b/qpid/cpp/src/client/ClientConnection.cpp
index 365311ab37..b053a45b0f 100644
--- a/qpid/cpp/src/client/ClientConnection.cpp
+++ b/qpid/cpp/src/client/ClientConnection.cpp
@@ -87,8 +87,8 @@ void Connection::close(
// partly closed with threads left unjoined.
isOpen = false;
channel0.sendAndReceive<ConnectionCloseOkBody>(
- new ConnectionCloseBody(
- getVersion(), code, msg, classId, methodId));
+ make_shared_ptr(new ConnectionCloseBody(
+ getVersion(), code, msg, classId, methodId)));
using boost::bind;
for_each(channels.begin(), channels.end(),
diff --git a/qpid/cpp/src/client/ClientMessage.h b/qpid/cpp/src/client/ClientMessage.h
index dc25b4070b..35aed6c734 100644
--- a/qpid/cpp/src/client/ClientMessage.h
+++ b/qpid/cpp/src/client/ClientMessage.h
@@ -33,6 +33,8 @@ namespace client {
*
* \ingroup clientapi
*/
+// FIXME aconway 2007-04-05: Should be based on MessageTransfer properties not
+// basic header properties.
class Message : public framing::BasicHeaderProperties {
public:
Message(const std::string& data_=std::string()) : data(data_) {}
diff --git a/qpid/cpp/src/client/MessageMessageChannel.cpp b/qpid/cpp/src/client/MessageMessageChannel.cpp
index 25fbb95413..164a1cb426 100644
--- a/qpid/cpp/src/client/MessageMessageChannel.cpp
+++ b/qpid/cpp/src/client/MessageMessageChannel.cpp
@@ -25,6 +25,7 @@
#include "../framing/FieldTable.h"
#include "Connection.h"
#include "../shared_ptr.h"
+#include <boost/bind.hpp>
namespace qpid {
namespace client {
@@ -48,9 +49,9 @@ void MessageMessageChannel::consume(
if (tag.empty())
tag = newTag();
channel.sendAndReceive<MessageOkBody>(
- new MessageConsumeBody(
+ make_shared_ptr(new MessageConsumeBody(
channel.getVersion(), 0, queue.getName(), tag, noLocal,
- ackMode == NO_ACK, false, fields ? *fields : FieldTable()));
+ ackMode == NO_ACK, false, fields ? *fields : FieldTable())));
// // FIXME aconway 2007-02-20: Race condition!
// // We could receive the first message for the consumer
@@ -115,16 +116,44 @@ void MessageMessageChannel::close(){
*/
const string getDestinationId("__get__");
+/**
+ * A destination that provides a Correlator::Action to handle
+ * MessageEmpty responses.
+ */
+struct MessageGetDestination : public IncomingMessage::WaitableDestination
+{
+ void response(shared_ptr<AMQResponseBody> response) {
+ if (response->amqpClassId() == MessageOkBody::CLASS_ID) {
+ switch (response->amqpMethodId()) {
+ case MessageOkBody::METHOD_ID:
+ // Nothing to do, wait for transfer.
+ return;
+ case MessageEmptyBody::METHOD_ID:
+ empty(); // Wake up waiter with empty queue.
+ return;
+ }
+ }
+ throw QPID_ERROR(PROTOCOL_ERROR, "Invalid response");
+ }
+
+ Correlator::Action action() {
+ return boost::bind(&MessageGetDestination::response, this, _1);
+ }
+};
+
bool MessageMessageChannel::get(
- Message& , const Queue& , AckMode )
+ Message& msg, const Queue& queue, AckMode ackMode)
{
Mutex::ScopedLock l(lock);
-// incoming.addDestination(getDestinationId, getDest);
-// channel.send(
-// new MessageGetBody(
-// channel.version, 0, queue.getName(), getDestinationId, ackMode));
-// return getDest.wait(msg);
- return false;
+ std::string destName=newTag();
+ MessageGetDestination dest;
+ incoming.addDestination(destName, dest);
+ channel.send(
+ make_shared_ptr(
+ new MessageGetBody(
+ channel.version, 0, queue.getName(), destName, ackMode)),
+ dest.action());
+ return dest.wait(msg);
}
@@ -176,9 +205,30 @@ void MessageMessageChannel::publish(
// FIXME aconway 2007-02-23:
throw QPID_ERROR(INTERNAL_ERROR, "References not yet implemented");
}
- channel.sendAndReceive<MessageOkBody>(transfer.get());
+ channel.sendAndReceive<MessageOkBody>(transfer);
}
+void copy(Message& msg, MessageTransferBody& transfer) {
+ // FIXME aconway 2007-04-05: Verify all required fields
+ // are copied.
+ msg.setContentType(transfer.getContentType());
+ msg.setContentEncoding(transfer.getContentEncoding());
+ msg.setHeaders(transfer.getApplicationHeaders());
+ msg.setDeliveryMode(DeliveryMode(transfer.getDeliveryMode()));
+ msg.setPriority(transfer.getPriority());
+ msg.setCorrelationId(transfer.getCorrelationId());
+ msg.setReplyTo(transfer.getReplyTo());
+ // FIXME aconway 2007-04-05: TTL/Expiration
+ msg.setMessageId(transfer.getMessageId());
+ msg.setTimestamp(transfer.getTimestamp());
+ msg.setUserId(transfer.getUserId());
+ msg.setAppId(transfer.getAppId());
+ msg.setDestination(transfer.getDestination());
+ msg.setRedelivered(transfer.getRedelivered());
+ msg.setDeliveryTag(0); // No meaning in 0-9
+ if (transfer.getBody().isInline())
+ msg.setData(transfer.getBody().getValue());
+}
void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
assert(method->amqpClassId() ==MessageTransferBody::CLASS_ID);
@@ -203,23 +253,38 @@ void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) {
break;
}
- case MessageEmptyBody::METHOD_ID: {
- // FIXME aconway 2007-04-04:
- // getDest.empty();
+ case MessageTransferBody::METHOD_ID: {
+ MessageTransferBody::shared_ptr transfer=
+ shared_polymorphic_downcast<MessageTransferBody>(method);
+ if (transfer->getBody().isInline()) {
+ Message msg;
+ copy(msg, *transfer);
+ // Deliver it.
+ incoming.getDestination(transfer->getDestination()).message(msg);
+ }
+ else {
+ Message& msg=incoming.createMessage(
+ transfer->getDestination(), transfer->getBody().getValue());
+ copy(msg, *transfer);
+ // Will be delivered when reference closes.
+ }
break;
}
- case MessageCancelBody::METHOD_ID:
- case MessageCheckpointBody::METHOD_ID:
+ case MessageEmptyBody::METHOD_ID:
+ case MessageOkBody::METHOD_ID:
+ // Nothing to do
+ break;
// FIXME aconway 2007-04-03: TODO
- case MessageOkBody::METHOD_ID:
+ case MessageCancelBody::METHOD_ID:
+ case MessageCheckpointBody::METHOD_ID:
case MessageOffsetBody::METHOD_ID:
case MessageQosBody::METHOD_ID:
case MessageRecoverBody::METHOD_ID:
case MessageRejectBody::METHOD_ID:
case MessageResumeBody::METHOD_ID:
- case MessageTransferBody::METHOD_ID:
+ break;
default:
throw Channel::UnknownMethod();
}
@@ -322,10 +387,10 @@ void MessageMessageChannel::setReturnedMessageHandler(
void MessageMessageChannel::setQos(){
channel.sendAndReceive<MessageOkBody>(
- new MessageQosBody(channel.version, 0, channel.getPrefetch(), false));
+ make_shared_ptr(new MessageQosBody(channel.version, 0, channel.getPrefetch(), false)));
if(channel.isTransactional())
channel.sendAndReceive<TxSelectOkBody>(
- new TxSelectBody(channel.version));
+ make_shared_ptr(new TxSelectBody(channel.version)));
}
}} // namespace qpid::client