diff options
Diffstat (limited to 'qpid/cpp/src/client')
| -rw-r--r-- | qpid/cpp/src/client/BasicMessageChannel.cpp | 10 | ||||
| -rw-r--r-- | qpid/cpp/src/client/ClientChannel.cpp | 35 | ||||
| -rw-r--r-- | qpid/cpp/src/client/ClientChannel.h | 11 | ||||
| -rw-r--r-- | qpid/cpp/src/client/ClientConnection.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/client/ClientMessage.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/client/MessageMessageChannel.cpp | 103 |
6 files changed, 118 insertions, 47 deletions
diff --git a/qpid/cpp/src/client/BasicMessageChannel.cpp b/qpid/cpp/src/client/BasicMessageChannel.cpp index 9e3d184673..c577c0a305 100644 --- a/qpid/cpp/src/client/BasicMessageChannel.cpp +++ b/qpid/cpp/src/client/BasicMessageChannel.cpp @@ -81,10 +81,10 @@ void BasicMessageChannel::consume( BasicConsumeOkBody::shared_ptr ok = channel.sendAndReceiveSync<BasicConsumeOkBody>( synch, - new BasicConsumeBody( + make_shared_ptr(new BasicConsumeBody( channel.version, 0, queue.getName(), tag, noLocal, ackMode == NO_ACK, false, !synch, - fields ? *fields : FieldTable())); + fields ? *fields : FieldTable()))); tag = ok->getConsumerTag(); } @@ -102,7 +102,7 @@ void BasicMessageChannel::cancel(const std::string& tag, bool synch) { if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); channel.sendAndReceiveSync<BasicCancelOkBody>( - synch, new BasicCancelBody(channel.version, tag, !synch)); + synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch))); } void BasicMessageChannel::close(){ @@ -337,9 +337,9 @@ void BasicMessageChannel::setReturnedMessageHandler(ReturnedMessageHandler* hand void BasicMessageChannel::setQos(){ channel.sendAndReceive<BasicQosOkBody>( - new BasicQosBody(channel.version, 0, channel.getPrefetch(), false)); + make_shared_ptr(new BasicQosBody(channel.version, 0, channel.getPrefetch(), false))); if(channel.isTransactional()) - channel.sendAndReceive<TxSelectOkBody>(new TxSelectBody(channel.version)); + channel.sendAndReceive<TxSelectOkBody>(make_shared_ptr(new TxSelectBody(channel.version))); } }} // namespace qpid::client diff --git a/qpid/cpp/src/client/ClientChannel.cpp b/qpid/cpp/src/client/ClientChannel.cpp index 99eece46bc..533b590010 100644 --- a/qpid/cpp/src/client/ClientChannel.cpp +++ b/qpid/cpp/src/client/ClientChannel.cpp @@ -60,7 +60,7 @@ void Channel::open(ChannelId id, Connection& con) init(id, con, con.getVersion()); // ChannelAdapter initialization. string oob; if (id != 0) - sendAndReceive<ChannelOpenOkBody>(new ChannelOpenBody(version, oob)); + sendAndReceive<ChannelOpenOkBody>(make_shared_ptr(new ChannelOpenBody(version, oob))); } void Channel::protocolInit( @@ -77,10 +77,10 @@ void Channel::protocolInit( string locale("en_US"); ConnectionTuneBody::shared_ptr proposal = sendAndReceive<ConnectionTuneBody>( - new ConnectionStartOkBody( + make_shared_ptr(new ConnectionStartOkBody( version, connectionStart->getRequestId(), props, mechanism, - response, locale)); + response, locale))); /** * Assume for now that further challenges will not be required @@ -136,15 +136,15 @@ void Channel::declareExchange(Exchange& exchange, bool synch){ FieldTable args; sendAndReceiveSync<ExchangeDeclareOkBody>( synch, - new ExchangeDeclareBody( - version, 0, name, type, false, false, false, false, !synch, args)); + make_shared_ptr(new ExchangeDeclareBody( + version, 0, name, type, false, false, false, false, !synch, args))); } void Channel::deleteExchange(Exchange& exchange, bool synch){ string name = exchange.getName(); sendAndReceiveSync<ExchangeDeleteOkBody>( synch, - new ExchangeDeleteBody(version, 0, name, false, !synch)); + make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false, !synch))); } void Channel::declareQueue(Queue& queue, bool synch){ @@ -153,9 +153,9 @@ void Channel::declareQueue(Queue& queue, bool synch){ QueueDeclareOkBody::shared_ptr response = sendAndReceiveSync<QueueDeclareOkBody>( synch, - new QueueDeclareBody( + make_shared_ptr(new QueueDeclareBody( version, 0, name, false/*passive*/, queue.isDurable(), - queue.isExclusive(), queue.isAutoDelete(), !synch, args)); + queue.isExclusive(), queue.isAutoDelete(), !synch, args))); if(synch) { if(queue.getName().length() == 0) queue.setName(response->getQueue()); @@ -167,7 +167,7 @@ void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch) string name = queue.getName(); sendAndReceiveSync<QueueDeleteOkBody>( synch, - new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)); + make_shared_ptr(new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch))); } void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ @@ -175,15 +175,15 @@ void Channel::bind(const Exchange& exchange, const Queue& queue, const std::stri string q = queue.getName(); sendAndReceiveSync<QueueBindOkBody>( synch, - new QueueBindBody(version, 0, q, e, key,!synch, args)); + make_shared_ptr(new QueueBindBody(version, 0, q, e, key,!synch, args))); } void Channel::commit(){ - sendAndReceive<TxCommitOkBody>(new TxCommitBody(version)); + sendAndReceive<TxCommitOkBody>(make_shared_ptr(new TxCommitBody(version))); } void Channel::rollback(){ - sendAndReceive<TxRollbackOkBody>(new TxRollbackBody(version)); + sendAndReceive<TxRollbackOkBody>(make_shared_ptr(new TxRollbackBody(version))); } void Channel::handleMethodInContext( @@ -203,7 +203,8 @@ void Channel::handleMethodInContext( } try { switch (method->amqpClassId()) { - case BasicDeliverBody::CLASS_ID: messaging->handle(method); break; + case MessageOkBody::CLASS_ID: + case BasicGetOkBody::CLASS_ID: messaging->handle(method); break; case ChannelCloseBody::CLASS_ID: handleChannel(method); break; case ConnectionCloseBody::CLASS_ID: handleConnection(method); break; default: throw UnknownMethod(); @@ -261,8 +262,8 @@ void Channel::close( try { if (getId() != 0) { sendAndReceive<ChannelCloseOkBody>( - new ChannelCloseBody( - version, code, text, classId, methodId)); + make_shared_ptr(new ChannelCloseBody( + version, code, text, classId, methodId))); } static_cast<ConnectionForChannel*>(connection)->erase(getId()); closeInternal(); @@ -292,7 +293,7 @@ void Channel::closeInternal() { } AMQMethodBody::shared_ptr Channel::sendAndReceive( - AMQMethodBody* toSend, ClassId c, MethodId m) + AMQMethodBody::shared_ptr toSend, ClassId c, MethodId m) { responses.expect(); send(toSend); @@ -300,7 +301,7 @@ AMQMethodBody::shared_ptr Channel::sendAndReceive( } AMQMethodBody::shared_ptr Channel::sendAndReceiveSync( - bool sync, AMQMethodBody* body, ClassId c, MethodId m) + bool sync, AMQMethodBody::shared_ptr body, ClassId c, MethodId m) { if(sync) return sendAndReceive(body, c, m); diff --git a/qpid/cpp/src/client/ClientChannel.h b/qpid/cpp/src/client/ClientChannel.h index cf2ea1dbe5..328fc23f68 100644 --- a/qpid/cpp/src/client/ClientChannel.h +++ b/qpid/cpp/src/client/ClientChannel.h @@ -56,6 +56,7 @@ class Channel : public framing::ChannelAdapter { private: struct UnknownMethod {}; + typedef shared_ptr<framing::AMQMethodBody> MethodPtr; sys::Mutex lock; boost::scoped_ptr<MessageChannel> messaging; @@ -82,21 +83,23 @@ class Channel : public framing::ChannelAdapter const std::string& vhost); framing::AMQMethodBody::shared_ptr sendAndReceive( - framing::AMQMethodBody*, framing::ClassId, framing::MethodId); + framing::AMQMethodBody::shared_ptr, + framing::ClassId, framing::MethodId); framing::AMQMethodBody::shared_ptr sendAndReceiveSync( bool sync, - framing::AMQMethodBody*, framing::ClassId, framing::MethodId); + framing::AMQMethodBody::shared_ptr, + framing::ClassId, framing::MethodId); template <class BodyType> - boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody* body) { + boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody::shared_ptr body) { return boost::shared_polymorphic_downcast<BodyType>( sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID)); } template <class BodyType> boost::shared_ptr<BodyType> sendAndReceiveSync( - bool sync, framing::AMQMethodBody* body) { + bool sync, framing::AMQMethodBody::shared_ptr body) { return boost::shared_polymorphic_downcast<BodyType>( sendAndReceiveSync( sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID)); diff --git a/qpid/cpp/src/client/ClientConnection.cpp b/qpid/cpp/src/client/ClientConnection.cpp index 365311ab37..b053a45b0f 100644 --- a/qpid/cpp/src/client/ClientConnection.cpp +++ b/qpid/cpp/src/client/ClientConnection.cpp @@ -87,8 +87,8 @@ void Connection::close( // partly closed with threads left unjoined. isOpen = false; channel0.sendAndReceive<ConnectionCloseOkBody>( - new ConnectionCloseBody( - getVersion(), code, msg, classId, methodId)); + make_shared_ptr(new ConnectionCloseBody( + getVersion(), code, msg, classId, methodId))); using boost::bind; for_each(channels.begin(), channels.end(), diff --git a/qpid/cpp/src/client/ClientMessage.h b/qpid/cpp/src/client/ClientMessage.h index dc25b4070b..35aed6c734 100644 --- a/qpid/cpp/src/client/ClientMessage.h +++ b/qpid/cpp/src/client/ClientMessage.h @@ -33,6 +33,8 @@ namespace client { * * \ingroup clientapi */ +// FIXME aconway 2007-04-05: Should be based on MessageTransfer properties not +// basic header properties. class Message : public framing::BasicHeaderProperties { public: Message(const std::string& data_=std::string()) : data(data_) {} diff --git a/qpid/cpp/src/client/MessageMessageChannel.cpp b/qpid/cpp/src/client/MessageMessageChannel.cpp index 25fbb95413..164a1cb426 100644 --- a/qpid/cpp/src/client/MessageMessageChannel.cpp +++ b/qpid/cpp/src/client/MessageMessageChannel.cpp @@ -25,6 +25,7 @@ #include "../framing/FieldTable.h" #include "Connection.h" #include "../shared_ptr.h" +#include <boost/bind.hpp> namespace qpid { namespace client { @@ -48,9 +49,9 @@ void MessageMessageChannel::consume( if (tag.empty()) tag = newTag(); channel.sendAndReceive<MessageOkBody>( - new MessageConsumeBody( + make_shared_ptr(new MessageConsumeBody( channel.getVersion(), 0, queue.getName(), tag, noLocal, - ackMode == NO_ACK, false, fields ? *fields : FieldTable())); + ackMode == NO_ACK, false, fields ? *fields : FieldTable()))); // // FIXME aconway 2007-02-20: Race condition! // // We could receive the first message for the consumer @@ -115,16 +116,44 @@ void MessageMessageChannel::close(){ */ const string getDestinationId("__get__"); +/** + * A destination that provides a Correlator::Action to handle + * MessageEmpty responses. + */ +struct MessageGetDestination : public IncomingMessage::WaitableDestination +{ + void response(shared_ptr<AMQResponseBody> response) { + if (response->amqpClassId() == MessageOkBody::CLASS_ID) { + switch (response->amqpMethodId()) { + case MessageOkBody::METHOD_ID: + // Nothing to do, wait for transfer. + return; + case MessageEmptyBody::METHOD_ID: + empty(); // Wake up waiter with empty queue. + return; + } + } + throw QPID_ERROR(PROTOCOL_ERROR, "Invalid response"); + } + + Correlator::Action action() { + return boost::bind(&MessageGetDestination::response, this, _1); + } +}; + bool MessageMessageChannel::get( - Message& , const Queue& , AckMode ) + Message& msg, const Queue& queue, AckMode ackMode) { Mutex::ScopedLock l(lock); -// incoming.addDestination(getDestinationId, getDest); -// channel.send( -// new MessageGetBody( -// channel.version, 0, queue.getName(), getDestinationId, ackMode)); -// return getDest.wait(msg); - return false; + std::string destName=newTag(); + MessageGetDestination dest; + incoming.addDestination(destName, dest); + channel.send( + make_shared_ptr( + new MessageGetBody( + channel.version, 0, queue.getName(), destName, ackMode)), + dest.action()); + return dest.wait(msg); } @@ -176,9 +205,30 @@ void MessageMessageChannel::publish( // FIXME aconway 2007-02-23: throw QPID_ERROR(INTERNAL_ERROR, "References not yet implemented"); } - channel.sendAndReceive<MessageOkBody>(transfer.get()); + channel.sendAndReceive<MessageOkBody>(transfer); } +void copy(Message& msg, MessageTransferBody& transfer) { + // FIXME aconway 2007-04-05: Verify all required fields + // are copied. + msg.setContentType(transfer.getContentType()); + msg.setContentEncoding(transfer.getContentEncoding()); + msg.setHeaders(transfer.getApplicationHeaders()); + msg.setDeliveryMode(DeliveryMode(transfer.getDeliveryMode())); + msg.setPriority(transfer.getPriority()); + msg.setCorrelationId(transfer.getCorrelationId()); + msg.setReplyTo(transfer.getReplyTo()); + // FIXME aconway 2007-04-05: TTL/Expiration + msg.setMessageId(transfer.getMessageId()); + msg.setTimestamp(transfer.getTimestamp()); + msg.setUserId(transfer.getUserId()); + msg.setAppId(transfer.getAppId()); + msg.setDestination(transfer.getDestination()); + msg.setRedelivered(transfer.getRedelivered()); + msg.setDeliveryTag(0); // No meaning in 0-9 + if (transfer.getBody().isInline()) + msg.setData(transfer.getBody().getValue()); +} void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) { assert(method->amqpClassId() ==MessageTransferBody::CLASS_ID); @@ -203,23 +253,38 @@ void MessageMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) { break; } - case MessageEmptyBody::METHOD_ID: { - // FIXME aconway 2007-04-04: - // getDest.empty(); + case MessageTransferBody::METHOD_ID: { + MessageTransferBody::shared_ptr transfer= + shared_polymorphic_downcast<MessageTransferBody>(method); + if (transfer->getBody().isInline()) { + Message msg; + copy(msg, *transfer); + // Deliver it. + incoming.getDestination(transfer->getDestination()).message(msg); + } + else { + Message& msg=incoming.createMessage( + transfer->getDestination(), transfer->getBody().getValue()); + copy(msg, *transfer); + // Will be delivered when reference closes. + } break; } - case MessageCancelBody::METHOD_ID: - case MessageCheckpointBody::METHOD_ID: + case MessageEmptyBody::METHOD_ID: + case MessageOkBody::METHOD_ID: + // Nothing to do + break; // FIXME aconway 2007-04-03: TODO - case MessageOkBody::METHOD_ID: + case MessageCancelBody::METHOD_ID: + case MessageCheckpointBody::METHOD_ID: case MessageOffsetBody::METHOD_ID: case MessageQosBody::METHOD_ID: case MessageRecoverBody::METHOD_ID: case MessageRejectBody::METHOD_ID: case MessageResumeBody::METHOD_ID: - case MessageTransferBody::METHOD_ID: + break; default: throw Channel::UnknownMethod(); } @@ -322,10 +387,10 @@ void MessageMessageChannel::setReturnedMessageHandler( void MessageMessageChannel::setQos(){ channel.sendAndReceive<MessageOkBody>( - new MessageQosBody(channel.version, 0, channel.getPrefetch(), false)); + make_shared_ptr(new MessageQosBody(channel.version, 0, channel.getPrefetch(), false))); if(channel.isTransactional()) channel.sendAndReceive<TxSelectOkBody>( - new TxSelectBody(channel.version)); + make_shared_ptr(new TxSelectBody(channel.version))); } }} // namespace qpid::client |
