diff options
| author | Gordon Sim <gsim@apache.org> | 2007-09-18 14:45:33 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-09-18 14:45:33 +0000 |
| commit | 9f6c3db207f27e91cb7e76f82f8257a9c5007719 (patch) | |
| tree | eea611c55725503413a5427b1e4d6d762e8d369f /cpp/src/qpid | |
| parent | 174c235915e94fe9b27493f85b91b6ad6eab9271 (diff) | |
| download | qpid-python-9f6c3db207f27e91cb7e76f82f8257a9c5007719.tar.gz | |
Added Dispatcher class (plus test). This converts incoming MessageTransfer framesets to Messages and pumps them to registered listeners.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@576935 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/client/ClientMessage.h | 31 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Dispatcher.cpp | 150 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Dispatcher.h | 87 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/TransferContent.h | 2 |
4 files changed, 261 insertions, 9 deletions
diff --git a/cpp/src/qpid/client/ClientMessage.h b/cpp/src/qpid/client/ClientMessage.h index 5c4eb4e5aa..a573e17940 100644 --- a/cpp/src/qpid/client/ClientMessage.h +++ b/cpp/src/qpid/client/ClientMessage.h @@ -22,6 +22,8 @@ * */ #include <string> +#include "qpid/client/Session.h" +#include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/TransferContent.h" namespace qpid { @@ -40,12 +42,7 @@ public: std::string getDestination() const { - return destination; - } - - void setDestination(const std::string& dest) - { - destination = dest; + return method.getDestination(); } bool isRedelivered() const @@ -53,7 +50,8 @@ public: return hasDeliveryProperties() && getDeliveryProperties().getRedelivered(); } - void setRedelivered(bool redelivered) { + void setRedelivered(bool redelivered) + { getDeliveryProperties().setRedelivered(redelivered); } @@ -62,8 +60,25 @@ public: return getMessageProperties().getApplicationHeaders(); } + void acknowledge(Session& session, bool cumulative = true, bool send = true) const + { + session.execution().completed(id, cumulative, send); + } + + Message(const framing::FrameSet& frameset) : method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()) + { + populate(frameset); + } + + const framing::MessageTransferBody& getMethod() const + { + return method; + } + private: - std::string destination; + //method and id are only set for received messages: + const framing::MessageTransferBody method; + const framing::SequenceNumber id; }; }} diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp new file mode 100644 index 0000000000..8f3ed8bcbe --- /dev/null +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -0,0 +1,150 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Dispatcher.h" + +#include "qpid/client/Session.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/log/Statement.h" +#include "BlockingQueue.h" +#include "ClientMessage.h" + +using qpid::framing::FrameSet; +using qpid::framing::MessageTransferBody; +using qpid::sys::Mutex; +using qpid::sys::ScopedLock; +using qpid::sys::Thread; + +namespace qpid { +namespace client { + + Subscriber::Subscriber(Session& s, MessageListener* l, bool a, uint f) : session(s), listener(l), autoAck(a), ackFrequency(f), count(0) {} + +void Subscriber::received(Message& msg) +{ + if (listener) { + listener->received(msg); + if (autoAck) { + bool send = (++count >= ackFrequency); + msg.acknowledge(session, true, send); + if (send) count = 0; + } + } +} + + + Dispatcher::Dispatcher(Session& s, const std::string& q) : session(s), queue(q), running(false), stopped(false) +{ +} + +void Dispatcher::start() +{ + worker = Thread(this); +} + +void Dispatcher::run() +{ + BlockingQueue<FrameSet::shared_ptr>& q = queue.empty() ? + session.execution().getDemux().getDefault() : + session.execution().getDemux().get(queue); + + startRunning(); + stopped = false; + while (!isStopped()) { + FrameSet::shared_ptr content = q.pop(); + if (content->isA<MessageTransferBody>()) { + Message msg(*content); + Subscriber::shared_ptr listener = find(msg.getDestination()); + if (!listener) { + QPID_LOG(error, "No message listener set: " << content->getMethod()); + } else { + listener->received(msg); + } + } else { + if (handler.get()) { + handler->handle(*content); + } else { + QPID_LOG(error, "Unhandled method: " << content->getMethod()); + } + } + } + stopRunning(); +} + +void Dispatcher::stop() +{ + ScopedLock<Mutex> l(lock); + stopped = true; +} + +bool Dispatcher::isStopped() +{ + ScopedLock<Mutex> l(lock); + return stopped; +} + +/** + * Prevent concurrent threads invoking run. + */ +void Dispatcher::startRunning() +{ + ScopedLock<Mutex> l(lock); + if (running) { + throw Exception("Dispatcher is already running."); + } + running = true; +} + +void Dispatcher::stopRunning() +{ + ScopedLock<Mutex> l(lock); + running = false; +} + +Subscriber::shared_ptr Dispatcher::find(const std::string& name) +{ + ScopedLock<Mutex> l(lock); + Listeners::iterator i = listeners.find(name); + if (i == listeners.end()) { + return defaultListener; + } + return i->second; +} + +void Dispatcher::listen(MessageListener* listener, bool autoAck, uint ackFrequency) +{ + ScopedLock<Mutex> l(lock); + defaultListener = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackFrequency)); +} + +void Dispatcher::listen(const std::string& destination, MessageListener* listener, bool autoAck, uint ackFrequency) +{ + ScopedLock<Mutex> l(lock); + listeners[destination] = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackFrequency)); +} + +void Dispatcher::cancel(const std::string& destination) +{ + ScopedLock<Mutex> l(lock); + listeners.erase(destination); +} + +}} diff --git a/cpp/src/qpid/client/Dispatcher.h b/cpp/src/qpid/client/Dispatcher.h new file mode 100644 index 0000000000..e4a4cec4a6 --- /dev/null +++ b/cpp/src/qpid/client/Dispatcher.h @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Dispatcher_ +#define _Dispatcher_ + +#include <map> +#include <memory> +#include <string> +#include <boost/shared_ptr.hpp> +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Thread.h" +#include "MessageListener.h" + +namespace qpid { +namespace client { + +class Session; + +class Subscriber : public MessageListener +{ + Session& session; + MessageListener* const listener; + const bool autoAck; + const uint ackFrequency; + uint count; + +public: + typedef boost::shared_ptr<Subscriber> shared_ptr; + Subscriber(Session& session, MessageListener* listener, bool autoAck = true, uint ackFrequency = 1); + void received(Message& msg); + +}; + +typedef framing::Handler<framing::FrameSet> FrameSetHandler; + +class Dispatcher : public sys::Runnable +{ + typedef std::map<std::string, Subscriber::shared_ptr> Listeners; + sys::Mutex lock; + sys::Thread worker; + Session& session; + const std::string queue; + bool running; + bool stopped; + Listeners listeners; + Subscriber::shared_ptr defaultListener; + std::auto_ptr<FrameSetHandler> handler; + + Subscriber::shared_ptr find(const std::string& name); + void startRunning(); + void stopRunning(); + bool isStopped(); + +public: + Dispatcher(Session& session, const std::string& queue = ""); + + void start(); + void run(); + void stop(); + + void listen(MessageListener* listener, bool autoAck = true, uint ackFrequency = 1); + void listen(const std::string& destination, MessageListener* listener, bool autoAck = true, uint ackFrequency = 1); + void cancel(const std::string& destination); +}; + +}} + +#endif diff --git a/cpp/src/qpid/framing/TransferContent.h b/cpp/src/qpid/framing/TransferContent.h index c5fc395c94..6fd96f3587 100644 --- a/cpp/src/qpid/framing/TransferContent.h +++ b/cpp/src/qpid/framing/TransferContent.h @@ -37,7 +37,7 @@ class TransferContent : public MethodContent AMQHeaderBody header; std::string data; public: - TransferContent(const std::string& data); + TransferContent(const std::string& data = ""); AMQHeaderBody getHeader() const; void setData(const std::string&); void appendData(const std::string&); |
