From 00b761b3b6d80ee2bb3e538face881748efb2b09 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 25 Sep 2007 18:16:02 +0000 Subject: Renamed the following files for consistency: broker/BrokerExchange.cpp -> Exchange.cpp broker/BrokerExchange.h -> Exchange.h broker/BrokerQueue.cpp -> Queue.cpp broker/BrokerQueue.h -> Queue.h client/ClientChannel.cpp -> Channel.cpp client/ClientChannel.h -> Channel.h client/ClientConnection.cpp -> Connection.cpp client/ClientExchange.cpp -> Exchange.cpp client/ClientExchange.h -> Exchange.h client/ClientMessage.h -> Message.h client/ClientQueue.cpp -> Queue.cpp client/ClientQueue.h -> Queue.h git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@579340 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/Makefile.am | 24 +- cpp/src/qpid/broker/BrokerExchange.cpp | 60 ---- cpp/src/qpid/broker/BrokerExchange.h | 83 ------ cpp/src/qpid/broker/BrokerQueue.cpp | 434 ---------------------------- cpp/src/qpid/broker/BrokerQueue.h | 192 ------------ cpp/src/qpid/broker/Deliverable.h | 2 +- cpp/src/qpid/broker/DeliverableMessage.h | 2 +- cpp/src/qpid/broker/DeliveryRecord.cpp | 2 +- cpp/src/qpid/broker/DeliveryRecord.h | 2 +- cpp/src/qpid/broker/DirectExchange.h | 4 +- cpp/src/qpid/broker/Exchange.cpp | 60 ++++ cpp/src/qpid/broker/Exchange.h | 83 ++++++ cpp/src/qpid/broker/ExchangeRegistry.h | 2 +- cpp/src/qpid/broker/FanOutExchange.h | 4 +- cpp/src/qpid/broker/HeadersExchange.h | 4 +- cpp/src/qpid/broker/MessageDelivery.cpp | 2 +- cpp/src/qpid/broker/MessageStoreModule.h | 2 +- cpp/src/qpid/broker/NullMessageStore.h | 2 +- cpp/src/qpid/broker/Queue.cpp | 434 ++++++++++++++++++++++++++++ cpp/src/qpid/broker/Queue.h | 192 ++++++++++++ cpp/src/qpid/broker/QueueRegistry.h | 2 +- cpp/src/qpid/broker/RecoveredDequeue.h | 2 +- cpp/src/qpid/broker/RecoveredEnqueue.h | 2 +- cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 2 +- cpp/src/qpid/broker/SemanticState.cpp | 2 +- cpp/src/qpid/broker/TopicExchange.h | 4 +- cpp/src/qpid/broker/TxPublish.h | 2 +- cpp/src/qpid/client/Channel.cpp | 271 +++++++++++++++++ cpp/src/qpid/client/Channel.h | 316 ++++++++++++++++++++ cpp/src/qpid/client/ClientChannel.cpp | 271 ----------------- cpp/src/qpid/client/ClientChannel.h | 316 -------------------- cpp/src/qpid/client/ClientConnection.cpp | 86 ------ cpp/src/qpid/client/ClientExchange.cpp | 34 --- cpp/src/qpid/client/ClientExchange.h | 106 ------- cpp/src/qpid/client/ClientMessage.h | 86 ------ cpp/src/qpid/client/ClientQueue.cpp | 58 ---- cpp/src/qpid/client/ClientQueue.h | 103 ------- cpp/src/qpid/client/Connection.cpp | 86 ++++++ cpp/src/qpid/client/Connection.h | 2 +- cpp/src/qpid/client/Dispatcher.cpp | 2 +- cpp/src/qpid/client/Exchange.cpp | 34 +++ cpp/src/qpid/client/Exchange.h | 106 +++++++ cpp/src/qpid/client/Message.h | 86 ++++++ cpp/src/qpid/client/MessageListener.h | 2 +- cpp/src/qpid/client/Queue.cpp | 58 ++++ cpp/src/qpid/client/Queue.h | 103 +++++++ cpp/src/tests/BasicP2PTest.h | 4 +- cpp/src/tests/BasicPubSubTest.h | 4 +- cpp/src/tests/BrokerChannelTest.cpp | 2 +- cpp/src/tests/ClientChannelTest.cpp | 28 +- cpp/src/tests/ExchangeTest.cpp | 4 +- cpp/src/tests/FramingTest.cpp | 4 +- cpp/src/tests/QueueTest.cpp | 2 +- cpp/src/tests/SimpleTestCaseBase.h | 4 +- cpp/src/tests/TestCase.h | 2 +- cpp/src/tests/client_test.cpp | 4 +- cpp/src/tests/echo_service.cpp | 6 +- cpp/src/tests/exception_test.cpp | 4 +- cpp/src/tests/interop_runner.cpp | 6 +- cpp/src/tests/perftest.cpp | 6 +- cpp/src/tests/topic_listener.cpp | 6 +- cpp/src/tests/topic_publisher.cpp | 6 +- 62 files changed, 1912 insertions(+), 1912 deletions(-) delete mode 100644 cpp/src/qpid/broker/BrokerExchange.cpp delete mode 100644 cpp/src/qpid/broker/BrokerExchange.h delete mode 100644 cpp/src/qpid/broker/BrokerQueue.cpp delete mode 100644 cpp/src/qpid/broker/BrokerQueue.h create mode 100644 cpp/src/qpid/broker/Exchange.cpp create mode 100644 cpp/src/qpid/broker/Exchange.h create mode 100644 cpp/src/qpid/broker/Queue.cpp create mode 100644 cpp/src/qpid/broker/Queue.h create mode 100644 cpp/src/qpid/client/Channel.cpp create mode 100644 cpp/src/qpid/client/Channel.h delete mode 100644 cpp/src/qpid/client/ClientChannel.cpp delete mode 100644 cpp/src/qpid/client/ClientChannel.h delete mode 100644 cpp/src/qpid/client/ClientConnection.cpp delete mode 100644 cpp/src/qpid/client/ClientExchange.cpp delete mode 100644 cpp/src/qpid/client/ClientExchange.h delete mode 100644 cpp/src/qpid/client/ClientMessage.h delete mode 100644 cpp/src/qpid/client/ClientQueue.cpp delete mode 100644 cpp/src/qpid/client/ClientQueue.h create mode 100644 cpp/src/qpid/client/Connection.cpp create mode 100644 cpp/src/qpid/client/Exchange.cpp create mode 100644 cpp/src/qpid/client/Exchange.h create mode 100644 cpp/src/qpid/client/Message.h create mode 100644 cpp/src/qpid/client/Queue.cpp create mode 100644 cpp/src/qpid/client/Queue.h (limited to 'cpp/src') diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index e70b4d5a1b..ae3eb4c403 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -140,8 +140,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/Broker.cpp \ qpid/broker/BrokerAdapter.cpp \ qpid/broker/BrokerSingleton.cpp \ - qpid/broker/BrokerExchange.cpp \ - qpid/broker/BrokerQueue.cpp \ + qpid/broker/Exchange.cpp \ + qpid/broker/Queue.cpp \ qpid/broker/Connection.cpp \ qpid/broker/ConnectionHandler.cpp \ qpid/broker/ConnectionFactory.cpp \ @@ -191,10 +191,10 @@ libqpidbroker_la_SOURCES = \ libqpidclient_la_LIBADD = libqpidcommon.la libqpidclient_la_SOURCES = \ $(rgen_client_cpp) \ - qpid/client/ClientConnection.cpp \ - qpid/client/ClientChannel.cpp \ - qpid/client/ClientExchange.cpp \ - qpid/client/ClientQueue.cpp \ + qpid/client/Connection.cpp \ + qpid/client/Channel.cpp \ + qpid/client/Exchange.cpp \ + qpid/client/Queue.cpp \ qpid/client/ConnectionImpl.cpp \ qpid/client/Connector.cpp \ qpid/client/Demux.cpp \ @@ -225,8 +225,8 @@ nobase_include_HEADERS = \ qpid/shared_ptr.h \ qpid/broker/Broker.h \ qpid/broker/BrokerAdapter.h \ - qpid/broker/BrokerExchange.h \ - qpid/broker/BrokerQueue.h \ + qpid/broker/Exchange.h \ + qpid/broker/Queue.h \ qpid/broker/BrokerSingleton.h \ qpid/broker/Connection.h \ qpid/broker/ConnectionFactory.h \ @@ -288,10 +288,10 @@ nobase_include_HEADERS = \ qpid/client/AckMode.h \ qpid/client/BlockingQueue.h \ qpid/client/ChainableFrameHandler.h \ - qpid/client/ClientChannel.h \ - qpid/client/ClientExchange.h \ - qpid/client/ClientMessage.h \ - qpid/client/ClientQueue.h \ + qpid/client/Channel.h \ + qpid/client/Exchange.h \ + qpid/client/Message.h \ + qpid/client/Queue.h \ qpid/client/Completion.h \ qpid/client/CompletionTracker.h \ qpid/client/Connection.h \ diff --git a/cpp/src/qpid/broker/BrokerExchange.cpp b/cpp/src/qpid/broker/BrokerExchange.cpp deleted file mode 100644 index 4eaf40dbc8..0000000000 --- a/cpp/src/qpid/broker/BrokerExchange.cpp +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "BrokerExchange.h" -#include "ExchangeRegistry.h" - -using namespace qpid::broker; -using qpid::framing::Buffer; -using qpid::framing::FieldTable; - -Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffer) -{ - string name; - string type; - FieldTable args; - - buffer.getShortString(name); - bool durable(buffer.getOctet()); - buffer.getShortString(type); - buffer.getFieldTable(args); - - return exchanges.declare(name, type, durable, args).first; -} - -void Exchange::encode(Buffer& buffer) const -{ - buffer.putShortString(name); - buffer.putOctet(durable); - buffer.putShortString(getType()); - buffer.putFieldTable(args); -} - -uint32_t Exchange::encodedSize() const -{ - return name.size() + 1/*short string size*/ - + 1 /*durable*/ - + getType().size() + 1/*short string size*/ - + args.size(); -} - - - diff --git a/cpp/src/qpid/broker/BrokerExchange.h b/cpp/src/qpid/broker/BrokerExchange.h deleted file mode 100644 index c3dd7b998d..0000000000 --- a/cpp/src/qpid/broker/BrokerExchange.h +++ /dev/null @@ -1,83 +0,0 @@ -#ifndef _broker_BrokerExchange_h -#define _broker_BrokerExchange_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include -#include "Deliverable.h" -#include "BrokerQueue.h" -#include "MessageStore.h" -#include "PersistableExchange.h" -#include "qpid/framing/FieldTable.h" - -namespace qpid { - namespace broker { - using std::string; - class ExchangeRegistry; - - class Exchange : public PersistableExchange{ - private: - const string name; - const bool durable; - qpid::framing::FieldTable args; - boost::shared_ptr alternate; - uint32_t alternateUsers; - mutable uint64_t persistenceId; - - public: - typedef boost::shared_ptr shared_ptr; - - explicit Exchange(const string& _name) : name(_name), durable(false), persistenceId(0){} - Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args) - : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0){} - virtual ~Exchange(){} - - const string& getName() const { return name; } - bool isDurable() { return durable; } - qpid::framing::FieldTable& getArgs() { return args; } - - Exchange::shared_ptr getAlternate() { return alternate; } - void setAlternate(Exchange::shared_ptr _alternate) { alternate = _alternate; } - void incAlternateUsers() { alternateUsers++; } - void decAlternateUsers() { alternateUsers--; } - bool inUseAsAlternate() { return alternateUsers > 0; } - - virtual string getType() const = 0; - virtual bool bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0; - virtual bool unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0; - virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args) = 0; - virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0; - - //PersistableExchange: - void setPersistenceId(uint64_t id) const { persistenceId = id; } - uint64_t getPersistenceId() const { return persistenceId; } - uint32_t encodedSize() const; - void encode(framing::Buffer& buffer) const; - - static Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer); - - }; - } -} - - -#endif /*!_broker_BrokerExchange_h*/ diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp deleted file mode 100644 index 74ba4f24ed..0000000000 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ /dev/null @@ -1,434 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include - -#include "qpid/log/Statement.h" -#include "BrokerQueue.h" -#include "BrokerExchange.h" -#include "DeliverableMessage.h" -#include "MessageStore.h" -#include "qpid/sys/Monitor.h" -#include "qpid/sys/Time.h" -#include -#include -#include "QueueRegistry.h" - - -using namespace qpid::broker; -using namespace qpid::sys; -using namespace qpid::framing; -using boost::format; - -Queue::Queue(const string& _name, bool _autodelete, - MessageStore* const _store, - const ConnectionToken* const _owner) : - - name(_name), - autodelete(_autodelete), - store(_store), - owner(_owner), - next(0), - exclusive(0), - persistenceId(0), - serializer(false), - dispatchCallback(*this) -{ -} - -Queue::~Queue(){} - -void Queue::notifyDurableIOComplete() -{ - // signal SemanticHander to ack completed dequeues - // then dispatch to ack... - serializer.execute(dispatchCallback); -} - - -void Queue::deliver(Message::shared_ptr& msg){ - if (msg->isImmediate() && getConsumerCount() == 0) { - if (alternateExchange) { - DeliverableMessage deliverable(msg); - alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders()); - } - } else { - - - // if no store then mark as enqueued - if (!enqueue(0, msg)){ - push(msg); - msg->enqueueComplete(); - }else { - push(msg); - } - serializer.execute(dispatchCallback); - } -} - - -void Queue::recover(Message::shared_ptr& msg){ - push(msg); - msg->enqueueComplete(); // mark the message as enqueued - if (store && !msg->isContentLoaded()) { - //content has not been loaded, need to ensure that lazy loading mode is set: - //TODO: find a nicer way to do this - msg->releaseContent(store); - } -} - -void Queue::process(Message::shared_ptr& msg){ - - push(msg); - serializer.execute(dispatchCallback); - -} - -void Queue::requeue(const QueuedMessage& msg){ - { - Mutex::ScopedLock locker(messageLock); - msg.payload->enqueueComplete(); // mark the message as enqueued - messages.push_front(msg); - } - serializer.execute(dispatchCallback); - -} - -bool Queue::acquire(const QueuedMessage& msg) { - Mutex::ScopedLock locker(messageLock); - for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { - if (i->position == msg.position) { - messages.erase(i); - return true; - } - } - return false; -} - -void Queue::requestDispatch(Consumer* c, bool sync){ - if (!c || c->preAcquires()) { - if (sync) { - Mutex::ScopedLock locker(messageLock); - dispatch(); - } else { - serializer.execute(dispatchCallback); - } - } else { - //note: this is always done on the callers thread, regardless - // of sync; browsers of large queues should use flow control! - serviceBrowser(c); - } -} - -bool Queue::dispatch(QueuedMessage& msg){ - - - RWlock::ScopedWlock locker(consumerLock); /// lock scope to wide.... - - if(acquirers.empty()){ - return false; - }else if(exclusive){ - return exclusive->deliver(msg); - }else{ - //deliver to next consumer - next = next % acquirers.size(); - Consumer* c = acquirers[next]; - int start = next; - while(c){ - next++; - if(c->deliver(msg)) { - return true; - } - next = next % acquirers.size(); - c = next == start ? 0 : acquirers[next]; - } - return false; - } -} - - -void Queue::dispatch(){ - QueuedMessage msg; - while(true){ - { - Mutex::ScopedLock locker(messageLock); - if (messages.empty()) break; - msg = messages.front(); - } - if( msg.payload->isEnqueueComplete() && dispatch(msg) ) { - pop(); - } else { - break; - } - } - RWlock::ScopedRlock locker(consumerLock); - for (Consumers::iterator i = browsers.begin(); i != browsers.end(); i++) { - serviceBrowser(*i); - } -} - -void Queue::serviceBrowser(Consumer* browser) -{ - //This is a poorly performing implementation: - // - // * bad concurrency where browsers exist - // * inefficient for largish queues - // - //The queue needs to be based on a current data structure that - //does not invalidate iterators when modified. Subscribers could - //then use an iterator to continue from where they left off - - Mutex::ScopedLock locker(messageLock); - if (!messages.empty() && messages.back().position > browser->position) { - for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { - if (i->position > browser->position) { - if (browser->deliver(*i)) { - browser->position = i->position; - } else { - break; - } - } - } - } -} - -void Queue::consume(Consumer* c, bool requestExclusive){ - RWlock::ScopedWlock locker(consumerLock); - if(exclusive) { - throw ChannelException( - 403, format("Queue '%s' has an exclusive consumer." - " No more consumers allowed.") % getName()); - } - if(requestExclusive) { - if(acquirers.empty() && browsers.empty()) { - exclusive = c; - } else { - throw ChannelException( - 403, format("Queue '%s' already has consumers." - "Exclusive access denied.") % getName()); - } - } - if (c->preAcquires()) { - acquirers.push_back(c); - } else { - browsers.push_back(c); - } -} - -void Queue::cancel(Consumer* c){ - RWlock::ScopedWlock locker(consumerLock); - if (c->preAcquires()) { - cancel(c, acquirers); - } else { - cancel(c, browsers); - } - if(exclusive == c) exclusive = 0; -} - -void Queue::cancel(Consumer* c, Consumers& consumers) -{ - Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c); - if (i != consumers.end()) - consumers.erase(i); -} - -QueuedMessage Queue::dequeue(){ - Mutex::ScopedLock locker(messageLock); - QueuedMessage msg; - if(!messages.empty()){ - msg = messages.front(); - pop(); - } - return msg; -} - -uint32_t Queue::purge(){ - Mutex::ScopedLock locker(messageLock); - int count = messages.size(); - while(!messages.empty()) pop(); - return count; -} - -void Queue::pop(){ - Mutex::ScopedLock locker(messageLock); - if (policy.get()) policy->dequeued(messages.front().payload->contentSize()); - messages.pop_front(); -} - -void Queue::push(Message::shared_ptr& msg){ - Mutex::ScopedLock locker(messageLock); - messages.push_back(QueuedMessage(msg, ++sequence)); - if (policy.get()) { - policy->enqueued(msg->contentSize()); - if (policy->limitExceeded()) { - msg->releaseContent(store); - } - } -} - -/** function only provided for unit tests, or code not in critical message path */ -uint32_t Queue::getMessageCount() const{ - Mutex::ScopedLock locker(messageLock); - - uint32_t count =0; - for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) { - if ( i->payload->isEnqueueComplete() ) count ++; - } - - return count; -} - -uint32_t Queue::getConsumerCount() const{ - RWlock::ScopedRlock locker(consumerLock); - return acquirers.size() + browsers.size(); -} - -bool Queue::canAutoDelete() const{ - RWlock::ScopedRlock locker(consumerLock); - return autodelete && acquirers.empty() && browsers.empty(); -} - -// return true if store exists, -bool Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg) -{ - if (msg->isPersistent() && store) { - msg->enqueueAsync(); //increment to async counter -- for message sent to more than one queue - store->enqueue(ctxt, *msg.get(), *this); - return true; - } - return false; -} - -// return true if store exists, -bool Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg) -{ - if (msg->isPersistent() && store) { - msg->dequeueAsync(); //increment to async counter -- for message sent to more than one queue - store->dequeue(ctxt, *msg.get(), *this); - return true; - } - return false; -} - - -namespace -{ - const std::string qpidMaxSize("qpid.max_size"); - const std::string qpidMaxCount("qpid.max_count"); -} - -void Queue::create(const FieldTable& _settings) -{ - settings = _settings; - //TODO: hold onto settings and persist them as part of encode - // in fact settings should be passed in on construction - if (store) { - store->create(*this); - } - configure(_settings); -} - -void Queue::configure(const FieldTable& _settings) -{ - std::auto_ptr _policy(new QueuePolicy(_settings)); - if (_policy->getMaxCount() || _policy->getMaxSize()) - setPolicy(_policy); -} - -void Queue::destroy() -{ - if (alternateExchange.get()) { - Mutex::ScopedLock locker(messageLock); - while(!messages.empty()){ - DeliverableMessage msg(messages.front().payload); - alternateExchange->route(msg, msg.getMessage().getRoutingKey(), - msg.getMessage().getApplicationHeaders()); - pop(); - } - alternateExchange->decAlternateUsers(); - } - - if (store) { - store->destroy(*this); - } -} - -void Queue::bound(const string& exchange, const string& key, const FieldTable& args) -{ - bindings.add(exchange, key, args); -} - -void Queue::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref) -{ - bindings.unbind(exchanges, shared_ref); -} - -void Queue::setPolicy(std::auto_ptr _policy) -{ - policy = _policy; -} - -const QueuePolicy* const Queue::getPolicy() -{ - return policy.get(); -} - -uint64_t Queue::getPersistenceId() const -{ - return persistenceId; -} - -void Queue::setPersistenceId(uint64_t _persistenceId) const -{ - persistenceId = _persistenceId; -} - -void Queue::encode(framing::Buffer& buffer) const -{ - buffer.putShortString(name); - buffer.putFieldTable(settings); -} - -uint32_t Queue::encodedSize() const -{ - return name.size() + 1/*short string size octet*/ + settings.size(); -} - -Queue::shared_ptr Queue::decode(QueueRegistry& queues, framing::Buffer& buffer) -{ - string name; - buffer.getShortString(name); - std::pair result = queues.declare(name, true); - buffer.getFieldTable(result.first->settings); - result.first->configure(result.first->settings); - return result.first; -} - - -void Queue::setAlternateExchange(boost::shared_ptr exchange) -{ - alternateExchange = exchange; -} - -boost::shared_ptr Queue::getAlternateExchange() -{ - return alternateExchange; -} diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h deleted file mode 100644 index 4b6070d11c..0000000000 --- a/cpp/src/qpid/broker/BrokerQueue.h +++ /dev/null @@ -1,192 +0,0 @@ -#ifndef _broker_BrokerQueue_h -#define _broker_BrokerQueue_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include -#include -#include -#include -#include "qpid/framing/amqp_types.h" -#include "ConnectionToken.h" -#include "Consumer.h" -#include "Message.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/sys/Serializer.h" -#include "qpid/sys/Monitor.h" -#include "PersistableQueue.h" -#include "QueuePolicy.h" -#include "QueueBindings.h" - -// TODO aconway 2007-02-06: Use auto_ptr and boost::ptr_vector to -// enforce ownership of Consumers. - -namespace qpid { - namespace broker { - class MessageStore; - class QueueRegistry; - class TransactionContext; - class Exchange; - - using std::string; - - /** - * The brokers representation of an amqp queue. Messages are - * delivered to a queue from where they can be dispatched to - * registered consumers or be stored until dequeued or until one - * or more consumers registers. - */ - class Queue : public PersistableQueue{ - typedef std::vector Consumers; - typedef std::deque Messages; - - struct DispatchFunctor { - Queue& queue; - DispatchFunctor(Queue& q) : queue(q) {} - void operator()() { queue.dispatch(); } - }; - - const string name; - const bool autodelete; - MessageStore* const store; - const ConnectionToken* const owner; - Consumers acquirers; - Consumers browsers; - Messages messages; - int next; - mutable qpid::sys::RWlock consumerLock; - mutable qpid::sys::Mutex messageLock; - Consumer* exclusive; - mutable uint64_t persistenceId; - framing::FieldTable settings; - std::auto_ptr policy; - QueueBindings bindings; - boost::shared_ptr alternateExchange; - qpid::sys::Serializer serializer; - DispatchFunctor dispatchCallback; - framing::SequenceNumber sequence; - - void pop(); - void push(Message::shared_ptr& msg); - bool dispatch(QueuedMessage& msg); - void setPolicy(std::auto_ptr policy); - /** - * only called by serilizer - */ - void dispatch(); - void cancel(Consumer* c, Consumers& set); - void serviceBrowser(Consumer* c); - - protected: - /** - * Call back from store - */ - virtual void notifyDurableIOComplete(); - - public: - - typedef boost::shared_ptr shared_ptr; - - typedef std::vector vector; - - Queue(const string& name, bool autodelete = false, - MessageStore* const store = 0, - const ConnectionToken* const owner = 0); - ~Queue(); - - void create(const qpid::framing::FieldTable& settings); - void configure(const qpid::framing::FieldTable& settings); - void destroy(); - void bound(const string& exchange, const string& key, const qpid::framing::FieldTable& args); - void unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref); - - bool acquire(const QueuedMessage& msg); - - /** - * Delivers a message to the queue. Will record it as - * enqueued if persistent then process it. - */ - void deliver(Message::shared_ptr& msg); - /** - * Dispatches the messages immediately to a consumer if - * one is available or stores it for later if not. - */ - void process(Message::shared_ptr& msg); - /** - * Returns a message to the in-memory queue (due to lack - * of acknowledegement from a receiver). If a consumer is - * available it will be dispatched immediately, else it - * will be returned to the front of the queue. - */ - void requeue(const QueuedMessage& msg); - /** - * Used during recovery to add stored messages back to the queue - */ - void recover(Message::shared_ptr& msg); - /** - * Request dispatch any queued messages providing there are - * consumers for them. Only one thread can be dispatching - * at any time, so this call schedules the despatch based on - * the serilizer policy. - */ - void requestDispatch(Consumer* c = 0, bool sync = false); - void consume(Consumer* c, bool exclusive = false); - void cancel(Consumer* c); - uint32_t purge(); - uint32_t getMessageCount() const; - uint32_t getConsumerCount() const; - inline const string& getName() const { return name; } - inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; } - inline bool hasExclusiveConsumer() const { return exclusive; } - inline bool hasExclusiveOwner() const { return owner != 0; } - inline bool isDurable() const { return store != 0; } - inline const framing::FieldTable& getSettings() const { return settings; } - inline bool isAutoDelete() const { return autodelete; } - bool canAutoDelete() const; - - bool enqueue(TransactionContext* ctxt, Message::shared_ptr& msg); - /** - * dequeue from store (only done once messages is acknowledged) - */ - bool dequeue(TransactionContext* ctxt, Message::shared_ptr& msg); - /** - * dequeues from memory only - */ - QueuedMessage dequeue(); - - const QueuePolicy* const getPolicy(); - - void setAlternateExchange(boost::shared_ptr exchange); - boost::shared_ptr getAlternateExchange(); - - //PersistableQueue support: - uint64_t getPersistenceId() const; - void setPersistenceId(uint64_t persistenceId) const; - void encode(framing::Buffer& buffer) const; - uint32_t encodedSize() const; - - static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer); - }; - } -} - - -#endif /*!_broker_BrokerQueue_h*/ diff --git a/cpp/src/qpid/broker/Deliverable.h b/cpp/src/qpid/broker/Deliverable.h index cd1dbaa85d..bdea550159 100644 --- a/cpp/src/qpid/broker/Deliverable.h +++ b/cpp/src/qpid/broker/Deliverable.h @@ -21,7 +21,7 @@ #ifndef _Deliverable_ #define _Deliverable_ -#include "BrokerQueue.h" +#include "Queue.h" namespace qpid { namespace broker { diff --git a/cpp/src/qpid/broker/DeliverableMessage.h b/cpp/src/qpid/broker/DeliverableMessage.h index 9719d972fc..07bca40461 100644 --- a/cpp/src/qpid/broker/DeliverableMessage.h +++ b/cpp/src/qpid/broker/DeliverableMessage.h @@ -22,7 +22,7 @@ #define _DeliverableMessage_ #include "Deliverable.h" -#include "BrokerQueue.h" +#include "Queue.h" #include "Message.h" namespace qpid { diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 36e6c22f88..f9218655a4 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -21,7 +21,7 @@ #include "DeliveryRecord.h" #include "DeliverableMessage.h" #include "SemanticState.h" -#include "BrokerExchange.h" +#include "Exchange.h" #include "qpid/log/Statement.h" using namespace qpid::broker; diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index 3c833fcaa8..f2c343e27a 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -26,7 +26,7 @@ #include #include #include "qpid/framing/AccumulatedAck.h" -#include "BrokerQueue.h" +#include "Queue.h" #include "Consumer.h" #include "DeliveryId.h" #include "Message.h" diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h index 7b20bd610c..243f51d6a8 100644 --- a/cpp/src/qpid/broker/DirectExchange.h +++ b/cpp/src/qpid/broker/DirectExchange.h @@ -23,10 +23,10 @@ #include #include -#include "BrokerExchange.h" +#include "Exchange.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/Monitor.h" -#include "BrokerQueue.h" +#include "Queue.h" namespace qpid { namespace broker { diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp new file mode 100644 index 0000000000..04407eb774 --- /dev/null +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Exchange.h" +#include "ExchangeRegistry.h" + +using namespace qpid::broker; +using qpid::framing::Buffer; +using qpid::framing::FieldTable; + +Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffer) +{ + string name; + string type; + FieldTable args; + + buffer.getShortString(name); + bool durable(buffer.getOctet()); + buffer.getShortString(type); + buffer.getFieldTable(args); + + return exchanges.declare(name, type, durable, args).first; +} + +void Exchange::encode(Buffer& buffer) const +{ + buffer.putShortString(name); + buffer.putOctet(durable); + buffer.putShortString(getType()); + buffer.putFieldTable(args); +} + +uint32_t Exchange::encodedSize() const +{ + return name.size() + 1/*short string size*/ + + 1 /*durable*/ + + getType().size() + 1/*short string size*/ + + args.size(); +} + + + diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h new file mode 100644 index 0000000000..5febca0ae9 --- /dev/null +++ b/cpp/src/qpid/broker/Exchange.h @@ -0,0 +1,83 @@ +#ifndef _broker_Exchange_h +#define _broker_Exchange_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include +#include "Deliverable.h" +#include "Queue.h" +#include "MessageStore.h" +#include "PersistableExchange.h" +#include "qpid/framing/FieldTable.h" + +namespace qpid { + namespace broker { + using std::string; + class ExchangeRegistry; + + class Exchange : public PersistableExchange{ + private: + const string name; + const bool durable; + qpid::framing::FieldTable args; + boost::shared_ptr alternate; + uint32_t alternateUsers; + mutable uint64_t persistenceId; + + public: + typedef boost::shared_ptr shared_ptr; + + explicit Exchange(const string& _name) : name(_name), durable(false), persistenceId(0){} + Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args) + : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0){} + virtual ~Exchange(){} + + const string& getName() const { return name; } + bool isDurable() { return durable; } + qpid::framing::FieldTable& getArgs() { return args; } + + Exchange::shared_ptr getAlternate() { return alternate; } + void setAlternate(Exchange::shared_ptr _alternate) { alternate = _alternate; } + void incAlternateUsers() { alternateUsers++; } + void decAlternateUsers() { alternateUsers--; } + bool inUseAsAlternate() { return alternateUsers > 0; } + + virtual string getType() const = 0; + virtual bool bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0; + virtual bool unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0; + virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args) = 0; + virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0; + + //PersistableExchange: + void setPersistenceId(uint64_t id) const { persistenceId = id; } + uint64_t getPersistenceId() const { return persistenceId; } + uint32_t encodedSize() const; + void encode(framing::Buffer& buffer) const; + + static Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer); + + }; + } +} + + +#endif /*!_broker_Exchange.cpp_h*/ diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h index 5505a4074a..f7f29ff0ea 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.h +++ b/cpp/src/qpid/broker/ExchangeRegistry.h @@ -23,7 +23,7 @@ */ #include -#include "BrokerExchange.h" +#include "Exchange.h" #include "MessageStore.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/Monitor.h" diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h index 070e438bcc..625afc8cce 100644 --- a/cpp/src/qpid/broker/FanOutExchange.h +++ b/cpp/src/qpid/broker/FanOutExchange.h @@ -23,10 +23,10 @@ #include #include -#include "BrokerExchange.h" +#include "Exchange.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/Monitor.h" -#include "BrokerQueue.h" +#include "Queue.h" namespace qpid { namespace broker { diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h index 48d115c1ec..f7abf3514b 100644 --- a/cpp/src/qpid/broker/HeadersExchange.h +++ b/cpp/src/qpid/broker/HeadersExchange.h @@ -22,10 +22,10 @@ #define _HeadersExchange_ #include -#include "BrokerExchange.h" +#include "Exchange.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/Monitor.h" -#include "BrokerQueue.h" +#include "Queue.h" namespace qpid { namespace broker { diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp index edacd7a1c1..2eea97ced0 100644 --- a/cpp/src/qpid/broker/MessageDelivery.cpp +++ b/cpp/src/qpid/broker/MessageDelivery.cpp @@ -22,7 +22,7 @@ #include "DeliveryToken.h" #include "Message.h" -#include "BrokerQueue.h" +#include "Queue.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/BasicDeliverBody.h" #include "qpid/framing/BasicGetOkBody.h" diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h index 271da92b1e..46dbd35ec9 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.h +++ b/cpp/src/qpid/broker/MessageStoreModule.h @@ -22,7 +22,7 @@ #define _MessageStoreModule_ #include "MessageStore.h" -#include "BrokerQueue.h" +#include "Queue.h" #include "RecoveryManager.h" #include "qpid/sys/Module.h" diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h index c27a0ad53a..5698d8a16d 100644 --- a/cpp/src/qpid/broker/NullMessageStore.h +++ b/cpp/src/qpid/broker/NullMessageStore.h @@ -23,7 +23,7 @@ #include #include "MessageStore.h" -#include "BrokerQueue.h" +#include "Queue.h" namespace qpid { namespace broker { diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp new file mode 100644 index 0000000000..95fff2e789 --- /dev/null +++ b/cpp/src/qpid/broker/Queue.cpp @@ -0,0 +1,434 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include + +#include "qpid/log/Statement.h" +#include "Queue.h" +#include "Exchange.h" +#include "DeliverableMessage.h" +#include "MessageStore.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Time.h" +#include +#include +#include "QueueRegistry.h" + + +using namespace qpid::broker; +using namespace qpid::sys; +using namespace qpid::framing; +using boost::format; + +Queue::Queue(const string& _name, bool _autodelete, + MessageStore* const _store, + const ConnectionToken* const _owner) : + + name(_name), + autodelete(_autodelete), + store(_store), + owner(_owner), + next(0), + exclusive(0), + persistenceId(0), + serializer(false), + dispatchCallback(*this) +{ +} + +Queue::~Queue(){} + +void Queue::notifyDurableIOComplete() +{ + // signal SemanticHander to ack completed dequeues + // then dispatch to ack... + serializer.execute(dispatchCallback); +} + + +void Queue::deliver(Message::shared_ptr& msg){ + if (msg->isImmediate() && getConsumerCount() == 0) { + if (alternateExchange) { + DeliverableMessage deliverable(msg); + alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders()); + } + } else { + + + // if no store then mark as enqueued + if (!enqueue(0, msg)){ + push(msg); + msg->enqueueComplete(); + }else { + push(msg); + } + serializer.execute(dispatchCallback); + } +} + + +void Queue::recover(Message::shared_ptr& msg){ + push(msg); + msg->enqueueComplete(); // mark the message as enqueued + if (store && !msg->isContentLoaded()) { + //content has not been loaded, need to ensure that lazy loading mode is set: + //TODO: find a nicer way to do this + msg->releaseContent(store); + } +} + +void Queue::process(Message::shared_ptr& msg){ + + push(msg); + serializer.execute(dispatchCallback); + +} + +void Queue::requeue(const QueuedMessage& msg){ + { + Mutex::ScopedLock locker(messageLock); + msg.payload->enqueueComplete(); // mark the message as enqueued + messages.push_front(msg); + } + serializer.execute(dispatchCallback); + +} + +bool Queue::acquire(const QueuedMessage& msg) { + Mutex::ScopedLock locker(messageLock); + for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { + if (i->position == msg.position) { + messages.erase(i); + return true; + } + } + return false; +} + +void Queue::requestDispatch(Consumer* c, bool sync){ + if (!c || c->preAcquires()) { + if (sync) { + Mutex::ScopedLock locker(messageLock); + dispatch(); + } else { + serializer.execute(dispatchCallback); + } + } else { + //note: this is always done on the callers thread, regardless + // of sync; browsers of large queues should use flow control! + serviceBrowser(c); + } +} + +bool Queue::dispatch(QueuedMessage& msg){ + + + RWlock::ScopedWlock locker(consumerLock); /// lock scope to wide.... + + if(acquirers.empty()){ + return false; + }else if(exclusive){ + return exclusive->deliver(msg); + }else{ + //deliver to next consumer + next = next % acquirers.size(); + Consumer* c = acquirers[next]; + int start = next; + while(c){ + next++; + if(c->deliver(msg)) { + return true; + } + next = next % acquirers.size(); + c = next == start ? 0 : acquirers[next]; + } + return false; + } +} + + +void Queue::dispatch(){ + QueuedMessage msg; + while(true){ + { + Mutex::ScopedLock locker(messageLock); + if (messages.empty()) break; + msg = messages.front(); + } + if( msg.payload->isEnqueueComplete() && dispatch(msg) ) { + pop(); + } else { + break; + } + } + RWlock::ScopedRlock locker(consumerLock); + for (Consumers::iterator i = browsers.begin(); i != browsers.end(); i++) { + serviceBrowser(*i); + } +} + +void Queue::serviceBrowser(Consumer* browser) +{ + //This is a poorly performing implementation: + // + // * bad concurrency where browsers exist + // * inefficient for largish queues + // + //The queue needs to be based on a current data structure that + //does not invalidate iterators when modified. Subscribers could + //then use an iterator to continue from where they left off + + Mutex::ScopedLock locker(messageLock); + if (!messages.empty() && messages.back().position > browser->position) { + for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { + if (i->position > browser->position) { + if (browser->deliver(*i)) { + browser->position = i->position; + } else { + break; + } + } + } + } +} + +void Queue::consume(Consumer* c, bool requestExclusive){ + RWlock::ScopedWlock locker(consumerLock); + if(exclusive) { + throw ChannelException( + 403, format("Queue '%s' has an exclusive consumer." + " No more consumers allowed.") % getName()); + } + if(requestExclusive) { + if(acquirers.empty() && browsers.empty()) { + exclusive = c; + } else { + throw ChannelException( + 403, format("Queue '%s' already has consumers." + "Exclusive access denied.") % getName()); + } + } + if (c->preAcquires()) { + acquirers.push_back(c); + } else { + browsers.push_back(c); + } +} + +void Queue::cancel(Consumer* c){ + RWlock::ScopedWlock locker(consumerLock); + if (c->preAcquires()) { + cancel(c, acquirers); + } else { + cancel(c, browsers); + } + if(exclusive == c) exclusive = 0; +} + +void Queue::cancel(Consumer* c, Consumers& consumers) +{ + Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c); + if (i != consumers.end()) + consumers.erase(i); +} + +QueuedMessage Queue::dequeue(){ + Mutex::ScopedLock locker(messageLock); + QueuedMessage msg; + if(!messages.empty()){ + msg = messages.front(); + pop(); + } + return msg; +} + +uint32_t Queue::purge(){ + Mutex::ScopedLock locker(messageLock); + int count = messages.size(); + while(!messages.empty()) pop(); + return count; +} + +void Queue::pop(){ + Mutex::ScopedLock locker(messageLock); + if (policy.get()) policy->dequeued(messages.front().payload->contentSize()); + messages.pop_front(); +} + +void Queue::push(Message::shared_ptr& msg){ + Mutex::ScopedLock locker(messageLock); + messages.push_back(QueuedMessage(msg, ++sequence)); + if (policy.get()) { + policy->enqueued(msg->contentSize()); + if (policy->limitExceeded()) { + msg->releaseContent(store); + } + } +} + +/** function only provided for unit tests, or code not in critical message path */ +uint32_t Queue::getMessageCount() const{ + Mutex::ScopedLock locker(messageLock); + + uint32_t count =0; + for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) { + if ( i->payload->isEnqueueComplete() ) count ++; + } + + return count; +} + +uint32_t Queue::getConsumerCount() const{ + RWlock::ScopedRlock locker(consumerLock); + return acquirers.size() + browsers.size(); +} + +bool Queue::canAutoDelete() const{ + RWlock::ScopedRlock locker(consumerLock); + return autodelete && acquirers.empty() && browsers.empty(); +} + +// return true if store exists, +bool Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg) +{ + if (msg->isPersistent() && store) { + msg->enqueueAsync(); //increment to async counter -- for message sent to more than one queue + store->enqueue(ctxt, *msg.get(), *this); + return true; + } + return false; +} + +// return true if store exists, +bool Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg) +{ + if (msg->isPersistent() && store) { + msg->dequeueAsync(); //increment to async counter -- for message sent to more than one queue + store->dequeue(ctxt, *msg.get(), *this); + return true; + } + return false; +} + + +namespace +{ + const std::string qpidMaxSize("qpid.max_size"); + const std::string qpidMaxCount("qpid.max_count"); +} + +void Queue::create(const FieldTable& _settings) +{ + settings = _settings; + //TODO: hold onto settings and persist them as part of encode + // in fact settings should be passed in on construction + if (store) { + store->create(*this); + } + configure(_settings); +} + +void Queue::configure(const FieldTable& _settings) +{ + std::auto_ptr _policy(new QueuePolicy(_settings)); + if (_policy->getMaxCount() || _policy->getMaxSize()) + setPolicy(_policy); +} + +void Queue::destroy() +{ + if (alternateExchange.get()) { + Mutex::ScopedLock locker(messageLock); + while(!messages.empty()){ + DeliverableMessage msg(messages.front().payload); + alternateExchange->route(msg, msg.getMessage().getRoutingKey(), + msg.getMessage().getApplicationHeaders()); + pop(); + } + alternateExchange->decAlternateUsers(); + } + + if (store) { + store->destroy(*this); + } +} + +void Queue::bound(const string& exchange, const string& key, const FieldTable& args) +{ + bindings.add(exchange, key, args); +} + +void Queue::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref) +{ + bindings.unbind(exchanges, shared_ref); +} + +void Queue::setPolicy(std::auto_ptr _policy) +{ + policy = _policy; +} + +const QueuePolicy* const Queue::getPolicy() +{ + return policy.get(); +} + +uint64_t Queue::getPersistenceId() const +{ + return persistenceId; +} + +void Queue::setPersistenceId(uint64_t _persistenceId) const +{ + persistenceId = _persistenceId; +} + +void Queue::encode(framing::Buffer& buffer) const +{ + buffer.putShortString(name); + buffer.putFieldTable(settings); +} + +uint32_t Queue::encodedSize() const +{ + return name.size() + 1/*short string size octet*/ + settings.size(); +} + +Queue::shared_ptr Queue::decode(QueueRegistry& queues, framing::Buffer& buffer) +{ + string name; + buffer.getShortString(name); + std::pair result = queues.declare(name, true); + buffer.getFieldTable(result.first->settings); + result.first->configure(result.first->settings); + return result.first; +} + + +void Queue::setAlternateExchange(boost::shared_ptr exchange) +{ + alternateExchange = exchange; +} + +boost::shared_ptr Queue::getAlternateExchange() +{ + return alternateExchange; +} diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h new file mode 100644 index 0000000000..17ace522c3 --- /dev/null +++ b/cpp/src/qpid/broker/Queue.h @@ -0,0 +1,192 @@ +#ifndef _broker_Queue_h +#define _broker_Queue_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include +#include +#include +#include +#include "qpid/framing/amqp_types.h" +#include "ConnectionToken.h" +#include "Consumer.h" +#include "Message.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/sys/Serializer.h" +#include "qpid/sys/Monitor.h" +#include "PersistableQueue.h" +#include "QueuePolicy.h" +#include "QueueBindings.h" + +// TODO aconway 2007-02-06: Use auto_ptr and boost::ptr_vector to +// enforce ownership of Consumers. + +namespace qpid { + namespace broker { + class MessageStore; + class QueueRegistry; + class TransactionContext; + class Exchange; + + using std::string; + + /** + * The brokers representation of an amqp queue. Messages are + * delivered to a queue from where they can be dispatched to + * registered consumers or be stored until dequeued or until one + * or more consumers registers. + */ + class Queue : public PersistableQueue{ + typedef std::vector Consumers; + typedef std::deque Messages; + + struct DispatchFunctor { + Queue& queue; + DispatchFunctor(Queue& q) : queue(q) {} + void operator()() { queue.dispatch(); } + }; + + const string name; + const bool autodelete; + MessageStore* const store; + const ConnectionToken* const owner; + Consumers acquirers; + Consumers browsers; + Messages messages; + int next; + mutable qpid::sys::RWlock consumerLock; + mutable qpid::sys::Mutex messageLock; + Consumer* exclusive; + mutable uint64_t persistenceId; + framing::FieldTable settings; + std::auto_ptr policy; + QueueBindings bindings; + boost::shared_ptr alternateExchange; + qpid::sys::Serializer serializer; + DispatchFunctor dispatchCallback; + framing::SequenceNumber sequence; + + void pop(); + void push(Message::shared_ptr& msg); + bool dispatch(QueuedMessage& msg); + void setPolicy(std::auto_ptr policy); + /** + * only called by serilizer + */ + void dispatch(); + void cancel(Consumer* c, Consumers& set); + void serviceBrowser(Consumer* c); + + protected: + /** + * Call back from store + */ + virtual void notifyDurableIOComplete(); + + public: + + typedef boost::shared_ptr shared_ptr; + + typedef std::vector vector; + + Queue(const string& name, bool autodelete = false, + MessageStore* const store = 0, + const ConnectionToken* const owner = 0); + ~Queue(); + + void create(const qpid::framing::FieldTable& settings); + void configure(const qpid::framing::FieldTable& settings); + void destroy(); + void bound(const string& exchange, const string& key, const qpid::framing::FieldTable& args); + void unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref); + + bool acquire(const QueuedMessage& msg); + + /** + * Delivers a message to the queue. Will record it as + * enqueued if persistent then process it. + */ + void deliver(Message::shared_ptr& msg); + /** + * Dispatches the messages immediately to a consumer if + * one is available or stores it for later if not. + */ + void process(Message::shared_ptr& msg); + /** + * Returns a message to the in-memory queue (due to lack + * of acknowledegement from a receiver). If a consumer is + * available it will be dispatched immediately, else it + * will be returned to the front of the queue. + */ + void requeue(const QueuedMessage& msg); + /** + * Used during recovery to add stored messages back to the queue + */ + void recover(Message::shared_ptr& msg); + /** + * Request dispatch any queued messages providing there are + * consumers for them. Only one thread can be dispatching + * at any time, so this call schedules the despatch based on + * the serilizer policy. + */ + void requestDispatch(Consumer* c = 0, bool sync = false); + void consume(Consumer* c, bool exclusive = false); + void cancel(Consumer* c); + uint32_t purge(); + uint32_t getMessageCount() const; + uint32_t getConsumerCount() const; + inline const string& getName() const { return name; } + inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; } + inline bool hasExclusiveConsumer() const { return exclusive; } + inline bool hasExclusiveOwner() const { return owner != 0; } + inline bool isDurable() const { return store != 0; } + inline const framing::FieldTable& getSettings() const { return settings; } + inline bool isAutoDelete() const { return autodelete; } + bool canAutoDelete() const; + + bool enqueue(TransactionContext* ctxt, Message::shared_ptr& msg); + /** + * dequeue from store (only done once messages is acknowledged) + */ + bool dequeue(TransactionContext* ctxt, Message::shared_ptr& msg); + /** + * dequeues from memory only + */ + QueuedMessage dequeue(); + + const QueuePolicy* const getPolicy(); + + void setAlternateExchange(boost::shared_ptr exchange); + boost::shared_ptr getAlternateExchange(); + + //PersistableQueue support: + uint64_t getPersistenceId() const; + void setPersistenceId(uint64_t persistenceId) const; + void encode(framing::Buffer& buffer) const; + uint32_t encodedSize() const; + + static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer); + }; + } +} + + +#endif /*!_broker_Queue_h*/ diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index 22a89d7825..1a766b810a 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -23,7 +23,7 @@ #include #include "qpid/sys/Mutex.h" -#include "BrokerQueue.h" +#include "Queue.h" namespace qpid { namespace broker { diff --git a/cpp/src/qpid/broker/RecoveredDequeue.h b/cpp/src/qpid/broker/RecoveredDequeue.h index 9dcc9d4233..a6d64e6faf 100644 --- a/cpp/src/qpid/broker/RecoveredDequeue.h +++ b/cpp/src/qpid/broker/RecoveredDequeue.h @@ -27,7 +27,7 @@ #include "Deliverable.h" #include "Message.h" #include "MessageStore.h" -#include "BrokerQueue.h" +#include "Queue.h" #include "TxOp.h" namespace qpid { diff --git a/cpp/src/qpid/broker/RecoveredEnqueue.h b/cpp/src/qpid/broker/RecoveredEnqueue.h index a571343e93..9ada62e62b 100644 --- a/cpp/src/qpid/broker/RecoveredEnqueue.h +++ b/cpp/src/qpid/broker/RecoveredEnqueue.h @@ -27,7 +27,7 @@ #include "Deliverable.h" #include "Message.h" #include "MessageStore.h" -#include "BrokerQueue.h" +#include "Queue.h" #include "TxOp.h" namespace qpid { diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 45b7c588b6..1e3168137e 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -21,7 +21,7 @@ #include "RecoveryManagerImpl.h" #include "Message.h" -#include "BrokerQueue.h" +#include "Queue.h" #include "RecoveredEnqueue.h" #include "RecoveredDequeue.h" diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 059f99077c..a3858d0989 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -21,7 +21,7 @@ #include "SessionState.h" #include "BrokerAdapter.h" -#include "BrokerQueue.h" +#include "Queue.h" #include "Connection.h" #include "DeliverableMessage.h" #include "DtxAck.h" diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h index c411fb1965..e2cc1a3535 100644 --- a/cpp/src/qpid/broker/TopicExchange.h +++ b/cpp/src/qpid/broker/TopicExchange.h @@ -23,10 +23,10 @@ #include #include -#include "BrokerExchange.h" +#include "Exchange.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/Monitor.h" -#include "BrokerQueue.h" +#include "Queue.h" namespace qpid { namespace broker { diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h index 564e021c5a..2e3268010a 100644 --- a/cpp/src/qpid/broker/TxPublish.h +++ b/cpp/src/qpid/broker/TxPublish.h @@ -24,7 +24,7 @@ #include #include #include -#include "BrokerQueue.h" +#include "Queue.h" #include "Deliverable.h" #include "Message.h" #include "MessageStore.h" diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp new file mode 100644 index 0000000000..cef34630db --- /dev/null +++ b/cpp/src/qpid/client/Channel.cpp @@ -0,0 +1,271 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/log/Statement.h" +#include +#include +#include "Channel.h" +#include "qpid/sys/Monitor.h" +#include "Message.h" +#include "qpid/QpidError.h" +#include "Connection.h" +#include "Demux.h" +#include "FutureResponse.h" +#include "MessageListener.h" +#include "MessageQueue.h" +#include +#include +#include "qpid/framing/all_method_bodies.h" + +// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent +// handling of errors that should close the connection or the channel. +// Make sure the user thread receives a connection in each case. +// +using namespace std; +using namespace boost; +using namespace qpid::framing; +using namespace qpid::sys; + +namespace qpid{ +namespace client{ + +const std::string empty; + +class ScopedSync +{ + Session& session; + public: + ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); } + ~ScopedSync() { session.setSynchronous(false); } +}; + +Channel::Channel(bool _transactional, u_int16_t _prefetch) : + prefetch(_prefetch), transactional(_transactional), running(false), + uniqueId(true)/*could eventually be the session id*/, nameCounter(0), active(false) +{ +} + +Channel::~Channel() +{ + join(); +} + +void Channel::open(const Session& s) +{ + Mutex::ScopedLock l(stopLock); + if (isOpen()) + THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel"); + active = true; + session = s; + if(isTransactional()) { + session.txSelect(); + } +} + +bool Channel::isOpen() const { + Mutex::ScopedLock l(stopLock); + return active; +} + +void Channel::setPrefetch(uint32_t _prefetch){ + prefetch = _prefetch; +} + +void Channel::declareExchange(Exchange& _exchange, bool synch){ + ScopedSync s(session, synch); + session.exchangeDeclare_(exchange=_exchange.getName(), type=_exchange.getType()); +} + +void Channel::deleteExchange(Exchange& _exchange, bool synch){ + ScopedSync s(session, synch); + session.exchangeDelete_(exchange=_exchange.getName(), ifUnused=false); +} + +void Channel::declareQueue(Queue& _queue, bool synch){ + if (_queue.getName().empty()) { + stringstream uniqueName; + uniqueName << uniqueId << "-queue-" << ++nameCounter; + _queue.setName(uniqueName.str()); + } + + ScopedSync s(session, synch); + session.queueDeclare_(queue=_queue.getName(), passive=false/*passive*/, durable=_queue.isDurable(), + exclusive=_queue.isExclusive(), autoDelete=_queue.isAutoDelete()); + +} + +void Channel::deleteQueue(Queue& _queue, bool ifunused, bool ifempty, bool synch){ + ScopedSync s(session, synch); + session.queueDelete_(queue=_queue.getName(), ifUnused=ifunused, ifEmpty=ifempty); +} + +void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ + string e = exchange.getName(); + string q = queue.getName(); + ScopedSync s(session, synch); + session.queueBind(0, q, e, key, args); +} + +void Channel::commit(){ + session.txCommit(); +} + +void Channel::rollback(){ + session.txRollback(); +} + +void Channel::consume( + Queue& _queue, const std::string& tag, MessageListener* listener, + AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) { + + if (tag.empty()) { + throw Exception("A tag must be specified for a consumer."); + } + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i != consumers.end()) + throw Exception(boost::format("Consumer already exists with tag: '%1%'") % tag); + Consumer& c = consumers[tag]; + c.listener = listener; + c.ackMode = ackMode; + c.count = 0; + } + uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1; + ScopedSync s(session, synch); + session.messageSubscribe(0, _queue.getName(), tag, noLocal, + confirmMode, 0/*pre-acquire*/, + false, fields ? *fields : FieldTable()); + if (!prefetch) { + session.messageFlowMode(tag, 0/*credit based*/); + } + + //allocate some credit: + session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF); + session.messageFlow(tag, 0/*MESSAGES*/, prefetch ? prefetch : 0xFFFFFFFF); +} + +void Channel::cancel(const std::string& tag, bool synch) { + Consumer c; + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i == consumers.end()) + return; + c = i->second; + consumers.erase(i); + } + ScopedSync s(session, synch); + session.messageCancel(tag); +} + +bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) { + string tag = "get-handler"; + ScopedDivert handler(tag, session.execution().getDemux()); + Demux::Queue& incoming = handler.getQueue(); + + session.messageSubscribe_(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1)); + session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF); + session.messageFlow(tag, 0/*MESSAGES*/, 1); + Completion status = session.messageFlush(tag); + status.sync(); + session.messageCancel(tag); + + if (incoming.empty()) { + return false; + } else { + msg.populate(*(incoming.pop())); + if (ackMode == AUTO_ACK) msg.acknowledge(session, false, true); + return true; + } +} + +void Channel::publish(Message& msg, const Exchange& exchange, + const std::string& routingKey, + bool mandatory, bool /*?TODO-restore immediate?*/) { + + msg.getDeliveryProperties().setRoutingKey(routingKey); + msg.getDeliveryProperties().setDiscardUnroutable(!mandatory); + session.messageTransfer_(destination=exchange.getName(), content=msg); +} + +void Channel::close() +{ + session.close(); + { + Mutex::ScopedLock l(stopLock); + active = false; + } + stop(); +} + +void Channel::start(){ + running = true; + dispatcher = Thread(*this); +} + +void Channel::stop() { + gets.close(); + join(); +} + +void Channel::join() { + Mutex::ScopedLock l(stopLock); + if(running && dispatcher.id()) { + dispatcher.join(); + running = false; + } +} + +void Channel::dispatch(FrameSet& content, const std::string& destination) +{ + ConsumerMap::iterator i = consumers.find(destination); + if (i != consumers.end()) { + Message msg; + msg.populate(content); + MessageListener* listener = i->second.listener; + listener->received(msg); + if (isOpen() && i->second.ackMode != CLIENT_ACK) { + bool send = i->second.ackMode == AUTO_ACK + || (prefetch && ++(i->second.count) > (prefetch / 2)); + if (send) i->second.count = 0; + session.execution().completed(content.getId(), true, send); + } + } else { + QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination); + } +} + +void Channel::run() { + try { + while (true) { + FrameSet::shared_ptr content = session.get(); + //need to dispatch this to the relevant listener: + if (content->isA()) { + dispatch(*content, content->as()->getDestination()); + } else { + QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod()); + } + } + } catch (const QueueClosed&) {} +} + +}} + diff --git a/cpp/src/qpid/client/Channel.h b/cpp/src/qpid/client/Channel.h new file mode 100644 index 0000000000..bf0b289077 --- /dev/null +++ b/cpp/src/qpid/client/Channel.h @@ -0,0 +1,316 @@ +#ifndef _client_Channel_h +#define _client_Channel_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include +#include +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/Uuid.h" +#include "Exchange.h" +#include "Message.h" +#include "Queue.h" +#include "ConnectionImpl.h" +#include "qpid/client/Session.h" +#include "qpid/Exception.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Thread.h" +#include "AckMode.h" + +namespace qpid { + +namespace framing { +class ChannelCloseBody; +class AMQMethodBody; +} + +namespace client { + +class Connection; +class MessageChannel; +class MessageListener; +class ReturnedMessageHandler; + +/** + * Represents an AMQP channel, i.e. loosely a session of work. It + * is through a channel that most of the AMQP 'methods' are + * exposed. + * + * \ingroup clientapi + */ +class Channel : private sys::Runnable +{ + private: + struct Consumer{ + MessageListener* listener; + AckMode ackMode; + uint32_t count; + }; + typedef std::map ConsumerMap; + + mutable sys::Mutex lock; + sys::Thread dispatcher; + + uint32_t prefetch; + const bool transactional; + framing::ProtocolVersion version; + + mutable sys::Mutex stopLock; + bool running; + + ConsumerMap consumers; + Session session; + framing::ChannelId channelId; + BlockingQueue gets; + framing::Uuid uniqueId; + uint32_t nameCounter; + bool active; + + void stop(); + + void open(const Session& session); + void closeInternal(); + void join(); + + void dispatch(framing::FrameSet& msg, const std::string& destination); + + // FIXME aconway 2007-02-23: Get rid of friendships. + friend class Connection; + + public: + /** + * Creates a channel object. + * + * @param transactional if true, the publishing and acknowledgement + * of messages will be transactional and can be committed or + * aborted in atomic units (@see commit(), @see rollback()) + * + * @param prefetch specifies the number of unacknowledged + * messages the channel is willing to have sent to it + * asynchronously + */ + Channel(bool transactional = false, u_int16_t prefetch = 0); + + ~Channel(); + + /** + * Declares an exchange. + * + * In AMQP Exchanges are the destinations to which messages + * are published. They have Queues bound to them and route + * messages they receive to those queues. The routing rules + * depend on the type of the exchange. + * + * @param exchange an Exchange object representing the + * exchange to declare + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void declareExchange(Exchange& exchange, bool synch = true); + /** + * Deletes an exchange + * + * @param exchange an Exchange object representing the exchange to delete + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void deleteExchange(Exchange& exchange, bool synch = true); + /** + * Declares a Queue + * + * @param queue a Queue object representing the queue to declare + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void declareQueue(Queue& queue, bool synch = true); + /** + * Deletes a Queue + * + * @param queue a Queue object representing the queue to delete + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true); + /** + * Binds a queue to an exchange. The exact semantics of this + * (in particular how 'routing keys' and 'binding arguments' + * are used) depends on the type of the exchange. + * + * @param exchange an Exchange object representing the + * exchange to bind to + * + * @param queue a Queue object representing the queue to be + * bound + * + * @param key the 'routing key' for the binding + * + * @param args the 'binding arguments' for the binding + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void bind(const Exchange& exchange, const Queue& queue, + const std::string& key, + const framing::FieldTable& args=framing::FieldTable(), + bool synch = true); + + /** + * For a transactional channel this will commit all + * publications and acknowledgements since the last commit (or + * the channel was opened if there has been no previous + * commit). This will cause published messages to become + * available to consumers and acknowledged messages to be + * consumed and removed from the queues they were dispatched + * from. + * + * Transactionailty of a channel is specified when the channel + * object is created (@see Channel()). + */ + void commit(); + + /** + * For a transactional channel, this will rollback any + * publications or acknowledgements. It will be as if the + * ppblished messages were never sent and the acknowledged + * messages were never consumed. + */ + void rollback(); + + /** + * Change the prefetch in use. + */ + void setPrefetch(uint32_t prefetch); + + uint32_t getPrefetch() { return prefetch; } + + /** + * Start message dispatching on a new thread + */ + void start(); + + /** + * Close the channel. Closing a channel that is not open has no + * effect. + */ + void close(); + + /** True if the channel is transactional */ + bool isTransactional() { return transactional; } + + /** True if the channel is open */ + bool isOpen() const; + + /** Return the protocol version */ + framing::ProtocolVersion getVersion() const { return version ; } + + /** + * Creates a 'consumer' for a queue. Messages in (or arriving + * at) that queue will be delivered to consumers + * asynchronously. + * + * @param queue a Queue instance representing the queue to + * consume from + * + * @param tag an identifier to associate with the consumer + * that can be used to cancel its subscription (if empty, this + * will be assigned by the broker) + * + * @param listener a pointer to an instance of an + * implementation of the MessageListener interface. Messages + * received from this queue for this consumer will result in + * invocation of the received() method on the listener, with + * the message itself passed in. + * + * @param ackMode the mode of acknowledgement that the broker + * should assume for this consumer. @see AckMode + * + * @param noLocal if true, this consumer will not be sent any + * message published by this connection + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void consume( + Queue& queue, const std::string& tag, MessageListener* listener, + AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, + const framing::FieldTable* fields = 0); + + /** + * Cancels a subscription previously set up through a call to consume(). + * + * @param tag the identifier used (or assigned) in the consume + * request that set up the subscription to be cancelled. + * + * @param synch if true this call will block until a response + * is received from the broker + */ + void cancel(const std::string& tag, bool synch = true); + /** + * Synchronous pull of a message from a queue. + * + * @param msg a message object that will contain the message + * headers and content if the call completes. + * + * @param queue the queue to consume from + * + * @param ackMode the acknowledgement mode to use (@see + * AckMode) + * + * @return true if a message was succcessfully dequeued from + * the queue, false if the queue was empty. + */ + bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK); + + /** + * Publishes (i.e. sends a message to the broker). + * + * @param msg the message to publish + * + * @param exchange the exchange to publish the message to + * + * @param routingKey the routing key to publish with + * + * @param mandatory if true and the exchange to which this + * publish is directed has no matching bindings, the message + * will be returned (see setReturnedMessageHandler()). + * + * @param immediate if true and there is no consumer to + * receive this message on publication, the message will be + * returned (see setReturnedMessageHandler()). + */ + void publish(Message& msg, const Exchange& exchange, + const std::string& routingKey, + bool mandatory = false, bool immediate = false); + + /** + * Deliver incoming messages to the appropriate MessageListener. + */ + void run(); +}; + +}} + +#endif /*!_client_Channel_h*/ diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp deleted file mode 100644 index f5362bf688..0000000000 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ /dev/null @@ -1,271 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/log/Statement.h" -#include -#include -#include "ClientChannel.h" -#include "qpid/sys/Monitor.h" -#include "ClientMessage.h" -#include "qpid/QpidError.h" -#include "Connection.h" -#include "Demux.h" -#include "FutureResponse.h" -#include "MessageListener.h" -#include "MessageQueue.h" -#include -#include -#include "qpid/framing/all_method_bodies.h" - -// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent -// handling of errors that should close the connection or the channel. -// Make sure the user thread receives a connection in each case. -// -using namespace std; -using namespace boost; -using namespace qpid::framing; -using namespace qpid::sys; - -namespace qpid{ -namespace client{ - -const std::string empty; - -class ScopedSync -{ - Session& session; - public: - ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); } - ~ScopedSync() { session.setSynchronous(false); } -}; - -Channel::Channel(bool _transactional, u_int16_t _prefetch) : - prefetch(_prefetch), transactional(_transactional), running(false), - uniqueId(true)/*could eventually be the session id*/, nameCounter(0), active(false) -{ -} - -Channel::~Channel() -{ - join(); -} - -void Channel::open(const Session& s) -{ - Mutex::ScopedLock l(stopLock); - if (isOpen()) - THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel"); - active = true; - session = s; - if(isTransactional()) { - session.txSelect(); - } -} - -bool Channel::isOpen() const { - Mutex::ScopedLock l(stopLock); - return active; -} - -void Channel::setPrefetch(uint32_t _prefetch){ - prefetch = _prefetch; -} - -void Channel::declareExchange(Exchange& _exchange, bool synch){ - ScopedSync s(session, synch); - session.exchangeDeclare_(exchange=_exchange.getName(), type=_exchange.getType()); -} - -void Channel::deleteExchange(Exchange& _exchange, bool synch){ - ScopedSync s(session, synch); - session.exchangeDelete_(exchange=_exchange.getName(), ifUnused=false); -} - -void Channel::declareQueue(Queue& _queue, bool synch){ - if (_queue.getName().empty()) { - stringstream uniqueName; - uniqueName << uniqueId << "-queue-" << ++nameCounter; - _queue.setName(uniqueName.str()); - } - - ScopedSync s(session, synch); - session.queueDeclare_(queue=_queue.getName(), passive=false/*passive*/, durable=_queue.isDurable(), - exclusive=_queue.isExclusive(), autoDelete=_queue.isAutoDelete()); - -} - -void Channel::deleteQueue(Queue& _queue, bool ifunused, bool ifempty, bool synch){ - ScopedSync s(session, synch); - session.queueDelete_(queue=_queue.getName(), ifUnused=ifunused, ifEmpty=ifempty); -} - -void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ - string e = exchange.getName(); - string q = queue.getName(); - ScopedSync s(session, synch); - session.queueBind(0, q, e, key, args); -} - -void Channel::commit(){ - session.txCommit(); -} - -void Channel::rollback(){ - session.txRollback(); -} - -void Channel::consume( - Queue& _queue, const std::string& tag, MessageListener* listener, - AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) { - - if (tag.empty()) { - throw Exception("A tag must be specified for a consumer."); - } - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if (i != consumers.end()) - throw Exception(boost::format("Consumer already exists with tag: '%1%'") % tag); - Consumer& c = consumers[tag]; - c.listener = listener; - c.ackMode = ackMode; - c.count = 0; - } - uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1; - ScopedSync s(session, synch); - session.messageSubscribe(0, _queue.getName(), tag, noLocal, - confirmMode, 0/*pre-acquire*/, - false, fields ? *fields : FieldTable()); - if (!prefetch) { - session.messageFlowMode(tag, 0/*credit based*/); - } - - //allocate some credit: - session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF); - session.messageFlow(tag, 0/*MESSAGES*/, prefetch ? prefetch : 0xFFFFFFFF); -} - -void Channel::cancel(const std::string& tag, bool synch) { - Consumer c; - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if (i == consumers.end()) - return; - c = i->second; - consumers.erase(i); - } - ScopedSync s(session, synch); - session.messageCancel(tag); -} - -bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) { - string tag = "get-handler"; - ScopedDivert handler(tag, session.execution().getDemux()); - Demux::Queue& incoming = handler.getQueue(); - - session.messageSubscribe_(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1)); - session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF); - session.messageFlow(tag, 0/*MESSAGES*/, 1); - Completion status = session.messageFlush(tag); - status.sync(); - session.messageCancel(tag); - - if (incoming.empty()) { - return false; - } else { - msg.populate(*(incoming.pop())); - if (ackMode == AUTO_ACK) msg.acknowledge(session, false, true); - return true; - } -} - -void Channel::publish(Message& msg, const Exchange& exchange, - const std::string& routingKey, - bool mandatory, bool /*?TODO-restore immediate?*/) { - - msg.getDeliveryProperties().setRoutingKey(routingKey); - msg.getDeliveryProperties().setDiscardUnroutable(!mandatory); - session.messageTransfer_(destination=exchange.getName(), content=msg); -} - -void Channel::close() -{ - session.close(); - { - Mutex::ScopedLock l(stopLock); - active = false; - } - stop(); -} - -void Channel::start(){ - running = true; - dispatcher = Thread(*this); -} - -void Channel::stop() { - gets.close(); - join(); -} - -void Channel::join() { - Mutex::ScopedLock l(stopLock); - if(running && dispatcher.id()) { - dispatcher.join(); - running = false; - } -} - -void Channel::dispatch(FrameSet& content, const std::string& destination) -{ - ConsumerMap::iterator i = consumers.find(destination); - if (i != consumers.end()) { - Message msg; - msg.populate(content); - MessageListener* listener = i->second.listener; - listener->received(msg); - if (isOpen() && i->second.ackMode != CLIENT_ACK) { - bool send = i->second.ackMode == AUTO_ACK - || (prefetch && ++(i->second.count) > (prefetch / 2)); - if (send) i->second.count = 0; - session.execution().completed(content.getId(), true, send); - } - } else { - QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination); - } -} - -void Channel::run() { - try { - while (true) { - FrameSet::shared_ptr content = session.get(); - //need to dispatch this to the relevant listener: - if (content->isA()) { - dispatch(*content, content->as()->getDestination()); - } else { - QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod()); - } - } - } catch (const QueueClosed&) {} -} - -}} - diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h deleted file mode 100644 index 527f5d418f..0000000000 --- a/cpp/src/qpid/client/ClientChannel.h +++ /dev/null @@ -1,316 +0,0 @@ -#ifndef _client_ClientChannel_h -#define _client_ClientChannel_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include -#include -#include "qpid/framing/amqp_framing.h" -#include "qpid/framing/Uuid.h" -#include "ClientExchange.h" -#include "ClientMessage.h" -#include "ClientQueue.h" -#include "ConnectionImpl.h" -#include "qpid/client/Session.h" -#include "qpid/Exception.h" -#include "qpid/sys/Mutex.h" -#include "qpid/sys/Runnable.h" -#include "qpid/sys/Thread.h" -#include "AckMode.h" - -namespace qpid { - -namespace framing { -class ChannelCloseBody; -class AMQMethodBody; -} - -namespace client { - -class Connection; -class MessageChannel; -class MessageListener; -class ReturnedMessageHandler; - -/** - * Represents an AMQP channel, i.e. loosely a session of work. It - * is through a channel that most of the AMQP 'methods' are - * exposed. - * - * \ingroup clientapi - */ -class Channel : private sys::Runnable -{ - private: - struct Consumer{ - MessageListener* listener; - AckMode ackMode; - uint32_t count; - }; - typedef std::map ConsumerMap; - - mutable sys::Mutex lock; - sys::Thread dispatcher; - - uint32_t prefetch; - const bool transactional; - framing::ProtocolVersion version; - - mutable sys::Mutex stopLock; - bool running; - - ConsumerMap consumers; - Session session; - framing::ChannelId channelId; - BlockingQueue gets; - framing::Uuid uniqueId; - uint32_t nameCounter; - bool active; - - void stop(); - - void open(const Session& session); - void closeInternal(); - void join(); - - void dispatch(framing::FrameSet& msg, const std::string& destination); - - // FIXME aconway 2007-02-23: Get rid of friendships. - friend class Connection; - - public: - /** - * Creates a channel object. - * - * @param transactional if true, the publishing and acknowledgement - * of messages will be transactional and can be committed or - * aborted in atomic units (@see commit(), @see rollback()) - * - * @param prefetch specifies the number of unacknowledged - * messages the channel is willing to have sent to it - * asynchronously - */ - Channel(bool transactional = false, u_int16_t prefetch = 0); - - ~Channel(); - - /** - * Declares an exchange. - * - * In AMQP Exchanges are the destinations to which messages - * are published. They have Queues bound to them and route - * messages they receive to those queues. The routing rules - * depend on the type of the exchange. - * - * @param exchange an Exchange object representing the - * exchange to declare - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void declareExchange(Exchange& exchange, bool synch = true); - /** - * Deletes an exchange - * - * @param exchange an Exchange object representing the exchange to delete - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void deleteExchange(Exchange& exchange, bool synch = true); - /** - * Declares a Queue - * - * @param queue a Queue object representing the queue to declare - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void declareQueue(Queue& queue, bool synch = true); - /** - * Deletes a Queue - * - * @param queue a Queue object representing the queue to delete - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true); - /** - * Binds a queue to an exchange. The exact semantics of this - * (in particular how 'routing keys' and 'binding arguments' - * are used) depends on the type of the exchange. - * - * @param exchange an Exchange object representing the - * exchange to bind to - * - * @param queue a Queue object representing the queue to be - * bound - * - * @param key the 'routing key' for the binding - * - * @param args the 'binding arguments' for the binding - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void bind(const Exchange& exchange, const Queue& queue, - const std::string& key, - const framing::FieldTable& args=framing::FieldTable(), - bool synch = true); - - /** - * For a transactional channel this will commit all - * publications and acknowledgements since the last commit (or - * the channel was opened if there has been no previous - * commit). This will cause published messages to become - * available to consumers and acknowledged messages to be - * consumed and removed from the queues they were dispatched - * from. - * - * Transactionailty of a channel is specified when the channel - * object is created (@see Channel()). - */ - void commit(); - - /** - * For a transactional channel, this will rollback any - * publications or acknowledgements. It will be as if the - * ppblished messages were never sent and the acknowledged - * messages were never consumed. - */ - void rollback(); - - /** - * Change the prefetch in use. - */ - void setPrefetch(uint32_t prefetch); - - uint32_t getPrefetch() { return prefetch; } - - /** - * Start message dispatching on a new thread - */ - void start(); - - /** - * Close the channel. Closing a channel that is not open has no - * effect. - */ - void close(); - - /** True if the channel is transactional */ - bool isTransactional() { return transactional; } - - /** True if the channel is open */ - bool isOpen() const; - - /** Return the protocol version */ - framing::ProtocolVersion getVersion() const { return version ; } - - /** - * Creates a 'consumer' for a queue. Messages in (or arriving - * at) that queue will be delivered to consumers - * asynchronously. - * - * @param queue a Queue instance representing the queue to - * consume from - * - * @param tag an identifier to associate with the consumer - * that can be used to cancel its subscription (if empty, this - * will be assigned by the broker) - * - * @param listener a pointer to an instance of an - * implementation of the MessageListener interface. Messages - * received from this queue for this consumer will result in - * invocation of the received() method on the listener, with - * the message itself passed in. - * - * @param ackMode the mode of acknowledgement that the broker - * should assume for this consumer. @see AckMode - * - * @param noLocal if true, this consumer will not be sent any - * message published by this connection - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void consume( - Queue& queue, const std::string& tag, MessageListener* listener, - AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, - const framing::FieldTable* fields = 0); - - /** - * Cancels a subscription previously set up through a call to consume(). - * - * @param tag the identifier used (or assigned) in the consume - * request that set up the subscription to be cancelled. - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void cancel(const std::string& tag, bool synch = true); - /** - * Synchronous pull of a message from a queue. - * - * @param msg a message object that will contain the message - * headers and content if the call completes. - * - * @param queue the queue to consume from - * - * @param ackMode the acknowledgement mode to use (@see - * AckMode) - * - * @return true if a message was succcessfully dequeued from - * the queue, false if the queue was empty. - */ - bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK); - - /** - * Publishes (i.e. sends a message to the broker). - * - * @param msg the message to publish - * - * @param exchange the exchange to publish the message to - * - * @param routingKey the routing key to publish with - * - * @param mandatory if true and the exchange to which this - * publish is directed has no matching bindings, the message - * will be returned (see setReturnedMessageHandler()). - * - * @param immediate if true and there is no consumer to - * receive this message on publication, the message will be - * returned (see setReturnedMessageHandler()). - */ - void publish(Message& msg, const Exchange& exchange, - const std::string& routingKey, - bool mandatory = false, bool immediate = false); - - /** - * Deliver incoming messages to the appropriate MessageListener. - */ - void run(); -}; - -}} - -#endif /*!_client_ClientChannel_h*/ diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp deleted file mode 100644 index 8c5f83f9f5..0000000000 --- a/cpp/src/qpid/client/ClientConnection.cpp +++ /dev/null @@ -1,86 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include -#include -#include - -#include "Connection.h" -#include "ClientChannel.h" -#include "ClientMessage.h" -#include "ScopedAssociation.h" -#include "qpid/log/Logger.h" -#include "qpid/log/Options.h" -#include "qpid/log/Statement.h" -#include "qpid/QpidError.h" -#include -#include -#include - -using namespace qpid::framing; -using namespace qpid::sys; - - -namespace qpid { -namespace client { - -Connection::Connection(bool _debug, uint32_t _max_frame_size, framing::ProtocolVersion _version) : - channelIdCounter(0), version(_version), - max_frame_size(_max_frame_size), - impl(new ConnectionImpl(boost::shared_ptr(new Connector(_version, _debug)))), - isOpen(false) {} - -Connection::Connection(boost::shared_ptr c) : - channelIdCounter(0), version(framing::highestProtocolVersion), - max_frame_size(65536), - impl(new ConnectionImpl(c)), - isOpen(false) {} - -Connection::~Connection(){} - -void Connection::open( - const std::string& host, int port, - const std::string& uid, const std::string& pwd, const std::string& vhost) -{ - if (isOpen) - THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open"); - - impl->open(host, port, uid, pwd, vhost); - isOpen = true; -} - -void Connection::openChannel(Channel& channel) { - channel.open(newSession()); -} - -Session Connection::newSession() { - ChannelId id = ++channelIdCounter; - SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size)); - ScopedAssociation::shared_ptr assoc(new ScopedAssociation(session, impl)); - session->open(); - return Session(assoc); -} - -void Connection::close() -{ - impl->close(); -} - -}} // namespace qpid::client diff --git a/cpp/src/qpid/client/ClientExchange.cpp b/cpp/src/qpid/client/ClientExchange.cpp deleted file mode 100644 index d5914beea2..0000000000 --- a/cpp/src/qpid/client/ClientExchange.cpp +++ /dev/null @@ -1,34 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "ClientExchange.h" - -qpid::client::Exchange::Exchange(std::string _name, std::string _type) : name(_name), type(_type){} -const std::string& qpid::client::Exchange::getName() const { return name; } -const std::string& qpid::client::Exchange::getType() const { return type; } - -const std::string qpid::client::Exchange::DIRECT_EXCHANGE = "direct"; -const std::string qpid::client::Exchange::TOPIC_EXCHANGE = "topic"; -const std::string qpid::client::Exchange::HEADERS_EXCHANGE = "headers"; - -const qpid::client::Exchange qpid::client::Exchange::DEFAULT_EXCHANGE("", DIRECT_EXCHANGE); -const qpid::client::Exchange qpid::client::Exchange::STANDARD_DIRECT_EXCHANGE("amq.direct", DIRECT_EXCHANGE); -const qpid::client::Exchange qpid::client::Exchange::STANDARD_TOPIC_EXCHANGE("amq.topic", TOPIC_EXCHANGE); -const qpid::client::Exchange qpid::client::Exchange::STANDARD_HEADERS_EXCHANGE("amq.headers", HEADERS_EXCHANGE); diff --git a/cpp/src/qpid/client/ClientExchange.h b/cpp/src/qpid/client/ClientExchange.h deleted file mode 100644 index a8ac21fa9b..0000000000 --- a/cpp/src/qpid/client/ClientExchange.h +++ /dev/null @@ -1,106 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include - -#ifndef _Exchange_ -#define _Exchange_ - -namespace qpid { -namespace client { - - /** - * A 'handle' used to represent an AMQP exchange in the Channel - * methods. Exchanges are the destinations to which messages are - * published. - * - * There are different types of exchange (the standard types are - * available as static constants, see DIRECT_EXCHANGE, - * TOPIC_EXCHANGE and HEADERS_EXCHANGE). A Queue can be bound to - * an exchange using Channel::bind() and messages published to - * that exchange are then routed to the queue based on the details - * of the binding and the type of exchange. - * - * There are some standard exchange instances that are predeclared - * on all AMQP brokers. These are defined as static members - * STANDARD_DIRECT_EXCHANGE, STANDARD_TOPIC_EXCHANGE and - * STANDARD_HEADERS_EXCHANGE. There is also the 'default' exchange - * (member DEFAULT_EXCHANGE) which is nameless and of type - * 'direct' and has every declared queue bound to it by queue - * name. - * - * \ingroup clientapi - */ - class Exchange{ - const std::string name; - const std::string type; - - public: - /** - * A direct exchange routes messages published with routing - * key X to any queue bound with key X (i.e. an exact match is - * used). - */ - static const std::string DIRECT_EXCHANGE; - /** - * A topic exchange treat the key with which a queue is bound - * as a pattern and routes all messages whose routing keys - * match that pattern to the bound queue. The routing key for - * a message must consist of zero or more alpha-numeric words - * delimited by dots. The pattern is of a similar form but * - * can be used to match excatly one word and # can be used to - * match zero or more words. - */ - static const std::string TOPIC_EXCHANGE; - /** - * The headers exchange routes messages based on whether their - * headers match the binding arguments specified when - * binding. (see the AMQP spec for more details). - */ - static const std::string HEADERS_EXCHANGE; - - /** - * The 'default' exchange, nameless and of type 'direct'. Has - * every declared queue bound to it by name. - */ - static const Exchange DEFAULT_EXCHANGE; - /** - * The standard direct exchange, named amq.direct. - */ - static const Exchange STANDARD_DIRECT_EXCHANGE; - /** - * The standard topic exchange, named amq.topic. - */ - static const Exchange STANDARD_TOPIC_EXCHANGE; - /** - * The standard headers exchange, named amq.header. - */ - static const Exchange STANDARD_HEADERS_EXCHANGE; - - Exchange(std::string name, std::string type = DIRECT_EXCHANGE); - const std::string& getName() const; - const std::string& getType() const; - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/client/ClientMessage.h b/cpp/src/qpid/client/ClientMessage.h deleted file mode 100644 index a573e17940..0000000000 --- a/cpp/src/qpid/client/ClientMessage.h +++ /dev/null @@ -1,86 +0,0 @@ -#ifndef _client_ClientMessage_h -#define _client_ClientMessage_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include -#include "qpid/client/Session.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/TransferContent.h" - -namespace qpid { -namespace client { - -/** - * A representation of messages for sent or recived through the - * client api. - * - * \ingroup clientapi - */ -class Message : public framing::TransferContent -{ -public: - Message(const std::string& data_=std::string()) : TransferContent(data_) {} - - std::string getDestination() const - { - return method.getDestination(); - } - - bool isRedelivered() const - { - return hasDeliveryProperties() && getDeliveryProperties().getRedelivered(); - } - - void setRedelivered(bool redelivered) - { - getDeliveryProperties().setRedelivered(redelivered); - } - - framing::FieldTable& getHeaders() - { - return getMessageProperties().getApplicationHeaders(); - } - - void acknowledge(Session& session, bool cumulative = true, bool send = true) const - { - session.execution().completed(id, cumulative, send); - } - - Message(const framing::FrameSet& frameset) : method(*frameset.as()), id(frameset.getId()) - { - populate(frameset); - } - - const framing::MessageTransferBody& getMethod() const - { - return method; - } - -private: - //method and id are only set for received messages: - const framing::MessageTransferBody method; - const framing::SequenceNumber id; -}; - -}} - -#endif /*!_client_ClientMessage_h*/ diff --git a/cpp/src/qpid/client/ClientQueue.cpp b/cpp/src/qpid/client/ClientQueue.cpp deleted file mode 100644 index 613cf8d288..0000000000 --- a/cpp/src/qpid/client/ClientQueue.cpp +++ /dev/null @@ -1,58 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "ClientQueue.h" - -qpid::client::Queue::Queue() : name(""), autodelete(true), exclusive(true), durable(false){} - -qpid::client::Queue::Queue(std::string _name) : name(_name), autodelete(false), exclusive(false), durable(false){} - -qpid::client::Queue::Queue(std::string _name, bool temp) : name(_name), autodelete(temp), exclusive(temp), durable(false){} - -qpid::client::Queue::Queue(std::string _name, bool _autodelete, bool _exclusive, bool _durable) - : name(_name), autodelete(_autodelete), exclusive(_exclusive), durable(_durable){} - -const std::string& qpid::client::Queue::getName() const{ - return name; -} - -void qpid::client::Queue::setName(const std::string& _name){ - name = _name; -} - -bool qpid::client::Queue::isAutoDelete() const{ - return autodelete; -} - -bool qpid::client::Queue::isExclusive() const{ - return exclusive; -} - -bool qpid::client::Queue::isDurable() const{ - return durable; -} - -void qpid::client::Queue::setDurable(bool _durable){ - durable = _durable; -} - - - - diff --git a/cpp/src/qpid/client/ClientQueue.h b/cpp/src/qpid/client/ClientQueue.h deleted file mode 100644 index b37a44b004..0000000000 --- a/cpp/src/qpid/client/ClientQueue.h +++ /dev/null @@ -1,103 +0,0 @@ -#ifndef _client_ClientQueue_h -#define _client_ClientQueue_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include - -namespace qpid { -namespace client { - - /** - * A 'handle' used to represent an AMQP queue in the Channel - * methods. Creating an instance of this class does not cause the - * queue to be created on the broker. Rather, an instance of this - * class should be passed to Channel::declareQueue() to ensure - * that the queue exists or is created. - * - * Queues hold messages and allow clients to consume - * (see Channel::consume()) or get (see Channel::get()) those messags. A - * queue receives messages by being bound to one or more Exchange; - * messages published to that exchange may then be routed to the - * queue based on the details of the binding and the type of the - * exchange (see Channel::bind()). - * - * Queues are identified by a name. They can be exclusive (in which - * case they can only be used in the context of the connection - * over which they were declared, and are deleted when then - * connection closes), or they can be shared. Shared queues can be - * auto deleted when they have no consumers. - * - * We use the term 'temporary queue' to refer to an exclusive - * queue. - * - * \ingroup clientapi - */ - class Queue{ - std::string name; - const bool autodelete; - const bool exclusive; - bool durable; - - public: - - /** - * Creates an unnamed, non-durable, temporary queue. A name - * will be assigned to this queue instance by a call to - * Channel::declareQueue(). - */ - Queue(); - /** - * Creates a shared, non-durable, queue with a given name, - * that will not be autodeleted. - * - * @param name the name of the queue - */ - Queue(std::string name); - /** - * Creates a non-durable queue with a given name. - * - * @param name the name of the queue - * - * @param temp if true the queue will be a temporary queue, if - * false it will be shared and not autodeleted. - */ - Queue(std::string name, bool temp); - /** - * This constructor allows the autodelete, exclusive and - * durable propeties to be explictly set. Note however that if - * exclusive is true, autodelete has no meaning as exclusive - * queues are always destroyed when the connection that - * created them is closed. - */ - Queue(std::string name, bool autodelete, bool exclusive, bool durable); - const std::string& getName() const; - void setName(const std::string&); - bool isAutoDelete() const; - bool isExclusive() const; - bool isDurable() const; - void setDurable(bool durable); - }; - -} -} - -#endif /*!_client_ClientQueue_h*/ diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp new file mode 100644 index 0000000000..cef076527f --- /dev/null +++ b/cpp/src/qpid/client/Connection.cpp @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include +#include +#include + +#include "Connection.h" +#include "Channel.h" +#include "Message.h" +#include "ScopedAssociation.h" +#include "qpid/log/Logger.h" +#include "qpid/log/Options.h" +#include "qpid/log/Statement.h" +#include "qpid/QpidError.h" +#include +#include +#include + +using namespace qpid::framing; +using namespace qpid::sys; + + +namespace qpid { +namespace client { + +Connection::Connection(bool _debug, uint32_t _max_frame_size, framing::ProtocolVersion _version) : + channelIdCounter(0), version(_version), + max_frame_size(_max_frame_size), + impl(new ConnectionImpl(boost::shared_ptr(new Connector(_version, _debug)))), + isOpen(false) {} + +Connection::Connection(boost::shared_ptr c) : + channelIdCounter(0), version(framing::highestProtocolVersion), + max_frame_size(65536), + impl(new ConnectionImpl(c)), + isOpen(false) {} + +Connection::~Connection(){} + +void Connection::open( + const std::string& host, int port, + const std::string& uid, const std::string& pwd, const std::string& vhost) +{ + if (isOpen) + THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open"); + + impl->open(host, port, uid, pwd, vhost); + isOpen = true; +} + +void Connection::openChannel(Channel& channel) { + channel.open(newSession()); +} + +Session Connection::newSession() { + ChannelId id = ++channelIdCounter; + SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size)); + ScopedAssociation::shared_ptr assoc(new ScopedAssociation(session, impl)); + session->open(); + return Session(assoc); +} + +void Connection::close() +{ + impl->close(); +} + +}} // namespace qpid::client diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index e309b5c63e..f5d6a387a9 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -24,7 +24,7 @@ #include #include #include "qpid/QpidError.h" -#include "ClientChannel.h" +#include "Channel.h" #include "ConnectionImpl.h" #include "qpid/client/Session.h" #include "qpid/framing/AMQP_HighestVersion.h" diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index 8f3ed8bcbe..fd437725ce 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -25,7 +25,7 @@ #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" #include "BlockingQueue.h" -#include "ClientMessage.h" +#include "Message.h" using qpid::framing::FrameSet; using qpid::framing::MessageTransferBody; diff --git a/cpp/src/qpid/client/Exchange.cpp b/cpp/src/qpid/client/Exchange.cpp new file mode 100644 index 0000000000..e7fbdeb47e --- /dev/null +++ b/cpp/src/qpid/client/Exchange.cpp @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Exchange.h" + +qpid::client::Exchange::Exchange(std::string _name, std::string _type) : name(_name), type(_type){} +const std::string& qpid::client::Exchange::getName() const { return name; } +const std::string& qpid::client::Exchange::getType() const { return type; } + +const std::string qpid::client::Exchange::DIRECT_EXCHANGE = "direct"; +const std::string qpid::client::Exchange::TOPIC_EXCHANGE = "topic"; +const std::string qpid::client::Exchange::HEADERS_EXCHANGE = "headers"; + +const qpid::client::Exchange qpid::client::Exchange::DEFAULT_EXCHANGE("", DIRECT_EXCHANGE); +const qpid::client::Exchange qpid::client::Exchange::STANDARD_DIRECT_EXCHANGE("amq.direct", DIRECT_EXCHANGE); +const qpid::client::Exchange qpid::client::Exchange::STANDARD_TOPIC_EXCHANGE("amq.topic", TOPIC_EXCHANGE); +const qpid::client::Exchange qpid::client::Exchange::STANDARD_HEADERS_EXCHANGE("amq.headers", HEADERS_EXCHANGE); diff --git a/cpp/src/qpid/client/Exchange.h b/cpp/src/qpid/client/Exchange.h new file mode 100644 index 0000000000..a8ac21fa9b --- /dev/null +++ b/cpp/src/qpid/client/Exchange.h @@ -0,0 +1,106 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include + +#ifndef _Exchange_ +#define _Exchange_ + +namespace qpid { +namespace client { + + /** + * A 'handle' used to represent an AMQP exchange in the Channel + * methods. Exchanges are the destinations to which messages are + * published. + * + * There are different types of exchange (the standard types are + * available as static constants, see DIRECT_EXCHANGE, + * TOPIC_EXCHANGE and HEADERS_EXCHANGE). A Queue can be bound to + * an exchange using Channel::bind() and messages published to + * that exchange are then routed to the queue based on the details + * of the binding and the type of exchange. + * + * There are some standard exchange instances that are predeclared + * on all AMQP brokers. These are defined as static members + * STANDARD_DIRECT_EXCHANGE, STANDARD_TOPIC_EXCHANGE and + * STANDARD_HEADERS_EXCHANGE. There is also the 'default' exchange + * (member DEFAULT_EXCHANGE) which is nameless and of type + * 'direct' and has every declared queue bound to it by queue + * name. + * + * \ingroup clientapi + */ + class Exchange{ + const std::string name; + const std::string type; + + public: + /** + * A direct exchange routes messages published with routing + * key X to any queue bound with key X (i.e. an exact match is + * used). + */ + static const std::string DIRECT_EXCHANGE; + /** + * A topic exchange treat the key with which a queue is bound + * as a pattern and routes all messages whose routing keys + * match that pattern to the bound queue. The routing key for + * a message must consist of zero or more alpha-numeric words + * delimited by dots. The pattern is of a similar form but * + * can be used to match excatly one word and # can be used to + * match zero or more words. + */ + static const std::string TOPIC_EXCHANGE; + /** + * The headers exchange routes messages based on whether their + * headers match the binding arguments specified when + * binding. (see the AMQP spec for more details). + */ + static const std::string HEADERS_EXCHANGE; + + /** + * The 'default' exchange, nameless and of type 'direct'. Has + * every declared queue bound to it by name. + */ + static const Exchange DEFAULT_EXCHANGE; + /** + * The standard direct exchange, named amq.direct. + */ + static const Exchange STANDARD_DIRECT_EXCHANGE; + /** + * The standard topic exchange, named amq.topic. + */ + static const Exchange STANDARD_TOPIC_EXCHANGE; + /** + * The standard headers exchange, named amq.header. + */ + static const Exchange STANDARD_HEADERS_EXCHANGE; + + Exchange(std::string name, std::string type = DIRECT_EXCHANGE); + const std::string& getName() const; + const std::string& getType() const; + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h new file mode 100644 index 0000000000..dab7f3c8d8 --- /dev/null +++ b/cpp/src/qpid/client/Message.h @@ -0,0 +1,86 @@ +#ifndef _client_Message_h +#define _client_Message_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include +#include "qpid/client/Session.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/TransferContent.h" + +namespace qpid { +namespace client { + +/** + * A representation of messages for sent or recived through the + * client api. + * + * \ingroup clientapi + */ +class Message : public framing::TransferContent +{ +public: + Message(const std::string& data_=std::string()) : TransferContent(data_) {} + + std::string getDestination() const + { + return method.getDestination(); + } + + bool isRedelivered() const + { + return hasDeliveryProperties() && getDeliveryProperties().getRedelivered(); + } + + void setRedelivered(bool redelivered) + { + getDeliveryProperties().setRedelivered(redelivered); + } + + framing::FieldTable& getHeaders() + { + return getMessageProperties().getApplicationHeaders(); + } + + void acknowledge(Session& session, bool cumulative = true, bool send = true) const + { + session.execution().completed(id, cumulative, send); + } + + Message(const framing::FrameSet& frameset) : method(*frameset.as()), id(frameset.getId()) + { + populate(frameset); + } + + const framing::MessageTransferBody& getMethod() const + { + return method; + } + +private: + //method and id are only set for received messages: + const framing::MessageTransferBody method; + const framing::SequenceNumber id; +}; + +}} + +#endif /*!_client_Message_h*/ diff --git a/cpp/src/qpid/client/MessageListener.h b/cpp/src/qpid/client/MessageListener.h index 501862a3ef..86e5dd63dc 100644 --- a/cpp/src/qpid/client/MessageListener.h +++ b/cpp/src/qpid/client/MessageListener.h @@ -23,7 +23,7 @@ #ifndef _MessageListener_ #define _MessageListener_ -#include "ClientMessage.h" +#include "Message.h" namespace qpid { namespace client { diff --git a/cpp/src/qpid/client/Queue.cpp b/cpp/src/qpid/client/Queue.cpp new file mode 100644 index 0000000000..1752a48a3a --- /dev/null +++ b/cpp/src/qpid/client/Queue.cpp @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Queue.h" + +qpid::client::Queue::Queue() : name(""), autodelete(true), exclusive(true), durable(false){} + +qpid::client::Queue::Queue(std::string _name) : name(_name), autodelete(false), exclusive(false), durable(false){} + +qpid::client::Queue::Queue(std::string _name, bool temp) : name(_name), autodelete(temp), exclusive(temp), durable(false){} + +qpid::client::Queue::Queue(std::string _name, bool _autodelete, bool _exclusive, bool _durable) + : name(_name), autodelete(_autodelete), exclusive(_exclusive), durable(_durable){} + +const std::string& qpid::client::Queue::getName() const{ + return name; +} + +void qpid::client::Queue::setName(const std::string& _name){ + name = _name; +} + +bool qpid::client::Queue::isAutoDelete() const{ + return autodelete; +} + +bool qpid::client::Queue::isExclusive() const{ + return exclusive; +} + +bool qpid::client::Queue::isDurable() const{ + return durable; +} + +void qpid::client::Queue::setDurable(bool _durable){ + durable = _durable; +} + + + + diff --git a/cpp/src/qpid/client/Queue.h b/cpp/src/qpid/client/Queue.h new file mode 100644 index 0000000000..9ab8c70b08 --- /dev/null +++ b/cpp/src/qpid/client/Queue.h @@ -0,0 +1,103 @@ +#ifndef _client_Queue_h +#define _client_Queue_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include + +namespace qpid { +namespace client { + + /** + * A 'handle' used to represent an AMQP queue in the Channel + * methods. Creating an instance of this class does not cause the + * queue to be created on the broker. Rather, an instance of this + * class should be passed to Channel::declareQueue() to ensure + * that the queue exists or is created. + * + * Queues hold messages and allow clients to consume + * (see Channel::consume()) or get (see Channel::get()) those messags. A + * queue receives messages by being bound to one or more Exchange; + * messages published to that exchange may then be routed to the + * queue based on the details of the binding and the type of the + * exchange (see Channel::bind()). + * + * Queues are identified by a name. They can be exclusive (in which + * case they can only be used in the context of the connection + * over which they were declared, and are deleted when then + * connection closes), or they can be shared. Shared queues can be + * auto deleted when they have no consumers. + * + * We use the term 'temporary queue' to refer to an exclusive + * queue. + * + * \ingroup clientapi + */ + class Queue{ + std::string name; + const bool autodelete; + const bool exclusive; + bool durable; + + public: + + /** + * Creates an unnamed, non-durable, temporary queue. A name + * will be assigned to this queue instance by a call to + * Channel::declareQueue(). + */ + Queue(); + /** + * Creates a shared, non-durable, queue with a given name, + * that will not be autodeleted. + * + * @param name the name of the queue + */ + Queue(std::string name); + /** + * Creates a non-durable queue with a given name. + * + * @param name the name of the queue + * + * @param temp if true the queue will be a temporary queue, if + * false it will be shared and not autodeleted. + */ + Queue(std::string name, bool temp); + /** + * This constructor allows the autodelete, exclusive and + * durable propeties to be explictly set. Note however that if + * exclusive is true, autodelete has no meaning as exclusive + * queues are always destroyed when the connection that + * created them is closed. + */ + Queue(std::string name, bool autodelete, bool exclusive, bool durable); + const std::string& getName() const; + void setName(const std::string&); + bool isAutoDelete() const; + bool isExclusive() const; + bool isDurable() const; + void setDurable(bool durable); + }; + +} +} + +#endif /*!_client_Queue_h*/ diff --git a/cpp/src/tests/BasicP2PTest.h b/cpp/src/tests/BasicP2PTest.h index b80fff1171..3f0a3704f5 100644 --- a/cpp/src/tests/BasicP2PTest.h +++ b/cpp/src/tests/BasicP2PTest.h @@ -25,8 +25,8 @@ #include #include "qpid/Exception.h" -#include "qpid/client/ClientChannel.h" -#include "qpid/client/ClientMessage.h" +#include "qpid/client/Channel.h" +#include "qpid/client/Message.h" #include "qpid/client/Connection.h" #include "qpid/client/MessageListener.h" #include "SimpleTestCaseBase.h" diff --git a/cpp/src/tests/BasicPubSubTest.h b/cpp/src/tests/BasicPubSubTest.h index b7ccba1a81..c3f8020b3a 100644 --- a/cpp/src/tests/BasicPubSubTest.h +++ b/cpp/src/tests/BasicPubSubTest.h @@ -25,8 +25,8 @@ #include #include "qpid/Exception.h" -#include "qpid/client/ClientChannel.h" -#include "qpid/client/ClientMessage.h" +#include "qpid/client/Channel.h" +#include "qpid/client/Message.h" #include "qpid/client/Connection.h" #include "qpid/client/MessageListener.h" #include "SimpleTestCaseBase.h" diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp index e975ec1b12..612a9fc8bc 100644 --- a/cpp/src/tests/BrokerChannelTest.cpp +++ b/cpp/src/tests/BrokerChannelTest.cpp @@ -24,7 +24,7 @@ // which is no longer exposed on Session (part of SemanticHandler.) // #include "qpid/broker/BrokerChannel.h" -#include "qpid/broker/BrokerQueue.h" +#include "qpid/broker/Queue.h" #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/Message.h" #include "qpid/broker/MessageDelivery.h" diff --git a/cpp/src/tests/ClientChannelTest.cpp b/cpp/src/tests/ClientChannelTest.cpp index 252b419299..9a982508d1 100644 --- a/cpp/src/tests/ClientChannelTest.cpp +++ b/cpp/src/tests/ClientChannelTest.cpp @@ -21,10 +21,10 @@ #include #include "qpid_test_plugin.h" #include "InProcessBroker.h" -#include "qpid/client/ClientChannel.h" -#include "qpid/client/ClientMessage.h" -#include "qpid/client/ClientQueue.h" -#include "qpid/client/ClientExchange.h" +#include "qpid/client/Channel.h" +#include "qpid/client/Message.h" +#include "qpid/client/Queue.h" +#include "qpid/client/Exchange.h" #include "qpid/client/MessageListener.h" #include "qpid/client/BasicMessageChannel.h" #include "qpid/client/MessageMessageChannel.h" @@ -44,7 +44,7 @@ const size_t FRAME_MAX = 256; * The test base defines the tests methods, derived classes * instantiate the channel in Basic or Message mode. */ -class ClientChannelTestBase : public CppUnit::TestCase +class ChannelTestBase : public CppUnit::TestCase { struct Listener: public qpid::client::MessageListener { vector messages; @@ -68,7 +68,7 @@ class ClientChannelTestBase : public CppUnit::TestCase public: - ClientChannelTestBase() + ChannelTestBase() : connection(FRAME_MAX), qname("testq"), data("hello"), queue(qname, true), exchange("", Exchange::DIRECT_EXCHANGE) @@ -188,8 +188,8 @@ class ClientChannelTestBase : public CppUnit::TestCase } }; -class BasicClientChannelTest : public ClientChannelTestBase { - CPPUNIT_TEST_SUITE(BasicClientChannelTest); +class BasicChannelTest : public ChannelTestBase { + CPPUNIT_TEST_SUITE(BasicChannelTest); CPPUNIT_TEST(testPublishGet); CPPUNIT_TEST(testGetNoContent); CPPUNIT_TEST(testConsumeCancel); @@ -199,24 +199,24 @@ class BasicClientChannelTest : public ClientChannelTestBase { CPPUNIT_TEST_SUITE_END(); public: - BasicClientChannelTest(){ + BasicChannelTest(){ channel.reset(new Channel(false, 500, Channel::AMQP_08)); } }; -class MessageClientChannelTest : public ClientChannelTestBase { - CPPUNIT_TEST_SUITE(MessageClientChannelTest); +class MessageChannelTest : public ChannelTestBase { + CPPUNIT_TEST_SUITE(MessageChannelTest); CPPUNIT_TEST(testPublishGet); CPPUNIT_TEST(testGetNoContent); CPPUNIT_TEST(testGetFragmentedMessage); CPPUNIT_TEST_SUITE_END(); public: - MessageClientChannelTest() { + MessageChannelTest() { channel.reset(new Channel(false, 500, Channel::AMQP_09)); } }; // Make this test suite a plugin. CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(BasicClientChannelTest); -CPPUNIT_TEST_SUITE_REGISTRATION(MessageClientChannelTest); +CPPUNIT_TEST_SUITE_REGISTRATION(BasicChannelTest); +CPPUNIT_TEST_SUITE_REGISTRATION(MessageChannelTest); diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp index 59941864e2..35fc5e0bdb 100644 --- a/cpp/src/tests/ExchangeTest.cpp +++ b/cpp/src/tests/ExchangeTest.cpp @@ -20,8 +20,8 @@ */ #include "qpid/Exception.h" -#include "qpid/broker/BrokerExchange.h" -#include "qpid/broker/BrokerQueue.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/Queue.h" #include "qpid/broker/DeliverableMessage.h" #include "qpid/broker/DirectExchange.h" #include "qpid/broker/ExchangeRegistry.h" diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp index 79df8eade2..5ca4e6c216 100644 --- a/cpp/src/tests/FramingTest.cpp +++ b/cpp/src/tests/FramingTest.cpp @@ -20,8 +20,8 @@ */ #include "InProcessBroker.h" #include "qpid/QpidError.h" -#include "qpid/client/ClientExchange.h" -#include "qpid/client/ClientQueue.h" +#include "qpid/client/Exchange.h" +#include "qpid/client/Queue.h" #include "qpid/client/Connection.h" #include "qpid/client/Connector.h" #include "qpid/framing/AMQP_HighestVersion.h" diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index 29f0d17cef..f2f1b3bf84 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -19,7 +19,7 @@ * */ #include "qpid/Exception.h" -#include "qpid/broker/BrokerQueue.h" +#include "qpid/broker/Queue.h" #include "qpid/broker/Deliverable.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/QueueRegistry.h" diff --git a/cpp/src/tests/SimpleTestCaseBase.h b/cpp/src/tests/SimpleTestCaseBase.h index ee1742a7f7..7f94fa7e1c 100644 --- a/cpp/src/tests/SimpleTestCaseBase.h +++ b/cpp/src/tests/SimpleTestCaseBase.h @@ -25,8 +25,8 @@ #include #include "qpid/Exception.h" -#include "qpid/client/ClientChannel.h" -#include "qpid/client/ClientMessage.h" +#include "qpid/client/Channel.h" +#include "qpid/client/Message.h" #include "qpid/client/Connection.h" #include "qpid/client/MessageListener.h" #include "TestCase.h" diff --git a/cpp/src/tests/TestCase.h b/cpp/src/tests/TestCase.h index 71e1d1118c..07bdd68933 100644 --- a/cpp/src/tests/TestCase.h +++ b/cpp/src/tests/TestCase.h @@ -21,7 +21,7 @@ * */ -#include "qpid/client/ClientMessage.h" +#include "qpid/client/Message.h" #include "TestOptions.h" diff --git a/cpp/src/tests/client_test.cpp b/cpp/src/tests/client_test.cpp index 4903312cd7..8cf43ce069 100644 --- a/cpp/src/tests/client_test.cpp +++ b/cpp/src/tests/client_test.cpp @@ -30,9 +30,9 @@ #include "TestOptions.h" #include "qpid/QpidError.h" -#include "qpid/client/ClientChannel.h" +#include "qpid/client/Channel.h" #include "qpid/client/Connection.h" -#include "qpid/client/ClientMessage.h" +#include "qpid/client/Message.h" #include "qpid/client/MessageListener.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" diff --git a/cpp/src/tests/echo_service.cpp b/cpp/src/tests/echo_service.cpp index c4f1f0477a..7989ec8543 100644 --- a/cpp/src/tests/echo_service.cpp +++ b/cpp/src/tests/echo_service.cpp @@ -28,11 +28,11 @@ */ #include "qpid/QpidError.h" -#include "qpid/client/ClientChannel.h" +#include "qpid/client/Channel.h" #include "qpid/client/Connection.h" -#include "qpid/client/ClientExchange.h" +#include "qpid/client/Exchange.h" #include "qpid/client/MessageListener.h" -#include "qpid/client/ClientQueue.h" +#include "qpid/client/Queue.h" #include "qpid/sys/Time.h" #include #include diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp index fffae796dd..3feef7e876 100644 --- a/cpp/src/tests/exception_test.cpp +++ b/cpp/src/tests/exception_test.cpp @@ -23,9 +23,9 @@ #include "TestOptions.h" #include "qpid/QpidError.h" -#include "qpid/client/ClientChannel.h" +#include "qpid/client/Channel.h" #include "qpid/client/Connection.h" -#include "qpid/client/ClientMessage.h" +#include "qpid/client/Message.h" using namespace qpid::client; using namespace qpid::sys; diff --git a/cpp/src/tests/interop_runner.cpp b/cpp/src/tests/interop_runner.cpp index 1b87512857..5bfe88662a 100644 --- a/cpp/src/tests/interop_runner.cpp +++ b/cpp/src/tests/interop_runner.cpp @@ -22,11 +22,11 @@ #include "qpid/Options.h" #include "qpid/Exception.h" #include "qpid/QpidError.h" -#include "qpid/client/ClientChannel.h" +#include "qpid/client/Channel.h" #include "qpid/client/Connection.h" -#include "qpid/client/ClientExchange.h" +#include "qpid/client/Exchange.h" #include "qpid/client/MessageListener.h" -#include "qpid/client/ClientQueue.h" +#include "qpid/client/Queue.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Time.h" #include diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp index 38882f19c3..9d5ea593fe 100644 --- a/cpp/src/tests/perftest.cpp +++ b/cpp/src/tests/perftest.cpp @@ -21,9 +21,9 @@ #include "TestOptions.h" -#include "qpid/client/ClientChannel.h" -#include "qpid/client/ClientExchange.h" -#include "qpid/client/ClientQueue.h" +#include "qpid/client/Channel.h" +#include "qpid/client/Exchange.h" +#include "qpid/client/Queue.h" #include "qpid/client/Connection.h" #include "qpid/client/MessageListener.h" #include "qpid/QpidError.h" diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp index 38f5c76f54..d19b8f8b19 100644 --- a/cpp/src/tests/topic_listener.cpp +++ b/cpp/src/tests/topic_listener.cpp @@ -34,11 +34,11 @@ #include "qpid/QpidError.h" #include "TestOptions.h" -#include "qpid/client/ClientChannel.h" +#include "qpid/client/Channel.h" #include "qpid/client/Connection.h" -#include "qpid/client/ClientExchange.h" +#include "qpid/client/Exchange.h" #include "qpid/client/MessageListener.h" -#include "qpid/client/ClientQueue.h" +#include "qpid/client/Queue.h" #include "qpid/sys/Time.h" #include #include diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp index 5800f9225d..74fcf8b057 100644 --- a/cpp/src/tests/topic_publisher.cpp +++ b/cpp/src/tests/topic_publisher.cpp @@ -36,11 +36,11 @@ #include "TestOptions.h" #include "qpid/QpidError.h" -#include "qpid/client/ClientChannel.h" +#include "qpid/client/Channel.h" #include "qpid/client/Connection.h" -#include "qpid/client/ClientExchange.h" +#include "qpid/client/Exchange.h" #include "qpid/client/MessageListener.h" -#include "qpid/client/ClientQueue.h" +#include "qpid/client/Queue.h" #include "qpid/sys/Monitor.h" #include #include "qpid/sys/Time.h" -- cgit v1.2.1