summaryrefslogtreecommitdiff
path: root/cpp/lib
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-02-14 15:02:10 +0000
committerAlan Conway <aconway@apache.org>2007-02-14 15:02:10 +0000
commit8189a1f1f3d27d9ad7e0de50ed9e924e63d74aec (patch)
treef43ed248e99d67639f4330e604a0ed718f736d22 /cpp/lib
parent20a5f81e8bbf8d4429d55fffb47278e7ade81c17 (diff)
downloadqpid-python-8189a1f1f3d27d9ad7e0de50ed9e924e63d74aec.tar.gz
* cpp/lib/common/sys/ProducerConsumer.h:
General-purpose producer-consumer synchronization. Anywhere we have producer/consumer threads in qpid we should re-use this sync object rather than re-inventing the synchronization each time. * cpp/lib/common/sys/AtomicCount.h: Separated ScopedIncrement/ScopedDecrement into ScopedIncrement.h * cpp/tests/InProcessBroker.h: Added class InProcessBrokerClient, a self contained in-process client + broker convenience for tests. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@507560 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib')
-rw-r--r--cpp/lib/broker/BrokerChannel.cpp4
-rw-r--r--cpp/lib/broker/BrokerChannel.h1
-rw-r--r--cpp/lib/client/ClientChannel.cpp6
-rw-r--r--cpp/lib/client/ClientChannel.h8
-rw-r--r--cpp/lib/common/Makefile.am6
-rw-r--r--cpp/lib/common/sys/AtomicCount.h24
-rw-r--r--cpp/lib/common/sys/ProducerConsumer.cpp141
-rw-r--r--cpp/lib/common/sys/ProducerConsumer.h165
-rw-r--r--cpp/lib/common/sys/ScopedIncrement.h59
9 files changed, 380 insertions, 34 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp
index 674d0e9505..fc82e3111d 100644
--- a/cpp/lib/broker/BrokerChannel.cpp
+++ b/cpp/lib/broker/BrokerChannel.cpp
@@ -327,9 +327,9 @@ void Channel::handleMethodInContext(
)
{
try{
- if(id != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
+ if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
std::stringstream out;
- out << "Attempt to use unopened channel: " << id;
+ out << "Attempt to use unopened channel: " << getId();
throw ConnectionException(504, out.str());
} else {
method->invoke(*adapter, context);
diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h
index 1a1c4dabba..18e833b85c 100644
--- a/cpp/lib/broker/BrokerChannel.h
+++ b/cpp/lib/broker/BrokerChannel.h
@@ -78,7 +78,6 @@ class Channel : public framing::ChannelAdapter,
typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
Connection& connection;
- u_int16_t id;
u_int64_t currentDeliveryTag;
Queue::shared_ptr defaultQueue;
bool transactional;
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp
index 52910f5161..613469c4ba 100644
--- a/cpp/lib/client/ClientChannel.cpp
+++ b/cpp/lib/client/ClientChannel.cpp
@@ -54,11 +54,6 @@ AMQP_ServerProxy& Channel::brokerProxy() {
return *proxy;
}
-AMQMethodBody::shared_ptr Channel::brokerResponse() {
- // FIXME aconway 2007-02-08: implement responses.
- return AMQMethodBody::shared_ptr();
-}
-
void Channel::open(ChannelId id, Connection& con)
{
if (isOpen())
@@ -482,7 +477,6 @@ void Channel::close(
u_int16_t code, const std::string& text,
ClassId classId, MethodId methodId)
{
- // FIXME aconway 2007-01-26: Locking?
if (getId() != 0 && isOpen()) {
try {
sendAndReceive<ChannelCloseOkBody>(
diff --git a/cpp/lib/client/ClientChannel.h b/cpp/lib/client/ClientChannel.h
index 1c082f3b59..d6ec1d9772 100644
--- a/cpp/lib/client/ClientChannel.h
+++ b/cpp/lib/client/ClientChannel.h
@@ -44,6 +44,7 @@ namespace qpid {
namespace framing {
class ChannelCloseBody;
class AMQP_ServerProxy;
+class AMQMethodBody;
}
namespace client {
@@ -89,10 +90,13 @@ class Channel : public framing::ChannelAdapter,
u_int64_t lastDeliveryTag;
};
typedef std::map<std::string, Consumer> ConsumerMap;
+ typedef std::queue<boost::shared_ptr<framing::AMQMethodBody> > IncomingMethods;
+
static const std::string OK;
Connection* connection;
sys::Thread dispatcher;
+ IncomingMethods incomingMethods;
IncomingMessage* incoming;
ResponseHandler responses;
std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
@@ -367,12 +371,12 @@ class Channel : public framing::ChannelAdapter,
* Returns a proxy for the "raw" AMQP broker protocol. Only for use by
* protocol experts.
*/
-
framing::AMQP_ServerProxy& brokerProxy();
+
/**
* Wait for the next method from the broker.
*/
- framing::AMQMethodBody::shared_ptr brokerResponse();
+ framing::AMQMethodBody::shared_ptr receive();
};
}}
diff --git a/cpp/lib/common/Makefile.am b/cpp/lib/common/Makefile.am
index c44480bddf..971571089f 100644
--- a/cpp/lib/common/Makefile.am
+++ b/cpp/lib/common/Makefile.am
@@ -85,7 +85,8 @@ libqpidcommon_la_SOURCES = \
ExceptionHolder.cpp \
QpidError.cpp \
sys/Runnable.cpp \
- sys/Time.cpp
+ sys/Time.cpp \
+ sys/ProducerConsumer.cpp
nobase_pkginclude_HEADERS = \
$(gen)/AMQP_HighestVersion.h \
@@ -132,7 +133,8 @@ nobase_pkginclude_HEADERS = \
sys/Socket.h \
sys/Thread.h \
sys/Time.h \
- sys/TimeoutHandler.h
+ sys/TimeoutHandler.h \
+ sys/ProducerConsumer.h
# Force build during dist phase so help2man will work.
diff --git a/cpp/lib/common/sys/AtomicCount.h b/cpp/lib/common/sys/AtomicCount.h
index b625b2c9b0..63670cbf00 100644
--- a/cpp/lib/common/sys/AtomicCount.h
+++ b/cpp/lib/common/sys/AtomicCount.h
@@ -20,7 +20,7 @@
*/
#include <boost/detail/atomic_count.hpp>
-#include <boost/noncopyable.hpp>
+#include "ScopedIncrement.h"
namespace qpid {
namespace sys {
@@ -30,26 +30,8 @@ namespace sys {
*/
class AtomicCount : boost::noncopyable {
public:
- class ScopedDecrement : boost::noncopyable {
- public:
- /** Decrement counter in constructor and increment in destructor. */
- ScopedDecrement(AtomicCount& c) : count(c) { value = --count; }
- ~ScopedDecrement() { ++count; }
- /** Return the value returned by the decrement. */
- operator long() { return value; }
- private:
- AtomicCount& count;
- long value;
- };
-
- class ScopedIncrement : boost::noncopyable {
- public:
- /** Increment counter in constructor and increment in destructor. */
- ScopedIncrement(AtomicCount& c) : count(c) { ++count; }
- ~ScopedIncrement() { --count; }
- private:
- AtomicCount& count;
- };
+ typedef ScopedDecrement<AtomicCount> ScopedDecrement;
+ typedef ScopedIncrement<AtomicCount> ScopedIncrement;
AtomicCount(long value = 0) : count(value) {}
diff --git a/cpp/lib/common/sys/ProducerConsumer.cpp b/cpp/lib/common/sys/ProducerConsumer.cpp
new file mode 100644
index 0000000000..3f6156f230
--- /dev/null
+++ b/cpp/lib/common/sys/ProducerConsumer.cpp
@@ -0,0 +1,141 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+#include "QpidError.h"
+#include "ScopedIncrement.h"
+#include "ProducerConsumer.h"
+
+namespace qpid {
+namespace sys {
+
+// // ================ ProducerConsumer
+
+ProducerConsumer::ProducerConsumer(size_t init_items)
+ : items(init_items), waiters(0), stopped(false)
+{}
+
+void ProducerConsumer::stop() {
+ Mutex::ScopedLock l(monitor);
+ stopped = true;
+ monitor.notifyAll();
+ // Wait for waiting consumers to wake up.
+ while (waiters > 0)
+ monitor.wait();
+}
+
+size_t ProducerConsumer::available() const {
+ Mutex::ScopedLock l(monitor);
+ return items;
+}
+
+size_t ProducerConsumer::consumers() const {
+ Mutex::ScopedLock l(monitor);
+ return waiters;
+}
+
+// ================ Lock
+
+ProducerConsumer::Lock::Lock(ProducerConsumer& p)
+ : pc(p), lock(p.monitor), status(INCOMPLETE) {}
+
+bool ProducerConsumer::Lock::isOk() const {
+ return !pc.isStopped() && status==INCOMPLETE;
+}
+
+void ProducerConsumer::Lock::checkOk() const {
+ assert(!pc.isStopped());
+ assert(status == INCOMPLETE);
+}
+
+ProducerConsumer::Lock::~Lock() {
+ assert(status != INCOMPLETE || pc.isStopped());
+}
+
+void ProducerConsumer::Lock::confirm() {
+ checkOk();
+ status = CONFIRMED;
+}
+
+void ProducerConsumer::Lock::cancel() {
+ checkOk();
+ status = CANCELLED;
+}
+
+// ================ ProducerLock
+
+ProducerConsumer::ProducerLock::ProducerLock(ProducerConsumer& p) : Lock(p)
+{}
+
+
+ProducerConsumer::ProducerLock::~ProducerLock() {
+ if (status == CONFIRMED) {
+ pc.items++;
+ pc.monitor.notify(); // Notify a consumer.
+ }
+}
+
+// ================ ConsumerLock
+
+ProducerConsumer::ConsumerLock::ConsumerLock(ProducerConsumer& p) : Lock(p)
+{
+ if (isOk()) {
+ ScopedIncrement<size_t> inc(pc.waiters);
+ while (pc.items == 0 && !pc.stopped) {
+ pc.monitor.wait();
+ }
+ }
+}
+
+ProducerConsumer::ConsumerLock::ConsumerLock(
+ ProducerConsumer& p, const Time& timeout) : Lock(p)
+{
+ if (isOk()) {
+ // Don't wait if timeout==0
+ if (timeout == 0) {
+ if (pc.items == 0)
+ status = TIMEOUT;
+ return;
+ }
+ else {
+ Time deadline = now() + timeout;
+ ScopedIncrement<size_t> inc(pc.waiters);
+ while (pc.items == 0 && !pc.stopped) {
+ if (!pc.monitor.wait(deadline)) {
+ status = TIMEOUT;
+ return;
+ }
+ }
+ }
+ }
+}
+
+ProducerConsumer::ConsumerLock::~ConsumerLock() {
+ if (pc.isStopped()) {
+ if (pc.waiters == 0)
+ pc.monitor.notifyAll(); // All waiters woken, notify stop thread(s)
+ }
+ else if (status==CONFIRMED) {
+ pc.items--;
+ if (pc.items > 0)
+ pc.monitor.notify(); // Notify another consumer.
+ }
+}
+
+
+}} // namespace qpid::sys
diff --git a/cpp/lib/common/sys/ProducerConsumer.h b/cpp/lib/common/sys/ProducerConsumer.h
new file mode 100644
index 0000000000..742639323b
--- /dev/null
+++ b/cpp/lib/common/sys/ProducerConsumer.h
@@ -0,0 +1,165 @@
+#ifndef _sys_ProducerConsumer_h
+#define _sys_ProducerConsumer_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <boost/noncopyable.hpp>
+#include "Exception.h"
+#include "sys/Monitor.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Producer-consumer synchronisation.
+ *
+ * Producers increase the number of available items, consumers reduce it.
+ * Consumers wait till an item is available. Waiting threads can be
+ * woken for shutdown using stop().
+ *
+ * Note: Currently implements unbounded producer-consumer, i.e. no limit
+ * to available items, producers never block. Can be extended to support
+ * bounded PC if required.
+ *
+ // TODO aconway 2007-02-13: example, from tests.
+*/
+class ProducerConsumer
+{
+ public:
+ ProducerConsumer(size_t init_items=0);
+
+ ~ProducerConsumer() { stop(); }
+
+ /**
+ * Wake any threads waiting for ProducerLock or ConsumerLock.
+ *@post No threads are waiting in Producer or Consumer locks.
+ */
+ void stop();
+
+ /** True if queue is stopped */
+ bool isStopped() { return stopped; }
+
+ /** Number of items available for consumers */
+ size_t available() const;
+
+ /** Number of consumers waiting for items */
+ size_t consumers() const;
+
+ /** True if available == 0 */
+ bool empty() const { return available() == 0; }
+
+ /**
+ * Base class for producer and consumer locks.
+ */
+ class Lock : private boost::noncopyable {
+ public:
+
+ /**
+ * You must call isOk() after creating a lock to verify its state.
+ *
+ *@return true means the lock succeeded. You MUST call either
+ *confirm() or cancel() before the lock goes out of scope.
+ *
+ * false means the lock failed - timed out or the
+ * ProducerConsumer is stopped. You should not do anything in
+ * the scope of the lock.
+ */
+ bool isOk() const;
+
+ /**
+ * Confirm that an item was produced/consumed.
+ *@pre isOk()
+ */
+ void confirm();
+
+ /**
+ * Cancel the lock to indicate nothing was produced/consumed.
+ * Note that locks are not actually released until destroyed.
+ *
+ *@pre isOk()
+ */
+ void cancel();
+
+ /** True if this lock experienced a timeout */
+ bool isTimedOut() const { return status == TIMEOUT; }
+
+ /** True if we have been stopped */
+ bool isStopped() const { return pc.isStopped(); }
+
+ ProducerConsumer& pc;
+
+ protected:
+ /** Lock status */
+ enum Status { INCOMPLETE, CONFIRMED, CANCELLED, TIMEOUT };
+
+ Lock(ProducerConsumer& p);
+ ~Lock();
+ void checkOk() const;
+ Mutex::ScopedLock lock;
+ Status status;
+ };
+
+ /** Lock for code that produces items. */
+ struct ProducerLock : public Lock {
+ /**
+ * Acquire locks to produce an item.
+ *@post If isOk() the calling thread has exclusive access
+ * to produce an item.
+ */
+ ProducerLock(ProducerConsumer& p);
+
+ /** Release locks, signal waiting consumers if confirm() was called. */
+ ~ProducerLock();
+ };
+
+ /** Lock for code that consumes items */
+ struct ConsumerLock : public Lock {
+ /**
+ * Wait for an item to consume and acquire locks.
+ *
+ *@post If isOk() there is at least one item available and the
+ *calling thread has exclusive access to consume it.
+ */
+ ConsumerLock(ProducerConsumer& p);
+
+ /**
+ * Wait up to timeout to acquire lock.
+ *@post If isOk() caller has a producer lock.
+ * If isTimedOut() there was a timeout.
+ * If neither then we were stopped.
+ */
+ ConsumerLock(ProducerConsumer& p, const Time& timeout);
+
+ /** Release locks */
+ ~ConsumerLock();
+ };
+
+ private:
+ mutable Monitor monitor;
+ size_t items;
+ size_t waiters;
+ bool stopped;
+
+ friend class Lock;
+ friend class ProducerLock;
+ friend class ConsumerLock;
+};
+
+}} // namespace qpid::sys
+
+#endif /*!_sys_ProducerConsumer_h*/
diff --git a/cpp/lib/common/sys/ScopedIncrement.h b/cpp/lib/common/sys/ScopedIncrement.h
new file mode 100644
index 0000000000..f14461ddaf
--- /dev/null
+++ b/cpp/lib/common/sys/ScopedIncrement.h
@@ -0,0 +1,59 @@
+#ifndef _posix_ScopedIncrement_h
+#define _posix_ScopedIncrement_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+namespace sys {
+
+/** Increment counter in constructor and decrement in destructor. */
+template <class T>
+class ScopedIncrement : boost::noncopyable
+{
+ public:
+ ScopedIncrement(T& c) : count(c) { ++count; }
+ ~ScopedIncrement() { --count; }
+ private:
+ T& count;
+};
+
+
+/** Decrement counter in constructor and increment in destructor. */
+template <class T>
+class ScopedDecrement : boost::noncopyable
+{
+ public:
+ ScopedDecrement(T& c) : count(c) { value = --count; }
+ ~ScopedDecrement() { ++count; }
+
+ /** Return the value after the decrement. */
+ operator long() { return value; }
+
+ private:
+ T& count;
+ long value;
+};
+
+
+}}
+
+
+#endif // _posix_ScopedIncrement_h