From 80bfab9ed823cebd9f8f58b559fd32df108bcf7d Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Wed, 1 Aug 2012 14:05:21 +0000 Subject: QPID-3858: WIP: Moving Simple* test classes into the correct namespaces so as to correspond with broker classes. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368006 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp | 4 +- cpp/src/qpid/asyncStore/AsyncStoreImpl.h | 7 +- cpp/src/qpid/asyncStore/TxnHandleImpl.cpp | 4 +- cpp/src/qpid/asyncStore/TxnHandleImpl.h | 8 +- cpp/src/qpid/broker/AsyncStore.h | 8 +- cpp/src/qpid/broker/QueueAsyncContext.cpp | 6 +- cpp/src/qpid/broker/QueueAsyncContext.h | 8 +- cpp/src/qpid/broker/SimpleConsumer.h | 42 ++ cpp/src/qpid/broker/SimpleDeliverable.cpp | 40 ++ cpp/src/qpid/broker/SimpleDeliverable.h | 53 +++ cpp/src/qpid/broker/SimpleDeliveryRecord.cpp | 92 +++++ cpp/src/qpid/broker/SimpleDeliveryRecord.h | 59 +++ cpp/src/qpid/broker/SimpleMessage.cpp | 108 ++++++ cpp/src/qpid/broker/SimpleMessage.h | 73 ++++ cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp | 59 +++ cpp/src/qpid/broker/SimpleMessageAsyncContext.h | 55 +++ cpp/src/qpid/broker/SimpleMessageDeque.cpp | 59 +++ cpp/src/qpid/broker/SimpleMessageDeque.h | 57 +++ cpp/src/qpid/broker/SimpleMessages.h | 52 +++ cpp/src/qpid/broker/SimpleQueue.cpp | 448 ++++++++++++++++++++++ cpp/src/qpid/broker/SimpleQueue.h | 155 ++++++++ cpp/src/qpid/broker/SimpleQueuedMessage.cpp | 98 +++++ cpp/src/qpid/broker/SimpleQueuedMessage.h | 66 ++++ cpp/src/qpid/broker/SimpleTxnAccept.cpp | 73 ++++ cpp/src/qpid/broker/SimpleTxnAccept.h | 52 +++ cpp/src/qpid/broker/SimpleTxnBuffer.cpp | 254 ++++++++++++ cpp/src/qpid/broker/SimpleTxnBuffer.h | 89 +++++ cpp/src/qpid/broker/SimpleTxnOp.h | 44 +++ cpp/src/qpid/broker/SimpleTxnPublish.cpp | 101 +++++ cpp/src/qpid/broker/SimpleTxnPublish.h | 67 ++++ cpp/src/qpid/broker/TxnAsyncContext.cpp | 4 +- cpp/src/qpid/broker/TxnAsyncContext.h | 12 +- cpp/src/qpid/broker/TxnBuffer.cpp | 254 ------------ cpp/src/qpid/broker/TxnBuffer.h | 89 ----- cpp/src/qpid/broker/TxnOp.h | 44 --- 35 files changed, 2224 insertions(+), 420 deletions(-) create mode 100644 cpp/src/qpid/broker/SimpleConsumer.h create mode 100644 cpp/src/qpid/broker/SimpleDeliverable.cpp create mode 100644 cpp/src/qpid/broker/SimpleDeliverable.h create mode 100644 cpp/src/qpid/broker/SimpleDeliveryRecord.cpp create mode 100644 cpp/src/qpid/broker/SimpleDeliveryRecord.h create mode 100644 cpp/src/qpid/broker/SimpleMessage.cpp create mode 100644 cpp/src/qpid/broker/SimpleMessage.h create mode 100644 cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp create mode 100644 cpp/src/qpid/broker/SimpleMessageAsyncContext.h create mode 100644 cpp/src/qpid/broker/SimpleMessageDeque.cpp create mode 100644 cpp/src/qpid/broker/SimpleMessageDeque.h create mode 100644 cpp/src/qpid/broker/SimpleMessages.h create mode 100644 cpp/src/qpid/broker/SimpleQueue.cpp create mode 100644 cpp/src/qpid/broker/SimpleQueue.h create mode 100644 cpp/src/qpid/broker/SimpleQueuedMessage.cpp create mode 100644 cpp/src/qpid/broker/SimpleQueuedMessage.h create mode 100644 cpp/src/qpid/broker/SimpleTxnAccept.cpp create mode 100644 cpp/src/qpid/broker/SimpleTxnAccept.h create mode 100644 cpp/src/qpid/broker/SimpleTxnBuffer.cpp create mode 100644 cpp/src/qpid/broker/SimpleTxnBuffer.h create mode 100644 cpp/src/qpid/broker/SimpleTxnOp.h create mode 100644 cpp/src/qpid/broker/SimpleTxnPublish.cpp create mode 100644 cpp/src/qpid/broker/SimpleTxnPublish.h delete mode 100644 cpp/src/qpid/broker/TxnBuffer.cpp delete mode 100644 cpp/src/qpid/broker/TxnBuffer.h delete mode 100644 cpp/src/qpid/broker/TxnOp.h (limited to 'cpp/src/qpid') diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp index aa66e7adb8..2ee1d23025 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.cpp @@ -75,7 +75,7 @@ AsyncStoreImpl::createTxnHandle() } qpid::broker::TxnHandle -AsyncStoreImpl::createTxnHandle(qpid::broker::TxnBuffer* tb) +AsyncStoreImpl::createTxnHandle(qpid::broker::SimpleTxnBuffer* tb) { return qpid::broker::TxnHandle(new TxnHandleImpl(tb)); } @@ -90,7 +90,7 @@ AsyncStoreImpl::createTxnHandle(const std::string& xid, qpid::broker::TxnHandle AsyncStoreImpl::createTxnHandle(const std::string& xid, const bool tpcFlag, - qpid::broker::TxnBuffer* tb) + qpid::broker::SimpleTxnBuffer* tb) { return qpid::broker::TxnHandle(new TxnHandleImpl(xid, tpcFlag, tb)); } diff --git a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h index eb3f090ad7..40a7552a68 100644 --- a/cpp/src/qpid/asyncStore/AsyncStoreImpl.h +++ b/cpp/src/qpid/asyncStore/AsyncStoreImpl.h @@ -42,8 +42,7 @@ class Poller; namespace asyncStore { -class AsyncStoreImpl : public qpid::broker::AsyncTransactionalStore, - public qpid::broker::AsyncStore +class AsyncStoreImpl : public qpid::broker::AsyncStore { public: AsyncStoreImpl(boost::shared_ptr poller, @@ -59,12 +58,12 @@ public: // --- Interface from AsyncTransactionalStore --- qpid::broker::TxnHandle createTxnHandle(); - qpid::broker::TxnHandle createTxnHandle(qpid::broker::TxnBuffer* tb); + qpid::broker::TxnHandle createTxnHandle(qpid::broker::SimpleTxnBuffer* tb); qpid::broker::TxnHandle createTxnHandle(const std::string& xid, const bool tpcFlag); qpid::broker::TxnHandle createTxnHandle(const std::string& xid, const bool tpcFlag, - qpid::broker::TxnBuffer* tb); + qpid::broker::SimpleTxnBuffer* tb); void submitPrepare(qpid::broker::TxnHandle& txnHandle, boost::shared_ptr TxnCtxt); diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp index dd644b29bd..50dce1b2af 100644 --- a/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.cpp @@ -31,7 +31,7 @@ TxnHandleImpl::TxnHandleImpl() : m_txnBuffer(0) {} -TxnHandleImpl::TxnHandleImpl(qpid::broker::TxnBuffer* tb) : +TxnHandleImpl::TxnHandleImpl(qpid::broker::SimpleTxnBuffer* tb) : m_tpcFlag(false), m_txnBuffer(tb) {} @@ -44,7 +44,7 @@ TxnHandleImpl::TxnHandleImpl(const std::string& xid, const bool tpcFlag) : TxnHandleImpl::TxnHandleImpl(const std::string& xid, const bool tpcFlag, - qpid::broker::TxnBuffer* tb) : + qpid::broker::SimpleTxnBuffer* tb) : m_xid(xid), m_tpcFlag(tpcFlag), m_txnBuffer(tb) diff --git a/cpp/src/qpid/asyncStore/TxnHandleImpl.h b/cpp/src/qpid/asyncStore/TxnHandleImpl.h index e1f8afff3e..ce23665d5b 100644 --- a/cpp/src/qpid/asyncStore/TxnHandleImpl.h +++ b/cpp/src/qpid/asyncStore/TxnHandleImpl.h @@ -33,7 +33,7 @@ namespace qpid { namespace broker { -class TxnBuffer; +class SimpleTxnBuffer; } namespace asyncStore { @@ -42,9 +42,9 @@ class TxnHandleImpl : public virtual qpid::RefCounted { public: TxnHandleImpl(); - TxnHandleImpl(qpid::broker::TxnBuffer* tb); + TxnHandleImpl(qpid::broker::SimpleTxnBuffer* tb); TxnHandleImpl(const std::string& xid, const bool tpcFlag); - TxnHandleImpl(const std::string& xid, const bool tpcFlag, qpid::broker::TxnBuffer* tb); + TxnHandleImpl(const std::string& xid, const bool tpcFlag, qpid::broker::SimpleTxnBuffer* tb); virtual ~TxnHandleImpl(); const std::string& getXid() const; bool is2pc() const; @@ -52,7 +52,7 @@ public: private: std::string m_xid; bool m_tpcFlag; - qpid::broker::TxnBuffer* const m_txnBuffer; + qpid::broker::SimpleTxnBuffer* const m_txnBuffer; }; }} // namespace qpid::asyncStore diff --git a/cpp/src/qpid/broker/AsyncStore.h b/cpp/src/qpid/broker/AsyncStore.h index 6f1c02e059..7009565a7c 100644 --- a/cpp/src/qpid/broker/AsyncStore.h +++ b/cpp/src/qpid/broker/AsyncStore.h @@ -70,19 +70,19 @@ class TxnHandle; class QueueAsyncContext; class TpcTxnAsyncContext; class TxnAsyncContext; -class TxnBuffer; +class SimpleTxnBuffer; class AsyncTransactionalStore { public: virtual ~AsyncTransactionalStore() {} virtual TxnHandle createTxnHandle() = 0; - virtual TxnHandle createTxnHandle(TxnBuffer* tb) = 0; + virtual TxnHandle createTxnHandle(SimpleTxnBuffer* tb) = 0; virtual TxnHandle createTxnHandle(const std::string& xid, const bool tpcFlag) = 0; virtual TxnHandle createTxnHandle(const std::string& xid, const bool tpcFlag, - TxnBuffer* tb) = 0; + SimpleTxnBuffer* tb) = 0; virtual void submitPrepare(TxnHandle&, boost::shared_ptr) = 0; // Distributed txns only @@ -94,7 +94,7 @@ public: }; // Subclassed by store: -class AsyncStore { +class AsyncStore : public AsyncTransactionalStore { public: virtual ~AsyncStore() {} diff --git a/cpp/src/qpid/broker/QueueAsyncContext.cpp b/cpp/src/qpid/broker/QueueAsyncContext.cpp index 4bd2d271eb..02eb2e9546 100644 --- a/cpp/src/qpid/broker/QueueAsyncContext.cpp +++ b/cpp/src/qpid/broker/QueueAsyncContext.cpp @@ -48,7 +48,7 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q, {} QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q, - TxnBuffer* tb, + SimpleTxnBuffer* tb, AsyncResultCallback rcb, AsyncResultQueue* const arq) : m_q(q), @@ -61,7 +61,7 @@ QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q, QueueAsyncContext::QueueAsyncContext(boost::shared_ptr q, boost::intrusive_ptr msg, - TxnBuffer* tb, + SimpleTxnBuffer* tb, AsyncResultCallback rcb, AsyncResultQueue* const arq) : m_q(q), @@ -89,7 +89,7 @@ QueueAsyncContext::getMessage() const return m_msg; } -TxnBuffer* +SimpleTxnBuffer* QueueAsyncContext::getTxnBuffer() const { return m_tb; } diff --git a/cpp/src/qpid/broker/QueueAsyncContext.h b/cpp/src/qpid/broker/QueueAsyncContext.h index e9ba2ebbac..8657922377 100644 --- a/cpp/src/qpid/broker/QueueAsyncContext.h +++ b/cpp/src/qpid/broker/QueueAsyncContext.h @@ -52,18 +52,18 @@ public: AsyncResultCallback rcb, AsyncResultQueue* const arq); QueueAsyncContext(boost::shared_ptr q, - TxnBuffer* tb, + SimpleTxnBuffer* tb, AsyncResultCallback rcb, AsyncResultQueue* const arq); QueueAsyncContext(boost::shared_ptr q, boost::intrusive_ptr msg, - TxnBuffer* tb, + SimpleTxnBuffer* tb, AsyncResultCallback rcb, AsyncResultQueue* const arq); virtual ~QueueAsyncContext(); boost::shared_ptr getQueue() const; boost::intrusive_ptr getMessage() const; - TxnBuffer* getTxnBuffer() const; + SimpleTxnBuffer* getTxnBuffer() const; AsyncResultQueue* getAsyncResultQueue() const; AsyncResultCallback getAsyncResultCallback() const; void invokeCallback(const AsyncResultHandle* const arh) const; @@ -72,7 +72,7 @@ public: private: boost::shared_ptr m_q; boost::intrusive_ptr m_msg; - TxnBuffer* m_tb; + SimpleTxnBuffer* m_tb; AsyncResultCallback m_rcb; AsyncResultQueue* const m_arq; }; diff --git a/cpp/src/qpid/broker/SimpleConsumer.h b/cpp/src/qpid/broker/SimpleConsumer.h new file mode 100644 index 0000000000..6601c65a42 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleConsumer.h @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleConsumer.h + */ + +#ifndef qpid_broker_SimpleConsumer_h_ +#define qpid_broker_SimpleConsumer_h_ + +#include + +namespace qpid { +namespace broker { +class SimpleDeliveryRecord; + +class SimpleConsumer { +public: + virtual ~SimpleConsumer() {} + virtual void commitComplete() = 0; + virtual void record(boost::shared_ptr dr) = 0; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleConsumer_h_ diff --git a/cpp/src/qpid/broker/SimpleDeliverable.cpp b/cpp/src/qpid/broker/SimpleDeliverable.cpp new file mode 100644 index 0000000000..7037a377c5 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleDeliverable.cpp @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleDeliverable.cpp + */ + +#include "SimpleDeliverable.h" + +namespace qpid { +namespace broker { + +SimpleDeliverable::SimpleDeliverable() : + m_delivered(false) +{} + +SimpleDeliverable::~SimpleDeliverable() {} + +bool +SimpleDeliverable::isDelivered() const { + return m_delivered; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleDeliverable.h b/cpp/src/qpid/broker/SimpleDeliverable.h new file mode 100644 index 0000000000..6441e14841 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleDeliverable.h @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleDeliverable.h + */ + +#ifndef qpid_broker_SimpleDeliverable_h_ +#define qpid_broker_SimpleDeliverable_h_ + +#include +#include // uint64_t + +namespace qpid { +namespace broker { + +class SimpleMessage; +class SimpleQueue; + +class SimpleDeliverable +{ +public: + SimpleDeliverable(); + virtual ~SimpleDeliverable(); + + virtual uint64_t contentSize() = 0; + virtual void deliverTo(const boost::shared_ptr& queue) = 0; + virtual SimpleMessage& getMessage() = 0; + virtual bool isDelivered() const; + +protected: + bool m_delivered; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleDeliverable_h_ diff --git a/cpp/src/qpid/broker/SimpleDeliveryRecord.cpp b/cpp/src/qpid/broker/SimpleDeliveryRecord.cpp new file mode 100644 index 0000000000..b71df6975b --- /dev/null +++ b/cpp/src/qpid/broker/SimpleDeliveryRecord.cpp @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleDeliveryRecord.cpp + */ + +#include "SimpleDeliveryRecord.h" + +#include "SimpleConsumer.h" +#include "SimpleMessage.h" +#include "SimpleQueue.h" +#include "SimpleQueuedMessage.h" + +namespace qpid { +namespace broker { + +SimpleDeliveryRecord::SimpleDeliveryRecord(boost::shared_ptr qm, + SimpleConsumer& sc, + bool accepted) : + m_queuedMessage(qm), + m_msgConsumer(sc), + m_accepted(accepted), + m_ended(accepted) +{} + +SimpleDeliveryRecord::~SimpleDeliveryRecord() {} + +bool +SimpleDeliveryRecord::accept() { + if (!m_ended) { + m_queuedMessage->getQueue()->dequeue(m_queuedMessage); + m_accepted = true; + setEnded(); + } + return isRedundant(); +} + +bool +SimpleDeliveryRecord::isAccepted() const { + return m_accepted; +} + +bool +SimpleDeliveryRecord::setEnded() { + m_ended = true; + m_queuedMessage->payload() = boost::intrusive_ptr(0); + return isRedundant(); +} + +bool +SimpleDeliveryRecord::isEnded() const { + return m_ended; +} + +bool +SimpleDeliveryRecord::isRedundant() const { + return m_ended; +} + +void +SimpleDeliveryRecord::dequeue(qpid::broker::SimpleTxnBuffer* tb) { + m_queuedMessage->getQueue()->dequeue(tb, m_queuedMessage); +} + +void +SimpleDeliveryRecord::committed() const { + m_msgConsumer.commitComplete(); +} + +boost::shared_ptr +SimpleDeliveryRecord::getQueuedMessage() const { + return m_queuedMessage; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleDeliveryRecord.h b/cpp/src/qpid/broker/SimpleDeliveryRecord.h new file mode 100644 index 0000000000..622ce578d7 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleDeliveryRecord.h @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleDeliveryRecord.h + */ + +#ifndef qpid_broker_SimpleDeliveryRecord_h_ +#define qpid_broker_SimpleDeliveryRecord_h_ + +#include + +namespace qpid { +namespace broker { + +class SimpleConsumer; +class SimpleQueuedMessage; +class SimpleTxnBuffer; + +class SimpleDeliveryRecord { +public: + SimpleDeliveryRecord(boost::shared_ptr qm, + SimpleConsumer& sc, + bool accepted); + virtual ~SimpleDeliveryRecord(); + bool accept(); + bool isAccepted() const; + bool setEnded(); + bool isEnded() const; + bool isRedundant() const; + void dequeue(qpid::broker::SimpleTxnBuffer* tb); + void committed() const; + boost::shared_ptr getQueuedMessage() const; +private: + boost::shared_ptr m_queuedMessage; + SimpleConsumer& m_msgConsumer; + bool m_accepted : 1; + bool m_ended : 1; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleDeliveryRecord_h_ diff --git a/cpp/src/qpid/broker/SimpleMessage.cpp b/cpp/src/qpid/broker/SimpleMessage.cpp new file mode 100644 index 0000000000..1239533edf --- /dev/null +++ b/cpp/src/qpid/broker/SimpleMessage.cpp @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessage.cpp + */ + +#include "SimpleMessage.h" + +#include // memcpy() + +namespace qpid { +namespace broker { + +SimpleMessage::SimpleMessage(const char* msgData, + const uint32_t msgSize) : + m_persistenceId(0ULL), + m_msg(msgData, static_cast(msgSize)), + m_store(0), + m_msgHandle(MessageHandle()) +{} + +SimpleMessage::SimpleMessage(const char* msgData, + const uint32_t msgSize, + AsyncStore* store) : + m_persistenceId(0ULL), + m_msg(msgData, static_cast(msgSize)), + m_store(store), + m_msgHandle(store ? store->createMessageHandle(this) : MessageHandle()) +{} + +SimpleMessage::~SimpleMessage() {} + +const MessageHandle& +SimpleMessage::getHandle() const { + return m_msgHandle; +} + +MessageHandle& +SimpleMessage::getHandle() { + return m_msgHandle; +} + +uint64_t +SimpleMessage::contentSize() const { + return static_cast(m_msg.size()); +} + +void +SimpleMessage::setPersistenceId(uint64_t id) const { + m_persistenceId = id; +} + +uint64_t +SimpleMessage::getPersistenceId() const { + return m_persistenceId; +} + +void +SimpleMessage::encode(qpid::framing::Buffer& buffer) const { + buffer.putRawData(m_msg); +} + +uint32_t +SimpleMessage::encodedSize() const { + return static_cast(m_msg.size()); +} + +void +SimpleMessage::allDequeuesComplete() {} + +uint32_t +SimpleMessage::encodedHeaderSize() const { + return 0; +} + +bool +SimpleMessage::isPersistent() const { + return m_store != 0; +} + +uint64_t +SimpleMessage::getSize() { + return m_msg.size(); +} + +void +SimpleMessage::write(char* target) { + ::memcpy(target, m_msg.data(), m_msg.size()); +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleMessage.h b/cpp/src/qpid/broker/SimpleMessage.h new file mode 100644 index 0000000000..edfaa8d13b --- /dev/null +++ b/cpp/src/qpid/broker/SimpleMessage.h @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessage.h + */ + +#ifndef qpid_broker_SimpleMessage_h_ +#define qpid_broker_SimpleMessage_h_ + +#include "AsyncStore.h" // DataSource +#include "MessageHandle.h" +#include "PersistableMessage.h" + +namespace qpid { +namespace broker { + +class SimpleMessage: public PersistableMessage, + public DataSource +{ +public: + SimpleMessage(const char* msgData, + const uint32_t msgSize); + SimpleMessage(const char* msgData, + const uint32_t msgSize, + AsyncStore* store); + virtual ~SimpleMessage(); + const MessageHandle& getHandle() const; + MessageHandle& getHandle(); + uint64_t contentSize() const; + + // --- Interface Persistable --- + virtual void setPersistenceId(uint64_t id) const; + virtual uint64_t getPersistenceId() const; + virtual void encode(qpid::framing::Buffer& buffer) const; + virtual uint32_t encodedSize() const; + + // --- Interface PersistableMessage --- + virtual void allDequeuesComplete(); + virtual uint32_t encodedHeaderSize() const; + virtual bool isPersistent() const; + + // --- Interface DataSource --- + virtual uint64_t getSize(); // <- same as encodedSize()? + virtual void write(char* target); + +private: + mutable uint64_t m_persistenceId; + const std::string m_msg; + AsyncStore* m_store; + + MessageHandle m_msgHandle; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleMessage_h_ diff --git a/cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp b/cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp new file mode 100644 index 0000000000..a88258f5bc --- /dev/null +++ b/cpp/src/qpid/broker/SimpleMessageAsyncContext.cpp @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessageAsyncContext.cpp + */ + +#include "SimpleMessageAsyncContext.h" + +#include "SimpleMessage.h" + +#include + +namespace qpid { +namespace broker { + +SimpleMessageAsyncContext::SimpleMessageAsyncContext(boost::intrusive_ptr msg, + boost::shared_ptr q) : + m_msg(msg), + m_q(q) +{ + assert(m_msg.get() != 0); + assert(m_q.get() != 0); +} + +SimpleMessageAsyncContext::~SimpleMessageAsyncContext() {} + +boost::intrusive_ptr +SimpleMessageAsyncContext::getMessage() const { + return m_msg; +} + +boost::shared_ptr +SimpleMessageAsyncContext::getQueue() const { + return m_q; +} + +void +SimpleMessageAsyncContext::destroy() { + delete this; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleMessageAsyncContext.h b/cpp/src/qpid/broker/SimpleMessageAsyncContext.h new file mode 100644 index 0000000000..e3975e790e --- /dev/null +++ b/cpp/src/qpid/broker/SimpleMessageAsyncContext.h @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessageAsyncContext.h + */ + +#ifndef qpid_broker_SimpleMessageAsyncContext_h_ +#define qpid_broker_SimpleMessageAsyncContext_h_ + +#include "AsyncStore.h" // BrokerAsyncContext + +#include +#include + +namespace qpid { +namespace broker { + +class SimpleMessage; +class SimpleQueue; + +class SimpleMessageAsyncContext : public BrokerAsyncContext +{ +public: + SimpleMessageAsyncContext(boost::intrusive_ptr msg, + boost::shared_ptr q); + virtual ~SimpleMessageAsyncContext(); + boost::intrusive_ptr getMessage() const; + boost::shared_ptr getQueue() const; + void destroy(); + +private: + boost::intrusive_ptr m_msg; + boost::shared_ptr m_q; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleMessageAsyncContext_h_ diff --git a/cpp/src/qpid/broker/SimpleMessageDeque.cpp b/cpp/src/qpid/broker/SimpleMessageDeque.cpp new file mode 100644 index 0000000000..0aadcfd94a --- /dev/null +++ b/cpp/src/qpid/broker/SimpleMessageDeque.cpp @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessageDeque.cpp + */ + +#include "SimpleMessageDeque.h" + +#include "SimpleQueuedMessage.h" + +namespace qpid { +namespace broker { + +SimpleMessageDeque::SimpleMessageDeque() {} + +SimpleMessageDeque::~SimpleMessageDeque() {} + +uint32_t +SimpleMessageDeque::size() { + qpid::sys::ScopedLock l(m_msgMutex); + return m_messages.size(); +} + +bool +SimpleMessageDeque::push(boost::shared_ptr& added) { + qpid::sys::ScopedLock l(m_msgMutex); + m_messages.push_back(added); + return false; +} + +bool +SimpleMessageDeque::consume(boost::shared_ptr& msg) { + qpid::sys::ScopedLock l(m_msgMutex); + if (!m_messages.empty()) { + msg = m_messages.front(); + m_messages.pop_front(); + return true; + } + return false; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleMessageDeque.h b/cpp/src/qpid/broker/SimpleMessageDeque.h new file mode 100644 index 0000000000..5db0755a43 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleMessageDeque.h @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessageDeque.h + */ + +/* + * This is a copy of qpid::broker::MessageDeque.h, but using the local + * SimpleQueuedMessage class instead of QueuedMessage. + */ + +#ifndef qpid_broker_SimpleMessageDeque_h_ +#define qpid_broker_SimpleMessageDeque_h_ + +#include "SimpleMessages.h" + +#include "qpid/sys/Mutex.h" + +#include + +namespace qpid { +namespace broker { + +class SimpleMessageDeque : public SimpleMessages +{ +public: + SimpleMessageDeque(); + virtual ~SimpleMessageDeque(); + uint32_t size(); + bool push(boost::shared_ptr& added); + bool consume(boost::shared_ptr& msg); +private: + std::deque > m_messages; + qpid::sys::Mutex m_msgMutex; + +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleMessageDeque_h_ diff --git a/cpp/src/qpid/broker/SimpleMessages.h b/cpp/src/qpid/broker/SimpleMessages.h new file mode 100644 index 0000000000..2a40859032 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleMessages.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleMessages.h + */ + +/* + * This is a copy of qpid::broker::Messages.h, but using the local + * tests::storePerftools::asyncPerf::QueuedMessage class instead of + * qpid::broker::QueuedMessage. + */ + +#ifndef qpid_broker_SimpleMessages_h_ +#define qpid_broker_SimpleMessages_h_ + +#include +#include + +namespace qpid { +namespace broker { + +class SimpleQueuedMessage; + +class SimpleMessages +{ +public: + virtual ~SimpleMessages() {} + virtual uint32_t size() = 0; + virtual bool push(boost::shared_ptr& added) = 0; + virtual bool consume(boost::shared_ptr& msg) = 0; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleMessages_h_ diff --git a/cpp/src/qpid/broker/SimpleQueue.cpp b/cpp/src/qpid/broker/SimpleQueue.cpp new file mode 100644 index 0000000000..5cd8841f94 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleQueue.cpp @@ -0,0 +1,448 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleQueue.cpp + */ + +#include "SimpleQueue.h" + +#include "AsyncResultHandle.h" +#include "QueueAsyncContext.h" +#include "SimpleConsumer.h" +#include "SimpleDeliveryRecord.h" +#include "SimpleMessage.h" +#include "SimpleMessageDeque.h" +#include "SimpleQueuedMessage.h" +#include "SimpleTxnBuffer.h" + +#include // memcpy() + +namespace qpid { +namespace broker { + +//static +TxnHandle SimpleQueue::s_nullTxnHandle; // used for non-txn operations + + +SimpleQueue::SimpleQueue(const std::string& name, + const qpid::framing::FieldTable& /*args*/, + AsyncStore* store, + AsyncResultQueue& arq) : + PersistableQueue(), + m_name(name), + m_store(store), + m_resultQueue(arq), + m_asyncOpCounter(0UL), + m_persistenceId(0ULL), + m_persistableData(m_name), // TODO: Currently queue durable data consists only of the queue name. Update this. + m_destroyPending(false), + m_destroyed(false), + m_barrier(*this), + m_messages(new SimpleMessageDeque()) +{ + if (m_store != 0) { + const qpid::types::Variant::Map qo; + m_queueHandle = m_store->createQueueHandle(m_name, qo); + } +} + +SimpleQueue::~SimpleQueue() {} + +const QueueHandle& +SimpleQueue::getHandle() const { + return m_queueHandle; +} + +QueueHandle& +SimpleQueue::getHandle() { + return m_queueHandle; +} + +AsyncStore* +SimpleQueue::getStore() { + return m_store; +} + +void +SimpleQueue::asyncCreate() { + if (m_store) { + boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), + &handleAsyncCreateResult, + &m_resultQueue)); + m_store->submitCreate(m_queueHandle, this, qac); + ++m_asyncOpCounter; + } +} + +//static +void +SimpleQueue::handleAsyncCreateResult(const AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr qc = boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); + boost::shared_ptr sq = boost::dynamic_pointer_cast(qc->getQueue()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + sq->createComplete(qc); + } + } +} + +void +SimpleQueue::asyncDestroy(const bool deleteQueue) +{ + m_destroyPending = true; + if (m_store) { + if (deleteQueue) { + boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), + &handleAsyncDestroyResult, + &m_resultQueue)); + m_store->submitDestroy(m_queueHandle, qac); + ++m_asyncOpCounter; + } + m_asyncOpCounter.waitForZero(qpid::sys::Duration(10UL*1000*1000*1000)); + } +} + +//static +void +SimpleQueue::handleAsyncDestroyResult(const AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr qc = + boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); + boost::shared_ptr sq = boost::dynamic_pointer_cast(qc->getQueue()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + sq->destroyComplete(qc); + } + } +} + +void +SimpleQueue::deliver(boost::intrusive_ptr msg) { + boost::shared_ptr qm(boost::shared_ptr(new SimpleQueuedMessage(this, msg))); + enqueue(qm); + push(qm); +} + +bool +SimpleQueue::dispatch(SimpleConsumer& sc) { + boost::shared_ptr qm; + if (m_messages->consume(qm)) { + boost::shared_ptr dr(new SimpleDeliveryRecord(qm, sc, false)); + sc.record(dr); + return true; + } + return false; +} + +bool +SimpleQueue::enqueue(boost::shared_ptr qm) { + return enqueue(0, qm); +} + +bool +SimpleQueue::enqueue(SimpleTxnBuffer* tb, + boost::shared_ptr qm) { + ScopedUse u(m_barrier); + if (!u.m_acquired) { + return false; + } + if (qm->payload()->isPersistent() && m_store) { + qm->payload()->enqueueAsync(shared_from_this(), m_store); + return asyncEnqueue(tb, qm); + } + return false; +} + +bool +SimpleQueue::dequeue(boost::shared_ptr qm) { + return dequeue(0, qm); +} + +bool +SimpleQueue::dequeue(SimpleTxnBuffer* tb, + boost::shared_ptr qm) { + ScopedUse u(m_barrier); + if (!u.m_acquired) { + return false; + } + if (qm->payload()->isPersistent() && m_store) { + qm->payload()->dequeueAsync(shared_from_this(), m_store); + return asyncDequeue(tb, qm); + } + return true; +} + +void +SimpleQueue::process(boost::intrusive_ptr msg) { + push(boost::shared_ptr(new SimpleQueuedMessage(this, msg))); +} + +void +SimpleQueue::enqueueAborted(boost::intrusive_ptr) {} + +void +SimpleQueue::encode(qpid::framing::Buffer& buffer) const { + buffer.putShortString(m_name); +} + +uint32_t +SimpleQueue::encodedSize() const { + return m_name.size() + 1; +} + +uint64_t +SimpleQueue::getPersistenceId() const { + return m_persistenceId; +} + +void +SimpleQueue::setPersistenceId(uint64_t persistenceId) const { + m_persistenceId = persistenceId; +} + +void +SimpleQueue::flush() { + //if(m_store) m_store->flush(*this); +} + +const std::string& +SimpleQueue::getName() const { + return m_name; +} + +void +SimpleQueue::setExternalQueueStore(ExternalQueueStore* inst) { + if (externalQueueStore != inst && externalQueueStore) + delete externalQueueStore; + externalQueueStore = inst; +} + +uint64_t +SimpleQueue::getSize() { + return m_persistableData.size(); +} + +void +SimpleQueue::write(char* target) { + ::memcpy(target, m_persistableData.data(), m_persistableData.size()); +} + +// --- Members & methods in msg handling path from qpid::Queue --- + +// protected +SimpleQueue::UsageBarrier::UsageBarrier(SimpleQueue& q) : + m_parent(q), + m_count(0) +{} + +// protected +bool +SimpleQueue::UsageBarrier::acquire() { + qpid::sys::Monitor::ScopedLock l(m_monitor); + if (m_parent.m_destroyed) { + return false; + } else { + ++m_count; + return true; + } +} + +// protected +void SimpleQueue::UsageBarrier::release() { + qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); + if (--m_count == 0) { + m_monitor.notifyAll(); + } +} + +// protected +void SimpleQueue::UsageBarrier::destroy() { + qpid::sys::Monitor::Monitor::ScopedLock l(m_monitor); + m_parent.m_destroyed = true; + while (m_count) { + m_monitor.wait(); + } +} + +// protected +SimpleQueue::ScopedUse::ScopedUse(UsageBarrier& b) : + m_barrier(b), + m_acquired(m_barrier.acquire()) +{} + +// protected +SimpleQueue::ScopedUse::~ScopedUse() { + if (m_acquired) { + m_barrier.release(); + } +} + +// private +void +SimpleQueue::push(boost::shared_ptr qm, + bool /*isRecovery*/) { + m_messages->push(qm); +} + +// --- End Members & methods in msg handling path from qpid::Queue --- + +// private +bool +SimpleQueue::asyncEnqueue(SimpleTxnBuffer* tb, + boost::shared_ptr qm) { + assert(qm.get()); + boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), + qm->payload(), + tb, + &handleAsyncEnqueueResult, + &m_resultQueue)); + if (tb) { + tb->incrOpCnt(); + m_store->submitEnqueue(qm->enqHandle(), tb->getTxnHandle(), qac); + } else { + m_store->submitEnqueue(qm->enqHandle(), s_nullTxnHandle, qac); + } + ++m_asyncOpCounter; + return true; +} + +// private static +void +SimpleQueue::handleAsyncEnqueueResult(const AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr qc = + boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); + boost::shared_ptr sq = boost::dynamic_pointer_cast(qc->getQueue()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + sq->enqueueComplete(qc); + } + } +} + +// private +bool +SimpleQueue::asyncDequeue(SimpleTxnBuffer* tb, + boost::shared_ptr qm) { + assert(qm.get()); + boost::shared_ptr qac(new QueueAsyncContext(shared_from_this(), + qm->payload(), + tb, + &handleAsyncDequeueResult, + &m_resultQueue)); + if (tb) { + tb->incrOpCnt(); + m_store->submitDequeue(qm->enqHandle(), tb->getTxnHandle(), qac); + } else { + m_store->submitDequeue(qm->enqHandle(), s_nullTxnHandle, qac); + } + ++m_asyncOpCounter; + return true; +} + +// private static +void +SimpleQueue::handleAsyncDequeueResult(const AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr qc = boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); + boost::shared_ptr sq = boost::dynamic_pointer_cast(qc->getQueue()); + if (arh->getErrNo()) { + // TODO: Handle async failure here (other than by simply printing a message) + std::cerr << "Queue name=\"" << sq->m_name << "\": Operation " << qc->getOpStr() << ": failure " + << arh->getErrNo() << " (" << arh->getErrMsg() << ")" << std::endl; + } else { + sq->dequeueComplete(qc); + } + } +} + +// private +void +SimpleQueue::destroyCheck(const std::string& opDescr) const { + if (m_destroyPending || m_destroyed) { + std::ostringstream oss; + oss << opDescr << " on queue \"" << m_name << "\" after call to destroy"; + throw qpid::Exception(oss.str()); + } +} + +// private +void +SimpleQueue::createComplete(const boost::shared_ptr qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + } + --m_asyncOpCounter; +} + +// private +void +SimpleQueue::flushComplete(const boost::shared_ptr qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + } + --m_asyncOpCounter; +} + +// private +void +SimpleQueue::destroyComplete(const boost::shared_ptr qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + } + --m_asyncOpCounter; + m_destroyed = true; +} + +// private +void +SimpleQueue::enqueueComplete(const boost::shared_ptr qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + if (qc->getTxnBuffer()) { // transactional enqueue + qc->getTxnBuffer()->decrOpCnt(); + } + } + --m_asyncOpCounter; +} + +// private +void +SimpleQueue::dequeueComplete(const boost::shared_ptr qc) { + if (qc.get()) { + assert(qc->getQueue().get() == this); + if (qc->getTxnBuffer()) { // transactional enqueue + qc->getTxnBuffer()->decrOpCnt(); + } + } + --m_asyncOpCounter; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleQueue.h b/cpp/src/qpid/broker/SimpleQueue.h new file mode 100644 index 0000000000..c2f21076cd --- /dev/null +++ b/cpp/src/qpid/broker/SimpleQueue.h @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleQueue.h + */ + +#ifndef qpid_broker_SimpleQueue_h_ +#define qpid_broker_SimpleQueue_h_ + +#include "qpid/asyncStore/AtomicCounter.h" // AsyncOpCounter +#include "qpid/broker/AsyncStore.h" // DataSource +#include "qpid/broker/PersistableQueue.h" +#include "qpid/broker/QueueHandle.h" +#include "qpid/sys/Monitor.h" + +#include +#include +#include + +namespace qpid { + +namespace framing { +class FieldTable; +} + +namespace broker { + +class AsyncResultQueue; +class QueueAsyncContext; +class SimpleConsumer; +class SimpleMessages; +class SimpleQueuedMessage; +class SimpleMessage; +class SimpleTxnBuffer; + +class SimpleQueue : public boost::enable_shared_from_this, + public PersistableQueue, + public DataSource +{ +public: + SimpleQueue(const std::string& name, + const qpid::framing::FieldTable& args, + AsyncStore* store, + AsyncResultQueue& arq); + virtual ~SimpleQueue(); + + const QueueHandle& getHandle() const; + QueueHandle& getHandle(); + AsyncStore* getStore(); + + void asyncCreate(); + static void handleAsyncCreateResult(const AsyncResultHandle* const arh); + void asyncDestroy(const bool deleteQueue); + static void handleAsyncDestroyResult(const AsyncResultHandle* const arh); + + // --- Methods in msg handling path from qpid::Queue --- + void deliver(boost::intrusive_ptr msg); + bool dispatch(SimpleConsumer& sc); + bool enqueue(boost::shared_ptr qm); + bool enqueue(SimpleTxnBuffer* tb, + boost::shared_ptr qm); + bool dequeue(boost::shared_ptr qm); + bool dequeue(SimpleTxnBuffer* tb, + boost::shared_ptr qm); + void process(boost::intrusive_ptr msg); + void enqueueAborted(boost::intrusive_ptr msg); + + // --- Interface qpid::broker::Persistable --- + virtual void encode(qpid::framing::Buffer& buffer) const; + virtual uint32_t encodedSize() const; + virtual uint64_t getPersistenceId() const; + virtual void setPersistenceId(uint64_t persistenceId) const; + + // --- Interface qpid::broker::PersistableQueue --- + virtual void flush(); + virtual const std::string& getName() const; + virtual void setExternalQueueStore(ExternalQueueStore* inst); + + // --- Interface qpid::broker::DataStore --- + virtual uint64_t getSize(); + virtual void write(char* target); + +private: + static TxnHandle s_nullTxnHandle; // used for non-txn operations + + const std::string m_name; + AsyncStore* m_store; + AsyncResultQueue& m_resultQueue; + qpid::asyncStore::AsyncOpCounter m_asyncOpCounter; // TODO: change this to non-async store counter! + mutable uint64_t m_persistenceId; + std::string m_persistableData; + QueueHandle m_queueHandle; + bool m_destroyPending; + bool m_destroyed; + + // --- Members & methods in msg handling path copied from qpid::Queue --- + struct UsageBarrier { + SimpleQueue& m_parent; + uint32_t m_count; + qpid::sys::Monitor m_monitor; + UsageBarrier(SimpleQueue& q); + bool acquire(); + void release(); + void destroy(); + }; + struct ScopedUse { + UsageBarrier& m_barrier; + const bool m_acquired; + ScopedUse(UsageBarrier& b); + ~ScopedUse(); + }; + UsageBarrier m_barrier; + std::auto_ptr m_messages; + void push(boost::shared_ptr qm, + bool isRecovery = false); + + // -- Async ops --- + bool asyncEnqueue(SimpleTxnBuffer* tb, + boost::shared_ptr qm); + static void handleAsyncEnqueueResult(const AsyncResultHandle* const arh); + bool asyncDequeue(SimpleTxnBuffer* tb, + boost::shared_ptr qm); + static void handleAsyncDequeueResult(const AsyncResultHandle* const arh); + + // --- Async op counter --- + void destroyCheck(const std::string& opDescr) const; + + // --- Async op completions (called through handleAsyncResult) --- + void createComplete(const boost::shared_ptr qc); + void flushComplete(const boost::shared_ptr qc); + void destroyComplete(const boost::shared_ptr qc); + void enqueueComplete(const boost::shared_ptr qc); + void dequeueComplete(const boost::shared_ptr qc); +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleQueue_h_ diff --git a/cpp/src/qpid/broker/SimpleQueuedMessage.cpp b/cpp/src/qpid/broker/SimpleQueuedMessage.cpp new file mode 100644 index 0000000000..35ac799ecc --- /dev/null +++ b/cpp/src/qpid/broker/SimpleQueuedMessage.cpp @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleQueuedMessage.cpp + */ + +#include "SimpleQueuedMessage.h" + +#include "SimpleMessage.h" +#include "SimpleQueue.h" + +namespace qpid { +namespace broker { + +SimpleQueuedMessage::SimpleQueuedMessage() : + m_queue(0) +{} + +SimpleQueuedMessage::SimpleQueuedMessage(SimpleQueue* q, + boost::intrusive_ptr msg) : + boost::enable_shared_from_this(), + m_queue(q), + m_msg(msg) +{ + if (m_queue->getStore()) { + m_enqHandle = q->getStore()->createEnqueueHandle(msg->getHandle(), q->getHandle()); + } +} + +SimpleQueuedMessage::SimpleQueuedMessage(const SimpleQueuedMessage& qm) : + boost::enable_shared_from_this(), + m_queue(qm.m_queue), + m_msg(qm.m_msg), + m_enqHandle(qm.m_enqHandle) +{} + +SimpleQueuedMessage::SimpleQueuedMessage(SimpleQueuedMessage* const qm) : + boost::enable_shared_from_this(), + m_queue(qm->m_queue), + m_msg(qm->m_msg), + m_enqHandle(qm->m_enqHandle) +{} + +SimpleQueuedMessage::~SimpleQueuedMessage() {} + +SimpleQueue* +SimpleQueuedMessage::getQueue() const { + return m_queue; +} + +boost::intrusive_ptr +SimpleQueuedMessage::payload() const { + return m_msg; +} + +const EnqueueHandle& +SimpleQueuedMessage::enqHandle() const { + return m_enqHandle; +} + +EnqueueHandle& +SimpleQueuedMessage::enqHandle() { + return m_enqHandle; +} + +void +SimpleQueuedMessage::prepareEnqueue(SimpleTxnBuffer* tb) { + m_queue->enqueue(tb, shared_from_this()); +} + +void +SimpleQueuedMessage::commitEnqueue() { + m_queue->process(m_msg); +} + +void +SimpleQueuedMessage::abortEnqueue() { + m_queue->enqueueAborted(m_msg); +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleQueuedMessage.h b/cpp/src/qpid/broker/SimpleQueuedMessage.h new file mode 100644 index 0000000000..1172eb73f3 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleQueuedMessage.h @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleQueuedMessage.h + */ + +#ifndef qpid_broker_SimpleQueuedMessage_h_ +#define qpid_broker_SimpleQueuedMessage_h_ + +#include "AsyncStore.h" +#include "EnqueueHandle.h" + +#include +#include + +namespace qpid { +namespace broker { + +class SimpleMessage; +class SimpleQueue; + +class SimpleQueuedMessage : public boost::enable_shared_from_this +{ +public: + SimpleQueuedMessage(); + SimpleQueuedMessage(SimpleQueue* q, + boost::intrusive_ptr msg); + SimpleQueuedMessage(const SimpleQueuedMessage& qm); + SimpleQueuedMessage(SimpleQueuedMessage* const qm); + virtual ~SimpleQueuedMessage(); + SimpleQueue* getQueue() const; + boost::intrusive_ptr payload() const; + const EnqueueHandle& enqHandle() const; + EnqueueHandle& enqHandle(); + + // --- Transaction handling --- + void prepareEnqueue(qpid::broker::SimpleTxnBuffer* tb); + void commitEnqueue(); + void abortEnqueue(); + +private: + SimpleQueue* m_queue; + boost::intrusive_ptr m_msg; + qpid::broker::EnqueueHandle m_enqHandle; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleQueuedMessage_h_ diff --git a/cpp/src/qpid/broker/SimpleTxnAccept.cpp b/cpp/src/qpid/broker/SimpleTxnAccept.cpp new file mode 100644 index 0000000000..343bbb54c7 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleTxnAccept.cpp @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleTxnAccept.cpp + */ + +#include "SimpleTxnAccept.h" + +#include "SimpleDeliveryRecord.h" + +#include "qpid/log/Statement.h" + +namespace qpid { +namespace broker { + +SimpleTxnAccept::SimpleTxnAccept(std::deque >& ops) : + m_ops(ops) +{} + +SimpleTxnAccept::~SimpleTxnAccept() {} + +// --- Interface TxnOp --- + +bool +SimpleTxnAccept::prepare(SimpleTxnBuffer* tb) throw() { + try { + for (std::deque >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + (*i)->dequeue(tb); + } + return true; + } catch (const std::exception& e) { + QPID_LOG(error, "TxnAccept: Failed to prepare transaction: " << e.what()); + } catch (...) { + QPID_LOG(error, "TxnAccept: Failed to prepare transaction: (unknown error)"); + } + return false; +} + +void +SimpleTxnAccept::commit() throw() { + try { + for (std::deque >::iterator i=m_ops.begin(); i!=m_ops.end(); ++i) { + (*i)->committed(); + (*i)->setEnded(); + } + } catch (const std::exception& e) { + QPID_LOG(error, "TxnAccept: Failed to commit transaction: " << e.what()); + } catch(...) { + QPID_LOG(error, "TxnAccept: Failed to commit transaction: (unknown error)"); + } +} + +void +SimpleTxnAccept::rollback() throw() {} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleTxnAccept.h b/cpp/src/qpid/broker/SimpleTxnAccept.h new file mode 100644 index 0000000000..eb6963bc88 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleTxnAccept.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleTxnAccept.h + */ + +#ifndef tests_storePerftools_asyncPerf_SimpleTxnAccept_h_ +#define tests_storePerftools_asyncPerf_SimpleTxnAccept_h_ + +#include "SimpleTxnOp.h" + +#include "boost/shared_ptr.hpp" +#include + +namespace qpid { +namespace broker { + +class SimpleDeliveryRecord; + +class SimpleTxnAccept: public SimpleTxnOp { +public: + SimpleTxnAccept(std::deque >& ops); + virtual ~SimpleTxnAccept(); + + // --- Interface TxnOp --- + bool prepare(SimpleTxnBuffer* tb) throw(); + void commit() throw(); + void rollback() throw(); +private: + std::deque > m_ops; +}; + +}} // namespace qpid::broker + +#endif // tests_storePerftools_asyncPerf_SimpleTxnAccept_h_ diff --git a/cpp/src/qpid/broker/SimpleTxnBuffer.cpp b/cpp/src/qpid/broker/SimpleTxnBuffer.cpp new file mode 100644 index 0000000000..d72a785c2a --- /dev/null +++ b/cpp/src/qpid/broker/SimpleTxnBuffer.cpp @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleTxnBuffer.cpp + */ + +#include "SimpleTxnBuffer.h" + +#include "AsyncResultHandle.h" +#include "SimpleTxnOp.h" +#include "TxnAsyncContext.h" + +#include "qpid/log/Statement.h" + +#include + +namespace qpid { +namespace broker { + +qpid::sys::Mutex SimpleTxnBuffer::s_uuidMutex; + +SimpleTxnBuffer::SimpleTxnBuffer(AsyncResultQueue& arq) : + m_store(0), + m_resultQueue(arq), + m_tpcFlag(false), + m_submitOpCnt(0), + m_completeOpCnt(0), + m_state(NONE) +{ + createLocalXid(); +} + +SimpleTxnBuffer::SimpleTxnBuffer(AsyncResultQueue& arq, std::string& xid) : + m_store(0), + m_resultQueue(arq), + m_xid(xid), + m_tpcFlag(!xid.empty()), + m_submitOpCnt(0), + m_completeOpCnt(0), + m_state(NONE) +{ + if (m_xid.empty()) { + createLocalXid(); + } +} + +SimpleTxnBuffer::~SimpleTxnBuffer() {} + +TxnHandle& +SimpleTxnBuffer::getTxnHandle() { + return m_txnHandle; +} + +const std::string& +SimpleTxnBuffer::getXid() const { + return m_xid; +} + +bool +SimpleTxnBuffer::is2pc() const { + return m_tpcFlag; +} + +void +SimpleTxnBuffer::incrOpCnt() { + qpid::sys::ScopedLock l(m_submitOpCntMutex); + ++m_submitOpCnt; +} + +void +SimpleTxnBuffer::decrOpCnt() { + const uint32_t numOps = getNumOps(); + qpid::sys::ScopedLock l2(m_completeOpCntMutex); + qpid::sys::ScopedLock l3(m_submitOpCntMutex); + if (m_completeOpCnt == m_submitOpCnt) { + throw qpid::Exception("Transaction async operation count underflow"); + } + ++m_completeOpCnt; + if (numOps == m_submitOpCnt && numOps == m_completeOpCnt) { + asyncLocalCommit(); + } +} + +void +SimpleTxnBuffer::enlist(boost::shared_ptr op) { + qpid::sys::ScopedLock l(m_opsMutex); + m_ops.push_back(op); +} + +bool +SimpleTxnBuffer::prepare() { + qpid::sys::ScopedLock l(m_opsMutex); + for(std::vector >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + if (!(*i)->prepare(this)) { + return false; + } + } + return true; +} + +void +SimpleTxnBuffer::commit() { + qpid::sys::ScopedLock l(m_opsMutex); + for(std::vector >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + (*i)->commit(); + } + m_ops.clear(); +} + +void +SimpleTxnBuffer::rollback() { + qpid::sys::ScopedLock l(m_opsMutex); + for(std::vector >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { + (*i)->rollback(); + } + m_ops.clear(); +} + +bool +SimpleTxnBuffer::commitLocal(AsyncTransactionalStore* const store) { + try { + m_store = store; + asyncLocalCommit(); + } catch (std::exception& e) { + QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed: " << e.what()); + } catch (...) { + QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed (unknown exception)"); + } + return false; +} + +void +SimpleTxnBuffer::asyncLocalCommit() { + switch(m_state) { + case NONE: + m_state = PREPARE; + if (m_store) { + m_txnHandle = m_store->createTxnHandle(this); + } + prepare(/*shared_from_this()*/); + if (m_store) { + break; + } + case PREPARE: + m_state = COMMIT; + if (m_store) { + boost::shared_ptr tac(new TxnAsyncContext(this, + &handleAsyncCommitResult, + &m_resultQueue)); + m_store->testOp(); + m_store->submitCommit(m_txnHandle, tac); + break; + } + case COMMIT: + commit(); + m_state = COMPLETE; + delete this; + break; + case COMPLETE: + default: ; + } +} + +//static +void +SimpleTxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr tac = boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + QPID_LOG(error, "TxnBuffer::handleAsyncCommitResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo() + << " (" << arh->getErrMsg() << ")"); + tac->getTxnBuffer()->asyncLocalAbort(); + } else { + tac->getTxnBuffer()->asyncLocalCommit(); + } + } +} + +void +SimpleTxnBuffer::asyncLocalAbort() { + assert(m_store != 0); + switch (m_state) { + case NONE: + case PREPARE: + case COMMIT: + m_state = ROLLBACK; + { + boost::shared_ptr tac(new TxnAsyncContext(this, + &handleAsyncAbortResult, + &m_resultQueue)); + m_store->submitCommit(m_txnHandle, tac); + } + break; + case ROLLBACK: + rollback(); + m_state = COMPLETE; + delete this; + default: ; + } +} + +//static +void +SimpleTxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) { + if (arh) { + boost::shared_ptr tac = boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); + if (arh->getErrNo()) { + QPID_LOG(error, "TxnBuffer::handleAsyncAbortResult: Transactional operation " << tac->getOpStr() + << " failed: err=" << arh->getErrNo() << " (" << arh->getErrMsg() << ")"); + } + tac->getTxnBuffer()->asyncLocalAbort(); + } +} + +// private +uint32_t +SimpleTxnBuffer::getNumOps() const { + qpid::sys::ScopedLock l(m_opsMutex); + return m_ops.size(); +} + +// private +void +SimpleTxnBuffer::createLocalXid() +{ + uuid_t uuid; + { + qpid::sys::ScopedLock l(s_uuidMutex); + ::uuid_generate_random(uuid); // Not thread-safe + } + char uuidStr[37]; // 36-char uuid + trailing '\0' + ::uuid_unparse(uuid, uuidStr); + m_xid.assign(uuidStr); + QPID_LOG(debug, "Local XID created: \"" << m_xid << "\""); +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleTxnBuffer.h b/cpp/src/qpid/broker/SimpleTxnBuffer.h new file mode 100644 index 0000000000..b2164cfeed --- /dev/null +++ b/cpp/src/qpid/broker/SimpleTxnBuffer.h @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleTxnBuffer.h + */ + +#ifndef qpid_broker_SimpleTxnBuffer_h_ +#define qpid_broker_SimpleTxnBuffer_h_ + +#include "TxnHandle.h" + +#include "qpid/sys/Mutex.h" + +#include +#include + +namespace qpid { +namespace broker { + +class AsyncResultHandle; +class AsyncResultQueue; +class AsyncTransactionalStore; +class SimpleTxnOp; + +class SimpleTxnBuffer { +public: + SimpleTxnBuffer(AsyncResultQueue& arq); + SimpleTxnBuffer(AsyncResultQueue& arq, std::string& xid); + virtual ~SimpleTxnBuffer(); + TxnHandle& getTxnHandle(); + const std::string& getXid() const; + bool is2pc() const; + void incrOpCnt(); + void decrOpCnt(); + + void enlist(boost::shared_ptr op); + bool prepare(); + void commit(); + void rollback(); + bool commitLocal(AsyncTransactionalStore* const store); + + // --- Async operations --- + void asyncLocalCommit(); + static void handleAsyncCommitResult(const AsyncResultHandle* const arh); + void asyncLocalAbort(); + static void handleAsyncAbortResult(const AsyncResultHandle* const arh); + +private: + mutable qpid::sys::Mutex m_opsMutex; + mutable qpid::sys::Mutex m_submitOpCntMutex; + mutable qpid::sys::Mutex m_completeOpCntMutex; + static qpid::sys::Mutex s_uuidMutex; + + std::vector > m_ops; + TxnHandle m_txnHandle; + AsyncTransactionalStore* m_store; + AsyncResultQueue& m_resultQueue; + std::string m_xid; + bool m_tpcFlag; + uint32_t m_submitOpCnt; + uint32_t m_completeOpCnt; + + typedef enum {NONE = 0, PREPARE, COMMIT, ROLLBACK, COMPLETE} e_txnState; + e_txnState m_state; + + uint32_t getNumOps() const; + void createLocalXid(); +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleTxnBuffer_h_ diff --git a/cpp/src/qpid/broker/SimpleTxnOp.h b/cpp/src/qpid/broker/SimpleTxnOp.h new file mode 100644 index 0000000000..2cec2da8f0 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleTxnOp.h @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleTxnOp.h + */ + +#ifndef qpid_broker_SimpleTxnOp_h_ +#define qpid_broker_SimpleTxnOp_h_ + +#include + +namespace qpid { +namespace broker { + +class SimpleTxnBuffer; + +class SimpleTxnOp{ +public: + virtual ~SimpleTxnOp() {} + virtual bool prepare(SimpleTxnBuffer*) throw() = 0; + virtual void commit() throw() = 0; + virtual void rollback() throw() = 0; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleTxnOp_h_ diff --git a/cpp/src/qpid/broker/SimpleTxnPublish.cpp b/cpp/src/qpid/broker/SimpleTxnPublish.cpp new file mode 100644 index 0000000000..6ad6a108ea --- /dev/null +++ b/cpp/src/qpid/broker/SimpleTxnPublish.cpp @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleTxnPublish.cpp + */ + +#include "SimpleTxnPublish.h" + +#include "SimpleMessage.h" +#include "SimpleQueue.h" +#include "SimpleQueuedMessage.h" + +#include "qpid/log/Statement.h" +#include + +namespace qpid { +namespace broker { + +SimpleTxnPublish::SimpleTxnPublish(boost::intrusive_ptr msg) : + m_msg(msg) +{} + +SimpleTxnPublish::~SimpleTxnPublish() {} + +bool +SimpleTxnPublish::prepare(SimpleTxnBuffer* tb) throw() { + try { + while (!m_queues.empty()) { + m_queues.front()->prepareEnqueue(tb); + m_prepared.push_back(m_queues.front()); + m_queues.pop_front(); + } + return true; + } catch (const std::exception& e) { + QPID_LOG(error, "TxnPublish: Failed to prepare transaction: " << e.what()); + } catch (...) { + QPID_LOG(error, "TxnPublish: Failed to prepare transaction: (unknown error)"); + } + return false; +} + +void +SimpleTxnPublish::commit() throw() { + try { + for (std::list >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { + (*i)->commitEnqueue(); + } + } catch (const std::exception& e) { + QPID_LOG(error, "TxnPublish: Failed to commit transaction: " << e.what()); + } catch (...) { + QPID_LOG(error, "TxnPublish: Failed to commit transaction: (unknown error)"); + } +} + +void +SimpleTxnPublish::rollback() throw() { + try { + for (std::list >::iterator i = m_prepared.begin(); i != m_prepared.end(); ++i) { + (*i)->abortEnqueue(); + } + } catch (const std::exception& e) { + QPID_LOG(error, "TxnPublish: Failed to rollback transaction: " << e.what()); + } catch (...) { + QPID_LOG(error, "TxnPublish: Failed to rollback transaction: (unknown error)"); + } +} + +uint64_t +SimpleTxnPublish::contentSize() { + return m_msg->contentSize(); +} + +void +SimpleTxnPublish::deliverTo(const boost::shared_ptr& queue) { + m_queues.push_back(boost::shared_ptr(new SimpleQueuedMessage(queue.get(), m_msg))); + m_delivered = true; +} + +SimpleMessage& +SimpleTxnPublish::getMessage() { + return *m_msg; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SimpleTxnPublish.h b/cpp/src/qpid/broker/SimpleTxnPublish.h new file mode 100644 index 0000000000..0aaf8e4ba0 --- /dev/null +++ b/cpp/src/qpid/broker/SimpleTxnPublish.h @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \file SimpleTxnPublish.h + */ + +#ifndef qpid_broker_SimpleTxnPublish_h_ +#define qpid_broker_SimpleTxnPublish_h_ + +#include "SimpleDeliverable.h" +#include "SimpleTxnOp.h" + +#include +#include +#include + + +namespace qpid { +namespace broker { + +class SimpleQueuedMessage; +class SimpleMessage; +class SimpleQueue; + +class SimpleTxnPublish : public SimpleTxnOp, + public SimpleDeliverable +{ +public: + SimpleTxnPublish(boost::intrusive_ptr msg); + virtual ~SimpleTxnPublish(); + + // --- Interface TxOp --- + bool prepare(SimpleTxnBuffer* tb) throw(); + void commit() throw(); + void rollback() throw(); + + // --- Interface Deliverable --- + uint64_t contentSize(); + void deliverTo(const boost::shared_ptr& queue); + SimpleMessage& getMessage(); + +private: + boost::intrusive_ptr m_msg; + std::list > m_queues; + std::list > m_prepared; +}; + +}} // namespace qpid::broker + +#endif // qpid_broker_SimpleTxnPublish_h_ diff --git a/cpp/src/qpid/broker/TxnAsyncContext.cpp b/cpp/src/qpid/broker/TxnAsyncContext.cpp index 63e2de2b41..527cb4741f 100644 --- a/cpp/src/qpid/broker/TxnAsyncContext.cpp +++ b/cpp/src/qpid/broker/TxnAsyncContext.cpp @@ -26,7 +26,7 @@ namespace qpid { namespace broker { -TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb, +TxnAsyncContext::TxnAsyncContext(SimpleTxnBuffer* const tb, AsyncResultCallback rcb, AsyncResultQueue* const arq): m_tb(tb), @@ -37,7 +37,7 @@ TxnAsyncContext::TxnAsyncContext(TxnBuffer* const tb, TxnAsyncContext::~TxnAsyncContext() {} -TxnBuffer* +SimpleTxnBuffer* TxnAsyncContext::getTxnBuffer() const { return m_tb; diff --git a/cpp/src/qpid/broker/TxnAsyncContext.h b/cpp/src/qpid/broker/TxnAsyncContext.h index 9c617238e8..04f6ef76f5 100644 --- a/cpp/src/qpid/broker/TxnAsyncContext.h +++ b/cpp/src/qpid/broker/TxnAsyncContext.h @@ -29,38 +29,34 @@ #include "qpid/asyncStore/AsyncOperation.h" namespace qpid { -//namespace asyncStore { -//class AsyncOperation; -//} namespace broker { class AsyncResultHandle; class AsyncResultQueue; -//class TxnHandle; typedef void (*AsyncResultCallback)(const AsyncResultHandle* const); class TxnAsyncContext: public BrokerAsyncContext { public: - TxnAsyncContext(TxnBuffer* const tb, + TxnAsyncContext(SimpleTxnBuffer* const tb, AsyncResultCallback rcb, AsyncResultQueue* const arq); virtual ~TxnAsyncContext(); - TxnBuffer* getTxnBuffer() const; + SimpleTxnBuffer* getTxnBuffer() const; // --- Interface BrokerAsyncContext --- AsyncResultQueue* getAsyncResultQueue() const; void invokeCallback(const AsyncResultHandle* const) const; private: - TxnBuffer* const m_tb; + SimpleTxnBuffer* const m_tb; AsyncResultCallback m_rcb; AsyncResultQueue* const m_arq; }; class TpcTxnAsyncContext : public TxnAsyncContext { public: - TpcTxnAsyncContext(TxnBuffer* const tb, + TpcTxnAsyncContext(SimpleTxnBuffer* const tb, AsyncResultCallback rcb, AsyncResultQueue* const arq) : TxnAsyncContext(tb, rcb, arq) diff --git a/cpp/src/qpid/broker/TxnBuffer.cpp b/cpp/src/qpid/broker/TxnBuffer.cpp deleted file mode 100644 index 4d6e7b7918..0000000000 --- a/cpp/src/qpid/broker/TxnBuffer.cpp +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TxnBuffer.cpp - */ - -#include "TxnBuffer.h" - -#include "AsyncResultHandle.h" -#include "TxnAsyncContext.h" -#include "TxnOp.h" - -#include "qpid/log/Statement.h" - -#include - -namespace qpid { -namespace broker { - -qpid::sys::Mutex TxnBuffer::s_uuidMutex; - -TxnBuffer::TxnBuffer(AsyncResultQueue& arq) : - m_store(0), - m_resultQueue(arq), - m_tpcFlag(false), - m_submitOpCnt(0), - m_completeOpCnt(0), - m_state(NONE) -{ - createLocalXid(); -} - -TxnBuffer::TxnBuffer(AsyncResultQueue& arq, std::string& xid) : - m_store(0), - m_resultQueue(arq), - m_xid(xid), - m_tpcFlag(!xid.empty()), - m_submitOpCnt(0), - m_completeOpCnt(0), - m_state(NONE) -{ - if (m_xid.empty()) { - createLocalXid(); - } -} - -TxnBuffer::~TxnBuffer() {} - -TxnHandle& -TxnBuffer::getTxnHandle() { - return m_txnHandle; -} - -const std::string& -TxnBuffer::getXid() const { - return m_xid; -} - -bool -TxnBuffer::is2pc() const { - return m_tpcFlag; -} - -void -TxnBuffer::incrOpCnt() { - qpid::sys::ScopedLock l(m_submitOpCntMutex); - ++m_submitOpCnt; -} - -void -TxnBuffer::decrOpCnt() { - const uint32_t numOps = getNumOps(); - qpid::sys::ScopedLock l2(m_completeOpCntMutex); - qpid::sys::ScopedLock l3(m_submitOpCntMutex); - if (m_completeOpCnt == m_submitOpCnt) { - throw qpid::Exception("Transaction async operation count underflow"); - } - ++m_completeOpCnt; - if (numOps == m_submitOpCnt && numOps == m_completeOpCnt) { - asyncLocalCommit(); - } -} - -void -TxnBuffer::enlist(boost::shared_ptr op) { - qpid::sys::ScopedLock l(m_opsMutex); - m_ops.push_back(op); -} - -bool -TxnBuffer::prepare() { - qpid::sys::ScopedLock l(m_opsMutex); - for(std::vector >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { - if (!(*i)->prepare(this)) { - return false; - } - } - return true; -} - -void -TxnBuffer::commit() { - qpid::sys::ScopedLock l(m_opsMutex); - for(std::vector >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { - (*i)->commit(); - } - m_ops.clear(); -} - -void -TxnBuffer::rollback() { - qpid::sys::ScopedLock l(m_opsMutex); - for(std::vector >::iterator i = m_ops.begin(); i != m_ops.end(); ++i) { - (*i)->rollback(); - } - m_ops.clear(); -} - -bool -TxnBuffer::commitLocal(AsyncTransactionalStore* const store) { - try { - m_store = store; - asyncLocalCommit(); - } catch (std::exception& e) { - QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed: " << e.what()); - } catch (...) { - QPID_LOG(error, "TxnBuffer::commitLocal: Commit failed (unknown exception)"); - } - return false; -} - -void -TxnBuffer::asyncLocalCommit() { - switch(m_state) { - case NONE: - m_state = PREPARE; - if (m_store) { - m_txnHandle = m_store->createTxnHandle(this); - } - prepare(/*shared_from_this()*/); - if (m_store) { - break; - } - case PREPARE: - m_state = COMMIT; - if (m_store) { - boost::shared_ptr tac(new TxnAsyncContext(this, - &handleAsyncCommitResult, - &m_resultQueue)); - m_store->testOp(); - m_store->submitCommit(m_txnHandle, tac); - break; - } - case COMMIT: - commit(); - m_state = COMPLETE; - delete this; - break; - case COMPLETE: - default: ; - } -} - -//static -void -TxnBuffer::handleAsyncCommitResult(const AsyncResultHandle* const arh) { - if (arh) { - boost::shared_ptr tac = boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); - if (arh->getErrNo()) { - QPID_LOG(error, "TxnBuffer::handleAsyncCommitResult: Transactional operation " << tac->getOpStr() << " failed: err=" << arh->getErrNo() - << " (" << arh->getErrMsg() << ")"); - tac->getTxnBuffer()->asyncLocalAbort(); - } else { - tac->getTxnBuffer()->asyncLocalCommit(); - } - } -} - -void -TxnBuffer::asyncLocalAbort() { - assert(m_store != 0); - switch (m_state) { - case NONE: - case PREPARE: - case COMMIT: - m_state = ROLLBACK; - { - boost::shared_ptr tac(new TxnAsyncContext(this, - &handleAsyncAbortResult, - &m_resultQueue)); - m_store->submitCommit(m_txnHandle, tac); - } - break; - case ROLLBACK: - rollback(); - m_state = COMPLETE; - delete this; - default: ; - } -} - -//static -void -TxnBuffer::handleAsyncAbortResult(const AsyncResultHandle* const arh) { - if (arh) { - boost::shared_ptr tac = boost::dynamic_pointer_cast(arh->getBrokerAsyncContext()); - if (arh->getErrNo()) { - QPID_LOG(error, "TxnBuffer::handleAsyncAbortResult: Transactional operation " << tac->getOpStr() - << " failed: err=" << arh->getErrNo() << " (" << arh->getErrMsg() << ")"); - } - tac->getTxnBuffer()->asyncLocalAbort(); - } -} - -// private -uint32_t -TxnBuffer::getNumOps() const { - qpid::sys::ScopedLock l(m_opsMutex); - return m_ops.size(); -} - -// private -void -TxnBuffer::createLocalXid() -{ - uuid_t uuid; - { - qpid::sys::ScopedLock l(s_uuidMutex); - ::uuid_generate_random(uuid); // Not thread-safe - } - char uuidStr[37]; // 36-char uuid + trailing '\0' - ::uuid_unparse(uuid, uuidStr); - m_xid.assign(uuidStr); - QPID_LOG(debug, "Local XID created: \"" << m_xid << "\""); -} - -}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/TxnBuffer.h b/cpp/src/qpid/broker/TxnBuffer.h deleted file mode 100644 index 02569f6545..0000000000 --- a/cpp/src/qpid/broker/TxnBuffer.h +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TxnBuffer.h - */ - -#ifndef qpid_broker_TxnBuffer_h_ -#define qpid_broker_TxnBuffer_h_ - -#include "TxnHandle.h" - -#include "qpid/sys/Mutex.h" - -#include -#include - -namespace qpid { -namespace broker { - -class AsyncResultHandle; -class AsyncResultQueue; -class AsyncTransactionalStore; -class TxnOp; - -class TxnBuffer { -public: - TxnBuffer(AsyncResultQueue& arq); - TxnBuffer(AsyncResultQueue& arq, std::string& xid); - virtual ~TxnBuffer(); - TxnHandle& getTxnHandle(); - const std::string& getXid() const; - bool is2pc() const; - void incrOpCnt(); - void decrOpCnt(); - - void enlist(boost::shared_ptr op); - bool prepare(); - void commit(); - void rollback(); - bool commitLocal(AsyncTransactionalStore* const store); - - // --- Async operations --- - void asyncLocalCommit(); - static void handleAsyncCommitResult(const AsyncResultHandle* const arh); - void asyncLocalAbort(); - static void handleAsyncAbortResult(const AsyncResultHandle* const arh); - -private: - mutable qpid::sys::Mutex m_opsMutex; - mutable qpid::sys::Mutex m_submitOpCntMutex; - mutable qpid::sys::Mutex m_completeOpCntMutex; - static qpid::sys::Mutex s_uuidMutex; - - std::vector > m_ops; - TxnHandle m_txnHandle; - AsyncTransactionalStore* m_store; - AsyncResultQueue& m_resultQueue; - std::string m_xid; - bool m_tpcFlag; - uint32_t m_submitOpCnt; - uint32_t m_completeOpCnt; - - typedef enum {NONE = 0, PREPARE, COMMIT, ROLLBACK, COMPLETE} e_txnState; - e_txnState m_state; - - uint32_t getNumOps() const; - void createLocalXid(); -}; - -}} // namespace qpid::broker - -#endif // qpid_broker_TxnBuffer_h_ diff --git a/cpp/src/qpid/broker/TxnOp.h b/cpp/src/qpid/broker/TxnOp.h deleted file mode 100644 index bcff87551c..0000000000 --- a/cpp/src/qpid/broker/TxnOp.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * \file TxnOp.h - */ - -#ifndef qpid_broker_TxnOp_h_ -#define qpid_broker_TxnOp_h_ - -#include - -namespace qpid { -namespace broker { - -class TxnBuffer; - -class TxnOp{ -public: - virtual ~TxnOp() {} - virtual bool prepare(qpid::broker::TxnBuffer*) throw() = 0; - virtual void commit() throw() = 0; - virtual void rollback() throw() = 0; -}; - -}} // namespace qpid::broker - -#endif // qpid_broker_TxnOp_h_ -- cgit v1.2.1