diff options
Diffstat (limited to 'cpp/src/qpid/broker/RecoveryManagerImpl.cpp')
-rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 278 |
1 files changed, 0 insertions, 278 deletions
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp deleted file mode 100644 index d08409695e..0000000000 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ /dev/null @@ -1,278 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/broker/RecoveryManagerImpl.h" - -#include "qpid/broker/Message.h" -#include "qpid/broker/Queue.h" -#include "qpid/broker/Link.h" -#include "qpid/broker/Bridge.h" -#include "qpid/broker/RecoveredEnqueue.h" -#include "qpid/broker/RecoveredDequeue.h" -#include "qpid/framing/reply_exceptions.h" - -using boost::dynamic_pointer_cast; -using boost::intrusive_ptr; -using std::string; - -namespace qpid { -namespace broker { - -RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links, - DtxManager& _dtxMgr) - : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr) {} - -RecoveryManagerImpl::~RecoveryManagerImpl() {} - -class RecoverableMessageImpl : public RecoverableMessage -{ - intrusive_ptr<Message> msg; -public: - RecoverableMessageImpl(const intrusive_ptr<Message>& _msg); - ~RecoverableMessageImpl() {}; - void setPersistenceId(uint64_t id); - void setRedelivered(); - bool loadContent(uint64_t available); - void decodeContent(framing::Buffer& buffer); - void recover(Queue::shared_ptr queue); - void enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue); - void dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue); -}; - -class RecoverableQueueImpl : public RecoverableQueue -{ - Queue::shared_ptr queue; -public: - RecoverableQueueImpl(const boost::shared_ptr<Queue>& _queue) : queue(_queue) {} - ~RecoverableQueueImpl() {}; - void setPersistenceId(uint64_t id); - uint64_t getPersistenceId() const; - const std::string& getName() const; - void setExternalQueueStore(ExternalQueueStore* inst); - ExternalQueueStore* getExternalQueueStore() const; - void recover(RecoverableMessage::shared_ptr msg); - void enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg); - void dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg); -}; - -class RecoverableExchangeImpl : public RecoverableExchange -{ - Exchange::shared_ptr exchange; - QueueRegistry& queues; -public: - RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {} - void setPersistenceId(uint64_t id); - void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args); -}; - -class RecoverableConfigImpl : public RecoverableConfig -{ - Link::shared_ptr link; - Bridge::shared_ptr bridge; -public: - RecoverableConfigImpl(Link::shared_ptr _link) : link(_link) {} - RecoverableConfigImpl(Bridge::shared_ptr _bridge) : bridge(_bridge) {} - void setPersistenceId(uint64_t id); -}; - -class RecoverableTransactionImpl : public RecoverableTransaction -{ - DtxBuffer::shared_ptr buffer; -public: - RecoverableTransactionImpl(DtxBuffer::shared_ptr _buffer) : buffer(_buffer) {} - void enqueue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message); - void dequeue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message); -}; - -RecoverableExchange::shared_ptr RecoveryManagerImpl::recoverExchange(framing::Buffer& buffer) -{ - Exchange::shared_ptr e = Exchange::decode(exchanges, buffer); - if (e) { - return RecoverableExchange::shared_ptr(new RecoverableExchangeImpl(e, queues)); - } else { - return RecoverableExchange::shared_ptr(); - } -} - -RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer) -{ - Queue::shared_ptr queue = Queue::restore(queues, buffer); - try { - Exchange::shared_ptr exchange = exchanges.getDefault(); - if (exchange) { - exchange->bind(queue, queue->getName(), 0); - queue->bound(exchange->getName(), queue->getName(), framing::FieldTable()); - } - } catch (const framing::NotFoundException& /*e*/) { - //assume no default exchange has been declared - } - return RecoverableQueue::shared_ptr(new RecoverableQueueImpl(queue)); -} - -RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer) -{ - boost::intrusive_ptr<Message> message(new Message()); - message->decodeHeader(buffer); - return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message)); -} - -RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, - std::auto_ptr<TPCTransactionContext> txn) -{ - DtxBuffer::shared_ptr buffer(new DtxBuffer()); - dtxMgr.recover(xid, txn, buffer); - return RecoverableTransaction::shared_ptr(new RecoverableTransactionImpl(buffer)); -} - -RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer& buffer) -{ - string kind; - - buffer.getShortString (kind); - if (kind == "link") - return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer))); - else if (kind == "bridge") - return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Bridge::decode (links, buffer))); - - return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead -} - -void RecoveryManagerImpl::recoveryComplete() -{ - //notify all queues and exchanges - queues.eachQueue(boost::bind(&Queue::recoveryComplete, _1, boost::ref(exchanges))); - exchanges.eachExchange(boost::bind(&Exchange::recoveryComplete, _1, boost::ref(exchanges))); -} - -RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg) : msg(_msg) -{ - if (!msg->isPersistent()) { - msg->forcePersistent(); // set so that message will get dequeued from store. - } -} - -bool RecoverableMessageImpl::loadContent(uint64_t /*available*/) -{ - return true; -} - -void RecoverableMessageImpl::decodeContent(framing::Buffer& buffer) -{ - msg->decodeContent(buffer); -} - -void RecoverableMessageImpl::recover(Queue::shared_ptr queue) -{ - queue->recover(msg); -} - -void RecoverableMessageImpl::setPersistenceId(uint64_t id) -{ - msg->setPersistenceId(id); -} - -void RecoverableMessageImpl::setRedelivered() -{ - msg->redeliver(); -} - -void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg) -{ - dynamic_pointer_cast<RecoverableMessageImpl>(msg)->recover(queue); -} - -void RecoverableQueueImpl::setPersistenceId(uint64_t id) -{ - queue->setPersistenceId(id); -} - -uint64_t RecoverableQueueImpl::getPersistenceId() const -{ - return queue->getPersistenceId(); -} - -const std::string& RecoverableQueueImpl::getName() const -{ - return queue->getName(); -} - -void RecoverableQueueImpl::setExternalQueueStore(ExternalQueueStore* inst) -{ - queue->setExternalQueueStore(inst); -} - -ExternalQueueStore* RecoverableQueueImpl::getExternalQueueStore() const -{ - return queue->getExternalQueueStore(); -} - -void RecoverableExchangeImpl::setPersistenceId(uint64_t id) -{ - exchange->setPersistenceId(id); -} - -void RecoverableConfigImpl::setPersistenceId(uint64_t id) -{ - if (link.get()) - link->setPersistenceId(id); - else if (bridge.get()) - bridge->setPersistenceId(id); -} - -void RecoverableExchangeImpl::bind(const string& queueName, - const string& key, - framing::FieldTable& args) -{ - Queue::shared_ptr queue = queues.find(queueName); - exchange->bind(queue, key, &args); - queue->bound(exchange->getName(), key, args); -} - -void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue) -{ - buffer->enlist(TxOp::shared_ptr(new RecoveredDequeue(queue, msg))); -} - -void RecoverableMessageImpl::enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue) -{ - buffer->enlist(TxOp::shared_ptr(new RecoveredEnqueue(queue, msg))); -} - -void RecoverableQueueImpl::dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr message) -{ - dynamic_pointer_cast<RecoverableMessageImpl>(message)->dequeue(buffer, queue); -} - -void RecoverableQueueImpl::enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr message) -{ - dynamic_pointer_cast<RecoverableMessageImpl>(message)->enqueue(buffer, queue); -} - -void RecoverableTransactionImpl::dequeue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message) -{ - dynamic_pointer_cast<RecoverableQueueImpl>(queue)->dequeue(buffer, message); -} - -void RecoverableTransactionImpl::enqueue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message) -{ - dynamic_pointer_cast<RecoverableQueueImpl>(queue)->enqueue(buffer, message); -} - -}} |