/* * * Copyright (c) 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include #include #include "MessageMessageChannel.h" #include "../framing/AMQMethodBody.h" #include "ClientChannel.h" #include "ReturnedMessageHandler.h" #include "MessageListener.h" #include "../framing/FieldTable.h" #include "Connection.h" #include "../shared_ptr.h" #include namespace qpid { namespace client { using namespace std; using namespace sys; using namespace framing; MessageMessageChannel::MessageMessageChannel(Channel& ch) : channel(ch), tagCount(0) {} string MessageMessageChannel::newTag() { Mutex::ScopedLock l(lock); return (boost::format("__tag%d")%++tagCount).str(); } void MessageMessageChannel::consume( Queue& queue, std::string& tag, MessageListener* /*listener*/, AckMode ackMode, bool noLocal, bool /*synch*/, const FieldTable* fields) { if (tag.empty()) tag = newTag(); channel.sendAndReceive( make_shared_ptr(new MessageConsumeBody( channel.getVersion(), 0, queue.getName(), tag, noLocal, ackMode == NO_ACK, false, fields ? *fields : FieldTable()))); // // FIXME aconway 2007-02-20: Race condition! // // We could receive the first message for the consumer // // before we create the consumer below. // // Move consumer creation to handler for MessageConsumeOkBody // { // Mutex::ScopedLock l(lock); // ConsumerMap::iterator i = consumers.find(tag); // if (i != consumers.end()) // THROW_QPID_ERROR(CLIENT_ERROR, // "Consumer already exists with tag="+tag); // Consumer& c = consumers[tag]; // c.listener = listener; // c.ackMode = ackMode; // c.lastDeliveryTag = 0; // } } void MessageMessageChannel::cancel(const std::string& /*tag*/, bool /*synch*/) { // FIXME aconway 2007-02-23: // Consumer c; // { // Mutex::ScopedLock l(lock); // ConsumerMap::iterator i = consumers.find(tag); // if (i == consumers.end()) // return; // c = i->second; // consumers.erase(i); // } // if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) // channel.send(new MessageAckBody(channel.version, c.lastDeliveryTag, true)); // channel.sendAndReceiveSync( // synch, new MessageCancelBody(channel.version, tag, !synch)); } void MessageMessageChannel::close(){ // FIXME aconway 2007-02-23: // ConsumerMap consumersCopy; // { // Mutex::ScopedLock l(lock); // consumersCopy = consumers; // consumers.clear(); // } // for (ConsumerMap::iterator i=consumersCopy.begin(); // i != consumersCopy.end(); ++i) // { // Consumer& c = i->second; // if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) // && c.lastDeliveryTag > 0) // { // channel.send(new MessageAckBody(channel.version, c.lastDeliveryTag, true)); // } // } // incoming.shutdown(); } /** Destination ID for the current get. * Must not clash with a generated consumer ID. * TODO aconway 2007-03-06: support multiple outstanding gets? */ const string getDestinationId("__get__"); /** * A destination that provides a Correlator::Action to handle * MessageEmpty responses. */ struct MessageGetDestination : public IncomingMessage::WaitableDestination { void response(shared_ptr response) { if (response->amqpClassId() == MessageOkBody::CLASS_ID) { switch (response->amqpMethodId()) { case MessageOkBody::METHOD_ID: // Nothing to do, wait for transfer. return; case MessageEmptyBody::METHOD_ID: empty(); // Wake up waiter with empty queue. return; } } throw QPID_ERROR(PROTOCOL_ERROR, "Invalid response"); } Correlator::Action action() { return boost::bind(&MessageGetDestination::response, this, _1); } }; bool MessageMessageChannel::get( Message& msg, const Queue& queue, AckMode ackMode) { Mutex::ScopedLock l(lock); std::string destName=newTag(); MessageGetDestination dest; incoming.addDestination(destName, dest); channel.send( make_shared_ptr( new MessageGetBody( channel.version, 0, queue.getName(), destName, ackMode)), dest.action()); return dest.wait(msg); } /** Convert a message to a transfer command. */ MessageTransferBody::shared_ptr makeTransfer( ProtocolVersion version, const Message& msg, const string& destination, const std::string& routingKey, bool mandatory, bool immediate) { return MessageTransferBody::shared_ptr( new MessageTransferBody( version, 0, // FIXME aconway 2007-04-03: ticket. destination, msg.isRedelivered(), immediate, 0, // FIXME aconway 2007-02-23: ttl msg.getPriority(), msg.getTimestamp(), static_cast(msg.getDeliveryMode()), 0, // FIXME aconway 2007-04-03: Expiration string(), // Exchange: for broker use only. routingKey, msg.getMessageId(), msg.getCorrelationId(), msg.getReplyTo(), msg.getContentType(), msg.getContentEncoding(), msg.getUserId(), msg.getAppId(), string(), // FIXME aconway 2007-04-03: TransactionId string(), //FIXME aconway 2007-04-03: SecurityToken msg.getHeaders(), Content(INLINE, msg.getData()), mandatory )); } // FIXME aconway 2007-04-05: Generated code should provide this. /** * Calculate the size of a frame containing the given body type * if all variable-lengths parts are empty. */ template size_t overhead() { static AMQFrame frame( ProtocolVersion(), 0, make_shared_ptr(new T(ProtocolVersion()))); return frame.size(); } void MessageMessageChannel::publish( const Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate) { MessageTransferBody::shared_ptr transfer = makeTransfer( channel.getVersion(), msg, exchange.getName(), routingKey, mandatory, immediate); // Frame itself uses 8 bytes. u_int32_t frameMax = channel.connection->getMaxFrameSize() - 8; if (transfer->size() <= frameMax) { channel.sendAndReceive(transfer); } else { std::string ref = newTag(); std::string data = transfer->getBody().getValue(); size_t chunk = channel.connection->getMaxFrameSize() - (overhead() + ref.size()); // TODO aconway 2007-04-05: cast around lack of generated setters const_cast(transfer->getBody()) = Content(REFERENCE,ref); channel.send( make_shared_ptr(new MessageOpenBody(channel.version, ref))); channel.send(transfer); const char* p = data.data(); const char* end = data.data()+data.size(); while (p+chunk <= end) { channel.send( make_shared_ptr( new MessageAppendBody(channel.version, ref, std::string(p, chunk)))); p += chunk; } if (p < end) { channel.send( make_shared_ptr( new MessageAppendBody(channel.version, ref, std::string(p, end-p)))); } channel.send(make_shared_ptr(new MessageCloseBody(channel.version, ref))); } } void copy(Message& msg, MessageTransferBody& transfer) { // FIXME aconway 2007-04-05: Verify all required fields // are copied. msg.setContentType(transfer.getContentType()); msg.setContentEncoding(transfer.getContentEncoding()); msg.setHeaders(transfer.getApplicationHeaders()); msg.setDeliveryMode(DeliveryMode(transfer.getDeliveryMode())); msg.setPriority(transfer.getPriority()); msg.setCorrelationId(transfer.getCorrelationId()); msg.setReplyTo(transfer.getReplyTo()); // FIXME aconway 2007-04-05: TTL/Expiration msg.setMessageId(transfer.getMessageId()); msg.setTimestamp(transfer.getTimestamp()); msg.setUserId(transfer.getUserId()); msg.setAppId(transfer.getAppId()); msg.setDestination(transfer.getDestination()); msg.setRedelivered(transfer.getRedelivered()); msg.setDeliveryTag(0); // No meaning in 0-9 if (transfer.getBody().isInline()) msg.setData(transfer.getBody().getValue()); } void MessageMessageChannel::handle(boost::shared_ptr method) { assert(method->amqpClassId() ==MessageTransferBody::CLASS_ID); switch(method->amqpMethodId()) { case MessageAppendBody::METHOD_ID: { MessageAppendBody::shared_ptr append = shared_polymorphic_downcast(method); incoming.appendReference(append->getReference(), append->getBytes()); break; } case MessageOpenBody::METHOD_ID: { MessageOpenBody::shared_ptr open = shared_polymorphic_downcast(method); incoming.openReference(open->getReference()); break; } case MessageCloseBody::METHOD_ID: { MessageCloseBody::shared_ptr close = shared_polymorphic_downcast(method); incoming.closeReference(close->getReference()); break; } case MessageTransferBody::METHOD_ID: { MessageTransferBody::shared_ptr transfer= shared_polymorphic_downcast(method); if (transfer->getBody().isInline()) { Message msg; copy(msg, *transfer); // Deliver it. incoming.getDestination(transfer->getDestination()).message(msg); } else { Message& msg=incoming.createMessage( transfer->getDestination(), transfer->getBody().getValue()); copy(msg, *transfer); // Will be delivered when reference closes. } break; } case MessageEmptyBody::METHOD_ID: case MessageOkBody::METHOD_ID: // Nothing to do break; // FIXME aconway 2007-04-03: TODO case MessageCancelBody::METHOD_ID: case MessageCheckpointBody::METHOD_ID: case MessageOffsetBody::METHOD_ID: case MessageQosBody::METHOD_ID: case MessageRecoverBody::METHOD_ID: case MessageRejectBody::METHOD_ID: case MessageResumeBody::METHOD_ID: break; default: throw Channel::UnknownMethod(); } } void MessageMessageChannel::handle(AMQHeaderBody::shared_ptr ){ throw QPID_ERROR(INTERNAL_ERROR, "Basic protocol not supported"); } void MessageMessageChannel::handle(AMQContentBody::shared_ptr ){ throw QPID_ERROR(INTERNAL_ERROR, "Basic protocol not supported"); } // FIXME aconway 2007-02-23: // void MessageMessageChannel::deliver(IncomingMessage::Destination& consumer, Message& msg){ // //record delivery tag: // consumer.lastDeliveryTag = msg.getDeliveryTag(); // //allow registered listener to handle the message // consumer.listener->received(msg); // if(channel.isOpen()){ // bool multiple(false); // switch(consumer.ackMode){ // case LAZY_ACK: // multiple = true; // if(++(consumer.count) < channel.getPrefetch()) // break; // //else drop-through // case AUTO_ACK: // consumer.lastDeliveryTag = 0; // channel.send( // new MessageAckBody( // channel.version, msg.getDeliveryTag(), multiple)); // case NO_ACK: // Nothing to do // case CLIENT_ACK: // User code must ack. // break; // // TODO aconway 2007-02-22: Provide a way for user // // to ack! // } // } // //as it stands, transactionality is entirely orthogonal to ack // //mode, though the acks will not be processed by the broker under // //a transaction until it commits. // } void MessageMessageChannel::run() { // FIXME aconway 2007-02-23: // while(channel.isOpen()) { // try { // Message msg = incoming.waitDispatch(); // if(msg.getMethod()->isA()) { // ReturnedMessageHandler* handler=0; // { // Mutex::ScopedLock l(lock); // handler=returnsHandler; // } // if(handler == 0) { // // TODO aconway 2007-02-20: proper logging. // cout << "Message returned: " << msg.getData() << endl; // } // else // handler->returned(msg); // } // else { // MessageDeliverBody::shared_ptr deliverBody = // boost::shared_polymorphic_downcast( // msg.getMethod()); // std::string tag = deliverBody->getConsumerTag(); // Consumer consumer; // { // Mutex::ScopedLock l(lock); // ConsumerMap::iterator i = consumers.find(tag); // if(i == consumers.end()) // THROW_QPID_ERROR(PROTOCOL_ERROR+504, // "Unknown consumer tag=" + tag); // consumer = i->second; // } // deliver(consumer, msg); // } // } // catch (const ShutdownException&) { // /* Orderly shutdown */ // } // catch (const Exception& e) { // // FIXME aconway 2007-02-20: Report exception to user. // cout << "client::Message::run() terminated by: " << e.toString() // << "(" << typeid(e).name() << ")" << endl; // } // } } void MessageMessageChannel::setReturnedMessageHandler( ReturnedMessageHandler* ) { throw QPID_ERROR(INTERNAL_ERROR, "Message class does not support returns"); } void MessageMessageChannel::setQos(){ channel.sendAndReceive( make_shared_ptr(new MessageQosBody(channel.version, 0, channel.getPrefetch(), false))); if(channel.isTransactional()) channel.sendAndReceive( make_shared_ptr(new TxSelectBody(channel.version))); } }} // namespace qpid::client