summaryrefslogtreecommitdiff
path: root/qpid/cpp/lib/broker
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-03-21 02:08:18 +0000
committerAlan Conway <aconway@apache.org>2007-03-21 02:08:18 +0000
commitd2eb3361494710466280341c98f76c03536d2ebe (patch)
treef16ec2eacd8383e388657e54a22fc0214a0ce023 /qpid/cpp/lib/broker
parent732544fe86089ab86c03fcc48d5ca4c72667c275 (diff)
downloadqpid-python-d2eb3361494710466280341c98f76c03536d2ebe.tar.gz
Renamed cpp-0-9 to cpp
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@520706 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/lib/broker')
-rw-r--r--qpid/cpp/lib/broker/AccumulatedAck.cpp57
-rw-r--r--qpid/cpp/lib/broker/AccumulatedAck.h57
-rw-r--r--qpid/cpp/lib/broker/AutoDelete.cpp86
-rw-r--r--qpid/cpp/lib/broker/AutoDelete.h55
-rw-r--r--qpid/cpp/lib/broker/Binding.h38
-rw-r--r--qpid/cpp/lib/broker/Broker.cpp118
-rw-r--r--qpid/cpp/lib/broker/Broker.h106
-rw-r--r--qpid/cpp/lib/broker/BrokerAdapter.cpp388
-rw-r--r--qpid/cpp/lib/broker/BrokerAdapter.h222
-rw-r--r--qpid/cpp/lib/broker/BrokerChannel.cpp346
-rw-r--r--qpid/cpp/lib/broker/BrokerChannel.h159
-rw-r--r--qpid/cpp/lib/broker/BrokerExchange.h51
-rw-r--r--qpid/cpp/lib/broker/BrokerMessage.cpp245
-rw-r--r--qpid/cpp/lib/broker/BrokerMessage.h137
-rw-r--r--qpid/cpp/lib/broker/BrokerMessageBase.h184
-rw-r--r--qpid/cpp/lib/broker/BrokerMessageMessage.cpp239
-rw-r--r--qpid/cpp/lib/broker/BrokerMessageMessage.h91
-rw-r--r--qpid/cpp/lib/broker/BrokerQueue.cpp258
-rw-r--r--qpid/cpp/lib/broker/BrokerQueue.h151
-rw-r--r--qpid/cpp/lib/broker/BrokerSingleton.cpp36
-rw-r--r--qpid/cpp/lib/broker/BrokerSingleton.h52
-rw-r--r--qpid/cpp/lib/broker/CompletionHandler.h39
-rw-r--r--qpid/cpp/lib/broker/Configuration.cpp252
-rw-r--r--qpid/cpp/lib/broker/Configuration.h171
-rw-r--r--qpid/cpp/lib/broker/Connection.cpp128
-rw-r--r--qpid/cpp/lib/broker/Connection.h108
-rw-r--r--qpid/cpp/lib/broker/ConnectionFactory.cpp43
-rw-r--r--qpid/cpp/lib/broker/ConnectionFactory.h47
-rw-r--r--qpid/cpp/lib/broker/ConnectionToken.h38
-rw-r--r--qpid/cpp/lib/broker/Consumer.h37
-rw-r--r--qpid/cpp/lib/broker/Content.h64
-rw-r--r--qpid/cpp/lib/broker/DeletingTxOp.cpp45
-rw-r--r--qpid/cpp/lib/broker/DeletingTxOp.h45
-rw-r--r--qpid/cpp/lib/broker/Deliverable.h37
-rw-r--r--qpid/cpp/lib/broker/DeliverableMessage.cpp33
-rw-r--r--qpid/cpp/lib/broker/DeliverableMessage.h41
-rw-r--r--qpid/cpp/lib/broker/DeliveryRecord.cpp91
-rw-r--r--qpid/cpp/lib/broker/DeliveryRecord.h64
-rw-r--r--qpid/cpp/lib/broker/DirectExchange.cpp73
-rw-r--r--qpid/cpp/lib/broker/DirectExchange.h57
-rw-r--r--qpid/cpp/lib/broker/ExchangeBinding.cpp35
-rw-r--r--qpid/cpp/lib/broker/ExchangeBinding.h48
-rw-r--r--qpid/cpp/lib/broker/ExchangeRegistry.cpp76
-rw-r--r--qpid/cpp/lib/broker/ExchangeRegistry.h47
-rw-r--r--qpid/cpp/lib/broker/FanOutExchange.cpp60
-rw-r--r--qpid/cpp/lib/broker/FanOutExchange.h60
-rw-r--r--qpid/cpp/lib/broker/HandlerImpl.h71
-rw-r--r--qpid/cpp/lib/broker/HeadersExchange.cpp121
-rw-r--r--qpid/cpp/lib/broker/HeadersExchange.h65
-rw-r--r--qpid/cpp/lib/broker/InMemoryContent.cpp73
-rw-r--r--qpid/cpp/lib/broker/InMemoryContent.h45
-rw-r--r--qpid/cpp/lib/broker/LazyLoadedContent.cpp68
-rw-r--r--qpid/cpp/lib/broker/LazyLoadedContent.h49
-rw-r--r--qpid/cpp/lib/broker/Makefile.am96
-rw-r--r--qpid/cpp/lib/broker/MessageBuilder.cpp74
-rw-r--r--qpid/cpp/lib/broker/MessageBuilder.h57
-rw-r--r--qpid/cpp/lib/broker/MessageHandlerImpl.cpp243
-rw-r--r--qpid/cpp/lib/broker/MessageHandlerImpl.h130
-rw-r--r--qpid/cpp/lib/broker/MessageStore.h140
-rw-r--r--qpid/cpp/lib/broker/MessageStoreModule.cpp104
-rw-r--r--qpid/cpp/lib/broker/MessageStoreModule.h60
-rw-r--r--qpid/cpp/lib/broker/NameGenerator.cpp32
-rw-r--r--qpid/cpp/lib/broker/NameGenerator.h39
-rw-r--r--qpid/cpp/lib/broker/NullMessageStore.cpp104
-rw-r--r--qpid/cpp/lib/broker/NullMessageStore.h59
-rw-r--r--qpid/cpp/lib/broker/Prefetch.h42
-rw-r--r--qpid/cpp/lib/broker/QueuePolicy.cpp69
-rw-r--r--qpid/cpp/lib/broker/QueuePolicy.h54
-rw-r--r--qpid/cpp/lib/broker/QueueRegistry.cpp78
-rw-r--r--qpid/cpp/lib/broker/QueueRegistry.h96
-rw-r--r--qpid/cpp/lib/broker/RecoveryManager.cpp46
-rw-r--r--qpid/cpp/lib/broker/RecoveryManager.h45
-rw-r--r--qpid/cpp/lib/broker/Reference.cpp54
-rw-r--r--qpid/cpp/lib/broker/Reference.h111
-rw-r--r--qpid/cpp/lib/broker/TopicExchange.cpp156
-rw-r--r--qpid/cpp/lib/broker/TopicExchange.h100
-rw-r--r--qpid/cpp/lib/broker/TransactionalStore.h47
-rw-r--r--qpid/cpp/lib/broker/TxAck.cpp54
-rw-r--r--qpid/cpp/lib/broker/TxAck.h58
-rw-r--r--qpid/cpp/lib/broker/TxBuffer.cpp55
-rw-r--r--qpid/cpp/lib/broker/TxBuffer.h107
-rw-r--r--qpid/cpp/lib/broker/TxOp.h39
-rw-r--r--qpid/cpp/lib/broker/TxPublish.cpp60
-rw-r--r--qpid/cpp/lib/broker/TxPublish.h80
84 files changed, 7796 insertions, 0 deletions
diff --git a/qpid/cpp/lib/broker/AccumulatedAck.cpp b/qpid/cpp/lib/broker/AccumulatedAck.cpp
new file mode 100644
index 0000000000..ff471b0287
--- /dev/null
+++ b/qpid/cpp/lib/broker/AccumulatedAck.cpp
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "AccumulatedAck.h"
+
+#include <assert.h>
+
+using std::less_equal;
+using std::bind2nd;
+using namespace qpid::broker;
+
+void AccumulatedAck::update(uint64_t firstTag, uint64_t lastTag){
+ assert(firstTag<=lastTag);
+ if (firstTag <= range + 1) {
+ if (lastTag > range) range = lastTag;
+ } else {
+ for (uint64_t tag = firstTag; tag<=lastTag; tag++)
+ individual.push_back(tag);
+ }
+}
+
+void AccumulatedAck::consolidate(){
+ individual.sort();
+ //remove any individual tags that are covered by range
+ individual.remove_if(bind2nd(less_equal<uint64_t>(), range));
+ //update range if possible (using <= allows for duplicates from overlapping ranges)
+ while (individual.front() <= range + 1) {
+ range = individual.front();
+ individual.pop_front();
+ }
+}
+
+void AccumulatedAck::clear(){
+ range = 0;
+ individual.clear();
+}
+
+bool AccumulatedAck::covers(uint64_t tag) const{
+ return tag <= range || find(individual.begin(), individual.end(), tag) != individual.end();
+}
diff --git a/qpid/cpp/lib/broker/AccumulatedAck.h b/qpid/cpp/lib/broker/AccumulatedAck.h
new file mode 100644
index 0000000000..c4a6e3b79b
--- /dev/null
+++ b/qpid/cpp/lib/broker/AccumulatedAck.h
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _AccumulatedAck_
+#define _AccumulatedAck_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+
+namespace qpid {
+ namespace broker {
+ /**
+ * Keeps an accumulated record of acked messages (by delivery
+ * tag).
+ */
+ class AccumulatedAck {
+ public:
+ /**
+ * If not zero, then everything up to this value has been
+ * acked.
+ */
+ uint64_t range;
+ /**
+ * List of individually acked messages that are not
+ * included in the range marked by 'range'.
+ */
+ std::list<uint64_t> individual;
+
+ AccumulatedAck(uint64_t r) : range(r) {}
+ void update(uint64_t firstTag, uint64_t lastTag);
+ void consolidate();
+ void clear();
+ bool covers(uint64_t tag) const;
+ };
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/AutoDelete.cpp b/qpid/cpp/lib/broker/AutoDelete.cpp
new file mode 100644
index 0000000000..2037a9c71c
--- /dev/null
+++ b/qpid/cpp/lib/broker/AutoDelete.cpp
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <AutoDelete.h>
+#include <sys/Time.h>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+
+AutoDelete::AutoDelete(QueueRegistry* const _registry, uint32_t _period)
+ : registry(_registry), period(_period), stopped(true) { }
+
+void AutoDelete::add(Queue::shared_ptr const queue){
+ Mutex::ScopedLock l(lock);
+ queues.push(queue);
+}
+
+Queue::shared_ptr const AutoDelete::pop(){
+ Queue::shared_ptr next;
+ Mutex::ScopedLock l(lock);
+ if(!queues.empty()){
+ next = queues.front();
+ queues.pop();
+ }
+ return next;
+}
+
+void AutoDelete::process(){
+ Queue::shared_ptr seen;
+ for(Queue::shared_ptr q = pop(); q; q = pop()){
+ if(seen == q){
+ add(q);
+ break;
+ }else if(q->canAutoDelete()){
+ std::string name(q->getName());
+ registry->destroy(name);
+ std::cout << "INFO: Auto-deleted queue named " << name << std::endl;
+ }else{
+ add(q);
+ if(!seen) seen = q;
+ }
+ }
+}
+
+void AutoDelete::run(){
+ Monitor::ScopedLock l(monitor);
+ while(!stopped){
+ process();
+ monitor.wait(period*TIME_MSEC);
+ }
+}
+
+void AutoDelete::start(){
+ Monitor::ScopedLock l(monitor);
+ if(stopped){
+ stopped = false;
+ runner = Thread(this);
+ }
+}
+
+void AutoDelete::stop(){
+ {
+ Monitor::ScopedLock l(monitor);
+ if(stopped) return;
+ stopped = true;
+ }
+ monitor.notify();
+ runner.join();
+}
diff --git a/qpid/cpp/lib/broker/AutoDelete.h b/qpid/cpp/lib/broker/AutoDelete.h
new file mode 100644
index 0000000000..9034de1730
--- /dev/null
+++ b/qpid/cpp/lib/broker/AutoDelete.h
@@ -0,0 +1,55 @@
+#ifndef _AutoDelete_
+#define _AutoDelete_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <queue>
+#include <sys/Monitor.h>
+#include <BrokerQueue.h>
+#include <QueueRegistry.h>
+#include <sys/Thread.h>
+
+namespace qpid {
+ namespace broker{
+ class AutoDelete : private qpid::sys::Runnable {
+ qpid::sys::Mutex lock;
+ qpid::sys::Monitor monitor;
+ std::queue<Queue::shared_ptr> queues;
+ QueueRegistry* const registry;
+ uint32_t period;
+ volatile bool stopped;
+ qpid::sys::Thread runner;
+
+ Queue::shared_ptr const pop();
+ void process();
+ virtual void run();
+
+ public:
+ AutoDelete(QueueRegistry* const registry, uint32_t period);
+ void add(Queue::shared_ptr const);
+ void start();
+ void stop();
+ };
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/Binding.h b/qpid/cpp/lib/broker/Binding.h
new file mode 100644
index 0000000000..16ca223208
--- /dev/null
+++ b/qpid/cpp/lib/broker/Binding.h
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Binding_
+#define _Binding_
+
+#include <FieldTable.h>
+
+namespace qpid {
+ namespace broker {
+ class Binding{
+ public:
+ virtual void cancel() = 0;
+ virtual ~Binding(){}
+ };
+ }
+}
+
+
+#endif
+
diff --git a/qpid/cpp/lib/broker/Broker.cpp b/qpid/cpp/lib/broker/Broker.cpp
new file mode 100644
index 0000000000..f650452e33
--- /dev/null
+++ b/qpid/cpp/lib/broker/Broker.cpp
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <memory>
+
+#include "AMQFrame.h"
+#include "DirectExchange.h"
+#include "TopicExchange.h"
+#include "FanOutExchange.h"
+#include "HeadersExchange.h"
+#include "MessageStoreModule.h"
+#include "NullMessageStore.h"
+#include "ProtocolInitiation.h"
+#include "Connection.h"
+#include "sys/ConnectionInputHandler.h"
+#include "sys/ConnectionInputHandlerFactory.h"
+#include "sys/TimeoutHandler.h"
+
+#include "Broker.h"
+
+namespace qpid {
+namespace broker {
+
+const std::string empty;
+const std::string amq_direct("amq.direct");
+const std::string amq_topic("amq.topic");
+const std::string amq_fanout("amq.fanout");
+const std::string amq_match("amq.match");
+
+Broker::Broker(const Configuration& conf) :
+ config(conf),
+ queues(store.get()),
+ timeout(30000),
+ stagingThreshold(0),
+ cleaner(&queues, timeout/10),
+ factory(*this)
+{
+ if (config.getStore().empty())
+ store.reset(new NullMessageStore(config.isTrace()));
+ else
+ store.reset(new MessageStoreModule(config.getStore()));
+
+ exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
+ exchanges.declare(amq_direct, DirectExchange::typeName);
+ exchanges.declare(amq_topic, TopicExchange::typeName);
+ exchanges.declare(amq_fanout, FanOutExchange::typeName);
+ exchanges.declare(amq_match, HeadersExchange::typeName);
+
+ if(store.get()) {
+ RecoveryManager recoverer(queues, exchanges);
+ MessageStoreSettings storeSettings = { getStagingThreshold() };
+ store->recover(recoverer, &storeSettings);
+ }
+
+ cleaner.start();
+}
+
+
+Broker::shared_ptr Broker::create(int16_t port)
+{
+ Configuration config;
+ config.setPort(port);
+ return create(config);
+}
+
+Broker::shared_ptr Broker::create(const Configuration& config) {
+ return Broker::shared_ptr(new Broker(config));
+}
+
+void Broker::run() {
+ getAcceptor().run(&factory);
+}
+
+void Broker::shutdown() {
+ if (acceptor)
+ acceptor->shutdown();
+}
+
+Broker::~Broker() {
+ shutdown();
+}
+
+int16_t Broker::getPort() const { return getAcceptor().getPort(); }
+
+Acceptor& Broker::getAcceptor() const {
+ if (!acceptor)
+ const_cast<Acceptor::shared_ptr&>(acceptor) =
+ Acceptor::create(config.getPort(),
+ config.getConnectionBacklog(),
+ config.getWorkerThreads(),
+ config.isTrace());
+ return *acceptor;
+}
+
+
+const int16_t Broker::DEFAULT_PORT(5672);
+
+
+}} // namespace qpid::broker
+
diff --git a/qpid/cpp/lib/broker/Broker.h b/qpid/cpp/lib/broker/Broker.h
new file mode 100644
index 0000000000..7c21e90b18
--- /dev/null
+++ b/qpid/cpp/lib/broker/Broker.h
@@ -0,0 +1,106 @@
+#ifndef _Broker_
+#define _Broker_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <Configuration.h>
+#include <ConnectionFactory.h>
+#include <sys/Runnable.h>
+#include <sys/Acceptor.h>
+#include <SharedObject.h>
+#include <MessageStore.h>
+#include <AutoDelete.h>
+#include <ExchangeRegistry.h>
+#include <ConnectionToken.h>
+#include <DirectExchange.h>
+#include <OutputHandler.h>
+#include <ProtocolInitiation.h>
+#include <QueueRegistry.h>
+
+namespace qpid {
+namespace broker {
+/**
+ * A broker instance.
+ */
+class Broker : public sys::Runnable,
+ public SharedObject<Broker>
+{
+ public:
+ static const int16_t DEFAULT_PORT;
+
+ virtual ~Broker();
+
+ /**
+ * Create a broker.
+ * @param port Port to listen on or 0 to pick a port dynamically.
+ */
+ static shared_ptr create(int16_t port = DEFAULT_PORT);
+
+ /**
+ * Create a broker using a Configuration.
+ */
+ static shared_ptr create(const Configuration& config);
+
+ /**
+ * Return listening port. If called before bind this is
+ * the configured port. If called after it is the actual
+ * port, which will be different if the configured port is
+ * 0.
+ */
+ virtual int16_t getPort() const;
+
+ /**
+ * Run the broker. Implements Runnable::run() so the broker
+ * can be run in a separate thread.
+ */
+ virtual void run();
+
+ /** Shut down the broker */
+ virtual void shutdown();
+
+ MessageStore& getStore() { return *store; }
+ QueueRegistry& getQueues() { return queues; }
+ ExchangeRegistry& getExchanges() { return exchanges; }
+ uint32_t getTimeout() { return timeout; }
+ uint64_t getStagingThreshold() { return stagingThreshold; }
+ AutoDelete& getCleaner() { return cleaner; }
+
+ private:
+ Broker(const Configuration& config);
+ sys::Acceptor& getAcceptor() const;
+
+ Configuration config;
+ sys::Acceptor::shared_ptr acceptor;
+ std::auto_ptr<MessageStore> store;
+ QueueRegistry queues;
+ ExchangeRegistry exchanges;
+ uint32_t timeout;
+ uint64_t stagingThreshold;
+ AutoDelete cleaner;
+ ConnectionFactory factory;
+};
+
+}}
+
+
+
+#endif /*!_Broker_*/
diff --git a/qpid/cpp/lib/broker/BrokerAdapter.cpp b/qpid/cpp/lib/broker/BrokerAdapter.cpp
new file mode 100644
index 0000000000..981801c40e
--- /dev/null
+++ b/qpid/cpp/lib/broker/BrokerAdapter.cpp
@@ -0,0 +1,388 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <boost/format.hpp>
+
+#include "BrokerAdapter.h"
+#include "BrokerChannel.h"
+#include "Connection.h"
+#include "AMQMethodBody.h"
+#include "Exception.h"
+
+namespace qpid {
+namespace broker {
+
+using boost::format;
+using namespace qpid;
+using namespace qpid::framing;
+
+typedef std::vector<Queue::shared_ptr> QueueVector;
+
+
+BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) :
+ CoreRefs(ch, c, b),
+ connection(c),
+ basicHandler(*this),
+ channelHandler(*this),
+ connectionHandler(*this),
+ exchangeHandler(*this),
+ messageHandler(*this),
+ queueHandler(*this),
+ txHandler(*this)
+{}
+
+
+ProtocolVersion BrokerAdapter::getVersion() const {
+ return connection.getVersion();
+}
+
+void BrokerAdapter::ConnectionHandlerImpl::startOk(
+ const MethodContext&, const FieldTable& /*clientProperties*/,
+ const string& /*mechanism*/,
+ const string& /*response*/, const string& /*locale*/)
+{
+ client.tune(
+ 100, connection.getFrameMax(), connection.getHeartbeat());
+}
+
+void BrokerAdapter::ConnectionHandlerImpl::secureOk(
+ const MethodContext&, const string& /*response*/){}
+
+void BrokerAdapter::ConnectionHandlerImpl::tuneOk(
+ const MethodContext&, uint16_t /*channelmax*/,
+ uint32_t framemax, uint16_t heartbeat)
+{
+ connection.setFrameMax(framemax);
+ connection.setHeartbeat(heartbeat);
+}
+
+void BrokerAdapter::ConnectionHandlerImpl::open(
+ const MethodContext& context, const string& /*virtualHost*/,
+ const string& /*capabilities*/, bool /*insist*/)
+{
+ string knownhosts;
+ client.openOk(
+ knownhosts, context.getRequestId());
+}
+
+void BrokerAdapter::ConnectionHandlerImpl::close(
+ const MethodContext& context, uint16_t /*replyCode*/, const string& /*replyText*/,
+ uint16_t /*classId*/, uint16_t /*methodId*/)
+{
+ client.closeOk(context.getRequestId());
+ connection.getOutput().close();
+}
+
+void BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){
+ connection.getOutput().close();
+}
+
+void BrokerAdapter::ChannelHandlerImpl::open(
+ const MethodContext& context, const string& /*outOfBand*/){
+ channel.open();
+ // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9
+ client.openOk(
+ std::string()/* ID */, context.getRequestId());
+}
+
+void BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
+void BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
+
+void BrokerAdapter::ChannelHandlerImpl::close(
+ const MethodContext& context, uint16_t /*replyCode*/,
+ const string& /*replyText*/,
+ uint16_t /*classId*/, uint16_t /*methodId*/)
+{
+ client.closeOk(context.getRequestId());
+ // FIXME aconway 2007-01-18: Following line will "delete this". Ugly.
+ connection.closeChannel(channel.getId());
+}
+
+void BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){}
+
+
+
+void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& exchange, const string& type,
+ bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
+ const FieldTable& /*arguments*/){
+
+ if(passive){
+ if(!broker.getExchanges().get(exchange)) {
+ throw ChannelException(404, "Exchange not found: " + exchange);
+ }
+ }else{
+ try{
+ std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type);
+ if(!response.second && response.first->getType() != type){
+ throw ConnectionException(
+ 530,
+ "Exchange already declared to be of type "
+ + response.first->getType() + ", requested " + type);
+ }
+ }catch(UnknownExchangeTypeException& e){
+ throw ConnectionException(
+ 503, "Exchange type not implemented: " + type);
+ }
+ }
+ if(!nowait){
+ client.declareOk(context.getRequestId());
+ }
+}
+
+void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/,
+ const string& exchange, bool /*ifUnused*/, bool nowait){
+
+ //TODO: implement unused
+ broker.getExchanges().destroy(exchange);
+ if(!nowait) client.deleteOk(context.getRequestId());
+}
+
+void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& name,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
+ Queue::shared_ptr queue;
+ if (passive && !name.empty()) {
+ queue = connection.getQueue(name, channel.getId());
+ } else {
+ std::pair<Queue::shared_ptr, bool> queue_created =
+ broker.getQueues().declare(
+ name, durable,
+ autoDelete ? connection.getTimeout() : 0,
+ exclusive ? &connection : 0);
+ queue = queue_created.first;
+ assert(queue);
+ if (queue_created.second) { // This is a new queue
+ channel.setDefaultQueue(queue);
+
+ //apply settings & create persistent record if required
+ queue_created.first->create(arguments);
+
+ //add default binding:
+ broker.getExchanges().getDefault()->bind(queue, name, 0);
+ if (exclusive) {
+ connection.exclusiveQueues.push_back(queue);
+ } else if(autoDelete){
+ broker.getCleaner().add(queue);
+ }
+ }
+ }
+ if (exclusive && !queue->isExclusiveOwner(&connection))
+ throw ChannelException(
+ 405,
+ format("Cannot grant exclusive access to queue '%s'")
+ % queue->getName());
+ if (!nowait) {
+ string queueName = queue->getName();
+ client.declareOk(
+ queueName, queue->getMessageCount(), queue->getConsumerCount(),
+ context.getRequestId());
+ }
+}
+
+void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_t /*ticket*/, const string& queueName,
+ const string& exchangeName, const string& routingKey, bool nowait,
+ const FieldTable& arguments){
+
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
+ if(exchange){
+ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
+ exchange->bind(queue, exchangeRoutingKey, &arguments);
+ if(!nowait) client.bindOk(context.getRequestId());
+ }else{
+ throw ChannelException(
+ 404, "Bind failed. No such exchange: " + exchangeName);
+ }
+}
+
+void
+BrokerAdapter::QueueHandlerImpl::unbind(
+ const MethodContext& context,
+ uint16_t /*ticket*/,
+ const string& queueName,
+ const string& exchangeName,
+ const string& routingKey,
+ const qpid::framing::FieldTable& arguments )
+{
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ if (!queue.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName);
+
+ Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
+ if (!exchange.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName);
+
+ exchange->unbind(queue, routingKey, &arguments);
+
+ client.unbindOk(context.getRequestId());
+}
+
+void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool nowait){
+
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ int count = queue->purge();
+ if(!nowait) client.purgeOk( count, context.getRequestId());
+}
+
+void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/, const string& queue,
+ bool ifUnused, bool ifEmpty, bool nowait){
+ ChannelException error(0, "");
+ int count(0);
+ Queue::shared_ptr q = connection.getQueue(queue, channel.getId());
+ if(ifEmpty && q->getMessageCount() > 0){
+ throw ChannelException(406, "Queue not empty.");
+ }else if(ifUnused && q->getConsumerCount() > 0){
+ throw ChannelException(406, "Queue in use.");
+ }else{
+ //remove the queue from the list of exclusive queues if necessary
+ if(q->isExclusiveOwner(&connection)){
+ QueueVector::iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q);
+ if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i);
+ }
+ count = q->getMessageCount();
+ q->destroy();
+ broker.getQueues().destroy(queue);
+ }
+
+ if(!nowait)
+ client.deleteOk(count, context.getRequestId());
+}
+
+
+
+
+void BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){
+ //TODO: handle global
+ channel.setPrefetchSize(prefetchSize);
+ channel.setPrefetchCount(prefetchCount);
+ client.qosOk(context.getRequestId());
+}
+
+void BrokerAdapter::BasicHandlerImpl::consume(
+ const MethodContext& context, uint16_t /*ticket*/,
+ const string& queueName, const string& consumerTag,
+ bool noLocal, bool noAck, bool exclusive,
+ bool nowait, const FieldTable& fields)
+{
+
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ if(!consumerTag.empty() && channel.exists(consumerTag)){
+ throw ConnectionException(530, "Consumer tags must be unique");
+ }
+
+ string newTag = consumerTag;
+ channel.consume(
+ newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
+
+ if(!nowait) client.consumeOk(newTag, context.getRequestId());
+
+ //allow messages to be dispatched if required as there is now a consumer:
+ queue->dispatch();
+}
+
+void BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
+ channel.cancel(consumerTag);
+
+ if(!nowait) client.cancelOk(consumerTag, context.getRequestId());
+}
+
+void BrokerAdapter::BasicHandlerImpl::publish(
+ const MethodContext& context, uint16_t /*ticket*/,
+ const string& exchangeName, const string& routingKey,
+ bool mandatory, bool immediate)
+{
+
+ Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
+ if(exchange){
+ BasicMessage* msg = new BasicMessage(
+ &connection, exchangeName, routingKey, mandatory, immediate,
+ context.methodBody);
+ channel.handlePublish(msg);
+ }else{
+ throw ChannelException(
+ 404, "Exchange not found '" + exchangeName + "'");
+ }
+}
+
+void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool noAck){
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){
+ string clusterId;//not used, part of an imatix hack
+
+ client.getEmpty(clusterId, context.getRequestId());
+ }
+}
+
+void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, uint64_t deliveryTag, bool multiple){
+ channel.ack(deliveryTag, multiple);
+}
+
+void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, uint64_t /*deliveryTag*/, bool /*requeue*/){}
+
+void BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
+ channel.recover(requeue);
+}
+
+void BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){
+ channel.begin();
+ client.selectOk(context.getRequestId());
+}
+
+void BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){
+ channel.commit();
+ client.commitOk(context.getRequestId());
+}
+
+void BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){
+
+ channel.rollback();
+ client.rollbackOk(context.getRequestId());
+ channel.recover(false);
+}
+
+void
+BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
+{
+ //no specific action required, generic response handling should be sufficient
+}
+
+
+//
+// Message class method handlers
+//
+void
+BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context)
+{
+ client.ok(context.getRequestId());
+ client.pong();
+}
+
+
+void
+BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context)
+{
+ client.ok(context.getRequestId());
+}
+
+void
+BrokerAdapter::ChannelHandlerImpl::resume(
+ const MethodContext&,
+ const string& /*channel*/ )
+{
+ assert(0); // FIXME aconway 2007-01-04: 0-9 feature
+}
+
+}} // namespace qpid::broker
+
diff --git a/qpid/cpp/lib/broker/BrokerAdapter.h b/qpid/cpp/lib/broker/BrokerAdapter.h
new file mode 100644
index 0000000000..2fafbcc180
--- /dev/null
+++ b/qpid/cpp/lib/broker/BrokerAdapter.h
@@ -0,0 +1,222 @@
+#ifndef _broker_BrokerAdapter_h
+#define _broker_BrokerAdapter_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "AMQP_ServerOperations.h"
+#include "HandlerImpl.h"
+#include "MessageHandlerImpl.h"
+#include "Exception.h"
+
+namespace qpid {
+namespace broker {
+
+class Channel;
+class Connection;
+class Broker;
+class ChannelHandler;
+class ConnectionHandler;
+class BasicHandler;
+class ExchangeHandler;
+class QueueHandler;
+class TxHandler;
+class MessageHandler;
+class AccessHandler;
+class FileHandler;
+class StreamHandler;
+class DtxHandler;
+class TunnelHandler;
+class MessageHandlerImpl;
+
+/**
+ * Per-channel protocol adapter.
+ *
+ * A container for a collection of AMQP-class adapters that translate
+ * AMQP method bodies into calls on the core Channel, Connection and
+ * Broker objects. Each adapter class also provides a client proxy
+ * to send methods to the peer.
+ *
+ */
+class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
+{
+ public:
+ BrokerAdapter(Channel& ch, Connection& c, Broker& b);
+
+ framing::ProtocolVersion getVersion() const;
+ ChannelHandler* getChannelHandler() { return &channelHandler; }
+ ConnectionHandler* getConnectionHandler() { return &connectionHandler; }
+ BasicHandler* getBasicHandler() { return &basicHandler; }
+ ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
+ QueueHandler* getQueueHandler() { return &queueHandler; }
+ TxHandler* getTxHandler() { return &txHandler; }
+ MessageHandler* getMessageHandler() { return &messageHandler; }
+ AccessHandler* getAccessHandler() {
+ throw ConnectionException(540, "Access class not implemented"); }
+ FileHandler* getFileHandler() {
+ throw ConnectionException(540, "File class not implemented"); }
+ StreamHandler* getStreamHandler() {
+ throw ConnectionException(540, "Stream class not implemented"); }
+ DtxHandler* getDtxHandler() {
+ throw ConnectionException(540, "Dtx class not implemented"); }
+ TunnelHandler* getTunnelHandler() {
+ throw ConnectionException(540, "Tunnel class not implemented"); }
+
+ framing::AMQP_ClientProxy& getProxy() { return proxy; }
+
+ private:
+
+ class ConnectionHandlerImpl :
+ public ConnectionHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Connection>
+ {
+ public:
+ ConnectionHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+
+ void startOk(const framing::MethodContext& context,
+ const qpid::framing::FieldTable& clientProperties,
+ const std::string& mechanism, const std::string& response,
+ const std::string& locale);
+ void secureOk(const framing::MethodContext& context,
+ const std::string& response);
+ void tuneOk(const framing::MethodContext& context,
+ uint16_t channelMax,
+ uint32_t frameMax, uint16_t heartbeat);
+ void open(const framing::MethodContext& context,
+ const std::string& virtualHost,
+ const std::string& capabilities, bool insist);
+ void close(const framing::MethodContext& context, uint16_t replyCode,
+ const std::string& replyText,
+ uint16_t classId, uint16_t methodId);
+ void closeOk(const framing::MethodContext& context);
+ };
+
+ class ChannelHandlerImpl :
+ public ChannelHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Channel>
+ {
+ public:
+ ChannelHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+
+ void open(const framing::MethodContext& context, const std::string& outOfBand);
+ void flow(const framing::MethodContext& context, bool active);
+ void flowOk(const framing::MethodContext& context, bool active);
+ void ok( const framing::MethodContext& context );
+ void ping( const framing::MethodContext& context );
+ void pong( const framing::MethodContext& context );
+ void resume( const framing::MethodContext& context, const std::string& channelId );
+ void close(const framing::MethodContext& context, uint16_t replyCode, const
+ std::string& replyText, uint16_t classId, uint16_t methodId);
+ void closeOk(const framing::MethodContext& context);
+ };
+
+ class ExchangeHandlerImpl :
+ public ExchangeHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Exchange>
+ {
+ public:
+ ExchangeHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+
+ void declare(const framing::MethodContext& context, uint16_t ticket,
+ const std::string& exchange, const std::string& type,
+ bool passive, bool durable, bool autoDelete,
+ bool internal, bool nowait,
+ const qpid::framing::FieldTable& arguments);
+ void delete_(const framing::MethodContext& context, uint16_t ticket,
+ const std::string& exchange, bool ifUnused, bool nowait);
+ };
+
+ class QueueHandlerImpl :
+ public QueueHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Queue>
+ {
+ public:
+ QueueHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+
+ void declare(const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait,
+ const qpid::framing::FieldTable& arguments);
+ void bind(const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
+ const std::string& exchange, const std::string& routingKey,
+ bool nowait, const qpid::framing::FieldTable& arguments);
+ void unbind(const framing::MethodContext& context,
+ uint16_t ticket,
+ const std::string& queue,
+ const std::string& exchange,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable& arguments );
+ void purge(const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
+ bool nowait);
+ void delete_(const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
+ bool ifUnused, bool ifEmpty,
+ bool nowait);
+ };
+
+ class BasicHandlerImpl :
+ public BasicHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Basic>
+ {
+ public:
+ BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+
+ void qos(const framing::MethodContext& context, uint32_t prefetchSize,
+ uint16_t prefetchCount, bool global);
+ void consume(
+ const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
+ const std::string& consumerTag, bool noLocal, bool noAck,
+ bool exclusive, bool nowait,
+ const qpid::framing::FieldTable& fields);
+ void cancel(const framing::MethodContext& context, const std::string& consumerTag,
+ bool nowait);
+ void publish(const framing::MethodContext& context, uint16_t ticket,
+ const std::string& exchange, const std::string& routingKey,
+ bool mandatory, bool immediate);
+ void get(const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
+ bool noAck);
+ void ack(const framing::MethodContext& context, uint64_t deliveryTag, bool multiple);
+ void reject(const framing::MethodContext& context, uint64_t deliveryTag, bool requeue);
+ void recover(const framing::MethodContext& context, bool requeue);
+ };
+
+ class TxHandlerImpl :
+ public TxHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Tx>
+ {
+ public:
+ TxHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+
+ void select(const framing::MethodContext& context);
+ void commit(const framing::MethodContext& context);
+ void rollback(const framing::MethodContext& context);
+ };
+
+ Connection& connection;
+ BasicHandlerImpl basicHandler;
+ ChannelHandlerImpl channelHandler;
+ ConnectionHandlerImpl connectionHandler;
+ ExchangeHandlerImpl exchangeHandler;
+ MessageHandlerImpl messageHandler;
+ QueueHandlerImpl queueHandler;
+ TxHandlerImpl txHandler;
+
+};
+}} // namespace qpid::broker
+
+
+
+#endif /*!_broker_BrokerAdapter_h*/
diff --git a/qpid/cpp/lib/broker/BrokerChannel.cpp b/qpid/cpp/lib/broker/BrokerChannel.cpp
new file mode 100644
index 0000000000..5673a2c42a
--- /dev/null
+++ b/qpid/cpp/lib/broker/BrokerChannel.cpp
@@ -0,0 +1,346 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <assert.h>
+
+#include <iostream>
+#include <sstream>
+#include <algorithm>
+#include <functional>
+
+#include <boost/bind.hpp>
+
+#include "BrokerChannel.h"
+#include "DeletingTxOp.h"
+#include "framing/ChannelAdapter.h"
+#include <QpidError.h>
+#include <DeliverableMessage.h>
+#include <BrokerQueue.h>
+#include <BrokerMessage.h>
+#include <MessageStore.h>
+#include <TxAck.h>
+#include <TxPublish.h>
+#include "BrokerAdapter.h"
+#include "Connection.h"
+
+using std::mem_fun_ref;
+using std::bind2nd;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+
+Channel::Channel(
+ Connection& con, ChannelId id,
+ uint32_t _framesize, MessageStore* const _store,
+ uint64_t _stagingThreshold
+) :
+ ChannelAdapter(id, &con.getOutput(), con.getVersion()),
+ connection(con),
+ currentDeliveryTag(1),
+ transactional(false),
+ prefetchSize(0),
+ prefetchCount(0),
+ framesize(_framesize),
+ tagGenerator("sgen"),
+ accumulatedAck(0),
+ store(_store),
+ messageBuilder(this, _store, _stagingThreshold),
+ opened(id == 0),//channel 0 is automatically open, other must be explicitly opened
+ adapter(new BrokerAdapter(*this, con, con.broker))
+{
+ outstanding.reset();
+}
+
+Channel::~Channel(){
+ close();
+}
+
+bool Channel::exists(const string& consumerTag){
+ return consumers.find(consumerTag) != consumers.end();
+}
+
+// TODO aconway 2007-02-12: Why is connection token passed in instead
+// of using the channel's parent connection?
+void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks,
+ bool exclusive, ConnectionToken* const connection,
+ const FieldTable*)
+{
+ if(tagInOut.empty())
+ tagInOut = tagGenerator.generate();
+ std::auto_ptr<ConsumerImpl> c(
+ new ConsumerImpl(this, tagInOut, queue, connection, acks));
+ queue->consume(c.get(), exclusive);//may throw exception
+ consumers.insert(tagInOut, c.release());
+}
+
+void Channel::cancel(const string& tag){
+ // consumers is a ptr_map so erase will delete the consumer
+ // which will call cancel.
+ ConsumerImplMap::iterator i = consumers.find(tag);
+ if (i != consumers.end())
+ consumers.erase(i);
+}
+
+void Channel::close(){
+ opened = false;
+ consumers.clear();
+ recover(true);
+}
+
+void Channel::begin(){
+ transactional = true;
+}
+
+void Channel::commit(){
+ TxAck txAck(accumulatedAck, unacked);
+ txBuffer.enlist(&txAck);
+ if(txBuffer.prepare(store)){
+ txBuffer.commit();
+ }
+ accumulatedAck.clear();
+}
+
+void Channel::rollback(){
+ txBuffer.rollback();
+ accumulatedAck.clear();
+}
+
+void Channel::deliver(
+ Message::shared_ptr& msg, const string& consumerTag,
+ Queue::shared_ptr& queue, bool ackExpected)
+{
+ Mutex::ScopedLock locker(deliveryLock);
+
+ // Key the delivered messages to the id of the request in which they're sent
+ uint64_t deliveryTag = getNextSendRequestId();
+
+ if(ackExpected){
+ unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag));
+ outstanding.size += msg->contentSize();
+ outstanding.count++;
+ }
+ //send deliver method, header and content(s)
+ msg->deliver(*this, consumerTag, deliveryTag, framesize);
+}
+
+bool Channel::checkPrefetch(Message::shared_ptr& msg){
+ Mutex::ScopedLock locker(deliveryLock);
+ bool countOk = !prefetchCount || prefetchCount > unacked.size();
+ bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
+ return countOk && sizeOk;
+}
+
+Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag,
+ Queue::shared_ptr _queue,
+ ConnectionToken* const _connection, bool ack
+) : parent(_parent), tag(_tag), queue(_queue), connection(_connection),
+ ackExpected(ack), blocked(false) {}
+
+bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
+ if(!connection || connection != msg->getPublisher()){//check for no_local
+ if(ackExpected && !parent->checkPrefetch(msg)){
+ blocked = true;
+ }else{
+ blocked = false;
+ parent->deliver(msg, tag, queue, ackExpected);
+ return true;
+ }
+ }
+ return false;
+}
+
+Channel::ConsumerImpl::~ConsumerImpl() {
+ cancel();
+}
+
+void Channel::ConsumerImpl::cancel(){
+ if(queue)
+ queue->cancel(this);
+}
+
+void Channel::ConsumerImpl::requestDispatch(){
+ if(blocked)
+ queue->dispatch();
+}
+
+void Channel::handleInlineTransfer(Message::shared_ptr msg)
+{
+ Exchange::shared_ptr exchange =
+ connection.broker.getExchanges().get(msg->getExchange());
+ if(transactional){
+ TxPublish* deliverable = new TxPublish(msg);
+ exchange->route(
+ *deliverable, msg->getRoutingKey(),
+ &(msg->getApplicationHeaders()));
+ txBuffer.enlist(new DeletingTxOp(deliverable));
+ }else{
+ DeliverableMessage deliverable(msg);
+ exchange->route(
+ deliverable, msg->getRoutingKey(),
+ &(msg->getApplicationHeaders()));
+ }
+}
+
+void Channel::handlePublish(Message* _message){
+ Message::shared_ptr message(_message);
+ messageBuilder.initialise(message);
+}
+
+void Channel::handleHeader(AMQHeaderBody::shared_ptr header){
+ messageBuilder.setHeader(header);
+ //at this point, decide based on the size of the message whether we want
+ //to stage it by saving content directly to disk as it arrives
+}
+
+void Channel::handleContent(AMQContentBody::shared_ptr content){
+ messageBuilder.addContent(content);
+}
+
+void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) {
+ // TODO aconway 2007-01-17: Implement heartbeating.
+}
+
+void Channel::complete(Message::shared_ptr msg) {
+ Exchange::shared_ptr exchange =
+ connection.broker.getExchanges().get(msg->getExchange());
+ assert(exchange.get());
+ if(transactional) {
+ std::auto_ptr<TxPublish> deliverable(new TxPublish(msg));
+ exchange->route(*deliverable, msg->getRoutingKey(),
+ &(msg->getApplicationHeaders()));
+ txBuffer.enlist(new DeletingTxOp(deliverable.release()));
+ } else {
+ DeliverableMessage deliverable(msg);
+ exchange->route(deliverable, msg->getRoutingKey(),
+ &(msg->getApplicationHeaders()));
+ }
+}
+
+void Channel::ack(){
+ ack(getFirstAckRequest(), getLastAckRequest());
+}
+
+// Used by Basic
+void Channel::ack(uint64_t deliveryTag, bool multiple){
+ if (multiple)
+ ack(0, deliveryTag);
+ else
+ ack(deliveryTag, deliveryTag);
+}
+
+void Channel::ack(uint64_t firstTag, uint64_t lastTag){
+ if(transactional){
+ accumulatedAck.update(firstTag, lastTag);
+
+ //TODO: I think the outstanding prefetch size & count should be updated at this point...
+ //TODO: ...this may then necessitate dispatching to consumers
+ }else{
+ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+ ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag));
+ ack_iterator j = (firstTag == 0) ?
+ unacked.begin() :
+ find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag));
+
+ if(i == unacked.end()){
+ throw ConnectionException(530, "Received ack for unrecognised delivery tag");
+ }else if(i!=j){
+ ack_iterator end = ++i;
+ for_each(j, end, mem_fun_ref(&DeliveryRecord::discard));
+ unacked.erase(unacked.begin(), end);
+
+ //recalculate the prefetch:
+ outstanding.reset();
+ for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding));
+ }else{
+ i->discard();
+ i->subtractFrom(&outstanding);
+ unacked.erase(i);
+ }
+
+ //if the prefetch limit had previously been reached, there may
+ //be messages that can be now be delivered
+ std::for_each(consumers.begin(), consumers.end(),
+ boost::bind(&ConsumerImpl::requestDispatch, _1));
+ }
+}
+
+void Channel::recover(bool requeue){
+ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+ if(requeue){
+ outstanding.reset();
+ std::list<DeliveryRecord> copy = unacked;
+ unacked.clear();
+ for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue));
+ }else{
+ for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));
+ }
+}
+
+bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackExpected){
+ Message::shared_ptr msg = queue->dequeue();
+ if(msg){
+ Mutex::ScopedLock locker(deliveryLock);
+ uint64_t myDeliveryTag = getNextSendRequestId();
+ msg->sendGetOk(MethodContext(this, msg->getRespondTo()),
+ destination,
+ queue->getMessageCount() + 1, myDeliveryTag,
+ framesize);
+ if(ackExpected){
+ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
+ }
+ return true;
+ }else{
+ return false;
+ }
+}
+
+void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag,
+ uint64_t deliveryTag)
+{
+ msg->deliver(*this, consumerTag, deliveryTag, framesize);
+}
+
+void Channel::handleMethodInContext(
+ boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const MethodContext& context
+)
+{
+ try{
+ if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
+ std::stringstream out;
+ out << "Attempt to use unopened channel: " << getId();
+ throw ConnectionException(504, out.str());
+ } else {
+ method->invoke(*adapter, context);
+ }
+ }catch(ChannelException& e){
+ adapter->getProxy().getChannel().close(
+ e.code, e.toString(),
+ method->amqpClassId(), method->amqpMethodId());
+ connection.closeChannel(getId());
+ }catch(ConnectionException& e){
+ connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
+ }catch(std::exception& e){
+ connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
+ }
+}
diff --git a/qpid/cpp/lib/broker/BrokerChannel.h b/qpid/cpp/lib/broker/BrokerChannel.h
new file mode 100644
index 0000000000..5085783685
--- /dev/null
+++ b/qpid/cpp/lib/broker/BrokerChannel.h
@@ -0,0 +1,159 @@
+#ifndef _broker_BrokerChannel_h
+#define _broker_BrokerChannel_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <list>
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/ptr_container/ptr_map.hpp>
+
+#include <AccumulatedAck.h>
+#include <Consumer.h>
+#include <DeliveryRecord.h>
+#include <MessageBuilder.h>
+#include <NameGenerator.h>
+#include <Prefetch.h>
+#include <TxBuffer.h>
+#include "framing/ChannelAdapter.h"
+#include "ChannelOpenBody.h"
+#include "CompletionHandler.h"
+
+namespace qpid {
+namespace broker {
+
+class ConnectionToken;
+class Connection;
+class Queue;
+class BrokerAdapter;
+
+using framing::string;
+
+/**
+ * Maintains state for an AMQP channel. Handles incoming and
+ * outgoing messages for that channel.
+ */
+class Channel : public framing::ChannelAdapter,
+ public CompletionHandler
+{
+ class ConsumerImpl : public Consumer
+ {
+ Channel* parent;
+ const string tag;
+ Queue::shared_ptr queue;
+ ConnectionToken* const connection;
+ const bool ackExpected;
+ bool blocked;
+
+ public:
+ ConsumerImpl(Channel* parent, const string& tag,
+ Queue::shared_ptr queue,
+ ConnectionToken* const connection, bool ack);
+ ~ConsumerImpl();
+ virtual bool deliver(Message::shared_ptr& msg);
+ void cancel();
+ void requestDispatch();
+ };
+
+ typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
+
+ Connection& connection;
+ uint64_t currentDeliveryTag;
+ Queue::shared_ptr defaultQueue;
+ bool transactional;
+ ConsumerImplMap consumers;
+ uint32_t prefetchSize;
+ uint16_t prefetchCount;
+ Prefetch outstanding;
+ uint32_t framesize;
+ NameGenerator tagGenerator;
+ std::list<DeliveryRecord> unacked;
+ sys::Mutex deliveryLock;
+ TxBuffer txBuffer;
+ AccumulatedAck accumulatedAck;
+ MessageStore* const store;
+ MessageBuilder messageBuilder;//builder for in-progress message
+ bool opened;
+ boost::scoped_ptr<BrokerAdapter> adapter;
+
+ // completion handler for MessageBuilder
+ void complete(Message::shared_ptr msg);
+
+ void deliver(Message::shared_ptr& msg, const string& tag,
+ Queue::shared_ptr& queue, bool ackExpected);
+ bool checkPrefetch(Message::shared_ptr& msg);
+
+ public:
+ Channel(Connection& parent,
+ framing::ChannelId id,
+ uint32_t framesize,
+ MessageStore* const _store = 0,
+ uint64_t stagingThreshold = 0);
+
+ ~Channel();
+
+ bool isOpen() const { return opened; }
+ BrokerAdapter& getAdatper() { return *adapter; }
+
+ void open() { opened = true; }
+ void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
+ Queue::shared_ptr getDefaultQueue() const { return defaultQueue; }
+ uint32_t setPrefetchSize(uint32_t size){ return prefetchSize = size; }
+ uint16_t setPrefetchCount(uint16_t n){ return prefetchCount = n; }
+
+ bool exists(const string& consumerTag);
+
+ /**
+ *@param tagInOut - if empty it is updated with the generated token.
+ */
+ void consume(string& tagInOut, Queue::shared_ptr queue, bool acks,
+ bool exclusive, ConnectionToken* const connection = 0,
+ const framing::FieldTable* = 0);
+ void cancel(const string& tag);
+ bool get(Queue::shared_ptr queue, const std::string& destination, bool ackExpected);
+ void begin();
+ void close();
+ void commit();
+ void rollback();
+ void ack();
+ void ack(uint64_t deliveryTag, bool multiple);
+ void ack(uint64_t deliveryTag, uint64_t endTag);
+ void recover(bool requeue);
+ void deliver(Message::shared_ptr& msg, const string& consumerTag, uint64_t deliveryTag);
+ void handlePublish(Message* msg);
+ void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>);
+ void handleContent(boost::shared_ptr<framing::AMQContentBody>);
+ void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
+
+ void handleInlineTransfer(Message::shared_ptr msg);
+
+ // For ChannelAdapter
+ void handleMethodInContext(
+ boost::shared_ptr<framing::AMQMethodBody> method,
+ const framing::MethodContext& context);
+};
+
+}} // namespace broker
+
+
+#endif /*!_broker_BrokerChannel_h*/
diff --git a/qpid/cpp/lib/broker/BrokerExchange.h b/qpid/cpp/lib/broker/BrokerExchange.h
new file mode 100644
index 0000000000..6f4e9e6671
--- /dev/null
+++ b/qpid/cpp/lib/broker/BrokerExchange.h
@@ -0,0 +1,51 @@
+#ifndef _broker_BrokerExchange_h
+#define _broker_BrokerExchange_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+#include <Deliverable.h>
+#include <BrokerQueue.h>
+#include <FieldTable.h>
+
+namespace qpid {
+ namespace broker {
+ using std::string;
+
+ class Exchange{
+ const string name;
+ public:
+ typedef boost::shared_ptr<Exchange> shared_ptr;
+
+ explicit Exchange(const string& _name) : name(_name){}
+ virtual ~Exchange(){}
+ string getName() { return name; }
+ virtual string getType() = 0;
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
+ virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
+ };
+ }
+}
+
+
+#endif /*!_broker_BrokerExchange_h*/
diff --git a/qpid/cpp/lib/broker/BrokerMessage.cpp b/qpid/cpp/lib/broker/BrokerMessage.cpp
new file mode 100644
index 0000000000..91ba3dfec0
--- /dev/null
+++ b/qpid/cpp/lib/broker/BrokerMessage.cpp
@@ -0,0 +1,245 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/cast.hpp>
+
+#include <BrokerMessage.h>
+#include <iostream>
+
+#include <InMemoryContent.h>
+#include <LazyLoadedContent.h>
+#include <MessageStore.h>
+#include <BasicDeliverBody.h>
+#include <BasicGetOkBody.h>
+#include <AMQContentBody.h>
+#include <AMQHeaderBody.h>
+#include "AMQMethodBody.h"
+#include "AMQFrame.h"
+#include "framing/ChannelAdapter.h"
+
+using namespace boost;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+BasicMessage::BasicMessage(
+ const ConnectionToken* const _publisher,
+ const string& _exchange, const string& _routingKey,
+ bool _mandatory, bool _immediate, framing::AMQMethodBody::shared_ptr respondTo
+) :
+ Message(_publisher, _exchange, _routingKey, _mandatory,
+ _immediate, respondTo),
+ size(0)
+{}
+
+// FIXME aconway 2007-02-01: remove.
+// BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, uint32_t contentChunkSize) :
+// publisher(0), size(0)
+// {
+
+// decode(buffer, headersOnly, contentChunkSize);
+// }
+
+// For tests only.
+BasicMessage::BasicMessage() : size(0)
+{}
+
+BasicMessage::~BasicMessage(){}
+
+void BasicMessage::setHeader(AMQHeaderBody::shared_ptr _header){
+ this->header = _header;
+}
+
+void BasicMessage::addContent(AMQContentBody::shared_ptr data){
+ if (!content.get()) {
+ content = std::auto_ptr<Content>(new InMemoryContent());
+ }
+ content->add(data);
+ size += data->size();
+}
+
+bool BasicMessage::isComplete(){
+ return header.get() && (header->getContentSize() == contentSize());
+}
+
+void BasicMessage::deliver(ChannelAdapter& channel,
+ const string& consumerTag, uint64_t deliveryTag,
+ uint32_t framesize)
+{
+ // CCT -- TODO - Update code generator to take pointer/ not
+ // instance to avoid extra contruction
+ channel.send(
+ new BasicDeliverBody(
+ channel.getVersion(), consumerTag, deliveryTag,
+ getRedelivered(), getExchange(), getRoutingKey()));
+ sendContent(channel, framesize);
+}
+
+void BasicMessage::sendGetOk(const MethodContext& context,
+ const std::string& /*destination*/,
+ uint32_t messageCount,
+ uint64_t deliveryTag,
+ uint32_t framesize)
+{
+ // CCT -- TODO - Update code generator to take pointer/ not
+ // instance to avoid extra contruction
+ context.channel->send(
+ new BasicGetOkBody(
+ context.channel->getVersion(),
+ context.methodBody->getRequestId(),
+ deliveryTag, getRedelivered(), getExchange(),
+ getRoutingKey(), messageCount));
+ sendContent(*context.channel, framesize);
+}
+
+void BasicMessage::sendContent(
+ ChannelAdapter& channel, uint32_t framesize)
+{
+ channel.send(header);
+ Mutex::ScopedLock locker(contentLock);
+ if (content.get())
+ content->send(channel, framesize);
+}
+
+BasicHeaderProperties* BasicMessage::getHeaderProperties(){
+ return boost::polymorphic_downcast<BasicHeaderProperties*>(
+ header->getProperties());
+}
+
+const FieldTable& BasicMessage::getApplicationHeaders(){
+ return getHeaderProperties()->getHeaders();
+}
+
+bool BasicMessage::isPersistent()
+{
+ if(!header) return false;
+ BasicHeaderProperties* props = getHeaderProperties();
+ return props && props->getDeliveryMode() == PERSISTENT;
+}
+
+void BasicMessage::decode(Buffer& buffer, bool headersOnly, uint32_t contentChunkSize)
+{
+ decodeHeader(buffer);
+ if (!headersOnly) decodeContent(buffer, contentChunkSize);
+}
+
+void BasicMessage::decodeHeader(Buffer& buffer)
+{
+ string exchange;
+ string routingKey;
+
+ buffer.getShortString(exchange);
+ buffer.getShortString(routingKey);
+ setRouting(exchange, routingKey);
+
+ uint32_t headerSize = buffer.getLong();
+ AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody());
+ headerBody->decode(buffer, headerSize);
+ setHeader(headerBody);
+}
+
+void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize)
+{
+ uint64_t expected = expectedContentSize();
+ if (expected != buffer.available()) {
+ std::cout << "WARN: Expected " << expectedContentSize() << " bytes, got " << buffer.available() << std::endl;
+ throw Exception("Cannot decode content, buffer not large enough.");
+ }
+
+ if (!chunkSize || chunkSize > expected) {
+ chunkSize = expected;
+ }
+
+ uint64_t total = 0;
+ while (total < expectedContentSize()) {
+ uint64_t remaining = expected - total;
+ AMQContentBody::shared_ptr contentBody(new AMQContentBody());
+ contentBody->decode(buffer, remaining < chunkSize ? remaining : chunkSize);
+ addContent(contentBody);
+ total += chunkSize;
+ }
+}
+
+void BasicMessage::encode(Buffer& buffer)
+{
+ encodeHeader(buffer);
+ encodeContent(buffer);
+}
+
+void BasicMessage::encodeHeader(Buffer& buffer)
+{
+ buffer.putShortString(getExchange());
+ buffer.putShortString(getRoutingKey());
+ buffer.putLong(header->size());
+ header->encode(buffer);
+}
+
+void BasicMessage::encodeContent(Buffer& buffer)
+{
+ Mutex::ScopedLock locker(contentLock);
+ if (content.get()) content->encode(buffer);
+}
+
+uint32_t BasicMessage::encodedSize()
+{
+ return encodedHeaderSize() + encodedContentSize();
+}
+
+uint32_t BasicMessage::encodedContentSize()
+{
+ Mutex::ScopedLock locker(contentLock);
+ return content.get() ? content->size() : 0;
+}
+
+uint32_t BasicMessage::encodedHeaderSize()
+{
+ return getExchange().size() + 1
+ + getRoutingKey().size() + 1
+ + header->size() + 4;//4 extra bytes for size
+}
+
+uint64_t BasicMessage::expectedContentSize()
+{
+ return header.get() ? header->getContentSize() : 0;
+}
+
+void BasicMessage::releaseContent(MessageStore* store)
+{
+ Mutex::ScopedLock locker(contentLock);
+ if (!isPersistent() && getPersistenceId() == 0) {
+ store->stage(this);
+ }
+ if (!content.get() || content->size() > 0) {
+ // FIXME aconway 2007-02-07: handle MessageMessage.
+ //set content to lazy loading mode (but only if there is stored content):
+
+ //Note: the LazyLoadedContent instance contains a raw pointer to the message, however it is
+ // then set as a member of that message so its lifetime is guaranteed to be no longer than
+ // that of the message itself
+ content = std::auto_ptr<Content>(
+ new LazyLoadedContent(store, this, expectedContentSize()));
+ }
+}
+
+void BasicMessage::setContent(std::auto_ptr<Content>& _content)
+{
+ Mutex::ScopedLock locker(contentLock);
+ content = _content;
+}
diff --git a/qpid/cpp/lib/broker/BrokerMessage.h b/qpid/cpp/lib/broker/BrokerMessage.h
new file mode 100644
index 0000000000..fcb104edbb
--- /dev/null
+++ b/qpid/cpp/lib/broker/BrokerMessage.h
@@ -0,0 +1,137 @@
+#ifndef _broker_BrokerMessage_h
+#define _broker_BrokerMessage_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <memory>
+#include <boost/shared_ptr.hpp>
+
+#include <BrokerMessageBase.h>
+#include <BasicHeaderProperties.h>
+#include <ConnectionToken.h>
+#include <Content.h>
+#include <Mutex.h>
+#include <TxBuffer.h>
+
+namespace qpid {
+
+namespace framing {
+class MethodContext;
+class ChannelAdapter;
+class AMQHeaderBody;
+}
+
+namespace broker {
+
+class MessageStore;
+using framing::string;
+
+/**
+ * Represents an AMQP message, i.e. a header body, a list of
+ * content bodies and some details about the publication
+ * request.
+ */
+class BasicMessage : public Message {
+ boost::shared_ptr<framing::AMQHeaderBody> header;
+ std::auto_ptr<Content> content;
+ sys::Mutex contentLock;
+ uint64_t size;
+
+ void sendContent(framing::ChannelAdapter&, uint32_t framesize);
+
+ public:
+ typedef boost::shared_ptr<BasicMessage> shared_ptr;
+
+ BasicMessage(const ConnectionToken* const publisher,
+ const string& exchange, const string& routingKey,
+ bool mandatory, bool immediate,
+ boost::shared_ptr<framing::AMQMethodBody> respondTo);
+ BasicMessage();
+ ~BasicMessage();
+ void setHeader(boost::shared_ptr<framing::AMQHeaderBody> header);
+ void addContent(framing::AMQContentBody::shared_ptr data);
+ bool isComplete();
+
+ void deliver(framing::ChannelAdapter&,
+ const string& consumerTag,
+ uint64_t deliveryTag,
+ uint32_t framesize);
+
+ void sendGetOk(const framing::MethodContext&,
+ const std::string& destination,
+ uint32_t messageCount,
+ uint64_t deliveryTag,
+ uint32_t framesize);
+
+ framing::BasicHeaderProperties* getHeaderProperties();
+ const framing::FieldTable& getApplicationHeaders();
+ bool isPersistent();
+ uint64_t contentSize() const { return size; }
+
+ void decode(framing::Buffer& buffer, bool headersOnly = false,
+ uint32_t contentChunkSize = 0);
+ void decodeHeader(framing::Buffer& buffer);
+ void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0);
+
+ void encode(framing::Buffer& buffer);
+ void encodeHeader(framing::Buffer& buffer);
+ void encodeContent(framing::Buffer& buffer);
+ /**
+ * @returns the size of the buffer needed to encode this
+ * message in its entirety
+ */
+ uint32_t encodedSize();
+ /**
+ * @returns the size of the buffer needed to encode the
+ * 'header' of this message (not just the header frame,
+ * but other meta data e.g.routing key and exchange)
+ */
+ uint32_t encodedHeaderSize();
+ /**
+ * @returns the size of the buffer needed to encode the
+ * (possibly partial) content held by this message
+ */
+ uint32_t encodedContentSize();
+ /**
+ * Releases the in-memory content data held by this
+ * message. Must pass in a store from which the data can
+ * be reloaded.
+ */
+ void releaseContent(MessageStore* store);
+ /**
+ * If headers have been received, returns the expected
+ * content size else returns 0.
+ */
+ uint64_t expectedContentSize();
+ /**
+ * Sets the 'content' implementation of this message (the
+ * message controls the lifecycle of the content instance
+ * it uses).
+ */
+ void setContent(std::auto_ptr<Content>& content);
+};
+
+}
+}
+
+
+#endif /*!_broker_BrokerMessage_h*/
diff --git a/qpid/cpp/lib/broker/BrokerMessageBase.h b/qpid/cpp/lib/broker/BrokerMessageBase.h
new file mode 100644
index 0000000000..709369ae2f
--- /dev/null
+++ b/qpid/cpp/lib/broker/BrokerMessageBase.h
@@ -0,0 +1,184 @@
+#ifndef _broker_BrokerMessageBase_h
+#define _broker_BrokerMessageBase_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include "Content.h"
+#include "framing/amqp_types.h"
+
+namespace qpid {
+
+namespace framing {
+class MethodContext;
+class ChannelAdapter;
+class BasicHeaderProperties;
+class FieldTable;
+class AMQMethodBody;
+class AMQContentBody;
+class AMQHeaderBody;
+}
+
+
+namespace broker {
+class ConnectionToken;
+class MessageStore;
+
+/**
+ * Base class for all types of internal broker messages
+ * abstracting away the operations
+ * TODO; AMS: for the moment this is mostly a placeholder
+ */
+class Message {
+ public:
+ typedef boost::shared_ptr<Message> shared_ptr;
+ typedef boost::shared_ptr<framing::AMQMethodBody> AMQMethodBodyPtr;
+
+
+ Message(const ConnectionToken* publisher_,
+ const std::string& _exchange,
+ const std::string& _routingKey,
+ bool _mandatory, bool _immediate,
+ AMQMethodBodyPtr respondTo_) :
+ publisher(publisher_),
+ exchange(_exchange),
+ routingKey(_routingKey),
+ mandatory(_mandatory),
+ immediate(_immediate),
+ persistenceId(0),
+ redelivered(false),
+ respondTo(respondTo_)
+ {}
+
+ Message() :
+ mandatory(false),
+ immediate(false),
+ persistenceId(0),
+ redelivered(false)
+ {}
+
+ virtual ~Message() {};
+
+ // Accessors
+ const std::string& getRoutingKey() const { return routingKey; }
+ const std::string& getExchange() const { return exchange; }
+ uint64_t getPersistenceId() const { return persistenceId; }
+ bool getRedelivered() const { return redelivered; }
+ AMQMethodBodyPtr getRespondTo() const { return respondTo; }
+
+ void setRouting(const std::string& _exchange, const std::string& _routingKey)
+ { exchange = _exchange; routingKey = _routingKey; }
+ void setPersistenceId(uint64_t _persistenceId) { persistenceId = _persistenceId; } // XXXX: Only used in tests?
+ void redeliver() { redelivered = true; }
+
+ /**
+ * Used to deliver the message from the queue
+ */
+ virtual void deliver(framing::ChannelAdapter& channel,
+ const std::string& consumerTag,
+ uint64_t deliveryTag,
+ uint32_t framesize) = 0;
+ /**
+ * Used to return a message in response to a get from a queue
+ */
+ virtual void sendGetOk(const framing::MethodContext& context,
+ const std::string& destination,
+ uint32_t messageCount,
+ uint64_t deliveryTag,
+ uint32_t framesize) = 0;
+
+ virtual bool isComplete() = 0;
+
+ virtual uint64_t contentSize() const = 0;
+ // FIXME aconway 2007-02-06: Get rid of BasicHeaderProperties
+ // at this level. Expose only generic properties available from both
+ // message types (e.g. getApplicationHeaders below).
+ //
+ virtual framing::BasicHeaderProperties* getHeaderProperties() = 0;
+ virtual const framing::FieldTable& getApplicationHeaders() = 0;
+ virtual bool isPersistent() = 0;
+ virtual const ConnectionToken* getPublisher() const {
+ return publisher;
+ }
+
+ virtual void encode(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
+ virtual void encodeHeader(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
+
+ /**
+ * @returns the size of the buffer needed to encode this
+ * message in its entirety
+ *
+ * XXXX: Only used in tests?
+ */
+ virtual uint32_t encodedSize() = 0;
+ /**
+ * @returns the size of the buffer needed to encode the
+ * 'header' of this message (not just the header frame,
+ * but other meta data e.g.routing key and exchange)
+ *
+ * XXXX: Only used in tests?
+ */
+ virtual uint32_t encodedHeaderSize() = 0;
+ /**
+ * @returns the size of the buffer needed to encode the
+ * (possibly partial) content held by this message
+ */
+ virtual uint32_t encodedContentSize() = 0;
+ /**
+ * If headers have been received, returns the expected
+ * content size else returns 0.
+ */
+ virtual uint64_t expectedContentSize() = 0;
+
+ // TODO: AMS 29/1/2007 Don't think these are really part of base class
+
+ /**
+ * Sets the 'content' implementation of this message (the
+ * message controls the lifecycle of the content instance
+ * it uses).
+ */
+ virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
+ virtual void setHeader(boost::shared_ptr<framing::AMQHeaderBody>) {};
+ virtual void addContent(boost::shared_ptr<framing::AMQContentBody>) {};
+ /**
+ * Releases the in-memory content data held by this
+ * message. Must pass in a store from which the data can
+ * be reloaded.
+ */
+ virtual void releaseContent(MessageStore* /*store*/) {};
+
+ private:
+ const ConnectionToken* publisher;
+ std::string exchange;
+ std::string routingKey;
+ const bool mandatory;
+ const bool immediate;
+ uint64_t persistenceId;
+ bool redelivered;
+ AMQMethodBodyPtr respondTo;
+};
+
+}}
+
+
+#endif /*!_broker_BrokerMessage_h*/
diff --git a/qpid/cpp/lib/broker/BrokerMessageMessage.cpp b/qpid/cpp/lib/broker/BrokerMessageMessage.cpp
new file mode 100644
index 0000000000..3449078d70
--- /dev/null
+++ b/qpid/cpp/lib/broker/BrokerMessageMessage.cpp
@@ -0,0 +1,239 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "QpidError.h"
+#include "BrokerMessageMessage.h"
+#include "ChannelAdapter.h"
+#include "MessageTransferBody.h"
+#include "MessageOpenBody.h"
+#include "MessageCloseBody.h"
+#include "MessageAppendBody.h"
+#include "Reference.h"
+#include "framing/FieldTable.h"
+#include "framing/BasicHeaderProperties.h"
+
+#include <algorithm>
+
+using namespace std;
+using namespace qpid::framing;
+
+namespace qpid {
+namespace broker {
+
+MessageMessage::MessageMessage(
+ ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_
+) : Message(publisher, transfer_->getDestination(),
+ transfer_->getRoutingKey(),
+ transfer_->getMandatory(),
+ transfer_->getImmediate(),
+ transfer_),
+ requestId(requestId_),
+ transfer(transfer_)
+{}
+
+MessageMessage::MessageMessage(
+ ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_,
+ ReferencePtr reference_
+) : Message(publisher, transfer_->getDestination(),
+ transfer_->getRoutingKey(),
+ transfer_->getMandatory(),
+ transfer_->getImmediate(),
+ transfer_),
+ requestId(requestId_),
+ transfer(transfer_),
+ reference(reference_)
+{}
+
+// TODO: astitcher 1-Mar-2007: This code desperately needs better factoring
+void MessageMessage::transferMessage(
+ framing::ChannelAdapter& channel,
+ const std::string& consumerTag,
+ uint32_t framesize)
+{
+ const framing::Content& body = transfer->getBody();
+
+ // Send any reference data
+ if (!body.isInline()){
+ // Open
+ channel.send(new MessageOpenBody(channel.getVersion(), reference->getId()));
+ // Appends
+ for(Reference::Appends::const_iterator a = reference->getAppends().begin();
+ a != reference->getAppends().end();
+ ++a) {
+ uint32_t sizeleft = (*a)->size();
+ const string& content = (*a)->getBytes();
+ // Calculate overhead bytes
+ // Assume that the overhead is constant as the reference name doesn't change
+ uint32_t overhead = sizeleft - content.size();
+ string::size_type contentStart = 0;
+ while (sizeleft) {
+ string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead;
+ channel.send(new MessageAppendBody(channel.getVersion(), reference->getId(),
+ string(content, contentStart, contentSize)));
+ sizeleft -= contentSize;
+ contentStart += contentSize;
+ }
+ }
+ }
+
+ // The transfer
+ if ( transfer->size()<=framesize ) {
+ channel.send(
+ new MessageTransferBody(channel.getVersion(),
+ transfer->getTicket(),
+ consumerTag,
+ getRedelivered(),
+ transfer->getImmediate(),
+ transfer->getTtl(),
+ transfer->getPriority(),
+ transfer->getTimestamp(),
+ transfer->getDeliveryMode(),
+ transfer->getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer->getMessageId(),
+ transfer->getCorrelationId(),
+ transfer->getReplyTo(),
+ transfer->getContentType(),
+ transfer->getContentEncoding(),
+ transfer->getUserId(),
+ transfer->getAppId(),
+ transfer->getTransactionId(),
+ transfer->getSecurityToken(),
+ transfer->getApplicationHeaders(),
+ body,
+ transfer->getMandatory()));
+ } else {
+ // Thing to do here is to construct a simple reference message then deliver that instead
+ // fragmentation will be taken care of in the delivery if necessary;
+ string content = body.getValue();
+ string refname = "dummy";
+ TransferPtr newTransfer(
+ new MessageTransferBody(channel.getVersion(),
+ transfer->getTicket(),
+ consumerTag,
+ getRedelivered(),
+ transfer->getImmediate(),
+ transfer->getTtl(),
+ transfer->getPriority(),
+ transfer->getTimestamp(),
+ transfer->getDeliveryMode(),
+ transfer->getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer->getMessageId(),
+ transfer->getCorrelationId(),
+ transfer->getReplyTo(),
+ transfer->getContentType(),
+ transfer->getContentEncoding(),
+ transfer->getUserId(),
+ transfer->getAppId(),
+ transfer->getTransactionId(),
+ transfer->getSecurityToken(),
+ transfer->getApplicationHeaders(),
+ framing::Content(REFERENCE, refname),
+ transfer->getMandatory()));
+ ReferencePtr newRef(new Reference(refname));
+ Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content));
+ newRef->append(newAppend);
+ MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), 0, newTransfer, newRef);
+ newMsg.transferMessage(channel, consumerTag, framesize);
+ return;
+ }
+ // Close any reference data
+ if (!body.isInline()){
+ // Close
+ channel.send(new MessageCloseBody(channel.getVersion(), reference->getId()));
+ }
+}
+
+void MessageMessage::deliver(
+ framing::ChannelAdapter& channel,
+ const std::string& consumerTag,
+ uint64_t /*deliveryTag*/,
+ uint32_t framesize)
+{
+ transferMessage(channel, consumerTag, framesize);
+}
+
+void MessageMessage::sendGetOk(
+ const framing::MethodContext& context,
+ const std::string& destination,
+ uint32_t /*messageCount*/,
+ uint64_t /*deliveryTag*/,
+ uint32_t framesize)
+{
+ framing::ChannelAdapter* channel = context.channel;
+ transferMessage(*channel, destination, framesize);
+}
+
+bool MessageMessage::isComplete()
+{
+ return true;
+}
+
+uint64_t MessageMessage::contentSize() const
+{
+ if (transfer->getBody().isInline())
+ return transfer->getBody().getValue().size();
+ else
+ return reference->getSize();
+}
+
+qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties()
+{
+ return 0; // FIXME aconway 2007-02-05:
+}
+
+const FieldTable& MessageMessage::getApplicationHeaders()
+{
+ return transfer->getApplicationHeaders();
+}
+bool MessageMessage::isPersistent()
+{
+ return transfer->getDeliveryMode() == PERSISTENT;
+}
+
+uint32_t MessageMessage::encodedSize()
+{
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
+ return 0; // FIXME aconway 2007-02-05:
+}
+
+uint32_t MessageMessage::encodedHeaderSize()
+{
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
+ return 0; // FIXME aconway 2007-02-05:
+}
+
+uint32_t MessageMessage::encodedContentSize()
+{
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
+ return 0; // FIXME aconway 2007-02-05:
+}
+
+uint64_t MessageMessage::expectedContentSize()
+{
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
+ return 0; // FIXME aconway 2007-02-05:
+}
+
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/lib/broker/BrokerMessageMessage.h b/qpid/cpp/lib/broker/BrokerMessageMessage.h
new file mode 100644
index 0000000000..8a2ff3a063
--- /dev/null
+++ b/qpid/cpp/lib/broker/BrokerMessageMessage.h
@@ -0,0 +1,91 @@
+#ifndef _broker_BrokerMessageMessage_h
+#define _broker_BrokerMessageMessage_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "BrokerMessageBase.h"
+#include "MessageTransferBody.h"
+#include "amqp_types.h"
+
+#include <vector>
+
+namespace qpid {
+
+namespace framing {
+class MessageTransferBody;
+}
+
+namespace broker {
+class ConnectionToken;
+class Reference;
+
+class MessageMessage: public Message{
+ public:
+ typedef boost::shared_ptr<MessageMessage> shared_ptr;
+ typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr;
+ typedef boost::shared_ptr<Reference> ReferencePtr;
+
+ MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer);
+ MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer, ReferencePtr reference);
+
+ // Default destructor okay
+
+ framing::RequestId getRequestId() {return requestId; }
+ TransferPtr getTransfer() { return transfer; }
+ ReferencePtr getReference() { return reference; }
+
+ void deliver(framing::ChannelAdapter& channel,
+ const std::string& consumerTag,
+ uint64_t deliveryTag,
+ uint32_t framesize);
+
+ void sendGetOk(const framing::MethodContext& context,
+ const std::string& destination,
+ uint32_t messageCount,
+ uint64_t deliveryTag,
+ uint32_t framesize);
+
+ bool isComplete();
+
+ uint64_t contentSize() const;
+ framing::BasicHeaderProperties* getHeaderProperties();
+ const framing::FieldTable& getApplicationHeaders();
+ bool isPersistent();
+
+ uint32_t encodedSize();
+ uint32_t encodedHeaderSize();
+ uint32_t encodedContentSize();
+ uint64_t expectedContentSize();
+
+ private:
+ void transferMessage(framing::ChannelAdapter& channel,
+ const std::string& consumerTag,
+ uint32_t framesize);
+
+ framing::RequestId requestId;
+ const TransferPtr transfer;
+ const ReferencePtr reference;
+};
+
+}}
+
+
+#endif /*!_broker_BrokerMessage_h*/
diff --git a/qpid/cpp/lib/broker/BrokerQueue.cpp b/qpid/cpp/lib/broker/BrokerQueue.cpp
new file mode 100644
index 0000000000..31309bd6c5
--- /dev/null
+++ b/qpid/cpp/lib/broker/BrokerQueue.cpp
@@ -0,0 +1,258 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/format.hpp>
+
+#include <BrokerQueue.h>
+#include <MessageStore.h>
+#include <sys/Monitor.h>
+#include <sys/Time.h>
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+using namespace qpid::framing;
+using boost::format;
+
+Queue::Queue(const string& _name, uint32_t _autodelete,
+ MessageStore* const _store,
+ const ConnectionToken* const _owner) :
+
+ name(_name),
+ autodelete(_autodelete),
+ store(_store),
+ owner(_owner),
+ queueing(false),
+ dispatching(false),
+ next(0),
+ lastUsed(0),
+ exclusive(0),
+ persistenceId(0)
+{
+ if(autodelete) lastUsed = now()/TIME_MSEC;
+}
+
+Queue::~Queue(){
+ for(Binding* b = bindings.front(); !bindings.empty(); b = bindings.front()){
+ b->cancel();
+ bindings.pop();
+ }
+}
+
+void Queue::bound(Binding* b){
+ bindings.push(b);
+}
+
+void Queue::deliver(Message::shared_ptr& msg){
+ enqueue(0, msg, 0);
+ process(msg);
+}
+
+void Queue::recover(Message::shared_ptr& msg){
+ push(msg);
+ if (store && msg->expectedContentSize() != msg->encodedContentSize()) {
+ //content has not been loaded, need to ensure that lazy loading mode is set:
+ //TODO: find a nicer way to do this
+ msg->releaseContent(store);
+ }
+}
+
+void Queue::process(Message::shared_ptr& msg){
+ Mutex::ScopedLock locker(lock);
+ if(queueing || !dispatch(msg)){
+ push(msg);
+ }
+}
+
+bool Queue::dispatch(Message::shared_ptr& msg){
+ if(consumers.empty()){
+ return false;
+ }else if(exclusive){
+ if(!exclusive->deliver(msg)){
+ std::cout << "WARNING: Dropping undeliverable message from queue with exclusive consumer." << std::endl;
+ }
+ return true;
+ }else{
+ //deliver to next consumer
+ next = next % consumers.size();
+ Consumer* c = consumers[next];
+ int start = next;
+ while(c){
+ next++;
+ if(c->deliver(msg)) return true;
+
+ next = next % consumers.size();
+ c = next == start ? 0 : consumers[next];
+ }
+ return false;
+ }
+}
+
+bool Queue::startDispatching(){
+ Mutex::ScopedLock locker(lock);
+ if(queueing && !dispatching){
+ dispatching = true;
+ return true;
+ }else{
+ return false;
+ }
+}
+
+void Queue::dispatch(){
+ bool proceed = startDispatching();
+ while(proceed){
+ Mutex::ScopedLock locker(lock);
+ if(!messages.empty() && dispatch(messages.front())){
+ pop();
+ }else{
+ dispatching = false;
+ proceed = false;
+ queueing = !messages.empty();
+ }
+ }
+}
+
+void Queue::consume(Consumer* c, bool requestExclusive){
+ Mutex::ScopedLock locker(lock);
+ if(exclusive)
+ throw ChannelException(
+ 403, format("Queue '%s' has an exclusive consumer."
+ " No more consumers allowed.") % getName());
+ if(requestExclusive) {
+ if(!consumers.empty())
+ throw ChannelException(
+ 403, format("Queue '%s' already has conumers."
+ "Exclusive access denied.") %getName());
+ exclusive = c;
+ }
+ if(autodelete && consumers.empty()) lastUsed = 0;
+ consumers.push_back(c);
+}
+
+void Queue::cancel(Consumer* c){
+ Mutex::ScopedLock locker(lock);
+ Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c);
+ if (i != consumers.end())
+ consumers.erase(i);
+ if(autodelete && consumers.empty()) lastUsed = now()*TIME_MSEC;
+ if(exclusive == c) exclusive = 0;
+}
+
+Message::shared_ptr Queue::dequeue(){
+ Mutex::ScopedLock locker(lock);
+ Message::shared_ptr msg;
+ if(!messages.empty()){
+ msg = messages.front();
+ pop();
+ }
+ return msg;
+}
+
+uint32_t Queue::purge(){
+ Mutex::ScopedLock locker(lock);
+ int count = messages.size();
+ while(!messages.empty()) pop();
+ return count;
+}
+
+void Queue::pop(){
+ if (policy.get()) policy->dequeued(messages.front()->contentSize());
+ messages.pop();
+}
+
+void Queue::push(Message::shared_ptr& msg){
+ queueing = true;
+ messages.push(msg);
+ if (policy.get()) {
+ policy->enqueued(msg->contentSize());
+ if (policy->limitExceeded()) {
+ msg->releaseContent(store);
+ }
+ }
+}
+
+uint32_t Queue::getMessageCount() const{
+ Mutex::ScopedLock locker(lock);
+ return messages.size();
+}
+
+uint32_t Queue::getConsumerCount() const{
+ Mutex::ScopedLock locker(lock);
+ return consumers.size();
+}
+
+bool Queue::canAutoDelete() const{
+ Mutex::ScopedLock locker(lock);
+ return lastUsed && (now()*TIME_MSEC - lastUsed > autodelete);
+}
+
+void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid)
+{
+ if (msg->isPersistent() && store) {
+ store->enqueue(ctxt, msg.get(), *this, xid);
+ }
+}
+
+void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid)
+{
+ if (msg->isPersistent() && store) {
+ store->dequeue(ctxt, msg.get(), *this, xid);
+ }
+}
+
+namespace
+{
+ const std::string qpidMaxSize("qpid.max_size");
+ const std::string qpidMaxCount("qpid.max_count");
+}
+
+void Queue::create(const FieldTable& settings)
+{
+ if (store) {
+ store->create(*this, settings);
+ }
+ configure(settings);
+}
+
+void Queue::configure(const FieldTable& settings)
+{
+ QueuePolicy* _policy = new QueuePolicy(settings);
+ if (_policy->getMaxCount() || _policy->getMaxSize()) {
+ setPolicy(std::auto_ptr<QueuePolicy>(_policy));
+ }
+}
+
+void Queue::destroy()
+{
+ if (store) {
+ store->destroy(*this);
+ }
+}
+
+void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
+{
+ policy = _policy;
+}
+
+const QueuePolicy* const Queue::getPolicy()
+{
+ return policy.get();
+}
diff --git a/qpid/cpp/lib/broker/BrokerQueue.h b/qpid/cpp/lib/broker/BrokerQueue.h
new file mode 100644
index 0000000000..12f5815027
--- /dev/null
+++ b/qpid/cpp/lib/broker/BrokerQueue.h
@@ -0,0 +1,151 @@
+#ifndef _broker_BrokerQueue_h
+#define _broker_BrokerQueue_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <vector>
+#include <memory>
+#include <queue>
+#include <boost/shared_ptr.hpp>
+#include <amqp_types.h>
+#include <Binding.h>
+#include <ConnectionToken.h>
+#include <Consumer.h>
+#include <BrokerMessage.h>
+#include <FieldTable.h>
+#include <sys/Monitor.h>
+#include <QueuePolicy.h>
+
+// TODO aconway 2007-02-06: Use auto_ptr and boost::ptr_vector to
+// enforce ownership of Consumers.
+
+namespace qpid {
+ namespace broker {
+ class MessageStore;
+
+ /**
+ * Thrown when exclusive access would be violated.
+ */
+ using std::string;
+
+ /**
+ * The brokers representation of an amqp queue. Messages are
+ * delivered to a queue from where they can be dispatched to
+ * registered consumers or be stored until dequeued or until one
+ * or more consumers registers.
+ */
+ class Queue{
+ typedef std::vector<Consumer*> Consumers;
+ typedef std::queue<Binding*> Bindings;
+ typedef std::queue<Message::shared_ptr> Messages;
+
+ const string name;
+ const uint32_t autodelete;
+ MessageStore* const store;
+ const ConnectionToken* const owner;
+ Consumers consumers;
+ Bindings bindings;
+ Messages messages;
+ bool queueing;
+ bool dispatching;
+ int next;
+ mutable qpid::sys::Mutex lock;
+ int64_t lastUsed;
+ Consumer* exclusive;
+ mutable uint64_t persistenceId;
+ std::auto_ptr<QueuePolicy> policy;
+
+ void pop();
+ void push(Message::shared_ptr& msg);
+ bool startDispatching();
+ bool dispatch(Message::shared_ptr& msg);
+ void setPolicy(std::auto_ptr<QueuePolicy> policy);
+
+ public:
+
+ typedef boost::shared_ptr<Queue> shared_ptr;
+
+ typedef std::vector<shared_ptr> vector;
+
+ Queue(const string& name, uint32_t autodelete = 0,
+ MessageStore* const store = 0,
+ const ConnectionToken* const owner = 0);
+ ~Queue();
+
+ void create(const qpid::framing::FieldTable& settings);
+ void configure(const qpid::framing::FieldTable& settings);
+ void destroy();
+ /**
+ * Informs the queue of a binding that should be cancelled on
+ * destruction of the queue.
+ */
+ void bound(Binding* b);
+ /**
+ * Delivers a message to the queue. Will record it as
+ * enqueued if persistent then process it.
+ */
+ void deliver(Message::shared_ptr& msg);
+ /**
+ * Dispatches the messages immediately to a consumer if
+ * one is available or stores it for later if not.
+ */
+ void process(Message::shared_ptr& msg);
+ /**
+ * Used during recovery to add stored messages back to the queue
+ */
+ void recover(Message::shared_ptr& msg);
+ /**
+ * Dispatch any queued messages providing there are
+ * consumers for them. Only one thread can be dispatching
+ * at any time, but this method (rather than the caller)
+ * is responsible for ensuring that.
+ */
+ void dispatch();
+ void consume(Consumer* c, bool exclusive = false);
+ void cancel(Consumer* c);
+ uint32_t purge();
+ uint32_t getMessageCount() const;
+ uint32_t getConsumerCount() const;
+ inline const string& getName() const { return name; }
+ inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
+ inline bool hasExclusiveConsumer() const { return exclusive; }
+ inline uint64_t getPersistenceId() const { return persistenceId; }
+ inline void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
+
+ bool canAutoDelete() const;
+
+ void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid);
+ /**
+ * dequeue from store (only done once messages is acknowledged)
+ */
+ void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid);
+ /**
+ * dequeues from memory only
+ */
+ Message::shared_ptr dequeue();
+
+ const QueuePolicy* const getPolicy();
+ };
+ }
+}
+
+
+#endif /*!_broker_BrokerQueue_h*/
diff --git a/qpid/cpp/lib/broker/BrokerSingleton.cpp b/qpid/cpp/lib/broker/BrokerSingleton.cpp
new file mode 100644
index 0000000000..4571764850
--- /dev/null
+++ b/qpid/cpp/lib/broker/BrokerSingleton.cpp
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "BrokerSingleton.h"
+
+namespace qpid {
+namespace broker {
+
+BrokerSingleton::BrokerSingleton() {
+ if (broker.get() == 0)
+ broker = Broker::create();
+ Broker::shared_ptr::operator=(broker);
+}
+
+BrokerSingleton::~BrokerSingleton() {
+ broker->shutdown();
+}
+
+Broker::shared_ptr BrokerSingleton::broker;
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/lib/broker/BrokerSingleton.h b/qpid/cpp/lib/broker/BrokerSingleton.h
new file mode 100644
index 0000000000..139e02a5fd
--- /dev/null
+++ b/qpid/cpp/lib/broker/BrokerSingleton.h
@@ -0,0 +1,52 @@
+#ifndef _broker_BrokerSingleton_h
+#define _broker_BrokerSingleton_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "Broker.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * BrokerSingleton is a smart pointer to a process-wide singleton broker
+ * started on an os-chosen port. The broker starts the first time
+ * an instance of BrokerSingleton is created and runs untill the process exits.
+ *
+ * Useful for unit tests that want to share a broker between multiple
+ * tests to reduce overhead of starting/stopping a broker for every test.
+ *
+ * Tests that need a new broker can call Broker::create directly.
+ *
+ * THREAD UNSAFE.
+ */
+class BrokerSingleton : public Broker::shared_ptr
+{
+ public:
+ BrokerSingleton();
+ ~BrokerSingleton();
+ private:
+ static Broker::shared_ptr broker;
+};
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!_broker_BrokerSingleton_h*/
diff --git a/qpid/cpp/lib/broker/CompletionHandler.h b/qpid/cpp/lib/broker/CompletionHandler.h
new file mode 100644
index 0000000000..9d51656282
--- /dev/null
+++ b/qpid/cpp/lib/broker/CompletionHandler.h
@@ -0,0 +1,39 @@
+#ifndef _broker_CompletionHandler_h
+#define _broker_CompletionHandler_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Callback interface to handle completion of a message.
+ */
+class CompletionHandler
+{
+ public:
+ virtual ~CompletionHandler(){}
+ virtual void complete(Message::shared_ptr) = 0;
+};
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!_broker_CompletionHandler_h*/
diff --git a/qpid/cpp/lib/broker/Configuration.cpp b/qpid/cpp/lib/broker/Configuration.cpp
new file mode 100644
index 0000000000..e83c359f2d
--- /dev/null
+++ b/qpid/cpp/lib/broker/Configuration.cpp
@@ -0,0 +1,252 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <Configuration.h>
+#include <string.h>
+#include <config.h>
+
+using namespace qpid::broker;
+using namespace std;
+
+Configuration::Configuration() :
+ daemon('d', "daemon", "Run as system daemon, detached from terminal.", false),
+ trace('t', "trace", "Print incoming & outgoing frames to the console", false),
+ port('p', "port", "Set the port to listen on (default=5672)", 5672),
+ workerThreads("worker-threads", "Set the number of worker threads to use (default=5).", 5),
+ maxConnections("max-connections", "Set the maximum number of connections the broker can accept (default=500).", 500),
+ connectionBacklog("connection-backlog", "Set the connection backlog for the servers socket (default=10)", 10),
+ store('s', "store", "Set the message store module to use (default='' which implies no store)", ""),
+ stagingThreshold("staging-threshold", "Set the message size threshold above which messages will be written to disk as they arrive (default=5,000,000)", 5000000),
+ help("help", "Print usage information", false),
+ version("version", "Print version information", false)
+{
+ options.push_back(&daemon);
+ options.push_back(&trace);
+ options.push_back(&port);
+ options.push_back(&workerThreads);
+ options.push_back(&maxConnections);
+ options.push_back(&connectionBacklog);
+ options.push_back(&store);
+ options.push_back(&stagingThreshold);
+ options.push_back(&help);
+ options.push_back(&version);
+}
+
+Configuration::~Configuration(){}
+
+void Configuration::parse(char const *progName, int argc, char** argv){
+ programName = progName;
+ int position = 1;
+ while(position < argc){
+ bool matched(false);
+ for(op_iterator i = options.begin(); i < options.end() && !matched; i++){
+ matched = (*i)->parse(position, argv, argc);
+ }
+ if(!matched) {
+ throw BadOptionException(
+ std::string("Unrecognised option: ")+argv[position]);
+ }
+ }
+}
+
+void Configuration::usage(){
+ std::cout << "Usage: " << programName << " [OPTION]..." << std::endl
+ << "Start the Qpid AMQP broker daemon." << std::endl << std::endl
+ << "Options:" << std::endl;
+ for(op_iterator i = options.begin(); i < options.end(); i++){
+ (*i)->print(std::cout);
+ }
+
+ std::cout << std::endl << "Report bugs to <" << PACKAGE_BUGREPORT << ">."
+ << std::endl;
+}
+
+bool Configuration::isHelp() const {
+ return help.getValue();
+}
+
+bool Configuration::isVersion() const {
+ return version.getValue();
+}
+
+bool Configuration::isDaemon() const {
+ return daemon.getValue();
+}
+
+bool Configuration::isTrace() const {
+ return trace.getValue();
+}
+
+int Configuration::getPort() const {
+ return port.getValue();
+}
+
+int Configuration::getWorkerThreads() const {
+ return workerThreads.getValue();
+}
+
+int Configuration::getMaxConnections() const {
+ return maxConnections.getValue();
+}
+
+int Configuration::getConnectionBacklog() const {
+ return connectionBacklog.getValue();
+}
+
+const std::string& Configuration::getStore() const {
+ return store.getValue();
+}
+
+long Configuration::getStagingThreshold() const {
+ return stagingThreshold.getValue();
+}
+
+
+Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) :
+ flag(string("-") + _flag), name("--" +_name), desc(_desc) {}
+
+Configuration::Option::Option(const string& _name, const string& _desc) :
+ flag(""), name("--" + _name), desc(_desc) {}
+
+Configuration::Option::~Option(){}
+
+bool Configuration::Option::match(const string& arg){
+ return flag == arg || name == arg;
+}
+
+bool Configuration::Option::parse(int& i, char** argv, int argc){
+ const string arg(argv[i]);
+ if(match(arg)){
+ if(needsValue()){
+ if(++i < argc) setValue(argv[i]);
+ else throw ParseException("Argument " + arg + " requires a value!");
+ }else{
+ setValue("");
+ }
+ i++;
+ return true;
+ }else{
+ return false;
+ }
+}
+
+void Configuration::Option::print(ostream& out) const {
+ out << " ";
+ if(flag.length() > 0){
+ out << flag << ", ";
+ } else {
+ out << " ";
+ }
+ out << name;
+ if(needsValue()) out << " <value>";
+ out << std::endl;
+ out << " " << desc << std::endl;
+}
+
+
+// String Option:
+
+Configuration::StringOption::StringOption(const char _flag, const string& _name, const string& _desc, const string _value) :
+ Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::StringOption::StringOption(const string& _name, const string& _desc, const string _value) :
+ Option(_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::StringOption::~StringOption(){}
+
+const string& Configuration::StringOption::getValue() const {
+ return value;
+}
+
+bool Configuration::StringOption::needsValue() const {
+ return true;
+}
+
+void Configuration::StringOption::setValue(const std::string& _value){
+ value = _value;
+}
+
+// Int Option:
+
+Configuration::IntOption::IntOption(const char _flag, const string& _name, const string& _desc, const int _value) :
+ Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::IntOption::IntOption(const string& _name, const string& _desc, const int _value) :
+ Option(_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::IntOption::~IntOption(){}
+
+int Configuration::IntOption::getValue() const {
+ return value;
+}
+
+bool Configuration::IntOption::needsValue() const {
+ return true;
+}
+
+void Configuration::IntOption::setValue(const std::string& _value){
+ value = atoi(_value.c_str());
+}
+
+// Long Option:
+
+Configuration::LongOption::LongOption(const char _flag, const string& _name, const string& _desc, const long _value) :
+ Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::LongOption::LongOption(const string& _name, const string& _desc, const long _value) :
+ Option(_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::LongOption::~LongOption(){}
+
+long Configuration::LongOption::getValue() const {
+ return value;
+}
+
+bool Configuration::LongOption::needsValue() const {
+ return true;
+}
+
+void Configuration::LongOption::setValue(const std::string& _value){
+ value = atol(_value.c_str());
+}
+
+// Bool Option:
+
+Configuration::BoolOption::BoolOption(const char _flag, const string& _name, const string& _desc, const bool _value) :
+ Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::BoolOption::BoolOption(const string& _name, const string& _desc, const bool _value) :
+ Option(_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::BoolOption::~BoolOption(){}
+
+bool Configuration::BoolOption::getValue() const {
+ return value;
+}
+
+bool Configuration::BoolOption::needsValue() const {
+ return false;
+}
+
+void Configuration::BoolOption::setValue(const std::string& /*not required*/){
+ //BoolOptions have no value. The fact that the option is specified
+ //implies the value is true.
+ value = true;
+}
diff --git a/qpid/cpp/lib/broker/Configuration.h b/qpid/cpp/lib/broker/Configuration.h
new file mode 100644
index 0000000000..27c743c8f0
--- /dev/null
+++ b/qpid/cpp/lib/broker/Configuration.h
@@ -0,0 +1,171 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Configuration_
+#define _Configuration_
+
+#include <cstdlib>
+#include <iostream>
+#include <vector>
+#include <Exception.h>
+
+namespace qpid {
+namespace broker {
+class Configuration{
+
+ class Option {
+ const std::string flag;
+ const std::string name;
+ const std::string desc;
+
+ bool match(const std::string& arg);
+
+ protected:
+ virtual bool needsValue() const = 0;
+ virtual void setValue(const std::string& value) = 0;
+
+ public:
+ Option(const char flag, const std::string& name, const std::string& desc);
+ Option(const std::string& name, const std::string& desc);
+ virtual ~Option();
+
+ bool parse(int& i, char** argv, int argc);
+ void print(std::ostream& out) const;
+ };
+
+ class IntOption : public Option{
+ const int defaultValue;
+ int value;
+ public:
+ IntOption(char flag, const std::string& name, const std::string& desc, const int value = 0);
+ IntOption(const std::string& name, const std::string& desc, const int value = 0);
+ virtual ~IntOption();
+
+ int getValue() const;
+ virtual bool needsValue() const;
+ virtual void setValue(const std::string& value);
+ virtual void setValue(int _value) { value = _value; }
+ };
+
+ class LongOption : public Option{
+ const long defaultValue;
+ int value;
+ public:
+ LongOption(char flag, const std::string& name, const std::string& desc, const long value = 0);
+ LongOption(const std::string& name, const std::string& desc, const long value = 0);
+ virtual ~LongOption();
+
+ long getValue() const;
+ virtual bool needsValue() const;
+ virtual void setValue(const std::string& value);
+ virtual void setValue(int _value) { value = _value; }
+ };
+
+ class StringOption : public Option{
+ const std::string defaultValue;
+ std::string value;
+ public:
+ StringOption(char flag, const std::string& name, const std::string& desc, const std::string value = "");
+ StringOption(const std::string& name, const std::string& desc, const std::string value = "");
+ virtual ~StringOption();
+
+ const std::string& getValue() const;
+ virtual bool needsValue() const;
+ virtual void setValue(const std::string& value);
+ };
+
+ class BoolOption : public Option{
+ const bool defaultValue;
+ bool value;
+ public:
+ BoolOption(char flag, const std::string& name, const std::string& desc, const bool value = 0);
+ BoolOption(const std::string& name, const std::string& desc, const bool value = 0);
+ virtual ~BoolOption();
+
+ bool getValue() const;
+ virtual bool needsValue() const;
+ virtual void setValue(const std::string& value);
+ virtual void setValue(bool _value) { value = _value; }
+ };
+
+ BoolOption daemon;
+ BoolOption trace;
+ IntOption port;
+ IntOption workerThreads;
+ IntOption maxConnections;
+ IntOption connectionBacklog;
+ StringOption store;
+ LongOption stagingThreshold;
+ BoolOption help;
+ BoolOption version;
+ char const *programName;
+
+ typedef std::vector<Option*>::iterator op_iterator;
+ std::vector<Option*> options;
+
+ public:
+
+ struct BadOptionException : public Exception {
+ template<class T>
+ BadOptionException(const T& msg) : Exception(msg) {}
+ };
+
+
+ class ParseException : public Exception {
+ public:
+ template <class T>
+ ParseException(const T& msg) : Exception(msg) {}
+ };
+
+
+ Configuration();
+ ~Configuration();
+
+ void parse(char const*, int argc, char** argv);
+
+ bool isHelp() const;
+ bool isVersion() const;
+ bool isDaemon() const;
+ bool isTrace() const;
+ int getPort() const;
+ int getWorkerThreads() const;
+ int getMaxConnections() const;
+ int getConnectionBacklog() const;
+ const std::string& getStore() const;
+ long getStagingThreshold() const;
+
+ void setHelp(bool b) { help.setValue(b); }
+ void setVersion(bool b) { version.setValue(b); }
+ void setDaemon(bool b) { daemon.setValue(b); }
+ void setTrace(bool b) { trace.setValue(b); }
+ void setPort(int i) { port.setValue(i); }
+ void setWorkerThreads(int i) { workerThreads.setValue(i); }
+ void setMaxConnections(int i) { maxConnections.setValue(i); }
+ void setConnectionBacklog(int i) { connectionBacklog.setValue(i); }
+ void setStore(const std::string& s) { store.setValue(s); }
+ void setStagingThreshold(long l) { stagingThreshold.setValue(l); }
+
+ void usage();
+};
+}
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/Connection.cpp b/qpid/cpp/lib/broker/Connection.cpp
new file mode 100644
index 0000000000..ae0114cba9
--- /dev/null
+++ b/qpid/cpp/lib/broker/Connection.cpp
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <assert.h>
+
+#include "Connection.h"
+#include "BrokerChannel.h"
+#include "AMQP_ClientProxy.h"
+#include "BrokerAdapter.h"
+
+using namespace boost;
+using namespace qpid::sys;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+namespace qpid {
+namespace broker {
+
+Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) :
+ broker(broker_),
+ out(out_),
+ framemax(65536),
+ heartbeat(0),
+ client(0),
+ timeout(broker.getTimeout()),
+ stagingThreshold(broker.getStagingThreshold())
+{}
+
+
+Queue::shared_ptr Connection::getQueue(const string& name, uint16_t channel){
+ Queue::shared_ptr queue;
+ if (name.empty()) {
+ queue = getChannel(channel).getDefaultQueue();
+ if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" );
+ } else {
+ queue = broker.getQueues().find(name);
+ if (queue == 0) {
+ throw ChannelException( 404, "Queue not found: " + name);
+ }
+ }
+ return queue;
+}
+
+
+Exchange::shared_ptr Connection::findExchange(const string& name){
+ return broker.getExchanges().get(name);
+}
+
+
+void Connection::received(framing::AMQFrame* frame){
+ getChannel(frame->getChannel()).handleBody(frame->getBody());
+}
+
+void Connection::close(
+ ReplyCode code, const string& text, ClassId classId, MethodId methodId)
+{
+ client->close(code, text, classId, methodId);
+ getOutput().close();
+}
+
+void Connection::initiated(framing::ProtocolInitiation* header) {
+ version = ProtocolVersion(header->getMajor(), header->getMinor());
+ FieldTable properties;
+ string mechanisms("PLAIN");
+ string locales("en_US");
+ getChannel(0).init(0, *out, getVersion());
+ client = &getChannel(0).getAdatper().getProxy().getConnection();
+ client->start(
+ header->getMajor(), header->getMinor(),
+ properties, mechanisms, locales);
+}
+
+void Connection::idleOut(){}
+
+void Connection::idleIn(){}
+
+void Connection::closed(){
+ try {
+ while (!exclusiveQueues.empty()) {
+ broker.getQueues().destroy(exclusiveQueues.front()->getName());
+ exclusiveQueues.erase(exclusiveQueues.begin());
+ }
+ } catch(std::exception& e) {
+ std::cout << "Caught unhandled exception while closing session: " <<
+ e.what() << std::endl;
+ assert(0);
+ }
+}
+
+void Connection::closeChannel(uint16_t id) {
+ ChannelMap::iterator i = channels.find(id);
+ if (i != channels.end())
+ i->close();
+}
+
+
+Channel& Connection::getChannel(ChannelId id) {
+ ChannelMap::iterator i = channels.find(id);
+ if (i == channels.end()) {
+ i = channels.insert(
+ id, new Channel(
+ *this, id, framemax, broker.getQueues().getStore(),
+ broker.getStagingThreshold())).first;
+ }
+ return *i;
+}
+
+
+}}
+
diff --git a/qpid/cpp/lib/broker/Connection.h b/qpid/cpp/lib/broker/Connection.h
new file mode 100644
index 0000000000..1314ccbd97
--- /dev/null
+++ b/qpid/cpp/lib/broker/Connection.h
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Connection_
+#define _Connection_
+
+#include <sstream>
+#include <vector>
+
+#include <boost/ptr_container/ptr_map.hpp>
+
+#include <AMQFrame.h>
+#include <AMQP_ServerOperations.h>
+#include <AMQP_ClientProxy.h>
+#include <sys/ConnectionOutputHandler.h>
+#include <sys/ConnectionInputHandler.h>
+#include <sys/TimeoutHandler.h>
+#include "framing/ProtocolVersion.h"
+#include "Broker.h"
+#include "Exception.h"
+#include "BrokerChannel.h"
+
+namespace qpid {
+namespace broker {
+
+class Channel;
+
+class Connection : public sys::ConnectionInputHandler,
+ public ConnectionToken
+{
+ public:
+ Connection(sys::ConnectionOutputHandler* out, Broker& broker);
+
+ /** Get a channel. Create if it does not already exist */
+ Channel& getChannel(framing::ChannelId channel);
+
+ /** Close a channel */
+ void closeChannel(framing::ChannelId channel);
+
+ /** Close the connection */
+ void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);
+
+ sys::ConnectionOutputHandler& getOutput() const { return *out; }
+ framing::ProtocolVersion getVersion() const { return version; }
+
+ uint32_t getFrameMax() const { return framemax; }
+ uint16_t getHeartbeat() const { return heartbeat; }
+ uint32_t getTimeout() const { return timeout; }
+ uint64_t getStagingThreshold() const { return stagingThreshold; }
+
+ void setFrameMax(uint32_t fm) { framemax = fm; }
+ void setHeartbeat(uint16_t hb) { heartbeat = hb; }
+
+ /**
+ * Get named queue, never returns 0.
+ * @return: named queue or default queue for channel if name=""
+ * @exception: ChannelException if no queue of that name is found.
+ * @exception: ConnectionException if name="" and channel has no default.
+ */
+ Queue::shared_ptr getQueue(const string& name, uint16_t channel);
+
+ Broker& broker;
+ std::vector<Queue::shared_ptr> exclusiveQueues;
+
+ // ConnectionInputHandler methods
+ void received(framing::AMQFrame* frame);
+ void initiated(framing::ProtocolInitiation* header);
+ void idleOut();
+ void idleIn();
+ void closed();
+
+ private:
+ typedef boost::ptr_map<framing::ChannelId, Channel> ChannelMap;
+
+ typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+ Exchange::shared_ptr findExchange(const string& name);
+
+ framing::ProtocolVersion version;
+ ChannelMap channels;
+ sys::ConnectionOutputHandler* out;
+ uint32_t framemax;
+ uint16_t heartbeat;
+ framing::AMQP_ClientProxy::Connection* client;
+ const uint32_t timeout; //timeout for auto-deleted queues (in ms)
+ const uint64_t stagingThreshold;
+
+};
+
+}}
+
+#endif
diff --git a/qpid/cpp/lib/broker/ConnectionFactory.cpp b/qpid/cpp/lib/broker/ConnectionFactory.cpp
new file mode 100644
index 0000000000..20485dd0e1
--- /dev/null
+++ b/qpid/cpp/lib/broker/ConnectionFactory.cpp
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <ConnectionFactory.h>
+#include <Connection.h>
+
+namespace qpid {
+namespace broker {
+
+
+ConnectionFactory::ConnectionFactory(Broker& b) : broker(b)
+{}
+
+
+ConnectionFactory::~ConnectionFactory()
+{
+ broker.getCleaner().stop();
+}
+
+qpid::sys::ConnectionInputHandler*
+ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out)
+{
+ return new Connection(out, broker);
+}
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/lib/broker/ConnectionFactory.h b/qpid/cpp/lib/broker/ConnectionFactory.h
new file mode 100644
index 0000000000..9147384b2a
--- /dev/null
+++ b/qpid/cpp/lib/broker/ConnectionFactory.h
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ConnectionFactory_
+#define _ConnectionFactory_
+
+#include "ConnectionInputHandlerFactory.h"
+
+namespace qpid {
+namespace broker {
+class Broker;
+
+class ConnectionFactory : public qpid::sys::ConnectionInputHandlerFactory
+{
+ public:
+ ConnectionFactory(Broker& b);
+
+ virtual qpid::sys::ConnectionInputHandler* create(
+ qpid::sys::ConnectionOutputHandler* ctxt);
+
+ virtual ~ConnectionFactory();
+
+ private:
+ Broker& broker;
+};
+
+}}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/ConnectionToken.h b/qpid/cpp/lib/broker/ConnectionToken.h
new file mode 100644
index 0000000000..7e7f813d0e
--- /dev/null
+++ b/qpid/cpp/lib/broker/ConnectionToken.h
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ConnectionToken_
+#define _ConnectionToken_
+
+namespace qpid {
+ namespace broker {
+ /**
+ * An empty interface allowing opaque implementations of some
+ * form of token to identify a connection.
+ */
+ class ConnectionToken{
+ public:
+ virtual ~ConnectionToken(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/Consumer.h b/qpid/cpp/lib/broker/Consumer.h
new file mode 100644
index 0000000000..26deef4a26
--- /dev/null
+++ b/qpid/cpp/lib/broker/Consumer.h
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Consumer_
+#define _Consumer_
+
+#include <BrokerMessage.h>
+
+namespace qpid {
+ namespace broker {
+ class Consumer{
+ public:
+ virtual bool deliver(Message::shared_ptr& msg) = 0;
+ virtual ~Consumer(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/Content.h b/qpid/cpp/lib/broker/Content.h
new file mode 100644
index 0000000000..b65a454778
--- /dev/null
+++ b/qpid/cpp/lib/broker/Content.h
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Content_
+#define _Content_
+
+#include <boost/function.hpp>
+
+#include <AMQContentBody.h>
+#include <Buffer.h>
+#include <OutputHandler.h>
+
+namespace qpid {
+
+namespace framing {
+class ChannelAdapter;
+}
+
+namespace broker {
+class Content{
+ public:
+ typedef std::string DataBlock;
+ typedef boost::function1<void, const DataBlock&> SendFn;
+
+ virtual ~Content(){}
+
+ /** Add a block of data to the content */
+ virtual void add(framing::AMQContentBody::shared_ptr data) = 0;
+
+ /** Total size of content in bytes */
+ virtual uint32_t size() = 0;
+
+ /**
+ * Iterate over the content calling SendFn for each block.
+ * Subdivide blocks if necessary to ensure each block is
+ * <= framesize bytes long.
+ */
+ virtual void send(framing::ChannelAdapter& channel, uint32_t framesize) = 0;
+
+ //FIXME aconway 2007-02-07: This is inconsistently implemented
+ //find out what is needed.
+ virtual void encode(qpid::framing::Buffer& buffer) = 0;
+};
+}}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/DeletingTxOp.cpp b/qpid/cpp/lib/broker/DeletingTxOp.cpp
new file mode 100644
index 0000000000..25fe9c98db
--- /dev/null
+++ b/qpid/cpp/lib/broker/DeletingTxOp.cpp
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <DeletingTxOp.h>
+
+using namespace qpid::broker;
+
+DeletingTxOp::DeletingTxOp(TxOp* const _delegate) : delegate(_delegate){}
+
+bool DeletingTxOp::prepare(TransactionContext* ctxt) throw(){
+ return delegate && delegate->prepare(ctxt);
+}
+
+void DeletingTxOp::commit() throw(){
+ if(delegate){
+ delegate->commit();
+ delete delegate;
+ delegate = 0;
+ }
+}
+
+void DeletingTxOp::rollback() throw(){
+ if(delegate){
+ delegate->rollback();
+ delete delegate;
+ delegate = 0;
+ }
+}
diff --git a/qpid/cpp/lib/broker/DeletingTxOp.h b/qpid/cpp/lib/broker/DeletingTxOp.h
new file mode 100644
index 0000000000..3e026cd4ca
--- /dev/null
+++ b/qpid/cpp/lib/broker/DeletingTxOp.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DeletingTxOp_
+#define _DeletingTxOp_
+
+#include <TxOp.h>
+
+namespace qpid {
+ namespace broker {
+ /**
+ * TxOp wrapper that will delegate calls & delete the object
+ * to which it delegates after completion of the transaction.
+ */
+ class DeletingTxOp : public virtual TxOp{
+ TxOp* delegate;
+ public:
+ DeletingTxOp(TxOp* const delegate);
+ virtual bool prepare(TransactionContext* ctxt) throw();
+ virtual void commit() throw();
+ virtual void rollback() throw();
+ virtual ~DeletingTxOp(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/Deliverable.h b/qpid/cpp/lib/broker/Deliverable.h
new file mode 100644
index 0000000000..e33443555d
--- /dev/null
+++ b/qpid/cpp/lib/broker/Deliverable.h
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Deliverable_
+#define _Deliverable_
+
+#include <BrokerQueue.h>
+
+namespace qpid {
+ namespace broker {
+ class Deliverable{
+ public:
+ virtual void deliverTo(Queue::shared_ptr& queue) = 0;
+ virtual ~Deliverable(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/DeliverableMessage.cpp b/qpid/cpp/lib/broker/DeliverableMessage.cpp
new file mode 100644
index 0000000000..b9c89da690
--- /dev/null
+++ b/qpid/cpp/lib/broker/DeliverableMessage.cpp
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <DeliverableMessage.h>
+
+using namespace qpid::broker;
+
+DeliverableMessage::DeliverableMessage(Message::shared_ptr& _msg) : msg(_msg)
+{
+}
+
+void DeliverableMessage::deliverTo(Queue::shared_ptr& queue)
+{
+ queue->deliver(msg);
+}
+
diff --git a/qpid/cpp/lib/broker/DeliverableMessage.h b/qpid/cpp/lib/broker/DeliverableMessage.h
new file mode 100644
index 0000000000..962f0da640
--- /dev/null
+++ b/qpid/cpp/lib/broker/DeliverableMessage.h
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DeliverableMessage_
+#define _DeliverableMessage_
+
+#include <Deliverable.h>
+#include <BrokerMessage.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+ namespace broker {
+ class DeliverableMessage : public Deliverable{
+ Message::shared_ptr msg;
+ public:
+ DeliverableMessage(Message::shared_ptr& msg);
+ virtual void deliverTo(Queue::shared_ptr& queue);
+ virtual ~DeliverableMessage(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/DeliveryRecord.cpp b/qpid/cpp/lib/broker/DeliveryRecord.cpp
new file mode 100644
index 0000000000..0d2e5325c5
--- /dev/null
+++ b/qpid/cpp/lib/broker/DeliveryRecord.cpp
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <DeliveryRecord.h>
+#include <BrokerChannel.h>
+
+using namespace qpid::broker;
+using std::string;
+
+DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg,
+ Queue::shared_ptr _queue,
+ const string _consumerTag,
+ const uint64_t _deliveryTag) : msg(_msg),
+ queue(_queue),
+ consumerTag(_consumerTag),
+ deliveryTag(_deliveryTag),
+ pull(false){}
+
+DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg,
+ Queue::shared_ptr _queue,
+ const uint64_t _deliveryTag) : msg(_msg),
+ queue(_queue),
+ consumerTag(""),
+ deliveryTag(_deliveryTag),
+ pull(true){}
+
+
+void DeliveryRecord::discard(TransactionContext* ctxt, const std::string* const xid) const{
+ queue->dequeue(ctxt, msg, xid);
+}
+
+void DeliveryRecord::discard() const{
+ discard(0, 0);
+}
+
+bool DeliveryRecord::matches(uint64_t tag) const{
+ return deliveryTag == tag;
+}
+
+bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{
+ return range->covers(deliveryTag);
+}
+
+void DeliveryRecord::redeliver(Channel* const channel) const{
+ if(pull){
+ //if message was originally sent as response to get, we must requeue it
+ requeue();
+ }else{
+ channel->deliver(msg, consumerTag, deliveryTag);
+ }
+}
+
+void DeliveryRecord::requeue() const{
+ msg->redeliver();
+ queue->process(msg);
+}
+
+void DeliveryRecord::addTo(Prefetch* const prefetch) const{
+ if(!pull){
+ //ignore 'pulled' messages (i.e. those that were sent in
+ //response to get) when calculating prefetch
+ prefetch->size += msg->contentSize();
+ prefetch->count++;
+ }
+}
+
+void DeliveryRecord::subtractFrom(Prefetch* const prefetch) const{
+ if(!pull){
+ //ignore 'pulled' messages (i.e. those that were sent in
+ //response to get) when calculating prefetch
+ prefetch->size -= msg->contentSize();
+ prefetch->count--;
+ }
+}
diff --git a/qpid/cpp/lib/broker/DeliveryRecord.h b/qpid/cpp/lib/broker/DeliveryRecord.h
new file mode 100644
index 0000000000..bda2c2ec90
--- /dev/null
+++ b/qpid/cpp/lib/broker/DeliveryRecord.h
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DeliveryRecord_
+#define _DeliveryRecord_
+
+#include <algorithm>
+#include <list>
+#include <AccumulatedAck.h>
+#include <BrokerMessage.h>
+#include <Prefetch.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+ namespace broker {
+ class Channel;
+
+ /**
+ * Record of a delivery for which an ack is outstanding.
+ */
+ class DeliveryRecord{
+ mutable Message::shared_ptr msg;
+ mutable Queue::shared_ptr queue;
+ std::string consumerTag;
+ uint64_t deliveryTag;
+ bool pull;
+
+ public:
+ DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const std::string consumerTag, const uint64_t deliveryTag);
+ DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const uint64_t deliveryTag);
+
+ void discard() const;
+ void discard(TransactionContext* ctxt, const std::string* const xid) const;
+ bool matches(uint64_t tag) const;
+ bool coveredBy(const AccumulatedAck* const range) const;
+ void requeue() const;
+ void redeliver(Channel* const) const;
+ void addTo(Prefetch* const prefetch) const;
+ void subtractFrom(Prefetch* const prefetch) const;
+ };
+
+ typedef std::list<DeliveryRecord>::iterator ack_iterator;
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/DirectExchange.cpp b/qpid/cpp/lib/broker/DirectExchange.cpp
new file mode 100644
index 0000000000..c898ae8d7e
--- /dev/null
+++ b/qpid/cpp/lib/broker/DirectExchange.cpp
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <DirectExchange.h>
+#include <ExchangeBinding.h>
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {
+
+}
+
+void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){
+ Mutex::ScopedLock l(lock);
+ std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+ std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
+ if(i == queues.end()){
+ bindings[routingKey].push_back(queue);
+ queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+ }
+}
+
+void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
+ Mutex::ScopedLock l(lock);
+ std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+
+ std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
+ if(i < queues.end()){
+ queues.erase(i);
+ if(queues.empty()){
+ bindings.erase(routingKey);
+ }
+ }
+}
+
+void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
+ Mutex::ScopedLock l(lock);
+ std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+ int count(0);
+ for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){
+ msg.deliverTo(*i);
+ }
+ if(!count){
+ std::cout << "WARNING: DirectExchange " << getName() << " could not route message with key " << routingKey << std::endl;
+ }
+}
+
+DirectExchange::~DirectExchange(){
+
+}
+
+
+const std::string DirectExchange::typeName("direct");
diff --git a/qpid/cpp/lib/broker/DirectExchange.h b/qpid/cpp/lib/broker/DirectExchange.h
new file mode 100644
index 0000000000..a7ef5aca9e
--- /dev/null
+++ b/qpid/cpp/lib/broker/DirectExchange.h
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DirectExchange_
+#define _DirectExchange_
+
+#include <map>
+#include <vector>
+#include <BrokerExchange.h>
+#include <FieldTable.h>
+#include <BrokerMessage.h>
+#include <sys/Monitor.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+namespace broker {
+ class DirectExchange : public virtual Exchange{
+ std::map<string, std::vector<Queue::shared_ptr> > bindings;
+ qpid::sys::Mutex lock;
+
+ public:
+ static const std::string typeName;
+
+ DirectExchange(const std::string& name);
+
+ virtual std::string getType(){ return typeName; }
+
+ virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual ~DirectExchange();
+ };
+}
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/ExchangeBinding.cpp b/qpid/cpp/lib/broker/ExchangeBinding.cpp
new file mode 100644
index 0000000000..bf2102414d
--- /dev/null
+++ b/qpid/cpp/lib/broker/ExchangeBinding.cpp
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <ExchangeBinding.h>
+#include <BrokerExchange.h>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+ExchangeBinding::ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, const FieldTable* _args) : e(_e), q(_q), key(_key), args(_args){}
+
+void ExchangeBinding::cancel(){
+ e->unbind(q, key, args);
+ delete this;
+}
+
+ExchangeBinding::~ExchangeBinding(){
+}
diff --git a/qpid/cpp/lib/broker/ExchangeBinding.h b/qpid/cpp/lib/broker/ExchangeBinding.h
new file mode 100644
index 0000000000..2afaa89552
--- /dev/null
+++ b/qpid/cpp/lib/broker/ExchangeBinding.h
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ExchangeBinding_
+#define _ExchangeBinding_
+
+#include <Binding.h>
+#include <FieldTable.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+ namespace broker {
+ class Exchange;
+ class Queue;
+
+ class ExchangeBinding : public virtual Binding{
+ Exchange* e;
+ Queue::shared_ptr q;
+ const string key;
+ const qpid::framing::FieldTable* args;
+ public:
+ ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, const qpid::framing::FieldTable* _args);
+ virtual void cancel();
+ virtual ~ExchangeBinding();
+ };
+ }
+}
+
+
+#endif
+
diff --git a/qpid/cpp/lib/broker/ExchangeRegistry.cpp b/qpid/cpp/lib/broker/ExchangeRegistry.cpp
new file mode 100644
index 0000000000..3e5ed89b54
--- /dev/null
+++ b/qpid/cpp/lib/broker/ExchangeRegistry.cpp
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <ExchangeRegistry.h>
+#include <DirectExchange.h>
+#include <FanOutExchange.h>
+#include <HeadersExchange.h>
+#include <TopicExchange.h>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+using std::pair;
+
+pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type) throw(UnknownExchangeTypeException){
+ Mutex::ScopedLock locker(lock);
+ ExchangeMap::iterator i = exchanges.find(name);
+ if (i == exchanges.end()) {
+ Exchange::shared_ptr exchange;
+
+ if(type == TopicExchange::typeName){
+ exchange = Exchange::shared_ptr(new TopicExchange(name));
+ }else if(type == DirectExchange::typeName){
+ exchange = Exchange::shared_ptr(new DirectExchange(name));
+ }else if(type == FanOutExchange::typeName){
+ exchange = Exchange::shared_ptr(new FanOutExchange(name));
+ }else if (type == HeadersExchange::typeName) {
+ exchange = Exchange::shared_ptr(new HeadersExchange(name));
+ }else{
+ throw UnknownExchangeTypeException();
+ }
+ exchanges[name] = exchange;
+ return std::pair<Exchange::shared_ptr, bool>(exchange, true);
+ } else {
+ return std::pair<Exchange::shared_ptr, bool>(i->second, false);
+ }
+}
+
+void ExchangeRegistry::destroy(const string& name){
+ Mutex::ScopedLock locker(lock);
+ exchanges.erase(name);
+}
+
+Exchange::shared_ptr ExchangeRegistry::get(const string& name){
+ Mutex::ScopedLock locker(lock);
+ Exchange::shared_ptr exchange =exchanges[name];
+ if (!exchange)
+ throw ChannelException(404, "Exchange not found:" + name);
+ return exchange;
+}
+
+namespace
+{
+const std::string empty;
+}
+
+Exchange::shared_ptr ExchangeRegistry::getDefault()
+{
+ return get(empty);
+}
diff --git a/qpid/cpp/lib/broker/ExchangeRegistry.h b/qpid/cpp/lib/broker/ExchangeRegistry.h
new file mode 100644
index 0000000000..aeb32753df
--- /dev/null
+++ b/qpid/cpp/lib/broker/ExchangeRegistry.h
@@ -0,0 +1,47 @@
+#ifndef _broker_ExchangeRegistry_h
+#define _broker_ExchangeRegistry_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <map>
+#include <BrokerExchange.h>
+#include <sys/Monitor.h>
+
+namespace qpid {
+namespace broker {
+ struct UnknownExchangeTypeException{};
+
+ class ExchangeRegistry{
+ typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap;
+ ExchangeMap exchanges;
+ qpid::sys::Mutex lock;
+ public:
+ std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type) throw(UnknownExchangeTypeException);
+ void destroy(const std::string& name);
+ Exchange::shared_ptr get(const std::string& name);
+ Exchange::shared_ptr getDefault();
+ };
+}
+}
+
+
+#endif /*!_broker_ExchangeRegistry_h*/
diff --git a/qpid/cpp/lib/broker/FanOutExchange.cpp b/qpid/cpp/lib/broker/FanOutExchange.cpp
new file mode 100644
index 0000000000..48afcc20d5
--- /dev/null
+++ b/qpid/cpp/lib/broker/FanOutExchange.cpp
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <FanOutExchange.h>
+#include <ExchangeBinding.h>
+#include <algorithm>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {}
+
+void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){
+ Mutex::ScopedLock locker(lock);
+ // Add if not already present.
+ Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
+ if (i == bindings.end()) {
+ bindings.push_back(queue);
+ queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+ }
+}
+
+void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
+ Mutex::ScopedLock locker(lock);
+ Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
+ if (i != bindings.end()) {
+ bindings.erase(i);
+ // TODO aconway 2006-09-14: What about the ExchangeBinding object?
+ // Don't we have to verify routingKey/args match?
+ }
+}
+
+void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
+ Mutex::ScopedLock locker(lock);
+ for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){
+ msg.deliverTo(*i);
+ }
+}
+
+FanOutExchange::~FanOutExchange() {}
+
+const std::string FanOutExchange::typeName("fanout");
diff --git a/qpid/cpp/lib/broker/FanOutExchange.h b/qpid/cpp/lib/broker/FanOutExchange.h
new file mode 100644
index 0000000000..6dc70e69bb
--- /dev/null
+++ b/qpid/cpp/lib/broker/FanOutExchange.h
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _FanOutExchange_
+#define _FanOutExchange_
+
+#include <map>
+#include <vector>
+#include <BrokerExchange.h>
+#include <FieldTable.h>
+#include <BrokerMessage.h>
+#include <sys/Monitor.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+namespace broker {
+
+class FanOutExchange : public virtual Exchange {
+ std::vector<Queue::shared_ptr> bindings;
+ qpid::sys::Mutex lock;
+
+ public:
+ static const std::string typeName;
+
+ FanOutExchange(const std::string& name);
+
+ virtual std::string getType(){ return typeName; }
+
+ virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual ~FanOutExchange();
+};
+
+}
+}
+
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/HandlerImpl.h b/qpid/cpp/lib/broker/HandlerImpl.h
new file mode 100644
index 0000000000..c55a36da45
--- /dev/null
+++ b/qpid/cpp/lib/broker/HandlerImpl.h
@@ -0,0 +1,71 @@
+#ifndef _broker_HandlerImpl_h
+#define _broker_HandlerImpl_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "BrokerChannel.h"
+#include "AMQP_ClientProxy.h"
+
+namespace qpid {
+
+namespace framing {
+class AMQP_ClientProxy;
+}
+
+namespace broker {
+
+class Broker;
+class Channel;
+class Connection;
+
+/**
+ * A collection of references to the core objects required by an adapter,
+ * and a client proxy.
+ */
+struct CoreRefs
+{
+ CoreRefs(Channel& ch, Connection& c, Broker& b)
+ : channel(ch), connection(c), broker(b), proxy(ch) {}
+
+ Channel& channel;
+ Connection& connection;
+ Broker& broker;
+ framing::AMQP_ClientProxy proxy;
+};
+
+
+/**
+ * Base template for protocol handler implementations.
+ * Provides the core references and appropriate AMQP class proxy.
+ */
+template <class ProxyType>
+struct HandlerImpl : public CoreRefs {
+ typedef HandlerImpl<ProxyType> HandlerImplType;
+ HandlerImpl(CoreRefs& parent)
+ : CoreRefs(parent), client(ProxyType::get(proxy)) {}
+ ProxyType client;
+};
+
+
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!_broker_HandlerImpl_h*/
diff --git a/qpid/cpp/lib/broker/HeadersExchange.cpp b/qpid/cpp/lib/broker/HeadersExchange.cpp
new file mode 100644
index 0000000000..acd344725a
--- /dev/null
+++ b/qpid/cpp/lib/broker/HeadersExchange.cpp
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <HeadersExchange.h>
+#include <ExchangeBinding.h>
+#include <Value.h>
+#include <QpidError.h>
+#include <algorithm>
+
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+// TODO aconway 2006-09-20: More efficient matching algorithm.
+// The current search algorithm really sucks.
+// Fieldtables are heavy, maybe use shared_ptr to do handle-body.
+
+using namespace qpid::broker;
+
+namespace {
+ const std::string all("all");
+ const std::string any("any");
+ const std::string x_match("x-match");
+}
+
+HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { }
+
+void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){
+ Mutex::ScopedLock locker(lock);
+ std::string what = args->getString("x-match");
+ if (what != all && what != any) {
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange.");
+ }
+ bindings.push_back(Binding(*args, queue));
+ queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+}
+
+void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
+ Mutex::ScopedLock locker(lock);
+ Bindings::iterator i =
+ std::find(bindings.begin(),bindings.end(), Binding(*args, queue));
+ if (i != bindings.end()) bindings.erase(i);
+}
+
+
+void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
+ Mutex::ScopedLock locker(lock);;
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ if (match(i->first, *args)) msg.deliverTo(i->second);
+ }
+}
+
+HeadersExchange::~HeadersExchange() {}
+
+const std::string HeadersExchange::typeName("headers");
+
+namespace
+{
+
+ bool match_values(const Value& bind, const Value& msg) {
+ return dynamic_cast<const EmptyValue*>(&bind) || bind == msg;
+ }
+
+}
+
+
+bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) {
+ typedef FieldTable::ValueMap Map;
+ std::string what = bind.getString(x_match);
+ if (what == all) {
+ for (Map::const_iterator i = bind.getMap().begin();
+ i != bind.getMap().end();
+ ++i)
+ {
+ if (i->first != x_match)
+ {
+ Map::const_iterator j = msg.getMap().find(i->first);
+ if (j == msg.getMap().end()) return false;
+ if (!match_values(*(i->second), *(j->second))) return false;
+ }
+ }
+ return true;
+ } else if (what == any) {
+ for (Map::const_iterator i = bind.getMap().begin();
+ i != bind.getMap().end();
+ ++i)
+ {
+ if (i->first != x_match)
+ {
+ Map::const_iterator j = msg.getMap().find(i->first);
+ if (j != msg.getMap().end()) {
+ if (match_values(*(i->second), *(j->second))) return true;
+ }
+ }
+ }
+ return false;
+ } else {
+ return false;
+ }
+}
+
+
+
diff --git a/qpid/cpp/lib/broker/HeadersExchange.h b/qpid/cpp/lib/broker/HeadersExchange.h
new file mode 100644
index 0000000000..5e8da5ad85
--- /dev/null
+++ b/qpid/cpp/lib/broker/HeadersExchange.h
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _HeadersExchange_
+#define _HeadersExchange_
+
+#include <vector>
+#include <BrokerExchange.h>
+#include <FieldTable.h>
+#include <BrokerMessage.h>
+#include <sys/Monitor.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+namespace broker {
+
+
+class HeadersExchange : public virtual Exchange {
+ typedef std::pair<qpid::framing::FieldTable, Queue::shared_ptr> Binding;
+ typedef std::vector<Binding> Bindings;
+
+ Bindings bindings;
+ qpid::sys::Mutex lock;
+
+ public:
+ static const std::string typeName;
+
+ HeadersExchange(const string& name);
+
+ virtual std::string getType(){ return typeName; }
+
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual ~HeadersExchange();
+
+ static bool match(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs);
+};
+
+
+
+}
+}
+
+#endif
diff --git a/qpid/cpp/lib/broker/InMemoryContent.cpp b/qpid/cpp/lib/broker/InMemoryContent.cpp
new file mode 100644
index 0000000000..3e4ac29486
--- /dev/null
+++ b/qpid/cpp/lib/broker/InMemoryContent.cpp
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <InMemoryContent.h>
+#include "AMQFrame.h"
+#include "framing/ChannelAdapter.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using boost::static_pointer_cast;
+
+void InMemoryContent::add(AMQContentBody::shared_ptr data)
+{
+ content.push_back(data);
+}
+
+uint32_t InMemoryContent::size()
+{
+ int sum(0);
+ for (content_iterator i = content.begin(); i != content.end(); i++) {
+ sum += (*i)->size();
+ }
+ return sum;
+}
+
+// FIXME aconway 2007-02-01: Remove version parameter.
+void InMemoryContent::send(ChannelAdapter& channel, uint32_t framesize)
+{
+ for (content_iterator i = content.begin(); i != content.end(); i++) {
+ if ((*i)->size() > framesize) {
+ uint32_t offset = 0;
+ for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) {
+ string data = (*i)->getData().substr(offset, framesize);
+ channel.send(new AMQContentBody(data));
+ offset += framesize;
+ }
+ uint32_t remainder = (*i)->size() % framesize;
+ if (remainder) {
+ string data = (*i)->getData().substr(offset, remainder);
+ channel.send(new AMQContentBody(data));
+ }
+ } else {
+ AMQBody::shared_ptr contentBody =
+ static_pointer_cast<AMQBody, AMQContentBody>(*i);
+ channel.send(contentBody);
+ }
+ }
+}
+
+void InMemoryContent::encode(Buffer& buffer)
+{
+ for (content_iterator i = content.begin(); i != content.end(); i++) {
+ (*i)->encode(buffer);
+ }
+}
+
diff --git a/qpid/cpp/lib/broker/InMemoryContent.h b/qpid/cpp/lib/broker/InMemoryContent.h
new file mode 100644
index 0000000000..7a58ace3a7
--- /dev/null
+++ b/qpid/cpp/lib/broker/InMemoryContent.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _InMemoryContent_
+#define _InMemoryContent_
+
+#include <Content.h>
+#include <vector>
+
+
+namespace qpid {
+ namespace broker {
+ class InMemoryContent : public Content{
+ typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list;
+ typedef content_list::iterator content_iterator;
+
+ content_list content;
+ public:
+ void add(qpid::framing::AMQContentBody::shared_ptr data);
+ uint32_t size();
+ void send(framing::ChannelAdapter&, uint32_t framesize);
+ void encode(qpid::framing::Buffer& buffer);
+ };
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/LazyLoadedContent.cpp b/qpid/cpp/lib/broker/LazyLoadedContent.cpp
new file mode 100644
index 0000000000..131943b448
--- /dev/null
+++ b/qpid/cpp/lib/broker/LazyLoadedContent.cpp
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <LazyLoadedContent.h>
+#include "AMQFrame.h"
+#include "framing/ChannelAdapter.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+LazyLoadedContent::~LazyLoadedContent()
+{
+ store->destroy(msg);
+}
+
+LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, Message* const _msg, uint64_t _expectedSize) :
+ store(_store), msg(_msg), expectedSize(_expectedSize) {}
+
+void LazyLoadedContent::add(AMQContentBody::shared_ptr data)
+{
+ store->appendContent(msg, data->getData());
+}
+
+uint32_t LazyLoadedContent::size()
+{
+ return 0;//all content is written as soon as it is added
+}
+
+void LazyLoadedContent::send(ChannelAdapter& channel, uint32_t framesize)
+{
+ if (expectedSize > framesize) {
+ for (uint64_t offset = 0; offset < expectedSize; offset += framesize)
+ {
+ uint64_t remaining = expectedSize - offset;
+ string data;
+ store->loadContent(msg, data, offset,
+ remaining > framesize ? framesize : remaining);
+ channel.send(new AMQContentBody(data));
+ }
+ } else {
+ string data;
+ store->loadContent(msg, data, 0, expectedSize);
+ channel.send(new AMQContentBody(data));
+ }
+}
+
+void LazyLoadedContent::encode(Buffer&)
+{
+ //do nothing as all content is written as soon as it is added
+}
+
diff --git a/qpid/cpp/lib/broker/LazyLoadedContent.h b/qpid/cpp/lib/broker/LazyLoadedContent.h
new file mode 100644
index 0000000000..e000a4ef69
--- /dev/null
+++ b/qpid/cpp/lib/broker/LazyLoadedContent.h
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _LazyLoadedContent_
+#define _LazyLoadedContent_
+
+#include <Content.h>
+#include <MessageStore.h>
+
+namespace qpid {
+ namespace broker {
+ class LazyLoadedContent : public Content{
+ MessageStore* const store;
+ Message* const msg;
+ const uint64_t expectedSize;
+ public:
+ LazyLoadedContent(
+ MessageStore* const store, Message* const msg,
+ uint64_t expectedSize);
+ ~LazyLoadedContent();
+ void add(qpid::framing::AMQContentBody::shared_ptr data);
+ uint32_t size();
+ void send(
+ framing::ChannelAdapter&,
+ uint32_t framesize);
+ void encode(qpid::framing::Buffer& buffer);
+ };
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/Makefile.am b/qpid/cpp/lib/broker/Makefile.am
new file mode 100644
index 0000000000..68649c2b28
--- /dev/null
+++ b/qpid/cpp/lib/broker/Makefile.am
@@ -0,0 +1,96 @@
+AM_CXXFLAGS = $(WARNING_CFLAGS)
+INCLUDES = \
+ -I$(top_srcdir)/gen \
+ -I$(top_srcdir)/lib/common \
+ -I$(top_srcdir)/lib/common/sys \
+ -I$(top_srcdir)/lib/common/framing \
+ $(APR_CXXFLAGS)
+
+lib_LTLIBRARIES = libqpidbroker.la
+libqpidbroker_la_LIBADD = ../common/libqpidcommon.la
+libqpidbroker_la_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG)
+libqpidbroker_la_SOURCES = \
+ AccumulatedAck.cpp \
+ AccumulatedAck.h \
+ AutoDelete.cpp \
+ AutoDelete.h \
+ Binding.h \
+ Broker.cpp \
+ Broker.h \
+ BrokerSingleton.cpp \
+ BrokerSingleton.h \
+ BrokerChannel.cpp \
+ BrokerChannel.h \
+ BrokerExchange.h \
+ BrokerMessage.cpp \
+ BrokerMessage.h \
+ BrokerMessageMessage.cpp \
+ BrokerMessageMessage.h \
+ BrokerQueue.cpp \
+ BrokerQueue.h \
+ Configuration.cpp \
+ Configuration.h \
+ ConnectionToken.h \
+ Consumer.h \
+ Content.h \
+ DeletingTxOp.cpp \
+ DeletingTxOp.h \
+ Deliverable.h \
+ DeliverableMessage.cpp \
+ DeliverableMessage.h \
+ DeliveryRecord.cpp \
+ DeliveryRecord.h \
+ DirectExchange.cpp \
+ DirectExchange.h \
+ ExchangeBinding.cpp \
+ ExchangeBinding.h \
+ ExchangeRegistry.cpp \
+ ExchangeRegistry.h \
+ FanOutExchange.cpp \
+ FanOutExchange.h \
+ HeadersExchange.cpp \
+ HeadersExchange.h \
+ InMemoryContent.cpp \
+ InMemoryContent.h \
+ LazyLoadedContent.cpp \
+ LazyLoadedContent.h \
+ MessageBuilder.cpp \
+ MessageBuilder.h \
+ MessageStore.h \
+ MessageStoreModule.cpp \
+ MessageStoreModule.h \
+ NameGenerator.cpp \
+ NameGenerator.h \
+ NullMessageStore.cpp \
+ NullMessageStore.h \
+ Prefetch.h \
+ QueuePolicy.cpp \
+ QueuePolicy.h \
+ QueueRegistry.cpp \
+ QueueRegistry.h \
+ RecoveryManager.cpp \
+ RecoveryManager.h \
+ Reference.cpp \
+ Reference.h \
+ ConnectionFactory.cpp \
+ ConnectionFactory.h \
+ Connection.cpp \
+ Connection.h \
+ BrokerAdapter.cpp \
+ BrokerAdapter.h \
+ MessageHandlerImpl.cpp \
+ MessageHandlerImpl.h \
+ TopicExchange.cpp \
+ TopicExchange.h \
+ TransactionalStore.h \
+ TxAck.cpp \
+ TxAck.h \
+ TxBuffer.cpp \
+ TxBuffer.h \
+ TxOp.h \
+ TxPublish.cpp \
+ TxPublish.h
+
+
+# Force build during dist phase so help2man will work.
+dist-hook: $(lib_LTLIBRARIES)
diff --git a/qpid/cpp/lib/broker/MessageBuilder.cpp b/qpid/cpp/lib/broker/MessageBuilder.cpp
new file mode 100644
index 0000000000..8bffaef50f
--- /dev/null
+++ b/qpid/cpp/lib/broker/MessageBuilder.cpp
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <MessageBuilder.h>
+
+#include <InMemoryContent.h>
+#include <LazyLoadedContent.h>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using std::auto_ptr;
+
+MessageBuilder::MessageBuilder(CompletionHandler* _handler,
+ MessageStore* const _store,
+ uint64_t _stagingThreshold
+) :
+ handler(_handler),
+ store(_store),
+ stagingThreshold(_stagingThreshold)
+{}
+
+void MessageBuilder::route(){
+ if (message->isComplete()) {
+ if (handler) handler->complete(message);
+ message.reset();
+ }
+}
+
+void MessageBuilder::initialise(Message::shared_ptr& msg){
+ if(message.get()){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed.");
+ }
+ message = msg;
+}
+
+void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){
+ if(!message.get()){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish.");
+ }
+ message->setHeader(header);
+ if (stagingThreshold && header->getContentSize() >= stagingThreshold) {
+ store->stage(message.get());
+ message->releaseContent(store);
+ } else {
+ auto_ptr<Content> content(new InMemoryContent());
+ message->setContent(content);
+ }
+ route();
+}
+
+void MessageBuilder::addContent(AMQContentBody::shared_ptr& content){
+ if(!message.get()){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish.");
+ }
+ message->addContent(content);
+ route();
+}
diff --git a/qpid/cpp/lib/broker/MessageBuilder.h b/qpid/cpp/lib/broker/MessageBuilder.h
new file mode 100644
index 0000000000..30834e1075
--- /dev/null
+++ b/qpid/cpp/lib/broker/MessageBuilder.h
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _MessageBuilder_
+#define _MessageBuilder_
+
+#include <memory>
+#include <QpidError.h>
+#include <BrokerExchange.h>
+#include <BrokerMessage.h>
+#include <MessageStore.h>
+#include <AMQContentBody.h>
+#include <AMQHeaderBody.h>
+#include <BasicPublishBody.h>
+#include "CompletionHandler.h"
+
+namespace qpid {
+ namespace broker {
+ class MessageBuilder{
+ public:
+ MessageBuilder(CompletionHandler* _handler,
+ MessageStore* const store = 0,
+ uint64_t stagingThreshold = 0);
+ void initialise(Message::shared_ptr& msg);
+ void setHeader(framing::AMQHeaderBody::shared_ptr& header);
+ void addContent(framing::AMQContentBody::shared_ptr& content);
+ Message::shared_ptr getMessage() { return message; }
+ private:
+ Message::shared_ptr message;
+ CompletionHandler* handler;
+ MessageStore* const store;
+ const uint64_t stagingThreshold;
+
+ void route();
+ };
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/MessageHandlerImpl.cpp b/qpid/cpp/lib/broker/MessageHandlerImpl.cpp
new file mode 100644
index 0000000000..fa7c10f26c
--- /dev/null
+++ b/qpid/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -0,0 +1,243 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "QpidError.h"
+#include "MessageHandlerImpl.h"
+#include "BrokerChannel.h"
+#include "FramingContent.h"
+#include "Connection.h"
+#include "Broker.h"
+#include "BrokerMessageMessage.h"
+#include "MessageAppendBody.h"
+#include "MessageTransferBody.h"
+#include "BrokerAdapter.h"
+
+namespace qpid {
+namespace broker {
+
+using namespace framing;
+
+MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent)
+ : HandlerImplType(parent) {}
+
+//
+// Message class method handlers
+//
+
+void
+MessageHandlerImpl::cancel(const MethodContext& context,
+ const string& destination )
+{
+ channel.cancel(destination);
+ client.ok(context.getRequestId());
+}
+
+void
+MessageHandlerImpl::open(const MethodContext& context,
+ const string& reference)
+{
+ references.open(reference);
+ client.ok(context.getRequestId());
+}
+
+void
+MessageHandlerImpl::append(const MethodContext& context,
+ const string& reference,
+ const string& /*bytes*/ )
+{
+ references.get(reference)->append(
+ boost::shared_polymorphic_downcast<MessageAppendBody>(
+ context.methodBody));
+ client.ok(context.getRequestId());
+}
+
+void
+MessageHandlerImpl::close(const MethodContext& context,
+ const string& reference)
+{
+ Reference::shared_ptr ref = references.get(reference);
+ client.ok(context.getRequestId());
+
+ // Send any transfer messages to their correct exchanges and okay them
+ const Reference::Messages& msgs = ref->getMessages();
+ for (Reference::Messages::const_iterator m = msgs.begin(); m != msgs.end(); ++m) {
+ channel.handleInlineTransfer(*m);
+ client.ok((*m)->getRequestId());
+ }
+ ref->close();
+}
+
+void
+MessageHandlerImpl::checkpoint(const MethodContext& context,
+ const string& /*reference*/,
+ const string& /*identifier*/ )
+{
+ // Initial implementation (which is conforming) is to do nothing here
+ // and return offset zero for the resume
+ client.ok(context.getRequestId());
+}
+
+void
+MessageHandlerImpl::resume(const MethodContext& context,
+ const string& reference,
+ const string& /*identifier*/ )
+{
+ // Initial (null) implementation
+ // open reference and return 0 offset
+ references.open(reference);
+ client.offset(0, context.getRequestId());
+}
+
+void
+MessageHandlerImpl::offset(const MethodContext&,
+ uint64_t /*value*/ )
+{
+ // Shouldn't ever receive this as it is reponse to resume
+ // which is never sent
+ // TODO astitcher 2007-02-16 What is the correct exception to throw here?
+ THROW_QPID_ERROR(INTERNAL_ERROR, "impossible");
+}
+
+void
+MessageHandlerImpl::consume(const MethodContext& context,
+ uint16_t /*ticket*/,
+ const string& queueName,
+ const string& destination,
+ bool noLocal,
+ bool noAck,
+ bool exclusive,
+ const framing::FieldTable& filter )
+{
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ if(!destination.empty() && channel.exists(destination))
+ throw ConnectionException(530, "Consumer tags must be unique");
+ string tag = destination;
+ channel.consume(
+ tag, queue, !noAck, exclusive,
+ noLocal ? &connection : 0, &filter);
+ client.ok(context.getRequestId());
+ // Dispatch messages as there is now a consumer.
+ queue->dispatch();
+}
+
+void
+MessageHandlerImpl::get( const MethodContext& context,
+ uint16_t /*ticket*/,
+ const string& queueName,
+ const string& destination,
+ bool noAck )
+{
+ Queue::shared_ptr queue =
+ connection.getQueue(queueName, context.channel->getId());
+
+ if(channel.get(queue, destination, !noAck))
+ client.ok(context.getRequestId());
+ else
+ client.empty(context.getRequestId());
+}
+
+void
+MessageHandlerImpl::empty( const MethodContext& )
+{
+ // Shouldn't ever receive this as it is a response to get
+ // which is never sent
+ // TODO astitcher 2007-02-09 What is the correct exception to throw here?
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Impossible");
+}
+
+void
+MessageHandlerImpl::ok(const MethodContext& /*context*/)
+{
+ channel.ack();
+}
+
+void
+MessageHandlerImpl::qos(const MethodContext& context,
+ uint32_t prefetchSize,
+ uint16_t prefetchCount,
+ bool /*global*/ )
+{
+ //TODO: handle global
+ channel.setPrefetchSize(prefetchSize);
+ channel.setPrefetchCount(prefetchCount);
+ client.ok(context.getRequestId());
+}
+
+void
+MessageHandlerImpl::recover(const MethodContext& context,
+ bool requeue)
+{
+ channel.recover(requeue);
+ client.ok(context.getRequestId());
+}
+
+void
+MessageHandlerImpl::reject(const MethodContext& /*context*/,
+ uint16_t /*code*/,
+ const string& /*text*/ )
+{
+ channel.ack();
+ // channel.requeue();
+}
+
+void
+MessageHandlerImpl::transfer(const MethodContext& context,
+ uint16_t /*ticket*/,
+ const string& /* destination */,
+ bool /*redelivered*/,
+ bool /*immediate*/,
+ uint64_t /*ttl*/,
+ uint8_t /*priority*/,
+ uint64_t /*timestamp*/,
+ uint8_t /*deliveryMode*/,
+ uint64_t /*expiration*/,
+ const string& /*exchangeName*/,
+ const string& /*routingKey*/,
+ const string& /*messageId*/,
+ const string& /*correlationId*/,
+ const string& /*replyTo*/,
+ const string& /*contentType*/,
+ const string& /*contentEncoding*/,
+ const string& /*userId*/,
+ const string& /*appId*/,
+ const string& /*transactionId*/,
+ const string& /*securityToken*/,
+ const framing::FieldTable& /*applicationHeaders*/,
+ const framing::Content& body,
+ bool /*mandatory*/)
+{
+ MessageTransferBody::shared_ptr transfer(
+ boost::shared_polymorphic_downcast<MessageTransferBody>(
+ context.methodBody));
+ RequestId requestId = context.getRequestId();
+
+ if (body.isInline()) {
+ MessageMessage::shared_ptr message(
+ new MessageMessage(&connection, requestId, transfer));
+ channel.handleInlineTransfer(message);
+ client.ok(requestId);
+ } else {
+ Reference::shared_ptr ref(references.get(body.getValue()));
+ MessageMessage::shared_ptr message(
+ new MessageMessage(&connection, requestId, transfer, ref));
+ ref->addMessage(message);
+ }
+}
+
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/lib/broker/MessageHandlerImpl.h b/qpid/cpp/lib/broker/MessageHandlerImpl.h
new file mode 100644
index 0000000000..872d429d5c
--- /dev/null
+++ b/qpid/cpp/lib/broker/MessageHandlerImpl.h
@@ -0,0 +1,130 @@
+#ifndef _broker_MessageHandlerImpl_h
+#define _broker_MessageHandlerImpl_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <memory>
+
+#include "AMQP_ServerOperations.h"
+#include "AMQP_ClientProxy.h"
+#include "Reference.h"
+#include "HandlerImpl.h"
+
+namespace qpid {
+namespace broker {
+
+class Connection;
+class Broker;
+class MessageMessage;
+
+class MessageHandlerImpl :
+ public framing::AMQP_ServerOperations::MessageHandler,
+ public HandlerImpl<framing::AMQP_ClientProxy::Message>
+{
+ public:
+ MessageHandlerImpl(CoreRefs& parent);
+
+ void append(const framing::MethodContext&,
+ const std::string& reference,
+ const std::string& bytes );
+
+ void cancel(const framing::MethodContext&,
+ const std::string& destination );
+
+ void checkpoint(const framing::MethodContext&,
+ const std::string& reference,
+ const std::string& identifier );
+
+ void close(const framing::MethodContext&,
+ const std::string& reference );
+
+ void consume(const framing::MethodContext&,
+ uint16_t ticket,
+ const std::string& queue,
+ const std::string& destination,
+ bool noLocal,
+ bool noAck,
+ bool exclusive,
+ const framing::FieldTable& filter );
+
+ void empty( const framing::MethodContext& );
+
+ void get(const framing::MethodContext&,
+ uint16_t ticket,
+ const std::string& queue,
+ const std::string& destination,
+ bool noAck );
+
+ void offset(const framing::MethodContext&,
+ uint64_t value );
+
+ void ok( const framing::MethodContext& );
+
+ void open(const framing::MethodContext&,
+ const std::string& reference );
+
+ void qos(const framing::MethodContext&,
+ uint32_t prefetchSize,
+ uint16_t prefetchCount,
+ bool global );
+
+ void recover(const framing::MethodContext&,
+ bool requeue );
+
+ void reject(const framing::MethodContext&,
+ uint16_t code,
+ const std::string& text );
+
+ void resume(const framing::MethodContext&,
+ const std::string& reference,
+ const std::string& identifier );
+
+ void transfer(const framing::MethodContext&,
+ uint16_t ticket,
+ const std::string& destination,
+ bool redelivered,
+ bool immediate,
+ uint64_t ttl,
+ uint8_t priority,
+ uint64_t timestamp,
+ uint8_t deliveryMode,
+ uint64_t expiration,
+ const std::string& exchange,
+ const std::string& routingKey,
+ const std::string& messageId,
+ const std::string& correlationId,
+ const std::string& replyTo,
+ const std::string& contentType,
+ const std::string& contentEncoding,
+ const std::string& userId,
+ const std::string& appId,
+ const std::string& transactionId,
+ const std::string& securityToken,
+ const framing::FieldTable& applicationHeaders,
+ const framing::Content& body,
+ bool mandatory );
+ private:
+ ReferenceRegistry references;
+};
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!_broker_MessageHandlerImpl_h*/
diff --git a/qpid/cpp/lib/broker/MessageStore.h b/qpid/cpp/lib/broker/MessageStore.h
new file mode 100644
index 0000000000..9e38408886
--- /dev/null
+++ b/qpid/cpp/lib/broker/MessageStore.h
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _MessageStore_
+#define _MessageStore_
+
+#include <BrokerMessage.h>
+#include <FieldTable.h>
+#include <RecoveryManager.h>
+#include <TransactionalStore.h>
+
+namespace qpid {
+ namespace broker {
+ struct MessageStoreSettings
+ {
+ /**
+ * Messages whose content length is larger than this value
+ * will be staged (i.e. will have thier data written to
+ * disk as it arrives) and will load their data lazily. On
+ * recovery therefore, only the headers should be loaded.
+ */
+ uint64_t stagingThreshold;
+ };
+ /**
+ * An abstraction of the persistent storage for messages. (In
+ * all methods, any pointers/references to queues or messages
+ * are valid only for the duration of the call).
+ */
+ class MessageStore : public TransactionalStore{
+ public:
+ /**
+ * Record the existance of a durable queue
+ */
+ virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings) = 0;
+ /**
+ * Destroy a durable queue
+ */
+ virtual void destroy(const Queue& queue) = 0;
+
+ /**
+ * Request recovery of queue and message state from store
+ */
+ virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0) = 0;
+
+ /**
+ * Stores a messages before it has been enqueued
+ * (enqueueing automatically stores the message so this is
+ * only required if storage is required prior to that
+ * point). If the message has not yet been stored it will
+ * store the headers as well as any content passed in. A
+ * persistence id will be set on the message which can be
+ * used to load the content or to append to it.
+ */
+ virtual void stage(Message* const msg) = 0;
+
+ /**
+ * Destroys a previously staged message. This only needs
+ * to be called if the message is never enqueued. (Once
+ * enqueued, deletion will be automatic when the message
+ * is dequeued from all queues it was enqueued onto).
+ */
+ virtual void destroy(Message* const msg) = 0;
+
+ /**
+ * Appends content to a previously staged message
+ */
+ virtual void appendContent(Message* const msg, const std::string& data) = 0;
+
+ /**
+ * Loads (a section) of content data for the specified
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
+ */
+ virtual void loadContent(Message* const msg, std::string& data, uint64_t offset, uint32_t length) = 0;
+
+ /**
+ * Enqueues a message, storing the message if it has not
+ * been previously stored and recording that the given
+ * message is on the given queue.
+ *
+ * @param msg the message to enqueue
+ * @param queue the name of the queue onto which it is to be enqueued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const std::string * const xid) = 0;
+ /**
+ * Dequeues a message, recording that the given message is
+ * no longer on the given queue and deleting the message
+ * if it is no longer on any other queue.
+ *
+ * @param msg the message to dequeue
+ * @param queue the name of th queue from which it is to be dequeued
+ * @param xid (a pointer to) an identifier of the
+ * distributed transaction in which the operation takes
+ * place or null for 'local' transactions
+ */
+ virtual void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const std::string * const xid) = 0;
+
+ /**
+ * Treat all enqueue/dequeues where this xid was specified as being prepared.
+ */
+ virtual void prepared(const std::string * const xid) = 0;
+ /**
+ * Treat all enqueue/dequeues where this xid was specified as being committed.
+ */
+ virtual void committed(const std::string * const xid) = 0;
+ /**
+ * Treat all enqueue/dequeues where this xid was specified as being aborted.
+ */
+ virtual void aborted(const std::string * const xid) = 0;
+
+ virtual ~MessageStore(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/MessageStoreModule.cpp b/qpid/cpp/lib/broker/MessageStoreModule.cpp
new file mode 100644
index 0000000000..676e86f84a
--- /dev/null
+++ b/qpid/cpp/lib/broker/MessageStoreModule.cpp
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <MessageStoreModule.h>
+#include <iostream>
+
+using namespace qpid::broker;
+
+MessageStoreModule::MessageStoreModule(const std::string& name) : store(name)
+{
+}
+
+void MessageStoreModule::create(const Queue& queue, const qpid::framing::FieldTable& settings)
+{
+ store->create(queue, settings);
+}
+
+void MessageStoreModule::destroy(const Queue& queue)
+{
+ store->destroy(queue);
+}
+
+void MessageStoreModule::recover(RecoveryManager& registry, const MessageStoreSettings* const settings)
+{
+ store->recover(registry, settings);
+}
+
+void MessageStoreModule::stage(Message* const msg)
+{
+ store->stage(msg);
+}
+
+void MessageStoreModule::destroy(Message* const msg)
+{
+ store->destroy(msg);
+}
+
+void MessageStoreModule::appendContent(Message* const msg, const std::string& data)
+{
+ store->appendContent(msg, data);
+}
+
+void MessageStoreModule::loadContent(Message* const msg, string& data, uint64_t offset, uint32_t length)
+{
+ store->loadContent(msg, data, offset, length);
+}
+
+void MessageStoreModule::enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid)
+{
+ store->enqueue(ctxt, msg, queue, xid);
+}
+
+void MessageStoreModule::dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid)
+{
+ store->dequeue(ctxt, msg, queue, xid);
+}
+
+void MessageStoreModule::prepared(const string * const xid)
+{
+ store->prepared(xid);
+}
+
+void MessageStoreModule::committed(const string * const xid)
+{
+ store->committed(xid);
+}
+
+void MessageStoreModule::aborted(const string * const xid)
+{
+ store->aborted(xid);
+}
+
+std::auto_ptr<TransactionContext> MessageStoreModule::begin()
+{
+ return store->begin();
+}
+
+void MessageStoreModule::commit(TransactionContext* ctxt)
+{
+ store->commit(ctxt);
+}
+
+void MessageStoreModule::abort(TransactionContext* ctxt)
+{
+ store->abort(ctxt);
+}
diff --git a/qpid/cpp/lib/broker/MessageStoreModule.h b/qpid/cpp/lib/broker/MessageStoreModule.h
new file mode 100644
index 0000000000..27fedbf635
--- /dev/null
+++ b/qpid/cpp/lib/broker/MessageStoreModule.h
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _MessageStoreModule_
+#define _MessageStoreModule_
+
+#include <BrokerMessage.h>
+#include <MessageStore.h>
+#include <BrokerQueue.h>
+#include <RecoveryManager.h>
+#include <sys/Module.h>
+
+namespace qpid {
+ namespace broker {
+ /**
+ * A null implementation of the MessageStore interface
+ */
+ class MessageStoreModule : public MessageStore{
+ qpid::sys::Module<MessageStore> store;
+ public:
+ MessageStoreModule(const std::string& name);
+ void create(const Queue& queue, const qpid::framing::FieldTable& settings);
+ void destroy(const Queue& queue);
+ void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
+ void stage(Message* const msg);
+ void destroy(Message* const msg);
+ void appendContent(Message* const msg, const std::string& data);
+ void loadContent(Message* const msg, std::string& data, uint64_t offset, uint32_t length);
+ void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid);
+ void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid);
+ void prepared(const std::string * const xid);
+ void committed(const std::string * const xid);
+ void aborted(const std::string * const xid);
+ std::auto_ptr<TransactionContext> begin();
+ void commit(TransactionContext* ctxt);
+ void abort(TransactionContext* ctxt);
+ ~MessageStoreModule(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/NameGenerator.cpp b/qpid/cpp/lib/broker/NameGenerator.cpp
new file mode 100644
index 0000000000..3f281859fa
--- /dev/null
+++ b/qpid/cpp/lib/broker/NameGenerator.cpp
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <NameGenerator.h>
+#include <sstream>
+
+using namespace qpid::broker;
+
+NameGenerator::NameGenerator(const std::string& _base) : base(_base), counter(1) {}
+
+std::string NameGenerator::generate(){
+ std::stringstream ss;
+ ss << base << counter++;
+ return ss.str();
+}
diff --git a/qpid/cpp/lib/broker/NameGenerator.h b/qpid/cpp/lib/broker/NameGenerator.h
new file mode 100644
index 0000000000..b2dbbdfb69
--- /dev/null
+++ b/qpid/cpp/lib/broker/NameGenerator.h
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _NameGenerator_
+#define _NameGenerator_
+
+#include <BrokerMessage.h>
+
+namespace qpid {
+ namespace broker {
+ class NameGenerator{
+ const std::string base;
+ unsigned int counter;
+ public:
+ NameGenerator(const std::string& base);
+ std::string generate();
+ };
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/NullMessageStore.cpp b/qpid/cpp/lib/broker/NullMessageStore.cpp
new file mode 100644
index 0000000000..bcb15c2ae0
--- /dev/null
+++ b/qpid/cpp/lib/broker/NullMessageStore.cpp
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <NullMessageStore.h>
+
+#include <BrokerQueue.h>
+#include <RecoveryManager.h>
+
+#include <iostream>
+
+using namespace qpid::broker;
+
+NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){}
+
+void NullMessageStore::create(const Queue& queue, const qpid::framing::FieldTable&)
+{
+ if (warn) std::cout << "WARNING: Can't create durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::destroy(const Queue& queue)
+{
+ if (warn) std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::recover(RecoveryManager&, const MessageStoreSettings* const)
+{
+ if (warn) std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl;
+}
+
+void NullMessageStore::stage(Message* const)
+{
+ if (warn) std::cout << "WARNING: Can't stage message. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::destroy(Message* const)
+{
+ if (warn) std::cout << "WARNING: No need to destroy staged message. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::appendContent(Message* const, const string&)
+{
+ if (warn) std::cout << "WARNING: Can't append content. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::loadContent(Message* const, string&, uint64_t, uint32_t)
+{
+ if (warn) std::cout << "WARNING: Can't load content. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::enqueue(TransactionContext*, Message* const, const Queue& queue, const string * const)
+{
+ if (warn) std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::dequeue(TransactionContext*, Message* const, const Queue& queue, const string * const)
+{
+ if (warn) std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::prepared(const string * const)
+{
+ if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::committed(const string * const)
+{
+ if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
+}
+
+void NullMessageStore::aborted(const string * const)
+{
+ if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
+}
+
+std::auto_ptr<TransactionContext> NullMessageStore::begin()
+{
+ return std::auto_ptr<TransactionContext>();
+}
+
+void NullMessageStore::commit(TransactionContext*)
+{
+}
+
+void NullMessageStore::abort(TransactionContext*)
+{
+}
diff --git a/qpid/cpp/lib/broker/NullMessageStore.h b/qpid/cpp/lib/broker/NullMessageStore.h
new file mode 100644
index 0000000000..705f18ab43
--- /dev/null
+++ b/qpid/cpp/lib/broker/NullMessageStore.h
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _NullMessageStore_
+#define _NullMessageStore_
+
+#include <BrokerMessage.h>
+#include <MessageStore.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+ namespace broker {
+
+ /**
+ * A null implementation of the MessageStore interface
+ */
+ class NullMessageStore : public MessageStore{
+ const bool warn;
+ public:
+ NullMessageStore(bool warn = false);
+ virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings);
+ virtual void destroy(const Queue& queue);
+ virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0);
+ virtual void stage(Message* const msg);
+ virtual void destroy(Message* const msg);
+ virtual void appendContent(Message* const msg, const std::string& data);
+ virtual void loadContent(Message* const msg, std::string& data, uint64_t offset, uint32_t length);
+ virtual void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid);
+ virtual void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid);
+ virtual void prepared(const std::string * const xid);
+ virtual void committed(const std::string * const xid);
+ virtual void aborted(const std::string * const xid);
+ virtual std::auto_ptr<TransactionContext> begin();
+ virtual void commit(TransactionContext* ctxt);
+ virtual void abort(TransactionContext* ctxt);
+ ~NullMessageStore(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/Prefetch.h b/qpid/cpp/lib/broker/Prefetch.h
new file mode 100644
index 0000000000..b6d4026c3f
--- /dev/null
+++ b/qpid/cpp/lib/broker/Prefetch.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Prefetch_
+#define _Prefetch_
+
+#include <amqp_types.h>
+
+namespace qpid {
+ namespace broker {
+ /**
+ * Count and total size of asynchronously delivered
+ * (i.e. pushed) messages that have acks outstanding.
+ */
+ struct Prefetch{
+ uint32_t size;
+ uint16_t count;
+
+ void reset() { size = 0; count = 0; }
+ };
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/QueuePolicy.cpp b/qpid/cpp/lib/broker/QueuePolicy.cpp
new file mode 100644
index 0000000000..94b86f2bbb
--- /dev/null
+++ b/qpid/cpp/lib/broker/QueuePolicy.cpp
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <QueuePolicy.h>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize) :
+ maxCount(_maxCount), maxSize(_maxSize), count(0), size(0) {}
+
+QueuePolicy::QueuePolicy(const FieldTable& settings) :
+ maxCount(getInt(settings, maxCountKey, 0)),
+ maxSize(getInt(settings, maxSizeKey, 0)), count(0), size(0) {}
+
+void QueuePolicy::enqueued(uint64_t _size)
+{
+ if (maxCount) count++;
+ if (maxSize) size += _size;
+}
+
+void QueuePolicy::dequeued(uint64_t _size)
+{
+ if (maxCount) count--;
+ if (maxSize) size -= _size;
+}
+
+bool QueuePolicy::limitExceeded()
+{
+ return (maxSize && size > maxSize) || (maxCount && count > maxCount);
+}
+
+void QueuePolicy::update(FieldTable& settings)
+{
+ if (maxCount) settings.setInt(maxCountKey, maxCount);
+ if (maxSize) settings.setInt(maxSizeKey, maxSize);
+}
+
+
+int QueuePolicy::getInt(const FieldTable& settings, const std::string& key, int defaultValue)
+{
+ //Note: currently field table only contain signed 32 bit ints, which
+ // restricts the values that can be set on the queue policy.
+ try {
+ return settings.getInt(key);
+ } catch (FieldNotFoundException& ignore) {
+ return defaultValue;
+ }
+}
+
+const std::string QueuePolicy::maxCountKey("qpid.max_count");
+const std::string QueuePolicy::maxSizeKey("qpid.max_size");
diff --git a/qpid/cpp/lib/broker/QueuePolicy.h b/qpid/cpp/lib/broker/QueuePolicy.h
new file mode 100644
index 0000000000..e7688f3e67
--- /dev/null
+++ b/qpid/cpp/lib/broker/QueuePolicy.h
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QueuePolicy_
+#define _QueuePolicy_
+
+#include <FieldTable.h>
+
+namespace qpid {
+ namespace broker {
+ class QueuePolicy
+ {
+ static const std::string maxCountKey;
+ static const std::string maxSizeKey;
+
+ const uint32_t maxCount;
+ const uint64_t maxSize;
+ uint32_t count;
+ uint64_t size;
+
+ static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
+
+ public:
+ QueuePolicy(uint32_t maxCount, uint64_t maxSize);
+ QueuePolicy(const qpid::framing::FieldTable& settings);
+ void enqueued(uint64_t size);
+ void dequeued(uint64_t size);
+ void update(qpid::framing::FieldTable& settings);
+ bool limitExceeded();
+ uint32_t getMaxCount() const { return maxCount; }
+ uint64_t getMaxSize() const { return maxSize; }
+ };
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/QueueRegistry.cpp b/qpid/cpp/lib/broker/QueueRegistry.cpp
new file mode 100644
index 0000000000..d33cd09840
--- /dev/null
+++ b/qpid/cpp/lib/broker/QueueRegistry.cpp
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <QueueRegistry.h>
+#include <sstream>
+#include <assert.h>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+
+QueueRegistry::QueueRegistry(MessageStore* const _store) : counter(1), store(_store){}
+
+QueueRegistry::~QueueRegistry(){}
+
+std::pair<Queue::shared_ptr, bool>
+QueueRegistry::declare(const string& declareName, bool durable,
+ uint32_t autoDelete, const ConnectionToken* owner)
+{
+ Mutex::ScopedLock locker(lock);
+ string name = declareName.empty() ? generateName() : declareName;
+ assert(!name.empty());
+ QueueMap::iterator i = queues.find(name);
+ if (i == queues.end()) {
+ Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner));
+ queues[name] = queue;
+ return std::pair<Queue::shared_ptr, bool>(queue, true);
+ } else {
+ return std::pair<Queue::shared_ptr, bool>(i->second, false);
+ }
+}
+
+void QueueRegistry::destroy(const string& name){
+ Mutex::ScopedLock locker(lock);
+ queues.erase(name);
+}
+
+Queue::shared_ptr QueueRegistry::find(const string& name){
+ Mutex::ScopedLock locker(lock);
+ QueueMap::iterator i = queues.find(name);
+ if (i == queues.end()) {
+ return Queue::shared_ptr();
+ } else {
+ return i->second;
+ }
+}
+
+string QueueRegistry::generateName(){
+ string name;
+ do {
+ std::stringstream ss;
+ ss << "tmp_" << counter++;
+ name = ss.str();
+ // Thread safety: Private function, only called with lock held
+ // so this is OK.
+ } while(queues.find(name) != queues.end());
+ return name;
+}
+
+MessageStore* const QueueRegistry::getStore() const {
+ return store;
+}
diff --git a/qpid/cpp/lib/broker/QueueRegistry.h b/qpid/cpp/lib/broker/QueueRegistry.h
new file mode 100644
index 0000000000..079034359e
--- /dev/null
+++ b/qpid/cpp/lib/broker/QueueRegistry.h
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QueueRegistry_
+#define _QueueRegistry_
+
+#include <map>
+#include <sys/Monitor.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * A registry of queues indexed by queue name.
+ *
+ * Queues are reference counted using shared_ptr to ensure that they
+ * are deleted when and only when they are no longer in use.
+ *
+ */
+class QueueRegistry{
+
+ public:
+ QueueRegistry(MessageStore* const store = 0);
+ ~QueueRegistry();
+
+ /**
+ * Declare a queue.
+ *
+ * @return The queue and a boolean flag which is true if the queue
+ * was created by this declare call false if it already existed.
+ */
+ std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, uint32_t autodelete = 0,
+ const ConnectionToken* const owner = 0);
+
+ /**
+ * Destroy the named queue.
+ *
+ * Note: if the queue is in use it is not actually destroyed until
+ * all shared_ptrs to it are destroyed. During that time it is
+ * possible that a new queue with the same name may be
+ * created. This should not create any problems as the new and
+ * old queues exist independently. The registry has
+ * forgotten the old queue so there can be no confusion for
+ * subsequent calls to find or declare with the same name.
+ *
+ */
+ void destroy(const string& name);
+
+ /**
+ * Find the named queue. Return 0 if not found.
+ */
+ Queue::shared_ptr find(const string& name);
+
+ /**
+ * Generate unique queue name.
+ */
+ string generateName();
+
+ /**
+ * Return the message store used.
+ */
+ MessageStore* const getStore() const;
+
+
+ private:
+ typedef std::map<string, Queue::shared_ptr> QueueMap;
+ QueueMap queues;
+ qpid::sys::Mutex lock;
+ int counter;
+ MessageStore* const store;
+};
+
+
+}
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/RecoveryManager.cpp b/qpid/cpp/lib/broker/RecoveryManager.cpp
new file mode 100644
index 0000000000..6548e6a24f
--- /dev/null
+++ b/qpid/cpp/lib/broker/RecoveryManager.cpp
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <RecoveryManager.h>
+
+using namespace qpid::broker;
+
+RecoveryManager::RecoveryManager(QueueRegistry& _queues, ExchangeRegistry& _exchanges) : queues(_queues), exchanges(_exchanges) {}
+
+RecoveryManager::~RecoveryManager() {}
+
+Queue::shared_ptr RecoveryManager::recoverQueue(const string& name)
+{
+ std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true);
+ try {
+ Exchange::shared_ptr exchange = exchanges.getDefault();
+ if (exchange) {
+ exchange->bind(result.first, result.first->getName(), 0);
+ }
+ } catch (ChannelException& e) {
+ //assume no default exchange has been declared
+ }
+ return result.first;
+}
+
+Exchange::shared_ptr RecoveryManager::recoverExchange(const string& name, const string& type)
+{
+ return exchanges.declare(name, type).first;
+}
diff --git a/qpid/cpp/lib/broker/RecoveryManager.h b/qpid/cpp/lib/broker/RecoveryManager.h
new file mode 100644
index 0000000000..d4e4cff3fd
--- /dev/null
+++ b/qpid/cpp/lib/broker/RecoveryManager.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _RecoveryManager_
+#define _RecoveryManager_
+
+#include <ExchangeRegistry.h>
+#include <QueueRegistry.h>
+
+namespace qpid {
+namespace broker {
+
+ class RecoveryManager{
+ QueueRegistry& queues;
+ ExchangeRegistry& exchanges;
+ public:
+ RecoveryManager(QueueRegistry& queues, ExchangeRegistry& exchanges);
+ ~RecoveryManager();
+ Queue::shared_ptr recoverQueue(const std::string& name);
+ Exchange::shared_ptr recoverExchange(const std::string& name, const std::string& type);
+ };
+
+
+}
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/Reference.cpp b/qpid/cpp/lib/broker/Reference.cpp
new file mode 100644
index 0000000000..c4c33e6363
--- /dev/null
+++ b/qpid/cpp/lib/broker/Reference.cpp
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/bind.hpp>
+#include "Reference.h"
+#include "BrokerMessageMessage.h"
+#include "QpidError.h"
+#include "MessageAppendBody.h"
+#include "CompletionHandler.h"
+
+namespace qpid {
+namespace broker {
+
+Reference::shared_ptr ReferenceRegistry::open(const Reference::Id& id) {
+ ReferenceMap::iterator i = references.find(id);
+ // TODO aconway 2007-02-05: should we throw Channel or Connection
+ // exceptions here?
+ if (i != references.end())
+ throw ConnectionException(503, "Attempt to re-open reference " +id);
+ return references[id] = Reference::shared_ptr(new Reference(id, this));
+}
+
+Reference::shared_ptr ReferenceRegistry::get(const Reference::Id& id) {
+ ReferenceMap::iterator i = references.find(id);
+ if (i == references.end())
+ throw ConnectionException(503, "Attempt to use non-existent reference "+id);
+ return i->second;
+}
+
+void Reference::append(AppendPtr ptr) {
+ appends.push_back(ptr);
+ size += ptr->getBytes().length();
+}
+
+void Reference::close() {
+ registry->references.erase(getId());
+}
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/lib/broker/Reference.h b/qpid/cpp/lib/broker/Reference.h
new file mode 100644
index 0000000000..e453645a54
--- /dev/null
+++ b/qpid/cpp/lib/broker/Reference.h
@@ -0,0 +1,111 @@
+#ifndef _broker_Reference_h
+#define _broker_Reference_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <string>
+#include <vector>
+#include <map>
+#include <boost/shared_ptr.hpp>
+#include <boost/range.hpp>
+
+namespace qpid {
+
+namespace framing {
+class MessageAppendBody;
+}
+
+namespace broker {
+
+class MessageMessage;
+class ReferenceRegistry;
+
+/**
+ * A reference is an accumulation point for data in a multi-frame
+ * message. A reference can be used by multiple transfer commands to
+ * create multiple messages, so the reference tracks which commands
+ * are using it. When the reference is closed, all the associated
+ * transfers are completed.
+ *
+ * THREAD UNSAFE: per-channel resource, access to channels is
+ * serialized.
+ */
+class Reference
+{
+ public:
+ typedef std::string Id;
+ typedef boost::shared_ptr<Reference> shared_ptr;
+ typedef boost::shared_ptr<MessageMessage> MessagePtr;
+ typedef std::vector<MessagePtr> Messages;
+ typedef boost::shared_ptr<framing::MessageAppendBody> AppendPtr;
+ typedef std::vector<AppendPtr> Appends;
+
+ Reference(const Id& id_=Id(), ReferenceRegistry* reg=0)
+ : id(id_), size(0), registry(reg) {}
+
+ const std::string& getId() const { return id; }
+ uint64_t getSize() const { return size; }
+
+ /** Add a message to be completed with this reference */
+ void addMessage(MessagePtr message) { messages.push_back(message); }
+
+ /** Append more data to the reference */
+ void append(AppendPtr ptr);
+
+ /** Close the reference, complete each associated message */
+ void close();
+
+ const Appends& getAppends() const { return appends; }
+ const Messages& getMessages() const { return messages; }
+
+ private:
+ Id id;
+ uint64_t size;
+ ReferenceRegistry* registry;
+ Messages messages;
+ Appends appends;
+};
+
+
+/**
+ * A registry/factory for references.
+ *
+ * THREAD UNSAFE: per-channel resource, access to channels is
+ * serialized.
+ */
+class ReferenceRegistry {
+ public:
+ ReferenceRegistry() {};
+ Reference::shared_ptr open(const Reference::Id& id);
+ Reference::shared_ptr get(const Reference::Id& id);
+
+ private:
+ typedef std::map<Reference::Id, Reference::shared_ptr> ReferenceMap;
+ ReferenceMap references;
+
+ // Reference calls references.erase().
+ friend class Reference;
+};
+
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!_broker_Reference_h*/
diff --git a/qpid/cpp/lib/broker/TopicExchange.cpp b/qpid/cpp/lib/broker/TopicExchange.cpp
new file mode 100644
index 0000000000..3ebb3c8c56
--- /dev/null
+++ b/qpid/cpp/lib/broker/TopicExchange.cpp
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <TopicExchange.h>
+#include <ExchangeBinding.h>
+#include <algorithm>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+// TODO aconway 2006-09-20: More efficient matching algorithm.
+// Areas for improvement:
+// - excessive string copying: should be 0 copy, match from original buffer.
+// - match/lookup: use descision tree or other more efficient structure.
+
+Tokens& Tokens::operator=(const std::string& s) {
+ clear();
+ if (s.empty()) return *this;
+ std::string::const_iterator i = s.begin();
+ while (true) {
+ // Invariant: i is at the beginning of the next untokenized word.
+ std::string::const_iterator j = find(i, s.end(), '.');
+ push_back(std::string(i, j));
+ if (j == s.end()) return *this;
+ i = j + 1;
+ }
+ return *this;
+}
+
+TopicPattern& TopicPattern::operator=(const Tokens& tokens) {
+ Tokens::operator=(tokens);
+ normalize();
+ return *this;
+}
+
+namespace {
+const std::string hashmark("#");
+const std::string star("*");
+}
+
+void TopicPattern::normalize() {
+ std::string word;
+ Tokens::iterator i = begin();
+ while (i != end()) {
+ if (*i == hashmark) {
+ ++i;
+ while (i != end()) {
+ // Invariant: *(i-1)==#, [begin()..i-1] is normalized.
+ if (*i == star) { // Move * before #.
+ std::swap(*i, *(i-1));
+ ++i;
+ } else if (*i == hashmark) {
+ erase(i); // Remove extra #
+ } else {
+ break;
+ }
+ }
+ } else {
+ i ++;
+ }
+ }
+}
+
+
+namespace {
+// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string.
+// Need StringRef class that operates on a string in place witout copy.
+// Should be applied everywhere strings are extracted from frames.
+//
+bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end)
+{
+ // Invariant: [pattern_begin..p) matches [target_begin..t)
+ Tokens::const_iterator p = pattern_begin;
+ Tokens::const_iterator t = target_begin;
+ while (p != pattern_end && t != target_end)
+ {
+ if (*p == star || *p == *t) {
+ ++p, ++t;
+ } else if (*p == hashmark) {
+ ++p;
+ if (do_match(p, pattern_end, t, target_end)) return true;
+ while (t != target_end) {
+ ++t;
+ if (do_match(p, pattern_end, t, target_end)) return true;
+ }
+ return false;
+ } else {
+ return false;
+ }
+ }
+ while (p != pattern_end && *p == hashmark) ++p; // Ignore trailing #
+ return t == target_end && p == pattern_end;
+}
+}
+
+bool TopicPattern::match(const Tokens& target) const
+{
+ return do_match(begin(), end(), target.begin(), target.end());
+}
+
+TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { }
+
+void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){
+ Monitor::ScopedLock l(lock);
+ TopicPattern routingPattern(routingKey);
+ bindings[routingPattern].push_back(queue);
+ queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+}
+
+void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
+ Monitor::ScopedLock l(lock);
+ BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
+ Queue::vector& qv(bi->second);
+ if (bi == bindings.end()) return;
+ Queue::vector::iterator q = find(qv.begin(), qv.end(), queue);
+ if(q == qv.end()) return;
+ qv.erase(q);
+ if(qv.empty()) bindings.erase(bi);
+}
+
+
+void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
+ Monitor::ScopedLock l(lock);
+ for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ if (i->first.match(routingKey)) {
+ Queue::vector& qv(i->second);
+ for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){
+ msg.deliverTo(*j);
+ }
+ }
+ }
+}
+
+TopicExchange::~TopicExchange() {}
+
+const std::string TopicExchange::typeName("topic");
+
+
diff --git a/qpid/cpp/lib/broker/TopicExchange.h b/qpid/cpp/lib/broker/TopicExchange.h
new file mode 100644
index 0000000000..fa0c86863a
--- /dev/null
+++ b/qpid/cpp/lib/broker/TopicExchange.h
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TopicExchange_
+#define _TopicExchange_
+
+#include <map>
+#include <vector>
+#include <BrokerExchange.h>
+#include <FieldTable.h>
+#include <BrokerMessage.h>
+#include <sys/Monitor.h>
+#include <BrokerQueue.h>
+
+namespace qpid {
+namespace broker {
+
+/** A vector of string tokens */
+class Tokens : public std::vector<std::string> {
+ public:
+ Tokens() {};
+ // Default copy, assign, dtor are sufficient.
+
+ /** Tokenize s, provides automatic conversion of string to Tokens */
+ Tokens(const std::string& s) { operator=(s); }
+ /** Tokenizing assignment operator s */
+ Tokens & operator=(const std::string& s);
+
+ private:
+ size_t hash;
+};
+
+
+/**
+ * Tokens that have been normalized as a pattern and can be matched
+ * with topic Tokens. Normalized meands all sequences of mixed * and
+ * # are reduced to a series of * followed by at most one #.
+ */
+class TopicPattern : public Tokens
+{
+ public:
+ TopicPattern() {}
+ // Default copy, assign, dtor are sufficient.
+ TopicPattern(const Tokens& tokens) { operator=(tokens); }
+ TopicPattern(const std::string& str) { operator=(str); }
+ TopicPattern& operator=(const Tokens&);
+ TopicPattern& operator=(const std::string& str) { return operator=(Tokens(str)); }
+
+ /** Match a topic */
+ bool match(const std::string& topic) { return match(Tokens(topic)); }
+ bool match(const Tokens& topic) const;
+
+ private:
+ void normalize();
+};
+
+class TopicExchange : public virtual Exchange{
+ typedef std::map<TopicPattern, Queue::vector> BindingMap;
+ BindingMap bindings;
+ qpid::sys::Mutex lock;
+
+ public:
+ static const std::string typeName;
+
+ TopicExchange(const string& name);
+
+ virtual std::string getType(){ return typeName; }
+
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args);
+
+ virtual ~TopicExchange();
+};
+
+
+
+}
+}
+
+#endif
diff --git a/qpid/cpp/lib/broker/TransactionalStore.h b/qpid/cpp/lib/broker/TransactionalStore.h
new file mode 100644
index 0000000000..17bca3878a
--- /dev/null
+++ b/qpid/cpp/lib/broker/TransactionalStore.h
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TransactionalStore_
+#define _TransactionalStore_
+
+#include <memory>
+
+namespace qpid {
+ namespace broker {
+ struct InvalidTransactionContextException : public std::exception {};
+
+ class TransactionContext{
+ public:
+ virtual ~TransactionContext(){}
+ };
+
+ class TransactionalStore{
+ public:
+ virtual std::auto_ptr<TransactionContext> begin() = 0;
+ virtual void commit(TransactionContext*) = 0;
+ virtual void abort(TransactionContext*) = 0;
+
+ virtual ~TransactionalStore(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/TxAck.cpp b/qpid/cpp/lib/broker/TxAck.cpp
new file mode 100644
index 0000000000..b5211158f3
--- /dev/null
+++ b/qpid/cpp/lib/broker/TxAck.cpp
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <TxAck.h>
+
+using std::bind1st;
+using std::bind2nd;
+using std::mem_fun_ref;
+using namespace qpid::broker;
+
+TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked, const std::string* const _xid) :
+ acked(_acked), unacked(_unacked), xid(_xid){
+
+}
+
+bool TxAck::prepare(TransactionContext* ctxt) throw(){
+ try{
+ //dequeue all acked messages from their queues
+ for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
+ if (i->coveredBy(&acked)) {
+ i->discard(ctxt, xid);
+ }
+ }
+ return true;
+ }catch(...){
+ std::cout << "TxAck::prepare() - Failed to prepare" << std::endl;
+ return false;
+ }
+}
+
+void TxAck::commit() throw(){
+ //remove all acked records from the list
+ unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked));
+}
+
+void TxAck::rollback() throw(){
+}
diff --git a/qpid/cpp/lib/broker/TxAck.h b/qpid/cpp/lib/broker/TxAck.h
new file mode 100644
index 0000000000..88c321c445
--- /dev/null
+++ b/qpid/cpp/lib/broker/TxAck.h
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TxAck_
+#define _TxAck_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include <AccumulatedAck.h>
+#include <DeliveryRecord.h>
+#include <TxOp.h>
+
+namespace qpid {
+ namespace broker {
+ /**
+ * Defines the transactional behaviour for acks received by a
+ * transactional channel.
+ */
+ class TxAck : public TxOp{
+ AccumulatedAck& acked;
+ std::list<DeliveryRecord>& unacked;
+ const std::string* const xid;
+
+ public:
+ /**
+ * @param acked a representation of the accumulation of
+ * acks received
+ * @param unacked the record of delivered messages
+ */
+ TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked, const std::string* const xid = 0);
+ virtual bool prepare(TransactionContext* ctxt) throw();
+ virtual void commit() throw();
+ virtual void rollback() throw();
+ virtual ~TxAck(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/TxBuffer.cpp b/qpid/cpp/lib/broker/TxBuffer.cpp
new file mode 100644
index 0000000000..acd3283bb7
--- /dev/null
+++ b/qpid/cpp/lib/broker/TxBuffer.cpp
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <TxBuffer.h>
+
+using std::mem_fun;
+using namespace qpid::broker;
+
+bool TxBuffer::prepare(TransactionalStore* const store)
+{
+ std::auto_ptr<TransactionContext> ctxt;
+ if(store) ctxt = store->begin();
+ for(op_iterator i = ops.begin(); i < ops.end(); i++){
+ if(!(*i)->prepare(ctxt.get())){
+ if(store) store->abort(ctxt.get());
+ return false;
+ }
+ }
+ if(store) store->commit(ctxt.get());
+ return true;
+}
+
+void TxBuffer::commit()
+{
+ for_each(ops.begin(), ops.end(), mem_fun(&TxOp::commit));
+ ops.clear();
+}
+
+void TxBuffer::rollback()
+{
+ for_each(ops.begin(), ops.end(), mem_fun(&TxOp::rollback));
+ ops.clear();
+}
+
+void TxBuffer::enlist(TxOp* const op)
+{
+ ops.push_back(op);
+}
diff --git a/qpid/cpp/lib/broker/TxBuffer.h b/qpid/cpp/lib/broker/TxBuffer.h
new file mode 100644
index 0000000000..2d9a2a3679
--- /dev/null
+++ b/qpid/cpp/lib/broker/TxBuffer.h
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TxBuffer_
+#define _TxBuffer_
+
+#include <algorithm>
+#include <functional>
+#include <vector>
+#include <TransactionalStore.h>
+#include <TxOp.h>
+
+/**
+ * Represents a single transaction. As such, an instance of this class
+ * will hold a list of operations representing the workload of the
+ * transaction. This work can be committed or rolled back. Committing
+ * is a two-stage process: first all the operations should be
+ * prepared, then if that succeeds they can be committed.
+ *
+ * In the 2pc case, a successful prepare may be followed by either a
+ * commit or a rollback.
+ *
+ * Atomicity of prepare is ensured by using a lower level
+ * transactional facility. This saves explicitly rolling back all the
+ * successfully prepared ops when one of them fails. i.e. we do not
+ * use 2pc internally, we instead ensure that prepare is atomic at a
+ * lower level. This makes individual prepare operations easier to
+ * code.
+ *
+ * Transactions on a messaging broker effect three types of 'action':
+ * (1) updates to persistent storage (2) updates to transient storage
+ * or cached data (3) network writes.
+ *
+ * Of these, (1) should always occur atomically during prepare to
+ * ensure that if the broker crashes while a transaction is being
+ * completed the persistent state (which is all that then remains) is
+ * consistent. (3) can only be done on commit, after a successful
+ * prepare. There is a little more flexibility with (2) but any
+ * changes made during prepare should be subject to the control of the
+ * TransactionalStore in use.
+ */
+namespace qpid {
+ namespace broker {
+ class TxBuffer{
+ typedef std::vector<TxOp*>::iterator op_iterator;
+ std::vector<TxOp*> ops;
+ public:
+ /**
+ * Requests that all ops are prepared. This should
+ * primarily involve making sure that a persistent record
+ * of the operations is stored where necessary.
+ *
+ * All ops will be prepared under a transaction on the
+ * specified store. If any operation fails on prepare,
+ * this transaction will be rolled back.
+ *
+ * Once prepared, a transaction can be committed (or in
+ * the 2pc case, rolled back).
+ *
+ * @returns true if all the operations prepared
+ * successfully, false if not.
+ */
+ bool prepare(TransactionalStore* const store);
+ /**
+ * Signals that the ops all prepared all completed
+ * successfully and can now commit, i.e. the operation can
+ * now be fully carried out.
+ *
+ * Should only be called after a call to prepare() returns
+ * true.
+ */
+ void commit();
+ /**
+ * Rolls back all the operations.
+ *
+ * Should only be called either after a call to prepare()
+ * returns true (2pc) or instead of a prepare call
+ * ('server-local')
+ */
+ void rollback();
+ /**
+ * Adds an operation to the transaction.
+ */
+ void enlist(TxOp* const op);
+ };
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/TxOp.h b/qpid/cpp/lib/broker/TxOp.h
new file mode 100644
index 0000000000..abba84a8e8
--- /dev/null
+++ b/qpid/cpp/lib/broker/TxOp.h
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TxOp_
+#define _TxOp_
+
+#include <TransactionalStore.h>
+
+namespace qpid {
+ namespace broker {
+ class TxOp{
+ public:
+ virtual bool prepare(TransactionContext*) throw() = 0;
+ virtual void commit() throw() = 0;
+ virtual void rollback() throw() = 0;
+ virtual ~TxOp(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/qpid/cpp/lib/broker/TxPublish.cpp b/qpid/cpp/lib/broker/TxPublish.cpp
new file mode 100644
index 0000000000..49dd8abd89
--- /dev/null
+++ b/qpid/cpp/lib/broker/TxPublish.cpp
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <TxPublish.h>
+
+using namespace qpid::broker;
+
+TxPublish::TxPublish(Message::shared_ptr _msg, const std::string* const _xid) : msg(_msg), xid(_xid) {}
+
+bool TxPublish::prepare(TransactionContext* ctxt) throw(){
+ try{
+ for_each(queues.begin(), queues.end(), Prepare(ctxt, msg, xid));
+ return true;
+ }catch(...){
+ std::cout << "TxPublish::prepare() - Failed to prepare" << std::endl;
+ return false;
+ }
+}
+
+void TxPublish::commit() throw(){
+ for_each(queues.begin(), queues.end(), Commit(msg));
+}
+
+void TxPublish::rollback() throw(){
+}
+
+void TxPublish::deliverTo(Queue::shared_ptr& queue){
+ queues.push_back(queue);
+}
+
+TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr& _msg, const string* const _xid)
+ : ctxt(_ctxt), msg(_msg), xid(_xid){}
+
+void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){
+ queue->enqueue(ctxt, msg, xid);
+}
+
+TxPublish::Commit::Commit(Message::shared_ptr& _msg) : msg(_msg){}
+
+void TxPublish::Commit::operator()(Queue::shared_ptr& queue){
+ queue->process(msg);
+}
+
diff --git a/qpid/cpp/lib/broker/TxPublish.h b/qpid/cpp/lib/broker/TxPublish.h
new file mode 100644
index 0000000000..75f201257e
--- /dev/null
+++ b/qpid/cpp/lib/broker/TxPublish.h
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TxPublish_
+#define _TxPublish_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include <Deliverable.h>
+#include <BrokerMessage.h>
+#include <MessageStore.h>
+#include <BrokerQueue.h>
+#include <TxOp.h>
+
+namespace qpid {
+ namespace broker {
+ /**
+ * Defines the behaviour for publish operations on a
+ * transactional channel. Messages are routed through
+ * exchanges when received but are not at that stage delivered
+ * to the matching queues, rather the queues are held in an
+ * instance of this class. On prepare() the message is marked
+ * enqueued to the relevant queues in the MessagesStore. On
+ * commit() the messages will be passed to the queue for
+ * dispatch or to be added to the in-memory queue.
+ */
+ class TxPublish : public TxOp, public Deliverable{
+ class Prepare{
+ TransactionContext* ctxt;
+ Message::shared_ptr& msg;
+ const std::string* const xid;
+ public:
+ Prepare(TransactionContext* ctxt, Message::shared_ptr& msg, const std::string* const xid);
+ void operator()(Queue::shared_ptr& queue);
+ };
+
+ class Commit{
+ Message::shared_ptr& msg;
+ public:
+ Commit(Message::shared_ptr& msg);
+ void operator()(Queue::shared_ptr& queue);
+ };
+
+ Message::shared_ptr msg;
+ const std::string* const xid;
+ std::list<Queue::shared_ptr> queues;
+
+ public:
+ TxPublish(Message::shared_ptr msg, const std::string* const xid = 0);
+ virtual bool prepare(TransactionContext* ctxt) throw();
+ virtual void commit() throw();
+ virtual void rollback() throw();
+
+ virtual void deliverTo(Queue::shared_ptr& queue);
+
+ virtual ~TxPublish(){}
+ };
+ }
+}
+
+
+#endif