summaryrefslogtreecommitdiff
path: root/cpp/broker
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2006-09-19 22:06:50 +0000
committerRafael H. Schloming <rhs@apache.org>2006-09-19 22:06:50 +0000
commit913489deb2ee9dbf44455de5f407ddaf4bd8c540 (patch)
tree7ea442d6867d0076f1c9ea4f4265664059e7aff5 /cpp/broker
downloadqpid-python-913489deb2ee9dbf44455de5f407ddaf4bd8c540.tar.gz
Import of qpid from etp:
URL: https://etp.108.redhat.com/svn/etp/trunk/blaze Repository Root: https://etp.108.redhat.com/svn/etp Repository UUID: 06e15bec-b515-0410-bef0-cc27a458cf48 Revision: 608 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@447994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/broker')
-rw-r--r--cpp/broker/Makefile47
-rw-r--r--cpp/broker/inc/AutoDelete.h54
-rw-r--r--cpp/broker/inc/Binding.h35
-rw-r--r--cpp/broker/inc/Channel.h87
-rw-r--r--cpp/broker/inc/Configuration.h125
-rw-r--r--cpp/broker/inc/ConnectionToken.h35
-rw-r--r--cpp/broker/inc/Consumer.h34
-rw-r--r--cpp/broker/inc/DirectExchange.h55
-rw-r--r--cpp/broker/inc/Exchange.h39
-rw-r--r--cpp/broker/inc/ExchangeBinding.h45
-rw-r--r--cpp/broker/inc/ExchangeRegistry.h42
-rw-r--r--cpp/broker/inc/FanOutExchange.h58
-rw-r--r--cpp/broker/inc/Message.h73
-rw-r--r--cpp/broker/inc/NameGenerator.h36
-rw-r--r--cpp/broker/inc/Queue.h106
-rw-r--r--cpp/broker/inc/QueueRegistry.h88
-rw-r--r--cpp/broker/inc/SessionHandlerFactoryImpl.h49
-rw-r--r--cpp/broker/inc/SessionHandlerImpl.h230
-rw-r--r--cpp/broker/inc/TopicExchange.h55
-rw-r--r--cpp/broker/src/AutoDelete.cpp93
-rw-r--r--cpp/broker/src/Broker.cpp92
-rw-r--r--cpp/broker/src/Channel.cpp148
-rw-r--r--cpp/broker/src/Configuration.cpp195
-rw-r--r--cpp/broker/src/DirectExchange.cpp72
-rw-r--r--cpp/broker/src/ExchangeBinding.cpp32
-rw-r--r--cpp/broker/src/ExchangeRegistry.cpp43
-rw-r--r--cpp/broker/src/FanOutExchange.cpp56
-rw-r--r--cpp/broker/src/Message.cpp97
-rw-r--r--cpp/broker/src/NameGenerator.cpp29
-rw-r--r--cpp/broker/src/Queue.cpp148
-rw-r--r--cpp/broker/src/QueueRegistry.cpp72
-rw-r--r--cpp/broker/src/SessionHandlerFactoryImpl.cpp40
-rw-r--r--cpp/broker/src/SessionHandlerImpl.cpp378
-rw-r--r--cpp/broker/src/TopicExchange.cpp62
-rw-r--r--cpp/broker/test/Makefile20
-rw-r--r--cpp/broker/test/QueueRegistryTest.cpp79
-rw-r--r--cpp/broker/test/exchange_test.cpp68
-rw-r--r--cpp/broker/test/message_test.cpp57
-rw-r--r--cpp/broker/test/queue_test.cpp138
39 files changed, 3212 insertions, 0 deletions
diff --git a/cpp/broker/Makefile b/cpp/broker/Makefile
new file mode 100644
index 0000000000..58ba3a41b5
--- /dev/null
+++ b/cpp/broker/Makefile
@@ -0,0 +1,47 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Build broker library and executable.
+#
+
+QPID_HOME = ../..
+include ${QPID_HOME}/cpp/options.mk
+
+SOURCES= $(wildcard src/*.cpp)
+OBJECTS= $(subst .cpp,.o,$(SOURCES))
+LIB_OBJECTS= $(subst src/Broker.o,,$(OBJECTS))
+EXE_OBJECTS= src/Broker.o
+
+
+.PHONY: all clean test
+
+all: $(BROKER)
+
+test:
+ @$(MAKE) -C test all
+
+clean:
+ -@rm -f ${OBJECTS} src/*.d ${BROKER} $(BROKER_LIB)
+ @$(MAKE) -C test clean
+
+$(BROKER): $(BROKER_LIB) $(EXE_OBJECTS)
+ ${CXX} -o $@ $(EXE_OBJECTS) $(LDFLAGS) -lapr-1 $(COMMON_LIB) $(BROKER_LIB)
+
+$(BROKER_LIB): $(LIB_OBJECTS)
+ $(CXX) -shared -o $@ $(LDFLAGS) $(LIB_OBJECTS) -lapr-1 $(COMMON_LIB) $(LIBDIR)
+
+-include $(SOURCES:.cpp=.d)
diff --git a/cpp/broker/inc/AutoDelete.h b/cpp/broker/inc/AutoDelete.h
new file mode 100644
index 0000000000..864d68358f
--- /dev/null
+++ b/cpp/broker/inc/AutoDelete.h
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _AutoDelete_
+#define _AutoDelete_
+
+#include <iostream>
+#include <queue>
+#include "MonitorImpl.h"
+#include "Queue.h"
+#include "QueueRegistry.h"
+#include "ThreadFactoryImpl.h"
+
+namespace qpid {
+ namespace broker{
+ class AutoDelete : private virtual qpid::concurrent::Runnable{
+ qpid::concurrent::ThreadFactoryImpl factory;
+ qpid::concurrent::MonitorImpl lock;
+ qpid::concurrent::MonitorImpl monitor;
+ std::queue<Queue::shared_ptr> queues;
+ QueueRegistry* const registry;
+ const u_int32_t period;
+ volatile bool stopped;
+ qpid::concurrent::Thread* runner;
+
+ Queue::shared_ptr const pop();
+ void process();
+ virtual void run();
+
+ public:
+ AutoDelete(QueueRegistry* const registry, u_int32_t period);
+ void add(Queue::shared_ptr const);
+ void start();
+ void stop();
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/broker/inc/Binding.h b/cpp/broker/inc/Binding.h
new file mode 100644
index 0000000000..b11419e92c
--- /dev/null
+++ b/cpp/broker/inc/Binding.h
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Binding_
+#define _Binding_
+
+#include "FieldTable.h"
+
+namespace qpid {
+ namespace broker {
+ class Binding{
+ public:
+ virtual void cancel() = 0;
+ virtual ~Binding(){}
+ };
+ }
+}
+
+
+#endif
+
diff --git a/cpp/broker/inc/Channel.h b/cpp/broker/inc/Channel.h
new file mode 100644
index 0000000000..aaf2ce569b
--- /dev/null
+++ b/cpp/broker/inc/Channel.h
@@ -0,0 +1,87 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Channel_
+#define _Channel_
+
+#include <map>
+#include "AMQContentBody.h"
+#include "AMQHeaderBody.h"
+#include "BasicPublishBody.h"
+#include "Binding.h"
+#include "Consumer.h"
+#include "Message.h"
+#include "MonitorImpl.h"
+#include "NameGenerator.h"
+#include "OutputHandler.h"
+#include "Queue.h"
+
+namespace qpid {
+ namespace broker {
+ class Channel{
+ private:
+ class ConsumerImpl : public virtual Consumer{
+ ConnectionToken* const connection;
+ Channel* parent;
+ string tag;
+ Queue::shared_ptr queue;
+ public:
+ ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken* const connection);
+ virtual bool deliver(Message::shared_ptr& msg);
+ void cancel();
+ };
+
+ typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator;
+
+ const int id;
+ qpid::framing::OutputHandler* out;
+ u_int64_t deliveryTag;
+ Queue::shared_ptr defaultQueue;
+ bool transactional;
+ std::map<string, ConsumerImpl*> consumers;
+ u_int32_t prefetchSize;
+ u_int16_t prefetchCount;
+ u_int32_t framesize;
+ Message::shared_ptr message;
+ NameGenerator tagGenerator;
+
+ void deliver(Message::shared_ptr& msg, string& tag);
+ void publish(ExchangeRegistry* exchanges);
+
+ public:
+ Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize);
+ ~Channel();
+ inline void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
+ inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; }
+ inline u_int32_t setPrefetchSize(u_int32_t size){ prefetchSize = size; }
+ inline u_int16_t setPrefetchCount(u_int16_t count){ prefetchCount = count; }
+ void handlePublish(Message* msg);
+ void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, ExchangeRegistry* exchanges);
+ void handleContent(qpid::framing::AMQContentBody::shared_ptr content, ExchangeRegistry* exchanges);
+ bool exists(string& consumerTag);
+ void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0);
+ void cancel(string& tag);
+ void begin();
+ void close();
+ void commit();
+ void rollback();
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/broker/inc/Configuration.h b/cpp/broker/inc/Configuration.h
new file mode 100644
index 0000000000..5ec70a839b
--- /dev/null
+++ b/cpp/broker/inc/Configuration.h
@@ -0,0 +1,125 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Configuration_
+#define _Configuration_
+
+#include <cstdlib>
+#include <iostream>
+#include <vector>
+
+namespace qpid {
+ namespace broker {
+ class Configuration{
+ class Option{
+ const std::string flag;
+ const std::string name;
+ const std::string desc;
+
+ bool match(const std::string& arg);
+
+ protected:
+ virtual bool needsValue() const = 0;
+ virtual void setValue(const std::string& value) = 0;
+
+ public:
+ Option(const char flag, const std::string& name, const std::string& desc);
+ Option(const std::string& name, const std::string& desc);
+ virtual ~Option();
+
+ bool parse(int& i, char** argv, int argc);
+ void print(std::ostream& out) const;
+ };
+
+ class IntOption : public Option{
+ const int defaultValue;
+ int value;
+ public:
+ IntOption(char flag, const std::string& name, const std::string& desc, const int value = 0);
+ IntOption(const std::string& name, const std::string& desc, const int value = 0);
+ virtual ~IntOption();
+
+ int getValue() const;
+ virtual bool needsValue() const;
+ virtual void setValue(const std::string& value);
+ };
+
+ class StringOption : public Option{
+ const std::string defaultValue;
+ std::string value;
+ public:
+ StringOption(char flag, const std::string& name, const std::string& desc, const std::string value = "");
+ StringOption(const std::string& name, const std::string& desc, const std::string value = "");
+ virtual ~StringOption();
+
+ const std::string& getValue() const;
+ virtual bool needsValue() const;
+ virtual void setValue(const std::string& value);
+ };
+
+ class BoolOption : public Option{
+ const bool defaultValue;
+ bool value;
+ public:
+ BoolOption(char flag, const std::string& name, const std::string& desc, const bool value = 0);
+ BoolOption(const std::string& name, const std::string& desc, const bool value = 0);
+ virtual ~BoolOption();
+
+ bool getValue() const;
+ virtual bool needsValue() const;
+ virtual void setValue(const std::string& value);
+ };
+
+ BoolOption trace;
+ IntOption port;
+ IntOption workerThreads;
+ IntOption maxConnections;
+ IntOption connectionBacklog;
+ StringOption acceptor;
+ BoolOption help;
+
+ typedef std::vector<Option*>::iterator op_iterator;
+ std::vector<Option*> options;
+
+ public:
+ class ParseException{
+ public:
+ const std::string& error;
+ ParseException(const std::string& _error) : error(_error) {}
+ };
+
+
+ Configuration();
+ ~Configuration();
+
+ void parse(int argc, char** argv);
+
+ bool isHelp();
+ bool isTrace();
+ int getPort();
+ int getWorkerThreads();
+ int getMaxConnections();
+ int getConnectionBacklog();
+ const std::string& getAcceptor();
+
+ void usage();
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/broker/inc/ConnectionToken.h b/cpp/broker/inc/ConnectionToken.h
new file mode 100644
index 0000000000..1faefec2cc
--- /dev/null
+++ b/cpp/broker/inc/ConnectionToken.h
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _ConnectionToken_
+#define _ConnectionToken_
+
+namespace qpid {
+ namespace broker {
+ /**
+ * An empty interface allowing opaque implementations of some
+ * form of token to identify a connection.
+ */
+ class ConnectionToken{
+ public:
+ virtual ~ConnectionToken(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/broker/inc/Consumer.h b/cpp/broker/inc/Consumer.h
new file mode 100644
index 0000000000..af2d5d7812
--- /dev/null
+++ b/cpp/broker/inc/Consumer.h
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Consumer_
+#define _Consumer_
+
+#include "Message.h"
+
+namespace qpid {
+ namespace broker {
+ class Consumer{
+ public:
+ virtual bool deliver(Message::shared_ptr& msg) = 0;
+ virtual ~Consumer(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/broker/inc/DirectExchange.h b/cpp/broker/inc/DirectExchange.h
new file mode 100644
index 0000000000..bf8c5f0b37
--- /dev/null
+++ b/cpp/broker/inc/DirectExchange.h
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _DirectExchange_
+#define _DirectExchange_
+
+#include <map>
+#include <vector>
+#include "Exchange.h"
+#include "FieldTable.h"
+#include "Message.h"
+#include "MonitorImpl.h"
+#include "Queue.h"
+
+namespace qpid {
+namespace broker {
+ class DirectExchange : public virtual Exchange{
+ const string name;
+ std::map<string, std::vector<Queue::shared_ptr> > bindings;
+ qpid::concurrent::MonitorImpl lock;
+
+ public:
+ static const std::string typeName;
+
+ DirectExchange(const string& name);
+
+ inline virtual const string& getName(){ return name; }
+
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual ~DirectExchange();
+ };
+}
+}
+
+
+#endif
diff --git a/cpp/broker/inc/Exchange.h b/cpp/broker/inc/Exchange.h
new file mode 100644
index 0000000000..5f5dc5ce71
--- /dev/null
+++ b/cpp/broker/inc/Exchange.h
@@ -0,0 +1,39 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Exchange_
+#define _Exchange_
+
+#include "FieldTable.h"
+#include "Message.h"
+#include "Queue.h"
+
+namespace qpid {
+namespace broker {
+ class Exchange{
+ public:
+ virtual const string& getName() = 0;
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
+ virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args) = 0;
+ virtual ~Exchange(){}
+ };
+}
+}
+
+
+#endif
diff --git a/cpp/broker/inc/ExchangeBinding.h b/cpp/broker/inc/ExchangeBinding.h
new file mode 100644
index 0000000000..4cbb73acbf
--- /dev/null
+++ b/cpp/broker/inc/ExchangeBinding.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _ExchangeBinding_
+#define _ExchangeBinding_
+
+#include "Binding.h"
+#include "FieldTable.h"
+#include "Queue.h"
+
+namespace qpid {
+ namespace broker {
+ class Exchange;
+ class Queue;
+
+ class ExchangeBinding : public virtual Binding{
+ Exchange* e;
+ Queue::shared_ptr q;
+ const string key;
+ qpid::framing::FieldTable* args;
+ public:
+ ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, qpid::framing::FieldTable* _args);
+ virtual void cancel();
+ virtual ~ExchangeBinding();
+ };
+ }
+}
+
+
+#endif
+
diff --git a/cpp/broker/inc/ExchangeRegistry.h b/cpp/broker/inc/ExchangeRegistry.h
new file mode 100644
index 0000000000..0f0eaae0d0
--- /dev/null
+++ b/cpp/broker/inc/ExchangeRegistry.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _ExchangeRegistry_
+#define _ExchangeRegistry_
+
+#include <map>
+#include "Exchange.h"
+#include "Monitor.h"
+
+namespace qpid {
+namespace broker {
+ class ExchangeRegistry{
+ std::map<string, Exchange*> exchanges;
+ qpid::concurrent::Monitor* lock;
+ public:
+ ExchangeRegistry();
+ void declare(Exchange* exchange);
+ void destroy(const string& name);
+ Exchange* get(const string& name);
+ inline qpid::concurrent::Monitor* getLock(){ return lock; }
+ ~ExchangeRegistry();
+ };
+}
+}
+
+
+#endif
diff --git a/cpp/broker/inc/FanOutExchange.h b/cpp/broker/inc/FanOutExchange.h
new file mode 100644
index 0000000000..9d0d32bbf8
--- /dev/null
+++ b/cpp/broker/inc/FanOutExchange.h
@@ -0,0 +1,58 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _FanOutExchange_
+#define _FanOutExchange_
+
+#include <map>
+#include <vector>
+#include "Exchange.h"
+#include "FieldTable.h"
+#include "Message.h"
+#include "MonitorImpl.h"
+#include "Queue.h"
+
+namespace qpid {
+namespace broker {
+
+class FanOutExchange : public virtual Exchange {
+ const string name;
+ std::vector<Queue::shared_ptr> bindings;
+ qpid::concurrent::MonitorImpl lock;
+
+ public:
+ static const std::string typeName;
+
+ FanOutExchange(const string& name);
+
+ inline virtual const string& getName(){ return name; }
+
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual ~FanOutExchange();
+};
+
+}
+}
+
+
+
+#endif
diff --git a/cpp/broker/inc/Message.h b/cpp/broker/inc/Message.h
new file mode 100644
index 0000000000..37a0c9b2c8
--- /dev/null
+++ b/cpp/broker/inc/Message.h
@@ -0,0 +1,73 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Message_
+#define _Message_
+
+#include "memory.h"
+#include "AMQContentBody.h"
+#include "AMQHeaderBody.h"
+#include "BasicHeaderProperties.h"
+#include "BasicPublishBody.h"
+#include "ConnectionToken.h"
+#include "OutputHandler.h"
+
+namespace qpid {
+ namespace broker {
+ class ExchangeRegistry;
+
+ class Message{
+ typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list;
+ typedef content_list::iterator content_iterator;
+
+ const ConnectionToken* const publisher;
+ string exchange;
+ string routingKey;
+ const bool mandatory;
+ const bool immediate;
+ qpid::framing::AMQHeaderBody::shared_ptr header;
+ content_list content;
+
+ u_int64_t contentSize();
+ qpid::framing::BasicHeaderProperties* getHeaderProperties();
+
+
+ public:
+ typedef std::tr1::shared_ptr<Message> shared_ptr;
+
+ Message(const ConnectionToken* const publisher,
+ const string& exchange, const string& routingKey,
+ bool mandatory, bool immediate);
+ ~Message();
+ void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
+ void addContent(qpid::framing::AMQContentBody::shared_ptr data);
+ bool isComplete();
+ const ConnectionToken* const getPublisher();
+
+ void deliver(qpid::framing::OutputHandler* out, int channel,
+ string& consumerTag, u_int64_t deliveryTag,
+ u_int32_t framesize);
+
+ friend bool route(Message::shared_ptr& msg, ExchangeRegistry* registry);
+
+ };
+ bool route(Message::shared_ptr& msg, ExchangeRegistry* registry);
+ }
+}
+
+
+#endif
diff --git a/cpp/broker/inc/NameGenerator.h b/cpp/broker/inc/NameGenerator.h
new file mode 100644
index 0000000000..6e6e0acf28
--- /dev/null
+++ b/cpp/broker/inc/NameGenerator.h
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _NameGenerator_
+#define _NameGenerator_
+
+#include "Message.h"
+
+namespace qpid {
+ namespace broker {
+ class NameGenerator{
+ const std::string base;
+ unsigned int counter;
+ public:
+ NameGenerator(const std::string& base);
+ std::string generate();
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/broker/inc/Queue.h b/cpp/broker/inc/Queue.h
new file mode 100644
index 0000000000..2229ba6235
--- /dev/null
+++ b/cpp/broker/inc/Queue.h
@@ -0,0 +1,106 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Queue_
+#define _Queue_
+
+#include <vector>
+#include <queue>
+#include "memory.h"
+#include "apr_time.h"
+#include "amqp_types.h"
+#include "Binding.h"
+#include "ConnectionToken.h"
+#include "Consumer.h"
+#include "Message.h"
+#include "MonitorImpl.h"
+
+namespace qpid {
+ namespace broker {
+
+ /**
+ * Thrown when exclusive access would be violated.
+ */
+ struct ExclusiveAccessException{};
+
+ /**
+ * The brokers representation of an amqp queue. Messages are
+ * delivered to a queue from where they can be dispatched to
+ * registered consumers or be stored until dequeued or until one
+ * or more consumers registers.
+ */
+ class Queue{
+ const string name;
+ const u_int32_t autodelete;
+ const bool durable;
+ const ConnectionToken* const owner;
+ std::vector<Consumer*> consumers;
+ std::queue<Binding*> bindings;
+ std::queue<Message::shared_ptr> messages;
+ bool queueing;
+ bool dispatching;
+ int next;
+ mutable qpid::concurrent::MonitorImpl lock;
+ apr_time_t lastUsed;
+ Consumer* exclusive;
+
+ bool startDispatching();
+ bool dispatch(Message::shared_ptr& msg);
+
+ public:
+
+ typedef std::tr1::shared_ptr<Queue> shared_ptr;
+
+ typedef std::vector<shared_ptr> vector;
+
+ Queue(const string& name, bool durable = false, u_int32_t autodelete = 0, const ConnectionToken* const owner = 0);
+ ~Queue();
+ /**
+ * Informs the queue of a binding that should be cancelled on
+ * destruction of the queue.
+ */
+ void bound(Binding* b);
+ /**
+ * Delivers a message to the queue from where it will be
+ * dispatched to immediately to a consumer if one is
+ * available or stored for dequeue or later dispatch if
+ * not.
+ */
+ void deliver(Message::shared_ptr& msg);
+ /**
+ * Dispatch any queued messages providing there are
+ * consumers for them. Only one thread can be dispatching
+ * at any time, but this method (rather than the caller)
+ * is responsible for ensuring that.
+ */
+ void dispatch();
+ void consume(Consumer* c, bool exclusive = false);
+ void cancel(Consumer* c);
+ Message::shared_ptr dequeue();
+ u_int32_t purge();
+ u_int32_t getMessageCount() const;
+ u_int32_t getConsumerCount() const;
+ inline const string& getName() const { return name; }
+ inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
+ inline bool hasExclusiveConsumer() const { return exclusive; }
+ bool canAutoDelete() const;
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/broker/inc/QueueRegistry.h b/cpp/broker/inc/QueueRegistry.h
new file mode 100644
index 0000000000..ac12aa8f88
--- /dev/null
+++ b/cpp/broker/inc/QueueRegistry.h
@@ -0,0 +1,88 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _QueueRegistry_
+#define _QueueRegistry_
+
+#include <map>
+#include "MonitorImpl.h"
+#include "Queue.h"
+
+namespace qpid {
+namespace broker {
+
+class SessionHandlerImpl;
+
+/**
+ * A registry of queues indexed by queue name.
+ *
+ * Queues are reference counted using shared_ptr to ensure that they
+ * are deleted when and only when they are no longer in use.
+ *
+ */
+class QueueRegistry{
+
+ public:
+ QueueRegistry();
+ ~QueueRegistry();
+
+ /**
+ * Declare a queue.
+ *
+ * @return The queue and a boolean flag which is true if the queue
+ * was created by this declare call false if it already existed.
+ */
+ std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0, const ConnectionToken* const owner = 0);
+
+ /**
+ * Destroy the named queue.
+ *
+ * Note: if the queue is in use it is not actually destroyed until
+ * all shared_ptrs to it are destroyed. During that time it is
+ * possible that a new queue with the same name may be
+ * created. This should not create any problems as the new and
+ * old queues exist independently. The registry has
+ * forgotten the old queue so there can be no confusion for
+ * subsequent calls to find or declare with the same name.
+ *
+ */
+ void destroy(const string& name);
+
+ /**
+ * Find the named queue. Return 0 if not found.
+ */
+ Queue::shared_ptr find(const string& name);
+
+ /**
+ * Generate unique queue name.
+ */
+ string generateName();
+
+ private:
+ typedef std::map<string, Queue::shared_ptr> QueueMap;
+ QueueMap queues;
+ qpid::concurrent::MonitorImpl lock;
+ int counter;
+
+};
+
+
+}
+}
+
+
+#endif
diff --git a/cpp/broker/inc/SessionHandlerFactoryImpl.h b/cpp/broker/inc/SessionHandlerFactoryImpl.h
new file mode 100644
index 0000000000..2317a6667b
--- /dev/null
+++ b/cpp/broker/inc/SessionHandlerFactoryImpl.h
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _SessionHandlerFactoryImpl_
+#define _SessionHandlerFactoryImpl_
+
+#include "AMQFrame.h"
+#include "AutoDelete.h"
+#include "DirectExchange.h"
+#include "ExchangeRegistry.h"
+#include "ProtocolInitiation.h"
+#include "QueueRegistry.h"
+#include "SessionHandlerFactory.h"
+#include "TimeoutHandler.h"
+
+namespace qpid {
+ namespace broker {
+
+ class SessionHandlerFactoryImpl : public virtual qpid::io::SessionHandlerFactory
+ {
+ QueueRegistry queues;
+ ExchangeRegistry exchanges;
+ const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
+ AutoDelete cleaner;
+ public:
+ SessionHandlerFactoryImpl(u_int32_t timeout = 30000);
+ virtual qpid::io::SessionHandler* create(qpid::io::SessionContext* ctxt);
+ virtual ~SessionHandlerFactoryImpl();
+ };
+
+ }
+}
+
+
+#endif
diff --git a/cpp/broker/inc/SessionHandlerImpl.h b/cpp/broker/inc/SessionHandlerImpl.h
new file mode 100644
index 0000000000..14a6404c78
--- /dev/null
+++ b/cpp/broker/inc/SessionHandlerImpl.h
@@ -0,0 +1,230 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _SessionHandlerImpl_
+#define _SessionHandlerImpl_
+
+#include <map>
+#include <sstream>
+#include <vector>
+#include <exception>
+#include "AMQFrame.h"
+#include "AMQP_ServerOperations.h"
+#include "AutoDelete.h"
+#include "ExchangeRegistry.h"
+#include "Channel.h"
+#include "ConnectionToken.h"
+#include "DirectExchange.h"
+#include "OutputHandler.h"
+#include "ProtocolInitiation.h"
+#include "QueueRegistry.h"
+#include "SessionContext.h"
+#include "SessionHandler.h"
+#include "TimeoutHandler.h"
+#include "TopicExchange.h"
+
+namespace qpid {
+namespace broker {
+
+struct ChannelException : public std::exception {
+ u_int16_t code;
+ string text;
+ ChannelException(u_int16_t _code, string _text) : code(_code), text(_text) {}
+ ~ChannelException() throw() {}
+ const char* what() const throw() { return text.c_str(); }
+};
+
+struct ConnectionException : public std::exception {
+ u_int16_t code;
+ string text;
+ ConnectionException(u_int16_t _code, string _text) : code(_code), text(_text) {}
+ ~ConnectionException() throw() {}
+ const char* what() const throw() { return text.c_str(); }
+};
+
+class SessionHandlerImpl : public virtual qpid::io::SessionHandler,
+ public virtual qpid::framing::AMQP_ServerOperations,
+ public virtual ConnectionToken
+{
+ typedef std::map<u_int16_t, Channel*>::iterator channel_iterator;
+ typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+
+ qpid::io::SessionContext* context;
+ QueueRegistry* queues;
+ ExchangeRegistry* const exchanges;
+ AutoDelete* const cleaner;
+ const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
+
+ ConnectionHandler* connectionHandler;
+ ChannelHandler* channelHandler;
+ BasicHandler* basicHandler;
+ ExchangeHandler* exchangeHandler;
+ QueueHandler* queueHandler;
+
+ std::map<u_int16_t, Channel*> channels;
+ std::vector<Queue::shared_ptr> exclusiveQueues;
+
+ u_int32_t framemax;
+ u_int16_t heartbeat;
+
+ void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body);
+ void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body);
+ void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+
+ /**
+ * Get named queue, never returns 0.
+ * @return: named queue or default queue for channel if name=""
+ * @exception: ChannelException if no queue of that name is found.
+ * @exception: ConnectionException if no queue specified and channel has not declared one.
+ */
+ Queue::shared_ptr getQueue(const string& name, u_int16_t channel);
+
+ Exchange* findExchange(const string& name);
+
+ public:
+ SessionHandlerImpl(qpid::io::SessionContext* context, QueueRegistry* queues,
+ ExchangeRegistry* exchanges, AutoDelete* cleaner, const u_int32_t timeout);
+ virtual void received(qpid::framing::AMQFrame* frame);
+ virtual void initiated(qpid::framing::ProtocolInitiation* header);
+ virtual void idleOut();
+ virtual void idleIn();
+ virtual void closed();
+ virtual ~SessionHandlerImpl();
+
+ class ConnectionHandlerImpl : public virtual ConnectionHandler{
+ SessionHandlerImpl* parent;
+ public:
+ inline ConnectionHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ virtual void startOk(u_int16_t channel, qpid::framing::FieldTable& clientProperties, string& mechanism,
+ string& response, string& locale);
+
+ virtual void secureOk(u_int16_t channel, string& response);
+
+ virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat);
+
+ virtual void open(u_int16_t channel, string& virtualHost, string& capabilities, bool insist);
+
+ virtual void close(u_int16_t channel, u_int16_t replyCode, string& replyText, u_int16_t classId,
+ u_int16_t methodId);
+
+ virtual void closeOk(u_int16_t channel);
+
+ virtual ~ConnectionHandlerImpl(){}
+ };
+
+ class ChannelHandlerImpl : public virtual ChannelHandler{
+ SessionHandlerImpl* parent;
+ public:
+ inline ChannelHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ virtual void open(u_int16_t channel, string& outOfBand);
+
+ virtual void flow(u_int16_t channel, bool active);
+
+ virtual void flowOk(u_int16_t channel, bool active);
+
+ virtual void close(u_int16_t channel, u_int16_t replyCode, string& replyText,
+ u_int16_t classId, u_int16_t methodId);
+
+ virtual void closeOk(u_int16_t channel);
+
+ virtual ~ChannelHandlerImpl(){}
+ };
+
+ class ExchangeHandlerImpl : public virtual ExchangeHandler{
+ SessionHandlerImpl* parent;
+ public:
+ inline ExchangeHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ virtual void declare(u_int16_t channel, u_int16_t ticket, string& exchange, string& type,
+ bool passive, bool durable, bool autoDelete, bool internal, bool nowait,
+ qpid::framing::FieldTable& arguments);
+
+ virtual void delete_(u_int16_t channel, u_int16_t ticket, string& exchange, bool ifUnused, bool nowait);
+
+ virtual ~ExchangeHandlerImpl(){}
+ };
+
+
+ class QueueHandlerImpl : public virtual QueueHandler{
+ SessionHandlerImpl* parent;
+ public:
+ inline QueueHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ virtual void declare(u_int16_t channel, u_int16_t ticket, string& queue,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait, qpid::framing::FieldTable& arguments);
+
+ virtual void bind(u_int16_t channel, u_int16_t ticket, string& queue,
+ string& exchange, string& routingKey, bool nowait,
+ qpid::framing::FieldTable& arguments);
+
+ virtual void purge(u_int16_t channel, u_int16_t ticket, string& queue,
+ bool nowait);
+
+ virtual void delete_(u_int16_t channel, u_int16_t ticket, string& queue, bool ifUnused, bool ifEmpty,
+ bool nowait);
+
+ virtual ~QueueHandlerImpl(){}
+ };
+
+ class BasicHandlerImpl : public virtual BasicHandler{
+ SessionHandlerImpl* parent;
+ public:
+ inline BasicHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global);
+
+ virtual void consume(u_int16_t channel, u_int16_t ticket, string& queue, string& consumerTag,
+ bool noLocal, bool noAck, bool exclusive, bool nowait);
+
+ virtual void cancel(u_int16_t channel, string& consumerTag, bool nowait);
+
+ virtual void publish(u_int16_t channel, u_int16_t ticket, string& exchange, string& routingKey,
+ bool mandatory, bool immediate);
+
+ virtual void get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck);
+
+ virtual void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple);
+
+ virtual void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue);
+
+ virtual void recover(u_int16_t channel, bool requeue);
+
+ virtual ~BasicHandlerImpl(){}
+ };
+
+ inline virtual ChannelHandler* getChannelHandler(){ return channelHandler; }
+ inline virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler; }
+ inline virtual BasicHandler* getBasicHandler(){ return basicHandler; }
+ inline virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler; }
+ inline virtual QueueHandler* getQueueHandler(){ return queueHandler; }
+
+ inline virtual AccessHandler* getAccessHandler(){ return 0; }
+ inline virtual FileHandler* getFileHandler(){ return 0; }
+ inline virtual StreamHandler* getStreamHandler(){ return 0; }
+ inline virtual TxHandler* getTxHandler(){ return 0; }
+ inline virtual DtxHandler* getDtxHandler(){ return 0; }
+ inline virtual TunnelHandler* getTunnelHandler(){ return 0; }
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/broker/inc/TopicExchange.h b/cpp/broker/inc/TopicExchange.h
new file mode 100644
index 0000000000..d9ff62ecc6
--- /dev/null
+++ b/cpp/broker/inc/TopicExchange.h
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TopicExchange_
+#define _TopicExchange_
+
+#include <map>
+#include <vector>
+#include "Exchange.h"
+#include "FieldTable.h"
+#include "Message.h"
+#include "MonitorImpl.h"
+#include "Queue.h"
+
+namespace qpid {
+namespace broker {
+ class TopicExchange : public virtual Exchange{
+ const string name;
+ std::map<string, std::vector<Queue::shared_ptr> > bindings;//NOTE: pattern matching not yet supported
+ qpid::concurrent::MonitorImpl lock;
+
+ public:
+ static const std::string typeName;
+
+ TopicExchange(const string& name);
+
+ inline virtual const string& getName(){ return name; }
+
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual ~TopicExchange();
+ };
+}
+}
+
+
+#endif
diff --git a/cpp/broker/src/AutoDelete.cpp b/cpp/broker/src/AutoDelete.cpp
new file mode 100644
index 0000000000..6793ec449d
--- /dev/null
+++ b/cpp/broker/src/AutoDelete.cpp
@@ -0,0 +1,93 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "AutoDelete.h"
+
+using namespace qpid::broker;
+
+AutoDelete::AutoDelete(QueueRegistry* const _registry, u_int32_t _period) : registry(_registry),
+ period(_period),
+ stopped(true),
+ runner(0){}
+
+void AutoDelete::add(Queue::shared_ptr const queue){
+ lock.acquire();
+ queues.push(queue);
+ lock.release();
+}
+
+Queue::shared_ptr const AutoDelete::pop(){
+ Queue::shared_ptr next;
+ lock.acquire();
+ if(!queues.empty()){
+ next = queues.front();
+ queues.pop();
+ }
+ lock.release();
+ return next;
+}
+
+void AutoDelete::process(){
+ Queue::shared_ptr seen;
+ for(Queue::shared_ptr q = pop(); q; q = pop()){
+ if(seen == q){
+ add(q);
+ break;
+ }else if(q->canAutoDelete()){
+ std::string name(q->getName());
+ registry->destroy(name);
+ std::cout << "INFO: Auto-deleted queue named " << name << std::endl;
+ }else{
+ add(q);
+ if(!seen) seen = q;
+ }
+ }
+}
+
+void AutoDelete::run(){
+ monitor.acquire();
+ while(!stopped){
+ process();
+ monitor.wait(period);
+ }
+ monitor.release();
+}
+
+void AutoDelete::start(){
+ monitor.acquire();
+ if(stopped){
+ runner = factory.create(this);
+ stopped = false;
+ monitor.release();
+ runner->start();
+ }else{
+ monitor.release();
+ }
+}
+
+void AutoDelete::stop(){
+ monitor.acquire();
+ if(!stopped){
+ stopped = true;
+ monitor.notify();
+ monitor.release();
+ runner->join();
+ delete runner;
+ }else{
+ monitor.release();
+ }
+}
diff --git a/cpp/broker/src/Broker.cpp b/cpp/broker/src/Broker.cpp
new file mode 100644
index 0000000000..5d59b63622
--- /dev/null
+++ b/cpp/broker/src/Broker.cpp
@@ -0,0 +1,92 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include <memory>
+#include "apr_signal.h"
+
+#include "Acceptor.h"
+#include "Configuration.h"
+#include "QpidError.h"
+#include "SessionHandlerFactoryImpl.h"
+
+//optional includes:
+#ifdef _USE_APR_IO_
+
+#include "BlockingAPRAcceptor.h"
+#include "LFAcceptor.h"
+
+#endif
+
+using namespace qpid::broker;
+using namespace qpid::io;
+
+void handle_signal(int signal);
+
+Acceptor* createAcceptor(Configuration& config);
+
+int main(int argc, char** argv)
+{
+ SessionHandlerFactoryImpl factory;
+ Configuration config;
+ try{
+
+ config.parse(argc, argv);
+ if(config.isHelp()){
+ config.usage();
+ }else{
+#ifdef _USE_APR_IO_
+ apr_signal(SIGINT, handle_signal);
+#endif
+ try{
+ std::auto_ptr<Acceptor> acceptor(createAcceptor(config));
+ try{
+ acceptor->bind(config.getPort(), &factory);
+ }catch(qpid::QpidError error){
+ std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ }
+ }catch(qpid::QpidError error){
+ std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ }
+ }
+ }catch(Configuration::ParseException error){
+ std::cout << "Error: " << error.error << std::endl;
+ }
+
+ return 1;
+}
+
+Acceptor* createAcceptor(Configuration& config){
+ const string type(config.getAcceptor());
+#ifdef _USE_APR_IO_
+ if("blocking" == type){
+ std::cout << "Using blocking acceptor " << std::endl;
+ return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog());
+ }else if("non-blocking" == type){
+ std::cout << "Using non-blocking acceptor " << std::endl;
+ return new LFAcceptor(config.isTrace(),
+ config.getConnectionBacklog(),
+ config.getWorkerThreads(),
+ config.getMaxConnections());
+ }
+#endif
+ throw Configuration::ParseException("Unrecognised acceptor: " + type);
+}
+
+void handle_signal(int signal){
+ std::cout << "Shutting down..." << std::endl;
+}
diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp
new file mode 100644
index 0000000000..6980fe5a1b
--- /dev/null
+++ b/cpp/broker/src/Channel.cpp
@@ -0,0 +1,148 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "Channel.h"
+#include "QpidError.h"
+#include <iostream>
+#include <sstream>
+#include <assert.h>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : out(_out),
+ id(_id),
+ framesize(_framesize),
+ transactional(false),
+ deliveryTag(1),
+ tagGenerator("sgen"){}
+
+Channel::~Channel(){
+ for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
+ std::cout << "ERROR: Channel consumer appears not to have been cancelled before channel was destroyed." << std::endl;
+ delete (i->second);
+ }
+}
+
+bool Channel::exists(string& consumerTag){
+ return consumers.find(consumerTag) != consumers.end();
+}
+
+void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection){
+ if(tag.empty()) tag = tagGenerator.generate();
+
+ ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection));
+ try{
+ queue->consume(c, exclusive);//may throw exception
+ consumers[tag] = c;
+ }catch(ExclusiveAccessException& e){
+ delete c;
+ throw e;
+ }
+}
+
+void Channel::cancel(string& tag){
+ ConsumerImpl* c = consumers[tag];
+ if(c){
+ c->cancel();
+ consumers.erase(tag);
+ delete c;
+ }
+}
+
+void Channel::close(){
+ //cancel all consumers
+ for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
+ ConsumerImpl* c = i->second;
+ c->cancel();
+ consumers.erase(i);
+ delete c;
+ }
+}
+
+void Channel::begin(){
+ transactional = true;
+}
+
+void Channel::commit(){
+
+}
+
+void Channel::rollback(){
+
+}
+
+void Channel::deliver(Message::shared_ptr& msg, string& consumerTag){
+ //send deliver method, header and content(s)
+ msg->deliver(out, id, consumerTag, deliveryTag++, framesize);
+}
+
+Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, string& _tag,
+ Queue::shared_ptr _queue,
+ ConnectionToken* const _connection) : parent(_parent),
+ tag(_tag),
+ queue(_queue),
+ connection(_connection){
+}
+
+bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
+ if(connection != msg->getPublisher()){
+ parent->deliver(msg, tag);
+ return true;
+ }else{
+ return false;
+ }
+}
+
+void Channel::ConsumerImpl::cancel(){
+ if(queue) queue->cancel(this);
+}
+
+void Channel::handlePublish(Message* msg){
+ if(message.get()){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed.");
+ }
+ message = Message::shared_ptr(msg);
+}
+
+void Channel::handleHeader(AMQHeaderBody::shared_ptr header, ExchangeRegistry* exchanges){
+ if(!message.get()){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish.");
+ }
+ message->setHeader(header);
+ if(message->isComplete()){
+ publish(exchanges);
+ }
+}
+
+void Channel::handleContent(AMQContentBody::shared_ptr content, ExchangeRegistry* exchanges){
+ if(!message.get()){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish.");
+ }
+ message->addContent(content);
+ if(message->isComplete()){
+ publish(exchanges);
+ }
+}
+
+void Channel::publish(ExchangeRegistry* exchanges){
+ if(!route(message, exchanges)){
+ std::cout << "WARNING: Could not route message." << std::endl;
+ }
+ message.reset();
+}
diff --git a/cpp/broker/src/Configuration.cpp b/cpp/broker/src/Configuration.cpp
new file mode 100644
index 0000000000..aceb35bc87
--- /dev/null
+++ b/cpp/broker/src/Configuration.cpp
@@ -0,0 +1,195 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "Configuration.h"
+
+using namespace qpid::broker;
+using namespace std;
+
+Configuration::Configuration() :
+ trace('t', "trace", "Print incoming & outgoing frames to the console (default=false)", false),
+ port('p', "port", "Sets the port to listen on (default=5672)", 5672),
+ workerThreads("worker-threads", "Sets the number of worker threads to use (default=5). Only valid for non-blocking acceptor.", 5),
+ maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500). Only valid for non-blocking acceptor.", 500),
+ connectionBacklog("connection-backlog", "Sets the connection backlog for the servers socket (default=10)", 10),
+ acceptor('a', "acceptor", "Sets the acceptor to use. Currently only two values are recognised, blocking and non-blocking (which is the default)", "non-blocking"),
+ help("help", "Prints usage information", false)
+{
+ options.push_back(&trace);
+ options.push_back(&port);
+ options.push_back(&workerThreads);
+ options.push_back(&maxConnections);
+ options.push_back(&connectionBacklog);
+ options.push_back(&acceptor);
+ options.push_back(&help);
+}
+
+Configuration::~Configuration(){}
+
+void Configuration::parse(int argc, char** argv){
+ int position = 1;
+ while(position < argc){
+ bool matched(false);
+ for(op_iterator i = options.begin(); i < options.end() && !matched; i++){
+ matched = (*i)->parse(position, argv, argc);
+ }
+ if(!matched){
+ std::cout << "Warning: skipping unrecognised option " << argv[position] << std::endl;
+ position++;
+ }
+ }
+}
+
+void Configuration::usage(){
+ for(op_iterator i = options.begin(); i < options.end(); i++){
+ (*i)->print(std::cout);
+ }
+}
+
+bool Configuration::isHelp(){
+ return help.getValue();
+}
+
+bool Configuration::isTrace(){
+ return trace.getValue();
+}
+
+int Configuration::getPort(){
+ return port.getValue();
+}
+
+int Configuration::getWorkerThreads(){
+ return workerThreads.getValue();
+}
+
+int Configuration::getMaxConnections(){
+ return maxConnections.getValue();
+}
+
+int Configuration::getConnectionBacklog(){
+ return connectionBacklog.getValue();
+}
+
+const string& Configuration::getAcceptor(){
+ return acceptor.getValue();
+}
+
+Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) :
+ flag(string("-") + _flag), name("--" +_name), desc(_desc) {}
+
+Configuration::Option::Option(const string& _name, const string& _desc) :
+ flag(""), name("--" + _name), desc(_desc) {}
+
+Configuration::Option::~Option(){}
+
+bool Configuration::Option::match(const string& arg){
+ return flag == arg || name == arg;
+}
+
+bool Configuration::Option::parse(int& i, char** argv, int argc){
+ const string arg(argv[i]);
+ if(match(arg)){
+ if(needsValue()){
+ if(++i < argc) setValue(argv[i]);
+ else throw ParseException("Argument " + arg + " requires a value!");
+ }else{
+ setValue("");
+ }
+ i++;
+ return true;
+ }else{
+ return false;
+ }
+}
+
+void Configuration::Option::print(ostream& out) const {
+ out << " ";
+ if(flag.length() > 0){
+ out << flag << " or ";
+ }
+ out << name;
+ if(needsValue()) out << "<value>";
+ out << std::endl;
+ out << " " << desc << std::endl;
+}
+
+
+// String Option:
+
+Configuration::StringOption::StringOption(const char _flag, const string& _name, const string& _desc, const string _value) :
+ Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::StringOption::StringOption(const string& _name, const string& _desc, const string _value) :
+ Option(_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::StringOption::~StringOption(){}
+
+const string& Configuration::StringOption::getValue() const {
+ return value;
+}
+
+bool Configuration::StringOption::needsValue() const {
+ return true;
+}
+
+void Configuration::StringOption::setValue(const std::string& _value){
+ value = _value;
+}
+
+// Int Option:
+
+Configuration::IntOption::IntOption(const char _flag, const string& _name, const string& _desc, const int _value) :
+ Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::IntOption::IntOption(const string& _name, const string& _desc, const int _value) :
+ Option(_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::IntOption::~IntOption(){}
+
+int Configuration::IntOption::getValue() const {
+ return value;
+}
+
+bool Configuration::IntOption::needsValue() const {
+ return true;
+}
+
+void Configuration::IntOption::setValue(const std::string& _value){
+ value = atoi(_value.c_str());
+}
+
+// Bool Option:
+
+Configuration::BoolOption::BoolOption(const char _flag, const string& _name, const string& _desc, const bool _value) :
+ Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::BoolOption::BoolOption(const string& _name, const string& _desc, const bool _value) :
+ Option(_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::BoolOption::~BoolOption(){}
+
+bool Configuration::BoolOption::getValue() const {
+ return value;
+}
+
+bool Configuration::BoolOption::needsValue() const {
+ return false;
+}
+
+void Configuration::BoolOption::setValue(const std::string& _value){
+ value = true;
+}
diff --git a/cpp/broker/src/DirectExchange.cpp b/cpp/broker/src/DirectExchange.cpp
new file mode 100644
index 0000000000..70f7ee838f
--- /dev/null
+++ b/cpp/broker/src/DirectExchange.cpp
@@ -0,0 +1,72 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "DirectExchange.h"
+#include "ExchangeBinding.h"
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+DirectExchange::DirectExchange(const string& _name) : name(_name) {
+
+}
+
+void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+ lock.acquire();
+ std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
+ if(i == queues.end()){
+ bindings[routingKey].push_back(queue);
+ queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+ }
+ lock.release();
+}
+
+void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+ lock.acquire();
+ std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+
+ std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
+ if(i < queues.end()){
+ queues.erase(i);
+ if(queues.empty()){
+ bindings.erase(routingKey);
+ }
+ }
+ lock.release();
+}
+
+void DirectExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* args){
+ lock.acquire();
+ std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+ int count(0);
+ for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){
+ (*i)->deliver(msg);
+ }
+ if(!count){
+ std::cout << "WARNING: DirectExchange " << name << " could not route message with key " << routingKey << std::endl;
+ }
+ lock.release();
+}
+
+DirectExchange::~DirectExchange(){
+
+}
+
+
+const std::string DirectExchange::typeName("direct");
diff --git a/cpp/broker/src/ExchangeBinding.cpp b/cpp/broker/src/ExchangeBinding.cpp
new file mode 100644
index 0000000000..6160a67fd3
--- /dev/null
+++ b/cpp/broker/src/ExchangeBinding.cpp
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "ExchangeBinding.h"
+#include "Exchange.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+ExchangeBinding::ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, FieldTable* _args) : e(_e), q(_q), key(_key), args(_args){}
+
+void ExchangeBinding::cancel(){
+ e->unbind(q, key, args);
+ delete this;
+}
+
+ExchangeBinding::~ExchangeBinding(){
+}
diff --git a/cpp/broker/src/ExchangeRegistry.cpp b/cpp/broker/src/ExchangeRegistry.cpp
new file mode 100644
index 0000000000..0ee581af2f
--- /dev/null
+++ b/cpp/broker/src/ExchangeRegistry.cpp
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "ExchangeRegistry.h"
+#include "MonitorImpl.h"
+
+using namespace qpid::broker;
+using namespace qpid::concurrent;
+
+ExchangeRegistry::ExchangeRegistry() : lock(new MonitorImpl()){}
+
+ExchangeRegistry::~ExchangeRegistry(){
+ delete lock;
+}
+
+void ExchangeRegistry::declare(Exchange* exchange){
+ exchanges[exchange->getName()] = exchange;
+}
+
+void ExchangeRegistry::destroy(const string& name){
+ if(exchanges[name]){
+ delete exchanges[name];
+ exchanges.erase(name);
+ }
+}
+
+Exchange* ExchangeRegistry::get(const string& name){
+ return exchanges[name];
+}
diff --git a/cpp/broker/src/FanOutExchange.cpp b/cpp/broker/src/FanOutExchange.cpp
new file mode 100644
index 0000000000..7f261d5eda
--- /dev/null
+++ b/cpp/broker/src/FanOutExchange.cpp
@@ -0,0 +1,56 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "FanOutExchange.h"
+#include "ExchangeBinding.h"
+#include <algorithm>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+FanOutExchange::FanOutExchange(const string& _name) : name(_name) {}
+
+void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+ Locker locker(lock);
+ // Add if not already present.
+ Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
+ if (i == bindings.end()) {
+ bindings.push_back(queue);
+ queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+ }
+}
+
+void FanOutExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+ Locker locker(lock);
+ Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
+ if (i != bindings.end()) {
+ bindings.erase(i);
+ // TODO aconway 2006-09-14: What about the ExchangeBinding object? Don't we have to verify routingKey/args match?
+ }
+}
+
+void FanOutExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* args){
+ Locker locker(lock);
+ for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){
+ (*i)->deliver(msg);
+ }
+}
+
+FanOutExchange::~FanOutExchange() {}
+
+const std::string FanOutExchange::typeName("fanout");
diff --git a/cpp/broker/src/Message.cpp b/cpp/broker/src/Message.cpp
new file mode 100644
index 0000000000..7afcd97934
--- /dev/null
+++ b/cpp/broker/src/Message.cpp
@@ -0,0 +1,97 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "MonitorImpl.h"
+#include "Message.h"
+#include "ExchangeRegistry.h"
+#include <iostream>
+
+using namespace std::tr1;//for *_pointer_cast methods
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+
+Message::Message(const ConnectionToken* const _publisher,
+ const string& _exchange, const string& _routingKey,
+ bool _mandatory, bool _immediate) : publisher(_publisher),
+ exchange(_exchange),
+ routingKey(_routingKey),
+ mandatory(_mandatory),
+ immediate(_immediate){
+
+}
+
+Message::~Message(){
+}
+
+void Message::setHeader(AMQHeaderBody::shared_ptr header){
+ this->header = header;
+}
+
+void Message::addContent(AMQContentBody::shared_ptr data){
+ content.push_back(data);
+}
+
+bool Message::isComplete(){
+ return header.get() && (header->getContentSize() == contentSize());
+}
+
+void Message::deliver(OutputHandler* out, int channel,
+ string& consumerTag, u_int64_t deliveryTag,
+ u_int32_t framesize){
+
+ out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, false, exchange, routingKey)));
+ AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
+ out->send(new AMQFrame(channel, headerBody));
+ for(content_iterator i = content.begin(); i != content.end(); i++){
+ if((*i)->size() > framesize){
+ //TODO: need to split it
+ std::cout << "WARNING: Dropped message. Re-fragmentation not yet implemented." << std::endl;
+ }else{
+ AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i);
+ out->send(new AMQFrame(channel, contentBody));
+ }
+ }
+}
+
+BasicHeaderProperties* Message::getHeaderProperties(){
+ return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
+}
+
+u_int64_t Message::contentSize(){
+ u_int64_t size(0);
+ for(content_iterator i = content.begin(); i != content.end(); i++){
+ size += (*i)->size();
+ }
+ return size;
+}
+
+const ConnectionToken* const Message::getPublisher(){
+ return publisher;
+}
+
+bool qpid::broker::route(Message::shared_ptr& msg, ExchangeRegistry* registry){
+ Exchange* exchange = registry->get(msg->exchange);
+ if(exchange){
+ exchange->route(msg, msg->routingKey, &(msg->getHeaderProperties()->getHeaders()));
+ return true;
+ }else{
+ return false;
+ }
+}
+
diff --git a/cpp/broker/src/NameGenerator.cpp b/cpp/broker/src/NameGenerator.cpp
new file mode 100644
index 0000000000..46aa385a7e
--- /dev/null
+++ b/cpp/broker/src/NameGenerator.cpp
@@ -0,0 +1,29 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "NameGenerator.h"
+#include <sstream>
+
+using namespace qpid::broker;
+
+NameGenerator::NameGenerator(const std::string& _base) : base(_base), counter(1) {}
+
+std::string NameGenerator::generate(){
+ std::stringstream ss;
+ ss << base << counter++;
+ return ss.str();
+}
diff --git a/cpp/broker/src/Queue.cpp b/cpp/broker/src/Queue.cpp
new file mode 100644
index 0000000000..f7b8605b03
--- /dev/null
+++ b/cpp/broker/src/Queue.cpp
@@ -0,0 +1,148 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "Queue.h"
+#include "MonitorImpl.h"
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::concurrent;
+
+Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, const ConnectionToken* const _owner) : name(_name),
+ durable(_durable),
+ autodelete(_autodelete),
+ owner(_owner),
+ queueing(false),
+ dispatching(false),
+ next(0),
+ lastUsed(0),
+ exclusive(0){
+
+ if(autodelete) lastUsed = apr_time_as_msec(apr_time_now());
+}
+
+Queue::~Queue(){
+ for(Binding* b = bindings.front(); !bindings.empty(); b = bindings.front()){
+ b->cancel();
+ bindings.pop();
+ }
+}
+
+void Queue::bound(Binding* b){
+ bindings.push(b);
+}
+
+void Queue::deliver(Message::shared_ptr& msg){
+ Locker locker(lock);
+ if(queueing || !dispatch(msg)){
+ queueing = true;
+ messages.push(msg);
+ }
+}
+
+bool Queue::dispatch(Message::shared_ptr& msg){
+ if(consumers.empty()){
+ return false;
+ }else if(exclusive){
+ if(!exclusive->deliver(msg)){
+ std::cout << "WARNING: Dropping undeliverable message from queue with exclusive consumer." << std::endl;
+ }
+ return true;
+ }else{
+ //deliver to next consumer
+ next = next % consumers.size();
+ Consumer* c = consumers[next];
+ int start = next;
+ while(c){
+ next++;
+ if(c->deliver(msg)) return true;
+
+ next = next % consumers.size();
+ c = next == start ? 0 : consumers[next];
+ }
+ return false;
+ }
+}
+
+bool Queue::startDispatching(){
+ Locker locker(lock);
+ if(queueing && !dispatching){
+ dispatching = true;
+ return true;
+ }else{
+ return false;
+ }
+}
+
+void Queue::dispatch(){
+ bool proceed = startDispatching();
+ while(proceed){
+ Locker locker(lock);
+ if(!messages.empty() && dispatch(messages.front())){
+ messages.pop();
+ }else{
+ dispatching = false;
+ proceed = false;
+ queueing = !messages.empty();
+ }
+ }
+}
+
+void Queue::consume(Consumer* c, bool requestExclusive){
+ Locker locker(lock);
+ if(exclusive) throw ExclusiveAccessException();
+ if(requestExclusive){
+ if(!consumers.empty()) throw ExclusiveAccessException();
+ exclusive = c;
+ }
+
+ if(autodelete && consumers.empty()) lastUsed = 0;
+ consumers.push_back(c);
+}
+
+void Queue::cancel(Consumer* c){
+ Locker locker(lock);
+ consumers.erase(find(consumers.begin(), consumers.end(), c));
+ if(autodelete && consumers.empty()) lastUsed = apr_time_as_msec(apr_time_now());
+ if(exclusive == c) exclusive = 0;
+}
+
+Message::shared_ptr Queue::dequeue(){
+
+}
+
+u_int32_t Queue::purge(){
+ Locker locker(lock);
+ int count = messages.size();
+ while(!messages.empty()) messages.pop();
+ return count;
+}
+
+u_int32_t Queue::getMessageCount() const{
+ Locker locker(lock);
+ return messages.size();
+}
+
+u_int32_t Queue::getConsumerCount() const{
+ Locker locker(lock);
+ return consumers.size();
+}
+
+bool Queue::canAutoDelete() const{
+ Locker locker(lock);
+ return lastUsed && ((apr_time_as_msec(apr_time_now()) - lastUsed) > autodelete);
+}
diff --git a/cpp/broker/src/QueueRegistry.cpp b/cpp/broker/src/QueueRegistry.cpp
new file mode 100644
index 0000000000..f807415314
--- /dev/null
+++ b/cpp/broker/src/QueueRegistry.cpp
@@ -0,0 +1,72 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "QueueRegistry.h"
+#include "MonitorImpl.h"
+#include "SessionHandlerImpl.h"
+#include <sstream>
+#include <assert.h>
+
+using namespace qpid::broker;
+using namespace qpid::concurrent;
+
+QueueRegistry::QueueRegistry() : counter(1){}
+
+QueueRegistry::~QueueRegistry(){}
+
+std::pair<Queue::shared_ptr, bool>
+QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete, const ConnectionToken* owner)
+{
+ Locker locker(lock);
+ string name = declareName.empty() ? generateName() : declareName;
+ assert(!name.empty());
+ QueueMap::iterator i = queues.find(name);
+ if (i == queues.end()) {
+ Queue::shared_ptr queue(new Queue(name, durable, autoDelete, owner));
+ queues[name] = queue;
+ return std::pair<Queue::shared_ptr, bool>(queue, true);
+ } else {
+ return std::pair<Queue::shared_ptr, bool>(i->second, false);
+ }
+}
+
+void QueueRegistry::destroy(const string& name){
+ Locker locker(lock);
+ queues.erase(name);
+}
+
+Queue::shared_ptr QueueRegistry::find(const string& name){
+ Locker locker(lock);
+ QueueMap::iterator i = queues.find(name);
+ if (i == queues.end()) {
+ return Queue::shared_ptr();
+ } else {
+ return i->second;
+ }
+}
+
+string QueueRegistry::generateName(){
+ string name;
+ do {
+ std::stringstream ss;
+ ss << "tmp_" << counter++;
+ name = ss.str();
+ // Thread safety: Private function, only called with lock held
+ // so this is OK.
+ } while(queues.find(name) != queues.end());
+ return name;
+}
diff --git a/cpp/broker/src/SessionHandlerFactoryImpl.cpp b/cpp/broker/src/SessionHandlerFactoryImpl.cpp
new file mode 100644
index 0000000000..661cb4ef81
--- /dev/null
+++ b/cpp/broker/src/SessionHandlerFactoryImpl.cpp
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "SessionHandlerFactoryImpl.h"
+#include "SessionHandlerImpl.h"
+#include "FanOutExchange.h"
+
+using namespace qpid::broker;
+using namespace qpid::io;
+
+SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeout(_timeout), cleaner(&queues, timeout/10){
+ exchanges.declare(new DirectExchange("amq.direct"));
+ exchanges.declare(new TopicExchange("amq.topic"));
+ exchanges.declare(new FanOutExchange("amq.fanout"));
+ cleaner.start();
+}
+
+SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt){
+ return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, timeout);
+}
+
+SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl(){
+ cleaner.stop();
+ exchanges.destroy("amq.direct");
+ exchanges.destroy("amq.topic");
+}
diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp
new file mode 100644
index 0000000000..19e243a01b
--- /dev/null
+++ b/cpp/broker/src/SessionHandlerImpl.cpp
@@ -0,0 +1,378 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include "SessionHandlerImpl.h"
+#include "FanOutExchange.h"
+#include "assert.h"
+
+using namespace std::tr1;
+using namespace qpid::broker;
+using namespace qpid::io;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context,
+ QueueRegistry* _queues,
+ ExchangeRegistry* _exchanges,
+ AutoDelete* _cleaner,
+ const u_int32_t _timeout) : context(_context),
+ queues(_queues),
+ exchanges(_exchanges),
+ cleaner(_cleaner),
+ timeout(_timeout),
+ channelHandler(new ChannelHandlerImpl(this)),
+ connectionHandler(new ConnectionHandlerImpl(this)),
+ basicHandler(new BasicHandlerImpl(this)),
+ exchangeHandler(new ExchangeHandlerImpl(this)),
+ queueHandler(new QueueHandlerImpl(this)),
+ framemax(65536),
+ heartbeat(0){
+
+}
+
+SessionHandlerImpl::~SessionHandlerImpl(){
+ // TODO aconway 2006-09-07: Should be auto_ptr or plain members.
+ delete channelHandler;
+ delete connectionHandler;
+ delete basicHandler;
+ delete exchangeHandler;
+ delete queueHandler;
+}
+
+Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t channel){
+ Queue::shared_ptr queue;
+ if (name.empty()) {
+ queue = channels[channel]->getDefaultQueue();
+ if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" );
+ } else {
+ queue = queues->find(name);
+ if (queue == 0) {
+ throw ChannelException( 404, "Queue not found: " + name);
+ }
+ }
+ return queue;
+}
+
+
+Exchange* SessionHandlerImpl::findExchange(const string& name){
+ exchanges->getLock()->acquire();
+ Exchange* exchange(exchanges->get(name));
+ exchanges->getLock()->release();
+ return exchange;
+}
+
+void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
+ u_int16_t channel = frame->getChannel();
+ AMQBody::shared_ptr body = frame->getBody();
+ AMQMethodBody::shared_ptr method;
+
+ switch(body->type())
+ {
+ case METHOD_BODY:
+ method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body);
+ try{
+ method->invoke(*this, channel);
+ }catch(ChannelException& e){
+ channels[channel]->close();
+ channels.erase(channel);
+ context->send(new AMQFrame(channel, new ChannelCloseBody(e.code, e.text, method->amqpClassId(), method->amqpMethodId())));
+ }catch(ConnectionException& e){
+ context->send(new AMQFrame(0, new ConnectionCloseBody(e.code, e.text, method->amqpClassId(), method->amqpMethodId())));
+ }
+ break;
+
+ case HEADER_BODY:
+ this->handleHeader(channel, dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body));
+ break;
+
+ case CONTENT_BODY:
+ this->handleContent(channel, dynamic_pointer_cast<AMQContentBody, AMQBody>(body));
+ break;
+
+ case HEARTBEAT_BODY:
+ //channel must be 0
+ this->handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body));
+ break;
+ }
+}
+
+void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){
+ //send connection start
+ FieldTable properties;
+ string mechanisms("PLAIN");
+ string locales("en_US");
+ context->send(new AMQFrame(0, new ConnectionStartBody(8, 0, properties, mechanisms, locales)));
+}
+
+void SessionHandlerImpl::idleOut(){
+
+}
+
+void SessionHandlerImpl::idleIn(){
+
+}
+
+void SessionHandlerImpl::closed(){
+ for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){
+ Channel* c = i->second;
+ channels.erase(i);
+ c->close();
+ delete c;
+ }
+ for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
+ string name = (*i)->getName();
+ queues->destroy(name);
+ exclusiveQueues.erase(i);
+ }
+}
+
+void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){
+ channels[channel]->handleHeader(body, exchanges);
+}
+
+void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){
+ channels[channel]->handleContent(body, exchanges);
+}
+
+void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr body){
+ std::cout << "SessionHandlerImpl::handleHeartbeat()" << std::endl;
+}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::startOk(u_int16_t channel, FieldTable& clientProperties, string& mechanism,
+ string& response, string& locale){
+
+ parent->context->send(new AMQFrame(0, new ConnectionTuneBody(100, parent->framemax, parent->heartbeat)));
+}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t channel, string& response){}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t channel, u_int16_t channelmax, u_int32_t framemax, u_int16_t heartbeat){
+ parent->framemax = framemax;
+ parent->heartbeat = heartbeat;
+}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t channel, string& virtualHost, string& capabilities, bool insist){
+ string knownhosts;
+ parent->context->send(new AMQFrame(0, new ConnectionOpenOkBody(knownhosts)));
+}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::close(u_int16_t channel, u_int16_t replyCode, string& replyText,
+ u_int16_t classId, u_int16_t methodId){
+
+ parent->context->send(new AMQFrame(0, new ConnectionCloseOkBody()));
+ parent->context->close();
+}
+
+void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t channel){
+ parent->context->close();
+}
+
+
+
+void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, string& outOfBand){
+ parent->channels[channel] = new Channel(parent->context, channel, parent->framemax);
+ parent->context->send(new AMQFrame(channel, new ChannelOpenOkBody()));
+}
+
+void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t channel, bool active){}
+void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t channel, bool active){}
+
+void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t replyCode, string& replyText,
+ u_int16_t classId, u_int16_t methodId){
+ Channel* c = parent->channels[channel];
+ parent->channels.erase(channel);
+ c->close();
+ delete c;
+ parent->context->send(new AMQFrame(channel, new ChannelCloseOkBody()));
+}
+
+void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t channel){}
+
+
+
+void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t ticket, string& exchange, string& type,
+ bool passive, bool durable, bool autoDelete, bool internal, bool nowait,
+ FieldTable& arguments){
+
+ if(!passive && (
+ type != TopicExchange::typeName &&
+ type != DirectExchange::typeName &&
+ type != FanOutExchange::typeName)
+ )
+ {
+ throw ChannelException(540, "Exchange type not implemented: " + type);
+ }
+
+ parent->exchanges->getLock()->acquire();
+ if(!parent->exchanges->get(exchange)){
+ if(type == TopicExchange::typeName){
+ parent->exchanges->declare(new TopicExchange(exchange));
+ }else if(type == DirectExchange::typeName){
+ parent->exchanges->declare(new DirectExchange(exchange));
+ }else if(type == FanOutExchange::typeName){
+ parent->exchanges->declare(new DirectExchange(exchange));
+ }
+ }
+ parent->exchanges->getLock()->release();
+ if(!nowait){
+ parent->context->send(new AMQFrame(channel, new ExchangeDeclareOkBody()));
+ }
+}
+
+void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t ticket, string& exchange, bool ifUnused, bool nowait){
+ //TODO: implement unused
+ parent->exchanges->getLock()->acquire();
+ parent->exchanges->destroy(exchange);
+ parent->exchanges->getLock()->release();
+ if(!nowait) parent->context->send(new AMQFrame(channel, new ExchangeDeleteOkBody()));
+}
+
+
+
+
+void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t ticket, string& name,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait, FieldTable& arguments){
+ Queue::shared_ptr queue;
+ if (passive && !name.empty()) {
+ queue = parent->getQueue(name, channel);
+ } else {
+ std::pair<Queue::shared_ptr, bool> queue_created = parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0);
+ queue = queue_created.first;
+ assert(queue);
+ if (queue_created.second) { // This is a new queue
+ parent->channels[channel]->setDefaultQueue(queue);
+ //add default binding:
+ parent->exchanges->get("amq.direct")->bind(queue, name, 0);
+ if(exclusive){
+ parent->exclusiveQueues.push_back(queue);
+ } else if(autoDelete){
+ parent->cleaner->add(queue);
+ }
+ }
+ }
+ if(exclusive && !queue->isExclusiveOwner(parent)){
+ throw ChannelException(405, "Cannot grant exclusive access to queue");
+ }
+ if(!nowait){
+ name = queue->getName();
+ QueueDeclareOkBody* response = new QueueDeclareOkBody(name, queue->getMessageCount(), queue->getConsumerCount());
+ parent->context->send(new AMQFrame(channel, response));
+ }
+}
+
+void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t ticket, string& queueName,
+ string& exchangeName, string& routingKey, bool nowait,
+ FieldTable& arguments){
+
+ Queue::shared_ptr queue = parent->getQueue(queueName, channel);
+ Exchange* exchange = parent->exchanges->get(exchangeName);
+ if(exchange){
+ if(routingKey.size() == 0 && queueName.size() == 0) routingKey = queue->getName();
+ exchange->bind(queue, routingKey, &arguments);
+ if(!nowait) parent->context->send(new AMQFrame(channel, new QueueBindOkBody()));
+ }else{
+ throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName);
+ }
+}
+
+void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t ticket, string& queueName, bool nowait){
+
+ Queue::shared_ptr queue = parent->getQueue(queueName, channel);
+ int count = queue->purge();
+ if(!nowait) parent->context->send(new AMQFrame(channel, new QueuePurgeOkBody(count)));
+}
+
+void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t ticket, string& queue,
+ bool ifUnused, bool ifEmpty, bool nowait){
+ ChannelException error(0, "");
+ int count(0);
+ Queue::shared_ptr q = parent->getQueue(queue, channel);
+ if(ifEmpty && q->getMessageCount() > 0){
+ throw ChannelException(406, "Queue not empty.");
+ }else if(ifUnused && q->getConsumerCount() > 0){
+ throw ChannelException(406, "Queue in use.");
+ }else{
+ //remove the queue from the list of exclusive queues if necessary
+ if(q->isExclusiveOwner(parent)){
+ queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q);
+ if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i);
+ }
+ count = q->getMessageCount();
+ parent->queues->destroy(queue);
+ }
+ if(!nowait) parent->context->send(new AMQFrame(channel, new QueueDeleteOkBody(count)));
+}
+
+
+
+
+void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global){
+ //TODO: handle global
+ //TODO: channel doesn't do anything with these qos parameters yet
+ parent->channels[channel]->setPrefetchSize(prefetchSize);
+ parent->channels[channel]->setPrefetchCount(prefetchCount);
+ parent->context->send(new AMQFrame(channel, new BasicQosOkBody()));
+}
+
+void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t ticket,
+ string& queueName, string& consumerTag,
+ bool noLocal, bool noAck, bool exclusive,
+ bool nowait){
+
+ //TODO: implement nolocal
+ Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
+ Channel* channel = parent->channels[channelId];
+ if(!consumerTag.empty() && channel->exists(consumerTag)){
+ throw ConnectionException(530, "Consumer tags must be unique");
+ }
+
+ try{
+ channel->consume(consumerTag, queue, !noAck, exclusive, noLocal ? parent : 0);
+ if(!nowait) parent->context->send(new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag)));
+
+ //allow messages to be dispatched if required as there is now a consumer:
+ queue->dispatch();
+ }catch(ExclusiveAccessException& e){
+ if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
+ else throw ChannelException(403, "Access would violate previously granted exclusivity");
+ }
+
+}
+
+void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& consumerTag, bool nowait){
+ parent->channels[channel]->cancel(consumerTag);
+ if(!nowait) parent->context->send(new AMQFrame(channel, new BasicCancelOkBody(consumerTag)));
+}
+
+void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t ticket,
+ string& exchange, string& routingKey,
+ bool mandatory, bool immediate){
+
+ Message* msg = new Message(parent, exchange.length() ? exchange : "amq.direct", routingKey, mandatory, immediate);
+ parent->channels[channel]->handlePublish(msg);
+}
+
+void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck){}
+
+void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){}
+
+void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue){}
+
+void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){}
+
diff --git a/cpp/broker/src/TopicExchange.cpp b/cpp/broker/src/TopicExchange.cpp
new file mode 100644
index 0000000000..e0248958f9
--- /dev/null
+++ b/cpp/broker/src/TopicExchange.cpp
@@ -0,0 +1,62 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "TopicExchange.h"
+#include "ExchangeBinding.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+TopicExchange::TopicExchange(const string& _name) : name(_name) {
+
+}
+
+void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+ lock.acquire();
+ bindings[routingKey].push_back(queue);
+ queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+ lock.release();
+}
+
+void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+ lock.acquire();
+ std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+
+ std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
+ if(i < queues.end()){
+ queues.erase(i);
+ if(queues.empty()){
+ bindings.erase(routingKey);
+ }
+ }
+ lock.release();
+}
+
+void TopicExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* args){
+ lock.acquire();
+ std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+ for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++){
+ (*i)->deliver(msg);
+ }
+ lock.release();
+}
+
+TopicExchange::~TopicExchange(){
+
+}
+
+const std::string TopicExchange::typeName("topic");
diff --git a/cpp/broker/test/Makefile b/cpp/broker/test/Makefile
new file mode 100644
index 0000000000..172ce564bf
--- /dev/null
+++ b/cpp/broker/test/Makefile
@@ -0,0 +1,20 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+QPID_HOME = ../../..
+LDLIBS=-lapr-1 -lcppunit $(COMMON_LIB) $(BROKER_LIB)
+include ${QPID_HOME}/cpp/test_plugins.mk
+
diff --git a/cpp/broker/test/QueueRegistryTest.cpp b/cpp/broker/test/QueueRegistryTest.cpp
new file mode 100644
index 0000000000..c4ad40b5cd
--- /dev/null
+++ b/cpp/broker/test/QueueRegistryTest.cpp
@@ -0,0 +1,79 @@
+#include "QueueRegistry.h"
+#include <cppunit/TestCase.h>
+#include <cppunit/TextTestRunner.h>
+#include <cppunit/extensions/HelperMacros.h>
+#include <cppunit/plugin/TestPlugIn.h>
+#include <string>
+
+using namespace qpid::broker;
+
+class QueueRegistryTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(QueueRegistryTest);
+ CPPUNIT_TEST(testDeclare);
+ CPPUNIT_TEST(testDeclareTmp);
+ CPPUNIT_TEST(testFind);
+ CPPUNIT_TEST(testDestroy);
+ CPPUNIT_TEST_SUITE_END();
+
+ private:
+ std::string foo, bar;
+ QueueRegistry reg;
+ std::pair<Queue::shared_ptr, bool> qc;
+
+ public:
+ void setUp() {
+ foo = "foo";
+ bar = "bar";
+ }
+
+ void testDeclare() {
+ qc = reg.declare(foo, false, 0, 0);
+ Queue::shared_ptr q = qc.first;
+ CPPUNIT_ASSERT(q);
+ CPPUNIT_ASSERT(qc.second); // New queue
+ CPPUNIT_ASSERT_EQUAL(foo, q->getName());
+
+ qc = reg.declare(foo, false, 0, 0);
+ CPPUNIT_ASSERT_EQUAL(q, qc.first);
+ CPPUNIT_ASSERT(!qc.second);
+
+ qc = reg.declare(bar, false, 0, 0);
+ q = qc.first;
+ CPPUNIT_ASSERT(q);
+ CPPUNIT_ASSERT_EQUAL(true, qc.second);
+ CPPUNIT_ASSERT_EQUAL(bar, q->getName());
+ }
+
+ void testDeclareTmp()
+ {
+ qc = reg.declare(std::string(), false, 0, 0);
+ CPPUNIT_ASSERT(qc.second);
+ CPPUNIT_ASSERT_EQUAL(std::string("tmp_1"), qc.first->getName());
+ }
+
+ void testFind() {
+ CPPUNIT_ASSERT(reg.find(foo) == 0);
+
+ reg.declare(foo, false, 0, 0);
+ reg.declare(bar, false, 0, 0);
+ Queue::shared_ptr q = reg.find(bar);
+ CPPUNIT_ASSERT(q);
+ CPPUNIT_ASSERT_EQUAL(bar, q->getName());
+ }
+
+ void testDestroy() {
+ qc = reg.declare(foo, false, 0, 0);
+ reg.destroy(foo);
+ // Queue is gone from the registry.
+ CPPUNIT_ASSERT(reg.find(foo) == 0);
+ // Queue is not actually destroyed till we drop our reference.
+ CPPUNIT_ASSERT_EQUAL(foo, qc.first->getName());
+ // We shoud be the only reference.
+ CPPUNIT_ASSERT_EQUAL(1L, qc.first.use_count());
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(QueueRegistryTest);
diff --git a/cpp/broker/test/exchange_test.cpp b/cpp/broker/test/exchange_test.cpp
new file mode 100644
index 0000000000..6605f2685b
--- /dev/null
+++ b/cpp/broker/test/exchange_test.cpp
@@ -0,0 +1,68 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "DirectExchange.h"
+#include "Exchange.h"
+#include "Queue.h"
+#include "TopicExchange.h"
+#include <cppunit/TestCase.h>
+#include <cppunit/TextTestRunner.h>
+#include <cppunit/extensions/HelperMacros.h>
+#include <cppunit/plugin/TestPlugIn.h>
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::concurrent;
+
+class ExchangeTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(ExchangeTest);
+ CPPUNIT_TEST(testMe);
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ // TODO aconway 2006-09-12: Need more detailed tests.
+
+ void testMe()
+ {
+ Queue::shared_ptr queue(new Queue("queue", true, true));
+ Queue::shared_ptr queue2(new Queue("queue2", true, true));
+
+ TopicExchange topic("topic");
+ topic.bind(queue, "abc", 0);
+ topic.bind(queue2, "abc", 0);
+
+ DirectExchange direct("direct");
+ direct.bind(queue, "abc", 0);
+ direct.bind(queue2, "abc", 0);
+
+ queue.reset();
+ queue2.reset();
+
+ Message::shared_ptr msg = Message::shared_ptr(new Message(0, "e", "A", true, true));
+ topic.route(msg, "abc", 0);
+ direct.route(msg, "abc", 0);
+
+ // TODO aconway 2006-09-12: TODO Why no assertions?
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(ExchangeTest);
diff --git a/cpp/broker/test/message_test.cpp b/cpp/broker/test/message_test.cpp
new file mode 100644
index 0000000000..94d25a831e
--- /dev/null
+++ b/cpp/broker/test/message_test.cpp
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "APRBase.h"
+#include "Message.h"
+#include <cppunit/TestCase.h>
+#include <cppunit/TextTestRunner.h>
+#include <cppunit/extensions/HelperMacros.h>
+#include <cppunit/plugin/TestPlugIn.h>
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+class MessageTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(MessageTest);
+ CPPUNIT_TEST(testMe);
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ // TODO aconway 2006-09-12: Need more detailed tests,
+ // need tests to assert something!
+ //
+ void testMe()
+ {
+ APRBase::increment();
+ const int size(10);
+ for(int i = 0; i < size; i++){
+ Message::shared_ptr msg = Message::shared_ptr(new Message(0, "A", "B", true, true));
+ msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody()));
+ msg->addContent(AMQContentBody::shared_ptr(new AMQContentBody()));
+ msg.reset();
+ }
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(MessageTest);
+
diff --git a/cpp/broker/test/queue_test.cpp b/cpp/broker/test/queue_test.cpp
new file mode 100644
index 0000000000..aa423e7e08
--- /dev/null
+++ b/cpp/broker/test/queue_test.cpp
@@ -0,0 +1,138 @@
+ /*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "Queue.h"
+#include "QueueRegistry.h"
+#include <cppunit/TestCase.h>
+#include <cppunit/TextTestRunner.h>
+#include <cppunit/extensions/HelperMacros.h>
+#include <cppunit/plugin/TestPlugIn.h>
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::concurrent;
+
+
+class TestBinding : public virtual Binding{
+ bool cancelled;
+
+public:
+ TestBinding();
+ virtual void cancel();
+ bool isCancelled();
+};
+
+class TestConsumer : public virtual Consumer{
+public:
+ Message::shared_ptr last;
+
+ virtual bool deliver(Message::shared_ptr& msg);
+};
+
+
+class QueueTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(QueueTest);
+ CPPUNIT_TEST(testMe);
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+ void testMe()
+ {
+ Queue::shared_ptr queue(new Queue("my_queue", true, true));
+
+ //Test adding consumers:
+ TestConsumer c1;
+ TestConsumer c2;
+ queue->consume(&c1);
+ queue->consume(&c2);
+
+ CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getConsumerCount());
+
+ //Test basic delivery:
+ Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true));
+ Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true));
+ Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true));
+
+ queue->deliver(msg1);
+ CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
+
+ queue->deliver(msg2);
+ CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get());
+
+ queue->deliver(msg3);
+ CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get());
+
+ //Test cancellation:
+ queue->cancel(&c1);
+ CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getConsumerCount());
+ queue->cancel(&c2);
+ CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getConsumerCount());
+
+ //Test bindings:
+ TestBinding a;
+ TestBinding b;
+ queue->bound(&a);
+ queue->bound(&b);
+
+ queue.reset();
+
+ CPPUNIT_ASSERT(a.isCancelled());
+ CPPUNIT_ASSERT(b.isCancelled());
+
+ //Test use of queues in registry:
+ QueueRegistry registry;
+ registry.declare("queue1", true, true);
+ registry.declare("queue2", true, true);
+ registry.declare("queue3", true, true);
+
+ CPPUNIT_ASSERT(registry.find("queue1"));
+ CPPUNIT_ASSERT(registry.find("queue2"));
+ CPPUNIT_ASSERT(registry.find("queue3"));
+
+ registry.destroy("queue1");
+ registry.destroy("queue2");
+ registry.destroy("queue3");
+
+ CPPUNIT_ASSERT(!registry.find("queue1"));
+ CPPUNIT_ASSERT(!registry.find("queue2"));
+ CPPUNIT_ASSERT(!registry.find("queue3"));
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(QueueTest);
+
+//TestBinding
+TestBinding::TestBinding() : cancelled(false) {}
+
+void TestBinding::cancel(){
+ CPPUNIT_ASSERT(!cancelled);
+ cancelled = true;
+}
+
+bool TestBinding::isCancelled(){
+ return cancelled;
+}
+
+//TestConsumer
+bool TestConsumer::deliver(Message::shared_ptr& msg){
+ last = msg;
+ return true;
+}
+