diff options
| author | Alan Conway <aconway@apache.org> | 2007-02-14 15:02:10 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-02-14 15:02:10 +0000 |
| commit | 8189a1f1f3d27d9ad7e0de50ed9e924e63d74aec (patch) | |
| tree | f43ed248e99d67639f4330e604a0ed718f736d22 /cpp/lib | |
| parent | 20a5f81e8bbf8d4429d55fffb47278e7ade81c17 (diff) | |
| download | qpid-python-8189a1f1f3d27d9ad7e0de50ed9e924e63d74aec.tar.gz | |
* cpp/lib/common/sys/ProducerConsumer.h:
General-purpose producer-consumer synchronization. Anywhere we have
producer/consumer threads in qpid we should re-use this sync object
rather than re-inventing the synchronization each time.
* cpp/lib/common/sys/AtomicCount.h: Separated ScopedIncrement/ScopedDecrement
into ScopedIncrement.h
* cpp/tests/InProcessBroker.h: Added class InProcessBrokerClient, a
self contained in-process client + broker convenience for tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@507560 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib')
| -rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 4 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerChannel.h | 1 | ||||
| -rw-r--r-- | cpp/lib/client/ClientChannel.cpp | 6 | ||||
| -rw-r--r-- | cpp/lib/client/ClientChannel.h | 8 | ||||
| -rw-r--r-- | cpp/lib/common/Makefile.am | 6 | ||||
| -rw-r--r-- | cpp/lib/common/sys/AtomicCount.h | 24 | ||||
| -rw-r--r-- | cpp/lib/common/sys/ProducerConsumer.cpp | 141 | ||||
| -rw-r--r-- | cpp/lib/common/sys/ProducerConsumer.h | 165 | ||||
| -rw-r--r-- | cpp/lib/common/sys/ScopedIncrement.h | 59 |
9 files changed, 380 insertions, 34 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 674d0e9505..fc82e3111d 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -327,9 +327,9 @@ void Channel::handleMethodInContext( ) { try{ - if(id != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) { + if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) { std::stringstream out; - out << "Attempt to use unopened channel: " << id; + out << "Attempt to use unopened channel: " << getId(); throw ConnectionException(504, out.str()); } else { method->invoke(*adapter, context); diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index 1a1c4dabba..18e833b85c 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -78,7 +78,6 @@ class Channel : public framing::ChannelAdapter, typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap; Connection& connection; - u_int16_t id; u_int64_t currentDeliveryTag; Queue::shared_ptr defaultQueue; bool transactional; diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index 52910f5161..613469c4ba 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -54,11 +54,6 @@ AMQP_ServerProxy& Channel::brokerProxy() { return *proxy; } -AMQMethodBody::shared_ptr Channel::brokerResponse() { - // FIXME aconway 2007-02-08: implement responses. - return AMQMethodBody::shared_ptr(); -} - void Channel::open(ChannelId id, Connection& con) { if (isOpen()) @@ -482,7 +477,6 @@ void Channel::close( u_int16_t code, const std::string& text, ClassId classId, MethodId methodId) { - // FIXME aconway 2007-01-26: Locking? if (getId() != 0 && isOpen()) { try { sendAndReceive<ChannelCloseOkBody>( diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h index 1c082f3b59..d6ec1d9772 100644 --- a/cpp/lib/client/ClientChannel.h +++ b/cpp/lib/client/ClientChannel.h @@ -44,6 +44,7 @@ namespace qpid { namespace framing { class ChannelCloseBody; class AMQP_ServerProxy; +class AMQMethodBody; } namespace client { @@ -89,10 +90,13 @@ class Channel : public framing::ChannelAdapter, u_int64_t lastDeliveryTag; }; typedef std::map<std::string, Consumer> ConsumerMap; + typedef std::queue<boost::shared_ptr<framing::AMQMethodBody> > IncomingMethods; + static const std::string OK; Connection* connection; sys::Thread dispatcher; + IncomingMethods incomingMethods; IncomingMessage* incoming; ResponseHandler responses; std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume @@ -367,12 +371,12 @@ class Channel : public framing::ChannelAdapter, * Returns a proxy for the "raw" AMQP broker protocol. Only for use by * protocol experts. */ - framing::AMQP_ServerProxy& brokerProxy(); + /** * Wait for the next method from the broker. */ - framing::AMQMethodBody::shared_ptr brokerResponse(); + framing::AMQMethodBody::shared_ptr receive(); }; }} diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am index c44480bddf..971571089f 100644 --- a/cpp/lib/common/Makefile.am +++ b/cpp/lib/common/Makefile.am @@ -85,7 +85,8 @@ libqpidcommon_la_SOURCES = \ ExceptionHolder.cpp \ QpidError.cpp \ sys/Runnable.cpp \ - sys/Time.cpp + sys/Time.cpp \ + sys/ProducerConsumer.cpp nobase_pkginclude_HEADERS = \ $(gen)/AMQP_HighestVersion.h \ @@ -132,7 +133,8 @@ nobase_pkginclude_HEADERS = \ sys/Socket.h \ sys/Thread.h \ sys/Time.h \ - sys/TimeoutHandler.h + sys/TimeoutHandler.h \ + sys/ProducerConsumer.h # Force build during dist phase so help2man will work. diff --git a/cpp/lib/common/sys/AtomicCount.h b/cpp/lib/common/sys/AtomicCount.h index b625b2c9b0..63670cbf00 100644 --- a/cpp/lib/common/sys/AtomicCount.h +++ b/cpp/lib/common/sys/AtomicCount.h @@ -20,7 +20,7 @@ */ #include <boost/detail/atomic_count.hpp> -#include <boost/noncopyable.hpp> +#include "ScopedIncrement.h" namespace qpid { namespace sys { @@ -30,26 +30,8 @@ namespace sys { */ class AtomicCount : boost::noncopyable { public: - class ScopedDecrement : boost::noncopyable { - public: - /** Decrement counter in constructor and increment in destructor. */ - ScopedDecrement(AtomicCount& c) : count(c) { value = --count; } - ~ScopedDecrement() { ++count; } - /** Return the value returned by the decrement. */ - operator long() { return value; } - private: - AtomicCount& count; - long value; - }; - - class ScopedIncrement : boost::noncopyable { - public: - /** Increment counter in constructor and increment in destructor. */ - ScopedIncrement(AtomicCount& c) : count(c) { ++count; } - ~ScopedIncrement() { --count; } - private: - AtomicCount& count; - }; + typedef ScopedDecrement<AtomicCount> ScopedDecrement; + typedef ScopedIncrement<AtomicCount> ScopedIncrement; AtomicCount(long value = 0) : count(value) {} diff --git a/cpp/lib/common/sys/ProducerConsumer.cpp b/cpp/lib/common/sys/ProducerConsumer.cpp new file mode 100644 index 0000000000..3f6156f230 --- /dev/null +++ b/cpp/lib/common/sys/ProducerConsumer.cpp @@ -0,0 +1,141 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + +#include "QpidError.h" +#include "ScopedIncrement.h" +#include "ProducerConsumer.h" + +namespace qpid { +namespace sys { + +// // ================ ProducerConsumer + +ProducerConsumer::ProducerConsumer(size_t init_items) + : items(init_items), waiters(0), stopped(false) +{} + +void ProducerConsumer::stop() { + Mutex::ScopedLock l(monitor); + stopped = true; + monitor.notifyAll(); + // Wait for waiting consumers to wake up. + while (waiters > 0) + monitor.wait(); +} + +size_t ProducerConsumer::available() const { + Mutex::ScopedLock l(monitor); + return items; +} + +size_t ProducerConsumer::consumers() const { + Mutex::ScopedLock l(monitor); + return waiters; +} + +// ================ Lock + +ProducerConsumer::Lock::Lock(ProducerConsumer& p) + : pc(p), lock(p.monitor), status(INCOMPLETE) {} + +bool ProducerConsumer::Lock::isOk() const { + return !pc.isStopped() && status==INCOMPLETE; +} + +void ProducerConsumer::Lock::checkOk() const { + assert(!pc.isStopped()); + assert(status == INCOMPLETE); +} + +ProducerConsumer::Lock::~Lock() { + assert(status != INCOMPLETE || pc.isStopped()); +} + +void ProducerConsumer::Lock::confirm() { + checkOk(); + status = CONFIRMED; +} + +void ProducerConsumer::Lock::cancel() { + checkOk(); + status = CANCELLED; +} + +// ================ ProducerLock + +ProducerConsumer::ProducerLock::ProducerLock(ProducerConsumer& p) : Lock(p) +{} + + +ProducerConsumer::ProducerLock::~ProducerLock() { + if (status == CONFIRMED) { + pc.items++; + pc.monitor.notify(); // Notify a consumer. + } +} + +// ================ ConsumerLock + +ProducerConsumer::ConsumerLock::ConsumerLock(ProducerConsumer& p) : Lock(p) +{ + if (isOk()) { + ScopedIncrement<size_t> inc(pc.waiters); + while (pc.items == 0 && !pc.stopped) { + pc.monitor.wait(); + } + } +} + +ProducerConsumer::ConsumerLock::ConsumerLock( + ProducerConsumer& p, const Time& timeout) : Lock(p) +{ + if (isOk()) { + // Don't wait if timeout==0 + if (timeout == 0) { + if (pc.items == 0) + status = TIMEOUT; + return; + } + else { + Time deadline = now() + timeout; + ScopedIncrement<size_t> inc(pc.waiters); + while (pc.items == 0 && !pc.stopped) { + if (!pc.monitor.wait(deadline)) { + status = TIMEOUT; + return; + } + } + } + } +} + +ProducerConsumer::ConsumerLock::~ConsumerLock() { + if (pc.isStopped()) { + if (pc.waiters == 0) + pc.monitor.notifyAll(); // All waiters woken, notify stop thread(s) + } + else if (status==CONFIRMED) { + pc.items--; + if (pc.items > 0) + pc.monitor.notify(); // Notify another consumer. + } +} + + +}} // namespace qpid::sys diff --git a/cpp/lib/common/sys/ProducerConsumer.h b/cpp/lib/common/sys/ProducerConsumer.h new file mode 100644 index 0000000000..742639323b --- /dev/null +++ b/cpp/lib/common/sys/ProducerConsumer.h @@ -0,0 +1,165 @@ +#ifndef _sys_ProducerConsumer_h +#define _sys_ProducerConsumer_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <boost/noncopyable.hpp> +#include "Exception.h" +#include "sys/Monitor.h" + +namespace qpid { +namespace sys { + +/** + * Producer-consumer synchronisation. + * + * Producers increase the number of available items, consumers reduce it. + * Consumers wait till an item is available. Waiting threads can be + * woken for shutdown using stop(). + * + * Note: Currently implements unbounded producer-consumer, i.e. no limit + * to available items, producers never block. Can be extended to support + * bounded PC if required. + * + // TODO aconway 2007-02-13: example, from tests. +*/ +class ProducerConsumer +{ + public: + ProducerConsumer(size_t init_items=0); + + ~ProducerConsumer() { stop(); } + + /** + * Wake any threads waiting for ProducerLock or ConsumerLock. + *@post No threads are waiting in Producer or Consumer locks. + */ + void stop(); + + /** True if queue is stopped */ + bool isStopped() { return stopped; } + + /** Number of items available for consumers */ + size_t available() const; + + /** Number of consumers waiting for items */ + size_t consumers() const; + + /** True if available == 0 */ + bool empty() const { return available() == 0; } + + /** + * Base class for producer and consumer locks. + */ + class Lock : private boost::noncopyable { + public: + + /** + * You must call isOk() after creating a lock to verify its state. + * + *@return true means the lock succeeded. You MUST call either + *confirm() or cancel() before the lock goes out of scope. + * + * false means the lock failed - timed out or the + * ProducerConsumer is stopped. You should not do anything in + * the scope of the lock. + */ + bool isOk() const; + + /** + * Confirm that an item was produced/consumed. + *@pre isOk() + */ + void confirm(); + + /** + * Cancel the lock to indicate nothing was produced/consumed. + * Note that locks are not actually released until destroyed. + * + *@pre isOk() + */ + void cancel(); + + /** True if this lock experienced a timeout */ + bool isTimedOut() const { return status == TIMEOUT; } + + /** True if we have been stopped */ + bool isStopped() const { return pc.isStopped(); } + + ProducerConsumer& pc; + + protected: + /** Lock status */ + enum Status { INCOMPLETE, CONFIRMED, CANCELLED, TIMEOUT }; + + Lock(ProducerConsumer& p); + ~Lock(); + void checkOk() const; + Mutex::ScopedLock lock; + Status status; + }; + + /** Lock for code that produces items. */ + struct ProducerLock : public Lock { + /** + * Acquire locks to produce an item. + *@post If isOk() the calling thread has exclusive access + * to produce an item. + */ + ProducerLock(ProducerConsumer& p); + + /** Release locks, signal waiting consumers if confirm() was called. */ + ~ProducerLock(); + }; + + /** Lock for code that consumes items */ + struct ConsumerLock : public Lock { + /** + * Wait for an item to consume and acquire locks. + * + *@post If isOk() there is at least one item available and the + *calling thread has exclusive access to consume it. + */ + ConsumerLock(ProducerConsumer& p); + + /** + * Wait up to timeout to acquire lock. + *@post If isOk() caller has a producer lock. + * If isTimedOut() there was a timeout. + * If neither then we were stopped. + */ + ConsumerLock(ProducerConsumer& p, const Time& timeout); + + /** Release locks */ + ~ConsumerLock(); + }; + + private: + mutable Monitor monitor; + size_t items; + size_t waiters; + bool stopped; + + friend class Lock; + friend class ProducerLock; + friend class ConsumerLock; +}; + +}} // namespace qpid::sys + +#endif /*!_sys_ProducerConsumer_h*/ diff --git a/cpp/lib/common/sys/ScopedIncrement.h b/cpp/lib/common/sys/ScopedIncrement.h new file mode 100644 index 0000000000..f14461ddaf --- /dev/null +++ b/cpp/lib/common/sys/ScopedIncrement.h @@ -0,0 +1,59 @@ +#ifndef _posix_ScopedIncrement_h +#define _posix_ScopedIncrement_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <boost/noncopyable.hpp> + +namespace qpid { +namespace sys { + +/** Increment counter in constructor and decrement in destructor. */ +template <class T> +class ScopedIncrement : boost::noncopyable +{ + public: + ScopedIncrement(T& c) : count(c) { ++count; } + ~ScopedIncrement() { --count; } + private: + T& count; +}; + + +/** Decrement counter in constructor and increment in destructor. */ +template <class T> +class ScopedDecrement : boost::noncopyable +{ + public: + ScopedDecrement(T& c) : count(c) { value = --count; } + ~ScopedDecrement() { ++count; } + + /** Return the value after the decrement. */ + operator long() { return value; } + + private: + T& count; + long value; +}; + + +}} + + +#endif // _posix_ScopedIncrement_h |
