diff options
| author | Gordon Sim <gsim@apache.org> | 2007-08-28 19:38:17 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-08-28 19:38:17 +0000 |
| commit | 9e10f4ea3b2f8ab6650f635cada48e4735ca20d7 (patch) | |
| tree | 26ad3b8dffa17fa665fe7a033a7c8092839df011 /cpp/src/qpid/broker | |
| parent | 6b09696b216c090b512c6af92bf7976ae3407add (diff) | |
| download | qpid-python-9e10f4ea3b2f8ab6650f635cada48e4735ca20d7.tar.gz | |
Updated message.transfer encoding to use header and content segments (including new structs).
Unified more between the basic and message classes messages.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@570538 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
50 files changed, 892 insertions, 1828 deletions
diff --git a/cpp/src/qpid/broker/AccumulatedAck.h b/cpp/src/qpid/broker/AccumulatedAck.h index be01c5e02c..b53f4a8ba5 100644 --- a/cpp/src/qpid/broker/AccumulatedAck.h +++ b/cpp/src/qpid/broker/AccumulatedAck.h @@ -48,13 +48,12 @@ namespace qpid { class AccumulatedAck { public: /** - * If not zero, then everything up to this value has been - * acked. + * Everything up to this value has been acked. */ DeliveryId mark; /** - * List of individually acked messages that are not - * included in the range marked by 'range'. + * List of individually acked messages greater than the + * 'mark'. */ std::list<Range> ranges; diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index b733f77390..07b7b4f638 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -21,6 +21,7 @@ #include "BrokerChannel.h" #include "Connection.h" #include "DeliveryToken.h" +#include "MessageDelivery.h" #include "qpid/framing/AMQMethodBody.h" #include "qpid/Exception.h" @@ -327,7 +328,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, //need to generate name here, so we have it for the adapter (it is //also version specific behaviour now) if (newTag.empty()) newTag = tagGenerator.generate(); - DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken(newTag)); + DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag)); channel.consume(token, newTag, queue, noLocal, !noAck, exclusive, &fields); if(!nowait) client.consumeOk(newTag); @@ -340,21 +341,9 @@ void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){ channel.cancel(consumerTag); } -void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/, - const string& exchangeName, const string& routingKey, - bool rejectUnroutable, bool immediate) -{ - - // exeption moved to ChannelAdaptor -- TODO this code should be removed once basic is removed - - BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, rejectUnroutable, immediate); - channel.handlePublish(msg); - -} - void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = getQueue(queueName); - DeliveryToken::shared_ptr token(BasicMessage::createGetToken(queue)); + DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue)); if(!channel.get(token, queue, !noAck)){ string clusterId;//not used, part of an imatix hack @@ -384,7 +373,7 @@ void BrokerAdapter::TxHandlerImpl::select() void BrokerAdapter::TxHandlerImpl::commit() { - channel.commit(); + channel.commit(&broker.getStore()); } void BrokerAdapter::TxHandlerImpl::rollback() diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index 99b7f14525..9e0cf64b7f 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -183,9 +183,6 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations bool noLocal, bool noAck, bool exclusive, bool nowait, const qpid::framing::FieldTable& fields); void cancel(const std::string& consumerTag); - void publish(uint16_t ticket, - const std::string& exchange, const std::string& routingKey, - bool rejectUnroutable, bool immediate); void get(uint16_t ticket, const std::string& queue, bool noAck); void ack(uint64_t deliveryTag, bool multiple); void reject(uint64_t deliveryTag, bool requeue); diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index 9712b3903f..615a26beab 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -32,13 +32,12 @@ #include "BrokerAdapter.h" #include "BrokerChannel.h" -#include "BrokerMessage.h" #include "BrokerQueue.h" #include "Connection.h" #include "DeliverableMessage.h" #include "DtxAck.h" #include "DtxTimeout.h" -#include "MessageStore.h" +#include "Message.h" #include "TxAck.h" #include "TxPublish.h" @@ -49,7 +48,7 @@ using namespace qpid::framing; using namespace qpid::sys; -Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id, MessageStore* const _store) : +Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id) : id(_id), connection(con), out(_out), @@ -58,8 +57,6 @@ Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id, MessageS tagGenerator("sgen"), dtxSelected(false), accumulatedAck(0), - store(_store), - messageBuilder(this, _store, connection.getStagingThreshold()), opened(id == 0),//channel 0 is automatically open, other must be explicitly opened flowActive(true) { @@ -108,7 +105,7 @@ void Channel::startTx() txBuffer = TxBuffer::shared_ptr(new TxBuffer()); } -void Channel::commit() +void Channel::commit(MessageStore* const store) { if (!txBuffer) throw ConnectionException(503, "Channel has not been selected for use with transactions"); @@ -296,34 +293,7 @@ void Channel::ConsumerImpl::requestDispatch() queue->requestDispatch(); } -void Channel::handleInlineTransfer(Message::shared_ptr msg) -{ - complete(msg); -} - -void Channel::handlePublish(Message* _message) -{ - Message::shared_ptr message(_message); - messageBuilder.initialise(message); -} - -void Channel::handleHeader(AMQHeaderBody* header) -{ - messageBuilder.setHeader(header); - //at this point, decide based on the size of the message whether we want - //to stage it by saving content directly to disk as it arrives -} - -void Channel::handleContent(AMQContentBody* content) -{ - messageBuilder.addContent(content); -} - -void Channel::handleHeartbeat(AMQHeartbeatBody*) { - // TODO aconway 2007-01-17: Implement heartbeating. -} - -void Channel::complete(Message::shared_ptr msg) { +void Channel::handle(Message::shared_ptr msg) { if (txBuffer.get()) { TxPublish* deliverable(new TxPublish(msg)); TxOp::shared_ptr op(deliverable); @@ -335,20 +305,12 @@ void Channel::complete(Message::shared_ptr msg) { } } - - void Channel::route(Message::shared_ptr msg, Deliverable& strategy) { - - std::string routeToExchangeName = msg->getExchange(); - // cache the exchange lookup - if (!cacheExchange.get() || cacheExchangeName != routeToExchangeName){ - cacheExchangeName = routeToExchangeName; - cacheExchange = connection.broker.getExchanges().get(routeToExchangeName); + std::string exchangeName = msg->getExchangeName(); + if (!cacheExchange || cacheExchange->getName() != exchangeName){ + cacheExchange = connection.broker.getExchanges().get(exchangeName); } - if (!cacheExchange.get() ) - throw ChannelException(404, "Exchange not found '" + routeToExchangeName + "'"); - cacheExchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders())); if (!strategy.delivered) { diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h index fcfcd73679..cdbab37ebc 100644 --- a/cpp/src/qpid/broker/BrokerChannel.h +++ b/cpp/src/qpid/broker/BrokerChannel.h @@ -37,14 +37,12 @@ #include "Deliverable.h" #include "DtxBuffer.h" #include "DtxManager.h" -#include "MessageBuilder.h" #include "NameGenerator.h" #include "Prefetch.h" #include "TxBuffer.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/ChannelAdapter.h" #include "qpid/framing/ChannelOpenBody.h" -#include "CompletionHandler.h" namespace qpid { namespace broker { @@ -60,7 +58,7 @@ using framing::string; * Maintains state for an AMQP channel. Handles incoming and * outgoing messages for that channel. */ -class Channel : public CompletionHandler +class Channel { class ConsumerImpl : public Consumer { @@ -113,25 +111,22 @@ class Channel : public CompletionHandler DtxBuffer::shared_ptr dtxBuffer; bool dtxSelected; AccumulatedAck accumulatedAck; - MessageStore* const store; - MessageBuilder messageBuilder;//builder for in-progress message bool opened; bool flowActive; - std::string cacheExchangeName; // pair holds last exchange used for routing - Exchange::shared_ptr cacheExchange; + boost::shared_ptr<Exchange> cacheExchange; void route(Message::shared_ptr msg, Deliverable& strategy); - void complete(Message::shared_ptr msg);// completion handler for MessageBuilder void record(const DeliveryRecord& delivery); bool checkPrefetch(Message::shared_ptr& msg); void checkDtxTimeout(); ConsumerImpl& find(const std::string& destination); void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative); void acknowledged(const DeliveryRecord&); - + + public: - Channel(Connection& parent, DeliveryAdapter& out, framing::ChannelId id, MessageStore* const store = 0); + Channel(Connection& parent, DeliveryAdapter& out, framing::ChannelId id); ~Channel(); bool isOpen() const { return opened; } @@ -162,7 +157,7 @@ class Channel : public CompletionHandler bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected); void close(); void startTx(); - void commit(); + void commit(MessageStore* const store); void rollback(); void selectDtx(); void startDtx(const std::string& xid, DtxManager& mgr, bool join); @@ -174,12 +169,8 @@ class Channel : public CompletionHandler void recover(bool requeue); void flow(bool active); void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag); - void handlePublish(Message* msg); - void handleHeader(framing::AMQHeaderBody*); - void handleContent(framing::AMQContentBody*); - void handleHeartbeat(framing::AMQHeartbeatBody*); - - void handleInlineTransfer(Message::shared_ptr msg); + + void handle(Message::shared_ptr msg); }; }} // namespace broker diff --git a/cpp/src/qpid/broker/BrokerExchange.h b/cpp/src/qpid/broker/BrokerExchange.h index 91c295e1b7..c3dd7b998d 100644 --- a/cpp/src/qpid/broker/BrokerExchange.h +++ b/cpp/src/qpid/broker/BrokerExchange.h @@ -51,7 +51,7 @@ namespace qpid { : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0){} virtual ~Exchange(){} - string getName() const { return name; } + const string& getName() const { return name; } bool isDurable() { return durable; } qpid::framing::FieldTable& getArgs() { return args; } diff --git a/cpp/src/qpid/broker/BrokerMessage.cpp b/cpp/src/qpid/broker/BrokerMessage.cpp deleted file mode 100644 index bddd5802cf..0000000000 --- a/cpp/src/qpid/broker/BrokerMessage.cpp +++ /dev/null @@ -1,299 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <boost/cast.hpp> - -#include "BrokerMessage.h" -#include <iostream> - -#include "InMemoryContent.h" -#include "LazyLoadedContent.h" -#include "MessageStore.h" -#include "BrokerQueue.h" -#include "qpid/log/Statement.h" -#include "qpid/framing/BasicDeliverBody.h" -#include "qpid/framing/BasicGetOkBody.h" -#include "qpid/framing/BasicPublishBody.h" -#include "qpid/framing/AMQContentBody.h" -#include "qpid/framing/AMQHeaderBody.h" -#include "qpid/framing/AMQMethodBody.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/ChannelAdapter.h" -#include "RecoveryManagerImpl.h" - -namespace qpid{ -namespace broker{ - -struct BasicGetToken : DeliveryToken -{ - typedef boost::shared_ptr<BasicGetToken> shared_ptr; - - Queue::shared_ptr queue; - - BasicGetToken(Queue::shared_ptr q) : queue(q) {} -}; - -struct BasicConsumeToken : DeliveryToken -{ - typedef boost::shared_ptr<BasicConsumeToken> shared_ptr; - - const string consumer; - - BasicConsumeToken(const string c) : consumer(c) {} -}; - -} -} - -using namespace boost; -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; - -BasicMessage::BasicMessage( - const ConnectionToken* const _publisher, - const string& _exchange, const string& _routingKey, - bool _mandatory, bool _immediate -) : - Message(_publisher, _exchange, _routingKey, _mandatory, _immediate), - size(0) -{} - -// For tests only. -BasicMessage::BasicMessage() : isHeaderSet(false), size(0) {} - -BasicMessage::~BasicMessage(){} - -void BasicMessage::setHeader(AMQHeaderBody* _header){ - if (_header) { - this->header = *_header; - isHeaderSet = true; - } - else - isHeaderSet = false; -} - -void BasicMessage::addContent(AMQContentBody* data){ - if (!content.get()) { - content = std::auto_ptr<Content>(new InMemoryContent()); - } - content->add(data); - size += data->size(); -} - -bool BasicMessage::isComplete(){ - return isHeaderSet && (header.getContentSize() == contentSize()); -} - -DeliveryToken::shared_ptr BasicMessage::createGetToken(Queue::shared_ptr queue) -{ - return DeliveryToken::shared_ptr(new BasicGetToken(queue)); -} - -DeliveryToken::shared_ptr BasicMessage::createConsumeToken(const string& consumer) -{ - return DeliveryToken::shared_ptr(new BasicConsumeToken(consumer)); -} - -void BasicMessage::deliver(ChannelAdapter& channel, - const string& consumerTag, DeliveryId id, - uint32_t framesize) -{ - channel.send(BasicDeliverBody( - channel.getVersion(), consumerTag, id.getValue(), - getRedelivered(), getExchange(), getRoutingKey())); - sendContent(channel, framesize); -} - -void BasicMessage::sendGetOk(ChannelAdapter& channel, - uint32_t messageCount, - DeliveryId id, - uint32_t framesize) -{ - channel.send( - BasicGetOkBody( - channel.getVersion(), - id.getValue(), getRedelivered(), getExchange(), - getRoutingKey(), messageCount)); - sendContent(channel, framesize); -} - -void BasicMessage::deliver(framing::ChannelAdapter& channel, DeliveryId id, DeliveryToken::shared_ptr token, uint32_t framesize) -{ - BasicConsumeToken::shared_ptr consume = dynamic_pointer_cast<BasicConsumeToken>(token); - if (consume) { - deliver(channel, consume->consumer, id, framesize); - } else { - BasicGetToken::shared_ptr get = dynamic_pointer_cast<BasicGetToken>(token); - if (get) { - sendGetOk(channel, get->queue->getMessageCount(), id.getValue(), framesize); - } else { - //TODO: - //either need to be able to convert to a message transfer or - //throw error of some kind to allow this to be handled higher up - throw Exception("Conversion to BasicMessage not defined!"); - } - } -} - -void BasicMessage::sendContent(ChannelAdapter& channel, uint32_t framesize) -{ - channel.send(header); - Mutex::ScopedLock locker(contentLock); - if (content.get()) - content->send(channel, framesize); -} - -BasicHeaderProperties* BasicMessage::getHeaderProperties(){ - return isHeaderSet ? dynamic_cast<BasicHeaderProperties*>(header.getProperties()) : 0; -} - -const FieldTable& BasicMessage::getApplicationHeaders(){ - return getHeaderProperties()->getHeaders(); -} - -bool BasicMessage::isPersistent() -{ - if(!isHeaderSet) return false; - BasicHeaderProperties* props = getHeaderProperties(); - return props && props->getDeliveryMode() == PERSISTENT; -} - -void BasicMessage::decode(Buffer& buffer, bool headersOnly, uint32_t contentChunkSize) -{ - decodeHeader(buffer); - if (!headersOnly) decodeContent(buffer, contentChunkSize); -} - -void BasicMessage::decodeHeader(Buffer& buffer) -{ - //don't care about the type here, but want encode/decode to be symmetric - RecoveryManagerImpl::decodeMessageType(buffer); - - string exchange; - string routingKey; - - buffer.getShortString(exchange); - buffer.getShortString(routingKey); - setRouting(exchange, routingKey); - - uint32_t headerSize = buffer.getLong(); - AMQHeaderBody headerBody; - headerBody.decode(buffer, headerSize); - setHeader(&headerBody); -} - -void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize) -{ - uint64_t expected = expectedContentSize(); - if (expected != buffer.available()) { - QPID_LOG(error, "Expected " << expectedContentSize() << " bytes, got " << buffer.available()); - throw Exception("Cannot decode content, buffer not large enough."); - } - - if (!chunkSize || chunkSize > expected) { - chunkSize = expected; - } - - uint64_t total = 0; - while (total < expectedContentSize()) { - uint64_t remaining = expected - total; - AMQContentBody contentBody; - contentBody.decode(buffer, remaining < chunkSize ? remaining : chunkSize); - addContent(&contentBody); - total += chunkSize; - } -} - -void BasicMessage::encode(Buffer& buffer) const -{ - encodeHeader(buffer); - encodeContent(buffer); -} - -void BasicMessage::encodeHeader(Buffer& buffer) const -{ - RecoveryManagerImpl::encodeMessageType(*this, buffer); - buffer.putShortString(getExchange()); - buffer.putShortString(getRoutingKey()); - buffer.putLong(header.size()); - header.encode(buffer); -} - -void BasicMessage::encodeContent(Buffer& buffer) const -{ - Mutex::ScopedLock locker(contentLock); - if (content.get()) content->encode(buffer); -} - -uint32_t BasicMessage::encodedSize() const -{ - return encodedHeaderSize() + encodedContentSize(); -} - -uint32_t BasicMessage::encodedContentSize() const -{ - Mutex::ScopedLock locker(contentLock); - return content.get() ? content->size() : 0; -} - -uint32_t BasicMessage::encodedHeaderSize() const -{ - return RecoveryManagerImpl::encodedMessageTypeSize() - +getExchange().size() + 1 - + getRoutingKey().size() + 1 - + header.size() + 4;//4 extra bytes for size -} - -uint64_t BasicMessage::expectedContentSize() -{ - return isHeaderSet ? header.getContentSize() : 0; -} - -void BasicMessage::releaseContent(MessageStore* store) -{ - Mutex::ScopedLock locker(contentLock); - if (!isPersistent() && getPersistenceId() == 0) { - store->stage(*this); - } - if (!content.get() || content->size() > 0) { - //set content to lazy loading mode (but only if there is - //stored content): - - //Note: the LazyLoadedContent instance contains a raw pointer - //to the message, however it is then set as a member of that - //message so its lifetime is guaranteed to be no longer than - //that of the message itself - content = std::auto_ptr<Content>( - new LazyLoadedContent(store, this, expectedContentSize())); - } -} - -void BasicMessage::setContent(std::auto_ptr<Content>& _content) -{ - Mutex::ScopedLock locker(contentLock); - content = _content; -} - - -uint32_t BasicMessage::getRequiredCredit() const -{ - return header.size() + contentSize(); -} diff --git a/cpp/src/qpid/broker/BrokerMessage.h b/cpp/src/qpid/broker/BrokerMessage.h deleted file mode 100644 index 0f46ff2e83..0000000000 --- a/cpp/src/qpid/broker/BrokerMessage.h +++ /dev/null @@ -1,146 +0,0 @@ -#ifndef _broker_BrokerMessage_h -#define _broker_BrokerMessage_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <memory> -#include <boost/shared_ptr.hpp> - -#include "BrokerMessageBase.h" -#include "qpid/framing/BasicHeaderProperties.h" -#include "qpid/framing/AMQHeaderBody.h" -#include "ConnectionToken.h" -#include "Content.h" -#include "qpid/sys/Mutex.h" -#include "TxBuffer.h" - -namespace qpid { - -namespace framing { -class ChannelAdapter; -class AMQHeaderBody; -} - -namespace broker { - -class MessageStore; -class Queue; -using framing::string; - -/** - * Represents an AMQP message, i.e. a header body, a list of - * content bodies and some details about the publication - * request. - */ -class BasicMessage : public Message { - framing::AMQHeaderBody header; - bool isHeaderSet; - std::auto_ptr<Content> content; - mutable sys::Mutex contentLock; - uint64_t size; - - void sendContent(framing::ChannelAdapter&, uint32_t framesize); - - public: - typedef boost::shared_ptr<BasicMessage> shared_ptr; - - BasicMessage(const ConnectionToken* const publisher, - const string& exchange, const string& routingKey, - bool mandatory, bool immediate); - BasicMessage(); - ~BasicMessage(); - void setHeader(framing::AMQHeaderBody* header); - void addContent(framing::AMQContentBody* data); - bool isComplete(); - - static DeliveryToken::shared_ptr createGetToken(boost::shared_ptr<Queue> queue); - static DeliveryToken::shared_ptr createConsumeToken(const string& consumer); - void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize); - - void deliver(framing::ChannelAdapter&, - const string& consumerTag, - DeliveryId deliveryTag, - uint32_t framesize); - - void sendGetOk(framing::ChannelAdapter& channel, - uint32_t messageCount, - DeliveryId deliveryTag, - uint32_t framesize); - - framing::BasicHeaderProperties* getHeaderProperties(); - const framing::FieldTable& getApplicationHeaders(); - bool isPersistent(); - uint64_t contentSize() const { return size; } - - void decode(framing::Buffer& buffer, bool headersOnly = false, - uint32_t contentChunkSize = 0); - void decodeHeader(framing::Buffer& buffer); - void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0); - - void encode(framing::Buffer& buffer) const; - void encodeHeader(framing::Buffer& buffer) const; - void encodeContent(framing::Buffer& buffer) const; - /** - * @returns the size of the buffer needed to encode this - * message in its entirety - */ - uint32_t encodedSize() const; - /** - * @returns the size of the buffer needed to encode the - * 'header' of this message (not just the header frame, - * but other meta data e.g.routing key and exchange) - */ - uint32_t encodedHeaderSize() const; - /** - * @returns the size of the buffer needed to encode the - * (possibly partial) content held by this message - */ - uint32_t encodedContentSize() const; - /** - * Releases the in-memory content data held by this - * message. Must pass in a store from which the data can - * be reloaded. - */ - void releaseContent(MessageStore* store); - /** - * If headers have been received, returns the expected - * content size else returns 0. - */ - uint64_t expectedContentSize(); - /** - * Sets the 'content' implementation of this message (the - * message controls the lifecycle of the content instance - * it uses). - */ - void setContent(std::auto_ptr<Content>& content); - - /** - * Returns the byte credits required to transfer this message. - */ - uint32_t getRequiredCredit() const; -}; - -} -} - - -#endif /*!_broker_BrokerMessage_h*/ diff --git a/cpp/src/qpid/broker/BrokerMessageBase.h b/cpp/src/qpid/broker/BrokerMessageBase.h deleted file mode 100644 index bac5dc6386..0000000000 --- a/cpp/src/qpid/broker/BrokerMessageBase.h +++ /dev/null @@ -1,168 +0,0 @@ -#ifndef _broker_BrokerMessageBase_h -#define _broker_BrokerMessageBase_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <string> -#include <boost/shared_ptr.hpp> -#include "Content.h" -#include "DeliveryId.h" -#include "DeliveryToken.h" -#include "PersistableMessage.h" -#include "qpid/framing/amqp_types.h" - -namespace qpid { - -namespace framing { -class ChannelAdapter; -class BasicHeaderProperties; -class FieldTable; -class AMQMethodBody; -class AMQContentBody; -class AMQHeaderBody; -} - - -namespace broker { -class ConnectionToken; -class MessageStore; - -/** - * Base class for all types of internal broker messages - * abstracting away the operations - * TODO; AMS: for the moment this is mostly a placeholder - */ -class Message : public PersistableMessage{ - public: - typedef boost::shared_ptr<Message> shared_ptr; - - Message(const ConnectionToken* publisher_, - const std::string& _exchange, - const std::string& _routingKey, - bool _mandatory, bool _immediate) : - publisher(publisher_), - exchange(_exchange), - routingKey(_routingKey), - mandatory(_mandatory), - immediate(_immediate), - persistenceId(0), - redelivered(false) - {} - - Message() : - mandatory(false), - immediate(false), - persistenceId(0), - redelivered(false) - {} - - virtual ~Message() {}; - - // Accessors - const std::string& getRoutingKey() const { return routingKey; } - const std::string& getExchange() const { return exchange; } - uint64_t getPersistenceId() const { return persistenceId; } - bool getRedelivered() const { return redelivered; } - - void setRouting(const std::string& _exchange, const std::string& _routingKey) - { exchange = _exchange; routingKey = _routingKey; } - void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; } - void redeliver() { redelivered = true; } - - virtual void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag/*only needed for basic class*/, - DeliveryToken::shared_ptr token, uint32_t framesize) = 0; - - virtual bool isComplete() = 0; - - virtual uint64_t contentSize() const = 0; - virtual framing::BasicHeaderProperties* getHeaderProperties() = 0; - virtual const framing::FieldTable& getApplicationHeaders() = 0; - virtual bool isPersistent() = 0; - virtual const ConnectionToken* getPublisher() const { - return publisher; - } - - virtual uint32_t getRequiredCredit() const = 0; - - virtual void encode(framing::Buffer& buffer) const = 0; - virtual void encodeHeader(framing::Buffer& buffer) const = 0; - - /** - * @returns the size of the buffer needed to encode this - * message in its entirety - */ - virtual uint32_t encodedSize() const = 0; - /** - * @returns the size of the buffer needed to encode the - * 'header' of this message (not just the header frame, - * but other meta data e.g.routing key and exchange) - */ - virtual uint32_t encodedHeaderSize() const = 0; - /** - * @returns the size of the buffer needed to encode the - * (possibly partial) content held by this message - */ - virtual uint32_t encodedContentSize() const = 0; - /** - * If headers have been received, returns the expected - * content size else returns 0. - */ - virtual uint64_t expectedContentSize() = 0; - - virtual void decodeHeader(framing::Buffer& buffer) = 0; - virtual void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0) = 0; - - static shared_ptr decode(framing::Buffer& buffer); - - // TODO: AMS 29/1/2007 Don't think these are really part of base class - - /** - * Sets the 'content' implementation of this message (the - * message controls the lifecycle of the content instance - * it uses). - */ - virtual void setContent(std::auto_ptr<Content>& /*content*/) {}; - virtual void setHeader(framing::AMQHeaderBody*) {}; - virtual void addContent(framing::AMQContentBody*) {}; - /** - * Releases the in-memory content data held by this - * message. Must pass in a store from which the data can - * be reloaded. - */ - virtual void releaseContent(MessageStore* /*store*/) {}; - - bool isImmediate() const { return immediate; } - - private: - const ConnectionToken* publisher; - std::string exchange; - std::string routingKey; - const bool mandatory; - const bool immediate; - mutable uint64_t persistenceId; - bool redelivered; -}; - -}} - - -#endif /*!_broker_BrokerMessage_h*/ diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp deleted file mode 100644 index 1184885aeb..0000000000 --- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp +++ /dev/null @@ -1,328 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/QpidError.h" -#include "BrokerMessageMessage.h" -#include "qpid/framing/ChannelAdapter.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/MessageOpenBody.h" -#include "qpid/framing/MessageCloseBody.h" -#include "qpid/framing/MessageAppendBody.h" -#include "Reference.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/framing/BasicHeaderProperties.h" -#include "RecoveryManagerImpl.h" - -#include <algorithm> - -using namespace std; -using namespace boost; -using namespace qpid::framing; - -namespace qpid { -namespace broker { - -struct MessageDeliveryToken : public DeliveryToken -{ - const std::string destination; - - MessageDeliveryToken(const std::string& d) : destination(d) {} -}; - -MessageMessage::MessageMessage( - ConnectionToken* publisher, const MessageTransferBody* transfer_ -) : Message(publisher, transfer_->getDestination(), - transfer_->getRoutingKey(), - transfer_->getRejectUnroutable(), - transfer_->getImmediate()), - transfer(*transfer_) -{ - assert(transfer.getBody().isInline()); -} - -MessageMessage::MessageMessage( - ConnectionToken* publisher, const MessageTransferBody* transfer_, - ReferencePtr reference_ -) : Message(publisher, transfer_->getDestination(), - transfer_->getRoutingKey(), - transfer_->getRejectUnroutable(), - transfer_->getImmediate()), - transfer(*transfer_), - reference(reference_) -{ - assert(!transfer.getBody().isInline()); - assert(reference_); -} - -/** - * Currently used by message store impls to recover messages - */ -MessageMessage::MessageMessage() {} - -// TODO: astitcher 1-Mar-2007: This code desperately needs better factoring -void MessageMessage::transferMessage( - framing::ChannelAdapter& channel, - const std::string& consumerTag, - uint32_t framesize) -{ - const framing::Content& body = transfer.getBody(); - // Send any reference data - ReferencePtr ref= getReference(); - if (ref){ - - // Open - channel.send(MessageOpenBody(channel.getVersion(), ref->getId())); - // Appends - for(Reference::Appends::const_iterator a = ref->getAppends().begin(); - a != ref->getAppends().end(); - ++a) { - uint32_t sizeleft = a->size(); - const string& content = a->getBytes(); - // Calculate overhead bytes - // Assume that the overhead is constant as the reference name doesn't change - uint32_t overhead = sizeleft - content.size(); - string::size_type contentStart = 0; - while (sizeleft) { - string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead; - - channel.send(MessageAppendBody(channel.getVersion(), ref->getId(), - string(content, contentStart, contentSize))); - sizeleft -= contentSize; - contentStart += contentSize; - } - } - } - - // The transfer - if ( transfer.size()<=framesize ) { - channel.send(MessageTransferBody(ProtocolVersion(), - transfer.getTicket(), - consumerTag, - getRedelivered(), - transfer.getRejectUnroutable(), - transfer.getImmediate(), - transfer.getTtl(), - transfer.getPriority(), - transfer.getTimestamp(), - transfer.getDeliveryMode(), - transfer.getExpiration(), - getExchange(), - getRoutingKey(), - transfer.getMessageId(), - transfer.getCorrelationId(), - transfer.getReplyTo(), - transfer.getContentType(), - transfer.getContentEncoding(), - 0, /*content-length*/ - string(), /*type*/ - transfer.getUserId(), - transfer.getAppId(), - transfer.getTransactionId(), - transfer.getSecurityToken(), - transfer.getApplicationHeaders(), - body)); - } else { - // Thing to do here is to construct a simple reference message then deliver that instead - // fragmentation will be taken care of in the delivery if necessary; - string content = body.getValue(); - string refname = "dummy"; - MessageTransferBody newTransfer(channel.getVersion(), - transfer.getTicket(), - consumerTag, - getRedelivered(), - transfer.getRejectUnroutable(), - transfer.getImmediate(), - transfer.getTtl(), - transfer.getPriority(), - transfer.getTimestamp(), - transfer.getDeliveryMode(), - transfer.getExpiration(), - getExchange(), - getRoutingKey(), - transfer.getMessageId(), - transfer.getCorrelationId(), - transfer.getReplyTo(), - transfer.getContentType(), - transfer.getContentEncoding(), - 0, /*content-length*/ - string(), /*type*/ - transfer.getUserId(), - transfer.getAppId(), - transfer.getTransactionId(), - transfer.getSecurityToken(), - transfer.getApplicationHeaders(), - framing::Content(REFERENCE, refname)); - ReferencePtr newRef(new Reference(refname)); - newRef->append(MessageAppendBody(channel.getVersion(), refname, content)); - MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), &newTransfer, newRef); - newMsg.transferMessage(channel, consumerTag, framesize); - return; - } - // Close any reference data - if (ref) - channel.send(MessageCloseBody(ProtocolVersion(), ref->getId())); -} - - -void MessageMessage::deliver(ChannelAdapter& channel, DeliveryId, DeliveryToken::shared_ptr token, uint32_t framesize) -{ - transferMessage(channel, shared_polymorphic_cast<MessageDeliveryToken>(token)->destination, framesize); -} - -void MessageMessage::deliver(ChannelAdapter& channel, const std::string& destination, uint32_t framesize) -{ - transferMessage(channel, destination, framesize); -} - -bool MessageMessage::isComplete() -{ - return true; -} - -uint64_t MessageMessage::contentSize() const -{ - if (transfer.getBody().isInline()) - return transfer.getBody().getValue().size(); - else { - assert(getReference()); - return getReference()->getSize(); - } -} - -qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties() -{ - return 0; // FIXME aconway 2007-02-05: -} - -const FieldTable& MessageMessage::getApplicationHeaders() -{ - return transfer.getApplicationHeaders(); -} -bool MessageMessage::isPersistent() -{ - return transfer.getDeliveryMode() == PERSISTENT; -} - -uint32_t MessageMessage::encodedSize() const -{ - return encodedHeaderSize() + encodedContentSize(); -} - -uint32_t MessageMessage::encodedHeaderSize() const -{ - return RecoveryManagerImpl::encodedMessageTypeSize() + transfer.size(); -} - -uint32_t MessageMessage::encodedContentSize() const -{ - return 0; -} - -uint64_t MessageMessage::expectedContentSize() -{ - return 0; -} - -void MessageMessage::encode(Buffer& buffer) const -{ - encodeHeader(buffer); -} - -void MessageMessage::encodeHeader(Buffer& buffer) const -{ - RecoveryManagerImpl::encodeMessageType(*this, buffer); - if (transfer.getBody().isInline()) { - transfer.encode(buffer); - } else { - assert(getReference()); - string data; - const Reference::Appends& appends = getReference()->getAppends(); - for(Reference::Appends::const_iterator a = appends.begin(); a != appends.end(); ++a) { - data += a->getBytes(); - } - framing::Content body(INLINE, data); - copyTransfer(ProtocolVersion(), transfer.getDestination(), body).encode(buffer); - } -} - -void MessageMessage::decodeHeader(Buffer& buffer) -{ - //don't care about the type here, but want encode/decode to be symmetric - RecoveryManagerImpl::decodeMessageType(buffer); - transfer.decode(buffer); -} - -void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/) -{ -} - - -MessageTransferBody MessageMessage::copyTransfer(const ProtocolVersion& version, - const string& destination, - const framing::Content& body) const -{ - return MessageTransferBody(version, - transfer.getTicket(), - destination, - getRedelivered(), - transfer.getRejectUnroutable(), - transfer.getImmediate(), - transfer.getTtl(), - transfer.getPriority(), - transfer.getTimestamp(), - transfer.getDeliveryMode(), - transfer.getExpiration(), - getExchange(), - getRoutingKey(), - transfer.getMessageId(), - transfer.getCorrelationId(), - transfer.getReplyTo(), - transfer.getContentType(), - transfer.getContentEncoding(), - 0, /*content-length*/ - string(), /*type*/ - transfer.getUserId(), - transfer.getAppId(), - transfer.getTransactionId(), - transfer.getSecurityToken(), - transfer.getApplicationHeaders(), - body); -} - -MessageMessage::ReferencePtr MessageMessage::getReference() const { - return reference; -} - -uint32_t MessageMessage::getRequiredCredit() const -{ - //TODO: change when encoding changes. Should be the payload of any - //header & body frames. - return transfer.size(); -} - - -DeliveryToken::shared_ptr MessageMessage::getToken(const std::string& destination) -{ - return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination)); -} - -}} // namespace qpid::broker - diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.h b/cpp/src/qpid/broker/BrokerMessageMessage.h deleted file mode 100644 index 6bfd0e045d..0000000000 --- a/cpp/src/qpid/broker/BrokerMessageMessage.h +++ /dev/null @@ -1,90 +0,0 @@ -#ifndef _broker_BrokerMessageMessage_h -#define _broker_BrokerMessageMessage_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "BrokerMessageBase.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/amqp_types.h" -#include <boost/weak_ptr.hpp> -#include <vector> - -namespace qpid { - -namespace broker { -class ConnectionToken; -class Reference; - -class MessageMessage: public Message{ - public: - typedef boost::shared_ptr<MessageMessage> shared_ptr; - typedef boost::shared_ptr<Reference> ReferencePtr; - - MessageMessage(ConnectionToken* publisher, const framing::MessageTransferBody* transfer); - MessageMessage(ConnectionToken* publisher, const framing::MessageTransferBody* transfer, ReferencePtr reference); - MessageMessage(); - - // Default destructor okay - - framing::MessageTransferBody* getTransfer() const { return const_cast<framing::MessageTransferBody*>(&transfer); } - ReferencePtr getReference() const ; - - void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize); - void deliver(framing::ChannelAdapter&, const std::string& destination, uint32_t framesize); - - bool isComplete(); - - uint64_t contentSize() const; - framing::BasicHeaderProperties* getHeaderProperties(); - const framing::FieldTable& getApplicationHeaders(); - bool isPersistent(); - - void encode(framing::Buffer& buffer) const; - void encodeHeader(framing::Buffer& buffer) const; - uint32_t encodedSize() const; - uint32_t encodedHeaderSize() const; - uint32_t encodedContentSize() const; - uint64_t expectedContentSize(); - void decodeHeader(framing::Buffer& buffer); - void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0); - uint32_t getRequiredCredit() const; - - static DeliveryToken::shared_ptr getToken(const std::string& destination); - - private: - void transferMessage( - framing::ChannelAdapter& channel, - const std::string& consumerTag, - uint32_t framesize); - - framing::MessageTransferBody copyTransfer( - const framing::ProtocolVersion& version, - const std::string& destination, - const framing::Content& body) const; - - framing::MessageTransferBody transfer; - const boost::shared_ptr<Reference> reference; -}; - -}} - - -#endif /*!_broker_BrokerMessage_h*/ diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index 5ff9f950eb..7311d043d0 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -88,7 +88,7 @@ void Queue::deliver(Message::shared_ptr& msg){ void Queue::recover(Message::shared_ptr& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued - if (store && msg->expectedContentSize() != msg->encodedContentSize()) { + if (store && !msg->isContentLoaded()) { //content has not been loaded, need to ensure that lazy loading mode is set: //TODO: find a nicer way to do this msg->releaseContent(store); diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h index 962c11d8ee..5ba103d3ed 100644 --- a/cpp/src/qpid/broker/BrokerQueue.h +++ b/cpp/src/qpid/broker/BrokerQueue.h @@ -28,7 +28,7 @@ #include "qpid/framing/amqp_types.h" #include "ConnectionToken.h" #include "Consumer.h" -#include "BrokerMessage.h" +#include "Message.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/Serializer.h" #include "qpid/sys/Monitor.h" @@ -43,6 +43,7 @@ namespace qpid { namespace broker { class MessageStore; class QueueRegistry; + class TransactionContext; class Exchange; /** diff --git a/cpp/src/qpid/broker/CompletionHandler.h b/cpp/src/qpid/broker/CompletionHandler.h deleted file mode 100644 index 9d51656282..0000000000 --- a/cpp/src/qpid/broker/CompletionHandler.h +++ /dev/null @@ -1,39 +0,0 @@ -#ifndef _broker_CompletionHandler_h -#define _broker_CompletionHandler_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -namespace qpid { -namespace broker { - -/** - * Callback interface to handle completion of a message. - */ -class CompletionHandler -{ - public: - virtual ~CompletionHandler(){} - virtual void complete(Message::shared_ptr) = 0; -}; - -}} // namespace qpid::broker - - - -#endif /*!_broker_CompletionHandler_h*/ diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h index d0c397d184..dc229947b9 100644 --- a/cpp/src/qpid/broker/Consumer.h +++ b/cpp/src/qpid/broker/Consumer.h @@ -21,7 +21,7 @@ #ifndef _Consumer_ #define _Consumer_ -#include "BrokerMessage.h" +#include "Message.h" namespace qpid { namespace broker { diff --git a/cpp/src/qpid/broker/Content.h b/cpp/src/qpid/broker/Content.h deleted file mode 100644 index 97dce0d3f7..0000000000 --- a/cpp/src/qpid/broker/Content.h +++ /dev/null @@ -1,64 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _Content_ -#define _Content_ - -#include <boost/function.hpp> - -#include "qpid/framing/AMQContentBody.h" -#include "qpid/framing/Buffer.h" -#include "qpid/framing/OutputHandler.h" - -namespace qpid { - -namespace framing { -class ChannelAdapter; -} - -namespace broker { -class Content{ - public: - typedef std::string DataBlock; - typedef boost::function1<void, const DataBlock&> SendFn; - - virtual ~Content(){} - - /** Add a block of data to the content */ - virtual void add(framing::AMQContentBody* data) = 0; - - /** Total size of content in bytes */ - virtual uint32_t size() = 0; - - /** - * Iterate over the content calling SendFn for each block. - * Subdivide blocks if necessary to ensure each block is - * <= framesize bytes long. - */ - virtual void send(framing::ChannelAdapter& channel, uint32_t framesize) = 0; - - //FIXME aconway 2007-02-07: This is inconsistently implemented - //find out what is needed. - virtual void encode(qpid::framing::Buffer& buffer) = 0; -}; -}} - - -#endif diff --git a/cpp/src/qpid/broker/DeliverableMessage.h b/cpp/src/qpid/broker/DeliverableMessage.h index e8c4f5ba19..9719d972fc 100644 --- a/cpp/src/qpid/broker/DeliverableMessage.h +++ b/cpp/src/qpid/broker/DeliverableMessage.h @@ -22,8 +22,8 @@ #define _DeliverableMessage_ #include "Deliverable.h" -#include "BrokerMessage.h" #include "BrokerQueue.h" +#include "Message.h" namespace qpid { namespace broker { diff --git a/cpp/src/qpid/broker/DeliveryAdapter.h b/cpp/src/qpid/broker/DeliveryAdapter.h index d59c4769d7..f645b37c23 100644 --- a/cpp/src/qpid/broker/DeliveryAdapter.h +++ b/cpp/src/qpid/broker/DeliveryAdapter.h @@ -21,9 +21,9 @@ #ifndef _DeliveryAdapter_ #define _DeliveryAdapter_ -#include "BrokerMessageBase.h" #include "DeliveryId.h" #include "DeliveryToken.h" +#include "Message.h" #include "qpid/framing/amqp_types.h" namespace qpid { diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index 745a246c78..a1f82cb757 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -25,10 +25,10 @@ #include <list> #include <ostream> #include "AccumulatedAck.h" -#include "BrokerMessage.h" -#include "Prefetch.h" #include "BrokerQueue.h" #include "DeliveryId.h" +#include "Message.h" +#include "Prefetch.h" namespace qpid { namespace broker { diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h index 554be295bf..7b20bd610c 100644 --- a/cpp/src/qpid/broker/DirectExchange.h +++ b/cpp/src/qpid/broker/DirectExchange.h @@ -25,7 +25,6 @@ #include <vector> #include "BrokerExchange.h" #include "qpid/framing/FieldTable.h" -#include "BrokerMessage.h" #include "qpid/sys/Monitor.h" #include "BrokerQueue.h" diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h index 3cbffc6f2f..070e438bcc 100644 --- a/cpp/src/qpid/broker/FanOutExchange.h +++ b/cpp/src/qpid/broker/FanOutExchange.h @@ -25,7 +25,6 @@ #include <vector> #include "BrokerExchange.h" #include "qpid/framing/FieldTable.h" -#include "BrokerMessage.h" #include "qpid/sys/Monitor.h" #include "BrokerQueue.h" diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h index a99cc1c92c..48d115c1ec 100644 --- a/cpp/src/qpid/broker/HeadersExchange.h +++ b/cpp/src/qpid/broker/HeadersExchange.h @@ -24,7 +24,6 @@ #include <vector> #include "BrokerExchange.h" #include "qpid/framing/FieldTable.h" -#include "BrokerMessage.h" #include "qpid/sys/Monitor.h" #include "BrokerQueue.h" diff --git a/cpp/src/qpid/broker/InMemoryContent.cpp b/cpp/src/qpid/broker/InMemoryContent.cpp deleted file mode 100644 index d69dcfafe7..0000000000 --- a/cpp/src/qpid/broker/InMemoryContent.cpp +++ /dev/null @@ -1,70 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "InMemoryContent.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/ChannelAdapter.h" - -using namespace qpid::broker; -using namespace qpid::framing; -using boost::static_pointer_cast; - -void InMemoryContent::add(AMQContentBody* data) -{ - content.push_back(*data); -} - -uint32_t InMemoryContent::size() -{ - int sum(0); - for (content_iterator i = content.begin(); i != content.end(); i++) { - sum += i->size(); - } - return sum; -} - -void InMemoryContent::send(ChannelAdapter& channel, uint32_t framesize) -{ - for (content_iterator i = content.begin(); i != content.end(); i++) { - if (i->size() > framesize) { - uint32_t offset = 0; - for (int chunk = i->size() / framesize; chunk > 0; chunk--) { - string data = i->getData().substr(offset, framesize); - channel.send(AMQContentBody(data)); - offset += framesize; - } - uint32_t remainder = i->size() % framesize; - if (remainder) { - string data = i->getData().substr(offset, remainder); - channel.send(AMQContentBody(data)); - } - } else { - channel.send(*i); - } - } -} - -void InMemoryContent::encode(Buffer& buffer) -{ - for (content_iterator i = content.begin(); i != content.end(); i++) { - i->encode(buffer); - } -} - diff --git a/cpp/src/qpid/broker/InMemoryContent.h b/cpp/src/qpid/broker/InMemoryContent.h deleted file mode 100644 index a6fca7ca98..0000000000 --- a/cpp/src/qpid/broker/InMemoryContent.h +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _InMemoryContent_ -#define _InMemoryContent_ - -#include "Content.h" -#include "qpid/framing/AMQContentBody.h" -#include <vector> - - -namespace qpid { - namespace broker { - class InMemoryContent : public Content{ - typedef std::vector<framing::AMQContentBody> content_list; - typedef content_list::iterator content_iterator; - - content_list content; - public: - void add(framing::AMQContentBody* data); - uint32_t size(); - void send(framing::ChannelAdapter&, uint32_t framesize); - void encode(framing::Buffer& buffer); - }; - } -} - - -#endif diff --git a/cpp/src/qpid/broker/LazyLoadedContent.cpp b/cpp/src/qpid/broker/LazyLoadedContent.cpp deleted file mode 100644 index b8b5b37f45..0000000000 --- a/cpp/src/qpid/broker/LazyLoadedContent.cpp +++ /dev/null @@ -1,68 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "LazyLoadedContent.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/ChannelAdapter.h" - -using namespace qpid::broker; -using namespace qpid::framing; - -LazyLoadedContent::~LazyLoadedContent() -{ - store->destroy(*msg); -} - -LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, Message* const _msg, uint64_t _expectedSize) : - store(_store), msg(_msg), expectedSize(_expectedSize) {} - -void LazyLoadedContent::add(AMQContentBody* data) -{ - store->appendContent(*msg, data->getData()); -} - -uint32_t LazyLoadedContent::size() -{ - return 0;//all content is written as soon as it is added -} - -void LazyLoadedContent::send(ChannelAdapter& channel, uint32_t framesize) -{ - if (expectedSize > framesize) { - for (uint64_t offset = 0; offset < expectedSize; offset += framesize) - { - uint64_t remaining = expectedSize - offset; - string data; - store->loadContent(*msg, data, offset, - remaining > framesize ? framesize : remaining); - channel.send(AMQContentBody(data)); - } - } else { - string data; - store->loadContent(*msg, data, 0, expectedSize); - channel.send(AMQContentBody(data)); - } -} - -void LazyLoadedContent::encode(Buffer&) -{ - //do nothing as all content is written as soon as it is added -} - diff --git a/cpp/src/qpid/broker/LazyLoadedContent.h b/cpp/src/qpid/broker/LazyLoadedContent.h deleted file mode 100644 index 79a33ed7a9..0000000000 --- a/cpp/src/qpid/broker/LazyLoadedContent.h +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _LazyLoadedContent_ -#define _LazyLoadedContent_ - -#include "Content.h" -#include "MessageStore.h" -#include "BrokerMessageBase.h" - -namespace qpid { - namespace broker { - class LazyLoadedContent : public Content{ - MessageStore* const store; - Message* const msg; - const uint64_t expectedSize; - public: - LazyLoadedContent( - MessageStore* const store, Message* const msg, - uint64_t expectedSize); - ~LazyLoadedContent(); - void add(qpid::framing::AMQContentBody* data); - uint32_t size(); - void send( - framing::ChannelAdapter&, - uint32_t framesize); - void encode(qpid::framing::Buffer& buffer); - }; - } -} - - -#endif diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp new file mode 100644 index 0000000000..e5f92297b7 --- /dev/null +++ b/cpp/src/qpid/broker/Message.cpp @@ -0,0 +1,195 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Message.h" +#include "ExchangeRegistry.h" +#include "qpid/framing/frame_functors.h" +#include "qpid/framing/BasicPublishBody.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/SendContent.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/TypeFilter.h" + +using namespace qpid::broker; +using namespace qpid::framing; +using std::string; + +TransferAdapter Message::TRANSFER; +PublishAdapter Message::PUBLISH; + +Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), publisher(0), store(0), adapter(0) {} + +const std::string& Message::getRoutingKey() const +{ + return getAdapter().getRoutingKey(frames); +} + +const std::string& Message::getExchangeName() const +{ + return getAdapter().getExchange(frames); +} + +const boost::shared_ptr<Exchange> Message::getExchange(ExchangeRegistry& registry) const +{ + if (!exchange) { + exchange = registry.get(getExchangeName()); + } + return exchange; +} + +bool Message::isImmediate() const +{ + return getAdapter().isImmediate(frames); +} + +const FieldTable& Message::getApplicationHeaders() const +{ + return getAdapter().getApplicationHeaders(frames); +} + +bool Message::isPersistent() +{ + return getAdapter().isPersistent(frames); +} + +uint32_t Message::getRequiredCredit() const +{ + //add up payload for all header and content frames in the frameset + SumBodySize sum; + frames.map_if(sum, TypeFilter(METHOD_BODY, HEADER_BODY)); + return sum.getSize(); +} + +void Message::encode(framing::Buffer& buffer) const +{ + //encode method and header frames + EncodeFrame f1(buffer); + frames.map_if(f1, TypeFilter(METHOD_BODY, HEADER_BODY)); + + //then encode the payload of each content frame + EncodeBody f2(buffer); + frames.map_if(f2, TypeFilter(CONTENT_BODY)); +} + +uint32_t Message::encodedSize() const +{ + return encodedHeaderSize() + encodedContentSize(); +} + +uint32_t Message::encodedContentSize() const +{ + return frames.getContentSize(); +} + +uint32_t Message::encodedHeaderSize() const +{ + //add up the size for all method and header frames in the frameset + SumFrameSize sum; + frames.map_if(sum, TypeFilter(METHOD_BODY, HEADER_BODY)); + return sum.getSize(); +} + +void Message::decodeHeader(framing::Buffer& buffer) +{ + AMQFrame method; + method.decode(buffer); + frames.append(method); + + AMQFrame header; + header.decode(buffer); + frames.append(header); +} + +void Message::decodeContent(framing::Buffer& buffer) +{ + //get the data as a string and set that as the content + //body on a frame then add that frame to the frameset + AMQFrame frame; + frame.setBody(AMQContentBody()); + frame.castBody<AMQContentBody>()->decode(buffer, buffer.available()); + frames.append(frame); +} + +void Message::releaseContent(MessageStore* _store) +{ + store = _store; + if (!getPersistenceId()) { + store->stage(*this); + } + //remove any content frames from the frameset + frames.remove(TypeFilter(CONTENT_BODY)); +} + +void Message::sendContent(framing::FrameHandler& out, uint16_t channel, uint16_t maxFrameSize) +{ + if (isContentReleased()) { + //load content from store in chunks of maxContentSize + uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); + uint64_t expectedSize(frames.getHeaders()->getContentLength());//TODO: how do we know how much data to load? + for (uint64_t offset = 0; offset < expectedSize; offset += maxContentSize) + { + uint64_t remaining = expectedSize - offset; + AMQFrame frame(channel, AMQContentBody()); + string& data = frame.castBody<AMQContentBody>()->getData(); + + store->loadContent(*this, data, offset, + remaining > maxContentSize ? maxContentSize : remaining); + out.handle(frame); + } + + } else { + SendContent f(out, channel, maxFrameSize); + frames.map_if(f, TypeFilter(CONTENT_BODY)); + } +} + +void Message::sendHeader(framing::FrameHandler& out, uint16_t channel, uint16_t /*maxFrameSize*/) +{ + Relay f(out, channel); + frames.map_if(f, TypeFilter(HEADER_BODY)); +} + +MessageAdapter& Message::getAdapter() const +{ + if (!adapter) { + if (frames.isA<BasicPublishBody>()) { + adapter = &PUBLISH; + } else if(frames.isA<MessageTransferBody>()) { + adapter = &TRANSFER; + } else { + const AMQMethodBody* method = frames.getMethod(); + if (!method) throw Exception("Can't adapt message with no method"); + else throw Exception(QPID_MSG("Can't adapt message based on " << *method)); + } + } + return *adapter; +} + +uint64_t Message::contentSize() const +{ + return frames.getContentSize(); +} + +bool Message::isContentLoaded() const +{ + return contentSize() > 0; +} diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h new file mode 100644 index 0000000000..95b3f38b55 --- /dev/null +++ b/cpp/src/qpid/broker/Message.h @@ -0,0 +1,139 @@ +#ifndef _broker_Message_h +#define _broker_Message_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include <boost/shared_ptr.hpp> +#include <boost/variant.hpp> +#include "PersistableMessage.h" +#include "MessageAdapter.h" +#include "qpid/framing/amqp_types.h" + +namespace qpid { + +namespace framing { +class FieldTable; +class SequenceNumber; +} + +namespace broker { +class ConnectionToken; +class Exchange; +class ExchangeRegistry; +class MessageStore; + +class Message : public PersistableMessage { +public: + typedef boost::shared_ptr<Message> shared_ptr; + + Message(const framing::SequenceNumber& id = framing::SequenceNumber()); + + uint64_t getPersistenceId() const { return persistenceId; } + void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; } + + bool getRedelivered() const { return redelivered; } + void redeliver() { redelivered = true; } + + const ConnectionToken* getPublisher() const { return publisher; } + void setPublisher(ConnectionToken* p) { publisher = p; } + + uint64_t contentSize() const; + + const std::string& getRoutingKey() const; + const boost::shared_ptr<Exchange> getExchange(ExchangeRegistry&) const; + const std::string& getExchangeName() const; + bool isImmediate() const; + const framing::FieldTable& getApplicationHeaders() const; + bool isPersistent(); + + framing::FrameSet& getFrames() { return frames; } + const framing::FrameSet& getFrames() const { return frames; } + + template <class T> T* getProperties() { + return frames.getHeaders()->get<T>(true); + } + + template <class T> const T* getProperties() const { + return frames.getHeaders()->get<T>(); + } + + template <class T> const T* getMethod() const { + return frames.as<T>(); + } + + template <class T> bool isA() const { + return frames.isA<T>(); + } + + uint32_t getRequiredCredit() const; + + void encode(framing::Buffer& buffer) const; + + /** + * @returns the size of the buffer needed to encode this + * message in its entirety + */ + uint32_t encodedSize() const; + /** + * @returns the size of the buffer needed to encode the + * 'header' of this message (not just the header frame, + * but other meta data e.g.routing key and exchange) + */ + uint32_t encodedHeaderSize() const; + uint32_t encodedContentSize() const; + + void decodeHeader(framing::Buffer& buffer); + void decodeContent(framing::Buffer& buffer); + + /** + * Releases the in-memory content data held by this + * message. Must pass in a store from which the data can + * be reloaded. + */ + void releaseContent(MessageStore* store); + + void sendContent(framing::FrameHandler& out, uint16_t channel, uint16_t maxFrameSize); + void sendHeader(framing::FrameHandler& out, uint16_t channel, uint16_t maxFrameSize); + + bool isContentLoaded() const; + + private: + framing::FrameSet frames; + mutable boost::shared_ptr<Exchange> exchange; + mutable uint64_t persistenceId; + bool redelivered; + ConnectionToken* publisher; + MessageStore* store; + mutable MessageAdapter* adapter; + + static TransferAdapter TRANSFER; + static PublishAdapter PUBLISH; + + MessageAdapter& getAdapter() const; + bool isContentReleased() { return store; } +}; + +}} + + +#endif diff --git a/cpp/src/qpid/broker/MessageAdapter.h b/cpp/src/qpid/broker/MessageAdapter.h new file mode 100644 index 0000000000..0b2dc6307a --- /dev/null +++ b/cpp/src/qpid/broker/MessageAdapter.h @@ -0,0 +1,108 @@ +#ifndef _broker_MessageAdapter_h +#define _broker_MessageAdapter_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include "qpid/framing/BasicPublishBody.h" +#include "qpid/framing/BasicHeaderProperties.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/DeliveryProperties.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/MessageTransferBody.h" + +namespace qpid { +namespace broker { + +struct MessageAdapter +{ + virtual ~MessageAdapter() {} + + virtual const std::string& getRoutingKey(const framing::FrameSet& f) = 0; + virtual const std::string& getExchange(const framing::FrameSet& f) = 0; + virtual bool isImmediate(const framing::FrameSet& f) = 0; + virtual const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f) = 0; + virtual bool isPersistent(const framing::FrameSet& f) = 0; +}; + +struct PublishAdapter : MessageAdapter +{ + const std::string& getRoutingKey(const framing::FrameSet& f) + { + return f.as<framing::BasicPublishBody>()->getRoutingKey(); + } + + const std::string& getExchange(const framing::FrameSet& f) + { + return f.as<framing::BasicPublishBody>()->getExchange(); + } + + bool isImmediate(const framing::FrameSet& f) + { + return f.as<framing::BasicPublishBody>()->getImmediate(); + } + + const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f) + { + return f.getHeaders()->get<framing::BasicHeaderProperties>()->getHeaders(); + } + + bool isPersistent(const framing::FrameSet& f) + { + return f.getHeaders()->get<framing::BasicHeaderProperties>()->getDeliveryMode() == 2; + } +}; + +struct TransferAdapter : MessageAdapter +{ + const std::string& getRoutingKey(const framing::FrameSet& f) + { + return f.getHeaders()->get<framing::DeliveryProperties>()->getRoutingKey(); + } + + const std::string& getExchange(const framing::FrameSet& f) + { + return f.as<framing::MessageTransferBody>()->getDestination(); + } + + bool isImmediate(const framing::FrameSet&) + { + //TODO: we seem to have lost the immediate flag + return false; + } + + const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f) + { + return f.getHeaders()->get<framing::MessageProperties>()->getApplicationHeaders(); + } + + bool isPersistent(const framing::FrameSet& f) + { + return f.getHeaders()->get<framing::DeliveryProperties>()->getDeliveryMode() == 2; + } +}; + +}} + + +#endif diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp index f19927b708..1a84aa9b65 100644 --- a/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/cpp/src/qpid/broker/MessageBuilder.cpp @@ -20,55 +20,64 @@ */ #include "MessageBuilder.h" -#include "InMemoryContent.h" -#include "LazyLoadedContent.h" +#include "Message.h" +#include "MessageStore.h" +#include "qpid/Exception.h" +#include "qpid/framing/AMQFrame.h" using namespace qpid::broker; using namespace qpid::framing; -using std::auto_ptr; -MessageBuilder::MessageBuilder(CompletionHandler* _handler, - MessageStore* const _store, - uint64_t _stagingThreshold -) : - handler(_handler), - store(_store), - stagingThreshold(_stagingThreshold) -{} +MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) : + state(DORMANT), store(_store), stagingThreshold(_stagingThreshold), staging(false) {} -void MessageBuilder::route(){ - if (message->isComplete()) { - if (handler) handler->complete(message); - message.reset(); +void MessageBuilder::handle(AMQFrame& frame) +{ + switch(state) { + case METHOD: + checkType(METHOD_BODY, frame.getBody()->type()); + state = HEADER; + break; + case HEADER: + checkType(HEADER_BODY, frame.getBody()->type()); + state = CONTENT; + break; + case CONTENT: + checkType(CONTENT_BODY, frame.getBody()->type()); + break; + default: + throw ConnectionException(504, "Invalid frame sequence for message."); + } + if (staging) { + store->appendContent(*message, frame.castBody<AMQContentBody>()->getData()); + } else { + message->getFrames().append(frame); + //have we reached the staging limit? if so stage message and release content + if (state == CONTENT && stagingThreshold && message->getFrames().getContentSize() >= stagingThreshold) { + store->stage(*message); + message->releaseContent(store); + staging = true; + } } } -void MessageBuilder::initialise(Message::shared_ptr& msg){ - if(message.get()){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed."); +void MessageBuilder::checkType(uint8_t expected, uint8_t actual) +{ + if (expected != actual) { + throw ConnectionException(504, "Invalid frame sequence for message."); } - message = msg; } -void MessageBuilder::setHeader(AMQHeaderBody* header){ - if(!message.get()){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish."); - } - message->setHeader(header); - if (stagingThreshold && header->getContentSize() >= stagingThreshold) { - store->stage(*message); - message->releaseContent(store); - } else { - auto_ptr<Content> content(new InMemoryContent()); - message->setContent(content); - } - route(); +void MessageBuilder::end() +{ + message.reset(); + state = DORMANT; + staging = false; } -void MessageBuilder::addContent(AMQContentBody* content){ - if(!message.get()){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish."); - } - message->addContent(content); - route(); +void MessageBuilder::start(const SequenceNumber& id) +{ + message = Message::shared_ptr(new Message(id)); + state = METHOD; + staging = false; } diff --git a/cpp/src/qpid/broker/MessageBuilder.h b/cpp/src/qpid/broker/MessageBuilder.h index 18e85d7383..134f93b68f 100644 --- a/cpp/src/qpid/broker/MessageBuilder.h +++ b/cpp/src/qpid/broker/MessageBuilder.h @@ -21,37 +21,35 @@ #ifndef _MessageBuilder_ #define _MessageBuilder_ -#include <memory> -#include "qpid/QpidError.h" -#include "BrokerExchange.h" -#include "BrokerMessage.h" -#include "MessageStore.h" -#include "qpid/framing/AMQContentBody.h" -#include "qpid/framing/AMQHeaderBody.h" -#include "qpid/framing/BasicPublishBody.h" -#include "CompletionHandler.h" +#include "boost/shared_ptr.hpp" +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/SequenceNumber.h" namespace qpid { namespace broker { - class MessageBuilder{ + class Message; + class MessageStore; + + class MessageBuilder : public framing::FrameHandler{ public: - MessageBuilder(CompletionHandler* _handler, - MessageStore* const store = 0, - uint64_t stagingThreshold = 0); - void initialise(Message::shared_ptr& msg); - void setHeader(framing::AMQHeaderBody* header); - void addContent(framing::AMQContentBody* content); - Message::shared_ptr getMessage() { return message; } + MessageBuilder(MessageStore* const store = 0, uint64_t stagingThreshold = 0); + void handle(framing::AMQFrame& frame); + boost::shared_ptr<Message> getMessage() { return message; } + void start(const framing::SequenceNumber& id); + void end(); private: - Message::shared_ptr message; - CompletionHandler* handler; + enum State {DORMANT, METHOD, HEADER, CONTENT}; + State state; + boost::shared_ptr<Message> message; MessageStore* const store; const uint64_t stagingThreshold; + bool staging; - void route(); + void checkType(uint8_t expected, uint8_t actual); }; } } #endif + diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp new file mode 100644 index 0000000000..09ab8ec465 --- /dev/null +++ b/cpp/src/qpid/broker/MessageDelivery.cpp @@ -0,0 +1,140 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "MessageDelivery.h" + +#include "DeliveryToken.h" +#include "Message.h" +#include "BrokerQueue.h" +#include "qpid/framing/ChannelAdapter.h" +#include "qpid/framing/BasicDeliverBody.h" +#include "qpid/framing/BasicGetOkBody.h" +#include "qpid/framing/MessageTransferBody.h" + + +using namespace boost; +using namespace qpid::broker; +using namespace qpid::framing; + +namespace qpid{ +namespace broker{ + +struct BaseToken : DeliveryToken +{ + virtual ~BaseToken() {} + virtual void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id) = 0; +}; + +struct BasicGetToken : BaseToken +{ + typedef boost::shared_ptr<BasicGetToken> shared_ptr; + + Queue::shared_ptr queue; + + BasicGetToken(Queue::shared_ptr q) : queue(q) {} + + void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id) + { + channel.send(BasicGetOkBody( + channel.getVersion(), id.getValue(), msg->getRedelivered(), msg->getExchangeName(), + msg->getRoutingKey(), queue->getMessageCount())); + + } +}; + +struct BasicConsumeToken : BaseToken +{ + typedef boost::shared_ptr<BasicConsumeToken> shared_ptr; + + const string consumer; + + BasicConsumeToken(const string c) : consumer(c) {} + + void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id) + { + channel.send(BasicDeliverBody( + channel.getVersion(), consumer, id.getValue(), + msg->getRedelivered(), msg->getExchangeName(), msg->getRoutingKey())); + } + +}; + +struct MessageDeliveryToken : BaseToken +{ + const std::string destination; + const u_int8_t confirmMode; + const u_int8_t acquireMode; + + MessageDeliveryToken(const std::string& d, u_int8_t c, u_int8_t a) : + destination(d), confirmMode(c), acquireMode(a) {} + + void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId /*id*/) + { + //TODO; need to figure out how the acquire mode gets + //communicated (this is just a temporary solution) + channel.send(MessageTransferBody(channel.getVersion(), 0, destination, confirmMode, acquireMode)); + + //may need to set the redelivered flag: + if (msg->getRedelivered()){ + msg->getProperties<DeliveryProperties>()->setRedelivered(true); + } + } +}; + +} +} + +DeliveryToken::shared_ptr MessageDelivery::getBasicGetToken(Queue::shared_ptr queue) +{ + return DeliveryToken::shared_ptr(new BasicGetToken(queue)); +} + +DeliveryToken::shared_ptr MessageDelivery::getBasicConsumeToken(const string& consumer) +{ + return DeliveryToken::shared_ptr(new BasicConsumeToken(consumer)); +} + +DeliveryToken::shared_ptr MessageDelivery::getMessageDeliveryToken(const std::string& destination, + u_int8_t confirmMode, u_int8_t acquireMode) +{ + return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode)); +} + +void MessageDelivery::deliver(Message::shared_ptr msg, + framing::ChannelAdapter& channel, + DeliveryId id, + DeliveryToken::shared_ptr token, + uint16_t framesize) +{ + //currently a message published from one class and delivered to + //another may well have the wrong headers; however we will only + //have one content class for 0-10 proper + + //send method + boost::shared_ptr<BaseToken> t = dynamic_pointer_cast<BaseToken>(token); + t->sendMethod(msg, channel, id); + + boost::shared_ptr<FrameHandler> handler = channel.getHandlers().out; + //send header + msg->sendHeader(*handler, channel.getId(), framesize); + + //send content + msg->sendContent(*handler, channel.getId(), framesize); +} diff --git a/cpp/src/qpid/broker/MessageDelivery.h b/cpp/src/qpid/broker/MessageDelivery.h new file mode 100644 index 0000000000..b87ef2a5ce --- /dev/null +++ b/cpp/src/qpid/broker/MessageDelivery.h @@ -0,0 +1,60 @@ +#ifndef _broker_MessageDelivery_h +#define _broker_MessageDelivery_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/shared_ptr.hpp> +#include "DeliveryId.h" + +namespace qpid { + +namespace framing { + +class ChannelAdapter; + +} + +namespace broker { + +class DeliveryToken; +class Message; +class Queue; + +/** + * Encapsulates the different options for message delivery currently supported. + */ +class MessageDelivery { +public: + static boost::shared_ptr<DeliveryToken> getBasicGetToken(boost::shared_ptr<Queue> queue); + static boost::shared_ptr<DeliveryToken> getBasicConsumeToken(const std::string& consumer); + static boost::shared_ptr<DeliveryToken> getMessageDeliveryToken(const std::string& destination, + u_int8_t confirmMode, + u_int8_t acquireMode); + + static void deliver(boost::shared_ptr<Message> msg, framing::ChannelAdapter& channel, + DeliveryId deliveryTag, boost::shared_ptr<DeliveryToken> token, uint16_t framesize); +}; + +} +} + + +#endif /*!_broker_MessageDelivery_h*/ diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index ce1fa1e028..a4ceb77c12 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -22,7 +22,7 @@ #include "qpid/framing/FramingContent.h" #include "Connection.h" #include "Broker.h" -#include "BrokerMessageMessage.h" +#include "MessageDelivery.h" #include "qpid/framing/MessageAppendBody.h" #include "qpid/framing/MessageTransferBody.h" #include "BrokerAdapter.h" @@ -55,7 +55,7 @@ MessageHandlerImpl::open(const string& /*reference*/) } void -MessageHandlerImpl::append(const framing::AMQMethodBody& ) +MessageHandlerImpl::append(const std::string& /*reference*/, const std::string& /*bytes*/) { throw ConnectionException(540, "References no longer supported"); } @@ -92,7 +92,7 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, const string& destination, bool noLocal, u_int8_t confirmMode, - u_int8_t /*acquireMode*/,//TODO: implement acquire modes + u_int8_t acquireMode,//TODO: implement acquire modes bool exclusive, const framing::FieldTable& filter ) { @@ -101,7 +101,8 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, throw ConnectionException(530, "Consumer tags must be unique"); string tag = destination; - channel.consume(MessageMessage::getToken(destination), tag, queue, noLocal, confirmMode == 1, exclusive, &filter); + channel.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), + tag, queue, noLocal, confirmMode == 1, exclusive, &filter); // Dispatch messages as there is now a consumer. queue->requestDispatch(); } @@ -115,7 +116,7 @@ MessageHandlerImpl::get(uint16_t /*ticket*/, { Queue::shared_ptr queue = getQueue(queueName); - if (channel.get(MessageMessage::getToken(destination), queue, !noAck)){ + if (channel.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){ //don't send any response... rely on execution completion } else { //temporarily disabled: @@ -160,20 +161,6 @@ MessageHandlerImpl::reject(const SequenceNumberSet& /*transfers*/, uint16_t /*co //TODO: implement } -void -MessageHandlerImpl::transfer(const framing::AMQMethodBody& context) -{ - const MessageTransferBody* transfer = boost::polymorphic_downcast<const MessageTransferBody*>(&context); - if (transfer->getBody().isInline()) { - MessageMessage::shared_ptr message(new MessageMessage(&connection, transfer)); - channel.handleInlineTransfer(message); - } else { - throw ConnectionException(540, "References no longer supported"); - } -} - - - void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value) { diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.h b/cpp/src/qpid/broker/MessageHandlerImpl.h index f4d9fa0c76..35d34bf94e 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.h +++ b/cpp/src/qpid/broker/MessageHandlerImpl.h @@ -23,7 +23,6 @@ #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/AMQP_ClientProxy.h" -#include "Reference.h" #include "HandlerImpl.h" namespace qpid { @@ -40,7 +39,7 @@ class MessageHandlerImpl : public: MessageHandlerImpl(CoreRefs& parent); - void append(const framing::AMQMethodBody& context); + void append(const std::string& reference, const std::string& bytes); void cancel(const std::string& destination ); @@ -75,8 +74,6 @@ class MessageHandlerImpl : void resume(const std::string& reference, const std::string& identifier ); - void transfer(const framing::AMQMethodBody& context); - void flow(const std::string& destination, u_int8_t unit, u_int32_t value); void flowMode(const std::string& destination, u_int8_t mode); @@ -98,8 +95,6 @@ class MessageHandlerImpl : bool exclusive, const framing::FieldTable& filter); - private: - ReferenceRegistry references; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h index 0da12a1a75..1254c3890b 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.h +++ b/cpp/src/qpid/broker/MessageStoreModule.h @@ -21,7 +21,6 @@ #ifndef _MessageStoreModule_ #define _MessageStoreModule_ -#include "BrokerMessage.h" #include "MessageStore.h" #include "BrokerQueue.h" #include "RecoveryManager.h" diff --git a/cpp/src/qpid/broker/NameGenerator.h b/cpp/src/qpid/broker/NameGenerator.h index affcedba41..6ea25c9797 100644 --- a/cpp/src/qpid/broker/NameGenerator.h +++ b/cpp/src/qpid/broker/NameGenerator.h @@ -21,7 +21,7 @@ #ifndef _NameGenerator_ #define _NameGenerator_ -#include "BrokerMessage.h" +#include <string> namespace qpid { namespace broker { diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h index 0d5a5b55f9..95f55f21b9 100644 --- a/cpp/src/qpid/broker/NullMessageStore.h +++ b/cpp/src/qpid/broker/NullMessageStore.h @@ -22,7 +22,6 @@ #define _NullMessageStore_ #include <set> -#include "BrokerMessage.h" #include "MessageStore.h" #include "BrokerQueue.h" diff --git a/cpp/src/qpid/broker/PersistableExchange.h b/cpp/src/qpid/broker/PersistableExchange.h index 9ba883cec0..683b740ddc 100644 --- a/cpp/src/qpid/broker/PersistableExchange.h +++ b/cpp/src/qpid/broker/PersistableExchange.h @@ -35,7 +35,7 @@ namespace broker { class PersistableExchange : public Persistable { public: - virtual std::string getName() const = 0; + virtual const std::string& getName() const = 0; virtual ~PersistableExchange() {}; }; diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h index e47ca0ae48..06fc59107e 100644 --- a/cpp/src/qpid/broker/PersistableMessage.h +++ b/cpp/src/qpid/broker/PersistableMessage.h @@ -34,7 +34,7 @@ namespace broker { * The interface messages must expose to the MessageStore in order to * be persistable. */ - class PersistableMessage : public Persistable +class PersistableMessage : public Persistable { @@ -72,10 +72,11 @@ public: virtual uint32_t encodedHeaderSize() const = 0; virtual ~PersistableMessage() {}; + PersistableMessage(): - enqueueCompleted(false), - asyncCounter(0), - dequeueCompleted(false){}; + enqueueCompleted(false), + asyncCounter(0), + dequeueCompleted(false){}; inline bool isEnqueueComplete() {return enqueueCompleted;}; inline void enqueueComplete() { diff --git a/cpp/src/qpid/broker/RecoveredDequeue.h b/cpp/src/qpid/broker/RecoveredDequeue.h index 9e0c334dc3..9dcc9d4233 100644 --- a/cpp/src/qpid/broker/RecoveredDequeue.h +++ b/cpp/src/qpid/broker/RecoveredDequeue.h @@ -25,7 +25,7 @@ #include <functional> #include <list> #include "Deliverable.h" -#include "BrokerMessage.h" +#include "Message.h" #include "MessageStore.h" #include "BrokerQueue.h" #include "TxOp.h" diff --git a/cpp/src/qpid/broker/RecoveredEnqueue.h b/cpp/src/qpid/broker/RecoveredEnqueue.h index 25c5baf364..a571343e93 100644 --- a/cpp/src/qpid/broker/RecoveredEnqueue.h +++ b/cpp/src/qpid/broker/RecoveredEnqueue.h @@ -25,7 +25,7 @@ #include <functional> #include <list> #include "Deliverable.h" -#include "BrokerMessage.h" +#include "Message.h" #include "MessageStore.h" #include "BrokerQueue.h" #include "TxOp.h" diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 954c50faee..29390a6452 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -20,8 +20,7 @@ */ #include "RecoveryManagerImpl.h" -#include "BrokerMessage.h" -#include "BrokerMessageMessage.h" +#include "Message.h" #include "BrokerQueue.h" #include "RecoveredEnqueue.h" #include "RecoveredDequeue.h" @@ -110,10 +109,7 @@ RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buff { buffer.record(); //peek at type: - Message::shared_ptr message(decodeMessageType(buffer) == MESSAGE ? - ((Message*) new MessageMessage()) : - ((Message*) new BasicMessage())); - buffer.restore(); + Message::shared_ptr message(new Message()); message->decodeHeader(buffer); return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold)); } @@ -131,21 +127,6 @@ void RecoveryManagerImpl::recoveryComplete() //TODO (finalise binding setup etc) } -uint8_t RecoveryManagerImpl::decodeMessageType(framing::Buffer& buffer) -{ - return buffer.getOctet(); -} - -void RecoveryManagerImpl::encodeMessageType(const Message& msg, framing::Buffer& buffer) -{ - buffer.putOctet(dynamic_cast<const MessageMessage*>(&msg) ? MESSAGE : BASIC); -} - -uint32_t RecoveryManagerImpl::encodedMessageTypeSize() -{ - return 1; -} - bool RecoverableMessageImpl::loadContent(uint64_t available) { return !stagingThreshold || available < stagingThreshold; diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.h b/cpp/src/qpid/broker/RecoveryManagerImpl.h index bcd71defb1..58ec63926c 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.h +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.h @@ -45,10 +45,6 @@ namespace broker { RecoverableTransaction::shared_ptr recoverTransaction(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn); void recoveryComplete(); - - static uint8_t decodeMessageType(framing::Buffer& buffer); - static void encodeMessageType(const Message& msg, framing::Buffer& buffer); - static uint32_t encodedMessageTypeSize(); }; diff --git a/cpp/src/qpid/broker/Reference.cpp b/cpp/src/qpid/broker/Reference.cpp deleted file mode 100644 index 283b231b60..0000000000 --- a/cpp/src/qpid/broker/Reference.cpp +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <boost/bind.hpp> -#include "Reference.h" -#include "BrokerMessageMessage.h" -#include "qpid/QpidError.h" -#include "qpid/framing/MessageAppendBody.h" -#include "CompletionHandler.h" - -namespace qpid { -namespace broker { - -Reference::shared_ptr ReferenceRegistry::open(const Reference::Id& id) { - ReferenceMap::iterator i = references.find(id); - if (i != references.end()) - throw ConnectionException(503, "Attempt to re-open reference " +id); - return references[id] = Reference::shared_ptr(new Reference(id, this)); -} - -Reference::shared_ptr ReferenceRegistry::get(const Reference::Id& id) { - ReferenceMap::iterator i = references.find(id); - if (i == references.end()) - throw ConnectionException(503, "Attempt to use non-existent reference "+id); - return i->second; -} - -void Reference::append(const framing::MessageAppendBody& app) { - appends.push_back(app); - size += app.getBytes().length(); -} - -void Reference::close() { - messages.clear(); - registry->references.erase(getId()); -} - -}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Reference.h b/cpp/src/qpid/broker/Reference.h deleted file mode 100644 index 5a373fbeba..0000000000 --- a/cpp/src/qpid/broker/Reference.h +++ /dev/null @@ -1,115 +0,0 @@ -#ifndef _broker_Reference_h -#define _broker_Reference_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/framing/MessageAppendBody.h" - -#include <string> -#include <vector> -#include <map> -#include <boost/shared_ptr.hpp> -#include <boost/range.hpp> - -namespace qpid { - -namespace framing { -class MessageAppendBody; -} - -namespace broker { - -class MessageMessage; -class ReferenceRegistry; - -// FIXME aconway 2007-03-27: Merge with client::IncomingMessage -// to common reference handling code. - -/** - * A reference is an accumulation point for data in a multi-frame - * message. A reference can be used by multiple transfer commands to - * create multiple messages, so the reference tracks which commands - * are using it. When the reference is closed, all the associated - * transfers are completed. - * - * THREAD UNSAFE: per-channel resource, access to channels is - * serialized. - */ -class Reference -{ - public: - typedef std::string Id; - typedef boost::shared_ptr<Reference> shared_ptr; - typedef boost::shared_ptr<MessageMessage> MessagePtr; - typedef std::vector<MessagePtr> Messages; - typedef std::vector<framing::MessageAppendBody> Appends; - - Reference(const Id& id_=Id(), ReferenceRegistry* reg=0) - : id(id_), size(0), registry(reg) {} - - const std::string& getId() const { return id; } - uint64_t getSize() const { return size; } - - /** Add a message to be completed with this reference */ - void addMessage(MessagePtr message) { messages.push_back(message); } - - /** Append more data to the reference */ - void append(const framing::MessageAppendBody&); - - /** Close the reference, complete each associated message */ - void close(); - - const Appends& getAppends() const { return appends; } - const Messages& getMessages() const { return messages; } - - private: - Id id; - uint64_t size; - ReferenceRegistry* registry; - Messages messages; - Appends appends; -}; - - -/** - * A registry/factory for references. - * - * THREAD UNSAFE: per-channel resource, access to channels is - * serialized. - */ -class ReferenceRegistry { - public: - ReferenceRegistry() {}; - Reference::shared_ptr open(const Reference::Id& id); - Reference::shared_ptr get(const Reference::Id& id); - - private: - typedef std::map<Reference::Id, Reference::shared_ptr> ReferenceMap; - ReferenceMap references; - - // Reference calls references.erase(). - friend class Reference; -}; - - -}} // namespace qpid::broker - - - -#endif /*!_broker_Reference_h*/ diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index f65e450e82..5e9106c1dd 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -20,7 +20,10 @@ */ #include "SemanticHandler.h" + +#include "boost/format.hpp" #include "BrokerAdapter.h" +#include "MessageDelivery.h" #include "qpid/framing/ChannelAdapter.h" #include "qpid/framing/ChannelCloseOkBody.h" #include "qpid/framing/ExecutionCompleteBody.h" @@ -32,18 +35,16 @@ using namespace qpid::framing; using namespace qpid::sys; SemanticHandler::SemanticHandler(ChannelId id, Connection& c) : - connection(c), - channel(c, *this, id, &c.broker.getStore()) + connection(c), channel(c, *this, id) { init(id, connection.getOutput(), connection.getVersion()); adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this)); } - void SemanticHandler::handle(framing::AMQFrame& frame) { - //TODO: assembly etc when move to 0-10 framing - // + //TODO: assembly for method and headers + //have potentially three separate tracks at this point: // // (1) execution controls @@ -51,46 +52,43 @@ void SemanticHandler::handle(framing::AMQFrame& frame) // (3) data i.e. content-bearing commands // //framesets on each can be interleaved. framesets on the latter - //two share a command-id sequence. + //two share a command-id sequence. controls on the first track are + //used to communicate details about that command-id sequence. // //need to decide what to do if a frame on the command track //arrives while a frameset on the data track is still //open. execute it (i.e. out-of order execution with respect to - //the command id sequence) or queue it up. + //the command id sequence) or queue it up? - //if ready to execute (i.e. if segment is complete or frame is - //message content): - handleBody(frame.getBody()); -} - -//ChannelAdapter virtual methods: -void SemanticHandler::handleMethod(framing::AMQMethodBody* method) -{ - try { - if (!method->invoke(this)) { - //temporary hack until channel management is moved to its own handler: - if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) { - ++(incoming.lwm); - } + try{ - //else do the usual: - handleL4(method); - //(if the frameset is complete) we can move the execution-mark - //forward - - //temporary hack until channel management is moved to its own handler: - if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) { - //TODO: need to account for async store opreations - //when this command is a message publication - ++(incoming.hwm); + TrackId track = getTrack(frame);//will be replaced by field in 0-10 frame header + + switch(track) { + case SESSION_CONTROL_TRACK://TODO: L2 should be handled by separate handler + handleL2(frame.castBody<AMQMethodBody>()); + break; + case EXECUTION_CONTROL_TRACK: + handleL3(frame.castBody<AMQMethodBody>()); + break; + case MODEL_COMMAND_TRACK: + if (!isOpen()) { + throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str()); } - - //note: need to be more sophisticated than this if we execute - //commands that arrive within an active message frameset (that - //can't happen until 0-10 framing is implemented) + handleCommand(frame.castBody<AMQMethodBody>()); + break; + case MODEL_CONTENT_TRACK: + handleContent(frame); + break; } + + }catch(const ChannelException& e){ + adapter->getProxy().getChannel().close(e.code, e.toString(), getClassId(frame), getMethodId(frame)); + connection.closeChannel(getId()); + }catch(const ConnectionException& e){ + connection.close(e.code, e.toString(), getClassId(frame), getMethodId(frame)); }catch(const std::exception& e){ - connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); + connection.close(541/*internal error*/, e.what(), getClassId(frame), getMethodId(frame)); } } @@ -102,7 +100,6 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran outgoing.lwm = mark; //ack messages: channel.ackCumulative(mark.getValue()); - //std::cout << "[" << this << "] acknowledged: " << mark << std::endl; } if (range.size() % 2) { //must be even number throw ConnectionException(530, "Received odd number of elements in ranged mark"); @@ -116,7 +113,6 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran void SemanticHandler::flush() { //flush doubles as a sync to begin with - send an execution.complete - incoming.lwm = incoming.hwm; if (isOpen()) { Mutex::ScopedLock l(outLock); ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())); @@ -142,52 +138,59 @@ void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/) //never actually sent by client at present } -void SemanticHandler::handleL4(framing::AMQMethodBody* method) +void SemanticHandler::handleCommand(framing::AMQMethodBody* method) { - try{ - if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) { - if (!method->isA<ChannelCloseOkBody>()) { - std::stringstream out; - out << "Attempt to use unopened channel: " << getId(); - throw ConnectionException(504, out.str()); - } - } else { - InvocationVisitor v(adapter.get()); - method->accept(v); - if (!v.wasHandled()) { - throw ConnectionException(540, "Not implemented"); - } else if (v.hasResult()) { - ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult())); - } - } - }catch(const ChannelException& e){ - adapter->getProxy().getChannel().close( - e.code, e.toString(), - method->amqpClassId(), method->amqpMethodId()); - connection.closeChannel(getId()); - }catch(const ConnectionException& e){ - connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); + ++(incoming.lwm); + InvocationVisitor v(adapter.get()); + method->accept(v); + //TODO: need to account for async store operations and interleaving + ++(incoming.hwm); + + if (!v.wasHandled()) { + throw ConnectionException(540, "Not implemented"); + } else if (v.hasResult()) { + ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult())); } } -bool SemanticHandler::isOpen() const -{ - return channel.isOpen(); +void SemanticHandler::handleL2(framing::AMQMethodBody* method) +{ + if(!method->isA<ChannelOpenBody>() && !isOpen()) { + if (!method->isA<ChannelCloseOkBody>()) { + throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str()); + } + } else { + method->invoke(adapter->getChannelHandler()); + } } -void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody* body) +void SemanticHandler::handleL3(framing::AMQMethodBody* method) { - channel.handleHeader(body); + if (!method->invoke(this)) { + throw ConnectionException(540, "Not implemented"); + } } -void SemanticHandler::handleContent(qpid::framing::AMQContentBody* body) +void SemanticHandler::handleContent(AMQFrame& frame) { - channel.handleContent(body); + Message::shared_ptr msg(msgBuilder.getMessage()); + if (!msg) {//start of frameset will be indicated by frame flags + msgBuilder.start(++(incoming.lwm)); + msg = msgBuilder.getMessage(); + } + msgBuilder.handle(frame); + if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags + msg->setPublisher(&connection); + channel.handle(msg); + msgBuilder.end(); + //TODO: need to account for async store operations and interleaving + ++(incoming.hwm); + } } -void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody* body) -{ - channel.handleHeartbeat(body); +bool SemanticHandler::isOpen() const +{ + return channel.isOpen(); } DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) @@ -195,14 +198,13 @@ DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::sha Mutex::ScopedLock l(outLock); SequenceNumber copy(outgoing.hwm); ++copy; - msg->deliver(*this, copy.getValue(), token, connection.getFrameMax()); - //std::cout << "[" << this << "] delivered: " << outgoing.hwm.getValue() << std::endl; + MessageDelivery::deliver(msg, *this, copy.getValue(), token, connection.getFrameMax()); return outgoing.hwm.getValue(); } void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) { - msg->deliver(*this, tag, token, connection.getFrameMax()); + MessageDelivery::deliver(msg, *this, tag, token, connection.getFrameMax()); } void SemanticHandler::send(const AMQBody& body) @@ -214,3 +216,49 @@ void SemanticHandler::send(const AMQBody& body) } ChannelAdapter::send(body); } + +uint16_t SemanticHandler::getClassId(const AMQFrame& frame) +{ + return frame.getBody()->type() == METHOD_BODY ? frame.castBody<AMQMethodBody>()->amqpClassId() : 0; +} + +uint16_t SemanticHandler::getMethodId(const AMQFrame& frame) +{ + return frame.getBody()->type() == METHOD_BODY ? frame.castBody<AMQMethodBody>()->amqpMethodId() : 0; +} + +SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame) +{ + //will be replaced by field in 0-10 frame header + uint8_t type = frame.getBody()->type(); + uint16_t classId; + switch(type) { + case METHOD_BODY: + if (frame.castBody<AMQMethodBody>()->isContentBearing()) { + return MODEL_CONTENT_TRACK; + } + + classId = frame.castBody<AMQMethodBody>()->amqpClassId(); + switch (classId) { + case ChannelOpenBody::CLASS_ID: + return SESSION_CONTROL_TRACK; + case ExecutionCompleteBody::CLASS_ID: + return EXECUTION_CONTROL_TRACK; + } + + return MODEL_COMMAND_TRACK; + case HEADER_BODY: + case CONTENT_BODY: + return MODEL_CONTENT_TRACK; + } + throw Exception("Could not determine track"); +} + +//ChannelAdapter virtual methods, no longer used: +void SemanticHandler::handleMethod(framing::AMQMethodBody*){} + +void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody*) {} + +void SemanticHandler::handleContent(qpid::framing::AMQContentBody*) {} + +void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody*) {} diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h index 672c6ad929..611cd3a99b 100644 --- a/cpp/src/qpid/broker/SemanticHandler.h +++ b/cpp/src/qpid/broker/SemanticHandler.h @@ -25,6 +25,7 @@ #include "BrokerChannel.h" #include "Connection.h" #include "DeliveryAdapter.h" +#include "MessageBuilder.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/FrameHandler.h" @@ -55,8 +56,17 @@ class SemanticHandler : private framing::ChannelAdapter, framing::Window incoming; framing::Window outgoing; sys::Mutex outLock; + MessageBuilder msgBuilder; - void handleL4(framing::AMQMethodBody* method); + enum TrackId {SESSION_CONTROL_TRACK, EXECUTION_CONTROL_TRACK, MODEL_COMMAND_TRACK, MODEL_CONTENT_TRACK}; + TrackId getTrack(const framing::AMQFrame& frame); + uint16_t getClassId(const framing::AMQFrame& frame); + uint16_t getMethodId(const framing::AMQFrame& frame); + + void handleL3(framing::AMQMethodBody* method); + void handleL2(framing::AMQMethodBody* method); + void handleCommand(framing::AMQMethodBody* method); + void handleContent(framing::AMQFrame& frame); //ChannelAdapter virtual methods: void handleMethod(framing::AMQMethodBody* method); diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h index 6536a7c4ce..c411fb1965 100644 --- a/cpp/src/qpid/broker/TopicExchange.h +++ b/cpp/src/qpid/broker/TopicExchange.h @@ -25,7 +25,6 @@ #include <vector> #include "BrokerExchange.h" #include "qpid/framing/FieldTable.h" -#include "BrokerMessage.h" #include "qpid/sys/Monitor.h" #include "BrokerQueue.h" diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h index 29b1dc38af..564e021c5a 100644 --- a/cpp/src/qpid/broker/TxPublish.h +++ b/cpp/src/qpid/broker/TxPublish.h @@ -24,10 +24,10 @@ #include <algorithm> #include <functional> #include <list> +#include "BrokerQueue.h" #include "Deliverable.h" -#include "BrokerMessage.h" +#include "Message.h" #include "MessageStore.h" -#include "BrokerQueue.h" #include "TxOp.h" namespace qpid { |
