summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/RecoveryManagerImpl.cpp')
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp278
1 files changed, 0 insertions, 278 deletions
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
deleted file mode 100644
index d08409695e..0000000000
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/RecoveryManagerImpl.h"
-
-#include "qpid/broker/Message.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/Link.h"
-#include "qpid/broker/Bridge.h"
-#include "qpid/broker/RecoveredEnqueue.h"
-#include "qpid/broker/RecoveredDequeue.h"
-#include "qpid/framing/reply_exceptions.h"
-
-using boost::dynamic_pointer_cast;
-using boost::intrusive_ptr;
-using std::string;
-
-namespace qpid {
-namespace broker {
-
-RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links,
- DtxManager& _dtxMgr)
- : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr) {}
-
-RecoveryManagerImpl::~RecoveryManagerImpl() {}
-
-class RecoverableMessageImpl : public RecoverableMessage
-{
- intrusive_ptr<Message> msg;
-public:
- RecoverableMessageImpl(const intrusive_ptr<Message>& _msg);
- ~RecoverableMessageImpl() {};
- void setPersistenceId(uint64_t id);
- void setRedelivered();
- bool loadContent(uint64_t available);
- void decodeContent(framing::Buffer& buffer);
- void recover(Queue::shared_ptr queue);
- void enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue);
- void dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue);
-};
-
-class RecoverableQueueImpl : public RecoverableQueue
-{
- Queue::shared_ptr queue;
-public:
- RecoverableQueueImpl(const boost::shared_ptr<Queue>& _queue) : queue(_queue) {}
- ~RecoverableQueueImpl() {};
- void setPersistenceId(uint64_t id);
- uint64_t getPersistenceId() const;
- const std::string& getName() const;
- void setExternalQueueStore(ExternalQueueStore* inst);
- ExternalQueueStore* getExternalQueueStore() const;
- void recover(RecoverableMessage::shared_ptr msg);
- void enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg);
- void dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg);
-};
-
-class RecoverableExchangeImpl : public RecoverableExchange
-{
- Exchange::shared_ptr exchange;
- QueueRegistry& queues;
-public:
- RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {}
- void setPersistenceId(uint64_t id);
- void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args);
-};
-
-class RecoverableConfigImpl : public RecoverableConfig
-{
- Link::shared_ptr link;
- Bridge::shared_ptr bridge;
-public:
- RecoverableConfigImpl(Link::shared_ptr _link) : link(_link) {}
- RecoverableConfigImpl(Bridge::shared_ptr _bridge) : bridge(_bridge) {}
- void setPersistenceId(uint64_t id);
-};
-
-class RecoverableTransactionImpl : public RecoverableTransaction
-{
- DtxBuffer::shared_ptr buffer;
-public:
- RecoverableTransactionImpl(DtxBuffer::shared_ptr _buffer) : buffer(_buffer) {}
- void enqueue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message);
- void dequeue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message);
-};
-
-RecoverableExchange::shared_ptr RecoveryManagerImpl::recoverExchange(framing::Buffer& buffer)
-{
- Exchange::shared_ptr e = Exchange::decode(exchanges, buffer);
- if (e) {
- return RecoverableExchange::shared_ptr(new RecoverableExchangeImpl(e, queues));
- } else {
- return RecoverableExchange::shared_ptr();
- }
-}
-
-RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer)
-{
- Queue::shared_ptr queue = Queue::restore(queues, buffer);
- try {
- Exchange::shared_ptr exchange = exchanges.getDefault();
- if (exchange) {
- exchange->bind(queue, queue->getName(), 0);
- queue->bound(exchange->getName(), queue->getName(), framing::FieldTable());
- }
- } catch (const framing::NotFoundException& /*e*/) {
- //assume no default exchange has been declared
- }
- return RecoverableQueue::shared_ptr(new RecoverableQueueImpl(queue));
-}
-
-RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer)
-{
- boost::intrusive_ptr<Message> message(new Message());
- message->decodeHeader(buffer);
- return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message));
-}
-
-RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid,
- std::auto_ptr<TPCTransactionContext> txn)
-{
- DtxBuffer::shared_ptr buffer(new DtxBuffer());
- dtxMgr.recover(xid, txn, buffer);
- return RecoverableTransaction::shared_ptr(new RecoverableTransactionImpl(buffer));
-}
-
-RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer& buffer)
-{
- string kind;
-
- buffer.getShortString (kind);
- if (kind == "link")
- return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer)));
- else if (kind == "bridge")
- return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Bridge::decode (links, buffer)));
-
- return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead
-}
-
-void RecoveryManagerImpl::recoveryComplete()
-{
- //notify all queues and exchanges
- queues.eachQueue(boost::bind(&Queue::recoveryComplete, _1, boost::ref(exchanges)));
- exchanges.eachExchange(boost::bind(&Exchange::recoveryComplete, _1, boost::ref(exchanges)));
-}
-
-RecoverableMessageImpl:: RecoverableMessageImpl(const intrusive_ptr<Message>& _msg) : msg(_msg)
-{
- if (!msg->isPersistent()) {
- msg->forcePersistent(); // set so that message will get dequeued from store.
- }
-}
-
-bool RecoverableMessageImpl::loadContent(uint64_t /*available*/)
-{
- return true;
-}
-
-void RecoverableMessageImpl::decodeContent(framing::Buffer& buffer)
-{
- msg->decodeContent(buffer);
-}
-
-void RecoverableMessageImpl::recover(Queue::shared_ptr queue)
-{
- queue->recover(msg);
-}
-
-void RecoverableMessageImpl::setPersistenceId(uint64_t id)
-{
- msg->setPersistenceId(id);
-}
-
-void RecoverableMessageImpl::setRedelivered()
-{
- msg->redeliver();
-}
-
-void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg)
-{
- dynamic_pointer_cast<RecoverableMessageImpl>(msg)->recover(queue);
-}
-
-void RecoverableQueueImpl::setPersistenceId(uint64_t id)
-{
- queue->setPersistenceId(id);
-}
-
-uint64_t RecoverableQueueImpl::getPersistenceId() const
-{
- return queue->getPersistenceId();
-}
-
-const std::string& RecoverableQueueImpl::getName() const
-{
- return queue->getName();
-}
-
-void RecoverableQueueImpl::setExternalQueueStore(ExternalQueueStore* inst)
-{
- queue->setExternalQueueStore(inst);
-}
-
-ExternalQueueStore* RecoverableQueueImpl::getExternalQueueStore() const
-{
- return queue->getExternalQueueStore();
-}
-
-void RecoverableExchangeImpl::setPersistenceId(uint64_t id)
-{
- exchange->setPersistenceId(id);
-}
-
-void RecoverableConfigImpl::setPersistenceId(uint64_t id)
-{
- if (link.get())
- link->setPersistenceId(id);
- else if (bridge.get())
- bridge->setPersistenceId(id);
-}
-
-void RecoverableExchangeImpl::bind(const string& queueName,
- const string& key,
- framing::FieldTable& args)
-{
- Queue::shared_ptr queue = queues.find(queueName);
- exchange->bind(queue, key, &args);
- queue->bound(exchange->getName(), key, args);
-}
-
-void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue)
-{
- buffer->enlist(TxOp::shared_ptr(new RecoveredDequeue(queue, msg)));
-}
-
-void RecoverableMessageImpl::enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue)
-{
- buffer->enlist(TxOp::shared_ptr(new RecoveredEnqueue(queue, msg)));
-}
-
-void RecoverableQueueImpl::dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr message)
-{
- dynamic_pointer_cast<RecoverableMessageImpl>(message)->dequeue(buffer, queue);
-}
-
-void RecoverableQueueImpl::enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr message)
-{
- dynamic_pointer_cast<RecoverableMessageImpl>(message)->enqueue(buffer, queue);
-}
-
-void RecoverableTransactionImpl::dequeue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message)
-{
- dynamic_pointer_cast<RecoverableQueueImpl>(queue)->dequeue(buffer, message);
-}
-
-void RecoverableTransactionImpl::enqueue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message)
-{
- dynamic_pointer_cast<RecoverableQueueImpl>(queue)->enqueue(buffer, message);
-}
-
-}}