summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-10-16 13:50:26 +0000
committerAlan Conway <aconway@apache.org>2006-10-16 13:50:26 +0000
commit8a6ab3aa61d441b9210c05c84dc9998acfc38737 (patch)
tree1eb9d7f39b5c2d04a85a1f66caef3d398567b740 /cpp/src/qpid/broker
parent9a808fb13aba243d41bbdab75158dae5939a80a4 (diff)
downloadqpid-python-8a6ab3aa61d441b9210c05c84dc9998acfc38737.tar.gz
Build system reorg, see README and Makefile comments for details.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@464494 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/AutoDelete.cpp93
-rw-r--r--cpp/src/qpid/broker/AutoDelete.h54
-rw-r--r--cpp/src/qpid/broker/Binding.h35
-rw-r--r--cpp/src/qpid/broker/Broker.cpp84
-rw-r--r--cpp/src/qpid/broker/Broker.h86
-rw-r--r--cpp/src/qpid/broker/Channel.cpp256
-rw-r--r--cpp/src/qpid/broker/Channel.h199
-rw-r--r--cpp/src/qpid/broker/Configuration.cpp196
-rw-r--r--cpp/src/qpid/broker/Configuration.h135
-rw-r--r--cpp/src/qpid/broker/ConnectionToken.h35
-rw-r--r--cpp/src/qpid/broker/Consumer.h34
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp72
-rw-r--r--cpp/src/qpid/broker/DirectExchange.h52
-rw-r--r--cpp/src/qpid/broker/Exchange.h41
-rw-r--r--cpp/src/qpid/broker/ExchangeBinding.cpp32
-rw-r--r--cpp/src/qpid/broker/ExchangeBinding.h45
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.cpp57
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.h44
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp56
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.h55
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp120
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h60
-rw-r--r--cpp/src/qpid/broker/Message.cpp100
-rw-r--r--cpp/src/qpid/broker/Message.h89
-rw-r--r--cpp/src/qpid/broker/NameGenerator.cpp29
-rw-r--r--cpp/src/qpid/broker/NameGenerator.h36
-rw-r--r--cpp/src/qpid/broker/Queue.cpp155
-rw-r--r--cpp/src/qpid/broker/Queue.h106
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp72
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h88
-rw-r--r--cpp/src/qpid/broker/Router.cpp32
-rw-r--r--cpp/src/qpid/broker/Router.h39
-rw-r--r--cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp50
-rw-r--r--cpp/src/qpid/broker/SessionHandlerFactoryImpl.h49
-rw-r--r--cpp/src/qpid/broker/SessionHandlerImpl.cpp405
-rw-r--r--cpp/src/qpid/broker/SessionHandlerImpl.h233
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp163
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h94
38 files changed, 3581 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/AutoDelete.cpp b/cpp/src/qpid/broker/AutoDelete.cpp
new file mode 100644
index 0000000000..22076e9e0c
--- /dev/null
+++ b/cpp/src/qpid/broker/AutoDelete.cpp
@@ -0,0 +1,93 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "./qpid/broker/AutoDelete.h"
+
+using namespace qpid::broker;
+
+AutoDelete::AutoDelete(QueueRegistry* const _registry, u_int32_t _period) : registry(_registry),
+ period(_period),
+ stopped(true),
+ runner(0){}
+
+void AutoDelete::add(Queue::shared_ptr const queue){
+ lock.acquire();
+ queues.push(queue);
+ lock.release();
+}
+
+Queue::shared_ptr const AutoDelete::pop(){
+ Queue::shared_ptr next;
+ lock.acquire();
+ if(!queues.empty()){
+ next = queues.front();
+ queues.pop();
+ }
+ lock.release();
+ return next;
+}
+
+void AutoDelete::process(){
+ Queue::shared_ptr seen;
+ for(Queue::shared_ptr q = pop(); q; q = pop()){
+ if(seen == q){
+ add(q);
+ break;
+ }else if(q->canAutoDelete()){
+ std::string name(q->getName());
+ registry->destroy(name);
+ std::cout << "INFO: Auto-deleted queue named " << name << std::endl;
+ }else{
+ add(q);
+ if(!seen) seen = q;
+ }
+ }
+}
+
+void AutoDelete::run(){
+ monitor.acquire();
+ while(!stopped){
+ process();
+ monitor.wait(period);
+ }
+ monitor.release();
+}
+
+void AutoDelete::start(){
+ monitor.acquire();
+ if(stopped){
+ runner = factory.create(this);
+ stopped = false;
+ monitor.release();
+ runner->start();
+ }else{
+ monitor.release();
+ }
+}
+
+void AutoDelete::stop(){
+ monitor.acquire();
+ if(!stopped){
+ stopped = true;
+ monitor.notify();
+ monitor.release();
+ runner->join();
+ delete runner;
+ }else{
+ monitor.release();
+ }
+}
diff --git a/cpp/src/qpid/broker/AutoDelete.h b/cpp/src/qpid/broker/AutoDelete.h
new file mode 100644
index 0000000000..77a5a338e3
--- /dev/null
+++ b/cpp/src/qpid/broker/AutoDelete.h
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _AutoDelete_
+#define _AutoDelete_
+
+#include <iostream>
+#include <queue>
+#include "./qpid/concurrent/MonitorImpl.h"
+#include "./qpid/broker/Queue.h"
+#include "./qpid/broker/QueueRegistry.h"
+#include "./qpid/concurrent/ThreadFactoryImpl.h"
+
+namespace qpid {
+ namespace broker{
+ class AutoDelete : private virtual qpid::concurrent::Runnable{
+ qpid::concurrent::ThreadFactoryImpl factory;
+ qpid::concurrent::MonitorImpl lock;
+ qpid::concurrent::MonitorImpl monitor;
+ std::queue<Queue::shared_ptr> queues;
+ QueueRegistry* const registry;
+ const u_int32_t period;
+ volatile bool stopped;
+ qpid::concurrent::Thread* runner;
+
+ Queue::shared_ptr const pop();
+ void process();
+ virtual void run();
+
+ public:
+ AutoDelete(QueueRegistry* const registry, u_int32_t period);
+ void add(Queue::shared_ptr const);
+ void start();
+ void stop();
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/Binding.h b/cpp/src/qpid/broker/Binding.h
new file mode 100644
index 0000000000..4202d390c3
--- /dev/null
+++ b/cpp/src/qpid/broker/Binding.h
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Binding_
+#define _Binding_
+
+#include "./qpid/framing/FieldTable.h"
+
+namespace qpid {
+ namespace broker {
+ class Binding{
+ public:
+ virtual void cancel() = 0;
+ virtual ~Binding(){}
+ };
+ }
+}
+
+
+#endif
+
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
new file mode 100644
index 0000000000..27ce840d01
--- /dev/null
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -0,0 +1,84 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include <memory>
+#include "./qpid/broker/Broker.h"
+#include "./qpid/io/Acceptor.h"
+#include "./qpid/broker/Configuration.h"
+#include "./qpid/QpidError.h"
+#include "./qpid/broker/SessionHandlerFactoryImpl.h"
+#include "./qpid/io/BlockingAPRAcceptor.h"
+#include "./qpid/io/LFAcceptor.h"
+
+
+using namespace qpid::broker;
+using namespace qpid::io;
+
+namespace {
+ Acceptor* createAcceptor(const Configuration& config){
+ const string type(config.getAcceptor());
+ if("blocking" == type){
+ std::cout << "Using blocking acceptor " << std::endl;
+ return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog());
+ }else if("non-blocking" == type){
+ std::cout << "Using non-blocking acceptor " << std::endl;
+ return new LFAcceptor(config.isTrace(),
+ config.getConnectionBacklog(),
+ config.getWorkerThreads(),
+ config.getMaxConnections());
+ }
+ throw Configuration::ParseException("Unrecognised acceptor: " + type);
+ }
+}
+
+Broker::Broker(const Configuration& config) :
+ acceptor(createAcceptor(config)),
+ port(config.getPort()),
+ isBound(false) {}
+
+Broker::shared_ptr Broker::create(int port)
+{
+ Configuration config;
+ config.setPort(port);
+ return create(config);
+}
+
+Broker::shared_ptr Broker::create(const Configuration& config) {
+ return Broker::shared_ptr(new Broker(config));
+}
+
+int16_t Broker::bind()
+{
+ if (!isBound) {
+ port = acceptor->bind(port);
+ }
+ return port;
+}
+
+void Broker::run() {
+ bind();
+ acceptor->run(&factory);
+}
+
+void Broker::shutdown() {
+ acceptor->shutdown();
+}
+
+Broker::~Broker() { }
+
+const int16_t Broker::DEFAULT_PORT(5672);
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
new file mode 100644
index 0000000000..3423dc0910
--- /dev/null
+++ b/cpp/src/qpid/broker/Broker.h
@@ -0,0 +1,86 @@
+#ifndef _Broker_
+#define _Broker_
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "./qpid/io/Acceptor.h"
+#include "./qpid/broker/Configuration.h"
+#include "./qpid/concurrent/Runnable.h"
+#include "./qpid/broker/SessionHandlerFactoryImpl.h"
+#include <boost/noncopyable.hpp>
+#include <tr1/memory>
+
+namespace qpid {
+ namespace broker {
+ /**
+ * A broker instance.
+ */
+ class Broker : public qpid::concurrent::Runnable, private boost::noncopyable {
+ Broker(const Configuration& config); // Private, use create()
+ std::auto_ptr<qpid::io::Acceptor> acceptor;
+ SessionHandlerFactoryImpl factory;
+ int16_t port;
+ bool isBound;
+
+ public:
+ static const int16_t DEFAULT_PORT;
+
+ virtual ~Broker();
+ typedef std::tr1::shared_ptr<Broker> shared_ptr;
+
+ /**
+ * Create a broker.
+ * @param port Port to listen on or 0 to pick a port dynamically.
+ */
+ static shared_ptr create(int port = DEFAULT_PORT);
+
+ /**
+ * Create a broker from a Configuration.
+ */
+ static shared_ptr create(const Configuration& config);
+
+ /**
+ * Bind to the listening port.
+ * @return The port number bound.
+ */
+ virtual int16_t bind();
+
+ /**
+ * Return listening port. If called before bind this is
+ * the configured port. If called after it is the actual
+ * port, which will be different if the configured port is
+ * 0.
+ */
+ virtual int16_t getPort() { return port; }
+
+ /**
+ * Run the broker. Implements Runnable::run() so the broker
+ * can be run in a separate thread.
+ */
+ virtual void run();
+
+ /** Shut down the broker */
+ virtual void shutdown();
+ };
+ }
+}
+
+
+
+#endif /*!_Broker_*/
diff --git a/cpp/src/qpid/broker/Channel.cpp b/cpp/src/qpid/broker/Channel.cpp
new file mode 100644
index 0000000000..8d1cce9f1b
--- /dev/null
+++ b/cpp/src/qpid/broker/Channel.cpp
@@ -0,0 +1,256 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "./qpid/broker/Channel.h"
+#include "./qpid/QpidError.h"
+#include <iostream>
+#include <sstream>
+#include <assert.h>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+
+Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) :
+ id(_id),
+ out(_out),
+ deliveryTag(1),
+ transactional(false),
+ prefetchSize(0),
+ prefetchCount(0),
+ outstandingSize(0),
+ outstandingCount(0),
+ framesize(_framesize),
+ tagGenerator("sgen"){}
+
+Channel::~Channel(){
+}
+
+bool Channel::exists(const string& consumerTag){
+ return consumers.find(consumerTag) != consumers.end();
+}
+
+void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection){
+ if(tag.empty()) tag = tagGenerator.generate();
+
+ ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
+ try{
+ queue->consume(c, exclusive);//may throw exception
+ consumers[tag] = c;
+ }catch(ExclusiveAccessException& e){
+ delete c;
+ throw e;
+ }
+}
+
+void Channel::cancel(consumer_iterator i){
+ ConsumerImpl* c = i->second;
+ consumers.erase(i);
+ if(c){
+ c->cancel();
+ delete c;
+ }
+}
+
+void Channel::cancel(const string& tag){
+ consumer_iterator i = consumers.find(tag);
+ if(i != consumers.end()){
+ cancel(i);
+ }
+}
+
+void Channel::close(){
+ //cancel all consumers
+ for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
+ cancel(i);
+ }
+}
+
+void Channel::begin(){
+ transactional = true;
+}
+
+void Channel::commit(){
+
+}
+
+void Channel::rollback(){
+
+}
+
+void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){
+ Locker locker(deliveryLock);
+
+ u_int64_t myDeliveryTag = deliveryTag++;
+ if(ackExpected){
+ unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag));
+ outstandingSize += msg->contentSize();
+ outstandingCount++;
+ }
+ //send deliver method, header and content(s)
+ msg->deliver(out, id, consumerTag, myDeliveryTag, framesize);
+}
+
+bool Channel::checkPrefetch(Message::shared_ptr& msg){
+ Locker locker(deliveryLock);
+ bool countOk = !prefetchCount || prefetchCount > unacknowledged.size();
+ bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstandingSize || unacknowledged.empty();
+ return countOk && sizeOk;
+}
+
+Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, string& _tag,
+ Queue::shared_ptr _queue,
+ ConnectionToken* const _connection, bool ack) : parent(_parent),
+ tag(_tag),
+ queue(_queue),
+ connection(_connection),
+ ackExpected(ack),
+ blocked(false){
+}
+
+bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
+ if(!connection || connection != msg->getPublisher()){//check for no_local
+ if(ackExpected && !parent->checkPrefetch(msg)){
+ blocked = true;
+ }else{
+ blocked = false;
+ parent->deliver(msg, tag, queue, ackExpected);
+ return true;
+ }
+ }
+ return false;
+}
+
+void Channel::ConsumerImpl::cancel(){
+ if(queue) queue->cancel(this);
+}
+
+void Channel::ConsumerImpl::requestDispatch(){
+ if(blocked) queue->dispatch();
+}
+
+void Channel::checkMessage(const std::string& text){
+ if(!message.get()){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, text);
+ }
+}
+
+void Channel::handlePublish(Message* msg){
+ if(message.get()){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed.");
+ }
+ message = Message::shared_ptr(msg);
+}
+
+void Channel::ack(u_int64_t _deliveryTag, bool multiple){
+ Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+ ack_iterator i = find_if(unacknowledged.begin(), unacknowledged.end(), MatchAck(_deliveryTag));
+ if(i == unacknowledged.end()){
+ throw InvalidAckException();
+ }else if(multiple){
+ unacknowledged.erase(unacknowledged.begin(), ++i);
+ //recompute prefetch outstanding (note: messages delivered through get are ignored)
+ CalculatePrefetch calc(for_each(unacknowledged.begin(), unacknowledged.end(), CalculatePrefetch()));
+ outstandingSize = calc.getSize();
+ outstandingCount = calc.getCount();
+ }else{
+ if(!i->pull){
+ outstandingSize -= i->msg->contentSize();
+ outstandingCount--;
+ }
+ unacknowledged.erase(i);
+ }
+
+ //if the prefetch limit had previously been reached, there may
+ //be messages that can be now be delivered
+ for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
+ j->second->requestDispatch();
+ }
+}
+
+void Channel::recover(bool requeue){
+ Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+ if(requeue){
+ outstandingSize = 0;
+ outstandingCount = 0;
+ ack_iterator start(unacknowledged.begin());
+ ack_iterator end(unacknowledged.end());
+ for_each(start, end, Requeue());
+ unacknowledged.erase(start, end);
+ }else{
+ for_each(unacknowledged.begin(), unacknowledged.end(), Redeliver(this));
+ }
+}
+
+bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
+ Message::shared_ptr msg = queue->dequeue();
+ if(msg){
+ Locker locker(deliveryLock);
+ u_int64_t myDeliveryTag = deliveryTag++;
+ msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize);
+ if(ackExpected){
+ unacknowledged.push_back(AckRecord(msg, queue, myDeliveryTag));
+ }
+ return true;
+ }else{
+ return false;
+ }
+}
+
+Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {}
+
+bool Channel::MatchAck::operator()(AckRecord& record) const{
+ return tag == record.deliveryTag;
+}
+
+void Channel::Requeue::operator()(AckRecord& record) const{
+ record.msg->redeliver();
+ record.queue->deliver(record.msg);
+}
+
+Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {}
+
+void Channel::Redeliver::operator()(AckRecord& record) const{
+ if(record.pull){
+ //if message was originally sent as response to get, we must requeue it
+ record.msg->redeliver();
+ record.queue->deliver(record.msg);
+ }else{
+ record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize);
+ }
+}
+
+Channel::CalculatePrefetch::CalculatePrefetch() : size(0){}
+
+void Channel::CalculatePrefetch::operator()(AckRecord& record){
+ if(!record.pull){
+ //ignore messages that were sent in response to get when calculating prefetch
+ size += record.msg->contentSize();
+ count++;
+ }
+}
+
+u_int32_t Channel::CalculatePrefetch::getSize(){
+ return size;
+}
+
+u_int16_t Channel::CalculatePrefetch::getCount(){
+ return count;
+}
diff --git a/cpp/src/qpid/broker/Channel.h b/cpp/src/qpid/broker/Channel.h
new file mode 100644
index 0000000000..a20f4d9599
--- /dev/null
+++ b/cpp/src/qpid/broker/Channel.h
@@ -0,0 +1,199 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Channel_
+#define _Channel_
+
+#include <algorithm>
+#include <map>
+#include "./qpid/framing/AMQContentBody.h"
+#include "./qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/BasicPublishBody.h"
+#include "./qpid/broker/Binding.h"
+#include "./qpid/broker/Consumer.h"
+#include "./qpid/broker/Message.h"
+#include "./qpid/concurrent/MonitorImpl.h"
+#include "./qpid/broker/NameGenerator.h"
+#include "./qpid/framing/OutputHandler.h"
+#include "./qpid/broker/Queue.h"
+
+namespace qpid {
+ namespace broker {
+ /**
+ * Maintains state for an AMQP channel. Handles incoming and
+ * outgoing messages for that channel.
+ */
+ class Channel{
+ private:
+ class ConsumerImpl : public virtual Consumer{
+ Channel* parent;
+ string tag;
+ Queue::shared_ptr queue;
+ ConnectionToken* const connection;
+ const bool ackExpected;
+ bool blocked;
+ public:
+ ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack);
+ virtual bool deliver(Message::shared_ptr& msg);
+ void cancel();
+ void requestDispatch();
+ };
+
+ typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator;
+
+ struct AckRecord{
+ Message::shared_ptr msg;
+ Queue::shared_ptr queue;
+ string consumerTag;
+ u_int64_t deliveryTag;
+ bool pull;
+
+ AckRecord(Message::shared_ptr _msg,
+ Queue::shared_ptr _queue,
+ const string _consumerTag,
+ const u_int64_t _deliveryTag) : msg(_msg),
+ queue(_queue),
+ consumerTag(_consumerTag),
+ deliveryTag(_deliveryTag),
+ pull(false){}
+
+ AckRecord(Message::shared_ptr _msg,
+ Queue::shared_ptr _queue,
+ const u_int64_t _deliveryTag) : msg(_msg),
+ queue(_queue),
+ consumerTag(""),
+ deliveryTag(_deliveryTag),
+ pull(true){}
+ };
+
+ typedef std::vector<AckRecord>::iterator ack_iterator;
+
+ class MatchAck{
+ const u_int64_t tag;
+ public:
+ MatchAck(u_int64_t tag);
+ bool operator()(AckRecord& record) const;
+ };
+
+ class Requeue{
+ public:
+ void operator()(AckRecord& record) const;
+ };
+
+ class Redeliver{
+ Channel* const channel;
+ public:
+ Redeliver(Channel* const channel);
+ void operator()(AckRecord& record) const;
+ };
+
+ class CalculatePrefetch{
+ u_int32_t size;
+ u_int16_t count;
+ public:
+ CalculatePrefetch();
+ void operator()(AckRecord& record);
+ u_int32_t getSize();
+ u_int16_t getCount();
+ };
+
+ const int id;
+ qpid::framing::OutputHandler* out;
+ u_int64_t deliveryTag;
+ Queue::shared_ptr defaultQueue;
+ bool transactional;
+ std::map<string, ConsumerImpl*> consumers;
+ u_int32_t prefetchSize;
+ u_int16_t prefetchCount;
+ u_int32_t outstandingSize;
+ u_int16_t outstandingCount;
+ u_int32_t framesize;
+ Message::shared_ptr message;
+ NameGenerator tagGenerator;
+ std::vector<AckRecord> unacknowledged;
+ qpid::concurrent::MonitorImpl deliveryLock;
+
+ void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected);
+ void checkMessage(const std::string& text);
+ bool checkPrefetch(Message::shared_ptr& msg);
+ void cancel(consumer_iterator consumer);
+
+ template<class Operation> Operation processMessage(Operation route){
+ if(message->isComplete()){
+ route(message);
+ message.reset();
+ }
+ return route;
+ }
+
+
+ public:
+ Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize);
+ ~Channel();
+ inline void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
+ inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; }
+ inline u_int32_t setPrefetchSize(u_int32_t size){ return prefetchSize = size; }
+ inline u_int16_t setPrefetchCount(u_int16_t count){ return prefetchCount = count; }
+ bool exists(const string& consumerTag);
+ void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0);
+ void cancel(const string& tag);
+ bool get(Queue::shared_ptr queue, bool ackExpected);
+ void begin();
+ void close();
+ void commit();
+ void rollback();
+ void ack(u_int64_t deliveryTag, bool multiple);
+ void recover(bool requeue);
+
+ /**
+ * Handles the initial publish request though a
+ * channel. The header and (if applicable) content will be
+ * accumulated through calls to handleHeader() and
+ * handleContent()
+ */
+ void handlePublish(Message* msg);
+
+ /**
+ * A template method that handles a received header and if
+ * there is no content routes it using the functor passed
+ * in.
+ */
+ template<class Operation> Operation handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){
+ checkMessage("Invalid message sequence: got header before publish.");
+ message->setHeader(header);
+ return processMessage(route);
+ }
+
+ /**
+ * A template method that handles a received content and
+ * if this completes the message, routes it using the
+ * functor passed in.
+ */
+ template<class Operation> Operation handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation route){
+ checkMessage("Invalid message sequence: got content before publish.");
+ message->addContent(content);
+ return processMessage(route);
+ }
+
+ };
+
+ struct InvalidAckException{};
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/Configuration.cpp b/cpp/src/qpid/broker/Configuration.cpp
new file mode 100644
index 0000000000..d4b27e4dd2
--- /dev/null
+++ b/cpp/src/qpid/broker/Configuration.cpp
@@ -0,0 +1,196 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "./qpid/broker/Configuration.h"
+#include <string.h>
+
+using namespace qpid::broker;
+using namespace std;
+
+Configuration::Configuration() :
+ trace('t', "trace", "Print incoming & outgoing frames to the console (default=false)", false),
+ port('p', "port", "Sets the port to listen on (default=5672)", 5672),
+ workerThreads("worker-threads", "Sets the number of worker threads to use (default=5). Only valid for non-blocking acceptor.", 5),
+ maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500). Only valid for non-blocking acceptor.", 500),
+ connectionBacklog("connection-backlog", "Sets the connection backlog for the servers socket (default=10)", 10),
+ acceptor('a', "acceptor", "Sets the acceptor to use. Currently only two values are recognised, blocking and non-blocking (which is the default)", "non-blocking"),
+ help("help", "Prints usage information", false)
+{
+ options.push_back(&trace);
+ options.push_back(&port);
+ options.push_back(&workerThreads);
+ options.push_back(&maxConnections);
+ options.push_back(&connectionBacklog);
+ options.push_back(&acceptor);
+ options.push_back(&help);
+}
+
+Configuration::~Configuration(){}
+
+void Configuration::parse(int argc, char** argv){
+ int position = 1;
+ while(position < argc){
+ bool matched(false);
+ for(op_iterator i = options.begin(); i < options.end() && !matched; i++){
+ matched = (*i)->parse(position, argv, argc);
+ }
+ if(!matched){
+ std::cout << "Warning: skipping unrecognised option " << argv[position] << std::endl;
+ position++;
+ }
+ }
+}
+
+void Configuration::usage(){
+ for(op_iterator i = options.begin(); i < options.end(); i++){
+ (*i)->print(std::cout);
+ }
+}
+
+bool Configuration::isHelp() const {
+ return help.getValue();
+}
+
+bool Configuration::isTrace() const {
+ return trace.getValue();
+}
+
+int Configuration::getPort() const {
+ return port.getValue();
+}
+
+int Configuration::getWorkerThreads() const {
+ return workerThreads.getValue();
+}
+
+int Configuration::getMaxConnections() const {
+ return maxConnections.getValue();
+}
+
+int Configuration::getConnectionBacklog() const {
+ return connectionBacklog.getValue();
+}
+
+string Configuration::getAcceptor() const {
+ return acceptor.getValue();
+}
+
+Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) :
+ flag(string("-") + _flag), name("--" +_name), desc(_desc) {}
+
+Configuration::Option::Option(const string& _name, const string& _desc) :
+ flag(""), name("--" + _name), desc(_desc) {}
+
+Configuration::Option::~Option(){}
+
+bool Configuration::Option::match(const string& arg){
+ return flag == arg || name == arg;
+}
+
+bool Configuration::Option::parse(int& i, char** argv, int argc){
+ const string arg(argv[i]);
+ if(match(arg)){
+ if(needsValue()){
+ if(++i < argc) setValue(argv[i]);
+ else throw ParseException("Argument " + arg + " requires a value!");
+ }else{
+ setValue("");
+ }
+ i++;
+ return true;
+ }else{
+ return false;
+ }
+}
+
+void Configuration::Option::print(ostream& out) const {
+ out << " ";
+ if(flag.length() > 0){
+ out << flag << " or ";
+ }
+ out << name;
+ if(needsValue()) out << "<value>";
+ out << std::endl;
+ out << " " << desc << std::endl;
+}
+
+
+// String Option:
+
+Configuration::StringOption::StringOption(const char _flag, const string& _name, const string& _desc, const string _value) :
+ Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::StringOption::StringOption(const string& _name, const string& _desc, const string _value) :
+ Option(_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::StringOption::~StringOption(){}
+
+const string& Configuration::StringOption::getValue() const {
+ return value;
+}
+
+bool Configuration::StringOption::needsValue() const {
+ return true;
+}
+
+void Configuration::StringOption::setValue(const std::string& _value){
+ value = _value;
+}
+
+// Int Option:
+
+Configuration::IntOption::IntOption(const char _flag, const string& _name, const string& _desc, const int _value) :
+ Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::IntOption::IntOption(const string& _name, const string& _desc, const int _value) :
+ Option(_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::IntOption::~IntOption(){}
+
+int Configuration::IntOption::getValue() const {
+ return value;
+}
+
+bool Configuration::IntOption::needsValue() const {
+ return true;
+}
+
+void Configuration::IntOption::setValue(const std::string& _value){
+ value = atoi(_value.c_str());
+}
+
+// Bool Option:
+
+Configuration::BoolOption::BoolOption(const char _flag, const string& _name, const string& _desc, const bool _value) :
+ Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::BoolOption::BoolOption(const string& _name, const string& _desc, const bool _value) :
+ Option(_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::BoolOption::~BoolOption(){}
+
+bool Configuration::BoolOption::getValue() const {
+ return value;
+}
+
+bool Configuration::BoolOption::needsValue() const {
+ return false;
+}
+
+void Configuration::BoolOption::setValue(const std::string& _value){
+ value = strcasecmp(_value.c_str(), "true") == 0;
+}
diff --git a/cpp/src/qpid/broker/Configuration.h b/cpp/src/qpid/broker/Configuration.h
new file mode 100644
index 0000000000..3785e1bac0
--- /dev/null
+++ b/cpp/src/qpid/broker/Configuration.h
@@ -0,0 +1,135 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Configuration_
+#define _Configuration_
+
+#include <cstdlib>
+#include <iostream>
+#include <vector>
+#include "./qpid/Exception.h"
+
+namespace qpid {
+ namespace broker {
+ class Configuration{
+ class Option {
+ const std::string flag;
+ const std::string name;
+ const std::string desc;
+
+ bool match(const std::string& arg);
+
+ protected:
+ virtual bool needsValue() const = 0;
+ virtual void setValue(const std::string& value) = 0;
+
+ public:
+ Option(const char flag, const std::string& name, const std::string& desc);
+ Option(const std::string& name, const std::string& desc);
+ virtual ~Option();
+
+ bool parse(int& i, char** argv, int argc);
+ void print(std::ostream& out) const;
+ };
+
+ class IntOption : public Option{
+ const int defaultValue;
+ int value;
+ public:
+ IntOption(char flag, const std::string& name, const std::string& desc, const int value = 0);
+ IntOption(const std::string& name, const std::string& desc, const int value = 0);
+ virtual ~IntOption();
+
+ int getValue() const;
+ virtual bool needsValue() const;
+ virtual void setValue(const std::string& value);
+ virtual void setValue(int _value) { value = _value; }
+ };
+
+ class StringOption : public Option{
+ const std::string defaultValue;
+ std::string value;
+ public:
+ StringOption(char flag, const std::string& name, const std::string& desc, const std::string value = "");
+ StringOption(const std::string& name, const std::string& desc, const std::string value = "");
+ virtual ~StringOption();
+
+ const std::string& getValue() const;
+ virtual bool needsValue() const;
+ virtual void setValue(const std::string& value);
+ };
+
+ class BoolOption : public Option{
+ const bool defaultValue;
+ bool value;
+ public:
+ BoolOption(char flag, const std::string& name, const std::string& desc, const bool value = 0);
+ BoolOption(const std::string& name, const std::string& desc, const bool value = 0);
+ virtual ~BoolOption();
+
+ bool getValue() const;
+ virtual bool needsValue() const;
+ virtual void setValue(const std::string& value);
+ virtual void setValue(bool _value) { value = _value; }
+ };
+
+ BoolOption trace;
+ IntOption port;
+ IntOption workerThreads;
+ IntOption maxConnections;
+ IntOption connectionBacklog;
+ StringOption acceptor;
+ BoolOption help;
+
+ typedef std::vector<Option*>::iterator op_iterator;
+ std::vector<Option*> options;
+
+ public:
+ class ParseException : public Exception {
+ public:
+ ParseException(const std::string& msg) : Exception(msg) {}
+ };
+
+
+ Configuration();
+ ~Configuration();
+
+ void parse(int argc, char** argv);
+
+ bool isHelp() const;
+ bool isTrace() const;
+ int getPort() const;
+ int getWorkerThreads() const;
+ int getMaxConnections() const;
+ int getConnectionBacklog() const;
+ std::string getAcceptor() const;
+
+ void setHelp(bool b) { help.setValue(b); }
+ void setTrace(bool b) { trace.setValue(b); }
+ void setPort(int i) { port.setValue(i); }
+ void setWorkerThreads(int i) { workerThreads.setValue(i); }
+ void setMaxConnections(int i) { maxConnections.setValue(i); }
+ void setConnectionBacklog(int i) { connectionBacklog.setValue(i); }
+ void setAcceptor(const std::string& val) { acceptor.setValue(val); }
+
+ void usage();
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/ConnectionToken.h b/cpp/src/qpid/broker/ConnectionToken.h
new file mode 100644
index 0000000000..1faefec2cc
--- /dev/null
+++ b/cpp/src/qpid/broker/ConnectionToken.h
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _ConnectionToken_
+#define _ConnectionToken_
+
+namespace qpid {
+ namespace broker {
+ /**
+ * An empty interface allowing opaque implementations of some
+ * form of token to identify a connection.
+ */
+ class ConnectionToken{
+ public:
+ virtual ~ConnectionToken(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h
new file mode 100644
index 0000000000..7d346a4a0a
--- /dev/null
+++ b/cpp/src/qpid/broker/Consumer.h
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Consumer_
+#define _Consumer_
+
+#include "./qpid/broker/Message.h"
+
+namespace qpid {
+ namespace broker {
+ class Consumer{
+ public:
+ virtual bool deliver(Message::shared_ptr& msg) = 0;
+ virtual ~Consumer(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
new file mode 100644
index 0000000000..aa90d8dd81
--- /dev/null
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -0,0 +1,72 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "./qpid/broker/DirectExchange.h"
+#include "./qpid/broker/ExchangeBinding.h"
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {
+
+}
+
+void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+ lock.acquire();
+ std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
+ if(i == queues.end()){
+ bindings[routingKey].push_back(queue);
+ queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+ }
+ lock.release();
+}
+
+void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* /*args*/){
+ lock.acquire();
+ std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+
+ std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
+ if(i < queues.end()){
+ queues.erase(i);
+ if(queues.empty()){
+ bindings.erase(routingKey);
+ }
+ }
+ lock.release();
+}
+
+void DirectExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* /*args*/){
+ lock.acquire();
+ std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+ int count(0);
+ for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){
+ (*i)->deliver(msg);
+ }
+ if(!count){
+ std::cout << "WARNING: DirectExchange " << getName() << " could not route message with key " << routingKey << std::endl;
+ }
+ lock.release();
+}
+
+DirectExchange::~DirectExchange(){
+
+}
+
+
+const std::string DirectExchange::typeName("direct");
diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h
new file mode 100644
index 0000000000..ce58a174c6
--- /dev/null
+++ b/cpp/src/qpid/broker/DirectExchange.h
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _DirectExchange_
+#define _DirectExchange_
+
+#include <map>
+#include <vector>
+#include "./qpid/broker/Exchange.h"
+#include "./qpid/framing/FieldTable.h"
+#include "./qpid/broker/Message.h"
+#include "./qpid/concurrent/MonitorImpl.h"
+#include "./qpid/broker/Queue.h"
+
+namespace qpid {
+namespace broker {
+ class DirectExchange : public virtual Exchange{
+ std::map<string, std::vector<Queue::shared_ptr> > bindings;
+ qpid::concurrent::MonitorImpl lock;
+
+ public:
+ static const std::string typeName;
+
+ DirectExchange(const std::string& name);
+
+ virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual void route(Message::shared_ptr& msg, const std::string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual ~DirectExchange();
+ };
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
new file mode 100644
index 0000000000..7b8bb1c034
--- /dev/null
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Exchange_
+#define _Exchange_
+
+#include "./qpid/framing/FieldTable.h"
+#include "./qpid/broker/Message.h"
+#include "./qpid/broker/Queue.h"
+
+namespace qpid {
+namespace broker {
+ class Exchange{
+ const std::string name;
+ public:
+ explicit Exchange(const std::string& _name) : name(_name) {}
+ virtual ~Exchange(){}
+ std::string getName() { return name; }
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
+ virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args) = 0;
+ };
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/ExchangeBinding.cpp b/cpp/src/qpid/broker/ExchangeBinding.cpp
new file mode 100644
index 0000000000..61f44f634c
--- /dev/null
+++ b/cpp/src/qpid/broker/ExchangeBinding.cpp
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "./qpid/broker/ExchangeBinding.h"
+#include "./qpid/broker/Exchange.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+ExchangeBinding::ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, FieldTable* _args) : e(_e), q(_q), key(_key), args(_args){}
+
+void ExchangeBinding::cancel(){
+ e->unbind(q, key, args);
+ delete this;
+}
+
+ExchangeBinding::~ExchangeBinding(){
+}
diff --git a/cpp/src/qpid/broker/ExchangeBinding.h b/cpp/src/qpid/broker/ExchangeBinding.h
new file mode 100644
index 0000000000..fda5fab153
--- /dev/null
+++ b/cpp/src/qpid/broker/ExchangeBinding.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _ExchangeBinding_
+#define _ExchangeBinding_
+
+#include "./qpid/broker/Binding.h"
+#include "./qpid/framing/FieldTable.h"
+#include "./qpid/broker/Queue.h"
+
+namespace qpid {
+ namespace broker {
+ class Exchange;
+ class Queue;
+
+ class ExchangeBinding : public virtual Binding{
+ Exchange* e;
+ Queue::shared_ptr q;
+ const string key;
+ qpid::framing::FieldTable* args;
+ public:
+ ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, qpid::framing::FieldTable* _args);
+ virtual void cancel();
+ virtual ~ExchangeBinding();
+ };
+ }
+}
+
+
+#endif
+
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp
new file mode 100644
index 0000000000..0755393e68
--- /dev/null
+++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "./qpid/broker/ExchangeRegistry.h"
+#include "./qpid/concurrent/MonitorImpl.h"
+
+using namespace qpid::broker;
+using namespace qpid::concurrent;
+
+ExchangeRegistry::ExchangeRegistry() : lock(new MonitorImpl()){}
+
+ExchangeRegistry::~ExchangeRegistry(){
+ for (ExchangeMap::iterator i = exchanges.begin(); i != exchanges.end(); ++i)
+ {
+ delete i->second;
+ }
+ delete lock;
+}
+
+void ExchangeRegistry::declare(Exchange* exchange){
+ exchanges[exchange->getName()] = exchange;
+}
+
+void ExchangeRegistry::destroy(const string& name){
+ if(exchanges[name]){
+ delete exchanges[name];
+ exchanges.erase(name);
+ }
+}
+
+Exchange* ExchangeRegistry::get(const string& name){
+ return exchanges[name];
+}
+
+namespace
+{
+const std::string empty;
+}
+
+Exchange* ExchangeRegistry::getDefault()
+{
+ return get(empty);
+}
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h
new file mode 100644
index 0000000000..7728ed0eff
--- /dev/null
+++ b/cpp/src/qpid/broker/ExchangeRegistry.h
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _ExchangeRegistry_
+#define _ExchangeRegistry_
+
+#include <map>
+#include "./qpid/broker/Exchange.h"
+#include "./qpid/concurrent/Monitor.h"
+
+namespace qpid {
+namespace broker {
+ class ExchangeRegistry{
+ typedef std::map<string, Exchange*> ExchangeMap;
+ ExchangeMap exchanges;
+ qpid::concurrent::Monitor* lock;
+ public:
+ ExchangeRegistry();
+ void declare(Exchange* exchange);
+ void destroy(const string& name);
+ Exchange* get(const string& name);
+ Exchange* getDefault();
+ inline qpid::concurrent::Monitor* getLock(){ return lock; }
+ ~ExchangeRegistry();
+ };
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
new file mode 100644
index 0000000000..1b4e3643d0
--- /dev/null
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -0,0 +1,56 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "./qpid/broker/FanOutExchange.h"
+#include "./qpid/broker/ExchangeBinding.h"
+#include <algorithm>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {}
+
+void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+ Locker locker(lock);
+ // Add if not already present.
+ Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
+ if (i == bindings.end()) {
+ bindings.push_back(queue);
+ queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+ }
+}
+
+void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, FieldTable* /*args*/){
+ Locker locker(lock);
+ Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
+ if (i != bindings.end()) {
+ bindings.erase(i);
+ // TODO aconway 2006-09-14: What about the ExchangeBinding object? Don't we have to verify routingKey/args match?
+ }
+}
+
+void FanOutExchange::route(Message::shared_ptr& msg, const string& /*routingKey*/, FieldTable* /*args*/){
+ Locker locker(lock);
+ for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){
+ (*i)->deliver(msg);
+ }
+}
+
+FanOutExchange::~FanOutExchange() {}
+
+const std::string FanOutExchange::typeName("fanout");
diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h
new file mode 100644
index 0000000000..f79dd28ec5
--- /dev/null
+++ b/cpp/src/qpid/broker/FanOutExchange.h
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _FanOutExchange_
+#define _FanOutExchange_
+
+#include <map>
+#include <vector>
+#include "./qpid/broker/Exchange.h"
+#include "./qpid/framing/FieldTable.h"
+#include "./qpid/broker/Message.h"
+#include "./qpid/concurrent/MonitorImpl.h"
+#include "./qpid/broker/Queue.h"
+
+namespace qpid {
+namespace broker {
+
+class FanOutExchange : public virtual Exchange {
+ std::vector<Queue::shared_ptr> bindings;
+ qpid::concurrent::MonitorImpl lock;
+
+ public:
+ static const std::string typeName;
+
+ FanOutExchange(const std::string& name);
+
+ virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual void route(Message::shared_ptr& msg, const std::string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual ~FanOutExchange();
+};
+
+}
+}
+
+
+
+#endif
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
new file mode 100644
index 0000000000..35feef22dd
--- /dev/null
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -0,0 +1,120 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "./qpid/broker/HeadersExchange.h"
+#include "./qpid/broker/ExchangeBinding.h"
+#include "./qpid/framing/Value.h"
+#include "./qpid/QpidError.h"
+#include <algorithm>
+
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+// TODO aconway 2006-09-20: More efficient matching algorithm.
+// The current search algorithm really sucks.
+// Fieldtables are heavy, maybe use shared_ptr to do handle-body.
+
+using namespace qpid::broker;
+
+namespace {
+ const std::string all("all");
+ const std::string any("any");
+ const std::string x_match("x-match");
+}
+
+HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { }
+
+void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+ std::cout << "HeadersExchange::bind" << std::endl;
+ Locker locker(lock);
+ std::string what = args->getString("x-match");
+ if (what != all && what != any) {
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange.");
+ }
+ bindings.push_back(Binding(*args, queue));
+ queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+}
+
+void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, FieldTable* args){
+ Locker locker(lock);
+ Bindings::iterator i =
+ std::find(bindings.begin(),bindings.end(), Binding(*args, queue));
+ if (i != bindings.end()) bindings.erase(i);
+}
+
+
+void HeadersExchange::route(Message::shared_ptr& msg, const string& /*routingKey*/, FieldTable* args){
+ std::cout << "route: " << *args << std::endl;
+ Locker locker(lock);;
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ if (match(i->first, *args)) i->second->deliver(msg);
+ }
+}
+
+HeadersExchange::~HeadersExchange() {}
+
+const std::string HeadersExchange::typeName("headers");
+
+namespace
+{
+
+ bool match_values(const Value& bind, const Value& msg) {
+ return dynamic_cast<const EmptyValue*>(&bind) || bind == msg;
+ }
+
+}
+
+
+bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) {
+ typedef FieldTable::ValueMap Map;
+ std::string what = bind.getString(x_match);
+ if (what == all) {
+ for (Map::const_iterator i = bind.getMap().begin();
+ i != bind.getMap().end();
+ ++i)
+ {
+ if (i->first != x_match)
+ {
+ Map::const_iterator j = msg.getMap().find(i->first);
+ if (j == msg.getMap().end()) return false;
+ if (!match_values(*(i->second), *(j->second))) return false;
+ }
+ }
+ return true;
+ } else if (what == any) {
+ for (Map::const_iterator i = bind.getMap().begin();
+ i != bind.getMap().end();
+ ++i)
+ {
+ if (i->first != x_match)
+ {
+ Map::const_iterator j = msg.getMap().find(i->first);
+ if (j != msg.getMap().end()) {
+ if (match_values(*(i->second), *(j->second))) return true;
+ }
+ }
+ }
+ return false;
+ } else {
+ return false;
+ }
+}
+
+
+
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
new file mode 100644
index 0000000000..a330e050f5
--- /dev/null
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -0,0 +1,60 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _HeadersExchange_
+#define _HeadersExchange_
+
+#include <vector>
+#include "./qpid/broker/Exchange.h"
+#include "./qpid/framing/FieldTable.h"
+#include "./qpid/broker/Message.h"
+#include "./qpid/concurrent/MonitorImpl.h"
+#include "./qpid/broker/Queue.h"
+
+namespace qpid {
+namespace broker {
+
+
+class HeadersExchange : public virtual Exchange {
+ typedef std::pair<qpid::framing::FieldTable, Queue::shared_ptr> Binding;
+ typedef std::vector<Binding> Bindings;
+
+ Bindings bindings;
+ qpid::concurrent::MonitorImpl lock;
+
+ public:
+ static const std::string typeName;
+
+ HeadersExchange(const string& name);
+
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual ~HeadersExchange();
+
+ static bool match(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs);
+};
+
+
+
+}
+}
+
+#endif
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
new file mode 100644
index 0000000000..7210ecc2f2
--- /dev/null
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -0,0 +1,100 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "./qpid/concurrent/MonitorImpl.h"
+#include "./qpid/broker/Message.h"
+#include "./qpid/broker/ExchangeRegistry.h"
+#include <iostream>
+
+using namespace std::tr1;//for *_pointer_cast methods
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+
+Message::Message(const ConnectionToken* const _publisher,
+ const string& _exchange, const string& _routingKey,
+ bool _mandatory, bool _immediate) : publisher(_publisher),
+ exchange(_exchange),
+ routingKey(_routingKey),
+ mandatory(_mandatory),
+ immediate(_immediate),
+ redelivered(false),
+ size(0){
+
+}
+
+Message::~Message(){
+}
+
+void Message::setHeader(AMQHeaderBody::shared_ptr _header){
+ this->header = _header;
+}
+
+void Message::addContent(AMQContentBody::shared_ptr data){
+ content.push_back(data);
+ size += data->size();
+}
+
+bool Message::isComplete(){
+ return header.get() && (header->getContentSize() == contentSize());
+}
+
+void Message::redeliver(){
+ redelivered = true;
+}
+
+void Message::deliver(OutputHandler* out, int channel,
+ const string& consumerTag, u_int64_t deliveryTag,
+ u_int32_t framesize){
+
+ out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey)));
+ sendContent(out, channel, framesize);
+}
+
+void Message::sendGetOk(OutputHandler* out,
+ int channel,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize){
+
+ out->send(new AMQFrame(channel, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount)));
+ sendContent(out, channel, framesize);
+}
+
+void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize){
+ AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
+ out->send(new AMQFrame(channel, headerBody));
+ for(content_iterator i = content.begin(); i != content.end(); i++){
+ if((*i)->size() > framesize){
+ //TODO: need to split it
+ std::cout << "WARNING: Dropped message. Re-fragmentation not yet implemented." << std::endl;
+ }else{
+ AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i);
+ out->send(new AMQFrame(channel, contentBody));
+ }
+ }
+}
+
+BasicHeaderProperties* Message::getHeaderProperties(){
+ return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
+}
+
+const ConnectionToken* const Message::getPublisher(){
+ return publisher;
+}
+
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
new file mode 100644
index 0000000000..7ba8767baf
--- /dev/null
+++ b/cpp/src/qpid/broker/Message.h
@@ -0,0 +1,89 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Message_
+#define _Message_
+
+#include "./memory.h"
+#include "./qpid/framing/AMQContentBody.h"
+#include "./qpid/framing/AMQHeaderBody.h"
+#include "./qpid/framing/BasicHeaderProperties.h"
+#include "./qpid/framing/BasicPublishBody.h"
+#include "./qpid/broker/ConnectionToken.h"
+#include "./qpid/framing/OutputHandler.h"
+
+namespace qpid {
+ namespace broker {
+ class ExchangeRegistry;
+
+ /**
+ * Represents an AMQP message, i.e. a header body, a list of
+ * content bodies and some details about the publication
+ * request.
+ */
+ class Message{
+ typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list;
+ typedef content_list::iterator content_iterator;
+
+ const ConnectionToken* const publisher;
+ const string exchange;
+ const string routingKey;
+ const bool mandatory;
+ const bool immediate;
+ bool redelivered;
+ qpid::framing::AMQHeaderBody::shared_ptr header;
+ content_list content;
+ u_int64_t size;
+
+ void sendContent(qpid::framing::OutputHandler* out,
+ int channel, u_int32_t framesize);
+
+ public:
+ typedef std::tr1::shared_ptr<Message> shared_ptr;
+
+ Message(const ConnectionToken* const publisher,
+ const string& exchange, const string& routingKey,
+ bool mandatory, bool immediate);
+ ~Message();
+ void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
+ void addContent(qpid::framing::AMQContentBody::shared_ptr data);
+ bool isComplete();
+ const ConnectionToken* const getPublisher();
+
+ void deliver(qpid::framing::OutputHandler* out,
+ int channel,
+ const string& consumerTag,
+ u_int64_t deliveryTag,
+ u_int32_t framesize);
+ void sendGetOk(qpid::framing::OutputHandler* out,
+ int channel,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize);
+ void redeliver();
+
+ qpid::framing::BasicHeaderProperties* getHeaderProperties();
+ const string& getRoutingKey() const { return routingKey; }
+ const string& getExchange() const { return exchange; }
+ u_int64_t contentSize() const{ return size; }
+
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/NameGenerator.cpp b/cpp/src/qpid/broker/NameGenerator.cpp
new file mode 100644
index 0000000000..d9b758c5a0
--- /dev/null
+++ b/cpp/src/qpid/broker/NameGenerator.cpp
@@ -0,0 +1,29 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "./qpid/broker/NameGenerator.h"
+#include <sstream>
+
+using namespace qpid::broker;
+
+NameGenerator::NameGenerator(const std::string& _base) : base(_base), counter(1) {}
+
+std::string NameGenerator::generate(){
+ std::stringstream ss;
+ ss << base << counter++;
+ return ss.str();
+}
diff --git a/cpp/src/qpid/broker/NameGenerator.h b/cpp/src/qpid/broker/NameGenerator.h
new file mode 100644
index 0000000000..38f3c0a4c1
--- /dev/null
+++ b/cpp/src/qpid/broker/NameGenerator.h
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _NameGenerator_
+#define _NameGenerator_
+
+#include "./qpid/broker/Message.h"
+
+namespace qpid {
+ namespace broker {
+ class NameGenerator{
+ const std::string base;
+ unsigned int counter;
+ public:
+ NameGenerator(const std::string& base);
+ std::string generate();
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
new file mode 100644
index 0000000000..d672074555
--- /dev/null
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -0,0 +1,155 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "./qpid/broker/Queue.h"
+#include "./qpid/concurrent/MonitorImpl.h"
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::concurrent;
+
+Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, const ConnectionToken* const _owner) :
+ name(_name),
+ autodelete(_autodelete),
+ durable(_durable),
+ owner(_owner),
+ queueing(false),
+ dispatching(false),
+ next(0),
+ lastUsed(0),
+ exclusive(0)
+{
+ if(autodelete) lastUsed = apr_time_as_msec(apr_time_now());
+}
+
+Queue::~Queue(){
+ for(Binding* b = bindings.front(); !bindings.empty(); b = bindings.front()){
+ b->cancel();
+ bindings.pop();
+ }
+}
+
+void Queue::bound(Binding* b){
+ bindings.push(b);
+}
+
+void Queue::deliver(Message::shared_ptr& msg){
+ Locker locker(lock);
+ if(queueing || !dispatch(msg)){
+ queueing = true;
+ messages.push(msg);
+ }
+}
+
+bool Queue::dispatch(Message::shared_ptr& msg){
+ if(consumers.empty()){
+ return false;
+ }else if(exclusive){
+ if(!exclusive->deliver(msg)){
+ std::cout << "WARNING: Dropping undeliverable message from queue with exclusive consumer." << std::endl;
+ }
+ return true;
+ }else{
+ //deliver to next consumer
+ next = next % consumers.size();
+ Consumer* c = consumers[next];
+ int start = next;
+ while(c){
+ next++;
+ if(c->deliver(msg)) return true;
+
+ next = next % consumers.size();
+ c = next == start ? 0 : consumers[next];
+ }
+ return false;
+ }
+}
+
+bool Queue::startDispatching(){
+ Locker locker(lock);
+ if(queueing && !dispatching){
+ dispatching = true;
+ return true;
+ }else{
+ return false;
+ }
+}
+
+void Queue::dispatch(){
+ bool proceed = startDispatching();
+ while(proceed){
+ Locker locker(lock);
+ if(!messages.empty() && dispatch(messages.front())){
+ messages.pop();
+ }else{
+ dispatching = false;
+ proceed = false;
+ queueing = !messages.empty();
+ }
+ }
+}
+
+void Queue::consume(Consumer* c, bool requestExclusive){
+ Locker locker(lock);
+ if(exclusive) throw ExclusiveAccessException();
+ if(requestExclusive){
+ if(!consumers.empty()) throw ExclusiveAccessException();
+ exclusive = c;
+ }
+
+ if(autodelete && consumers.empty()) lastUsed = 0;
+ consumers.push_back(c);
+}
+
+void Queue::cancel(Consumer* c){
+ Locker locker(lock);
+ consumers.erase(find(consumers.begin(), consumers.end(), c));
+ if(autodelete && consumers.empty()) lastUsed = apr_time_as_msec(apr_time_now());
+ if(exclusive == c) exclusive = 0;
+}
+
+Message::shared_ptr Queue::dequeue(){
+ Locker locker(lock);
+ Message::shared_ptr msg;
+ if(!messages.empty()){
+ msg = messages.front();
+ messages.pop();
+ }
+ return msg;
+}
+
+u_int32_t Queue::purge(){
+ Locker locker(lock);
+ int count = messages.size();
+ while(!messages.empty()) messages.pop();
+ return count;
+}
+
+u_int32_t Queue::getMessageCount() const{
+ Locker locker(lock);
+ return messages.size();
+}
+
+u_int32_t Queue::getConsumerCount() const{
+ Locker locker(lock);
+ return consumers.size();
+}
+
+bool Queue::canAutoDelete() const{
+ Locker locker(lock);
+ return lastUsed && ((apr_time_as_msec(apr_time_now()) - lastUsed) > autodelete);
+}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
new file mode 100644
index 0000000000..6cf9088633
--- /dev/null
+++ b/cpp/src/qpid/broker/Queue.h
@@ -0,0 +1,106 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Queue_
+#define _Queue_
+
+#include <vector>
+#include <queue>
+#include "./memory.h"
+#include "apr_time.h"
+#include "./qpid/framing/amqp_types.h"
+#include "./qpid/broker/Binding.h"
+#include "./qpid/broker/ConnectionToken.h"
+#include "./qpid/broker/Consumer.h"
+#include "./qpid/broker/Message.h"
+#include "./qpid/concurrent/MonitorImpl.h"
+
+namespace qpid {
+ namespace broker {
+
+ /**
+ * Thrown when exclusive access would be violated.
+ */
+ struct ExclusiveAccessException{};
+
+ /**
+ * The brokers representation of an amqp queue. Messages are
+ * delivered to a queue from where they can be dispatched to
+ * registered consumers or be stored until dequeued or until one
+ * or more consumers registers.
+ */
+ class Queue{
+ const string name;
+ const u_int32_t autodelete;
+ const bool durable;
+ const ConnectionToken* const owner;
+ std::vector<Consumer*> consumers;
+ std::queue<Binding*> bindings;
+ std::queue<Message::shared_ptr> messages;
+ bool queueing;
+ bool dispatching;
+ int next;
+ mutable qpid::concurrent::MonitorImpl lock;
+ apr_time_t lastUsed;
+ Consumer* exclusive;
+
+ bool startDispatching();
+ bool dispatch(Message::shared_ptr& msg);
+
+ public:
+
+ typedef std::tr1::shared_ptr<Queue> shared_ptr;
+
+ typedef std::vector<shared_ptr> vector;
+
+ Queue(const string& name, bool durable = false, u_int32_t autodelete = 0, const ConnectionToken* const owner = 0);
+ ~Queue();
+ /**
+ * Informs the queue of a binding that should be cancelled on
+ * destruction of the queue.
+ */
+ void bound(Binding* b);
+ /**
+ * Delivers a message to the queue from where it will be
+ * dispatched to immediately to a consumer if one is
+ * available or stored for dequeue or later dispatch if
+ * not.
+ */
+ void deliver(Message::shared_ptr& msg);
+ /**
+ * Dispatch any queued messages providing there are
+ * consumers for them. Only one thread can be dispatching
+ * at any time, but this method (rather than the caller)
+ * is responsible for ensuring that.
+ */
+ void dispatch();
+ void consume(Consumer* c, bool exclusive = false);
+ void cancel(Consumer* c);
+ Message::shared_ptr dequeue();
+ u_int32_t purge();
+ u_int32_t getMessageCount() const;
+ u_int32_t getConsumerCount() const;
+ inline const string& getName() const { return name; }
+ inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
+ inline bool hasExclusiveConsumer() const { return exclusive; }
+ bool canAutoDelete() const;
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
new file mode 100644
index 0000000000..f2cb46648e
--- /dev/null
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -0,0 +1,72 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "./qpid/broker/QueueRegistry.h"
+#include "./qpid/concurrent/MonitorImpl.h"
+#include "./qpid/broker/SessionHandlerImpl.h"
+#include <sstream>
+#include <assert.h>
+
+using namespace qpid::broker;
+using namespace qpid::concurrent;
+
+QueueRegistry::QueueRegistry() : counter(1){}
+
+QueueRegistry::~QueueRegistry(){}
+
+std::pair<Queue::shared_ptr, bool>
+QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete, const ConnectionToken* owner)
+{
+ Locker locker(lock);
+ string name = declareName.empty() ? generateName() : declareName;
+ assert(!name.empty());
+ QueueMap::iterator i = queues.find(name);
+ if (i == queues.end()) {
+ Queue::shared_ptr queue(new Queue(name, durable, autoDelete, owner));
+ queues[name] = queue;
+ return std::pair<Queue::shared_ptr, bool>(queue, true);
+ } else {
+ return std::pair<Queue::shared_ptr, bool>(i->second, false);
+ }
+}
+
+void QueueRegistry::destroy(const string& name){
+ Locker locker(lock);
+ queues.erase(name);
+}
+
+Queue::shared_ptr QueueRegistry::find(const string& name){
+ Locker locker(lock);
+ QueueMap::iterator i = queues.find(name);
+ if (i == queues.end()) {
+ return Queue::shared_ptr();
+ } else {
+ return i->second;
+ }
+}
+
+string QueueRegistry::generateName(){
+ string name;
+ do {
+ std::stringstream ss;
+ ss << "tmp_" << counter++;
+ name = ss.str();
+ // Thread safety: Private function, only called with lock held
+ // so this is OK.
+ } while(queues.find(name) != queues.end());
+ return name;
+}
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
new file mode 100644
index 0000000000..9e6778153e
--- /dev/null
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -0,0 +1,88 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _QueueRegistry_
+#define _QueueRegistry_
+
+#include <map>
+#include "./qpid/concurrent/MonitorImpl.h"
+#include "./qpid/broker/Queue.h"
+
+namespace qpid {
+namespace broker {
+
+class SessionHandlerImpl;
+
+/**
+ * A registry of queues indexed by queue name.
+ *
+ * Queues are reference counted using shared_ptr to ensure that they
+ * are deleted when and only when they are no longer in use.
+ *
+ */
+class QueueRegistry{
+
+ public:
+ QueueRegistry();
+ ~QueueRegistry();
+
+ /**
+ * Declare a queue.
+ *
+ * @return The queue and a boolean flag which is true if the queue
+ * was created by this declare call false if it already existed.
+ */
+ std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0, const ConnectionToken* const owner = 0);
+
+ /**
+ * Destroy the named queue.
+ *
+ * Note: if the queue is in use it is not actually destroyed until
+ * all shared_ptrs to it are destroyed. During that time it is
+ * possible that a new queue with the same name may be
+ * created. This should not create any problems as the new and
+ * old queues exist independently. The registry has
+ * forgotten the old queue so there can be no confusion for
+ * subsequent calls to find or declare with the same name.
+ *
+ */
+ void destroy(const string& name);
+
+ /**
+ * Find the named queue. Return 0 if not found.
+ */
+ Queue::shared_ptr find(const string& name);
+
+ /**
+ * Generate unique queue name.
+ */
+ string generateName();
+
+ private:
+ typedef std::map<string, Queue::shared_ptr> QueueMap;
+ QueueMap queues;
+ qpid::concurrent::MonitorImpl lock;
+ int counter;
+
+};
+
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/Router.cpp b/cpp/src/qpid/broker/Router.cpp
new file mode 100644
index 0000000000..6a81816aea
--- /dev/null
+++ b/cpp/src/qpid/broker/Router.cpp
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "./qpid/broker/Router.h"
+
+using namespace qpid::broker;
+
+Router::Router(ExchangeRegistry& _registry) : registry(_registry){}
+
+void Router::operator()(Message::shared_ptr& msg){
+ Exchange* exchange = registry.get(msg->getExchange());
+ if(exchange){
+ exchange->route(msg, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
+ }else{
+ std::cout << "WARNING: Could not route message, unknown exchange: " << msg->getExchange() << std::endl;
+ }
+
+}
diff --git a/cpp/src/qpid/broker/Router.h b/cpp/src/qpid/broker/Router.h
new file mode 100644
index 0000000000..23739e6527
--- /dev/null
+++ b/cpp/src/qpid/broker/Router.h
@@ -0,0 +1,39 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Router_
+#define _Router_
+
+#include "./qpid/broker/ExchangeRegistry.h"
+#include "./qpid/broker/Message.h"
+
+/**
+ * A routing functor
+ */
+namespace qpid {
+ namespace broker {
+ class Router{
+ ExchangeRegistry& registry;
+ public:
+ Router(ExchangeRegistry& registry);
+ void operator()(Message::shared_ptr& msg);
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
new file mode 100644
index 0000000000..8afff976e1
--- /dev/null
+++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "./qpid/broker/SessionHandlerFactoryImpl.h"
+#include "./qpid/broker/SessionHandlerImpl.h"
+#include "./qpid/broker/FanOutExchange.h"
+#include "./qpid/broker/HeadersExchange.h"
+
+using namespace qpid::broker;
+using namespace qpid::io;
+
+namespace
+{
+const std::string empty;
+const std::string amq_direct("amq.direct");
+const std::string amq_topic("amq.topic");
+const std::string amq_fanout("amq.fanout");
+const std::string amq_match("amq.match");
+}
+
+SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeout(_timeout), cleaner(&queues, timeout/10){
+ exchanges.declare(new DirectExchange(empty)); // Default exchange.
+ exchanges.declare(new DirectExchange(amq_direct));
+ exchanges.declare(new TopicExchange(amq_topic));
+ exchanges.declare(new FanOutExchange(amq_fanout));
+ exchanges.declare(new HeadersExchange(amq_match));
+ cleaner.start();
+}
+
+SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt){
+ return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, timeout);
+}
+
+SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl(){
+ cleaner.stop();
+}
diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h
new file mode 100644
index 0000000000..2875cf63e6
--- /dev/null
+++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _SessionHandlerFactoryImpl_
+#define _SessionHandlerFactoryImpl_
+
+#include "./qpid/framing/AMQFrame.h"
+#include "./qpid/broker/AutoDelete.h"
+#include "./qpid/broker/DirectExchange.h"
+#include "./qpid/broker/ExchangeRegistry.h"
+#include "./qpid/framing/ProtocolInitiation.h"
+#include "./qpid/broker/QueueRegistry.h"
+#include "./qpid/io/SessionHandlerFactory.h"
+#include "./qpid/io/TimeoutHandler.h"
+
+namespace qpid {
+ namespace broker {
+
+ class SessionHandlerFactoryImpl : public virtual qpid::io::SessionHandlerFactory
+ {
+ QueueRegistry queues;
+ ExchangeRegistry exchanges;
+ const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
+ AutoDelete cleaner;
+ public:
+ SessionHandlerFactoryImpl(u_int32_t timeout = 30000);
+ virtual qpid::io::SessionHandler* create(qpid::io::SessionContext* ctxt);
+ virtual ~SessionHandlerFactoryImpl();
+ };
+
+ }
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.cpp b/cpp/src/qpid/broker/SessionHandlerImpl.cpp
new file mode 100644
index 0000000000..49d43d8f8b
--- /dev/null
+++ b/cpp/src/qpid/broker/SessionHandlerImpl.cpp
@@ -0,0 +1,405 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include "./qpid/broker/SessionHandlerImpl.h"
+#include "./qpid/broker/FanOutExchange.h"
+#include "./qpid/broker/HeadersExchange.h"
+#include "./qpid/broker/Router.h"
+#include "./qpid/broker/TopicExchange.h"
+#include "assert.h"
+
+using namespace std::tr1;
+using namespace qpid::broker;
+using namespace qpid::io;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context,
+ QueueRegistry* _queues,
+ ExchangeRegistry* _exchanges,
+ AutoDelete* _cleaner,
+ const u_int32_t _timeout) :
+ context(_context),
+ client(context),
+ queues(_queues),
+ exchanges(_exchanges),
+ cleaner(_cleaner),
+ timeout(_timeout),
+ connectionHandler(new ConnectionHandlerImpl(this)),
+ channelHandler(new ChannelHandlerImpl(this)),
+ basicHandler(new BasicHandlerImpl(this)),
+ exchangeHandler(new ExchangeHandlerImpl(this)),
+ queueHandler(new QueueHandlerImpl(this)),
+ framemax(65536),
+ heartbeat(0) {}
+
+SessionHandlerImpl::~SessionHandlerImpl(){
+ // TODO aconway 2006-09-07: Should be auto_ptr or plain members.
+ delete channelHandler;
+ delete connectionHandler;
+ delete basicHandler;
+ delete exchangeHandler;
+ delete queueHandler;
+}
+
+Channel* SessionHandlerImpl::getChannel(u_int16_t channel){
+ channel_iterator i = channels.find(channel);
+ if(i == channels.end()){
+ throw ConnectionException(504, "Unknown channel: " + channel);
+ }
+ return i->second;
+}
+
+Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t channel){
+ Queue::shared_ptr queue;
+ if (name.empty()) {
+ queue = getChannel(channel)->getDefaultQueue();
+ if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" );
+ } else {
+ queue = queues->find(name);
+ if (queue == 0) {
+ throw ChannelException( 404, "Queue not found: " + name);
+ }
+ }
+ return queue;
+}
+
+
+Exchange* SessionHandlerImpl::findExchange(const string& name){
+ exchanges->getLock()->acquire();
+ Exchange* exchange(exchanges->get(name));
+ exchanges->getLock()->release();
+ return exchange;
+}
+
+void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
+ u_int16_t channel = frame->getChannel();
+ AMQBody::shared_ptr body = frame->getBody();
+ AMQMethodBody::shared_ptr method;
+
+ switch(body->type())
+ {
+ case METHOD_BODY:
+ method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body);
+ try{
+ method->invoke(*this, channel);
+ }catch(ChannelException& e){
+ channels[channel]->close();
+ channels.erase(channel);
+ client.getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+ }catch(ConnectionException& e){
+ client.getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
+ }
+ break;
+
+ case HEADER_BODY:
+ this->handleHeader(channel, dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body));
+ break;
+
+ case CONTENT_BODY:
+ this->handleContent(channel, dynamic_pointer_cast<AMQContentBody, AMQBody>(body));
+ break;
+
+ case HEARTBEAT_BODY:
+ //channel must be 0
+ this->handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body));
+ break;
+ }
+}
+
+void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* /*header*/){
+ //send connection start
+ FieldTable properties;
+ string mechanisms("PLAIN");
+ string locales("en_US");
+ client.getConnection().start(0, 8, 0, properties, mechanisms, locales);
+}
+
+void SessionHandlerImpl::idleOut(){
+
+}
+
+void SessionHandlerImpl::idleIn(){
+
+}
+
+void SessionHandlerImpl::closed(){
+ for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){
+ Channel* c = i->second;
+ channels.erase(i);
+ c->close();
+ delete c;
+ }
+ for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
+ string name = (*i)->getName();
+ queues->destroy(name);
+ exclusiveQueues.erase(i);
+ }
+}
+
+void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){
+ getChannel(channel)->handleHeader(body, Router(*exchanges));
+}
+
+void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){
+ getChannel(channel)->handleContent(body, Router(*exchanges));
+}
+
+void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
+ std::cout << "SessionHandlerImpl::handleHeartbeat()" << std::endl;
+}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::startOk(
+ u_int16_t /*channel*/, FieldTable& /*clientProperties*/, string& /*mechanism*/,
+ string& /*response*/, string& /*locale*/){
+
+ parent->client.getConnection().tune(0, 100, parent->framemax, parent->heartbeat);
+}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, string& /*response*/){}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){
+ parent->framemax = framemax;
+ parent->heartbeat = heartbeat;
+}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/, string& /*virtualHost*/, string& /*capabilities*/, bool /*insist*/){
+ string knownhosts;
+ parent->client.getConnection().openOk(0, knownhosts);
+}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::close(
+ u_int16_t /*channel*/, u_int16_t /*replyCode*/, string& /*replyText*/,
+ u_int16_t /*classId*/, u_int16_t /*methodId*/)
+{
+ parent->client.getConnection().closeOk(0);
+ parent->context->close();
+}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){
+ parent->context->close();
+}
+
+
+
+void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, string& /*outOfBand*/){
+ parent->channels[channel] = new Channel(parent->context, channel, parent->framemax);
+ parent->client.getChannel().openOk(channel);
+}
+
+void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){}
+void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){}
+
+void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, string& /*replyText*/,
+ u_int16_t /*classId*/, u_int16_t /*methodId*/){
+ Channel* c = parent->getChannel(channel);
+ if(c){
+ parent->channels.erase(channel);
+ c->close();
+ delete c;
+ parent->client.getChannel().closeOk(channel);
+ }
+}
+
+void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){}
+
+
+
+void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, string& exchange, string& type,
+ bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
+ FieldTable& /*arguments*/){
+
+ if(!passive && (
+ type != TopicExchange::typeName &&
+ type != DirectExchange::typeName &&
+ type != FanOutExchange::typeName &&
+ type != HeadersExchange::typeName
+ )
+ )
+ {
+ throw ChannelException(540, "Exchange type not implemented: " + type);
+ }
+
+ parent->exchanges->getLock()->acquire();
+ if(!parent->exchanges->get(exchange)){
+ if(type == TopicExchange::typeName){
+ parent->exchanges->declare(new TopicExchange(exchange));
+ }else if(type == DirectExchange::typeName){
+ parent->exchanges->declare(new DirectExchange(exchange));
+ }else if(type == FanOutExchange::typeName){
+ parent->exchanges->declare(new DirectExchange(exchange));
+ }else if (type == HeadersExchange::typeName) {
+ parent->exchanges->declare(new HeadersExchange(exchange));
+ }
+ }
+ parent->exchanges->getLock()->release();
+ if(!nowait){
+ parent->client.getExchange().declareOk(channel);
+ }
+}
+
+void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, string& exchange, bool /*ifUnused*/, bool nowait){
+ //TODO: implement unused
+ parent->exchanges->getLock()->acquire();
+ parent->exchanges->destroy(exchange);
+ parent->exchanges->getLock()->release();
+ if(!nowait) parent->client.getExchange().deleteOk(channel);
+}
+
+void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, string& name,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait, FieldTable& /*arguments*/){
+ Queue::shared_ptr queue;
+ if (passive && !name.empty()) {
+ queue = parent->getQueue(name, channel);
+ } else {
+ std::pair<Queue::shared_ptr, bool> queue_created = parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0);
+ queue = queue_created.first;
+ assert(queue);
+ if (queue_created.second) { // This is a new queue
+ parent->getChannel(channel)->setDefaultQueue(queue);
+ //add default binding:
+ parent->exchanges->getDefault()->bind(queue, name, 0);
+ if(exclusive){
+ parent->exclusiveQueues.push_back(queue);
+ } else if(autoDelete){
+ parent->cleaner->add(queue);
+ }
+ }
+ }
+ if(exclusive && !queue->isExclusiveOwner(parent)){
+ throw ChannelException(405, "Cannot grant exclusive access to queue");
+ }
+ if(!nowait){
+ name = queue->getName();
+ parent->client.getQueue().declareOk(channel, name, queue->getMessageCount(), queue->getConsumerCount());
+ }
+}
+
+void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, string& queueName,
+ string& exchangeName, string& routingKey, bool nowait,
+ FieldTable& arguments){
+
+ Queue::shared_ptr queue = parent->getQueue(queueName, channel);
+ Exchange* exchange = parent->exchanges->get(exchangeName);
+ if(exchange){
+ if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
+ exchange->bind(queue, routingKey, &arguments);
+ if(!nowait) parent->client.getQueue().bindOk(channel);
+ }else{
+ throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName);
+ }
+}
+
+void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, string& queueName, bool nowait){
+
+ Queue::shared_ptr queue = parent->getQueue(queueName, channel);
+ int count = queue->purge();
+ if(!nowait) parent->client.getQueue().purgeOk(channel, count);
+}
+
+void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, string& queue,
+ bool ifUnused, bool ifEmpty, bool nowait){
+ ChannelException error(0, "");
+ int count(0);
+ Queue::shared_ptr q = parent->getQueue(queue, channel);
+ if(ifEmpty && q->getMessageCount() > 0){
+ throw ChannelException(406, "Queue not empty.");
+ }else if(ifUnused && q->getConsumerCount() > 0){
+ throw ChannelException(406, "Queue in use.");
+ }else{
+ //remove the queue from the list of exclusive queues if necessary
+ if(q->isExclusiveOwner(parent)){
+ queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q);
+ if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i);
+ }
+ count = q->getMessageCount();
+ parent->queues->destroy(queue);
+ }
+ if(!nowait) parent->client.getQueue().deleteOk(channel, count);
+}
+
+
+
+
+void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
+ //TODO: handle global
+ parent->getChannel(channel)->setPrefetchSize(prefetchSize);
+ parent->getChannel(channel)->setPrefetchCount(prefetchCount);
+ parent->client.getBasic().qosOk(channel);
+}
+
+void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t /*ticket*/,
+ string& queueName, string& consumerTag,
+ bool noLocal, bool noAck, bool exclusive,
+ bool nowait){
+
+ Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
+ Channel* channel = parent->channels[channelId];
+ if(!consumerTag.empty() && channel->exists(consumerTag)){
+ throw ConnectionException(530, "Consumer tags must be unique");
+ }
+
+ try{
+ channel->consume(consumerTag, queue, !noAck, exclusive, noLocal ? parent : 0);
+ if(!nowait) parent->client.getBasic().consumeOk(channelId, consumerTag);
+
+ //allow messages to be dispatched if required as there is now a consumer:
+ queue->dispatch();
+ }catch(ExclusiveAccessException& e){
+ if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
+ else throw ChannelException(403, "Access would violate previously granted exclusivity");
+ }
+
+}
+
+void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& consumerTag, bool nowait){
+ parent->getChannel(channel)->cancel(consumerTag);
+ if(!nowait) parent->client.getBasic().cancelOk(channel, consumerTag);
+}
+
+void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/,
+ string& exchange, string& routingKey,
+ bool mandatory, bool immediate){
+
+ Message* msg = new Message(parent, exchange, routingKey, mandatory, immediate);
+ parent->getChannel(channel)->handlePublish(msg);
+}
+
+void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, string& queueName, bool noAck){
+ Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
+ if(!parent->getChannel(channelId)->get(queue, !noAck)){
+ string clusterId;//not used, part of an imatix hack
+ parent->client.getBasic().getEmpty(channelId, clusterId);
+ }
+}
+
+void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){
+ try{
+ parent->getChannel(channel)->ack(deliveryTag, multiple);
+ }catch(InvalidAckException& e){
+ throw ConnectionException(530, "Received ack for unrecognised delivery tag");
+ }
+}
+
+void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
+
+void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
+ parent->getChannel(channel)->recover(requeue);
+}
+
diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.h b/cpp/src/qpid/broker/SessionHandlerImpl.h
new file mode 100644
index 0000000000..09a4a67e66
--- /dev/null
+++ b/cpp/src/qpid/broker/SessionHandlerImpl.h
@@ -0,0 +1,233 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _SessionHandlerImpl_
+#define _SessionHandlerImpl_
+
+#include <map>
+#include <sstream>
+#include <vector>
+#include <exception>
+#include "./qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "./qpid/broker/AutoDelete.h"
+#include "./qpid/broker/ExchangeRegistry.h"
+#include "./qpid/broker/Channel.h"
+#include "./qpid/broker/ConnectionToken.h"
+#include "./qpid/broker/DirectExchange.h"
+#include "./qpid/framing/OutputHandler.h"
+#include "./qpid/framing/ProtocolInitiation.h"
+#include "./qpid/broker/QueueRegistry.h"
+#include "./qpid/io/SessionContext.h"
+#include "./qpid/io/SessionHandler.h"
+#include "./qpid/io/TimeoutHandler.h"
+#include "./qpid/broker/TopicExchange.h"
+
+namespace qpid {
+namespace broker {
+
+struct ChannelException : public std::exception {
+ u_int16_t code;
+ string text;
+ ChannelException(u_int16_t _code, string _text) : code(_code), text(_text) {}
+ ~ChannelException() throw() {}
+ const char* what() const throw() { return text.c_str(); }
+};
+
+struct ConnectionException : public std::exception {
+ u_int16_t code;
+ string text;
+ ConnectionException(u_int16_t _code, string _text) : code(_code), text(_text) {}
+ ~ConnectionException() throw() {}
+ const char* what() const throw() { return text.c_str(); }
+};
+
+class SessionHandlerImpl : public virtual qpid::io::SessionHandler,
+ public virtual qpid::framing::AMQP_ServerOperations,
+ public virtual ConnectionToken
+{
+ typedef std::map<u_int16_t, Channel*>::iterator channel_iterator;
+ typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+
+ qpid::io::SessionContext* context;
+ qpid::framing::AMQP_ClientProxy client;
+ QueueRegistry* queues;
+ ExchangeRegistry* const exchanges;
+ AutoDelete* const cleaner;
+ const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
+
+ ConnectionHandler* connectionHandler;
+ ChannelHandler* channelHandler;
+ BasicHandler* basicHandler;
+ ExchangeHandler* exchangeHandler;
+ QueueHandler* queueHandler;
+
+ std::map<u_int16_t, Channel*> channels;
+ std::vector<Queue::shared_ptr> exclusiveQueues;
+
+ u_int32_t framemax;
+ u_int16_t heartbeat;
+
+ void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body);
+ void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body);
+ void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+
+ Channel* getChannel(u_int16_t channel);
+ /**
+ * Get named queue, never returns 0.
+ * @return: named queue or default queue for channel if name=""
+ * @exception: ChannelException if no queue of that name is found.
+ * @exception: ConnectionException if no queue specified and channel has not declared one.
+ */
+ Queue::shared_ptr getQueue(const string& name, u_int16_t channel);
+
+ Exchange* findExchange(const string& name);
+
+ public:
+ SessionHandlerImpl(qpid::io::SessionContext* context, QueueRegistry* queues,
+ ExchangeRegistry* exchanges, AutoDelete* cleaner, const u_int32_t timeout);
+ virtual void received(qpid::framing::AMQFrame* frame);
+ virtual void initiated(qpid::framing::ProtocolInitiation* header);
+ virtual void idleOut();
+ virtual void idleIn();
+ virtual void closed();
+ virtual ~SessionHandlerImpl();
+
+ class ConnectionHandlerImpl : public virtual ConnectionHandler{
+ SessionHandlerImpl* parent;
+ public:
+ inline ConnectionHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ virtual void startOk(u_int16_t channel, qpid::framing::FieldTable& clientProperties, string& mechanism,
+ string& response, string& locale);
+
+ virtual void secureOk(u_int16_t channel, string& response);
+
+ virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat);
+
+ virtual void open(u_int16_t channel, string& virtualHost, string& capabilities, bool insist);
+
+ virtual void close(u_int16_t channel, u_int16_t replyCode, string& replyText, u_int16_t classId,
+ u_int16_t methodId);
+
+ virtual void closeOk(u_int16_t channel);
+
+ virtual ~ConnectionHandlerImpl(){}
+ };
+
+ class ChannelHandlerImpl : public virtual ChannelHandler{
+ SessionHandlerImpl* parent;
+ public:
+ inline ChannelHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ virtual void open(u_int16_t channel, string& outOfBand);
+
+ virtual void flow(u_int16_t channel, bool active);
+
+ virtual void flowOk(u_int16_t channel, bool active);
+
+ virtual void close(u_int16_t channel, u_int16_t replyCode, string& replyText,
+ u_int16_t classId, u_int16_t methodId);
+
+ virtual void closeOk(u_int16_t channel);
+
+ virtual ~ChannelHandlerImpl(){}
+ };
+
+ class ExchangeHandlerImpl : public virtual ExchangeHandler{
+ SessionHandlerImpl* parent;
+ public:
+ inline ExchangeHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ virtual void declare(u_int16_t channel, u_int16_t ticket, string& exchange, string& type,
+ bool passive, bool durable, bool autoDelete, bool internal, bool nowait,
+ qpid::framing::FieldTable& arguments);
+
+ virtual void delete_(u_int16_t channel, u_int16_t ticket, string& exchange, bool ifUnused, bool nowait);
+
+ virtual ~ExchangeHandlerImpl(){}
+ };
+
+
+ class QueueHandlerImpl : public virtual QueueHandler{
+ SessionHandlerImpl* parent;
+ public:
+ inline QueueHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ virtual void declare(u_int16_t channel, u_int16_t ticket, string& queue,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait, qpid::framing::FieldTable& arguments);
+
+ virtual void bind(u_int16_t channel, u_int16_t ticket, string& queue,
+ string& exchange, string& routingKey, bool nowait,
+ qpid::framing::FieldTable& arguments);
+
+ virtual void purge(u_int16_t channel, u_int16_t ticket, string& queue,
+ bool nowait);
+
+ virtual void delete_(u_int16_t channel, u_int16_t ticket, string& queue, bool ifUnused, bool ifEmpty,
+ bool nowait);
+
+ virtual ~QueueHandlerImpl(){}
+ };
+
+ class BasicHandlerImpl : public virtual BasicHandler{
+ SessionHandlerImpl* parent;
+ public:
+ inline BasicHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global);
+
+ virtual void consume(u_int16_t channel, u_int16_t ticket, string& queue, string& consumerTag,
+ bool noLocal, bool noAck, bool exclusive, bool nowait);
+
+ virtual void cancel(u_int16_t channel, string& consumerTag, bool nowait);
+
+ virtual void publish(u_int16_t channel, u_int16_t ticket, string& exchange, string& routingKey,
+ bool mandatory, bool immediate);
+
+ virtual void get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck);
+
+ virtual void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple);
+
+ virtual void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue);
+
+ virtual void recover(u_int16_t channel, bool requeue);
+
+ virtual ~BasicHandlerImpl(){}
+ };
+
+ inline virtual ChannelHandler* getChannelHandler(){ return channelHandler; }
+ inline virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler; }
+ inline virtual BasicHandler* getBasicHandler(){ return basicHandler; }
+ inline virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler; }
+ inline virtual QueueHandler* getQueueHandler(){ return queueHandler; }
+
+ inline virtual AccessHandler* getAccessHandler(){ return 0; }
+ inline virtual FileHandler* getFileHandler(){ return 0; }
+ inline virtual StreamHandler* getStreamHandler(){ return 0; }
+ inline virtual TxHandler* getTxHandler(){ return 0; }
+ inline virtual DtxHandler* getDtxHandler(){ return 0; }
+ inline virtual TunnelHandler* getTunnelHandler(){ return 0; }
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
new file mode 100644
index 0000000000..2affa6057d
--- /dev/null
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -0,0 +1,163 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "./qpid/broker/TopicExchange.h"
+#include "./qpid/broker/ExchangeBinding.h"
+#include <algorithm>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+
+// TODO aconway 2006-09-20: More efficient matching algorithm.
+// Areas for improvement:
+// - excessive string copying: should be 0 copy, match from original buffer.
+// - match/lookup: use descision tree or other more efficient structure.
+
+Tokens& Tokens::operator=(const std::string& s) {
+ clear();
+ if (s.empty()) return *this;
+ std::string::const_iterator i = s.begin();
+ while (true) {
+ // Invariant: i is at the beginning of the next untokenized word.
+ std::string::const_iterator j = find(i, s.end(), '.');
+ push_back(std::string(i, j));
+ if (j == s.end()) return *this;
+ i = j + 1;
+ }
+ return *this;
+}
+
+size_t Tokens::Hash::operator()(const Tokens& p) const {
+ size_t hash = 0;
+ for (Tokens::const_iterator i = p.begin(); i != p.end(); ++i) {
+ hash += std::tr1::hash<std::string>()(*i);
+ }
+ return hash;
+}
+
+TopicPattern& TopicPattern::operator=(const Tokens& tokens) {
+ Tokens::operator=(tokens);
+ normalize();
+ return *this;
+}
+
+namespace {
+const std::string hashmark("#");
+const std::string star("*");
+}
+
+void TopicPattern::normalize() {
+ std::string word;
+ Tokens::iterator i = begin();
+ while (i != end()) {
+ if (*i == hashmark) {
+ ++i;
+ while (i != end()) {
+ // Invariant: *(i-1)==#, [begin()..i-1] is normalized.
+ if (*i == star) { // Move * before #.
+ std::swap(*i, *(i-1));
+ ++i;
+ } else if (*i == hashmark) {
+ erase(i); // Remove extra #
+ } else {
+ break;
+ }
+ }
+ } else {
+ i ++;
+ }
+ }
+}
+
+
+namespace {
+// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string.
+// Need more efficient Tokens impl that can operate on a string in place.
+//
+bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end)
+{
+ // Invariant: [pattern_begin..p) matches [target_begin..t)
+ Tokens::const_iterator p = pattern_begin;
+ Tokens::const_iterator t = target_begin;
+ while (p != pattern_end && t != target_end)
+ {
+ if (*p == star || *p == *t) {
+ ++p, ++t;
+ } else if (*p == hashmark) {
+ ++p;
+ if (do_match(p, pattern_end, t, target_end)) return true;
+ while (t != target_end) {
+ ++t;
+ if (do_match(p, pattern_end, t, target_end)) return true;
+ }
+ return false;
+ } else {
+ return false;
+ }
+ }
+ while (p != pattern_end && *p == hashmark) ++p; // Ignore trailing #
+ return t == target_end && p == pattern_end;
+}
+}
+
+bool TopicPattern::match(const Tokens& target) const
+{
+ return do_match(begin(), end(), target.begin(), target.end());
+}
+
+TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { }
+
+void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+ lock.acquire();
+ TopicPattern routingPattern(routingKey);
+ bindings[routingPattern].push_back(queue);
+ queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+ lock.release();
+}
+
+void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* /*args*/){
+ lock.acquire();
+ BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
+ Queue::vector& qv(bi->second);
+ if (bi == bindings.end()) return;
+ Queue::vector::iterator q = find(qv.begin(), qv.end(), queue);
+ if(q == qv.end()) return;
+ qv.erase(q);
+ if(qv.empty()) bindings.erase(bi);
+ lock.release();
+}
+
+
+void TopicExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* /*args*/){
+ lock.acquire();
+ for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ if (i->first.match(routingKey)) {
+ Queue::vector& qv(i->second);
+ for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){
+ (*j)->deliver(msg);
+ }
+ }
+ }
+ lock.release();
+}
+
+TopicExchange::~TopicExchange() {}
+
+const std::string TopicExchange::typeName("topic");
+
+
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
new file mode 100644
index 0000000000..8bad7aab4c
--- /dev/null
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -0,0 +1,94 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TopicExchange_
+#define _TopicExchange_
+
+#include <tr1/unordered_map>
+#include <vector>
+#include "./qpid/broker/Exchange.h"
+#include "./qpid/framing/FieldTable.h"
+#include "./qpid/broker/Message.h"
+#include "./qpid/concurrent/MonitorImpl.h"
+#include "./qpid/broker/Queue.h"
+
+namespace qpid {
+namespace broker {
+
+/** A vector of string tokens */
+class Tokens : public std::vector<std::string> {
+ public:
+ Tokens() {};
+ // Default copy, assign, dtor are sufficient.
+
+ /** Tokenize s, provides automatic conversion of string to Tokens */
+ Tokens(const std::string& s) { operator=(s); }
+ /** Tokenize s */
+ Tokens & operator=(const std::string& s);
+
+ struct Hash { size_t operator()(const Tokens&) const; };
+ typedef std::equal_to<Tokens> Equal;
+};
+
+/**
+ * Tokens that have been normalized as a pattern and can be matched
+ * with topic Tokens. Normalized meands all sequences of mixed * and
+ * # are reduced to a series of * followed by at most one #.
+ */
+class TopicPattern : public Tokens
+{
+ public:
+ TopicPattern() {}
+ // Default copy, assign, dtor are sufficient.
+ TopicPattern(const Tokens& tokens) { operator=(tokens); }
+ TopicPattern(const std::string& str) { operator=(str); }
+ TopicPattern& operator=(const Tokens&);
+ TopicPattern& operator=(const std::string& str) { return operator=(Tokens(str)); }
+
+ /** Match a topic */
+ bool match(const std::string& topic) { return match(Tokens(topic)); }
+ bool match(const Tokens& topic) const;
+
+ private:
+ void normalize();
+};
+
+class TopicExchange : public virtual Exchange{
+ typedef std::tr1::unordered_map<TopicPattern, Queue::vector, TopicPattern::Hash> BindingMap;
+ BindingMap bindings;
+ qpid::concurrent::MonitorImpl lock;
+
+ public:
+ static const std::string typeName;
+
+ TopicExchange(const string& name);
+
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual ~TopicExchange();
+};
+
+
+
+}
+}
+
+#endif